You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2022/02/12 07:52:33 UTC

[incubator-inlong] branch master updated: [INLONG-2478][TubeMQ] Optimize GroupCountService logic implementation (#2479)

This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new b2d2c08  [INLONG-2478][TubeMQ] Optimize GroupCountService logic implementation (#2479)
b2d2c08 is described below

commit b2d2c08e41891ceb70c242e495b863aa987ec5d7
Author: gosonzhang <46...@qq.com>
AuthorDate: Sat Feb 12 15:52:29 2022 +0800

    [INLONG-2478][TubeMQ] Optimize GroupCountService logic implementation (#2479)
---
 .../corebase/daemon/AbstractDaemonService.java     |   6 +-
 .../tubemq/server/broker/BrokerServiceServer.java  |  11 +-
 .../server/broker/msgstore/MessageStore.java       |   4 +-
 .../broker/msgstore/disk/GetMessageResult.java     |  12 +-
 .../server/broker/msgstore/disk/MsgFileStore.java  |   4 +-
 .../server/broker/stats/GroupCountService.java     | 146 ------------
 .../stats/{CountItem.java => TrafficInfo.java}     |  38 ++--
 .../{CountService.java => TrafficService.java}     |  29 ++-
 .../server/broker/stats/TrafficStatsService.java   | 245 +++++++++++++++++++++
 .../tubemq/server/broker/utils/DataStoreUtils.java |  16 +-
 .../server/broker/stats/GroupCountServiceTest.java |  39 ----
 .../broker/stats/ServiceStatsHolderTest.java       |   2 +
 .../broker/stats/TrafficStatsServiceTest.java      |  61 +++++
 13 files changed, 379 insertions(+), 234 deletions(-)

diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/daemon/AbstractDaemonService.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/daemon/AbstractDaemonService.java
index e76817b..d99006a 100644
--- a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/daemon/AbstractDaemonService.java
+++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/daemon/AbstractDaemonService.java
@@ -22,11 +22,13 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public abstract class AbstractDaemonService implements Service, Runnable {
-    private static final Logger logger = LoggerFactory.getLogger(AbstractDaemonService.class);
+    private static final Logger logger =
+            LoggerFactory.getLogger(AbstractDaemonService.class);
     private final String name;
     private final long intervalMs;
     private final Thread daemon;
-    private AtomicBoolean shutdown = new AtomicBoolean(false);
+    private final AtomicBoolean shutdown =
+            new AtomicBoolean(false);
 
     public AbstractDaemonService(final String serviceName, final long intervalMs) {
         this.name = serviceName;
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/BrokerServiceServer.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/BrokerServiceServer.java
index 75b363d..74144f8 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/BrokerServiceServer.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/BrokerServiceServer.java
@@ -66,9 +66,8 @@ import org.apache.inlong.tubemq.server.broker.msgstore.disk.GetMessageResult;
 import org.apache.inlong.tubemq.server.broker.nodeinfo.ConsumerNodeInfo;
 import org.apache.inlong.tubemq.server.broker.offset.OffsetRecordInfo;
 import org.apache.inlong.tubemq.server.broker.offset.OffsetService;
-import org.apache.inlong.tubemq.server.broker.stats.CountService;
-import org.apache.inlong.tubemq.server.broker.stats.GroupCountService;
 import org.apache.inlong.tubemq.server.broker.stats.ServiceStatsHolder;
+import org.apache.inlong.tubemq.server.broker.stats.TrafficStatsService;
 import org.apache.inlong.tubemq.server.common.TServerConstants;
 import org.apache.inlong.tubemq.server.common.TStatusConstants;
 import org.apache.inlong.tubemq.server.common.aaaserver.CertificateBrokerHandler;
@@ -107,9 +106,9 @@ public class BrokerServiceServer implements BrokerReadService, BrokerWriteServic
     // row lock.
     private final RowLock brokerRowLock;
     // statistics of produce.
-    private final CountService putCounterGroup;
+    private final TrafficStatsService putCounterGroup;
     // statistics of consume.
-    private final CountService getCounterGroup;
+    private final TrafficStatsService getCounterGroup;
     // certificate handler.
     private final CertificateBrokerHandler serverAuthHandler;
     // consumer timeout listener.
@@ -128,8 +127,8 @@ public class BrokerServiceServer implements BrokerReadService, BrokerWriteServic
         this.serverAuthHandler = tubeBroker.getServerAuthHandler();
         ServiceStatusHolder.setStatisParameters(tubeConfig.getAllowedReadIOExcptCnt(),
                 tubeConfig.getAllowedWriteIOExcptCnt(), tubeConfig.getIoExcptStatsDurationMs());
-        this.putCounterGroup = new GroupCountService("PutCounterGroup", "Producer", 60 * 1000);
-        this.getCounterGroup = new GroupCountService("GetCounterGroup", "Consumer", 60 * 1000);
+        this.putCounterGroup = new TrafficStatsService("PutCounterGroup", "Producer", 60 * 1000);
+        this.getCounterGroup = new TrafficStatsService("GetCounterGroup", "Consumer", 60 * 1000);
         this.heartbeatManager = new HeartbeatManager();
         this.brokerRowLock =
                 new RowLock("Broker-RowLock", this.tubeConfig.getRowLockWaitDurMs());
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStore.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStore.java
index 3764559..61b94f4 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStore.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStore.java
@@ -47,7 +47,7 @@ import org.apache.inlong.tubemq.server.broker.msgstore.mem.GetCacheMsgResult;
 import org.apache.inlong.tubemq.server.broker.msgstore.mem.MsgMemStatisInfo;
 import org.apache.inlong.tubemq.server.broker.msgstore.mem.MsgMemStore;
 import org.apache.inlong.tubemq.server.broker.nodeinfo.ConsumerNodeInfo;
-import org.apache.inlong.tubemq.server.broker.stats.CountItem;
+import org.apache.inlong.tubemq.server.broker.stats.TrafficInfo;
 import org.apache.inlong.tubemq.server.broker.utils.DataStoreUtils;
 import org.apache.inlong.tubemq.server.common.utils.AppendResult;
 import org.apache.inlong.tubemq.server.common.utils.IdWorker;
@@ -219,7 +219,7 @@ public class MessageStore implements Closeable {
             if (inMemCache) {
                 // return not found when data is under memory sink operation.
                 if (memMsgRlt.isSuccess) {
-                    HashMap<String, CountItem> countMap =
+                    HashMap<String, TrafficInfo> countMap =
                             new HashMap<>();
                     List<ClientBroker.TransferedMessage> transferedMessageList =
                             new ArrayList<>();
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/GetMessageResult.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/GetMessageResult.java
index 5cd6fef..0c0d903 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/GetMessageResult.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/GetMessageResult.java
@@ -22,7 +22,7 @@ import java.util.HashMap;
 import java.util.List;
 import org.apache.inlong.tubemq.corebase.TBaseConstants;
 import org.apache.inlong.tubemq.corebase.protobuf.generated.ClientBroker.TransferedMessage;
-import org.apache.inlong.tubemq.server.broker.stats.CountItem;
+import org.apache.inlong.tubemq.server.broker.stats.TrafficInfo;
 
 /**
  * Broker's reply to Consumer's GetMessage request.
@@ -38,14 +38,14 @@ public class GetMessageResult {
     public long waitTime = -1;
     public boolean isSlowFreq = false;
     public boolean isFromSsdFile = false;
-    public HashMap<String, CountItem> tmpCounters = new HashMap<>();
+    public HashMap<String, TrafficInfo> tmpCounters = new HashMap<>();
     public List<TransferedMessage> transferedMessageList = new ArrayList<>();
     public long maxOffset = TBaseConstants.META_VALUE_UNDEFINED;
 
     public GetMessageResult(boolean isSuccess, int retCode, final String errInfo,
                             final long reqOffset, final int lastReadOffset,
                             final long lastRdDataOffset, final int totalSize,
-                            HashMap<String, CountItem> tmpCounters,
+                            HashMap<String, TrafficInfo> tmpCounters,
                             List<TransferedMessage> transferedMessageList) {
         this(isSuccess, retCode, errInfo, reqOffset, lastReadOffset,
                 lastRdDataOffset, totalSize, tmpCounters, transferedMessageList, false);
@@ -54,7 +54,7 @@ public class GetMessageResult {
     public GetMessageResult(boolean isSuccess, int retCode, final String errInfo,
                             final long reqOffset, final int lastReadOffset,
                             final long lastRdDataOffset, final int totalSize,
-                            HashMap<String, CountItem> tmpCounters,
+                            HashMap<String, TrafficInfo> tmpCounters,
                             List<TransferedMessage> transferedMessageList,
                             boolean isFromSsdFile) {
         this.isSuccess = isSuccess;
@@ -108,11 +108,11 @@ public class GetMessageResult {
         this.waitTime = waitTime;
     }
 
-    public HashMap<String, CountItem> getTmpCounters() {
+    public HashMap<String, TrafficInfo> getTmpCounters() {
         return tmpCounters;
     }
 
-    public void setTmpCounters(HashMap<String, CountItem> tmpCounters) {
+    public void setTmpCounters(HashMap<String, TrafficInfo> tmpCounters) {
         this.tmpCounters = tmpCounters;
     }
 
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/MsgFileStore.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/MsgFileStore.java
index f2c5c0d..64ad394 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/MsgFileStore.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/MsgFileStore.java
@@ -35,8 +35,8 @@ import org.apache.inlong.tubemq.corebase.protobuf.generated.ClientBroker;
 import org.apache.inlong.tubemq.corebase.utils.ServiceStatusHolder;
 import org.apache.inlong.tubemq.server.broker.BrokerConfig;
 import org.apache.inlong.tubemq.server.broker.msgstore.MessageStore;
-import org.apache.inlong.tubemq.server.broker.stats.CountItem;
 import org.apache.inlong.tubemq.server.broker.stats.ServiceStatsHolder;
+import org.apache.inlong.tubemq.server.broker.stats.TrafficInfo;
 import org.apache.inlong.tubemq.server.broker.utils.DataStoreUtils;
 import org.apache.inlong.tubemq.server.broker.utils.DiskSamplePrint;
 import org.apache.inlong.tubemq.server.common.TServerConstants;
@@ -263,7 +263,7 @@ public class MsgFileStore implements Closeable {
         final StringBuilder sBuilder = new StringBuilder(512);
         final long curDataMaxOffset = getDataMaxOffset();
         final long curDataMinOffset = getDataMinOffset();
-        HashMap<String, CountItem> countMap = new HashMap<>();
+        HashMap<String, TrafficInfo> countMap = new HashMap<>();
         ByteBuffer dataBuffer =
                 ByteBuffer.allocate(TServerConstants.CFG_STORE_DEFAULT_MSG_READ_UNIT);
         List<ClientBroker.TransferedMessage> transferedMessageList =
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/GroupCountService.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/GroupCountService.java
deleted file mode 100644
index a162dd3..0000000
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/GroupCountService.java
+++ /dev/null
@@ -1,146 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.tubemq.server.broker.stats;
-
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import org.apache.inlong.tubemq.corebase.daemon.AbstractDaemonService;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Statistics of broker. It use two CountSet alternatively print statistics to log.
- */
-public class GroupCountService extends AbstractDaemonService implements CountService {
-    private final Logger logger;
-    private final String cntHdr;
-    private final CountSet[] countSets = new CountSet[2];
-    private AtomicInteger index = new AtomicInteger(0);
-
-    public GroupCountService(String logFileName, String countType, long scanIntervalMs) {
-        super(logFileName, scanIntervalMs);
-        this.cntHdr = countType;
-        if (logFileName == null) {
-            this.logger = LoggerFactory.getLogger(GroupCountService.class);
-        } else {
-            this.logger = LoggerFactory.getLogger(logFileName);
-        }
-        countSets[0] = new CountSet();
-        countSets[1] = new CountSet();
-        super.start();
-    }
-
-    @Override
-    protected void loopProcess(long intervalMs) {
-        int tmpIndex = 0;
-        int befIndex = 0;
-        AtomicLong curRunCnt;
-        ConcurrentHashMap<String, CountItem> counters;
-        while (!super.isStopped()) {
-            try {
-                Thread.sleep(intervalMs);
-                befIndex = tmpIndex = index.get();
-                if (index.compareAndSet(befIndex, (++tmpIndex) % 2)) {
-                    curRunCnt = countSets[befIndex].refCnt;
-                    do {
-                        try {
-                            Thread.sleep(10);
-                        } catch (InterruptedException e) {
-                            return;
-                        }
-                    } while (curRunCnt.get() > 0);
-                    counters = countSets[befIndex].counterItem;
-                    if (counters != null) {
-                        for (Map.Entry<String, CountItem> entry : counters.entrySet()) {
-                            logger.info("{}#{}#{}#{}", new Object[]{cntHdr, entry.getKey(),
-                                    entry.getValue().getMsgCount(), entry.getValue().getMsgSize()});
-                        }
-                        counters.clear();
-                    }
-                }
-            } catch (InterruptedException e) {
-                return;
-            } catch (Throwable t) {
-                //
-            }
-        }
-    }
-
-    @Override
-    public void close(long waitTimeMs) {
-        if (super.stop()) {
-            return;
-        }
-        int befIndex = index.get();
-        ConcurrentHashMap<String, CountItem> counters;
-        for (int i = 0; i < countSets.length; i++) {
-            counters = countSets[(++befIndex) % 2].counterItem;
-            if (counters != null) {
-                for (Map.Entry<String, CountItem> entry : counters.entrySet()) {
-                    logger.info("{}#{}#{}#{}", new Object[]{cntHdr, entry.getKey(),
-                            entry.getValue().getMsgCount(), entry.getValue().getMsgSize()});
-                }
-                counters.clear();
-            }
-        }
-    }
-
-    @Override
-    public void add(Map<String, CountItem> counterGroup) {
-        CountSet countSet = countSets[index.get()];
-        countSet.refCnt.incrementAndGet();
-        ConcurrentHashMap<String, CountItem> counters = countSet.counterItem;
-        for (Entry<String, CountItem> entry : counterGroup.entrySet()) {
-            CountItem currData = counters.get(entry.getKey());
-            if (currData == null) {
-                CountItem tmpData = new CountItem(0L, 0L);
-                currData = counters.putIfAbsent(entry.getKey(), tmpData);
-                if (currData == null) {
-                    currData = tmpData;
-                }
-            }
-            currData.appendMsg(entry.getValue().getMsgCount(), entry.getValue().getMsgSize());
-        }
-        countSet.refCnt.decrementAndGet();
-    }
-
-    @Override
-    public void add(String name, Long delta, int msgSize) {
-        CountSet countSet = countSets[index.get()];
-        countSet.refCnt.incrementAndGet();
-        CountItem currData = countSet.counterItem.get(name);
-        if (currData == null) {
-            CountItem tmpData = new CountItem(0L, 0L);
-            currData = countSet.counterItem.putIfAbsent(name, tmpData);
-            if (currData == null) {
-                currData = tmpData;
-            }
-        }
-        currData.appendMsg(delta, msgSize);
-        countSet.refCnt.decrementAndGet();
-    }
-
-    private static class CountSet {
-        public AtomicLong refCnt = new AtomicLong(0);
-        public ConcurrentHashMap<String, CountItem> counterItem =
-                new ConcurrentHashMap<>();
-    }
-}
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/CountItem.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/TrafficInfo.java
similarity index 61%
rename from inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/CountItem.java
rename to inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/TrafficInfo.java
index 58ed92f..0968552 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/CountItem.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/TrafficInfo.java
@@ -17,39 +17,37 @@
 
 package org.apache.inlong.tubemq.server.broker.stats;
 
-import java.util.concurrent.atomic.AtomicLong;
-
 /**
  * Statistic of message, contains message's count and message's size.
  */
-public class CountItem {
-    AtomicLong msgCount = new AtomicLong(0);
-    AtomicLong msgSize = new AtomicLong(0);
-
-    public CountItem(long msgCount, long msgSize) {
-        this.msgCount.set(msgCount);
-        this.msgSize.set(msgSize);
-    }
+public class TrafficInfo {
+    private long msgCnt = 0L;
+    private long msgSize = 0L;
 
-    public long getMsgSize() {
-        return msgSize.get();
+    public TrafficInfo() {
+        clear();
     }
 
-    public void setMsgSize(long msgSize) {
-        this.msgSize.set(msgSize);
+    public TrafficInfo(long msgCount, long msgSize) {
+        this.msgCnt = msgCount;
+        this.msgSize = msgSize;
     }
 
     public long getMsgCount() {
-        return msgCount.get();
+        return msgCnt;
     }
 
-    public void setMsgCount(long msgCount) {
-        this.msgCount.set(msgCount);
+    public long getMsgSize() {
+        return msgSize;
     }
 
-    public void appendMsg(final long msgCount, final long msgSize) {
-        this.msgCount.addAndGet(msgCount);
-        this.msgSize.addAndGet(msgSize);
+    public void addMsgCntAndSize(long msgCount, long msgSize) {
+        this.msgCnt += msgCount;
+        this.msgSize += msgSize;
     }
 
+    public void clear() {
+        this.msgCnt = 0L;
+        this.msgSize = 0L;
+    }
 }
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/CountService.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/TrafficService.java
similarity index 54%
rename from inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/CountService.java
rename to inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/TrafficService.java
index d502a11..ea6a7b2 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/CountService.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/TrafficService.java
@@ -19,11 +19,34 @@ package org.apache.inlong.tubemq.server.broker.stats;
 
 import java.util.Map;
 
-public interface CountService {
+/**
+ * TrafficService, incoming and outgoing traffic statistics service
+ *
+ * Supports adding new metric data one by one or in batches,
+ * and outputting metric data to a file at specified intervals.
+ */
+public interface TrafficService {
 
+    /**
+     * Close service.
+     *
+     * @param waitTimeMs  the wait time
+     */
     void close(long waitTimeMs);
 
-    void add(Map<String, CountItem> counterGroup);
+    /**
+     * Add traffic information in batches
+     *
+     * @param trafficInfos  the traffic information
+     */
+    void add(Map<String, TrafficInfo> trafficInfos);
 
-    void add(String name, Long delta, int msgSize);
+    /**
+     * Add a traffic information record
+     *
+     * @param statsKey  the statistical key
+     * @param msgCnt    the total message count
+     * @param msgSize   the total message size
+     */
+    void add(String statsKey, long msgCnt, long msgSize);
 }
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/TrafficStatsService.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/TrafficStatsService.java
new file mode 100644
index 0000000..34c678b
--- /dev/null
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/TrafficStatsService.java
@@ -0,0 +1,245 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.tubemq.server.broker.stats;
+
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.inlong.tubemq.corebase.daemon.AbstractDaemonService;
+import org.apache.inlong.tubemq.corebase.metric.impl.LongOnlineCounter;
+import org.apache.inlong.tubemq.corebase.metric.impl.LongStatsCounter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * TrafficStatsService, input and output traffic statistics service
+ *
+ *  Due to the large amount of traffic-related metric data, this statistics service uses
+ *  a daemon thread to periodically flush the data to a dedicated metric log
+ *  for metric data collection.
+ */
+public class TrafficStatsService extends AbstractDaemonService implements TrafficService {
+    // Maximum time (ms) to wait for in-flight writers before a unit is output
+    private static final long MAX_WRITING_WAIT_DLT = 5000L;
+    // Statistics output log file
+    private final Logger logger;
+    // Statistic category, printed as the first field of every output record
+    private final String statsCat;
+    // Switchable traffic statistic units: one is written while the other is output
+    private final WritableUnit[] switchableUnits = new WritableUnit[2];
+    // Switch counter; its parity selects the current writable unit
+    private final AtomicInteger writableIndex = new AtomicInteger(0);
+
+    /**
+     * Initialize the traffic statistics service and start it
+     *
+     * @param logFileName      the logger name used for output, null for the class logger
+     * @param countType        the statistic type
+     * @param scanIntervalMs   the snapshot interval in milliseconds
+     */
+    public TrafficStatsService(String logFileName, String countType, long scanIntervalMs) {
+        super(logFileName, scanIntervalMs);
+        this.statsCat = countType;
+        if (logFileName == null) {
+            this.logger = LoggerFactory.getLogger(TrafficStatsService.class);
+        } else {
+            this.logger = LoggerFactory.getLogger(logFileName);
+        }
+        switchableUnits[0] = new WritableUnit();
+        switchableUnits[1] = new WritableUnit();
+        super.start();
+    }
+
+    @Override
+    protected void loopProcess(long intervalMs) {
+        int befIndex;
+        while (!super.isStopped()) {
+            try {
+                Thread.sleep(intervalMs);
+                // Switch writers to the other unit and take over the previous one
+                befIndex = writableIndex.getAndIncrement();
+                // Output to file
+                output2file(befIndex);
+            } catch (InterruptedException e) {
+                return;
+            } catch (Throwable t) {
+                // Statistics output is best-effort: keep the loop running on any failure
+            }
+        }
+    }
+
+    @Override
+    public void close(long waitTimeMs) {
+        if (super.stop()) {
+            return;
+        }
+        // Output the remaining information of both units
+        int index = writableIndex.get();
+        for (int i = 0; i < switchableUnits.length; i++) {
+            output2file(++index);
+        }
+    }
+
+    @Override
+    public void add(Map<String, TrafficInfo> trafficInfos) {
+        TrafficStatsSet tmpStatsSet;
+        TrafficStatsSet trafficStatsSet;
+        // Pin the unit once: the index may switch while this call runs, and the
+        WritableUnit selectedUnit = switchableUnits[getIndex()];
+        selectedUnit.refCnt.incValue(); // inc/dec must hit the same unit
+        try {
+            // Accumulate statistics information
+            ConcurrentHashMap<String, TrafficStatsSet> tmpStatsSetMap = selectedUnit.statsUnitMap;
+            for (Entry<String, TrafficInfo> entry : trafficInfos.entrySet()) {
+                trafficStatsSet = tmpStatsSetMap.get(entry.getKey());
+                if (trafficStatsSet == null) {
+                    tmpStatsSet = new TrafficStatsSet();
+                    trafficStatsSet = tmpStatsSetMap.putIfAbsent(entry.getKey(), tmpStatsSet);
+                    if (trafficStatsSet == null) {
+                        trafficStatsSet = tmpStatsSet;
+                    }
+                }
+                trafficStatsSet.addMsgCntAndSize(
+                        entry.getValue().getMsgCount(), entry.getValue().getMsgSize());
+            }
+        } finally {
+            // Decrement write reference count of the pinned unit
+            selectedUnit.refCnt.decValue();
+        }
+    }
+
+    @Override
+    public void add(String statsKey, long msgCnt, long msgSize) {
+        // Pin the unit once: the index may switch while this call runs, and the
+        WritableUnit selectedUnit = switchableUnits[getIndex()];
+        selectedUnit.refCnt.incValue(); // inc/dec must hit the same unit
+        try {
+            // Accumulate statistics information
+            ConcurrentHashMap<String, TrafficStatsSet> tmpStatsSetMap = selectedUnit.statsUnitMap;
+            TrafficStatsSet trafficStatsSet = tmpStatsSetMap.get(statsKey);
+            if (trafficStatsSet == null) {
+                TrafficStatsSet tmpStatsSet = new TrafficStatsSet();
+                trafficStatsSet = tmpStatsSetMap.putIfAbsent(statsKey, tmpStatsSet);
+                if (trafficStatsSet == null) {
+                    trafficStatsSet = tmpStatsSet;
+                }
+            }
+            trafficStatsSet.addMsgCntAndSize(msgCnt, msgSize);
+        } finally {
+            // Decrement write reference count of the pinned unit
+            selectedUnit.refCnt.decValue();
+        }
+    }
+
+    /**
+     * Wait for in-flight writers of the selected unit, then log and clear its statistics
+     *
+     * @param readIndex   the index value whose parity selects the unit to output
+     */
+    private void output2file(int readIndex) {
+        WritableUnit selectedUnit =
+                switchableUnits[getIndex(readIndex)];
+        if (selectedUnit == null) {
+            return;
+        }
+        // Wait, at most MAX_WRITING_WAIT_DLT ms, for the data update operations to complete
+        long startTime = System.currentTimeMillis();
+        do {
+            if (System.currentTimeMillis() - startTime >= MAX_WRITING_WAIT_DLT) {
+                break;
+            }
+            try {
+                Thread.sleep(20);
+            } catch (InterruptedException e) {
+                break;
+            }
+        } while (selectedUnit.refCnt.getValue() > 0);
+        // Output one record per statistical key: category#key#msgCount#msgSize
+        Map<String, TrafficStatsSet> statsMap = selectedUnit.statsUnitMap;
+        for (Entry<String, TrafficStatsSet> entry : statsMap.entrySet()) {
+            logger.info("{}#{}#{}#{}", statsCat, entry.getKey(),
+                    entry.getValue().msgCnt.getAndResetValue(),
+                    entry.getValue().msgSize.getAndResetValue());
+        }
+        statsMap.clear();
+    }
+
+    /**
+     * Get current writable block index.
+     *
+     * @return the writable block index, 0 or 1
+     */
+    private int getIndex() {
+        return getIndex(this.writableIndex.get());
+    }
+
+    /**
+     * Gets the metric block index based on the specified value.
+     *
+     * @param origIndex    the specified value, may be negative
+     * @return the metric block index, 0 or 1
+     */
+    private int getIndex(int origIndex) {
+        return Math.abs(origIndex % 2);
+    }
+
+    /**
+     * TrafficStatsSet, metric statistics item set
+     *
+     * Currently includes the total number of messages and bytes
+     * according to the statistics dimension, which can be expanded later as needed
+     */
+    private static class TrafficStatsSet {
+        protected LongStatsCounter msgCnt =
+                new LongStatsCounter("msgCount", null);
+        protected LongStatsCounter msgSize =
+                new LongStatsCounter("msgSize", null);
+
+        public TrafficStatsSet() {
+            // Counters are initialized by their field declarations
+        }
+
+        /**
+         * Accumulate the count of messages and message bytes.
+         *
+         * @param msgCount  the specified message count
+         * @param msgSize   the specified message size
+         */
+        public void addMsgCntAndSize(long msgCount, long msgSize) {
+            this.msgCnt.addValue(msgCount);
+            this.msgSize.addValue(msgSize);
+        }
+    }
+
+    /**
+     * WritableUnit, one of the two switchable statistic blocks
+     *
+     * This class is mainly defined to facilitate reading and writing of
+     * statistic set through array operations, which contains a Map of
+     * statistic dimensions and corresponding metric values
+     */
+    private static class WritableUnit {
+        // Count of add() calls currently writing to this unit
+        public LongOnlineCounter refCnt =
+                new LongOnlineCounter("ref_count", null);
+        // statistic unit map
+        protected ConcurrentHashMap<String, TrafficStatsSet> statsUnitMap =
+                new ConcurrentHashMap<>(512);
+    }
+}
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/utils/DataStoreUtils.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/utils/DataStoreUtils.java
index 5c1eabd..f74cd3f 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/utils/DataStoreUtils.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/utils/DataStoreUtils.java
@@ -27,7 +27,7 @@ import org.apache.inlong.tubemq.corebase.TokenConstants;
 import org.apache.inlong.tubemq.corebase.protobuf.generated.ClientBroker;
 import org.apache.inlong.tubemq.corebase.utils.MessageFlagUtils;
 import org.apache.inlong.tubemq.corebase.utils.TStringUtils;
-import org.apache.inlong.tubemq.server.broker.stats.CountItem;
+import org.apache.inlong.tubemq.server.broker.stats.TrafficInfo;
 
 /**
  * Storage util. Used for data and index file storage format.
@@ -113,10 +113,10 @@ public class DataStoreUtils {
      * @param sBuilder        the string buffer
      * @return                the converted messages
      */
-    public static ClientBroker.TransferedMessage getTransferMsg(final ByteBuffer dataBuffer, int dataTotalSize,
-                                                                final HashMap<String, CountItem> countMap,
-                                                                final String statisKeyBase,
-                                                                final StringBuilder sBuilder) {
+    public static ClientBroker.TransferedMessage getTransferMsg(ByteBuffer dataBuffer, int dataTotalSize,
+                                                                HashMap<String, TrafficInfo> countMap,
+                                                                String statisKeyBase,
+                                                                StringBuilder sBuilder) {
         if (dataBuffer.array().length < dataTotalSize) {
             return null;
         }
@@ -187,11 +187,11 @@ public class DataStoreUtils {
         String baseKey = sBuilder.append(statisKeyBase)
                 .append("#").append(messageTime).toString();
         sBuilder.delete(0, sBuilder.length());
-        CountItem getCount = countMap.get(baseKey);
+        TrafficInfo getCount = countMap.get(baseKey);
         if (getCount == null) {
-            countMap.put(baseKey, new CountItem(1L, payLoadLen2));
+            countMap.put(baseKey, new TrafficInfo(1L, payLoadLen2));
         } else {
-            getCount.appendMsg(1L, payLoadLen2);
+            getCount.addMsgCntAndSize(1L, payLoadLen2);
         }
         ClientBroker.TransferedMessage transferedMessage = dataBuilder.build();
         dataBuilder.clear();
diff --git a/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/broker/stats/GroupCountServiceTest.java b/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/broker/stats/GroupCountServiceTest.java
deleted file mode 100644
index b9e80fe..0000000
--- a/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/broker/stats/GroupCountServiceTest.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.tubemq.server.broker.stats;
-
-import java.util.HashMap;
-import java.util.Map;
-import org.junit.Test;
-
-/**
- * GroupCountService test.
- */
-public class GroupCountServiceTest {
-
-    @Test
-    public void add() {
-        GroupCountService groupCountService = new GroupCountService("PutCounterGroup", "Producer", 60 * 1000);
-        groupCountService.add("key", 1L, 100);
-        Map<String, CountItem> items = new HashMap<>();
-        items.put("key1", new CountItem(1L, 1024));
-        items.put("key2", new CountItem(1L, 1024));
-        // add counts
-        groupCountService.add(items);
-    }
-}
diff --git a/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/broker/stats/ServiceStatsHolderTest.java b/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/broker/stats/ServiceStatsHolderTest.java
index f8ae24b..4da079d 100644
--- a/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/broker/stats/ServiceStatsHolderTest.java
+++ b/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/broker/stats/ServiceStatsHolderTest.java
@@ -75,6 +75,7 @@ public class ServiceStatsHolderTest {
         Assert.assertEquals(10, retMap.get("file_sync_dlt_min").longValue());
         Assert.assertEquals(1, retMap.get("file_sync_dlt_cell_8t16").longValue());
         Assert.assertEquals(1, retMap.get("file_sync_dlt_cell_64t128").longValue());
+        final long sinceTime1 = retMap.get("reset_time");
         // verify snapshot
         ServiceStatsHolder.snapShort(retMap);
         retMap.clear();
@@ -83,6 +84,7 @@ public class ServiceStatsHolderTest {
         // add disk sync data, add 1
         ServiceStatsHolder.updDiskSyncDataDlt(999);
         ServiceStatsHolder.snapShort(retMap);
+        Assert.assertNotEquals(sinceTime1, retMap.get("reset_time").longValue());
         Assert.assertEquals(2, retMap.get("consumer_online_cnt").longValue());
         Assert.assertEquals(0, retMap.get("consumer_timeout_cnt").longValue());
         Assert.assertEquals(0, retMap.get("broker_hb_exc_cnt").longValue());
diff --git a/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/broker/stats/TrafficStatsServiceTest.java b/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/broker/stats/TrafficStatsServiceTest.java
new file mode 100644
index 0000000..e8e08ea
--- /dev/null
+++ b/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/broker/stats/TrafficStatsServiceTest.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.tubemq.server.broker.stats;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests for TrafficInfo and TrafficStatsService.
+ */
+public class TrafficStatsServiceTest {
+
+    @Test
+    public void testTrafficInfo() {
+        // case 1: default constructor; accumulate twice, clear, then accumulate again
+        TrafficInfo trafficInfo1 = new TrafficInfo();
+        trafficInfo1.addMsgCntAndSize(1, 100);
+        trafficInfo1.addMsgCntAndSize(3, 500);
+        Assert.assertEquals(4, trafficInfo1.getMsgCount());
+        Assert.assertEquals(600, trafficInfo1.getMsgSize());
+        trafficInfo1.clear();
+        trafficInfo1.addMsgCntAndSize(50, 5000);
+        Assert.assertEquals(50, trafficInfo1.getMsgCount());
+        Assert.assertEquals(5000, trafficInfo1.getMsgSize());
+        // case 2: constructor with initial count and size, then accumulate once
+        TrafficInfo trafficInfo2 = new TrafficInfo(99, 1000);
+        trafficInfo2.addMsgCntAndSize(1, 100);
+        Assert.assertEquals(100, trafficInfo2.getMsgCount());
+        Assert.assertEquals(1100, trafficInfo2.getMsgSize());
+    }
+
+    @Test
+    public void testTrafficStatsService() {
+        TrafficStatsService trafficService =
+                new TrafficStatsService("PutCounterGroup", "Producer", 60 * 1000L);
+        trafficService.add("key", 1L, 100);
+        Map<String, TrafficInfo> items = new HashMap<>();
+        items.put("key1", new TrafficInfo(1L, 1024));
+        items.put("key2", new TrafficInfo(1L, 1024));
+        // batch add via map; NOTE(review): no assertions here, only checks add() does not throw
+        trafficService.add(items);
+        trafficService.add("key3", 3L, 500L);
+    }
+}