You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2022/02/25 14:44:45 UTC
[incubator-inlong] branch master updated: [INLONG-2728][TubeMQ] Optimize the content of statistical metrics (#2729)
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new fdf46c6 [INLONG-2728][TubeMQ] Optimize the content of statistical metrics (#2729)
fdf46c6 is described below
commit fdf46c67c0db289b41a1fef54dac5f1b6dc834a8
Author: gosonzhang <46...@qq.com>
AuthorDate: Fri Feb 25 22:44:39 2022 +0800
[INLONG-2728][TubeMQ] Optimize the content of statistical metrics (#2729)
---
.../tubemq/client/common/ClientStatsInfo.java | 579 +++++++++++++--------
.../inlong/tubemq/client/common/StatsConfig.java | 161 ++++++
.../{StatsOutputLevel.java => StatsLevel.java} | 15 +-
.../tubemq/client/common/TClientConstants.java | 12 +-
.../tubemq/client/config/TubeClientConfig.java | 111 ++--
.../client/consumer/BaseMessageConsumer.java | 13 +-
.../tubemq/client/consumer/FetchContext.java | 4 +
.../consumer/SimpleClientBalanceConsumer.java | 20 +-
.../client/consumer/SimplePullMessageConsumer.java | 10 +-
.../client/consumer/SimplePushMessageConsumer.java | 2 +-
.../tubemq/client/producer/ProducerManager.java | 8 +-
.../client/producer/SimpleMessageProducer.java | 2 +-
.../tubemq/client/consumer/StatsConfigTest.java | 51 ++
.../tubemq/corebase/metric/TrafficStatsUnit.java | 23 +
.../server/broker/stats/BrokerSrvStatsHolder.java | 4 +-
.../server/broker/stats/MsgStoreStatsHolder.java | 12 +-
.../server/broker/stats/TrafficStatsService.java | 20 +-
.../server/common/webbase/WebCallStatsHolder.java | 70 ++-
.../server/master/stats/MasterSrvStatsHolder.java | 4 +-
19 files changed, 755 insertions(+), 366 deletions(-)
diff --git a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/common/ClientStatsInfo.java b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/common/ClientStatsInfo.java
index 186da25..548b3e2 100644
--- a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/common/ClientStatsInfo.java
+++ b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/common/ClientStatsInfo.java
@@ -37,30 +37,20 @@ public class ClientStatsInfo {
LoggerFactory.getLogger(ClientStatsInfo.class);
private final boolean isProducer;
private final String logPrefix;
+ // statistics configuration
+ private final StatsConfig statsConfig = new StatsConfig();
// switchable statistic items
private final ClientStatsItemSet[] switchableSets = new ClientStatsItemSet[2];
// current writable index
private final AtomicInteger writableIndex = new AtomicInteger(0);
- // statistics self-print period, ms
- private final long statsPrintPeriodMs;
- // statistics force reset period, ms
- private final long statsForcedResetPeriodMs;
- // whether the statistic is closed
- private volatile boolean isClosed = false;
- // whether to self-print statistics
- private volatile boolean isSelfPrint = true;
// last self-print time
private final AtomicLong lstSelfPrintTime = new AtomicLong(0);
// last snapshot time
private final AtomicLong lstSnapshotTime = new AtomicLong(0);
- public ClientStatsInfo(boolean isProducer, String clientId,
- boolean isSelfPrint, long statsPrintPeriodMs,
- long statsForcedResetPeriodMs) {
+ public ClientStatsInfo(boolean isProducer, String clientId, StatsConfig statsConfig) {
this.isProducer = isProducer;
- this.isSelfPrint = isSelfPrint;
- this.statsPrintPeriodMs = statsPrintPeriodMs;
- this.statsForcedResetPeriodMs = statsForcedResetPeriodMs;
+ this.statsConfig.updateStatsConfig(statsConfig);
StringBuilder strBuff = new StringBuilder(512);
if (isProducer) {
strBuff.append("[Producer");
@@ -73,20 +63,12 @@ public class ClientStatsInfo {
this.switchableSets[1] = new ClientStatsItemSet();
}
- public synchronized void setStatsStatus(boolean enableStats) {
- this.isClosed = !enableStats;
- }
-
- public boolean isStatsClosed() {
- return this.isClosed;
- }
-
- public boolean isStatsSelfPrint() {
- return this.isSelfPrint;
+ public void updStatsControlInfo(StatsLevel statsLevel, boolean enableSelfPrint) {
+ this.statsConfig.updateStatsControl(statsLevel, enableSelfPrint);
}
public void bookReg2Master(boolean isFailure) {
- if (isClosed) {
+ if (this.statsConfig.getStatsLevel() == StatsLevel.ZERO) {
return;
}
switchableSets[getIndex()].regMasterCnt.incValue();
@@ -96,21 +78,21 @@ public class ClientStatsInfo {
}
public void bookHB2MasterTimeout() {
- if (isClosed) {
+ if (this.statsConfig.getStatsLevel() == StatsLevel.ZERO) {
return;
}
switchableSets[getIndex()].regMasterTimoutCnt.incValue();
}
public void bookHB2MasterException() {
- if (isClosed) {
+ if (this.statsConfig.getStatsLevel() == StatsLevel.ZERO) {
return;
}
switchableSets[getIndex()].hbMasterExcCnt.incValue();
}
public void bookReg2Broker(boolean isFailure) {
- if (isClosed) {
+ if (this.statsConfig.getStatsLevel() == StatsLevel.ZERO) {
return;
}
switchableSets[getIndex()].regBrokerCnt.incValue();
@@ -120,82 +102,99 @@ public class ClientStatsInfo {
}
public void bookHB2BrokerTimeout() {
- if (isClosed) {
+ if (this.statsConfig.getStatsLevel() == StatsLevel.ZERO) {
return;
}
switchableSets[getIndex()].regBrokerTimoutCnt.incValue();
}
public void bookHB2BrokerException() {
- if (isClosed) {
+ if (this.statsConfig.getStatsLevel() == StatsLevel.ZERO) {
return;
}
switchableSets[getIndex()].hbBrokerExcCnt.incValue();
}
- public void bookConfirmDuration(long dltTime) {
- if (isClosed) {
+ public void bookReturnDuration(String partitionKey, long dltTime) {
+ if (this.statsConfig.getStatsLevel() == StatsLevel.ZERO) {
+ return;
+ }
+ ClientStatsItemSet curItemSet = switchableSets[getIndex()];
+ curItemSet.csmLatencyStats.update(dltTime);
+ if (this.statsConfig.getStatsLevel() == StatsLevel.FULL) {
+ curItemSet.addCsmLatencyByPartKey(partitionKey, dltTime);
+ }
+ }
+
+ public void bookConfirmDuration(String partitionKey, long dltTime) {
+ if (this.statsConfig.getStatsLevel() == StatsLevel.ZERO) {
return;
}
- switchableSets[getIndex()].csmLatencyStats.update(dltTime);
+ ClientStatsItemSet curItemSet = switchableSets[getIndex()];
+ curItemSet.confirmDltStats.update(dltTime);
+ if (this.statsConfig.getStatsLevel() == StatsLevel.FULL) {
+ curItemSet.addConfirmDltByPartKey(partitionKey, dltTime);
+ }
}
- public void bookSuccSendMsg(long dltTime, String topicName, int msgSize) {
- if (isClosed) {
+ public void bookFailRpcCall(int errCode) {
+ if (this.statsConfig.getStatsLevel() == StatsLevel.ZERO) {
return;
}
- sendOrRecvMsg(topicName, dltTime, 1, msgSize);
- bookFailRpcCall(TErrCodeConstants.SUCCESS);
+ switchableSets[getIndex()].bookFailRpcCall(errCode);
}
- public void bookSuccGetMsg(long dltTime, String topicName, int msgCnt, int msgSize) {
- if (isClosed) {
+ public void bookSuccSendMsg(long dltTime, String topicName,
+ String partitionKey, int msgSize) {
+ if (this.statsConfig.getStatsLevel() == StatsLevel.ZERO) {
return;
}
- sendOrRecvMsg(topicName, dltTime, msgCnt, msgSize);
- bookFailRpcCall(TErrCodeConstants.SUCCESS);
+ ClientStatsItemSet curItemSet = switchableSets[getIndex()];
+ curItemSet.sendOrRecvMsg(topicName, dltTime, 1, msgSize);
+ curItemSet.bookFailRpcCall(TErrCodeConstants.SUCCESS);
+ if (this.statsConfig.getStatsLevel() == StatsLevel.FULL) {
+ curItemSet.addTrafficInfoByPartKey(partitionKey, dltTime, 1, msgSize);
+ }
}
- public void bookFailRpcCall(int errCode) {
- if (isClosed) {
+ public void bookSuccGetMsg(long dltTime, String topicName,
+ String partitionKey, int msgCnt, int msgSize) {
+ if (this.statsConfig.getStatsLevel() == StatsLevel.ZERO) {
return;
}
- // accumulate msg count by errcode
ClientStatsItemSet curItemSet = switchableSets[getIndex()];
- LongStatsCounter curItemCounter = curItemSet.errRspStatsMap.get(errCode);
- if (curItemCounter == null) {
- LongStatsCounter tmpCounter = new LongStatsCounter(
- "err_" + errCode, "");
- curItemCounter = curItemSet.errRspStatsMap.putIfAbsent(errCode, tmpCounter);
- if (curItemCounter == null) {
- curItemCounter = tmpCounter;
- }
+ curItemSet.sendOrRecvMsg(topicName, dltTime, msgCnt, msgSize);
+ curItemSet.bookFailRpcCall(TErrCodeConstants.SUCCESS);
+ if (this.statsConfig.getStatsLevel() == StatsLevel.FULL) {
+ curItemSet.addTrafficInfoByPartKey(partitionKey, dltTime, msgCnt, msgSize);
}
- curItemCounter.incValue();
}
/**
* Self print statistics information to log file
*
* @param forcePrint whether force print statistics information
+ * @param needReset whether reset statistics set
* @param strBuff string buffer
*/
- public void selfPrintStatsInfo(boolean forcePrint, StringBuilder strBuff) {
- if (this.isClosed || !this.isSelfPrint) {
+ public void selfPrintStatsInfo(boolean forcePrint,
+ boolean needReset,
+ StringBuilder strBuff) {
+ if ((this.statsConfig.getStatsLevel() == StatsLevel.ZERO)
+ || !this.statsConfig.isEnableSelfPrint()) {
return;
}
long lstPrintTime = lstSelfPrintTime.get();
long curChkTime = Clock.systemDefaultZone().millis();
- if (forcePrint || (curChkTime - lstPrintTime > this.statsPrintPeriodMs)) {
+ if (forcePrint || (curChkTime - lstPrintTime > this.statsConfig.getSelfPrintPeriodMs())) {
if (lstSelfPrintTime.compareAndSet(lstPrintTime, curChkTime)) {
- if (switchWritingStatsUnit(false)) {
- strBuff.append(this.logPrefix).append(", reset value=");
+ if (switchWritingStatsUnit(needReset)) {
+ strBuff.append(this.logPrefix).append(", reset value= ");
getStatsInfo(switchableSets[getIndex(writableIndex.get() - 1)],
- strBuff, StatsOutputLevel.FULL, false);
+ true, strBuff);
} else {
- strBuff.append(this.logPrefix).append(", value=");
- getStatsInfo(switchableSets[getIndex()],
- strBuff, StatsOutputLevel.FULL, false);
+ strBuff.append(this.logPrefix).append(", value= ");
+ getStatsInfo(switchableSets[getIndex()], false, strBuff);
}
logger.info(strBuff.toString());
strBuff.delete(0, strBuff.length());
@@ -208,9 +207,9 @@ public class ClientStatsInfo {
long lstResetTime = lstSnapshotTime.get();
long checkDltTime = System.currentTimeMillis() - lstResetTime;
if (((needReset && (checkDltTime > TBaseConstants.CFG_STATS_MIN_SNAPSHOT_PERIOD_MS))
- || (checkDltTime > this.statsForcedResetPeriodMs))
+ || (checkDltTime > this.statsConfig.getForcedResetPeriodMs()))
&& lstSnapshotTime.compareAndSet(lstResetTime, System.currentTimeMillis())) {
- switchableSets[getIndex(writableIndex.incrementAndGet())].resetSinceTime();
+ switchableSets[getIndex(writableIndex.incrementAndGet())].resetStartTime();
return true;
}
return false;
@@ -219,167 +218,49 @@ public class ClientStatsInfo {
/**
* Get current data encapsulated by Json format
*
- * @param strBuff string buffer
- * @param outputLevel the output level of statistics
+ * @param statsSet the statistics to be printed
* @param resetValue whether to reset the current data
+ * @param strBuff string buffer
*/
- private void getStatsInfo(ClientStatsItemSet statsSet, StringBuilder strBuff,
- StatsOutputLevel outputLevel, boolean resetValue) {
- int totalCnt = 0;
+ private void getStatsInfo(ClientStatsItemSet statsSet,
+ boolean resetValue, StringBuilder strBuff) {
strBuff.append("{\"").append(statsSet.resetTime.getFullName())
.append("\":\"").append(statsSet.resetTime.getStrSinceTime())
.append("\",\"probe_time\":\"")
.append(DateTimeConvertUtils.ms2yyyyMMddHHmmss(System.currentTimeMillis()))
- .append("\"");
- if (resetValue) {
- strBuff.append(",\"").append(statsSet.totalTrafficStats.msgCnt.getFullName())
- .append("\":").append(statsSet.totalTrafficStats.msgCnt.getAndResetValue())
- .append(",\"").append(statsSet.totalTrafficStats.msgSize.getFullName())
- .append("\":").append(statsSet.totalTrafficStats.msgSize.getAndResetValue())
- .append(",\"traffic_details\":{");
- for (Map.Entry<String, TrafficStatsUnit> entry
- : statsSet.topicTrafficMap.entrySet()) {
- if (entry == null) {
- continue;
- }
- if (totalCnt++ > 0) {
- strBuff.append(",");
- }
- strBuff.append("\"").append(entry.getKey()).append("\":{\"")
- .append(entry.getValue().msgCnt.getShortName()).append("\":")
- .append(entry.getValue().msgCnt.getAndResetValue()).append(",\"")
- .append(entry.getValue().msgSize.getShortName()).append("\":")
- .append(entry.getValue().msgCnt.getAndResetValue()).append("}");
- }
- strBuff.append("}");
- if (outputLevel != StatsOutputLevel.SIMPLEST) {
- strBuff.append(",");
+ .append("\",");
+ statsSet.totalTrafficStats.getValue(strBuff, resetValue);
+ strBuff.append(",");
+ statsSet.getTopicDetailInfo(strBuff, resetValue);
+ if (this.statsConfig.getStatsLevel() != StatsLevel.SIMPLEST) {
+ strBuff.append(",");
+ if (resetValue) {
statsSet.msgCallDltStats.snapShort(strBuff, false);
if (!isProducer) {
strBuff.append(",");
statsSet.csmLatencyStats.snapShort(strBuff, false);
- }
- strBuff.append(",\"rsp_details\":{");
- totalCnt = 0;
- for (LongStatsCounter statsCounter : statsSet.errRspStatsMap.values()) {
- if (statsCounter == null) {
- continue;
- }
- if (totalCnt++ > 0) {
- strBuff.append(",");
- }
- strBuff.append("\"").append(statsCounter.getFullName()).append("\":")
- .append(statsCounter.getAndResetValue());
- }
- strBuff.append("}");
- }
- if (outputLevel == StatsOutputLevel.FULL) {
- strBuff.append(",\"").append(statsSet.regMasterCnt.getFullName())
- .append("\":").append(statsSet.regMasterCnt.getAndResetValue())
- .append(",\"").append(statsSet.regMasterFailCnt.getFullName())
- .append("\":").append(statsSet.regMasterFailCnt.getAndResetValue())
- .append(",\"").append(statsSet.regMasterTimoutCnt.getFullName())
- .append("\":").append(statsSet.regMasterTimoutCnt.getAndResetValue())
- .append(",\"").append(statsSet.hbMasterExcCnt.getFullName())
- .append("\":").append(statsSet.hbMasterExcCnt.getAndResetValue())
- .append(",\"").append(statsSet.regBrokerCnt.getFullName())
- .append("\":").append(statsSet.regBrokerCnt.getAndResetValue())
- .append(",\"").append(statsSet.regBrokerFailCnt.getFullName())
- .append("\":").append(statsSet.regBrokerFailCnt.getAndResetValue())
- .append(",\"").append(statsSet.regBrokerTimoutCnt.getFullName())
- .append("\":").append(statsSet.regBrokerTimoutCnt.getAndResetValue())
- .append(",\"").append(statsSet.hbBrokerExcCnt.getFullName())
- .append("\":").append(statsSet.hbBrokerExcCnt.getAndResetValue());
- }
- strBuff.append("}");
- } else {
- strBuff.append(",\"").append(statsSet.totalTrafficStats.msgCnt.getFullName())
- .append("\":").append(statsSet.totalTrafficStats.msgCnt.getValue())
- .append(",\"").append(statsSet.totalTrafficStats.msgSize.getFullName())
- .append("\":").append(statsSet.totalTrafficStats.msgSize.getValue())
- .append(",\"traffic_details\":{");
- for (Map.Entry<String, TrafficStatsUnit> entry
- : statsSet.topicTrafficMap.entrySet()) {
- if (entry == null) {
- continue;
- }
- if (totalCnt++ > 0) {
strBuff.append(",");
+ statsSet.confirmDltStats.getValue(strBuff, false);
}
- strBuff.append("\"").append(entry.getKey()).append("\":{\"")
- .append(entry.getValue().msgCnt.getShortName()).append("\":")
- .append(entry.getValue().msgCnt.getValue()).append(",\"")
- .append(entry.getValue().msgSize.getShortName()).append("\":")
- .append(entry.getValue().msgCnt.getValue()).append("}");
- }
- strBuff.append("}");
- if (outputLevel != StatsOutputLevel.SIMPLEST) {
- strBuff.append(",");
+ } else {
statsSet.msgCallDltStats.getValue(strBuff, false);
if (!isProducer) {
strBuff.append(",");
statsSet.csmLatencyStats.getValue(strBuff, false);
+ strBuff.append(",");
+ statsSet.confirmDltStats.getValue(strBuff, false);
}
- strBuff.append(",\"rsp_details\":{");
- totalCnt = 0;
- for (LongStatsCounter statsCounter : statsSet.errRspStatsMap.values()) {
- if (statsCounter == null) {
- continue;
- }
- if (totalCnt++ > 0) {
- strBuff.append(",");
- }
- strBuff.append("\"").append(statsCounter.getFullName()).append("\":")
- .append(statsCounter.getValue());
- }
- strBuff.append("}");
}
- if (outputLevel == StatsOutputLevel.FULL) {
- strBuff.append(",\"").append(statsSet.regMasterCnt.getFullName())
- .append("\":").append(statsSet.regMasterCnt.getValue())
- .append(",\"").append(statsSet.regMasterFailCnt.getFullName())
- .append("\":").append(statsSet.regMasterFailCnt.getValue())
- .append(",\"").append(statsSet.regMasterTimoutCnt.getFullName())
- .append("\":").append(statsSet.regMasterTimoutCnt.getValue())
- .append(",\"").append(statsSet.hbMasterExcCnt.getFullName())
- .append("\":").append(statsSet.hbMasterExcCnt.getValue())
- .append(",\"").append(statsSet.regBrokerCnt.getFullName())
- .append("\":").append(statsSet.regBrokerCnt.getValue())
- .append(",\"").append(statsSet.regBrokerFailCnt.getFullName())
- .append("\":").append(statsSet.regBrokerFailCnt.getValue())
- .append(",\"").append(statsSet.regBrokerTimoutCnt.getFullName())
- .append("\":").append(statsSet.regBrokerTimoutCnt.getValue())
- .append(",\"").append(statsSet.hbBrokerExcCnt.getFullName())
- .append("\":").append(statsSet.hbBrokerExcCnt.getValue());
- }
- strBuff.append("}");
- }
- }
-
- /**
- * Accumulate sent or received message information
- *
- * @param topic the topic name
- * @param dltTime the latency
- * @param msgCnt the message count
- * @param msgSize the message size
- */
- private void sendOrRecvMsg(String topic, long dltTime,
- int msgCnt, int msgSize) {
- ClientStatsItemSet curItemSet = switchableSets[getIndex()];
- curItemSet.msgCallDltStats.update(dltTime);
- curItemSet.totalTrafficStats.addMsgCntAndSize(msgCnt, msgSize);
- // accumulate traffic information by topic
- TrafficStatsUnit curStatsUnit = curItemSet.topicTrafficMap.get(topic);
- if (curStatsUnit == null) {
- TrafficStatsUnit tmpUnit =
- new TrafficStatsUnit("msg_cnt", "msg_size", topic);
- curStatsUnit = curItemSet.topicTrafficMap.putIfAbsent(topic, tmpUnit);
- if (curStatsUnit == null) {
- curStatsUnit = tmpUnit;
+ strBuff.append(",");
+ statsSet.getErrorRspInfo(strBuff, resetValue);
+ strBuff.append(",");
+ statsSet.getStatusInfo(strBuff, resetValue);
+ if (this.statsConfig.getStatsLevel() == StatsLevel.FULL) {
+ strBuff.append(",");
+ statsSet.getPartDetailsInfo(strBuff, isProducer, resetValue);
}
}
- curStatsUnit.addMsgCntAndSize(msgCnt, msgSize);
+ strBuff.append("}");
}
/**
@@ -411,16 +292,22 @@ public class ClientStatsInfo {
new SinceTime("reset_time", null);
// received or sent message traffic statistic
protected final TrafficStatsUnit totalTrafficStats =
- new TrafficStatsUnit("msg_cnt", "msg_size", "total");
+ new TrafficStatsUnit("msg_cnt", "msg_size", "total_traffic");
// topic-based traffic statistics
protected final ConcurrentHashMap<String, TrafficStatsUnit> topicTrafficMap =
new ConcurrentHashMap<>();
+ // partition-based traffic statistics
+ protected final ConcurrentHashMap<String, PartitionStatsItemSet> partDltStatsMap =
+ new ConcurrentHashMap<>();
// time consumption statistics for sending or receiving messages
protected final ESTHistogram msgCallDltStats =
new ESTHistogram("msg_call_dlt", "");
// statistics on consumption transaction time
protected final ESTHistogram csmLatencyStats =
new ESTHistogram("csm_latency_dlt", "");
+ // time consumption statistics for confirm request
+ protected final ESTHistogram confirmDltStats =
+ new ESTHistogram("msg_confirm_dlt", "");
// error response distribution statistics
protected final ConcurrentHashMap<Integer, LongStatsCounter> errRspStatsMap =
new ConcurrentHashMap<>();
@@ -444,12 +331,288 @@ public class ClientStatsInfo {
new LongStatsCounter("hb_broker_exception", null);
public ClientStatsItemSet() {
- resetSinceTime();
+ resetStartTime();
}
- public void resetSinceTime() {
+ public void resetStartTime() {
this.resetTime.reset();
}
+
+ /**
+ * Accumulate sent or received message information
+ *
+ * @param topic the topic name
+ * @param dltTime the latency
+ * @param msgCnt the message count
+ * @param msgSize the message size
+ */
+ public void sendOrRecvMsg(String topic, long dltTime,
+ int msgCnt, int msgSize) {
+ msgCallDltStats.update(dltTime);
+ totalTrafficStats.addMsgCntAndSize(msgCnt, msgSize);
+ // accumulate traffic information by topic
+ TrafficStatsUnit curStatsUnit = topicTrafficMap.get(topic);
+ if (curStatsUnit == null) {
+ TrafficStatsUnit tmpUnit =
+ new TrafficStatsUnit("msg_cnt", "msg_size", topic);
+ curStatsUnit = topicTrafficMap.putIfAbsent(topic, tmpUnit);
+ if (curStatsUnit == null) {
+ curStatsUnit = tmpUnit;
+ }
+ }
+ curStatsUnit.addMsgCntAndSize(msgCnt, msgSize);
+ }
+
+ /**
+ * Accumulate msg count by errcode
+ *
+ * @param errCode the error code
+ */
+ public void bookFailRpcCall(int errCode) {
+ LongStatsCounter curItemCounter = errRspStatsMap.get(errCode);
+ if (curItemCounter == null) {
+ LongStatsCounter tmpCounter = new LongStatsCounter(
+ "err_" + errCode, "");
+ curItemCounter = errRspStatsMap.putIfAbsent(errCode, tmpCounter);
+ if (curItemCounter == null) {
+ curItemCounter = tmpCounter;
+ }
+ }
+ curItemCounter.incValue();
+ }
+
+ /**
+ * Accumulate traffic information by partition
+ *
+ * @param dltTime the latency
+ * @param msgCnt the message count
+ * @param msgSize the message size
+ */
+ public void addTrafficInfoByPartKey(String partitionKey,
+ long dltTime, int msgCnt, int msgSize) {
+ PartitionStatsItemSet partStatsUnit =
+ partDltStatsMap.get(partitionKey);
+ if (partStatsUnit == null) {
+ PartitionStatsItemSet tmpUnit = new PartitionStatsItemSet(partitionKey);
+ partStatsUnit = partDltStatsMap.putIfAbsent(partitionKey, tmpUnit);
+ if (partStatsUnit == null) {
+ partStatsUnit = tmpUnit;
+ }
+ }
+ partStatsUnit.trafficStatsUnit.addMsgCntAndSize(msgCnt, msgSize);
+ partStatsUnit.msgCallDltStats.update(dltTime);
+ }
+
+ /**
+ * Accumulate consume latency information by partition
+ *
+ * @param dltTime the latency
+ */
+ public void addCsmLatencyByPartKey(String partitionKey, long dltTime) {
+ PartitionStatsItemSet partStatsUnit = partDltStatsMap.get(partitionKey);
+ if (partStatsUnit == null) {
+ PartitionStatsItemSet tmpUnit = new PartitionStatsItemSet(partitionKey);
+ partStatsUnit = partDltStatsMap.putIfAbsent(partitionKey, tmpUnit);
+ if (partStatsUnit == null) {
+ partStatsUnit = tmpUnit;
+ }
+ }
+ partStatsUnit.csmLatencyStats.update(dltTime);
+ }
+
+ /**
+ * Accumulate confirm delta time information by partition
+ *
+ * @param dltTime the latency
+ */
+ public void addConfirmDltByPartKey(String partitionKey, long dltTime) {
+ PartitionStatsItemSet partStatsUnit = partDltStatsMap.get(partitionKey);
+ if (partStatsUnit == null) {
+ PartitionStatsItemSet tmpUnit = new PartitionStatsItemSet(partitionKey);
+ partStatsUnit = partDltStatsMap.putIfAbsent(partitionKey, tmpUnit);
+ if (partStatsUnit == null) {
+ partStatsUnit = tmpUnit;
+ }
+ }
+ partStatsUnit.confirmDltStats.update(dltTime);
+ }
+
+ /**
+ * Get traffic statistics details by topic
+ *
+ * @param strBuff string buffer
+ * @param resetValue whether to reset the current data
+ */
+ public void getTopicDetailInfo(StringBuilder strBuff, boolean resetValue) {
+ int totalCnt = 0;
+ strBuff.append("\"topic_details\":{");
+ for (Map.Entry<String, TrafficStatsUnit> entry : topicTrafficMap.entrySet()) {
+ if (entry == null) {
+ continue;
+ }
+ if (totalCnt++ > 0) {
+ strBuff.append(",");
+ }
+ entry.getValue().getValue(strBuff, true);
+ }
+ strBuff.append("}");
+ if (resetValue) {
+ topicTrafficMap.clear();
+ }
+ }
+
+ /**
+ * Get error response statistics
+ *
+ * @param strBuff string buffer
+ * @param resetValue whether to reset the current data
+ */
+ public void getErrorRspInfo(StringBuilder strBuff, boolean resetValue) {
+ int totalCnt = 0;
+ strBuff.append("\"rsp_details\":{");
+ for (LongStatsCounter statsCounter : errRspStatsMap.values()) {
+ if (statsCounter == null) {
+ continue;
+ }
+ if (totalCnt++ > 0) {
+ strBuff.append(",");
+ }
+ strBuff.append("\"").append(statsCounter.getFullName()).append("\":")
+ .append(statsCounter.getValue());
+ }
+ strBuff.append("}");
+ if (resetValue) {
+ errRspStatsMap.clear();
+ }
+ }
+
+ /**
+ * Get running status statistics
+ *
+ * @param strBuff string buffer
+ * @param resetValue whether to reset the current data
+ */
+ public void getStatusInfo(StringBuilder strBuff, boolean resetValue) {
+ strBuff.append("\"status_details\":{\"");
+ if (resetValue) {
+ strBuff.append(regMasterCnt.getFullName()).append("\":")
+ .append(regMasterCnt.getAndResetValue())
+ .append(",\"").append(regMasterFailCnt.getFullName()).append("\":")
+ .append(regMasterFailCnt.getAndResetValue())
+ .append(",\"").append(regMasterTimoutCnt.getFullName()).append("\":")
+ .append(regMasterTimoutCnt.getAndResetValue())
+ .append(",\"").append(hbMasterExcCnt.getFullName()).append("\":")
+ .append(hbMasterExcCnt.getAndResetValue())
+ .append(",\"").append(regBrokerCnt.getFullName()).append("\":")
+ .append(regBrokerCnt.getAndResetValue())
+ .append(",\"").append(regBrokerFailCnt.getFullName()).append("\":")
+ .append(regBrokerFailCnt.getAndResetValue())
+ .append(",\"").append(regBrokerTimoutCnt.getFullName()).append("\":")
+ .append(regBrokerTimoutCnt.getAndResetValue())
+ .append(",\"").append(hbBrokerExcCnt.getFullName()).append("\":")
+ .append(hbBrokerExcCnt.getAndResetValue())
+ .append("}");
+ } else {
+ strBuff.append(regMasterCnt.getFullName()).append("\":")
+ .append(regMasterCnt.getValue())
+ .append(",\"").append(regMasterFailCnt.getFullName()).append("\":")
+ .append(regMasterFailCnt.getValue())
+ .append(",\"").append(regMasterTimoutCnt.getFullName()).append("\":")
+ .append(regMasterTimoutCnt.getValue())
+ .append(",\"").append(hbMasterExcCnt.getFullName()).append("\":")
+ .append(hbMasterExcCnt.getValue())
+ .append(",\"").append(regBrokerCnt.getFullName()).append("\":")
+ .append(regBrokerCnt.getValue())
+ .append(",\"").append(regBrokerFailCnt.getFullName()).append("\":")
+ .append(regBrokerFailCnt.getValue())
+ .append(",\"").append(regBrokerTimoutCnt.getFullName()).append("\":")
+ .append(regBrokerTimoutCnt.getValue())
+ .append(",\"").append(hbBrokerExcCnt.getFullName()).append("\":")
+ .append(hbBrokerExcCnt.getValue())
+ .append("}");
+ }
+ }
+
+ /**
+ * Get traffic statistics by partition key
+ *
+ * @param strBuff string buffer
+ * @param isProducer whether producer role
+ * @param resetValue whether to reset the current data
+ */
+ public void getPartDetailsInfo(StringBuilder strBuff,
+ boolean isProducer,
+ boolean resetValue) {
+ int totalCnt = 0;
+ strBuff.append("\"part_details\":{");
+ for (PartitionStatsItemSet partStatsSet : partDltStatsMap.values()) {
+ if (partStatsSet == null) {
+ continue;
+ }
+ if (totalCnt++ > 0) {
+ strBuff.append(",");
+ }
+ partStatsSet.getValue(strBuff, isProducer, false);
+ }
+ strBuff.append("}");
+ if (resetValue) {
+ partDltStatsMap.clear();
+ }
+ }
+ }
+
+ /**
+ * PartitionStatsItemSet, partition related statistics set
+ *
+ */
+ private static class PartitionStatsItemSet {
+ private final String partKey;
+ protected final TrafficStatsUnit trafficStatsUnit =
+ new TrafficStatsUnit("msg_cnt", "msg_size", "traffic");
+ // time consumption statistics for sending or receiving messages
+ protected final ESTHistogram msgCallDltStats =
+ new ESTHistogram("msg_call_dlt", "");
+ // statistics on consumption transaction time
+ protected final ESTHistogram csmLatencyStats =
+ new ESTHistogram("csm_latency_dlt", "");
+ // time consumption statistics for confirm request
+ protected final ESTHistogram confirmDltStats =
+ new ESTHistogram("msg_confirm_dlt", "");
+
+ public PartitionStatsItemSet(String partKey) {
+ this.partKey = partKey;
+ }
+
+ /**
+ * Get partition's traffic statistics
+ *
+ * @param strBuff string buffer
+ * @param isProducer whether producer role
+ * @param resetValue whether to reset the current data
+ */
+ public void getValue(StringBuilder strBuff, boolean isProducer, boolean resetValue) {
+ strBuff.append("\"").append(partKey).append("\":{");
+ trafficStatsUnit.getValue(strBuff, resetValue);
+ strBuff.append(",");
+ if (resetValue) {
+ msgCallDltStats.snapShort(strBuff, false);
+ if (!isProducer) {
+ strBuff.append(",");
+ csmLatencyStats.snapShort(strBuff, false);
+ strBuff.append(",");
+ confirmDltStats.snapShort(strBuff, false);
+ }
+ } else {
+ msgCallDltStats.getValue(strBuff, false);
+ if (!isProducer) {
+ strBuff.append(",");
+ csmLatencyStats.getValue(strBuff, false);
+ strBuff.append(",");
+ confirmDltStats.getValue(strBuff, false);
+ }
+ }
+ strBuff.append("}");
+ }
}
}
diff --git a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/common/StatsConfig.java b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/common/StatsConfig.java
new file mode 100644
index 0000000..6eed718
--- /dev/null
+++ b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/common/StatsConfig.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.tubemq.client.common;
+
+import java.util.Objects;
+import org.apache.inlong.tubemq.corebase.utils.MixedUtils;
+
+/**
+ * StatsConfig, configuration settings related to client statistics
+ *
+ */
+public class StatsConfig {
+ // client statistics information print period
+ private static final long STATS_SELF_PRINT_DEFAULT_PERIOD_MS = 6 * 1000 * 60L;
+ private static final long STATS_SELF_PRINT_MIN_PERIOD_MS = 1000 * 60L;
+ private static final long STATS_SELF_PRINT_MAX_PERIOD_MS = 60 * 1000 * 60L;
+ // client statistics information forced reset period
+ private static final long STATS_AUTO_RESET_DEFAULT_PERIOD_MS = 30 * 60 * 1000L;
+ private static final long STATS_AUTO_RESET_MIN_PERIOD_MS = 30 * 1000L;
+ private static final long STATS_AUTO_RESET_MAX_PERIOD_MS = 24 * 3600 * 1000L;
+ // data statistics level
+ private StatsLevel statsLevel = StatsLevel.MEDIUM;
+ // Enable metric information print
+ private boolean enableSelfPrint = true;
+ // Metric print period in ms.
+ private long selfPrintPeriodMs = STATS_SELF_PRINT_DEFAULT_PERIOD_MS;
+ // Metric reset value period in ms.
+ private long forcedResetPeriodMs = STATS_AUTO_RESET_DEFAULT_PERIOD_MS;
+
+ /**
+ * Initialize instance with the default value
+ *
+ */
+ public StatsConfig() {
+ //
+ }
+
+ /**
+ * Update the current configuration settings according to the specified configuration
+ *
+ * @param that the specified configuration
+ */
+ public void updateStatsConfig(StatsConfig that) {
+ if (that == null) {
+ return;
+ }
+ updateStatsConfig(that.statsLevel, that.enableSelfPrint,
+ that.selfPrintPeriodMs, that.forcedResetPeriodMs);
+ }
+
+ /**
+ * Update the current configuration settings
+ * Attention, if statsLevel is ZERO, then printing will be automatically turned off
+ * regardless of the value of enableSelfPrint
+ *
+ * @param statsLevel the statistics level
+ * @param enableSelfPrint whether to allow the SDK to print by itself
+ * @param selfPrintPeriodMs the interval, in ms, at which the SDK prints its own statistics
+ * @param forcedResetPeriodMs the interval, in ms, at which the collected data is reset
+ */
+ public void updateStatsConfig(StatsLevel statsLevel, boolean enableSelfPrint,
+ long selfPrintPeriodMs, long forcedResetPeriodMs) {
+ updateStatsControl(statsLevel, enableSelfPrint);
+ setStatsPeriodInfo(selfPrintPeriodMs, forcedResetPeriodMs);
+ }
+
+ /**
+ * Update current statistics control settings
+ * Attention, if statsLevel is ZERO, then printing will be automatically turned off
+ * regardless of the value of enableSelfPrint
+ *
+ * @param statsLevel the statistics level
+ * @param enableSelfPrint whether to allow the SDK to print by itself
+ */
+ public void updateStatsControl(StatsLevel statsLevel, boolean enableSelfPrint) {
+ this.statsLevel = statsLevel;
+ this.enableSelfPrint = enableSelfPrint;
+ }
+
+ /**
+ * Update current period settings
+ *
+ * @param selfPrintPeriodMs the interval, in ms, at which the SDK prints its own statistics
+ * @param forcedResetPeriodMs the interval, in ms, at which the collected data is reset
+ */
+ public void setStatsPeriodInfo(long selfPrintPeriodMs,
+ long forcedResetPeriodMs) {
+ this.selfPrintPeriodMs =
+ MixedUtils.mid(selfPrintPeriodMs,
+ STATS_SELF_PRINT_MIN_PERIOD_MS, STATS_SELF_PRINT_MAX_PERIOD_MS);
+ this.forcedResetPeriodMs =
+ MixedUtils.mid(forcedResetPeriodMs,
+ STATS_AUTO_RESET_MIN_PERIOD_MS, STATS_AUTO_RESET_MAX_PERIOD_MS);
+ }
+
+ public void setStatsLevel(StatsLevel statsLevel) {
+ this.statsLevel = statsLevel;
+ }
+
+ public StatsLevel getStatsLevel() {
+ return statsLevel;
+ }
+
+ public boolean isEnableSelfPrint() {
+ return enableSelfPrint;
+ }
+
+ public long getSelfPrintPeriodMs() {
+ return selfPrintPeriodMs;
+ }
+
+ public long getForcedResetPeriodMs() {
+ return forcedResetPeriodMs;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof StatsConfig)) {
+ return false;
+ }
+ StatsConfig that = (StatsConfig) o;
+ return ((enableSelfPrint == that.enableSelfPrint)
+ && (selfPrintPeriodMs == that.selfPrintPeriodMs)
+ && (forcedResetPeriodMs == that.forcedResetPeriodMs)
+ && (statsLevel == that.statsLevel));
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(statsLevel, enableSelfPrint,
+ selfPrintPeriodMs, forcedResetPeriodMs);
+ }
+
+ @Override
+ public String toString() {
+ return new StringBuilder(512)
+ .append("\"StatsConfig\":{\"statsLevel\":\"").append(statsLevel.getName())
+ .append("\",\"enableSelfPrint\":").append(enableSelfPrint)
+ .append(",\"selfPrintPeriodMs\":").append(selfPrintPeriodMs)
+ .append(",\"forcedResetPeriodMs\":").append(forcedResetPeriodMs)
+ .append("}").toString();
+ }
+}
diff --git a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/common/StatsOutputLevel.java b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/common/StatsLevel.java
similarity index 75%
rename from inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/common/StatsOutputLevel.java
rename to inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/common/StatsLevel.java
index e45f7f4..5f3e759 100644
--- a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/common/StatsOutputLevel.java
+++ b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/common/StatsLevel.java
@@ -17,12 +17,13 @@
package org.apache.inlong.tubemq.client.common;
-public enum StatsOutputLevel {
- SIMPLEST(0, "simplest", "Simplest statistics output"),
- MEDIUM(1, "medium", "Medium statistics output"),
- FULL(2, "full", "Full statistics output");
+public enum StatsLevel {
+ ZERO(0, "closed", "Statistics are turned off"),
+ SIMPLEST(1, "simplest", "Simplest statistics"),
+ MEDIUM(2, "medium", "Medium statistics"),
+ FULL(3, "full", "Full statistics");
- StatsOutputLevel(int id, String name, String desc) {
+ StatsLevel(int id, String name, String desc) {
this.id = id;
this.name = name;
this.desc = desc;
@@ -40,8 +41,8 @@ public enum StatsOutputLevel {
return desc;
}
- public static StatsOutputLevel valueOf(int value) {
- for (StatsOutputLevel outputLevel : StatsOutputLevel.values()) {
+ public static StatsLevel valueOf(int value) {
+ for (StatsLevel outputLevel : StatsLevel.values()) {
if (outputLevel.getId() == value) {
return outputLevel;
}
diff --git a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/common/TClientConstants.java b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/common/TClientConstants.java
index 0269af2..b7a8d70 100644
--- a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/common/TClientConstants.java
+++ b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/common/TClientConstants.java
@@ -25,7 +25,7 @@ public class TClientConstants {
public static final long CFG_DEFAULT_REGFAIL_WAIT_PERIOD_MS = 1000;
public static final long CFG_DEFAULT_MSG_NOTFOUND_WAIT_PERIOD_MS = 400L;
public static final long CFG_DEFAULT_CONSUME_READ_WAIT_PERIOD_MS = 90000L;
- public static final long CFG_DEFAULT_CONSUME_READ_CHECK_SLICE_MS = 50L;
+ public static final long CFG_DEFAULT_CONSUME_READ_CHECK_SLICE_MS = 3L;
public static final long CFG_DEFAULT_PUSH_LISTENER_WAIT_PERIOD_MS = 3000L;
public static final long CFG_DEFAULT_PULL_REB_CONFIRM_WAIT_PERIOD_MS = 3000L;
public static final long CFG_DEFAULT_PULL_PROTECT_CONFIRM_WAIT_PERIOD_MS = 60000L;
@@ -39,14 +39,4 @@ public class TClientConstants {
public static final long CFG_DEFAULT_META_QUERY_WAIT_PERIOD_MS = 10000L;
public static final long CFG_MIN_META_QUERY_WAIT_PERIOD_MS = 5000L;
-
- // client statistics information print period
- public static final long STATS_SELF_PRINT_DEFAULT_PERIOD_MS = 3 * 1000 * 60;
- public static final long STATS_SELF_PRINT_MIN_PERIOD_MS = 1000 * 60;
- public static final long STATS_SELF_PRINT_MAX_PERIOD_MS = 60 * 1000 * 60;
-
- // client statistics information print period
- public static final long STATS_AUTO_RESET_DEFAULT_PERIOD_MS = 30 * 60 * 1000;
- public static final long STATS_AUTO_RESET_MIN_PERIOD_MS = 60 * 1000;
- public static final long STATS_AUTO_RESET_MAX_PERIOD_MS = 24 * 3600 * 1000;
}
diff --git a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/config/TubeClientConfig.java b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/config/TubeClientConfig.java
index f06a4c0..ad8b920 100644
--- a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/config/TubeClientConfig.java
+++ b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/config/TubeClientConfig.java
@@ -17,11 +17,12 @@
package org.apache.inlong.tubemq.client.config;
+import org.apache.inlong.tubemq.client.common.StatsConfig;
+import org.apache.inlong.tubemq.client.common.StatsLevel;
import org.apache.inlong.tubemq.client.common.TClientConstants;
import org.apache.inlong.tubemq.corebase.cluster.MasterInfo;
import org.apache.inlong.tubemq.corebase.config.TLSConfig;
import org.apache.inlong.tubemq.corebase.utils.AddressUtils;
-import org.apache.inlong.tubemq.corebase.utils.MixedUtils;
import org.apache.inlong.tubemq.corebase.utils.TStringUtils;
import org.apache.inlong.tubemq.corerpc.RpcConstants;
@@ -55,12 +56,8 @@ public class TubeClientConfig {
private long heartbeatPeriodAfterFail = TClientConstants.CFG_DEFAULT_HEARTBEAT_PERIOD_AFTER_RETRY_FAIL;
// Link statistic check duration in ms.
private long linkStatsDurationMs = RpcConstants.CFG_LQ_STATS_DURATION_MS;
- // Enable metric information print
- private boolean enableStatsSelfPrint = true;
- // Metric print period in ms.
- private long statsSelfPrintPeriodMs = TClientConstants.STATS_SELF_PRINT_DEFAULT_PERIOD_MS;
- // Metric reset value period in ms.
- private long statsForcedResetPeriodMs = TClientConstants.STATS_AUTO_RESET_DEFAULT_PERIOD_MS;
+ // statistics setting
+ private final StatsConfig statsConfig = new StatsConfig();
// The following 5 configuration parameters are used in broker exception process.
//
@@ -469,34 +466,14 @@ public class TubeClientConfig {
return usrPassWord;
}
- public boolean enableStatsSelfPrint() {
- return enableStatsSelfPrint;
+ public StatsConfig getStatsConfig() {
+ return this.statsConfig;
}
- public void setStatsSelfPrint(boolean enableStatsSelfPrint) {
- this.enableStatsSelfPrint = enableStatsSelfPrint;
- }
-
- public long getStatsSelfPrintPeriodMs() {
- return statsSelfPrintPeriodMs;
- }
-
- public void setStatsSelfPrintPeriodMs(long statsSelfPrintPeriodMs) {
- this.statsSelfPrintPeriodMs =
- MixedUtils.mid(statsSelfPrintPeriodMs,
- TClientConstants.STATS_SELF_PRINT_MIN_PERIOD_MS,
- TClientConstants.STATS_SELF_PRINT_MAX_PERIOD_MS);
- }
-
- public long getStatsForcedResetPeriodMs() {
- return statsForcedResetPeriodMs;
- }
-
- public void setStatsForcedResetPeriodMs(long statsForcedResetPeriodMs) {
- this.statsForcedResetPeriodMs =
- MixedUtils.mid(statsForcedResetPeriodMs,
- TClientConstants.STATS_AUTO_RESET_MIN_PERIOD_MS,
- TClientConstants.STATS_AUTO_RESET_MAX_PERIOD_MS);
+ public void setStatsConfig(StatsLevel statsLevel, boolean enableSelfPrint,
+ long selfPrintPeriodMs, long forcedResetPeriodMs) {
+ this.statsConfig.updateStatsConfig(statsLevel,
+ enableSelfPrint, selfPrintPeriodMs, forcedResetPeriodMs);
}
@Override
@@ -591,13 +568,7 @@ public class TubeClientConfig {
if (!this.tlsConfig.equals(that.tlsConfig)) {
return false;
}
- if (this.enableStatsSelfPrint != that.enableStatsSelfPrint) {
- return false;
- }
- if (this.statsSelfPrintPeriodMs != that.statsSelfPrintPeriodMs) {
- return false;
- }
- if (this.statsForcedResetPeriodMs != that.statsForcedResetPeriodMs) {
+ if (!this.statsConfig.equals(that.statsConfig)) {
return false;
}
return masterInfo.equals(that.masterInfo);
@@ -625,36 +596,34 @@ public class TubeClientConfig {
sBuilder.append("\"").append(item).append("\"");
}
return sBuilder.append("],\"rpcReadTimeoutMs\":").append(this.rpcReadTimeoutMs)
- .append(",\"rpcConnProcessorCnt\":").append(this.rpcConnProcessorCnt)
- .append(",\"rpcNettyWorkMemorySize\":").append(this.rpcNettyWorkMemorySize)
- .append(",\"rpcRspCallBackThreadCnt\":").append(this.rpcRspCallBackThreadCnt)
- .append(",\"nettyWriteBufferHighWaterMark\":").append(this.nettyWriteBufferHighWaterMark)
- .append(",\"nettyWriteBufferLowWaterMark\":").append(this.nettyWriteBufferLowWaterMark)
- .append(",\"maxRegisterRetryTimes\":").append(this.maxRegisterRetryTimes)
- .append(",\"regFailWaitPeriodMs\":").append(this.regFailWaitPeriodMs)
- .append(",\"maxHeartBeatRetryTimes\":").append(this.maxHeartBeatRetryTimes)
- .append(",\"heartbeatPeriodMs\":").append(this.heartbeatPeriodMs)
- .append(",\"heartbeatPeriodAfterFail\":").append(this.heartbeatPeriodAfterFail)
- .append(",\"linkStatsDurationMs\":").append(this.linkStatsDurationMs)
- .append(",\"linkStatsForbiddenDurationMs\":").append(this.linkStatsForbiddenDurationMs)
- .append(",\"linkStatsMaxAllowedFailTimes\":").append(this.linkStatsMaxAllowedFailTimes)
- .append(",\"linkStatsMaxForbiddenRate\":").append(this.linkStatsMaxForbiddenRate)
- .append(",\"maxSentForbiddenRate\":").append(this.maxSentForbiddenRate)
- .append(",\"maxForbiddenCheckDuration\":").append(this.maxForbiddenCheckDuration)
- .append(",\"sessionStatisticCheckDuration\":").append(this.sessionStatisticCheckDuration)
- .append(",\"sessionWarnForbiddenRate\":").append(this.sessionWarnForbiddenRate)
- .append(",\"sessionWarnDelayedMsgCount\":").append(this.sessionWarnDelayedMsgCount)
- .append(",\"linkMaxAllowedDelayedMsgCount\":").append(this.linkMaxAllowedDelayedMsgCount)
- .append(",\"sessionMaxAllowedDelayedMsgCount\":").append(this.sessionMaxAllowedDelayedMsgCount)
- .append(",\"unAvailableFbdDurationMs\":").append(this.unAvailableFbdDurationMs)
- .append(",\"enableUserAuthentic\":").append(this.enableUserAuthentic)
- .append(",\"enableStatsSelfPrint\":").append(this.enableStatsSelfPrint)
- .append(",\"statsSelfPrintPeriodMs\":").append(this.statsSelfPrintPeriodMs)
- .append(",\"statsForcedResetPeriodMs\":").append(this.statsForcedResetPeriodMs)
- .append(",\"usrName\":\"").append(this.usrName)
- .append("\",\"usrPassWord\":\"").append(this.usrPassWord)
- .append("\",\"localAddress\":\"").append(localAddress)
- .append("\",").append(this.tlsConfig.toString())
- .append("}").toString();
+ .append(",\"rpcConnProcessorCnt\":").append(this.rpcConnProcessorCnt)
+ .append(",\"rpcNettyWorkMemorySize\":").append(this.rpcNettyWorkMemorySize)
+ .append(",\"rpcRspCallBackThreadCnt\":").append(this.rpcRspCallBackThreadCnt)
+ .append(",\"nettyWriteBufferHighWaterMark\":").append(this.nettyWriteBufferHighWaterMark)
+ .append(",\"nettyWriteBufferLowWaterMark\":").append(this.nettyWriteBufferLowWaterMark)
+ .append(",\"maxRegisterRetryTimes\":").append(this.maxRegisterRetryTimes)
+ .append(",\"regFailWaitPeriodMs\":").append(this.regFailWaitPeriodMs)
+ .append(",\"maxHeartBeatRetryTimes\":").append(this.maxHeartBeatRetryTimes)
+ .append(",\"heartbeatPeriodMs\":").append(this.heartbeatPeriodMs)
+ .append(",\"heartbeatPeriodAfterFail\":").append(this.heartbeatPeriodAfterFail)
+ .append(",\"linkStatsDurationMs\":").append(this.linkStatsDurationMs)
+ .append(",\"linkStatsForbiddenDurationMs\":").append(this.linkStatsForbiddenDurationMs)
+ .append(",\"linkStatsMaxAllowedFailTimes\":").append(this.linkStatsMaxAllowedFailTimes)
+ .append(",\"linkStatsMaxForbiddenRate\":").append(this.linkStatsMaxForbiddenRate)
+ .append(",\"maxSentForbiddenRate\":").append(this.maxSentForbiddenRate)
+ .append(",\"maxForbiddenCheckDuration\":").append(this.maxForbiddenCheckDuration)
+ .append(",\"sessionStatisticCheckDuration\":").append(this.sessionStatisticCheckDuration)
+ .append(",\"sessionWarnForbiddenRate\":").append(this.sessionWarnForbiddenRate)
+ .append(",\"sessionWarnDelayedMsgCount\":").append(this.sessionWarnDelayedMsgCount)
+ .append(",\"linkMaxAllowedDelayedMsgCount\":").append(this.linkMaxAllowedDelayedMsgCount)
+ .append(",\"sessionMaxAllowedDelayedMsgCount\":").append(this.sessionMaxAllowedDelayedMsgCount)
+ .append(",\"unAvailableFbdDurationMs\":").append(this.unAvailableFbdDurationMs)
+ .append(",\"enableUserAuthentic\":").append(this.enableUserAuthentic)
+ .append(",").append(this.statsConfig.toString())
+ .append(",\"usrName\":\"").append(this.usrName)
+ .append("\",\"usrPassWord\":\"").append(this.usrPassWord)
+ .append("\",\"localAddress\":\"").append(localAddress)
+ .append("\",").append(this.tlsConfig.toString())
+ .append("}").toString();
}
}
diff --git a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/BaseMessageConsumer.java b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/BaseMessageConsumer.java
index 5571507..7263a9d 100644
--- a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/BaseMessageConsumer.java
+++ b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/BaseMessageConsumer.java
@@ -145,10 +145,8 @@ public class BaseMessageConsumer implements MessageConsumer {
throw new TubeClientException("Get consumer id failed!", e);
}
this.clientStatsInfo =
- new ClientStatsInfo(false, this.consumerId,
- this.consumerConfig.enableStatsSelfPrint(),
- this.consumerConfig.getStatsSelfPrintPeriodMs(),
- this.consumerConfig.getStatsForcedResetPeriodMs());
+ new ClientStatsInfo(false,
+ this.consumerId, this.consumerConfig.getStatsConfig());
this.rmtDataCache =
new RmtDataCache(this.consumerConfig, null);
this.rpcServiceFactory =
@@ -458,7 +456,7 @@ public class BaseMessageConsumer implements MessageConsumer {
//
}
}
- clientStatsInfo.selfPrintStatsInfo(true, strBuffer);
+ clientStatsInfo.selfPrintStatsInfo(true, true, strBuffer);
logger.info(strBuffer
.append("[SHUTDOWN_CONSUMER] Partitions unregistered, consumer :")
.append(this.consumerId).toString());
@@ -1280,7 +1278,8 @@ public class BaseMessageConsumer implements MessageConsumer {
strBuffer.append(partitionKey).append(TokenConstants.ATTR_SEP)
.append(taskContext.getUsedToken()).toString(), messageList, maxOffset);
strBuffer.delete(0, strBuffer.length());
- clientStatsInfo.bookSuccGetMsg(dltTime, topic, msgCount, msgSize);
+ clientStatsInfo.bookSuccGetMsg(dltTime,
+ topic, partitionKey, msgCount, msgSize);
break;
}
case TErrCodeConstants.HB_NO_NODE:
@@ -1422,7 +1421,7 @@ public class BaseMessageConsumer implements MessageConsumer {
rmtDataCache.resumeTimeoutConsumePartitions(isPullConsume,
consumerConfig.getPullProtectConfirmTimeoutMs());
// print metric information
- clientStatsInfo.selfPrintStatsInfo(false, strBuffer);
+ clientStatsInfo.selfPrintStatsInfo(false, true, strBuffer);
// Fetch the rebalance result, construct message adn return it.
ConsumerEvent event = rebalanceResults.poll();
List<SubscribeInfo> subInfoList = null;
diff --git a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/FetchContext.java b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/FetchContext.java
index c4dca03..03749ba 100644
--- a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/FetchContext.java
+++ b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/FetchContext.java
@@ -70,6 +70,10 @@ public class FetchContext {
return partition;
}
+ public String getPartitionKey() {
+ return partition.getPartitionKey();
+ }
+
public long getUsedToken() {
return usedToken;
}
diff --git a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/SimpleClientBalanceConsumer.java b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/SimpleClientBalanceConsumer.java
index eaf379d..d1a1a68 100644
--- a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/SimpleClientBalanceConsumer.java
+++ b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/SimpleClientBalanceConsumer.java
@@ -141,9 +141,7 @@ public class SimpleClientBalanceConsumer implements ClientBalanceConsumer {
new RmtDataCache(this.consumerConfig, null);
this.clientStatsInfo =
new ClientStatsInfo(false, this.consumerId,
- this.consumerConfig.enableStatsSelfPrint(),
- this.consumerConfig.getStatsSelfPrintPeriodMs(),
- this.consumerConfig.getStatsForcedResetPeriodMs());
+ this.consumerConfig.getStatsConfig());
this.rpcServiceFactory =
this.sessionFactory.getRpcServiceFactory();
this.rpcConfig.put(RpcConstants.CONNECT_TIMEOUT, 3000);
@@ -327,7 +325,7 @@ public class SimpleClientBalanceConsumer implements ClientBalanceConsumer {
}
}
// print metric information
- clientStatsInfo.selfPrintStatsInfo(true, strBuffer);
+ clientStatsInfo.selfPrintStatsInfo(true, true, strBuffer);
logger.info(strBuffer
.append("[SHUTDOWN_CONSUMER] Partitions unregistered, consumer :")
.append(this.consumerId).toString());
@@ -531,13 +529,13 @@ public class SimpleClientBalanceConsumer implements ClientBalanceConsumer {
if (selectResult.isSuccess()) {
break;
}
- if ((consumerConfig.getPullConsumeReadyWaitPeriodMs() >= 0)
+ if ((consumerConfig.getPullConsumeReadyWaitPeriodMs() >= 0L)
&& ((System.currentTimeMillis() - startTime)
>= consumerConfig.getPullConsumeReadyWaitPeriodMs())) {
result.setFailResult(selectResult.getErrCode(), selectResult.getErrMsg());
return result.isSuccess();
}
- if (consumerConfig.getPullConsumeReadyChkSliceMs() > 10) {
+ if (consumerConfig.getPullConsumeReadyChkSliceMs() > 0L) {
ThreadUtils.sleep(consumerConfig.getPullConsumeReadyChkSliceMs());
}
}
@@ -595,6 +593,9 @@ public class SimpleClientBalanceConsumer implements ClientBalanceConsumer {
sBuilder.delete(0, sBuilder.length());
String topicName = strConfirmContextItems[1].trim();
long timeStamp = Long.parseLong(strConfirmContextItems[3]);
+ long midTime = System.currentTimeMillis();
+ // book statistics information
+ clientStatsInfo.bookReturnDuration(keyId, midTime - timeStamp);
if (!clientRmtDataCache.isPartitionInUse(keyId, timeStamp)) {
result.setFailResult(TErrCodeConstants.BAD_REQUEST,
"The confirmContext's value invalid!");
@@ -649,6 +650,8 @@ public class SimpleClientBalanceConsumer implements ClientBalanceConsumer {
} finally {
clientRmtDataCache.succRspRelease(keyId, topicName, timeStamp,
isConsumed, isFilterConsume(topicName), currOffset, maxOffset);
+ clientStatsInfo.bookConfirmDuration(keyId,
+ System.currentTimeMillis() - midTime);
}
}
}
@@ -776,7 +779,8 @@ public class SimpleClientBalanceConsumer implements ClientBalanceConsumer {
sBuffer.append(partitionKey).append(TokenConstants.ATTR_SEP)
.append(taskContext.getUsedToken()).toString(), messageList, maxOffset);
sBuffer.delete(0, sBuffer.length());
- clientStatsInfo.bookSuccGetMsg(dltTime, topic, msgCount, msgSize);
+ clientStatsInfo.bookSuccGetMsg(dltTime,
+ topic, partitionKey, msgCount, msgSize);
break;
}
case TErrCodeConstants.HB_NO_NODE:
@@ -981,7 +985,7 @@ public class SimpleClientBalanceConsumer implements ClientBalanceConsumer {
clientRmtDataCache.resumeTimeoutConsumePartitions(false,
consumerConfig.getPullProtectConfirmTimeoutMs());
// print metric information
- clientStatsInfo.selfPrintStatsInfo(false, strBuffer);
+ clientStatsInfo.selfPrintStatsInfo(false, true, strBuffer);
// Send heartbeat request to master
ClientMaster.HeartResponseM2CV2 response =
masterService.consumerHeartbeatC2MV2(createMasterHeartBeatRequest(),
diff --git a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/SimplePullMessageConsumer.java b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/SimplePullMessageConsumer.java
index b430781..c7142c9 100644
--- a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/SimplePullMessageConsumer.java
+++ b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/SimplePullMessageConsumer.java
@@ -142,12 +142,12 @@ public class SimplePullMessageConsumer implements PullMessageConsumer {
if (selectResult.isSuccess()) {
break;
}
- if ((baseConsumer.getConsumerConfig().getPullConsumeReadyWaitPeriodMs() >= 0)
+ if ((baseConsumer.getConsumerConfig().getPullConsumeReadyWaitPeriodMs() >= 0L)
&& (System.currentTimeMillis() - startTime
>= baseConsumer.getConsumerConfig().getPullConsumeReadyWaitPeriodMs())) {
return new ConsumerResult(selectResult.getErrCode(), selectResult.getErrMsg());
}
- if (baseConsumer.getConsumerConfig().getPullConsumeReadyChkSliceMs() > 10) {
+ if (baseConsumer.getConsumerConfig().getPullConsumeReadyChkSliceMs() > 0L) {
ThreadUtils.sleep(baseConsumer.getConsumerConfig().getPullConsumeReadyChkSliceMs());
}
}
@@ -205,8 +205,8 @@ public class SimplePullMessageConsumer implements PullMessageConsumer {
.append("Not found the partition by confirmContext:")
.append(confirmContext).toString());
}
- baseConsumer.clientStatsInfo.bookConfirmDuration(
- System.currentTimeMillis() - timeStamp);
+ long midTime = System.currentTimeMillis();
+ baseConsumer.clientStatsInfo.bookReturnDuration(keyId, midTime - timeStamp);
if (this.baseConsumer.consumerConfig.isPullConfirmInLocal()) {
baseConsumer.rmtDataCache.succRspRelease(keyId, topicName,
timeStamp, isConsumed, isFilterConsume(topicName), currOffset, maxOffset);
@@ -240,6 +240,8 @@ public class SimplePullMessageConsumer implements PullMessageConsumer {
} finally {
baseConsumer.rmtDataCache.succRspRelease(keyId, topicName,
timeStamp, isConsumed, isFilterConsume(topicName), currOffset, maxOffset);
+ baseConsumer.clientStatsInfo.bookConfirmDuration(keyId,
+ System.currentTimeMillis() - midTime);
}
}
}
diff --git a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/SimplePushMessageConsumer.java b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/SimplePushMessageConsumer.java
index 45f5b86..f535588 100644
--- a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/SimplePushMessageConsumer.java
+++ b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/SimplePushMessageConsumer.java
@@ -249,7 +249,7 @@ public class SimplePushMessageConsumer implements PushMessageConsumer {
} else {
this.receiveMessages(request, topicProcessor);
}
- baseConsumer.clientStatsInfo.bookConfirmDuration(
+ baseConsumer.clientStatsInfo.bookReturnDuration(request.getPartitionKey(),
System.currentTimeMillis() - request.getUsedToken());
return true;
}
diff --git a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/ProducerManager.java b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/ProducerManager.java
index 312ada1..afe929f 100644
--- a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/ProducerManager.java
+++ b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/ProducerManager.java
@@ -143,9 +143,7 @@ public class ProducerManager {
// initial client statistics configure
this.clientStatsInfo =
new ClientStatsInfo(true, this.producerId,
- this.tubeClientConfig.enableStatsSelfPrint(),
- this.tubeClientConfig.getStatsSelfPrintPeriodMs(),
- this.tubeClientConfig.getStatsForcedResetPeriodMs());
+ this.tubeClientConfig.getStatsConfig());
heartBeatStatus.set(0);
this.masterService =
this.rpcServiceFactory.getFailoverService(MasterService.class,
@@ -303,7 +301,7 @@ public class ProducerManager {
}
return;
}
- clientStatsInfo.selfPrintStatsInfo(true, strBuff);
+ clientStatsInfo.selfPrintStatsInfo(true, true, strBuff);
if (this.nodeStatus.compareAndSet(0, 1)) {
this.heartbeatService.shutdownNow();
this.topicPartitionMap.clear();
@@ -676,7 +674,7 @@ public class ProducerManager {
ThreadUtils.sleep(100);
}
// print metrics information
- clientStatsInfo.selfPrintStatsInfo(false, sBuilder);
+ clientStatsInfo.selfPrintStatsInfo(false, true, sBuilder);
// check whether public topics
if (publishTopics.isEmpty()) {
return;
diff --git a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/SimpleMessageProducer.java b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/SimpleMessageProducer.java
index 6f71b44..291c56e 100644
--- a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/SimpleMessageProducer.java
+++ b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/SimpleMessageProducer.java
@@ -371,7 +371,7 @@ public class SimpleMessageProducer implements MessageProducer {
final String resultStr = response.getErrMsg();
if (response.getErrCode() == TErrCodeConstants.SUCCESS) {
producerManager.getClientMetrics().bookSuccSendMsg(dltTime,
- message.getTopic(), message.getData().length);
+ message.getTopic(), partition.getPartitionKey(), message.getData().length);
if (response.hasMessageId()) {
return new MessageSentResult(true,
response.getErrCode(), "Ok!",
diff --git a/inlong-tubemq/tubemq-client/src/test/java/org/apache/inlong/tubemq/client/consumer/StatsConfigTest.java b/inlong-tubemq/tubemq-client/src/test/java/org/apache/inlong/tubemq/client/consumer/StatsConfigTest.java
new file mode 100644
index 0000000..9f6569a
--- /dev/null
+++ b/inlong-tubemq/tubemq-client/src/test/java/org/apache/inlong/tubemq/client/consumer/StatsConfigTest.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.tubemq.client.consumer;
+
+import org.apache.inlong.tubemq.client.common.StatsConfig;
+import org.apache.inlong.tubemq.client.common.StatsLevel;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class StatsConfigTest {
+ @Test
+ public void testStatsConfig() {
+ StatsConfig statsConfig = new StatsConfig();
+ Assert.assertEquals(statsConfig.getStatsLevel(), StatsLevel.MEDIUM);
+ Assert.assertTrue(statsConfig.isEnableSelfPrint());
+ Assert.assertEquals(statsConfig.getSelfPrintPeriodMs(), 6 * 1000 * 60L);
+ Assert.assertEquals(statsConfig.getForcedResetPeriodMs(), 30 * 60 * 1000L);
+ // test apis
+ statsConfig.updateStatsControl(StatsLevel.FULL, false);
+ Assert.assertEquals(statsConfig.getStatsLevel(), StatsLevel.FULL);
+ Assert.assertFalse(statsConfig.isEnableSelfPrint());
+ statsConfig.setStatsPeriodInfo(3000, 5000);
+ Assert.assertEquals(statsConfig.getSelfPrintPeriodMs(), 1000 * 60L);
+ Assert.assertEquals(statsConfig.getForcedResetPeriodMs(), 30 * 1000L);
+ // test case 2
+ statsConfig.updateStatsConfig(StatsLevel.ZERO, true, 300000L, 50000L);
+ StatsConfig statsConfig2 = new StatsConfig();
+ statsConfig2.updateStatsConfig(statsConfig);
+ Assert.assertEquals(statsConfig2.getStatsLevel(), StatsLevel.ZERO);
+ Assert.assertEquals(statsConfig2.isEnableSelfPrint(), statsConfig.isEnableSelfPrint());
+ Assert.assertEquals(statsConfig2.getSelfPrintPeriodMs(),
+ statsConfig.getSelfPrintPeriodMs());
+ Assert.assertEquals(statsConfig2.getForcedResetPeriodMs(),
+ statsConfig.getForcedResetPeriodMs());
+ }
+}
diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/TrafficStatsUnit.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/TrafficStatsUnit.java
index 996592a..69443d2 100644
--- a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/TrafficStatsUnit.java
+++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/TrafficStatsUnit.java
@@ -18,6 +18,7 @@
package org.apache.inlong.tubemq.corebase.metric;
import org.apache.inlong.tubemq.corebase.metric.impl.LongStatsCounter;
+import org.apache.inlong.tubemq.corebase.utils.TStringUtils;
/**
* TrafficStatsUnit, Metric Statistics item Unit
@@ -26,6 +27,8 @@ import org.apache.inlong.tubemq.corebase.metric.impl.LongStatsCounter;
* according to the statistics dimension, which can be expanded later as needed
*/
public class TrafficStatsUnit {
+ // the traffic name
+ private String trafficName;
// the message count
public LongStatsCounter msgCnt;
// the message size
@@ -39,6 +42,7 @@ public class TrafficStatsUnit {
* @param prefix the prefix of statistics items
*/
public TrafficStatsUnit(String msgCntName, String msgSizeName, String prefix) {
+ this.trafficName = prefix;
this.msgCnt = new LongStatsCounter(msgCntName, prefix);
this.msgSize = new LongStatsCounter(msgSizeName, prefix);
}
@@ -53,4 +57,23 @@ public class TrafficStatsUnit {
this.msgCnt.addValue(msgCount);
this.msgSize.addValue(msgSize);
}
+
+ /**
+  * Appends the message count and message size of this unit to the buffer
+  * in the form {"countName":value,"sizeName":value}; when the traffic name
+  * is not empty, the quoted traffic name and a colon are written first.
+  *
+  * @param strBuff     the buffer that receives the output
+  * @param resetValue  whether the counters are read with getAndResetValue()
+  *                    instead of getValue()
+  */
+ public void getValue(StringBuilder strBuff, boolean resetValue) {
+     boolean hasName = !TStringUtils.isEmpty(this.trafficName);
+     if (hasName) {
+         strBuff.append("\"").append(this.trafficName).append("\":");
+     }
+     // message count item
+     strBuff.append("{\"").append(msgCnt.getShortName()).append("\":");
+     if (resetValue) {
+         strBuff.append(msgCnt.getAndResetValue());
+     } else {
+         strBuff.append(msgCnt.getValue());
+     }
+     // message size item
+     strBuff.append(",\"").append(msgSize.getShortName()).append("\":");
+     if (resetValue) {
+         strBuff.append(msgSize.getAndResetValue());
+     } else {
+         strBuff.append(msgSize.getValue());
+     }
+     strBuff.append("}");
+ }
}
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/BrokerSrvStatsHolder.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/BrokerSrvStatsHolder.java
index 07817aa..b2d8511 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/BrokerSrvStatsHolder.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/BrokerSrvStatsHolder.java
@@ -65,7 +65,7 @@ public class BrokerSrvStatsHolder {
if (switchWritingStatsUnit()) {
getStatsValue(switchableSets[getIndex(writableIndex.get() - 1)], true, statsMap);
} else {
- getValue(statsMap);
+ getStatsValue(switchableSets[getIndex()], false, statsMap);
}
}
@@ -73,7 +73,7 @@ public class BrokerSrvStatsHolder {
if (switchWritingStatsUnit()) {
getStatsValue(switchableSets[getIndex(writableIndex.get() - 1)], true, strBuff);
} else {
- getValue(strBuff);
+ getStatsValue(switchableSets[getIndex()], false, strBuff);
}
}
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/MsgStoreStatsHolder.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/MsgStoreStatsHolder.java
index c2d3ad3..373c872 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/MsgStoreStatsHolder.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/MsgStoreStatsHolder.java
@@ -118,14 +118,15 @@ public class MsgStoreStatsHolder {
if (isClosed) {
return;
}
+ MsgStoreStatsItemSet tmStatsSet = msgStoreStatsSets[getIndex()];
if (isDataSizeFull) {
- msgStoreStatsSets[getIndex()].cacheDataSizeFullCnt.incValue();
+ tmStatsSet.cacheDataSizeFullCnt.incValue();
}
if (isIndexSizeFull) {
- msgStoreStatsSets[getIndex()].cacheIndexSizeFullCnt.incValue();
+ tmStatsSet.cacheIndexSizeFullCnt.incValue();
}
if (isMsgCntFull) {
- msgStoreStatsSets[getIndex()].cacheMsgCountFullCnt.incValue();
+ tmStatsSet.cacheMsgCountFullCnt.incValue();
}
}
@@ -139,9 +140,10 @@ public class MsgStoreStatsHolder {
if (isClosed) {
return;
}
- msgStoreStatsSets[getIndex()].cacheSyncStats.update(flushTime);
+ MsgStoreStatsItemSet tmStatsSet = msgStoreStatsSets[getIndex()];
+ tmStatsSet.cacheSyncStats.update(flushTime);
if (isTimeoutFlush) {
- msgStoreStatsSets[getIndex()].cacheTimeFullCnt.incValue();
+ tmStatsSet.cacheTimeFullCnt.incValue();
}
}
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/TrafficStatsService.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/TrafficStatsService.java
index f27147c..d7b43aa 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/TrafficStatsService.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/TrafficStatsService.java
@@ -99,11 +99,12 @@ public class TrafficStatsService extends AbstractDaemonService implements Traffi
TrafficStatsUnit tmpStatsSet;
TrafficStatsUnit trafficStatsSet;
// Increment write reference count
- switchableUnits[getIndex()].refCnt.incValue();
+ WritableUnit selectedUnit = switchableUnits[getIndex()];
+ selectedUnit.refCnt.incValue();
try {
// Accumulate statistics information
ConcurrentHashMap<String, TrafficStatsUnit> tmpStatsSetMap =
- switchableUnits[getIndex()].statsUnitMap;
+ selectedUnit.statsUnitMap;
for (Entry<String, TrafficInfo> entry : trafficInfos.entrySet()) {
trafficStatsSet = tmpStatsSetMap.get(entry.getKey());
if (trafficStatsSet == null) {
@@ -118,18 +119,19 @@ public class TrafficStatsService extends AbstractDaemonService implements Traffi
}
} finally {
// Decrement write reference count
- switchableUnits[getIndex()].refCnt.decValue();
+ selectedUnit.refCnt.decValue();
}
}
@Override
public void add(String statsKey, long msgCnt, long msgSize) {
// Increment write reference count
- switchableUnits[getIndex()].refCnt.incValue();
+ WritableUnit selectedUnit = switchableUnits[getIndex()];
+ selectedUnit.refCnt.incValue();
try {
// Accumulate statistics information
ConcurrentHashMap<String, TrafficStatsUnit> tmpStatsSetMap =
- switchableUnits[getIndex()].statsUnitMap;
+ selectedUnit.statsUnitMap;
TrafficStatsUnit trafficStatsSet = tmpStatsSetMap.get(statsKey);
if (trafficStatsSet == null) {
TrafficStatsUnit tmpStatsSet = new TrafficStatsUnit("msg_cnt", "msg_size", null);
@@ -141,7 +143,7 @@ public class TrafficStatsService extends AbstractDaemonService implements Traffi
trafficStatsSet.addMsgCntAndSize(msgCnt, msgSize);
} finally {
// Decrement write reference count
- switchableUnits[getIndex()].refCnt.decValue();
+ selectedUnit.refCnt.decValue();
}
}
@@ -163,7 +165,7 @@ public class TrafficStatsService extends AbstractDaemonService implements Traffi
break;
}
try {
- Thread.sleep(20);
+ Thread.sleep(2);
} catch (InterruptedException e) {
break;
}
@@ -172,8 +174,8 @@ public class TrafficStatsService extends AbstractDaemonService implements Traffi
Map<String, TrafficStatsUnit> statsMap = selectedUnit.statsUnitMap;
for (Entry<String, TrafficStatsUnit> entry : statsMap.entrySet()) {
logger.info("{}#{}#{}#{}", statsCat, entry.getKey(),
- entry.getValue().msgCnt.getAndResetValue(),
- entry.getValue().msgSize.getAndResetValue());
+ entry.getValue().msgCnt.getValue(),
+ entry.getValue().msgSize.getValue());
}
statsMap.clear();
}
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/webbase/WebCallStatsHolder.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/webbase/WebCallStatsHolder.java
index 9360f40..b20ddb0 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/webbase/WebCallStatsHolder.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/webbase/WebCallStatsHolder.java
@@ -62,7 +62,7 @@ public class WebCallStatsHolder {
if (switchWritingStatsUnit()) {
getStatsValue(switchableSets[getIndex(writableIndex.get() - 1)], true, statsMap);
} else {
- getValue(statsMap);
+ getStatsValue(switchableSets[getIndex()], false, statsMap);
}
}
@@ -70,7 +70,7 @@ public class WebCallStatsHolder {
if (switchWritingStatsUnit()) {
getStatsValue(switchableSets[getIndex(writableIndex.get() - 1)], true, strBuff);
} else {
- getValue(strBuff);
+ getStatsValue(switchableSets[getIndex()], false, strBuff);
}
}
@@ -135,15 +135,10 @@ public class WebCallStatsHolder {
statsMap.put("isClosed", (isManualClosed ? 1L : 0L));
if (resetValue) {
statsSet.totalCallStats.snapShort(statsMap, false);
- for (SimpleHistogram itemStats : statsSet.methodStatsMap.values()) {
- itemStats.snapShort(statsMap, false);
- }
} else {
statsSet.totalCallStats.getValue(statsMap, false);
- for (SimpleHistogram itemStats : statsSet.methodStatsMap.values()) {
- itemStats.getValue(statsMap, false);
- }
}
+ statsSet.getMethodStatsInfo(statsMap, resetValue);
}
private static void getStatsValue(WebCallStatsItemSet statsSet,
@@ -152,28 +147,14 @@ public class WebCallStatsHolder {
strBuff.append("{\"").append(statsSet.lstResetTime.getFullName())
.append("\":\"").append(statsSet.lstResetTime.getStrSinceTime())
.append("\",\"isClosed\":").append(isManualClosed).append(",");
- int totalcnt = 0;
if (resetValue) {
statsSet.totalCallStats.snapShort(strBuff, false);
- strBuff.append(",\"").append("methods\":{");
- for (SimpleHistogram itemStats : statsSet.methodStatsMap.values()) {
- if (totalcnt++ > 0) {
- strBuff.append(",");
- }
- itemStats.snapShort(strBuff, false);
- }
- strBuff.append("}}");
} else {
statsSet.totalCallStats.getValue(strBuff, false);
- strBuff.append(",\"").append("methods\":{");
- for (SimpleHistogram itemStats : statsSet.methodStatsMap.values()) {
- if (totalcnt++ > 0) {
- strBuff.append(",");
- }
- itemStats.getValue(strBuff, false);
- }
- strBuff.append("}}");
}
+ strBuff.append(",");
+ statsSet.getMethodStatsInfo(strBuff, resetValue);
+ strBuff.append("}");
}
/**
@@ -217,6 +198,45 @@ public class WebCallStatsHolder {
public void resetSinceTime() {
this.lstResetTime.reset();
}
+
+ /**
+  * Gets the method statistics information
+  *
+  * @param statsMap    the container that receives the statistics content
+  * @param resetValue  whether to clear the per-method statistics after
+  *                    they have been read
+  */
+ public void getMethodStatsInfo(Map<String, Long> statsMap, boolean resetValue) {
+     for (SimpleHistogram itemStats : methodStatsMap.values()) {
+         // same null guard as the StringBuilder overload of this method
+         if (itemStats == null) {
+             continue;
+         }
+         itemStats.getValue(statsMap, false);
+     }
+     if (resetValue) {
+         // resetting drops the per-method entries rather than zeroing them
+         methodStatsMap.clear();
+     }
+ }
+
+ /**
+  * Gets the method statistics information, written as a
+  * "methods":{...} member with the items separated by commas
+  *
+  * @param strBuff     the buffer that receives the statistics content
+  * @param resetValue  whether to clear the per-method statistics after
+  *                    they have been written
+  */
+ public void getMethodStatsInfo(StringBuilder strBuff, boolean resetValue) {
+     // becomes true once the first item has been written
+     boolean needSeparator = false;
+     strBuff.append("\"").append("methods\":{");
+     for (SimpleHistogram methodStats : methodStatsMap.values()) {
+         if (methodStats != null) {
+             if (needSeparator) {
+                 strBuff.append(",");
+             }
+             needSeparator = true;
+             methodStats.getValue(strBuff, false);
+         }
+     }
+     strBuff.append("}");
+     if (resetValue) {
+         methodStatsMap.clear();
+     }
+ }
}
}
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/stats/MasterSrvStatsHolder.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/stats/MasterSrvStatsHolder.java
index a60483e..f6870fe 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/stats/MasterSrvStatsHolder.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/stats/MasterSrvStatsHolder.java
@@ -92,7 +92,7 @@ public class MasterSrvStatsHolder {
if (switchWritingStatsUnit()) {
getStatsValue(switchableSets[getIndex(writableIndex.get() - 1)], true, statsMap);
} else {
- getValue(statsMap);
+ getStatsValue(switchableSets[getIndex()], false, statsMap);
}
}
@@ -100,7 +100,7 @@ public class MasterSrvStatsHolder {
if (switchWritingStatsUnit()) {
getStatsValue(switchableSets[getIndex(writableIndex.get() - 1)], true, strBuff);
} else {
- getValue(strBuff);
+ getStatsValue(switchableSets[getIndex()], false, strBuff);
}
}
// metric set operate APIs end