You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2016/12/19 09:40:21 UTC

[04/43] incubator-rocketmq git commit: Finish code dump. Reviewed by: @yukon @vongosling @stevenschew @vintagewang @lollipop @zander

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/broker/BrokerStatusSubCommand.java
----------------------------------------------------------------------
diff --git a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/broker/BrokerStatusSubCommand.java b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/broker/BrokerStatusSubCommand.java
new file mode 100644
index 0000000..5f5409b
--- /dev/null
+++ b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/broker/BrokerStatusSubCommand.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.tools.command.broker;
+
+import com.alibaba.rocketmq.client.exception.MQBrokerException;
+import com.alibaba.rocketmq.common.protocol.body.KVTable;
+import com.alibaba.rocketmq.remoting.RPCHook;
+import com.alibaba.rocketmq.remoting.exception.RemotingConnectException;
+import com.alibaba.rocketmq.remoting.exception.RemotingSendRequestException;
+import com.alibaba.rocketmq.remoting.exception.RemotingTimeoutException;
+import com.alibaba.rocketmq.tools.admin.DefaultMQAdminExt;
+import com.alibaba.rocketmq.tools.command.CommandUtil;
+import com.alibaba.rocketmq.tools.command.SubCommand;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TreeMap;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class BrokerStatusSubCommand implements SubCommand {
+
+    @Override
+    public String commandName() {
+        return "brokerStatus";
+    }
+
+
+    @Override
+    public String commandDesc() {
+        return "Fetch broker runtime status data";
+    }
+
+
+    @Override
+    public Options buildCommandlineOptions(Options options) {
+        Option opt = new Option("b", "brokerAddr", true, "Broker address");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new Option("c", "clusterName", true, "which cluster");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        return options;
+    }
+
+    @Override
+    public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) {
+        DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
+
+        defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
+
+        try {
+            defaultMQAdminExt.start();
+
+            String brokerAddr = commandLine.hasOption('b') ? commandLine.getOptionValue('b').trim() : null;
+            String clusterName = commandLine.hasOption('c') ? commandLine.getOptionValue('c').trim() : null;
+            if (brokerAddr != null) {
+                printBrokerRuntimeStats(defaultMQAdminExt, brokerAddr, false);
+            } else if (clusterName != null) {
+                Set<String> masterSet =
+                        CommandUtil.fetchMasterAndSlaveAddrByClusterName(defaultMQAdminExt, clusterName);
+                for (String ba : masterSet) {
+                    try {
+                        printBrokerRuntimeStats(defaultMQAdminExt, ba, true);
+                    } catch (Exception e) {
+                        e.printStackTrace();
+                    }
+                }
+            }
+
+
+        } catch (Exception e) {
+            e.printStackTrace();
+        } finally {
+            defaultMQAdminExt.shutdown();
+        }
+    }
+
+    public void printBrokerRuntimeStats(final DefaultMQAdminExt defaultMQAdminExt, final String brokerAddr, final boolean printBroker) throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
+        KVTable kvTable = defaultMQAdminExt.fetchBrokerRuntimeStats(brokerAddr);
+
+        TreeMap<String, String> tmp = new TreeMap<String, String>();
+        tmp.putAll(kvTable.getTable());
+
+        Iterator<Entry<String, String>> it = tmp.entrySet().iterator();
+        while (it.hasNext()) {
+            Entry<String, String> next = it.next();
+            if (printBroker) {
+                System.out.printf("%-24s %-32s: %s%n", brokerAddr, next.getKey(), next.getValue());
+            } else {
+                System.out.printf("%-32s: %s%n", next.getKey(), next.getValue());
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/broker/CleanExpiredCQSubCommand.java
----------------------------------------------------------------------
diff --git a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/broker/CleanExpiredCQSubCommand.java b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/broker/CleanExpiredCQSubCommand.java
new file mode 100644
index 0000000..c2918c1
--- /dev/null
+++ b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/broker/CleanExpiredCQSubCommand.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package com.alibaba.rocketmq.tools.command.broker;
+
+import com.alibaba.rocketmq.remoting.RPCHook;
+import com.alibaba.rocketmq.tools.admin.DefaultMQAdminExt;
+import com.alibaba.rocketmq.tools.command.SubCommand;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+
+
+/**
+ * @author lansheng.zj
+ */
+public class CleanExpiredCQSubCommand implements SubCommand {
+
+    @Override
+    public String commandName() {
+        return "cleanExpiredCQ";
+    }
+
+
+    @Override
+    public String commandDesc() {
+        return "Clean expired ConsumeQueue on broker.";
+    }
+
+
+    @Override
+    public Options buildCommandlineOptions(Options options) {
+        Option opt = new Option("b", "brokerAddr", true, "Broker address");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new Option("c", "cluster", true, "clustername");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        return options;
+    }
+
+
+    @Override
+    public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) {
+        DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
+        defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
+
+        try {
+            boolean result = false;
+            defaultMQAdminExt.start();
+            if (commandLine.hasOption('b')) {
+                String addr = commandLine.getOptionValue('b').trim();
+                result = defaultMQAdminExt.cleanExpiredConsumerQueueByAddr(addr);
+
+            } else {
+                String cluster = commandLine.getOptionValue('c');
+                if (null != cluster)
+                    cluster = cluster.trim();
+                result = defaultMQAdminExt.cleanExpiredConsumerQueue(cluster);
+            }
+            System.out.printf(result ? "success" : "false");
+        } catch (Exception e) {
+            e.printStackTrace();
+        } finally {
+            defaultMQAdminExt.shutdown();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/broker/CleanUnusedTopicCommand.java
----------------------------------------------------------------------
diff --git a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/broker/CleanUnusedTopicCommand.java b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/broker/CleanUnusedTopicCommand.java
new file mode 100644
index 0000000..f7b543f
--- /dev/null
+++ b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/broker/CleanUnusedTopicCommand.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package com.alibaba.rocketmq.tools.command.broker;
+
+import com.alibaba.rocketmq.remoting.RPCHook;
+import com.alibaba.rocketmq.tools.admin.DefaultMQAdminExt;
+import com.alibaba.rocketmq.tools.command.SubCommand;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+
+
+/**
+ * @author lansheng.zj
+ */
+public class CleanUnusedTopicCommand implements SubCommand {
+
+    @Override
+    public String commandName() {
+        return "cleanUnusedTopic";
+    }
+
+
+    @Override
+    public String commandDesc() {
+        return "Clean unused topic on broker.";
+    }
+
+
+    @Override
+    public Options buildCommandlineOptions(Options options) {
+        Option opt = new Option("b", "brokerAddr", true, "Broker address");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new Option("c", "cluster", true, "cluster name");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        return options;
+    }
+
+
+    @Override
+    public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) {
+        DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
+        defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
+
+        try {
+            boolean result = false;
+            defaultMQAdminExt.start();
+            if (commandLine.hasOption('b')) {
+                String addr = commandLine.getOptionValue('b').trim();
+                result = defaultMQAdminExt.cleanUnusedTopicByAddr(addr);
+
+            } else {
+                String cluster = commandLine.getOptionValue('c');
+                if (null != cluster)
+                    cluster = cluster.trim();
+                result = defaultMQAdminExt.cleanUnusedTopicByAddr(cluster);
+            }
+            System.out.printf(result ? "success" : "false");
+        } catch (Exception e) {
+            e.printStackTrace();
+        } finally {
+            defaultMQAdminExt.shutdown();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/broker/GetBrokerConfigCommand.java
----------------------------------------------------------------------
diff --git a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/broker/GetBrokerConfigCommand.java b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/broker/GetBrokerConfigCommand.java
new file mode 100644
index 0000000..703d69b
--- /dev/null
+++ b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/broker/GetBrokerConfigCommand.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.rocketmq.tools.command.broker;
+
+import com.alibaba.rocketmq.client.exception.MQBrokerException;
+import com.alibaba.rocketmq.remoting.RPCHook;
+import com.alibaba.rocketmq.remoting.exception.RemotingConnectException;
+import com.alibaba.rocketmq.remoting.exception.RemotingSendRequestException;
+import com.alibaba.rocketmq.remoting.exception.RemotingTimeoutException;
+import com.alibaba.rocketmq.tools.admin.DefaultMQAdminExt;
+import com.alibaba.rocketmq.tools.admin.MQAdminExt;
+import com.alibaba.rocketmq.tools.command.CommandUtil;
+import com.alibaba.rocketmq.tools.command.SubCommand;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+
+import java.io.UnsupportedEncodingException;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * @author xigu.lx
+ */
+public class GetBrokerConfigCommand implements SubCommand {
+    @Override
+    public String commandName() {
+        return "getBrokerConfig";
+    }
+
+    @Override
+    public String commandDesc() {
+        return "Get broker config by cluster or special broker!";
+    }
+
+    @Override
+    public Options buildCommandlineOptions(final Options options) {
+        Option opt = new Option("b", "brokerAddr", true, "update which broker");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new Option("c", "clusterName", true, "update which cluster");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        return options;
+    }
+
+    @Override
+    public void execute(final CommandLine commandLine, final Options options, final RPCHook rpcHook) {
+        DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
+
+        defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
+
+        try {
+
+            if (commandLine.hasOption('b')) {
+                String brokerAddr = commandLine.getOptionValue('b').trim();
+                defaultMQAdminExt.start();
+
+                getAndPrint(defaultMQAdminExt,
+                        String.format("============%s============\n", brokerAddr),
+                        brokerAddr);
+
+            } else if (commandLine.hasOption('c')) {
+                String clusterName = commandLine.getOptionValue('c').trim();
+                defaultMQAdminExt.start();
+
+                Map<String, List<String>> masterAndSlaveMap
+                        = CommandUtil.fetchMasterAndSlaveDistinguish(defaultMQAdminExt, clusterName);
+
+                for (String masterAddr : masterAndSlaveMap.keySet()) {
+
+                    getAndPrint(
+                            defaultMQAdminExt,
+                            String.format("============Master: %s============\n", masterAddr),
+                            masterAddr
+                    );
+                    for (String slaveAddr : masterAndSlaveMap.get(masterAddr)) {
+
+                        getAndPrint(
+                                defaultMQAdminExt,
+                                String.format("============My Master: %s=====Slave: %s============\n", masterAddr, slaveAddr),
+                                slaveAddr
+                        );
+                    }
+                }
+            }
+
+        } catch (Exception e) {
+            e.printStackTrace();
+        } finally {
+            defaultMQAdminExt.shutdown();
+        }
+    }
+
+    protected void getAndPrint(final MQAdminExt defaultMQAdminExt, final String printPrefix, final String addr)
+            throws InterruptedException, RemotingConnectException,
+            UnsupportedEncodingException, RemotingTimeoutException,
+            MQBrokerException, RemotingSendRequestException {
+
+        System.out.print(printPrefix);
+
+        Properties properties = defaultMQAdminExt.getBrokerConfig(addr);
+        if (properties == null) {
+            System.out.printf("Broker[%s] has no config property!\n", addr);
+            return;
+        }
+
+        for (Object key : properties.keySet()) {
+            System.out.printf("%-50s=  %s\n", key, properties.get(key));
+        }
+
+        System.out.printf("%n");
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/broker/SendMsgStatusCommand.java
----------------------------------------------------------------------
diff --git a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/broker/SendMsgStatusCommand.java b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/broker/SendMsgStatusCommand.java
new file mode 100644
index 0000000..165e397
--- /dev/null
+++ b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/broker/SendMsgStatusCommand.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.tools.command.broker;
+
+import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
+import com.alibaba.rocketmq.client.producer.SendResult;
+import com.alibaba.rocketmq.common.MixAll;
+import com.alibaba.rocketmq.common.message.Message;
+import com.alibaba.rocketmq.remoting.RPCHook;
+import com.alibaba.rocketmq.tools.command.SubCommand;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+
+import java.io.UnsupportedEncodingException;
+
+
+/**
+ * @author lansheng.zj
+ */
+public class SendMsgStatusCommand implements SubCommand {
+
+    @Override
+    public String commandName() {
+        return "sendMsgStatus";
+    }
+
+
+    @Override
+    public String commandDesc() {
+        return "send msg to broker.";
+    }
+
+
+    @Override
+    public Options buildCommandlineOptions(Options options) {
+        Option opt = new Option("b", "brokerName", true, "Broker Name");
+        opt.setRequired(true);
+        options.addOption(opt);
+
+        opt = new Option("s", "messageSize", true, "Message Size, Default: 128");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new Option("c", "count", true, "send message count, Default: 50");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        return options;
+    }
+
+
+    @Override
+    public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) {
+        final DefaultMQProducer producer = new DefaultMQProducer("PID_SMSC", rpcHook);
+        producer.setInstanceName("PID_SMSC_" + System.currentTimeMillis());
+
+        try {
+            producer.start();
+            String brokerName = commandLine.getOptionValue('b').trim();
+            int messageSize = commandLine.hasOption('s') ? Integer.parseInt(commandLine.getOptionValue('s')) : 128;
+            int count = commandLine.hasOption('c') ? Integer.parseInt(commandLine.getOptionValue('c')) : 50;
+
+            producer.send(buildMessage(brokerName, 16));
+
+            for (int i = 0; i < count; i++) {
+                long begin = System.currentTimeMillis();
+                SendResult result = producer.send(buildMessage(brokerName, messageSize));
+                System.out.printf("rt:" + (System.currentTimeMillis() - begin) + "ms, SendResult=" + result);
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+        } finally {
+            producer.shutdown();
+        }
+    }
+
+
+    private static Message buildMessage(final String topic, final int messageSize) throws UnsupportedEncodingException {
+        Message msg = new Message();
+        msg.setTopic(topic);
+
+        StringBuilder sb = new StringBuilder();
+        for (int i = 0; i < messageSize; i += 11) {
+            sb.append("hello jodie");
+        }
+        msg.setBody(sb.toString().getBytes(MixAll.DEFAULT_CHARSET));
+        return msg;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/broker/UpdateBrokerConfigSubCommand.java
----------------------------------------------------------------------
diff --git a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/broker/UpdateBrokerConfigSubCommand.java b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/broker/UpdateBrokerConfigSubCommand.java
new file mode 100644
index 0000000..86938a7
--- /dev/null
+++ b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/broker/UpdateBrokerConfigSubCommand.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.tools.command.broker;
+
+import com.alibaba.rocketmq.remoting.RPCHook;
+import com.alibaba.rocketmq.srvutil.ServerUtil;
+import com.alibaba.rocketmq.tools.admin.DefaultMQAdminExt;
+import com.alibaba.rocketmq.tools.command.CommandUtil;
+import com.alibaba.rocketmq.tools.command.SubCommand;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+
+import java.util.Properties;
+import java.util.Set;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class UpdateBrokerConfigSubCommand implements SubCommand {
+
+    @Override
+    public String commandName() {
+        return "updateBrokerConfig";
+    }
+
+
+    @Override
+    public String commandDesc() {
+        return "Update broker's config";
+    }
+
+
+    @Override
+    public Options buildCommandlineOptions(Options options) {
+        Option opt = new Option("b", "brokerAddr", true, "update which broker");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new Option("c", "clusterName", true, "update which cluster");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new Option("k", "key", true, "config key");
+        opt.setRequired(true);
+        options.addOption(opt);
+
+        opt = new Option("v", "value", true, "config value");
+        opt.setRequired(true);
+        options.addOption(opt);
+
+        return options;
+    }
+
+
+    @Override
+    public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) {
+        DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
+
+        defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
+
+        try {
+            String key = commandLine.getOptionValue('k').trim();
+            String value = commandLine.getOptionValue('v').trim();
+            Properties properties = new Properties();
+            properties.put(key, value);
+
+            if (commandLine.hasOption('b')) {
+                String brokerAddr = commandLine.getOptionValue('b').trim();
+
+                defaultMQAdminExt.start();
+
+                defaultMQAdminExt.updateBrokerConfig(brokerAddr, properties);
+                System.out.printf("update broker config success, %s\n", brokerAddr);
+                return;
+
+            } else if (commandLine.hasOption('c')) {
+                String clusterName = commandLine.getOptionValue('c').trim();
+
+                defaultMQAdminExt.start();
+
+                Set<String> masterSet =
+                        CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName);
+                for (String brokerAddr : masterSet) {
+                    try {
+                        defaultMQAdminExt.updateBrokerConfig(brokerAddr, properties);
+                        System.out.printf("update broker config success, %s\n", brokerAddr);
+                    } catch (Exception e) {
+                        e.printStackTrace();
+                    }
+                }
+                return;
+            }
+
+            ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), options);
+        } catch (Exception e) {
+            e.printStackTrace();
+        } finally {
+            defaultMQAdminExt.shutdown();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/cluster/CLusterSendMsgRTCommand.java
----------------------------------------------------------------------
diff --git a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/cluster/CLusterSendMsgRTCommand.java b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/cluster/CLusterSendMsgRTCommand.java
new file mode 100644
index 0000000..3a28522
--- /dev/null
+++ b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/cluster/CLusterSendMsgRTCommand.java
@@ -0,0 +1,211 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.rocketmq.tools.command.cluster;
+
+import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
+import com.alibaba.rocketmq.common.MixAll;
+import com.alibaba.rocketmq.common.message.Message;
+import com.alibaba.rocketmq.common.protocol.body.ClusterInfo;
+import com.alibaba.rocketmq.remoting.RPCHook;
+import com.alibaba.rocketmq.tools.admin.DefaultMQAdminExt;
+import com.alibaba.rocketmq.tools.command.SubCommand;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+
+import java.math.BigDecimal;
+import java.text.SimpleDateFormat;
+import java.util.*;
+
+/**
+ * @author fengliang.hfl
+ */
+public class CLusterSendMsgRTCommand implements SubCommand {
+
+    public static void main(String args[]) {
+    }
+
+    @Override
+    public String commandName() {
+        return "clusterRT";
+    }
+
+    @Override
+    public String commandDesc() {
+        return "List All clusters Message Send RT";
+    }
+
+    @Override
+    public Options buildCommandlineOptions(Options options) {
+        Option opt = new Option("a", "amout", true, "message amout | default 100");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new Option("s", "size", true, "message size | default 128 Byte");
+        opt.setRequired(true);
+        options.addOption(opt);
+
+        opt = new Option("c", "cluster", true, "cluster name | default display all cluster");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new Option("p", "print log", true, "print as tlog | default false");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new Option("m", "machine room", true, "machine room name | default noname");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new Option("i", "interval", true, "print interval | default 10 seconds");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        return options;
+    }
+
+    @Override
+    public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) {
+        DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
+        defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
+
+        DefaultMQProducer producer = new DefaultMQProducer(rpcHook);
+        producer.setProducerGroup(Long.toString(System.currentTimeMillis()));
+
+        try {
+            defaultMQAdminExt.start();
+            producer.start();
+
+            ClusterInfo clusterInfoSerializeWrapper = defaultMQAdminExt.examineBrokerClusterInfo();
+            HashMap<String, Set<String>> clusterAddr = clusterInfoSerializeWrapper
+                    .getClusterAddrTable();
+
+            Set<String> clusterNames = null;
+
+            long amount = !commandLine.hasOption('a') ? 50 : Long.parseLong(commandLine
+                    .getOptionValue('a').trim());
+
+            long size = !commandLine.hasOption('s') ? 128 : Long.parseLong(commandLine
+                    .getOptionValue('s').trim());
+
+            long interval = !commandLine.hasOption('i') ? 10 : Long.parseLong(commandLine
+                    .getOptionValue('i').trim());
+
+            boolean printAsTlog = !commandLine.hasOption('p') ? false : Boolean
+                    .parseBoolean(commandLine.getOptionValue('p').trim());
+
+            String machineRoom = !commandLine.hasOption('m') ? "noname" : commandLine
+                    .getOptionValue('m').trim();
+
+            if (commandLine.hasOption('c')) {
+                clusterNames = new TreeSet<String>();
+                clusterNames.add(commandLine.getOptionValue('c').trim());
+            } else {
+                clusterNames = clusterAddr.keySet();
+            }
+
+            if (!printAsTlog) {
+                System.out.printf("%-24s  %-24s  %-4s  %-8s  %-8s%n",
+                        "#Cluster Name",
+                        "#Broker Name",
+                        "#RT",
+                        "#successCount",
+                        "#failCount"
+                );
+            }
+
+            while (true) {
+                for (String clusterName : clusterNames) {
+                    Set<String> brokerNames = clusterAddr.get(clusterName);
+                    if (brokerNames == null) {
+                        System.out.printf("cluster [%s] not exist", clusterName);
+                        break;
+                    }
+
+                    for (String brokerName : brokerNames) {
+                        Message msg = new Message(brokerName, getStringBySize(size).getBytes(MixAll.DEFAULT_CHARSET));
+                        long start = 0;
+                        long end = 0;
+                        long elapsed = 0;
+                        int successCount = 0;
+                        int failCount = 0;
+
+                        for (int i = 0; i < amount; i++) {
+                            start = System.currentTimeMillis();
+                            try {
+                                producer.send(msg);
+                                successCount++;
+                                end = System.currentTimeMillis();
+                            } catch (Exception e) {
+                                failCount++;
+                                end = System.currentTimeMillis();
+                            }
+
+                            if (i != 0) {
+                                elapsed += end - start;
+                            }
+                        }
+
+                        double rt = (double) elapsed / (amount - 1);
+                        if (!printAsTlog) {
+                            System.out.printf("%-24s  %-24s  %-8s  %-16s  %-16s%n",
+                                    clusterName,
+                                    brokerName,
+                                    String.format("%.2f", rt),
+                                    successCount,
+                                    failCount
+                            );
+                        } else {
+                            System.out.printf(String.format("%s|%s|%s|%s|%s%n", getCurTime(),
+                                    machineRoom, clusterName, brokerName,
+                                    new BigDecimal(rt).setScale(0, BigDecimal.ROUND_HALF_UP)));
+                        }
+
+                    }
+
+                }
+
+                Thread.sleep(interval * 1000);
+            }
+
+        } catch (Exception e) {
+            e.printStackTrace();
+        } finally {
+            defaultMQAdminExt.shutdown();
+            producer.shutdown();
+        }
+    }
+
+    public String getStringBySize(long size) {
+        StringBuilder res = new StringBuilder();
+        for (int i = 0; i < size; i++) {
+            res.append('a');
+        }
+        return res.toString();
+    }
+
+    public String getCurTime() {
+        String fromTimeZone = "GMT+8";
+        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+        Date date = new Date();
+        format.setTimeZone(TimeZone.getTimeZone(fromTimeZone));
+        String chinaDate = format.format(date);
+        return chinaDate;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/cluster/ClusterListSubCommand.java
----------------------------------------------------------------------
diff --git a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/cluster/ClusterListSubCommand.java b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/cluster/ClusterListSubCommand.java
new file mode 100644
index 0000000..baf4f3c
--- /dev/null
+++ b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/cluster/ClusterListSubCommand.java
@@ -0,0 +1,280 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.tools.command.cluster;
+
+import com.alibaba.rocketmq.client.exception.MQBrokerException;
+import com.alibaba.rocketmq.common.protocol.body.ClusterInfo;
+import com.alibaba.rocketmq.common.protocol.body.KVTable;
+import com.alibaba.rocketmq.common.protocol.route.BrokerData;
+import com.alibaba.rocketmq.remoting.RPCHook;
+import com.alibaba.rocketmq.remoting.exception.RemotingConnectException;
+import com.alibaba.rocketmq.remoting.exception.RemotingSendRequestException;
+import com.alibaba.rocketmq.remoting.exception.RemotingTimeoutException;
+import com.alibaba.rocketmq.tools.admin.DefaultMQAdminExt;
+import com.alibaba.rocketmq.tools.command.SubCommand;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class ClusterListSubCommand implements SubCommand {
+
+    @Override
+    public String commandName() {
+        return "clusterList";
+    }
+
+
+    @Override
+    public String commandDesc() {
+        return "List all of clusters";
+    }
+
+
+    @Override
+    public Options buildCommandlineOptions(Options options) {
+        Option opt = new Option("m", "moreStats", false, "Print more stats");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new Option("i", "interval", true, "specify intervals numbers, it is in seconds");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        return options;
+    }
+
+    @Override
+    public void execute(final CommandLine commandLine, final Options options, RPCHook rpcHook) {
+        DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
+
+        defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
+
+        long printInterval = 1;
+        boolean enableInterval = commandLine.hasOption('i');
+
+        if (enableInterval) {
+            printInterval = Long.parseLong(commandLine.getOptionValue('i')) * 1000;
+        }
+
+        try {
+            defaultMQAdminExt.start();
+            long i = 0;
+
+            do {
+                if (i++ > 0) {
+                    Thread.sleep(printInterval);
+                }
+                if (commandLine.hasOption('m')) {
+                    this.printClusterMoreStats(defaultMQAdminExt);
+                } else {
+                    this.printClusterBaseInfo(defaultMQAdminExt);
+                }
+            } while (enableInterval);
+        } catch (Exception e) {
+            e.printStackTrace();
+        } finally {
+            defaultMQAdminExt.shutdown();
+        }
+    }
+
+    private void printClusterMoreStats(final DefaultMQAdminExt defaultMQAdminExt) throws RemotingConnectException,
+            RemotingTimeoutException, RemotingSendRequestException, InterruptedException, MQBrokerException {
+
+        ClusterInfo clusterInfoSerializeWrapper = defaultMQAdminExt.examineBrokerClusterInfo();
+
+        System.out.printf("%-16s  %-32s %14s %14s %14s %14s%n",
+                "#Cluster Name",
+                "#Broker Name",
+                "#InTotalYest",
+                "#OutTotalYest",
+                "#InTotalToday",
+                "#OutTotalToday"
+        );
+
+        Iterator<Map.Entry<String, Set<String>>> itCluster = clusterInfoSerializeWrapper.getClusterAddrTable().entrySet().iterator();
+        while (itCluster.hasNext()) {
+            Map.Entry<String, Set<String>> next = itCluster.next();
+            String clusterName = next.getKey();
+            TreeSet<String> brokerNameSet = new TreeSet<String>();
+            brokerNameSet.addAll(next.getValue());
+
+            for (String brokerName : brokerNameSet) {
+                BrokerData brokerData = clusterInfoSerializeWrapper.getBrokerAddrTable().get(brokerName);
+                if (brokerData != null) {
+
+                    Iterator<Map.Entry<Long, String>> itAddr = brokerData.getBrokerAddrs().entrySet().iterator();
+                    while (itAddr.hasNext()) {
+                        Map.Entry<Long, String> next1 = itAddr.next();
+                        long inTotalYest = 0;
+                        long outTotalYest = 0;
+                        long inTotalToday = 0;
+                        long outTotalToday = 0;
+
+                        try {
+                            KVTable kvTable = defaultMQAdminExt.fetchBrokerRuntimeStats(next1.getValue());
+                            String msgPutTotalYesterdayMorning = kvTable.getTable().get("msgPutTotalYesterdayMorning");
+                            String msgPutTotalTodayMorning = kvTable.getTable().get("msgPutTotalTodayMorning");
+                            String msgPutTotalTodayNow = kvTable.getTable().get("msgPutTotalTodayNow");
+                            String msgGetTotalYesterdayMorning = kvTable.getTable().get("msgGetTotalYesterdayMorning");
+                            String msgGetTotalTodayMorning = kvTable.getTable().get("msgGetTotalTodayMorning");
+                            String msgGetTotalTodayNow = kvTable.getTable().get("msgGetTotalTodayNow");
+
+                            inTotalYest = Long.parseLong(msgPutTotalTodayMorning) - Long.parseLong(msgPutTotalYesterdayMorning);
+                            outTotalYest = Long.parseLong(msgGetTotalTodayMorning) - Long.parseLong(msgGetTotalYesterdayMorning);
+
+                            inTotalToday = Long.parseLong(msgPutTotalTodayNow) - Long.parseLong(msgPutTotalTodayMorning);
+                            outTotalToday = Long.parseLong(msgGetTotalTodayNow) - Long.parseLong(msgGetTotalTodayMorning);
+
+                        } catch (Exception e) {
+                        }
+
+                        System.out.printf("%-16s  %-32s %14d %14d %14d %14d%n",
+                                clusterName,
+                                brokerName,
+                                inTotalYest,
+                                outTotalYest,
+                                inTotalToday,
+                                outTotalToday
+                        );
+                    }
+                }
+            }
+
+            if (itCluster.hasNext()) {
+                System.out.printf("");
+            }
+        }
+    }
+
+    private void printClusterBaseInfo(final DefaultMQAdminExt defaultMQAdminExt) throws RemotingConnectException, RemotingTimeoutException,
+            RemotingSendRequestException, InterruptedException, MQBrokerException {
+
+        ClusterInfo clusterInfoSerializeWrapper = defaultMQAdminExt.examineBrokerClusterInfo();
+
+        System.out.printf("%-16s  %-22s  %-4s  %-22s %-16s %19s %19s %10s %5s %6s%n",
+                "#Cluster Name",
+                "#Broker Name",
+                "#BID",
+                "#Addr",
+                "#Version",
+                "#InTPS(LOAD)",
+                "#OutTPS(LOAD)",
+                "#PCWait(ms)",
+                "#Hour",
+                "#SPACE"
+        );
+
+        Iterator<Map.Entry<String, Set<String>>> itCluster = clusterInfoSerializeWrapper.getClusterAddrTable().entrySet().iterator();
+        while (itCluster.hasNext()) {
+            Map.Entry<String, Set<String>> next = itCluster.next();
+            String clusterName = next.getKey();
+            TreeSet<String> brokerNameSet = new TreeSet<String>();
+            brokerNameSet.addAll(next.getValue());
+
+            for (String brokerName : brokerNameSet) {
+                BrokerData brokerData = clusterInfoSerializeWrapper.getBrokerAddrTable().get(brokerName);
+                if (brokerData != null) {
+
+                    Iterator<Map.Entry<Long, String>> itAddr = brokerData.getBrokerAddrs().entrySet().iterator();
+                    while (itAddr.hasNext()) {
+                        Map.Entry<Long, String> next1 = itAddr.next();
+                        double in = 0;
+                        double out = 0;
+                        String version = "";
+                        String sendThreadPoolQueueSize = "";
+                        String pullThreadPoolQueueSize = "";
+                        String sendThreadPoolQueueHeadWaitTimeMills = "";
+                        String pullThreadPoolQueueHeadWaitTimeMills = "";
+                        String pageCacheLockTimeMills = "";
+                        String earliestMessageTimeStamp = "";
+                        String commitLogDiskRatio = "";
+                        try {
+                            KVTable kvTable = defaultMQAdminExt.fetchBrokerRuntimeStats(next1.getValue());
+                            String putTps = kvTable.getTable().get("putTps");
+                            String getTransferedTps = kvTable.getTable().get("getTransferedTps");
+                            sendThreadPoolQueueSize = kvTable.getTable().get("sendThreadPoolQueueSize");
+                            pullThreadPoolQueueSize = kvTable.getTable().get("pullThreadPoolQueueSize");
+
+                            sendThreadPoolQueueSize = kvTable.getTable().get("sendThreadPoolQueueSize");
+                            pullThreadPoolQueueSize = kvTable.getTable().get("pullThreadPoolQueueSize");
+
+                            sendThreadPoolQueueHeadWaitTimeMills = kvTable.getTable().get("sendThreadPoolQueueHeadWaitTimeMills");
+                            pullThreadPoolQueueHeadWaitTimeMills = kvTable.getTable().get("pullThreadPoolQueueHeadWaitTimeMills");
+                            pageCacheLockTimeMills = kvTable.getTable().get("pageCacheLockTimeMills");
+                            earliestMessageTimeStamp = kvTable.getTable().get("earliestMessageTimeStamp");
+                            commitLogDiskRatio = kvTable.getTable().get("commitLogDiskRatio");
+
+                            version = kvTable.getTable().get("brokerVersionDesc");
+                            {
+                                String[] tpss = putTps.split(" ");
+                                if (tpss != null && tpss.length > 0) {
+                                    in = Double.parseDouble(tpss[0]);
+                                }
+                            }
+
+                            {
+                                String[] tpss = getTransferedTps.split(" ");
+                                if (tpss != null && tpss.length > 0) {
+                                    out = Double.parseDouble(tpss[0]);
+                                }
+                            }
+                        } catch (Exception e) {
+                        }
+
+                        double hour = 0.0;
+                        double space = 0.0;
+
+                        if (earliestMessageTimeStamp != null && earliestMessageTimeStamp.length() > 0) {
+                            long mills = System.currentTimeMillis() - Long.valueOf(earliestMessageTimeStamp);
+                            hour = mills / 1000.0 / 60.0 / 60.0;
+                        }
+
+                        if (commitLogDiskRatio != null && commitLogDiskRatio.length() > 0) {
+                            space = Double.valueOf(commitLogDiskRatio);
+                        }
+
+                        System.out.printf("%-16s  %-22s  %-4s  %-22s %-16s %19s %19s %10s %5s %6s%n",
+                                clusterName,
+                                brokerName,
+                                next1.getKey().longValue(),
+                                next1.getValue(),
+                                version,
+                                String.format("%9.2f(%s,%sms)", in, sendThreadPoolQueueSize, sendThreadPoolQueueHeadWaitTimeMills),
+                                String.format("%9.2f(%s,%sms)", out, pullThreadPoolQueueSize, pullThreadPoolQueueHeadWaitTimeMills),
+                                pageCacheLockTimeMills,
+                                String.format("%2.2f", hour),
+                                String.format("%.4f", space)
+                        );
+                    }
+                }
+            }
+
+            if (itCluster.hasNext()) {
+                System.out.printf("");
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/connection/ConsumerConnectionSubCommand.java
----------------------------------------------------------------------
diff --git a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/connection/ConsumerConnectionSubCommand.java b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/connection/ConsumerConnectionSubCommand.java
new file mode 100644
index 0000000..aa0598e
--- /dev/null
+++ b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/connection/ConsumerConnectionSubCommand.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.tools.command.connection;
+
+import com.alibaba.rocketmq.common.MQVersion;
+import com.alibaba.rocketmq.common.protocol.body.Connection;
+import com.alibaba.rocketmq.common.protocol.body.ConsumerConnection;
+import com.alibaba.rocketmq.common.protocol.heartbeat.SubscriptionData;
+import com.alibaba.rocketmq.remoting.RPCHook;
+import com.alibaba.rocketmq.tools.admin.DefaultMQAdminExt;
+import com.alibaba.rocketmq.tools.command.SubCommand;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+
+import java.util.Iterator;
+import java.util.Map.Entry;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class ConsumerConnectionSubCommand implements SubCommand {
+
+    @Override
+    public String commandName() {
+        return "consumerConnection";
+    }
+
+    @Override
+    public String commandDesc() {
+        return "Query consumer's socket connection, client version and subscription";
+    }
+
+    @Override
+    public Options buildCommandlineOptions(Options options) {
+        Option opt = new Option("g", "consumerGroup", true, "consumer group name");
+        opt.setRequired(true);
+        options.addOption(opt);
+
+        return options;
+    }
+
+    @Override
+    public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) {
+        DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
+
+        defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
+
+        try {
+            defaultMQAdminExt.start();
+
+            String group = commandLine.getOptionValue('g').trim();
+
+            ConsumerConnection cc = defaultMQAdminExt.examineConsumerConnectionInfo(group);
+
+
+            int i = 1;
+            for (Connection conn : cc.getConnectionSet()) {
+                System.out.printf("%03d  %-32s %-22s %-8s %s%n",
+                        i++,
+                        conn.getClientId(),
+                        conn.getClientAddr(),
+                        conn.getLanguage(),
+                        MQVersion.getVersionDesc(conn.getVersion())
+                );
+            }
+
+            System.out.printf("%nBelow is subscription:");
+            Iterator<Entry<String, SubscriptionData>> it = cc.getSubscriptionTable().entrySet().iterator();
+            i = 1;
+            while (it.hasNext()) {
+                Entry<String, SubscriptionData> entry = it.next();
+                SubscriptionData sd = entry.getValue();
+                System.out.printf("%03d  Topic: %-40s SubExpression: %s%n",
+                        i++,
+                        sd.getTopic(),
+                        sd.getSubString()
+                );
+            }
+
+            System.out.printf("");
+            System.out.printf("ConsumeType: %s%n", cc.getConsumeType());
+            System.out.printf("MessageModel: %s%n", cc.getMessageModel());
+            System.out.printf("ConsumeFromWhere: %s%n", cc.getConsumeFromWhere());
+        } catch (Exception e) {
+            e.printStackTrace();
+        } finally {
+            defaultMQAdminExt.shutdown();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/connection/ProducerConnectionSubCommand.java
----------------------------------------------------------------------
diff --git a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/connection/ProducerConnectionSubCommand.java b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/connection/ProducerConnectionSubCommand.java
new file mode 100644
index 0000000..97ba792
--- /dev/null
+++ b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/connection/ProducerConnectionSubCommand.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.tools.command.connection;
+
+import com.alibaba.rocketmq.common.MQVersion;
+import com.alibaba.rocketmq.common.protocol.body.Connection;
+import com.alibaba.rocketmq.common.protocol.body.ProducerConnection;
+import com.alibaba.rocketmq.remoting.RPCHook;
+import com.alibaba.rocketmq.tools.admin.DefaultMQAdminExt;
+import com.alibaba.rocketmq.tools.command.SubCommand;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class ProducerConnectionSubCommand implements SubCommand {
+
+    @Override
+    public String commandName() {
+        return "producerConnection";
+    }
+
+    @Override
+    public String commandDesc() {
+        return "Query producer's socket connection and client version";
+    }
+
+    @Override
+    public Options buildCommandlineOptions(Options options) {
+        Option opt = new Option("g", "producerGroup", true, "producer group name");
+        opt.setRequired(true);
+        options.addOption(opt);
+
+        opt = new Option("t", "topic", true, "topic name");
+        opt.setRequired(true);
+        options.addOption(opt);
+
+        return options;
+    }
+
+    @Override
+    public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) {
+        DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
+
+        defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
+
+        try {
+            defaultMQAdminExt.start();
+
+            String group = commandLine.getOptionValue('g').trim();
+            String topic = commandLine.getOptionValue('t').trim();
+
+            ProducerConnection pc = defaultMQAdminExt.examineProducerConnectionInfo(group, topic);
+
+            int i = 1;
+            for (Connection conn : pc.getConnectionSet()) {
+                System.out.printf("%04d  %-32s %-22s %-8s %s%n",
+                        i++,
+                        conn.getClientId(),
+                        conn.getClientAddr(),
+                        conn.getLanguage(),
+                        MQVersion.getVersionDesc(conn.getVersion())
+                );
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+        } finally {
+            defaultMQAdminExt.shutdown();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/consumer/ConsumerProgressSubCommand.java
----------------------------------------------------------------------
diff --git a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/consumer/ConsumerProgressSubCommand.java b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/consumer/ConsumerProgressSubCommand.java
new file mode 100644
index 0000000..d09b74a
--- /dev/null
+++ b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/consumer/ConsumerProgressSubCommand.java
@@ -0,0 +1,285 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.tools.command.consumer;
+
+import com.alibaba.rocketmq.client.log.ClientLogger;
+import com.alibaba.rocketmq.common.MQVersion;
+import com.alibaba.rocketmq.common.MixAll;
+import com.alibaba.rocketmq.common.UtilAll;
+import com.alibaba.rocketmq.common.admin.ConsumeStats;
+import com.alibaba.rocketmq.common.admin.OffsetWrapper;
+import com.alibaba.rocketmq.common.message.MessageQueue;
+import com.alibaba.rocketmq.common.protocol.body.ConsumerConnection;
+import com.alibaba.rocketmq.common.protocol.body.TopicList;
+import com.alibaba.rocketmq.common.protocol.heartbeat.ConsumeType;
+import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;
+import com.alibaba.rocketmq.remoting.RPCHook;
+import com.alibaba.rocketmq.tools.admin.DefaultMQAdminExt;
+import com.alibaba.rocketmq.tools.command.SubCommand;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.slf4j.Logger;
+
+import java.util.Collections;
+import java.util.Date;
+import java.util.LinkedList;
+import java.util.List;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class ConsumerProgressSubCommand implements SubCommand {
+    private final Logger log = ClientLogger.getLog();
+
+    @Override
+    public String commandName() {
+        return "consumerProgress";
+    }
+
+    @Override
+    public String commandDesc() {
+        return "Query consumers's progress, speed";
+    }
+
+    @Override
+    public Options buildCommandlineOptions(Options options) {
+        Option opt = new Option("g", "groupName", true, "consumer group name");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        return options;
+    }
+
+    @Override
+    public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) {
+        DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
+        defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
+
+        try {
+            defaultMQAdminExt.start();
+            if (commandLine.hasOption('g')) {
+                String consumerGroup = commandLine.getOptionValue('g').trim();
+                ConsumeStats consumeStats = defaultMQAdminExt.examineConsumeStats(consumerGroup);
+                List<MessageQueue> mqList = new LinkedList<MessageQueue>();
+                mqList.addAll(consumeStats.getOffsetTable().keySet());
+                Collections.sort(mqList);
+
+                System.out.printf("%-32s  %-32s  %-4s  %-20s  %-20s  %-20s  %s%n",
+                        "#Topic",
+                        "#Broker Name",
+                        "#QID",
+                        "#Broker Offset",
+                        "#Consumer Offset",
+                        "#Diff",
+                        "#LastTime");
+
+                long diffTotal = 0L;
+                for (MessageQueue mq : mqList) {
+                    OffsetWrapper offsetWrapper = consumeStats.getOffsetTable().get(mq);
+                    long diff = offsetWrapper.getBrokerOffset() - offsetWrapper.getConsumerOffset();
+                    diffTotal += diff;
+                    String lastTime = "";
+                    try {
+                        lastTime = UtilAll.formatDate(new Date(offsetWrapper.getLastTimestamp()), UtilAll.YYYY_MM_DD_HH_MM_SS);
+                    } catch (Exception e) {
+                    }
+                    System.out.printf("%-32s  %-32s  %-4d  %-20d  %-20d  %-20d  %s%n",
+                            UtilAll.frontStringAtLeast(mq.getTopic(), 32),
+                            UtilAll.frontStringAtLeast(mq.getBrokerName(), 32),
+                            mq.getQueueId(),
+                            offsetWrapper.getBrokerOffset(),
+                            offsetWrapper.getConsumerOffset(),
+                            diff,
+                            lastTime
+                    );
+                }
+
+                System.out.printf("%n");
+                System.out.printf("Consume TPS: %s%n", consumeStats.getConsumeTps());
+                System.out.printf("Diff Total: %d%n", diffTotal);
+            } else {
+                System.out.printf("%-32s  %-6s  %-24s %-5s  %-14s  %-7s  %s%n",
+                        "#Group",
+                        "#Count",
+                        "#Version",
+                        "#Type",
+                        "#Model",
+                        "#TPS",
+                        "#Diff Total"
+                );
+                TopicList topicList = defaultMQAdminExt.fetchAllTopicList();
+                for (String topic : topicList.getTopicList()) {
+                    if (topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
+                        String consumerGroup = topic.substring(MixAll.RETRY_GROUP_TOPIC_PREFIX.length());
+                        try {
+                            ConsumeStats consumeStats = null;
+                            try {
+                                consumeStats = defaultMQAdminExt.examineConsumeStats(consumerGroup);
+                            } catch (Exception e) {
+                                log.warn("examineConsumeStats exception, " + consumerGroup, e);
+                            }
+
+                            ConsumerConnection cc = null;
+                            try {
+                                cc = defaultMQAdminExt.examineConsumerConnectionInfo(consumerGroup);
+                            } catch (Exception e) {
+                                log.warn("examineConsumerConnectionInfo exception, " + consumerGroup, e);
+                            }
+
+                            GroupConsumeInfo groupConsumeInfo = new GroupConsumeInfo();
+                            groupConsumeInfo.setGroup(consumerGroup);
+
+                            if (consumeStats != null) {
+                                groupConsumeInfo.setConsumeTps((int) consumeStats.getConsumeTps());
+                                groupConsumeInfo.setDiffTotal(consumeStats.computeTotalDiff());
+                            }
+
+                            if (cc != null) {
+                                groupConsumeInfo.setCount(cc.getConnectionSet().size());
+                                groupConsumeInfo.setMessageModel(cc.getMessageModel());
+                                groupConsumeInfo.setConsumeType(cc.getConsumeType());
+                                groupConsumeInfo.setVersion(cc.computeMinVersion());
+                            }
+
+                            System.out.printf("%-32s  %-6d  %-24s %-5s  %-14s  %-7d  %d%n",
+                                    UtilAll.frontStringAtLeast(groupConsumeInfo.getGroup(), 32),
+                                    groupConsumeInfo.getCount(),
+                                    groupConsumeInfo.getCount() > 0 ? groupConsumeInfo.versionDesc() : "OFFLINE",
+                                    groupConsumeInfo.consumeTypeDesc(),
+                                    groupConsumeInfo.messageModelDesc(),
+                                    groupConsumeInfo.getConsumeTps(),
+                                    groupConsumeInfo.getDiffTotal()
+                            );
+                        } catch (Exception e) {
+                            log.warn("examineConsumeStats or examineConsumerConnectionInfo exception, " + consumerGroup, e);
+                        }
+                    }
+                }
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+        } finally {
+            defaultMQAdminExt.shutdown();
+        }
+    }
+}
+
+
+class GroupConsumeInfo implements Comparable<GroupConsumeInfo> {
+    private String group;
+    private int version;
+    private int count;
+    private ConsumeType consumeType;
+    private MessageModel messageModel;
+    private int consumeTps;
+    private long diffTotal;
+
+
+    public String getGroup() {
+        return group;
+    }
+
+    public void setGroup(String group) {
+        this.group = group;
+    }
+
+    public String consumeTypeDesc() {
+        if (this.count != 0) {
+            return this.getConsumeType() == ConsumeType.CONSUME_ACTIVELY ? "PULL" : "PUSH";
+        }
+        return "";
+    }
+
+    public ConsumeType getConsumeType() {
+        return consumeType;
+    }
+
+    public void setConsumeType(ConsumeType consumeType) {
+        this.consumeType = consumeType;
+    }
+
+    public String messageModelDesc() {
+        if (this.count != 0 && this.getConsumeType() == ConsumeType.CONSUME_PASSIVELY) {
+            return this.getMessageModel().toString();
+        }
+        return "";
+    }
+
+    public MessageModel getMessageModel() {
+        return messageModel;
+    }
+
+    public void setMessageModel(MessageModel messageModel) {
+        this.messageModel = messageModel;
+    }
+
+    public String versionDesc() {
+        if (this.count != 0) {
+            return MQVersion.getVersionDesc(this.version);
+        }
+        return "";
+    }
+
+    public int getCount() {
+        return count;
+    }
+
+    public void setCount(int count) {
+        this.count = count;
+    }
+
+    public long getDiffTotal() {
+        return diffTotal;
+    }
+
+
+    public void setDiffTotal(long diffTotal) {
+        this.diffTotal = diffTotal;
+    }
+
+
+    @Override
+    public int compareTo(GroupConsumeInfo o) {
+        if (this.count != o.count) {
+            return o.count - this.count;
+        }
+
+        return (int) (o.diffTotal - diffTotal);
+    }
+
+
+    public int getConsumeTps() {
+        return consumeTps;
+    }
+
+
+    public void setConsumeTps(int consumeTps) {
+        this.consumeTps = consumeTps;
+    }
+
+
+    public int getVersion() {
+        return version;
+    }
+
+
+    public void setVersion(int version) {
+        this.version = version;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/consumer/ConsumerStatusSubCommand.java
----------------------------------------------------------------------
diff --git a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/consumer/ConsumerStatusSubCommand.java b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/consumer/ConsumerStatusSubCommand.java
new file mode 100644
index 0000000..cf796d8
--- /dev/null
+++ b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/consumer/ConsumerStatusSubCommand.java
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.tools.command.consumer;
+
+import com.alibaba.rocketmq.common.MQVersion;
+import com.alibaba.rocketmq.common.MixAll;
+import com.alibaba.rocketmq.common.protocol.body.Connection;
+import com.alibaba.rocketmq.common.protocol.body.ConsumerConnection;
+import com.alibaba.rocketmq.common.protocol.body.ConsumerRunningInfo;
+import com.alibaba.rocketmq.remoting.RPCHook;
+import com.alibaba.rocketmq.tools.admin.DefaultMQAdminExt;
+import com.alibaba.rocketmq.tools.command.MQAdminStartup;
+import com.alibaba.rocketmq.tools.command.SubCommand;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.TreeMap;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class ConsumerStatusSubCommand implements SubCommand {
+
+    public static void main(String[] args) {
+        System.setProperty(MixAll.NAMESRV_ADDR_PROPERTY, "127.0.0.1:9876");
+        MQAdminStartup.main(new String[]{new ConsumerStatusSubCommand().commandName(), "-g", "benchmark_consumer"});
+    }
+
+    @Override
+    public String commandName() {
+        return "consumerStatus";
+    }
+
+    @Override
+    public String commandDesc() {
+        return "Query consumer's internal data structure";
+    }
+
+    @Override
+    public Options buildCommandlineOptions(Options options) {
+        Option opt = new Option("g", "consumerGroup", true, "consumer group name");
+        opt.setRequired(true);
+        options.addOption(opt);
+
+        opt = new Option("i", "clientId", true, "The consumer's client id");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new Option("s", "jstack", false, "Run jstack command in the consumer progress");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        return options;
+    }
+
+    @Override
+    public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) {
+        DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
+
+        defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
+
+        try {
+            defaultMQAdminExt.start();
+            String group = commandLine.getOptionValue('g').trim();
+            ConsumerConnection cc = defaultMQAdminExt.examineConsumerConnectionInfo(group);
+            boolean jstack = commandLine.hasOption('s');
+            if (!commandLine.hasOption('i')) {
+                int i = 1;
+                long now = System.currentTimeMillis();
+                final TreeMap<String/* clientId */, ConsumerRunningInfo> criTable = new TreeMap<String, ConsumerRunningInfo>();
+                for (Connection conn : cc.getConnectionSet()) {
+                    try {
+                        ConsumerRunningInfo consumerRunningInfo =
+                                defaultMQAdminExt.getConsumerRunningInfo(group, conn.getClientId(), jstack);
+                        if (consumerRunningInfo != null) {
+                            criTable.put(conn.getClientId(), consumerRunningInfo);
+                            String filePath = now + "/" + conn.getClientId();
+                            MixAll.string2FileNotSafe(consumerRunningInfo.formatString(), filePath);
+                            System.out.printf("%03d  %-40s %-20s %s%n",
+                                    i++,
+                                    conn.getClientId(),
+                                    MQVersion.getVersionDesc(conn.getVersion()),
+                                    filePath);
+                        }
+                    } catch (Exception e) {
+                        e.printStackTrace();
+                    }
+                }
+
+                if (!criTable.isEmpty()) {
+                    boolean subSame = ConsumerRunningInfo.analyzeSubscription(criTable);
+
+                    boolean rebalanceOK = subSame && ConsumerRunningInfo.analyzeRebalance(criTable);
+
+                    if (subSame) {
+                        System.out.printf("%n%nSame subscription in the same group of consumer");
+                        System.out.printf("%n%nRebalance %s%n", rebalanceOK ? "OK" : "Failed");
+                        Iterator<Entry<String, ConsumerRunningInfo>> it = criTable.entrySet().iterator();
+                        while (it.hasNext()) {
+                            Entry<String, ConsumerRunningInfo> next = it.next();
+                            String result =
+                                    ConsumerRunningInfo.analyzeProcessQueue(next.getKey(), next.getValue());
+                            if (result.length() > 0) {
+                                System.out.printf(result);
+                            }
+                        }
+                    } else {
+                        System.out.printf("%n%nWARN: Different subscription in the same group of consumer!!!");
+                    }
+                }
+            } else {
+                String clientId = commandLine.getOptionValue('i').trim();
+                ConsumerRunningInfo consumerRunningInfo =
+                        defaultMQAdminExt.getConsumerRunningInfo(group, clientId, jstack);
+                if (consumerRunningInfo != null) {
+                    System.out.printf("%s", consumerRunningInfo.formatString());
+                }
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+        } finally {
+            defaultMQAdminExt.shutdown();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/consumer/ConsumerSubCommand.java
----------------------------------------------------------------------
diff --git a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/consumer/ConsumerSubCommand.java b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/consumer/ConsumerSubCommand.java
new file mode 100644
index 0000000..373da1e
--- /dev/null
+++ b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/consumer/ConsumerSubCommand.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.tools.command.consumer;
+
+import com.alibaba.rocketmq.common.MQVersion;
+import com.alibaba.rocketmq.common.MixAll;
+import com.alibaba.rocketmq.common.protocol.body.Connection;
+import com.alibaba.rocketmq.common.protocol.body.ConsumerConnection;
+import com.alibaba.rocketmq.common.protocol.body.ConsumerRunningInfo;
+import com.alibaba.rocketmq.remoting.RPCHook;
+import com.alibaba.rocketmq.tools.admin.DefaultMQAdminExt;
+import com.alibaba.rocketmq.tools.command.MQAdminStartup;
+import com.alibaba.rocketmq.tools.command.SubCommand;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.TreeMap;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class ConsumerSubCommand implements SubCommand {
+
+    public static void main(String[] args) {
+        System.setProperty(MixAll.NAMESRV_ADDR_PROPERTY, "127.0.0.1:9876");
+        MQAdminStartup.main(new String[]{new ConsumerSubCommand().commandName(), "-g", "benchmark_consumer"});
+    }
+
+    @Override
+    public String commandName() {
+        return "consumer";
+    }
+
+    @Override
+    public String commandDesc() {
+        return "Query consumer's connection, status, etc.";
+    }
+
+    @Override
+    public Options buildCommandlineOptions(Options options) {
+        Option opt = new Option("g", "consumerGroup", true, "consumer group name");
+        opt.setRequired(true);
+        options.addOption(opt);
+
+        opt = new Option("s", "jstack", false, "Run jstack command in the consumer progress");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        return options;
+    }
+
+    @Override
+    public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) {
+        DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
+
+        defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
+
+        try {
+            defaultMQAdminExt.start();
+            String group = commandLine.getOptionValue('g').trim();
+            ConsumerConnection cc = defaultMQAdminExt.examineConsumerConnectionInfo(group);
+            boolean jstack = commandLine.hasOption('s');
+
+            if (!commandLine.hasOption('i')) {
+
+                int i = 1;
+                long now = System.currentTimeMillis();
+                final TreeMap<String/* clientId */, ConsumerRunningInfo> criTable =
+                        new TreeMap<String, ConsumerRunningInfo>();
+                for (Connection conn : cc.getConnectionSet()) {
+                    try {
+                        ConsumerRunningInfo consumerRunningInfo =
+                                defaultMQAdminExt.getConsumerRunningInfo(group, conn.getClientId(), jstack);
+                        if (consumerRunningInfo != null) {
+                            criTable.put(conn.getClientId(), consumerRunningInfo);
+                            String filePath = now + "/" + conn.getClientId();
+                            MixAll.string2FileNotSafe(consumerRunningInfo.formatString(), filePath);
+                            System.out.printf("%03d  %-40s %-20s %s%n",
+                                    i++,
+                                    conn.getClientId(),
+                                    MQVersion.getVersionDesc(conn.getVersion()),
+                                    filePath);
+                        }
+                    } catch (Exception e) {
+                        e.printStackTrace();
+                    }
+                }
+
+                if (!criTable.isEmpty()) {
+                    boolean subSame = ConsumerRunningInfo.analyzeSubscription(criTable);
+                    boolean rebalanceOK = subSame && ConsumerRunningInfo.analyzeRebalance(criTable);
+
+                    if (subSame) {
+                        System.out.printf("%n%nSame subscription in the same group of consumer");
+                        System.out.printf("%n%nRebalance %s%n", rebalanceOK ? "OK" : "Failed");
+
+                        Iterator<Entry<String, ConsumerRunningInfo>> it = criTable.entrySet().iterator();
+                        while (it.hasNext()) {
+                            Entry<String, ConsumerRunningInfo> next = it.next();
+                            String result =
+                                    ConsumerRunningInfo.analyzeProcessQueue(next.getKey(), next.getValue());
+                            if (result.length() > 0) {
+                                System.out.printf(result);
+                            }
+                        }
+                    } else {
+                        System.out.printf("%n%nWARN: Different subscription in the same group of consumer!!!");
+                    }
+                }
+            } else {
+                String clientId = commandLine.getOptionValue('i').trim();
+                ConsumerRunningInfo consumerRunningInfo =
+                        defaultMQAdminExt.getConsumerRunningInfo(group, clientId, jstack);
+                if (consumerRunningInfo != null) {
+                    System.out.printf(consumerRunningInfo.formatString());
+                }
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+        } finally {
+            defaultMQAdminExt.shutdown();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/consumer/DeleteSubscriptionGroupCommand.java
----------------------------------------------------------------------
diff --git a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/consumer/DeleteSubscriptionGroupCommand.java b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/consumer/DeleteSubscriptionGroupCommand.java
new file mode 100644
index 0000000..712a0d0
--- /dev/null
+++ b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/consumer/DeleteSubscriptionGroupCommand.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.tools.command.consumer;
+
+import com.alibaba.rocketmq.common.MixAll;
+import com.alibaba.rocketmq.remoting.RPCHook;
+import com.alibaba.rocketmq.srvutil.ServerUtil;
+import com.alibaba.rocketmq.tools.admin.DefaultMQAdminExt;
+import com.alibaba.rocketmq.tools.command.CommandUtil;
+import com.alibaba.rocketmq.tools.command.SubCommand;
+import com.alibaba.rocketmq.tools.command.topic.DeleteTopicSubCommand;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+
+import java.util.Set;
+
+
+/**
+ * @author lansheng.zj
+ */
+public class DeleteSubscriptionGroupCommand implements SubCommand {
+    @Override
+    public String commandName() {
+        return "deleteSubGroup";
+    }
+
+
+    @Override
+    public String commandDesc() {
+        return "Delete subscription group from broker.";
+    }
+
+
+    @Override
+    public Options buildCommandlineOptions(Options options) {
+        Option opt = new Option("b", "brokerAddr", true, "delete subscription group from which broker");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new Option("c", "clusterName", true, "delete subscription group from which cluster");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new Option("g", "groupName", true, "subscription group name");
+        opt.setRequired(true);
+        options.addOption(opt);
+
+        return options;
+    }
+
+
+    @Override
+    public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) {
+        DefaultMQAdminExt adminExt = new DefaultMQAdminExt(rpcHook);
+        adminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
+        try {
+            // groupName
+            String groupName = commandLine.getOptionValue('g').trim();
+
+            if (commandLine.hasOption('b')) {
+                String addr = commandLine.getOptionValue('b').trim();
+                adminExt.start();
+
+                adminExt.deleteSubscriptionGroup(addr, groupName);
+                System.out.printf("delete subscription group [%s] from broker [%s] success.%n", groupName,
+                        addr);
+
+                return;
+            } else if (commandLine.hasOption('c')) {
+                String clusterName = commandLine.getOptionValue('c').trim();
+                adminExt.start();
+
+                Set<String> masterSet = CommandUtil.fetchMasterAddrByClusterName(adminExt, clusterName);
+                for (String master : masterSet) {
+                    adminExt.deleteSubscriptionGroup(master, groupName);
+                    System.out.printf(
+                            "delete subscription group [%s] from broker [%s] in cluster [%s] success.%n",
+                            groupName, master, clusterName);
+                }
+
+                try {
+                    DeleteTopicSubCommand.deleteTopic(adminExt, clusterName, MixAll.RETRY_GROUP_TOPIC_PREFIX
+                            + groupName);
+                    DeleteTopicSubCommand.deleteTopic(adminExt, clusterName, MixAll.DLQ_GROUP_TOPIC_PREFIX
+                            + groupName);
+                } catch (Exception e) {
+                }
+                return;
+            }
+
+            ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), options);
+        } catch (Exception e) {
+            e.printStackTrace();
+        } finally {
+            adminExt.shutdown();
+        }
+    }
+}