You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2021/12/06 05:07:13 UTC
[incubator-inlong] branch master updated: [INLONG-1908]Adjust the metric realization of TubeMQ (#1909)
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 1a905c6 [INLONG-1908]Adjust the metric realization of TubeMQ (#1909)
1a905c6 is described below
commit 1a905c6a1682351fadef81fc082f3a036efd37c0
Author: gosonzhang <46...@qq.com>
AuthorDate: Mon Dec 6 13:07:07 2021 +0800
[INLONG-1908]Adjust the metric realization of TubeMQ (#1909)
---
.../tubemq/corebase/metric/AbsMetricItem.java | 8 +-
.../tubemq/corebase/metric/CountMetricItem.java | 2 +-
.../corebase/metric/GaugeNormMetricItem.java | 2 +-
.../inlong/tubemq/corebase/metric/MetricValue.java | 3 -
.../tubemq/corebase/metric/MetricValueType.java | 14 +-
.../tubemq/corebase/metric/MetricValues.java | 2 -
.../tubemq/corebase/metric/MetricItemTest.java | 93 +++++
.../tubemq/server/broker/BrokerServiceServer.java | 9 +-
.../inlong/tubemq/server/broker/TubeBroker.java | 5 +-
.../server/broker/metrics/BrokerMetricMXBean.java | 11 +-
.../server/broker/metrics/BrokerMetrics.java | 82 +++-
.../server/broker/metrics/BrokerMetricsHolder.java | 50 ++-
.../server/broker/msgstore/disk/FileSegment.java | 8 +-
.../server/broker/msgstore/disk/MsgFileStore.java | 6 +-
.../server/broker/msgstore/mem/MsgMemStore.java | 6 +-
.../server/broker/offset/DefaultOffsetManager.java | 6 +-
.../common/offsetstorage/ZkOffsetStorage.java | 8 +-
.../inlong/tubemq/server/master/TMaster.java | 54 +--
.../server/master/balance/DefaultLoadBalancer.java | 16 +-
.../tubemq/server/master/metrics/MasterMetric.java | 108 -----
.../metrics/MasterMetricMXBean.java} | 15 +-
.../server/master/metrics/MasterMetrics.java | 274 ++++++++++++
.../server/master/metrics/MasterMetricsHolder.java | 164 ++++++++
.../nodemanage/nodebroker/BrokerAbnHolder.java | 26 +-
.../nodemanage/nodebroker/DefBrokerRunManager.java | 17 +-
.../nodeconsumer/ConsumerEventManager.java | 19 +-
.../nodeconsumer/ConsumerInfoHolder.java | 37 +-
.../nodeproducer/ProducerInfoHolder.java | 18 +-
.../tubemq/server/broker/BrokerMetricsTest.java | 207 +++++++--
.../tubemq/server/master/MasterMetricsTest.java | 463 ++++++++++++++++++++-
.../nodeconsumer/ConsumerEventManagerTest.java | 6 +-
31 files changed, 1408 insertions(+), 331 deletions(-)
diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/AbsMetricItem.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/AbsMetricItem.java
index 3bf0b43..7a1e21c 100644
--- a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/AbsMetricItem.java
+++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/AbsMetricItem.java
@@ -27,10 +27,6 @@ public abstract class AbsMetricItem {
protected final String name;
protected final AtomicLong value = new AtomicLong(0);
- public AbsMetricItem(MetricType metricType, String name) {
- this(metricType, MetricValueType.NORMAL, name, 0);
- }
-
public AbsMetricItem(MetricType metricType, MetricValueType valueType,
String name, long initialValue) {
this.metricType = metricType;
@@ -67,6 +63,10 @@ public abstract class AbsMetricItem {
return value.incrementAndGet();
}
+ public boolean compareAndSet(long expect, long update) {
+ return value.compareAndSet(expect, update);
+ }
+
public long decrementAndGet() {
return value.decrementAndGet();
}
diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/CountMetricItem.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/CountMetricItem.java
index 67f2c62..194dec8 100644
--- a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/CountMetricItem.java
+++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/CountMetricItem.java
@@ -20,7 +20,7 @@ package org.apache.inlong.tubemq.corebase.metric;
public class CountMetricItem extends AbsMetricItem {
public CountMetricItem(String name) {
- super(MetricType.COUNTER, name);
+ super(MetricType.COUNTER, MetricValueType.MAX, name, 0);
}
@Override
diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/GaugeNormMetricItem.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/GaugeNormMetricItem.java
index 2005bab..a221047 100644
--- a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/GaugeNormMetricItem.java
+++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/GaugeNormMetricItem.java
@@ -20,7 +20,7 @@ package org.apache.inlong.tubemq.corebase.metric;
public class GaugeNormMetricItem extends AbsMetricItem {
public GaugeNormMetricItem(String name) {
- super(MetricType.GAUGE, MetricValueType.MIN, name, 0);
+ super(MetricType.GAUGE, MetricValueType.NORMAL, name, 0);
}
@Override
diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/MetricValue.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/MetricValue.java
index 4ff4aa4..18e0477 100644
--- a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/MetricValue.java
+++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/MetricValue.java
@@ -17,14 +17,11 @@
package org.apache.inlong.tubemq.corebase.metric;
-import java.beans.ConstructorProperties;
-
public class MetricValue {
private final String type;
private final String name;
private final long value;
- @ConstructorProperties({"type", "name", "value"})
public MetricValue(String type, String name, long value) {
this.name = name;
this.type = type;
diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/MetricValueType.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/MetricValueType.java
index 64d972c..ded31b7 100644
--- a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/MetricValueType.java
+++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/MetricValueType.java
@@ -18,13 +18,14 @@
package org.apache.inlong.tubemq.corebase.metric;
public enum MetricValueType {
- NORMAL(0, "Normal"),
- MIN(1, "Min"),
- MAX(2, "Max");
+ NORMAL(0, "Normal", "Current value"),
+ MIN(1, "Min", "Historical minimum value"),
+ MAX(2, "Max", "Historical maximum value");
- MetricValueType(int id, String name) {
+ MetricValueType(int id, String name, String desc) {
this.id = id;
this.name = name;
+ this.desc = desc;
}
public int getId() {
@@ -35,6 +36,10 @@ public enum MetricValueType {
return name;
}
+ public String getDesc() {
+ return desc;
+ }
+
public static MetricValueType valueOf(int value) {
for (MetricValueType valueType : MetricValueType.values()) {
if (valueType.getId() == value) {
@@ -46,4 +51,5 @@ public enum MetricValueType {
private final int id;
private final String name;
+ private final String desc;
}
diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/MetricValues.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/MetricValues.java
index 34c771b..1ab380d 100644
--- a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/MetricValues.java
+++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/MetricValues.java
@@ -17,14 +17,12 @@
package org.apache.inlong.tubemq.corebase.metric;
-import java.beans.ConstructorProperties;
import java.util.Map;
public class MetricValues {
private final String lastResetTime;
private final Map<String, Long> metricValues;
- @ConstructorProperties({"lastResetTime", "metricValues"})
public MetricValues(String lastResetTime, Map<String, Long> metricValues) {
this.lastResetTime = lastResetTime;
this.metricValues = metricValues;
diff --git a/inlong-tubemq/tubemq-core/src/test/java/org/apache/inlong/tubemq/corebase/metric/MetricItemTest.java b/inlong-tubemq/tubemq-core/src/test/java/org/apache/inlong/tubemq/corebase/metric/MetricItemTest.java
new file mode 100644
index 0000000..e941b55
--- /dev/null
+++ b/inlong-tubemq/tubemq-core/src/test/java/org/apache/inlong/tubemq/corebase/metric/MetricItemTest.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.tubemq.corebase.metric;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MetricItemTest {
+ private static final Logger logger =
+ LoggerFactory.getLogger(MetricItemTest.class);
+
+ @Test
+ public void testMetricItem() {
+ try {
+ final CountMetricItem countMetricItem =
+ new CountMetricItem("CountMetricItem");
+ final GaugeNormMetricItem gaugeNormMetricItem =
+ new GaugeNormMetricItem("GaugeNormMetricItem");
+ final GaugeMaxMetricItem gaugeMaxMetricItem =
+ new GaugeMaxMetricItem("GaugeMaxMetricItem");
+ final GaugeMinMetricItem gaugeMinMetricItem =
+ new GaugeMinMetricItem("GaugeMinMetricItem");
+
+ countMetricItem.incrementAndGet();
+ countMetricItem.incrementAndGet();
+ countMetricItem.incrementAndGet();
+ countMetricItem.decrementAndGet();
+
+ gaugeNormMetricItem.update(1000);
+ gaugeNormMetricItem.update(2000);
+ gaugeNormMetricItem.update(500);
+
+ gaugeMaxMetricItem.update(1000);
+ gaugeMaxMetricItem.update(5000);
+ gaugeMaxMetricItem.update(3000);
+
+ gaugeMinMetricItem.update(1000);
+ gaugeMinMetricItem.update(1);
+ gaugeMinMetricItem.update(10000);
+
+ Assert.assertEquals(2, countMetricItem.getValue());
+ Assert.assertEquals(500, gaugeNormMetricItem.getValue());
+ Assert.assertEquals(5000, gaugeMaxMetricItem.getValue());
+ Assert.assertEquals(1, gaugeMinMetricItem.getValue());
+
+ countMetricItem.getAndSet();
+ gaugeNormMetricItem.getAndSet();
+ gaugeMaxMetricItem.getAndSet();
+ gaugeMinMetricItem.getAndSet();
+
+ Assert.assertEquals(0, countMetricItem.getValue());
+ Assert.assertEquals(500, gaugeNormMetricItem.getValue());
+ Assert.assertEquals(0, gaugeMaxMetricItem.getValue());
+ Assert.assertEquals(Long.MAX_VALUE, gaugeMinMetricItem.getValue());
+
+ Assert.assertEquals(MetricType.COUNTER.getId(),
+ countMetricItem.getMetricType().getId());
+ Assert.assertEquals(MetricValueType.MAX.getId(),
+ countMetricItem.getMetricValueType().getId());
+ Assert.assertEquals(MetricType.GAUGE.getId(),
+ gaugeNormMetricItem.getMetricType().getId());
+ Assert.assertEquals(MetricValueType.NORMAL.getId(),
+ gaugeNormMetricItem.getMetricValueType().getId());
+ Assert.assertEquals(MetricType.GAUGE.getId(),
+ gaugeMaxMetricItem.getMetricType().getId());
+ Assert.assertEquals(MetricValueType.MAX.getId(),
+ gaugeMaxMetricItem.getMetricValueType().getId());
+ Assert.assertEquals(MetricType.GAUGE.getId(),
+ gaugeMinMetricItem.getMetricType().getId());
+ Assert.assertEquals(MetricValueType.MIN.getId(),
+ gaugeMinMetricItem.getMetricValueType().getId());
+ } catch (Exception ex) {
+ logger.error("error happens" + ex);
+ }
+ }
+}
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/BrokerServiceServer.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/BrokerServiceServer.java
index 6fe76f4..f6cfa46 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/BrokerServiceServer.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/BrokerServiceServer.java
@@ -845,7 +845,7 @@ public class BrokerServiceServer implements BrokerReadService, BrokerWriteServic
consumerNodeInfo = new ConsumerNodeInfo(storeManager, reqQryPriorityId,
clientId, filterCondSet, reqSessionKey, reqSessionTime, true, partStr);
if (consumerRegisterMap.put(partStr, consumerNodeInfo) == null) {
- BrokerMetricsHolder.METRICS.consumerOnlineCnt.incrementAndGet();
+ BrokerMetricsHolder.incConsumerCnt();
}
heartbeatManager.regConsumerNode(getHeartbeatNodeId(clientId, partStr), clientId, partStr);
MessageStore dataStore = null;
@@ -891,7 +891,7 @@ public class BrokerServiceServer implements BrokerReadService, BrokerWriteServic
heartbeatManager.getConsumerRegMap().get(getHeartbeatNodeId(consumerId, partStr));
if (timeoutInfo == null || System.currentTimeMillis() >= timeoutInfo.getTimeoutTime()) {
if (consumerRegisterMap.remove(partStr) != null) {
- BrokerMetricsHolder.METRICS.consumerOnlineCnt.decrementAndGet();
+ BrokerMetricsHolder.decConsumerCnt(true);
}
strBuffer.append("[Duplicated Register] Remove Invalid Consumer Register ")
.append(consumerId).append(TokenConstants.SEGMENT_SEP).append(partStr);
@@ -957,7 +957,7 @@ public class BrokerServiceServer implements BrokerReadService, BrokerWriteServic
.append(request.getPartitionId()).append(" updatedOffset:").append(updatedOffset).toString());
strBuffer.delete(0, strBuffer.length());
if (consumerRegisterMap.remove(partStr) != null) {
- BrokerMetricsHolder.METRICS.consumerOnlineCnt.decrementAndGet();
+ BrokerMetricsHolder.decConsumerCnt(false);
}
heartbeatManager.unRegConsumerNode(
getHeartbeatNodeId(clientId, partStr));
@@ -1245,8 +1245,7 @@ public class BrokerServiceServer implements BrokerReadService, BrokerWriteServic
}
if (consumerNodeInfo.getConsumerId().equalsIgnoreCase(nodeInfo.getSecondKey())) {
if (consumerRegisterMap.remove(nodeInfo.getThirdKey()) != null) {
- BrokerMetricsHolder.METRICS.consumerOnlineCnt.decrementAndGet();
- BrokerMetricsHolder.METRICS.consumerTmoTotCnt.decrementAndGet();
+ BrokerMetricsHolder.decConsumerCnt(true);
}
String[] groupTopicPart =
consumerNodeInfo.getPartStr().split(TokenConstants.ATTR_SEP);
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/TubeBroker.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/TubeBroker.java
index 474ddfb..6773264 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/TubeBroker.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/TubeBroker.java
@@ -218,8 +218,7 @@ public class TubeBroker implements Stoppable {
if (!response.getSuccess()) {
isKeepAlive.set(false);
if (response.getErrCode() == TErrCodeConstants.HB_NO_NODE) {
- BrokerMetricsHolder.METRICS
- .masterNoNodeCnt.incrementAndGet();
+ BrokerMetricsHolder.incMasterNoNodeCnt();
register2Master();
heartbeatErrors.set(0);
logger.info("Re-register to master successfully!");
@@ -234,7 +233,7 @@ public class TubeBroker implements Stoppable {
isKeepAlive.set(false);
heartbeatErrors.incrementAndGet();
samplePrintCtrl.printExceptionCaught(t);
- BrokerMetricsHolder.METRICS.hbExceptionCnt.incrementAndGet();
+ BrokerMetricsHolder.incHBExceptionCnt();
}
}
}
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/metrics/BrokerMetricMXBean.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/metrics/BrokerMetricMXBean.java
index f8f0c38..3058556 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/metrics/BrokerMetricMXBean.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/metrics/BrokerMetricMXBean.java
@@ -20,14 +20,17 @@ package org.apache.inlong.tubemq.server.broker.metrics;
import org.apache.inlong.tubemq.corebase.metric.MetricValues;
/**
- * BrokerMonitorMXBean
- * Provide access interface of a metric item with JMX.<br>
- * Decouple between metric item and monitor system, in particular scene, <br>
- * inlong can depend on user-defined monitor system.
+ * BrokerMetricMXBean
+ * Broker's metric data access interface, including:
+ * the getMetric() that directly obtains data
+ * the getAndReSetMetrics() that can clear the values of
+ * the counter, maximum and minimum extremum Gauge data
*/
public interface BrokerMetricMXBean {
+ // get current metric data by viewing mode
MetricValues getMetrics();
+ // get current metric data and reset the Counter, maximum/minimum Gauge metric
MetricValues getAndReSetMetrics();
}
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/metrics/BrokerMetrics.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/metrics/BrokerMetrics.java
index f7588c8..aa573c9 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/metrics/BrokerMetrics.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/metrics/BrokerMetrics.java
@@ -33,31 +33,33 @@ public class BrokerMetrics implements BrokerMetricMXBean {
private final AtomicLong lastResetTime =
new AtomicLong(System.currentTimeMillis());
- public final AbsMetricItem syncDataDurMin =
- new GaugeMinMetricItem("fSync_latency_min");
- public final AbsMetricItem syncDataDurMax =
- new GaugeMaxMetricItem("fSync_latency_max");
- public final AbsMetricItem syncZkDurMin =
- new GaugeMinMetricItem("zkSync_latency_min");
- public final AbsMetricItem syncZkDurMax =
- new GaugeMaxMetricItem("zkSync_latency_max");
- public final AbsMetricItem zkExceptionCnt =
+ // Delay statistics for syncing data to files
+ protected final AbsMetricItem syncDataDurMin =
+ new GaugeMinMetricItem("fSync_duration_min");
+ protected final AbsMetricItem syncDataDurMax =
+ new GaugeMaxMetricItem("fSync_duration_max");
+ // Delay statistics for syncing data to Zookeeper
+ protected final AbsMetricItem syncZkDurMin =
+ new GaugeMinMetricItem("zkSync_duration_min");
+ protected final AbsMetricItem syncZkDurMax =
+ new GaugeMaxMetricItem("zkSync_duration_max");
+ // Zookeeper Exception statistics
+ protected final AbsMetricItem zkExceptionCnt =
new CountMetricItem("zk_exception_cnt");
- public final AbsMetricItem masterNoNodeCnt =
+ // Broker 2 Master status statistics
+ protected final AbsMetricItem masterNoNodeCnt =
new CountMetricItem("online_timeout_cnt");
- public final AbsMetricItem hbExceptionCnt =
+ protected final AbsMetricItem hbExceptionCnt =
new CountMetricItem("hb_master_exception_cnt");
- public final AbsMetricItem ioExceptionCnt =
+ // Disk IO Exception statistics
+ protected final AbsMetricItem ioExceptionCnt =
new CountMetricItem("io_exception_cnt");
- public final AbsMetricItem consumerOnlineCnt =
+ // Consumer client statistics
+ protected final AbsMetricItem consumerOnlineCnt =
new GaugeNormMetricItem("consumer_online_cnt");
- public final AbsMetricItem consumerTmoTotCnt =
+ protected final AbsMetricItem consumerTmoTotCnt =
new CountMetricItem("consumer_timeout_cnt");
- public BrokerMetrics() {
- this.lastResetTime.set(System.currentTimeMillis());
- }
-
@Override
public MetricValues getMetrics() {
Map<String, Long> metricValues = new HashMap<>();
@@ -92,5 +94,49 @@ public class BrokerMetrics implements BrokerMetricMXBean {
return new MetricValues(
WebParameterUtils.date2yyyyMMddHHmmss(new Date(befTime)), metricValues);
}
+
+ public long getLastResetTime() {
+ return lastResetTime.get();
+ }
+
+ public AbsMetricItem getSyncDataDurMin() {
+ return syncDataDurMin;
+ }
+
+ public AbsMetricItem getSyncDataDurMax() {
+ return syncDataDurMax;
+ }
+
+ public AbsMetricItem getSyncZkDurMin() {
+ return syncZkDurMin;
+ }
+
+ public AbsMetricItem getSyncZkDurMax() {
+ return syncZkDurMax;
+ }
+
+ public AbsMetricItem getZkExceptionCnt() {
+ return zkExceptionCnt;
+ }
+
+ public AbsMetricItem getMasterNoNodeCnt() {
+ return masterNoNodeCnt;
+ }
+
+ public AbsMetricItem getHbExceptionCnt() {
+ return hbExceptionCnt;
+ }
+
+ public AbsMetricItem getIoExceptionCnt() {
+ return ioExceptionCnt;
+ }
+
+ public AbsMetricItem getConsumerOnlineCnt() {
+ return consumerOnlineCnt;
+ }
+
+ public AbsMetricItem getConsumerTmoTotCnt() {
+ return consumerTmoTotCnt;
+ }
}
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/metrics/BrokerMetricsHolder.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/metrics/BrokerMetricsHolder.java
index 845f14d..b64c678 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/metrics/BrokerMetricsHolder.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/metrics/BrokerMetricsHolder.java
@@ -27,9 +27,10 @@ import org.slf4j.LoggerFactory;
public class BrokerMetricsHolder {
private static final Logger logger =
LoggerFactory.getLogger(BrokerMetricsHolder.class);
-
+ // Registration status indicator
private static final AtomicBoolean registered = new AtomicBoolean(false);
- public static final BrokerMetrics METRICS = new BrokerMetrics();
+ // broker metrics information
+ private static final BrokerMetrics statsInfo = new BrokerMetrics();
public static void registerMXBean() {
if (!registered.compareAndSet(false, true)) {
@@ -38,11 +39,52 @@ public class BrokerMetricsHolder {
try {
MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
ObjectName mxBeanName =
- new ObjectName("org.apache.inlong.tubemq.server.broker:type=brokerMetrics");
- mbs.registerMBean(METRICS, mxBeanName);
+ new ObjectName("org.apache.inlong.tubemq.server.broker:type=BrokerMetrics");
+ mbs.registerMBean(statsInfo, mxBeanName);
} catch (Exception ex) {
logger.error("Register BrokerMXBean error: ", ex);
}
}
+
+ public static void incConsumerCnt() {
+ statsInfo.consumerOnlineCnt.incrementAndGet();
+ }
+
+ public static void decConsumerCnt(boolean isTimeout) {
+ statsInfo.consumerOnlineCnt.decrementAndGet();
+ if (isTimeout) {
+ statsInfo.consumerTmoTotCnt.incrementAndGet();
+ }
+ }
+
+ public static void incMasterNoNodeCnt() {
+ statsInfo.masterNoNodeCnt.incrementAndGet();
+ }
+
+ public static void incHBExceptionCnt() {
+ statsInfo.hbExceptionCnt.incrementAndGet();
+ }
+
+ public static void incIOExceptionCnt() {
+ statsInfo.ioExceptionCnt.incrementAndGet();
+ }
+
+ public static void incZKExceptionCnt() {
+ statsInfo.zkExceptionCnt.incrementAndGet();
+ }
+
+ public static void updSyncDataDurations(long dltTime) {
+ statsInfo.syncDataDurMin.update(dltTime);
+ statsInfo.syncDataDurMax.update(dltTime);
+ }
+
+ public static void updSyncZKDurations(long dltTime) {
+ statsInfo.syncZkDurMin.update(dltTime);
+ statsInfo.syncZkDurMax.update(dltTime);
+ }
+
+ public static BrokerMetrics getStatsInfo() {
+ return statsInfo;
+ }
}
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/FileSegment.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/FileSegment.java
index a6202cf..42580bd 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/FileSegment.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/FileSegment.java
@@ -115,7 +115,7 @@ public class FileSegment implements Segment {
} catch (final Exception e) {
if (e instanceof IOException) {
ServiceStatusHolder.addReadIOErrCnt();
- BrokerMetricsHolder.METRICS.ioExceptionCnt.incrementAndGet();
+ BrokerMetricsHolder.incIOExceptionCnt();
}
if (this.segmentType == SegmentType.DATA) {
logger.error("[File Store] Set DATA Segment cachedSize error", e);
@@ -140,7 +140,7 @@ public class FileSegment implements Segment {
} catch (Throwable ee) {
if (ee instanceof IOException) {
ServiceStatusHolder.addReadIOErrCnt();
- BrokerMetricsHolder.METRICS.ioExceptionCnt.incrementAndGet();
+ BrokerMetricsHolder.incIOExceptionCnt();
}
logger.error(new StringBuilder(512).append("[File Store] Close ")
.append(this.file.getAbsoluteFile().toString())
@@ -163,7 +163,7 @@ public class FileSegment implements Segment {
} catch (Throwable e1) {
if (e1 instanceof IOException) {
ServiceStatusHolder.addReadIOErrCnt();
- BrokerMetricsHolder.METRICS.ioExceptionCnt.incrementAndGet();
+ BrokerMetricsHolder.incIOExceptionCnt();
}
logger.error("[File Store] failure to close channel ", e1);
}
@@ -175,7 +175,7 @@ public class FileSegment implements Segment {
} catch (Throwable ee) {
if (ee instanceof IOException) {
ServiceStatusHolder.addReadIOErrCnt();
- BrokerMetricsHolder.METRICS.ioExceptionCnt.incrementAndGet();
+ BrokerMetricsHolder.incIOExceptionCnt();
}
logger.error("[File Store] failure to delete file ", ee);
}
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/MsgFileStore.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/MsgFileStore.java
index 2b116dd..33495b4 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/MsgFileStore.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/MsgFileStore.java
@@ -197,7 +197,7 @@ public class MsgFileStore implements Closeable {
// print abnormal information
if (inIndexOffset != indexOffset || inDataOffset != dataOffset) {
ServiceStatusHolder.addWriteIOErrCnt();
- BrokerMetricsHolder.METRICS.ioExceptionCnt.incrementAndGet();
+ BrokerMetricsHolder.incIOExceptionCnt();
logger.error(sb.append("[File Store]: appendMsg data Error, storekey=")
.append(this.storeKey).append(",msgCnt=").append(msgCnt)
.append(",indexSize=").append(indexSize)
@@ -211,7 +211,7 @@ public class MsgFileStore implements Closeable {
} catch (Throwable e) {
if (!closed.get()) {
ServiceStatusHolder.addWriteIOErrCnt();
- BrokerMetricsHolder.METRICS.ioExceptionCnt.incrementAndGet();
+ BrokerMetricsHolder.incIOExceptionCnt();
}
samplePrintCtrl.printExceptionCaught(e);
} finally {
@@ -327,7 +327,7 @@ public class MsgFileStore implements Closeable {
} catch (Throwable e2) {
if (e2 instanceof IOException) {
ServiceStatusHolder.addReadIOErrCnt();
- BrokerMetricsHolder.METRICS.ioExceptionCnt.incrementAndGet();
+ BrokerMetricsHolder.incIOExceptionCnt();
}
samplePrintCtrl.printExceptionCaught(e2,
messageStore.getStoreKey(), String.valueOf(partitionId));
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/mem/MsgMemStore.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/mem/MsgMemStore.java
index f37dfa2..e662112 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/mem/MsgMemStore.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/mem/MsgMemStore.java
@@ -272,12 +272,10 @@ public class MsgMemStore implements Closeable {
final ByteBuffer tmpDataReadBuf = this.cacheDataSegment.asReadOnlyBuffer();
tmpIndexBuffer.flip();
tmpDataReadBuf.flip();
- long tmpValue = System.currentTimeMillis();
+ long startTime = System.currentTimeMillis();
msgFileStore.batchAppendMsg(strBuffer, curMessageCount.get(),
cacheIndexOffset.get(), tmpIndexBuffer, cacheDataOffset.get(), tmpDataReadBuf);
- long dltTime = System.currentTimeMillis() - tmpValue;
- BrokerMetricsHolder.METRICS.syncDataDurMin.update(dltTime);
- BrokerMetricsHolder.METRICS.syncDataDurMax.update(dltTime);
+ BrokerMetricsHolder.updSyncDataDurations(System.currentTimeMillis() - startTime);
return true;
}
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/offset/DefaultOffsetManager.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/offset/DefaultOffsetManager.java
index a4f1b7a..1e74bb7 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/offset/DefaultOffsetManager.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/offset/DefaultOffsetManager.java
@@ -614,7 +614,7 @@ public class DefaultOffsetManager extends AbstractDaemonService implements Offse
}
private void commitCfmOffsets(boolean retryable) {
- long tmpValue = System.currentTimeMillis();
+ long startTime = System.currentTimeMillis();
for (Map.Entry<String, ConcurrentHashMap<String, OffsetStorageInfo>> entry : cfmOffsetMap.entrySet()) {
if (TStringUtils.isBlank(entry.getKey())
|| entry.getValue() == null || entry.getValue().isEmpty()) {
@@ -622,9 +622,7 @@ public class DefaultOffsetManager extends AbstractDaemonService implements Offse
}
zkOffsetStorage.commitOffset(entry.getKey(), entry.getValue().values(), retryable);
}
- long dltTime = System.currentTimeMillis() - tmpValue;
- BrokerMetricsHolder.METRICS.syncZkDurMin.update(dltTime);
- BrokerMetricsHolder.METRICS.syncZkDurMax.update(dltTime);
+ BrokerMetricsHolder.updSyncZKDurations(System.currentTimeMillis() - startTime);
}
/***
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/offsetstorage/ZkOffsetStorage.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/offsetstorage/ZkOffsetStorage.java
index 30eb1d1..7d23ba1 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/offsetstorage/ZkOffsetStorage.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/offsetstorage/ZkOffsetStorage.java
@@ -79,7 +79,7 @@ public class ZkOffsetStorage implements OffsetStorage {
try {
this.zkw = new ZooKeeperWatcher(zkConfig);
} catch (Throwable e) {
- BrokerMetricsHolder.METRICS.zkExceptionCnt.incrementAndGet();
+ BrokerMetricsHolder.incZKExceptionCnt();
logger.error(new StringBuilder(256)
.append("[ZkOffsetStorage] Failed to connect ZooKeeper server (")
.append(this.zkConfig.getZkServerAddr()).append(") !").toString(), e);
@@ -143,7 +143,7 @@ public class ZkOffsetStorage implements OffsetStorage {
try {
offsetZkInfo = ZKUtil.readDataMaybeNull(this.zkw, znode);
} catch (KeeperException e) {
- BrokerMetricsHolder.METRICS.zkExceptionCnt.incrementAndGet();
+ BrokerMetricsHolder.incZKExceptionCnt();
logger.error("KeeperException during load offsets from ZooKeeper", e);
return null;
}
@@ -183,7 +183,7 @@ public class ZkOffsetStorage implements OffsetStorage {
try {
ZKUtil.updatePersistentPath(this.zkw, offsetPath, offsetData);
} catch (final Throwable t) {
- BrokerMetricsHolder.METRICS.zkExceptionCnt.incrementAndGet();
+ BrokerMetricsHolder.incZKExceptionCnt();
logger.error("Exception during commit offsets to ZooKeeper", t);
throw new OffsetStoreException(t);
}
@@ -224,7 +224,7 @@ public class ZkOffsetStorage implements OffsetStorage {
offsetMap.put(partitionId, Long.parseLong(offsetInfoStrs[1]));
}
} catch (Throwable e) {
- BrokerMetricsHolder.METRICS.zkExceptionCnt.incrementAndGet();
+ BrokerMetricsHolder.incZKExceptionCnt();
offsetMap.put(partitionId, null);
}
}
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/TMaster.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/TMaster.java
index 189518b..8a08f2b 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/TMaster.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/TMaster.java
@@ -110,7 +110,7 @@ import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.Br
import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.ClusterSettingEntity;
import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.GroupResCtrlEntity;
import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.TopicDeployEntity;
-import org.apache.inlong.tubemq.server.master.metrics.MasterMetric;
+import org.apache.inlong.tubemq.server.master.metrics.MasterMetricsHolder;
import org.apache.inlong.tubemq.server.master.nodemanage.nodebroker.BrokerAbnHolder;
import org.apache.inlong.tubemq.server.master.nodemanage.nodebroker.BrokerRunManager;
import org.apache.inlong.tubemq.server.master.nodemanage.nodebroker.DefBrokerRunManager;
@@ -164,7 +164,6 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
private AtomicInteger curCltBalanceParal = new AtomicInteger(0);
private Sleeper stopSleeper = new Sleeper(1000, this);
private SimpleVisitTokenManager visitTokenManager;
- private final MasterMetric masterMetrics;
/**
* constructor
@@ -179,7 +178,8 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
this.checkAndCreateBdbDataPath();
this.masterAddInfo =
new NodeAddrInfo(masterConfig.getHostName(), masterConfig.getPort());
- this.masterMetrics = MasterMetric.create();
+ // register metric bean
+ MasterMetricsHolder.registerMXBean();
this.svrExecutor = Executors.newFixedThreadPool(this.masterConfig.getRebalanceParallel());
this.cltExecutor = Executors.newFixedThreadPool(this.masterConfig.getRebalanceParallel());
this.visitTokenManager = new SimpleVisitTokenManager(this.masterConfig);
@@ -189,9 +189,9 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
false, TBaseConstants.META_VALUE_UNDEFINED);
this.producerHolder = new ProducerInfoHolder();
this.consumerHolder = new ConsumerInfoHolder(this);
- this.consumerEventManager = new ConsumerEventManager(consumerHolder, masterMetrics);
+ this.consumerEventManager = new ConsumerEventManager(consumerHolder);
this.topicPSInfoManager = new TopicPSInfoManager(this);
- this.loadBalancer = new DefaultLoadBalancer(masterMetrics);
+ this.loadBalancer = new DefaultLoadBalancer();
heartbeatManager.regConsumerCheckBusiness(masterConfig.getConsumerHeartbeatTimeoutMs(),
new TimeoutListener() {
@Override
@@ -283,10 +283,6 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
return brokerRunManager;
}
- public MasterMetric getMasterMetrics() {
- return masterMetrics;
- }
-
/**
* Producer register request to master
*
@@ -350,10 +346,8 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
}
final String clientJdkVer = request.hasJdkVersion() ? request.getJdkVersion() : "";
heartbeatManager.regProducerNode(producerId);
- if (producerHolder.setProducerInfo(producerId,
- new HashSet<>(transTopicSet), hostName, overtls)) {
- masterMetrics.producerCnt.incrementAndGet();
- }
+ producerHolder.setProducerInfo(producerId,
+ new HashSet<>(transTopicSet), hostName, overtls);
Tuple2<Long, Map<Integer, String>> brokerStaticInfo =
brokerRunManager.getBrokerStaticInfo(overtls);
builder.setBrokerCheckSum(brokerStaticInfo.getF0());
@@ -1697,11 +1691,8 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
final List<String> subGroups = groupsNeedToBalance.subList(startIndex, endIndex);
if (subGroups.isEmpty()) {
if (curSvrBalanceParal.decrementAndGet() == 0) {
- long durTime = System.currentTimeMillis() - startBalanceTime;
- masterMetrics.svrBalLatency.set(durTime);
- if (durTime > masterMetrics.svrBalLatencyMax.get()) {
- masterMetrics.svrBalLatencyMax.set(durTime);
- }
+ MasterMetricsHolder.updSvrBalanceDurations(
+ System.currentTimeMillis() - startBalanceTime);
}
continue;
}
@@ -1742,11 +1733,8 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
logger.warn("[Svr-Balance processor] Error during process", e);
} finally {
if (curSvrBalanceParal.decrementAndGet() == 0) {
- long durTime = System.currentTimeMillis() - startBalanceTime;
- masterMetrics.svrBalLatency.set(durTime);
- if (durTime > masterMetrics.svrBalLatencyMax.get()) {
- masterMetrics.svrBalLatencyMax.set(durTime);
- }
+ MasterMetricsHolder.updSvrBalanceDurations(
+ System.currentTimeMillis() - startBalanceTime);
}
}
}
@@ -2570,24 +2558,12 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
try {
lid = masterRowLock.getLock(null,
StringUtils.getBytesUtf8(consumerId), true);
- ConsumerInfo info = consumerHolder.removeConsumer(group, consumerId);
+ ConsumerInfo info = consumerHolder.removeConsumer(group, consumerId, isTimeout);
currentSubInfo.remove(consumerId);
consumerEventManager.removeAll(consumerId);
if (info != null) {
if (consumerHolder.isConsumeGroupEmpty(group)) {
topicPSInfoManager.rmvGroupSubTopicInfo(group, info.getTopicSet());
- // metric
- masterMetrics.consumeGroupCnt.decrementAndGet();
- if (info.getConsumeType() == ConsumeType.CONSUME_CLIENT_REB) {
- masterMetrics.cltBalConsumeGroupCnt.decrementAndGet();
- }
- if (isTimeout) {
- masterMetrics.consumeGroupTmoTotCnt.incrementAndGet();
- }
- }
- masterMetrics.consumerCnt.decrementAndGet();
- if (isTimeout) {
- masterMetrics.consumerTmoTotCnt.incrementAndGet();
}
}
} catch (IOException e) {
@@ -2604,13 +2580,9 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
@Override
void run(String clientId, boolean isTimeout) {
if (clientId != null) {
- ProducerInfo info = producerHolder.removeProducer(clientId);
+ ProducerInfo info = producerHolder.removeProducer(clientId, isTimeout);
if (info != null) {
topicPSInfoManager.rmvProducerTopicPubInfo(clientId, info.getTopicSet());
- masterMetrics.producerCnt.decrementAndGet();
- if (isTimeout) {
- masterMetrics.producerTmoTotCnt.incrementAndGet();
- }
}
}
}
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/balance/DefaultLoadBalancer.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/balance/DefaultLoadBalancer.java
index 501f45e..93568ed 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/balance/DefaultLoadBalancer.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/balance/DefaultLoadBalancer.java
@@ -34,7 +34,7 @@ import org.apache.inlong.tubemq.corebase.cluster.Partition;
import org.apache.inlong.tubemq.server.common.offsetstorage.OffsetStorage;
import org.apache.inlong.tubemq.server.master.metamanage.MetaDataManager;
import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.GroupResCtrlEntity;
-import org.apache.inlong.tubemq.server.master.metrics.MasterMetric;
+import org.apache.inlong.tubemq.server.master.metrics.MasterMetricsHolder;
import org.apache.inlong.tubemq.server.master.nodemanage.nodebroker.BrokerRunManager;
import org.apache.inlong.tubemq.server.master.nodemanage.nodeconsumer.ConsumeGroupInfo;
import org.apache.inlong.tubemq.server.master.nodemanage.nodeconsumer.ConsumerInfo;
@@ -48,10 +48,9 @@ import org.slf4j.LoggerFactory;
public class DefaultLoadBalancer implements LoadBalancer {
private static final Logger logger = LoggerFactory.getLogger(LoadBalancer.class);
private static final Random RANDOM = new Random(System.currentTimeMillis());
- private final MasterMetric masterMetrics;
- public DefaultLoadBalancer(MasterMetric masterMetrics) {
- this.masterMetrics = masterMetrics;
+ public DefaultLoadBalancer() {
+ // initial information
}
/**
@@ -663,13 +662,8 @@ public class DefaultLoadBalancer implements LoadBalancer {
}
}
if (consumeGroupInfo.addAllocatedTimes() > 0) {
- long durTime = System.currentTimeMillis() - consumeGroupInfo.getCreateTime();
- if (durTime < masterMetrics.svrBalResetDurMin.get()) {
- masterMetrics.svrBalResetDurMin.set(durTime);
- }
- if (durTime > masterMetrics.svrBalResetDurMax.get()) {
- masterMetrics.svrBalResetDurMax.set(durTime);
- }
+ MasterMetricsHolder.updSvrBalResetDurations(
+ System.currentTimeMillis() - consumeGroupInfo.getCreateTime());
}
}
return finalSubInfoMap;
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metrics/MasterMetric.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metrics/MasterMetric.java
deleted file mode 100644
index 20f93ef..0000000
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metrics/MasterMetric.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.tubemq.server.master.metrics;
-
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-import org.apache.inlong.commons.config.metrics.CountMetric;
-import org.apache.inlong.commons.config.metrics.Dimension;
-import org.apache.inlong.commons.config.metrics.GaugeMetric;
-import org.apache.inlong.commons.config.metrics.MetricDomain;
-import org.apache.inlong.commons.config.metrics.MetricItem;
-import org.apache.inlong.commons.config.metrics.MetricRegister;
-
-@MetricDomain(name = "master_metrics")
-public class MasterMetric extends MetricItem {
-
- private static final MasterMetric MASTER_METRICS = new MasterMetric();
- private static final AtomicBoolean REGISTER_ONCE =
- new AtomicBoolean(false);
- private static final String METRIC_NAME = "master_metrics";
-
- @Dimension
- public String tagName;
-
- @GaugeMetric
- public AtomicLong consumeGroupCnt = new AtomicLong(0);
-
- @GaugeMetric
- public AtomicLong cltBalConsumeGroupCnt = new AtomicLong(0);
-
- @CountMetric
- public AtomicLong consumeGroupTmoTotCnt = new AtomicLong(0);
-
- @GaugeMetric
- public AtomicLong consumerCnt = new AtomicLong(0);
-
- @CountMetric
- public AtomicLong consumerTmoTotCnt = new AtomicLong(0);
-
- @GaugeMetric
- public AtomicLong producerCnt = new AtomicLong(0);
-
- @CountMetric
- public AtomicLong producerTmoTotCnt = new AtomicLong(0);
-
- @GaugeMetric
- public AtomicLong brokerConfigCnt = new AtomicLong(0);
-
- @GaugeMetric
- public AtomicLong brokerOnlineCnt = new AtomicLong(0);
-
- @CountMetric
- public AtomicLong brokerAbnTotCnt = new AtomicLong(0);
-
- @GaugeMetric
- public AtomicLong brokerAbnCurCnt = new AtomicLong(0);
-
- @CountMetric
- public AtomicLong brokerFbdTotCnt = new AtomicLong(0);
-
- @GaugeMetric
- public AtomicLong brokerFbdCurCnt = new AtomicLong(0);
-
- @CountMetric
- public AtomicLong brokerTmoTotCnt = new AtomicLong(0);
-
- @GaugeMetric
- public AtomicLong svrBalLatency = new AtomicLong(0);
-
- @GaugeMetric
- public AtomicLong svrBalLatencyMax = new AtomicLong(0);
-
- @GaugeMetric
- public AtomicLong svrBalResetDurMin = new AtomicLong(Long.MAX_VALUE);
-
- @GaugeMetric
- public AtomicLong svrBalResetDurMax = new AtomicLong(0);
-
- @GaugeMetric
- public AtomicLong svrBalConEventConsumerCnt = new AtomicLong(0);
-
- @GaugeMetric
- public AtomicLong svrBalDisConEventConsumerCnt = new AtomicLong(0);
-
- public static MasterMetric create() {
- if (REGISTER_ONCE.compareAndSet(false, true)) {
- MASTER_METRICS.tagName = METRIC_NAME;
- MetricRegister.register(MASTER_METRICS);
- }
- return MASTER_METRICS;
- }
-}
-
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/metrics/BrokerMetricMXBean.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metrics/MasterMetricMXBean.java
similarity index 67%
copy from inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/metrics/BrokerMetricMXBean.java
copy to inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metrics/MasterMetricMXBean.java
index f8f0c38..fa98f2d 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/metrics/BrokerMetricMXBean.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metrics/MasterMetricMXBean.java
@@ -15,19 +15,22 @@
* limitations under the License.
*/
-package org.apache.inlong.tubemq.server.broker.metrics;
+package org.apache.inlong.tubemq.server.master.metrics;
import org.apache.inlong.tubemq.corebase.metric.MetricValues;
/**
- * BrokerMonitorMXBean
- * Provide access interface of a metric item with JMX.<br>
- * Decouple between metric item and monitor system, in particular scene, <br>
- * inlong can depend on user-defined monitor system.
+ * MasterMetricMXBean
+ * Master's metric data access interface, including:
+ * the getMetric() that directly obtains data
+ * the getAndReSetMetrics() that can clear the values of
+ * the counter, maximum and minimum extremum Gauge data
*/
-public interface BrokerMetricMXBean {
+public interface MasterMetricMXBean {
+ // get current metric data by viewing mode
MetricValues getMetrics();
+ // get current metric data and reset the Counter, maximum/minimum Gauge metric
MetricValues getAndReSetMetrics();
}
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metrics/MasterMetrics.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metrics/MasterMetrics.java
new file mode 100644
index 0000000..0c0f2d2
--- /dev/null
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metrics/MasterMetrics.java
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.tubemq.server.master.metrics;
+
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.inlong.tubemq.corebase.metric.AbsMetricItem;
+import org.apache.inlong.tubemq.corebase.metric.CountMetricItem;
+import org.apache.inlong.tubemq.corebase.metric.GaugeMaxMetricItem;
+import org.apache.inlong.tubemq.corebase.metric.GaugeMinMetricItem;
+import org.apache.inlong.tubemq.corebase.metric.GaugeNormMetricItem;
+import org.apache.inlong.tubemq.corebase.metric.MetricValues;
+import org.apache.inlong.tubemq.server.common.utils.WebParameterUtils;
+
+public class MasterMetrics implements MasterMetricMXBean {
+
+ // statistics time since last reset
+ private final AtomicLong lastResetTime =
+ new AtomicLong(System.currentTimeMillis());
+ // consume group statistics
+ protected final AbsMetricItem consumeGroupCnt =
+ new GaugeNormMetricItem("consume_group_cnt");
+ protected final AbsMetricItem consumeGroupTmoTotCnt =
+ new CountMetricItem("consume_group_timeout_cnt");
+ protected final AbsMetricItem cltBalConsumeGroupCnt =
+ new GaugeNormMetricItem("client_balance_group_cnt");
+ protected final AbsMetricItem cltBalGroupTmototCnt =
+ new CountMetricItem("clt_balance_timeout_cnt");
+ // consumer client statistics
+ protected final AbsMetricItem consumerOnlineCnt =
+ new GaugeNormMetricItem("consumer_online_cnt");
+ protected final AbsMetricItem consumerTmoTotCnt =
+ new CountMetricItem("consumer_timeout_cnt");
+ // producer client statistics
+ protected final AbsMetricItem producerOnlineCnt =
+ new GaugeNormMetricItem("producer_online_cnt");
+ protected final AbsMetricItem producerTmoTotCnt =
+ new CountMetricItem("producer_timeout_cnt");
+ // broker node statistics
+ protected final AbsMetricItem brokerConfigCnt =
+ new GaugeNormMetricItem("broker_configure_cnt");
+ protected final AbsMetricItem brokerOnlineCnt =
+ new GaugeNormMetricItem("broker_online_cnt");
+ protected final AbsMetricItem brokerTmoTotCnt =
+ new CountMetricItem("broker_timeout_cnt");
+ protected final AbsMetricItem brokerAbnCurCnt =
+ new GaugeNormMetricItem("broker_abn_current_cnt");
+ protected final AbsMetricItem brokerAbnTotCnt =
+ new CountMetricItem("broker_abn_total_cnt");
+ protected final AbsMetricItem brokerFbdCurCnt =
+ new GaugeNormMetricItem("broker_fbd_current_cnt");
+ protected final AbsMetricItem brokerFbdTotCnt =
+ new CountMetricItem("broker_fbd_total_cnt");
+ // server balance statistics
+ protected final AbsMetricItem svrBalDuration =
+ new GaugeNormMetricItem("svrbalance_duration");
+ protected final AbsMetricItem svrBalDurationMin =
+ new GaugeMinMetricItem("svrbalance_duration_min");
+ protected final AbsMetricItem svrBalDurationMax =
+ new GaugeMaxMetricItem("svrbalance_duration_max");
+ protected final AbsMetricItem svrBalResetDurMin =
+ new GaugeMinMetricItem("svrbal_reset_duration_min");
+ protected final AbsMetricItem svrBalResetDurMax =
+ new GaugeMaxMetricItem("svrbal_reset_duration_max");
+ protected final AbsMetricItem svrBalConEventConsumerCnt =
+ new GaugeNormMetricItem("svrbal_con_consumer_cnt");
+ protected final AbsMetricItem svrBalDisConEventConsumerCnt =
+ new GaugeNormMetricItem("svrbal_discon_consumer_cnt");
+
+ @Override
+ public MetricValues getMetrics() {
+ Map<String, Long> metricValues = new HashMap<>();
+ metricValues.put(consumeGroupCnt.getName(), consumeGroupCnt.getValue());
+ metricValues.put(consumeGroupTmoTotCnt.getName(), consumeGroupTmoTotCnt.getValue());
+ metricValues.put(cltBalConsumeGroupCnt.getName(), cltBalConsumeGroupCnt.getValue());
+ metricValues.put(cltBalGroupTmototCnt.getName(), cltBalGroupTmototCnt.getValue());
+ metricValues.put(consumerOnlineCnt.getName(), consumerOnlineCnt.getValue());
+ metricValues.put(consumerTmoTotCnt.getName(), consumerTmoTotCnt.getValue());
+ metricValues.put(producerOnlineCnt.getName(), producerOnlineCnt.getValue());
+ metricValues.put(producerTmoTotCnt.getName(), producerTmoTotCnt.getValue());
+ metricValues.put(brokerConfigCnt.getName(), brokerConfigCnt.getValue());
+ metricValues.put(brokerOnlineCnt.getName(), brokerOnlineCnt.getValue());
+ metricValues.put(brokerTmoTotCnt.getName(), brokerTmoTotCnt.getValue());
+ metricValues.put(brokerAbnCurCnt.getName(), brokerAbnCurCnt.getValue());
+ metricValues.put(brokerAbnTotCnt.getName(), brokerAbnTotCnt.getValue());
+ metricValues.put(brokerFbdCurCnt.getName(), brokerFbdCurCnt.getValue());
+ metricValues.put(brokerFbdTotCnt.getName(), brokerFbdTotCnt.getValue());
+ metricValues.put(svrBalDuration.getName(), svrBalDuration.getValue());
+ metricValues.put(svrBalDurationMin.getName(), svrBalDurationMin.getValue());
+ metricValues.put(svrBalDurationMax.getName(), svrBalDurationMax.getValue());
+ metricValues.put(svrBalResetDurMin.getName(), svrBalResetDurMin.getValue());
+ metricValues.put(svrBalResetDurMax.getName(), svrBalResetDurMax.getValue());
+ metricValues.put(svrBalConEventConsumerCnt.getName(),
+ svrBalConEventConsumerCnt.getValue());
+ metricValues.put(svrBalDisConEventConsumerCnt.getName(),
+ svrBalDisConEventConsumerCnt.getValue());
+ return new MetricValues(WebParameterUtils.date2yyyyMMddHHmmss(
+ new Date(lastResetTime.get())), metricValues);
+ }
+
+ @Override
+ public MetricValues getAndReSetMetrics() {
+ Map<String, Long> metricValues = new HashMap<>();
+ metricValues.put(consumeGroupCnt.getName(), consumeGroupCnt.getAndSet());
+ metricValues.put(consumeGroupTmoTotCnt.getName(), consumeGroupTmoTotCnt.getAndSet());
+ metricValues.put(cltBalConsumeGroupCnt.getName(), cltBalConsumeGroupCnt.getAndSet());
+ metricValues.put(cltBalGroupTmototCnt.getName(), cltBalGroupTmototCnt.getAndSet());
+ metricValues.put(consumerOnlineCnt.getName(), consumerOnlineCnt.getAndSet());
+ metricValues.put(consumerTmoTotCnt.getName(), consumerTmoTotCnt.getAndSet());
+ metricValues.put(producerOnlineCnt.getName(), producerOnlineCnt.getAndSet());
+ metricValues.put(producerTmoTotCnt.getName(), producerTmoTotCnt.getAndSet());
+ metricValues.put(brokerConfigCnt.getName(), brokerConfigCnt.getAndSet());
+ metricValues.put(brokerOnlineCnt.getName(), brokerOnlineCnt.getAndSet());
+ metricValues.put(brokerTmoTotCnt.getName(), brokerTmoTotCnt.getAndSet());
+ metricValues.put(brokerAbnCurCnt.getName(), brokerAbnCurCnt.getAndSet());
+ metricValues.put(brokerAbnTotCnt.getName(), brokerAbnTotCnt.getAndSet());
+ metricValues.put(brokerFbdCurCnt.getName(), brokerFbdCurCnt.getAndSet());
+ metricValues.put(brokerFbdTotCnt.getName(), brokerFbdTotCnt.getAndSet());
+ metricValues.put(svrBalDuration.getName(), svrBalDuration.getAndSet());
+ metricValues.put(svrBalDurationMin.getName(), svrBalDurationMin.getAndSet());
+ metricValues.put(svrBalDurationMax.getName(), svrBalDurationMax.getAndSet());
+ metricValues.put(svrBalResetDurMin.getName(), svrBalResetDurMin.getAndSet());
+ metricValues.put(svrBalResetDurMax.getName(), svrBalResetDurMax.getAndSet());
+ metricValues.put(svrBalConEventConsumerCnt.getName(),
+ svrBalConEventConsumerCnt.getAndSet());
+ metricValues.put(svrBalDisConEventConsumerCnt.getName(),
+ svrBalDisConEventConsumerCnt.getAndSet());
+ alignBrokerFbdMetrics();
+ alignBrokerAbnMetrics();
+ long befTime = lastResetTime.getAndSet(System.currentTimeMillis());
+ return new MetricValues(WebParameterUtils.date2yyyyMMddHHmmss(
+ new Date(befTime)), metricValues);
+ }
+
+ public long getLastResetTime() {
+ return lastResetTime.get();
+ }
+
+ public AbsMetricItem getConsumeGroupCnt() {
+ return consumeGroupCnt;
+ }
+
+ public AbsMetricItem getConsumeGroupTmoTotCnt() {
+ return consumeGroupTmoTotCnt;
+ }
+
+ public AbsMetricItem getCltBalConsumeGroupCnt() {
+ return cltBalConsumeGroupCnt;
+ }
+
+ public AbsMetricItem getCltBalGroupTmototCnt() {
+ return cltBalGroupTmototCnt;
+ }
+
+ public AbsMetricItem getConsumerOnlineCnt() {
+ return consumerOnlineCnt;
+ }
+
+ public AbsMetricItem getConsumerTmoTotCnt() {
+ return consumerTmoTotCnt;
+ }
+
+ public AbsMetricItem getProducerOnlineCnt() {
+ return producerOnlineCnt;
+ }
+
+ public AbsMetricItem getProducerTmoTotCnt() {
+ return producerTmoTotCnt;
+ }
+
+ public AbsMetricItem getBrokerConfigCnt() {
+ return brokerConfigCnt;
+ }
+
+ public AbsMetricItem getBrokerOnlineCnt() {
+ return brokerOnlineCnt;
+ }
+
+ public AbsMetricItem getBrokerTmoTotCnt() {
+ return brokerTmoTotCnt;
+ }
+
+ public AbsMetricItem getBrokerAbnCurCnt() {
+ return brokerAbnCurCnt;
+ }
+
+ public AbsMetricItem getBrokerAbnTotCnt() {
+ return brokerAbnTotCnt;
+ }
+
+ public AbsMetricItem getBrokerFbdCurCnt() {
+ return brokerFbdCurCnt;
+ }
+
+ public AbsMetricItem getBrokerFbdTotCnt() {
+ return brokerFbdTotCnt;
+ }
+
+ public AbsMetricItem getSvrBalDuration() {
+ return svrBalDuration;
+ }
+
+ public AbsMetricItem getSvrBalDurationMin() {
+ return svrBalDurationMin;
+ }
+
+ public AbsMetricItem getSvrBalDurationMax() {
+ return svrBalDurationMax;
+ }
+
+ public AbsMetricItem getSvrBalResetDurMin() {
+ return svrBalResetDurMin;
+ }
+
+ public AbsMetricItem getSvrBalResetDurMax() {
+ return svrBalResetDurMax;
+ }
+
+ public AbsMetricItem getSvrBalConEventConsumerCnt() {
+ return svrBalConEventConsumerCnt;
+ }
+
+ public AbsMetricItem getSvrBalDisConEventConsumerCnt() {
+ return svrBalDisConEventConsumerCnt;
+ }
+
+ private void alignBrokerFbdMetrics() {
+ // Notice: the minimum value of the brokerFbdTotCnt metric value is
+ // the current value of brokerFbdCurCnt, so the metric value
+ // needs to be aligned after reset
+ long curCnt = brokerFbdCurCnt.getValue();
+ long totalCnt = brokerFbdTotCnt.getValue();
+ while (curCnt > totalCnt) {
+ if (brokerFbdTotCnt.compareAndSet(totalCnt, curCnt)) {
+ break;
+ }
+ curCnt = brokerFbdCurCnt.getValue();
+ totalCnt = brokerFbdTotCnt.getValue();
+ }
+ }
+
+ private void alignBrokerAbnMetrics() {
+ // Notice: the minimum value of the brokerAbnTotCnt metric value is
+ // the current value of brokerAbnCurCnt, so the metric value
+ // needs to be aligned after reset
+ long curCnt = brokerAbnCurCnt.getValue();
+ long totalCnt = brokerAbnTotCnt.getValue();
+ while (curCnt > totalCnt) {
+ if (brokerAbnTotCnt.compareAndSet(totalCnt, curCnt)) {
+ break;
+ }
+ curCnt = brokerAbnCurCnt.getValue();
+ totalCnt = brokerAbnTotCnt.getValue();
+ }
+ }
+}
+
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metrics/MasterMetricsHolder.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metrics/MasterMetricsHolder.java
new file mode 100644
index 0000000..416420d
--- /dev/null
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metrics/MasterMetricsHolder.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.tubemq.server.master.metrics;
+
+import java.lang.management.ManagementFactory;
+import java.util.concurrent.atomic.AtomicBoolean;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MasterMetricsHolder {
+ private static final Logger logger =
+ LoggerFactory.getLogger(MasterMetricsHolder.class);
+ // Registration status indicator
+ private static final AtomicBoolean registered =
+ new AtomicBoolean(false);
+ // master metrics information
+ private static final MasterMetrics statsInfo = new MasterMetrics();
+
+ public static void registerMXBean() {
+ if (!registered.compareAndSet(false, true)) {
+ return;
+ }
+ try {
+ MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+ ObjectName mxBeanName =
+ new ObjectName("org.apache.inlong.tubemq.server.master:type=MasterMetrics");
+ mbs.registerMBean(statsInfo, mxBeanName);
+ } catch (Exception ex) {
+ logger.error("Register MasterMXBean error: ", ex);
+ }
+ }
+
+ public static void incConsumerCnt(boolean isGroupEmpty, boolean isCltBal) {
+ statsInfo.consumerOnlineCnt.incrementAndGet();
+ if (isGroupEmpty) {
+ statsInfo.consumeGroupCnt.incrementAndGet();
+ if (isCltBal) {
+ statsInfo.cltBalConsumeGroupCnt.incrementAndGet();
+ }
+ }
+ }
+
+ public static void decConsumerCnt(boolean isTimeout,
+ boolean isGroupEmpty,
+ boolean isCltBal) {
+ statsInfo.consumerOnlineCnt.decrementAndGet();
+ if (isTimeout) {
+ statsInfo.consumerTmoTotCnt.incrementAndGet();
+ }
+ if (isGroupEmpty) {
+ decConsumeGroupCnt(isTimeout, isCltBal);
+ }
+ }
+
+ public static void decConsumeGroupCnt(boolean isTimeout, boolean isCltBal) {
+ statsInfo.consumeGroupCnt.decrementAndGet();
+ if (isTimeout) {
+ statsInfo.consumeGroupTmoTotCnt.incrementAndGet();
+ }
+ if (isCltBal) {
+ statsInfo.cltBalConsumeGroupCnt.decrementAndGet();
+ if (isTimeout) {
+ statsInfo.cltBalGroupTmototCnt.incrementAndGet();
+ }
+ }
+ }
+
+ public static void incProducerCnt() {
+ statsInfo.producerOnlineCnt.incrementAndGet();
+ }
+
+ public static void decProducerCnt(boolean isTimeout) {
+ statsInfo.producerOnlineCnt.decrementAndGet();
+ if (isTimeout) {
+ statsInfo.producerTmoTotCnt.incrementAndGet();
+ }
+ }
+
+ public static void incSvrBalDisConConsumerCnt() {
+ statsInfo.svrBalDisConEventConsumerCnt.incrementAndGet();
+ }
+
+ public static void decSvrBalDisConConsumerCnt() {
+ statsInfo.svrBalDisConEventConsumerCnt.decrementAndGet();
+ }
+
+ public static void incSvrBalConEventConsumerCnt() {
+ statsInfo.svrBalConEventConsumerCnt.incrementAndGet();
+ }
+
+ public static void decSvrBalConEventConsumerCnt() {
+ statsInfo.svrBalConEventConsumerCnt.decrementAndGet();
+ }
+
+ public static void incBrokerConfigCnt() {
+ statsInfo.brokerConfigCnt.incrementAndGet();
+ }
+
+ public static void decBrokerConfigCnt() {
+ statsInfo.brokerConfigCnt.decrementAndGet();
+ }
+
+ public static void incBrokerOnlineCnt() {
+ statsInfo.brokerOnlineCnt.incrementAndGet();
+ }
+
+ public static void decBrokerOnlineCnt(boolean isTimeout) {
+ statsInfo.brokerOnlineCnt.decrementAndGet();
+ if (isTimeout) {
+ statsInfo.brokerTmoTotCnt.incrementAndGet();
+ }
+ }
+
+ public static void incBrokerAbnormalCnt() {
+ statsInfo.brokerAbnCurCnt.incrementAndGet();
+ statsInfo.brokerAbnTotCnt.incrementAndGet();
+ }
+
+ public static void decBrokerAbnormalCnt() {
+ statsInfo.brokerAbnCurCnt.decrementAndGet();
+ }
+
+ public static void incBrokerForbiddenCnt() {
+ statsInfo.brokerFbdCurCnt.incrementAndGet();
+ statsInfo.brokerFbdTotCnt.incrementAndGet();
+ }
+
+ public static void decBrokerForbiddenCnt() {
+ statsInfo.brokerFbdCurCnt.decrementAndGet();
+ }
+
+ public static void updSvrBalanceDurations(long dltTime) {
+ statsInfo.svrBalDuration.update(dltTime);
+ statsInfo.svrBalDurationMin.update(dltTime);
+ statsInfo.svrBalDurationMax.update(dltTime);
+ }
+
+ public static void updSvrBalResetDurations(long dltTime) {
+ statsInfo.svrBalResetDurMin.update(dltTime);
+ statsInfo.svrBalResetDurMax.update(dltTime);
+ }
+
+ public static MasterMetrics getStatsInfo() {
+ return statsInfo;
+ }
+}
+
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/BrokerAbnHolder.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/BrokerAbnHolder.java
index 2987225..e22ad84 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/BrokerAbnHolder.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/BrokerAbnHolder.java
@@ -33,7 +33,7 @@ import org.apache.inlong.tubemq.server.common.statusdef.ManageStatus;
import org.apache.inlong.tubemq.server.master.metamanage.MetaDataManager;
import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.BaseEntity;
import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.BrokerConfEntity;
-import org.apache.inlong.tubemq.server.master.metrics.MasterMetric;
+import org.apache.inlong.tubemq.server.master.metrics.MasterMetricsHolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -51,15 +51,11 @@ public class BrokerAbnHolder {
private final MetaDataManager metaDataManager;
private final AtomicInteger brokerForbiddenCount =
new AtomicInteger(0);
- // master metrics
- private final MasterMetric masterMetrics;
public BrokerAbnHolder(final int maxAutoForbiddenCnt,
- final MetaDataManager metaDataManager,
- final MasterMetric masterMetrics) {
+ final MetaDataManager metaDataManager) {
this.maxAutoForbiddenCnt = maxAutoForbiddenCnt;
this.metaDataManager = metaDataManager;
- this.masterMetrics = masterMetrics;
}
/**
@@ -79,7 +75,7 @@ public class BrokerAbnHolder {
if (brokerForbiddenMap.get(brokerId) == null) {
brokerAbnInfo = brokerAbnormalMap.remove(brokerId);
if (brokerAbnInfo != null) {
- masterMetrics.brokerAbnCurCnt.decrementAndGet();
+ MasterMetricsHolder.decBrokerAbnormalCnt();
logger.warn(sBuffer.append("[Broker AutoForbidden] broker ")
.append(brokerId).append(" return to normal!").toString());
sBuffer.delete(0, sBuffer.length());
@@ -104,8 +100,7 @@ public class BrokerAbnHolder {
if (brokerAbnInfo == null) {
if (brokerAbnormalMap.putIfAbsent(brokerId,
new BrokerAbnInfo(brokerId, reportReadStatus, reportWriteStatus)) == null) {
- masterMetrics.brokerAbnTotCnt.incrementAndGet();
- masterMetrics.brokerAbnCurCnt.incrementAndGet();
+ MasterMetricsHolder.incBrokerAbnormalCnt();
logger.warn(sBuffer.append("[Broker AutoForbidden] broker report abnormal, ")
.append(brokerId).append("'s reportReadStatus=")
.append(reportReadStatus).append(", reportWriteStatus=")
@@ -124,8 +119,7 @@ public class BrokerAbnHolder {
if (updateCurManageStatus(brokerId, reqMngStatus, sBuffer)) {
if (brokerForbiddenMap.putIfAbsent(brokerId, tmpBrokerFbdInfo) == null) {
brokerForbiddenCount.incrementAndGet();
- masterMetrics.brokerFbdTotCnt.incrementAndGet();
- masterMetrics.brokerFbdCurCnt.incrementAndGet();
+ MasterMetricsHolder.incBrokerForbiddenCnt();
logger.warn(sBuffer
.append("[Broker AutoForbidden] master add missing forbidden broker, ")
.append(brokerId).append("'s manage status to ")
@@ -143,7 +137,7 @@ public class BrokerAbnHolder {
brokerForbiddenCount.decrementAndGet();
return;
}
- masterMetrics.brokerFbdCurCnt.incrementAndGet();
+ MasterMetricsHolder.incBrokerForbiddenCnt();
logger.warn(sBuffer
.append("[Broker AutoForbidden] master auto forbidden broker, ")
.append(brokerId).append("'s manage status to ")
@@ -185,12 +179,12 @@ public class BrokerAbnHolder {
public void removeBroker(Integer brokerId) {
BrokerAbnInfo abnInfo = brokerAbnormalMap.remove(brokerId);
if (abnInfo != null) {
- masterMetrics.brokerAbnCurCnt.decrementAndGet();
+ MasterMetricsHolder.decBrokerAbnormalCnt();
}
BrokerFbdInfo brokerFbdInfo = brokerForbiddenMap.remove(brokerId);
if (brokerFbdInfo != null) {
this.brokerForbiddenCount.decrementAndGet();
- masterMetrics.brokerFbdCurCnt.decrementAndGet();
+ MasterMetricsHolder.decBrokerForbiddenCnt();
}
}
@@ -275,10 +269,10 @@ public class BrokerAbnHolder {
brokerFbdInfos.add(fbdInfo);
BrokerAbnInfo abnInfo = this.brokerAbnormalMap.remove(brokerId);
if (abnInfo != null) {
- masterMetrics.brokerAbnCurCnt.decrementAndGet();
+ MasterMetricsHolder.decBrokerAbnormalCnt();
}
this.brokerForbiddenCount.decrementAndGet();
- masterMetrics.brokerFbdCurCnt.decrementAndGet();
+ MasterMetricsHolder.decBrokerForbiddenCnt();
}
}
if (!brokerFbdInfos.isEmpty()) {
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/DefBrokerRunManager.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/DefBrokerRunManager.java
index e06fce4..e636531 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/DefBrokerRunManager.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/DefBrokerRunManager.java
@@ -45,7 +45,7 @@ import org.apache.inlong.tubemq.server.master.TMaster;
import org.apache.inlong.tubemq.server.master.metamanage.MetaDataManager;
import org.apache.inlong.tubemq.server.master.metamanage.keepalive.AliveObserver;
import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.BrokerConfEntity;
-import org.apache.inlong.tubemq.server.master.metrics.MasterMetric;
+import org.apache.inlong.tubemq.server.master.metrics.MasterMetricsHolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -76,8 +76,6 @@ public class DefBrokerRunManager implements BrokerRunManager, AliveObserver {
private final BrokerAbnHolder brokerAbnHolder;
// broker topic configure for consumer and producer
private final BrokerPSInfoHolder brokerPubSubInfo = new BrokerPSInfoHolder();
- // master metrics
- private final MasterMetric masterMetrics;
/**
* Constructor by TMaster
@@ -85,13 +83,11 @@ public class DefBrokerRunManager implements BrokerRunManager, AliveObserver {
* @param tMaster the initial TMaster object
*/
public DefBrokerRunManager(TMaster tMaster) {
- this.masterMetrics = tMaster.getMasterMetrics();
this.metaDataManager = tMaster.getDefMetaDataManager();
this.heartbeatManager = tMaster.getHeartbeatManager();
MasterConfig masterConfig = tMaster.getMasterConfig();
this.brokerAbnHolder =
- new BrokerAbnHolder(masterConfig.getMaxAutoForbiddenCnt(),
- this.metaDataManager, this.masterMetrics);
+ new BrokerAbnHolder(masterConfig.getMaxAutoForbiddenCnt(), this.metaDataManager);
heartbeatManager.regBrokerCheckBusiness(masterConfig.getBrokerHeartbeatTimeoutMs(),
new TimeoutListener() {
@Override
@@ -150,7 +146,7 @@ public class DefBrokerRunManager implements BrokerRunManager, AliveObserver {
|| !brokerReg.equals(entity.getSimpleBrokerInfo())
|| !brokerTLSReg.equals(entity.getSimpleTLSBrokerInfo())) {
if (brokerReg == null) {
- masterMetrics.brokerConfigCnt.incrementAndGet();
+ MasterMetricsHolder.incBrokerConfigCnt();
} else {
if (!brokerReg.equals(entity.getSimpleBrokerInfo())) {
this.brokersMap.put(entity.getBrokerId(), entity.getSimpleBrokerInfo());
@@ -248,7 +244,7 @@ public class DefBrokerRunManager implements BrokerRunManager, AliveObserver {
brokerInfo.getBrokerId(), tmpRunStatusInfo);
if (runStatusInfo == null) {
brokerTotalCount.incrementAndGet();
- masterMetrics.brokerOnlineCnt.incrementAndGet();
+ MasterMetricsHolder.incBrokerOnlineCnt();
runStatusInfo = tmpRunStatusInfo;
}
} else {
@@ -479,10 +475,7 @@ public class DefBrokerRunManager implements BrokerRunManager, AliveObserver {
if (runStatusInfo == null) {
return false;
}
- masterMetrics.brokerOnlineCnt.decrementAndGet();
- if (isTimeout) {
- masterMetrics.brokerTmoTotCnt.incrementAndGet();
- }
+ MasterMetricsHolder.decBrokerOnlineCnt(isTimeout);
brokerTotalCount.decrementAndGet();
brokerAbnHolder.removeBroker(brokerId);
brokerPubSubInfo.rmvBrokerAllPushedInfo(brokerId);
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumerEventManager.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumerEventManager.java
index f4fa335..1966480 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumerEventManager.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumerEventManager.java
@@ -26,7 +26,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.collections.CollectionUtils;
import org.apache.inlong.tubemq.corebase.balance.ConsumerEvent;
-import org.apache.inlong.tubemq.server.master.metrics.MasterMetric;
+import org.apache.inlong.tubemq.server.master.metrics.MasterMetricsHolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -42,12 +42,9 @@ public class ConsumerEventManager {
new ConcurrentHashMap<>();
private final ConsumerInfoHolder consumerHolder;
- private final MasterMetric masterMetrics;
- public ConsumerEventManager(ConsumerInfoHolder consumerHolder,
- MasterMetric masterMetrics) {
+ public ConsumerEventManager(ConsumerInfoHolder consumerHolder) {
this.consumerHolder = consumerHolder;
- this.masterMetrics = masterMetrics;
}
public boolean addDisconnectEvent(String consumerId,
@@ -59,7 +56,7 @@ public class ConsumerEventManager {
LinkedList<ConsumerEvent> tmptList =
disconnectEventMap.putIfAbsent(consumerId, eventList);
if (tmptList == null) {
- masterMetrics.svrBalDisConEventConsumerCnt.incrementAndGet();
+ MasterMetricsHolder.incSvrBalDisConConsumerCnt();
} else {
eventList = tmptList;
}
@@ -78,7 +75,7 @@ public class ConsumerEventManager {
LinkedList<ConsumerEvent> tmptList =
connectEventMap.putIfAbsent(consumerId, eventList);
if (tmptList == null) {
- masterMetrics.svrBalConEventConsumerCnt.incrementAndGet();
+ MasterMetricsHolder.incSvrBalConEventConsumerCnt();
} else {
eventList = tmptList;
}
@@ -138,9 +135,9 @@ public class ConsumerEventManager {
if (eventList.isEmpty()) {
currentEventMap.remove(consumerId);
if (selDisConnMap) {
- masterMetrics.svrBalDisConEventConsumerCnt.decrementAndGet();
+ MasterMetricsHolder.decSvrBalDisConConsumerCnt();
} else {
- masterMetrics.svrBalConEventConsumerCnt.decrementAndGet();
+ MasterMetricsHolder.decSvrBalConEventConsumerCnt();
}
}
}
@@ -199,11 +196,11 @@ public class ConsumerEventManager {
LinkedList<ConsumerEvent> eventInfos =
disconnectEventMap.remove(consumerId);
if (eventInfos != null) {
- masterMetrics.svrBalDisConEventConsumerCnt.decrementAndGet();
+ MasterMetricsHolder.decSvrBalDisConConsumerCnt();
}
eventInfos = connectEventMap.remove(consumerId);
if (eventInfos != null) {
- masterMetrics.svrBalConEventConsumerCnt.decrementAndGet();
+ MasterMetricsHolder.decSvrBalConEventConsumerCnt();
}
}
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumerInfoHolder.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumerInfoHolder.java
index 122f668..0fce762 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumerInfoHolder.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumerInfoHolder.java
@@ -30,7 +30,7 @@ import org.apache.inlong.tubemq.server.common.paramcheck.ParamCheckResult;
import org.apache.inlong.tubemq.server.common.utils.RowLock;
import org.apache.inlong.tubemq.server.master.MasterConfig;
import org.apache.inlong.tubemq.server.master.TMaster;
-import org.apache.inlong.tubemq.server.master.metrics.MasterMetric;
+import org.apache.inlong.tubemq.server.master.metrics.MasterMetricsHolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,7 +39,6 @@ public class ConsumerInfoHolder {
private static final Logger logger =
LoggerFactory.getLogger(ConsumerInfoHolder.class);
private final MasterConfig masterConfig; // master configure
- private final MasterMetric masterMetrics;
private final RowLock groupRowLock; //lock
private final ConcurrentHashMap<String/* group */, ConsumeGroupInfo> groupInfoMap =
new ConcurrentHashMap<>();
@@ -51,7 +50,6 @@ public class ConsumerInfoHolder {
new ConcurrentHashSet<>();
public ConsumerInfoHolder(TMaster tMasterr) {
- this.masterMetrics = tMasterr.getMasterMetrics();
this.masterConfig = tMasterr.getMasterConfig();
this.groupRowLock = new RowLock("Group-RowLock",
this.masterConfig.getRowLockWaitDurMs());
@@ -355,19 +353,19 @@ public class ConsumerInfoHolder {
consumeGroupInfo = groupInfoMap.putIfAbsent(group, tmpGroupInfo);
if (consumeGroupInfo == null) {
consumeGroupInfo = tmpGroupInfo;
- masterMetrics.consumeGroupCnt.incrementAndGet();
if (tmpGroupInfo.isClientBalance()) {
clientBalanceGroupSet.add(group);
- masterMetrics.cltBalConsumeGroupCnt.incrementAndGet();
} else {
serverBalanceGroupSet.add(group);
}
+ MasterMetricsHolder.incConsumerCnt(true,
+ consumeGroupInfo.isClientBalance());
}
}
if (consumeGroupInfo.addConsumer(consumer, sBuffer, result)) {
- Boolean isNewAdd = (Boolean) result.checkData;
- if (isNewAdd) {
- masterMetrics.consumerCnt.incrementAndGet();
+ if ((Boolean) result.checkData) {
+ MasterMetricsHolder.incConsumerCnt(false,
+ consumeGroupInfo.isClientBalance());
}
if (!isNotAllocated) {
consumeGroupInfo.settAllocated();
@@ -391,12 +389,15 @@ public class ConsumerInfoHolder {
*
* @param group group name of consumer
* @param consumerId consumer id
+ * @param isTimeout if timeout
* @return ConsumerInfo
*/
- public ConsumerInfo removeConsumer(String group, String consumerId) {
+ public ConsumerInfo removeConsumer(String group, String consumerId, boolean isTimeout) {
if (group == null || consumerId == null) {
return null;
}
+ boolean isCltBal = false;
+ boolean rmvGroup = false;
ConsumerInfo consumer = null;
Integer lid = null;
try {
@@ -406,11 +407,23 @@ public class ConsumerInfoHolder {
if (consumeGroupInfo != null) {
consumer = consumeGroupInfo.removeConsumer(consumerId);
if (consumeGroupInfo.isGroupEmpty()) {
- groupInfoMap.remove(group);
+ rmvGroup = (groupInfoMap.remove(group) != null);
if (consumeGroupInfo.isClientBalance()) {
- clientBalanceGroupSet.add(group);
+ isCltBal = true;
+ clientBalanceGroupSet.remove(group);
} else {
- serverBalanceGroupSet.add(group);
+ serverBalanceGroupSet.remove(group);
+ }
+ if (rmvGroup) {
+ if (consumer == null) {
+ MasterMetricsHolder.decConsumeGroupCnt(isTimeout, isCltBal);
+ } else {
+ MasterMetricsHolder.decConsumerCnt(isTimeout, true, isCltBal);
+ }
+ }
+ } else {
+ if (consumer != null) {
+ MasterMetricsHolder.decConsumerCnt(isTimeout, false, false);
}
}
}
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeproducer/ProducerInfoHolder.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeproducer/ProducerInfoHolder.java
index 6acd0d0..22a578c 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeproducer/ProducerInfoHolder.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeproducer/ProducerInfoHolder.java
@@ -20,6 +20,7 @@ package org.apache.inlong.tubemq.server.master.nodemanage.nodeproducer;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.inlong.tubemq.corebase.cluster.ProducerInfo;
+import org.apache.inlong.tubemq.server.master.metrics.MasterMetricsHolder;
public class ProducerInfoHolder {
@@ -30,12 +31,13 @@ public class ProducerInfoHolder {
return producerInfoMap.get(producerId);
}
- public boolean setProducerInfo(String producerId,
+ public void setProducerInfo(String producerId,
Set<String> topicSet,
String host, boolean overTLS) {
- ProducerInfo oldObj = producerInfoMap.put(producerId,
- new ProducerInfo(producerId, topicSet, host, overTLS));
- return (oldObj == null);
+ if (producerInfoMap.put(producerId,
+ new ProducerInfo(producerId, topicSet, host, overTLS)) == null) {
+ MasterMetricsHolder.incProducerCnt();
+ }
}
public void updateProducerInfo(String producerId,
@@ -51,8 +53,12 @@ public class ProducerInfoHolder {
}
}
- public ProducerInfo removeProducer(String producerId) {
- return producerInfoMap.remove(producerId);
+ public ProducerInfo removeProducer(String producerId, boolean isTimeout) {
+ ProducerInfo info = producerInfoMap.remove(producerId);
+ if (info != null) {
+ MasterMetricsHolder.decProducerCnt(isTimeout);
+ }
+ return info;
}
public void clear() {
diff --git a/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/broker/BrokerMetricsTest.java b/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/broker/BrokerMetricsTest.java
index be7640c..5f84d59 100644
--- a/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/broker/BrokerMetricsTest.java
+++ b/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/broker/BrokerMetricsTest.java
@@ -19,6 +19,7 @@ package org.apache.inlong.tubemq.server.broker;
import org.apache.inlong.tubemq.corebase.metric.MetricValues;
import org.apache.inlong.tubemq.server.broker.metrics.BrokerMetrics;
+import org.apache.inlong.tubemq.server.broker.metrics.BrokerMetricsHolder;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
@@ -29,45 +30,199 @@ public class BrokerMetricsTest {
LoggerFactory.getLogger(BrokerMetricsTest.class);
@Test
- public void testAgentMetrics() {
+ public void testBrokerMetrics() {
try {
BrokerMetrics metrics = new BrokerMetrics();
- metrics.zkExceptionCnt.incrementAndGet();
- metrics.consumerTmoTotCnt.incrementAndGet();
- metrics.syncDataDurMax.update(10000);
- metrics.syncDataDurMin.update(2000);
- metrics.syncDataDurMax.update(20000);
- metrics.syncDataDurMin.update(1000);
- metrics.syncDataDurMin.update(3000);
- metrics.syncDataDurMax.update(30000);
+ // test case 1, set data
+ metrics.getIoExceptionCnt().incrementAndGet();
+ metrics.getZkExceptionCnt().incrementAndGet();
+ metrics.getConsumerOnlineCnt().incrementAndGet();
+ metrics.getConsumerOnlineCnt().incrementAndGet();
+ metrics.getConsumerTmoTotCnt().incrementAndGet();
+ metrics.getHbExceptionCnt().incrementAndGet();
+ metrics.getMasterNoNodeCnt().incrementAndGet();
+
+ metrics.getSyncDataDurMax().update(20000);
+ metrics.getSyncDataDurMax().update(10000);
+ metrics.getSyncDataDurMax().update(30000);
+ metrics.getSyncDataDurMin().update(2000);
+ metrics.getSyncDataDurMin().update(1000);
+ metrics.getSyncDataDurMin().update(3000);
+
+ metrics.getSyncZkDurMax().update(20000);
+ metrics.getSyncZkDurMax().update(1000);
+ metrics.getSyncZkDurMax().update(30000);
+
+ metrics.getSyncZkDurMin().update(2000);
+ metrics.getSyncZkDurMin().update(100);
+ metrics.getSyncZkDurMin().update(3000);
+ // get metric and compare data
MetricValues result1 = metrics.getMetrics();
- Assert.assertEquals(Long.valueOf(1000),
- result1.getMetricValues().get(metrics.syncDataDurMin.getName()));
- Assert.assertEquals(Long.valueOf(30000),
- result1.getMetricValues().get(metrics.syncDataDurMax.getName()));
Assert.assertEquals(Long.valueOf(1),
- result1.getMetricValues().get(metrics.zkExceptionCnt.getName()));
+ result1.getMetricValues().get(metrics.getIoExceptionCnt().getName()));
+ Assert.assertEquals(Long.valueOf(1),
+ result1.getMetricValues().get(metrics.getZkExceptionCnt().getName()));
+ Assert.assertEquals(Long.valueOf(2),
+ result1.getMetricValues().get(metrics.getConsumerOnlineCnt().getName()));
+ Assert.assertEquals(Long.valueOf(1),
+ result1.getMetricValues().get(metrics.getConsumerTmoTotCnt().getName()));
Assert.assertEquals(Long.valueOf(1),
- result1.getMetricValues().get(metrics.consumerTmoTotCnt.getName()));
- // get and reset value
+ result1.getMetricValues().get(metrics.getHbExceptionCnt().getName()));
+ Assert.assertEquals(Long.valueOf(1),
+ result1.getMetricValues().get(metrics.getMasterNoNodeCnt().getName()));
+ Assert.assertEquals(Long.valueOf(30000),
+ result1.getMetricValues().get(metrics.getSyncDataDurMax().getName()));
+ Assert.assertEquals(Long.valueOf(1000),
+ result1.getMetricValues().get(metrics.getSyncDataDurMin().getName()));
+ Assert.assertEquals(Long.valueOf(30000),
+ result1.getMetricValues().get(metrics.getSyncZkDurMax().getName()));
+ Assert.assertEquals(Long.valueOf(100),
+ result1.getMetricValues().get(metrics.getSyncZkDurMin().getName()));
+ // get and reset value 2
final MetricValues result2 = metrics.getAndReSetMetrics();
- metrics.zkExceptionCnt.incrementAndGet();
- metrics.zkExceptionCnt.getAndSet();
- metrics.consumerTmoTotCnt.incrementAndGet();
- metrics.consumerTmoTotCnt.update(10);
- metrics.syncDataDurMax.update(20000);
- metrics.syncDataDurMin.update(2000);
+ // update metric data to 3
+ metrics.getIoExceptionCnt().incrementAndGet();
+ metrics.getZkExceptionCnt().incrementAndGet();
+ metrics.getConsumerOnlineCnt().incrementAndGet();
+ metrics.getConsumerOnlineCnt().decrementAndGet();
+ metrics.getConsumerTmoTotCnt().incrementAndGet();
+ metrics.getHbExceptionCnt().incrementAndGet();
+ metrics.getMasterNoNodeCnt().incrementAndGet();
+
+ metrics.getSyncDataDurMax().update(10);
+ metrics.getSyncDataDurMax().update(10000);
+ metrics.getSyncDataDurMax().update(20000);
+ metrics.getSyncDataDurMin().update(10);
+ metrics.getSyncDataDurMin().update(1000);
+ metrics.getSyncDataDurMin().update(5000);
+
+ metrics.getSyncZkDurMax().update(10);
+ metrics.getSyncZkDurMax().update(1000);
+ metrics.getSyncZkDurMax().update(2000);
+
+ metrics.getSyncZkDurMin().update(3000);
+ metrics.getSyncZkDurMin().update(10);
+ metrics.getSyncZkDurMin().update(6000);
+
MetricValues result3 = metrics.getMetrics();
Assert.assertEquals(result1.getLastResetTime(),
result2.getLastResetTime());
+ Assert.assertEquals(Long.valueOf(1),
+ result3.getMetricValues().get(metrics.getIoExceptionCnt().getName()));
+ Assert.assertEquals(Long.valueOf(1),
+ result3.getMetricValues().get(metrics.getZkExceptionCnt().getName()));
+ Assert.assertEquals(Long.valueOf(2),
+ result3.getMetricValues().get(metrics.getConsumerOnlineCnt().getName()));
+ Assert.assertEquals(Long.valueOf(1),
+ result3.getMetricValues().get(metrics.getConsumerTmoTotCnt().getName()));
+ Assert.assertEquals(Long.valueOf(1),
+ result3.getMetricValues().get(metrics.getHbExceptionCnt().getName()));
+ Assert.assertEquals(Long.valueOf(1),
+ result3.getMetricValues().get(metrics.getMasterNoNodeCnt().getName()));
+ Assert.assertEquals(Long.valueOf(20000),
+ result3.getMetricValues().get(metrics.getSyncDataDurMax().getName()));
+ Assert.assertEquals(Long.valueOf(10),
+ result3.getMetricValues().get(metrics.getSyncDataDurMin().getName()));
Assert.assertEquals(Long.valueOf(2000),
- result3.getMetricValues().get(metrics.syncDataDurMin.getName()));
+ result3.getMetricValues().get(metrics.getSyncZkDurMax().getName()));
+ Assert.assertEquals(Long.valueOf(10),
+ result3.getMetricValues().get(metrics.getSyncZkDurMin().getName()));
+ } catch (Exception ex) {
+ logger.error("error happens" + ex);
+ }
+ }
+
+ @Test
+ public void testBrokerMetricsHolder() {
+ try {
+ // case 1, set data
+ BrokerMetricsHolder.incConsumerCnt();
+ BrokerMetricsHolder.decConsumerCnt(false);
+ BrokerMetricsHolder.incConsumerCnt();
+ BrokerMetricsHolder.decConsumerCnt(true);
+ BrokerMetricsHolder.incConsumerCnt();
+
+ BrokerMetricsHolder.incZKExceptionCnt();
+ BrokerMetricsHolder.incZKExceptionCnt();
+
+ BrokerMetricsHolder.incMasterNoNodeCnt();
+ BrokerMetricsHolder.incHBExceptionCnt();
+ BrokerMetricsHolder.incIOExceptionCnt();
+ BrokerMetricsHolder.incZKExceptionCnt();
+
+ BrokerMetricsHolder.updSyncDataDurations(10000);
+ BrokerMetricsHolder.updSyncDataDurations(2000);
+ BrokerMetricsHolder.updSyncDataDurations(20000);
+
+ BrokerMetricsHolder.updSyncZKDurations(1000);
+ BrokerMetricsHolder.updSyncZKDurations(30);
+ BrokerMetricsHolder.updSyncZKDurations(30000);
+ // get data and check
+ BrokerMetrics metrics = BrokerMetricsHolder.getStatsInfo();
+ MetricValues result1 = metrics.getMetrics();
+ Assert.assertEquals(Long.valueOf(1),
+ result1.getMetricValues().get(metrics.getConsumerOnlineCnt().getName()));
+ Assert.assertEquals(Long.valueOf(1),
+ result1.getMetricValues().get(metrics.getConsumerTmoTotCnt().getName()));
+ Assert.assertEquals(Long.valueOf(3),
+ result1.getMetricValues().get(metrics.getZkExceptionCnt().getName()));
+ Assert.assertEquals(Long.valueOf(1),
+ result1.getMetricValues().get(metrics.getMasterNoNodeCnt().getName()));
+ Assert.assertEquals(Long.valueOf(1),
+ result1.getMetricValues().get(metrics.getHbExceptionCnt().getName()));
+ Assert.assertEquals(Long.valueOf(1),
+ result1.getMetricValues().get(metrics.getIoExceptionCnt().getName()));
Assert.assertEquals(Long.valueOf(20000),
- result3.getMetricValues().get(metrics.syncDataDurMax.getName()));
+ result1.getMetricValues().get(metrics.getSyncDataDurMax().getName()));
+ Assert.assertEquals(Long.valueOf(2000),
+ result1.getMetricValues().get(metrics.getSyncDataDurMin().getName()));
+ Assert.assertEquals(Long.valueOf(30000),
+ result1.getMetricValues().get(metrics.getSyncZkDurMax().getName()));
+ Assert.assertEquals(Long.valueOf(30),
+ result1.getMetricValues().get(metrics.getSyncZkDurMin().getName()));
+
+ // get and reset value 2
+ final MetricValues result2 = metrics.getAndReSetMetrics();
+ BrokerMetricsHolder.incConsumerCnt();
+ BrokerMetricsHolder.incConsumerCnt();
+ BrokerMetricsHolder.incConsumerCnt();
+ BrokerMetricsHolder.decConsumerCnt(false);
+ BrokerMetricsHolder.decConsumerCnt(true);
+
+ BrokerMetricsHolder.incZKExceptionCnt();
+ BrokerMetricsHolder.incZKExceptionCnt();
+
+ BrokerMetricsHolder.updSyncDataDurations(1);
+ BrokerMetricsHolder.updSyncDataDurations(5000);
+ BrokerMetricsHolder.updSyncDataDurations(30000);
+
+ BrokerMetricsHolder.updSyncZKDurations(100);
+ BrokerMetricsHolder.updSyncZKDurations(10);
+ BrokerMetricsHolder.updSyncZKDurations(5000);
+ // get and check 3
+ MetricValues result3 = metrics.getMetrics();
+ Assert.assertEquals(result1.getLastResetTime(),
+ result2.getLastResetTime());
+ Assert.assertEquals(Long.valueOf(2),
+ result3.getMetricValues().get(metrics.getConsumerOnlineCnt().getName()));
+ Assert.assertEquals(Long.valueOf(1),
+ result3.getMetricValues().get(metrics.getConsumerTmoTotCnt().getName()));
+ Assert.assertEquals(Long.valueOf(2),
+ result3.getMetricValues().get(metrics.getZkExceptionCnt().getName()));
+ Assert.assertEquals(Long.valueOf(0),
+ result3.getMetricValues().get(metrics.getMasterNoNodeCnt().getName()));
Assert.assertEquals(Long.valueOf(0),
- result3.getMetricValues().get(metrics.zkExceptionCnt.getName()));
+ result3.getMetricValues().get(metrics.getHbExceptionCnt().getName()));
+ Assert.assertEquals(Long.valueOf(0),
+ result3.getMetricValues().get(metrics.getIoExceptionCnt().getName()));
+ Assert.assertEquals(Long.valueOf(30000),
+ result3.getMetricValues().get(metrics.getSyncDataDurMax().getName()));
+ Assert.assertEquals(Long.valueOf(1),
+ result3.getMetricValues().get(metrics.getSyncDataDurMin().getName()));
+ Assert.assertEquals(Long.valueOf(5000),
+ result3.getMetricValues().get(metrics.getSyncZkDurMax().getName()));
Assert.assertEquals(Long.valueOf(10),
- result3.getMetricValues().get(metrics.consumerTmoTotCnt.getName()));
+ result3.getMetricValues().get(metrics.getSyncZkDurMin().getName()));
} catch (Exception ex) {
logger.error("error happens" + ex);
}
diff --git a/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/master/MasterMetricsTest.java b/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/master/MasterMetricsTest.java
index 193bebe..a318683 100644
--- a/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/master/MasterMetricsTest.java
+++ b/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/master/MasterMetricsTest.java
@@ -17,26 +17,471 @@
package org.apache.inlong.tubemq.server.master;
-import org.apache.inlong.commons.config.metrics.MetricValue;
-import org.apache.inlong.tubemq.server.master.metrics.MasterMetric;
+import org.apache.inlong.tubemq.corebase.metric.MetricValues;
+import org.apache.inlong.tubemq.server.master.metrics.MasterMetrics;
+import org.apache.inlong.tubemq.server.master.metrics.MasterMetricsHolder;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Map;
-
public class MasterMetricsTest {
private static final Logger logger = LoggerFactory.getLogger(MasterMetricsTest.class);
@Test
- public void testAgentMetrics() {
+ public void testMasterMetrics() {
+ try {
+ MasterMetrics metrics = new MasterMetrics();
+ // test case 1, set data
+ metrics.getConsumerOnlineCnt().incrementAndGet();
+ metrics.getConsumerOnlineCnt().incrementAndGet();
+ metrics.getConsumerTmoTotCnt().incrementAndGet();
+ metrics.getConsumerTmoTotCnt().incrementAndGet();
+ metrics.getConsumeGroupCnt().incrementAndGet();
+ metrics.getConsumeGroupCnt().incrementAndGet();
+ metrics.getConsumeGroupTmoTotCnt().incrementAndGet();
+ metrics.getCltBalConsumeGroupCnt().incrementAndGet();
+ metrics.getCltBalGroupTmototCnt().incrementAndGet();
+
+ metrics.getProducerOnlineCnt().incrementAndGet();
+ metrics.getProducerOnlineCnt().incrementAndGet();
+ metrics.getProducerTmoTotCnt().incrementAndGet();
+ metrics.getProducerTmoTotCnt().incrementAndGet();
+
+ metrics.getBrokerConfigCnt().incrementAndGet();
+ metrics.getBrokerConfigCnt().incrementAndGet();
+ metrics.getBrokerOnlineCnt().incrementAndGet();
+ metrics.getBrokerOnlineCnt().incrementAndGet();
+ metrics.getBrokerTmoTotCnt().incrementAndGet();
+
+ metrics.getBrokerAbnCurCnt().incrementAndGet();
+ metrics.getBrokerAbnCurCnt().incrementAndGet();
+ metrics.getBrokerAbnTotCnt().incrementAndGet();
+ metrics.getBrokerAbnTotCnt().incrementAndGet();
+ metrics.getBrokerFbdCurCnt().incrementAndGet();
+ metrics.getBrokerFbdCurCnt().incrementAndGet();
+ metrics.getBrokerFbdTotCnt().incrementAndGet();
+ metrics.getBrokerFbdTotCnt().incrementAndGet();
+ metrics.getBrokerFbdTotCnt().incrementAndGet();
+
+ metrics.getSvrBalDuration().update(100);
+ metrics.getSvrBalDuration().update(500);
+ metrics.getSvrBalDuration().update(300);
+
+ metrics.getSvrBalDurationMin().update(700);
+ metrics.getSvrBalDurationMin().update(200);
+ metrics.getSvrBalDurationMin().update(300);
+
+ metrics.getSvrBalDurationMax().update(700);
+ metrics.getSvrBalDurationMax().update(1000);
+ metrics.getSvrBalDurationMax().update(300);
+
+ metrics.getSvrBalResetDurMin().update(700);
+ metrics.getSvrBalResetDurMin().update(200);
+ metrics.getSvrBalResetDurMin().update(300);
+
+ metrics.getSvrBalResetDurMax().update(700);
+ metrics.getSvrBalResetDurMax().update(1000);
+ metrics.getSvrBalResetDurMax().update(300);
+
+ metrics.getSvrBalConEventConsumerCnt().incrementAndGet();
+ metrics.getSvrBalConEventConsumerCnt().incrementAndGet();
+ metrics.getSvrBalConEventConsumerCnt().incrementAndGet();
+
+ metrics.getSvrBalDisConEventConsumerCnt().incrementAndGet();
+ metrics.getSvrBalDisConEventConsumerCnt().incrementAndGet();
+ metrics.getSvrBalDisConEventConsumerCnt().incrementAndGet();
+ // get metric and compare data
+ MetricValues result1 = metrics.getMetrics();
+ Assert.assertEquals(Long.valueOf(2),
+ result1.getMetricValues().get(metrics.getConsumerOnlineCnt().getName()));
+ Assert.assertEquals(Long.valueOf(2),
+ result1.getMetricValues().get(metrics.getConsumerTmoTotCnt().getName()));
+ Assert.assertEquals(Long.valueOf(2),
+ result1.getMetricValues().get(metrics.getConsumeGroupCnt().getName()));
+ Assert.assertEquals(Long.valueOf(1),
+ result1.getMetricValues().get(metrics.getCltBalConsumeGroupCnt().getName()));
+ Assert.assertEquals(Long.valueOf(1),
+ result1.getMetricValues().get(metrics.getCltBalGroupTmototCnt().getName()));
+
+ Assert.assertEquals(Long.valueOf(2),
+ result1.getMetricValues().get(metrics.getProducerOnlineCnt().getName()));
+ Assert.assertEquals(Long.valueOf(2),
+ result1.getMetricValues().get(metrics.getProducerTmoTotCnt().getName()));
+ Assert.assertEquals(Long.valueOf(2),
+ result1.getMetricValues().get(metrics.getBrokerConfigCnt().getName()));
+ Assert.assertEquals(Long.valueOf(2),
+ result1.getMetricValues().get(metrics.getBrokerOnlineCnt().getName()));
+ Assert.assertEquals(Long.valueOf(1),
+ result1.getMetricValues().get(metrics.getBrokerTmoTotCnt().getName()));
+
+ Assert.assertEquals(Long.valueOf(2),
+ result1.getMetricValues().get(metrics.getBrokerAbnCurCnt().getName()));
+ Assert.assertEquals(Long.valueOf(2),
+ result1.getMetricValues().get(metrics.getBrokerAbnTotCnt().getName()));
+ Assert.assertEquals(Long.valueOf(2),
+ result1.getMetricValues().get(metrics.getBrokerFbdCurCnt().getName()));
+ Assert.assertEquals(Long.valueOf(3),
+ result1.getMetricValues().get(metrics.getBrokerFbdTotCnt().getName()));
+
+ Assert.assertEquals(Long.valueOf(300),
+ result1.getMetricValues().get(metrics.getSvrBalDuration().getName()));
+ Assert.assertEquals(Long.valueOf(200),
+ result1.getMetricValues().get(metrics.getSvrBalDurationMin().getName()));
+ Assert.assertEquals(Long.valueOf(1000),
+ result1.getMetricValues().get(metrics.getSvrBalDurationMax().getName()));
+ Assert.assertEquals(Long.valueOf(200),
+ result1.getMetricValues().get(metrics.getSvrBalResetDurMin().getName()));
+ Assert.assertEquals(Long.valueOf(1000),
+ result1.getMetricValues().get(metrics.getSvrBalResetDurMax().getName()));
+
+ Assert.assertEquals(Long.valueOf(3),
+ result1.getMetricValues().get(
+ metrics.getSvrBalConEventConsumerCnt().getName()));
+
+ Assert.assertEquals(Long.valueOf(3),
+ result1.getMetricValues().get(
+ metrics.getSvrBalDisConEventConsumerCnt().getName()));
+
+ // get and reset value 2
+ final MetricValues result2 = metrics.getAndReSetMetrics();
+ // update metric data to 3
+ metrics.getConsumerOnlineCnt().incrementAndGet();
+ metrics.getConsumerOnlineCnt().decrementAndGet();
+ metrics.getConsumeGroupCnt().incrementAndGet();
+ metrics.getConsumeGroupTmoTotCnt().incrementAndGet();
+ metrics.getCltBalConsumeGroupCnt().incrementAndGet();
+ metrics.getCltBalGroupTmototCnt().incrementAndGet();
+
+ metrics.getProducerOnlineCnt().incrementAndGet();
+ metrics.getProducerOnlineCnt().incrementAndGet();
+
+ metrics.getBrokerConfigCnt().incrementAndGet();
+ metrics.getBrokerConfigCnt().incrementAndGet();
+ metrics.getBrokerOnlineCnt().decrementAndGet();
+ metrics.getBrokerOnlineCnt().decrementAndGet();
+ metrics.getBrokerTmoTotCnt().incrementAndGet();
+
+ metrics.getBrokerAbnCurCnt().incrementAndGet();
+ metrics.getBrokerAbnCurCnt().incrementAndGet();
+ metrics.getBrokerFbdCurCnt().incrementAndGet();
+ metrics.getBrokerFbdCurCnt().incrementAndGet();
+ metrics.getBrokerFbdTotCnt().incrementAndGet();
+
+ metrics.getSvrBalDuration().update(100);
+ metrics.getSvrBalDuration().update(700);
+ metrics.getSvrBalDuration().update(20);
+
+ metrics.getSvrBalDurationMin().update(1000);
+ metrics.getSvrBalDurationMin().update(50);
+ metrics.getSvrBalDurationMin().update(3000);
+
+ metrics.getSvrBalDurationMax().update(700);
+ metrics.getSvrBalDurationMax().update(800);
+ metrics.getSvrBalDurationMax().update(300);
+
+ metrics.getSvrBalResetDurMin().update(700);
+ metrics.getSvrBalResetDurMin().update(10);
+ metrics.getSvrBalResetDurMin().update(300);
+
+ metrics.getSvrBalResetDurMax().update(700);
+ metrics.getSvrBalResetDurMax().update(2000);
+ metrics.getSvrBalResetDurMax().update(300);
+
+ metrics.getSvrBalConEventConsumerCnt().incrementAndGet();
+ metrics.getSvrBalConEventConsumerCnt().incrementAndGet();
+ metrics.getSvrBalConEventConsumerCnt().incrementAndGet();
+
+ metrics.getSvrBalDisConEventConsumerCnt().incrementAndGet();
+ metrics.getSvrBalDisConEventConsumerCnt().incrementAndGet();
+
+ // get metric and compare data
+ MetricValues result3 = metrics.getMetrics();
+ Assert.assertEquals(result1.getLastResetTime(),
+ result2.getLastResetTime());
+ Assert.assertEquals(Long.valueOf(2),
+ result3.getMetricValues().get(metrics.getConsumerOnlineCnt().getName()));
+ Assert.assertEquals(Long.valueOf(0),
+ result3.getMetricValues().get(metrics.getConsumerTmoTotCnt().getName()));
+ Assert.assertEquals(Long.valueOf(3),
+ result3.getMetricValues().get(metrics.getConsumeGroupCnt().getName()));
+ Assert.assertEquals(Long.valueOf(2),
+ result3.getMetricValues().get(metrics.getCltBalConsumeGroupCnt().getName()));
+ Assert.assertEquals(Long.valueOf(1),
+ result3.getMetricValues().get(metrics.getCltBalGroupTmototCnt().getName()));
+
+ Assert.assertEquals(Long.valueOf(4),
+ result3.getMetricValues().get(metrics.getProducerOnlineCnt().getName()));
+ Assert.assertEquals(Long.valueOf(0),
+ result3.getMetricValues().get(metrics.getProducerTmoTotCnt().getName()));
+ Assert.assertEquals(Long.valueOf(4),
+ result3.getMetricValues().get(metrics.getBrokerConfigCnt().getName()));
+ Assert.assertEquals(Long.valueOf(0),
+ result3.getMetricValues().get(metrics.getBrokerOnlineCnt().getName()));
+ Assert.assertEquals(Long.valueOf(1),
+ result3.getMetricValues().get(metrics.getBrokerTmoTotCnt().getName()));
+
+ Assert.assertEquals(Long.valueOf(4),
+ result3.getMetricValues().get(metrics.getBrokerAbnCurCnt().getName()));
+ Assert.assertEquals(Long.valueOf(2),
+ result3.getMetricValues().get(metrics.getBrokerAbnTotCnt().getName()));
+ Assert.assertEquals(Long.valueOf(4),
+ result3.getMetricValues().get(metrics.getBrokerFbdCurCnt().getName()));
+ Assert.assertEquals(Long.valueOf(3),
+ result3.getMetricValues().get(metrics.getBrokerFbdTotCnt().getName()));
+
+ Assert.assertEquals(Long.valueOf(20),
+ result3.getMetricValues().get(metrics.getSvrBalDuration().getName()));
+ Assert.assertEquals(Long.valueOf(50),
+ result3.getMetricValues().get(metrics.getSvrBalDurationMin().getName()));
+ Assert.assertEquals(Long.valueOf(800),
+ result3.getMetricValues().get(metrics.getSvrBalDurationMax().getName()));
+ Assert.assertEquals(Long.valueOf(10),
+ result3.getMetricValues().get(metrics.getSvrBalResetDurMin().getName()));
+ Assert.assertEquals(Long.valueOf(2000),
+ result3.getMetricValues().get(metrics.getSvrBalResetDurMax().getName()));
+
+ Assert.assertEquals(Long.valueOf(6),
+ result3.getMetricValues().get(
+ metrics.getSvrBalConEventConsumerCnt().getName()));
+ Assert.assertEquals(Long.valueOf(5),
+ result3.getMetricValues().get(
+ metrics.getSvrBalDisConEventConsumerCnt().getName()));
+
+ } catch (Exception ex) {
+ logger.error("error happens" + ex);
+ }
+ }
+
+ @Test
+ public void testMasterMetricsHolder() {
try {
- MasterMetric taskMetrics = MasterMetric.create();
- taskMetrics.svrBalLatency.incrementAndGet();
- Map<String, MetricValue> result = taskMetrics.snapshot();
- Assert.assertEquals(1, taskMetrics.svrBalLatency.get());
+ // test case 1, set data
+ // add 12 consumer, 8 group, 4 client balance
+ MasterMetricsHolder.incConsumerCnt(false, false);
+ MasterMetricsHolder.incConsumerCnt(false, true);
+ MasterMetricsHolder.incConsumerCnt(false, false);
+ MasterMetricsHolder.incConsumerCnt(false, true);
+ MasterMetricsHolder.incConsumerCnt(true, false);
+ MasterMetricsHolder.incConsumerCnt(true, false);
+ MasterMetricsHolder.incConsumerCnt(true, true);
+ MasterMetricsHolder.incConsumerCnt(true, true);
+ MasterMetricsHolder.incConsumerCnt(true, false);
+ MasterMetricsHolder.incConsumerCnt(true, false);
+ MasterMetricsHolder.incConsumerCnt(true, true);
+ MasterMetricsHolder.incConsumerCnt(true, true);
+ // dec 8 consumer, add 4 timeout consumer,
+ // dec 4 group, add 2 timeout group, dec 2 client balance group
+ MasterMetricsHolder.decConsumerCnt(false, false, false);
+ MasterMetricsHolder.decConsumerCnt(false, false, true);
+ MasterMetricsHolder.decConsumerCnt(false, true, false);
+ MasterMetricsHolder.decConsumerCnt(false, true, true);
+ MasterMetricsHolder.decConsumerCnt(true, false, false);
+ MasterMetricsHolder.decConsumerCnt(true, false, true);
+ MasterMetricsHolder.decConsumerCnt(true, true, false);
+ MasterMetricsHolder.decConsumerCnt(true, true, true);
+ // dec 4 group, add 2 timeout group, dec 2 client balance group
+ MasterMetricsHolder.decConsumeGroupCnt(false, false);
+ MasterMetricsHolder.decConsumeGroupCnt(false, true);
+ MasterMetricsHolder.decConsumeGroupCnt(true, false);
+ MasterMetricsHolder.decConsumeGroupCnt(true, true);
+ // add 3 producer
+ // dec 3 producer, 2 timeout producer
+ MasterMetricsHolder.incProducerCnt();
+ MasterMetricsHolder.incProducerCnt();
+ MasterMetricsHolder.incProducerCnt();
+ MasterMetricsHolder.decProducerCnt(false);
+ MasterMetricsHolder.decProducerCnt(true);
+ MasterMetricsHolder.decProducerCnt(true);
+ // add 3 disconcnt
+ // dec 2 disconcnt
+ MasterMetricsHolder.incSvrBalDisConConsumerCnt();
+ MasterMetricsHolder.incSvrBalDisConConsumerCnt();
+ MasterMetricsHolder.incSvrBalDisConConsumerCnt();
+ MasterMetricsHolder.decSvrBalDisConConsumerCnt();
+ MasterMetricsHolder.decSvrBalDisConConsumerCnt();
+ // add 3 concnt
+ // dec 2 concnt
+ MasterMetricsHolder.incSvrBalConEventConsumerCnt();
+ MasterMetricsHolder.incSvrBalConEventConsumerCnt();
+ MasterMetricsHolder.incSvrBalConEventConsumerCnt();
+ MasterMetricsHolder.decSvrBalConEventConsumerCnt();
+ MasterMetricsHolder.decSvrBalConEventConsumerCnt();
+ // add 3 broker configure count
+ // dec 2 broker configure count
+ MasterMetricsHolder.incBrokerConfigCnt();
+ MasterMetricsHolder.incBrokerConfigCnt();
+ MasterMetricsHolder.incBrokerConfigCnt();
+ MasterMetricsHolder.decBrokerConfigCnt();
+ MasterMetricsHolder.decBrokerConfigCnt();
+ // add 3 broker online count
+ // dec 2 broker online count, 1 timeout count
+ MasterMetricsHolder.incBrokerOnlineCnt();
+ MasterMetricsHolder.incBrokerOnlineCnt();
+ MasterMetricsHolder.incBrokerOnlineCnt();
+ MasterMetricsHolder.decBrokerOnlineCnt(false);
+ MasterMetricsHolder.decBrokerOnlineCnt(true);
+ // add 3 broker abnormal count, 3 total abnormal count
+ // dec 1 broker abnormal count
+ MasterMetricsHolder.incBrokerAbnormalCnt();
+ MasterMetricsHolder.decBrokerAbnormalCnt();
+ MasterMetricsHolder.incBrokerAbnormalCnt();
+ MasterMetricsHolder.incBrokerAbnormalCnt();
+ // add 4 broker forbidden count, 4 total forbidden count
+ // dec 1 broker forbidden count
+ MasterMetricsHolder.incBrokerForbiddenCnt();
+ MasterMetricsHolder.decBrokerForbiddenCnt();
+ MasterMetricsHolder.incBrokerForbiddenCnt();
+ MasterMetricsHolder.incBrokerForbiddenCnt();
+ MasterMetricsHolder.incBrokerForbiddenCnt();
+ // max: 1000, min 100
+ MasterMetricsHolder.updSvrBalanceDurations(300);
+ MasterMetricsHolder.updSvrBalanceDurations(500);
+ MasterMetricsHolder.updSvrBalanceDurations(100);
+ MasterMetricsHolder.updSvrBalanceDurations(1000);
+ // max: 5000, min 500
+ MasterMetricsHolder.updSvrBalResetDurations(3000);
+ MasterMetricsHolder.updSvrBalResetDurations(500);
+ MasterMetricsHolder.updSvrBalResetDurations(1000);
+ MasterMetricsHolder.updSvrBalResetDurations(5000);
+ // get metric and compare data
+ MasterMetrics metrics = MasterMetricsHolder.getStatsInfo();
+ MetricValues result1 = metrics.getMetrics();
+ Assert.assertEquals(Long.valueOf(4),
+ result1.getMetricValues().get(metrics.getConsumerOnlineCnt().getName()));
+ Assert.assertEquals(Long.valueOf(4),
+ result1.getMetricValues().get(metrics.getConsumerTmoTotCnt().getName()));
+ Assert.assertEquals(Long.valueOf(0),
+ result1.getMetricValues().get(metrics.getConsumeGroupCnt().getName()));
+ Assert.assertEquals(Long.valueOf(0),
+ result1.getMetricValues().get(metrics.getCltBalConsumeGroupCnt().getName()));
+ Assert.assertEquals(Long.valueOf(2),
+ result1.getMetricValues().get(metrics.getCltBalGroupTmototCnt().getName()));
+
+ Assert.assertEquals(Long.valueOf(0),
+ result1.getMetricValues().get(metrics.getProducerOnlineCnt().getName()));
+ Assert.assertEquals(Long.valueOf(2),
+ result1.getMetricValues().get(metrics.getProducerTmoTotCnt().getName()));
+ Assert.assertEquals(Long.valueOf(1),
+ result1.getMetricValues().get(metrics.getBrokerConfigCnt().getName()));
+ Assert.assertEquals(Long.valueOf(1),
+ result1.getMetricValues().get(metrics.getBrokerOnlineCnt().getName()));
+ Assert.assertEquals(Long.valueOf(1),
+ result1.getMetricValues().get(metrics.getBrokerTmoTotCnt().getName()));
+
+ Assert.assertEquals(Long.valueOf(2),
+ result1.getMetricValues().get(metrics.getBrokerAbnCurCnt().getName()));
+ Assert.assertEquals(Long.valueOf(3),
+ result1.getMetricValues().get(metrics.getBrokerAbnTotCnt().getName()));
+ Assert.assertEquals(Long.valueOf(3),
+ result1.getMetricValues().get(metrics.getBrokerFbdCurCnt().getName()));
+ Assert.assertEquals(Long.valueOf(4),
+ result1.getMetricValues().get(metrics.getBrokerFbdTotCnt().getName()));
+
+ Assert.assertEquals(Long.valueOf(1000),
+ result1.getMetricValues().get(metrics.getSvrBalDuration().getName()));
+ Assert.assertEquals(Long.valueOf(100),
+ result1.getMetricValues().get(metrics.getSvrBalDurationMin().getName()));
+ Assert.assertEquals(Long.valueOf(1000),
+ result1.getMetricValues().get(metrics.getSvrBalDurationMax().getName()));
+ Assert.assertEquals(Long.valueOf(500),
+ result1.getMetricValues().get(metrics.getSvrBalResetDurMin().getName()));
+ Assert.assertEquals(Long.valueOf(5000),
+ result1.getMetricValues().get(metrics.getSvrBalResetDurMax().getName()));
+
+ Assert.assertEquals(Long.valueOf(1),
+ result1.getMetricValues().get(
+ metrics.getSvrBalConEventConsumerCnt().getName()));
+ Assert.assertEquals(Long.valueOf(1),
+ result1.getMetricValues().get(
+ metrics.getSvrBalDisConEventConsumerCnt().getName()));
+
+ // get and reset value 2
+ final MetricValues result2 = metrics.getAndReSetMetrics();
+ // update metric data to 3
+ // test case 3, set data
+ // add 3 consumer, 2 group, 1 client balance
+ MasterMetricsHolder.incConsumerCnt(false, false);
+ MasterMetricsHolder.incConsumerCnt(true, false);
+ MasterMetricsHolder.incConsumerCnt(true, true);
+ // dec 2 consumer, add 1 timeout consumer,
+ // dec 1 group, add 1 timeout group
+ MasterMetricsHolder.decConsumerCnt(true, true, true);
+ MasterMetricsHolder.decConsumerCnt(false, false, true);
+ // dec 1 group, add 1 timeout group
+ MasterMetricsHolder.decConsumeGroupCnt(true, false);
+ // add 2 producer
+ // dec 1 producer
+ MasterMetricsHolder.incProducerCnt();
+ MasterMetricsHolder.incProducerCnt();
+ MasterMetricsHolder.decProducerCnt(false);
+ // add 1 abnormal ,dec 1 abnormal
+ MasterMetricsHolder.incBrokerAbnormalCnt();
+ MasterMetricsHolder.decBrokerAbnormalCnt();
+
+ // max: 1000, min 100
+ MasterMetricsHolder.updSvrBalanceDurations(5000);
+ MasterMetricsHolder.updSvrBalanceDurations(500);
+ MasterMetricsHolder.updSvrBalanceDurations(100);
+ MasterMetricsHolder.updSvrBalanceDurations(8000);
+ // max: 5000, min 500
+ MasterMetricsHolder.updSvrBalResetDurations(2000);
+ MasterMetricsHolder.updSvrBalResetDurations(100);
+ MasterMetricsHolder.updSvrBalResetDurations(1000);
+ MasterMetricsHolder.updSvrBalResetDurations(4000);
+
+ // get metric and compare data
+ MetricValues result3 = metrics.getMetrics();
+ Assert.assertEquals(result1.getLastResetTime(),
+ result2.getLastResetTime());
+ Assert.assertEquals(Long.valueOf(5),
+ result3.getMetricValues().get(metrics.getConsumerOnlineCnt().getName()));
+ Assert.assertEquals(Long.valueOf(1),
+ result3.getMetricValues().get(metrics.getConsumerTmoTotCnt().getName()));
+ Assert.assertEquals(Long.valueOf(0),
+ result3.getMetricValues().get(metrics.getConsumeGroupCnt().getName()));
+ Assert.assertEquals(Long.valueOf(0),
+ result3.getMetricValues().get(metrics.getCltBalConsumeGroupCnt().getName()));
+ Assert.assertEquals(Long.valueOf(1),
+ result3.getMetricValues().get(metrics.getCltBalGroupTmototCnt().getName()));
+
+ Assert.assertEquals(Long.valueOf(1),
+ result3.getMetricValues().get(metrics.getProducerOnlineCnt().getName()));
+ Assert.assertEquals(Long.valueOf(0),
+ result3.getMetricValues().get(metrics.getProducerTmoTotCnt().getName()));
+ Assert.assertEquals(Long.valueOf(1),
+ result3.getMetricValues().get(metrics.getBrokerConfigCnt().getName()));
+ Assert.assertEquals(Long.valueOf(1),
+ result3.getMetricValues().get(metrics.getBrokerOnlineCnt().getName()));
+ Assert.assertEquals(Long.valueOf(0),
+ result3.getMetricValues().get(metrics.getBrokerTmoTotCnt().getName()));
+
+ Assert.assertEquals(Long.valueOf(2),
+ result3.getMetricValues().get(metrics.getBrokerAbnCurCnt().getName()));
+ Assert.assertEquals(Long.valueOf(3),
+ result3.getMetricValues().get(metrics.getBrokerAbnTotCnt().getName()));
+ Assert.assertEquals(Long.valueOf(3),
+ result3.getMetricValues().get(metrics.getBrokerFbdCurCnt().getName()));
+ Assert.assertEquals(Long.valueOf(3),
+ result3.getMetricValues().get(metrics.getBrokerFbdTotCnt().getName()));
+
+ Assert.assertEquals(Long.valueOf(8000),
+ result3.getMetricValues().get(metrics.getSvrBalDuration().getName()));
+ Assert.assertEquals(Long.valueOf(100),
+ result3.getMetricValues().get(metrics.getSvrBalDurationMin().getName()));
+ Assert.assertEquals(Long.valueOf(8000),
+ result3.getMetricValues().get(metrics.getSvrBalDurationMax().getName()));
+ Assert.assertEquals(Long.valueOf(100),
+ result3.getMetricValues().get(metrics.getSvrBalResetDurMin().getName()));
+ Assert.assertEquals(Long.valueOf(4000),
+ result3.getMetricValues().get(metrics.getSvrBalResetDurMax().getName()));
+ Assert.assertEquals(Long.valueOf(1),
+ result3.getMetricValues().get(
+ metrics.getSvrBalConEventConsumerCnt().getName()));
+ Assert.assertEquals(Long.valueOf(1),
+ result3.getMetricValues().get(
+ metrics.getSvrBalDisConEventConsumerCnt().getName()));
} catch (Exception ex) {
logger.error("error happens" + ex);
}
diff --git a/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumerEventManagerTest.java b/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumerEventManagerTest.java
index 8d93ed0..24d3777 100644
--- a/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumerEventManagerTest.java
+++ b/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumerEventManagerTest.java
@@ -19,7 +19,6 @@ package org.apache.inlong.tubemq.server.master.nodemanage.nodeconsumer;
import static org.mockito.Mockito.mock;
import org.apache.inlong.tubemq.corebase.balance.ConsumerEvent;
-import org.apache.inlong.tubemq.server.master.metrics.MasterMetric;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -28,14 +27,11 @@ import org.junit.Test;
public class ConsumerEventManagerTest {
private ConsumerEventManager consumerEventManager;
private ConsumerInfoHolder consumerInfoHolder;
- private MasterMetric masterMetrics;
@Before
public void setUp() throws Exception {
consumerInfoHolder = mock(ConsumerInfoHolder.class);
- masterMetrics = MasterMetric.create();
- consumerEventManager =
- new ConsumerEventManager(consumerInfoHolder, masterMetrics);
+ consumerEventManager = new ConsumerEventManager(consumerInfoHolder);
}
@After