You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/08/01 03:45:56 UTC
[inlong] branch master updated: [INLONG-5303][TubeMQ] Simplify MsgFileStore code (#5304)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new aec7f658a [INLONG-5303][TubeMQ] Simplify MsgFileStore code (#5304)
aec7f658a is described below
commit aec7f658a3c8f2b5a87189eea0004d89ada28929
Author: AloysZhang <lo...@gmail.com>
AuthorDate: Mon Aug 1 11:45:50 2022 +0800
[INLONG-5303][TubeMQ] Simplify MsgFileStore code (#5304)
---
.../server/broker/msgstore/disk/MsgFileStore.java | 23 +++++++++++-----------
1 file changed, 11 insertions(+), 12 deletions(-)
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/MsgFileStore.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/MsgFileStore.java
index af5eebfcb..9c9723418 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/MsgFileStore.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/MsgFileStore.java
@@ -141,9 +141,9 @@ public class MsgFileStore implements Closeable {
// Various parameters that trigger data refresh
boolean isDataSegFlushed = false;
boolean isIndexSegFlushed = false;
- boolean isMsgCntFlushed = false;
- boolean isMsgDataFlushed = false;
- boolean isMsgTimeFlushed = false;
+ boolean pendingMsgCntExceed = false;
+ boolean pendingMsgSizeExceed = false;
+ boolean pendingMsgTimeExceed = false;
boolean isForceMetadata = false;
// flushed message count and data size info
long flushedMsgCnt = 0;
@@ -197,15 +197,14 @@ public class MsgFileStore implements Closeable {
}
// check whether need to flush to disk.
currTime = System.currentTimeMillis();
- isMsgDataFlushed = (messageStore.getUnflushDataHold() > 0)
+ pendingMsgSizeExceed = (messageStore.getUnflushDataHold() > 0)
&& (curUnflushSize.get() >= messageStore.getUnflushDataHold());
- if ((isMsgCntFlushed =
- (this.curUnflushed.addAndGet(msgCnt) >= messageStore.getUnflushThreshold()))
- || (isMsgTimeFlushed =
- (currTime - this.lastFlushTime.get() >= messageStore.getUnflushInterval()))
- || isMsgDataFlushed || isDataSegFlushed || isIndexSegFlushed) {
- isForceMetadata = (isDataSegFlushed || isIndexSegFlushed
- || (currTime - this.lastMetaFlushTime.get() > MAX_META_REFRESH_DUR));
+ pendingMsgCntExceed = this.curUnflushed.addAndGet(msgCnt) >= messageStore.getUnflushThreshold();
+ pendingMsgTimeExceed = currTime - this.lastFlushTime.get() >= messageStore.getUnflushInterval();
+ boolean isSegmentRollOver = isDataSegFlushed || isIndexSegFlushed;
+
+ if (pendingMsgCntExceed || pendingMsgTimeExceed || pendingMsgSizeExceed || isSegmentRollOver) {
+ isForceMetadata = isSegmentRollOver || (currTime - this.lastMetaFlushTime.get() > MAX_META_REFRESH_DUR);
if (!isDataSegFlushed) {
curDataSeg.flush(isForceMetadata);
}
@@ -246,7 +245,7 @@ public class MsgFileStore implements Closeable {
// add statistics.
msgStoreStatsHolder.addFileFlushStatsInfo(msgCnt, indexSize, dataSize,
flushedMsgCnt, flushedDataSize, isDataSegFlushed, isIndexSegFlushed,
- isMsgDataFlushed, isMsgCntFlushed, isMsgTimeFlushed, isForceMetadata);
+ pendingMsgSizeExceed, pendingMsgCntExceed, pendingMsgTimeExceed, isForceMetadata);
if (isDataSegFlushed) {
logger.info(sb.append("[File Store] Created data segment ")
.append(newDataFilePath).toString());