You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2020/10/27 06:01:41 UTC
[rocketmq-exporter] branch master updated: [ISSUE #36] Fix broker
stats still report old metrics data on dledger mode
This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-exporter.git
The following commit(s) were added to refs/heads/master by this push:
new e583bd1 [ISSUE #36] Fix broker stats still report old metrics data on dledger mode
e583bd1 is described below
commit e583bd16ce1c6f4b41e9ebc183acac04a24b906c
Author: 张旭 <ma...@gmail.com>
AuthorDate: Tue Oct 27 14:01:34 2020 +0800
[ISSUE #36] Fix broker stats still report old metrics data on dledger mode
---
.../exporter/collector/RMQMetricsCollector.java | 18 +++++++++++++++-
.../rocketmq/exporter/task/MetricsCollectTask.java | 24 ++++++++++++++--------
2 files changed, 32 insertions(+), 10 deletions(-)
diff --git a/src/main/java/org/apache/rocketmq/exporter/collector/RMQMetricsCollector.java b/src/main/java/org/apache/rocketmq/exporter/collector/RMQMetricsCollector.java
index 4718b11..d3a79c2 100644
--- a/src/main/java/org/apache/rocketmq/exporter/collector/RMQMetricsCollector.java
+++ b/src/main/java/org/apache/rocketmq/exporter/collector/RMQMetricsCollector.java
@@ -41,6 +41,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
public class RMQMetricsCollector extends Collector {
@@ -624,15 +625,30 @@ public class RMQMetricsCollector extends Collector {
consumerClientPullTPS.put(new ConsumerRuntimePullTPSMetric(group, topic, clientAddr, clientId), value);
}
-
public void addBrokerPutNumsMetric(String clusterName, String brokerIP, String brokerName, double value) {
brokerPutNums.put(new BrokerMetric(clusterName, brokerIP, brokerName), value);
}
+ public void clearBrokerPutNumsMetric(Set<BrokerMetric> masterMetricsSet) {
+ for (BrokerMetric brokerMetric : brokerPutNums.keySet()) {
+ if (!masterMetricsSet.contains(brokerMetric)) {
+ brokerPutNums.remove(brokerMetric);
+ }
+ }
+ }
+
public void addBrokerGetNumsMetric(String clusterName, String brokerIP, String brokerName, double value) {
brokerGetNums.put(new BrokerMetric(clusterName, brokerIP, brokerName), value);
}
+ public void clearBrokerGetNumsMetric(Set<BrokerMetric> masterMetricsSet) {
+ for (BrokerMetric brokerMetric : brokerGetNums.keySet()) {
+ if (!masterMetricsSet.contains(brokerMetric)) {
+ brokerGetNums.remove(brokerMetric);
+ }
+ }
+ }
+
public void addBrokerRuntimeStatsMetric(BrokerRuntimeStats stats, String clusterName, String brokerAddress, String brokerHost) {
addBrokerRuntimePutMessageDistributeTimeMap(
clusterName, brokerAddress, brokerHost,
diff --git a/src/main/java/org/apache/rocketmq/exporter/task/MetricsCollectTask.java b/src/main/java/org/apache/rocketmq/exporter/task/MetricsCollectTask.java
index b82f454..3829afc 100644
--- a/src/main/java/org/apache/rocketmq/exporter/task/MetricsCollectTask.java
+++ b/src/main/java/org/apache/rocketmq/exporter/task/MetricsCollectTask.java
@@ -43,6 +43,7 @@ import org.apache.rocketmq.exporter.config.CollectClientMetricExecutorConfig;
import org.apache.rocketmq.exporter.config.RMQConfigure;
import org.apache.rocketmq.exporter.model.BrokerRuntimeStats;
import org.apache.rocketmq.exporter.model.common.TwoTuple;
+import org.apache.rocketmq.exporter.model.metrics.BrokerMetric;
import org.apache.rocketmq.exporter.service.RMQMetricsService;
import org.apache.rocketmq.exporter.service.client.MQAdminExtImpl;
import org.apache.rocketmq.exporter.util.Utils;
@@ -543,6 +544,7 @@ public class MetricsCollectTask {
return;
}
+ Set<BrokerMetric> masterMetricsSet = new HashSet<>();
Set<Map.Entry<String, BrokerData>> clusterEntries = clusterInfo.getBrokerAddrTable().entrySet();
for (Map.Entry<String, BrokerData> clusterEntry : clusterEntries) {
String masterAddr = clusterEntry.getValue().getBrokerAddrs().get(MixAll.MASTER_ID);
@@ -550,29 +552,34 @@ public class MetricsCollectTask {
continue;
}
BrokerStatsData bsd = null;
+ String clusterName = clusterEntry.getValue().getCluster();
+ String brokerIP = clusterEntry.getValue().getBrokerAddrs().get(MixAll.MASTER_ID);
+ String brokerName = clusterEntry.getValue().getBrokerName();
try {
- bsd = mqAdminExt.viewBrokerStatsData(masterAddr, BrokerStatsManager.BROKER_PUT_NUMS, clusterEntry.getValue().getCluster());
- String brokerIP = clusterEntry.getValue().getBrokerAddrs().get(MixAll.MASTER_ID);
+ bsd = mqAdminExt.viewBrokerStatsData(masterAddr, BrokerStatsManager.BROKER_PUT_NUMS, clusterName);
metricsService.getCollector().addBrokerPutNumsMetric(
- clusterEntry.getValue().getCluster(),
+ clusterName,
brokerIP,
- clusterEntry.getValue().getBrokerName(),
+ brokerName,
Utils.getFixedDouble(bsd.getStatsMinute().getTps()));
} catch (Exception ex) {
log.error(String.format("BROKER_PUT_NUMS-error, master broker=%s", masterAddr), ex);
}
try {
- bsd = mqAdminExt.viewBrokerStatsData(masterAddr, BrokerStatsManager.BROKER_GET_NUMS, clusterEntry.getValue().getCluster());
- String brokerIP = clusterEntry.getValue().getBrokerAddrs().get(MixAll.MASTER_ID);
+ bsd = mqAdminExt.viewBrokerStatsData(masterAddr, BrokerStatsManager.BROKER_GET_NUMS, clusterName);
metricsService.getCollector().addBrokerGetNumsMetric(
- clusterEntry.getValue().getCluster(),
+ clusterName,
brokerIP,
- clusterEntry.getValue().getBrokerName(),
+ brokerName,
Utils.getFixedDouble(bsd.getStatsMinute().getTps()));
} catch (Exception ex) {
log.error(String.format("BROKER_GET_NUMS-error, master broker=%s", masterAddr), ex);
}
+ masterMetricsSet.add(new BrokerMetric(clusterName, brokerIP, brokerName));
}
+
+ metricsService.getCollector().clearBrokerPutNumsMetric(masterMetricsSet);
+ metricsService.getCollector().clearBrokerGetNumsMetric(masterMetricsSet);
log.info("broker stats collection task finished...." + (System.currentTimeMillis() - start));
}
@@ -619,7 +626,6 @@ public class MetricsCollectTask {
} catch (Exception ex) {
log.error(String.format("collectBrokerRuntimeStats-parse or report broker runtime stats error, %s", JSON.toJSONString(kvTable)), ex);
}
-
}
log.info("broker runtime stats collection task finished...." + (System.currentTimeMillis() - start));