You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2020/10/27 06:01:41 UTC

[rocketmq-exporter] branch master updated: [ISSUE #36] Fix broker stats still report old metrics data on dledger mode

This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-exporter.git


The following commit(s) were added to refs/heads/master by this push:
     new e583bd1  [ISSUE #36] Fix broker stats still report old metrics data on dledger mode
e583bd1 is described below

commit e583bd16ce1c6f4b41e9ebc183acac04a24b906c
Author: 张旭 <ma...@gmail.com>
AuthorDate: Tue Oct 27 14:01:34 2020 +0800

    [ISSUE #36] Fix broker stats still report old metrics data on dledger mode
---
 .../exporter/collector/RMQMetricsCollector.java    | 18 +++++++++++++++-
 .../rocketmq/exporter/task/MetricsCollectTask.java | 24 ++++++++++++++--------
 2 files changed, 32 insertions(+), 10 deletions(-)

diff --git a/src/main/java/org/apache/rocketmq/exporter/collector/RMQMetricsCollector.java b/src/main/java/org/apache/rocketmq/exporter/collector/RMQMetricsCollector.java
index 4718b11..d3a79c2 100644
--- a/src/main/java/org/apache/rocketmq/exporter/collector/RMQMetricsCollector.java
+++ b/src/main/java/org/apache/rocketmq/exporter/collector/RMQMetricsCollector.java
@@ -41,6 +41,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
 public class RMQMetricsCollector extends Collector {
@@ -624,15 +625,30 @@ public class RMQMetricsCollector extends Collector {
         consumerClientPullTPS.put(new ConsumerRuntimePullTPSMetric(group, topic, clientAddr, clientId), value);
     }
 
-
     public void addBrokerPutNumsMetric(String clusterName, String brokerIP, String brokerName, double value) {
         brokerPutNums.put(new BrokerMetric(clusterName, brokerIP, brokerName), value);
     }
 
+    public void clearBrokerPutNumsMetric(Set<BrokerMetric> masterMetricsSet) {
+        for (BrokerMetric brokerMetric : brokerPutNums.keySet()) {
+            if (!masterMetricsSet.contains(brokerMetric)) {
+                brokerPutNums.remove(brokerMetric);
+            }
+        }
+    }
+
     public void addBrokerGetNumsMetric(String clusterName, String brokerIP, String brokerName, double value) {
         brokerGetNums.put(new BrokerMetric(clusterName, brokerIP, brokerName), value);
     }
 
+    public void clearBrokerGetNumsMetric(Set<BrokerMetric> masterMetricsSet) {
+        for (BrokerMetric brokerMetric : brokerGetNums.keySet()) {
+            if (!masterMetricsSet.contains(brokerMetric)) {
+                brokerGetNums.remove(brokerMetric);
+            }
+        }
+    }
+
     public void addBrokerRuntimeStatsMetric(BrokerRuntimeStats stats, String clusterName, String brokerAddress, String brokerHost) {
         addBrokerRuntimePutMessageDistributeTimeMap(
                 clusterName, brokerAddress, brokerHost,
diff --git a/src/main/java/org/apache/rocketmq/exporter/task/MetricsCollectTask.java b/src/main/java/org/apache/rocketmq/exporter/task/MetricsCollectTask.java
index b82f454..3829afc 100644
--- a/src/main/java/org/apache/rocketmq/exporter/task/MetricsCollectTask.java
+++ b/src/main/java/org/apache/rocketmq/exporter/task/MetricsCollectTask.java
@@ -43,6 +43,7 @@ import org.apache.rocketmq.exporter.config.CollectClientMetricExecutorConfig;
 import org.apache.rocketmq.exporter.config.RMQConfigure;
 import org.apache.rocketmq.exporter.model.BrokerRuntimeStats;
 import org.apache.rocketmq.exporter.model.common.TwoTuple;
+import org.apache.rocketmq.exporter.model.metrics.BrokerMetric;
 import org.apache.rocketmq.exporter.service.RMQMetricsService;
 import org.apache.rocketmq.exporter.service.client.MQAdminExtImpl;
 import org.apache.rocketmq.exporter.util.Utils;
@@ -543,6 +544,7 @@ public class MetricsCollectTask {
             return;
         }
 
+        Set<BrokerMetric> masterMetricsSet = new HashSet<>();
         Set<Map.Entry<String, BrokerData>> clusterEntries = clusterInfo.getBrokerAddrTable().entrySet();
         for (Map.Entry<String, BrokerData> clusterEntry : clusterEntries) {
             String masterAddr = clusterEntry.getValue().getBrokerAddrs().get(MixAll.MASTER_ID);
@@ -550,29 +552,34 @@ public class MetricsCollectTask {
                 continue;
             }
             BrokerStatsData bsd = null;
+            String clusterName = clusterEntry.getValue().getCluster();
+            String brokerIP = clusterEntry.getValue().getBrokerAddrs().get(MixAll.MASTER_ID);
+            String brokerName = clusterEntry.getValue().getBrokerName();
             try {
-                bsd = mqAdminExt.viewBrokerStatsData(masterAddr, BrokerStatsManager.BROKER_PUT_NUMS, clusterEntry.getValue().getCluster());
-                String brokerIP = clusterEntry.getValue().getBrokerAddrs().get(MixAll.MASTER_ID);
+                bsd = mqAdminExt.viewBrokerStatsData(masterAddr, BrokerStatsManager.BROKER_PUT_NUMS, clusterName);
                 metricsService.getCollector().addBrokerPutNumsMetric(
-                    clusterEntry.getValue().getCluster(),
+                    clusterName,
                     brokerIP,
-                    clusterEntry.getValue().getBrokerName(),
+                    brokerName,
                     Utils.getFixedDouble(bsd.getStatsMinute().getTps()));
             } catch (Exception ex) {
                 log.error(String.format("BROKER_PUT_NUMS-error, master broker=%s", masterAddr), ex);
             }
             try {
-                bsd = mqAdminExt.viewBrokerStatsData(masterAddr, BrokerStatsManager.BROKER_GET_NUMS, clusterEntry.getValue().getCluster());
-                String brokerIP = clusterEntry.getValue().getBrokerAddrs().get(MixAll.MASTER_ID);
+                bsd = mqAdminExt.viewBrokerStatsData(masterAddr, BrokerStatsManager.BROKER_GET_NUMS, clusterName);
                 metricsService.getCollector().addBrokerGetNumsMetric(
-                    clusterEntry.getValue().getCluster(),
+                    clusterName,
                     brokerIP,
-                    clusterEntry.getValue().getBrokerName(),
+                    brokerName,
                     Utils.getFixedDouble(bsd.getStatsMinute().getTps()));
             } catch (Exception ex) {
                 log.error(String.format("BROKER_GET_NUMS-error, master broker=%s", masterAddr), ex);
             }
+            masterMetricsSet.add(new BrokerMetric(clusterName, brokerIP, brokerName));
         }
+
+        metricsService.getCollector().clearBrokerPutNumsMetric(masterMetricsSet);
+        metricsService.getCollector().clearBrokerGetNumsMetric(masterMetricsSet);
         log.info("broker stats collection task finished...." + (System.currentTimeMillis() - start));
     }
 
@@ -619,7 +626,6 @@ public class MetricsCollectTask {
             } catch (Exception ex) {
                 log.error(String.format("collectBrokerRuntimeStats-parse or report broker runtime stats error, %s", JSON.toJSONString(kvTable)), ex);
             }
-
         }
 
         log.info("broker runtime stats collection task finished...." + (System.currentTimeMillis() - start));