You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2022/03/14 06:52:24 UTC

[rocketmq] 03/07: feature(logging&tools&distribution&docs):[RIP-31][PART-A]Support RocketMQ BrokerContainer (#3977)

This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch 5.0.0-beta-tmp
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit fb9e3f05bb96ca7a951baa83b5cd3891d568ee44
Author: rongtong <ji...@163.com>
AuthorDate: Mon Mar 14 14:06:24 2022 +0800

    feature(logging&tools&distribution&docs):[RIP-31][PART-A]Support RocketMQ BrokerContainer (#3977)
---
 distribution/bin/{mqshutdown => mqbrokercontainer} |   46 +-
 distribution/bin/mqshutdown                        |   14 +
 .../2container-2m-2s/broker-a-in-container1.conf   |   20 +
 .../2container-2m-2s/broker-a-in-container2.conf   |   20 +
 .../2container-2m-2s/broker-b-in-container1.conf   |   20 +
 .../2container-2m-2s/broker-b-in-container2.conf   |   20 +
 .../2container-2m-2s/broker-container1.conf        |    9 +
 .../2container-2m-2s/broker-container2.conf        |    9 +
 .../container/2container-2m-2s/nameserver.conf     |    1 +
 distribution/conf/container/broker-a.conf          |   19 +
 distribution/conf/container/broker-b.conf          |   19 +
 distribution/conf/container/broker-container.conf  |    9 +
 distribution/pom.xml                               |    4 +
 distribution/release.xml                           |    1 +
 docs/cn/BrokerContainer.md                         |  152 +++
 docs/cn/QuorumACK.md                               |   70 ++
 docs/cn/README.md                                  |   12 +-
 docs/cn/SlaveActingMasterMode.md                   |  161 +++
 .../rocketmq/logging/InternalLoggerFactory.java    |   11 +
 .../rocketmq/logging/Slf4jLoggerFactory.java       |   87 +-
 .../org/apache/rocketmq/logging/inner/Level.java   |    1 +
 .../apache/rocketmq/logging/inner/SysLogger.java   |    4 +-
 .../rocketmq/logging/Slf4jLoggerFactoryTest.java   |    2 +-
 .../rocketmq/logging/inner/LoggingBuilderTest.java |    8 +-
 srvutil/pom.xml                                    |    8 +
 .../apache/rocketmq/srvutil/FileWatchService.java  |    4 +-
 .../rocketmq/util/cache/CacheEvictHandler.java     |   23 +
 .../apache/rocketmq/util/cache/CacheObject.java    |   36 +
 .../rocketmq/util/cache/ExpiredLocalCache.java     |   84 ++
 .../org/apache/rocketmq/util/cache/LocalCache.java |   58 ++
 .../apache/rocketmq/util/cache/LockManager.java    |   54 +
 .../rocketmq/tools/admin/DefaultMQAdminExt.java    |  157 ++-
 .../tools/admin/DefaultMQAdminExtImpl.java         | 1098 ++++++++++++++------
 .../apache/rocketmq/tools/admin/MQAdminExt.java    |   71 +-
 .../tools/admin/api/BrokerOperatorResult.java      |   50 +
 .../tools/admin/common/AdminToolHandler.java       |   21 +
 .../tools/admin/common/AdminToolResult.java        |   76 ++
 .../admin/common/AdminToolsResultCodeEnum.java     |   43 +
 .../apache/rocketmq/tools/command/CommandUtil.java |   17 +-
 .../rocketmq/tools/command/MQAdminStartup.java     |   18 +-
 .../command/broker/GetBrokerConfigCommand.java     |   29 +-
 .../broker/ResetMasterFlushOffsetSubCommand.java   |   71 ++
 .../broker/UpdateBrokerConfigSubCommand.java       |   16 +-
 .../command/cluster/CLusterSendMsgRTCommand.java   |    4 +-
 .../command/cluster/ClusterListSubCommand.java     |  106 +-
 .../consumer/ConsumerProgressSubCommand.java       |   17 +-
 .../consumer/DeleteSubscriptionGroupCommand.java   |   19 +-
 .../consumer/GetConsumerConfigSubCommand.java      |    2 +-
 .../command/container/AddBrokerSubCommand.java     |   66 ++
 .../command/container/RemoveBrokerSubCommand.java  |   79 ++
 .../tools/command/ha/HAStatusSubCommand.java       |  151 +++
 .../command/offset/ResetOffsetByTimeCommand.java   |   34 +
 .../tools/admin/DefaultMQAdminExtTest.java         |    4 +-
 .../rocketmq/tools/command/CommandUtilTest.java    |    2 +-
 .../message/QueryMsgByUniqueKeySubCommandTest.java |   18 +-
 55 files changed, 2655 insertions(+), 500 deletions(-)

diff --git a/distribution/bin/mqshutdown b/distribution/bin/mqbrokercontainer
similarity index 51%
copy from distribution/bin/mqshutdown
copy to distribution/bin/mqbrokercontainer
index d2d51fc..0ce383f 100644
--- a/distribution/bin/mqshutdown
+++ b/distribution/bin/mqbrokercontainer
@@ -15,35 +15,31 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-case $1 in
-    broker)
-
-    pid=`ps ax | grep -i 'org.apache.rocketmq.broker.BrokerStartup' |grep java | grep -v grep | awk '{print $1}'`
-    if [ -z "$pid" ] ; then
-            echo "No mqbroker running."
-            exit -1;
+if [ -z "$ROCKETMQ_HOME" ] ; then
+  ## resolve links - $0 may be a link to maven's home
+  PRG="$0"
+
+  # need this for relative symlinks
+  while [ -h "$PRG" ] ; do
+    ls=`ls -ld "$PRG"`
+    link=`expr "$ls" : '.*-> \(.*\)$'`
+    if expr "$link" : '/.*' > /dev/null; then
+      PRG="$link"
+    else
+      PRG="`dirname "$PRG"`/$link"
     fi
+  done
 
-    echo "The mqbroker(${pid}) is running..."
-
-    kill ${pid}
+  saveddir=`pwd`
 
-    echo "Send shutdown request to mqbroker(${pid}) OK"
-    ;;
-    namesrv)
+  ROCKETMQ_HOME=`dirname "$PRG"`/..
 
-    pid=`ps ax | grep -i 'org.apache.rocketmq.namesrv.NamesrvStartup' |grep java | grep -v grep | awk '{print $1}'`
-    if [ -z "$pid" ] ; then
-            echo "No mqnamesrv running."
-            exit -1;
-    fi
+  # make it fully qualified
+  ROCKETMQ_HOME=`cd "$ROCKETMQ_HOME" && pwd`
 
-    echo "The mqnamesrv(${pid}) is running..."
+  cd "$saveddir"
+fi
 
-    kill ${pid}
+export ROCKETMQ_HOME
 
-    echo "Send shutdown request to mqnamesrv(${pid}) OK"
-    ;;
-    *)
-    echo "Useage: mqshutdown broker | namesrv"
-esac
+sh ${ROCKETMQ_HOME}/bin/runbroker.sh org.apache.rocketmq.container.BrokerContainerStartup $@
diff --git a/distribution/bin/mqshutdown b/distribution/bin/mqshutdown
index d2d51fc..d91fce9 100644
--- a/distribution/bin/mqshutdown
+++ b/distribution/bin/mqshutdown
@@ -30,6 +30,20 @@ case $1 in
 
     echo "Send shutdown request to mqbroker(${pid}) OK"
     ;;
+    brokerContainer)
+
+    pid=`ps ax | grep -i 'org.apache.rocketmq.container.BrokerContainerStartup' |grep java | grep -v grep | awk '{print $1}'`
+    if [ -z "$pid" ] ; then
+            echo "No broker container running."
+            exit -1;
+    fi
+
+    echo "The broker container(${pid}) is running..."
+
+    kill ${pid}
+
+    echo "Send shutdown request to broker container(${pid}) OK"
+    ;;
     namesrv)
 
     pid=`ps ax | grep -i 'org.apache.rocketmq.namesrv.NamesrvStartup' |grep java | grep -v grep | awk '{print $1}'`
diff --git a/distribution/conf/container/2container-2m-2s/broker-a-in-container1.conf b/distribution/conf/container/2container-2m-2s/broker-a-in-container1.conf
new file mode 100644
index 0000000..6421fc8
--- /dev/null
+++ b/distribution/conf/container/2container-2m-2s/broker-a-in-container1.conf
@@ -0,0 +1,20 @@
+#Master配置
+brokerClusterName=DefaultCluster
+brokerName=broker-a
+brokerId=0
+brokerRole=SYNC_MASTER
+flushDiskType=ASYNC_FLUSH
+storePathRootDir=/root/broker-a/store
+storePathCommitLog=/root/broker-a/store/commitlog
+listenPort=10911
+haListenPort=10912
+totalReplicas=2
+inSyncReplicas=2
+minInSyncReplicas=1
+enableAutoInSyncReplicas=true
+slaveReadEnable=true
+brokerHeartbeatInterval=1000
+brokerNotActiveTimeoutMillis=5000
+sendHeartbeatTimeoutMillis=1000
+enableSlaveActingMaster=true
+isolateLogEnable=true
\ No newline at end of file
diff --git a/distribution/conf/container/2container-2m-2s/broker-a-in-container2.conf b/distribution/conf/container/2container-2m-2s/broker-a-in-container2.conf
new file mode 100644
index 0000000..4be2249
--- /dev/null
+++ b/distribution/conf/container/2container-2m-2s/broker-a-in-container2.conf
@@ -0,0 +1,20 @@
+#Master配置
+brokerClusterName=DefaultCluster
+brokerName=broker-a
+brokerId=1
+brokerRole=SLAVE
+flushDiskType=ASYNC_FLUSH
+storePathRootDir=/root/broker-a/store
+storePathCommitLog=/root/broker-a/store/commitlog
+listenPort=10911
+haListenPort=10912
+totalReplicas=2
+inSyncReplicas=2
+minInSyncReplicas=1
+enableAutoInSyncReplicas=true
+slaveReadEnable=true
+brokerHeartbeatInterval=1000
+brokerNotActiveTimeoutMillis=5000
+sendHeartbeatTimeoutMillis=1000
+enableSlaveActingMaster=true
+isolateLogEnable=true
\ No newline at end of file
diff --git a/distribution/conf/container/2container-2m-2s/broker-b-in-container1.conf b/distribution/conf/container/2container-2m-2s/broker-b-in-container1.conf
new file mode 100644
index 0000000..35d469a
--- /dev/null
+++ b/distribution/conf/container/2container-2m-2s/broker-b-in-container1.conf
@@ -0,0 +1,20 @@
+#Slave配置
+brokerClusterName=DefaultCluster
+brokerName=broker-b
+brokerId=1
+brokerRole=SLAVE
+flushDiskType=ASYNC_FLUSH
+storePathRootDir=/root/broker-b/store
+storePathCommitLog=/root/broker-b/store/commitlog
+listenPort=20911
+haListenPort=20912
+totalReplicas=2
+inSyncReplicas=2
+minInSyncReplicas=1
+enableAutoInSyncReplicas=true
+slaveReadEnable=true
+brokerHeartbeatInterval=1000
+brokerNotActiveTimeoutMillis=5000
+sendHeartbeatTimeoutMillis=1000
+enableSlaveActingMaster=true
+isolateLogEnable=true
\ No newline at end of file
diff --git a/distribution/conf/container/2container-2m-2s/broker-b-in-container2.conf b/distribution/conf/container/2container-2m-2s/broker-b-in-container2.conf
new file mode 100644
index 0000000..1594975
--- /dev/null
+++ b/distribution/conf/container/2container-2m-2s/broker-b-in-container2.conf
@@ -0,0 +1,20 @@
+#Slave配置
+brokerClusterName=DefaultCluster
+brokerName=broker-b
+brokerId=0
+brokerRole=SYNC_MASTER
+flushDiskType=ASYNC_FLUSH
+storePathRootDir=/root/broker-b/store
+storePathCommitLog=/root/broker-b/store/commitlog
+listenPort=20911
+haListenPort=20912
+totalReplicas=2
+inSyncReplicas=2
+minInSyncReplicas=1
+enableAutoInSyncReplicas=true
+slaveReadEnable=true
+brokerHeartbeatInterval=1000
+brokerNotActiveTimeoutMillis=5000
+sendHeartbeatTimeoutMillis=1000
+enableSlaveActingMaster=true
+isolateLogEnable=true
\ No newline at end of file
diff --git a/distribution/conf/container/2container-2m-2s/broker-container1.conf b/distribution/conf/container/2container-2m-2s/broker-container1.conf
new file mode 100644
index 0000000..e5b7aac
--- /dev/null
+++ b/distribution/conf/container/2container-2m-2s/broker-container1.conf
@@ -0,0 +1,9 @@
+#配置端口,用于接收mqadmin命令
+listenPort=10811
+#指定namesrv
+namesrvAddr=172.22.144.49:9876
+#或指定自动获取namesrv
+fetchNamesrvAddrByAddressServer=false
+#指定要向BrokerContainer内添加的brokerConfig路径,多个config间用“:”分隔;
+#不指定则只启动BrokerConainer,具体broker可通过mqadmin工具添加
+brokerConfigPaths=/root/2container-2m-2s/broker-a-in-container1.conf:/root/2container-2m-2s/broker-b-in-container1.conf
\ No newline at end of file
diff --git a/distribution/conf/container/2container-2m-2s/broker-container2.conf b/distribution/conf/container/2container-2m-2s/broker-container2.conf
new file mode 100644
index 0000000..1d4f1c3
--- /dev/null
+++ b/distribution/conf/container/2container-2m-2s/broker-container2.conf
@@ -0,0 +1,9 @@
+#配置端口,用于接收mqadmin命令
+listenPort=10811
+#指定namesrv
+namesrvAddr=172.22.144.49:9876
+#或指定自动获取namesrv
+fetchNamesrvAddrByAddressServer=false
+#指定要向BrokerContainer内添加的brokerConfig路径,多个config间用“:”分隔;
+#不指定则只启动BrokerConainer,具体broker可通过mqadmin工具添加
+brokerConfigPaths=/root/2container-2m-2s/broker-a-in-container2.conf:/root/2container-2m-2s/broker-b-in-container2.conf
\ No newline at end of file
diff --git a/distribution/conf/container/2container-2m-2s/nameserver.conf b/distribution/conf/container/2container-2m-2s/nameserver.conf
new file mode 100644
index 0000000..d716e7f
--- /dev/null
+++ b/distribution/conf/container/2container-2m-2s/nameserver.conf
@@ -0,0 +1 @@
+supportActingMaster=true
\ No newline at end of file
diff --git a/distribution/conf/container/broker-a.conf b/distribution/conf/container/broker-a.conf
new file mode 100644
index 0000000..5324f26
--- /dev/null
+++ b/distribution/conf/container/broker-a.conf
@@ -0,0 +1,19 @@
+#Master配置
+brokerClusterName=DefaultCluster
+brokerName=broker-a
+brokerId=0
+brokerRole=SYNC_MASTER
+flushDiskType=ASYNC_FLUSH
+storePathRootDir=/disk1/rocketmq/broker-a/store
+storePathCommitLog=/disk1/rocketmq/broker-a/store/commitlog
+listenPort=10911
+haListenPort=10912
+totalReplicas=2
+inSyncReplicas=2
+minInSyncReplicas=1
+enableAutoInSyncReplicas=true
+slaveReadEnable=true
+brokerHeartbeatInterval=1000
+brokerNotActiveTimeoutMillis=5000
+sendHeartbeatTimeoutMillis=1000
+enableSlaveActingMaster=true
\ No newline at end of file
diff --git a/distribution/conf/container/broker-b.conf b/distribution/conf/container/broker-b.conf
new file mode 100644
index 0000000..82ca2ea
--- /dev/null
+++ b/distribution/conf/container/broker-b.conf
@@ -0,0 +1,19 @@
+#Slave配置
+brokerClusterName=DefaultCluster
+brokerName=broker-b
+brokerId=1
+brokerRole=SLAVE
+flushDiskType=ASYNC_FLUSH
+storePathRootDir=/disk2/rocketmq/broker-b/store
+storePathCommitLog=/disk2/rocketmq/broker-b/store/commitlog
+listenPort=20911
+haListenPort=20912
+totalReplicas=2
+inSyncReplicas=2
+minInSyncReplicas=1
+enableAutoInSyncReplicas=true
+slaveReadEnable=true
+brokerHeartbeatInterval=1000
+brokerNotActiveTimeoutMillis=5000
+sendHeartbeatTimeoutMillis=1000
+enableSlaveActingMaster=true
\ No newline at end of file
diff --git a/distribution/conf/container/broker-container.conf b/distribution/conf/container/broker-container.conf
new file mode 100644
index 0000000..4798eae
--- /dev/null
+++ b/distribution/conf/container/broker-container.conf
@@ -0,0 +1,9 @@
+#配置端口,用于接收mqadmin命令
+listenPort=10811
+#指定namesrv
+namesrvAddr=127.0.0.1:9876
+#或指定自动获取namesrv
+fetchNamesrvAddrByAddressServer=true
+#指定要向BrokerContainer内添加的brokerConfig路径,多个config间用“:”分隔;
+#不指定则只启动BrokerConainer,具体broker可通过mqadmin工具添加
+brokerConfigPaths=/home/admin/broker-a.conf:/home/admin/broker-b.conf
\ No newline at end of file
diff --git a/distribution/pom.xml b/distribution/pom.xml
index 8059a9a..439e477 100644
--- a/distribution/pom.xml
+++ b/distribution/pom.xml
@@ -32,6 +32,10 @@
             <dependencies>
                 <dependency>
                     <groupId>org.apache.rocketmq</groupId>
+                    <artifactId>rocketmq-container</artifactId>
+                </dependency>
+                <dependency>
+                    <groupId>org.apache.rocketmq</groupId>
                     <artifactId>rocketmq-broker</artifactId>
                 </dependency>
                 <dependency>
diff --git a/distribution/release.xml b/distribution/release.xml
index fd6e3db..b3a9e5e 100644
--- a/distribution/release.xml
+++ b/distribution/release.xml
@@ -61,6 +61,7 @@
         <moduleSet>
             <useAllReactorProjects>true</useAllReactorProjects>
             <includes>
+                <include>org.apache.rocketmq:rocketmq-container</include>
                 <include>org.apache.rocketmq:rocketmq-broker</include>
                 <include>org.apache.rocketmq:rocketmq-tools</include>
                 <include>org.apache.rocketmq:rocketmq-client</include>
diff --git a/docs/cn/BrokerContainer.md b/docs/cn/BrokerContainer.md
new file mode 100644
index 0000000..94a2355
--- /dev/null
+++ b/docs/cn/BrokerContainer.md
@@ -0,0 +1,152 @@
+# BrokerContainer
+
+## 背景
+
+在RocketMQ 4.x版本中,一个进程只有一个broker,通常会以主备或者DLedger(Raft)的形式部署,但是一个进程中只有一个broker,而slave一般只承担冷备或热备的作用,节点之间角色的不对等导致slave节点资源没有充分被利用。
+因此在RocketMQ 5.x版本中,提供一种新的模式BrokerContainer,在一个BrokerContainer进程中可以加入多个Broker(Master Broker、Slave Broker、DLedger Broker),来提高单个节点的资源利用率,并且可以通过各种形式的交叉部署来实现节点之间的对等部署。
+该特性的优点包括:
+
+1. 一个BrokerContainer进程中可以加入多个broker,通过进程内混部来提高单个节点的资源利用率
+2. 通过各种形式的交叉部署来实现节点之间的对等部署,增强单节点的高可用能力
+3. 利用BrokerContainer可以实现单进程内多CommitLog写入,也可以实现单机的多磁盘写入
+4. BrokerContainer中的CommitLog天然隔离的,不同的CommitLog(broker)可以采取不同作用,比如可以用来比如创建单独的broker做不同TTL的CommitLog。
+
+## 架构
+
+### 单进程视图
+
+![](https://s4.ax1x.com/2022/01/26/7LMZHP.png)
+
+相比于原来一个Broker一个进程,RocketMQ 5.0将增加BrokerContainer概念,一个BrokerContainer可以存放多个Broker,每个Broker拥有不同的端口,但它们共享同一个传输层(remoting层),而每一个broker在功能上是完全独立的。BrokerContainer也拥有自己端口,在运行时可以通过admin命令来增加或减少Broker。
+
+### 对等部署形态
+
+在BrokerContainer模式下,可以通过各种形式的交叉部署完成节点的对等部署
+
+- 二副本对等部署
+
+![](https://s4.ax1x.com/2022/01/26/7LQi5T.png)
+
+二副本对等部署情况下,每个节点都会有一主一备,资源利用率均等。另外假设图中Node1宕机,由于Node2的broker_2可读可写,broker_1可以备读,因此普通消息的收发不会收到影响,单节点的高可用能力得到了增强。
+
+- 三副本对等部署
+
+![](https://s4.ax1x.com/2022/01/26/7LQMa6.png)
+
+三副本对等部署情况下,每个节点都会有一主两备,资源利用率均等。此外,和二副本一样,任意一个节点的宕机也不会影响到普通消息的收发。
+
+### 传输层共享
+
+![](https://s4.ax1x.com/2022/02/07/HMNIVs.png)
+
+BrokerContainer中的所有broker共享同一个传输层,就像RocketMQ客户端中同进程的Consumer和Producer共享同一个传输层一样。
+
+这里为NettyRemotingServer提供SubRemotingServer支持,通过为一个RemotingServer绑定另一个端口即可生成SubRemotingServer,其共享NettyRemotingServer的Netty实例、计算资源、以及协议栈等,但拥有不同的端口以及ProcessorTable。另外同一个BrokerContainer中的所有的broker也会共享同一个BrokerOutAPI(RemotingClient)。
+
+## 启动方式和配置
+
+![](https://s4.ax1x.com/2022/01/26/7LQ1PO.png)
+
+像Broker启动利用BrokerStartup一样,使用BrokerContainerStartup来启动BrokerContainer。我们可以通过两种方式向BrokerContainer中增加broker,一种是通过启动时通过在配置文件中指定
+
+BrokerContainer配置文件内容主要是Netty网络层参数(由于传输层共享),BrokerContainer的监听端口、namesrv配置,以及最重要的brokerConfigPaths参数,brokerConfigPaths是指需要向BrokerContainer内添加的brokerConfig路径,多个config间用“:”分隔,不指定则只启动BrokerConainer,具体broker可通过mqadmin工具添加
+
+broker-container.conf(distribution/conf/container/broker-container.conf):
+
+```
+#配置端口,用于接收mqadmin命令
+listenPort=10811
+#指定namesrv
+namesrvAddr=127.0.0.1:9876
+#或指定自动获取namesrv
+fetchNamesrvAddrByAddressServer=true
+#指定要向BrokerContainer内添加的brokerConfig路径,多个config间用“:”分隔;
+#不指定则只启动BrokerConainer,具体broker可通过mqadmin工具添加
+brokerConfigPaths=/home/admin/broker-a.conf:/home/admin/broker-b.conf
+```
+broker的配置和以前一样,但在BrokerContainer模式下broker配置文件中下Netty网络层参数和nameserver参数不生效,均使用BrokerContainer的配置参数。
+
+完成配置文件后,可以以如下命令启动
+```
+sh mqbrokercontainer -c broker-container.conf
+```
+mqbrokercontainer脚本路径为distribution/bin/mqbrokercontainer。
+
+## 运行时增加或较少Broker
+
+当BrokerContainer进程启动后,也可以通过Admin命令来增加或减少Broker。
+
+AddBrokerCommand
+```
+usage: mqadmin addBroker -b <arg> -c <arg> [-h] [-n <arg>]
+ -b,--brokerConfigPath <arg>      Broker config path
+ -c,--brokerContainerAddr <arg>   Broker container address
+ -h,--help                        Print help
+ -n,--namesrvAddr <arg>           Name server address list, eg: 192.168.0.1:9876;192.168.0.2:9876
+```
+
+RemoveBroker Command
+```
+usage: mqadmin removeBroker -b <arg> -c <arg> [-h] [-n <arg>]
+ -b,--brokerIdentity <arg>        Information to identify a broker: clusterName:brokerName:brokerId
+ -c,--brokerContainerAddr <arg>   Broker container address
+ -h,--help                        Print help
+ -n,--namesrvAddr <arg>           Name server address list, eg: 192.168.0.1:9876;192.168.0.2:9876
+```
+
+## 存储变化
+
+storePathRootDir, storePathCommitLog路径依然为MessageStoreConfig中配置值,需要注意的是同一个brokerContainer中的broker不能使用相同的storePathRootDir, storePathCommitLog,否则不同的broker占用同一个存储目录,发生数据混乱。
+
+在文件删除策略上,仍然单个Broker的视角来进行删除,但MessageStoreConfig新增replicasPerDiskPartition参数和logicalDiskSpaceCleanForciblyThreshold。
+
+replicasPerDiskPartition表示同一磁盘分区上有多少个副本,即该broker的存储目录所在的磁盘分区被几个broker共享,默认值为1。该配置用于计算当同一节点上的多个broker共享同一磁盘分区时,各broker的磁盘配额
+
+e.g. replicasPerDiskPartition==2且broker所在磁盘空间为1T时,则该broker磁盘配额为512G,该broker的逻辑磁盘空间利用率基于512G的空间进行计算。
+
+logicalDiskSpaceCleanForciblyThreshold,该值只在quotaPercentForDiskPartition小于1时生效,表示逻辑磁盘空间强制清理阈值,默认为0.80(80%), 逻辑磁盘空间利用率为该broker在自身磁盘配额内的空间利用率,物理磁盘空间利用率为该磁盘分区总空间利用率。由于在BrokerContainer实现中,考虑计算效率的情况下,仅统计了commitLog+consumeQueue(+ BCQ)+indexFile作为broker的存储空间占用,其余文件如元数据、消费进度、磁盘脏数据等未统计在内,故在多个broker存储空间达到动态平衡时,各broker所占空间可能有相差,以一个BrokerContainer中有两个broker为例,两broker存储空间差异可表示为:
+![](https://s4.ax1x.com/2022/01/26/7L14v4.png)
+其中,R_logical为logicalDiskSpaceCleanForciblyThreshold,R_phy为diskSpaceCleanForciblyRatio,T为磁盘分区总空间,x为除上述计算的broker存储空间外的其他文件所占磁盘总空间比例,可见,当
+![](https://s4.ax1x.com/2022/01/26/7L1TbR.png)
+时,可保证BrokerContainer各Broker存储空间在达到动态平衡时相差无几。
+
+eg.假设broker获取到的配额是500g(根据replicasPerDiskPartition计算获得),logicalDiskSpaceCleanForciblyThreshold为默认值0.8,则默认commitLog+consumeQueue(+ BCQ)+indexFile总量超过400g就会强制清理文件。
+
+其他清理阈值(diskSpaceCleanForciblyRatio、diskSpaceWarningLevelRatio),文件保存时间(fileReservedTime)等逻辑与之前保持一致。
+
+注意:当以普通broker方式启动而非brokerContainer启动时,且replicasPerDiskPartition=1(默认值)时,清理逻辑与之前完全一致。replicasPerDiskPartition>1时,逻辑磁盘空间强制清理阈值logicalDiskSpaceCleanForciblyThreshold将会生效。
+
+
+## 日志变化
+
+在BrokerContainer模式下并开启日志分离后,日志的默认输出路径将发生变化,每个broker日志的具体路径变化为
+```
+{user.home}/logs/{$brokerCanonicalName}_rocketmqlogs/
+```
+
+其中brokerCanonicalName为{BrokerClusterName_BrokerName_BrokerId},{BrokerClusterName_BrokerName_BrokerId}。
+
+**开发者需要注意!**
+
+在BrokerContainer模式下,多个broker会在同一个BrokerContainer进程中,因此所有broker的日志将会输出到同一个日志文件中,BrokerContainer模式下将提供broker日志分离功能,通过BrokerConfig配置isolateLogEnable=true进行开启,开启后不同broker的日志将会输出到不同文件中。
+
+主要通过线程名(ThreadName)或者通过设置线程本地变量(ThreadLocal)来区分不同broker线程,并且hack logback的logAppender将日志重定向到不同的文件中。
+
+通过设置线程名来区分不同broker线程,线程名前缀必须是#BrokerClusterName_BrokerName_BrokerId#
+
+通过设置线程本地变量区分不同broker线程,设置的变量为BrokerClusterName_BrokerName_BrokerId
+```java
+// set threadlocal broker identity to forward logging to corresponding broker
+InnerLoggerFactory.brokerIdentity.set(brokerIdentity.getCanonicalName())
+```
+
+如果线程没有上述区分,日志将仍然会输出在原来的目录下。
+
+以普通方式启动Broker(非BrokerContainer模式)时,日志将仍然会输出在原来的目录下。
+
+具体实现方式可以参考Slf4jLoggerFactory和BrokerLogbackConfigurator两个类。
+
+通过线程名和线程本地变量区分可以参考org.apache.rocketmq.common.AbstractBrokerRunnable、org.apache.rocketmq.common.ThreadFactoryImpl以及各个ServiceThread中getServiceName的实现。
+
+
+参考文档: [原RIP](https://github.com/apache/rocketmq/wiki/RIP-31-Support-RocketMQ-BrokerContainer)
\ No newline at end of file
diff --git a/docs/cn/QuorumACK.md b/docs/cn/QuorumACK.md
new file mode 100644
index 0000000..be609c6
--- /dev/null
+++ b/docs/cn/QuorumACK.md
@@ -0,0 +1,70 @@
+# Quorum Write和自动降级
+
+## 背景
+
+![](https://s4.ax1x.com/2022/02/05/HnWo2d.png)
+
+在RocketMQ中,主备之间的复制模式主要有同步复制和异步复制,如上图所示,Slave1的复制是同步的,在向Producer报告成功写入之前,Master需要等待Slave1成功复制该消息并确认,Slave2的复制是异步的,Master不需要等待Slave2的响应。在RocketMQ中,发送一条消息,如果一切都顺利,那最后会返回给Producer客户端一个PUT_OK的状态,如果是Slave同步超时则返回FLUSH_SLAVE_TIMEOUT状态,如果是Slave不可用或者Slave与Master之间CommitLog差距超过一定的值(默认是256MB),则返回SLAVE_NOT_AVAILABLE,后面两个状态并不会导致系统异常而无法写入下一条消息。
+
+同步复制可以保证Master失效后,数据仍然能在Slave中找到,适合可靠性要求较高的场景。异步复制虽然消息可能会丢失,但是由于无需等待Slave的确认,效率上要高于同步复制,适合对效率有一定要求的场景。但是只有两种模式仍然不够灵活,比如在三副本甚至五副本且对可靠性要求高场景中,采用异步复制无法满足需求,但采用同步复制则需要每一个副本确认后才会返回,在副本数多的情况下严重影响效率。另一方面,在同步复制的模式下,如果副本组中的某一个Slave出现假死,整个发送将一直失败直到进行手动处理。
+
+因此,RocketMQ 5 提出了副本组的quorum write,在同步复制的模式下,用户可以在broker端指定发送后至少需要写入多少副本数后才能返回,并且提供自适应降级的方式,可以根据存活的副本数以及CommitLog差距自动完成降级。
+
+## 架构和参数
+
+### Quorum Write
+
+通过增加两个参数来支持quorum write。
+
+- **totalReplicas**:副本组broker总数。默认为1。
+- **inSyncReplicas**:正常情况需保持同步的副本组数量。默认为1。
+
+通过这两个参数,可以在同步复制的模式下,灵活指定需要ACK的副本数。
+
+![](https://s4.ax1x.com/2022/02/05/HnWHKI.png)
+
+如上图所示,在两副本情况下,如果inSyncReplicas为2,则该条消息需要在Master和Slave中均复制完成后才会返回给客户端;在三副本情况下,如果inSyncReplicas为2,则该条消息除了需要复制在Master上,还需要复制到任意一个slave上,才会返回给客户端。在四副本情况下,如果inSyncReplicas为3,则条消息除了需要复制在Master上,还需要复制到任意两个slave上,才会返回给客户端。通过灵活设置totalReplicas和inSyncReplicas,可以满足用户各类场景的需求。
+
+### 自动降级
+
+自动降级的标准是
+
+- 当前副本组的存活副本数
+- Master Commitlog和Slave CommitLog的高度差
+
+通过Nameserver的反向通知以及GetBrokerMemberGroup请求可以获取当前副本组的存活信息,而Master与Slave的Commitlog高度差也可以通过HA服务中的位点记录计算出来。将增加以下参数完成自动降级:
+
+- **minInSyncReplicas**:最小需保持同步的副本组数量,仅在enableAutoInSyncReplicas为true时生效,默认为1。
+- **enableAutoInSyncReplicas**:自动同步降级开关,开启后,若当前副本组处于同步状态的broker数量(包括master自身)不满足inSyncReplicas指定的数量,则按照minInSyncReplicas进行同步。同步状态判断条件为:slave commitLog落后master长度不超过haSlaveFallBehindMax。默认为false。
+- **haSlaveFallBehindMax**:slave是否与master处于in-sync状态的判断值,slave commitLog落后master长度超过该值则认为slave已处于非同步状态。当enableAutoInSyncReplicas打开时,该值越小,越容易触发master的自动降级,当enableAutoInSyncReplicas关闭,且totalReplicas==inSyncReplicas时,该值越小,越容易导致在大流量时发送请求失败,故在该情况下可适当调大haSlaveFallBehindMax。默认为256K。
+
+注意:在RocketMQ 4.x中存在haSlaveFallbehindMax参数,默认256MB,表明Slave与Master的CommitLog高度差多少后判定其为不可用,在RocketMQ 5中该参数被取消,由haSlaveFallBehindMax代替,含义如上。
+
+```java
+//计算needAckNums
+int inSyncReplicas = Math.min(this.defaultMessageStore.getAliveReplicaNumInGroup(),
+                              this.defaultMessageStore.getHaService().inSyncSlaveNums(currOffset) + 1);
+needAckNums = calcNeedAckNums(inSyncReplicas);
+if (needAckNums > inSyncReplicas) {
+    // Tell the producer, don't have enough slaves to handle the send request
+    return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.IN_SYNC_REPLICAS_NOT_ENOUGH, null));
+}
+
+private int calcNeedAckNums(int inSyncReplicas) {
+    int needAckNums = this.defaultMessageStore.getMessageStoreConfig().getInSyncReplicas();
+    if (this.defaultMessageStore.getMessageStoreConfig().isEnableAutoInSyncReplicas()) {
+        needAckNums = Math.min(needAckNums, inSyncReplicas);
+        needAckNums = Math.max(needAckNums, this.defaultMessageStore.getMessageStoreConfig().getMinInSyncReplicas());
+    }
+    return needAckNums;
+}
+```
+
+当enableAutoInSyncReplicas=true是开启自适应降级模式,当副本组中存活的副本数减少或Master和Slave Commitlog高度差过大时,都会进行自动降级,最小降级到minInSyncReplicas副本数。比如在两副本中,如果设置totalReplicas=2,InSyncReplicas=2,minInSyncReplicas=1,enableAutoInSyncReplicas=true,正常情况下,两个副本均会处于同步复制,当Slave下线或假死时,会进行自适应降级,producer只需要发送到master即成功。
+
+## 兼容性
+
+用户需要设置正确的参数才能完成正确的向后兼容。举个例子,假设用户原集群为两副本同步复制,在没有修改任何参数的情况下,升级到RocketMQ 5的版本,由于totalReplicas、inSyncReplicas默认都为1,将降级为异步复制,如果需要和以前行为保持一致,则需要将totalReplicas和inSyncReplicas均设置为2。
+
+
+参考文档: [原RIP](https://github.com/apache/rocketmq/wiki/RIP-34-Support-quorum-write-and-adaptive-degradation-in-master-slave-architecture)
\ No newline at end of file
diff --git a/docs/cn/README.md b/docs/cn/README.md
index 2dbd854..fa2e7a5 100644
--- a/docs/cn/README.md
+++ b/docs/cn/README.md
@@ -34,9 +34,15 @@
 ### 5. 运维管理
 - [集群部署(Operation)](operation.md):介绍单Master模式、多Master模式、多Master多slave模式等RocketMQ集群各种形式的部署方法以及运维工具mqadmin的使用方式。
 
-
-
-### 6. API Reference(待补充)
+### 6. RocketMQ 5.0 新特性
+- [POP消费](https://github.com/apache/rocketmq/wiki/%5BRIP-19%5D-Server-side-rebalance,--lightweight-consumer-client-support)
+- [StaticTopic](RocketMQ_Static_Topic_Logic_Queue_设计.md)
+- [BatchConsumeQueue](https://github.com/apache/rocketmq/wiki/RIP-26-Improve-Batch-Message-Processing-Throughput)
+- [BrokerContainer](BrokerContainer.md)
+- [SlaveActingMaster模式](SlaveActingMasterMode.md)
+- [Quorum Write和自动降级](QuorumACK.md)
+
+### 7. API Reference(待补充)
 
 - [DefaultMQProducer API Reference](client/java/API_Reference_DefaultMQProducer.md)
 
diff --git a/docs/cn/SlaveActingMasterMode.md b/docs/cn/SlaveActingMasterMode.md
new file mode 100644
index 0000000..03978bf
--- /dev/null
+++ b/docs/cn/SlaveActingMasterMode.md
@@ -0,0 +1,161 @@
+# Slave Acting Master模式
+
+## 背景
+
+![](https://s4.ax1x.com/2022/02/05/HnW3CQ.png)
+
+上图为当前RocketMQ Master-Slave冷备部署,在该部署方式下,即使一个Master掉线,发送端仍然可以向其他Master发送消息,对于消费端而言,若开启备读,Consumer会自动重连到对应的Slave机器,不会出现消费停滞的情况。但也存在以下问题:
+
+1. 一些仅限于在Master上进行的操作将无法进行,包括且不限于:
+
+- searchOffset 
+- maxOffset 
+- minOffset 
+- earliestMsgStoreTime 
+- endTransaction
+
+所有锁MQ相关操作,包括lock, unlock, lockBatch, unlockAll
+
+具体影响为:
+- 客户端无法获取位于该副本组的mq的锁,故当本地锁过期后,将无法消费该组的顺序消息 
+- 客户端无法主动结束处于半状态的事务消息,只能等待broker回查事务状态 
+- Admin tools或控制中依赖查询offset及earliestMsgStoreTime等操作在该组上无法生效
+
+2. 故障Broker组上的二级消息消费将会中断,该类消息特点依赖Master Broker上的线程扫描CommitLog上的特殊Topic,并将满足要求的消息投放回CommitLog,如果Master Broker下线,会出现二级消息的消费延迟或丢失。具体会影响到当前版本的延迟消息消费、事务消息消费、Pop消费。
+
+3. 没有元数据的反向同步。Master重新被人工拉起后,容易造成元数据的回退,如Master上线后将落后的消费位点同步给备,该组broker的消费位点回退,造成大量消费重复。
+
+![](https://s4.ax1x.com/2022/02/05/HnWwUU.png)
+
+上图为DLedger(Raft)架构,其可以通过选主一定程度上规避上述存在的问题,但可以看到DLedger模式下当前需要强制三副本及以上。
+
+提出一个新的方案,Slave代理Master模式,作为Master-Slave部署模式的升级。在原先Master-Slave部署模式下,通过备代理主、轻量级心跳、副本组信息获取、broker预上线机制、二级消息逃逸等方式,当同组Master发生故障时,Slave将承担更加重要的作用,包括:
+
+- 当Master下线后,该组中brokerId最小的Slave会承担备读 以及 一些 客户端和管控会访问 但却只能在Master节点上完成的任务。包括且不限于searchOffset、maxOffset、minOffset、earliestMsgStoreTime、endTransaction以及所有锁MQ相关操作lock, unlock, lockBatch, unlockAll。
+- 当Master下线后,故障Broker组上的二级消息消费将不会中断,由该组中该组中brokerId最小的Slave承担起该任务,定时消息、Pop消息、事务消息等仍然可以正常运行。
+- 当Master下线后,在Slave代理Master一段时间主后,然后当Master再次上线后,通过预上线机制,Master会自动完成元数据的反向同步后再上线,不会出现元数据回退,造成消息大量重复消费或二级消息大量重放。
+
+## 架构
+
+### 备代理主
+
+Master下线后Slave能正常消费,且在不修改客户端代码情况下完成只能在Master完成的操作源自于Namesrv对“代理”Master的支持。此处“代理”Master指的是,当副本组处于无主状态时,Namesrv将把brokerId最小的存活Slave视为“代理”Master,具体表现为在构建TopicRouteData时,将该Slave的brokerId替换为0,并将brokerPermission修改为4(Read-Only),从而使得该Slave在客户端视图中充当只读模式的Master的角色。
+
+此外,当Master下线后,brokerId最小的Slave会承担起二级消息的扫描和重新投递功能,这也是“代理”的一部分。
+
+```java
+//改变二级消息扫描状态
+public void changeSpecialServiceStatus(boolean shouldStart) {
+	……
+
+    //改变延迟消息服务的状态
+    changeScheduleServiceStatus(shouldStart);
+
+    //改变事务消息服务的状态
+    changeTransactionCheckServiceStatus(shouldStart);
+
+    //改变Pop消息服务状态
+    if (this.ackMessageProcessor != null) {
+        LOG.info("Set PopReviveService Status to {}", shouldStart);
+        this.ackMessageProcessor.setPopReviveServiceStatus(shouldStart);
+    }
+}
+```
+
+### 轻量级心跳
+
+如上文所述,brokerId最小的存活Slave在Master故障后开启自动代理Master模式,因此需要一种机制,这个机制需要保证:
+
+1. Nameserver能及时发现broker上下线并完成路由替换以及下线broker的路由剔除。
+
+2. Broker能及时感知到同组Broker的上下线情况。
+
+针对1,Nameserver原本就存在判活机制,定时会扫描不活跃的broker使其下线,而原本broker与nameserver的“心跳”则依赖于registerBroker操作,而这个操作涉及到topic信息上报,过于“重”,而且注册间隔过于长,因此需要一个轻量级的心跳机制,RoccketMQ 5.0在nameserver和broker间新增BrokerHeartbeat请求,broker会定时向nameserver发送心跳,若nameserver定时任务扫描发现超过心跳超时时间仍未收到该broker的心跳,将unregister该broker。registerBroker时会完成心跳超时时间的设置,并且注册时如果发现broker组内最小brokerId发生变化,将反向通知该组所有broker,并在路由获取时将最小brokerId的Slave路由替换使其充当只读模式的Master的角色
+
+针对2,通过两个机制来及时感知同组broker上下线情况,1是上文中介绍的当nameserver发现该broker组内最小brokerId发生变化,反向通知该组所有broker。2是broker自身会有定时任务,向nameserver同步本broker组存活broker的信息,RoccketMQ 5.0会新增GetBrokerMemberGroup请求来完成该工作。
+
+Slave Broker发现自己是该组中最小的brokerId,将会开启代理模式,而一旦Master Broker重新上线,Slave Broker同样会通过Nameserver反向通知或自身定时任务同步同组broker的信息感知到,并自动结束代理模式。
+
+### 二级消息逃逸
+
+代理模式开启后,brokerId最小的Slave会承担起二级消息的扫描和重新投递功能。
+
+二级消息一般分为两个阶段,发送或者消费时会发送到一个特殊topic中,后台会有线程会扫描,最终的满足要求的消息会被重新投递到Commitlog中。我们可以让brokerId最小的Slave进行扫描,但如果扫描之后的消息重新投递到本Commitlog,那将会破坏Slave不可写的语义,造成Commitlog分叉。因此RoccketMQ 5.0提出一种逃逸机制,将重放的二级消息远程或本地投放到其他Master的Commitlog中。
+
+- 远程逃逸
+
+![](https://s4.ax1x.com/2022/02/05/HnWWVK.png)
+
+如上图所示,假设Region A发生故障,Region B中的节点2将会承担二级消息的扫描任务,同时将最终的满足要求的消息通过EscapeBridge远程发送到当前Broker集群中仍然存活的Master上。
+
+- 本地逃逸
+
+![](https://s4.ax1x.com/2022/02/05/HnWfUO.png)
+
+本地逃逸需要在BrokerContainer下进行,如果BrokerContainer中存在存活的Master,会优先向同进程的Master Commitlog中逃逸,避免远程RPC。
+
+#### 各类二级消息变化
+
+**延迟消息**
+
+Slave代理Master时,ScheduleMessageService将启动,时间到期的延迟消息将通过EscapeBridge优先往本地Master逃逸,若没有则向远程的Master逃逸。该broker上存量的时间未到期的消息将会被逃逸到存活的其他Master上,数据量上如果该broker上有大量的延迟消息未到期,远程逃逸会造成集群内部会有较大数据流转,但基本可控。
+
+
+**POP消息**
+
+1. CK/ACK拼key的时候增加brokerName属性。这样每个broker能在扫描自身commitlog的revive topic时抵消其他broker的CK/ACK消息。
+
+2. Slave上的CK/ACK消息将被逃逸到其他指定的Master A上(需要同一个Master,否则CK/ACK无法抵消,造成消息重复),Master A扫描自身Commitlog revive消息并进行抵消,若超时,则将根据CK消息中的信息向Slave拉取消息(若本地有则拉取本地,否则远程拉取),然后投放到本地的retry topic中。
+
+数据量上,如果是远程投递或拉取,且有消费者大量通过Pop消费存量的Slave消息,并且长时间不ACK,则在集群内部会有较大数据流转。
+
+### 预上线机制
+
+![](https://s4.ax1x.com/2022/02/05/HnW5Pe.png)
+
+当Master Broker下线后,Slave Broker将承担备读的作用,并对二级消息进行代理,因此Slave Broker中的部分元数据包括消费位点、定时消息进度等会比下线的Master Broker更加超前。如果Master Broker重新上线,Slave Broker元数据将被Master Broker覆盖,该组Broker元数据将发生回退,可能造成大量消息重复。因此,需要一套预上线机制来完成元数据的反向同步。
+
+需要为consumerOffset和delayOffset等元数据增加版本号(DataVersion)的概念,并且为了防止版本号更新太频繁,增加更新步长的概念,比如对于消费位点来说,默认每更新位点超过500次,版本号增加到下一个版本。
+
+如上图所示,Master Broker启动前会进行预上线,再预上线之前,对外不可见(Broker会有isIsolated标记自己的状态,当其为true时,不会像nameserver注册和发送心跳),因此也不会对外提供服务,二级消息的扫描流程也不会进行启动,具体预上线机制如下:
+
+1. Master Broker向NameServer获取Slave Broker地址(GetBrokerMemberGroup请求),但不注册
+2. Master Broker向Slave Broker发送自己的状态信息和地址
+3. Slave Broker得到Master Broker地址后和状态信息后,建立HA连接,并完成握手,进入Transfer状态
+4. Master Broker再完成握手后,反向获取备的元数据,包括消费位点、定时消息进度等,根据版本号决定是否更新。
+5. Master Broker对broker组内所有Slave Broker都完成1-4步操作后,正式上线,向NameServer注册,正式对外提供服务。
+
+### 锁Quorum
+
+当Slave代理Master时,外部看到的是“只读”的Master,因此顺序消息仍然可以对队列上锁,消费不会中断。但当真的Master重新上线后,在一定的时间差内可能会造成多个consumer锁定同一个队列,比如一个consumer仍然锁着代理的备某一个队列,一个consumer锁刚上线的主的同一队列,造成顺序消息的乱序和重复。
+
+因此在lock操作时要求,需锁broker副本组的大多数成员(quorum原则)均成功才算锁成功。但两副本下达不到quorum的原则,所以提供了lockInStrictMode参数,表示消费端消费顺序消息锁队列时是否使用严格模式。严格模式即对单个队列而言,需锁副本组的大多数成员(quorum原则)均成功才算锁成功,非严格模式即锁任意一副本成功就算锁成功,该参数默认为false。当对消息顺序性高于可用性时,需将该参数设置为false。
+
+## 配置更新
+
+Nameserver
+- scanNotActiveBrokerInterval:扫描不活跃broker间隔,每次扫描将判断broker心跳是否超时,默认5s。
+- supportActingMaster:nameserver端是否支持Slave代理Master模式,开启后,副本组在无master状态下,brokerId==1的slave将在TopicRoute中被替换成master(即brokerId=0),并以只读模式对客户端提供服务,默认为false。
+Broker
+- enableSlaveActingMaster:broker端开启slave代理master模式总开关,默认为false。
+- enableRemoteEscape:是否允许远程逃逸,默认为false。
+- brokerHeartbeatInterval:broker向nameserver发送心跳间隔(不同于注册间隔),默认1s。
+- brokerNotActiveTimeoutMillis:broker不活跃超时时间,超过此时间nameserver仍未收到broker心跳,则判定broker下线,默认10s。
+- sendHeartbeatTimeoutMillis:broker发送心跳请求超时时间,默认1s。
+- lockInStrictMode:消费端消费顺序消息锁队列时是否使用严格模式,默认为false,上文已介绍。
+- skipPreOnline:broker跳过预上线流程,默认为false。
+- compatibleWithOldNameSrv:是否以兼容模式访问旧nameserver,默认为true。
+
+## 兼容性方案
+
+新版nameserver和旧版broker:新版nameserver可以完全兼容旧版broker,无兼容问题。
+
+旧版nameserver和新版Broker:新版Broker开启Slave代理Master,会向Nameserver发送 BROKER_HEARTBEAT以及GET_BROKER_MEMBER_GROUP请求,但由于旧版本nameserver无法处理这些请求。因此需要在brokerConfig中配置compatibleWithOldNameSrv=true,开启对旧版nameserver的兼容模式,在该模式下,broker的一些新增RPC将通过复用原有RequestCode实现,具体为:
+新增轻量级心跳将通过复用QUERY_DATA_VERSION实现
+新增获取BrokerMemberGroup数据将通过复用GET_ROUTEINFO_BY_TOPIC实现,具体实现方式是每个broker都会新增rmq_sys_{brokerName}的系统topic,通过获取该系统topic的路由来获取该副本组的存活信息。
+但旧版nameserver无法提供代理功能,Slave代理Master的功能将无法生效,但不影响其他功能。
+
+客户端对新旧版本的nameserver和broker均无兼容性问题。
+
+
+参考文档: [原RIP](https://github.com/apache/rocketmq/wiki/RIP-32-Slave-Acting-Master-Mode)
\ No newline at end of file
diff --git a/logging/src/main/java/org/apache/rocketmq/logging/InternalLoggerFactory.java b/logging/src/main/java/org/apache/rocketmq/logging/InternalLoggerFactory.java
index 17c56bd..e4b5732 100644
--- a/logging/src/main/java/org/apache/rocketmq/logging/InternalLoggerFactory.java
+++ b/logging/src/main/java/org/apache/rocketmq/logging/InternalLoggerFactory.java
@@ -27,8 +27,19 @@ public abstract class InternalLoggerFactory {
 
     public static final String DEFAULT_LOGGER = LOGGER_SLF4J;
 
+    public static final String BROKER_CONTAINER_NAME = "BrokerContainer";
+
+    /**
+     * Loggers with following name will be directed to default logger for LogTail parser.
+     */
+    public static final String CONSUMER_STATS_LOGGER_NAME = "RocketmqConsumerStats";
+    public static final String COMMERCIAL_LOGGER_NAME = "RocketmqCommercial";
+    public static final String ACCOUNT_LOGGER_NAME = "RocketmqAccount";
+
     private static String loggerType = null;
 
+    public static ThreadLocal<String> brokerIdentity = new ThreadLocal<String>();
+
     private static ConcurrentHashMap<String, InternalLoggerFactory> loggerFactoryCache = new ConcurrentHashMap<String, InternalLoggerFactory>();
 
     public static InternalLogger getLogger(Class clazz) {
diff --git a/logging/src/main/java/org/apache/rocketmq/logging/Slf4jLoggerFactory.java b/logging/src/main/java/org/apache/rocketmq/logging/Slf4jLoggerFactory.java
index 53dbc94..c8516c3 100644
--- a/logging/src/main/java/org/apache/rocketmq/logging/Slf4jLoggerFactory.java
+++ b/logging/src/main/java/org/apache/rocketmq/logging/Slf4jLoggerFactory.java
@@ -17,6 +17,10 @@
 
 package org.apache.rocketmq.logging;
 
+import java.util.HashMap;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -43,116 +47,147 @@ public class Slf4jLoggerFactory extends InternalLoggerFactory {
     }
 
     public static class Slf4jLogger implements InternalLogger {
-
-        private Logger logger = null;
-
-        public Slf4jLogger(String name) {
-            logger = LoggerFactory.getLogger(name);
+        private static final Pattern PATTERN = Pattern.compile("#.*#");
+
+        private final String loggerSuffix;
+        private final Logger defaultLogger;
+
+        private final Map<String, Logger> loggerMap = new HashMap<String, Logger>();
+
+        public Slf4jLogger(String loggerSuffix) {
+            this.loggerSuffix = loggerSuffix;
+            this.defaultLogger = LoggerFactory.getLogger(loggerSuffix);
+        }
+
+        private Logger getLogger() {
+            if (loggerSuffix.equals(ACCOUNT_LOGGER_NAME)
+                || loggerSuffix.equals(CONSUMER_STATS_LOGGER_NAME)
+                || loggerSuffix.equals(COMMERCIAL_LOGGER_NAME)) {
+                return defaultLogger;
+            }
+            String brokerIdentity = InnerLoggerFactory.brokerIdentity.get();
+            if (brokerIdentity == null) {
+                Matcher m = PATTERN.matcher(Thread.currentThread().getName());
+                if (m.find()) {
+                    String match = m.group();
+                    brokerIdentity = match.substring(1, match.length() - 1);
+                }
+            }
+            if (InnerLoggerFactory.BROKER_CONTAINER_NAME.equals(brokerIdentity)) {
+                return defaultLogger;
+            }
+            if (brokerIdentity != null) {
+                if (!loggerMap.containsKey(brokerIdentity)) {
+                    loggerMap.put(brokerIdentity, LoggerFactory.getLogger("#" + brokerIdentity + "#" + loggerSuffix));
+                }
+                return loggerMap.get(brokerIdentity);
+            }
+            return defaultLogger;
         }
 
         @Override
         public String getName() {
-            return logger.getName();
+            return getLogger().getName();
         }
 
         @Override
         public void debug(String s) {
-            logger.debug(s);
+            getLogger().debug(s);
         }
 
         @Override
         public void debug(String s, Object o) {
-            logger.debug(s, o);
+            getLogger().debug(s, o);
         }
 
         @Override
         public void debug(String s, Object o, Object o1) {
-            logger.debug(s, o, o1);
+            getLogger().debug(s, o, o1);
         }
 
         @Override
         public void debug(String s, Object... objects) {
-            logger.debug(s, objects);
+            getLogger().debug(s, objects);
         }
 
         @Override
         public void debug(String s, Throwable throwable) {
-            logger.debug(s, throwable);
+            getLogger().debug(s, throwable);
         }
 
         @Override
         public void info(String s) {
-            logger.info(s);
+            getLogger().info(s);
         }
 
         @Override
         public void info(String s, Object o) {
-            logger.info(s, o);
+            getLogger().info(s, o);
         }
 
         @Override
         public void info(String s, Object o, Object o1) {
-            logger.info(s, o, o1);
+            getLogger().info(s, o, o1);
         }
 
         @Override
         public void info(String s, Object... objects) {
-            logger.info(s, objects);
+            getLogger().info(s, objects);
         }
 
         @Override
         public void info(String s, Throwable throwable) {
-            logger.info(s, throwable);
+            getLogger().info(s, throwable);
         }
 
         @Override
         public void warn(String s) {
-            logger.warn(s);
+            getLogger().warn(s);
         }
 
         @Override
         public void warn(String s, Object o) {
-            logger.warn(s, o);
+            getLogger().warn(s, o);
         }
 
         @Override
         public void warn(String s, Object... objects) {
-            logger.warn(s, objects);
+            getLogger().warn(s, objects);
         }
 
         @Override
         public void warn(String s, Object o, Object o1) {
-            logger.warn(s, o, o1);
+            getLogger().warn(s, o, o1);
         }
 
         @Override
         public void warn(String s, Throwable throwable) {
-            logger.warn(s, throwable);
+            getLogger().warn(s, throwable);
         }
 
         @Override
         public void error(String s) {
-            logger.error(s);
+            getLogger().error(s);
         }
 
         @Override
         public void error(String s, Object o) {
-            logger.error(s, o);
+            getLogger().error(s, o);
         }
 
         @Override
         public void error(String s, Object o, Object o1) {
-            logger.error(s, o, o1);
+            getLogger().error(s, o, o1);
         }
 
         @Override
         public void error(String s, Object... objects) {
-            logger.error(s, objects);
+            getLogger().error(s, objects);
         }
 
         @Override
         public void error(String s, Throwable throwable) {
-            logger.error(s, throwable);
+            getLogger().error(s, throwable);
         }
     }
 }
diff --git a/logging/src/main/java/org/apache/rocketmq/logging/inner/Level.java b/logging/src/main/java/org/apache/rocketmq/logging/inner/Level.java
index 0dc81d7..487682c 100755
--- a/logging/src/main/java/org/apache/rocketmq/logging/inner/Level.java
+++ b/logging/src/main/java/org/apache/rocketmq/logging/inner/Level.java
@@ -116,6 +116,7 @@ public class Level implements Serializable {
         if (s.equals(OFF_NAME)) {
             return Level.OFF;
         }
+
         if (s.equals(INFO_NAME)) {
             return Level.INFO;
         }
diff --git a/logging/src/main/java/org/apache/rocketmq/logging/inner/SysLogger.java b/logging/src/main/java/org/apache/rocketmq/logging/inner/SysLogger.java
index aaba4d6..b6d1049 100755
--- a/logging/src/main/java/org/apache/rocketmq/logging/inner/SysLogger.java
+++ b/logging/src/main/java/org/apache/rocketmq/logging/inner/SysLogger.java
@@ -33,13 +33,13 @@ public class SysLogger {
 
     public static void debug(String msg) {
         if (debugEnabled && !quietMode) {
-            System.err.println(PREFIX + msg);
+            System.out.printf("%s", PREFIX + msg);
         }
     }
 
     public static void debug(String msg, Throwable t) {
         if (debugEnabled && !quietMode) {
-            System.err.println(PREFIX + msg);
+            System.out.printf("%s", PREFIX + msg);
             if (t != null) {
                 t.printStackTrace(System.out);
             }
diff --git a/logging/src/test/java/org/apache/rocketmq/logging/Slf4jLoggerFactoryTest.java b/logging/src/test/java/org/apache/rocketmq/logging/Slf4jLoggerFactoryTest.java
index 4bed745..2fe2abf 100644
--- a/logging/src/test/java/org/apache/rocketmq/logging/Slf4jLoggerFactoryTest.java
+++ b/logging/src/test/java/org/apache/rocketmq/logging/Slf4jLoggerFactoryTest.java
@@ -69,7 +69,7 @@ public class Slf4jLoggerFactoryTest extends BasicLoggerTest {
         logger3.error("error {}", "hahahah");
         logger3.debug("debug {}", "hahahah");
         String content = readFile(file);
-        System.out.println(content);
+        System.out.printf(content);
 
         Assert.assertTrue(content.contains("Slf4jLoggerFactoryTest"));
         Assert.assertTrue(content.contains("info"));
diff --git a/logging/src/test/java/org/apache/rocketmq/logging/inner/LoggingBuilderTest.java b/logging/src/test/java/org/apache/rocketmq/logging/inner/LoggingBuilderTest.java
index 6c816a6..e3dbb14 100644
--- a/logging/src/test/java/org/apache/rocketmq/logging/inner/LoggingBuilderTest.java
+++ b/logging/src/test/java/org/apache/rocketmq/logging/inner/LoggingBuilderTest.java
@@ -17,15 +17,15 @@
 
 package org.apache.rocketmq.logging.inner;
 
-import org.apache.rocketmq.logging.BasicLoggerTest;
-import org.junit.Assert;
-import org.junit.Test;
-
 import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.FilenameFilter;
 import java.io.PrintStream;
 
+import org.apache.rocketmq.logging.BasicLoggerTest;
+import org.junit.Assert;
+import org.junit.Test;
+
 public class LoggingBuilderTest extends BasicLoggerTest {
 
     @Test
diff --git a/srvutil/pom.xml b/srvutil/pom.xml
index 820a406..ee1617e 100644
--- a/srvutil/pom.xml
+++ b/srvutil/pom.xml
@@ -41,5 +41,13 @@
             <groupId>commons-cli</groupId>
             <artifactId>commons-cli</artifactId>
         </dependency>
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.googlecode.concurrentlinkedhashmap</groupId>
+            <artifactId>concurrentlinkedhashmap-lru</artifactId>
+        </dependency>
     </dependencies>
 </project>
diff --git a/srvutil/src/main/java/org/apache/rocketmq/srvutil/FileWatchService.java b/srvutil/src/main/java/org/apache/rocketmq/srvutil/FileWatchService.java
index 111cede..adf61ad 100644
--- a/srvutil/src/main/java/org/apache/rocketmq/srvutil/FileWatchService.java
+++ b/srvutil/src/main/java/org/apache/rocketmq/srvutil/FileWatchService.java
@@ -17,6 +17,7 @@
 
 package org.apache.rocketmq.srvutil;
 
+import com.google.common.base.Strings;
 import java.io.File;
 import java.io.IOException;
 import java.nio.file.Files;
@@ -26,7 +27,6 @@ import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
 import java.util.ArrayList;
 import java.util.List;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.common.ServiceThread;
 import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.constant.LoggerName;
@@ -49,7 +49,7 @@ public class FileWatchService extends ServiceThread {
         this.fileCurrentHash = new ArrayList<>();
 
         for (int i = 0; i < watchFiles.length; i++) {
-            if (StringUtils.isNotEmpty(watchFiles[i]) && new File(watchFiles[i]).exists()) {
+            if (!Strings.isNullOrEmpty(watchFiles[i]) && new File(watchFiles[i]).exists()) {
                 this.watchFiles.add(watchFiles[i]);
                 this.fileCurrentHash.add(hash(watchFiles[i]));
             }
diff --git a/srvutil/src/main/java/org/apache/rocketmq/util/cache/CacheEvictHandler.java b/srvutil/src/main/java/org/apache/rocketmq/util/cache/CacheEvictHandler.java
new file mode 100644
index 0000000..13dcd9e
--- /dev/null
+++ b/srvutil/src/main/java/org/apache/rocketmq/util/cache/CacheEvictHandler.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.util.cache;
+
+import java.util.Map;
+
+public interface CacheEvictHandler<K, V> {
+    void onEvict(Map.Entry<K, V> eldest);
+}
diff --git a/srvutil/src/main/java/org/apache/rocketmq/util/cache/CacheObject.java b/srvutil/src/main/java/org/apache/rocketmq/util/cache/CacheObject.java
new file mode 100644
index 0000000..39c64ce
--- /dev/null
+++ b/srvutil/src/main/java/org/apache/rocketmq/util/cache/CacheObject.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.util.cache;
+
+
+public class CacheObject<T> {
+    private T target;
+    private long bornTime = System.currentTimeMillis();
+    private long exp;
+
+    public CacheObject(long exp, T target) {
+        this.exp = exp;
+        this.target = target;
+    }
+
+    public T getTarget() {
+        if (System.currentTimeMillis() - bornTime > exp) {
+            return null;
+        }
+        return target;
+    }
+}
diff --git a/srvutil/src/main/java/org/apache/rocketmq/util/cache/ExpiredLocalCache.java b/srvutil/src/main/java/org/apache/rocketmq/util/cache/ExpiredLocalCache.java
new file mode 100644
index 0000000..ab60f0f
--- /dev/null
+++ b/srvutil/src/main/java/org/apache/rocketmq/util/cache/ExpiredLocalCache.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.util.cache;
+
+import com.googlecode.concurrentlinkedhashmap.ConcurrentLinkedHashMap;
+import com.googlecode.concurrentlinkedhashmap.EvictionListener;
+
+public class ExpiredLocalCache<K, T> {
+    private ConcurrentLinkedHashMap<K, CacheObject<T>> cache;
+    private EvictionListener<K, CacheObject<T>> listener;
+
+    public ExpiredLocalCache(int size) {
+        cache = new ConcurrentLinkedHashMap.Builder<K, CacheObject<T>>().maximumWeightedCapacity(size).build();
+    }
+
+    public ExpiredLocalCache(int size, String name) {
+        cache = new ConcurrentLinkedHashMap.Builder<K, CacheObject<T>>().maximumWeightedCapacity(size).build();
+    }
+
+    public ExpiredLocalCache(int size, boolean memoryMeter, EvictionListener<K, CacheObject<T>> listener) {
+        this.listener = listener;
+        cache = new ConcurrentLinkedHashMap.Builder<K, CacheObject<T>>().listener(listener).maximumWeightedCapacity(size).build();
+    }
+
+    public T get(K key) {
+        CacheObject<T> object = cache.get(key);
+        if (object == null) {
+            return null;
+        }
+        T ret = object.getTarget();
+        if (ret == null) {
+            this.delete(key);
+        }
+        return ret;
+    }
+
+    public T put(K key, T v, long exp) {
+        CacheObject<T> value = new CacheObject<T>(exp, v);
+        CacheObject<T> old = cache.put(key, value);
+        if (old == null) {
+            return null;
+        } else {
+            return old.getTarget();
+        }
+    }
+
+    public T putIfAbsent(K key, T v, long exp) {
+        CacheObject<T> value = new CacheObject<T>(exp, v);
+        CacheObject<T> old = cache.putIfAbsent(key, value);
+        if (old == null) {
+            return null;
+        } else {
+            return old.getTarget();
+        }
+    }
+
+    public T delete(K key) {
+        CacheObject<T> object = cache.remove(key);
+        if (object == null) {
+            return null;
+        }
+        T ret = object.getTarget();
+        return ret;
+    }
+
+    public ConcurrentLinkedHashMap<K, CacheObject<T>> getCache() {
+        return cache;
+    }
+
+}
diff --git a/srvutil/src/main/java/org/apache/rocketmq/util/cache/LocalCache.java b/srvutil/src/main/java/org/apache/rocketmq/util/cache/LocalCache.java
new file mode 100644
index 0000000..7dbb6e2
--- /dev/null
+++ b/srvutil/src/main/java/org/apache/rocketmq/util/cache/LocalCache.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.util.cache;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+public class LocalCache<K, V> extends LinkedHashMap<K, V> {
+
+    private static final long serialVersionUID = 1606231700062718297L;
+
+    private static final int DEFAULT_CACHE_SIZE = 1000;
+
+    private int cacheSize = DEFAULT_CACHE_SIZE;
+    private CacheEvictHandler<K, V> handler;
+
+    /**
+     * The default initial capacity - MUST be a power of two.
+     */
+    static final int DEFAULT_INITIAL_CAPACITY = 1 << 4; // aka 16
+
+
+    /**
+     * The load factor used when none specified in constructor.
+     */
+    static final float DEFAULT_LOAD_FACTOR = 0.75f;
+
+    public LocalCache(int cacheSize, boolean isLru, CacheEvictHandler<K, V> handler) {
+        super(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, isLru);
+        this.cacheSize = cacheSize;
+        this.handler = handler;
+    }
+
+
+    @Override
+    protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
+        boolean result = this.size() > cacheSize;
+        if (result && handler != null) {
+            handler.onEvict(eldest);
+        }
+        return result;
+    }
+
+}
diff --git a/srvutil/src/main/java/org/apache/rocketmq/util/cache/LockManager.java b/srvutil/src/main/java/org/apache/rocketmq/util/cache/LockManager.java
new file mode 100644
index 0000000..ae6906c
--- /dev/null
+++ b/srvutil/src/main/java/org/apache/rocketmq/util/cache/LockManager.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.util.cache;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.rocketmq.common.PopAckConstants;
+import org.apache.rocketmq.common.protocol.header.PopMessageRequestHeader;
+
+public class LockManager {
+    private static ExpiredLocalCache<String, AtomicBoolean> expiredLocalCache = new ExpiredLocalCache<String, AtomicBoolean>(100000);
+
+    public static boolean tryLock(String key, long lockTime) {
+        AtomicBoolean v = expiredLocalCache.get(key);
+        if (v == null) {
+            return expiredLocalCache.putIfAbsent(key, new AtomicBoolean(false), lockTime) == null;
+        } else {
+            return v.compareAndSet(true, false);
+        }
+    }
+
+    public static void unLock(String key) {
+        AtomicBoolean v = expiredLocalCache.get(key);
+        if (v != null) {
+            v.set(true);
+        }
+    }
+
+    public static String buildKey(PopMessageRequestHeader requestHeader, int queueId) {
+        return requestHeader.getConsumerGroup() + PopAckConstants.SPLIT + requestHeader.getTopic() + PopAckConstants.SPLIT + queueId;
+    }
+
+    public static String buildKey(String topic, String cid, int queueId) {
+        return topic + PopAckConstants.SPLIT + cid + PopAckConstants.SPLIT + queueId;
+    }
+
+    public static String buildKey(String prefix, int queueId) {
+        return prefix + PopAckConstants.SPLIT + queueId;
+    }
+}
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
index 2aa4488..b8bb399 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
@@ -37,6 +37,7 @@ import org.apache.rocketmq.common.protocol.body.ConsumeStatsList;
 import org.apache.rocketmq.common.protocol.body.ConsumerConnection;
 import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
 import org.apache.rocketmq.common.protocol.body.GroupList;
+import org.apache.rocketmq.common.protocol.body.HARuntimeInfo;
 import org.apache.rocketmq.common.protocol.body.KVTable;
 import org.apache.rocketmq.common.protocol.body.ProducerConnection;
 import org.apache.rocketmq.common.protocol.body.QueryConsumeQueueResponseBody;
@@ -44,8 +45,10 @@ import org.apache.rocketmq.common.protocol.body.QueueTimeSpan;
 import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
 import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
 import org.apache.rocketmq.common.protocol.body.TopicList;
+import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
 import org.apache.rocketmq.common.protocol.route.TopicRouteData;
 import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail;
+import org.apache.rocketmq.common.subscription.GroupForbidden;
 import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
 import org.apache.rocketmq.common.topic.TopicValidator;
 import org.apache.rocketmq.remoting.RPCHook;
@@ -54,6 +57,7 @@ import org.apache.rocketmq.remoting.exception.RemotingConnectException;
 import org.apache.rocketmq.remoting.exception.RemotingException;
 import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
 import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+import org.apache.rocketmq.tools.admin.api.BrokerOperatorResult;
 import org.apache.rocketmq.tools.admin.api.MessageTrack;
 
 import java.io.UnsupportedEncodingException;
@@ -61,6 +65,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
+import org.apache.rocketmq.tools.admin.common.AdminToolResult;
 
 public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt {
     private final DefaultMQAdminExtImpl defaultMQAdminExtImpl;
@@ -95,12 +100,14 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt {
     }
 
     @Override
-    public void createTopic(String key, String newTopic, int queueNum, Map<String, String> attributes) throws MQClientException {
+    public void createTopic(String key, String newTopic, int queueNum,
+        Map<String, String> attributes) throws MQClientException {
         createTopic(key, newTopic, queueNum, 0, attributes);
     }
 
     @Override
-    public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag, Map<String, String> attributes) throws MQClientException {
+    public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag,
+        Map<String, String> attributes) throws MQClientException {
         defaultMQAdminExtImpl.createTopic(key, newTopic, queueNum, topicSysFlag, attributes);
     }
 
@@ -133,16 +140,9 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt {
     @Override
     public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end)
         throws MQClientException, InterruptedException {
-
         return defaultMQAdminExtImpl.queryMessage(topic, key, maxNum, begin, end);
     }
 
-    public QueryResult queryMessageByUniqKey(String topic, String key, int maxNum, long begin, long end)
-        throws MQClientException, InterruptedException {
-
-        return defaultMQAdminExtImpl.queryMessageByUniqKey(topic, key, maxNum, begin, end);
-    }
-
     @Override
     public void start() throws MQClientException {
         defaultMQAdminExtImpl.start();
@@ -154,6 +154,18 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt {
     }
 
     @Override
+    public void addBrokerToContainer(String brokerContainerAddr, String brokerConfig) throws InterruptedException,
+        RemotingConnectException, RemotingTimeoutException, RemotingSendRequestException, MQBrokerException {
+        defaultMQAdminExtImpl.addBrokerToContainer(brokerContainerAddr, brokerConfig);
+    }
+
+    @Override public void removeBrokerFromContainer(String brokerContainerAddr, String clusterName, String brokerName,
+        long brokerId) throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException,
+        RemotingConnectException {
+        defaultMQAdminExtImpl.removeBrokerFromContainer(brokerContainerAddr, clusterName, brokerName, brokerId);
+    }
+
+    @Override
     public void updateBrokerConfig(String brokerAddr,
         Properties properties) throws RemotingConnectException, RemotingSendRequestException,
         RemotingTimeoutException, UnsupportedEncodingException, InterruptedException, MQBrokerException {
@@ -225,12 +237,18 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt {
     }
 
     @Override
-    public TopicStatsTable examineTopicStats(String brokerAddr, String topic) throws RemotingException, MQClientException, InterruptedException,
-            MQBrokerException {
+    public TopicStatsTable examineTopicStats(String brokerAddr,
+        String topic) throws RemotingException, MQClientException, InterruptedException,
+        MQBrokerException {
         return defaultMQAdminExtImpl.examineTopicStats(brokerAddr, topic);
     }
 
     @Override
+    public AdminToolResult<TopicStatsTable> examineTopicStatsConcurrent(String topic) {
+        return defaultMQAdminExtImpl.examineTopicStatsConcurrent(topic);
+    }
+
+    @Override
     public TopicList fetchAllTopicList() throws RemotingException, MQClientException, InterruptedException {
         return this.defaultMQAdminExtImpl.fetchAllTopicList();
     }
@@ -263,6 +281,11 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt {
     }
 
     @Override
+    public AdminToolResult<ConsumeStats> examineConsumeStatsConcurrent(String consumerGroup, String topic) {
+        return defaultMQAdminExtImpl.examineConsumeStatsConcurrent(consumerGroup, topic);
+    }
+
+    @Override
     public ClusterInfo examineBrokerClusterInfo() throws InterruptedException, RemotingConnectException, RemotingTimeoutException,
         RemotingSendRequestException, MQBrokerException {
         return defaultMQAdminExtImpl.examineBrokerClusterInfo();
@@ -295,6 +318,11 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt {
         return defaultMQAdminExtImpl.examineProducerConnectionInfo(producerGroup, topic);
     }
 
+    @Override public void deleteTopicInNameServer(Set<String> addrs, String clusterName,
+        String topic) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+        defaultMQAdminExtImpl.deleteTopicInNameServer(addrs, clusterName, topic);
+    }
+
     @Override
     public List<String> getNameServerAddressList() {
         return this.defaultMQAdminExtImpl.getNameServerAddressList();
@@ -330,6 +358,13 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt {
     }
 
     @Override
+    public void deleteTopic(String topicName,
+        String clusterName) throws RemotingException, MQBrokerException, InterruptedException,
+        MQClientException {
+        defaultMQAdminExtImpl.deleteTopic(topicName, clusterName);
+    }
+
+    @Override
     public void deleteTopicInBroker(Set<String> addrs,
         String topic) throws RemotingException, MQBrokerException, InterruptedException,
         MQClientException {
@@ -337,6 +372,11 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt {
     }
 
     @Override
+    public AdminToolResult<BrokerOperatorResult> deleteTopicInBrokerConcurrent(Set<String> addrs, String topic) {
+        return defaultMQAdminExtImpl.deleteTopicInBrokerConcurrent(addrs, topic);
+    }
+
+    @Override
     public void deleteTopicInNameServer(Set<String> addrs,
         String topic) throws RemotingException, MQBrokerException, InterruptedException,
         MQClientException {
@@ -398,6 +438,12 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt {
     }
 
     @Override
+    public AdminToolResult<BrokerOperatorResult> resetOffsetNewConcurrent(final String group, final String topic,
+        final long timestamp) {
+        return this.defaultMQAdminExtImpl.resetOffsetNewConcurrent(group, topic, timestamp);
+    }
+
+    @Override
     public Map<String, Map<MessageQueue, Long>> getConsumeStatus(String topic, String group,
         String clientAddr) throws RemotingException,
         MQBrokerException, InterruptedException, MQClientException {
@@ -419,6 +465,23 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt {
     }
 
     @Override
+    public TopicList queryTopicsByConsumer(String group)
+        throws InterruptedException, MQBrokerException, RemotingException, MQClientException {
+        return this.defaultMQAdminExtImpl.queryTopicsByConsumer(group);
+    }
+
+    @Override
+    public AdminToolResult<TopicList> queryTopicsByConsumerConcurrent(String group) {
+        return defaultMQAdminExtImpl.queryTopicsByConsumerConcurrent(group);
+    }
+
+    @Override
+    public SubscriptionData querySubscription(String group, String topic)
+        throws InterruptedException, MQBrokerException, RemotingException, MQClientException {
+        return this.defaultMQAdminExtImpl.querySubscription(group, topic);
+    }
+
+    @Override
     public List<QueueTimeSpan> queryConsumeTimeSpan(final String topic,
         final String group) throws InterruptedException, MQBrokerException,
         RemotingException, MQClientException {
@@ -426,6 +489,11 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt {
     }
 
     @Override
+    public AdminToolResult<List<QueueTimeSpan>> queryConsumeTimeSpanConcurrent(String topic, String group) {
+        return defaultMQAdminExtImpl.queryConsumeTimeSpanConcurrent(topic, group);
+    }
+
+    @Override
     public boolean cleanExpiredConsumerQueue(
         String cluster) throws RemotingConnectException, RemotingSendRequestException,
         RemotingTimeoutException, MQClientException, InterruptedException {
@@ -459,6 +527,13 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt {
     }
 
     @Override
+    public ConsumerRunningInfo getConsumerRunningInfo(String consumerGroup, String clientId,
+        boolean jstack, boolean metrics) throws RemotingException,
+        MQClientException, InterruptedException {
+        return defaultMQAdminExtImpl.getConsumerRunningInfo(consumerGroup, clientId, jstack, metrics);
+    }
+
+    @Override
     public ConsumeMessageDirectlyResult consumeMessageDirectly(String consumerGroup, String clientId, String msgId)
         throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
         return defaultMQAdminExtImpl.consumeMessageDirectly(consumerGroup, clientId, msgId);
@@ -479,6 +554,13 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt {
     }
 
     @Override
+    public List<MessageTrack> messageTrackDetailConcurrent(
+        MessageExt msg) throws RemotingException, MQClientException, InterruptedException,
+        MQBrokerException {
+        return this.defaultMQAdminExtImpl.messageTrackDetailConcurrent(msg);
+    }
+
+    @Override
     public void cloneGroupOffset(String srcGroup, String destGroup, String topic,
         boolean isOffline) throws RemotingException,
         MQClientException, InterruptedException, MQBrokerException {
@@ -548,6 +630,12 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt {
         return this.defaultMQAdminExtImpl.viewMessage(topic, msgId);
     }
 
+    @Override
+    public MessageExt queryMessage(String clusterName, String topic, String msgId)
+        throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+        return this.defaultMQAdminExtImpl.queryMessage(clusterName, topic, msgId);
+    }
+
     public String getAdminExtGroup() {
         return adminExtGroup;
     }
@@ -617,9 +705,54 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt {
     }
 
     @Override
-    public void createStaticTopic(String addr, String defaultTopic, TopicConfig topicConfig, TopicQueueMappingDetail mappingDetail, boolean force) throws RemotingException,  InterruptedException, MQBrokerException {
+    public void createStaticTopic(String addr, String defaultTopic, TopicConfig topicConfig,
+        TopicQueueMappingDetail mappingDetail,
+        boolean force) throws RemotingException, InterruptedException, MQBrokerException {
         this.defaultMQAdminExtImpl.createStaticTopic(addr, defaultTopic, topicConfig, mappingDetail, force);
     }
 
+    @Override
+    public long searchOffset(final String brokerAddr, final String topicName,
+        final int queueId, final long timestamp, final long timeoutMillis)
+        throws RemotingException, MQBrokerException, InterruptedException {
+        return this.defaultMQAdminExtImpl.searchOffset(brokerAddr, topicName, queueId, timestamp, timeoutMillis);
+    }
 
+    @Override
+    public void resetOffsetByQueueId(final String brokerAddr, final String consumerGroup,
+        final String topicName, final int queueId, final long resetOffset)
+        throws RemotingException, InterruptedException, MQBrokerException {
+        this.defaultMQAdminExtImpl.resetOffsetByQueueId(brokerAddr, consumerGroup, topicName, queueId, resetOffset);
+    }
+
+    @Override
+    public HARuntimeInfo getBrokerHAStatus(String brokerAddr) throws RemotingConnectException,
+        RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException {
+        return this.defaultMQAdminExtImpl.getBrokerHAStatus(brokerAddr);
+    }
+
+    @Override
+    public void resetMasterFlushOffset(String brokerAddr, long masterFlushOffset)
+        throws InterruptedException, RemotingConnectException, RemotingTimeoutException, RemotingSendRequestException, MQBrokerException {
+        this.defaultMQAdminExtImpl.resetMasterFlushOffset(brokerAddr, masterFlushOffset);
+    }
+
+    public QueryResult queryMessageByUniqKey(String topic, String key, int maxNum, long begin, long end)
+        throws MQClientException, InterruptedException {
+
+        return defaultMQAdminExtImpl.queryMessageByUniqKey(topic, key, maxNum, begin, end);
+    }
+
+    public DefaultMQAdminExtImpl getDefaultMQAdminExtImpl() {
+        return defaultMQAdminExtImpl;
+    }
+
+    public GroupForbidden updateAndGetGroupReadForbidden(String brokerAddr, //
+        String groupName, //
+        String topicName, //
+        Boolean readable) //
+        throws RemotingException,
+        InterruptedException, MQBrokerException {
+        return this.defaultMQAdminExtImpl.updateAndGetGroupReadForbidden(brokerAddr, groupName, topicName, readable);
+    }
 }
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
index b6d6ace..dfd3325 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
@@ -17,6 +17,7 @@
 package org.apache.rocketmq.tools.admin;
 
 import java.io.UnsupportedEncodingException;
+import java.text.MessageFormat;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -29,6 +30,12 @@ import java.util.Map.Entry;
 import java.util.Properties;
 import java.util.Random;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.client.QueryResult;
 import org.apache.rocketmq.client.admin.MQAdminExtInner;
@@ -38,9 +45,11 @@ import org.apache.rocketmq.client.impl.MQClientManager;
 import org.apache.rocketmq.client.impl.factory.MQClientInstance;
 import org.apache.rocketmq.client.log.ClientLogger;
 import org.apache.rocketmq.common.AclConfig;
+import org.apache.rocketmq.common.KeyBuilder;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.PlainAccessConfig;
 import org.apache.rocketmq.common.ServiceState;
+import org.apache.rocketmq.common.ThreadFactoryImpl;
 import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.admin.ConsumeStats;
@@ -65,6 +74,7 @@ import org.apache.rocketmq.common.protocol.body.ConsumeStatsList;
 import org.apache.rocketmq.common.protocol.body.ConsumerConnection;
 import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
 import org.apache.rocketmq.common.protocol.body.GroupList;
+import org.apache.rocketmq.common.protocol.body.HARuntimeInfo;
 import org.apache.rocketmq.common.protocol.body.KVTable;
 import org.apache.rocketmq.common.protocol.body.ProducerConnection;
 import org.apache.rocketmq.common.protocol.body.QueryConsumeQueueResponseBody;
@@ -73,12 +83,14 @@ import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
 import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
 import org.apache.rocketmq.common.protocol.body.TopicList;
 import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetRequestHeader;
+import org.apache.rocketmq.common.protocol.header.UpdateGroupForbiddenRequestHeader;
 import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
 import org.apache.rocketmq.common.protocol.route.BrokerData;
 import org.apache.rocketmq.common.protocol.route.QueueData;
 import org.apache.rocketmq.common.protocol.route.TopicRouteData;
 import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping;
 import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail;
+import org.apache.rocketmq.common.subscription.GroupForbidden;
 import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.remoting.RPCHook;
@@ -89,8 +101,13 @@ import org.apache.rocketmq.remoting.exception.RemotingConnectException;
 import org.apache.rocketmq.remoting.exception.RemotingException;
 import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
 import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+import org.apache.rocketmq.tools.admin.api.BrokerOperatorResult;
 import org.apache.rocketmq.tools.admin.api.MessageTrack;
 import org.apache.rocketmq.tools.admin.api.TrackType;
+import org.apache.rocketmq.tools.admin.common.AdminToolHandler;
+import org.apache.rocketmq.tools.admin.common.AdminToolResult;
+import org.apache.rocketmq.tools.admin.common.AdminToolsResultCodeEnum;
+import org.apache.rocketmq.tools.command.CommandUtil;
 
 public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
 
@@ -120,6 +137,9 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
     private long timeoutMillis = 20000;
     private Random random = new Random();
 
+    protected final List<String> kvNamespaceToDeleteList = Arrays.asList(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG);
+    protected ThreadPoolExecutor threadPoolExecutor;
+
     public DefaultMQAdminExtImpl(DefaultMQAdminExt defaultMQAdminExt, long timeoutMillis) {
         this(defaultMQAdminExt, null, timeoutMillis);
     }
@@ -130,8 +150,7 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
         this.timeoutMillis = timeoutMillis;
     }
 
-    @Override
-    public void start() throws MQClientException {
+    @Override public void start() throws MQClientException {
         switch (this.serviceState) {
             case CREATE_JUST:
                 this.serviceState = ServiceState.START_FAILED;
@@ -143,9 +162,7 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
                 boolean registerOK = mqClientInstance.registerAdminExt(this.defaultMQAdminExt.getAdminExtGroup(), this);
                 if (!registerOK) {
                     this.serviceState = ServiceState.CREATE_JUST;
-                    throw new MQClientException("The adminExt group[" + this.defaultMQAdminExt.getAdminExtGroup()
-                        + "] has created already, specifed another name please."
-                        + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL), null);
+                    throw new MQClientException("The adminExt group[" + this.defaultMQAdminExt.getAdminExtGroup() + "] has created already, specifed another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL), null);
                 }
 
                 mqClientInstance.start();
@@ -153,20 +170,22 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
                 log.info("the adminExt [{}] start OK", this.defaultMQAdminExt.getAdminExtGroup());
 
                 this.serviceState = ServiceState.RUNNING;
+
+                int theadPoolCoreSize = Integer.parseInt(System.getProperty("rocketmq.admin.threadpool.coresize", "20"));
+
+                this.threadPoolExecutor = new ThreadPoolExecutor(theadPoolCoreSize, 100, 5, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>(), new ThreadFactoryImpl("DefaultMQAdminExtImpl_"));
+
                 break;
             case RUNNING:
             case START_FAILED:
             case SHUTDOWN_ALREADY:
-                throw new MQClientException("The AdminExt service state not OK, maybe started once, "
-                    + this.serviceState
-                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), null);
+                throw new MQClientException("The AdminExt service state not OK, maybe started once, " + this.serviceState + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), null);
             default:
                 break;
         }
     }
 
-    @Override
-    public void shutdown() {
+    @Override public void shutdown() {
         switch (this.serviceState) {
             case CREATE_JUST:
                 break;
@@ -176,6 +195,7 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
 
                 log.info("the adminExt [{}] shutdown OK", this.defaultMQAdminExt.getAdminExtGroup());
                 this.serviceState = ServiceState.SHUTDOWN_ALREADY;
+                this.threadPoolExecutor.shutdown();
                 break;
             case SHUTDOWN_ALREADY:
                 break;
@@ -184,22 +204,46 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
         }
     }
 
-    @Override
-    public void updateBrokerConfig(String brokerAddr,
-        Properties properties) throws RemotingConnectException, RemotingSendRequestException,
-        RemotingTimeoutException, UnsupportedEncodingException, InterruptedException, MQBrokerException {
+    @Override public void addBrokerToContainer(String brokerContainerAddr,
+        String brokerConfig) throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
+        this.mqClientInstance.getMQClientAPIImpl().addBroker(brokerContainerAddr, brokerConfig, timeoutMillis);
+    }
+
+    @Override public void removeBrokerFromContainer(String brokerContainerAddr, String clusterName, String brokerName,
+        long brokerId) throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
+        this.mqClientInstance.getMQClientAPIImpl().removeBroker(brokerContainerAddr, clusterName, brokerName, brokerId, 20000);
+    }
+
+    public AdminToolResult adminToolExecute(AdminToolHandler handler) {
+        try {
+            return handler.doExecute();
+        } catch (RemotingException e) {
+            log.error("", e);
+            return AdminToolResult.failure(AdminToolsResultCodeEnum.REMOTING_ERROR, e.getMessage());
+        } catch (MQClientException e) {
+            if (ResponseCode.TOPIC_NOT_EXIST == e.getResponseCode()) {
+                return AdminToolResult.failure(AdminToolsResultCodeEnum.TOPIC_ROUTE_INFO_NOT_EXIST, e.getErrorMessage());
+            }
+            return AdminToolResult.failure(AdminToolsResultCodeEnum.MQ_CLIENT_ERROR, e.getMessage());
+        } catch (InterruptedException e) {
+            return AdminToolResult.failure(AdminToolsResultCodeEnum.INTERRUPT_ERROR, e.getMessage());
+        } catch (Exception e) {
+            return AdminToolResult.failure(AdminToolsResultCodeEnum.MQ_BROKER_ERROR, e.getMessage());
+        }
+    }
+
+    @Override public void updateBrokerConfig(String brokerAddr,
+        Properties properties) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, UnsupportedEncodingException, InterruptedException, MQBrokerException {
         this.mqClientInstance.getMQClientAPIImpl().updateBrokerConfig(brokerAddr, properties, timeoutMillis);
     }
 
-    @Override
-    public Properties getBrokerConfig(final String brokerAddr) throws RemotingConnectException,
-        RemotingSendRequestException, RemotingTimeoutException, UnsupportedEncodingException, InterruptedException, MQBrokerException {
+    @Override public Properties getBrokerConfig(
+        final String brokerAddr) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, UnsupportedEncodingException, InterruptedException, MQBrokerException {
         return this.mqClientInstance.getMQClientAPIImpl().getBrokerConfig(brokerAddr, timeoutMillis);
     }
 
-    @Override
-    public void createAndUpdateTopicConfig(String addr, TopicConfig config) throws RemotingException, MQBrokerException,
-        InterruptedException, MQClientException {
+    @Override public void createAndUpdateTopicConfig(String addr,
+        TopicConfig config) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
         this.mqClientInstance.getMQClientAPIImpl().createTopic(addr, this.defaultMQAdminExt.getCreateTopicKey(), config, timeoutMillis);
     }
 
@@ -218,42 +262,34 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
         this.mqClientInstance.getMQClientAPIImpl().updateGlobalWhiteAddrsConfig(addr, globalWhiteAddrs, timeoutMillis);
     }
 
-    @Override
-    public ClusterAclVersionInfo examineBrokerClusterAclVersionInfo(
+    @Override public ClusterAclVersionInfo examineBrokerClusterAclVersionInfo(
         String addr) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
         return this.mqClientInstance.getMQClientAPIImpl().getBrokerClusterAclInfo(addr, timeoutMillis);
     }
 
-    @Override
-    public AclConfig examineBrokerClusterAclConfig(
+    @Override public AclConfig examineBrokerClusterAclConfig(
         String addr) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
         return this.mqClientInstance.getMQClientAPIImpl().getBrokerClusterConfig(addr, timeoutMillis);
     }
 
-    @Override
-    public void createAndUpdateSubscriptionGroupConfig(String addr,
-        SubscriptionGroupConfig config) throws RemotingException,
-        MQBrokerException, InterruptedException, MQClientException {
+    @Override public void createAndUpdateSubscriptionGroupConfig(String addr,
+        SubscriptionGroupConfig config) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
         this.mqClientInstance.getMQClientAPIImpl().createSubscriptionGroup(addr, config, timeoutMillis);
     }
 
-    @Override
-    public SubscriptionGroupConfig examineSubscriptionGroupConfig(String addr, String group)
-        throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
+    @Override public SubscriptionGroupConfig examineSubscriptionGroupConfig(String addr,
+        String group) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
         SubscriptionGroupWrapper wrapper = this.mqClientInstance.getMQClientAPIImpl().getAllSubscriptionGroup(addr, timeoutMillis);
         return wrapper.getSubscriptionGroupTable().get(group);
     }
 
-    @Override
-    public TopicConfig examineTopicConfig(String addr,
+    @Override public TopicConfig examineTopicConfig(String addr,
         String topic) throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
         return this.mqClientInstance.getMQClientAPIImpl().getTopicConfig(addr, topic, timeoutMillis);
     }
 
-    @Override
-    public TopicStatsTable examineTopicStats(
-        String topic) throws RemotingException, MQClientException, InterruptedException,
-        MQBrokerException {
+    @Override public TopicStatsTable examineTopicStats(
+        String topic) throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
         TopicRouteData topicRouteData = this.examineTopicRouteInfo(topic);
         TopicStatsTable topicStatsTable = new TopicStatsTable();
 
@@ -276,51 +312,91 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
         return topicStatsTable;
     }
 
-    @Override
-    public TopicStatsTable examineTopicStats(String brokerAddr,
-        String topic) throws RemotingException, MQClientException, InterruptedException,
-        MQBrokerException {
+    @Override public AdminToolResult<TopicStatsTable> examineTopicStatsConcurrent(final String topic) {
+        return adminToolExecute(new AdminToolHandler() {
+            @Override public AdminToolResult doExecute() throws Exception {
+                final TopicStatsTable topicStatsTable = new TopicStatsTable();
+                TopicRouteData topicRouteData = examineTopicRouteInfo(topic);
+
+                if (topicRouteData == null || topicRouteData.getBrokerDatas() == null || topicRouteData.getBrokerDatas().size() == 0) {
+                    return AdminToolResult.success(topicStatsTable);
+                }
+                final CountDownLatch latch = new CountDownLatch(topicRouteData.getBrokerDatas().size());
+                for (final BrokerData bd : topicRouteData.getBrokerDatas()) {
+                    threadPoolExecutor.submit(new Runnable() {
+                        @Override public void run() {
+                            try {
+                                String addr = bd.selectBrokerAddr();
+                                if (addr != null) {
+                                    TopicStatsTable tst = mqClientInstance.getMQClientAPIImpl().getTopicStatsInfo(addr, topic, timeoutMillis);
+                                    topicStatsTable.getOffsetTable().putAll(tst.getOffsetTable());
+                                }
+                            } catch (Exception e) {
+                                log.error("getTopicStatsInfo error. topic=" + topic, e);
+                            } finally {
+                                latch.countDown();
+                            }
+                        }
+                    });
+                }
+                latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
+
+                return AdminToolResult.success(topicStatsTable);
+            }
+        });
+    }
+
+    @Override public TopicStatsTable examineTopicStats(String brokerAddr,
+        String topic) throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
         return this.mqClientInstance.getMQClientAPIImpl().getTopicStatsInfo(brokerAddr, topic, timeoutMillis);
     }
 
-    @Override
-    public TopicList fetchAllTopicList() throws RemotingException, MQClientException, InterruptedException {
+    @Override public TopicList fetchAllTopicList() throws RemotingException, MQClientException, InterruptedException {
         return this.mqClientInstance.getMQClientAPIImpl().getTopicListFromNameServer(timeoutMillis);
     }
 
-    @Override
-    public TopicList fetchTopicsByCLuster(
+    @Override public TopicList fetchTopicsByCLuster(
         String clusterName) throws RemotingException, MQClientException, InterruptedException {
         return this.mqClientInstance.getMQClientAPIImpl().getTopicsByCluster(clusterName, timeoutMillis);
     }
 
-    @Override
-    public KVTable fetchBrokerRuntimeStats(
-        final String brokerAddr) throws RemotingConnectException, RemotingSendRequestException,
-        RemotingTimeoutException, InterruptedException, MQBrokerException {
+    @Override public KVTable fetchBrokerRuntimeStats(
+        final String brokerAddr) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException {
         return this.mqClientInstance.getMQClientAPIImpl().getBrokerRuntimeInfo(brokerAddr, timeoutMillis);
     }
 
-    @Override
-    public ConsumeStats examineConsumeStats(
-        String consumerGroup) throws RemotingException, MQClientException, InterruptedException,
-        MQBrokerException {
+    @Override public ConsumeStats examineConsumeStats(
+        String consumerGroup) throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
         return examineConsumeStats(consumerGroup, null);
     }
 
-    @Override
-    public ConsumeStats examineConsumeStats(String consumerGroup,
-        String topic) throws RemotingException, MQClientException,
-        InterruptedException, MQBrokerException {
-        String retryTopic = MixAll.getRetryTopic(consumerGroup);
-        TopicRouteData topicRouteData = this.examineTopicRouteInfo(retryTopic);
+    @Override public ConsumeStats examineConsumeStats(String consumerGroup,
+        String topic) throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
+        TopicRouteData topicRouteData = null;
+        List<String> routeTopics = new ArrayList<>();
+        routeTopics.add(MixAll.getRetryTopic(consumerGroup));
+        if (topic != null) {
+            routeTopics.add(topic);
+            routeTopics.add(KeyBuilder.buildPopRetryTopic(topic, consumerGroup));
+        }
+        for (int i = 0; i < routeTopics.size(); i++) {
+            try {
+                topicRouteData = this.examineTopicRouteInfo(routeTopics.get(i));
+                if (topicRouteData != null) {
+                    break;
+                }
+            } catch (Throwable e) {
+                if (i == routeTopics.size() - 1) {
+                    throw e;
+                }
+            }
+        }
         ConsumeStats result = new ConsumeStats();
 
         for (BrokerData bd : topicRouteData.getBrokerDatas()) {
             String addr = bd.selectBrokerAddr();
             if (addr != null) {
-                ConsumeStats consumeStats =
-                    this.mqClientInstance.getMQClientAPIImpl().getConsumeStats(addr, consumerGroup, topic, timeoutMillis * 3);
+                ConsumeStats consumeStats = this.mqClientInstance.getMQClientAPIImpl().getConsumeStats(addr, consumerGroup, topic, timeoutMillis * 3);
                 result.getOffsetTable().putAll(consumeStats.getOffsetTable());
                 double value = result.getConsumeTps() + consumeStats.getConsumeTps();
                 result.setConsumeTps(value);
@@ -328,7 +404,7 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
         }
 
         Set<String> topics = new HashSet<>();
-        for (MessageQueue messageQueue: result.getOffsetTable().keySet()) {
+        for (MessageQueue messageQueue : result.getOffsetTable().keySet()) {
             topics.add(messageQueue.getTopic());
         }
 
@@ -337,7 +413,7 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
         // for topic, we put the physical stats, how about group?
         // staticResult.getOffsetTable().putAll(result.getOffsetTable());
 
-        for (String currentTopic: topics) {
+        for (String currentTopic : topics) {
             TopicRouteData currentRoute = this.examineTopicRouteInfo(currentTopic);
             Map<String, TopicConfigAndQueueMapping> brokerConfigMap = MQAdminUtils.examineTopicConfigFromRoute(currentTopic, currentRoute, defaultMQAdminExt);
             ConsumeStats consumeStats = MQAdminUtils.convertPhysicalConsumeStats(brokerConfigMap, result);
@@ -345,27 +421,84 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
         }
 
         if (staticResult.getOffsetTable().isEmpty()) {
-            throw new MQClientException(ResponseCode.CONSUMER_NOT_ONLINE,
-                "Not found the consumer group consume stats, because return offset table is empty, maybe the consumer not consume any message");
+            throw new MQClientException(ResponseCode.CONSUMER_NOT_ONLINE, "Not found the consumer group consume stats, because return offset table is empty, maybe the consumer not consume any message");
         }
 
         return staticResult;
     }
 
     @Override
-    public ClusterInfo examineBrokerClusterInfo() throws InterruptedException, MQBrokerException, RemotingTimeoutException,
-        RemotingSendRequestException, RemotingConnectException {
-        return this.mqClientInstance.getMQClientAPIImpl().getBrokerClusterInfo(timeoutMillis);
+    public AdminToolResult<ConsumeStats> examineConsumeStatsConcurrent(final String consumerGroup, final String topic) {
+
+        return adminToolExecute(new AdminToolHandler() {
+            @Override public AdminToolResult doExecute() throws Exception {
+                TopicRouteData topicRouteData = null;
+                List<String> routeTopics = new ArrayList<>();
+                routeTopics.add(MixAll.getRetryTopic(consumerGroup));
+                if (topic != null) {
+                    routeTopics.add(topic);
+                    routeTopics.add(KeyBuilder.buildPopRetryTopic(topic, consumerGroup));
+                }
+                for (int i = 0; i < routeTopics.size(); i++) {
+                    try {
+                        topicRouteData = examineTopicRouteInfo(routeTopics.get(i));
+                        if (topicRouteData != null) {
+                            break;
+                        }
+                    } catch (Throwable e) {
+                        continue;
+                    }
+                }
+                if (topicRouteData == null || topicRouteData.getBrokerDatas() == null || topicRouteData.getBrokerDatas().size() == 0) {
+                    return AdminToolResult.failure(AdminToolsResultCodeEnum.TOPIC_ROUTE_INFO_NOT_EXIST, "topic router info not found");
+                }
+
+                final ConsumeStats result = new ConsumeStats();
+                final CountDownLatch latch = new CountDownLatch(topicRouteData.getBrokerDatas().size());
+                final Map<String, Double> consumerTpsMap = new ConcurrentHashMap<>(topicRouteData.getBrokerDatas().size());
+                for (final BrokerData bd : topicRouteData.getBrokerDatas()) {
+                    threadPoolExecutor.submit(new Runnable() {
+                        @Override public void run() {
+                            try {
+                                String addr = bd.selectBrokerAddr();
+                                if (addr != null) {
+                                    ConsumeStats consumeStats = mqClientInstance.getMQClientAPIImpl().getConsumeStats(addr, consumerGroup, topic, timeoutMillis);
+                                    result.getOffsetTable().putAll(consumeStats.getOffsetTable());
+                                    consumerTpsMap.put(addr, consumeStats.getConsumeTps());
+                                }
+                            } catch (Exception e) {
+                                log.error("getTopicStatsInfo error. topic=" + topic, e);
+                            } finally {
+                                latch.countDown();
+                            }
+                        }
+                    });
+                }
+                latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
+
+                for (Double tps : consumerTpsMap.values()) {
+                    result.setConsumeTps(result.getConsumeTps() + tps);
+                }
+
+                if (result.getOffsetTable().isEmpty()) {
+                    AdminToolResult.failure(AdminToolsResultCodeEnum.CONSUMER_NOT_ONLINE, "Not found the consumer group consume stats, because return offset table is empty, maybe the consumer not consume any message");
+                }
+                return AdminToolResult.success(result);
+            }
+        });
     }
 
     @Override
-    public TopicRouteData examineTopicRouteInfo(
+    public ClusterInfo examineBrokerClusterInfo() throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
+        return this.mqClientInstance.getMQClientAPIImpl().getBrokerClusterInfo(timeoutMillis);
+    }
+
+    @Override public TopicRouteData examineTopicRouteInfo(
         String topic) throws RemotingException, MQClientException, InterruptedException {
         return this.mqClientInstance.getMQClientAPIImpl().getTopicRouteInfoFromNameServer(topic, timeoutMillis);
     }
 
-    @Override
-    public MessageExt viewMessage(String topic,
+    @Override public MessageExt viewMessage(String topic,
         String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
         try {
             MessageDecoder.decodeMessageId(msgId);
@@ -376,8 +509,18 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
         return this.mqClientInstance.getMQAdminImpl().queryMessageByUniqKey(topic, msgId);
     }
 
-    @Override
-    public ConsumerConnection examineConsumerConnectionInfo(
+    @Override public MessageExt queryMessage(String clusterName, String topic,
+        String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+        try {
+            MessageDecoder.decodeMessageId(msgId);
+            return this.viewMessage(msgId);
+        } catch (Exception e) {
+            log.warn("the msgId maybe created by new client. msgId={}", msgId, e);
+        }
+        return this.mqClientInstance.getMQAdminImpl().queryMessageByUniqKey(clusterName, topic, msgId);
+    }
+
+    @Override public ConsumerConnection examineConsumerConnectionInfo(
         String consumerGroup) throws InterruptedException, MQBrokerException,
         RemotingException, MQClientException {
         ConsumerConnection result = new ConsumerConnection();
@@ -415,10 +558,8 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
         return result;
     }
 
-    @Override
-    public ProducerConnection examineProducerConnectionInfo(String producerGroup,
-        final String topic) throws RemotingException,
-        MQClientException, InterruptedException, MQBrokerException {
+    @Override public ProducerConnection examineProducerConnectionInfo(String producerGroup,
+        final String topic) throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
         ProducerConnection result = new ProducerConnection();
         List<BrokerData> brokers = this.examineTopicRouteInfo(topic).getBrokerDatas();
         BrokerData brokerData = brokers.get(random.nextInt(brokers.size()));
@@ -438,14 +579,12 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
         return result;
     }
 
-    @Override
-    public List<String> getNameServerAddressList() {
+    @Override public List<String> getNameServerAddressList() {
         return this.mqClientInstance.getMQClientAPIImpl().getNameServerAddressList();
     }
 
-    @Override
-    public int wipeWritePermOfBroker(final String namesrvAddr, String brokerName) throws RemotingCommandException,
-        RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQClientException {
+    @Override public int wipeWritePermOfBroker(final String namesrvAddr,
+        String brokerName) throws RemotingCommandException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQClientException {
         return this.mqClientInstance.getMQClientAPIImpl().wipeWritePermOfBroker(namesrvAddr, brokerName, timeoutMillis);
     }
 
@@ -455,35 +594,71 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
         return this.mqClientInstance.getMQClientAPIImpl().addWritePermOfBroker(namesrvAddr, brokerName, timeoutMillis);
     }
 
-    @Override
-    public void putKVConfig(String namespace, String key, String value) {
+    @Override public void putKVConfig(String namespace, String key, String value) {
     }
 
-    @Override
-    public String getKVConfig(String namespace,
+    @Override public String getKVConfig(String namespace,
         String key) throws RemotingException, MQClientException, InterruptedException {
         return this.mqClientInstance.getMQClientAPIImpl().getKVConfigValue(namespace, key, timeoutMillis);
     }
 
-    @Override
-    public KVTable getKVListByNamespace(
+    @Override public KVTable getKVListByNamespace(
         String namespace) throws RemotingException, MQClientException, InterruptedException {
         return this.mqClientInstance.getMQClientAPIImpl().getKVListByNamespace(namespace, timeoutMillis);
     }
 
-    @Override
-    public void deleteTopicInBroker(Set<String> addrs,
-        String topic) throws RemotingException, MQBrokerException, InterruptedException,
-        MQClientException {
+    @Override public void deleteTopic(String topicName,
+        String clusterName) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+        Set<String> brokerAddressSet = CommandUtil.fetchMasterAndSlaveAddrByClusterName(this.defaultMQAdminExt, clusterName);
+        this.deleteTopicInBroker(brokerAddressSet, topicName);
+        List<String> nameServerList = this.getNameServerAddressList();
+        Set<String> nameServerSet = new HashSet<String>(nameServerList);
+        this.deleteTopicInNameServer(nameServerSet, topicName);
+        for (String namespace : this.kvNamespaceToDeleteList) {
+            this.deleteKvConfig(namespace, topicName);
+        }
+    }
+
+    @Override public void deleteTopicInBroker(Set<String> addrs,
+        String topic) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
         for (String addr : addrs) {
             this.mqClientInstance.getMQClientAPIImpl().deleteTopicInBroker(addr, topic, timeoutMillis);
         }
     }
 
-    @Override
-    public void deleteTopicInNameServer(Set<String> addrs,
-        String topic) throws RemotingException, MQBrokerException, InterruptedException,
-        MQClientException {
+    @Override public AdminToolResult<BrokerOperatorResult> deleteTopicInBrokerConcurrent(final Set<String> addrs,
+        final String topic) {
+        final List<String> successList = new CopyOnWriteArrayList<>();
+        final List<String> failureList = new CopyOnWriteArrayList<>();
+        final CountDownLatch latch = new CountDownLatch(addrs.size());
+        for (final String addr : addrs) {
+            threadPoolExecutor.submit(new Runnable() {
+                @Override public void run() {
+                    try {
+                        mqClientInstance.getMQClientAPIImpl().deleteTopicInBroker(addr, topic, timeoutMillis);
+                        successList.add(addr);
+                    } catch (Exception e) {
+                        log.error("deleteTopicInBrokerConcurrent error. topic=" + topic + ", host=" + addr, e);
+                        failureList.add(addr);
+                    } finally {
+                        latch.countDown();
+                    }
+                }
+            });
+        }
+        try {
+            latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
+        } catch (Exception e) {
+        }
+
+        BrokerOperatorResult result = new BrokerOperatorResult();
+        result.setSuccessList(successList);
+        result.setFailureList(failureList);
+        return AdminToolResult.success(result);
+    }
+
+    @Override public void deleteTopicInNameServer(Set<String> addrs,
+        String topic) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
         if (addrs == null) {
             String ns = this.mqClientInstance.getMQClientAPIImpl().fetchNameServerAddr();
             addrs = new HashSet(Arrays.asList(ns.split(";")));
@@ -493,92 +668,82 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
         }
     }
 
-    @Override
-    public void deleteSubscriptionGroup(String addr,
-        String groupName) throws RemotingException, MQBrokerException, InterruptedException,
-        MQClientException {
+    @Override public void deleteSubscriptionGroup(String addr,
+        String groupName) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
         this.mqClientInstance.getMQClientAPIImpl().deleteSubscriptionGroup(addr, groupName, false, timeoutMillis);
     }
 
-    @Override
-    public void deleteSubscriptionGroup(String addr,
-        String groupName, boolean removeOffset) throws RemotingException, MQBrokerException, InterruptedException,
-        MQClientException {
+    @Override public void deleteSubscriptionGroup(String addr, String groupName,
+        boolean removeOffset) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
         this.mqClientInstance.getMQClientAPIImpl().deleteSubscriptionGroup(addr, groupName, removeOffset, timeoutMillis);
     }
 
-    @Override
-    public void createAndUpdateKvConfig(String namespace, String key,
-        String value) throws RemotingException, MQBrokerException,
-        InterruptedException, MQClientException {
+    @Override public void createAndUpdateKvConfig(String namespace, String key,
+        String value) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
         this.mqClientInstance.getMQClientAPIImpl().putKVConfigValue(namespace, key, value, timeoutMillis);
     }
 
-    @Override
-    public void deleteKvConfig(String namespace,
-        String key) throws RemotingException, MQBrokerException, InterruptedException,
-        MQClientException {
+    @Override public void deleteKvConfig(String namespace,
+        String key) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
         this.mqClientInstance.getMQClientAPIImpl().deleteKVConfigValue(namespace, key, timeoutMillis);
     }
 
-    @Override
-    public List<RollbackStats> resetOffsetByTimestampOld(String consumerGroup, String topic, long timestamp,
-        boolean force)
-        throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+    @Override public List<RollbackStats> resetOffsetByTimestampOld(String consumerGroup, String topic, long timestamp,
+        boolean force) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
         TopicRouteData topicRouteData = this.examineTopicRouteInfo(topic);
         List<RollbackStats> rollbackStatsList = new ArrayList<RollbackStats>();
-        Map<String, Integer> topicRouteMap = new HashMap<String, Integer>();
-        for (BrokerData bd : topicRouteData.getBrokerDatas()) {
-            for (QueueData queueData : topicRouteData.getQueueDatas()) {
-                if (StringUtils.equals(queueData.getBrokerName(), bd.getBrokerName())) {
-                    topicRouteMap.put(bd.selectBrokerAddr(), queueData.getReadQueueNums());
-                }
-            }
+        Map<String, QueueData> topicRouteMap = new HashMap<String, QueueData>();
+        for (QueueData queueData : topicRouteData.getQueueDatas()) {
+            topicRouteMap.put(queueData.getBrokerName(), queueData);
         }
         for (BrokerData bd : topicRouteData.getBrokerDatas()) {
             String addr = bd.selectBrokerAddr();
             if (addr != null) {
-                ConsumeStats consumeStats = this.mqClientInstance.getMQClientAPIImpl().getConsumeStats(addr, consumerGroup, timeoutMillis);
-
-                boolean hasConsumed = false;
-                for (Map.Entry<MessageQueue, OffsetWrapper> entry : consumeStats.getOffsetTable().entrySet()) {
-                    MessageQueue queue = entry.getKey();
-                    OffsetWrapper offsetWrapper = entry.getValue();
-                    if (topic.equals(queue.getTopic())) {
-                        hasConsumed = true;
-                        RollbackStats rollbackStats = resetOffsetConsumeOffset(addr, consumerGroup, queue, offsetWrapper, timestamp, force);
-                        rollbackStatsList.add(rollbackStats);
-                    }
-                }
+                rollbackStatsList.addAll(resetOffsetByTimestampOld(addr, topicRouteMap.get(bd.getBrokerName()), consumerGroup, topic, timestamp, force));
+            }
+        }
+        return rollbackStatsList;
+    }
 
-                if (!hasConsumed) {
-                    HashMap<MessageQueue, TopicOffset> topicStatus =
-                        this.mqClientInstance.getMQClientAPIImpl().getTopicStatsInfo(addr, topic, timeoutMillis).getOffsetTable();
-                    for (int i = 0; i < topicRouteMap.get(addr); i++) {
-                        MessageQueue queue = new MessageQueue(topic, bd.getBrokerName(), i);
-                        OffsetWrapper offsetWrapper = new OffsetWrapper();
-                        offsetWrapper.setBrokerOffset(topicStatus.get(queue).getMaxOffset());
-                        offsetWrapper.setConsumerOffset(topicStatus.get(queue).getMinOffset());
-
-                        RollbackStats rollbackStats = resetOffsetConsumeOffset(addr, consumerGroup, queue, offsetWrapper, timestamp, force);
-                        rollbackStatsList.add(rollbackStats);
-                    }
-                }
+    private List<RollbackStats> resetOffsetByTimestampOld(String brokerAddr, QueueData queueData, String consumerGroup,
+        String topic, long timestamp,
+        boolean force) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+        List<RollbackStats> rollbackStatsList = new ArrayList<RollbackStats>();
+        ConsumeStats consumeStats = this.mqClientInstance.getMQClientAPIImpl().getConsumeStats(brokerAddr, consumerGroup, timeoutMillis);
+
+        boolean hasConsumed = false;
+        for (Map.Entry<MessageQueue, OffsetWrapper> entry : consumeStats.getOffsetTable().entrySet()) {
+            MessageQueue queue = entry.getKey();
+            OffsetWrapper offsetWrapper = entry.getValue();
+            if (topic.equals(queue.getTopic())) {
+                hasConsumed = true;
+                RollbackStats rollbackStats = resetOffsetConsumeOffset(brokerAddr, consumerGroup, queue, offsetWrapper, timestamp, force);
+                rollbackStatsList.add(rollbackStats);
+            }
+        }
+
+        if (!hasConsumed) {
+            HashMap<MessageQueue, TopicOffset> topicStatus = this.mqClientInstance.getMQClientAPIImpl().getTopicStatsInfo(brokerAddr, topic, timeoutMillis).getOffsetTable();
+            for (int i = 0; i < queueData.getReadQueueNums(); i++) {
+                MessageQueue queue = new MessageQueue(topic, queueData.getBrokerName(), i);
+                OffsetWrapper offsetWrapper = new OffsetWrapper();
+                offsetWrapper.setBrokerOffset(topicStatus.get(queue).getMaxOffset());
+                offsetWrapper.setConsumerOffset(topicStatus.get(queue).getMinOffset());
+
+                RollbackStats rollbackStats = resetOffsetConsumeOffset(brokerAddr, consumerGroup, queue, offsetWrapper, timestamp, force);
+                rollbackStatsList.add(rollbackStats);
             }
         }
         return rollbackStatsList;
     }
 
-    @Override
-    public Map<MessageQueue, Long> resetOffsetByTimestamp(String topic, String group, long timestamp, boolean isForce)
-        throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+    @Override public Map<MessageQueue, Long> resetOffsetByTimestamp(String topic, String group, long timestamp,
+        boolean isForce) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
         return resetOffsetByTimestamp(topic, group, timestamp, isForce, false);
     }
 
-    @Override
-    public void resetOffsetNew(String consumerGroup, String topic,
-        long timestamp) throws RemotingException, MQBrokerException,
-        InterruptedException, MQClientException {
+    @Override public void resetOffsetNew(String consumerGroup, String topic,
+        long timestamp) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
         try {
             this.resetOffsetByTimestamp(topic, consumerGroup, timestamp, true);
         } catch (MQClientException e) {
@@ -590,9 +755,76 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
         }
     }
 
+    @Override
+    public AdminToolResult<BrokerOperatorResult> resetOffsetNewConcurrent(final String group, final String topic,
+        final long timestamp) {
+        return adminToolExecute(new AdminToolHandler() {
+            @Override public AdminToolResult doExecute() throws Exception {
+                TopicRouteData topicRouteData = examineTopicRouteInfo(topic);
+                if (topicRouteData == null || topicRouteData.getBrokerDatas() == null || topicRouteData.getBrokerDatas().size() == 0) {
+                    return AdminToolResult.failure(AdminToolsResultCodeEnum.TOPIC_ROUTE_INFO_NOT_EXIST, "topic router info not found");
+                }
+                final Map<String, QueueData> topicRouteMap = new HashMap<String, QueueData>();
+                for (QueueData queueData : topicRouteData.getQueueDatas()) {
+                    topicRouteMap.put(queueData.getBrokerName(), queueData);
+                }
+
+                final CopyOnWriteArrayList successList = new CopyOnWriteArrayList();
+                final CopyOnWriteArrayList failureList = new CopyOnWriteArrayList();
+                final CountDownLatch latch = new CountDownLatch(topicRouteData.getBrokerDatas().size());
+                for (final BrokerData bd : topicRouteData.getBrokerDatas()) {
+                    threadPoolExecutor.submit(new Runnable() {
+                        @Override public void run() {
+                            String addr = bd.selectBrokerAddr();
+                            try {
+                                if (addr != null) {
+                                    Map<MessageQueue, Long> offsetTable = mqClientInstance.getMQClientAPIImpl().invokeBrokerToResetOffset(addr, topic, group, timestamp, true, timeoutMillis, false);
+                                    if (offsetTable != null) {
+                                        successList.add(addr);
+                                    } else {
+                                        failureList.add(addr);
+                                    }
+                                }
+                            } catch (MQClientException e) {
+                                if (ResponseCode.CONSUMER_NOT_ONLINE == e.getResponseCode()) {
+                                    try {
+                                        resetOffsetByTimestampOld(addr, topicRouteMap.get(bd.getBrokerName()), group, topic, timestamp, true);
+                                        successList.add(addr);
+                                    } catch (Exception e2) {
+                                        log.error(MessageFormat.format("resetOffsetByTimestampOld error. addr={0}, topic={1}, group={2},timestamp={3}", addr, topic, group, timestamp), e);
+                                        failureList.add(addr);
+                                    }
+                                } else if (ResponseCode.SYSTEM_ERROR == e.getResponseCode()) {
+                                    // CODE: 1  DESC: THe consumer group <GID_newggghh> not exist, never online
+                                    successList.add(addr);
+                                } else {
+                                    failureList.add(addr);
+                                    log.error(MessageFormat.format("resetOffsetNewConcurrent error. addr={0}, topic={1}, group={2},timestamp={3}", addr, topic, group, timestamp), e);
+                                }
+                            } catch (Exception e) {
+                                failureList.add(addr);
+                                log.error(MessageFormat.format("resetOffsetNewConcurrent error. addr={0}, topic={1}, group={2},timestamp={3}", addr, topic, group, timestamp), e);
+                            } finally {
+                                latch.countDown();
+                            }
+                        }
+                    });
+                }
+                latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
+                BrokerOperatorResult result = new BrokerOperatorResult();
+                result.setSuccessList(successList);
+                result.setFailureList(failureList);
+                if (successList.size() == topicRouteData.getBrokerDatas().size()) {
+                    return AdminToolResult.success(result);
+                } else {
+                    return AdminToolResult.failure(AdminToolsResultCodeEnum.MQ_BROKER_ERROR, "operator failure", result);
+                }
+            }
+        });
+    }
+
     public Map<MessageQueue, Long> resetOffsetByTimestamp(String topic, String group, long timestamp, boolean isForce,
-        boolean isC)
-        throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+        boolean isC) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
         TopicRouteData topicRouteData = this.examineTopicRouteInfo(topic);
         List<BrokerData> brokerDatas = topicRouteData.getBrokerDatas();
         Map<MessageQueue, Long> allOffsetTable = new HashMap<MessageQueue, Long>();
@@ -600,9 +832,7 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
             for (BrokerData brokerData : brokerDatas) {
                 String addr = brokerData.selectBrokerAddr();
                 if (addr != null) {
-                    Map<MessageQueue, Long> offsetTable =
-                        this.mqClientInstance.getMQClientAPIImpl().invokeBrokerToResetOffset(addr, topic, group, timestamp, isForce,
-                            timeoutMillis, isC);
+                    Map<MessageQueue, Long> offsetTable = this.mqClientInstance.getMQClientAPIImpl().invokeBrokerToResetOffset(addr, topic, group, timestamp, isForce, timeoutMillis, isC);
                     if (offsetTable != null) {
                         allOffsetTable.putAll(offsetTable);
                     }
@@ -613,16 +843,14 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
     }
 
     private RollbackStats resetOffsetConsumeOffset(String brokerAddr, String consumeGroup, MessageQueue queue,
-        OffsetWrapper offsetWrapper,
-        long timestamp, boolean force) throws RemotingException, InterruptedException, MQBrokerException {
+        OffsetWrapper offsetWrapper, long timestamp,
+        boolean force) throws RemotingException, InterruptedException, MQBrokerException {
         long resetOffset;
         if (timestamp == -1) {
 
             resetOffset = this.mqClientInstance.getMQClientAPIImpl().getMaxOffset(brokerAddr, queue.getTopic(), queue.getQueueId(), timeoutMillis);
         } else {
-            resetOffset =
-                this.mqClientInstance.getMQClientAPIImpl().searchOffset(brokerAddr, queue.getTopic(), queue.getQueueId(), timestamp,
-                    timeoutMillis);
+            resetOffset = this.mqClientInstance.getMQClientAPIImpl().searchOffset(brokerAddr, queue.getTopic(), queue.getQueueId(), timestamp, timeoutMillis);
         }
 
         RollbackStats rollbackStats = new RollbackStats();
@@ -645,36 +873,28 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
         return rollbackStats;
     }
 
-    @Override
-    public Map<String, Map<MessageQueue, Long>> getConsumeStatus(String topic, String group,
-        String clientAddr) throws RemotingException,
-        MQBrokerException, InterruptedException, MQClientException {
+    @Override public Map<String, Map<MessageQueue, Long>> getConsumeStatus(String topic, String group,
+        String clientAddr) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
         TopicRouteData topicRouteData = this.examineTopicRouteInfo(topic);
         List<BrokerData> brokerDatas = topicRouteData.getBrokerDatas();
         if (brokerDatas != null && brokerDatas.size() > 0) {
             String addr = brokerDatas.get(0).selectBrokerAddr();
             if (addr != null) {
-                return this.mqClientInstance.getMQClientAPIImpl().invokeBrokerToGetConsumerStatus(addr, topic, group, clientAddr,
-                    timeoutMillis);
+                return this.mqClientInstance.getMQClientAPIImpl().invokeBrokerToGetConsumerStatus(addr, topic, group, clientAddr, timeoutMillis);
             }
         }
         return Collections.EMPTY_MAP;
     }
 
-    @Override
-    public void createOrUpdateOrderConf(String key, String value,
-        boolean isCluster) throws RemotingException, MQBrokerException,
-        InterruptedException, MQClientException {
+    @Override public void createOrUpdateOrderConf(String key, String value,
+        boolean isCluster) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
 
         if (isCluster) {
-            this.mqClientInstance.getMQClientAPIImpl()
-                .putKVConfigValue(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG, key, value, timeoutMillis);
+            this.mqClientInstance.getMQClientAPIImpl().putKVConfigValue(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG, key, value, timeoutMillis);
         } else {
             String oldOrderConfs = null;
             try {
-                oldOrderConfs =
-                    this.mqClientInstance.getMQClientAPIImpl().getKVConfigValue(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG, key,
-                        timeoutMillis);
+                oldOrderConfs = this.mqClientInstance.getMQClientAPIImpl().getKVConfigValue(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG, key, timeoutMillis);
             } catch (Exception e) {
                 e.printStackTrace();
             }
@@ -696,15 +916,12 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
                 newOrderConf.append(splitor).append(entry.getValue());
                 splitor = ";";
             }
-            this.mqClientInstance.getMQClientAPIImpl().putKVConfigValue(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG, key,
-                newOrderConf.toString(), timeoutMillis);
+            this.mqClientInstance.getMQClientAPIImpl().putKVConfigValue(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG, key, newOrderConf.toString(), timeoutMillis);
         }
     }
 
-    @Override
-    public GroupList queryTopicConsumeByWho(
-        String topic) throws InterruptedException, MQBrokerException, RemotingException,
-        MQClientException {
+    @Override public GroupList queryTopicConsumeByWho(
+        String topic) throws InterruptedException, MQBrokerException, RemotingException, MQClientException {
         TopicRouteData topicRouteData = this.examineTopicRouteInfo(topic);
 
         for (BrokerData bd : topicRouteData.getBrokerDatas()) {
@@ -712,6 +929,19 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
             if (addr != null) {
                 return this.mqClientInstance.getMQClientAPIImpl().queryTopicConsumeByWho(addr, topic, timeoutMillis);
             }
+        }
+        return null;
+    }
+
+    @Override public SubscriptionData querySubscription(String group,
+        String topic) throws InterruptedException, MQBrokerException, RemotingException, MQClientException {
+        TopicRouteData topicRouteData = this.examineTopicRouteInfo(topic);
+
+        for (BrokerData bd : topicRouteData.getBrokerDatas()) {
+            String addr = bd.selectBrokerAddr();
+            if (addr != null) {
+                return this.mqClientInstance.getMQClientAPIImpl().querySubscriptionByConsumer(addr, group, topic, timeoutMillis);
+            }
 
             break;
         }
@@ -719,10 +949,61 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
         return null;
     }
 
-    @Override
-    public List<QueueTimeSpan> queryConsumeTimeSpan(final String topic,
-        final String group) throws InterruptedException, MQBrokerException,
-        RemotingException, MQClientException {
+    @Override public TopicList queryTopicsByConsumer(
+        String group) throws InterruptedException, MQBrokerException, RemotingException, MQClientException {
+        String retryTopic = MixAll.getRetryTopic(group);
+        TopicRouteData topicRouteData = this.examineTopicRouteInfo(retryTopic);
+        TopicList result = new TopicList();
+
+        //Query all brokers
+        for (BrokerData bd : topicRouteData.getBrokerDatas()) {
+            String addr = bd.selectBrokerAddr();
+            if (addr != null) {
+                TopicList topicList = this.mqClientInstance.getMQClientAPIImpl().queryTopicsByConsumer(addr, group, timeoutMillis);
+                result.getTopicList().addAll(topicList.getTopicList());
+            }
+        }
+
+        return result;
+    }
+
+    @Override public AdminToolResult<TopicList> queryTopicsByConsumerConcurrent(final String group) {
+        return adminToolExecute(new AdminToolHandler() {
+            @Override public AdminToolResult doExecute() throws Exception {
+                String retryTopic = MixAll.getRetryTopic(group);
+                TopicRouteData topicRouteData = examineTopicRouteInfo(retryTopic);
+
+                if (topicRouteData == null || topicRouteData.getBrokerDatas() == null || topicRouteData.getBrokerDatas().size() == 0) {
+                    return AdminToolResult.failure(AdminToolsResultCodeEnum.TOPIC_ROUTE_INFO_NOT_EXIST, "router info not found.");
+                }
+                final TopicList result = new TopicList();
+                final CountDownLatch latch = new CountDownLatch(topicRouteData.getBrokerDatas().size());
+                for (final BrokerData bd : topicRouteData.getBrokerDatas()) {
+                    threadPoolExecutor.submit(new Runnable() {
+                        @Override public void run() {
+                            try {
+                                String addr = bd.selectBrokerAddr();
+                                if (addr != null) {
+                                    TopicList topicList = mqClientInstance.getMQClientAPIImpl().queryTopicsByConsumer(addr, group, timeoutMillis);
+                                    result.getTopicList().addAll(topicList.getTopicList());
+                                }
+                            } catch (Exception e) {
+                                log.error("getTopicStatsInfo error. groupId=" + group, e);
+                            } finally {
+                                latch.countDown();
+                            }
+                        }
+                    });
+                }
+                latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
+
+                return AdminToolResult.success(result);
+            }
+        });
+    }
+
+    @Override public List<QueueTimeSpan> queryConsumeTimeSpan(final String topic,
+        final String group) throws InterruptedException, MQBrokerException, RemotingException, MQClientException {
         List<QueueTimeSpan> spanSet = new ArrayList<QueueTimeSpan>();
         TopicRouteData topicRouteData = this.examineTopicRouteInfo(topic);
         for (BrokerData bd : topicRouteData.getBrokerDatas()) {
@@ -735,9 +1016,41 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
     }
 
     @Override
-    public boolean cleanExpiredConsumerQueue(
-        String cluster) throws RemotingConnectException, RemotingSendRequestException,
-        RemotingTimeoutException, MQClientException, InterruptedException {
+    public AdminToolResult<List<QueueTimeSpan>> queryConsumeTimeSpanConcurrent(final String topic, final String group) {
+        return adminToolExecute(new AdminToolHandler() {
+            @Override public AdminToolResult doExecute() throws Exception {
+                final List<QueueTimeSpan> spanSet = new ArrayList<QueueTimeSpan>();
+                TopicRouteData topicRouteData = examineTopicRouteInfo(topic);
+
+                if (topicRouteData == null || topicRouteData.getBrokerDatas() == null || topicRouteData.getBrokerDatas().size() == 0) {
+                    return AdminToolResult.success(spanSet);
+                }
+                final CountDownLatch latch = new CountDownLatch(topicRouteData.getBrokerDatas().size());
+                for (final BrokerData bd : topicRouteData.getBrokerDatas()) {
+                    threadPoolExecutor.submit(new Runnable() {
+                        @Override public void run() {
+                            try {
+                                String addr = bd.selectBrokerAddr();
+                                if (addr != null) {
+                                    spanSet.addAll(mqClientInstance.getMQClientAPIImpl().queryConsumeTimeSpan(addr, topic, group, timeoutMillis));
+                                }
+                            } catch (Exception e) {
+                                log.error("queryConsumeTimeSpan error. topic=" + topic, e);
+                            } finally {
+                                latch.countDown();
+                            }
+                        }
+                    });
+                }
+                latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
+
+                return AdminToolResult.success(spanSet);
+            }
+        });
+    }
+
+    @Override public boolean cleanExpiredConsumerQueue(
+        String cluster) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, InterruptedException {
         boolean result = false;
         try {
             ClusterInfo clusterInfo = examineBrokerClusterInfo();
@@ -756,8 +1069,7 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
     }
 
     public boolean cleanExpiredConsumerQueueByCluster(ClusterInfo clusterInfo,
-        String cluster) throws RemotingConnectException,
-        RemotingSendRequestException, RemotingTimeoutException, MQClientException, InterruptedException {
+        String cluster) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, InterruptedException {
         boolean result = false;
         String[] addrs = clusterInfo.retrieveAllAddrByCluster(cluster);
         for (String addr : addrs) {
@@ -766,18 +1078,15 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
         return result;
     }
 
-    @Override
-    public boolean cleanExpiredConsumerQueueByAddr(
-        String addr) throws RemotingConnectException, RemotingSendRequestException,
-        RemotingTimeoutException, MQClientException, InterruptedException {
+    @Override public boolean cleanExpiredConsumerQueueByAddr(
+        String addr) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, InterruptedException {
         boolean result = mqClientInstance.getMQClientAPIImpl().cleanExpiredConsumeQueue(addr, timeoutMillis);
         log.warn("clean expired ConsumeQueue on target " + addr + " broker " + result);
         return result;
     }
 
-    @Override
-    public boolean cleanUnusedTopic(String cluster) throws RemotingConnectException, RemotingSendRequestException,
-        RemotingTimeoutException, MQClientException, InterruptedException {
+    @Override public boolean cleanUnusedTopic(
+        String cluster) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, InterruptedException {
         boolean result = false;
         try {
             ClusterInfo clusterInfo = examineBrokerClusterInfo();
@@ -795,8 +1104,8 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
         return result;
     }
 
-    public boolean cleanUnusedTopicByCluster(ClusterInfo clusterInfo, String cluster) throws RemotingConnectException,
-        RemotingSendRequestException, RemotingTimeoutException, MQClientException, InterruptedException {
+    public boolean cleanUnusedTopicByCluster(ClusterInfo clusterInfo,
+        String cluster) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, InterruptedException {
         boolean result = false;
         String[] addrs = clusterInfo.retrieveAllAddrByCluster(cluster);
         for (String addr : addrs) {
@@ -805,18 +1114,20 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
         return result;
     }
 
-    @Override
-    public boolean cleanUnusedTopicByAddr(String addr) throws RemotingConnectException, RemotingSendRequestException,
-        RemotingTimeoutException, MQClientException, InterruptedException {
+    @Override public boolean cleanUnusedTopicByAddr(
+        String addr) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, InterruptedException {
         boolean result = mqClientInstance.getMQClientAPIImpl().cleanUnusedTopicByAddr(addr, timeoutMillis);
         log.warn("clean expired ConsumeQueue on target " + addr + " broker " + result);
         return result;
     }
 
-    @Override
-    public ConsumerRunningInfo getConsumerRunningInfo(String consumerGroup, String clientId,
-        boolean jstack) throws RemotingException,
-        MQClientException, InterruptedException {
+    @Override public ConsumerRunningInfo getConsumerRunningInfo(String consumerGroup, String clientId,
+        boolean jstack) throws RemotingException, MQClientException, InterruptedException {
+        return this.getConsumerRunningInfo(consumerGroup, clientId, jstack, false);
+    }
+
+    @Override public ConsumerRunningInfo getConsumerRunningInfo(String consumerGroup, String clientId, boolean jstack,
+        boolean metrics) throws RemotingException, MQClientException, InterruptedException {
         String topic = MixAll.RETRY_GROUP_TOPIC_PREFIX + consumerGroup;
         TopicRouteData topicRouteData = this.examineTopicRouteInfo(topic);
         List<BrokerData> brokerDatas = topicRouteData.getBrokerDatas();
@@ -824,21 +1135,18 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
             for (BrokerData brokerData : brokerDatas) {
                 String addr = brokerData.selectBrokerAddr();
                 if (addr != null) {
-                    return this.mqClientInstance.getMQClientAPIImpl().getConsumerRunningInfo(addr, consumerGroup, clientId, jstack,
-                        timeoutMillis * 3);
+                    return this.mqClientInstance.getMQClientAPIImpl().getConsumerRunningInfo(addr, consumerGroup, clientId, jstack, timeoutMillis);
                 }
             }
         }
         return null;
     }
 
-    @Override
-    public ConsumeMessageDirectlyResult consumeMessageDirectly(String consumerGroup, String clientId, String msgId)
-        throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
+    @Override public ConsumeMessageDirectlyResult consumeMessageDirectly(String consumerGroup, String clientId,
+        String msgId) throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
         MessageExt msg = this.viewMessage(msgId);
 
-        return this.mqClientInstance.getMQClientAPIImpl().consumeMessageDirectly(RemotingUtil.socketAddress2String(msg.getStoreHost()),
-            consumerGroup, clientId, msgId, timeoutMillis * 3);
+        return this.mqClientInstance.getMQClientAPIImpl().consumeMessageDirectly(RemotingUtil.socketAddress2String(msg.getStoreHost()), consumerGroup, clientId, msg.getTopic(), msgId, timeoutMillis);
     }
 
     @Override
@@ -847,19 +1155,15 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
         final String msgId) throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
         MessageExt msg = this.viewMessage(topic, msgId);
         if (msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX) == null) {
-            return this.mqClientInstance.getMQClientAPIImpl().consumeMessageDirectly(RemotingUtil.socketAddress2String(msg.getStoreHost()),
-                consumerGroup, clientId, msgId, timeoutMillis * 3);
+            return this.mqClientInstance.getMQClientAPIImpl().consumeMessageDirectly(RemotingUtil.socketAddress2String(msg.getStoreHost()), consumerGroup, clientId, topic, msgId, timeoutMillis);
         } else {
             MessageClientExt msgClient = (MessageClientExt) msg;
-            return this.mqClientInstance.getMQClientAPIImpl().consumeMessageDirectly(RemotingUtil.socketAddress2String(msg.getStoreHost()),
-                consumerGroup, clientId, msgClient.getOffsetMsgId(), timeoutMillis * 3);
+            return this.mqClientInstance.getMQClientAPIImpl().consumeMessageDirectly(RemotingUtil.socketAddress2String(msg.getStoreHost()), consumerGroup, clientId, topic, msgClient.getOffsetMsgId(), timeoutMillis);
         }
     }
 
-    @Override
-    public List<MessageTrack> messageTrackDetail(
-        MessageExt msg) throws RemotingException, MQClientException, InterruptedException,
-        MQBrokerException {
+    @Override public List<MessageTrack> messageTrackDetail(
+        MessageExt msg) throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
         List<MessageTrack> result = new ArrayList<MessageTrack>();
 
         GroupList groupList = this.queryTopicConsumeByWho(msg.getTopic());
@@ -919,9 +1223,7 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
                         while (it.hasNext()) {
                             Entry<String, SubscriptionData> next = it.next();
                             if (next.getKey().equals(msg.getTopic())) {
-                                if (next.getValue().getTagsSet().contains(msg.getTags())
-                                    || next.getValue().getTagsSet().contains("*")
-                                    || next.getValue().getTagsSet().isEmpty()) {
+                                if (next.getValue().getTagsSet().contains(msg.getTags()) || next.getValue().getTagsSet().contains("*") || next.getValue().getTagsSet().isEmpty()) {
                                 } else {
                                     mt.setTrackType(TrackType.CONSUMED_BUT_FILTERED);
                                 }
@@ -940,9 +1242,107 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
         return result;
     }
 
+    @Override public List<MessageTrack> messageTrackDetailConcurrent(
+        final MessageExt msg) throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
+        final List<MessageTrack> result = new ArrayList<MessageTrack>();
+
+        GroupList groupList = this.queryTopicConsumeByWho(msg.getTopic());
+
+        final CountDownLatch countDownLatch = new CountDownLatch(groupList.getGroupList().size());
+
+        for (final String group : groupList.getGroupList()) {
+
+            threadPoolExecutor.submit(new Runnable() {
+                @Override public void run() {
+                    MessageTrack mt = new MessageTrack();
+                    mt.setConsumerGroup(group);
+                    mt.setTrackType(TrackType.UNKNOWN);
+                    ConsumerConnection cc = null;
+                    try {
+                        cc = DefaultMQAdminExtImpl.this.examineConsumerConnectionInfo(group);
+                    } catch (MQBrokerException e) {
+                        if (ResponseCode.CONSUMER_NOT_ONLINE == e.getResponseCode()) {
+                            mt.setTrackType(TrackType.NOT_ONLINE);
+                        }
+                        mt.setExceptionDesc("CODE:" + e.getResponseCode() + " DESC:" + e.getErrorMessage());
+                        result.add(mt);
+                        countDownLatch.countDown();
+                        return;
+                    } catch (Exception e) {
+                        mt.setExceptionDesc(RemotingHelper.exceptionSimpleDesc(e));
+                        result.add(mt);
+                        countDownLatch.countDown();
+                        return;
+                    }
+
+                    switch (cc.getConsumeType()) {
+                        case CONSUME_ACTIVELY:
+                            mt.setTrackType(TrackType.PULL);
+                            break;
+                        case CONSUME_PASSIVELY:
+                            boolean ifConsumed = false;
+                            try {
+                                ifConsumed = DefaultMQAdminExtImpl.this.consumed(msg, group);
+                            } catch (MQClientException e) {
+                                if (ResponseCode.CONSUMER_NOT_ONLINE == e.getResponseCode()) {
+                                    mt.setTrackType(TrackType.NOT_ONLINE);
+                                }
+                                mt.setExceptionDesc("CODE:" + e.getResponseCode() + " DESC:" + e.getErrorMessage());
+                                result.add(mt);
+                                countDownLatch.countDown();
+                                return;
+                            } catch (MQBrokerException e) {
+                                if (ResponseCode.CONSUMER_NOT_ONLINE == e.getResponseCode()) {
+                                    mt.setTrackType(TrackType.NOT_ONLINE);
+                                }
+                                mt.setExceptionDesc("CODE:" + e.getResponseCode() + " DESC:" + e.getErrorMessage());
+                                result.add(mt);
+                                countDownLatch.countDown();
+                                return;
+                            } catch (Exception e) {
+                                mt.setExceptionDesc(RemotingHelper.exceptionSimpleDesc(e));
+                                result.add(mt);
+                                countDownLatch.countDown();
+                                return;
+                            }
+
+                            if (ifConsumed) {
+                                mt.setTrackType(TrackType.CONSUMED);
+                                Iterator<Entry<String, SubscriptionData>> it = cc.getSubscriptionTable().entrySet().iterator();
+                                while (it.hasNext()) {
+                                    Entry<String, SubscriptionData> next = it.next();
+                                    if (next.getKey().equals(msg.getTopic())) {
+                                        if (next.getValue().getTagsSet().contains(msg.getTags()) || next.getValue().getTagsSet().contains("*") || next.getValue().getTagsSet().isEmpty()) {
+                                        } else {
+                                            mt.setTrackType(TrackType.CONSUMED_BUT_FILTERED);
+                                        }
+                                    }
+                                }
+                            } else {
+                                mt.setTrackType(TrackType.NOT_CONSUME_YET);
+                            }
+                            break;
+                        default:
+                            break;
+                    }
+                    result.add(mt);
+                    countDownLatch.countDown();
+                    return;
+                }
+            });
+        }
+
+        countDownLatch.await(timeoutMillis, TimeUnit.MILLISECONDS);
+
+        return result;
+    }
+
+    public static void main(String[] args) {
+        Arrays.asList(null);
+    }
+
     public boolean consumed(final MessageExt msg,
-        final String group) throws RemotingException, MQClientException, InterruptedException,
-        MQBrokerException {
+        final String group) throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
 
         ConsumeStats cstats = this.examineConsumeStats(group);
 
@@ -968,45 +1368,67 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
         return false;
     }
 
-    @Override
-    public void cloneGroupOffset(String srcGroup, String destGroup, String topic,
-        boolean isOffline) throws RemotingException,
-        MQClientException, InterruptedException, MQBrokerException {
+    public boolean consumedConcurrent(final MessageExt msg,
+        final String group) throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
+
+        AdminToolResult<ConsumeStats> cstats = this.examineConsumeStatsConcurrent(group, null);
+
+        if (!cstats.isSuccess()) {
+            throw new MQClientException(cstats.getCode(), cstats.getErrorMsg());
+        }
+
+        ClusterInfo ci = this.examineBrokerClusterInfo();
+
+        if (cstats.isSuccess()) {
+            for (Entry<MessageQueue, OffsetWrapper> next : cstats.getData().getOffsetTable().entrySet()) {
+                MessageQueue mq = next.getKey();
+                if (mq.getTopic().equals(msg.getTopic()) && mq.getQueueId() == msg.getQueueId()) {
+                    BrokerData brokerData = ci.getBrokerAddrTable().get(mq.getBrokerName());
+                    if (brokerData != null) {
+                        String addr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
+                        if (addr.equals(RemotingUtil.socketAddress2String(msg.getStoreHost()))) {
+                            if (next.getValue().getConsumerOffset() > msg.getQueueOffset()) {
+                                return true;
+                            }
+                        }
+                    }
+                }
+            }
+        }
+
+        return false;
+    }
+
+    @Override public void cloneGroupOffset(String srcGroup, String destGroup, String topic,
+        boolean isOffline) throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
         String retryTopic = MixAll.getRetryTopic(srcGroup);
         TopicRouteData topicRouteData = this.examineTopicRouteInfo(retryTopic);
 
         for (BrokerData bd : topicRouteData.getBrokerDatas()) {
             String addr = bd.selectBrokerAddr();
             if (addr != null) {
-                this.mqClientInstance.getMQClientAPIImpl().cloneGroupOffset(addr, srcGroup, destGroup, topic, isOffline, timeoutMillis * 3);
+                this.mqClientInstance.getMQClientAPIImpl().cloneGroupOffset(addr, srcGroup, destGroup, topic, isOffline, timeoutMillis);
             }
         }
     }
 
-    @Override
-    public BrokerStatsData viewBrokerStatsData(String brokerAddr, String statsName,
-        String statsKey) throws RemotingConnectException,
-        RemotingSendRequestException, RemotingTimeoutException, MQClientException, InterruptedException {
+    @Override public BrokerStatsData viewBrokerStatsData(String brokerAddr, String statsName,
+        String statsKey) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, InterruptedException {
         return this.mqClientInstance.getMQClientAPIImpl().viewBrokerStatsData(brokerAddr, statsName, statsKey, timeoutMillis);
     }
 
-    @Override
-    public Set<String> getClusterList(String topic) throws RemotingConnectException, RemotingSendRequestException,
-        RemotingTimeoutException, MQClientException, InterruptedException {
+    @Override public Set<String> getClusterList(
+        String topic) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, InterruptedException {
         return this.mqClientInstance.getMQClientAPIImpl().getClusterList(topic, timeoutMillis);
     }
 
-    @Override
-    public ConsumeStatsList fetchConsumeStatsInBroker(final String brokerAddr, boolean isOrder, long timeoutMillis)
-        throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQClientException,
-        InterruptedException {
+    @Override public ConsumeStatsList fetchConsumeStatsInBroker(final String brokerAddr, boolean isOrder,
+        long timeoutMillis) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, InterruptedException {
         return this.mqClientInstance.getMQClientAPIImpl().fetchConsumeStatsInBroker(brokerAddr, isOrder, timeoutMillis);
     }
 
-    @Override
-    public Set<String> getTopicClusterList(
-        final String topic) throws InterruptedException, MQBrokerException, MQClientException,
-        RemotingException {
+    @Override public Set<String> getTopicClusterList(
+        final String topic) throws InterruptedException, MQBrokerException, MQClientException, RemotingException {
         Set<String> clusterSet = new HashSet<String>();
         ClusterInfo clusterInfo = examineBrokerClusterInfo();
         TopicRouteData topicRouteData = examineTopicRouteInfo(topic);
@@ -1022,22 +1444,16 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
         return clusterSet;
     }
 
-    @Override
-    public SubscriptionGroupWrapper getAllSubscriptionGroup(final String brokerAddr,
-        long timeoutMillis) throws InterruptedException,
-        RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException {
+    @Override public SubscriptionGroupWrapper getAllSubscriptionGroup(final String brokerAddr,
+        long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException {
         return this.mqClientInstance.getMQClientAPIImpl().getAllSubscriptionGroup(brokerAddr, timeoutMillis);
     }
 
-    @Override
-    public SubscriptionGroupWrapper getUserSubscriptionGroup(final String brokerAddr,
-        long timeoutMillis) throws InterruptedException,
-        RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException {
-        SubscriptionGroupWrapper subscriptionGroupWrapper = this.mqClientInstance.getMQClientAPIImpl()
-            .getAllSubscriptionGroup(brokerAddr, timeoutMillis);
-
-        Iterator<Entry<String, SubscriptionGroupConfig>> iterator = subscriptionGroupWrapper.getSubscriptionGroupTable()
-            .entrySet().iterator();
+    @Override public SubscriptionGroupWrapper getUserSubscriptionGroup(final String brokerAddr,
+        long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException {
+        SubscriptionGroupWrapper subscriptionGroupWrapper = this.mqClientInstance.getMQClientAPIImpl().getAllSubscriptionGroup(brokerAddr, timeoutMillis);
+
+        Iterator<Entry<String, SubscriptionGroupConfig>> iterator = subscriptionGroupWrapper.getSubscriptionGroupTable().entrySet().iterator();
         while (iterator.hasNext()) {
             Map.Entry<String, SubscriptionGroupConfig> configEntry = iterator.next();
             if (MixAll.isSysConsumerGroup(configEntry.getKey()) || SYSTEM_GROUP_SET.contains(configEntry.getKey())) {
@@ -1048,22 +1464,16 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
         return subscriptionGroupWrapper;
     }
 
-    @Override
-    public TopicConfigSerializeWrapper getAllTopicConfig(final String brokerAddr,
-        long timeoutMillis) throws InterruptedException,
-        RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException {
+    @Override public TopicConfigSerializeWrapper getAllTopicConfig(final String brokerAddr,
+        long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException {
         return this.mqClientInstance.getMQClientAPIImpl().getAllTopicConfig(brokerAddr, timeoutMillis);
     }
 
-    @Override
-    public TopicConfigSerializeWrapper getUserTopicConfig(final String brokerAddr, final boolean specialTopic,
-        long timeoutMillis) throws InterruptedException, RemotingException,
-        MQBrokerException, MQClientException {
+    @Override public TopicConfigSerializeWrapper getUserTopicConfig(final String brokerAddr, final boolean specialTopic,
+        long timeoutMillis) throws InterruptedException, RemotingException, MQBrokerException, MQClientException {
         TopicConfigSerializeWrapper topicConfigSerializeWrapper = this.getAllTopicConfig(brokerAddr, timeoutMillis);
-        TopicList topicList = this.mqClientInstance.getMQClientAPIImpl().getSystemTopicListFromBroker(brokerAddr,
-            timeoutMillis);
-        Iterator<Entry<String, TopicConfig>> iterator = topicConfigSerializeWrapper.getTopicConfigTable().entrySet()
-            .iterator();
+        TopicList topicList = this.mqClientInstance.getMQClientAPIImpl().getSystemTopicListFromBroker(brokerAddr, timeoutMillis);
+        Iterator<Entry<String, TopicConfig>> iterator = topicConfigSerializeWrapper.getTopicConfigTable().entrySet().iterator();
         while (iterator.hasNext()) {
             String topic = iterator.next().getKey();
             if (topicList.getTopicList().contains(topic) || (!specialTopic && (topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) || topic.startsWith(MixAll.DLQ_GROUP_TOPIC_PREFIX)))) {
@@ -1073,64 +1483,50 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
         return topicConfigSerializeWrapper;
     }
 
-    @Override
-    public void createTopic(String key, String newTopic, int queueNum, Map<String, String> attributes) throws MQClientException {
+    @Override public void createTopic(String key, String newTopic, int queueNum,
+        Map<String, String> attributes) throws MQClientException {
         createTopic(key, newTopic, queueNum, 0, null);
     }
 
-    @Override
-    public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag, Map<String, String> attributes) throws MQClientException {
+    @Override public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag,
+        Map<String, String> attributes) throws MQClientException {
         this.mqClientInstance.getMQAdminImpl().createTopic(key, newTopic, queueNum, topicSysFlag, attributes);
     }
 
-    @Override
-    public void createStaticTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig,
+    @Override public void createStaticTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig,
         final TopicQueueMappingDetail mappingDetail,
         final boolean force) throws RemotingException, InterruptedException, MQBrokerException {
         this.mqClientInstance.getMQClientAPIImpl().createStaticTopic(addr, defaultTopic, topicConfig, mappingDetail, force, timeoutMillis);
     }
 
-    @Override
-    public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException {
+    @Override public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException {
         return this.mqClientInstance.getMQAdminImpl().searchOffset(mq, timestamp);
     }
 
-    @Override
-    public long maxOffset(MessageQueue mq) throws MQClientException {
+    @Override public long maxOffset(MessageQueue mq) throws MQClientException {
         return this.mqClientInstance.getMQAdminImpl().maxOffset(mq);
     }
 
-    @Override
-    public long minOffset(MessageQueue mq) throws MQClientException {
+    @Override public long minOffset(MessageQueue mq) throws MQClientException {
         return this.mqClientInstance.getMQAdminImpl().minOffset(mq);
     }
 
-    @Override
-    public long earliestMsgStoreTime(MessageQueue mq) throws MQClientException {
+    @Override public long earliestMsgStoreTime(MessageQueue mq) throws MQClientException {
         return this.mqClientInstance.getMQAdminImpl().earliestMsgStoreTime(mq);
     }
 
-    @Override
-    public MessageExt viewMessage(
+    @Override public MessageExt viewMessage(
         String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
         return this.mqClientInstance.getMQAdminImpl().viewMessage(msgId);
     }
 
-    @Override
-    public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end)
-        throws MQClientException, InterruptedException {
-
-        return this.mqClientInstance.getMQAdminImpl().queryMessage(topic, key, maxNum, begin, end);
-    }
-
-    public QueryResult queryMessageByUniqKey(String topic, String key, int maxNum, long begin,
+    @Override public QueryResult queryMessage(String topic, String key, int maxNum, long begin,
         long end) throws MQClientException, InterruptedException {
 
-        return this.mqClientInstance.getMQAdminImpl().queryMessageByUniqKey(topic, key, maxNum, begin, end);
+        return this.mqClientInstance.getMQAdminImpl().queryMessage(topic, key, maxNum, begin, end);
     }
 
-    @Override
-    public void updateConsumeOffset(String brokerAddr, String consumeGroup, MessageQueue mq,
+    @Override public void updateConsumeOffset(String brokerAddr, String consumeGroup, MessageQueue mq,
         long offset) throws RemotingException, InterruptedException, MQBrokerException {
         UpdateConsumerOffsetRequestHeader requestHeader = new UpdateConsumerOffsetRequestHeader();
         requestHeader.setConsumerGroup(consumeGroup);
@@ -1140,41 +1536,31 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
         this.mqClientInstance.getMQClientAPIImpl().updateConsumerOffset(brokerAddr, requestHeader, timeoutMillis);
     }
 
-    @Override
-    public void updateNameServerConfig(final Properties properties, final List<String> nameServers)
-        throws InterruptedException, RemotingConnectException,
-        UnsupportedEncodingException, RemotingSendRequestException, RemotingTimeoutException,
-        MQClientException, MQBrokerException {
+    @Override public void updateNameServerConfig(final Properties properties,
+        final List<String> nameServers) throws InterruptedException, RemotingConnectException, UnsupportedEncodingException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, MQBrokerException {
         this.mqClientInstance.getMQClientAPIImpl().updateNameServerConfig(properties, nameServers, timeoutMillis);
     }
 
-    @Override
-    public Map<String, Properties> getNameServerConfig(final List<String> nameServers)
-        throws InterruptedException, RemotingTimeoutException,
-        RemotingSendRequestException, RemotingConnectException, MQClientException,
-        UnsupportedEncodingException {
+    @Override public Map<String, Properties> getNameServerConfig(
+        final List<String> nameServers) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQClientException, UnsupportedEncodingException {
         return this.mqClientInstance.getMQClientAPIImpl().getNameServerConfig(nameServers, timeoutMillis);
     }
 
     @Override
     public QueryConsumeQueueResponseBody queryConsumeQueue(String brokerAddr, String topic, int queueId, long index,
-        int count, String consumerGroup)
-        throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQClientException {
-        return this.mqClientInstance.getMQClientAPIImpl().queryConsumeQueue(
-            brokerAddr, topic, queueId, index, count, consumerGroup, timeoutMillis
-        );
+        int count,
+        String consumerGroup) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQClientException {
+        return this.mqClientInstance.getMQClientAPIImpl().queryConsumeQueue(brokerAddr, topic, queueId, index, count, consumerGroup, timeoutMillis);
     }
 
-    @Override
-    public boolean resumeCheckHalfMessage(String msgId)
-        throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
+    @Override public boolean resumeCheckHalfMessage(
+        String msgId) throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
         MessageExt msg = this.viewMessage(msgId);
 
         return this.mqClientInstance.getMQClientAPIImpl().resumeCheckHalfMessage(RemotingUtil.socketAddress2String(msg.getStoreHost()), msgId, timeoutMillis);
     }
 
-    @Override
-    public boolean resumeCheckHalfMessage(final String topic,
+    @Override public boolean resumeCheckHalfMessage(final String topic,
         final String msgId) throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
         MessageExt msg = this.viewMessage(topic, msgId);
         if (msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX) == null) {
@@ -1185,11 +1571,67 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
         }
     }
 
-    @Override
-    public void setMessageRequestMode(final String brokerAddr, final String topic, final String consumerGroup, final
-        MessageRequestMode mode, final int popShareQueueNum, final long timeoutMillis)
-        throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException,
-        RemotingConnectException, MQClientException {
+    @Override public void setMessageRequestMode(final String brokerAddr, final String topic, final String consumerGroup,
+        final MessageRequestMode mode, final int popShareQueueNum,
+        final long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQClientException {
         this.mqClientInstance.getMQClientAPIImpl().setMessageRequestMode(brokerAddr, topic, consumerGroup, mode, popShareQueueNum, timeoutMillis);
     }
+
+    @Override
+    public long searchOffset(final String brokerAddr, final String topicName, final int queueId, final long timestamp,
+        final long timeoutMillis) throws RemotingException, MQBrokerException, InterruptedException {
+        return this.mqClientInstance.getMQClientAPIImpl().searchOffset(brokerAddr, topicName, queueId, timestamp, timeoutMillis);
+    }
+
+    public QueryResult queryMessageByUniqKey(String topic, String key, int maxNum, long begin,
+        long end) throws MQClientException, InterruptedException {
+
+        return this.mqClientInstance.getMQAdminImpl().queryMessageByUniqKey(topic, key, maxNum, begin, end);
+    }
+
+    @Override
+    public void resetOffsetByQueueId(final String brokerAddr, final String consumeGroup, final String topicName,
+        final int queueId, final long resetOffset) throws RemotingException, InterruptedException, MQBrokerException {
+        UpdateConsumerOffsetRequestHeader requestHeader = new UpdateConsumerOffsetRequestHeader();
+        requestHeader.setConsumerGroup(consumeGroup);
+        requestHeader.setTopic(topicName);
+        requestHeader.setQueueId(queueId);
+        requestHeader.setCommitOffset(resetOffset);
+        this.mqClientInstance.getMQClientAPIImpl().updateConsumerOffset(brokerAddr, requestHeader, timeoutMillis);
+    }
+
+    @Override public HARuntimeInfo getBrokerHAStatus(
+        String brokerAddr) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException {
+        return this.mqClientInstance.getMQClientAPIImpl().getBrokerHAStatus(brokerAddr, timeoutMillis);
+    }
+
+    @Override public void resetMasterFlushOffset(String brokerAddr,
+        long masterFlushOffset) throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
+        this.mqClientInstance.getMQClientAPIImpl().resetMasterFlushOffset(brokerAddr, masterFlushOffset);
+    }
+
+    @Override
+    public GroupForbidden updateAndGetGroupReadForbidden(String brokerAddr, String groupName, String topicName,
+        Boolean readable) throws RemotingException, InterruptedException, MQBrokerException {
+        UpdateGroupForbiddenRequestHeader requestHeader = new UpdateGroupForbiddenRequestHeader();
+        requestHeader.setGroup(groupName);
+        requestHeader.setTopic(topicName);
+        requestHeader.setReadable(readable);
+        return this.mqClientInstance.getMQClientAPIImpl().updateAndGetGroupForbidden(brokerAddr, requestHeader, timeoutMillis);
+    }
+
+    @Override public void deleteTopicInNameServer(Set<String> addrs, String clusterName,
+        String topic) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+        if (addrs == null) {
+            String ns = this.mqClientInstance.getMQClientAPIImpl().fetchNameServerAddr();
+            addrs = new HashSet(Arrays.asList(ns.split(";")));
+        }
+        for (String addr : addrs) {
+            this.mqClientInstance.getMQClientAPIImpl().deleteTopicInNameServer(addr, clusterName, topic, timeoutMillis);
+        }
+    }
+
+    public MQClientInstance getMqClientInstance() {
+        return mqClientInstance;
+    }
 }
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
index 33d580d..5ed147c 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
@@ -36,6 +36,7 @@ import org.apache.rocketmq.common.protocol.body.ConsumeStatsList;
 import org.apache.rocketmq.common.protocol.body.ConsumerConnection;
 import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
 import org.apache.rocketmq.common.protocol.body.GroupList;
+import org.apache.rocketmq.common.protocol.body.HARuntimeInfo;
 import org.apache.rocketmq.common.protocol.body.KVTable;
 import org.apache.rocketmq.common.protocol.body.ProducerConnection;
 import org.apache.rocketmq.common.protocol.body.QueryConsumeQueueResponseBody;
@@ -43,14 +44,17 @@ import org.apache.rocketmq.common.protocol.body.QueueTimeSpan;
 import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
 import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
 import org.apache.rocketmq.common.protocol.body.TopicList;
+import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
 import org.apache.rocketmq.common.protocol.route.TopicRouteData;
 import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail;
+import org.apache.rocketmq.common.subscription.GroupForbidden;
 import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 import org.apache.rocketmq.remoting.exception.RemotingConnectException;
 import org.apache.rocketmq.remoting.exception.RemotingException;
 import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
 import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+import org.apache.rocketmq.tools.admin.api.BrokerOperatorResult;
 import org.apache.rocketmq.tools.admin.api.MessageTrack;
 
 import java.io.UnsupportedEncodingException;
@@ -58,12 +62,19 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
+import org.apache.rocketmq.tools.admin.common.AdminToolResult;
 
 public interface MQAdminExt extends MQAdmin {
     void start() throws MQClientException;
 
     void shutdown();
 
+    void addBrokerToContainer(final String brokerContainerAddr, final String brokerConfig) throws InterruptedException,
+        MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException;
+
+    void removeBrokerFromContainer(final String brokerContainerAddr, String clusterName, final String brokerName,
+        long brokerId) throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException;
+
     void updateBrokerConfig(final String brokerAddr, final Properties properties) throws RemotingConnectException,
         RemotingSendRequestException, RemotingTimeoutException, UnsupportedEncodingException, InterruptedException, MQBrokerException;
 
@@ -98,7 +109,6 @@ public interface MQAdminExt extends MQAdmin {
 
     SubscriptionGroupConfig examineSubscriptionGroupConfig(final String addr, final String group) throws InterruptedException, RemotingException, MQClientException, MQBrokerException;
 
-
     TopicStatsTable examineTopicStats(
         final String topic) throws RemotingException, MQClientException, InterruptedException,
         MQBrokerException;
@@ -106,6 +116,8 @@ public interface MQAdminExt extends MQAdmin {
     TopicStatsTable examineTopicStats(String brokerAddr, final String topic) throws RemotingException, MQClientException, InterruptedException,
             MQBrokerException;
 
+    AdminToolResult<TopicStatsTable> examineTopicStatsConcurrent(String topic);
+
     TopicList fetchAllTopicList() throws RemotingException, MQClientException, InterruptedException;
 
     TopicList fetchTopicsByCLuster(
@@ -123,6 +135,8 @@ public interface MQAdminExt extends MQAdmin {
         final String topic) throws RemotingException, MQClientException,
         InterruptedException, MQBrokerException;
 
+    AdminToolResult<ConsumeStats> examineConsumeStatsConcurrent(String consumerGroup, String topic);
+
     ClusterInfo examineBrokerClusterInfo() throws InterruptedException, MQBrokerException, RemotingTimeoutException,
         RemotingSendRequestException, RemotingConnectException;
 
@@ -157,10 +171,20 @@ public interface MQAdminExt extends MQAdmin {
     KVTable getKVListByNamespace(
         final String namespace) throws RemotingException, MQClientException, InterruptedException;
 
+    void deleteTopic(final String topicName,
+        final String clusterName) throws RemotingException, MQBrokerException, InterruptedException, MQClientException;
+
     void deleteTopicInBroker(final Set<String> addrs, final String topic) throws RemotingException, MQBrokerException,
         InterruptedException, MQClientException;
 
+    AdminToolResult<BrokerOperatorResult> deleteTopicInBrokerConcurrent(Set<String> addrs, String topic);
+
+    void deleteTopicInNameServer(final Set<String> addrs,
+        final String topic) throws RemotingException, MQBrokerException,
+        InterruptedException, MQClientException;
+
     void deleteTopicInNameServer(final Set<String> addrs,
+        final String clusterName,
         final String topic) throws RemotingException, MQBrokerException,
         InterruptedException, MQClientException;
 
@@ -186,6 +210,9 @@ public interface MQAdminExt extends MQAdmin {
     void resetOffsetNew(String consumerGroup, String topic, long timestamp) throws RemotingException, MQBrokerException,
         InterruptedException, MQClientException;
 
+    AdminToolResult<BrokerOperatorResult> resetOffsetNewConcurrent(final String group, final String topic,
+        final long timestamp);
+
     Map<String, Map<MessageQueue, Long>> getConsumeStatus(String topic, String group,
         String clientAddr) throws RemotingException,
         MQBrokerException, InterruptedException, MQClientException;
@@ -197,10 +224,18 @@ public interface MQAdminExt extends MQAdmin {
     GroupList queryTopicConsumeByWho(final String topic) throws RemotingConnectException, RemotingSendRequestException,
         RemotingTimeoutException, InterruptedException, MQBrokerException, RemotingException, MQClientException;
 
+    TopicList queryTopicsByConsumer(final String group) throws InterruptedException, MQBrokerException, RemotingException, MQClientException;
+
+    AdminToolResult<TopicList> queryTopicsByConsumerConcurrent(final String group);
+
+    SubscriptionData querySubscription(final String group, final String topic) throws InterruptedException, MQBrokerException, RemotingException, MQClientException;
+
     List<QueueTimeSpan> queryConsumeTimeSpan(final String topic,
         final String group) throws InterruptedException, MQBrokerException,
         RemotingException, MQClientException;
 
+    AdminToolResult<List<QueueTimeSpan>> queryConsumeTimeSpanConcurrent(final String topic, final String group);
+
     boolean cleanExpiredConsumerQueue(String cluster) throws RemotingConnectException, RemotingSendRequestException,
         RemotingTimeoutException, MQClientException, InterruptedException;
 
@@ -216,6 +251,9 @@ public interface MQAdminExt extends MQAdmin {
     ConsumerRunningInfo getConsumerRunningInfo(final String consumerGroup, final String clientId, final boolean jstack)
         throws RemotingException, MQClientException, InterruptedException;
 
+    ConsumerRunningInfo getConsumerRunningInfo(final String consumerGroup, final String clientId, final boolean jstack, final boolean metrics)
+        throws RemotingException, MQClientException, InterruptedException;
+
     ConsumeMessageDirectlyResult consumeMessageDirectly(String consumerGroup,
         String clientId,
         String msgId) throws RemotingException, MQClientException, InterruptedException, MQBrokerException;
@@ -229,6 +267,10 @@ public interface MQAdminExt extends MQAdmin {
         MessageExt msg) throws RemotingException, MQClientException, InterruptedException,
         MQBrokerException;
 
+    List<MessageTrack> messageTrackDetailConcurrent(
+        MessageExt msg) throws RemotingException, MQClientException, InterruptedException,
+        MQBrokerException;
+
     void cloneGroupOffset(String srcGroup, String destGroup, String topic, boolean isOffline) throws RemotingException,
         MQClientException, InterruptedException, MQBrokerException;
 
@@ -254,6 +296,7 @@ public interface MQAdminExt extends MQAdmin {
         long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException,
         RemotingConnectException, MQBrokerException;
 
+
     TopicConfigSerializeWrapper getAllTopicConfig(final String brokerAddr,
         long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException,
         RemotingConnectException, MQBrokerException;
@@ -315,11 +358,37 @@ public interface MQAdminExt extends MQAdmin {
         throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException,
         RemotingConnectException, MQClientException;
 
+    long searchOffset(final String brokerAddr, final String topicName,
+        final int queueId, final long timestamp, final long timeoutMillis)
+        throws RemotingException, MQBrokerException, InterruptedException;
 
+    void resetOffsetByQueueId(final String brokerAddr, final String consumerGroup,
+        final String topicName, final int queueId, final long resetOffset)
+        throws RemotingException, InterruptedException, MQBrokerException;
 
     TopicConfig examineTopicConfig(final String addr,
                                    final String topic) throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException;
 
     void createStaticTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig, final TopicQueueMappingDetail mappingDetail, final boolean force) throws RemotingException, InterruptedException, MQBrokerException;
 
+    GroupForbidden updateAndGetGroupReadForbidden(String brokerAddr, String groupName, String topicName,
+        Boolean readable)
+        throws RemotingException, InterruptedException, MQBrokerException;
+
+
+    MessageExt queryMessage(String clusterName,
+        String topic,
+        String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException;
+
+    HARuntimeInfo getBrokerHAStatus(String brokerAddr) throws RemotingConnectException, RemotingSendRequestException,
+        RemotingTimeoutException, InterruptedException, MQBrokerException;
+
+    /**
+     * Reset master flush offset in slave
+     *
+     * @param brokerAddr slave broker address
+     * @param masterFlushOffset master flush offset
+     */
+    void resetMasterFlushOffset(String brokerAddr, long masterFlushOffset)
+        throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException;
 }
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/api/BrokerOperatorResult.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/api/BrokerOperatorResult.java
new file mode 100644
index 0000000..5ec04b5
--- /dev/null
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/api/BrokerOperatorResult.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.tools.admin.api;
+
+import java.util.List;
+
+public class BrokerOperatorResult {
+
+    private List<String> successList;
+
+    private List<String> failureList;
+
+    public List<String> getSuccessList() {
+        return successList;
+    }
+
+    public void setSuccessList(List<String> successList) {
+        this.successList = successList;
+    }
+
+    public List<String> getFailureList() {
+        return failureList;
+    }
+
+    public void setFailureList(List<String> failureList) {
+        this.failureList = failureList;
+    }
+
+    @Override
+    public String toString() {
+        return "BrokerOperatorResult{" +
+            "successList=" + successList +
+            ", failureList=" + failureList +
+            '}';
+    }
+}
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/common/AdminToolHandler.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/common/AdminToolHandler.java
new file mode 100644
index 0000000..1afebeb
--- /dev/null
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/common/AdminToolHandler.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.tools.admin.common;
+
+public interface AdminToolHandler {
+    AdminToolResult doExecute() throws Exception;
+}
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/common/AdminToolResult.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/common/AdminToolResult.java
new file mode 100644
index 0000000..21b9652
--- /dev/null
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/common/AdminToolResult.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.tools.admin.common;
+
+public class AdminToolResult<T> {
+
+    private boolean success;
+    private int code;
+    private String errorMsg;
+    private T data;
+
+    public AdminToolResult(boolean success, int code, String errorMsg, T data) {
+        this.success = success;
+        this.code = code;
+        this.errorMsg = errorMsg;
+        this.data = data;
+    }
+
+    public static AdminToolResult success(Object data) {
+        return new AdminToolResult(true, AdminToolsResultCodeEnum.SUCCESS.getCode(), "success", data);
+    }
+
+    public static AdminToolResult failure(AdminToolsResultCodeEnum errorCodeEnum, String errorMsg) {
+        return new AdminToolResult(false, errorCodeEnum.getCode(), errorMsg, null);
+    }
+
+    public static AdminToolResult failure(AdminToolsResultCodeEnum errorCodeEnum, String errorMsg, Object data) {
+        return new AdminToolResult(false, errorCodeEnum.getCode(), errorMsg, data);
+    }
+
+    public boolean isSuccess() {
+        return success;
+    }
+
+    public void setSuccess(boolean success) {
+        this.success = success;
+    }
+
+    public int getCode() {
+        return code;
+    }
+
+    public void setCode(int code) {
+        this.code = code;
+    }
+
+    public String getErrorMsg() {
+        return errorMsg;
+    }
+
+    public void setErrorMsg(String errorMsg) {
+        this.errorMsg = errorMsg;
+    }
+
+    public T getData() {
+        return data;
+    }
+
+    public void setData(T data) {
+        this.data = data;
+    }
+}
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/common/AdminToolsResultCodeEnum.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/common/AdminToolsResultCodeEnum.java
new file mode 100644
index 0000000..9598845
--- /dev/null
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/common/AdminToolsResultCodeEnum.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.tools.admin.common;
+
+public enum AdminToolsResultCodeEnum {
+
+    /**
+     *
+     */
+    SUCCESS(200),
+
+    REMOTING_ERROR(-1001),
+    MQ_BROKER_ERROR(-1002),
+    MQ_CLIENT_ERROR(-1003),
+    INTERRUPT_ERROR(-1004),
+
+    TOPIC_ROUTE_INFO_NOT_EXIST(-2001),
+    CONSUMER_NOT_ONLINE(-2002);
+
+    private int code;
+
+    AdminToolsResultCodeEnum(int code) {
+        this.code = code;
+    }
+
+    public int getCode() {
+        return code;
+    }
+}
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/CommandUtil.java b/tools/src/main/java/org/apache/rocketmq/tools/command/CommandUtil.java
index cdf5f32..c2e4cb2 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/CommandUtil.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/CommandUtil.java
@@ -39,6 +39,8 @@ public class CommandUtil {
     private static final String ERROR_MESSAGE = "Make sure the specified clusterName exists or the name server " +
         "connected to is correct.";
 
+    public static final String NO_MASTER_PLACEHOLDER = "NO_MASTER";
+
     public static Map<String/*master addr*/, List<String>/*slave addr*/> fetchMasterAndSlaveDistinguish(
         final MQAdminExt adminExt, final String clusterName)
         throws InterruptedException, RemotingConnectException,
@@ -62,14 +64,23 @@ public class CommandUtil {
             }
 
             String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
-            masterAndSlaveMap.put(masterAddr, new ArrayList<String>());
+
+            if (masterAddr == null) {
+                masterAndSlaveMap.putIfAbsent(NO_MASTER_PLACEHOLDER, new ArrayList<>());
+            } else {
+                masterAndSlaveMap.put(masterAddr, new ArrayList<String>());
+            }
 
             for (Entry<Long, String> brokerAddrEntry : brokerData.getBrokerAddrs().entrySet()) {
                 if (brokerAddrEntry.getValue() == null || brokerAddrEntry.getKey() == MixAll.MASTER_ID) {
                     continue;
                 }
 
-                masterAndSlaveMap.get(masterAddr).add(brokerAddrEntry.getValue());
+                if (masterAddr == null) {
+                    masterAndSlaveMap.get(NO_MASTER_PLACEHOLDER).add(brokerAddrEntry.getValue());
+                } else {
+                    masterAndSlaveMap.get(masterAddr).add(brokerAddrEntry.getValue());
+                }
             }
         }
 
@@ -136,7 +147,7 @@ public class CommandUtil {
 
     public static String fetchBrokerNameByAddr(final MQAdminExt adminExt, final String addr) throws Exception {
         ClusterInfo clusterInfoSerializeWrapper = adminExt.examineBrokerClusterInfo();
-        HashMap<String/* brokerName */, BrokerData> brokerAddrTable = clusterInfoSerializeWrapper.getBrokerAddrTable();
+        Map<String/* brokerName */, BrokerData> brokerAddrTable = clusterInfoSerializeWrapper.getBrokerAddrTable();
         Iterator<Map.Entry<String, BrokerData>> it = brokerAddrTable.entrySet().iterator();
         while (it.hasNext()) {
             Map.Entry<String, BrokerData> entry = it.next();
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
index c8bfe66..bc72038 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
@@ -38,6 +38,7 @@ import org.apache.rocketmq.tools.command.broker.BrokerStatusSubCommand;
 import org.apache.rocketmq.tools.command.broker.CleanExpiredCQSubCommand;
 import org.apache.rocketmq.tools.command.broker.CleanUnusedTopicCommand;
 import org.apache.rocketmq.tools.command.broker.GetBrokerConfigCommand;
+import org.apache.rocketmq.tools.command.broker.ResetMasterFlushOffsetSubCommand;
 import org.apache.rocketmq.tools.command.broker.SendMsgStatusCommand;
 import org.apache.rocketmq.tools.command.broker.UpdateBrokerConfigSubCommand;
 import org.apache.rocketmq.tools.command.cluster.CLusterSendMsgRTCommand;
@@ -51,9 +52,12 @@ import org.apache.rocketmq.tools.command.consumer.GetConsumerConfigSubCommand;
 import org.apache.rocketmq.tools.command.consumer.SetConsumeModeSubCommand;
 import org.apache.rocketmq.tools.command.consumer.StartMonitoringSubCommand;
 import org.apache.rocketmq.tools.command.consumer.UpdateSubGroupSubCommand;
+import org.apache.rocketmq.tools.command.container.AddBrokerSubCommand;
+import org.apache.rocketmq.tools.command.container.RemoveBrokerSubCommand;
 import org.apache.rocketmq.tools.command.export.ExportConfigsCommand;
 import org.apache.rocketmq.tools.command.export.ExportMetadataCommand;
 import org.apache.rocketmq.tools.command.export.ExportMetricsCommand;
+import org.apache.rocketmq.tools.command.ha.HAStatusSubCommand;
 import org.apache.rocketmq.tools.command.message.CheckMsgSendRTCommand;
 import org.apache.rocketmq.tools.command.message.ConsumeMessageCommand;
 import org.apache.rocketmq.tools.command.message.PrintMessageByQueueCommand;
@@ -146,8 +150,11 @@ public class MQAdminStartup {
                             String namesrvAddr = commandLine.getOptionValue('n');
                             System.setProperty(MixAll.NAMESRV_ADDR_PROPERTY, namesrvAddr);
                         }
-
-                        cmd.execute(commandLine, options, AclUtils.getAclRPCHook(rocketmqHome + MixAll.ACL_CONF_TOOLS_FILE));
+                        if (rpcHook != null) {
+                            cmd.execute(commandLine, options, rpcHook);
+                        } else {
+                            cmd.execute(commandLine, options, AclUtils.getAclRPCHook(rocketmqHome + MixAll.ACL_CONF_TOOLS_FILE));
+                        }
                     } else {
                         System.out.printf("The sub command %s not exist.%n", args[0]);
                     }
@@ -171,6 +178,9 @@ public class MQAdminStartup {
         initCommand(new TopicStatusSubCommand());
         initCommand(new TopicClusterSubCommand());
 
+        initCommand(new AddBrokerSubCommand());
+        initCommand(new RemoveBrokerSubCommand());
+        initCommand(new ResetMasterFlushOffsetSubCommand());
         initCommand(new BrokerStatusSubCommand());
         initCommand(new QueryMsgByIdSubCommand());
         initCommand(new QueryMsgByKeySubCommand());
@@ -228,13 +238,14 @@ public class MQAdminStartup {
         initCommand(new UpdateGlobalWhiteAddrSubCommand());
         initCommand(new GetAccessConfigSubCommand());
 
-
         initCommand(new UpdateStaticTopicSubCommand());
         initCommand(new RemappingStaticTopicSubCommand());
 
         initCommand(new ExportMetadataCommand());
         initCommand(new ExportConfigsCommand());
         initCommand(new ExportMetricsCommand());
+
+        initCommand(new HAStatusSubCommand());
     }
 
     private static void initLogback() throws JoranException {
@@ -247,6 +258,7 @@ public class MQAdminStartup {
 
     private static void printHelp() {
         System.out.printf("The most commonly used mqadmin commands are:%n");
+
         for (SubCommand cmd : subCommandList) {
             System.out.printf("   %-20s %s%n", cmd.commandName(), cmd.commandDesc());
         }
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/broker/GetBrokerConfigCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/GetBrokerConfigCommand.java
index 6bacd3c..b9cfdf9 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/broker/GetBrokerConfigCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/GetBrokerConfigCommand.java
@@ -85,19 +85,28 @@ public class GetBrokerConfigCommand implements SubCommand {
                 Map<String, List<String>> masterAndSlaveMap
                     = CommandUtil.fetchMasterAndSlaveDistinguish(defaultMQAdminExt, clusterName);
 
-                for (Entry<String, List<String>> masterAndSlaveEntry : masterAndSlaveMap.entrySet()) {
+                for (String masterAddr : masterAndSlaveMap.keySet()) {
+
+                    if (masterAddr == null) {
+                        continue;
+                    }
 
                     getAndPrint(
-                            defaultMQAdminExt,
-                            String.format("============Master: %s============\n", masterAndSlaveEntry.getKey()),
-                            masterAndSlaveEntry.getKey()
+                        defaultMQAdminExt,
+                        String.format("============Master: %s============\n", masterAddr),
+                        masterAddr
                     );
-                    for (String slaveAddr : masterAndSlaveEntry.getValue()) {
+
+                    for (String slaveAddr : masterAndSlaveMap.get(masterAddr)) {
+
+                        if (slaveAddr == null) {
+                            continue;
+                        }
 
                         getAndPrint(
-                                defaultMQAdminExt,
-                                String.format("============My Master: %s=====Slave: %s============\n", masterAndSlaveEntry.getKey(), slaveAddr),
-                                slaveAddr
+                            defaultMQAdminExt,
+                            String.format("============My Master: %s=====Slave: %s============\n", masterAddr, slaveAddr),
+                            slaveAddr
                         );
                     }
                 }
@@ -117,6 +126,10 @@ public class GetBrokerConfigCommand implements SubCommand {
 
         System.out.print(printPrefix);
 
+        if (addr.equals(CommandUtil.NO_MASTER_PLACEHOLDER)) {
+            return;
+        }
+
         Properties properties = defaultMQAdminExt.getBrokerConfig(addr);
         if (properties == null) {
             System.out.printf("Broker[%s] has no config property!\n", addr);
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/broker/ResetMasterFlushOffsetSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/ResetMasterFlushOffsetSubCommand.java
new file mode 100644
index 0000000..b2ac48c
--- /dev/null
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/ResetMasterFlushOffsetSubCommand.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.tools.command.broker;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.apache.rocketmq.tools.command.SubCommand;
+import org.apache.rocketmq.tools.command.SubCommandException;
+
+public class ResetMasterFlushOffsetSubCommand implements SubCommand {
+    @Override
+    public String commandName() {
+        return "resetMasterFlushOffset";
+    }
+
+    @Override
+    public String commandDesc() {
+        return "Reset master flush offset in slave";
+    }
+
+    @Override
+    public Options buildCommandlineOptions(Options options) {
+        Option opt = new Option("b", "brokerAddr", true, "which broker to reset");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new Option("o", "offset", true, "the offset to reset at");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        return options;
+    }
+
+    @Override
+    public void execute(CommandLine commandLine, Options options,
+        RPCHook rpcHook) throws SubCommandException {
+        DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
+        defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
+
+        try {
+            defaultMQAdminExt.start();
+            String brokerAddr = commandLine.getOptionValue('b').trim();
+            long masterFlushOffset = Long.parseLong(commandLine.getOptionValue('o').trim());
+
+            defaultMQAdminExt.resetMasterFlushOffset(brokerAddr, masterFlushOffset);
+            System.out.printf("reset master flush offset to %d success%n", masterFlushOffset);
+        } catch (Exception e) {
+            throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
+        } finally {
+            defaultMQAdminExt.shutdown();
+        }
+    }
+}
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/broker/UpdateBrokerConfigSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/UpdateBrokerConfigSubCommand.java
index a94fa50..98abeb6 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/broker/UpdateBrokerConfigSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/UpdateBrokerConfigSubCommand.java
@@ -50,6 +50,10 @@ public class UpdateBrokerConfigSubCommand implements SubCommand {
         opt.setRequired(false);
         options.addOption(opt);
 
+        opt = new Option("a", "updateAllBroker", true, "update all brokers include slave");
+        opt.setRequired(false);
+        options.addOption(opt);
+
         opt = new Option("k", "key", true, "config key");
         opt.setRequired(true);
         options.addOption(opt);
@@ -87,9 +91,15 @@ public class UpdateBrokerConfigSubCommand implements SubCommand {
 
                 defaultMQAdminExt.start();
 
-                Set<String> masterSet =
-                    CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName);
-                for (String brokerAddr : masterSet) {
+                Set<String> brokerAddrSet;
+
+                if (commandLine.hasOption('a')) {
+                    brokerAddrSet = CommandUtil.fetchMasterAndSlaveAddrByClusterName(defaultMQAdminExt, clusterName);
+                } else {
+                    brokerAddrSet = CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName);
+                }
+
+                for (String brokerAddr : brokerAddrSet) {
                     try {
                         defaultMQAdminExt.updateBrokerConfig(brokerAddr, properties);
                         System.out.printf("update broker config success, %s\n", brokerAddr);
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/cluster/CLusterSendMsgRTCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/cluster/CLusterSendMsgRTCommand.java
index 872a130..0658ba0 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/cluster/CLusterSendMsgRTCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/cluster/CLusterSendMsgRTCommand.java
@@ -20,7 +20,7 @@ package org.apache.rocketmq.tools.command.cluster;
 import java.math.BigDecimal;
 import java.text.SimpleDateFormat;
 import java.util.Date;
-import java.util.HashMap;
+import java.util.Map;
 import java.util.Set;
 import java.util.TimeZone;
 import java.util.TreeSet;
@@ -93,7 +93,7 @@ public class CLusterSendMsgRTCommand implements SubCommand {
             producer.start();
 
             ClusterInfo clusterInfoSerializeWrapper = defaultMQAdminExt.examineBrokerClusterInfo();
-            HashMap<String, Set<String>> clusterAddr = clusterInfoSerializeWrapper
+            Map<String, Set<String>> clusterAddr = clusterInfoSerializeWrapper
                 .getClusterAddrTable();
 
             Set<String> clusterNames = null;
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/cluster/ClusterListSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/cluster/ClusterListSubCommand.java
index 6a0cd71..f34d032 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/cluster/ClusterListSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/cluster/ClusterListSubCommand.java
@@ -20,17 +20,15 @@ import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeSet;
+
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.Options;
-import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.common.protocol.body.ClusterInfo;
 import org.apache.rocketmq.common.protocol.body.KVTable;
 import org.apache.rocketmq.common.protocol.route.BrokerData;
 import org.apache.rocketmq.remoting.RPCHook;
-import org.apache.rocketmq.remoting.exception.RemotingConnectException;
-import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
-import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
 import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
 import org.apache.rocketmq.tools.command.SubCommand;
 import org.apache.rocketmq.tools.command.SubCommandException;
@@ -44,7 +42,7 @@ public class ClusterListSubCommand implements SubCommand {
 
     @Override
     public String commandDesc() {
-        return "List all of clusters";
+        return "List cluster infos";
     }
 
     @Override
@@ -57,6 +55,10 @@ public class ClusterListSubCommand implements SubCommand {
         opt.setRequired(false);
         options.addOption(opt);
 
+        opt = new Option("c", "clusterName", true, "which cluster");
+        opt.setRequired(false);
+        options.addOption(opt);
+
         return options;
     }
 
@@ -74,6 +76,8 @@ public class ClusterListSubCommand implements SubCommand {
             printInterval = Long.parseLong(commandLine.getOptionValue('i')) * 1000;
         }
 
+        String clusterName = commandLine.hasOption('c') ? commandLine.getOptionValue('c').trim() : "";
+
         try {
             defaultMQAdminExt.start();
             long i = 0;
@@ -82,10 +86,15 @@ public class ClusterListSubCommand implements SubCommand {
                 if (i++ > 0) {
                     Thread.sleep(printInterval);
                 }
+
+                ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo();
+
+                Set<String> clusterNames = getTargetClusterNames(clusterName, clusterInfo);
+
                 if (commandLine.hasOption('m')) {
-                    this.printClusterMoreStats(defaultMQAdminExt);
+                    this.printClusterMoreStats(clusterNames, defaultMQAdminExt, clusterInfo);
                 } else {
-                    this.printClusterBaseInfo(defaultMQAdminExt);
+                    this.printClusterBaseInfo(clusterNames, defaultMQAdminExt, clusterInfo);
                 }
             }
             while (enableInterval);
@@ -96,11 +105,19 @@ public class ClusterListSubCommand implements SubCommand {
         }
     }
 
-    private void printClusterMoreStats(final DefaultMQAdminExt defaultMQAdminExt) throws RemotingConnectException,
-        RemotingTimeoutException, RemotingSendRequestException, InterruptedException, MQBrokerException {
-
-        ClusterInfo clusterInfoSerializeWrapper = defaultMQAdminExt.examineBrokerClusterInfo();
+    private Set<String> getTargetClusterNames(String clusterName, ClusterInfo clusterInfo) {
+        if (StringUtils.isEmpty(clusterName)) {
+            return clusterInfo.getClusterAddrTable().keySet();
+        } else {
+            Set<String> clusterNames = new TreeSet<String>();
+            clusterNames.add(clusterName);
+            return clusterNames;
+        }
+    }
 
+    private void printClusterMoreStats(final Set<String> clusterNames,
+        final DefaultMQAdminExt defaultMQAdminExt,
+        final ClusterInfo clusterInfo) {
         System.out.printf("%-16s  %-32s %14s %14s %14s %14s%n",
             "#Cluster Name",
             "#Broker Name",
@@ -110,17 +127,16 @@ public class ClusterListSubCommand implements SubCommand {
             "#OutTotalToday"
         );
 
-        Iterator<Map.Entry<String, Set<String>>> itCluster = clusterInfoSerializeWrapper.getClusterAddrTable().entrySet().iterator();
-        while (itCluster.hasNext()) {
-            Map.Entry<String, Set<String>> next = itCluster.next();
-            String clusterName = next.getKey();
-            TreeSet<String> brokerNameSet = new TreeSet<String>();
-            brokerNameSet.addAll(next.getValue());
+        for (String clusterName : clusterNames) {
+            TreeSet<String> brokerNameTreeSet = new TreeSet<String>();
+            Set<String> brokerNameSet = clusterInfo.getClusterAddrTable().get(clusterName);
+            if (brokerNameSet != null && !brokerNameSet.isEmpty()) {
+                brokerNameTreeSet.addAll(brokerNameSet);
+            }
 
-            for (String brokerName : brokerNameSet) {
-                BrokerData brokerData = clusterInfoSerializeWrapper.getBrokerAddrTable().get(brokerName);
+            for (String brokerName : brokerNameTreeSet) {
+                BrokerData brokerData = clusterInfo.getBrokerAddrTable().get(brokerName);
                 if (brokerData != null) {
-
                     Iterator<Map.Entry<Long, String>> itAddr = brokerData.getBrokerAddrs().entrySet().iterator();
                     while (itAddr.hasNext()) {
                         Map.Entry<Long, String> next1 = itAddr.next();
@@ -144,7 +160,7 @@ public class ClusterListSubCommand implements SubCommand {
                             inTotalToday = Long.parseLong(msgPutTotalTodayNow) - Long.parseLong(msgPutTotalTodayMorning);
                             outTotalToday = Long.parseLong(msgGetTotalTodayNow) - Long.parseLong(msgGetTotalTodayMorning);
 
-                        } catch (Exception e) {
+                        } catch (Exception ignored) {
                         }
 
                         System.out.printf("%-16s  %-32s %14d %14d %14d %14d%n",
@@ -158,19 +174,12 @@ public class ClusterListSubCommand implements SubCommand {
                     }
                 }
             }
-
-            if (itCluster.hasNext()) {
-                System.out.printf("");
-            }
         }
     }
 
-    private void printClusterBaseInfo(
-        final DefaultMQAdminExt defaultMQAdminExt) throws RemotingConnectException, RemotingTimeoutException,
-        RemotingSendRequestException, InterruptedException, MQBrokerException {
-
-        ClusterInfo clusterInfoSerializeWrapper = defaultMQAdminExt.examineBrokerClusterInfo();
-
+    private void printClusterBaseInfo(final Set<String> clusterNames,
+        final DefaultMQAdminExt defaultMQAdminExt,
+        final ClusterInfo clusterInfo) {
         System.out.printf("%-16s  %-22s  %-4s  %-22s %-16s %19s %19s %10s %5s %6s%n",
             "#Cluster Name",
             "#Broker Name",
@@ -181,20 +190,20 @@ public class ClusterListSubCommand implements SubCommand {
             "#OutTPS(LOAD)",
             "#PCWait(ms)",
             "#Hour",
-            "#SPACE"
+            "#SPACE",
+            "#ACTIVATED"
         );
 
-        Iterator<Map.Entry<String, Set<String>>> itCluster = clusterInfoSerializeWrapper.getClusterAddrTable().entrySet().iterator();
-        while (itCluster.hasNext()) {
-            Map.Entry<String, Set<String>> next = itCluster.next();
-            String clusterName = next.getKey();
-            TreeSet<String> brokerNameSet = new TreeSet<String>();
-            brokerNameSet.addAll(next.getValue());
+        for (String clusterName : clusterNames) {
+            TreeSet<String> brokerNameTreeSet = new TreeSet<String>();
+            Set<String> brokerNameSet = clusterInfo.getClusterAddrTable().get(clusterName);
+            if (brokerNameSet != null && !brokerNameSet.isEmpty()) {
+                brokerNameTreeSet.addAll(brokerNameSet);
+            }
 
-            for (String brokerName : brokerNameSet) {
-                BrokerData brokerData = clusterInfoSerializeWrapper.getBrokerAddrTable().get(brokerName);
+            for (String brokerName : brokerNameTreeSet) {
+                BrokerData brokerData = clusterInfo.getBrokerAddrTable().get(brokerName);
                 if (brokerData != null) {
-
                     Iterator<Map.Entry<Long, String>> itAddr = brokerData.getBrokerAddrs().entrySet().iterator();
                     while (itAddr.hasNext()) {
                         Map.Entry<Long, String> next1 = itAddr.next();
@@ -208,8 +217,10 @@ public class ClusterListSubCommand implements SubCommand {
                         String pageCacheLockTimeMills = "";
                         String earliestMessageTimeStamp = "";
                         String commitLogDiskRatio = "";
+                        boolean isBrokerActive = false;
                         try {
                             KVTable kvTable = defaultMQAdminExt.fetchBrokerRuntimeStats(next1.getValue());
+                            isBrokerActive = Boolean.parseBoolean(kvTable.getTable().get("brokerActive"));
                             String putTps = kvTable.getTable().get("putTps");
                             String getTransferedTps = kvTable.getTable().get("getTransferedTps");
                             sendThreadPoolQueueSize = kvTable.getTable().get("sendThreadPoolQueueSize");
@@ -246,15 +257,15 @@ public class ClusterListSubCommand implements SubCommand {
                         double space = 0.0;
 
                         if (earliestMessageTimeStamp != null && earliestMessageTimeStamp.length() > 0) {
-                            long mills = System.currentTimeMillis() - Long.valueOf(earliestMessageTimeStamp);
+                            long mills = System.currentTimeMillis() - Long.parseLong(earliestMessageTimeStamp);
                             hour = mills / 1000.0 / 60.0 / 60.0;
                         }
 
                         if (commitLogDiskRatio != null && commitLogDiskRatio.length() > 0) {
-                            space = Double.valueOf(commitLogDiskRatio);
+                            space = Double.parseDouble(commitLogDiskRatio);
                         }
 
-                        System.out.printf("%-16s  %-22s  %-4s  %-22s %-16s %19s %19s %10s %5s %6s%n",
+                        System.out.printf("%-16s  %-22s  %-4s  %-22s %-16s %19s %19s %10s %5s %6s %10s%n",
                             clusterName,
                             brokerName,
                             next1.getKey(),
@@ -264,15 +275,12 @@ public class ClusterListSubCommand implements SubCommand {
                             String.format("%9.2f(%s,%sms)", out, pullThreadPoolQueueSize, pullThreadPoolQueueHeadWaitTimeMills),
                             pageCacheLockTimeMills,
                             String.format("%2.2f", hour),
-                            String.format("%.4f", space)
+                            String.format("%.4f", space),
+                            isBrokerActive
                         );
                     }
                 }
             }
-
-            if (itCluster.hasNext()) {
-                System.out.printf("");
-            }
         }
     }
 }
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerProgressSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerProgressSubCommand.java
index 48e0c1b..86464c2 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerProgressSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerProgressSubCommand.java
@@ -64,6 +64,10 @@ public class ConsumerProgressSubCommand implements SubCommand {
         opt.setRequired(false);
         options.addOption(opt);
 
+        opt = new Option("t", "topicName", true, "topic name");
+        opt.setRequired(false);
+        options.addOption(opt);
+
         Option optionShowClientIP = new Option("s", "showClientIP", true, "Show Client IP per Queue");
         optionShowClientIP.setRequired(false);
         options.addOption(optionShowClientIP);
@@ -79,12 +83,13 @@ public class ConsumerProgressSubCommand implements SubCommand {
             for (Connection connection : consumerConnection.getConnectionSet()) {
                 String clientId = connection.getClientId();
                 ConsumerRunningInfo consumerRunningInfo = defaultMQAdminExt.getConsumerRunningInfo(groupName, clientId,
-                    false);
+                    false, false);
                 for (MessageQueue messageQueue : consumerRunningInfo.getMqTable().keySet()) {
                     results.put(messageQueue, clientId.split("@")[0]);
                 }
             }
-        } catch (Exception ignore) {
+        } catch (Exception e) {
+            log.error("getMqAllocationsResult error, ", e);
         }
         return results;
     }
@@ -102,7 +107,13 @@ public class ConsumerProgressSubCommand implements SubCommand {
 
             if (commandLine.hasOption('g')) {
                 String consumerGroup = commandLine.getOptionValue('g').trim();
-                ConsumeStats consumeStats = defaultMQAdminExt.examineConsumeStats(consumerGroup);
+                String topicName = commandLine.hasOption('t') ? commandLine.getOptionValue('t').trim() : null;
+                ConsumeStats consumeStats;
+                if (topicName == null) {
+                    consumeStats = defaultMQAdminExt.examineConsumeStats(consumerGroup);
+                } else {
+                    consumeStats = defaultMQAdminExt.examineConsumeStats(consumerGroup, topicName);
+                }
                 List<MessageQueue> mqList = new LinkedList<MessageQueue>();
                 mqList.addAll(consumeStats.getOffsetTable().keySet());
                 Collections.sort(mqList);
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/DeleteSubscriptionGroupCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/DeleteSubscriptionGroupCommand.java
index fb0efeb..44113e3 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/DeleteSubscriptionGroupCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/DeleteSubscriptionGroupCommand.java
@@ -27,7 +27,6 @@ import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
 import org.apache.rocketmq.tools.command.CommandUtil;
 import org.apache.rocketmq.tools.command.SubCommand;
 import org.apache.rocketmq.tools.command.SubCommandException;
-import org.apache.rocketmq.tools.command.topic.DeleteTopicSubCommand;
 
 public class DeleteSubscriptionGroupCommand implements SubCommand {
     @Override
@@ -68,17 +67,19 @@ public class DeleteSubscriptionGroupCommand implements SubCommand {
         try {
             // groupName
             String groupName = commandLine.getOptionValue('g').trim();
-
-            boolean removeOffset = false;
+            boolean cleanOffset = false;
             if (commandLine.hasOption('r')) {
-                removeOffset = Boolean.valueOf(commandLine.getOptionValue("r").trim());
+                try {
+                    cleanOffset = Boolean.valueOf(commandLine.getOptionValue('r').trim());
+                } catch (Exception e) {
+                }
             }
 
             if (commandLine.hasOption('b')) {
                 String addr = commandLine.getOptionValue('b').trim();
                 adminExt.start();
 
-                adminExt.deleteSubscriptionGroup(addr, groupName, removeOffset);
+                adminExt.deleteSubscriptionGroup(addr, groupName, cleanOffset);
                 System.out.printf("delete subscription group [%s] from broker [%s] success.%n", groupName,
                     addr);
 
@@ -89,17 +90,15 @@ public class DeleteSubscriptionGroupCommand implements SubCommand {
 
                 Set<String> masterSet = CommandUtil.fetchMasterAddrByClusterName(adminExt, clusterName);
                 for (String master : masterSet) {
-                    adminExt.deleteSubscriptionGroup(master, groupName, removeOffset);
+                    adminExt.deleteSubscriptionGroup(master, groupName, cleanOffset);
                     System.out.printf(
                         "delete subscription group [%s] from broker [%s] in cluster [%s] success.%n",
                         groupName, master, clusterName);
                 }
 
                 try {
-                    DeleteTopicSubCommand.deleteTopic(adminExt, clusterName, MixAll.RETRY_GROUP_TOPIC_PREFIX
-                        + groupName);
-                    DeleteTopicSubCommand.deleteTopic(adminExt, clusterName, MixAll.DLQ_GROUP_TOPIC_PREFIX
-                        + groupName);
+                    adminExt.deleteTopic(MixAll.RETRY_GROUP_TOPIC_PREFIX + groupName, clusterName);
+                    adminExt.deleteTopic(MixAll.DLQ_GROUP_TOPIC_PREFIX + groupName, clusterName);
                 } catch (Exception e) {
                     e.printStackTrace();
                 }
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/GetConsumerConfigSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/GetConsumerConfigSubCommand.java
index 61e3611..4696b4f 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/GetConsumerConfigSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/GetConsumerConfigSubCommand.java
@@ -134,7 +134,7 @@ class ConsumerConfigInfo {
         return brokerName;
     }
 
-    public void setBrokerName(String brokerNameList) {
+    public void setBrokerName(String brokerName) {
         this.brokerName = brokerName;
     }
 
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/container/AddBrokerSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/container/AddBrokerSubCommand.java
new file mode 100644
index 0000000..12e66ab
--- /dev/null
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/container/AddBrokerSubCommand.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.tools.command.container;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.apache.rocketmq.tools.command.SubCommand;
+import org.apache.rocketmq.tools.command.SubCommandException;
+
+public class AddBrokerSubCommand implements SubCommand {
+    @Override public String commandName() {
+        return "addBroker";
+    }
+
+    @Override public String commandDesc() {
+        return "Add a broker to specified container";
+    }
+
+    @Override public Options buildCommandlineOptions(Options options) {
+        Option opt = new Option("c", "brokerContainerAddr", true, "Broker container address");
+        opt.setRequired(true);
+        options.addOption(opt);
+
+        opt = new Option("b", "brokerConfigPath", true, "Broker config path");
+        opt.setRequired(true);
+        options.addOption(opt);
+
+        return options;
+    }
+
+    @Override public void execute(CommandLine commandLine, Options options,
+        RPCHook rpcHook) throws SubCommandException {
+        DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
+        defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
+
+        try {
+            defaultMQAdminExt.start();
+            String brokerContainerAddr = commandLine.getOptionValue('c').trim();
+            String brokerConfigPath = commandLine.getOptionValue('b').trim();
+            defaultMQAdminExt.addBrokerToContainer(brokerContainerAddr, brokerConfigPath);
+            System.out.printf("add broker to %s success%n", brokerContainerAddr);
+        } catch (Exception e) {
+            throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
+        } finally {
+            defaultMQAdminExt.shutdown();
+        }
+    }
+}
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/container/RemoveBrokerSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/container/RemoveBrokerSubCommand.java
new file mode 100644
index 0000000..9907d8a
--- /dev/null
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/container/RemoveBrokerSubCommand.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.tools.command.container;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.apache.rocketmq.tools.command.SubCommand;
+import org.apache.rocketmq.tools.command.SubCommandException;
+
+public class RemoveBrokerSubCommand implements SubCommand {
+    @Override public String commandName() {
+        return "removeBroker";
+    }
+
+    @Override public String commandDesc() {
+        return "Remove a broker from specified container";
+    }
+
+    @Override public Options buildCommandlineOptions(Options options) {
+        Option opt = new Option("c", "brokerContainerAddr", true, "Broker container address");
+        opt.setRequired(true);
+        options.addOption(opt);
+
+        opt = new Option("b", "brokerIdentity", true, "Information to identify a broker: clusterName:brokerName:brokerId");
+        opt.setRequired(true);
+        options.addOption(opt);
+
+        return options;
+    }
+
+    @Override public void execute(CommandLine commandLine, Options options,
+        RPCHook rpcHook) throws SubCommandException {
+        DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
+        defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
+
+        try {
+            defaultMQAdminExt.start();
+            String brokerContainerAddr = commandLine.getOptionValue('c').trim();
+            String[] brokerIdentities = commandLine.getOptionValue('b').trim().split(":");
+            String clusterName = brokerIdentities[0].trim();
+            String brokerName = brokerIdentities[1].trim();
+            long brokerId;
+            try {
+                brokerId = Long.parseLong(brokerIdentities[2].trim());
+            } catch (NumberFormatException e) {
+                e.printStackTrace();
+                return;
+            }
+            if (brokerId < 0) {
+                System.out.printf("brokerId can't be negative%n");
+                return;
+            }
+            defaultMQAdminExt.removeBrokerFromContainer(brokerContainerAddr, clusterName, brokerName, brokerId);
+            System.out.printf("remove broker from %s success%n", brokerContainerAddr);
+        } catch (Exception e) {
+            throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
+        } finally {
+            defaultMQAdminExt.shutdown();
+        }
+    }
+}
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/ha/HAStatusSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/ha/HAStatusSubCommand.java
new file mode 100644
index 0000000..228c67d
--- /dev/null
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/ha/HAStatusSubCommand.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.tools.command.ha;
+
+import java.util.Set;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.protocol.body.HARuntimeInfo;
+import org.apache.rocketmq.common.protocol.body.HARuntimeInfo.HAClientRuntimeInfo;
+import org.apache.rocketmq.common.protocol.body.HARuntimeInfo.HAConnectionRuntimeInfo;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.srvutil.ServerUtil;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.apache.rocketmq.tools.command.CommandUtil;
+import org.apache.rocketmq.tools.command.SubCommand;
+import org.apache.rocketmq.tools.command.SubCommandException;
+
+public class HAStatusSubCommand implements SubCommand {
+
+    @Override
+    public String commandName() {
+        return "haStatus";
+    }
+
+    @Override
+    public String commandDesc() {
+        return "Fetch ha runtime status data";
+    }
+
+    @Override
+    public Options buildCommandlineOptions(Options options) {
+        Option opt = new Option("c", "clusterName", true, "which cluster");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new Option("b", "brokerAddr", true, "which broker to fetch");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new Option("f", "follow", true, "the interval(second) of get info");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        return options;
+    }
+
+    @Override
+    public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) throws SubCommandException {
+        DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
+        defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
+
+        try {
+            if (commandLine.hasOption('f')) {
+                String flushSecondStr = commandLine.getOptionValue('f');
+                int flushSecond = 3;
+                if (flushSecondStr != null && !flushSecondStr.trim().equals("")) {
+                    flushSecond = Integer.parseInt(flushSecondStr);
+                }
+
+                defaultMQAdminExt.start();
+
+                while (true) {
+                    this.innerExec(commandLine, options, defaultMQAdminExt);
+                    Thread.sleep(flushSecond * 1000);
+                }
+            } else {
+                defaultMQAdminExt.start();
+
+                this.innerExec(commandLine, options, defaultMQAdminExt);
+            }
+        } catch (Exception e) {
+            throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
+        } finally {
+            defaultMQAdminExt.shutdown();
+        }
+    }
+
+    private void innerExec(CommandLine commandLine, Options options,
+        DefaultMQAdminExt defaultMQAdminExt) throws Exception {
+        if (commandLine.hasOption('b')) {
+            String addr = commandLine.getOptionValue('b').trim();
+            this.printStatus(addr, defaultMQAdminExt);
+        } else if (commandLine.hasOption('c')) {
+
+            String clusterName = commandLine.getOptionValue('c').trim();
+            Set<String> masterSet = CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName);
+
+            for (String addr : masterSet) {
+                this.printStatus(addr, defaultMQAdminExt);
+            }
+        } else {
+            ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), options);
+        }
+
+    }
+
+    private void printStatus(String brokerAddr, DefaultMQAdminExt defaultMQAdminExt) throws Exception {
+        HARuntimeInfo haRuntimeInfo = defaultMQAdminExt.getBrokerHAStatus(brokerAddr);
+
+        if (haRuntimeInfo.isMaster()) {
+            System.out.printf("\n#MasterAddr\t%s\n#MasterCommitLogMaxOffset\t%d\n#SlaveNum\t%d\n#InSyncSlaveNum\t%d\n", brokerAddr,
+                haRuntimeInfo.getMasterCommitLogMaxOffset(), haRuntimeInfo.getHaConnectionInfo().size(), haRuntimeInfo.getInSyncSlaveNums());
+            System.out.printf("%-32s  %-16s %16s %16s %16s %16s\n",
+                "#SlaveAddr",
+                "#SlaveAckOffset",
+                "#Diff",
+                "#TransferSpeed(KB/s)",
+                "#Status",
+                "#TransferFromWhere"
+            );
+
+            for (HAConnectionRuntimeInfo cInfo : haRuntimeInfo.getHaConnectionInfo()) {
+                System.out.printf("%-32s  %-16d %16d %16.2f %16s %16d\n",
+                    cInfo.getAddr(),
+                    cInfo.getSlaveAckOffset(),
+                    cInfo.getDiff(),
+                    cInfo.getTransferredByteInSecond() / 1024.0,
+                    cInfo.isInSync() ? "OK" : "Fall Behind",
+                    cInfo.getTransferFromWhere());
+            }
+        } else {
+            HAClientRuntimeInfo haClientRuntimeInfo = haRuntimeInfo.getHaClientRuntimeInfo();
+
+            System.out.printf("\n#MasterAddr\t%s\n", haClientRuntimeInfo.getMasterAddr());
+            System.out.printf("#CommitLogMaxOffset\t%d\n", haClientRuntimeInfo.getMaxOffset());
+            System.out.printf("#TransferSpeed(KB/s)\t%.2f\n", haClientRuntimeInfo.getTransferredByteInSecond() / 1024.0);
+            System.out.printf("#LastReadTime\t%s\n", UtilAll.timeMillisToHumanString2(haClientRuntimeInfo.getLastReadTimestamp()));
+            System.out.printf("#LastWriteTime\t%s\n", UtilAll.timeMillisToHumanString2(haClientRuntimeInfo.getLastWriteTimestamp()));
+            System.out.printf("#MasterFlushOffset\t%s\n", haClientRuntimeInfo.getMasterFlushOffset());
+        }
+    }
+
+}
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/offset/ResetOffsetByTimeCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/offset/ResetOffsetByTimeCommand.java
index 266de75..6e4e570 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/offset/ResetOffsetByTimeCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/offset/ResetOffsetByTimeCommand.java
@@ -64,6 +64,15 @@ public class ResetOffsetByTimeCommand implements SubCommand {
         opt = new Option("c", "cplus", false, "reset c++ client offset");
         opt.setRequired(false);
         options.addOption(opt);
+
+        opt = new Option("b", "broker", true, "broker addr");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new Option("q", "queue", true, "queue id");
+        opt.setRequired(false);
+        options.addOption(opt);
+
         return options;
     }
 
@@ -96,7 +105,32 @@ public class ResetOffsetByTimeCommand implements SubCommand {
                 isC = true;
             }
 
+            String brokerAddr = null;
+            if (commandLine.hasOption('b')) {
+                brokerAddr = commandLine.getOptionValue("b");
+            }
+            int queueId = -1;
+            if (commandLine.hasOption("q")) {
+                queueId = Integer.valueOf(commandLine.getOptionValue('q'));
+            }
+
             defaultMQAdminExt.start();
+
+            if (brokerAddr != null && queueId > -1) {
+                System.out.printf("rollback consumer offset by specified group[%s], topic[%s], queueId[%s], broker[%s], timestamp(string)[%s], timestamp(long)[%s]%n",
+                        group, topic, queueId, brokerAddr, timeStampStr, timestamp);
+                try {
+                    long resetOffset = defaultMQAdminExt.searchOffset(brokerAddr, topic, queueId, timestamp, 3000);
+                    System.out.printf("Rollback Offset is: %s", resetOffset);
+                    if (resetOffset > 0) {
+                        defaultMQAdminExt.resetOffsetByQueueId(brokerAddr, group, topic, queueId, resetOffset);
+                    }
+                } catch (Throwable e) {
+                    throw e;
+                }
+                return;
+            }
+
             Map<MessageQueue, Long> offsetTable;
             try {
                 offsetTable = defaultMQAdminExt.resetOffsetByTimestamp(topic, group, timestamp, force, isC);
diff --git a/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java b/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java
index a7fd514..59df145 100644
--- a/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java
+++ b/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java
@@ -283,7 +283,7 @@ public class DefaultMQAdminExtTest {
     @Test
     public void testExamineBrokerClusterInfo() throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
         ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo();
-        HashMap<String, BrokerData> brokerList = clusterInfo.getBrokerAddrTable();
+        Map<String, BrokerData> brokerList = clusterInfo.getBrokerAddrTable();
         assertThat(brokerList.get("default-broker").getBrokerName()).isEqualTo("default-broker");
         assertThat(brokerList.containsKey("broker-test")).isTrue();
 
@@ -294,7 +294,7 @@ public class DefaultMQAdminExtTest {
         clusterMap.put("default-cluster", brokers);
         ClusterInfo cInfo = mock(ClusterInfo.class);
         when(cInfo.getClusterAddrTable()).thenReturn(clusterMap);
-        HashMap<String, Set<String>> clusterAddress = cInfo.getClusterAddrTable();
+        Map<String, Set<String>> clusterAddress = cInfo.getClusterAddrTable();
         assertThat(clusterAddress.containsKey("default-cluster")).isTrue();
         assertThat(clusterAddress.get("default-cluster").size()).isEqualTo(2);
     }
diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/CommandUtilTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/CommandUtilTest.java
index b556e5c..a7b2143 100644
--- a/tools/src/test/java/org/apache/rocketmq/tools/command/CommandUtilTest.java
+++ b/tools/src/test/java/org/apache/rocketmq/tools/command/CommandUtilTest.java
@@ -92,7 +92,7 @@ public class CommandUtilTest {
     @Test
     public void testFetchMasterAndSlaveDistinguish() throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
         Map<String, List<String>> result = CommandUtil.fetchMasterAndSlaveDistinguish(defaultMQAdminExtImpl, "default-cluster");
-        assertThat(result.get(null).get(0)).isEqualTo("127.0.0.1:10911");
+        assertThat(result.get(CommandUtil.NO_MASTER_PLACEHOLDER).get(0)).isEqualTo("127.0.0.1:10911");
     }
 
     @Test
diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/message/QueryMsgByUniqueKeySubCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/message/QueryMsgByUniqueKeySubCommandTest.java
index e21a66f..1298c91 100644
--- a/tools/src/test/java/org/apache/rocketmq/tools/command/message/QueryMsgByUniqueKeySubCommandTest.java
+++ b/tools/src/test/java/org/apache/rocketmq/tools/command/message/QueryMsgByUniqueKeySubCommandTest.java
@@ -89,7 +89,6 @@ public class QueryMsgByUniqueKeySubCommandTest {
         field.setAccessible(true);
         field.set(mqClientInstance, mQAdminImpl);
 
-
         field = DefaultMQAdminExt.class.getDeclaredField("defaultMQAdminExtImpl");
         field.setAccessible(true);
         field.set(defaultMQAdminExt, defaultMQAdminExtImpl);
@@ -97,7 +96,7 @@ public class QueryMsgByUniqueKeySubCommandTest {
         ConsumeMessageDirectlyResult result = new ConsumeMessageDirectlyResult();
         result.setConsumeResult(CMResult.CR_SUCCESS);
         result.setRemark("customRemark_122333444");
-        when(mQClientAPIImpl.consumeMessageDirectly(anyString(), anyString(), anyString(), anyString(), anyLong())).thenReturn(result);
+        when(mQClientAPIImpl.consumeMessageDirectly(anyString(), anyString(), anyString(), anyString(), anyString(), anyLong())).thenReturn(result);
 
         MessageExt retMsgExt = new MessageExt();
         retMsgExt.setMsgId("0A3A54F7BF7D18B4AAC28A3FA2CF0000");
@@ -135,9 +134,8 @@ public class QueryMsgByUniqueKeySubCommandTest {
         groupList.setGroupList(groupSets);
         when(mQClientAPIImpl.queryTopicConsumeByWho(anyString(), anyString(), anyLong())).thenReturn(groupList);
 
-
         ConsumeStats consumeStats = new ConsumeStats();
-        consumeStats.setConsumeTps(100*10000);
+        consumeStats.setConsumeTps(100 * 10000);
         HashMap<MessageQueue, OffsetWrapper> offsetTable = new HashMap<MessageQueue, OffsetWrapper>();
         MessageQueue messageQueue = new MessageQueue();
         messageQueue.setBrokerName("messageQueue BrokerName testing");
@@ -149,7 +147,7 @@ public class QueryMsgByUniqueKeySubCommandTest {
         offsetWrapper.setLastTimestamp(System.currentTimeMillis());
         offsetTable.put(messageQueue, offsetWrapper);
         consumeStats.setOffsetTable(offsetTable);
-        when(mQClientAPIImpl.getConsumeStats(anyString(), anyString(), (String)isNull(), anyLong())).thenReturn(consumeStats);
+        when(mQClientAPIImpl.getConsumeStats(anyString(), anyString(), (String) isNull(), anyLong())).thenReturn(consumeStats);
 
         ClusterInfo clusterInfo = new ClusterInfo();
         HashMap<String, BrokerData> brokerAddrTable = new HashMap<String, BrokerData>();
@@ -185,7 +183,7 @@ public class QueryMsgByUniqueKeySubCommandTest {
 
         Options options = ServerUtil.buildCommandlineOptions(new Options());
 
-        String[] args = new String[]{"-t myTopicTest", "-i msgId"};
+        String[] args = new String[] {"-t myTopicTest", "-i msgId"};
         CommandLine commandLine = ServerUtil.parseCmdLine("mqadmin ", args, cmd.buildCommandlineOptions(options), new PosixParser());
         cmd.execute(commandLine, options, null);
 
@@ -208,12 +206,10 @@ public class QueryMsgByUniqueKeySubCommandTest {
 
         Options options = ServerUtil.buildCommandlineOptions(new Options());
 
-        String[] args = new String[]{"-t myTopicTest", "-i 7F000001000004D20000000000000066"};
+        String[] args = new String[] {"-t myTopicTest", "-i 7F000001000004D20000000000000066"};
         CommandLine commandLine = ServerUtil.parseCmdLine("mqadmin ", args, cmd.buildCommandlineOptions(options), new PosixParser());
         cmd.execute(commandLine, options, null);
 
-
-
     }
 
     @Test
@@ -221,7 +217,7 @@ public class QueryMsgByUniqueKeySubCommandTest {
 
         Options options = ServerUtil.buildCommandlineOptions(new Options());
 
-        String[] args = new String[]{"-t myTopicTest", "-i 0A3A54F7BF7D18B4AAC28A3FA2CF0000", "-g producerGroupName", "-d clientId"};
+        String[] args = new String[] {"-t myTopicTest", "-i 0A3A54F7BF7D18B4AAC28A3FA2CF0000", "-g producerGroupName", "-d clientId"};
         CommandLine commandLine = ServerUtil.parseCmdLine("mqadmin ", args, cmd.buildCommandlineOptions(options), new PosixParser());
         cmd.execute(commandLine, options, null);
 
@@ -241,7 +237,7 @@ public class QueryMsgByUniqueKeySubCommandTest {
         CommandLine commandLine = ServerUtil.parseCmdLine("mqadmin ", args, cmd.buildCommandlineOptions(options), new PosixParser());
         cmd.execute(commandLine, options, null);
 
-        args = new String[]{"-t myTopicTest", "-i 0A3A54F7BF7D18B4AAC28A3FA2CF0000", "-g producerGroupName", "-d clientId"};
+        args = new String[] {"-t myTopicTest", "-i 0A3A54F7BF7D18B4AAC28A3FA2CF0000", "-g producerGroupName", "-d clientId"};
         commandLine = ServerUtil.parseCmdLine("mqadmin ", args, cmd.buildCommandlineOptions(options), new PosixParser());
         cmd.execute(commandLine, options, null);