You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2021/12/02 02:42:15 UTC
[incubator-inlong] branch master updated: [INLONG-1852]The Master of TubeMQ supports monitoring indicators with JMX. (#1870)
This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 43363b9 [INLONG-1852]The Master of TubeMQ supports monitoring indicators with JMX. (#1870)
43363b9 is described below
commit 43363b95fc94762474b0cc02cf40973b747af99b
Author: gosonzhang <46...@qq.com>
AuthorDate: Thu Dec 2 10:42:07 2021 +0800
[INLONG-1852]The Master of TubeMQ supports monitoring indicators with JMX. (#1870)
---
inlong-tubemq/pom.xml | 5 +
inlong-tubemq/tubemq-server/pom.xml | 4 +
.../inlong/tubemq/server/master/TMaster.java | 72 +++++++++++---
.../server/master/balance/DefaultLoadBalancer.java | 16 ++-
.../tubemq/server/master/metrics/MasterMetric.java | 108 +++++++++++++++++++++
.../nodemanage/nodebroker/BrokerAbnHolder.java | 25 ++++-
.../nodemanage/nodebroker/DefBrokerRunManager.java | 20 +++-
.../nodemanage/nodeconsumer/ConsumeGroupInfo.java | 8 +-
.../nodeconsumer/ConsumerEventManager.java | 33 +++++--
.../nodeconsumer/ConsumerInfoHolder.java | 14 ++-
.../nodeproducer/ProducerInfoHolder.java | 9 +-
.../tubemq/server/master/MasterMetricsTest.java | 44 +++++++++
.../nodeconsumer/ConsumerEventManagerTest.java | 6 +-
13 files changed, 321 insertions(+), 43 deletions(-)
diff --git a/inlong-tubemq/pom.xml b/inlong-tubemq/pom.xml
index 6b66310..315ae70 100644
--- a/inlong-tubemq/pom.xml
+++ b/inlong-tubemq/pom.xml
@@ -187,6 +187,11 @@
<version>${project.version}</version>
</dependency>
<dependency>
+ <groupId>org.apache.inlong</groupId>
+ <artifactId>inlong-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.30</version>
diff --git a/inlong-tubemq/tubemq-server/pom.xml b/inlong-tubemq/tubemq-server/pom.xml
index 1a463d9..dfa48cd 100644
--- a/inlong-tubemq/tubemq-server/pom.xml
+++ b/inlong-tubemq/tubemq-server/pom.xml
@@ -129,6 +129,10 @@
<artifactId>tubemq-example</artifactId>
</dependency>
<dependency>
+ <groupId>org.apache.inlong</groupId>
+ <artifactId>inlong-common</artifactId>
+ </dependency>
+ <dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</dependency>
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/TMaster.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/TMaster.java
index f323287..189518b 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/TMaster.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/TMaster.java
@@ -110,6 +110,7 @@ import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.Br
import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.ClusterSettingEntity;
import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.GroupResCtrlEntity;
import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.TopicDeployEntity;
+import org.apache.inlong.tubemq.server.master.metrics.MasterMetric;
import org.apache.inlong.tubemq.server.master.nodemanage.nodebroker.BrokerAbnHolder;
import org.apache.inlong.tubemq.server.master.nodemanage.nodebroker.BrokerRunManager;
import org.apache.inlong.tubemq.server.master.nodemanage.nodebroker.DefBrokerRunManager;
@@ -163,6 +164,7 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
private AtomicInteger curCltBalanceParal = new AtomicInteger(0);
private Sleeper stopSleeper = new Sleeper(1000, this);
private SimpleVisitTokenManager visitTokenManager;
+ private final MasterMetric masterMetrics;
/**
* constructor
@@ -177,6 +179,7 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
this.checkAndCreateBdbDataPath();
this.masterAddInfo =
new NodeAddrInfo(masterConfig.getHostName(), masterConfig.getPort());
+ this.masterMetrics = MasterMetric.create();
this.svrExecutor = Executors.newFixedThreadPool(this.masterConfig.getRebalanceParallel());
this.cltExecutor = Executors.newFixedThreadPool(this.masterConfig.getRebalanceParallel());
this.visitTokenManager = new SimpleVisitTokenManager(this.masterConfig);
@@ -185,17 +188,17 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
this.zkOffsetStorage = new ZkOffsetStorage(this.masterConfig.getZkConfig(),
false, TBaseConstants.META_VALUE_UNDEFINED);
this.producerHolder = new ProducerInfoHolder();
- this.consumerHolder = new ConsumerInfoHolder(this.masterConfig);
- this.consumerEventManager = new ConsumerEventManager(consumerHolder);
+ this.consumerHolder = new ConsumerInfoHolder(this);
+ this.consumerEventManager = new ConsumerEventManager(consumerHolder, masterMetrics);
this.topicPSInfoManager = new TopicPSInfoManager(this);
- this.loadBalancer = new DefaultLoadBalancer();
+ this.loadBalancer = new DefaultLoadBalancer(masterMetrics);
heartbeatManager.regConsumerCheckBusiness(masterConfig.getConsumerHeartbeatTimeoutMs(),
new TimeoutListener() {
@Override
public void onTimeout(String nodeId, TimeoutInfo nodeInfo) {
logger.info(new StringBuilder(512).append("[Consumer Timeout] ")
.append(nodeId).toString());
- new ReleaseConsumer().run(nodeId);
+ new ReleaseConsumer().run(nodeId, true);
}
});
heartbeatManager.regProducerCheckBusiness(masterConfig.getProducerHeartbeatTimeoutMs(),
@@ -204,7 +207,7 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
public void onTimeout(final String nodeId, TimeoutInfo nodeInfo) {
logger.info(new StringBuilder(512).append("[Producer Timeout] ")
.append(nodeId).toString());
- new ReleaseProducer().run(nodeId);
+ new ReleaseProducer().run(nodeId, true);
}
});
this.defMetaDataManager = new MetaDataManager(this);
@@ -280,6 +283,10 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
return brokerRunManager;
}
+ public MasterMetric getMasterMetrics() {
+ return masterMetrics;
+ }
+
/**
* Producer register request to master
*
@@ -343,8 +350,10 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
}
final String clientJdkVer = request.hasJdkVersion() ? request.getJdkVersion() : "";
heartbeatManager.regProducerNode(producerId);
- producerHolder.setProducerInfo(producerId,
- new HashSet<>(transTopicSet), hostName, overtls);
+ if (producerHolder.setProducerInfo(producerId,
+ new HashSet<>(transTopicSet), hostName, overtls)) {
+ masterMetrics.producerCnt.incrementAndGet();
+ }
Tuple2<Long, Map<Integer, String>> brokerStaticInfo =
brokerRunManager.getBrokerStaticInfo(overtls);
builder.setBrokerCheckSum(brokerStaticInfo.getF0());
@@ -493,7 +502,7 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
}
final String producerId = (String) paramCheckResult.checkData;
checkNodeStatus(producerId, strBuffer);
- new ReleaseProducer().run(producerId);
+ new ReleaseProducer().run(producerId, false);
heartbeatManager.unRegProducerNode(producerId);
logger.info(strBuffer.append("[Producer Closed] ")
.append(producerId).append(", isOverTLS=").append(overtls).toString());
@@ -913,7 +922,7 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
String nodeId = getConsumerKey(groupName, clientId);
logger.info(strBuffer.append("[Consumer Closed]").append(nodeId)
.append(", isOverTLS=").append(overtls).toString());
- new ReleaseConsumer().run(nodeId);
+ new ReleaseConsumer().run(nodeId, false);
heartbeatManager.unRegConsumerNode(nodeId);
builder.setSuccess(true);
builder.setErrCode(TErrCodeConstants.SUCCESS);
@@ -1679,6 +1688,7 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
int startIndex = 0;
int endIndex = 0;
// set parallel balance signal
+ final long startBalanceTime = System.currentTimeMillis();
curSvrBalanceParal.set(masterConfig.getRebalanceParallel());
for (int i = 0; i < masterConfig.getRebalanceParallel(); i++) {
// get groups need to balance
@@ -1686,7 +1696,13 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
endIndex = Math.min((i + 1) * unitNum, balanceTaskCnt);
final List<String> subGroups = groupsNeedToBalance.subList(startIndex, endIndex);
if (subGroups.isEmpty()) {
- curSvrBalanceParal.decrementAndGet();
+ if (curSvrBalanceParal.decrementAndGet() == 0) {
+ long durTime = System.currentTimeMillis() - startBalanceTime;
+ masterMetrics.svrBalLatency.set(durTime);
+ if (durTime > masterMetrics.svrBalLatencyMax.get()) {
+ masterMetrics.svrBalLatencyMax.set(durTime);
+ }
+ }
continue;
}
// execute balance
@@ -1725,7 +1741,13 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
} catch (Throwable e) {
logger.warn("[Svr-Balance processor] Error during process", e);
} finally {
- curSvrBalanceParal.decrementAndGet();
+ if (curSvrBalanceParal.decrementAndGet() == 0) {
+ long durTime = System.currentTimeMillis() - startBalanceTime;
+ masterMetrics.svrBalLatency.set(durTime);
+ if (durTime > masterMetrics.svrBalLatencyMax.get()) {
+ masterMetrics.svrBalLatencyMax.set(durTime);
+ }
+ }
}
}
});
@@ -2535,12 +2557,12 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
}
private abstract static class AbstractReleaseRunner {
- abstract void run(String arg);
+ abstract void run(String arg, boolean isTimeout);
}
private class ReleaseConsumer extends AbstractReleaseRunner {
@Override
- void run(String arg) {
+ void run(String arg, boolean isTimeout) {
String[] nodeStrs = arg.split(TokenConstants.GROUP_SEP);
String consumerId = nodeStrs[0];
String group = nodeStrs[1];
@@ -2551,8 +2573,22 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
ConsumerInfo info = consumerHolder.removeConsumer(group, consumerId);
currentSubInfo.remove(consumerId);
consumerEventManager.removeAll(consumerId);
- if (info != null && consumerHolder.isConsumeGroupEmpty(group)) {
- topicPSInfoManager.rmvGroupSubTopicInfo(group, info.getTopicSet());
+ if (info != null) {
+ if (consumerHolder.isConsumeGroupEmpty(group)) {
+ topicPSInfoManager.rmvGroupSubTopicInfo(group, info.getTopicSet());
+ // metric
+ masterMetrics.consumeGroupCnt.decrementAndGet();
+ if (info.getConsumeType() == ConsumeType.CONSUME_CLIENT_REB) {
+ masterMetrics.cltBalConsumeGroupCnt.decrementAndGet();
+ }
+ if (isTimeout) {
+ masterMetrics.consumeGroupTmoTotCnt.incrementAndGet();
+ }
+ }
+ masterMetrics.consumerCnt.decrementAndGet();
+ if (isTimeout) {
+ masterMetrics.consumerTmoTotCnt.incrementAndGet();
+ }
}
} catch (IOException e) {
logger.warn("Failed to lock.", e);
@@ -2566,11 +2602,15 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
private class ReleaseProducer extends AbstractReleaseRunner {
@Override
- void run(String clientId) {
+ void run(String clientId, boolean isTimeout) {
if (clientId != null) {
ProducerInfo info = producerHolder.removeProducer(clientId);
if (info != null) {
topicPSInfoManager.rmvProducerTopicPubInfo(clientId, info.getTopicSet());
+ masterMetrics.producerCnt.decrementAndGet();
+ if (isTimeout) {
+ masterMetrics.producerTmoTotCnt.incrementAndGet();
+ }
}
}
}
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/balance/DefaultLoadBalancer.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/balance/DefaultLoadBalancer.java
index 39e4830..501f45e 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/balance/DefaultLoadBalancer.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/balance/DefaultLoadBalancer.java
@@ -34,6 +34,7 @@ import org.apache.inlong.tubemq.corebase.cluster.Partition;
import org.apache.inlong.tubemq.server.common.offsetstorage.OffsetStorage;
import org.apache.inlong.tubemq.server.master.metamanage.MetaDataManager;
import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.GroupResCtrlEntity;
+import org.apache.inlong.tubemq.server.master.metrics.MasterMetric;
import org.apache.inlong.tubemq.server.master.nodemanage.nodebroker.BrokerRunManager;
import org.apache.inlong.tubemq.server.master.nodemanage.nodeconsumer.ConsumeGroupInfo;
import org.apache.inlong.tubemq.server.master.nodemanage.nodeconsumer.ConsumerInfo;
@@ -47,9 +48,10 @@ import org.slf4j.LoggerFactory;
public class DefaultLoadBalancer implements LoadBalancer {
private static final Logger logger = LoggerFactory.getLogger(LoadBalancer.class);
private static final Random RANDOM = new Random(System.currentTimeMillis());
+ private final MasterMetric masterMetrics;
- public DefaultLoadBalancer() {
-
+ public DefaultLoadBalancer(MasterMetric masterMetrics) {
+ this.masterMetrics = masterMetrics;
}
/**
@@ -660,7 +662,15 @@ public class DefaultLoadBalancer implements LoadBalancer {
partitionMap.remove(entry.getKey());
}
}
- consumeGroupInfo.addAllocatedTimes();
+ if (consumeGroupInfo.addAllocatedTimes() > 0) {
+ long durTime = System.currentTimeMillis() - consumeGroupInfo.getCreateTime();
+ if (durTime < masterMetrics.svrBalResetDurMin.get()) {
+ masterMetrics.svrBalResetDurMin.set(durTime);
+ }
+ if (durTime > masterMetrics.svrBalResetDurMax.get()) {
+ masterMetrics.svrBalResetDurMax.set(durTime);
+ }
+ }
}
return finalSubInfoMap;
}
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metrics/MasterMetric.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metrics/MasterMetric.java
new file mode 100644
index 0000000..20f93ef
--- /dev/null
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metrics/MasterMetric.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.tubemq.server.master.metrics;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.inlong.commons.config.metrics.CountMetric;
+import org.apache.inlong.commons.config.metrics.Dimension;
+import org.apache.inlong.commons.config.metrics.GaugeMetric;
+import org.apache.inlong.commons.config.metrics.MetricDomain;
+import org.apache.inlong.commons.config.metrics.MetricItem;
+import org.apache.inlong.commons.config.metrics.MetricRegister;
+
+@MetricDomain(name = "master_metrics") // metric domain name under which this item is declared
+public class MasterMetric extends MetricItem { // holder of the Master node's monitoring indicators; one shared instance, see create()
+
+ private static final MasterMetric MASTER_METRICS = new MasterMetric(); // the single shared instance handed out by create()
+ private static final AtomicBoolean REGISTER_ONCE = // flipped false -> true by the first create() call so registration runs once
+ new AtomicBoolean(false);
+ private static final String METRIC_NAME = "master_metrics"; // value assigned to tagName at registration time
+
+ @Dimension
+ public String tagName; // dimension value of this item; set to METRIC_NAME inside create()
+
+ @GaugeMetric // current number of consume groups
+ public AtomicLong consumeGroupCnt = new AtomicLong(0); // +1 when a group is first created, -1 when a released consumer leaves its group empty
+
+ @GaugeMetric // current number of client-balance consume groups
+ public AtomicLong cltBalConsumeGroupCnt = new AtomicLong(0); // +1 when a client-balance group is created, -1 when such a group becomes empty
+
+ @CountMetric // total number of consume groups emptied by a consumer heartbeat timeout
+ public AtomicLong consumeGroupTmoTotCnt = new AtomicLong(0); // only incremented, on the timeout release path
+
+ @GaugeMetric // current number of registered consumers
+ public AtomicLong consumerCnt = new AtomicLong(0); // +1 when a consumer is newly added to a group, -1 when a consumer is released
+
+ @CountMetric // total number of consumers released because of a heartbeat timeout
+ public AtomicLong consumerTmoTotCnt = new AtomicLong(0); // only incremented, on the timeout release path
+
+ @GaugeMetric // current number of registered producers
+ public AtomicLong producerCnt = new AtomicLong(0); // +1 when a producer id is stored for the first time, -1 when a producer is released
+
+ @CountMetric // total number of producers released because of a heartbeat timeout
+ public AtomicLong producerTmoTotCnt = new AtomicLong(0); // only incremented, on the timeout release path
+
+ @GaugeMetric // number of brokers known from configuration
+ public AtomicLong brokerConfigCnt = new AtomicLong(0); // +1 when a broker entry is first recorded; NOTE(review): no decrement is visible in the callers seen so far - confirm
+
+ @GaugeMetric // current number of brokers holding a run-status entry (online)
+ public AtomicLong brokerOnlineCnt = new AtomicLong(0); // +1 when a run-status entry is created, -1 when the broker is released
+
+ @CountMetric // total number of times a broker was newly reported abnormal
+ public AtomicLong brokerAbnTotCnt = new AtomicLong(0); // only incremented, when a broker is added to the abnormal map
+
+ @GaugeMetric // current number of brokers in the abnormal map
+ public AtomicLong brokerAbnCurCnt = new AtomicLong(0); // +1 on abnormal report, -1 when the broker recovers or is removed
+
+ @CountMetric // total number of brokers added to the forbidden map
+ public AtomicLong brokerFbdTotCnt = new AtomicLong(0); // NOTE(review): only the "add missing forbidden broker" path is seen incrementing this - confirm the auto-forbidden path
+
+ @GaugeMetric // current number of brokers in the forbidden map
+ public AtomicLong brokerFbdCurCnt = new AtomicLong(0); // +1 when a broker is forbidden, -1 when it is removed or released from the forbidden map
+
+ @CountMetric // total number of brokers released because of a heartbeat timeout
+ public AtomicLong brokerTmoTotCnt = new AtomicLong(0); // only incremented, on the timeout release path
+
+ @GaugeMetric // duration in milliseconds of the most recently finished server-balance round
+ public AtomicLong svrBalLatency = new AtomicLong(0); // written by whichever balance task brings the parallel counter down to zero
+
+ @GaugeMetric // largest svrBalLatency value observed so far, in milliseconds
+ public AtomicLong svrBalLatencyMax = new AtomicLong(0); // updated with a get-then-set, i.e. not an atomic max
+
+ @GaugeMetric // smallest observed time in milliseconds between a group's creation and an allocation pass
+ public AtomicLong svrBalResetDurMin = new AtomicLong(Long.MAX_VALUE); // starts at Long.MAX_VALUE so the first sample always replaces it
+
+ @GaugeMetric // largest observed time in milliseconds between a group's creation and an allocation pass
+ public AtomicLong svrBalResetDurMax = new AtomicLong(0); // updated with a get-then-set, i.e. not an atomic max
+
+ @GaugeMetric // current number of consumers that have a pending connect-event list
+ public AtomicLong svrBalConEventConsumerCnt = new AtomicLong(0); // +1 when a consumer's list is created, -1 when it is emptied or removed
+
+ @GaugeMetric // current number of consumers that have a pending disconnect-event list
+ public AtomicLong svrBalDisConEventConsumerCnt = new AtomicLong(0); // +1 when a consumer's list is created, -1 when it is emptied or removed
+
+ public static MasterMetric create() { // returns the shared instance; despite the name, no new object is created per call
+ if (REGISTER_ONCE.compareAndSet(false, true)) { // true only for the first caller, so the block below runs once
+ MASTER_METRICS.tagName = METRIC_NAME;
+ MetricRegister.register(MASTER_METRICS); // presumably exposes the annotated fields through JMX - confirm against inlong-common
+ }
+ return MASTER_METRICS;
+ }
+}
+
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/BrokerAbnHolder.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/BrokerAbnHolder.java
index 666f840..2987225 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/BrokerAbnHolder.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/BrokerAbnHolder.java
@@ -33,6 +33,7 @@ import org.apache.inlong.tubemq.server.common.statusdef.ManageStatus;
import org.apache.inlong.tubemq.server.master.metamanage.MetaDataManager;
import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.BaseEntity;
import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.BrokerConfEntity;
+import org.apache.inlong.tubemq.server.master.metrics.MasterMetric;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -50,11 +51,15 @@ public class BrokerAbnHolder {
private final MetaDataManager metaDataManager;
private final AtomicInteger brokerForbiddenCount =
new AtomicInteger(0);
+ // master metrics
+ private final MasterMetric masterMetrics;
public BrokerAbnHolder(final int maxAutoForbiddenCnt,
- final MetaDataManager metaDataManager) {
+ final MetaDataManager metaDataManager,
+ final MasterMetric masterMetrics) {
this.maxAutoForbiddenCnt = maxAutoForbiddenCnt;
this.metaDataManager = metaDataManager;
+ this.masterMetrics = masterMetrics;
}
/**
@@ -74,6 +79,7 @@ public class BrokerAbnHolder {
if (brokerForbiddenMap.get(brokerId) == null) {
brokerAbnInfo = brokerAbnormalMap.remove(brokerId);
if (brokerAbnInfo != null) {
+ masterMetrics.brokerAbnCurCnt.decrementAndGet();
logger.warn(sBuffer.append("[Broker AutoForbidden] broker ")
.append(brokerId).append(" return to normal!").toString());
sBuffer.delete(0, sBuffer.length());
@@ -98,6 +104,8 @@ public class BrokerAbnHolder {
if (brokerAbnInfo == null) {
if (brokerAbnormalMap.putIfAbsent(brokerId,
new BrokerAbnInfo(brokerId, reportReadStatus, reportWriteStatus)) == null) {
+ masterMetrics.brokerAbnTotCnt.incrementAndGet();
+ masterMetrics.brokerAbnCurCnt.incrementAndGet();
logger.warn(sBuffer.append("[Broker AutoForbidden] broker report abnormal, ")
.append(brokerId).append("'s reportReadStatus=")
.append(reportReadStatus).append(", reportWriteStatus=")
@@ -116,6 +124,8 @@ public class BrokerAbnHolder {
if (updateCurManageStatus(brokerId, reqMngStatus, sBuffer)) {
if (brokerForbiddenMap.putIfAbsent(brokerId, tmpBrokerFbdInfo) == null) {
brokerForbiddenCount.incrementAndGet();
+ masterMetrics.brokerFbdTotCnt.incrementAndGet();
+ masterMetrics.brokerFbdCurCnt.incrementAndGet();
logger.warn(sBuffer
.append("[Broker AutoForbidden] master add missing forbidden broker, ")
.append(brokerId).append("'s manage status to ")
@@ -133,6 +143,7 @@ public class BrokerAbnHolder {
brokerForbiddenCount.decrementAndGet();
return;
}
+ masterMetrics.brokerFbdCurCnt.incrementAndGet();
logger.warn(sBuffer
.append("[Broker AutoForbidden] master auto forbidden broker, ")
.append(brokerId).append("'s manage status to ")
@@ -172,10 +183,14 @@ public class BrokerAbnHolder {
* @param brokerId the deleted broker id
*/
public void removeBroker(Integer brokerId) {
- brokerAbnormalMap.remove(brokerId);
+ BrokerAbnInfo abnInfo = brokerAbnormalMap.remove(brokerId);
+ if (abnInfo != null) {
+ masterMetrics.brokerAbnCurCnt.decrementAndGet();
+ }
BrokerFbdInfo brokerFbdInfo = brokerForbiddenMap.remove(brokerId);
if (brokerFbdInfo != null) {
this.brokerForbiddenCount.decrementAndGet();
+ masterMetrics.brokerFbdCurCnt.decrementAndGet();
}
}
@@ -258,8 +273,12 @@ public class BrokerAbnHolder {
BrokerFbdInfo fbdInfo = this.brokerForbiddenMap.remove(brokerId);
if (fbdInfo != null) {
brokerFbdInfos.add(fbdInfo);
- this.brokerAbnormalMap.remove(brokerId);
+ BrokerAbnInfo abnInfo = this.brokerAbnormalMap.remove(brokerId);
+ if (abnInfo != null) {
+ masterMetrics.brokerAbnCurCnt.decrementAndGet();
+ }
this.brokerForbiddenCount.decrementAndGet();
+ masterMetrics.brokerFbdCurCnt.decrementAndGet();
}
}
if (!brokerFbdInfos.isEmpty()) {
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/DefBrokerRunManager.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/DefBrokerRunManager.java
index 5775029..e06fce4 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/DefBrokerRunManager.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/DefBrokerRunManager.java
@@ -45,6 +45,7 @@ import org.apache.inlong.tubemq.server.master.TMaster;
import org.apache.inlong.tubemq.server.master.metamanage.MetaDataManager;
import org.apache.inlong.tubemq.server.master.metamanage.keepalive.AliveObserver;
import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.BrokerConfEntity;
+import org.apache.inlong.tubemq.server.master.metrics.MasterMetric;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -75,6 +76,8 @@ public class DefBrokerRunManager implements BrokerRunManager, AliveObserver {
private final BrokerAbnHolder brokerAbnHolder;
// broker topic configure for consumer and producer
private final BrokerPSInfoHolder brokerPubSubInfo = new BrokerPSInfoHolder();
+ // master metrics
+ private final MasterMetric masterMetrics;
/**
* Constructor by TMaster
@@ -82,12 +85,13 @@ public class DefBrokerRunManager implements BrokerRunManager, AliveObserver {
* @param tMaster the initial TMaster object
*/
public DefBrokerRunManager(TMaster tMaster) {
+ this.masterMetrics = tMaster.getMasterMetrics();
this.metaDataManager = tMaster.getDefMetaDataManager();
this.heartbeatManager = tMaster.getHeartbeatManager();
MasterConfig masterConfig = tMaster.getMasterConfig();
this.brokerAbnHolder =
new BrokerAbnHolder(masterConfig.getMaxAutoForbiddenCnt(),
- this.metaDataManager);
+ this.metaDataManager, this.masterMetrics);
heartbeatManager.regBrokerCheckBusiness(masterConfig.getBrokerHeartbeatTimeoutMs(),
new TimeoutListener() {
@Override
@@ -145,9 +149,12 @@ public class DefBrokerRunManager implements BrokerRunManager, AliveObserver {
|| brokerTLSReg == null
|| !brokerReg.equals(entity.getSimpleBrokerInfo())
|| !brokerTLSReg.equals(entity.getSimpleTLSBrokerInfo())) {
- if (brokerReg != null
- && !brokerReg.equals(entity.getSimpleBrokerInfo())) {
- this.brokersMap.put(entity.getBrokerId(), entity.getSimpleBrokerInfo());
+ if (brokerReg == null) {
+ masterMetrics.brokerConfigCnt.incrementAndGet();
+ } else {
+ if (!brokerReg.equals(entity.getSimpleBrokerInfo())) {
+ this.brokersMap.put(entity.getBrokerId(), entity.getSimpleBrokerInfo());
+ }
}
if (brokerTLSReg != null
&& !brokerTLSReg.equals(entity.getSimpleTLSBrokerInfo())) {
@@ -241,6 +248,7 @@ public class DefBrokerRunManager implements BrokerRunManager, AliveObserver {
brokerInfo.getBrokerId(), tmpRunStatusInfo);
if (runStatusInfo == null) {
brokerTotalCount.incrementAndGet();
+ masterMetrics.brokerOnlineCnt.incrementAndGet();
runStatusInfo = tmpRunStatusInfo;
}
} else {
@@ -471,6 +479,10 @@ public class DefBrokerRunManager implements BrokerRunManager, AliveObserver {
if (runStatusInfo == null) {
return false;
}
+ masterMetrics.brokerOnlineCnt.decrementAndGet();
+ if (isTimeout) {
+ masterMetrics.brokerTmoTotCnt.incrementAndGet();
+ }
brokerTotalCount.decrementAndGet();
brokerAbnHolder.removeBroker(brokerId);
brokerPubSubInfo.rmvBrokerAllPushedInfo(brokerId);
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumeGroupInfo.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumeGroupInfo.java
index a13c8b2..f3d0ac3 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumeGroupInfo.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumeGroupInfo.java
@@ -131,7 +131,7 @@ public class ConsumeGroupInfo {
consumerInfoMap.get(inConsumer.getConsumerId());
if (curConsumerInfo != null) {
curConsumerInfo.updCurConsumerInfo(inConsumer);
- result.setCheckData("Ok!");
+ result.setCheckData(false);
return true;
}
}
@@ -139,7 +139,7 @@ public class ConsumeGroupInfo {
if (consumeType == ConsumeType.CONSUME_BAND) {
bookPartitionInfo(inConsumer);
}
- result.setCheckData("Ok!");
+ result.setCheckData(true);
return true;
} finally {
csmInfoRWLock.writeLock().unlock();
@@ -349,8 +349,8 @@ public class ConsumeGroupInfo {
return allocatedTimes.get();
}
- public void addAllocatedTimes() {
- this.allocatedTimes.incrementAndGet();
+ public int addAllocatedTimes() {
+ return this.allocatedTimes.incrementAndGet();
}
public boolean isGroupEmpty() {
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumerEventManager.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumerEventManager.java
index e3b0046..f4fa335 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumerEventManager.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumerEventManager.java
@@ -26,6 +26,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.collections.CollectionUtils;
import org.apache.inlong.tubemq.corebase.balance.ConsumerEvent;
+import org.apache.inlong.tubemq.server.master.metrics.MasterMetric;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -41,9 +42,12 @@ public class ConsumerEventManager {
new ConcurrentHashMap<>();
private final ConsumerInfoHolder consumerHolder;
+ private final MasterMetric masterMetrics;
- public ConsumerEventManager(ConsumerInfoHolder consumerHolder) {
+ public ConsumerEventManager(ConsumerInfoHolder consumerHolder,
+ MasterMetric masterMetrics) {
this.consumerHolder = consumerHolder;
+ this.masterMetrics = masterMetrics;
}
public boolean addDisconnectEvent(String consumerId,
@@ -54,7 +58,9 @@ public class ConsumerEventManager {
eventList = new LinkedList<>();
LinkedList<ConsumerEvent> tmptList =
disconnectEventMap.putIfAbsent(consumerId, eventList);
- if (tmptList != null) {
+ if (tmptList == null) {
+ masterMetrics.svrBalDisConEventConsumerCnt.incrementAndGet();
+ } else {
eventList = tmptList;
}
}
@@ -71,7 +77,9 @@ public class ConsumerEventManager {
eventList = new LinkedList<>();
LinkedList<ConsumerEvent> tmptList =
connectEventMap.putIfAbsent(consumerId, eventList);
- if (tmptList != null) {
+ if (tmptList == null) {
+ masterMetrics.svrBalConEventConsumerCnt.incrementAndGet();
+ } else {
eventList = tmptList;
}
}
@@ -119,8 +127,9 @@ public class ConsumerEventManager {
public ConsumerEvent removeFirst(String consumerId) {
ConsumerEvent event = null;
String group = consumerHolder.getGroupName(consumerId);
+ boolean selDisConnMap = hasDisconnectEvent(group);
ConcurrentHashMap<String, LinkedList<ConsumerEvent>> currentEventMap =
- hasDisconnectEvent(group) ? disconnectEventMap : connectEventMap;
+ selDisConnMap ? disconnectEventMap : connectEventMap;
LinkedList<ConsumerEvent> eventList = currentEventMap.get(consumerId);
if (eventList != null) {
synchronized (eventList) {
@@ -128,6 +137,11 @@ public class ConsumerEventManager {
event = eventList.removeFirst();
if (eventList.isEmpty()) {
currentEventMap.remove(consumerId);
+ if (selDisConnMap) {
+ masterMetrics.svrBalDisConEventConsumerCnt.decrementAndGet();
+ } else {
+ masterMetrics.svrBalConEventConsumerCnt.decrementAndGet();
+ }
}
}
}
@@ -182,8 +196,15 @@ public class ConsumerEventManager {
}
public void removeAll(String consumerId) {
- disconnectEventMap.remove(consumerId);
- connectEventMap.remove(consumerId);
+ LinkedList<ConsumerEvent> eventInfos =
+ disconnectEventMap.remove(consumerId);
+ if (eventInfos != null) {
+ masterMetrics.svrBalDisConEventConsumerCnt.decrementAndGet();
+ }
+ eventInfos = connectEventMap.remove(consumerId);
+ if (eventInfos != null) {
+ masterMetrics.svrBalConEventConsumerCnt.decrementAndGet();
+ }
}
/**
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumerInfoHolder.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumerInfoHolder.java
index fea5dcb..122f668 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumerInfoHolder.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumerInfoHolder.java
@@ -29,6 +29,8 @@ import org.apache.inlong.tubemq.corebase.utils.Tuple2;
import org.apache.inlong.tubemq.server.common.paramcheck.ParamCheckResult;
import org.apache.inlong.tubemq.server.common.utils.RowLock;
import org.apache.inlong.tubemq.server.master.MasterConfig;
+import org.apache.inlong.tubemq.server.master.TMaster;
+import org.apache.inlong.tubemq.server.master.metrics.MasterMetric;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -37,6 +39,7 @@ public class ConsumerInfoHolder {
private static final Logger logger =
LoggerFactory.getLogger(ConsumerInfoHolder.class);
private final MasterConfig masterConfig; // master configure
+ private final MasterMetric masterMetrics;
private final RowLock groupRowLock; //lock
private final ConcurrentHashMap<String/* group */, ConsumeGroupInfo> groupInfoMap =
new ConcurrentHashMap<>();
@@ -47,8 +50,9 @@ public class ConsumerInfoHolder {
private final ConcurrentHashSet<String/* group */> clientBalanceGroupSet =
new ConcurrentHashSet<>();
- public ConsumerInfoHolder(MasterConfig masterConfig) {
- this.masterConfig = masterConfig;
+ public ConsumerInfoHolder(TMaster tMasterr) {
+ this.masterMetrics = tMasterr.getMasterMetrics();
+ this.masterConfig = tMasterr.getMasterConfig();
this.groupRowLock = new RowLock("Group-RowLock",
this.masterConfig.getRowLockWaitDurMs());
}
@@ -351,14 +355,20 @@ public class ConsumerInfoHolder {
consumeGroupInfo = groupInfoMap.putIfAbsent(group, tmpGroupInfo);
if (consumeGroupInfo == null) {
consumeGroupInfo = tmpGroupInfo;
+ masterMetrics.consumeGroupCnt.incrementAndGet();
if (tmpGroupInfo.isClientBalance()) {
clientBalanceGroupSet.add(group);
+ masterMetrics.cltBalConsumeGroupCnt.incrementAndGet();
} else {
serverBalanceGroupSet.add(group);
}
}
}
if (consumeGroupInfo.addConsumer(consumer, sBuffer, result)) {
+ Boolean isNewAdd = (Boolean) result.checkData;
+ if (isNewAdd) {
+ masterMetrics.consumerCnt.incrementAndGet();
+ }
if (!isNotAllocated) {
consumeGroupInfo.settAllocated();
}
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeproducer/ProducerInfoHolder.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeproducer/ProducerInfoHolder.java
index 3b01843..6acd0d0 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeproducer/ProducerInfoHolder.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeproducer/ProducerInfoHolder.java
@@ -30,11 +30,12 @@ public class ProducerInfoHolder {
return producerInfoMap.get(producerId);
}
- public void setProducerInfo(String producerId,
- Set<String> topicSet,
- String host, boolean overTLS) {
- producerInfoMap.put(producerId,
+ public boolean setProducerInfo(String producerId, Set<String> topicSet,
+ String host, boolean overTLS) {
+ // true is returned only when put() yielded no previous ProducerInfo for this producerId
+ ProducerInfo previous = producerInfoMap.put(producerId,
new ProducerInfo(producerId, topicSet, host, overTLS));
+ return previous == null;
}
public void updateProducerInfo(String producerId,
diff --git a/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/master/MasterMetricsTest.java b/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/master/MasterMetricsTest.java
new file mode 100644
index 0000000..193bebe
--- /dev/null
+++ b/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/master/MasterMetricsTest.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.tubemq.server.master;
+
+import org.apache.inlong.commons.config.metrics.MetricValue;
+import org.apache.inlong.tubemq.server.master.metrics.MasterMetric;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+public class MasterMetricsTest {
+    private static final Logger logger = LoggerFactory.getLogger(MasterMetricsTest.class);
+
+    /**
+     * Increments svrBalLatency once, takes a snapshot, and expects the item to still read 1.
+     * Exceptions are not caught here, so any failure in create() or snapshot() fails the test.
+     */
+    @Test
+    public void testAgentMetrics() throws Exception {
+        MasterMetric taskMetrics = MasterMetric.create();
+        taskMetrics.svrBalLatency.incrementAndGet();
+        Map<String, MetricValue> result = taskMetrics.snapshot();
+        logger.debug("snapshot after increment: {}", result);
+        Assert.assertEquals(1, taskMetrics.svrBalLatency.get());
+    }
+}
diff --git a/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumerEventManagerTest.java b/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumerEventManagerTest.java
index 24d3777..8d93ed0 100644
--- a/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumerEventManagerTest.java
+++ b/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumerEventManagerTest.java
@@ -19,6 +19,7 @@ package org.apache.inlong.tubemq.server.master.nodemanage.nodeconsumer;
import static org.mockito.Mockito.mock;
import org.apache.inlong.tubemq.corebase.balance.ConsumerEvent;
+import org.apache.inlong.tubemq.server.master.metrics.MasterMetric;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -27,11 +28,14 @@ import org.junit.Test;
public class ConsumerEventManagerTest {
private ConsumerEventManager consumerEventManager;
private ConsumerInfoHolder consumerInfoHolder;
+ private MasterMetric masterMetrics;
@Before
public void setUp() throws Exception {
consumerInfoHolder = mock(ConsumerInfoHolder.class);
- consumerEventManager = new ConsumerEventManager(consumerInfoHolder);
+ // the manager under test takes the mocked holder plus a MasterMetric obtained from create()
+ masterMetrics = MasterMetric.create();
+ consumerEventManager = new ConsumerEventManager(consumerInfoHolder, masterMetrics);
}
@After