You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2016/12/19 09:40:21 UTC
[04/43] incubator-rocketmq git commit: Finish code dump. Reviewed by:
@yukon @vongosling @stevenschew @vintagewang @lollipop @zander
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/broker/BrokerStatusSubCommand.java
----------------------------------------------------------------------
diff --git a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/broker/BrokerStatusSubCommand.java b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/broker/BrokerStatusSubCommand.java
new file mode 100644
index 0000000..5f5409b
--- /dev/null
+++ b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/broker/BrokerStatusSubCommand.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.tools.command.broker;
+
+import com.alibaba.rocketmq.client.exception.MQBrokerException;
+import com.alibaba.rocketmq.common.protocol.body.KVTable;
+import com.alibaba.rocketmq.remoting.RPCHook;
+import com.alibaba.rocketmq.remoting.exception.RemotingConnectException;
+import com.alibaba.rocketmq.remoting.exception.RemotingSendRequestException;
+import com.alibaba.rocketmq.remoting.exception.RemotingTimeoutException;
+import com.alibaba.rocketmq.tools.admin.DefaultMQAdminExt;
+import com.alibaba.rocketmq.tools.command.CommandUtil;
+import com.alibaba.rocketmq.tools.command.SubCommand;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TreeMap;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class BrokerStatusSubCommand implements SubCommand {
+
+ @Override
+ public String commandName() {
+ return "brokerStatus";
+ }
+
+
+ @Override
+ public String commandDesc() {
+ return "Fetch broker runtime status data";
+ }
+
+
+ @Override
+ public Options buildCommandlineOptions(Options options) {
+ Option opt = new Option("b", "brokerAddr", true, "Broker address");
+ opt.setRequired(false);
+ options.addOption(opt);
+
+ opt = new Option("c", "clusterName", true, "which cluster");
+ opt.setRequired(false);
+ options.addOption(opt);
+
+ return options;
+ }
+
+ @Override
+ public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) {
+ DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
+
+ defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
+
+ try {
+ defaultMQAdminExt.start();
+
+ String brokerAddr = commandLine.hasOption('b') ? commandLine.getOptionValue('b').trim() : null;
+ String clusterName = commandLine.hasOption('c') ? commandLine.getOptionValue('c').trim() : null;
+ if (brokerAddr != null) {
+ printBrokerRuntimeStats(defaultMQAdminExt, brokerAddr, false);
+ } else if (clusterName != null) {
+ Set<String> masterSet =
+ CommandUtil.fetchMasterAndSlaveAddrByClusterName(defaultMQAdminExt, clusterName);
+ for (String ba : masterSet) {
+ try {
+ printBrokerRuntimeStats(defaultMQAdminExt, ba, true);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ defaultMQAdminExt.shutdown();
+ }
+ }
+
+ public void printBrokerRuntimeStats(final DefaultMQAdminExt defaultMQAdminExt, final String brokerAddr, final boolean printBroker) throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
+ KVTable kvTable = defaultMQAdminExt.fetchBrokerRuntimeStats(brokerAddr);
+
+ TreeMap<String, String> tmp = new TreeMap<String, String>();
+ tmp.putAll(kvTable.getTable());
+
+ Iterator<Entry<String, String>> it = tmp.entrySet().iterator();
+ while (it.hasNext()) {
+ Entry<String, String> next = it.next();
+ if (printBroker) {
+ System.out.printf("%-24s %-32s: %s%n", brokerAddr, next.getKey(), next.getValue());
+ } else {
+ System.out.printf("%-32s: %s%n", next.getKey(), next.getValue());
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/broker/CleanExpiredCQSubCommand.java
----------------------------------------------------------------------
diff --git a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/broker/CleanExpiredCQSubCommand.java b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/broker/CleanExpiredCQSubCommand.java
new file mode 100644
index 0000000..c2918c1
--- /dev/null
+++ b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/broker/CleanExpiredCQSubCommand.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.rocketmq.tools.command.broker;
+
+import com.alibaba.rocketmq.remoting.RPCHook;
+import com.alibaba.rocketmq.tools.admin.DefaultMQAdminExt;
+import com.alibaba.rocketmq.tools.command.SubCommand;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+
+
+/**
+ * @author lansheng.zj
+ */
+public class CleanExpiredCQSubCommand implements SubCommand {
+
+ @Override
+ public String commandName() {
+ return "cleanExpiredCQ";
+ }
+
+
+ @Override
+ public String commandDesc() {
+ return "Clean expired ConsumeQueue on broker.";
+ }
+
+
+ @Override
+ public Options buildCommandlineOptions(Options options) {
+ Option opt = new Option("b", "brokerAddr", true, "Broker address");
+ opt.setRequired(false);
+ options.addOption(opt);
+
+ opt = new Option("c", "cluster", true, "clustername");
+ opt.setRequired(false);
+ options.addOption(opt);
+
+ return options;
+ }
+
+
+ @Override
+ public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) {
+ DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
+ defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
+
+ try {
+ boolean result = false;
+ defaultMQAdminExt.start();
+ if (commandLine.hasOption('b')) {
+ String addr = commandLine.getOptionValue('b').trim();
+ result = defaultMQAdminExt.cleanExpiredConsumerQueueByAddr(addr);
+
+ } else {
+ String cluster = commandLine.getOptionValue('c');
+ if (null != cluster)
+ cluster = cluster.trim();
+ result = defaultMQAdminExt.cleanExpiredConsumerQueue(cluster);
+ }
+ System.out.printf(result ? "success" : "false");
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ defaultMQAdminExt.shutdown();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/broker/CleanUnusedTopicCommand.java
----------------------------------------------------------------------
diff --git a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/broker/CleanUnusedTopicCommand.java b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/broker/CleanUnusedTopicCommand.java
new file mode 100644
index 0000000..f7b543f
--- /dev/null
+++ b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/broker/CleanUnusedTopicCommand.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.rocketmq.tools.command.broker;
+
+import com.alibaba.rocketmq.remoting.RPCHook;
+import com.alibaba.rocketmq.tools.admin.DefaultMQAdminExt;
+import com.alibaba.rocketmq.tools.command.SubCommand;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+
+
+/**
+ * @author lansheng.zj
+ */
+public class CleanUnusedTopicCommand implements SubCommand {
+
+ @Override
+ public String commandName() {
+ return "cleanUnusedTopic";
+ }
+
+
+ @Override
+ public String commandDesc() {
+ return "Clean unused topic on broker.";
+ }
+
+
+ @Override
+ public Options buildCommandlineOptions(Options options) {
+ Option opt = new Option("b", "brokerAddr", true, "Broker address");
+ opt.setRequired(false);
+ options.addOption(opt);
+
+ opt = new Option("c", "cluster", true, "cluster name");
+ opt.setRequired(false);
+ options.addOption(opt);
+
+ return options;
+ }
+
+
+ @Override
+ public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) { // cleans unused topics on one broker (-b) or on a cluster (-c)
+ DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
+ defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
+
+ try {
+ boolean result = false;
+ defaultMQAdminExt.start();
+ if (commandLine.hasOption('b')) { // -b takes precedence over -c
+ String addr = commandLine.getOptionValue('b').trim();
+ result = defaultMQAdminExt.cleanUnusedTopicByAddr(addr);
+
+ } else {
+ String cluster = commandLine.getOptionValue('c'); // null when -c was not given
+ if (null != cluster)
+ cluster = cluster.trim();
+ result = defaultMQAdminExt.cleanUnusedTopic(cluster); // a cluster name is not a broker address; mirrors CleanExpiredCQSubCommand
+ }
+ System.out.printf(result ? "success" : "false");
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ defaultMQAdminExt.shutdown(); // always release the admin client
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/broker/GetBrokerConfigCommand.java
----------------------------------------------------------------------
diff --git a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/broker/GetBrokerConfigCommand.java b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/broker/GetBrokerConfigCommand.java
new file mode 100644
index 0000000..703d69b
--- /dev/null
+++ b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/broker/GetBrokerConfigCommand.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.rocketmq.tools.command.broker;
+
+import com.alibaba.rocketmq.client.exception.MQBrokerException;
+import com.alibaba.rocketmq.remoting.RPCHook;
+import com.alibaba.rocketmq.remoting.exception.RemotingConnectException;
+import com.alibaba.rocketmq.remoting.exception.RemotingSendRequestException;
+import com.alibaba.rocketmq.remoting.exception.RemotingTimeoutException;
+import com.alibaba.rocketmq.tools.admin.DefaultMQAdminExt;
+import com.alibaba.rocketmq.tools.admin.MQAdminExt;
+import com.alibaba.rocketmq.tools.command.CommandUtil;
+import com.alibaba.rocketmq.tools.command.SubCommand;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+
+import java.io.UnsupportedEncodingException;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * @author xigu.lx
+ */
+public class GetBrokerConfigCommand implements SubCommand {
+ @Override
+ public String commandName() {
+ return "getBrokerConfig";
+ }
+
+ @Override
+ public String commandDesc() {
+ return "Get broker config by cluster or special broker!";
+ }
+
+ @Override
+ public Options buildCommandlineOptions(final Options options) { // registers the optional -b / -c selectors and returns the same Options
+ Option opt = new Option("b", "brokerAddr", true, "query which broker"); // read-only command: help text must not say "update"
+ opt.setRequired(false);
+ options.addOption(opt);
+
+ opt = new Option("c", "clusterName", true, "query which cluster");
+ opt.setRequired(false);
+ options.addOption(opt);
+
+ return options;
+ }
+
+ @Override
+ public void execute(final CommandLine commandLine, final Options options, final RPCHook rpcHook) {
+ DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
+
+ defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
+
+ try {
+
+ if (commandLine.hasOption('b')) {
+ String brokerAddr = commandLine.getOptionValue('b').trim();
+ defaultMQAdminExt.start();
+
+ getAndPrint(defaultMQAdminExt,
+ String.format("============%s============\n", brokerAddr),
+ brokerAddr);
+
+ } else if (commandLine.hasOption('c')) {
+ String clusterName = commandLine.getOptionValue('c').trim();
+ defaultMQAdminExt.start();
+
+ Map<String, List<String>> masterAndSlaveMap
+ = CommandUtil.fetchMasterAndSlaveDistinguish(defaultMQAdminExt, clusterName);
+
+ for (String masterAddr : masterAndSlaveMap.keySet()) {
+
+ getAndPrint(
+ defaultMQAdminExt,
+ String.format("============Master: %s============\n", masterAddr),
+ masterAddr
+ );
+ for (String slaveAddr : masterAndSlaveMap.get(masterAddr)) {
+
+ getAndPrint(
+ defaultMQAdminExt,
+ String.format("============My Master: %s=====Slave: %s============\n", masterAddr, slaveAddr),
+ slaveAddr
+ );
+ }
+ }
+ }
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ defaultMQAdminExt.shutdown();
+ }
+ }
+
+ protected void getAndPrint(final MQAdminExt defaultMQAdminExt, final String printPrefix, final String addr)
+ throws InterruptedException, RemotingConnectException,
+ UnsupportedEncodingException, RemotingTimeoutException,
+ MQBrokerException, RemotingSendRequestException {
+
+ System.out.print(printPrefix);
+
+ Properties properties = defaultMQAdminExt.getBrokerConfig(addr);
+ if (properties == null) {
+ System.out.printf("Broker[%s] has no config property!\n", addr);
+ return;
+ }
+
+ for (Object key : properties.keySet()) {
+ System.out.printf("%-50s= %s\n", key, properties.get(key));
+ }
+
+ System.out.printf("%n");
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/broker/SendMsgStatusCommand.java
----------------------------------------------------------------------
diff --git a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/broker/SendMsgStatusCommand.java b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/broker/SendMsgStatusCommand.java
new file mode 100644
index 0000000..165e397
--- /dev/null
+++ b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/broker/SendMsgStatusCommand.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.tools.command.broker;
+
+import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
+import com.alibaba.rocketmq.client.producer.SendResult;
+import com.alibaba.rocketmq.common.MixAll;
+import com.alibaba.rocketmq.common.message.Message;
+import com.alibaba.rocketmq.remoting.RPCHook;
+import com.alibaba.rocketmq.tools.command.SubCommand;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+
+import java.io.UnsupportedEncodingException;
+
+
+/**
+ * @author lansheng.zj
+ */
+public class SendMsgStatusCommand implements SubCommand {
+
+ @Override
+ public String commandName() {
+ return "sendMsgStatus";
+ }
+
+
+ @Override
+ public String commandDesc() {
+ return "send msg to broker.";
+ }
+
+
+ @Override
+ public Options buildCommandlineOptions(Options options) {
+ Option opt = new Option("b", "brokerName", true, "Broker Name");
+ opt.setRequired(true);
+ options.addOption(opt);
+
+ opt = new Option("s", "messageSize", true, "Message Size, Default: 128");
+ opt.setRequired(false);
+ options.addOption(opt);
+
+ opt = new Option("c", "count", true, "send message count, Default: 50");
+ opt.setRequired(false);
+ options.addOption(opt);
+
+ return options;
+ }
+
+
+ @Override
+ public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) { // sends test messages and prints the round-trip time of each
+ final DefaultMQProducer producer = new DefaultMQProducer("PID_SMSC", rpcHook);
+ producer.setInstanceName("PID_SMSC_" + System.currentTimeMillis());
+
+ try {
+ producer.start();
+ String brokerName = commandLine.getOptionValue('b').trim(); // also used as the topic of the test messages
+ int messageSize = commandLine.hasOption('s') ? Integer.parseInt(commandLine.getOptionValue('s')) : 128;
+ int count = commandLine.hasOption('c') ? Integer.parseInt(commandLine.getOptionValue('c')) : 50;
+
+ producer.send(buildMessage(brokerName, 16)); // unmeasured first send (presumably a warm-up)
+
+ for (int i = 0; i < count; i++) {
+ long begin = System.currentTimeMillis();
+ SendResult result = producer.send(buildMessage(brokerName, messageSize));
+ System.out.printf("rt:%dms, SendResult=%s%n", System.currentTimeMillis() - begin, result); // constant format: a '%' inside result can no longer break printf
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ producer.shutdown(); // always release the producer
+ }
+ }
+
+
+ private static Message buildMessage(final String topic, final int messageSize) throws UnsupportedEncodingException {
+ Message msg = new Message();
+ msg.setTopic(topic);
+
+ StringBuilder sb = new StringBuilder();
+ for (int i = 0; i < messageSize; i += 11) {
+ sb.append("hello jodie");
+ }
+ msg.setBody(sb.toString().getBytes(MixAll.DEFAULT_CHARSET));
+ return msg;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/broker/UpdateBrokerConfigSubCommand.java
----------------------------------------------------------------------
diff --git a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/broker/UpdateBrokerConfigSubCommand.java b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/broker/UpdateBrokerConfigSubCommand.java
new file mode 100644
index 0000000..86938a7
--- /dev/null
+++ b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/broker/UpdateBrokerConfigSubCommand.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.tools.command.broker;
+
+import com.alibaba.rocketmq.remoting.RPCHook;
+import com.alibaba.rocketmq.srvutil.ServerUtil;
+import com.alibaba.rocketmq.tools.admin.DefaultMQAdminExt;
+import com.alibaba.rocketmq.tools.command.CommandUtil;
+import com.alibaba.rocketmq.tools.command.SubCommand;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+
+import java.util.Properties;
+import java.util.Set;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class UpdateBrokerConfigSubCommand implements SubCommand {
+
+ @Override
+ public String commandName() {
+ return "updateBrokerConfig";
+ }
+
+
+ @Override
+ public String commandDesc() {
+ return "Update broker's config";
+ }
+
+
+ @Override
+ public Options buildCommandlineOptions(Options options) {
+ Option opt = new Option("b", "brokerAddr", true, "update which broker");
+ opt.setRequired(false);
+ options.addOption(opt);
+
+ opt = new Option("c", "clusterName", true, "update which cluster");
+ opt.setRequired(false);
+ options.addOption(opt);
+
+ opt = new Option("k", "key", true, "config key");
+ opt.setRequired(true);
+ options.addOption(opt);
+
+ opt = new Option("v", "value", true, "config value");
+ opt.setRequired(true);
+ options.addOption(opt);
+
+ return options;
+ }
+
+
+ @Override
+ public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) {
+ DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
+
+ defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
+
+ try {
+ String key = commandLine.getOptionValue('k').trim();
+ String value = commandLine.getOptionValue('v').trim();
+ Properties properties = new Properties();
+ properties.put(key, value);
+
+ if (commandLine.hasOption('b')) {
+ String brokerAddr = commandLine.getOptionValue('b').trim();
+
+ defaultMQAdminExt.start();
+
+ defaultMQAdminExt.updateBrokerConfig(brokerAddr, properties);
+ System.out.printf("update broker config success, %s\n", brokerAddr);
+ return;
+
+ } else if (commandLine.hasOption('c')) {
+ String clusterName = commandLine.getOptionValue('c').trim();
+
+ defaultMQAdminExt.start();
+
+ Set<String> masterSet =
+ CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName);
+ for (String brokerAddr : masterSet) {
+ try {
+ defaultMQAdminExt.updateBrokerConfig(brokerAddr, properties);
+ System.out.printf("update broker config success, %s\n", brokerAddr);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ return;
+ }
+
+ ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), options);
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ defaultMQAdminExt.shutdown();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/cluster/CLusterSendMsgRTCommand.java
----------------------------------------------------------------------
diff --git a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/cluster/CLusterSendMsgRTCommand.java b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/cluster/CLusterSendMsgRTCommand.java
new file mode 100644
index 0000000..3a28522
--- /dev/null
+++ b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/cluster/CLusterSendMsgRTCommand.java
@@ -0,0 +1,211 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.rocketmq.tools.command.cluster;
+
+import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
+import com.alibaba.rocketmq.common.MixAll;
+import com.alibaba.rocketmq.common.message.Message;
+import com.alibaba.rocketmq.common.protocol.body.ClusterInfo;
+import com.alibaba.rocketmq.remoting.RPCHook;
+import com.alibaba.rocketmq.tools.admin.DefaultMQAdminExt;
+import com.alibaba.rocketmq.tools.command.SubCommand;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+
+import java.math.BigDecimal;
+import java.text.SimpleDateFormat;
+import java.util.*;
+
+/**
+ * @author fengliang.hfl
+ */
+public class CLusterSendMsgRTCommand implements SubCommand {
+
+ public static void main(String args[]) {
+ }
+
+ @Override
+ public String commandName() {
+ return "clusterRT";
+ }
+
+ @Override
+ public String commandDesc() {
+ return "List All clusters Message Send RT";
+ }
+
+ @Override
+ public Options buildCommandlineOptions(Options options) { // registers the clusterRT options; all are optional and have defaults in execute()
+ Option opt = new Option("a", "amout", true, "message amout | default 50"); // long name kept as-is for existing callers; default matches execute()
+ opt.setRequired(false);
+ options.addOption(opt);
+
+ opt = new Option("s", "size", true, "message size | default 128 Byte");
+ opt.setRequired(false); // execute() falls back to 128 when -s is absent, so it must not be mandatory
+ options.addOption(opt);
+
+ opt = new Option("c", "cluster", true, "cluster name | default display all cluster");
+ opt.setRequired(false);
+ options.addOption(opt);
+
+ opt = new Option("p", "print log", true, "print as tlog | default false");
+ opt.setRequired(false);
+ options.addOption(opt);
+
+ opt = new Option("m", "machine room", true, "machine room name | default noname");
+ opt.setRequired(false);
+ options.addOption(opt);
+
+ opt = new Option("i", "interval", true, "print interval | default 10 seconds");
+ opt.setRequired(false);
+ options.addOption(opt);
+
+ return options;
+ }
+
+ @Override
+ public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) { // periodically sends test messages per broker name and prints the average send RT
+ DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
+ defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
+
+ DefaultMQProducer producer = new DefaultMQProducer(rpcHook);
+ producer.setProducerGroup(Long.toString(System.currentTimeMillis()));
+
+ try {
+ defaultMQAdminExt.start();
+ producer.start();
+
+ ClusterInfo clusterInfoSerializeWrapper = defaultMQAdminExt.examineBrokerClusterInfo();
+ HashMap<String, Set<String>> clusterAddr = clusterInfoSerializeWrapper
+ .getClusterAddrTable();
+
+ Set<String> clusterNames = null;
+
+ long amount = !commandLine.hasOption('a') ? 50 : Long.parseLong(commandLine
+ .getOptionValue('a').trim());
+
+ long size = !commandLine.hasOption('s') ? 128 : Long.parseLong(commandLine
+ .getOptionValue('s').trim());
+
+ long interval = !commandLine.hasOption('i') ? 10 : Long.parseLong(commandLine
+ .getOptionValue('i').trim());
+
+ boolean printAsTlog = !commandLine.hasOption('p') ? false : Boolean
+ .parseBoolean(commandLine.getOptionValue('p').trim());
+
+ String machineRoom = !commandLine.hasOption('m') ? "noname" : commandLine
+ .getOptionValue('m').trim();
+
+ if (commandLine.hasOption('c')) {
+ clusterNames = new TreeSet<String>();
+ clusterNames.add(commandLine.getOptionValue('c').trim());
+ } else {
+ clusterNames = clusterAddr.keySet();
+ }
+
+ if (!printAsTlog) {
+ System.out.printf("%-24s %-24s %-8s %-16s %-16s%n", // same column widths as the data rows below
+ "#Cluster Name",
+ "#Broker Name",
+ "#RT",
+ "#successCount",
+ "#failCount"
+ );
+ }
+
+ while (true) { // runs until an exception (e.g. interruption of the sleep) ends the loop
+ for (String clusterName : clusterNames) {
+ Set<String> brokerNames = clusterAddr.get(clusterName);
+ if (brokerNames == null) {
+ System.out.printf("cluster [%s] not exist%n", clusterName); // terminate the line so repeated reports stay readable
+ break;
+ }
+
+ for (String brokerName : brokerNames) {
+ Message msg = new Message(brokerName, getStringBySize(size).getBytes(MixAll.DEFAULT_CHARSET));
+ long start = 0;
+ long end = 0;
+ long elapsed = 0;
+ int successCount = 0;
+ int failCount = 0;
+
+ for (int i = 0; i < amount; i++) {
+ start = System.currentTimeMillis();
+ try {
+ producer.send(msg);
+ successCount++;
+ end = System.currentTimeMillis();
+ } catch (Exception e) {
+ failCount++;
+ end = System.currentTimeMillis();
+ }
+
+ if (i != 0) { // the first send is excluded from the average
+ elapsed += end - start;
+ }
+ }
+
+ double rt = amount > 1 ? (double) elapsed / (amount - 1) : 0; // amount <= 1 leaves no timed sample; avoid NaN, which new BigDecimal(rt) rejects
+ if (!printAsTlog) {
+ System.out.printf("%-24s %-24s %-8s %-16s %-16s%n",
+ clusterName,
+ brokerName,
+ String.format("%.2f", rt),
+ successCount,
+ failCount
+ );
+ } else {
+ System.out.printf("%s|%s|%s|%s|%s%n", getCurTime(), // format once: the formatted text must not be re-parsed as a format string
+ machineRoom, clusterName, brokerName,
+ new BigDecimal(rt).setScale(0, BigDecimal.ROUND_HALF_UP));
+ }
+
+ }
+
+ }
+
+ Thread.sleep(interval * 1000); // interval is given in seconds
+ }
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ defaultMQAdminExt.shutdown();
+ producer.shutdown();
+ }
+ }
+
+ public String getStringBySize(long size) {
+ StringBuilder res = new StringBuilder();
+ for (int i = 0; i < size; i++) {
+ res.append('a');
+ }
+ return res.toString();
+ }
+
+ public String getCurTime() {
+ String fromTimeZone = "GMT+8";
+ SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+ Date date = new Date();
+ format.setTimeZone(TimeZone.getTimeZone(fromTimeZone));
+ String chinaDate = format.format(date);
+ return chinaDate;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/cluster/ClusterListSubCommand.java
----------------------------------------------------------------------
diff --git a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/cluster/ClusterListSubCommand.java b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/cluster/ClusterListSubCommand.java
new file mode 100644
index 0000000..baf4f3c
--- /dev/null
+++ b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/cluster/ClusterListSubCommand.java
@@ -0,0 +1,280 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.tools.command.cluster;
+
+import com.alibaba.rocketmq.client.exception.MQBrokerException;
+import com.alibaba.rocketmq.common.protocol.body.ClusterInfo;
+import com.alibaba.rocketmq.common.protocol.body.KVTable;
+import com.alibaba.rocketmq.common.protocol.route.BrokerData;
+import com.alibaba.rocketmq.remoting.RPCHook;
+import com.alibaba.rocketmq.remoting.exception.RemotingConnectException;
+import com.alibaba.rocketmq.remoting.exception.RemotingSendRequestException;
+import com.alibaba.rocketmq.remoting.exception.RemotingTimeoutException;
+import com.alibaba.rocketmq.tools.admin.DefaultMQAdminExt;
+import com.alibaba.rocketmq.tools.command.SubCommand;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class ClusterListSubCommand implements SubCommand {
+
+    @Override
+    public String commandName() {
+        return "clusterList";
+    }
+
+
+    @Override
+    public String commandDesc() {
+        return "List all of clusters";
+    }
+
+
+    /**
+     * Registers the optional {@code -m} (print more stats) flag and the optional
+     * {@code -i} (refresh interval, in seconds) option.
+     */
+    @Override
+    public Options buildCommandlineOptions(Options options) {
+        Option opt = new Option("m", "moreStats", false, "Print more stats");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new Option("i", "interval", true, "specify intervals numbers, it is in seconds");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        return options;
+    }
+
+    /**
+     * Prints the cluster table once, or keeps re-printing it every {@code -i} seconds
+     * when that option is present. Failures are reported with a stack trace and the
+     * admin client is shut down in every case.
+     */
+    @Override
+    public void execute(final CommandLine commandLine, final Options options, RPCHook rpcHook) {
+        DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
+
+        defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
+
+        long printInterval = 1;
+        boolean enableInterval = commandLine.hasOption('i');
+
+        if (enableInterval) {
+            // The option is given in seconds; Thread.sleep expects milliseconds.
+            printInterval = Long.parseLong(commandLine.getOptionValue('i')) * 1000;
+        }
+
+        try {
+            defaultMQAdminExt.start();
+            long i = 0;
+
+            do {
+                // Sleep only between iterations, never before the first print.
+                if (i++ > 0) {
+                    Thread.sleep(printInterval);
+                }
+                if (commandLine.hasOption('m')) {
+                    this.printClusterMoreStats(defaultMQAdminExt);
+                } else {
+                    this.printClusterBaseInfo(defaultMQAdminExt);
+                }
+            } while (enableInterval);
+        } catch (Exception e) {
+            e.printStackTrace();
+        } finally {
+            defaultMQAdminExt.shutdown();
+        }
+    }
+
+    /**
+     * Prints, for every broker address of every cluster, the message totals put and
+     * fetched yesterday and today, derived from the broker's runtime stats counters.
+     */
+    private void printClusterMoreStats(final DefaultMQAdminExt defaultMQAdminExt) throws RemotingConnectException,
+        RemotingTimeoutException, RemotingSendRequestException, InterruptedException, MQBrokerException {
+
+        ClusterInfo clusterInfoSerializeWrapper = defaultMQAdminExt.examineBrokerClusterInfo();
+
+        System.out.printf("%-16s %-32s %14s %14s %14s %14s%n",
+            "#Cluster Name",
+            "#Broker Name",
+            "#InTotalYest",
+            "#OutTotalYest",
+            "#InTotalToday",
+            "#OutTotalToday"
+        );
+
+        Iterator<Map.Entry<String, Set<String>>> itCluster = clusterInfoSerializeWrapper.getClusterAddrTable().entrySet().iterator();
+        while (itCluster.hasNext()) {
+            Map.Entry<String, Set<String>> next = itCluster.next();
+            String clusterName = next.getKey();
+            // TreeSet gives a stable, sorted broker order in the output.
+            TreeSet<String> brokerNameSet = new TreeSet<String>();
+            brokerNameSet.addAll(next.getValue());
+
+            for (String brokerName : brokerNameSet) {
+                BrokerData brokerData = clusterInfoSerializeWrapper.getBrokerAddrTable().get(brokerName);
+                if (brokerData != null) {
+
+                    Iterator<Map.Entry<Long, String>> itAddr = brokerData.getBrokerAddrs().entrySet().iterator();
+                    while (itAddr.hasNext()) {
+                        Map.Entry<Long, String> next1 = itAddr.next();
+                        long inTotalYest = 0;
+                        long outTotalYest = 0;
+                        long inTotalToday = 0;
+                        long outTotalToday = 0;
+
+                        try {
+                            KVTable kvTable = defaultMQAdminExt.fetchBrokerRuntimeStats(next1.getValue());
+                            String msgPutTotalYesterdayMorning = kvTable.getTable().get("msgPutTotalYesterdayMorning");
+                            String msgPutTotalTodayMorning = kvTable.getTable().get("msgPutTotalTodayMorning");
+                            String msgPutTotalTodayNow = kvTable.getTable().get("msgPutTotalTodayNow");
+                            String msgGetTotalYesterdayMorning = kvTable.getTable().get("msgGetTotalYesterdayMorning");
+                            String msgGetTotalTodayMorning = kvTable.getTable().get("msgGetTotalTodayMorning");
+                            String msgGetTotalTodayNow = kvTable.getTable().get("msgGetTotalTodayNow");
+
+                            inTotalYest = Long.parseLong(msgPutTotalTodayMorning) - Long.parseLong(msgPutTotalYesterdayMorning);
+                            outTotalYest = Long.parseLong(msgGetTotalTodayMorning) - Long.parseLong(msgGetTotalYesterdayMorning);
+
+                            inTotalToday = Long.parseLong(msgPutTotalTodayNow) - Long.parseLong(msgPutTotalTodayMorning);
+                            outTotalToday = Long.parseLong(msgGetTotalTodayNow) - Long.parseLong(msgGetTotalTodayMorning);
+
+                        } catch (Exception ignored) {
+                            // Best effort: a broker whose stats cannot be fetched or parsed is still
+                            // listed, with whatever totals were computed so far (zero by default).
+                        }
+
+                        System.out.printf("%-16s %-32s %14d %14d %14d %14d%n",
+                            clusterName,
+                            brokerName,
+                            inTotalYest,
+                            outTotalYest,
+                            inTotalToday,
+                            outTotalToday
+                        );
+                    }
+                }
+            }
+
+            // Blank separator line between clusters (an empty format string prints nothing).
+            if (itCluster.hasNext()) {
+                System.out.printf("%n");
+            }
+        }
+    }
+
+    /**
+     * Prints, for every broker address of every cluster, its id, address, version,
+     * in/out TPS with thread-pool queue figures, page-cache lock time, the age in hours
+     * of the earliest message and the commit-log disk ratio.
+     */
+    private void printClusterBaseInfo(final DefaultMQAdminExt defaultMQAdminExt) throws RemotingConnectException, RemotingTimeoutException,
+        RemotingSendRequestException, InterruptedException, MQBrokerException {
+
+        ClusterInfo clusterInfoSerializeWrapper = defaultMQAdminExt.examineBrokerClusterInfo();
+
+        System.out.printf("%-16s %-22s %-4s %-22s %-16s %19s %19s %10s %5s %6s%n",
+            "#Cluster Name",
+            "#Broker Name",
+            "#BID",
+            "#Addr",
+            "#Version",
+            "#InTPS(LOAD)",
+            "#OutTPS(LOAD)",
+            "#PCWait(ms)",
+            "#Hour",
+            "#SPACE"
+        );
+
+        Iterator<Map.Entry<String, Set<String>>> itCluster = clusterInfoSerializeWrapper.getClusterAddrTable().entrySet().iterator();
+        while (itCluster.hasNext()) {
+            Map.Entry<String, Set<String>> next = itCluster.next();
+            String clusterName = next.getKey();
+            // TreeSet gives a stable, sorted broker order in the output.
+            TreeSet<String> brokerNameSet = new TreeSet<String>();
+            brokerNameSet.addAll(next.getValue());
+
+            for (String brokerName : brokerNameSet) {
+                BrokerData brokerData = clusterInfoSerializeWrapper.getBrokerAddrTable().get(brokerName);
+                if (brokerData != null) {
+
+                    Iterator<Map.Entry<Long, String>> itAddr = brokerData.getBrokerAddrs().entrySet().iterator();
+                    while (itAddr.hasNext()) {
+                        Map.Entry<Long, String> next1 = itAddr.next();
+                        double in = 0;
+                        double out = 0;
+                        String version = "";
+                        String sendThreadPoolQueueSize = "";
+                        String pullThreadPoolQueueSize = "";
+                        String sendThreadPoolQueueHeadWaitTimeMills = "";
+                        String pullThreadPoolQueueHeadWaitTimeMills = "";
+                        String pageCacheLockTimeMills = "";
+                        String earliestMessageTimeStamp = "";
+                        String commitLogDiskRatio = "";
+                        try {
+                            KVTable kvTable = defaultMQAdminExt.fetchBrokerRuntimeStats(next1.getValue());
+                            String putTps = kvTable.getTable().get("putTps");
+                            String getTransferedTps = kvTable.getTable().get("getTransferedTps");
+                            sendThreadPoolQueueSize = kvTable.getTable().get("sendThreadPoolQueueSize");
+                            pullThreadPoolQueueSize = kvTable.getTable().get("pullThreadPoolQueueSize");
+
+                            sendThreadPoolQueueHeadWaitTimeMills = kvTable.getTable().get("sendThreadPoolQueueHeadWaitTimeMills");
+                            pullThreadPoolQueueHeadWaitTimeMills = kvTable.getTable().get("pullThreadPoolQueueHeadWaitTimeMills");
+                            pageCacheLockTimeMills = kvTable.getTable().get("pageCacheLockTimeMills");
+                            earliestMessageTimeStamp = kvTable.getTable().get("earliestMessageTimeStamp");
+                            commitLogDiskRatio = kvTable.getTable().get("commitLogDiskRatio");
+
+                            version = kvTable.getTable().get("brokerVersionDesc");
+
+                            // Only the first space-separated token of each TPS stat is shown.
+                            String[] putTpsTokens = putTps.split(" ");
+                            if (putTpsTokens.length > 0) {
+                                in = Double.parseDouble(putTpsTokens[0]);
+                            }
+
+                            String[] getTpsTokens = getTransferedTps.split(" ");
+                            if (getTpsTokens.length > 0) {
+                                out = Double.parseDouble(getTpsTokens[0]);
+                            }
+                        } catch (Exception ignored) {
+                            // Best effort: a broker whose stats cannot be fetched or parsed is still
+                            // listed, with whatever fields were filled in before the failure.
+                        }
+
+                        double hour = 0.0;
+                        double space = 0.0;
+
+                        if (earliestMessageTimeStamp != null && earliestMessageTimeStamp.length() > 0) {
+                            try {
+                                long mills = System.currentTimeMillis() - Long.parseLong(earliestMessageTimeStamp);
+                                hour = mills / 1000.0 / 60.0 / 60.0;
+                            } catch (NumberFormatException ignored) {
+                                // A malformed timestamp from one broker must not abort the whole listing.
+                            }
+                        }
+
+                        if (commitLogDiskRatio != null && commitLogDiskRatio.length() > 0) {
+                            try {
+                                space = Double.parseDouble(commitLogDiskRatio);
+                            } catch (NumberFormatException ignored) {
+                                // A malformed ratio from one broker must not abort the whole listing.
+                            }
+                        }
+
+                        System.out.printf("%-16s %-22s %-4s %-22s %-16s %19s %19s %10s %5s %6s%n",
+                            clusterName,
+                            brokerName,
+                            next1.getKey().longValue(),
+                            next1.getValue(),
+                            version,
+                            String.format("%9.2f(%s,%sms)", in, sendThreadPoolQueueSize, sendThreadPoolQueueHeadWaitTimeMills),
+                            String.format("%9.2f(%s,%sms)", out, pullThreadPoolQueueSize, pullThreadPoolQueueHeadWaitTimeMills),
+                            pageCacheLockTimeMills,
+                            String.format("%2.2f", hour),
+                            String.format("%.4f", space)
+                        );
+                    }
+                }
+            }
+
+            // Blank separator line between clusters (an empty format string prints nothing).
+            if (itCluster.hasNext()) {
+                System.out.printf("%n");
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/connection/ConsumerConnectionSubCommand.java
----------------------------------------------------------------------
diff --git a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/connection/ConsumerConnectionSubCommand.java b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/connection/ConsumerConnectionSubCommand.java
new file mode 100644
index 0000000..aa0598e
--- /dev/null
+++ b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/connection/ConsumerConnectionSubCommand.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.tools.command.connection;
+
+import com.alibaba.rocketmq.common.MQVersion;
+import com.alibaba.rocketmq.common.protocol.body.Connection;
+import com.alibaba.rocketmq.common.protocol.body.ConsumerConnection;
+import com.alibaba.rocketmq.common.protocol.heartbeat.SubscriptionData;
+import com.alibaba.rocketmq.remoting.RPCHook;
+import com.alibaba.rocketmq.tools.admin.DefaultMQAdminExt;
+import com.alibaba.rocketmq.tools.command.SubCommand;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+
+import java.util.Iterator;
+import java.util.Map.Entry;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class ConsumerConnectionSubCommand implements SubCommand {
+
+    @Override
+    public String commandName() {
+        return "consumerConnection";
+    }
+
+    @Override
+    public String commandDesc() {
+        return "Query consumer's socket connection, client version and subscription";
+    }
+
+    /**
+     * Registers the mandatory {@code -g} (consumer group name) option.
+     */
+    @Override
+    public Options buildCommandlineOptions(Options options) {
+        Option opt = new Option("g", "consumerGroup", true, "consumer group name");
+        opt.setRequired(true);
+        options.addOption(opt);
+
+        return options;
+    }
+
+    /**
+     * Prints the connections of the given consumer group, then its subscriptions,
+     * then its consume type, message model and consume-from-where setting.
+     * Failures are reported with a stack trace and the admin client is always shut down.
+     */
+    @Override
+    public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) {
+        DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
+
+        defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
+
+        try {
+            defaultMQAdminExt.start();
+
+            String group = commandLine.getOptionValue('g').trim();
+
+            ConsumerConnection cc = defaultMQAdminExt.examineConsumerConnectionInfo(group);
+
+            int i = 1;
+            for (Connection conn : cc.getConnectionSet()) {
+                System.out.printf("%03d %-32s %-22s %-8s %s%n",
+                    i++,
+                    conn.getClientId(),
+                    conn.getClientAddr(),
+                    conn.getLanguage(),
+                    MQVersion.getVersionDesc(conn.getVersion())
+                );
+            }
+
+            // Terminate the header line so the first subscription row starts on its own line.
+            System.out.printf("%nBelow is subscription:%n");
+            Iterator<Entry<String, SubscriptionData>> it = cc.getSubscriptionTable().entrySet().iterator();
+            i = 1;
+            while (it.hasNext()) {
+                Entry<String, SubscriptionData> entry = it.next();
+                SubscriptionData sd = entry.getValue();
+                System.out.printf("%03d Topic: %-40s SubExpression: %s%n",
+                    i++,
+                    sd.getTopic(),
+                    sd.getSubString()
+                );
+            }
+
+            // Blank separator line before the summary (an empty format string prints nothing).
+            System.out.printf("%n");
+            System.out.printf("ConsumeType: %s%n", cc.getConsumeType());
+            System.out.printf("MessageModel: %s%n", cc.getMessageModel());
+            System.out.printf("ConsumeFromWhere: %s%n", cc.getConsumeFromWhere());
+        } catch (Exception e) {
+            e.printStackTrace();
+        } finally {
+            defaultMQAdminExt.shutdown();
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/connection/ProducerConnectionSubCommand.java
----------------------------------------------------------------------
diff --git a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/connection/ProducerConnectionSubCommand.java b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/connection/ProducerConnectionSubCommand.java
new file mode 100644
index 0000000..97ba792
--- /dev/null
+++ b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/connection/ProducerConnectionSubCommand.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.tools.command.connection;
+
+import com.alibaba.rocketmq.common.MQVersion;
+import com.alibaba.rocketmq.common.protocol.body.Connection;
+import com.alibaba.rocketmq.common.protocol.body.ProducerConnection;
+import com.alibaba.rocketmq.remoting.RPCHook;
+import com.alibaba.rocketmq.tools.admin.DefaultMQAdminExt;
+import com.alibaba.rocketmq.tools.command.SubCommand;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class ProducerConnectionSubCommand implements SubCommand {
+
+    @Override
+    public String commandName() {
+        return "producerConnection";
+    }
+
+    @Override
+    public String commandDesc() {
+        return "Query producer's socket connection and client version";
+    }
+
+    /**
+     * Registers the mandatory {@code -g} (producer group) and {@code -t} (topic) options.
+     */
+    @Override
+    public Options buildCommandlineOptions(Options options) {
+        final Option groupOption = new Option("g", "producerGroup", true, "producer group name");
+        groupOption.setRequired(true);
+        options.addOption(groupOption);
+
+        final Option topicOption = new Option("t", "topic", true, "topic name");
+        topicOption.setRequired(true);
+        options.addOption(topicOption);
+
+        return options;
+    }
+
+    /**
+     * Prints one numbered row per connection of the given producer group on the given
+     * topic: client id, client address, language and version description.
+     * Failures are reported with a stack trace and the admin client is always shut down.
+     */
+    @Override
+    public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) {
+        final DefaultMQAdminExt adminExt = new DefaultMQAdminExt(rpcHook);
+        adminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
+
+        try {
+            adminExt.start();
+
+            final String producerGroup = commandLine.getOptionValue('g').trim();
+            final String topicName = commandLine.getOptionValue('t').trim();
+            final ProducerConnection connections = adminExt.examineProducerConnectionInfo(producerGroup, topicName);
+
+            int row = 0;
+            for (Connection connection : connections.getConnectionSet()) {
+                row++;
+                System.out.printf("%04d %-32s %-22s %-8s %s%n",
+                    row,
+                    connection.getClientId(),
+                    connection.getClientAddr(),
+                    connection.getLanguage(),
+                    MQVersion.getVersionDesc(connection.getVersion())
+                );
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+        } finally {
+            adminExt.shutdown();
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/consumer/ConsumerProgressSubCommand.java
----------------------------------------------------------------------
diff --git a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/consumer/ConsumerProgressSubCommand.java b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/consumer/ConsumerProgressSubCommand.java
new file mode 100644
index 0000000..d09b74a
--- /dev/null
+++ b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/consumer/ConsumerProgressSubCommand.java
@@ -0,0 +1,285 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.tools.command.consumer;
+
+import com.alibaba.rocketmq.client.log.ClientLogger;
+import com.alibaba.rocketmq.common.MQVersion;
+import com.alibaba.rocketmq.common.MixAll;
+import com.alibaba.rocketmq.common.UtilAll;
+import com.alibaba.rocketmq.common.admin.ConsumeStats;
+import com.alibaba.rocketmq.common.admin.OffsetWrapper;
+import com.alibaba.rocketmq.common.message.MessageQueue;
+import com.alibaba.rocketmq.common.protocol.body.ConsumerConnection;
+import com.alibaba.rocketmq.common.protocol.body.TopicList;
+import com.alibaba.rocketmq.common.protocol.heartbeat.ConsumeType;
+import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;
+import com.alibaba.rocketmq.remoting.RPCHook;
+import com.alibaba.rocketmq.tools.admin.DefaultMQAdminExt;
+import com.alibaba.rocketmq.tools.command.SubCommand;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.slf4j.Logger;
+
+import java.util.Collections;
+import java.util.Date;
+import java.util.LinkedList;
+import java.util.List;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class ConsumerProgressSubCommand implements SubCommand {
+    private final Logger log = ClientLogger.getLog();
+
+    @Override
+    public String commandName() {
+        return "consumerProgress";
+    }
+
+    @Override
+    public String commandDesc() {
+        return "Query consumers's progress, speed";
+    }
+
+    /**
+     * Registers the optional {@code -g} (consumer group name) option.
+     */
+    @Override
+    public Options buildCommandlineOptions(Options options) {
+        final Option groupOption = new Option("g", "groupName", true, "consumer group name");
+        groupOption.setRequired(false);
+        options.addOption(groupOption);
+
+        return options;
+    }
+
+    /**
+     * With {@code -g}, prints the per-queue offsets of that consumer group; without it,
+     * prints a one-line summary for every group that owns a retry topic.
+     * Failures are reported with a stack trace and the admin client is always shut down.
+     */
+    @Override
+    public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) {
+        final DefaultMQAdminExt adminExt = new DefaultMQAdminExt(rpcHook);
+        adminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
+
+        try {
+            adminExt.start();
+            if (!commandLine.hasOption('g')) {
+                printAllGroups(adminExt);
+            } else {
+                printSingleGroup(adminExt, commandLine.getOptionValue('g').trim());
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+        } finally {
+            adminExt.shutdown();
+        }
+    }
+
+    /**
+     * Prints broker offset, consumer offset, their difference and the last timestamp
+     * for each message queue of one group, followed by the consume TPS and total diff.
+     */
+    private void printSingleGroup(final DefaultMQAdminExt adminExt, final String consumerGroup) throws Exception {
+        final ConsumeStats stats = adminExt.examineConsumeStats(consumerGroup);
+        final List<MessageQueue> queues = new LinkedList<MessageQueue>(stats.getOffsetTable().keySet());
+        Collections.sort(queues);
+
+        System.out.printf("%-32s %-32s %-4s %-20s %-20s %-20s %s%n",
+            "#Topic",
+            "#Broker Name",
+            "#QID",
+            "#Broker Offset",
+            "#Consumer Offset",
+            "#Diff",
+            "#LastTime");
+
+        long accumulatedDiff = 0L;
+        for (MessageQueue queue : queues) {
+            final OffsetWrapper offsets = stats.getOffsetTable().get(queue);
+            final long lag = offsets.getBrokerOffset() - offsets.getConsumerOffset();
+            accumulatedDiff += lag;
+
+            String formattedLastTime = "";
+            try {
+                formattedLastTime = UtilAll.formatDate(new Date(offsets.getLastTimestamp()), UtilAll.YYYY_MM_DD_HH_MM_SS);
+            } catch (Exception e) {
+                // The last-time column stays empty when the timestamp cannot be formatted.
+            }
+
+            System.out.printf("%-32s %-32s %-4d %-20d %-20d %-20d %s%n",
+                UtilAll.frontStringAtLeast(queue.getTopic(), 32),
+                UtilAll.frontStringAtLeast(queue.getBrokerName(), 32),
+                queue.getQueueId(),
+                offsets.getBrokerOffset(),
+                offsets.getConsumerOffset(),
+                lag,
+                formattedLastTime
+            );
+        }
+
+        System.out.printf("%n");
+        System.out.printf("Consume TPS: %s%n", stats.getConsumeTps());
+        System.out.printf("Diff Total: %d%n", accumulatedDiff);
+    }
+
+    /**
+     * Prints one summary row per consumer group, where groups are discovered from
+     * topics whose name starts with the retry-topic prefix.
+     */
+    private void printAllGroups(final DefaultMQAdminExt adminExt) throws Exception {
+        System.out.printf("%-32s %-6s %-24s %-5s %-14s %-7s %s%n",
+            "#Group",
+            "#Count",
+            "#Version",
+            "#Type",
+            "#Model",
+            "#TPS",
+            "#Diff Total"
+        );
+
+        final TopicList allTopics = adminExt.fetchAllTopicList();
+        for (String topic : allTopics.getTopicList()) {
+            if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
+                continue;
+            }
+
+            final String consumerGroup = topic.substring(MixAll.RETRY_GROUP_TOPIC_PREFIX.length());
+            try {
+                // Each lookup is independent: a failure is logged and its columns keep defaults.
+                ConsumeStats stats = null;
+                try {
+                    stats = adminExt.examineConsumeStats(consumerGroup);
+                } catch (Exception e) {
+                    log.warn("examineConsumeStats exception, " + consumerGroup, e);
+                }
+
+                ConsumerConnection connection = null;
+                try {
+                    connection = adminExt.examineConsumerConnectionInfo(consumerGroup);
+                } catch (Exception e) {
+                    log.warn("examineConsumerConnectionInfo exception, " + consumerGroup, e);
+                }
+
+                final GroupConsumeInfo summary = new GroupConsumeInfo();
+                summary.setGroup(consumerGroup);
+
+                if (stats != null) {
+                    summary.setConsumeTps((int) stats.getConsumeTps());
+                    summary.setDiffTotal(stats.computeTotalDiff());
+                }
+
+                if (connection != null) {
+                    summary.setCount(connection.getConnectionSet().size());
+                    summary.setMessageModel(connection.getMessageModel());
+                    summary.setConsumeType(connection.getConsumeType());
+                    summary.setVersion(connection.computeMinVersion());
+                }
+
+                final String versionColumn = summary.getCount() > 0 ? summary.versionDesc() : "OFFLINE";
+                System.out.printf("%-32s %-6d %-24s %-5s %-14s %-7d %d%n",
+                    UtilAll.frontStringAtLeast(summary.getGroup(), 32),
+                    summary.getCount(),
+                    versionColumn,
+                    summary.consumeTypeDesc(),
+                    summary.messageModelDesc(),
+                    summary.getConsumeTps(),
+                    summary.getDiffTotal()
+                );
+            } catch (Exception e) {
+                log.warn("examineConsumeStats or examineConsumerConnectionInfo exception, " + consumerGroup, e);
+            }
+        }
+    }
+}
+
+
+/**
+ * Summary of one consumer group as shown by the consumer progress listing:
+ * connection count, minimum client version, consume type, message model,
+ * consume TPS and total offset difference.
+ */
+class GroupConsumeInfo implements Comparable<GroupConsumeInfo> {
+    private String group;
+    private int version;
+    private int count;
+    private ConsumeType consumeType;
+    private MessageModel messageModel;
+    private int consumeTps;
+    private long diffTotal;
+
+
+    public String getGroup() {
+        return group;
+    }
+
+    public void setGroup(String group) {
+        this.group = group;
+    }
+
+    /**
+     * @return "PULL" or "PUSH" when the group has connections, otherwise an empty string
+     */
+    public String consumeTypeDesc() {
+        if (this.count != 0) {
+            return this.getConsumeType() == ConsumeType.CONSUME_ACTIVELY ? "PULL" : "PUSH";
+        }
+        return "";
+    }
+
+    public ConsumeType getConsumeType() {
+        return consumeType;
+    }
+
+    public void setConsumeType(ConsumeType consumeType) {
+        this.consumeType = consumeType;
+    }
+
+    /**
+     * @return the message model name for a connected, passively consuming group,
+     *         otherwise an empty string
+     */
+    public String messageModelDesc() {
+        if (this.count != 0 && this.getConsumeType() == ConsumeType.CONSUME_PASSIVELY) {
+            return this.getMessageModel().toString();
+        }
+        return "";
+    }
+
+    public MessageModel getMessageModel() {
+        return messageModel;
+    }
+
+    public void setMessageModel(MessageModel messageModel) {
+        this.messageModel = messageModel;
+    }
+
+    /**
+     * @return the version description when the group has connections, otherwise an empty string
+     */
+    public String versionDesc() {
+        if (this.count != 0) {
+            return MQVersion.getVersionDesc(this.version);
+        }
+        return "";
+    }
+
+    public int getCount() {
+        return count;
+    }
+
+    public void setCount(int count) {
+        this.count = count;
+    }
+
+    public long getDiffTotal() {
+        return diffTotal;
+    }
+
+
+    public void setDiffTotal(long diffTotal) {
+        this.diffTotal = diffTotal;
+    }
+
+
+    /**
+     * Orders groups by connection count, largest first, and then by total diff,
+     * largest first.
+     *
+     * <p>Values are compared explicitly instead of subtracted: an int subtraction can
+     * overflow, and narrowing a long difference to int can flip its sign or collapse a
+     * non-zero difference to 0.
+     */
+    @Override
+    public int compareTo(GroupConsumeInfo o) {
+        if (this.count != o.count) {
+            return o.count < this.count ? -1 : 1;
+        }
+
+        if (o.diffTotal == this.diffTotal) {
+            return 0;
+        }
+        return o.diffTotal < this.diffTotal ? -1 : 1;
+    }
+
+
+    public int getConsumeTps() {
+        return consumeTps;
+    }
+
+
+    public void setConsumeTps(int consumeTps) {
+        this.consumeTps = consumeTps;
+    }
+
+
+    public int getVersion() {
+        return version;
+    }
+
+
+    public void setVersion(int version) {
+        this.version = version;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/consumer/ConsumerStatusSubCommand.java
----------------------------------------------------------------------
diff --git a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/consumer/ConsumerStatusSubCommand.java b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/consumer/ConsumerStatusSubCommand.java
new file mode 100644
index 0000000..cf796d8
--- /dev/null
+++ b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/consumer/ConsumerStatusSubCommand.java
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.tools.command.consumer;
+
+import com.alibaba.rocketmq.common.MQVersion;
+import com.alibaba.rocketmq.common.MixAll;
+import com.alibaba.rocketmq.common.protocol.body.Connection;
+import com.alibaba.rocketmq.common.protocol.body.ConsumerConnection;
+import com.alibaba.rocketmq.common.protocol.body.ConsumerRunningInfo;
+import com.alibaba.rocketmq.remoting.RPCHook;
+import com.alibaba.rocketmq.tools.admin.DefaultMQAdminExt;
+import com.alibaba.rocketmq.tools.command.MQAdminStartup;
+import com.alibaba.rocketmq.tools.command.SubCommand;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.TreeMap;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class ConsumerStatusSubCommand implements SubCommand {
+
+    /**
+     * Development entry point: runs this sub-command against a name server on
+     * 127.0.0.1:9876 for the group "benchmark_consumer".
+     */
+    public static void main(String[] args) {
+        System.setProperty(MixAll.NAMESRV_ADDR_PROPERTY, "127.0.0.1:9876");
+        MQAdminStartup.main(new String[]{new ConsumerStatusSubCommand().commandName(), "-g", "benchmark_consumer"});
+    }
+
+    @Override
+    public String commandName() {
+        return "consumerStatus";
+    }
+
+    @Override
+    public String commandDesc() {
+        return "Query consumer's internal data structure";
+    }
+
+    /**
+     * Registers the mandatory {@code -g} (consumer group) option and the optional
+     * {@code -i} (client id) and {@code -s} (jstack) options.
+     */
+    @Override
+    public Options buildCommandlineOptions(Options options) {
+        Option opt = new Option("g", "consumerGroup", true, "consumer group name");
+        opt.setRequired(true);
+        options.addOption(opt);
+
+        opt = new Option("i", "clientId", true, "The consumer's client id");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new Option("s", "jstack", false, "Run jstack command in the consumer progress");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        return options;
+    }
+
+    /**
+     * Without {@code -i}: fetches the running info of every connected client of the
+     * group, writes each one to a file under a directory named after the current
+     * timestamp, then prints the subscription, rebalance and process-queue analysis.
+     * With {@code -i}: prints the running info of that single client.
+     * Failures are reported with a stack trace and the admin client is always shut down.
+     */
+    @Override
+    public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) {
+        DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
+
+        defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
+
+        try {
+            defaultMQAdminExt.start();
+            String group = commandLine.getOptionValue('g').trim();
+            ConsumerConnection cc = defaultMQAdminExt.examineConsumerConnectionInfo(group);
+            boolean jstack = commandLine.hasOption('s');
+            if (!commandLine.hasOption('i')) {
+                int i = 1;
+                long now = System.currentTimeMillis();
+                final TreeMap<String/* clientId */, ConsumerRunningInfo> criTable = new TreeMap<String, ConsumerRunningInfo>();
+                for (Connection conn : cc.getConnectionSet()) {
+                    try {
+                        ConsumerRunningInfo consumerRunningInfo =
+                            defaultMQAdminExt.getConsumerRunningInfo(group, conn.getClientId(), jstack);
+                        if (consumerRunningInfo != null) {
+                            criTable.put(conn.getClientId(), consumerRunningInfo);
+                            String filePath = now + "/" + conn.getClientId();
+                            MixAll.string2FileNotSafe(consumerRunningInfo.formatString(), filePath);
+                            System.out.printf("%03d %-40s %-20s %s%n",
+                                i++,
+                                conn.getClientId(),
+                                MQVersion.getVersionDesc(conn.getVersion()),
+                                filePath);
+                        }
+                    } catch (Exception e) {
+                        // One unreachable client must not stop the remaining ones from being queried.
+                        e.printStackTrace();
+                    }
+                }
+
+                if (!criTable.isEmpty()) {
+                    boolean subSame = ConsumerRunningInfo.analyzeSubscription(criTable);
+
+                    boolean rebalanceOK = subSame && ConsumerRunningInfo.analyzeRebalance(criTable);
+
+                    if (subSame) {
+                        System.out.printf("%n%nSame subscription in the same group of consumer");
+                        System.out.printf("%n%nRebalance %s%n", rebalanceOK ? "OK" : "Failed");
+                        Iterator<Entry<String, ConsumerRunningInfo>> it = criTable.entrySet().iterator();
+                        while (it.hasNext()) {
+                            Entry<String, ConsumerRunningInfo> next = it.next();
+                            String result =
+                                ConsumerRunningInfo.analyzeProcessQueue(next.getKey(), next.getValue());
+                            if (result.length() > 0) {
+                                // Pass the text as an argument, never as the format string:
+                                // a '%' inside it would otherwise be parsed as a format specifier.
+                                System.out.printf("%s", result);
+                            }
+                        }
+                    } else {
+                        System.out.printf("%n%nWARN: Different subscription in the same group of consumer!!!");
+                    }
+                }
+            } else {
+                String clientId = commandLine.getOptionValue('i').trim();
+                ConsumerRunningInfo consumerRunningInfo =
+                    defaultMQAdminExt.getConsumerRunningInfo(group, clientId, jstack);
+                if (consumerRunningInfo != null) {
+                    System.out.printf("%s", consumerRunningInfo.formatString());
+                }
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+        } finally {
+            defaultMQAdminExt.shutdown();
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/consumer/ConsumerSubCommand.java
----------------------------------------------------------------------
diff --git a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/consumer/ConsumerSubCommand.java b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/consumer/ConsumerSubCommand.java
new file mode 100644
index 0000000..373da1e
--- /dev/null
+++ b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/consumer/ConsumerSubCommand.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.tools.command.consumer;
+
+import com.alibaba.rocketmq.common.MQVersion;
+import com.alibaba.rocketmq.common.MixAll;
+import com.alibaba.rocketmq.common.protocol.body.Connection;
+import com.alibaba.rocketmq.common.protocol.body.ConsumerConnection;
+import com.alibaba.rocketmq.common.protocol.body.ConsumerRunningInfo;
+import com.alibaba.rocketmq.remoting.RPCHook;
+import com.alibaba.rocketmq.tools.admin.DefaultMQAdminExt;
+import com.alibaba.rocketmq.tools.command.MQAdminStartup;
+import com.alibaba.rocketmq.tools.command.SubCommand;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.TreeMap;
+
+
+/**
+ * mqadmin sub-command {@code consumer}: queries the connections of a consumer group and the
+ * running info reported by its clients.
+ * <p>
+ * Without {@code -i}, the running info of every connected client is fetched, handed to
+ * {@code MixAll.string2FileNotSafe} with the path {@code <currentTimeMillis>/<clientId>} and
+ * summarised on stdout, followed by the subscription, rebalance and process-queue analysis
+ * provided by {@link ConsumerRunningInfo}. With {@code -i}, only the running info of the given
+ * client is printed.
+ *
+ * @author shijia.wxr
+ */
+public class ConsumerSubCommand implements SubCommand {
+
+    /**
+     * Developer entry point: runs this sub-command for the group {@code benchmark_consumer}
+     * against a name server at {@code 127.0.0.1:9876}.
+     *
+     * @param args ignored
+     */
+    public static void main(String[] args) {
+        System.setProperty(MixAll.NAMESRV_ADDR_PROPERTY, "127.0.0.1:9876");
+        MQAdminStartup.main(new String[]{new ConsumerSubCommand().commandName(), "-g", "benchmark_consumer"});
+    }
+
+    /** @return the name under which this sub-command is invoked, {@code "consumer"} */
+    @Override
+    public String commandName() {
+        return "consumer";
+    }
+
+    /** @return a one-line description of this sub-command */
+    @Override
+    public String commandDesc() {
+        return "Query consumer's connection, status, etc.";
+    }
+
+    /**
+     * Registers the options understood by {@link #execute}: the required {@code -g} consumer
+     * group, the optional {@code -i} client id and the optional {@code -s} jstack flag.
+     *
+     * @param options the option set to extend
+     * @return the same {@code options} instance, with this command's options added
+     */
+    @Override
+    public Options buildCommandlineOptions(Options options) {
+        Option opt = new Option("g", "consumerGroup", true, "consumer group name");
+        opt.setRequired(true);
+        options.addOption(opt);
+
+        // execute() branches on 'i'; without registering it here that branch could never be reached.
+        opt = new Option("i", "clientId", true, "The consumer's client id");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new Option("s", "jstack", false, "Run jstack command in the consumer progress");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        return options;
+    }
+
+    /**
+     * Runs the query. Any exception is printed via {@code printStackTrace()} and the admin client
+     * is always shut down afterwards.
+     *
+     * @param commandLine the parsed command line; {@code -g} must be present
+     * @param options the options this command was parsed with (not used here)
+     * @param rpcHook hook passed to the {@link DefaultMQAdminExt} created for this run
+     */
+    @Override
+    public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) {
+        DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
+
+        defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
+
+        try {
+            defaultMQAdminExt.start();
+            String group = commandLine.getOptionValue('g').trim();
+            ConsumerConnection cc = defaultMQAdminExt.examineConsumerConnectionInfo(group);
+            boolean jstack = commandLine.hasOption('s');
+
+            if (!commandLine.hasOption('i')) {
+                int i = 1;
+                // A single timestamp is used as the directory part of every file path of this run.
+                long now = System.currentTimeMillis();
+                final TreeMap<String/* clientId */, ConsumerRunningInfo> criTable =
+                    new TreeMap<String, ConsumerRunningInfo>();
+
+                for (Connection conn : cc.getConnectionSet()) {
+                    try {
+                        ConsumerRunningInfo consumerRunningInfo =
+                            defaultMQAdminExt.getConsumerRunningInfo(group, conn.getClientId(), jstack);
+                        if (consumerRunningInfo != null) {
+                            criTable.put(conn.getClientId(), consumerRunningInfo);
+                            String filePath = now + "/" + conn.getClientId();
+                            MixAll.string2FileNotSafe(consumerRunningInfo.formatString(), filePath);
+                            System.out.printf("%03d %-40s %-20s %s%n",
+                                i++,
+                                conn.getClientId(),
+                                MQVersion.getVersionDesc(conn.getVersion()),
+                                filePath);
+                        }
+                    } catch (Exception e) {
+                        // A failure for one client must not abort the listing of the others.
+                        e.printStackTrace();
+                    }
+                }
+
+                if (!criTable.isEmpty()) {
+                    boolean subSame = ConsumerRunningInfo.analyzeSubscription(criTable);
+                    // Rebalance is only analysed when all clients share the same subscription.
+                    boolean rebalanceOK = subSame && ConsumerRunningInfo.analyzeRebalance(criTable);
+
+                    if (subSame) {
+                        System.out.printf("%n%nSame subscription in the same group of consumer");
+                        System.out.printf("%n%nRebalance %s%n", rebalanceOK ? "OK" : "Failed");
+
+                        for (Entry<String, ConsumerRunningInfo> entry : criTable.entrySet()) {
+                            String result =
+                                ConsumerRunningInfo.analyzeProcessQueue(entry.getKey(), entry.getValue());
+                            if (result.length() > 0) {
+                                // Print as an argument, never as the format string: the text is
+                                // already rendered and a '%' in it would break printf.
+                                System.out.printf("%s", result);
+                            }
+                        }
+                    } else {
+                        System.out.printf("%n%nWARN: Different subscription in the same group of consumer!!!");
+                    }
+                }
+            } else {
+                String clientId = commandLine.getOptionValue('i').trim();
+                ConsumerRunningInfo consumerRunningInfo =
+                    defaultMQAdminExt.getConsumerRunningInfo(group, clientId, jstack);
+                if (consumerRunningInfo != null) {
+                    // Same reasoning as above: the formatted info is data, not a format string.
+                    System.out.printf("%s", consumerRunningInfo.formatString());
+                }
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+        } finally {
+            defaultMQAdminExt.shutdown();
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/consumer/DeleteSubscriptionGroupCommand.java
----------------------------------------------------------------------
diff --git a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/consumer/DeleteSubscriptionGroupCommand.java b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/consumer/DeleteSubscriptionGroupCommand.java
new file mode 100644
index 0000000..712a0d0
--- /dev/null
+++ b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/consumer/DeleteSubscriptionGroupCommand.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.tools.command.consumer;
+
+import com.alibaba.rocketmq.common.MixAll;
+import com.alibaba.rocketmq.remoting.RPCHook;
+import com.alibaba.rocketmq.srvutil.ServerUtil;
+import com.alibaba.rocketmq.tools.admin.DefaultMQAdminExt;
+import com.alibaba.rocketmq.tools.command.CommandUtil;
+import com.alibaba.rocketmq.tools.command.SubCommand;
+import com.alibaba.rocketmq.tools.command.topic.DeleteTopicSubCommand;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+
+import java.util.Set;
+
+
+/**
+ * mqadmin sub-command {@code deleteSubGroup}: deletes a subscription group either from a single
+ * broker ({@code -b}) or from every master broker of a cluster ({@code -c}). In the cluster case
+ * the group's retry and DLQ topics are deleted as well, on a best-effort basis.
+ *
+ * @author lansheng.zj
+ */
+public class DeleteSubscriptionGroupCommand implements SubCommand {
+    /** @return the name under which this sub-command is invoked, {@code "deleteSubGroup"} */
+    @Override
+    public String commandName() {
+        return "deleteSubGroup";
+    }
+
+
+    /** @return a one-line description of this sub-command */
+    @Override
+    public String commandDesc() {
+        return "Delete subscription group from broker.";
+    }
+
+
+    /**
+     * Registers the options understood by {@link #execute}: the optional {@code -b} broker
+     * address, the optional {@code -c} cluster name and the required {@code -g} group name.
+     *
+     * @param options the option set to extend
+     * @return the same {@code options} instance, with this command's options added
+     */
+    @Override
+    public Options buildCommandlineOptions(Options options) {
+        Option opt = new Option("b", "brokerAddr", true, "delete subscription group from which broker");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new Option("c", "clusterName", true, "delete subscription group from which cluster");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new Option("g", "groupName", true, "subscription group name");
+        opt.setRequired(true);
+        options.addOption(opt);
+
+        return options;
+    }
+
+
+    /**
+     * Deletes the group named by {@code -g}. {@code -b} takes precedence over {@code -c}; when
+     * neither is given the command's help text is printed. Any exception is printed via
+     * {@code printStackTrace()} and the admin client is always shut down afterwards.
+     *
+     * @param commandLine the parsed command line; {@code -g} must be present
+     * @param options the options this command was parsed with, used for the help output
+     * @param rpcHook hook passed to the {@link DefaultMQAdminExt} created for this run
+     */
+    @Override
+    public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) {
+        DefaultMQAdminExt adminExt = new DefaultMQAdminExt(rpcHook);
+        adminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
+        try {
+            // groupName
+            String groupName = commandLine.getOptionValue('g').trim();
+
+            if (commandLine.hasOption('b')) {
+                String addr = commandLine.getOptionValue('b').trim();
+                adminExt.start();
+
+                adminExt.deleteSubscriptionGroup(addr, groupName);
+                System.out.printf("delete subscription group [%s] from broker [%s] success.%n", groupName,
+                    addr);
+
+                return;
+            } else if (commandLine.hasOption('c')) {
+                String clusterName = commandLine.getOptionValue('c').trim();
+                adminExt.start();
+
+                Set<String> masterSet = CommandUtil.fetchMasterAddrByClusterName(adminExt, clusterName);
+                for (String master : masterSet) {
+                    adminExt.deleteSubscriptionGroup(master, groupName);
+                    System.out.printf(
+                        "delete subscription group [%s] from broker [%s] in cluster [%s] success.%n",
+                        groupName, master, clusterName);
+                }
+
+                // Each companion topic is attempted on its own so that a failure on the retry
+                // topic no longer prevents the DLQ topic from being deleted.
+                deleteTopicQuietly(adminExt, clusterName, MixAll.RETRY_GROUP_TOPIC_PREFIX + groupName);
+                deleteTopicQuietly(adminExt, clusterName, MixAll.DLQ_GROUP_TOPIC_PREFIX + groupName);
+                return;
+            }
+
+            // Neither -b nor -c was supplied: nothing to act on, show the usage instead.
+            ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), options);
+        } catch (Exception e) {
+            e.printStackTrace();
+        } finally {
+            adminExt.shutdown();
+        }
+    }
+
+
+    /**
+     * Deletes {@code topic} from {@code clusterName} on a best-effort basis: a failure is reported
+     * on stderr instead of being silently dropped, and never propagates to the caller.
+     *
+     * @param adminExt the started admin client to use
+     * @param clusterName the cluster to delete the topic from
+     * @param topic the full topic name, including its retry/DLQ prefix
+     */
+    private static void deleteTopicQuietly(DefaultMQAdminExt adminExt, String clusterName, String topic) {
+        try {
+            DeleteTopicSubCommand.deleteTopic(adminExt, clusterName, topic);
+        } catch (Exception e) {
+            System.err.printf("WARN: failed to delete topic [%s] from cluster [%s]: %s%n",
+                topic, clusterName, e);
+        }
+    }
+}