You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by do...@apache.org on 2017/06/06 03:38:32 UTC
[12/51] [abbrv] incubator-rocketmq git commit: Include client IP per
message queue of consumer progress command output
Include client IP per message queue of consumer progress command output
Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/8c793c09
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/8c793c09
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/8c793c09
Branch: refs/heads/master
Commit: 8c793c09077455bf44e961517514f3551d3d2436
Parents: 3401296
Author: Zhanhui Li <li...@apache.org>
Authored: Wed Mar 29 21:50:59 2017 +0800
Committer: dongeforever <zh...@yeah.net>
Committed: Tue Jun 6 11:37:29 2017 +0800
----------------------------------------------------------------------
.../consumer/ConsumerProgressSubCommand.java | 41 ++++++++++++++++----
1 file changed, 34 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/8c793c09/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerProgressSubCommand.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerProgressSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerProgressSubCommand.java
index 35fd260..f341362 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerProgressSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerProgressSubCommand.java
@@ -16,10 +16,6 @@
*/
package org.apache.rocketmq.tools.command.consumer;
-import java.util.Collections;
-import java.util.Date;
-import java.util.LinkedList;
-import java.util.List;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
@@ -30,7 +26,9 @@ import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.admin.ConsumeStats;
import org.apache.rocketmq.common.admin.OffsetWrapper;
import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.body.Connection;
import org.apache.rocketmq.common.protocol.body.ConsumerConnection;
+import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
import org.apache.rocketmq.common.protocol.body.TopicList;
import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
@@ -40,6 +38,13 @@ import org.apache.rocketmq.tools.command.SubCommand;
import org.apache.rocketmq.tools.command.SubCommandException;
import org.slf4j.Logger;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
public class ConsumerProgressSubCommand implements SubCommand {
private final Logger log = ClientLogger.getLog();
@@ -62,6 +67,24 @@ public class ConsumerProgressSubCommand implements SubCommand {
return options;
}
+ private Map<MessageQueue, String> getMessageQueueAllocationResult(DefaultMQAdminExt defaultMQAdminExt,
+ String groupName) {
+ Map<MessageQueue, String> results = new HashMap<>();
+ try {
+ ConsumerConnection consumerConnection = defaultMQAdminExt.examineConsumerConnectionInfo(groupName);
+ for (Connection connection : consumerConnection.getConnectionSet()) {
+ String clientId = connection.getClientId();
+ ConsumerRunningInfo consumerRunningInfo = defaultMQAdminExt.getConsumerRunningInfo(groupName, clientId,
+ false);
+ for (MessageQueue messageQueue : consumerRunningInfo.getMqTable().keySet()) {
+ results.put(messageQueue, clientId.split("@")[0]);
+ }
+ }
+ } catch (Exception ignore) {
+ }
+ return results;
+ }
+
@Override
public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) throws SubCommandException {
DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
@@ -75,13 +98,14 @@ public class ConsumerProgressSubCommand implements SubCommand {
List<MessageQueue> mqList = new LinkedList<MessageQueue>();
mqList.addAll(consumeStats.getOffsetTable().keySet());
Collections.sort(mqList);
-
- System.out.printf("%-32s %-32s %-4s %-20s %-20s %-20s %s%n",
+ Map<MessageQueue, String> messageQueueAllocationResult = getMessageQueueAllocationResult(defaultMQAdminExt, consumerGroup);
+ System.out.printf("%-32s %-32s %-4s %-20s %-20s %-20s %-20s %s%n",
"#Topic",
"#Broker Name",
"#QID",
"#Broker Offset",
"#Consumer Offset",
+ "#Client IP",
"#Diff",
"#LastTime");
@@ -95,12 +119,15 @@ public class ConsumerProgressSubCommand implements SubCommand {
lastTime = UtilAll.formatDate(new Date(offsetWrapper.getLastTimestamp()), UtilAll.YYYY_MM_DD_HH_MM_SS);
} catch (Exception e) {
}
- System.out.printf("%-32s %-32s %-4d %-20d %-20d %-20d %s%n",
+
+ String clientIP = messageQueueAllocationResult.get(mq);
+ System.out.printf("%-32s %-32s %-4d %-20d %-20d %-20s %-20d %s%n",
UtilAll.frontStringAtLeast(mq.getTopic(), 32),
UtilAll.frontStringAtLeast(mq.getBrokerName(), 32),
mq.getQueueId(),
offsetWrapper.getBrokerOffset(),
offsetWrapper.getConsumerOffset(),
+ null != clientIP ? clientIP : "NA",
diff,
lastTime
);