You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2022/02/19 12:40:55 UTC
[incubator-inlong] branch master updated: [INLONG-2518][TubeMQ] Adjust the client's metric statistics logic (#2601)
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new c53b805 [INLONG-2518][TubeMQ] Adjust the client's metric statistics logic (#2601)
c53b805 is described below
commit c53b80542c999b3bd72886f3015a363e0f88adf5
Author: gosonzhang <46...@qq.com>
AuthorDate: Sat Feb 19 20:40:50 2022 +0800
[INLONG-2518][TubeMQ] Adjust the client's metric statistics logic (#2601)
---
.../inlong/tubemq/client/common/ClientMetrics.java | 308 --------------
.../tubemq/client/common/ClientStatsInfo.java | 455 +++++++++++++++++++++
.../tubemq/client/common/StatsOutputLevel.java | 55 +++
.../tubemq/client/common/TClientConstants.java | 16 +-
.../tubemq/client/config/TubeClientConfig.java | 54 +--
.../client/consumer/BaseMessageConsumer.java | 78 ++--
.../consumer/SimpleClientBalanceConsumer.java | 49 +--
.../client/consumer/SimplePullMessageConsumer.java | 2 +-
.../client/consumer/SimplePushMessageConsumer.java | 2 +-
.../tubemq/client/producer/MessageProducer.java | 3 -
.../tubemq/client/producer/ProducerManager.java | 42 +-
.../client/producer/SimpleMessageProducer.java | 9 +-
.../inlong/tubemq/corebase/TBaseConstants.java | 1 +
.../tubemq/corebase/metric/TrafficStatsUnit.java | 56 +++
.../inlong/tubemq/corebase/utils/TStringUtils.java | 24 ++
.../inlong/tubemq/corebase/utils/ThreadUtils.java | 4 +
.../server/broker/msgstore/MessageStore.java | 1 -
.../server/broker/msgstore/disk/MsgFileStore.java | 6 +-
.../server/broker/msgstore/mem/MsgMemStore.java | 4 +-
.../server/broker/stats/BrokerSrvStatsHolder.java | 4 +-
.../server/broker/stats/MsgStoreStatsHolder.java | 4 +-
.../server/broker/stats/TrafficStatsService.java | 50 +--
.../tubemq/server/common/TServerConstants.java | 3 +-
.../server/common/webbase/WebCallStatsHolder.java | 4 +-
.../server/master/stats/MasterSrvStatsHolder.java | 4 +-
25 files changed, 751 insertions(+), 487 deletions(-)
diff --git a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/common/ClientMetrics.java b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/common/ClientMetrics.java
deleted file mode 100644
index 2b21aca..0000000
--- a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/common/ClientMetrics.java
+++ /dev/null
@@ -1,308 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.tubemq.client.common;
-
-import java.time.Clock;
-import java.util.concurrent.atomic.AtomicLong;
-import org.apache.inlong.tubemq.client.config.TubeClientConfig;
-import org.apache.inlong.tubemq.corebase.TErrCodeConstants;
-import org.apache.inlong.tubemq.corebase.metric.AbsMetricItem;
-import org.apache.inlong.tubemq.corebase.metric.CountMetricItem;
-import org.apache.inlong.tubemq.corebase.metric.TimeDltMetricItem;
-import org.apache.inlong.tubemq.corebase.utils.DateTimeConvertUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class ClientMetrics {
- private static final Logger logger =
- LoggerFactory.getLogger(ClientMetrics.class);
- private final boolean isProducer;
- private final String clientId;
- private final String logPrefix;
- private final TubeClientConfig clientConfig;
- private final AtomicLong lastMetricPrintTime =
- new AtomicLong(0);
- private final AtomicLong lastMetricResetTime =
- new AtomicLong(Clock.systemDefaultZone().millis());
- private final AtomicLong lastResetTime =
- new AtomicLong(Clock.systemDefaultZone().millis());
- // client 2 Master status statistics
- protected final AbsMetricItem regMasterCnt =
- new CountMetricItem("master_reg_cnt");
- protected final AbsMetricItem regMasterFailureCnt =
- new CountMetricItem("master_reg_failure_cnt");
- protected final AbsMetricItem masterNoNodeCnt =
- new CountMetricItem("master_timeout_cnt");
- protected final AbsMetricItem masterHBExceptionCnt =
- new CountMetricItem("hb_master_exception_cnt");
- // client 2 Broker status statistics
- protected final AbsMetricItem regBrokerCnt =
- new CountMetricItem("broker_reg_cnt");
- protected final AbsMetricItem regBrokerFailureCnt =
- new CountMetricItem("broker_reg_failure_cnt");
- protected final AbsMetricItem brokerNoNodeCnt =
- new CountMetricItem("broker_timeout_cnt");
- protected final AbsMetricItem brokerHBExceptionCnt =
- new CountMetricItem("hb_broker_exception_cnt");
- // send or get message statistic
- private final TimeDltMetricItem rpcCallMetric =
- new TimeDltMetricItem("rpc_call", true);
- // statistic count
- private final AbsMetricItem totalMsgCount =
- new CountMetricItem("msg_cnt");
- protected final AbsMetricItem totalMsgSize =
- new CountMetricItem("msg_size");
- // error code map
- private final AbsMetricItem errCodeBadRequest =
- new CountMetricItem("err_400");
- private final AbsMetricItem errCodeNotFound =
- new CountMetricItem("err_404");
- private final AbsMetricItem errCodeOccupied =
- new CountMetricItem("err_410");
- private final AbsMetricItem errCodeNoNode =
- new CountMetricItem("err_411");
- private final AbsMetricItem errCodeOverFlow =
- new CountMetricItem("err_419");
- private final AbsMetricItem errCodeInTopic =
- new CountMetricItem("err_425");
- private final AbsMetricItem errCodeInFilters =
- new CountMetricItem("err_426");
- private final AbsMetricItem errCodeInSourceCnt =
- new CountMetricItem("err_429");
- private final AbsMetricItem errCodeSrvUnavailable =
- new CountMetricItem("err_503");
- private final AbsMetricItem errCodeOther =
- new CountMetricItem("err_other");
- // consumer statistic
- private final TimeDltMetricItem procLatencyMetric =
- new TimeDltMetricItem("process_data", true);
-
- public ClientMetrics(boolean isProducer, String clientId,
- TubeClientConfig clientConfig) {
- this.isProducer = isProducer;
- this.clientId = clientId;
- this.clientConfig = clientConfig;
- StringBuilder strBuff = new StringBuilder(512);
- if (isProducer) {
- strBuff.append("[Producer");
- } else {
- strBuff.append("[Consumer");
- }
- this.logPrefix = strBuff.append(" Metrics]: ")
- .append("Client=").append(clientId).toString();
- }
-
- public void bookReg2Master(boolean isFailure) {
- this.regMasterCnt.incrementAndGet();
- if (isFailure) {
- this.regMasterFailureCnt.incrementAndGet();
- }
- }
-
- public void bookHB2MasterTimeout() {
- this.masterNoNodeCnt.incrementAndGet();
- }
-
- public void bookHB2MasterException() {
- this.masterHBExceptionCnt.incrementAndGet();
- }
-
- public void bookReg2Broker(boolean isFailure) {
- this.regBrokerCnt.incrementAndGet();
- if (isFailure) {
- this.regBrokerFailureCnt.incrementAndGet();
- }
- }
-
- public void bookHB2BrokerTimeout() {
- this.brokerNoNodeCnt.incrementAndGet();
- }
-
- public void bookHB2BrokerException() {
- this.brokerHBExceptionCnt.incrementAndGet();
- }
-
- public void bookConfirmDuration(long dltTime) {
- this.procLatencyMetric.updProcTimeDlt(dltTime);
- }
-
- public void bookSuccSendMsg(long dltTime, int msgSize) {
- rpcCallMetric.updProcTimeDlt(dltTime);
- totalMsgCount.incrementAndGet();
- totalMsgSize.addAndGet(msgSize);
- }
-
- public void bookSuccGetMsg(long dltTime, int msgCnt, int msgSize) {
- rpcCallMetric.updProcTimeDlt(dltTime);
- totalMsgCount.addAndGet(msgCnt);
- totalMsgSize.addAndGet(msgSize);
- }
-
- public void bookFailRpcCall(long dltTime, int errCode) {
- rpcCallMetric.updProcTimeDlt(dltTime);
- switch (errCode) {
- case TErrCodeConstants.BAD_REQUEST: {
- errCodeBadRequest.incrementAndGet();
- break;
- }
-
- case TErrCodeConstants.NOT_FOUND: {
- errCodeNotFound.incrementAndGet();
- break;
- }
-
- case TErrCodeConstants.PARTITION_OCCUPIED: {
- errCodeOccupied.incrementAndGet();
- break;
- }
-
- case TErrCodeConstants.HB_NO_NODE: {
- errCodeNoNode.incrementAndGet();
- break;
- }
-
- case TErrCodeConstants.SERVER_RECEIVE_OVERFLOW: {
- errCodeOverFlow.incrementAndGet();
- break;
- }
-
- case TErrCodeConstants.CLIENT_INCONSISTENT_TOPICSET: {
- errCodeInTopic.incrementAndGet();
- break;
- }
-
- case TErrCodeConstants.CLIENT_INCONSISTENT_FILTERSET: {
- errCodeInFilters.incrementAndGet();
- break;
- }
-
- case TErrCodeConstants.CLIENT_INCONSISTENT_SOURCECOUNT: {
- errCodeInSourceCnt.incrementAndGet();
- break;
- }
-
- case TErrCodeConstants.SERVICE_UNAVAILABLE: {
- errCodeSrvUnavailable.incrementAndGet();
- break;
- }
-
- default: {
- errCodeOther.incrementAndGet();
- break;
- }
- }
- }
-
- /**
- * print Metric information to log file
- *
- * @param forcePrint whether force print metric information
- * @param strBuff string buffer
- */
- public void printMetricInfo(boolean forcePrint, StringBuilder strBuff) {
- if (!clientConfig.isEnableMetricPrint()) {
- return;
- }
- boolean needReset = false;
- long lstPrintTime = lastMetricPrintTime.get();
- long curChkTime = Clock.systemDefaultZone().millis();
- if (forcePrint || (curChkTime - lstPrintTime
- > clientConfig.getMetricInfoPrintPeriodMs())) {
- if (lastMetricPrintTime.compareAndSet(lstPrintTime, curChkTime)) {
- long lstResetTime = lastMetricResetTime.get();
- if ((curChkTime - lstResetTime
- > clientConfig.getMetricForcedResetPeriodMs())
- && lastMetricResetTime.compareAndSet(lstResetTime, curChkTime)) {
- needReset = true;
- }
- if (needReset) {
- strBuff.append(this.logPrefix).append(", reset value=");
- } else {
- strBuff.append(this.logPrefix).append(", value=");
- }
- getStrMetrics(strBuff, needReset);
- logger.info(strBuff.toString());
- strBuff.delete(0, strBuff.length());
- }
- }
- }
-
- /**
- * Get current data encapsulated by Json
- *
- * @param strBuff string buffer
- * @param resetValue whether to reset the current data
- */
- public void getStrMetrics(StringBuilder strBuff, boolean resetValue) {
- strBuff.append("{\"").append(regMasterCnt.getName()).append("\":")
- .append(regMasterCnt.getValue(resetValue)).append(",\"")
- .append(regMasterFailureCnt.getName()).append("\":")
- .append(regMasterFailureCnt.getValue(resetValue)).append(",\"")
- .append(masterNoNodeCnt.getName()).append("\":")
- .append(masterNoNodeCnt.getValue(resetValue)).append(",\"")
- .append(masterHBExceptionCnt.getName()).append("\":")
- .append(masterHBExceptionCnt.getValue(resetValue)).append(",\"")
- .append(totalMsgCount.getName()).append("\":")
- .append(totalMsgCount.getValue(resetValue)).append(",\"")
- .append(totalMsgSize.getName()).append("\":")
- .append(totalMsgSize.getValue(resetValue)).append(",");
- rpcCallMetric.getStrMetrics(strBuff, resetValue);
- strBuff.append(",\"err_rsp\":{\"")
- .append(errCodeBadRequest.getName()).append("\":")
- .append(errCodeBadRequest.getValue(resetValue)).append(",\"")
- .append(errCodeNotFound.getName()).append("\":")
- .append(errCodeNotFound.getValue(resetValue)).append(",\"")
- .append(errCodeOccupied.getName()).append("\":")
- .append(errCodeOccupied.getValue(resetValue)).append(",\"")
- .append(errCodeNoNode.getName()).append("\":")
- .append(errCodeNoNode.getValue(resetValue)).append(",\"")
- .append(errCodeOverFlow.getName()).append("\":")
- .append(errCodeOverFlow.getValue(resetValue)).append(",\"")
- .append(errCodeInTopic.getName()).append("\":")
- .append(errCodeInTopic.getValue(resetValue)).append(",\"")
- .append(errCodeInFilters.getName()).append("\":")
- .append(errCodeInFilters.getValue(resetValue)).append(",\"")
- .append(errCodeInSourceCnt.getName()).append("\":")
- .append(errCodeInSourceCnt.getValue(resetValue)).append(",\"")
- .append(errCodeSrvUnavailable.getName()).append("\":")
- .append(errCodeSrvUnavailable.getValue(resetValue)).append(",\"")
- .append(errCodeOther.getName()).append("\":")
- .append(errCodeOther.getValue(resetValue)).append("}");
- if (!isProducer) {
- strBuff.append(",\"").append(regBrokerCnt.getName()).append("\":")
- .append(regBrokerCnt.getValue(resetValue)).append(",\"")
- .append(regBrokerFailureCnt.getName()).append("\":")
- .append(regBrokerFailureCnt.getValue(resetValue)).append(",\"")
- .append(brokerNoNodeCnt.getName()).append("\":")
- .append(brokerNoNodeCnt.getValue(resetValue)).append(",\"")
- .append(brokerHBExceptionCnt.getName()).append("\":")
- .append(brokerHBExceptionCnt.getValue(resetValue)).append(",");
- procLatencyMetric.getStrMetrics(strBuff, resetValue);
- }
- if (resetValue) {
- long befTime = lastResetTime.getAndSet(Clock.systemDefaultZone().millis());
- strBuff.append(",\"last_reset_time\":\"")
- .append(DateTimeConvertUtils.ms2yyyyMMddHHmmss(befTime)).append("\"}");
- } else {
- strBuff.append(",\"last_reset_time\":\"")
- .append(DateTimeConvertUtils.ms2yyyyMMddHHmmss(lastResetTime.get()))
- .append("\"}");
- }
- }
-}
-
diff --git a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/common/ClientStatsInfo.java b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/common/ClientStatsInfo.java
new file mode 100644
index 0000000..186da25
--- /dev/null
+++ b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/common/ClientStatsInfo.java
@@ -0,0 +1,455 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.tubemq.client.common;
+
+import java.time.Clock;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.inlong.tubemq.corebase.TBaseConstants;
+import org.apache.inlong.tubemq.corebase.TErrCodeConstants;
+import org.apache.inlong.tubemq.corebase.metric.TrafficStatsUnit;
+import org.apache.inlong.tubemq.corebase.metric.impl.ESTHistogram;
+import org.apache.inlong.tubemq.corebase.metric.impl.LongStatsCounter;
+import org.apache.inlong.tubemq.corebase.metric.impl.SinceTime;
+import org.apache.inlong.tubemq.corebase.utils.DateTimeConvertUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ClientStatsInfo {
+ private static final Logger logger =
+ LoggerFactory.getLogger(ClientStatsInfo.class);
+ private final boolean isProducer;
+ private final String logPrefix;
+ // switchable statistic items
+ private final ClientStatsItemSet[] switchableSets = new ClientStatsItemSet[2];
+ // current writable index
+ private final AtomicInteger writableIndex = new AtomicInteger(0);
+ // statistics self-print period, ms
+ private final long statsPrintPeriodMs;
+ // statistics force reset period, ms
+ private final long statsForcedResetPeriodMs;
+ // whether the statistic is closed
+ private volatile boolean isClosed = false;
+ // whether to self-print statistics
+ private volatile boolean isSelfPrint = true;
+ // last self-print time
+ private final AtomicLong lstSelfPrintTime = new AtomicLong(0);
+ // last snapshot time
+ private final AtomicLong lstSnapshotTime = new AtomicLong(0);
+
+ public ClientStatsInfo(boolean isProducer, String clientId,
+ boolean isSelfPrint, long statsPrintPeriodMs,
+ long statsForcedResetPeriodMs) {
+ this.isProducer = isProducer;
+ this.isSelfPrint = isSelfPrint;
+ this.statsPrintPeriodMs = statsPrintPeriodMs;
+ this.statsForcedResetPeriodMs = statsForcedResetPeriodMs;
+ StringBuilder strBuff = new StringBuilder(512);
+ if (isProducer) {
+ strBuff.append("[Producer");
+ } else {
+ strBuff.append("[Consumer");
+ }
+ this.logPrefix = strBuff.append(" Stats]: ")
+ .append("Client=").append(clientId).toString();
+ this.switchableSets[0] = new ClientStatsItemSet();
+ this.switchableSets[1] = new ClientStatsItemSet();
+ }
+
+ public synchronized void setStatsStatus(boolean enableStats) {
+ this.isClosed = !enableStats;
+ }
+
+ public boolean isStatsClosed() {
+ return this.isClosed;
+ }
+
+ public boolean isStatsSelfPrint() {
+ return this.isSelfPrint;
+ }
+
+ public void bookReg2Master(boolean isFailure) {
+ if (isClosed) {
+ return;
+ }
+ switchableSets[getIndex()].regMasterCnt.incValue();
+ if (isFailure) {
+ switchableSets[getIndex()].regMasterFailCnt.incValue();
+ }
+ }
+
+ public void bookHB2MasterTimeout() {
+ if (isClosed) {
+ return;
+ }
+ switchableSets[getIndex()].regMasterTimoutCnt.incValue();
+ }
+
+ public void bookHB2MasterException() {
+ if (isClosed) {
+ return;
+ }
+ switchableSets[getIndex()].hbMasterExcCnt.incValue();
+ }
+
+ public void bookReg2Broker(boolean isFailure) {
+ if (isClosed) {
+ return;
+ }
+ switchableSets[getIndex()].regBrokerCnt.incValue();
+ if (isFailure) {
+ switchableSets[getIndex()].regBrokerFailCnt.incValue();
+ }
+ }
+
+ public void bookHB2BrokerTimeout() {
+ if (isClosed) {
+ return;
+ }
+ switchableSets[getIndex()].regBrokerTimoutCnt.incValue();
+ }
+
+ public void bookHB2BrokerException() {
+ if (isClosed) {
+ return;
+ }
+ switchableSets[getIndex()].hbBrokerExcCnt.incValue();
+ }
+
+ public void bookConfirmDuration(long dltTime) {
+ if (isClosed) {
+ return;
+ }
+ switchableSets[getIndex()].csmLatencyStats.update(dltTime);
+ }
+
+ public void bookSuccSendMsg(long dltTime, String topicName, int msgSize) {
+ if (isClosed) {
+ return;
+ }
+ sendOrRecvMsg(topicName, dltTime, 1, msgSize);
+ bookFailRpcCall(TErrCodeConstants.SUCCESS);
+ }
+
+ public void bookSuccGetMsg(long dltTime, String topicName, int msgCnt, int msgSize) {
+ if (isClosed) {
+ return;
+ }
+ sendOrRecvMsg(topicName, dltTime, msgCnt, msgSize);
+ bookFailRpcCall(TErrCodeConstants.SUCCESS);
+ }
+
+ public void bookFailRpcCall(int errCode) {
+ if (isClosed) {
+ return;
+ }
+ // accumulate msg count by errcode
+ ClientStatsItemSet curItemSet = switchableSets[getIndex()];
+ LongStatsCounter curItemCounter = curItemSet.errRspStatsMap.get(errCode);
+ if (curItemCounter == null) {
+ LongStatsCounter tmpCounter = new LongStatsCounter(
+ "err_" + errCode, "");
+ curItemCounter = curItemSet.errRspStatsMap.putIfAbsent(errCode, tmpCounter);
+ if (curItemCounter == null) {
+ curItemCounter = tmpCounter;
+ }
+ }
+ curItemCounter.incValue();
+ }
+
+ /**
+ * Self print statistics information to log file
+ *
+ * @param forcePrint whether force print statistics information
+ * @param strBuff string buffer
+ */
+ public void selfPrintStatsInfo(boolean forcePrint, StringBuilder strBuff) {
+ if (this.isClosed || !this.isSelfPrint) {
+ return;
+ }
+ long lstPrintTime = lstSelfPrintTime.get();
+ long curChkTime = Clock.systemDefaultZone().millis();
+ if (forcePrint || (curChkTime - lstPrintTime > this.statsPrintPeriodMs)) {
+ if (lstSelfPrintTime.compareAndSet(lstPrintTime, curChkTime)) {
+ if (switchWritingStatsUnit(false)) {
+ strBuff.append(this.logPrefix).append(", reset value=");
+ getStatsInfo(switchableSets[getIndex(writableIndex.get() - 1)],
+ strBuff, StatsOutputLevel.FULL, false);
+ } else {
+ strBuff.append(this.logPrefix).append(", value=");
+ getStatsInfo(switchableSets[getIndex()],
+ strBuff, StatsOutputLevel.FULL, false);
+ }
+ logger.info(strBuff.toString());
+ strBuff.delete(0, strBuff.length());
+ }
+ }
+ }
+
+ // private functions
+ private boolean switchWritingStatsUnit(boolean needReset) {
+ long lstResetTime = lstSnapshotTime.get();
+ long checkDltTime = System.currentTimeMillis() - lstResetTime;
+ if (((needReset && (checkDltTime > TBaseConstants.CFG_STATS_MIN_SNAPSHOT_PERIOD_MS))
+ || (checkDltTime > this.statsForcedResetPeriodMs))
+ && lstSnapshotTime.compareAndSet(lstResetTime, System.currentTimeMillis())) {
+ switchableSets[getIndex(writableIndex.incrementAndGet())].resetSinceTime();
+ return true;
+ }
+ return false;
+ }
+
+    /**
+     * Append the statistics held by the given item set to the buffer in Json format
+     * @param statsSet     the statistics item set whose values are output
+     * @param strBuff      string buffer
+     * @param outputLevel  the output level of statistics
+     * @param resetValue   whether to reset the current data
+     */
+    private void getStatsInfo(ClientStatsItemSet statsSet, StringBuilder strBuff,
+                              StatsOutputLevel outputLevel, boolean resetValue) {
+        int totalCnt = 0;
+        strBuff.append("{\"").append(statsSet.resetTime.getFullName())
+                .append("\":\"").append(statsSet.resetTime.getStrSinceTime())
+                .append("\",\"probe_time\":\"")
+                .append(DateTimeConvertUtils.ms2yyyyMMddHHmmss(System.currentTimeMillis()))
+                .append("\"");
+        if (resetValue) { // read-and-clear branch: every counter is fetched with getAndResetValue()
+            strBuff.append(",\"").append(statsSet.totalTrafficStats.msgCnt.getFullName())
+                    .append("\":").append(statsSet.totalTrafficStats.msgCnt.getAndResetValue())
+                    .append(",\"").append(statsSet.totalTrafficStats.msgSize.getFullName())
+                    .append("\":").append(statsSet.totalTrafficStats.msgSize.getAndResetValue())
+                    .append(",\"traffic_details\":{");
+            for (Map.Entry<String, TrafficStatsUnit> entry
+                    : statsSet.topicTrafficMap.entrySet()) { // one {msg_cnt, msg_size} object per topic
+                if (entry == null) {
+                    continue;
+                }
+                if (totalCnt++ > 0) {
+                    strBuff.append(",");
+                }
+                strBuff.append("\"").append(entry.getKey()).append("\":{\"")
+                        .append(entry.getValue().msgCnt.getShortName()).append("\":")
+                        .append(entry.getValue().msgCnt.getAndResetValue()).append(",\"")
+                        .append(entry.getValue().msgSize.getShortName()).append("\":")
+                        .append(entry.getValue().msgSize.getAndResetValue()).append("}");
+            }
+            strBuff.append("}");
+            if (outputLevel != StatsOutputLevel.SIMPLEST) {
+                strBuff.append(",");
+                statsSet.msgCallDltStats.snapShort(strBuff, false);
+                if (!isProducer) {
+                    strBuff.append(",");
+                    statsSet.csmLatencyStats.snapShort(strBuff, false);
+                }
+                strBuff.append(",\"rsp_details\":{");
+                totalCnt = 0; // restart the separator count for the rsp_details object
+                for (LongStatsCounter statsCounter : statsSet.errRspStatsMap.values()) {
+                    if (statsCounter == null) {
+                        continue;
+                    }
+                    if (totalCnt++ > 0) {
+                        strBuff.append(",");
+                    }
+                    strBuff.append("\"").append(statsCounter.getFullName()).append("\":")
+                            .append(statsCounter.getAndResetValue());
+                }
+                strBuff.append("}");
+            }
+            if (outputLevel == StatsOutputLevel.FULL) {
+                strBuff.append(",\"").append(statsSet.regMasterCnt.getFullName())
+                        .append("\":").append(statsSet.regMasterCnt.getAndResetValue())
+                        .append(",\"").append(statsSet.regMasterFailCnt.getFullName())
+                        .append("\":").append(statsSet.regMasterFailCnt.getAndResetValue())
+                        .append(",\"").append(statsSet.regMasterTimoutCnt.getFullName())
+                        .append("\":").append(statsSet.regMasterTimoutCnt.getAndResetValue())
+                        .append(",\"").append(statsSet.hbMasterExcCnt.getFullName())
+                        .append("\":").append(statsSet.hbMasterExcCnt.getAndResetValue())
+                        .append(",\"").append(statsSet.regBrokerCnt.getFullName())
+                        .append("\":").append(statsSet.regBrokerCnt.getAndResetValue())
+                        .append(",\"").append(statsSet.regBrokerFailCnt.getFullName())
+                        .append("\":").append(statsSet.regBrokerFailCnt.getAndResetValue())
+                        .append(",\"").append(statsSet.regBrokerTimoutCnt.getFullName())
+                        .append("\":").append(statsSet.regBrokerTimoutCnt.getAndResetValue())
+                        .append(",\"").append(statsSet.hbBrokerExcCnt.getFullName())
+                        .append("\":").append(statsSet.hbBrokerExcCnt.getAndResetValue());
+            }
+            strBuff.append("}");
+        } else { // read-only branch: counters are fetched with getValue() and left untouched
+            strBuff.append(",\"").append(statsSet.totalTrafficStats.msgCnt.getFullName())
+                    .append("\":").append(statsSet.totalTrafficStats.msgCnt.getValue())
+                    .append(",\"").append(statsSet.totalTrafficStats.msgSize.getFullName())
+                    .append("\":").append(statsSet.totalTrafficStats.msgSize.getValue())
+                    .append(",\"traffic_details\":{");
+            for (Map.Entry<String, TrafficStatsUnit> entry
+                    : statsSet.topicTrafficMap.entrySet()) { // one {msg_cnt, msg_size} object per topic
+                if (entry == null) {
+                    continue;
+                }
+                if (totalCnt++ > 0) {
+                    strBuff.append(",");
+                }
+                strBuff.append("\"").append(entry.getKey()).append("\":{\"")
+                        .append(entry.getValue().msgCnt.getShortName()).append("\":")
+                        .append(entry.getValue().msgCnt.getValue()).append(",\"")
+                        .append(entry.getValue().msgSize.getShortName()).append("\":")
+                        .append(entry.getValue().msgSize.getValue()).append("}");
+            }
+            strBuff.append("}");
+            if (outputLevel != StatsOutputLevel.SIMPLEST) {
+                strBuff.append(",");
+                statsSet.msgCallDltStats.getValue(strBuff, false);
+                if (!isProducer) {
+                    strBuff.append(",");
+                    statsSet.csmLatencyStats.getValue(strBuff, false);
+                }
+                strBuff.append(",\"rsp_details\":{");
+                totalCnt = 0; // restart the separator count for the rsp_details object
+                for (LongStatsCounter statsCounter : statsSet.errRspStatsMap.values()) {
+                    if (statsCounter == null) {
+                        continue;
+                    }
+                    if (totalCnt++ > 0) {
+                        strBuff.append(",");
+                    }
+                    strBuff.append("\"").append(statsCounter.getFullName()).append("\":")
+                            .append(statsCounter.getValue());
+                }
+                strBuff.append("}");
+            }
+            if (outputLevel == StatsOutputLevel.FULL) {
+                strBuff.append(",\"").append(statsSet.regMasterCnt.getFullName())
+                        .append("\":").append(statsSet.regMasterCnt.getValue())
+                        .append(",\"").append(statsSet.regMasterFailCnt.getFullName())
+                        .append("\":").append(statsSet.regMasterFailCnt.getValue())
+                        .append(",\"").append(statsSet.regMasterTimoutCnt.getFullName())
+                        .append("\":").append(statsSet.regMasterTimoutCnt.getValue())
+                        .append(",\"").append(statsSet.hbMasterExcCnt.getFullName())
+                        .append("\":").append(statsSet.hbMasterExcCnt.getValue())
+                        .append(",\"").append(statsSet.regBrokerCnt.getFullName())
+                        .append("\":").append(statsSet.regBrokerCnt.getValue())
+                        .append(",\"").append(statsSet.regBrokerFailCnt.getFullName())
+                        .append("\":").append(statsSet.regBrokerFailCnt.getValue())
+                        .append(",\"").append(statsSet.regBrokerTimoutCnt.getFullName())
+                        .append("\":").append(statsSet.regBrokerTimoutCnt.getValue())
+                        .append(",\"").append(statsSet.hbBrokerExcCnt.getFullName())
+                        .append("\":").append(statsSet.hbBrokerExcCnt.getValue());
+            }
+            strBuff.append("}");
+        }
+    }
+
+ /**
+ * Accumulate sent or received message information
+ *
+ * @param topic the topic name
+ * @param dltTime the latency
+ * @param msgCnt the message count
+ * @param msgSize the message size
+ */
+ private void sendOrRecvMsg(String topic, long dltTime,
+ int msgCnt, int msgSize) {
+ ClientStatsItemSet curItemSet = switchableSets[getIndex()];
+ curItemSet.msgCallDltStats.update(dltTime);
+ curItemSet.totalTrafficStats.addMsgCntAndSize(msgCnt, msgSize);
+ // accumulate traffic information by topic
+ TrafficStatsUnit curStatsUnit = curItemSet.topicTrafficMap.get(topic);
+ if (curStatsUnit == null) {
+ TrafficStatsUnit tmpUnit =
+ new TrafficStatsUnit("msg_cnt", "msg_size", topic);
+ curStatsUnit = curItemSet.topicTrafficMap.putIfAbsent(topic, tmpUnit);
+ if (curStatsUnit == null) {
+ curStatsUnit = tmpUnit;
+ }
+ }
+ curStatsUnit.addMsgCntAndSize(msgCnt, msgSize);
+ }
+
+ /**
+ * Get current writable block index.
+ *
+ * @return the writable block index
+ */
+ private int getIndex() {
+ return getIndex(writableIndex.get());
+ }
+
+ /**
+ * Gets the metric block index based on the specified value.
+ *
+ * @param origIndex the specified value
+ * @return the metric block index
+ */
+ private int getIndex(int origIndex) {
+ return Math.abs(origIndex % 2);
+ }
+
+ /**
+ * ClientStatsItemSet, Client related statistics set
+ *
+ */
+ private static class ClientStatsItemSet {
+ // The reset time of statistics set
+ protected final SinceTime resetTime =
+ new SinceTime("reset_time", null);
+ // received or sent message traffic statistic
+ protected final TrafficStatsUnit totalTrafficStats =
+ new TrafficStatsUnit("msg_cnt", "msg_size", "total");
+ // topic-based traffic statistics
+ protected final ConcurrentHashMap<String, TrafficStatsUnit> topicTrafficMap =
+ new ConcurrentHashMap<>();
+ // time consumption statistics for sending or receiving messages
+ protected final ESTHistogram msgCallDltStats =
+ new ESTHistogram("msg_call_dlt", "");
+ // statistics on consumption transaction time
+ protected final ESTHistogram csmLatencyStats =
+ new ESTHistogram("csm_latency_dlt", "");
+ // error response distribution statistics
+ protected final ConcurrentHashMap<Integer, LongStatsCounter> errRspStatsMap =
+ new ConcurrentHashMap<>();
+ // client 2 Master status statistics
+ protected final LongStatsCounter regMasterCnt =
+ new LongStatsCounter("reg_master_cnt", null);
+ protected final LongStatsCounter regMasterFailCnt =
+ new LongStatsCounter("reg_master_fail", null);
+ protected final LongStatsCounter regMasterTimoutCnt =
+ new LongStatsCounter("reg_master_timeout", null);
+ protected final LongStatsCounter hbMasterExcCnt =
+ new LongStatsCounter("hb_master_exception", null);
+ // client 2 Broker status statistics
+ protected final LongStatsCounter regBrokerCnt =
+ new LongStatsCounter("reg_broker_cnt", null);
+ protected final LongStatsCounter regBrokerFailCnt =
+ new LongStatsCounter("reg_broker_fail", null);
+ protected final LongStatsCounter regBrokerTimoutCnt =
+ new LongStatsCounter("reg_broker_timeout", null);
+ protected final LongStatsCounter hbBrokerExcCnt =
+ new LongStatsCounter("hb_broker_exception", null);
+
+ public ClientStatsItemSet() {
+ resetSinceTime();
+ }
+
+ public void resetSinceTime() {
+ this.resetTime.reset();
+ }
+ }
+}
+
diff --git a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/common/StatsOutputLevel.java b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/common/StatsOutputLevel.java
new file mode 100644
index 0000000..e45f7f4
--- /dev/null
+++ b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/common/StatsOutputLevel.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.tubemq.client.common;
+
+public enum StatsOutputLevel {
+ SIMPLEST(0, "simplest", "Simplest statistics output"),
+ MEDIUM(1, "medium", "Medium statistics output"),
+ FULL(2, "full", "Full statistics output");
+
+ StatsOutputLevel(int id, String name, String desc) {
+ this.id = id;
+ this.name = name;
+ this.desc = desc;
+ }
+
+ public int getId() {
+ return id;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public String getDesc() {
+ return desc;
+ }
+
+ public static StatsOutputLevel valueOf(int value) {
+ for (StatsOutputLevel outputLevel : StatsOutputLevel.values()) {
+ if (outputLevel.getId() == value) {
+ return outputLevel;
+ }
+ }
+ return SIMPLEST;
+ }
+
+ private final int id;
+ private final String name;
+ private final String desc;
+}
diff --git a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/common/TClientConstants.java b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/common/TClientConstants.java
index 7b5b8f1..0269af2 100644
--- a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/common/TClientConstants.java
+++ b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/common/TClientConstants.java
@@ -40,13 +40,13 @@ public class TClientConstants {
public static final long CFG_DEFAULT_META_QUERY_WAIT_PERIOD_MS = 10000L;
public static final long CFG_MIN_META_QUERY_WAIT_PERIOD_MS = 5000L;
- // client metric information print period
- public static final long METRIC_PRINT_DEFAULT_PERIOD_MS = 3 * 1000 * 60;
- public static final long METRIC_PRINT_MIN_PERIOD_MS = 3 * 1000 * 60;
- public static final long METRIC_PRINT_MAX_PERIOD_MS = 60 * 1000 * 60;
+ // client statistics information print period
+ public static final long STATS_SELF_PRINT_DEFAULT_PERIOD_MS = 3 * 1000 * 60;
+ public static final long STATS_SELF_PRINT_MIN_PERIOD_MS = 1000 * 60;
+ public static final long STATS_SELF_PRINT_MAX_PERIOD_MS = 60 * 1000 * 60;
- // client metric information print period
- public static final long METRIC_RESET_DEFAULT_PERIOD_MS = 30 * 60 * 1000;
- public static final long METRIC_RESET_MIN_PERIOD_MS = 30 * 60 * 1000;
- public static final long METRIC_RESET_MAX_PERIOD_MS = 24 * 3600 * 1000;
+ // client statistics information auto-reset period
+ public static final long STATS_AUTO_RESET_DEFAULT_PERIOD_MS = 30 * 60 * 1000;
+ public static final long STATS_AUTO_RESET_MIN_PERIOD_MS = 60 * 1000;
+ public static final long STATS_AUTO_RESET_MAX_PERIOD_MS = 24 * 3600 * 1000;
}
diff --git a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/config/TubeClientConfig.java b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/config/TubeClientConfig.java
index aaef618..f06a4c0 100644
--- a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/config/TubeClientConfig.java
+++ b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/config/TubeClientConfig.java
@@ -56,11 +56,11 @@ public class TubeClientConfig {
// Link statistic check duration in ms.
private long linkStatsDurationMs = RpcConstants.CFG_LQ_STATS_DURATION_MS;
// Enable metric information print
- private boolean enableMetricPrint = true;
+ private boolean enableStatsSelfPrint = true;
// Metric print period in ms.
- private long metricInfoPrintPeriodMs = TClientConstants.METRIC_PRINT_DEFAULT_PERIOD_MS;
+ private long statsSelfPrintPeriodMs = TClientConstants.STATS_SELF_PRINT_DEFAULT_PERIOD_MS;
// Metric reset value period in ms.
- private long metricForcedResetPeriodMs = TClientConstants.METRIC_RESET_DEFAULT_PERIOD_MS;
+ private long statsForcedResetPeriodMs = TClientConstants.STATS_AUTO_RESET_DEFAULT_PERIOD_MS;
// The following 5 configuration parameters are used in broker exception process.
//
@@ -469,34 +469,34 @@ public class TubeClientConfig {
return usrPassWord;
}
- public boolean isEnableMetricPrint() {
- return enableMetricPrint;
+ public boolean enableStatsSelfPrint() {
+ return enableStatsSelfPrint;
}
- public void setEnableMetricPrint(boolean enableMetricPrint) {
- this.enableMetricPrint = enableMetricPrint;
+ public void setStatsSelfPrint(boolean enableStatsSelfPrint) {
+ this.enableStatsSelfPrint = enableStatsSelfPrint;
}
- public long getMetricInfoPrintPeriodMs() {
- return metricInfoPrintPeriodMs;
+ public long getStatsSelfPrintPeriodMs() {
+ return statsSelfPrintPeriodMs;
}
- public void setMetricInfoPrintPeriodMs(long metricInfoPrintPeriodMs) {
- this.metricInfoPrintPeriodMs =
- MixedUtils.mid(metricInfoPrintPeriodMs,
- TClientConstants.METRIC_PRINT_MIN_PERIOD_MS,
- TClientConstants.METRIC_PRINT_MAX_PERIOD_MS);
+ public void setStatsSelfPrintPeriodMs(long statsSelfPrintPeriodMs) {
+ this.statsSelfPrintPeriodMs =
+ MixedUtils.mid(statsSelfPrintPeriodMs,
+ TClientConstants.STATS_SELF_PRINT_MIN_PERIOD_MS,
+ TClientConstants.STATS_SELF_PRINT_MAX_PERIOD_MS);
}
- public long getMetricForcedResetPeriodMs() {
- return metricForcedResetPeriodMs;
+ public long getStatsForcedResetPeriodMs() {
+ return statsForcedResetPeriodMs;
}
- public void setMetricForcedResetPeriodMs(long metricForcedResetPeriodMs) {
- this.metricForcedResetPeriodMs =
- MixedUtils.mid(metricForcedResetPeriodMs,
- TClientConstants.METRIC_RESET_MIN_PERIOD_MS,
- TClientConstants.METRIC_RESET_MAX_PERIOD_MS);
+ public void setStatsForcedResetPeriodMs(long statsForcedResetPeriodMs) {
+ this.statsForcedResetPeriodMs =
+ MixedUtils.mid(statsForcedResetPeriodMs,
+ TClientConstants.STATS_AUTO_RESET_MIN_PERIOD_MS,
+ TClientConstants.STATS_AUTO_RESET_MAX_PERIOD_MS);
}
@Override
@@ -591,13 +591,13 @@ public class TubeClientConfig {
if (!this.tlsConfig.equals(that.tlsConfig)) {
return false;
}
- if (this.enableMetricPrint != that.enableMetricPrint) {
+ if (this.enableStatsSelfPrint != that.enableStatsSelfPrint) {
return false;
}
- if (this.metricInfoPrintPeriodMs != that.metricInfoPrintPeriodMs) {
+ if (this.statsSelfPrintPeriodMs != that.statsSelfPrintPeriodMs) {
return false;
}
- if (this.metricForcedResetPeriodMs != that.metricForcedResetPeriodMs) {
+ if (this.statsForcedResetPeriodMs != that.statsForcedResetPeriodMs) {
return false;
}
return masterInfo.equals(that.masterInfo);
@@ -648,9 +648,9 @@ public class TubeClientConfig {
.append(",\"sessionMaxAllowedDelayedMsgCount\":").append(this.sessionMaxAllowedDelayedMsgCount)
.append(",\"unAvailableFbdDurationMs\":").append(this.unAvailableFbdDurationMs)
.append(",\"enableUserAuthentic\":").append(this.enableUserAuthentic)
- .append(",\"enableMetricPrint\":").append(this.enableMetricPrint)
- .append(",\"metricInfoPrintPeriodMs\":").append(this.metricInfoPrintPeriodMs)
- .append(",\"metricResetPeriodMs\":").append(this.metricForcedResetPeriodMs)
+ .append(",\"enableStatsSelfPrint\":").append(this.enableStatsSelfPrint)
+ .append(",\"statsSelfPrintPeriodMs\":").append(this.statsSelfPrintPeriodMs)
+ .append(",\"statsForcedResetPeriodMs\":").append(this.statsForcedResetPeriodMs)
.append(",\"usrName\":\"").append(this.usrName)
.append("\",\"usrPassWord\":\"").append(this.usrPassWord)
.append("\",\"localAddress\":\"").append(localAddress)
diff --git a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/BaseMessageConsumer.java b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/BaseMessageConsumer.java
index 744bfd9..5571507 100644
--- a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/BaseMessageConsumer.java
+++ b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/BaseMessageConsumer.java
@@ -38,7 +38,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
-import org.apache.inlong.tubemq.client.common.ClientMetrics;
+import org.apache.inlong.tubemq.client.common.ClientStatsInfo;
import org.apache.inlong.tubemq.client.common.TubeClientVersion;
import org.apache.inlong.tubemq.client.config.ConsumerConfig;
import org.apache.inlong.tubemq.client.exception.TubeClientException;
@@ -96,15 +96,16 @@ public class BaseMessageConsumer implements MessageConsumer {
private final ConsumerSamplePrint samplePrintCtrl =
new ConsumerSamplePrint();
private final RpcConfig rpcConfig = new RpcConfig();
- private AtomicLong visitToken = new AtomicLong(TBaseConstants.META_VALUE_UNDEFINED);
- private AtomicReference<String> authAuthorizedTokenRef =
+ private final AtomicLong visitToken =
+ new AtomicLong(TBaseConstants.META_VALUE_UNDEFINED);
+ private final AtomicReference<String> authAuthorizedTokenRef =
new AtomicReference<>("");
- private ClientAuthenticateHandler authenticateHandler =
+ private final ClientAuthenticateHandler authenticateHandler =
new SimpleClientAuthenticateHandler();
private Thread heartBeatThread2Broker;
- private AtomicBoolean isShutdown = new AtomicBoolean(false);
- private AtomicBoolean isRebalanceStopped = new AtomicBoolean(false);
- private AtomicBoolean isFirst = new AtomicBoolean(true);
+ private final AtomicBoolean isShutdown = new AtomicBoolean(false);
+ private final AtomicBoolean isRebalanceStopped = new AtomicBoolean(false);
+ private final AtomicBoolean isFirst = new AtomicBoolean(true);
private int heartbeatRetryTimes = 0;
// Status:
// -1: Unsubscribed
@@ -116,7 +117,7 @@ public class BaseMessageConsumer implements MessageConsumer {
private int rebalanceRetryTimes = 0;
private long lastHeartbeatTime2Master = 0;
private long lastHeartbeatTime2Broker = 0;
- protected final ClientMetrics clientMetrics;
+ protected final ClientStatsInfo clientStatsInfo;
/**
* Construct a BaseMessageConsumer object.
@@ -143,8 +144,11 @@ public class BaseMessageConsumer implements MessageConsumer {
} catch (Exception e) {
throw new TubeClientException("Get consumer id failed!", e);
}
- this.clientMetrics =
- new ClientMetrics(false, this.consumerId, this.consumerConfig);
+ this.clientStatsInfo =
+ new ClientStatsInfo(false, this.consumerId,
+ this.consumerConfig.enableStatsSelfPrint(),
+ this.consumerConfig.getStatsSelfPrintPeriodMs(),
+ this.consumerConfig.getStatsForcedResetPeriodMs());
this.rmtDataCache =
new RmtDataCache(this.consumerConfig, null);
this.rpcServiceFactory =
@@ -454,7 +458,7 @@ public class BaseMessageConsumer implements MessageConsumer {
//
}
}
- clientMetrics.printMetricInfo(true, strBuffer);
+ clientStatsInfo.selfPrintStatsInfo(true, strBuffer);
logger.info(strBuffer
.append("[SHUTDOWN_CONSUMER] Partitions unregistered, consumer :")
.append(this.consumerId).toString());
@@ -828,7 +832,7 @@ public class BaseMessageConsumer implements MessageConsumer {
.consumerRegisterC2B(createBrokerRegisterRequest(partition),
AddressUtils.getLocalAddress(), consumerConfig.isTlsEnable());
if (responseB2C != null && responseB2C.getSuccess()) {
- clientMetrics.bookReg2Broker(false);
+ clientStatsInfo.bookReg2Broker(false);
long currOffset =
responseB2C.hasCurrOffset() ? responseB2C.getCurrOffset()
: TBaseConstants.META_VALUE_UNDEFINED;
@@ -842,7 +846,7 @@ public class BaseMessageConsumer implements MessageConsumer {
.append(partition.toString()).toString());
strBuffer.delete(0, strBuffer.length());
} else {
- clientMetrics.bookReg2Broker(true);
+ clientStatsInfo.bookReg2Broker(true);
if (responseB2C == null) {
logger.warn(strBuffer.append("register2broker error! ")
.append(retryTimesRegister2Broker).append(" register ")
@@ -1208,9 +1212,7 @@ public class BaseMessageConsumer implements MessageConsumer {
partition, taskContext.isLastConsumed()),
AddressUtils.getLocalAddress(), consumerConfig.isTlsEnable());
} catch (Throwable ee) {
- clientMetrics.bookFailRpcCall(
- System.currentTimeMillis() - startTime,
- TErrCodeConstants.UNSPECIFIED_ABNORMAL);
+ clientStatsInfo.bookFailRpcCall(TErrCodeConstants.UNSPECIFIED_ABNORMAL);
// Process the exception
rmtDataCache.errReqRelease(partitionKey, taskContext.getUsedToken(), false);
taskContext.setFailProcessResult(400, strBuffer
@@ -1221,7 +1223,7 @@ public class BaseMessageConsumer implements MessageConsumer {
}
long dltTime = System.currentTimeMillis() - startTime;
if (msgRspB2C == null) {
- clientMetrics.bookFailRpcCall(dltTime, TErrCodeConstants.INTERNAL_SERVER_ERROR);
+ clientStatsInfo.bookFailRpcCall(TErrCodeConstants.INTERNAL_SERVER_ERROR);
rmtDataCache.errReqRelease(partitionKey, taskContext.getUsedToken(), false);
taskContext.setFailProcessResult(500, "Get message null");
return taskContext;
@@ -1278,7 +1280,7 @@ public class BaseMessageConsumer implements MessageConsumer {
strBuffer.append(partitionKey).append(TokenConstants.ATTR_SEP)
.append(taskContext.getUsedToken()).toString(), messageList, maxOffset);
strBuffer.delete(0, strBuffer.length());
- clientMetrics.bookSuccGetMsg(dltTime, msgCount, msgSize);
+ clientStatsInfo.bookSuccGetMsg(dltTime, topic, msgCount, msgSize);
break;
}
case TErrCodeConstants.HB_NO_NODE:
@@ -1339,13 +1341,11 @@ public class BaseMessageConsumer implements MessageConsumer {
}
}
if (msgRspB2C.getErrCode() != TErrCodeConstants.SUCCESS) {
- clientMetrics.bookFailRpcCall(dltTime, msgRspB2C.getErrCode());
+ clientStatsInfo.bookFailRpcCall(msgRspB2C.getErrCode());
}
return taskContext;
} catch (Throwable ee) {
- clientMetrics.bookFailRpcCall(
- System.currentTimeMillis() - startTime,
- TErrCodeConstants.INTERNAL_SERVER_ERROR);
+ clientStatsInfo.bookFailRpcCall(TErrCodeConstants.INTERNAL_SERVER_ERROR);
logger.error("Process response code error", ee);
rmtDataCache.succRspRelease(partitionKey, topic,
taskContext.getUsedToken(), false, isFilterConsume(topic),
@@ -1422,7 +1422,7 @@ public class BaseMessageConsumer implements MessageConsumer {
rmtDataCache.resumeTimeoutConsumePartitions(isPullConsume,
consumerConfig.getPullProtectConfirmTimeoutMs());
// print metric information
- clientMetrics.printMetricInfo(false, strBuffer);
+ clientStatsInfo.selfPrintStatsInfo(false, strBuffer);
// Fetch the rebalance result, construct message and return it.
ConsumerEvent event = rebalanceResults.poll();
List<SubscribeInfo> subInfoList = null;
@@ -1442,7 +1442,7 @@ public class BaseMessageConsumer implements MessageConsumer {
AddressUtils.getLocalAddress(), consumerConfig.isTlsEnable());
// Process unsuccessful response
if (response == null) {
- clientMetrics.bookHB2MasterTimeout();
+ clientStatsInfo.bookHB2MasterTimeout();
logger.error(strBuffer.append("[Heartbeat Failed] ")
.append("return result is null!").toString());
heartbeatRetryTimes++;
@@ -1451,7 +1451,7 @@ public class BaseMessageConsumer implements MessageConsumer {
if (!response.getSuccess()) {
// If master replies that cannot find current consumer node, re-register
if (response.getErrCode() == TErrCodeConstants.HB_NO_NODE) {
- clientMetrics.bookHB2MasterTimeout();
+ clientStatsInfo.bookHB2MasterTimeout();
try {
ClientMaster.RegisterResponseM2C regResponse =
masterService.consumerRegisterC2M(createMasterRegisterRequest(),
@@ -1493,7 +1493,7 @@ public class BaseMessageConsumer implements MessageConsumer {
}
return;
}
- clientMetrics.bookHB2MasterException();
+ clientStatsInfo.bookHB2MasterException();
logger.error(strBuffer.append("[Heartbeat Failed] ")
.append(response.getErrMsg()).toString());
if (response.getErrCode() == TErrCodeConstants.CERTIFICATE_FAILURE) {
@@ -1599,8 +1599,11 @@ public class BaseMessageConsumer implements MessageConsumer {
createBrokerHeartBeatRequest(brokerInfo.getBrokerId(), partStrSet),
AddressUtils.getLocalAddress(), consumerConfig.isTlsEnable());
// When response is success
- if (heartBeatResponseV2 != null
- && heartBeatResponseV2.getSuccess()) {
+ if (heartBeatResponseV2 == null) {
+ clientStatsInfo.bookHB2BrokerTimeout();
+ continue;
+ }
+ if (heartBeatResponseV2.getSuccess()) {
// If the peer require authentication, set a flag.
// The following request will attach the auth information.
rmtDataCache.bookBrokerRequireAuthInfo(
@@ -1644,19 +1647,18 @@ public class BaseMessageConsumer implements MessageConsumer {
}
}
}
- }
- if (heartBeatResponseV2 != null) {
- clientMetrics.bookHB2BrokerException();
+ } else {
+ clientStatsInfo.bookHB2BrokerException();
if (heartBeatResponseV2.getErrCode()
- == TErrCodeConstants.CERTIFICATE_FAILURE) {
+ == TErrCodeConstants.CERTIFICATE_FAILURE) {
for (Partition partition : partitions) {
removePartition(partition);
}
logger.warn(strBuffer
- .append("[heart2broker error] certificate failure, ")
- .append(brokerInfo.getBrokerStrInfo())
- .append("'s partitions area released, ")
- .append(heartBeatResponseV2.getErrMsg()).toString());
+ .append("[heart2broker error] certificate failure, ")
+ .append(brokerInfo.getBrokerStrInfo())
+ .append("'s partitions area released, ")
+ .append(heartBeatResponseV2.getErrMsg()).toString());
strBuffer.delete(0, strBuffer.length());
}
}
@@ -1665,7 +1667,7 @@ public class BaseMessageConsumer implements MessageConsumer {
// If there's error in the heartbeat, collect the log and print out.
// Release the log string buffer.
if (!isShutdown()) {
- clientMetrics.bookHB2BrokerException();
+ clientStatsInfo.bookHB2BrokerException();
samplePrintCtrl.printExceptionCaught(ee);
if (!partStrSet.isEmpty()) {
strBuffer.delete(0, strBuffer.length());
@@ -1685,7 +1687,7 @@ public class BaseMessageConsumer implements MessageConsumer {
lastHeartbeatTime2Broker = System.currentTimeMillis();
Thread.sleep(consumerConfig.getHeartbeatPeriodMs());
} catch (Throwable e) {
- clientMetrics.bookHB2BrokerException();
+ clientStatsInfo.bookHB2BrokerException();
lastHeartbeatTime2Broker = System.currentTimeMillis();
if (!isShutdown()) {
logger.error("heartbeat thread error 3 : ", e);
diff --git a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/SimpleClientBalanceConsumer.java b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/SimpleClientBalanceConsumer.java
index 12a10f9..8ac51fa 100644
--- a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/SimpleClientBalanceConsumer.java
+++ b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/SimpleClientBalanceConsumer.java
@@ -34,7 +34,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
-import org.apache.inlong.tubemq.client.common.ClientMetrics;
+import org.apache.inlong.tubemq.client.common.ClientStatsInfo;
import org.apache.inlong.tubemq.client.common.ConfirmResult;
import org.apache.inlong.tubemq.client.common.ConsumeResult;
import org.apache.inlong.tubemq.client.common.QueryMetaResult;
@@ -108,7 +108,7 @@ public class SimpleClientBalanceConsumer implements ClientBalanceConsumer {
private long lastHeartbeatTime2Broker = 0;
private final ConcurrentHashMap<String, Long> partRegFreqCtrlMap =
new ConcurrentHashMap<>();
- protected final ClientMetrics clientMetrics;
+ protected final ClientStatsInfo clientStatsInfo;
public SimpleClientBalanceConsumer(final InnerSessionFactory messageSessionFactory,
final ConsumerConfig consumerConfig) throws TubeClientException {
@@ -127,8 +127,11 @@ public class SimpleClientBalanceConsumer implements ClientBalanceConsumer {
}
this.clientRmtDataCache =
new RmtDataCache(this.consumerConfig, null);
- this.clientMetrics =
- new ClientMetrics(false, this.consumerId, this.consumerConfig);
+ this.clientStatsInfo =
+ new ClientStatsInfo(false, this.consumerId,
+ this.consumerConfig.enableStatsSelfPrint(),
+ this.consumerConfig.getStatsSelfPrintPeriodMs(),
+ this.consumerConfig.getStatsForcedResetPeriodMs());
this.rpcServiceFactory =
this.sessionFactory.getRpcServiceFactory();
this.rpcConfig.put(RpcConstants.CONNECT_TIMEOUT, 3000);
@@ -312,7 +315,7 @@ public class SimpleClientBalanceConsumer implements ClientBalanceConsumer {
}
}
// print metric information
- clientMetrics.printMetricInfo(true, strBuffer);
+ clientStatsInfo.selfPrintStatsInfo(true, strBuffer);
logger.info(strBuffer
.append("[SHUTDOWN_CONSUMER] Partitions unregistered, consumer :")
.append(this.consumerId).toString());
@@ -694,9 +697,7 @@ public class SimpleClientBalanceConsumer implements ClientBalanceConsumer {
AddressUtils.getLocalAddress(), consumerConfig.isTlsEnable());
} catch (Throwable ee) {
// Process the exception
- clientMetrics.bookFailRpcCall(
- System.currentTimeMillis() - startTime,
- TErrCodeConstants.BAD_REQUEST);
+ clientStatsInfo.bookFailRpcCall(TErrCodeConstants.BAD_REQUEST);
clientRmtDataCache.errReqRelease(partitionKey, taskContext.getUsedToken(), false);
taskContext.setFailProcessResult(400, sBuffer
.append("Get message error, reason is ")
@@ -706,7 +707,7 @@ public class SimpleClientBalanceConsumer implements ClientBalanceConsumer {
}
long dltTime = System.currentTimeMillis() - startTime;
if (msgRspB2C == null) {
- clientMetrics.bookFailRpcCall(dltTime, TErrCodeConstants.INTERNAL_SERVER_ERROR);
+ clientStatsInfo.bookFailRpcCall(TErrCodeConstants.INTERNAL_SERVER_ERROR);
clientRmtDataCache.errReqRelease(partitionKey, taskContext.getUsedToken(), false);
taskContext.setFailProcessResult(500, "Get message null");
return taskContext;
@@ -763,7 +764,7 @@ public class SimpleClientBalanceConsumer implements ClientBalanceConsumer {
sBuffer.append(partitionKey).append(TokenConstants.ATTR_SEP)
.append(taskContext.getUsedToken()).toString(), messageList, maxOffset);
sBuffer.delete(0, sBuffer.length());
- clientMetrics.bookSuccGetMsg(dltTime, msgCount, msgSize);
+ clientStatsInfo.bookSuccGetMsg(dltTime, topic, msgCount, msgSize);
break;
}
case TErrCodeConstants.HB_NO_NODE:
@@ -824,11 +825,11 @@ public class SimpleClientBalanceConsumer implements ClientBalanceConsumer {
}
}
if (msgRspB2C.getErrCode() != TErrCodeConstants.SUCCESS) {
- clientMetrics.bookFailRpcCall(dltTime, msgRspB2C.getErrCode());
+ clientStatsInfo.bookFailRpcCall(msgRspB2C.getErrCode());
}
return taskContext;
} catch (Throwable ee) {
- clientMetrics.bookFailRpcCall(dltTime, TErrCodeConstants.INTERNAL_SERVER_ERROR);
+ clientStatsInfo.bookFailRpcCall(TErrCodeConstants.INTERNAL_SERVER_ERROR);
logger.error("Process response code error", ee);
clientRmtDataCache.succRspRelease(partitionKey, topic,
taskContext.getUsedToken(), false, isFilterConsume(topic),
@@ -968,14 +969,14 @@ public class SimpleClientBalanceConsumer implements ClientBalanceConsumer {
clientRmtDataCache.resumeTimeoutConsumePartitions(false,
consumerConfig.getPullProtectConfirmTimeoutMs());
// print metric information
- clientMetrics.printMetricInfo(false, strBuffer);
+ clientStatsInfo.selfPrintStatsInfo(false, strBuffer);
// Send heartbeat request to master
ClientMaster.HeartResponseM2CV2 response =
masterService.consumerHeartbeatC2MV2(createMasterHeartBeatRequest(),
AddressUtils.getLocalAddress(), consumerConfig.isTlsEnable());
// Process unsuccessful response
if (response == null) {
- clientMetrics.bookHB2MasterTimeout();
+ clientStatsInfo.bookHB2MasterTimeout();
logger.warn(strBuffer.append("[Heartbeat Failed] ")
.append("return result is null!").toString());
strBuffer.delete(0, strBuffer.length());
@@ -985,7 +986,7 @@ public class SimpleClientBalanceConsumer implements ClientBalanceConsumer {
if (response.getErrCode() != TErrCodeConstants.SUCCESS) {
// If master replies that cannot find current consumer node, re-register
if (response.getErrCode() == TErrCodeConstants.HB_NO_NODE) {
- clientMetrics.bookHB2MasterTimeout();
+ clientStatsInfo.bookHB2MasterTimeout();
if (tryRegister2Master(result, strBuffer)) {
logger.info(strBuffer.append("[Re-register] ")
.append(consumerId).toString());
@@ -995,7 +996,7 @@ public class SimpleClientBalanceConsumer implements ClientBalanceConsumer {
}
return;
}
- clientMetrics.bookHB2MasterException();
+ clientStatsInfo.bookHB2MasterException();
logger.error(strBuffer.append("[Heartbeat Failed] ")
.append(response.getErrMsg()).toString());
if (response.getErrCode() == TErrCodeConstants.CERTIFICATE_FAILURE) {
@@ -1065,14 +1066,14 @@ public class SimpleClientBalanceConsumer implements ClientBalanceConsumer {
createBrokerRegisterRequest(partition, boostrapOffset),
AddressUtils.getLocalAddress(), consumerConfig.isTlsEnable());
if (response == null) {
- clientMetrics.bookReg2Broker(true);
+ clientStatsInfo.bookReg2Broker(true);
result.setFailResult(TErrCodeConstants.CONNECT_RETURN_NULL,
sBuffer.append(" register ").append(partition.toString())
.append(" return null!").toString());
return result.isSuccess();
}
if (response.getSuccess()) {
- clientMetrics.bookReg2Broker(false);
+ clientStatsInfo.bookReg2Broker(false);
long currOffset = response.hasCurrOffset()
? response.getCurrOffset() : TBaseConstants.META_VALUE_UNDEFINED;
long maxOffset = response.hasMaxOffset()
@@ -1086,7 +1087,7 @@ public class SimpleClientBalanceConsumer implements ClientBalanceConsumer {
result.setSuccResult();
return result.isSuccess();
} else {
- clientMetrics.bookReg2Broker(true);
+ clientStatsInfo.bookReg2Broker(true);
if (response.getErrCode() == TErrCodeConstants.PARTITION_OCCUPIED
|| response.getErrCode() == TErrCodeConstants.CERTIFICATE_FAILURE) {
clientRmtDataCache.removePartition(partition);
@@ -1125,7 +1126,7 @@ public class SimpleClientBalanceConsumer implements ClientBalanceConsumer {
masterService.consumerRegisterC2MV2(createMasterRegisterRequest(),
AddressUtils.getLocalAddress(), consumerConfig.isTlsEnable());
if (response == null) {
- clientMetrics.bookReg2Master(true);
+ clientStatsInfo.bookReg2Master(true);
result.setFailResult(TErrCodeConstants.CONNECT_RETURN_NULL,
sBuffer.append("Register Failed: ").append(consumerId)
.append(" register to master return null!").toString());
@@ -1133,7 +1134,7 @@ public class SimpleClientBalanceConsumer implements ClientBalanceConsumer {
return result.isSuccess();
}
if (response.getErrCode() != TErrCodeConstants.SUCCESS) {
- clientMetrics.bookReg2Master(true);
+ clientStatsInfo.bookReg2Master(true);
// If the consumer group is forbidden, output the log
if (response.getErrCode()
== TErrCodeConstants.CONSUME_GROUP_FORBIDDEN) {
@@ -1149,7 +1150,7 @@ public class SimpleClientBalanceConsumer implements ClientBalanceConsumer {
sBuffer.delete(0, sBuffer.length());
return result.isSuccess();
}
- clientMetrics.bookReg2Master(false);
+ clientStatsInfo.bookReg2Master(false);
// Process the successful response
clientRmtDataCache.updateReg2MasterTime();
clientRmtDataCache.updateBrokerInfoList(response.getBrokerConfigId(),
@@ -1287,7 +1288,7 @@ public class SimpleClientBalanceConsumer implements ClientBalanceConsumer {
createBrokerHeartBeatRequest(brokerInfo.getBrokerId(), partStrSet),
AddressUtils.getLocalAddress(), consumerConfig.isTlsEnable());
if (response == null) {
- clientMetrics.bookHB2BrokerTimeout();
+ clientStatsInfo.bookHB2BrokerTimeout();
continue;
}
if (response.getSuccess()) {
@@ -1333,7 +1334,7 @@ public class SimpleClientBalanceConsumer implements ClientBalanceConsumer {
}
}
} else {
- clientMetrics.bookHB2BrokerException();
+ clientStatsInfo.bookHB2BrokerException();
if (response.getErrCode() == TErrCodeConstants.CERTIFICATE_FAILURE) {
for (Partition partition : partitions) {
clientRmtDataCache.removePartition(partition);
diff --git a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/SimplePullMessageConsumer.java b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/SimplePullMessageConsumer.java
index bb3b33e..b430781 100644
--- a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/SimplePullMessageConsumer.java
+++ b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/SimplePullMessageConsumer.java
@@ -205,7 +205,7 @@ public class SimplePullMessageConsumer implements PullMessageConsumer {
.append("Not found the partition by confirmContext:")
.append(confirmContext).toString());
}
- baseConsumer.clientMetrics.bookConfirmDuration(
+ baseConsumer.clientStatsInfo.bookConfirmDuration(
System.currentTimeMillis() - timeStamp);
if (this.baseConsumer.consumerConfig.isPullConfirmInLocal()) {
baseConsumer.rmtDataCache.succRspRelease(keyId, topicName,
diff --git a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/SimplePushMessageConsumer.java b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/SimplePushMessageConsumer.java
index 2fa49fd..45f5b86 100644
--- a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/SimplePushMessageConsumer.java
+++ b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/SimplePushMessageConsumer.java
@@ -249,7 +249,7 @@ public class SimplePushMessageConsumer implements PushMessageConsumer {
} else {
this.receiveMessages(request, topicProcessor);
}
- baseConsumer.clientMetrics.bookConfirmDuration(
+ baseConsumer.clientStatsInfo.bookConfirmDuration(
System.currentTimeMillis() - request.getUsedToken());
return true;
}
diff --git a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/MessageProducer.java b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/MessageProducer.java
index fd519a1..34e0576 100644
--- a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/MessageProducer.java
+++ b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/MessageProducer.java
@@ -37,7 +37,4 @@ public interface MessageProducer extends Shutdownable {
void sendMessage(Message message, MessageSentCallback cb)
throws TubeClientException, InterruptedException;
-
- @Override
- void shutdown() throws Throwable;
}
diff --git a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/ProducerManager.java b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/ProducerManager.java
index 3a1fff3..4ad2a29 100644
--- a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/ProducerManager.java
+++ b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/ProducerManager.java
@@ -33,7 +33,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
-import org.apache.inlong.tubemq.client.common.ClientMetrics;
+import org.apache.inlong.tubemq.client.common.ClientStatsInfo;
import org.apache.inlong.tubemq.client.common.TubeClientVersion;
import org.apache.inlong.tubemq.client.config.TubeClientConfig;
import org.apache.inlong.tubemq.client.exception.TubeClientException;
@@ -94,14 +94,15 @@ public class ProducerManager {
private long lastEmptyBrokerPrintTime = 0;
private long lastEmptyTopicPrintTime = 0;
private int heartbeatRetryTimes = 0;
- private AtomicBoolean isStartHeart = new AtomicBoolean(false);
- private AtomicInteger heartBeatStatus = new AtomicInteger(-1);
+ private final AtomicBoolean isStartHeart = new AtomicBoolean(false);
+ private final AtomicInteger heartBeatStatus = new AtomicInteger(-1);
private volatile long lastHeartbeatTime = System.currentTimeMillis();
- private AtomicInteger nodeStatus = new AtomicInteger(-1);
+ private final AtomicInteger nodeStatus = new AtomicInteger(-1);
private Map<String, Map<Integer, List<Partition>>> topicPartitionMap =
new ConcurrentHashMap<>();
- private AtomicBoolean nextWithAuthInfo2M = new AtomicBoolean(false);
- private final ClientMetrics clientMetrics;
+ private final AtomicBoolean nextWithAuthInfo2M =
+ new AtomicBoolean(false);
+ private final ClientStatsInfo clientStatsInfo;
public ProducerManager(final InnerSessionFactory sessionFactory,
final TubeClientConfig tubeClientConfig) throws TubeClientException {
@@ -132,9 +133,12 @@ public class ProducerManager {
rpcConfig.put(RpcConstants.WORKER_THREAD_NAME, "tube_netty_worker-");
rpcConfig.put(RpcConstants.CALLBACK_WORKER_COUNT,
tubeClientConfig.getRpcRspCallBackThreadCnt());
- // initial client metrics
- this.clientMetrics =
- new ClientMetrics(true, this.producerId, this.tubeClientConfig);
+ // initial client statistics configure
+ this.clientStatsInfo =
+ new ClientStatsInfo(true, this.producerId,
+ this.tubeClientConfig.enableStatsSelfPrint(),
+ this.tubeClientConfig.getStatsSelfPrintPeriodMs(),
+ this.tubeClientConfig.getStatsForcedResetPeriodMs());
heartBeatStatus.set(0);
this.masterService =
this.rpcServiceFactory.getFailoverService(MasterService.class,
@@ -292,7 +296,7 @@ public class ProducerManager {
}
return;
}
- clientMetrics.printMetricInfo(true, strBuff);
+ clientStatsInfo.selfPrintStatsInfo(true, strBuff);
if (this.nodeStatus.compareAndSet(0, 1)) {
this.heartbeatService.shutdownNow();
this.topicPartitionMap.clear();
@@ -307,8 +311,8 @@ public class ProducerManager {
*
* @return client metrics
*/
- public ClientMetrics getClientMetrics() {
- return clientMetrics;
+ public ClientStatsInfo getClientMetrics() {
+ return clientStatsInfo;
}
/**
@@ -420,18 +424,18 @@ public class ProducerManager {
this.masterService.producerRegisterP2M(createRegisterRequest(),
AddressUtils.getLocalAddress(), tubeClientConfig.isTlsEnable());
if (response == null) {
- clientMetrics.bookReg2Master(true);
+ clientStatsInfo.bookReg2Master(true);
} else {
if (response.getSuccess()) {
if (response.getBrokerCheckSum() != this.brokerInfoCheckSum) {
updateBrokerInfoList(true, response.getBrokerInfosList(),
response.getBrokerCheckSum(), sBuilder);
}
- clientMetrics.bookReg2Master(false);
+ clientStatsInfo.bookReg2Master(false);
processRegSyncInfo(response);
return;
} else {
- clientMetrics.bookReg2Master(true);
+ clientStatsInfo.bookReg2Master(true);
}
}
if (remainingRetry <= 0) {
@@ -665,7 +669,7 @@ public class ProducerManager {
ThreadUtils.sleep(100);
}
// print metrics information
- clientMetrics.printMetricInfo(false, sBuilder);
+ clientStatsInfo.selfPrintStatsInfo(false, sBuilder);
// check whether public topics
if (publishTopics.isEmpty()) {
return;
@@ -677,14 +681,14 @@ public class ProducerManager {
if (response == null || !response.getSuccess()) {
heartbeatRetryTimes++;
if (response == null) {
- clientMetrics.bookHB2MasterException();
+ clientStatsInfo.bookHB2MasterException();
logger.error("[Heartbeat Failed] receive null HeartResponseM2P response!");
} else {
logger.error(sBuilder.append("[Heartbeat Failed] ")
.append(response.getErrMsg()).toString());
sBuilder.delete(0, sBuilder.length());
if (response.getErrCode() == TErrCodeConstants.HB_NO_NODE) {
- clientMetrics.bookHB2MasterTimeout();
+ clientStatsInfo.bookHB2MasterTimeout();
try {
register2Master();
} catch (Throwable ee) {
@@ -694,7 +698,7 @@ public class ProducerManager {
sBuilder.delete(0, sBuilder.length());
}
} else {
- clientMetrics.bookHB2MasterException();
+ clientStatsInfo.bookHB2MasterException();
if (response.getErrCode() == TErrCodeConstants.CERTIFICATE_FAILURE) {
adjustHeartBeatPeriod("certificate failure", sBuilder);
}
diff --git a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/SimpleMessageProducer.java b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/SimpleMessageProducer.java
index 4e35b5d..c4699fd 100644
--- a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/SimpleMessageProducer.java
+++ b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/SimpleMessageProducer.java
@@ -63,7 +63,7 @@ public class SimpleMessageProducer implements MessageProducer {
private final PartitionRouter partitionRouter;
private final DefaultBrokerRcvQltyStats brokerRcvQltyStats;
private final RpcConfig rpcConfig = new RpcConfig();
- private AtomicBoolean isShutDown = new AtomicBoolean(false);
+ private final AtomicBoolean isShutDown = new AtomicBoolean(false);
public SimpleMessageProducer(final InnerSessionFactory sessionFactory,
TubeClientConfig tubeClientConfig) throws TubeClientException {
@@ -223,7 +223,6 @@ public class SimpleMessageProducer implements MessageProducer {
rpcServiceFactory.addRmtAddrErrCount(partition.getBroker().getBrokerAddr());
}
producerManager.getClientMetrics().bookFailRpcCall(
- System.currentTimeMillis() - startTime,
TErrCodeConstants.UNSPECIFIED_ABNORMAL);
partition.increRetries(1);
this.brokerRcvQltyStats.addReceiveStatistic(brokerId, false);
@@ -269,7 +268,6 @@ public class SimpleMessageProducer implements MessageProducer {
@Override
public void handleError(Throwable error) {
producerManager.getClientMetrics().bookFailRpcCall(
- System.currentTimeMillis() - startTime,
TErrCodeConstants.UNSPECIFIED_ABNORMAL);
partition.increRetries(1);
brokerRcvQltyStats.addReceiveStatistic(brokerId, false);
@@ -365,7 +363,8 @@ public class SimpleMessageProducer implements MessageProducer {
final ClientBroker.SendMessageResponseB2P response) {
final String resultStr = response.getErrMsg();
if (response.getErrCode() == TErrCodeConstants.SUCCESS) {
- producerManager.getClientMetrics().bookSuccSendMsg(dltTime, message.getData().length);
+ producerManager.getClientMetrics().bookSuccSendMsg(dltTime,
+ message.getTopic(), message.getData().length);
if (response.hasMessageId()) {
return new MessageSentResult(true,
response.getErrCode(), "Ok!",
@@ -376,7 +375,7 @@ public class SimpleMessageProducer implements MessageProducer {
message, Long.parseLong(resultStr), partition);
}
} else {
- producerManager.getClientMetrics().bookFailRpcCall(dltTime, response.getErrCode());
+ producerManager.getClientMetrics().bookFailRpcCall(response.getErrCode());
return new MessageSentResult(false, response.getErrCode(), resultStr,
message, TBaseConstants.META_VALUE_UNDEFINED, partition);
}
diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/TBaseConstants.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/TBaseConstants.java
index 92070f3..2835a2e 100644
--- a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/TBaseConstants.java
+++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/TBaseConstants.java
@@ -82,5 +82,6 @@ public class TBaseConstants {
public static final long CFG_DEF_META_FORCE_UPDATE_PERIOD = 3 * 60 * 1000;
public static final long CFG_MIN_META_FORCE_UPDATE_PERIOD = 1 * 60 * 1000;
+ public static final long CFG_STATS_MIN_SNAPSHOT_PERIOD_MS = 2000;
}
diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/TrafficStatsUnit.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/TrafficStatsUnit.java
new file mode 100644
index 0000000..996592a
--- /dev/null
+++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/TrafficStatsUnit.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.tubemq.corebase.metric;
+
+import org.apache.inlong.tubemq.corebase.metric.impl.LongStatsCounter;
+
+/**
+ * TrafficStatsUnit, Metric Statistics item Unit
+ *
+ * Currently includes the total number of messages and bytes
+ * according to the statistics dimension, which can be expanded later as needed
+ */
+public class TrafficStatsUnit {
+ // the accumulated message count
+ public LongStatsCounter msgCnt;
+ // the accumulated message size, in bytes
+ public LongStatsCounter msgSize;
+
+ /**
+ * Initialize the message count and message size statistics counters.
+ *
+ * @param msgCntName the specified count statistics item name
+ * @param msgSizeName the specified size statistics item name
+ * @param prefix the prefix of statistics items
+ */
+ public TrafficStatsUnit(String msgCntName, String msgSizeName, String prefix) {
+ this.msgCnt = new LongStatsCounter(msgCntName, prefix);
+ this.msgSize = new LongStatsCounter(msgSizeName, prefix);
+ }
+
+ /**
+ * Accumulate the count of messages and message bytes.
+ *
+ * @param msgCount the specified message count
+ * @param msgSize the specified message size
+ */
+ public void addMsgCntAndSize(long msgCount, long msgSize) {
+ this.msgCnt.addValue(msgCount);
+ this.msgSize.addValue(msgSize);
+ }
+}
diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/utils/TStringUtils.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/utils/TStringUtils.java
index 4b9f793..7305bf6 100644
--- a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/utils/TStringUtils.java
+++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/utils/TStringUtils.java
@@ -134,6 +134,15 @@ public class TStringUtils {
return new String(tgtStr, 0, curWritePos);
}
+ /**
+ * Get the authorization signature based on the provided values
+ * base64.encode(hmacSha1(password, username, timestamp, random number))
+ *
+ * @param usrName the user name
+ * @param usrPassWord the password of the user
+ * @param timestamp the time stamp
+ * @param randomValue the random value
+ */
public static String getAuthSignature(final String usrName,
final String usrPassWord,
long timestamp, int randomValue) {
@@ -155,6 +164,14 @@ public class TStringUtils {
return signature;
}
+ /**
+ * Build attribute information
+ *
+ * @param srcAttrs the current attribute
+ * @param attrKey the attribute key
+ * @param attrVal the attribute value
+ * @return the new attribute information
+ */
public static String setAttrValToAttributes(String srcAttrs,
String attrKey, String attrVal) {
StringBuilder sbuf = new StringBuilder(512);
@@ -184,6 +201,13 @@ public class TStringUtils {
return sbuf.toString();
}
+ /**
+ * Get attribute value by key from attribute information
+ *
+ * @param srcAttrs the current attribute
+ * @param attrKey the attribute key
+ * @return the attribute value
+ */
public static String getAttrValFrmAttributes(String srcAttrs, String attrKey) {
if (!isBlank(srcAttrs)) {
String[] strAttrs = srcAttrs.split(TokenConstants.SEGMENT_SEP);
diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/utils/ThreadUtils.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/utils/ThreadUtils.java
index 2abccd5..ebaf7d9 100644
--- a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/utils/ThreadUtils.java
+++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/utils/ThreadUtils.java
@@ -99,6 +99,8 @@ public class ThreadUtils {
}
/**
+ * Check and wait for the specified thread to exit
+ *
* @param t Waits on the passed thread to die dumping a threaddump every minute while its up.
*/
public static void threadDumpingIsAlive(final Thread t) throws InterruptedException {
@@ -115,6 +117,8 @@ public class ThreadUtils {
}
/**
+ * Sleep the current thread
+ *
* @param millis How long to sleep for in milliseconds.
*/
public static void sleep(long millis) {
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStore.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStore.java
index 2b8aba3..a419b02 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStore.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStore.java
@@ -806,7 +806,6 @@ public class MessageStore implements Closeable {
if (tmpStore.getMaxAllowedMsgCount() == writeCacheMaxCnt
&& tmpStore.getMaxDataCacheSize() == writeCacheMaxSize) {
msgMemStore = tmpStore;
- msgMemStore.clear();
} else {
isRealloc = true;
tmpStore.close();
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/MsgFileStore.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/MsgFileStore.java
index 77ab572..7c6c20a 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/MsgFileStore.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/MsgFileStore.java
@@ -188,8 +188,10 @@ public class MsgFileStore implements Closeable {
long currTime = System.currentTimeMillis();
isMsgDataFlushed = (messageStore.getUnflushDataHold() > 0)
&& (curUnflushSize.get() >= messageStore.getUnflushDataHold());
- if ((isMsgCntFlushed = this.curUnflushed.addAndGet(msgCnt) >= messageStore.getUnflushThreshold())
- || (isMsgTimeFlushed = currTime - this.lastFlushTime.get() >= messageStore.getUnflushInterval())
+ if ((isMsgCntFlushed =
+ (this.curUnflushed.addAndGet(msgCnt) >= messageStore.getUnflushThreshold()))
+ || (isMsgTimeFlushed =
+ (currTime - this.lastFlushTime.get() >= messageStore.getUnflushInterval()))
|| isMsgDataFlushed || isDataSegFlushed || isIndexSegFlushed) {
isForceMetadata = (isDataSegFlushed || isIndexSegFlushed
|| (currTime - this.lastMetaFlushTime.get() > MAX_META_REFRESH_DUR));
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/mem/MsgMemStore.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/mem/MsgMemStore.java
index 81a0f9b..678c72c 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/mem/MsgMemStore.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/mem/MsgMemStore.java
@@ -139,9 +139,7 @@ public class MsgMemStore implements Closeable {
dataOffset = this.writeDataStartPos + this.cacheDataOffset.get();
indexEntry.putLong(DataStoreUtils.INDEX_POS_DATAOFFSET, dataOffset);
dataEntry.putLong(DataStoreUtils.STORE_HEADER_POS_QUEUE_LOGICOFF, indexOffset);
- // this.cacheDataSegment.position(this.cacheDataOffset.get());
this.cacheDataSegment.put(dataEntry.array());
- // this.cachedIndexSegment.position(this.cacheIndexOffset.get());
this.cachedIndexSegment.put(indexEntry.array());
this.cacheDataOffset.getAndAdd(dataEntryLength);
indexSizePos = cacheIndexOffset.getAndAdd(DataStoreUtils.STORE_INDEX_HEAD_LEN);
@@ -392,6 +390,8 @@ public class MsgMemStore implements Closeable {
this.curMessageCount.set(0);
this.queuesMap.clear();
this.keysMap.clear();
+ this.cacheDataSegment.rewind();
+ this.cachedIndexSegment.rewind();
this.leftAppendTime.set(System.currentTimeMillis());
this.rightAppendTime.set(System.currentTimeMillis());
}
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/BrokerSrvStatsHolder.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/BrokerSrvStatsHolder.java
index 11dd0ec..07817aa 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/BrokerSrvStatsHolder.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/BrokerSrvStatsHolder.java
@@ -20,11 +20,11 @@ package org.apache.inlong.tubemq.server.broker.stats;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.inlong.tubemq.corebase.TBaseConstants;
import org.apache.inlong.tubemq.corebase.metric.impl.ESTHistogram;
import org.apache.inlong.tubemq.corebase.metric.impl.LongOnlineCounter;
import org.apache.inlong.tubemq.corebase.metric.impl.LongStatsCounter;
import org.apache.inlong.tubemq.corebase.metric.impl.SinceTime;
-import org.apache.inlong.tubemq.server.common.TServerConstants;
/**
* BrokerSrvStatsHolder, statistic Broker metrics information for RPC services
@@ -142,7 +142,7 @@ public class BrokerSrvStatsHolder {
long curSnapshotTime = lstSnapshotTime.get();
// Avoid frequent snapshots
if ((System.currentTimeMillis() - curSnapshotTime)
- >= TServerConstants.MIN_SNAPSHOT_PERIOD_MS) {
+ >= TBaseConstants.CFG_STATS_MIN_SNAPSHOT_PERIOD_MS) {
if (lstSnapshotTime.compareAndSet(curSnapshotTime, System.currentTimeMillis())) {
switchableSets[getIndex(writableIndex.incrementAndGet())].resetSinceTime();
return true;
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/MsgStoreStatsHolder.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/MsgStoreStatsHolder.java
index ff88df7..c2d3ad3 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/MsgStoreStatsHolder.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/MsgStoreStatsHolder.java
@@ -20,6 +20,8 @@ package org.apache.inlong.tubemq.server.broker.stats;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.inlong.tubemq.corebase.TBaseConstants;
import org.apache.inlong.tubemq.corebase.metric.impl.ESTHistogram;
import org.apache.inlong.tubemq.corebase.metric.impl.LongStatsCounter;
import org.apache.inlong.tubemq.corebase.metric.impl.SimpleHistogram;
@@ -352,7 +354,7 @@ public class MsgStoreStatsHolder {
long curSwitchTime = lstSnapshotTime.get();
// Avoid frequent snapshots
if ((System.currentTimeMillis() - curSwitchTime)
- >= TServerConstants.MIN_SNAPSHOT_PERIOD_MS) {
+ >= TBaseConstants.CFG_STATS_MIN_SNAPSHOT_PERIOD_MS) {
if (lstSnapshotTime.compareAndSet(curSwitchTime, System.currentTimeMillis())) {
msgStoreStatsSets[getIndex(writableIndex.get() - 1)].clear();
msgStoreStatsSets[getIndex(writableIndex.getAndIncrement())]
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/TrafficStatsService.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/TrafficStatsService.java
index 0aa8ea2..f27147c 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/TrafficStatsService.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/TrafficStatsService.java
@@ -22,8 +22,8 @@ import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.inlong.tubemq.corebase.daemon.AbstractDaemonService;
+import org.apache.inlong.tubemq.corebase.metric.TrafficStatsUnit;
import org.apache.inlong.tubemq.corebase.metric.impl.LongOnlineCounter;
-import org.apache.inlong.tubemq.corebase.metric.impl.LongStatsCounter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -96,18 +96,18 @@ public class TrafficStatsService extends AbstractDaemonService implements Traffi
@Override
public void add(Map<String, TrafficInfo> trafficInfos) {
- TrafficStatsSet tmpStatsSet;
- TrafficStatsSet trafficStatsSet;
+ TrafficStatsUnit tmpStatsSet;
+ TrafficStatsUnit trafficStatsSet;
// Increment write reference count
switchableUnits[getIndex()].refCnt.incValue();
try {
// Accumulate statistics information
- ConcurrentHashMap<String, TrafficStatsSet> tmpStatsSetMap =
+ ConcurrentHashMap<String, TrafficStatsUnit> tmpStatsSetMap =
switchableUnits[getIndex()].statsUnitMap;
for (Entry<String, TrafficInfo> entry : trafficInfos.entrySet()) {
trafficStatsSet = tmpStatsSetMap.get(entry.getKey());
if (trafficStatsSet == null) {
- tmpStatsSet = new TrafficStatsSet();
+ tmpStatsSet = new TrafficStatsUnit("msg_cnt", "msg_size", null);
trafficStatsSet = tmpStatsSetMap.putIfAbsent(entry.getKey(), tmpStatsSet);
if (trafficStatsSet == null) {
trafficStatsSet = tmpStatsSet;
@@ -128,11 +128,11 @@ public class TrafficStatsService extends AbstractDaemonService implements Traffi
switchableUnits[getIndex()].refCnt.incValue();
try {
// Accumulate statistics information
- ConcurrentHashMap<String, TrafficStatsSet> tmpStatsSetMap =
+ ConcurrentHashMap<String, TrafficStatsUnit> tmpStatsSetMap =
switchableUnits[getIndex()].statsUnitMap;
- TrafficStatsSet trafficStatsSet = tmpStatsSetMap.get(statsKey);
+ TrafficStatsUnit trafficStatsSet = tmpStatsSetMap.get(statsKey);
if (trafficStatsSet == null) {
- TrafficStatsSet tmpStatsSet = new TrafficStatsSet();
+ TrafficStatsUnit tmpStatsSet = new TrafficStatsUnit("msg_cnt", "msg_size", null);
trafficStatsSet = tmpStatsSetMap.putIfAbsent(statsKey, tmpStatsSet);
if (trafficStatsSet == null) {
trafficStatsSet = tmpStatsSet;
@@ -169,8 +169,8 @@ public class TrafficStatsService extends AbstractDaemonService implements Traffi
}
} while (selectedUnit.refCnt.getValue() > 0);
// Output data to file
- Map<String, TrafficStatsSet> statsMap = selectedUnit.statsUnitMap;
- for (Entry<String, TrafficStatsSet> entry : statsMap.entrySet()) {
+ Map<String, TrafficStatsUnit> statsMap = selectedUnit.statsUnitMap;
+ for (Entry<String, TrafficStatsUnit> entry : statsMap.entrySet()) {
logger.info("{}#{}#{}#{}", statsCat, entry.getKey(),
entry.getValue().msgCnt.getAndResetValue(),
entry.getValue().msgSize.getAndResetValue());
@@ -198,34 +198,6 @@ public class TrafficStatsService extends AbstractDaemonService implements Traffi
}
/**
- * StatsItemSet, Metric Statistics item set
- *
- * Currently includes the total number of messages and bytes
- * according to the statistics dimension, which can be expanded later as needed
- */
- private static class TrafficStatsSet {
- protected LongStatsCounter msgCnt =
- new LongStatsCounter("msg_count", null);
- protected LongStatsCounter msgSize =
- new LongStatsCounter("msg_size", null);
-
- public TrafficStatsSet() {
- //
- }
-
- /**
- * Accumulate the count of messages and message bytes.
- *
- * @param msgCount the specified message count
- * @param msgSize the specified message size
- */
- public void addMsgCntAndSize(long msgCount, long msgSize) {
- this.msgCnt.addValue(msgCount);
- this.msgSize.addValue(msgSize);
- }
- }
-
- /**
* WritableUnit,
*
* This class is mainly defined to facilitate reading and writing of
@@ -237,7 +209,7 @@ public class TrafficStatsService extends AbstractDaemonService implements Traffi
public LongOnlineCounter refCnt =
new LongOnlineCounter("ref_count", null);
// statistic unit map
- protected ConcurrentHashMap<String, TrafficStatsSet> statsUnitMap =
+ protected ConcurrentHashMap<String, TrafficStatsUnit> statsUnitMap =
new ConcurrentHashMap<>(512);
}
}
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/TServerConstants.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/TServerConstants.java
index a336ae8..007a7fb 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/TServerConstants.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/TServerConstants.java
@@ -99,7 +99,6 @@ public final class TServerConstants {
public static final long CFG_OFFSET_RESET_MID_ALARM_CHECK =
DataStoreUtils.STORE_INDEX_HEAD_LEN * 1000000L;
- // Minimum snapshot period
- public static final long MIN_SNAPSHOT_PERIOD_MS = 2000L;
+ // max statistics token type length
public static final int META_MAX_STATSTYPE_LENGTH = 256;
}
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/webbase/WebCallStatsHolder.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/webbase/WebCallStatsHolder.java
index f47beb7..9360f40 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/webbase/WebCallStatsHolder.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/webbase/WebCallStatsHolder.java
@@ -21,10 +21,10 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.inlong.tubemq.corebase.TBaseConstants;
import org.apache.inlong.tubemq.corebase.metric.impl.ESTHistogram;
import org.apache.inlong.tubemq.corebase.metric.impl.SimpleHistogram;
import org.apache.inlong.tubemq.corebase.metric.impl.SinceTime;
-import org.apache.inlong.tubemq.server.common.TServerConstants;
/**
* WebCallStatsHolder, statistic for web api calls
@@ -118,7 +118,7 @@ public class WebCallStatsHolder {
long curSnapshotTime = lstSnapshotTime.get();
// Avoid frequent snapshots
if ((System.currentTimeMillis() - curSnapshotTime)
- >= TServerConstants.MIN_SNAPSHOT_PERIOD_MS) {
+ >= TBaseConstants.CFG_STATS_MIN_SNAPSHOT_PERIOD_MS) {
if (lstSnapshotTime.compareAndSet(curSnapshotTime, System.currentTimeMillis())) {
switchableSets[getIndex(writableIndex.incrementAndGet())].resetSinceTime();
return true;
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/stats/MasterSrvStatsHolder.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/stats/MasterSrvStatsHolder.java
index 082640c..a60483e 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/stats/MasterSrvStatsHolder.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/stats/MasterSrvStatsHolder.java
@@ -20,11 +20,11 @@ package org.apache.inlong.tubemq.server.master.stats;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.inlong.tubemq.corebase.TBaseConstants;
import org.apache.inlong.tubemq.corebase.metric.impl.ESTHistogram;
import org.apache.inlong.tubemq.corebase.metric.impl.LongOnlineCounter;
import org.apache.inlong.tubemq.corebase.metric.impl.LongStatsCounter;
import org.apache.inlong.tubemq.corebase.metric.impl.SinceTime;
-import org.apache.inlong.tubemq.server.common.TServerConstants;
/**
* MasterSrvStatsHolder, statistics Master's RPC service metric information
@@ -217,7 +217,7 @@ public class MasterSrvStatsHolder {
long curSnapshotTime = lstSnapshotTime.get();
// Avoid frequent snapshots
if ((System.currentTimeMillis() - curSnapshotTime)
- >= TServerConstants.MIN_SNAPSHOT_PERIOD_MS) {
+ >= TBaseConstants.CFG_STATS_MIN_SNAPSHOT_PERIOD_MS) {
if (lstSnapshotTime.compareAndSet(curSnapshotTime, System.currentTimeMillis())) {
switchableSets[getIndex(writableIndex.incrementAndGet())].resetSinceTime();
return true;