You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/09/27 06:17:09 UTC

[inlong] branch master updated: [INLONG-6035][TubeMQ] Add Broker's message append and file refresh delay statistics (#6036)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 6bbb5d99d [INLONG-6035][TubeMQ] Add Broker's message append and file refresh delay statistics (#6036)
6bbb5d99d is described below

commit 6bbb5d99d09f4de5f26f79f2b04ab9905014743f
Author: Goson Zhang <46...@qq.com>
AuthorDate: Tue Sep 27 14:17:04 2022 +0800

    [INLONG-6035][TubeMQ] Add Broker's message append and file refresh delay statistics (#6036)
---
 .../inlong/tubemq/server/broker/TubeBroker.java    | 63 +++++++++---------
 .../server/broker/metadata/TopicMetadata.java      |  2 +-
 .../server/broker/msgstore/MessageStore.java       | 47 +++++++++-----
 .../broker/msgstore/MessageStoreManager.java       | 23 +++----
 .../server/broker/msgstore/disk/MsgFileStore.java  | 75 +++++++++++++---------
 .../server/broker/msgstore/mem/MsgMemStore.java    | 17 ++---
 .../server/broker/offset/DefaultOffsetManager.java | 10 +--
 .../server/broker/stats/MsgStoreStatsHolder.java   | 45 +++++++------
 .../broker/stats/MsgStoreStatsHolderTest.java      | 58 +++++++++--------
 9 files changed, 191 insertions(+), 149 deletions(-)

diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/TubeBroker.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/TubeBroker.java
index fb248059f..397ccfd1f 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/TubeBroker.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/TubeBroker.java
@@ -320,24 +320,24 @@ public class TubeBroker implements Stoppable {
                 .append(TubeServerVersion.SERVER_VERSION).toString();
     }
 
-    private void procConfigFromHeartBeat(StringBuilder sBuilder,
+    private void procConfigFromHeartBeat(StringBuilder strBuff,
                                          HeartResponseM2B response) {
         // process service status
         ServiceStatusHolder
                 .setReadWriteServiceStatus(response.getStopRead(),
                         response.getStopWrite(), "Master");
-        // process flow controller rules
-        FlowCtrlRuleHandler flowCtrlRuleHandler =
+        // update default flow controller rules
+        FlowCtrlRuleHandler defFlowCtrlHandler =
                 metadataManager.getFlowCtrlRuleHandler();
-        long flowCheckId = flowCtrlRuleHandler.getFlowCtrlId();
-        int qryPriorityId = flowCtrlRuleHandler.getQryPriorityId();
+        long flowCheckId = defFlowCtrlHandler.getFlowCtrlId();
+        int qryPriorityId = defFlowCtrlHandler.getQryPriorityId();
         if (response.hasFlowCheckId()) {
             qryPriorityId = response.hasQryPriorityId()
                     ? response.getQryPriorityId() : qryPriorityId;
             if (response.getFlowCheckId() != flowCheckId) {
                 flowCheckId = response.getFlowCheckId();
                 try {
-                    flowCtrlRuleHandler
+                    defFlowCtrlHandler
                             .updateFlowCtrlInfo(qryPriorityId,
                                     flowCheckId, response.getFlowControlInfo());
                 } catch (Exception e1) {
@@ -345,8 +345,8 @@ public class TubeBroker implements Stoppable {
                             "[HeartBeat response] found parse flowCtrl rules failure", e1);
                 }
             }
-            if (qryPriorityId != flowCtrlRuleHandler.getQryPriorityId()) {
-                flowCtrlRuleHandler.setQryPriorityId(qryPriorityId);
+            if (qryPriorityId != defFlowCtrlHandler.getQryPriorityId()) {
+                defFlowCtrlHandler.setQryPriorityId(qryPriorityId);
             }
         }
         // update configure report requirement
@@ -356,36 +356,37 @@ public class TubeBroker implements Stoppable {
             long configId = response.getClsConfig().getConfigId();
             if (configId != ClusterConfigHolder.getConfigId()) {
                 ClusterConfigHolder.updClusterSetting(response.getClsConfig());
-                logger.info(sBuilder
-                        .append("[HeartBeat response] received cluster configure changed,")
+                logger.info(strBuff
+                        .append("[HeartBeat response] received cluster configure changed")
                         .append(",hasClsConfig=").append(response.hasClsConfig())
                         .append(",curClusterConfigId=").append(ClusterConfigHolder.getConfigId())
                         .append(",curMaxMsgSize=").append(ClusterConfigHolder.getMaxMsgSize())
                         .append(",minMemCacheSize=")
                         .append(ClusterConfigHolder.getMinMemCacheSize())
                         .toString());
-                sBuilder.delete(0, sBuilder.length());
+                strBuff.delete(0, strBuff.length());
             }
         }
         if (response.getTakeConfInfo()) {
-            logger.info(sBuilder
+            logger.info(strBuff
                     .append("[HeartBeat response] received broker metadata info: brokerConfId=")
                     .append(response.getCurBrokerConfId())
                     .append(",stopWrite=").append(response.getStopWrite())
                     .append(",stopRead=").append(response.getStopRead())
                     .append(",configCheckSumId=").append(response.getConfCheckSumId())
                     .append(",hasFlowCtrl=").append(response.hasFlowCheckId())
-                    .append(",curFlowCtrlId=").append(flowCheckId)
-                    .append(",curQryPriorityId=").append(qryPriorityId)
+                    .append(",curFlowCtrlId=").append(defFlowCtrlHandler.getFlowCtrlId())
+                    .append(",curQryPriorityId=").append(defFlowCtrlHandler.getQryPriorityId())
+                    .append(",curMaxMsgSize=").append(ClusterConfigHolder.getMaxMsgSize())
                     .append(",brokerDefaultConfInfo=")
                     .append(response.getBrokerDefaultConfInfo())
                     .append(",brokerTopicSetConfList=")
-                    .append(response.getBrokerTopicSetConfInfoList().toString()).toString());
-            sBuilder.delete(0, sBuilder.length());
+                    .append(response.getBrokerTopicSetConfInfoList()).toString());
+            strBuff.delete(0, strBuff.length());
             metadataManager
                     .updateBrokerTopicConfigMap(response.getCurBrokerConfId(),
                             response.getConfCheckSumId(), response.getBrokerDefaultConfInfo(),
-                            response.getBrokerTopicSetConfInfoList(), false, sBuilder);
+                            response.getBrokerTopicSetConfInfoList(), false, strBuff);
         }
         // update auth info
         if (response.hasBrokerAuthorizedInfo()) {
@@ -395,7 +396,7 @@ public class TubeBroker implements Stoppable {
         boolean needProcess =
                 metadataManager.updateBrokerRemoveTopicMap(
                         response.getTakeRemoveTopicInfo(),
-                        response.getRemoveTopicConfInfoList(), sBuilder);
+                        response.getRemoveTopicConfInfoList(), strBuff);
         if (needProcess) {
             new Thread() {
                 @Override
@@ -440,29 +441,29 @@ public class TubeBroker implements Stoppable {
         }
     }
 
-    private void procConfigFromRegister(StringBuilder sBuilder,
+    private void procConfigFromRegister(StringBuilder strBuff,
                                         final RegisterResponseM2B response) {
         // process service status
         ServiceStatusHolder
                 .setReadWriteServiceStatus(response.getStopRead(),
                         response.getStopWrite(), "Master");
-        // process flow controller rules
-        FlowCtrlRuleHandler flowCtrlRuleHandler =
+        // process default flow controller rules
+        FlowCtrlRuleHandler defFlowCtrlHandler =
                 metadataManager.getFlowCtrlRuleHandler();
         if (response.hasFlowCheckId()) {
             int qryPriorityId = response.hasQryPriorityId()
-                    ? response.getQryPriorityId() : flowCtrlRuleHandler.getQryPriorityId();
-            if (response.getFlowCheckId() != flowCtrlRuleHandler.getFlowCtrlId()) {
+                    ? response.getQryPriorityId() : defFlowCtrlHandler.getQryPriorityId();
+            if (response.getFlowCheckId() != defFlowCtrlHandler.getFlowCtrlId()) {
                 try {
-                    flowCtrlRuleHandler
+                    defFlowCtrlHandler
                             .updateFlowCtrlInfo(response.getQryPriorityId(),
                                     response.getFlowCheckId(), response.getFlowControlInfo());
                 } catch (Exception e1) {
                     logger.warn("[Register response] found parse flowCtrl rules failure", e1);
                 }
             }
-            if (qryPriorityId != flowCtrlRuleHandler.getQryPriorityId()) {
-                flowCtrlRuleHandler.setQryPriorityId(qryPriorityId);
+            if (qryPriorityId != defFlowCtrlHandler.getQryPriorityId()) {
+                defFlowCtrlHandler.setQryPriorityId(qryPriorityId);
             }
         }
         // update auth info
@@ -477,14 +478,14 @@ public class TubeBroker implements Stoppable {
                 ClusterConfigHolder.updClusterSetting(response.getClsConfig());
             }
         }
-        sBuilder.append("[Register response] received broker metadata info: brokerConfId=")
+        strBuff.append("[Register response] received broker metadata info: brokerConfId=")
                 .append(response.getCurBrokerConfId())
                 .append(",stopWrite=").append(response.getStopWrite())
                 .append(",stopRead=").append(response.getStopRead())
                 .append(",configCheckSumId=").append(response.getConfCheckSumId())
                 .append(",hasFlowCtrl=").append(response.hasFlowCheckId())
-                .append(",curFlowCtrlId=").append(flowCtrlRuleHandler.getFlowCtrlId())
-                .append(",curQryPriorityId=").append(flowCtrlRuleHandler.getQryPriorityId())
+                .append(",curFlowCtrlId=").append(defFlowCtrlHandler.getFlowCtrlId())
+                .append(",curQryPriorityId=").append(defFlowCtrlHandler.getQryPriorityId())
                 .append(",hasClsConfig=").append(response.hasClsConfig())
                 .append(",curClusterConfigId=").append(ClusterConfigHolder.getConfigId())
                 .append(",curMaxMsgSize=").append(ClusterConfigHolder.getMaxMsgSize())
@@ -501,10 +502,10 @@ public class TubeBroker implements Stoppable {
                 .append(",brokerDefaultConfInfo=").append(response.getBrokerDefaultConfInfo())
                 .append(",brokerTopicSetConfList=")
                 .append(response.getBrokerTopicSetConfInfoList().toString()).toString();
-        sBuilder.delete(0, sBuilder.length());
+        strBuff.delete(0, strBuff.length());
         metadataManager.updateBrokerTopicConfigMap(response.getCurBrokerConfId(),
                 response.getConfCheckSumId(), response.getBrokerDefaultConfInfo(),
-                response.getBrokerTopicSetConfInfoList(), true, sBuilder);
+                response.getBrokerTopicSetConfInfoList(), true, strBuff);
     }
 
     // build cluster configure info
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/metadata/TopicMetadata.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/metadata/TopicMetadata.java
index 77116a3f2..c4ed4b32a 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/metadata/TopicMetadata.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/metadata/TopicMetadata.java
@@ -78,7 +78,7 @@ public class TopicMetadata {
             return;
         }
         String[] topicConfInfoArr =
-                topicMetaConfInfo.split(TokenConstants.ATTR_SEP);
+                topicMetaConfInfo.split(TokenConstants.ATTR_SEP, -1);
         this.topic = topicConfInfoArr[0];
         if (TStringUtils.isBlank(topicConfInfoArr[1])) {
             this.numPartitions = brokerDefMetadata.getNumPartitions();
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStore.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStore.java
index 8cc968a93..c147f4858 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStore.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStore.java
@@ -333,7 +333,8 @@ public class MessageStore implements Closeable {
                     .append("[Data Store] Closed MessageStore for storeKey ")
                     .append(this.storeKey).toString());
         }
-        if (timestamp <= this.msgFileStore.getIndexMaxAppendTime()) {
+        if (timestamp <= this.msgFileStore.getIndexMaxAppendTime()
+                || !tubeConfig.isEnableMemStore()) {
             return this.msgFileStore.getStartOffsetByTimeStamp(timestamp);
         }
         this.writeCacheMutex.readLock().lock();
@@ -428,20 +429,27 @@ public class MessageStore implements Closeable {
         indexBuffer.putLong(receivedTime);
         indexBuffer.flip();
         appendResult.putReceivedInfo(messageId, receivedTime);
+        boolean appendSuss = true;
+        long startTime = System.currentTimeMillis();
         if (this.tubeConfig.isEnableMemStore()) {
             do {
                 this.writeCacheMutex.readLock().lock();
                 try {
-                    if (this.msgMemStore.appendMsg(msgStoreStatsHolder,
+                    appendSuss = this.msgMemStore.appendMsg(msgStoreStatsHolder,
                             partitionId, msgTypeCode, receivedTime, indexBuffer,
-                            msgBufLen, dataBuffer, appendResult)) {
-                        return true;
-                    }
+                            msgBufLen, dataBuffer, appendResult);
                 } finally {
                     this.writeCacheMutex.readLock().unlock();
                 }
+                if (appendSuss) {
+                    msgStoreStatsHolder.addMsgWriteSuccess(msgBufLen,
+                            System.currentTimeMillis() - startTime);
+                    return true;
+                }
                 if (triggerFlushAndAddMsg(true, false, partitionId, msgTypeCode,
                         receivedTime, indexBuffer, msgBufLen, dataBuffer, appendResult)) {
+                    msgStoreStatsHolder.addMsgWriteSuccess(msgBufLen,
+                            System.currentTimeMillis() - startTime);
                     return true;
                 }
                 ThreadUtils.sleep(waitRetryMs);
@@ -452,11 +460,17 @@ public class MessageStore implements Closeable {
             StringBuilder strBuffer =
                     new StringBuilder(TBaseConstants.BUILDER_DEFAULT_SIZE);
             Tuple3<Boolean, Long, Long> appendRet =
-                    this.msgFileStore.appendMsg(strBuffer, 1,
+                    this.msgFileStore.appendMsg(false, startTime, strBuffer, 1,
                             DataStoreUtils.STORE_INDEX_HEAD_LEN, indexBuffer,
-                            msgBufLen, dataBuffer,receivedTime, receivedTime);
+                            msgBufLen, dataBuffer, receivedTime, receivedTime);
             appendResult.putAppendResult(appendRet.getF1(), appendRet.getF2());
-            return true;
+            if (appendRet.getF0()) {
+                msgStoreStatsHolder.addMsgWriteSuccess(msgBufLen,
+                        System.currentTimeMillis() - startTime);
+            } else {
+                msgStoreStatsHolder.addMsgWriteFailure();
+            }
+            return appendRet.getF0();
         }
     }
 
@@ -562,7 +576,7 @@ public class MessageStore implements Closeable {
             strBuffer.delete(0, strBuffer.length());
             if (tubeConfig.isEnableMemStore()) {
                 ThreadUtils.sleep(100);
-                flush(System.currentTimeMillis(), strBuffer);
+                flush(strBuffer);
                 this.msgMemStore.close();
                 this.msgMemStoreBeingFlush.close();
                 this.executor.shutdown();
@@ -672,7 +686,7 @@ public class MessageStore implements Closeable {
      */
     public long getIndexStoreSize() {
         long totalSize = 0L;
-        if (!tubeConfig.isEnableMemStore()) {
+        if (tubeConfig.isEnableMemStore()) {
             this.writeCacheMutex.readLock().lock();
             try {
                 if (this.msgMemStore.getCurMsgCount() > 0) {
@@ -698,7 +712,7 @@ public class MessageStore implements Closeable {
      */
     public long getDataStoreSize() {
         long totalSize = 0L;
-        if (!tubeConfig.isEnableMemStore()) {
+        if (tubeConfig.isEnableMemStore()) {
             this.writeCacheMutex.readLock().lock();
             try {
                 if (this.msgMemStore.getCurMsgCount() > 0) {
@@ -777,15 +791,15 @@ public class MessageStore implements Closeable {
                 this.executor.execute(new Runnable() {
                     @Override
                     public void run() {
-                        long startTime2 = System.currentTimeMillis();
                         try {
                             final StringBuilder strBuffer = new StringBuilder(512);
-                            flush(startTime2, strBuffer);
+                            flush(strBuffer);
                         } catch (Throwable e) {
                             logger.error("[Data Store] Error during flush", e);
                         } finally {
-                            msgStoreStatsHolder.addCacheTimeoutFlush(
-                                    (System.currentTimeMillis() - startTime2), isTimeTrigger);
+                            if (isTimeTrigger) {
+                                msgStoreStatsHolder.addCacheTimeoutFlush();
+                            }
                         }
                     }
                 });
@@ -819,7 +833,8 @@ public class MessageStore implements Closeable {
         return false;
     }
 
-    private void flush(long startTime, StringBuilder strBuffer) throws IOException {
+    private void flush(StringBuilder strBuffer) throws IOException {
+        long startTime = System.currentTimeMillis();
         flushMutex.lock();
         this.lastMemFlushTime.set(System.currentTimeMillis());
         try {
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStoreManager.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStoreManager.java
index 1539c9294..5d9299058 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStoreManager.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStoreManager.java
@@ -421,16 +421,18 @@ public class MessageStoreManager implements StoreService {
         if (qryTopicSet.isEmpty()) {
             return topicPubStoreInfoMap;
         }
+        Map<Integer, MessageStore> storeMap;
+        Map<Integer, TopicPubStoreInfo> storeInfoMap;
         for (String topic : qryTopicSet) {
             topicMetadata = confTopicInfo.get(topic);
             if (topicMetadata == null) {
                 continue;
             }
-            Map<Integer, MessageStore> storeMap = dataStores.get(topic);
+            storeMap = dataStores.get(topic);
             if (storeMap == null) {
                 continue;
             }
-            Map<Integer, TopicPubStoreInfo> storeInfoMap = new HashMap<>();
+            storeInfoMap = new HashMap<>();
             for (Map.Entry<Integer, MessageStore> entry : storeMap.entrySet()) {
                 if (entry == null
                         || entry.getKey() == null
@@ -439,11 +441,10 @@ public class MessageStoreManager implements StoreService {
                 }
                 store = entry.getValue();
                 for (Integer partitionId : topicMetadata.getPartIdsByStoreId(entry.getKey())) {
-                    TopicPubStoreInfo storeInfo =
-                            new TopicPubStoreInfo(topic, entry.getKey(), partitionId,
-                                    store.getIndexMinOffset(), store.getIndexMaxOffset(),
-                                    store.getDataMinOffset(), store.getDataMaxOffset());
-                    storeInfoMap.put(partitionId, storeInfo);
+                    storeInfoMap.put(partitionId, new TopicPubStoreInfo(topic,
+                            entry.getKey(), partitionId, store.getIndexMinOffset(),
+                            store.getIndexMaxOffset(), store.getDataMinOffset(),
+                            store.getDataMaxOffset()));
                 }
             }
             topicPubStoreInfoMap.put(topic, storeInfoMap);
@@ -460,12 +461,13 @@ public class MessageStoreManager implements StoreService {
     @Override
     public void getTopicPublishInfos(Map<String, OffsetRecordInfo> groupOffsetMap) {
         MessageStore store = null;
+        Map<Integer, MessageStore> storeMap;
+        Map<String, Map<Integer, RecordItem>> topicOffsetMap;
         for (Map.Entry<String, OffsetRecordInfo> entry : groupOffsetMap.entrySet()) {
             if (entry == null || entry.getKey() == null || entry.getValue() == null) {
                 continue;
             }
-            Map<String, Map<Integer, RecordItem>> topicOffsetMap =
-                    entry.getValue().getOffsetMap();
+            topicOffsetMap = entry.getValue().getOffsetMap();
             // Get offset records by topic
             for (Map.Entry<String, Map<Integer, RecordItem>> entryTopic
                     : topicOffsetMap.entrySet()) {
@@ -475,8 +477,7 @@ public class MessageStoreManager implements StoreService {
                     continue;
                 }
                 // Get message store instance
-                Map<Integer, MessageStore> storeMap =
-                        dataStores.get(entryTopic.getKey());
+                storeMap = dataStores.get(entryTopic.getKey());
                 if (storeMap == null) {
                     continue;
                 }
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/MsgFileStore.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/MsgFileStore.java
index 9c9723418..f7fe541eb 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/MsgFileStore.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/MsgFileStore.java
@@ -118,6 +118,8 @@ public class MsgFileStore implements Closeable {
     /**
      * Append message to file segment
      *
+     * @param fromMem        whether data from memory directly
+     * @param currTime       the current time
      * @param sb             string buffer
      * @param msgCnt         the record count to append
      * @param indexSize      the index buffer length
@@ -128,7 +130,8 @@ public class MsgFileStore implements Closeable {
      * @param rightTime      the latest record timestamp
      * @return      file storage status, the index and data offsets of the added message
      */
-    public Tuple3<Boolean, Long, Long> appendMsg(StringBuilder sb, int msgCnt,
+    public Tuple3<Boolean, Long, Long> appendMsg(boolean fromMem, long currTime,
+                                                 StringBuilder sb, int msgCnt,
                                                  int indexSize, ByteBuffer indexBuffer,
                                                  int dataSize, ByteBuffer dataBuffer,
                                                  long leftTime, long rightTime) {
@@ -145,7 +148,7 @@ public class MsgFileStore implements Closeable {
         boolean pendingMsgSizeExceed = false;
         boolean pendingMsgTimeExceed = false;
         boolean isForceMetadata = false;
-        // flushed message message count and data size info
+        // flushed message count and data size info
         long flushedMsgCnt = 0;
         long flushedDataSize = 0;
         // Temporary variables in calculations
@@ -155,19 +158,29 @@ public class MsgFileStore implements Closeable {
         long inDataOffset;
         Segment curIndexSeg;
         long indexOffset = -1;
-        long currTime;
         // new file paths of creating
         String newDataFilePath = null;
         String newIndexFilePath = null;
         boolean fileStoreOK = false;
         this.writeLock.lock();
         try {
-            inIndexOffset = dataBuffer.getLong(DataStoreUtils.STORE_HEADER_POS_QUEUE_LOGICOFF);
-            // filling data segment.
+            // position last segments
             curDataSeg = this.dataSegments.last();
+            curIndexSeg = this.indexSegments.last();
+            // get inputted offsets
+            if (fromMem) {
+                inIndexOffset = dataBuffer.getLong(DataStoreUtils.STORE_HEADER_POS_QUEUE_LOGICOFF);
+                inDataOffset = indexBuffer.getLong(DataStoreUtils.INDEX_POS_DATAOFFSET);
+            } else {
+                inIndexOffset = curIndexSeg.getLast();
+                inDataOffset = curDataSeg.getLast();
+                indexBuffer.putLong(DataStoreUtils.INDEX_POS_DATAOFFSET, inDataOffset);
+                dataBuffer.putLong(DataStoreUtils.STORE_HEADER_POS_QUEUE_LOGICOFF, inIndexOffset);
+            }
+            // filling data segment.
             this.curUnflushSize.addAndGet(dataSize);
             dataOffset = curDataSeg.append(dataBuffer, leftTime, rightTime);
-            // judge whether need to create a new data segment.
+            // judge whether we need to create a new data segment.
             if (curDataSeg.getCachedSize() >= this.tubeConfig.getMaxSegmentSize()) {
                 isDataSegFlushed = true;
                 long newDataOffset = curDataSeg.flush(true);
@@ -179,10 +192,8 @@ public class MsgFileStore implements Closeable {
                 this.dataSegments.append(new FileSegment(newDataOffset, newDataFile, SegmentType.DATA));
             }
             // filling index data.
-            inDataOffset = indexBuffer.getLong(DataStoreUtils.INDEX_POS_DATAOFFSET);
-            curIndexSeg = this.indexSegments.last();
             indexOffset = curIndexSeg.append(indexBuffer, leftTime, rightTime);
-            // judge whether need to create a new index segment.
+            // judge whether we need to create a new index segment.
             if (curIndexSeg.getCachedSize()
                 >= this.tubeConfig.getMaxIndexSegmentSize()) {
                 isIndexSegFlushed = true;
@@ -195,16 +206,17 @@ public class MsgFileStore implements Closeable {
                 this.indexSegments.append(new FileSegment(newIndexOffset,
                     newIndexFile, SegmentType.INDEX));
             }
-            // check whether need to flush to disk.
-            currTime = System.currentTimeMillis();
+            // check whether we need to flush to disk.
             pendingMsgSizeExceed = (messageStore.getUnflushDataHold() > 0)
                     && (curUnflushSize.get() >= messageStore.getUnflushDataHold());
-            pendingMsgCntExceed = this.curUnflushed.addAndGet(msgCnt) >= messageStore.getUnflushThreshold();
-            pendingMsgTimeExceed = currTime - this.lastFlushTime.get() >= messageStore.getUnflushInterval();
-            boolean isSegmentRollOver =  isDataSegFlushed || isIndexSegFlushed;
-
-            if (pendingMsgCntExceed || pendingMsgTimeExceed || pendingMsgSizeExceed || isSegmentRollOver) {
-                isForceMetadata = isSegmentRollOver || (currTime - this.lastMetaFlushTime.get() > MAX_META_REFRESH_DUR);
+            pendingMsgCntExceed =
+                    (this.curUnflushed.addAndGet(msgCnt) >= messageStore.getUnflushThreshold());
+            pendingMsgTimeExceed =
+                    (currTime - this.lastFlushTime.get() >= messageStore.getUnflushInterval());
+            if (pendingMsgCntExceed || pendingMsgTimeExceed
+                    || pendingMsgSizeExceed || isDataSegFlushed || isIndexSegFlushed) {
+                isForceMetadata = (isDataSegFlushed || isIndexSegFlushed
+                    || (currTime - this.lastMetaFlushTime.get() > MAX_META_REFRESH_DUR));
                 if (!isDataSegFlushed) {
                     curDataSeg.flush(isForceMetadata);
                 }
@@ -213,7 +225,7 @@ public class MsgFileStore implements Closeable {
                 }
                 flushedMsgCnt = this.curUnflushed.getAndSet(0);
                 flushedDataSize = this.curUnflushSize.getAndSet(0);
-                this.lastFlushTime.set(System.currentTimeMillis());
+                this.lastFlushTime.set(currTime);
                 if (isForceMetadata) {
                     this.lastMetaFlushTime.set(this.lastFlushTime.get());
                 }
@@ -243,18 +255,21 @@ public class MsgFileStore implements Closeable {
         } finally {
             this.writeLock.unlock();
             // add statistics.
-            msgStoreStatsHolder.addFileFlushStatsInfo(msgCnt, indexSize, dataSize,
-                    flushedMsgCnt, flushedDataSize, isDataSegFlushed, isIndexSegFlushed,
-                    pendingMsgSizeExceed, pendingMsgCntExceed, pendingMsgTimeExceed, isForceMetadata);
-            if (isDataSegFlushed) {
-                logger.info(sb.append("[File Store] Created data segment ")
-                        .append(newDataFilePath).toString());
-                sb.delete(0, sb.length());
-            }
-            if (isIndexSegFlushed) {
-                logger.info(sb.append("[File Store] Created index segment ")
-                        .append(newIndexFilePath).toString());
-                sb.delete(0, sb.length());
+            if (fileStoreOK) {
+                msgStoreStatsHolder.addFileFlushStatsInfo(msgCnt, indexSize, dataSize,
+                        flushedMsgCnt, flushedDataSize, isDataSegFlushed, isIndexSegFlushed,
+                        pendingMsgSizeExceed, pendingMsgCntExceed, pendingMsgTimeExceed,
+                        isForceMetadata, System.currentTimeMillis() - currTime);
+                if (isDataSegFlushed) {
+                    logger.info(sb.append("[File Store] Created data segment ")
+                            .append(newDataFilePath).toString());
+                    sb.delete(0, sb.length());
+                }
+                if (isIndexSegFlushed) {
+                    logger.info(sb.append("[File Store] Created index segment ")
+                            .append(newIndexFilePath).toString());
+                    sb.delete(0, sb.length());
+                }
             }
         }
         return new Tuple3<>(fileStoreOK, indexOffset, dataOffset);
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/mem/MsgMemStore.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/mem/MsgMemStore.java
index e8a432b73..40bba25c2 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/mem/MsgMemStore.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/mem/MsgMemStore.java
@@ -138,10 +138,13 @@ public class MsgMemStore implements Closeable {
         this.writeLock.lock();
         try {
             // judge whether can write to memory or not.
-            if ((fullDataSize = (this.cacheDataOffset.get() + dataEntryLength > this.maxDataCacheSize))
-                    || (fullCount = (this.curMessageCount.get() + 1 > maxAllowedMsgCount))
-                    || (fullIndexSize =
-                    (this.cacheIndexOffset.get() + DataStoreUtils.STORE_INDEX_HEAD_LEN > this.maxIndexCacheSize))) {
+            fullDataSize =
+                    (this.cacheDataOffset.get() + dataEntryLength > this.maxDataCacheSize);
+            fullCount =
+                    (this.curMessageCount.get() + 1 > maxAllowedMsgCount);
+            fullIndexSize =
+                    (this.cacheIndexOffset.get() + DataStoreUtils.STORE_INDEX_HEAD_LEN > this.maxIndexCacheSize);
+            if (fullDataSize || fullCount || fullIndexSize) {
                 isAppended = false;
                 return false;
             }
@@ -163,9 +166,7 @@ public class MsgMemStore implements Closeable {
             }
         } finally {
             this.writeLock.unlock();
-            if (isAppended) {
-                memStatsHolder.addMsgWriteSuccess(dataEntryLength);
-            } else {
+            if (!isAppended) {
                 memStatsHolder.addCacheFullType(fullDataSize, fullIndexSize, fullCount);
             }
         }
@@ -329,7 +330,7 @@ public class MsgMemStore implements Closeable {
         tmpIndexBuffer.flip();
         tmpDataReadBuf.flip();
         long startTime = System.currentTimeMillis();
-        msgFileStore.appendMsg(strBuffer, curMessageCount.get(),
+        msgFileStore.appendMsg(true, startTime, strBuffer, curMessageCount.get(),
             cacheIndexOffset.get(), tmpIndexBuffer, cacheDataOffset.get(),
                 tmpDataReadBuf, leftAppendTime.get(), rightAppendTime.get());
         BrokerSrvStatsHolder.updDiskSyncDataDlt(System.currentTimeMillis() - startTime);
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/offset/DefaultOffsetManager.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/offset/DefaultOffsetManager.java
index cb467528c..cb9cdb353 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/offset/DefaultOffsetManager.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/offset/DefaultOffsetManager.java
@@ -255,8 +255,7 @@ public class DefaultOffsetManager extends AbstractDaemonService implements Offse
         OffsetStorageInfo regInfo =
                 loadOrCreateOffset(group, topic, partitionId, offsetCacheKey, 0);
         if ((tmpOffset == 0) && (!regInfo.isFirstCreate())) {
-            updatedOffset = regInfo.getOffset();
-            return updatedOffset;
+            return regInfo.getOffset();
         }
         updatedOffset = regInfo.addAndGetOffset(tmpOffset);
         if (logger.isDebugEnabled()) {
@@ -462,6 +461,8 @@ public class DefaultOffsetManager extends AbstractDaemonService implements Offse
      */
     @Override
     public Map<String, OffsetRecordInfo> getOnlineGroupOffsetInfo() {
+        OffsetRecordInfo recordInfo;
+        Map<String, OffsetStorageInfo> storeMap;
         Map<String, OffsetRecordInfo> result = new HashMap<>();
         for (Map.Entry<String,
                 ConcurrentHashMap<String, OffsetStorageInfo>> entry : cfmOffsetMap.entrySet()) {
@@ -469,7 +470,7 @@ public class DefaultOffsetManager extends AbstractDaemonService implements Offse
                 continue;
             }
             // read offset map information
-            Map<String, OffsetStorageInfo> storeMap = entry.getValue();
+            storeMap = entry.getValue();
             if (storeMap.isEmpty()) {
                 continue;
             }
@@ -477,7 +478,7 @@ public class DefaultOffsetManager extends AbstractDaemonService implements Offse
                 if (storageInfo == null) {
                     continue;
                 }
-                OffsetRecordInfo recordInfo = result.get(entry.getKey());
+                recordInfo = result.get(entry.getKey());
                 if (recordInfo == null) {
                     recordInfo = new OffsetRecordInfo(
                             brokerConfig.getBrokerId(), entry.getKey());
@@ -740,5 +741,4 @@ public class DefaultOffsetManager extends AbstractDaemonService implements Offse
         return (lagValue > TServerConstants.CFG_OFFSET_RESET_MID_ALARM_CHECK)
                 ? 2 : (lagValue > TServerConstants.CFG_OFFSET_RESET_MIN_ALARM_CHECK) ? 1 : 0;
     }
-
 }
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/MsgStoreStatsHolder.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/MsgStoreStatsHolder.java
index ae8ac4ca3..a238d93d8 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/MsgStoreStatsHolder.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/MsgStoreStatsHolder.java
@@ -67,12 +67,15 @@ public class MsgStoreStatsHolder {
      * Add write message success statistics.
      *
      * @param msgSize   the message size
+     * @param writeDlt  write duration
      */
-    public void addMsgWriteSuccess(int msgSize) {
+    public void addMsgWriteSuccess(int msgSize, long writeDlt) {
         if (isClosed) {
             return;
         }
-        msgStoreStatsSets[getIndex()].msgAppendSizeStats.update(msgSize);
+        MsgStoreStatsItemSet tmStatsSet = msgStoreStatsSets[getIndex()];
+        tmStatsSet.msgAppendSizeStats.update(msgSize);
+        tmStatsSet.msgAppendDurStats.update(writeDlt);
     }
 
     /**
@@ -133,18 +136,12 @@ public class MsgStoreStatsHolder {
     /**
      * Add cache timeout flush statistic.
      *
-     * @param flushTime          the flush time
-     * @param isTimeoutFlush     whether is timeout flush
      */
-    public void addCacheTimeoutFlush(long flushTime, boolean isTimeoutFlush) {
+    public void addCacheTimeoutFlush() {
         if (isClosed) {
             return;
         }
-        MsgStoreStatsItemSet tmStatsSet = msgStoreStatsSets[getIndex()];
-        tmStatsSet.cacheSyncStats.update(flushTime);
-        if (isTimeoutFlush) {
-            tmStatsSet.cacheTimeFullCnt.incValue();
-        }
+        msgStoreStatsSets[getIndex()].cacheTimeFullCnt.incValue();
     }
 
     /**
@@ -161,12 +158,14 @@ public class MsgStoreStatsHolder {
      * @param isMsgCntFull       whether the cached message count is full
      * @param isCacheTimeFull    whether the cached time is full
      * @param isForceMetadata    whether force push metadata
+     * @param dltAppendTime      the duration of the append operation
      */
     public void addFileFlushStatsInfo(int msgCnt, int msgIndexSize, int msgDataSize,
                                       long flushedMsgCnt, long flushedDataSize,
                                       boolean isDataSegFlush, boolean isIndexSegFlush,
                                       boolean isDataSizeFull, boolean isMsgCntFull,
-                                      boolean isCacheTimeFull, boolean isForceMetadata) {
+                                      boolean isCacheTimeFull, boolean isForceMetadata,
+                                      long dltAppendTime) {
         if (isClosed) {
             return;
         }
@@ -174,6 +173,7 @@ public class MsgStoreStatsHolder {
         tmStatsSet.fileAccumMsgCnt.addValue(msgCnt);
         tmStatsSet.fileAccumMsgIndexSize.addValue(msgIndexSize);
         tmStatsSet.fileAccumMsgDataSize.addValue(msgDataSize);
+        tmStatsSet.fileFlusheDurStats.update(dltAppendTime);
         if (flushedDataSize > 0) {
             tmStatsSet.fileFlushedDataSize.update(flushedDataSize);
         }
@@ -401,6 +401,7 @@ public class MsgStoreStatsHolder {
         statsMap.put("isClosed", (isStatsClosed() ? 1L : 0L));
         // for memory store
         statsSet.msgAppendSizeStats.getValue(statsMap, false);
+        statsSet.msgAppendDurStats.getValue(statsMap, false);
         statsMap.put(statsSet.msgAppendFailCnt.getFullName(),
                 statsSet.msgAppendFailCnt.getValue());
         statsMap.put(statsSet.cacheDataSizeFullCnt.getFullName(),
@@ -415,7 +416,6 @@ public class MsgStoreStatsHolder {
                 statsSet.cacheFlushPendingCnt.getValue());
         statsMap.put(statsSet.cacheReAllocCnt.getFullName(),
                 statsSet.cacheReAllocCnt.getValue());
-        statsSet.cacheSyncStats.getValue(statsMap, false);
         // for file store
         statsMap.put(statsSet.fileAccumMsgCnt.getFullName(),
                 statsSet.fileAccumMsgCnt.getValue());
@@ -423,6 +423,7 @@ public class MsgStoreStatsHolder {
                 statsSet.fileAccumMsgDataSize.getValue());
         statsMap.put(statsSet.fileAccumMsgIndexSize.getFullName(),
                 statsSet.fileAccumMsgIndexSize.getValue());
+        statsSet.fileFlusheDurStats.getValue(statsMap, false);
         statsSet.fileFlushedDataSize.getValue(statsMap, false);
         statsSet.fileFlushedMsgCnt.getValue(statsMap, false);
         statsMap.put(statsSet.fileDataSegAddCnt.getFullName(),
@@ -460,6 +461,8 @@ public class MsgStoreStatsHolder {
                 .append("\":\"").append(statsSet.resetTime.getStrSinceTime())
                 .append("\",\"isClosed\":").append(isStatsClosed()).append(",");
         statsSet.msgAppendSizeStats.getValue(strBuff, false);
+        strBuff.append(",");
+        statsSet.msgAppendDurStats.getValue(strBuff, false);
         strBuff.append(",\"").append(statsSet.msgAppendFailCnt.getFullName())
                 .append("\":").append(statsSet.msgAppendFailCnt.getValue())
                 .append(",\"").append(statsSet.cacheDataSizeFullCnt.getFullName())
@@ -474,15 +477,15 @@ public class MsgStoreStatsHolder {
                 .append("\":").append(statsSet.cacheReAllocCnt.getValue())
                 .append(",\"").append(statsSet.cacheDataSizeFullCnt.getFullName())
                 .append("\":").append(statsSet.cacheDataSizeFullCnt.getValue())
-                .append(",");
-        statsSet.cacheSyncStats.getValue(strBuff, false);
-        strBuff.append(",\"").append(statsSet.fileAccumMsgCnt.getFullName())
+                .append(",\"").append(statsSet.fileAccumMsgCnt.getFullName())
                 .append("\":").append(statsSet.fileAccumMsgCnt.getValue())
                 .append(",\"").append(statsSet.fileAccumMsgDataSize.getFullName())
                 .append("\":").append(statsSet.fileAccumMsgDataSize.getValue())
                 .append(",\"").append(statsSet.fileAccumMsgIndexSize.getFullName())
                 .append("\":").append(statsSet.fileAccumMsgIndexSize.getValue())
                 .append(",");
+        statsSet.fileFlusheDurStats.getValue(strBuff, false);
+        strBuff.append(",");
         statsSet.fileFlushedDataSize.getValue(strBuff, false);
         strBuff.append(",");
         statsSet.fileFlushedMsgCnt.getValue(strBuff, false);
@@ -519,6 +522,9 @@ public class MsgStoreStatsHolder {
         // The message size received
         protected final SimpleHistogram msgAppendSizeStats =
                 new SimpleHistogram("msg_append_size", null);
+        // The duration of message append operations
+        protected final ESTHistogram msgAppendDurStats =
+                new ESTHistogram("msg_append_dlt", null);
         // The count of message append failures
         protected final LongStatsCounter msgAppendFailCnt =
                 new LongStatsCounter("msg_append_fail", null);
@@ -540,9 +546,6 @@ public class MsgStoreStatsHolder {
         // The cache re-alloc count
         protected final LongStatsCounter cacheReAllocCnt =
                 new LongStatsCounter("cache_realloc", null);
-        // The cache persistence duration statistics
-        protected final ESTHistogram cacheSyncStats =
-                new ESTHistogram("cache_flush_dlt", null);
         // for file store
         // The accumulate message count statistics
         protected final LongStatsCounter fileAccumMsgCnt =
@@ -553,6 +556,9 @@ public class MsgStoreStatsHolder {
         // The accumulate message index statistics
         protected final LongStatsCounter fileAccumMsgIndexSize =
                 new LongStatsCounter("file_total_index_size", null);
+        // The file flush duration statistics
+        protected final ESTHistogram fileFlusheDurStats =
+                new ESTHistogram("file_flush_dlt", null);
         // The data flushed statistics
         protected final SimpleHistogram fileFlushedDataSize =
                 new SimpleHistogram("file_flush_data_size", null);
@@ -594,6 +600,7 @@ public class MsgStoreStatsHolder {
             // for file metric items
             this.fileAccumMsgCnt.clear();
             this.fileAccumMsgDataSize.clear();
+            this.fileFlusheDurStats.clear();
             this.fileFlushedDataSize.clear();
             this.fileAccumMsgIndexSize.clear();
             this.fileFlushedMsgCnt.clear();
@@ -605,6 +612,7 @@ public class MsgStoreStatsHolder {
             this.fileCachedTimeFullCnt.clear();
             // for message metric items
             this.msgAppendSizeStats.clear();
+            this.msgAppendDurStats.clear();
             this.msgAppendFailCnt.clear();
             // for cache metric items
             this.cacheDataSizeFullCnt.clear();
@@ -613,7 +621,6 @@ public class MsgStoreStatsHolder {
             this.cacheFlushPendingCnt.clear();
             this.cacheReAllocCnt.clear();
             this.cacheTimeFullCnt.clear();
-            this.cacheSyncStats.clear();
             this.resetTime.reset();
         }
     }
diff --git a/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/broker/stats/MsgStoreStatsHolderTest.java b/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/broker/stats/MsgStoreStatsHolderTest.java
index c7a81312f..cea7777bb 100644
--- a/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/broker/stats/MsgStoreStatsHolderTest.java
+++ b/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/broker/stats/MsgStoreStatsHolderTest.java
@@ -32,12 +32,11 @@ public class MsgStoreStatsHolderTest {
     public void testMemPartStats() {
         MsgStoreStatsHolder msgStoreStatsHolder = new MsgStoreStatsHolder();
         // case 1, not started
-        msgStoreStatsHolder.addMsgWriteSuccess(50);
+        msgStoreStatsHolder.addMsgWriteSuccess(50, 2);
         msgStoreStatsHolder.addCacheFullType(true, false, false);
         msgStoreStatsHolder.addCacheFullType(false, true, false);
         msgStoreStatsHolder.addCacheFullType(false, false, true);
-        msgStoreStatsHolder.addCacheTimeoutFlush(50, false);
-        msgStoreStatsHolder.addCacheTimeoutFlush(10, true);
+        msgStoreStatsHolder.addCacheTimeoutFlush();
         msgStoreStatsHolder.addMsgWriteFailure();
         Map<String, Long> retMap = new LinkedHashMap<>();
         msgStoreStatsHolder.getValue(retMap);
@@ -52,9 +51,6 @@ public class MsgStoreStatsHolderTest {
         Assert.assertEquals(0, retMap.get("cache_time_full").longValue());
         Assert.assertEquals(0, retMap.get("cache_flush_pending").longValue());
         Assert.assertEquals(0, retMap.get("cache_realloc").longValue());
-        Assert.assertEquals(0, retMap.get("cache_flush_dlt_count").longValue());
-        Assert.assertEquals(Long.MIN_VALUE, retMap.get("cache_flush_dlt_max").longValue());
-        Assert.assertEquals(Long.MAX_VALUE, retMap.get("cache_flush_dlt_min").longValue());
         Assert.assertNotNull(retMap.get("end_time"));
         retMap.clear();
         // get content by StringBuilder
@@ -66,16 +62,14 @@ public class MsgStoreStatsHolderTest {
         // System.out.println("getAllMemStatsInfo : " + strBuff);
         strBuff.delete(0, strBuff.length());
         // case 2 started
-        msgStoreStatsHolder.addMsgWriteSuccess(50);
-        msgStoreStatsHolder.addMsgWriteSuccess(500);
-        msgStoreStatsHolder.addMsgWriteSuccess(5);
+        msgStoreStatsHolder.addMsgWriteSuccess(50, 10);
+        msgStoreStatsHolder.addMsgWriteSuccess(500, 20);
+        msgStoreStatsHolder.addMsgWriteSuccess(5, 3);
         msgStoreStatsHolder.addCacheFullType(true, false, false);
         msgStoreStatsHolder.addCacheFullType(false, true, false);
         msgStoreStatsHolder.addCacheFullType(false, false, true);
-        msgStoreStatsHolder.addCacheTimeoutFlush(50, false);
-        msgStoreStatsHolder.addCacheTimeoutFlush(10, true);
-        msgStoreStatsHolder.addCacheTimeoutFlush(100, true);
-        msgStoreStatsHolder.addCacheTimeoutFlush(1, false);
+        msgStoreStatsHolder.addCacheTimeoutFlush();
+        msgStoreStatsHolder.addCacheTimeoutFlush();
         msgStoreStatsHolder.addMsgWriteFailure();
         msgStoreStatsHolder.addMsgWriteFailure();
         msgStoreStatsHolder.addCacheReAlloc();
@@ -95,16 +89,9 @@ public class MsgStoreStatsHolderTest {
         Assert.assertEquals(2, retMap.get("cache_time_full").longValue());
         Assert.assertEquals(3, retMap.get("cache_flush_pending").longValue());
         Assert.assertEquals(2, retMap.get("cache_realloc").longValue());
-        Assert.assertEquals(4, retMap.get("cache_flush_dlt_count").longValue());
-        Assert.assertEquals(100, retMap.get("cache_flush_dlt_max").longValue());
-        Assert.assertEquals(1, retMap.get("cache_flush_dlt_min").longValue());
-        Assert.assertEquals(1, retMap.get("cache_flush_dlt_cell_0t2").longValue());
-        Assert.assertEquals(1, retMap.get("cache_flush_dlt_cell_8t16").longValue());
-        Assert.assertEquals(1, retMap.get("cache_flush_dlt_cell_32t64").longValue());
-        Assert.assertEquals(1, retMap.get("cache_flush_dlt_cell_64t128").longValue());
         Assert.assertNotNull(retMap.get("end_time"));
         msgStoreStatsHolder.getMsgStoreStatsInfo(false, strBuff);
-        // System.out.println("\n the second is : " + strBuff.toString());
+        System.out.println("\n the second is : " + strBuff.toString());
         strBuff.delete(0, strBuff.length());
     }
 
@@ -114,7 +101,7 @@ public class MsgStoreStatsHolderTest {
         // case 1, not started
         msgStoreStatsHolder.addFileFlushStatsInfo(2, 30, 500,
                 0, 0, true, true,
-                true, true, true, true);
+                true, true, true, true, 5);
         msgStoreStatsHolder.addFileTimeoutFlushStats(1, 500, false);
         Map<String, Long> retMap = new LinkedHashMap<>();
         msgStoreStatsHolder.getValue(retMap);
@@ -133,6 +120,9 @@ public class MsgStoreStatsHolderTest {
         Assert.assertEquals(0, retMap.get("file_data_full").longValue());
         Assert.assertEquals(0, retMap.get("file_count_full").longValue());
         Assert.assertEquals(0, retMap.get("file_time_full").longValue());
+        Assert.assertEquals(0, retMap.get("file_flush_dlt_count").longValue());
+        Assert.assertEquals(Long.MAX_VALUE, retMap.get("file_flush_dlt_min").longValue());
+        Assert.assertEquals(Long.MIN_VALUE, retMap.get("file_flush_dlt_max").longValue());
         Assert.assertNotNull(retMap.get("end_time"));
         retMap.clear();
         // get content by StringBuilder
@@ -159,27 +149,36 @@ public class MsgStoreStatsHolderTest {
         Assert.assertEquals(0, retMap.get("file_data_full").longValue());
         Assert.assertEquals(0, retMap.get("file_count_full").longValue());
         Assert.assertEquals(1, retMap.get("file_time_full").longValue());
+        Assert.assertEquals(0, retMap.get("file_flush_dlt_count").longValue());
+        Assert.assertEquals(Long.MAX_VALUE, retMap.get("file_flush_dlt_min").longValue());
+        Assert.assertEquals(Long.MIN_VALUE, retMap.get("file_flush_dlt_max").longValue());
         Assert.assertNotNull(retMap.get("end_time"));
         retMap.clear();
         // get value when started
         msgStoreStatsHolder.addFileFlushStatsInfo(1, 1, 1,
                 1, 1, true, false,
-                false, false, false, false);
+                false, false, false, false,
+                6);
         msgStoreStatsHolder.addFileFlushStatsInfo(6, 6, 6,
                 6, 6, false, false,
-                false, false, false, true);
+                false, false, false, true,
+                100);
         msgStoreStatsHolder.addFileFlushStatsInfo(2, 2, 2,
                 2, 2, false, true,
-                false, false, false, false);
+                false, false, false, false,
+                10);
         msgStoreStatsHolder.addFileFlushStatsInfo(5, 5, 5,
                 5, 5, false, false,
-                false, false, true, false);
+                false, false, true, false,
+                200);
         msgStoreStatsHolder.addFileFlushStatsInfo(4, 4, 4,
                 4, 4, false, false,
-                false, true, false, false);
+                false, true, false, false,
+                50);
         msgStoreStatsHolder.addFileFlushStatsInfo(3, 3, 3,
                 3, 3, false, false,
-                true, false, false, false);
+                true, false, false, false,
+                150);
         msgStoreStatsHolder.snapShort(retMap);
         Assert.assertNotNull(retMap.get("reset_time"));
         Assert.assertEquals(21, retMap.get("file_total_msg_cnt").longValue());
@@ -197,6 +196,9 @@ public class MsgStoreStatsHolderTest {
         Assert.assertEquals(2, retMap.get("file_meta_flush").longValue());
         Assert.assertEquals(1, retMap.get("file_count_full").longValue());
         Assert.assertEquals(2, retMap.get("file_time_full").longValue());
+        Assert.assertEquals(6, retMap.get("file_flush_dlt_count").longValue());
+        Assert.assertEquals(6, retMap.get("file_flush_dlt_min").longValue());
+        Assert.assertEquals(200, retMap.get("file_flush_dlt_max").longValue());
         Assert.assertNotNull(retMap.get("end_time"));
         retMap.clear();
         msgStoreStatsHolder.getMsgStoreStatsInfo(true, strBuff);