You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by wl...@apache.org on 2020/03/08 13:16:27 UTC

[rocketmq-exporter] 07/43: Avoid crash when getting consumer offsets for a group consuming a topic

This is an automated email from the ASF dual-hosted git repository.

wlliqipeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-exporter.git

commit 378184de875742e83cbfc1e92eda98d25c85ef26
Author: breezecoolyang <br...@users.noreply.github.com>
AuthorDate: Mon Jun 3 10:47:05 2019 +0800

    Avoid crash when getting consumer offsets for a group consuming a topic
---
 .../rocketmq/exporter/task/MetricsCollectTask.java | 23 ++++++++++++----------
 1 file changed, 13 insertions(+), 10 deletions(-)

diff --git a/rocketmq-prometheus-exporter/src/main/java/org/apache/rocketmq/exporter/task/MetricsCollectTask.java b/rocketmq-prometheus-exporter/src/main/java/org/apache/rocketmq/exporter/task/MetricsCollectTask.java
index 3f359e9..6ae442d 100644
--- a/rocketmq-prometheus-exporter/src/main/java/org/apache/rocketmq/exporter/task/MetricsCollectTask.java
+++ b/rocketmq-prometheus-exporter/src/main/java/org/apache/rocketmq/exporter/task/MetricsCollectTask.java
@@ -106,17 +106,20 @@ public class MetricsCollectTask {
                 GroupList groupList = mqAdminExt.queryTopicConsumeByWho(topic);
                 if (groupList != null && !groupList.getGroupList().isEmpty()) {
                     for (String group : groupList.getGroupList()) {
-                        ConsumeStats consumeStatus = mqAdminExt.examineConsumeStats(group,topic);
-                        Set<Map.Entry<MessageQueue, OffsetWrapper>> consumeStatusEntries = consumeStatus.getOffsetTable().entrySet();
-                        for (Map.Entry<MessageQueue, OffsetWrapper> consumeStatusEntry : consumeStatusEntries) {
-                            MessageQueue q          =   consumeStatusEntry.getKey();
-                            OffsetWrapper offset    =   consumeStatusEntry.getValue();
-                            if (consumeOffsetMap.containsKey(q.getBrokerName())) {
-                                consumeOffsetMap.put(q.getBrokerName(), consumeOffsetMap.get(q.getBrokerName()) + offset.getConsumerOffset());
-                            }
-                            else {
-                                consumeOffsetMap.put(q.getBrokerName(), offset.getConsumerOffset());
+                        try {
+                            ConsumeStats consumeStatus = mqAdminExt.examineConsumeStats(group, topic);
+                            Set<Map.Entry<MessageQueue, OffsetWrapper>> consumeStatusEntries = consumeStatus.getOffsetTable().entrySet();
+                            for (Map.Entry<MessageQueue, OffsetWrapper> consumeStatusEntry : consumeStatusEntries) {
+                                MessageQueue q = consumeStatusEntry.getKey();
+                                OffsetWrapper offset = consumeStatusEntry.getValue();
+                                if (consumeOffsetMap.containsKey(q.getBrokerName())) {
+                                    consumeOffsetMap.put(q.getBrokerName(), consumeOffsetMap.get(q.getBrokerName()) + offset.getConsumerOffset());
+                                } else {
+                                    consumeOffsetMap.put(q.getBrokerName(), offset.getConsumerOffset());
+                                }
                             }
+                        } catch (Exception e) {
+                            log.info("ignore this consumer", e.getMessage());
                         }
                         Set<Map.Entry<String, Long>> consumeOffsetEntries = consumeOffsetMap.entrySet();
                         for (Map.Entry<String, Long> consumeOffsetEntry : consumeOffsetEntries) {