You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/09/27 06:17:09 UTC
[inlong] branch master updated: [INLONG-6035][TubeMQ] Add Broker's message append and file refresh delay statistics (#6036)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 6bbb5d99d [INLONG-6035][TubeMQ] Add Broker's message append and file refresh delay statistics (#6036)
6bbb5d99d is described below
commit 6bbb5d99d09f4de5f26f79f2b04ab9905014743f
Author: Goson Zhang <46...@qq.com>
AuthorDate: Tue Sep 27 14:17:04 2022 +0800
[INLONG-6035][TubeMQ] Add Broker's message append and file refresh delay statistics (#6036)
---
.../inlong/tubemq/server/broker/TubeBroker.java | 63 +++++++++---------
.../server/broker/metadata/TopicMetadata.java | 2 +-
.../server/broker/msgstore/MessageStore.java | 47 +++++++++-----
.../broker/msgstore/MessageStoreManager.java | 23 +++----
.../server/broker/msgstore/disk/MsgFileStore.java | 75 +++++++++++++---------
.../server/broker/msgstore/mem/MsgMemStore.java | 17 ++---
.../server/broker/offset/DefaultOffsetManager.java | 10 +--
.../server/broker/stats/MsgStoreStatsHolder.java | 45 +++++++------
.../broker/stats/MsgStoreStatsHolderTest.java | 58 +++++++++--------
9 files changed, 191 insertions(+), 149 deletions(-)
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/TubeBroker.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/TubeBroker.java
index fb248059f..397ccfd1f 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/TubeBroker.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/TubeBroker.java
@@ -320,24 +320,24 @@ public class TubeBroker implements Stoppable {
.append(TubeServerVersion.SERVER_VERSION).toString();
}
- private void procConfigFromHeartBeat(StringBuilder sBuilder,
+ private void procConfigFromHeartBeat(StringBuilder strBuff,
HeartResponseM2B response) {
// process service status
ServiceStatusHolder
.setReadWriteServiceStatus(response.getStopRead(),
response.getStopWrite(), "Master");
- // process flow controller rules
- FlowCtrlRuleHandler flowCtrlRuleHandler =
+ // update default flow controller rules
+ FlowCtrlRuleHandler defFlowCtrlHandler =
metadataManager.getFlowCtrlRuleHandler();
- long flowCheckId = flowCtrlRuleHandler.getFlowCtrlId();
- int qryPriorityId = flowCtrlRuleHandler.getQryPriorityId();
+ long flowCheckId = defFlowCtrlHandler.getFlowCtrlId();
+ int qryPriorityId = defFlowCtrlHandler.getQryPriorityId();
if (response.hasFlowCheckId()) {
qryPriorityId = response.hasQryPriorityId()
? response.getQryPriorityId() : qryPriorityId;
if (response.getFlowCheckId() != flowCheckId) {
flowCheckId = response.getFlowCheckId();
try {
- flowCtrlRuleHandler
+ defFlowCtrlHandler
.updateFlowCtrlInfo(qryPriorityId,
flowCheckId, response.getFlowControlInfo());
} catch (Exception e1) {
@@ -345,8 +345,8 @@ public class TubeBroker implements Stoppable {
"[HeartBeat response] found parse flowCtrl rules failure", e1);
}
}
- if (qryPriorityId != flowCtrlRuleHandler.getQryPriorityId()) {
- flowCtrlRuleHandler.setQryPriorityId(qryPriorityId);
+ if (qryPriorityId != defFlowCtrlHandler.getQryPriorityId()) {
+ defFlowCtrlHandler.setQryPriorityId(qryPriorityId);
}
}
// update configure report requirement
@@ -356,36 +356,37 @@ public class TubeBroker implements Stoppable {
long configId = response.getClsConfig().getConfigId();
if (configId != ClusterConfigHolder.getConfigId()) {
ClusterConfigHolder.updClusterSetting(response.getClsConfig());
- logger.info(sBuilder
- .append("[HeartBeat response] received cluster configure changed,")
+ logger.info(strBuff
+ .append("[HeartBeat response] received cluster configure changed")
.append(",hasClsConfig=").append(response.hasClsConfig())
.append(",curClusterConfigId=").append(ClusterConfigHolder.getConfigId())
.append(",curMaxMsgSize=").append(ClusterConfigHolder.getMaxMsgSize())
.append(",minMemCacheSize=")
.append(ClusterConfigHolder.getMinMemCacheSize())
.toString());
- sBuilder.delete(0, sBuilder.length());
+ strBuff.delete(0, strBuff.length());
}
}
if (response.getTakeConfInfo()) {
- logger.info(sBuilder
+ logger.info(strBuff
.append("[HeartBeat response] received broker metadata info: brokerConfId=")
.append(response.getCurBrokerConfId())
.append(",stopWrite=").append(response.getStopWrite())
.append(",stopRead=").append(response.getStopRead())
.append(",configCheckSumId=").append(response.getConfCheckSumId())
.append(",hasFlowCtrl=").append(response.hasFlowCheckId())
- .append(",curFlowCtrlId=").append(flowCheckId)
- .append(",curQryPriorityId=").append(qryPriorityId)
+ .append(",curFlowCtrlId=").append(defFlowCtrlHandler.getFlowCtrlId())
+ .append(",curQryPriorityId=").append(defFlowCtrlHandler.getQryPriorityId())
+ .append(",curMaxMsgSize=").append(ClusterConfigHolder.getMaxMsgSize())
.append(",brokerDefaultConfInfo=")
.append(response.getBrokerDefaultConfInfo())
.append(",brokerTopicSetConfList=")
- .append(response.getBrokerTopicSetConfInfoList().toString()).toString());
- sBuilder.delete(0, sBuilder.length());
+ .append(response.getBrokerTopicSetConfInfoList()).toString());
+ strBuff.delete(0, strBuff.length());
metadataManager
.updateBrokerTopicConfigMap(response.getCurBrokerConfId(),
response.getConfCheckSumId(), response.getBrokerDefaultConfInfo(),
- response.getBrokerTopicSetConfInfoList(), false, sBuilder);
+ response.getBrokerTopicSetConfInfoList(), false, strBuff);
}
// update auth info
if (response.hasBrokerAuthorizedInfo()) {
@@ -395,7 +396,7 @@ public class TubeBroker implements Stoppable {
boolean needProcess =
metadataManager.updateBrokerRemoveTopicMap(
response.getTakeRemoveTopicInfo(),
- response.getRemoveTopicConfInfoList(), sBuilder);
+ response.getRemoveTopicConfInfoList(), strBuff);
if (needProcess) {
new Thread() {
@Override
@@ -440,29 +441,29 @@ public class TubeBroker implements Stoppable {
}
}
- private void procConfigFromRegister(StringBuilder sBuilder,
+ private void procConfigFromRegister(StringBuilder strBuff,
final RegisterResponseM2B response) {
// process service status
ServiceStatusHolder
.setReadWriteServiceStatus(response.getStopRead(),
response.getStopWrite(), "Master");
- // process flow controller rules
- FlowCtrlRuleHandler flowCtrlRuleHandler =
+ // process default flow controller rules
+ FlowCtrlRuleHandler defFlowCtrlHandler =
metadataManager.getFlowCtrlRuleHandler();
if (response.hasFlowCheckId()) {
int qryPriorityId = response.hasQryPriorityId()
- ? response.getQryPriorityId() : flowCtrlRuleHandler.getQryPriorityId();
- if (response.getFlowCheckId() != flowCtrlRuleHandler.getFlowCtrlId()) {
+ ? response.getQryPriorityId() : defFlowCtrlHandler.getQryPriorityId();
+ if (response.getFlowCheckId() != defFlowCtrlHandler.getFlowCtrlId()) {
try {
- flowCtrlRuleHandler
+ defFlowCtrlHandler
.updateFlowCtrlInfo(response.getQryPriorityId(),
response.getFlowCheckId(), response.getFlowControlInfo());
} catch (Exception e1) {
logger.warn("[Register response] found parse flowCtrl rules failure", e1);
}
}
- if (qryPriorityId != flowCtrlRuleHandler.getQryPriorityId()) {
- flowCtrlRuleHandler.setQryPriorityId(qryPriorityId);
+ if (qryPriorityId != defFlowCtrlHandler.getQryPriorityId()) {
+ defFlowCtrlHandler.setQryPriorityId(qryPriorityId);
}
}
// update auth info
@@ -477,14 +478,14 @@ public class TubeBroker implements Stoppable {
ClusterConfigHolder.updClusterSetting(response.getClsConfig());
}
}
- sBuilder.append("[Register response] received broker metadata info: brokerConfId=")
+ strBuff.append("[Register response] received broker metadata info: brokerConfId=")
.append(response.getCurBrokerConfId())
.append(",stopWrite=").append(response.getStopWrite())
.append(",stopRead=").append(response.getStopRead())
.append(",configCheckSumId=").append(response.getConfCheckSumId())
.append(",hasFlowCtrl=").append(response.hasFlowCheckId())
- .append(",curFlowCtrlId=").append(flowCtrlRuleHandler.getFlowCtrlId())
- .append(",curQryPriorityId=").append(flowCtrlRuleHandler.getQryPriorityId())
+ .append(",curFlowCtrlId=").append(defFlowCtrlHandler.getFlowCtrlId())
+ .append(",curQryPriorityId=").append(defFlowCtrlHandler.getQryPriorityId())
.append(",hasClsConfig=").append(response.hasClsConfig())
.append(",curClusterConfigId=").append(ClusterConfigHolder.getConfigId())
.append(",curMaxMsgSize=").append(ClusterConfigHolder.getMaxMsgSize())
@@ -501,10 +502,10 @@ public class TubeBroker implements Stoppable {
.append(",brokerDefaultConfInfo=").append(response.getBrokerDefaultConfInfo())
.append(",brokerTopicSetConfList=")
.append(response.getBrokerTopicSetConfInfoList().toString()).toString();
- sBuilder.delete(0, sBuilder.length());
+ strBuff.delete(0, strBuff.length());
metadataManager.updateBrokerTopicConfigMap(response.getCurBrokerConfId(),
response.getConfCheckSumId(), response.getBrokerDefaultConfInfo(),
- response.getBrokerTopicSetConfInfoList(), true, sBuilder);
+ response.getBrokerTopicSetConfInfoList(), true, strBuff);
}
// build cluster configure info
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/metadata/TopicMetadata.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/metadata/TopicMetadata.java
index 77116a3f2..c4ed4b32a 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/metadata/TopicMetadata.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/metadata/TopicMetadata.java
@@ -78,7 +78,7 @@ public class TopicMetadata {
return;
}
String[] topicConfInfoArr =
- topicMetaConfInfo.split(TokenConstants.ATTR_SEP);
+ topicMetaConfInfo.split(TokenConstants.ATTR_SEP, -1);
this.topic = topicConfInfoArr[0];
if (TStringUtils.isBlank(topicConfInfoArr[1])) {
this.numPartitions = brokerDefMetadata.getNumPartitions();
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStore.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStore.java
index 8cc968a93..c147f4858 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStore.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStore.java
@@ -333,7 +333,8 @@ public class MessageStore implements Closeable {
.append("[Data Store] Closed MessageStore for storeKey ")
.append(this.storeKey).toString());
}
- if (timestamp <= this.msgFileStore.getIndexMaxAppendTime()) {
+ if (timestamp <= this.msgFileStore.getIndexMaxAppendTime()
+ || !tubeConfig.isEnableMemStore()) {
return this.msgFileStore.getStartOffsetByTimeStamp(timestamp);
}
this.writeCacheMutex.readLock().lock();
@@ -428,20 +429,27 @@ public class MessageStore implements Closeable {
indexBuffer.putLong(receivedTime);
indexBuffer.flip();
appendResult.putReceivedInfo(messageId, receivedTime);
+ boolean appendSuss = true;
+ long startTime = System.currentTimeMillis();
if (this.tubeConfig.isEnableMemStore()) {
do {
this.writeCacheMutex.readLock().lock();
try {
- if (this.msgMemStore.appendMsg(msgStoreStatsHolder,
+ appendSuss = this.msgMemStore.appendMsg(msgStoreStatsHolder,
partitionId, msgTypeCode, receivedTime, indexBuffer,
- msgBufLen, dataBuffer, appendResult)) {
- return true;
- }
+ msgBufLen, dataBuffer, appendResult);
} finally {
this.writeCacheMutex.readLock().unlock();
}
+ if (appendSuss) {
+ msgStoreStatsHolder.addMsgWriteSuccess(msgBufLen,
+ System.currentTimeMillis() - startTime);
+ return true;
+ }
if (triggerFlushAndAddMsg(true, false, partitionId, msgTypeCode,
receivedTime, indexBuffer, msgBufLen, dataBuffer, appendResult)) {
+ msgStoreStatsHolder.addMsgWriteSuccess(msgBufLen,
+ System.currentTimeMillis() - startTime);
return true;
}
ThreadUtils.sleep(waitRetryMs);
@@ -452,11 +460,17 @@ public class MessageStore implements Closeable {
StringBuilder strBuffer =
new StringBuilder(TBaseConstants.BUILDER_DEFAULT_SIZE);
Tuple3<Boolean, Long, Long> appendRet =
- this.msgFileStore.appendMsg(strBuffer, 1,
+ this.msgFileStore.appendMsg(false, startTime, strBuffer, 1,
DataStoreUtils.STORE_INDEX_HEAD_LEN, indexBuffer,
- msgBufLen, dataBuffer,receivedTime, receivedTime);
+ msgBufLen, dataBuffer, receivedTime, receivedTime);
appendResult.putAppendResult(appendRet.getF1(), appendRet.getF2());
- return true;
+ if (appendRet.getF0()) {
+ msgStoreStatsHolder.addMsgWriteSuccess(msgBufLen,
+ System.currentTimeMillis() - startTime);
+ } else {
+ msgStoreStatsHolder.addMsgWriteFailure();
+ }
+ return appendRet.getF0();
}
}
@@ -562,7 +576,7 @@ public class MessageStore implements Closeable {
strBuffer.delete(0, strBuffer.length());
if (tubeConfig.isEnableMemStore()) {
ThreadUtils.sleep(100);
- flush(System.currentTimeMillis(), strBuffer);
+ flush(strBuffer);
this.msgMemStore.close();
this.msgMemStoreBeingFlush.close();
this.executor.shutdown();
@@ -672,7 +686,7 @@ public class MessageStore implements Closeable {
*/
public long getIndexStoreSize() {
long totalSize = 0L;
- if (!tubeConfig.isEnableMemStore()) {
+ if (tubeConfig.isEnableMemStore()) {
this.writeCacheMutex.readLock().lock();
try {
if (this.msgMemStore.getCurMsgCount() > 0) {
@@ -698,7 +712,7 @@ public class MessageStore implements Closeable {
*/
public long getDataStoreSize() {
long totalSize = 0L;
- if (!tubeConfig.isEnableMemStore()) {
+ if (tubeConfig.isEnableMemStore()) {
this.writeCacheMutex.readLock().lock();
try {
if (this.msgMemStore.getCurMsgCount() > 0) {
@@ -777,15 +791,15 @@ public class MessageStore implements Closeable {
this.executor.execute(new Runnable() {
@Override
public void run() {
- long startTime2 = System.currentTimeMillis();
try {
final StringBuilder strBuffer = new StringBuilder(512);
- flush(startTime2, strBuffer);
+ flush(strBuffer);
} catch (Throwable e) {
logger.error("[Data Store] Error during flush", e);
} finally {
- msgStoreStatsHolder.addCacheTimeoutFlush(
- (System.currentTimeMillis() - startTime2), isTimeTrigger);
+ if (isTimeTrigger) {
+ msgStoreStatsHolder.addCacheTimeoutFlush();
+ }
}
}
});
@@ -819,7 +833,8 @@ public class MessageStore implements Closeable {
return false;
}
- private void flush(long startTime, StringBuilder strBuffer) throws IOException {
+ private void flush(StringBuilder strBuffer) throws IOException {
+ long startTime = System.currentTimeMillis();
flushMutex.lock();
this.lastMemFlushTime.set(System.currentTimeMillis());
try {
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStoreManager.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStoreManager.java
index 1539c9294..5d9299058 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStoreManager.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStoreManager.java
@@ -421,16 +421,18 @@ public class MessageStoreManager implements StoreService {
if (qryTopicSet.isEmpty()) {
return topicPubStoreInfoMap;
}
+ Map<Integer, MessageStore> storeMap;
+ Map<Integer, TopicPubStoreInfo> storeInfoMap;
for (String topic : qryTopicSet) {
topicMetadata = confTopicInfo.get(topic);
if (topicMetadata == null) {
continue;
}
- Map<Integer, MessageStore> storeMap = dataStores.get(topic);
+ storeMap = dataStores.get(topic);
if (storeMap == null) {
continue;
}
- Map<Integer, TopicPubStoreInfo> storeInfoMap = new HashMap<>();
+ storeInfoMap = new HashMap<>();
for (Map.Entry<Integer, MessageStore> entry : storeMap.entrySet()) {
if (entry == null
|| entry.getKey() == null
@@ -439,11 +441,10 @@ public class MessageStoreManager implements StoreService {
}
store = entry.getValue();
for (Integer partitionId : topicMetadata.getPartIdsByStoreId(entry.getKey())) {
- TopicPubStoreInfo storeInfo =
- new TopicPubStoreInfo(topic, entry.getKey(), partitionId,
- store.getIndexMinOffset(), store.getIndexMaxOffset(),
- store.getDataMinOffset(), store.getDataMaxOffset());
- storeInfoMap.put(partitionId, storeInfo);
+ storeInfoMap.put(partitionId, new TopicPubStoreInfo(topic,
+ entry.getKey(), partitionId, store.getIndexMinOffset(),
+ store.getIndexMaxOffset(), store.getDataMinOffset(),
+ store.getDataMaxOffset()));
}
}
topicPubStoreInfoMap.put(topic, storeInfoMap);
@@ -460,12 +461,13 @@ public class MessageStoreManager implements StoreService {
@Override
public void getTopicPublishInfos(Map<String, OffsetRecordInfo> groupOffsetMap) {
MessageStore store = null;
+ Map<Integer, MessageStore> storeMap;
+ Map<String, Map<Integer, RecordItem>> topicOffsetMap;
for (Map.Entry<String, OffsetRecordInfo> entry : groupOffsetMap.entrySet()) {
if (entry == null || entry.getKey() == null || entry.getValue() == null) {
continue;
}
- Map<String, Map<Integer, RecordItem>> topicOffsetMap =
- entry.getValue().getOffsetMap();
+ topicOffsetMap = entry.getValue().getOffsetMap();
// Get offset records by topic
for (Map.Entry<String, Map<Integer, RecordItem>> entryTopic
: topicOffsetMap.entrySet()) {
@@ -475,8 +477,7 @@ public class MessageStoreManager implements StoreService {
continue;
}
// Get message store instance
- Map<Integer, MessageStore> storeMap =
- dataStores.get(entryTopic.getKey());
+ storeMap = dataStores.get(entryTopic.getKey());
if (storeMap == null) {
continue;
}
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/MsgFileStore.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/MsgFileStore.java
index 9c9723418..f7fe541eb 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/MsgFileStore.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/MsgFileStore.java
@@ -118,6 +118,8 @@ public class MsgFileStore implements Closeable {
/**
* Append message to file segment
*
+ * @param fromMem whether data from memory directly
+ * @param currTime the current time
* @param sb string buffer
* @param msgCnt the record count to append
* @param indexSize the index buffer length
@@ -128,7 +130,8 @@ public class MsgFileStore implements Closeable {
* @param rightTime the latest record timestamp
* @return file storage status, the index and data offsets of the added message
*/
- public Tuple3<Boolean, Long, Long> appendMsg(StringBuilder sb, int msgCnt,
+ public Tuple3<Boolean, Long, Long> appendMsg(boolean fromMem, long currTime,
+ StringBuilder sb, int msgCnt,
int indexSize, ByteBuffer indexBuffer,
int dataSize, ByteBuffer dataBuffer,
long leftTime, long rightTime) {
@@ -145,7 +148,7 @@ public class MsgFileStore implements Closeable {
boolean pendingMsgSizeExceed = false;
boolean pendingMsgTimeExceed = false;
boolean isForceMetadata = false;
- // flushed message message count and data size info
+ // flushed message count and data size info
long flushedMsgCnt = 0;
long flushedDataSize = 0;
// Temporary variables in calculations
@@ -155,19 +158,29 @@ public class MsgFileStore implements Closeable {
long inDataOffset;
Segment curIndexSeg;
long indexOffset = -1;
- long currTime;
// new file paths of creating
String newDataFilePath = null;
String newIndexFilePath = null;
boolean fileStoreOK = false;
this.writeLock.lock();
try {
- inIndexOffset = dataBuffer.getLong(DataStoreUtils.STORE_HEADER_POS_QUEUE_LOGICOFF);
- // filling data segment.
+ // position last segments
curDataSeg = this.dataSegments.last();
+ curIndexSeg = this.indexSegments.last();
+ // get inputted offsets
+ if (fromMem) {
+ inIndexOffset = dataBuffer.getLong(DataStoreUtils.STORE_HEADER_POS_QUEUE_LOGICOFF);
+ inDataOffset = indexBuffer.getLong(DataStoreUtils.INDEX_POS_DATAOFFSET);
+ } else {
+ inIndexOffset = curIndexSeg.getLast();
+ inDataOffset = curDataSeg.getLast();
+ indexBuffer.putLong(DataStoreUtils.INDEX_POS_DATAOFFSET, inDataOffset);
+ dataBuffer.putLong(DataStoreUtils.STORE_HEADER_POS_QUEUE_LOGICOFF, inIndexOffset);
+ }
+ // filling data segment.
this.curUnflushSize.addAndGet(dataSize);
dataOffset = curDataSeg.append(dataBuffer, leftTime, rightTime);
- // judge whether need to create a new data segment.
+ // judge whether we need to create a new data segment.
if (curDataSeg.getCachedSize() >= this.tubeConfig.getMaxSegmentSize()) {
isDataSegFlushed = true;
long newDataOffset = curDataSeg.flush(true);
@@ -179,10 +192,8 @@ public class MsgFileStore implements Closeable {
this.dataSegments.append(new FileSegment(newDataOffset, newDataFile, SegmentType.DATA));
}
// filling index data.
- inDataOffset = indexBuffer.getLong(DataStoreUtils.INDEX_POS_DATAOFFSET);
- curIndexSeg = this.indexSegments.last();
indexOffset = curIndexSeg.append(indexBuffer, leftTime, rightTime);
- // judge whether need to create a new index segment.
+ // judge whether we need to create a new index segment.
if (curIndexSeg.getCachedSize()
>= this.tubeConfig.getMaxIndexSegmentSize()) {
isIndexSegFlushed = true;
@@ -195,16 +206,17 @@ public class MsgFileStore implements Closeable {
this.indexSegments.append(new FileSegment(newIndexOffset,
newIndexFile, SegmentType.INDEX));
}
- // check whether need to flush to disk.
- currTime = System.currentTimeMillis();
+ // check whether we need to flush to disk.
pendingMsgSizeExceed = (messageStore.getUnflushDataHold() > 0)
&& (curUnflushSize.get() >= messageStore.getUnflushDataHold());
- pendingMsgCntExceed = this.curUnflushed.addAndGet(msgCnt) >= messageStore.getUnflushThreshold();
- pendingMsgTimeExceed = currTime - this.lastFlushTime.get() >= messageStore.getUnflushInterval();
- boolean isSegmentRollOver = isDataSegFlushed || isIndexSegFlushed;
-
- if (pendingMsgCntExceed || pendingMsgTimeExceed || pendingMsgSizeExceed || isSegmentRollOver) {
- isForceMetadata = isSegmentRollOver || (currTime - this.lastMetaFlushTime.get() > MAX_META_REFRESH_DUR);
+ pendingMsgCntExceed =
+ (this.curUnflushed.addAndGet(msgCnt) >= messageStore.getUnflushThreshold());
+ pendingMsgTimeExceed =
+ (currTime - this.lastFlushTime.get() >= messageStore.getUnflushInterval());
+ if (pendingMsgCntExceed || pendingMsgTimeExceed
+ || pendingMsgSizeExceed || isDataSegFlushed || isIndexSegFlushed) {
+ isForceMetadata = (isDataSegFlushed || isIndexSegFlushed
+ || (currTime - this.lastMetaFlushTime.get() > MAX_META_REFRESH_DUR));
if (!isDataSegFlushed) {
curDataSeg.flush(isForceMetadata);
}
@@ -213,7 +225,7 @@ public class MsgFileStore implements Closeable {
}
flushedMsgCnt = this.curUnflushed.getAndSet(0);
flushedDataSize = this.curUnflushSize.getAndSet(0);
- this.lastFlushTime.set(System.currentTimeMillis());
+ this.lastFlushTime.set(currTime);
if (isForceMetadata) {
this.lastMetaFlushTime.set(this.lastFlushTime.get());
}
@@ -243,18 +255,21 @@ public class MsgFileStore implements Closeable {
} finally {
this.writeLock.unlock();
// add statistics.
- msgStoreStatsHolder.addFileFlushStatsInfo(msgCnt, indexSize, dataSize,
- flushedMsgCnt, flushedDataSize, isDataSegFlushed, isIndexSegFlushed,
- pendingMsgSizeExceed, pendingMsgCntExceed, pendingMsgTimeExceed, isForceMetadata);
- if (isDataSegFlushed) {
- logger.info(sb.append("[File Store] Created data segment ")
- .append(newDataFilePath).toString());
- sb.delete(0, sb.length());
- }
- if (isIndexSegFlushed) {
- logger.info(sb.append("[File Store] Created index segment ")
- .append(newIndexFilePath).toString());
- sb.delete(0, sb.length());
+ if (fileStoreOK) {
+ msgStoreStatsHolder.addFileFlushStatsInfo(msgCnt, indexSize, dataSize,
+ flushedMsgCnt, flushedDataSize, isDataSegFlushed, isIndexSegFlushed,
+ pendingMsgSizeExceed, pendingMsgCntExceed, pendingMsgTimeExceed,
+ isForceMetadata, System.currentTimeMillis() - currTime);
+ if (isDataSegFlushed) {
+ logger.info(sb.append("[File Store] Created data segment ")
+ .append(newDataFilePath).toString());
+ sb.delete(0, sb.length());
+ }
+ if (isIndexSegFlushed) {
+ logger.info(sb.append("[File Store] Created index segment ")
+ .append(newIndexFilePath).toString());
+ sb.delete(0, sb.length());
+ }
}
}
return new Tuple3<>(fileStoreOK, indexOffset, dataOffset);
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/mem/MsgMemStore.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/mem/MsgMemStore.java
index e8a432b73..40bba25c2 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/mem/MsgMemStore.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/mem/MsgMemStore.java
@@ -138,10 +138,13 @@ public class MsgMemStore implements Closeable {
this.writeLock.lock();
try {
// judge whether can write to memory or not.
- if ((fullDataSize = (this.cacheDataOffset.get() + dataEntryLength > this.maxDataCacheSize))
- || (fullCount = (this.curMessageCount.get() + 1 > maxAllowedMsgCount))
- || (fullIndexSize =
- (this.cacheIndexOffset.get() + DataStoreUtils.STORE_INDEX_HEAD_LEN > this.maxIndexCacheSize))) {
+ fullDataSize =
+ (this.cacheDataOffset.get() + dataEntryLength > this.maxDataCacheSize);
+ fullCount =
+ (this.curMessageCount.get() + 1 > maxAllowedMsgCount);
+ fullIndexSize =
+ (this.cacheIndexOffset.get() + DataStoreUtils.STORE_INDEX_HEAD_LEN > this.maxIndexCacheSize);
+ if (fullDataSize || fullCount || fullIndexSize) {
isAppended = false;
return false;
}
@@ -163,9 +166,7 @@ public class MsgMemStore implements Closeable {
}
} finally {
this.writeLock.unlock();
- if (isAppended) {
- memStatsHolder.addMsgWriteSuccess(dataEntryLength);
- } else {
+ if (!isAppended) {
memStatsHolder.addCacheFullType(fullDataSize, fullIndexSize, fullCount);
}
}
@@ -329,7 +330,7 @@ public class MsgMemStore implements Closeable {
tmpIndexBuffer.flip();
tmpDataReadBuf.flip();
long startTime = System.currentTimeMillis();
- msgFileStore.appendMsg(strBuffer, curMessageCount.get(),
+ msgFileStore.appendMsg(true, startTime, strBuffer, curMessageCount.get(),
cacheIndexOffset.get(), tmpIndexBuffer, cacheDataOffset.get(),
tmpDataReadBuf, leftAppendTime.get(), rightAppendTime.get());
BrokerSrvStatsHolder.updDiskSyncDataDlt(System.currentTimeMillis() - startTime);
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/offset/DefaultOffsetManager.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/offset/DefaultOffsetManager.java
index cb467528c..cb9cdb353 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/offset/DefaultOffsetManager.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/offset/DefaultOffsetManager.java
@@ -255,8 +255,7 @@ public class DefaultOffsetManager extends AbstractDaemonService implements Offse
OffsetStorageInfo regInfo =
loadOrCreateOffset(group, topic, partitionId, offsetCacheKey, 0);
if ((tmpOffset == 0) && (!regInfo.isFirstCreate())) {
- updatedOffset = regInfo.getOffset();
- return updatedOffset;
+ return regInfo.getOffset();
}
updatedOffset = regInfo.addAndGetOffset(tmpOffset);
if (logger.isDebugEnabled()) {
@@ -462,6 +461,8 @@ public class DefaultOffsetManager extends AbstractDaemonService implements Offse
*/
@Override
public Map<String, OffsetRecordInfo> getOnlineGroupOffsetInfo() {
+ OffsetRecordInfo recordInfo;
+ Map<String, OffsetStorageInfo> storeMap;
Map<String, OffsetRecordInfo> result = new HashMap<>();
for (Map.Entry<String,
ConcurrentHashMap<String, OffsetStorageInfo>> entry : cfmOffsetMap.entrySet()) {
@@ -469,7 +470,7 @@ public class DefaultOffsetManager extends AbstractDaemonService implements Offse
continue;
}
// read offset map information
- Map<String, OffsetStorageInfo> storeMap = entry.getValue();
+ storeMap = entry.getValue();
if (storeMap.isEmpty()) {
continue;
}
@@ -477,7 +478,7 @@ public class DefaultOffsetManager extends AbstractDaemonService implements Offse
if (storageInfo == null) {
continue;
}
- OffsetRecordInfo recordInfo = result.get(entry.getKey());
+ recordInfo = result.get(entry.getKey());
if (recordInfo == null) {
recordInfo = new OffsetRecordInfo(
brokerConfig.getBrokerId(), entry.getKey());
@@ -740,5 +741,4 @@ public class DefaultOffsetManager extends AbstractDaemonService implements Offse
return (lagValue > TServerConstants.CFG_OFFSET_RESET_MID_ALARM_CHECK)
? 2 : (lagValue > TServerConstants.CFG_OFFSET_RESET_MIN_ALARM_CHECK) ? 1 : 0;
}
-
}
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/MsgStoreStatsHolder.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/MsgStoreStatsHolder.java
index ae8ac4ca3..a238d93d8 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/MsgStoreStatsHolder.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/MsgStoreStatsHolder.java
@@ -67,12 +67,15 @@ public class MsgStoreStatsHolder {
* Add write message success statistics.
*
* @param msgSize the message size
+ * @param writeDlt write duration
*/
- public void addMsgWriteSuccess(int msgSize) {
+ public void addMsgWriteSuccess(int msgSize, long writeDlt) {
if (isClosed) {
return;
}
- msgStoreStatsSets[getIndex()].msgAppendSizeStats.update(msgSize);
+ MsgStoreStatsItemSet tmStatsSet = msgStoreStatsSets[getIndex()];
+ tmStatsSet.msgAppendSizeStats.update(msgSize);
+ tmStatsSet.msgAppendDurStats.update(writeDlt);
}
/**
@@ -133,18 +136,12 @@ public class MsgStoreStatsHolder {
/**
* Add cache timeout flush statistic.
*
- * @param flushTime the flush time
- * @param isTimeoutFlush whether is timeout flush
*/
- public void addCacheTimeoutFlush(long flushTime, boolean isTimeoutFlush) {
+ public void addCacheTimeoutFlush() {
if (isClosed) {
return;
}
- MsgStoreStatsItemSet tmStatsSet = msgStoreStatsSets[getIndex()];
- tmStatsSet.cacheSyncStats.update(flushTime);
- if (isTimeoutFlush) {
- tmStatsSet.cacheTimeFullCnt.incValue();
- }
+ msgStoreStatsSets[getIndex()].cacheTimeFullCnt.incValue();
}
/**
@@ -161,12 +158,14 @@ public class MsgStoreStatsHolder {
* @param isMsgCntFull whether the cached message count is full
* @param isCacheTimeFull whether the cached time is full
* @param isForceMetadata whether force push metadata
+ * @param dltAppendTime the duration of the append operation
*/
public void addFileFlushStatsInfo(int msgCnt, int msgIndexSize, int msgDataSize,
long flushedMsgCnt, long flushedDataSize,
boolean isDataSegFlush, boolean isIndexSegFlush,
boolean isDataSizeFull, boolean isMsgCntFull,
- boolean isCacheTimeFull, boolean isForceMetadata) {
+ boolean isCacheTimeFull, boolean isForceMetadata,
+ long dltAppendTime) {
if (isClosed) {
return;
}
@@ -174,6 +173,7 @@ public class MsgStoreStatsHolder {
tmStatsSet.fileAccumMsgCnt.addValue(msgCnt);
tmStatsSet.fileAccumMsgIndexSize.addValue(msgIndexSize);
tmStatsSet.fileAccumMsgDataSize.addValue(msgDataSize);
+ tmStatsSet.fileFlusheDurStats.update(dltAppendTime);
if (flushedDataSize > 0) {
tmStatsSet.fileFlushedDataSize.update(flushedDataSize);
}
@@ -401,6 +401,7 @@ public class MsgStoreStatsHolder {
statsMap.put("isClosed", (isStatsClosed() ? 1L : 0L));
// for memory store
statsSet.msgAppendSizeStats.getValue(statsMap, false);
+ statsSet.msgAppendDurStats.getValue(statsMap, false);
statsMap.put(statsSet.msgAppendFailCnt.getFullName(),
statsSet.msgAppendFailCnt.getValue());
statsMap.put(statsSet.cacheDataSizeFullCnt.getFullName(),
@@ -415,7 +416,6 @@ public class MsgStoreStatsHolder {
statsSet.cacheFlushPendingCnt.getValue());
statsMap.put(statsSet.cacheReAllocCnt.getFullName(),
statsSet.cacheReAllocCnt.getValue());
- statsSet.cacheSyncStats.getValue(statsMap, false);
// for file store
statsMap.put(statsSet.fileAccumMsgCnt.getFullName(),
statsSet.fileAccumMsgCnt.getValue());
@@ -423,6 +423,7 @@ public class MsgStoreStatsHolder {
statsSet.fileAccumMsgDataSize.getValue());
statsMap.put(statsSet.fileAccumMsgIndexSize.getFullName(),
statsSet.fileAccumMsgIndexSize.getValue());
+ statsSet.fileFlusheDurStats.getValue(statsMap, false);
statsSet.fileFlushedDataSize.getValue(statsMap, false);
statsSet.fileFlushedMsgCnt.getValue(statsMap, false);
statsMap.put(statsSet.fileDataSegAddCnt.getFullName(),
@@ -460,6 +461,8 @@ public class MsgStoreStatsHolder {
.append("\":\"").append(statsSet.resetTime.getStrSinceTime())
.append("\",\"isClosed\":").append(isStatsClosed()).append(",");
statsSet.msgAppendSizeStats.getValue(strBuff, false);
+ strBuff.append(",");
+ statsSet.msgAppendDurStats.getValue(strBuff, false);
strBuff.append(",\"").append(statsSet.msgAppendFailCnt.getFullName())
.append("\":").append(statsSet.msgAppendFailCnt.getValue())
.append(",\"").append(statsSet.cacheDataSizeFullCnt.getFullName())
@@ -474,15 +477,15 @@ public class MsgStoreStatsHolder {
.append("\":").append(statsSet.cacheReAllocCnt.getValue())
.append(",\"").append(statsSet.cacheDataSizeFullCnt.getFullName())
.append("\":").append(statsSet.cacheDataSizeFullCnt.getValue())
- .append(",");
- statsSet.cacheSyncStats.getValue(strBuff, false);
- strBuff.append(",\"").append(statsSet.fileAccumMsgCnt.getFullName())
+ .append(",\"").append(statsSet.fileAccumMsgCnt.getFullName())
.append("\":").append(statsSet.fileAccumMsgCnt.getValue())
.append(",\"").append(statsSet.fileAccumMsgDataSize.getFullName())
.append("\":").append(statsSet.fileAccumMsgDataSize.getValue())
.append(",\"").append(statsSet.fileAccumMsgIndexSize.getFullName())
.append("\":").append(statsSet.fileAccumMsgIndexSize.getValue())
.append(",");
+ statsSet.fileFlusheDurStats.getValue(strBuff, false);
+ strBuff.append(",");
statsSet.fileFlushedDataSize.getValue(strBuff, false);
strBuff.append(",");
statsSet.fileFlushedMsgCnt.getValue(strBuff, false);
@@ -519,6 +522,9 @@ public class MsgStoreStatsHolder {
// The message size received
protected final SimpleHistogram msgAppendSizeStats =
new SimpleHistogram("msg_append_size", null);
+ // The duration of message written
+ protected final ESTHistogram msgAppendDurStats =
+ new ESTHistogram("msg_append_dlt", null);
// The count of message append failures
protected final LongStatsCounter msgAppendFailCnt =
new LongStatsCounter("msg_append_fail", null);
@@ -540,9 +546,6 @@ public class MsgStoreStatsHolder {
// The cache re-alloc count
protected final LongStatsCounter cacheReAllocCnt =
new LongStatsCounter("cache_realloc", null);
- // The cache persistence duration statistics
- protected final ESTHistogram cacheSyncStats =
- new ESTHistogram("cache_flush_dlt", null);
// for file store
// The accumulate message count statistics
protected final LongStatsCounter fileAccumMsgCnt =
@@ -553,6 +556,9 @@ public class MsgStoreStatsHolder {
// The accumulate message index statistics
protected final LongStatsCounter fileAccumMsgIndexSize =
new LongStatsCounter("file_total_index_size", null);
+ // statistics on file flush time
+ protected final ESTHistogram fileFlusheDurStats =
+ new ESTHistogram("file_flush_dlt", null);
// The data flushed statistics
protected final SimpleHistogram fileFlushedDataSize =
new SimpleHistogram("file_flush_data_size", null);
@@ -594,6 +600,7 @@ public class MsgStoreStatsHolder {
// for file metric items
this.fileAccumMsgCnt.clear();
this.fileAccumMsgDataSize.clear();
+ this.fileFlusheDurStats.clear();
this.fileFlushedDataSize.clear();
this.fileAccumMsgIndexSize.clear();
this.fileFlushedMsgCnt.clear();
@@ -605,6 +612,7 @@ public class MsgStoreStatsHolder {
this.fileCachedTimeFullCnt.clear();
// for message metric items
this.msgAppendSizeStats.clear();
+ this.msgAppendDurStats.clear();
this.msgAppendFailCnt.clear();
// for cache metric items
this.cacheDataSizeFullCnt.clear();
@@ -613,7 +621,6 @@ public class MsgStoreStatsHolder {
this.cacheFlushPendingCnt.clear();
this.cacheReAllocCnt.clear();
this.cacheTimeFullCnt.clear();
- this.cacheSyncStats.clear();
this.resetTime.reset();
}
}
diff --git a/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/broker/stats/MsgStoreStatsHolderTest.java b/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/broker/stats/MsgStoreStatsHolderTest.java
index c7a81312f..cea7777bb 100644
--- a/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/broker/stats/MsgStoreStatsHolderTest.java
+++ b/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/broker/stats/MsgStoreStatsHolderTest.java
@@ -32,12 +32,11 @@ public class MsgStoreStatsHolderTest {
public void testMemPartStats() {
MsgStoreStatsHolder msgStoreStatsHolder = new MsgStoreStatsHolder();
// case 1, not started
- msgStoreStatsHolder.addMsgWriteSuccess(50);
+ msgStoreStatsHolder.addMsgWriteSuccess(50, 2);
msgStoreStatsHolder.addCacheFullType(true, false, false);
msgStoreStatsHolder.addCacheFullType(false, true, false);
msgStoreStatsHolder.addCacheFullType(false, false, true);
- msgStoreStatsHolder.addCacheTimeoutFlush(50, false);
- msgStoreStatsHolder.addCacheTimeoutFlush(10, true);
+ msgStoreStatsHolder.addCacheTimeoutFlush();
msgStoreStatsHolder.addMsgWriteFailure();
Map<String, Long> retMap = new LinkedHashMap<>();
msgStoreStatsHolder.getValue(retMap);
@@ -52,9 +51,6 @@ public class MsgStoreStatsHolderTest {
Assert.assertEquals(0, retMap.get("cache_time_full").longValue());
Assert.assertEquals(0, retMap.get("cache_flush_pending").longValue());
Assert.assertEquals(0, retMap.get("cache_realloc").longValue());
- Assert.assertEquals(0, retMap.get("cache_flush_dlt_count").longValue());
- Assert.assertEquals(Long.MIN_VALUE, retMap.get("cache_flush_dlt_max").longValue());
- Assert.assertEquals(Long.MAX_VALUE, retMap.get("cache_flush_dlt_min").longValue());
Assert.assertNotNull(retMap.get("end_time"));
retMap.clear();
// get content by StringBuilder
@@ -66,16 +62,14 @@ public class MsgStoreStatsHolderTest {
// System.out.println("getAllMemStatsInfo : " + strBuff);
strBuff.delete(0, strBuff.length());
// case 2 started
- msgStoreStatsHolder.addMsgWriteSuccess(50);
- msgStoreStatsHolder.addMsgWriteSuccess(500);
- msgStoreStatsHolder.addMsgWriteSuccess(5);
+ msgStoreStatsHolder.addMsgWriteSuccess(50, 10);
+ msgStoreStatsHolder.addMsgWriteSuccess(500, 20);
+ msgStoreStatsHolder.addMsgWriteSuccess(5, 3);
msgStoreStatsHolder.addCacheFullType(true, false, false);
msgStoreStatsHolder.addCacheFullType(false, true, false);
msgStoreStatsHolder.addCacheFullType(false, false, true);
- msgStoreStatsHolder.addCacheTimeoutFlush(50, false);
- msgStoreStatsHolder.addCacheTimeoutFlush(10, true);
- msgStoreStatsHolder.addCacheTimeoutFlush(100, true);
- msgStoreStatsHolder.addCacheTimeoutFlush(1, false);
+ msgStoreStatsHolder.addCacheTimeoutFlush();
+ msgStoreStatsHolder.addCacheTimeoutFlush();
msgStoreStatsHolder.addMsgWriteFailure();
msgStoreStatsHolder.addMsgWriteFailure();
msgStoreStatsHolder.addCacheReAlloc();
@@ -95,16 +89,9 @@ public class MsgStoreStatsHolderTest {
Assert.assertEquals(2, retMap.get("cache_time_full").longValue());
Assert.assertEquals(3, retMap.get("cache_flush_pending").longValue());
Assert.assertEquals(2, retMap.get("cache_realloc").longValue());
- Assert.assertEquals(4, retMap.get("cache_flush_dlt_count").longValue());
- Assert.assertEquals(100, retMap.get("cache_flush_dlt_max").longValue());
- Assert.assertEquals(1, retMap.get("cache_flush_dlt_min").longValue());
- Assert.assertEquals(1, retMap.get("cache_flush_dlt_cell_0t2").longValue());
- Assert.assertEquals(1, retMap.get("cache_flush_dlt_cell_8t16").longValue());
- Assert.assertEquals(1, retMap.get("cache_flush_dlt_cell_32t64").longValue());
- Assert.assertEquals(1, retMap.get("cache_flush_dlt_cell_64t128").longValue());
Assert.assertNotNull(retMap.get("end_time"));
msgStoreStatsHolder.getMsgStoreStatsInfo(false, strBuff);
- // System.out.println("\n the second is : " + strBuff.toString());
+ System.out.println("\n the second is : " + strBuff.toString());
strBuff.delete(0, strBuff.length());
}
@@ -114,7 +101,7 @@ public class MsgStoreStatsHolderTest {
// case 1, not started
msgStoreStatsHolder.addFileFlushStatsInfo(2, 30, 500,
0, 0, true, true,
- true, true, true, true);
+ true, true, true, true, 5);
msgStoreStatsHolder.addFileTimeoutFlushStats(1, 500, false);
Map<String, Long> retMap = new LinkedHashMap<>();
msgStoreStatsHolder.getValue(retMap);
@@ -133,6 +120,9 @@ public class MsgStoreStatsHolderTest {
Assert.assertEquals(0, retMap.get("file_data_full").longValue());
Assert.assertEquals(0, retMap.get("file_count_full").longValue());
Assert.assertEquals(0, retMap.get("file_time_full").longValue());
+ Assert.assertEquals(0, retMap.get("file_flush_dlt_count").longValue());
+ Assert.assertEquals(Long.MAX_VALUE, retMap.get("file_flush_dlt_min").longValue());
+ Assert.assertEquals(Long.MIN_VALUE, retMap.get("file_flush_dlt_max").longValue());
Assert.assertNotNull(retMap.get("end_time"));
retMap.clear();
// get content by StringBuilder
@@ -159,27 +149,36 @@ public class MsgStoreStatsHolderTest {
Assert.assertEquals(0, retMap.get("file_data_full").longValue());
Assert.assertEquals(0, retMap.get("file_count_full").longValue());
Assert.assertEquals(1, retMap.get("file_time_full").longValue());
+ Assert.assertEquals(0, retMap.get("file_flush_dlt_count").longValue());
+ Assert.assertEquals(Long.MAX_VALUE, retMap.get("file_flush_dlt_min").longValue());
+ Assert.assertEquals(Long.MIN_VALUE, retMap.get("file_flush_dlt_max").longValue());
Assert.assertNotNull(retMap.get("end_time"));
retMap.clear();
// get value when started
msgStoreStatsHolder.addFileFlushStatsInfo(1, 1, 1,
1, 1, true, false,
- false, false, false, false);
+ false, false, false, false,
+ 6);
msgStoreStatsHolder.addFileFlushStatsInfo(6, 6, 6,
6, 6, false, false,
- false, false, false, true);
+ false, false, false, true,
+ 100);
msgStoreStatsHolder.addFileFlushStatsInfo(2, 2, 2,
2, 2, false, true,
- false, false, false, false);
+ false, false, false, false,
+ 10);
msgStoreStatsHolder.addFileFlushStatsInfo(5, 5, 5,
5, 5, false, false,
- false, false, true, false);
+ false, false, true, false,
+ 200);
msgStoreStatsHolder.addFileFlushStatsInfo(4, 4, 4,
4, 4, false, false,
- false, true, false, false);
+ false, true, false, false,
+ 50);
msgStoreStatsHolder.addFileFlushStatsInfo(3, 3, 3,
3, 3, false, false,
- true, false, false, false);
+ true, false, false, false,
+ 150);
msgStoreStatsHolder.snapShort(retMap);
Assert.assertNotNull(retMap.get("reset_time"));
Assert.assertEquals(21, retMap.get("file_total_msg_cnt").longValue());
@@ -197,6 +196,9 @@ public class MsgStoreStatsHolderTest {
Assert.assertEquals(2, retMap.get("file_meta_flush").longValue());
Assert.assertEquals(1, retMap.get("file_count_full").longValue());
Assert.assertEquals(2, retMap.get("file_time_full").longValue());
+ Assert.assertEquals(6, retMap.get("file_flush_dlt_count").longValue());
+ Assert.assertEquals(6, retMap.get("file_flush_dlt_min").longValue());
+ Assert.assertEquals(200, retMap.get("file_flush_dlt_max").longValue());
Assert.assertNotNull(retMap.get("end_time"));
retMap.clear();
msgStoreStatsHolder.getMsgStoreStatsInfo(true, strBuff);