You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/08/01 03:45:56 UTC

[inlong] branch master updated: [INLONG-5303][TubeMQ] Simplify MsgFileStore code (#5304)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new aec7f658a [INLONG-5303][TubeMQ] Simplify MsgFileStore code (#5304)
aec7f658a is described below

commit aec7f658a3c8f2b5a87189eea0004d89ada28929
Author: AloysZhang <lo...@gmail.com>
AuthorDate: Mon Aug 1 11:45:50 2022 +0800

    [INLONG-5303][TubeMQ] Simplify MsgFileStore code (#5304)
---
 .../server/broker/msgstore/disk/MsgFileStore.java  | 23 +++++++++++-----------
 1 file changed, 11 insertions(+), 12 deletions(-)

diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/MsgFileStore.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/MsgFileStore.java
index af5eebfcb..9c9723418 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/MsgFileStore.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/MsgFileStore.java
@@ -141,9 +141,9 @@ public class MsgFileStore implements Closeable {
         // Various parameters that trigger data refresh
         boolean isDataSegFlushed = false;
         boolean isIndexSegFlushed = false;
-        boolean isMsgCntFlushed = false;
-        boolean isMsgDataFlushed = false;
-        boolean isMsgTimeFlushed = false;
+        boolean pendingMsgCntExceed = false;
+        boolean pendingMsgSizeExceed = false;
+        boolean pendingMsgTimeExceed = false;
         boolean isForceMetadata = false;
         // flushed message message count and data size info
         long flushedMsgCnt = 0;
@@ -197,15 +197,14 @@ public class MsgFileStore implements Closeable {
             }
             // check whether need to flush to disk.
             currTime = System.currentTimeMillis();
-            isMsgDataFlushed = (messageStore.getUnflushDataHold() > 0)
+            pendingMsgSizeExceed = (messageStore.getUnflushDataHold() > 0)
                     && (curUnflushSize.get() >= messageStore.getUnflushDataHold());
-            if ((isMsgCntFlushed =
-                    (this.curUnflushed.addAndGet(msgCnt) >= messageStore.getUnflushThreshold()))
-                || (isMsgTimeFlushed =
-                    (currTime - this.lastFlushTime.get() >= messageStore.getUnflushInterval()))
-                || isMsgDataFlushed || isDataSegFlushed || isIndexSegFlushed) {
-                isForceMetadata = (isDataSegFlushed || isIndexSegFlushed
-                    || (currTime - this.lastMetaFlushTime.get() > MAX_META_REFRESH_DUR));
+            pendingMsgCntExceed = this.curUnflushed.addAndGet(msgCnt) >= messageStore.getUnflushThreshold();
+            pendingMsgTimeExceed = currTime - this.lastFlushTime.get() >= messageStore.getUnflushInterval();
+            boolean isSegmentRollOver =  isDataSegFlushed || isIndexSegFlushed;
+
+            if (pendingMsgCntExceed || pendingMsgTimeExceed || pendingMsgSizeExceed || isSegmentRollOver) {
+                isForceMetadata = isSegmentRollOver || (currTime - this.lastMetaFlushTime.get() > MAX_META_REFRESH_DUR);
                 if (!isDataSegFlushed) {
                     curDataSeg.flush(isForceMetadata);
                 }
@@ -246,7 +245,7 @@ public class MsgFileStore implements Closeable {
             // add statistics.
             msgStoreStatsHolder.addFileFlushStatsInfo(msgCnt, indexSize, dataSize,
                     flushedMsgCnt, flushedDataSize, isDataSegFlushed, isIndexSegFlushed,
-                    isMsgDataFlushed, isMsgCntFlushed, isMsgTimeFlushed, isForceMetadata);
+                    pendingMsgSizeExceed, pendingMsgCntExceed, pendingMsgTimeExceed, isForceMetadata);
             if (isDataSegFlushed) {
                 logger.info(sb.append("[File Store] Created data segment ")
                         .append(newDataFilePath).toString());