You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by do...@apache.org on 2017/06/06 03:38:32 UTC

[12/51] [abbrv] incubator-rocketmq git commit: Include client IP for each message queue in consumer progress command output

Include client IP for each message queue in consumer progress command output


Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/8c793c09
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/8c793c09
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/8c793c09

Branch: refs/heads/master
Commit: 8c793c09077455bf44e961517514f3551d3d2436
Parents: 3401296
Author: Zhanhui Li <li...@apache.org>
Authored: Wed Mar 29 21:50:59 2017 +0800
Committer: dongeforever <zh...@yeah.net>
Committed: Tue Jun 6 11:37:29 2017 +0800

----------------------------------------------------------------------
 .../consumer/ConsumerProgressSubCommand.java    | 41 ++++++++++++++++----
 1 file changed, 34 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/8c793c09/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerProgressSubCommand.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerProgressSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerProgressSubCommand.java
index 35fd260..f341362 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerProgressSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerProgressSubCommand.java
@@ -16,10 +16,6 @@
  */
 package org.apache.rocketmq.tools.command.consumer;
 
-import java.util.Collections;
-import java.util.Date;
-import java.util.LinkedList;
-import java.util.List;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.Options;
@@ -30,7 +26,9 @@ import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.admin.ConsumeStats;
 import org.apache.rocketmq.common.admin.OffsetWrapper;
 import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.body.Connection;
 import org.apache.rocketmq.common.protocol.body.ConsumerConnection;
+import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
 import org.apache.rocketmq.common.protocol.body.TopicList;
 import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
 import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
@@ -40,6 +38,13 @@ import org.apache.rocketmq.tools.command.SubCommand;
 import org.apache.rocketmq.tools.command.SubCommandException;
 import org.slf4j.Logger;
 
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
 public class ConsumerProgressSubCommand implements SubCommand {
     private final Logger log = ClientLogger.getLog();
 
@@ -62,6 +67,24 @@ public class ConsumerProgressSubCommand implements SubCommand {
         return options;
     }
 
+    private Map<MessageQueue, String> getMessageQueueAllocationResult(DefaultMQAdminExt defaultMQAdminExt,
+        String groupName) {
+        Map<MessageQueue, String> results = new HashMap<>();
+        try {
+            ConsumerConnection consumerConnection = defaultMQAdminExt.examineConsumerConnectionInfo(groupName);
+            for (Connection connection : consumerConnection.getConnectionSet()) {
+                String clientId = connection.getClientId();
+                ConsumerRunningInfo consumerRunningInfo = defaultMQAdminExt.getConsumerRunningInfo(groupName, clientId,
+                    false);
+                for (MessageQueue messageQueue : consumerRunningInfo.getMqTable().keySet()) {
+                    results.put(messageQueue, clientId.split("@")[0]);
+                }
+            }
+        } catch (Exception ignore) {
+        }
+        return results;
+    }
+
     @Override
     public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) throws SubCommandException {
         DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
@@ -75,13 +98,14 @@ public class ConsumerProgressSubCommand implements SubCommand {
                 List<MessageQueue> mqList = new LinkedList<MessageQueue>();
                 mqList.addAll(consumeStats.getOffsetTable().keySet());
                 Collections.sort(mqList);
-
-                System.out.printf("%-32s  %-32s  %-4s  %-20s  %-20s  %-20s  %s%n",
+                Map<MessageQueue, String> messageQueueAllocationResult = getMessageQueueAllocationResult(defaultMQAdminExt, consumerGroup);
+                System.out.printf("%-32s  %-32s  %-4s  %-20s  %-20s  %-20s %-20s  %s%n",
                     "#Topic",
                     "#Broker Name",
                     "#QID",
                     "#Broker Offset",
                     "#Consumer Offset",
+                    "#Client IP",
                     "#Diff",
                     "#LastTime");
 
@@ -95,12 +119,15 @@ public class ConsumerProgressSubCommand implements SubCommand {
                         lastTime = UtilAll.formatDate(new Date(offsetWrapper.getLastTimestamp()), UtilAll.YYYY_MM_DD_HH_MM_SS);
                     } catch (Exception e) {
                     }
-                    System.out.printf("%-32s  %-32s  %-4d  %-20d  %-20d  %-20d  %s%n",
+
+                    String clientIP = messageQueueAllocationResult.get(mq);
+                    System.out.printf("%-32s  %-32s  %-4d  %-20d  %-20d  %-20s %-20d  %s%n",
                         UtilAll.frontStringAtLeast(mq.getTopic(), 32),
                         UtilAll.frontStringAtLeast(mq.getBrokerName(), 32),
                         mq.getQueueId(),
                         offsetWrapper.getBrokerOffset(),
                         offsetWrapper.getConsumerOffset(),
+                        null != clientIP ? clientIP : "NA",
                         diff,
                         lastTime
                     );