You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2017/06/15 02:53:18 UTC

[48/51] [partial] incubator-rocketmq-externals git commit: Release rocketmq-console 1.0.0 version

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/e218eef9/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/client/MQAdminExtImpl.java
----------------------------------------------------------------------
diff --git a/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/client/MQAdminExtImpl.java b/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/client/MQAdminExtImpl.java
new file mode 100644
index 0000000..0eba3a5
--- /dev/null
+++ b/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/client/MQAdminExtImpl.java
@@ -0,0 +1,501 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.console.service.client;
+
+import com.google.common.base.Throwables;
+import java.io.UnsupportedEncodingException;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import org.apache.rocketmq.client.QueryResult;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.impl.MQAdminImpl;
+import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.admin.ConsumeStats;
+import org.apache.rocketmq.common.admin.RollbackStats;
+import org.apache.rocketmq.common.admin.TopicStatsTable;
+import org.apache.rocketmq.common.message.MessageClientIDSetter;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.RequestCode;
+import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.common.protocol.body.BrokerStatsData;
+import org.apache.rocketmq.common.protocol.body.ClusterInfo;
+import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
+import org.apache.rocketmq.common.protocol.body.ConsumeStatsList;
+import org.apache.rocketmq.common.protocol.body.ConsumerConnection;
+import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
+import org.apache.rocketmq.common.protocol.body.GroupList;
+import org.apache.rocketmq.common.protocol.body.KVTable;
+import org.apache.rocketmq.common.protocol.body.ProducerConnection;
+import org.apache.rocketmq.common.protocol.body.QueueTimeSpan;
+import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
+import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
+import org.apache.rocketmq.common.protocol.body.TopicList;
+import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
+import org.apache.rocketmq.console.util.JsonUtil;
+import org.apache.rocketmq.remoting.RemotingClient;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+import org.apache.rocketmq.remoting.exception.RemotingConnectException;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
+import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.tools.admin.MQAdminExt;
+import org.apache.rocketmq.tools.admin.api.MessageTrack;
+import org.joor.Reflect;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Service;
+
+import static org.apache.rocketmq.remoting.protocol.RemotingSerializable.decode;
+
+@Service
+public class MQAdminExtImpl implements MQAdminExt {
+    private Logger logger = LoggerFactory.getLogger(MQAdminExtImpl.class);
+
+    public MQAdminExtImpl() {
+    }
+
+    @Override
+    public void updateBrokerConfig(String brokerAddr, Properties properties)
+        throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
+        UnsupportedEncodingException, InterruptedException, MQBrokerException {
+        MQAdminInstance.threadLocalMQAdminExt().updateBrokerConfig(brokerAddr, properties);
+    }
+
+    @Override
+    public void createAndUpdateTopicConfig(String addr, TopicConfig config)
+        throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+        MQAdminInstance.threadLocalMQAdminExt().createAndUpdateTopicConfig(addr, config);
+    }
+
+    @Override
+    public void createAndUpdateSubscriptionGroupConfig(String addr, SubscriptionGroupConfig config)
+        throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+        MQAdminInstance.threadLocalMQAdminExt().createAndUpdateSubscriptionGroupConfig(addr, config);
+    }
+
+    @Override
+    public SubscriptionGroupConfig examineSubscriptionGroupConfig(String addr, String group) {
+        RemotingClient remotingClient = MQAdminInstance.threadLocalRemotingClient();
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_SUBSCRIPTIONGROUP_CONFIG, null);
+        RemotingCommand response = null;
+        try {
+            response = remotingClient.invokeSync(addr, request, 3000);
+        }
+        catch (Exception err) {
+            throw Throwables.propagate(err);
+        }
+        assert response != null;
+        switch (response.getCode()) {
+            case ResponseCode.SUCCESS: {
+                SubscriptionGroupWrapper subscriptionGroupWrapper = decode(response.getBody(), SubscriptionGroupWrapper.class);
+                return subscriptionGroupWrapper.getSubscriptionGroupTable().get(group);
+            }
+            default:
+                throw Throwables.propagate(new MQBrokerException(response.getCode(), response.getRemark()));
+        }
+    }
+
+    @Override
+    public TopicConfig examineTopicConfig(String addr, String topic) {
+        RemotingClient remotingClient = MQAdminInstance.threadLocalRemotingClient();
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_TOPIC_CONFIG, null);
+        RemotingCommand response = null;
+        try {
+            response = remotingClient.invokeSync(addr, request, 3000);
+        }
+        catch (Exception err) {
+            throw Throwables.propagate(err);
+        }
+        switch (response.getCode()) {
+            case ResponseCode.SUCCESS: {
+                TopicConfigSerializeWrapper topicConfigSerializeWrapper = decode(response.getBody(), TopicConfigSerializeWrapper.class);
+                return topicConfigSerializeWrapper.getTopicConfigTable().get(topic);
+            }
+            default:
+                throw Throwables.propagate(new MQBrokerException(response.getCode(), response.getRemark()));
+        }
+    }
+
+    @Override
+    public TopicStatsTable examineTopicStats(String topic)
+        throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
+        return MQAdminInstance.threadLocalMQAdminExt().examineTopicStats(topic);
+    }
+
+    @Override
+    public TopicList fetchAllTopicList() throws RemotingException, MQClientException, InterruptedException {
+        TopicList topicList = MQAdminInstance.threadLocalMQAdminExt().fetchAllTopicList();
+        logger.debug("op=look={}", JsonUtil.obj2String(topicList.getTopicList()));
+        return topicList;
+    }
+
+    @Override
+    public KVTable fetchBrokerRuntimeStats(String brokerAddr)
+        throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
+        InterruptedException, MQBrokerException {
+        return MQAdminInstance.threadLocalMQAdminExt().fetchBrokerRuntimeStats(brokerAddr);
+    }
+
+    @Override
+    public ConsumeStats examineConsumeStats(String consumerGroup)
+        throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
+        return MQAdminInstance.threadLocalMQAdminExt().examineConsumeStats(consumerGroup);
+    }
+
+    @Override
+    public ConsumeStats examineConsumeStats(String consumerGroup, String topic)
+        throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
+        return MQAdminInstance.threadLocalMQAdminExt().examineConsumeStats(consumerGroup, topic);
+    }
+
+    @Override
+    public ClusterInfo examineBrokerClusterInfo()
+        throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException,
+        RemotingConnectException {
+        return MQAdminInstance.threadLocalMQAdminExt().examineBrokerClusterInfo();
+    }
+
+    @Override
+    public TopicRouteData examineTopicRouteInfo(String topic)
+        throws RemotingException, MQClientException, InterruptedException {
+        return MQAdminInstance.threadLocalMQAdminExt().examineTopicRouteInfo(topic);
+    }
+
+    @Override
+    public ConsumerConnection examineConsumerConnectionInfo(String consumerGroup)
+        throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
+        InterruptedException, MQBrokerException, RemotingException, MQClientException {
+        return MQAdminInstance.threadLocalMQAdminExt().examineConsumerConnectionInfo(consumerGroup);
+    }
+
+    @Override
+    public ProducerConnection examineProducerConnectionInfo(String producerGroup, String topic)
+        throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
+        return MQAdminInstance.threadLocalMQAdminExt().examineProducerConnectionInfo(producerGroup, topic);
+    }
+
+    @Override
+    public List<String> getNameServerAddressList() {
+        return MQAdminInstance.threadLocalMQAdminExt().getNameServerAddressList();
+    }
+
+    @Override
+    public int wipeWritePermOfBroker(String namesrvAddr, String brokerName)
+        throws RemotingCommandException, RemotingConnectException, RemotingSendRequestException,
+        RemotingTimeoutException, InterruptedException, MQClientException {
+        return MQAdminInstance.threadLocalMQAdminExt().wipeWritePermOfBroker(namesrvAddr, brokerName);
+    }
+
+    @Override
+    public void putKVConfig(String namespace, String key, String value) {
+        MQAdminInstance.threadLocalMQAdminExt().putKVConfig(namespace, key, value);
+    }
+
+    @Override
+    public String getKVConfig(String namespace, String key)
+        throws RemotingException, MQClientException, InterruptedException {
+        return MQAdminInstance.threadLocalMQAdminExt().getKVConfig(namespace, key);
+    }
+
+    @Override
+    public KVTable getKVListByNamespace(String namespace)
+        throws RemotingException, MQClientException, InterruptedException {
+        return MQAdminInstance.threadLocalMQAdminExt().getKVListByNamespace(namespace);
+    }
+
+    @Override
+    public void deleteTopicInBroker(Set<String> addrs, String topic)
+        throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+        logger.info("addrs={} topic={}", JsonUtil.obj2String(addrs), topic);
+        MQAdminInstance.threadLocalMQAdminExt().deleteTopicInBroker(addrs, topic);
+    }
+
+    @Override
+    public void deleteTopicInNameServer(Set<String> addrs, String topic)
+        throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+        MQAdminInstance.threadLocalMQAdminExt().deleteTopicInNameServer(addrs, topic);
+    }
+
+    @Override
+    public void deleteSubscriptionGroup(String addr, String groupName)
+        throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+        MQAdminInstance.threadLocalMQAdminExt().deleteSubscriptionGroup(addr, groupName);
+    }
+
+    @Override
+    public void createAndUpdateKvConfig(String namespace, String key, String value)
+        throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+        MQAdminInstance.threadLocalMQAdminExt().createAndUpdateKvConfig(namespace, key, value);
+    }
+
+    @Override
+    public void deleteKvConfig(String namespace, String key)
+        throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+        MQAdminInstance.threadLocalMQAdminExt().deleteKvConfig(namespace, key);
+    }
+
+    @Override
+    public List<RollbackStats> resetOffsetByTimestampOld(String consumerGroup, String topic, long timestamp,
+        boolean force) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+        return MQAdminInstance.threadLocalMQAdminExt().resetOffsetByTimestampOld(consumerGroup, topic, timestamp, force);
+    }
+
+    @Override
+    public Map<MessageQueue, Long> resetOffsetByTimestamp(String topic, String group, long timestamp,
+        boolean isForce) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+        return MQAdminInstance.threadLocalMQAdminExt().resetOffsetByTimestamp(topic, group, timestamp, isForce);
+    }
+
+    @Override
+    public void resetOffsetNew(String consumerGroup, String topic, long timestamp)
+        throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+        MQAdminInstance.threadLocalMQAdminExt().resetOffsetNew(consumerGroup, topic, timestamp);
+    }
+
+    @Override
+    public Map<String, Map<MessageQueue, Long>> getConsumeStatus(String topic, String group,
+        String clientAddr) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+        return MQAdminInstance.threadLocalMQAdminExt().getConsumeStatus(topic, group, clientAddr);
+    }
+
+    @Override
+    public void createOrUpdateOrderConf(String key, String value, boolean isCluster)
+        throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+        MQAdminInstance.threadLocalMQAdminExt().createOrUpdateOrderConf(key, value, isCluster);
+    }
+
+    @Override
+    public GroupList queryTopicConsumeByWho(String topic)
+        throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
+        InterruptedException, MQBrokerException, RemotingException, MQClientException {
+        return MQAdminInstance.threadLocalMQAdminExt().queryTopicConsumeByWho(topic);
+    }
+
+    @Override
+    public boolean cleanExpiredConsumerQueue(String cluster)
+        throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQClientException,
+        InterruptedException {
+        return MQAdminInstance.threadLocalMQAdminExt().cleanExpiredConsumerQueue(cluster);
+    }
+
+    @Override
+    public boolean cleanExpiredConsumerQueueByAddr(String addr)
+        throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQClientException,
+        InterruptedException {
+        return MQAdminInstance.threadLocalMQAdminExt().cleanExpiredConsumerQueueByAddr(addr);
+    }
+
+    @Override
+    public ConsumerRunningInfo getConsumerRunningInfo(String consumerGroup, String clientId, boolean jstack)
+        throws RemotingException, MQClientException, InterruptedException {
+        return MQAdminInstance.threadLocalMQAdminExt().getConsumerRunningInfo(consumerGroup, clientId, jstack);
+    }
+
+    @Override
+    public ConsumeMessageDirectlyResult consumeMessageDirectly(String consumerGroup, String clientId,
+        String msgId) throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
+        return MQAdminInstance.threadLocalMQAdminExt().consumeMessageDirectly(consumerGroup, clientId, msgId);
+    }
+
+    @Override
+    public List<MessageTrack> messageTrackDetail(MessageExt msg)
+        throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
+        return MQAdminInstance.threadLocalMQAdminExt().messageTrackDetail(msg);
+    }
+
+    @Override
+    public void cloneGroupOffset(String srcGroup, String destGroup, String topic, boolean isOffline)
+        throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
+        MQAdminInstance.threadLocalMQAdminExt().cloneGroupOffset(srcGroup, destGroup, topic, isOffline);
+    }
+
+    @Override
+    public void createTopic(String key, String newTopic, int queueNum) throws MQClientException {
+        MQAdminInstance.threadLocalMQAdminExt().createTopic(key, newTopic, queueNum);
+    }
+
+    @Override
+    public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag)
+        throws MQClientException {
+        MQAdminInstance.threadLocalMQAdminExt().createTopic(key, newTopic, queueNum, topicSysFlag);
+    }
+
+    @Override
+    public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException {
+        return MQAdminInstance.threadLocalMQAdminExt().searchOffset(mq, timestamp);
+    }
+
+    @Override
+    public long maxOffset(MessageQueue mq) throws MQClientException {
+        return MQAdminInstance.threadLocalMQAdminExt().maxOffset(mq);
+    }
+
+    @Override
+    public long minOffset(MessageQueue mq) throws MQClientException {
+        return MQAdminInstance.threadLocalMQAdminExt().minOffset(mq);
+    }
+
+    @Override
+    public long earliestMsgStoreTime(MessageQueue mq) throws MQClientException {
+        return MQAdminInstance.threadLocalMQAdminExt().earliestMsgStoreTime(mq);
+    }
+
+    @Override
+    public MessageExt viewMessage(String msgId)
+        throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+        return MQAdminInstance.threadLocalMQAdminExt().viewMessage(msgId);
+    }
+
+    @Override
+    public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end)
+        throws MQClientException, InterruptedException {
+        return MQAdminInstance.threadLocalMQAdminExt().queryMessage(topic, key, maxNum, begin, end);
+    }
+
+    @Override
+    @Deprecated
+    public void start() throws MQClientException {
+        throw new IllegalStateException("thisMethod is deprecated.use org.apache.rocketmq.console.aspect.admin.MQAdminAspect instead of this");
+    }
+
+    @Override
+    @Deprecated
+    public void shutdown() {
+        throw new IllegalStateException("thisMethod is deprecated.use org.apache.rocketmq.console.aspect.admin.MQAdminAspect instead of this");
+    }
+
+    // below is 3.2.6->3.5.8 updated
+
+    @Override
+    public List<QueueTimeSpan> queryConsumeTimeSpan(String topic,
+        String group) throws InterruptedException, MQBrokerException, RemotingException, MQClientException {
+        return MQAdminInstance.threadLocalMQAdminExt().queryConsumeTimeSpan(topic, group);
+    }
+
+    //MessageClientIDSetter.getNearlyTimeFromID has bug,so we subtract half a day
+    //next version we will remove it
+    //https://issues.apache.org/jira/browse/ROCKETMQ-111
+    //https://github.com/apache/incubator-rocketmq/pull/69
+    @Override
+    public MessageExt viewMessage(String topic,
+        String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+        logger.info("MessageClientIDSetter.getNearlyTimeFromID(msgId)={} msgId={}", MessageClientIDSetter.getNearlyTimeFromID(msgId), msgId);
+        try {
+            return viewMessage(msgId);
+        }
+        catch (Exception e) {
+        }
+        MQAdminImpl mqAdminImpl = MQAdminInstance.threadLocalMqClientInstance().getMQAdminImpl();
+        QueryResult qr = Reflect.on(mqAdminImpl).call("queryMessage", topic, msgId, 32,
+            MessageClientIDSetter.getNearlyTimeFromID(msgId).getTime() - 1000 * 60 * 60 * 13L, Long.MAX_VALUE, true).get();
+        if (qr != null && qr.getMessageList() != null && qr.getMessageList().size() > 0) {
+            return qr.getMessageList().get(0);
+        }
+        else {
+            return null;
+        }
+    }
+
+    @Override
+    public ConsumeMessageDirectlyResult consumeMessageDirectly(String consumerGroup, String clientId, String topic,
+        String msgId) throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
+        return MQAdminInstance.threadLocalMQAdminExt().consumeMessageDirectly(consumerGroup, clientId, topic, msgId);
+    }
+
+    @Override
+    public Properties getBrokerConfig(
+        String brokerAddr) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, UnsupportedEncodingException, InterruptedException, MQBrokerException {
+        return MQAdminInstance.threadLocalMQAdminExt().getBrokerConfig(brokerAddr);
+    }
+
+    @Override
+    public TopicList fetchTopicsByCLuster(
+        String clusterName) throws RemotingException, MQClientException, InterruptedException {
+        return MQAdminInstance.threadLocalMQAdminExt().fetchTopicsByCLuster(clusterName);
+    }
+
+    @Override
+    public boolean cleanUnusedTopic(
+        String cluster) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, InterruptedException {
+        return MQAdminInstance.threadLocalMQAdminExt().cleanUnusedTopic(cluster);
+    }
+
+    @Override
+    public boolean cleanUnusedTopicByAddr(
+        String addr) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, InterruptedException {
+        return MQAdminInstance.threadLocalMQAdminExt().cleanUnusedTopicByAddr(addr);
+    }
+
+    @Override
+    public BrokerStatsData viewBrokerStatsData(String brokerAddr, String statsName,
+        String statsKey) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, InterruptedException {
+        return MQAdminInstance.threadLocalMQAdminExt().viewBrokerStatsData(brokerAddr, statsName, statsKey);
+    }
+
+    @Override
+    public Set<String> getClusterList(
+        String topic) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, InterruptedException {
+        return MQAdminInstance.threadLocalMQAdminExt().getClusterList(topic);
+    }
+
+    @Override
+    public ConsumeStatsList fetchConsumeStatsInBroker(String brokerAddr, boolean isOrder,
+        long timeoutMillis) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, InterruptedException {
+        return MQAdminInstance.threadLocalMQAdminExt().fetchConsumeStatsInBroker(brokerAddr, isOrder, timeoutMillis);
+    }
+
+    @Override
+    public Set<String> getTopicClusterList(
+        String topic) throws InterruptedException, MQBrokerException, MQClientException, RemotingException {
+        return MQAdminInstance.threadLocalMQAdminExt().getTopicClusterList(topic);
+    }
+
+    @Override
+    public SubscriptionGroupWrapper getAllSubscriptionGroup(String brokerAddr,
+        long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException {
+        return MQAdminInstance.threadLocalMQAdminExt().getAllSubscriptionGroup(brokerAddr, timeoutMillis);
+    }
+
+    @Override
+    public TopicConfigSerializeWrapper getAllTopicGroup(String brokerAddr,
+        long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException {
+        return MQAdminInstance.threadLocalMQAdminExt().getAllTopicGroup(brokerAddr, timeoutMillis);
+    }
+
+    @Override
+    public void updateConsumeOffset(String brokerAddr, String consumeGroup, MessageQueue mq,
+        long offset) throws RemotingException, InterruptedException, MQBrokerException {
+        MQAdminInstance.threadLocalMQAdminExt().updateConsumeOffset(brokerAddr, consumeGroup, mq, offset);
+    }
+
+    // 4.0.0 added
+    @Override public void updateNameServerConfig(Properties properties,
+        List<String> list) throws InterruptedException, RemotingConnectException, UnsupportedEncodingException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, MQBrokerException {
+
+    }
+
+    @Override public Map<String, Properties> getNameServerConfig(
+        List<String> list) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQClientException, UnsupportedEncodingException {
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/e218eef9/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/client/MQAdminInstance.java
----------------------------------------------------------------------
diff --git a/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/client/MQAdminInstance.java b/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/client/MQAdminInstance.java
new file mode 100644
index 0000000..e914e6c
--- /dev/null
+++ b/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/client/MQAdminInstance.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.console.service.client;
+
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.impl.MQClientAPIImpl;
+import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.apache.rocketmq.remoting.RemotingClient;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl;
+import org.apache.rocketmq.tools.admin.MQAdminExt;
+import org.joor.Reflect;
+
+public class MQAdminInstance {
+    private static final ThreadLocal<DefaultMQAdminExt> MQ_ADMIN_EXT_THREAD_LOCAL = new ThreadLocal<DefaultMQAdminExt>();
+    private static final ThreadLocal<Integer> INIT_COUNTER = new ThreadLocal<Integer>();
+
+    public static MQAdminExt threadLocalMQAdminExt() {
+        DefaultMQAdminExt defaultMQAdminExt = MQ_ADMIN_EXT_THREAD_LOCAL.get();
+        if (defaultMQAdminExt == null) {
+            throw new IllegalStateException("defaultMQAdminExt should be init before you get this");
+        }
+        return defaultMQAdminExt;
+    }
+
+    public static RemotingClient threadLocalRemotingClient() {
+        MQClientInstance mqClientInstance = threadLocalMqClientInstance();
+        MQClientAPIImpl mQClientAPIImpl = Reflect.on(mqClientInstance).get("mQClientAPIImpl");
+        return Reflect.on(mQClientAPIImpl).get("remotingClient");
+    }
+
+    public static MQClientInstance threadLocalMqClientInstance() {
+        DefaultMQAdminExtImpl defaultMQAdminExtImpl = Reflect.on(MQAdminInstance.threadLocalMQAdminExt()).get("defaultMQAdminExtImpl");
+        return Reflect.on(defaultMQAdminExtImpl).get("mqClientInstance");
+    }
+
+    public static void initMQAdminInstance(long timeoutMillis) throws MQClientException {
+        Integer nowCount = INIT_COUNTER.get();
+        if (nowCount == null) {
+            DefaultMQAdminExt defaultMQAdminExt;
+            if (timeoutMillis > 0) {
+                defaultMQAdminExt = new DefaultMQAdminExt(timeoutMillis);
+            }
+            else {
+                defaultMQAdminExt = new DefaultMQAdminExt();
+            }
+            defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
+            defaultMQAdminExt.start();
+            MQ_ADMIN_EXT_THREAD_LOCAL.set(defaultMQAdminExt);
+            INIT_COUNTER.set(1);
+        }
+        else {
+            INIT_COUNTER.set(nowCount + 1);
+        }
+
+    }
+
+    public static void destroyMQAdminInstance() {
+        Integer nowCount = INIT_COUNTER.get() - 1;
+        if (nowCount > 0) {
+            INIT_COUNTER.set(nowCount);
+            return;
+        }
+        MQAdminExt mqAdminExt = MQ_ADMIN_EXT_THREAD_LOCAL.get();
+        if (mqAdminExt != null) {
+            mqAdminExt.shutdown();
+            MQ_ADMIN_EXT_THREAD_LOCAL.remove();
+            INIT_COUNTER.remove();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/e218eef9/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/impl/ClusterServiceImpl.java
----------------------------------------------------------------------
diff --git a/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/impl/ClusterServiceImpl.java b/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/impl/ClusterServiceImpl.java
new file mode 100644
index 0000000..e225edc
--- /dev/null
+++ b/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/impl/ClusterServiceImpl.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.console.service.impl;
+
+import org.apache.rocketmq.common.protocol.body.ClusterInfo;
+import org.apache.rocketmq.common.protocol.body.KVTable;
+import org.apache.rocketmq.common.protocol.route.BrokerData;
+import org.apache.rocketmq.tools.admin.MQAdminExt;
+import org.apache.rocketmq.console.service.ClusterService;
+import org.apache.rocketmq.console.util.JsonUtil;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Maps;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Service;
+
+import javax.annotation.Resource;
+import java.util.Map;
+import java.util.Properties;
+
+@Service
+public class ClusterServiceImpl implements ClusterService {
+    private Logger logger = LoggerFactory.getLogger(ClusterServiceImpl.class);
+    @Resource
+    private MQAdminExt mqAdminExt;
+
+    @Override
+    public Map<String, Object> list() {
+        try {
+            Map<String, Object> resultMap = Maps.newHashMap();
+            ClusterInfo clusterInfo = mqAdminExt.examineBrokerClusterInfo();
+            logger.info("op=look_clusterInfo {}", JsonUtil.obj2String(clusterInfo));
+            Map<String/*brokerName*/, Map<Long/* brokerId */, Object/* brokerDetail */>> brokerServer = Maps.newHashMap();
+            for (BrokerData brokerData : clusterInfo.getBrokerAddrTable().values()) {
+                Map<Long, Object> brokerMasterSlaveMap = Maps.newHashMap();
+                for (Map.Entry<Long/* brokerId */, String/* broker address */> brokerAddr : brokerData.getBrokerAddrs().entrySet()) {
+                    KVTable kvTable = mqAdminExt.fetchBrokerRuntimeStats(brokerAddr.getValue());
+//                KVTable kvTable = mqAdminExt.fetchBrokerRuntimeStats("127.0.0.1:10911");
+                    brokerMasterSlaveMap.put(brokerAddr.getKey(), kvTable.getTable());
+                }
+                brokerServer.put(brokerData.getBrokerName(), brokerMasterSlaveMap);
+            }
+            resultMap.put("clusterInfo", clusterInfo);
+            resultMap.put("brokerServer", brokerServer);
+            return resultMap;
+        }
+        catch (Exception err) {
+            throw Throwables.propagate(err);
+        }
+    }
+
+
+    @Override
+    public Properties getBrokerConfig(String brokerAddr) {
+        try {
+            return mqAdminExt.getBrokerConfig(brokerAddr);
+        }
+        catch (Exception e) {
+            throw Throwables.propagate(e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/e218eef9/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/impl/ConsumerServiceImpl.java
----------------------------------------------------------------------
diff --git a/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/impl/ConsumerServiceImpl.java b/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/impl/ConsumerServiceImpl.java
new file mode 100644
index 0000000..715cbf5
--- /dev/null
+++ b/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/impl/ConsumerServiceImpl.java
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.console.service.impl;
+
+import com.google.common.base.Predicate;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.MQVersion;
+import org.apache.rocketmq.common.admin.ConsumeStats;
+import org.apache.rocketmq.common.admin.RollbackStats;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.common.protocol.body.ClusterInfo;
+import org.apache.rocketmq.common.protocol.body.Connection;
+import org.apache.rocketmq.common.protocol.body.ConsumerConnection;
+import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
+import org.apache.rocketmq.common.protocol.body.GroupList;
+import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
+import org.apache.rocketmq.common.protocol.route.BrokerData;
+import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
+import org.apache.rocketmq.console.aspect.admin.annotation.MultiMQAdminCmdMethod;
+import org.apache.rocketmq.console.model.ConsumerGroupRollBackStat;
+import org.apache.rocketmq.console.model.GroupConsumeInfo;
+import org.apache.rocketmq.console.model.QueueStatInfo;
+import org.apache.rocketmq.console.model.TopicConsumerInfo;
+import org.apache.rocketmq.console.model.request.ConsumerConfigInfo;
+import org.apache.rocketmq.console.model.request.DeleteSubGroupRequest;
+import org.apache.rocketmq.console.model.request.ResetOffsetRequest;
+import org.apache.rocketmq.console.service.AbstractCommonService;
+import org.apache.rocketmq.console.service.ConsumerService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Service;
+
+import static com.google.common.base.Throwables.propagate;
+
+@Service
+public class ConsumerServiceImpl extends AbstractCommonService implements ConsumerService {
+    private Logger logger = LoggerFactory.getLogger(ConsumerServiceImpl.class);
+
+    @Override
+    @MultiMQAdminCmdMethod
+    public List<GroupConsumeInfo> queryGroupList() {
+        Set<String> consumerGroupSet = Sets.newHashSet();
+        try {
+            ClusterInfo clusterInfo = mqAdminExt.examineBrokerClusterInfo();
+            for (BrokerData brokerData : clusterInfo.getBrokerAddrTable().values()) {
+                SubscriptionGroupWrapper subscriptionGroupWrapper = mqAdminExt.getAllSubscriptionGroup(brokerData.selectBrokerAddr(), 3000L);
+                consumerGroupSet.addAll(subscriptionGroupWrapper.getSubscriptionGroupTable().keySet());
+            }
+        }
+        catch (Exception err) {
+            throw Throwables.propagate(err);
+        }
+        List<GroupConsumeInfo> groupConsumeInfoList = Lists.newArrayList();
+        for (String consumerGroup : consumerGroupSet) {
+            groupConsumeInfoList.add(queryGroup(consumerGroup));
+        }
+        Collections.sort(groupConsumeInfoList);
+        return groupConsumeInfoList;
+    }
+
+    @Override
+    @MultiMQAdminCmdMethod
+    public GroupConsumeInfo queryGroup(String consumerGroup) {
+        GroupConsumeInfo groupConsumeInfo = new GroupConsumeInfo();
+        try {
+            ConsumeStats consumeStats = null;
+            try {
+                consumeStats = mqAdminExt.examineConsumeStats(consumerGroup);
+            }
+            catch (Exception e) {
+                logger.warn("examineConsumeStats exception, " + consumerGroup, e);
+            }
+
+            ConsumerConnection consumerConnection = null;
+            try {
+                consumerConnection = mqAdminExt.examineConsumerConnectionInfo(consumerGroup);
+            }
+            catch (Exception e) {
+                logger.warn("examineConsumerConnectionInfo exception, " + consumerGroup, e);
+            }
+
+            groupConsumeInfo.setGroup(consumerGroup);
+
+            if (consumeStats != null) {
+                groupConsumeInfo.setConsumeTps((int)consumeStats.getConsumeTps());
+                groupConsumeInfo.setDiffTotal(consumeStats.computeTotalDiff());
+            }
+
+            if (consumerConnection != null) {
+                groupConsumeInfo.setCount(consumerConnection.getConnectionSet().size());
+                groupConsumeInfo.setMessageModel(consumerConnection.getMessageModel());
+                groupConsumeInfo.setConsumeType(consumerConnection.getConsumeType());
+                groupConsumeInfo.setVersion(MQVersion.getVersionDesc(consumerConnection.computeMinVersion()));
+            }
+        }
+        catch (Exception e) {
+            logger.warn("examineConsumeStats or examineConsumerConnectionInfo exception, "
+                + consumerGroup, e);
+        }
+        return groupConsumeInfo;
+    }
+
+    @Override
+    public List<TopicConsumerInfo> queryConsumeStatsListByGroupName(String groupName) {
+        return queryConsumeStatsList(null, groupName);
+    }
+
+    @Override
+    @MultiMQAdminCmdMethod
+    public List<TopicConsumerInfo> queryConsumeStatsList(final String topic, String groupName) {
+        ConsumeStats consumeStats = null;
+        try {
+            consumeStats = mqAdminExt.examineConsumeStats(groupName, topic);
+        }
+        catch (Exception e) {
+            throw propagate(e);
+        }
+        List<MessageQueue> mqList = Lists.newArrayList(Iterables.filter(consumeStats.getOffsetTable().keySet(), new Predicate<MessageQueue>() {
+            @Override
+            public boolean apply(MessageQueue o) {
+                return StringUtils.isBlank(topic) || o.getTopic().equals(topic);
+            }
+        }));
+        Collections.sort(mqList);
+        List<TopicConsumerInfo> topicConsumerInfoList = Lists.newArrayList();
+        TopicConsumerInfo nowTopicConsumerInfo = null;
+        Map<MessageQueue, String> messageQueueClientMap = getClientConnection(groupName);
+        for (MessageQueue mq : mqList) {
+            if (nowTopicConsumerInfo == null || (!StringUtils.equals(mq.getTopic(), nowTopicConsumerInfo.getTopic()))) {
+                nowTopicConsumerInfo = new TopicConsumerInfo(mq.getTopic());
+                topicConsumerInfoList.add(nowTopicConsumerInfo);
+            }
+            QueueStatInfo queueStatInfo = QueueStatInfo.fromOffsetTableEntry(mq, consumeStats.getOffsetTable().get(mq));
+            queueStatInfo.setClientInfo(messageQueueClientMap.get(mq));
+            nowTopicConsumerInfo.appendQueueStatInfo(queueStatInfo);
+        }
+        return topicConsumerInfoList;
+    }
+
+    private Map<MessageQueue, String> getClientConnection(String groupName) {
+        Map<MessageQueue, String> results = Maps.newHashMap();
+        try {
+            ConsumerConnection consumerConnection = mqAdminExt.examineConsumerConnectionInfo(groupName);
+            for (Connection connection : consumerConnection.getConnectionSet()) {
+                String clinetId = connection.getClientId();
+                ConsumerRunningInfo consumerRunningInfo = mqAdminExt.getConsumerRunningInfo(groupName, clinetId, false);
+                for (MessageQueue messageQueue : consumerRunningInfo.getMqTable().keySet()) {
+//                    results.put(messageQueue, clinetId + " " + connection.getClientAddr());
+                    results.put(messageQueue, clinetId);
+                }
+            }
+        }
+        catch (Exception err) {
+            logger.error("op=getClientConnection_error", err);
+        }
+        return results;
+    }
+
+    @Override
+    @MultiMQAdminCmdMethod
+    public Map<String /*groupName*/, TopicConsumerInfo> queryConsumeStatsListByTopicName(String topic) {
+        Map<String, TopicConsumerInfo> group2ConsumerInfoMap = Maps.newHashMap();
+        try {
+            GroupList groupList = mqAdminExt.queryTopicConsumeByWho(topic);
+            for (String group : groupList.getGroupList()) {
+                List<TopicConsumerInfo> topicConsumerInfoList = null;
+                try {
+                    topicConsumerInfoList = queryConsumeStatsList(topic, group);
+                }
+                catch (Exception ignore) {
+                }
+                group2ConsumerInfoMap.put(group, CollectionUtils.isEmpty(topicConsumerInfoList) ? new TopicConsumerInfo(topic) : topicConsumerInfoList.get(0));
+            }
+            return group2ConsumerInfoMap;
+        }
+        catch (Exception e) {
+            throw propagate(e);
+        }
+    }
+
+    @Override
+    @MultiMQAdminCmdMethod
+    public Map<String, ConsumerGroupRollBackStat> resetOffset(ResetOffsetRequest resetOffsetRequest) {
+        Map<String, ConsumerGroupRollBackStat> groupRollbackStats = Maps.newHashMap();
+        for (String consumerGroup : resetOffsetRequest.getConsumerGroupList()) {
+            try {
+                Map<MessageQueue, Long> rollbackStatsMap =
+                    mqAdminExt.resetOffsetByTimestamp(resetOffsetRequest.getTopic(), consumerGroup, resetOffsetRequest.getResetTime(), resetOffsetRequest.isForce());
+                ConsumerGroupRollBackStat consumerGroupRollBackStat = new ConsumerGroupRollBackStat(true);
+                List<RollbackStats> rollbackStatsList = consumerGroupRollBackStat.getRollbackStatsList();
+                for (Map.Entry<MessageQueue, Long> rollbackStatsEntty : rollbackStatsMap.entrySet()) {
+                    RollbackStats rollbackStats = new RollbackStats();
+                    rollbackStats.setRollbackOffset(rollbackStatsEntty.getValue());
+                    rollbackStats.setQueueId(rollbackStatsEntty.getKey().getQueueId());
+                    rollbackStats.setBrokerName(rollbackStatsEntty.getKey().getBrokerName());
+                    rollbackStatsList.add(rollbackStats);
+                }
+                groupRollbackStats.put(consumerGroup, consumerGroupRollBackStat);
+            }
+            catch (MQClientException e) {
+                if (ResponseCode.CONSUMER_NOT_ONLINE == e.getResponseCode()) {
+                    try {
+                        ConsumerGroupRollBackStat consumerGroupRollBackStat = new ConsumerGroupRollBackStat(true);
+                        List<RollbackStats> rollbackStatsList = mqAdminExt.resetOffsetByTimestampOld(consumerGroup, resetOffsetRequest.getTopic(), resetOffsetRequest.getResetTime(), true);
+                        consumerGroupRollBackStat.setRollbackStatsList(rollbackStatsList);
+                        groupRollbackStats.put(consumerGroup, consumerGroupRollBackStat);
+                        continue;
+                    }
+                    catch (Exception err) {
+                        logger.error("op=resetOffset_which_not_online_error", err);
+                    }
+                }
+                else {
+                    logger.error("op=resetOffset_error", e);
+                }
+                groupRollbackStats.put(consumerGroup, new ConsumerGroupRollBackStat(false, e.getMessage()));
+            }
+            catch (Exception e) {
+                logger.error("op=resetOffset_error", e);
+                groupRollbackStats.put(consumerGroup, new ConsumerGroupRollBackStat(false, e.getMessage()));
+            }
+        }
+        return groupRollbackStats;
+    }
+
+    @Override
+    @MultiMQAdminCmdMethod
+    public List<ConsumerConfigInfo> examineSubscriptionGroupConfig(String group) {
+        List<ConsumerConfigInfo> consumerConfigInfoList = Lists.newArrayList();
+        try {
+            ClusterInfo clusterInfo = mqAdminExt.examineBrokerClusterInfo();
+            for (String brokerName : clusterInfo.getBrokerAddrTable().keySet()) { //foreach brokerName
+                String brokerAddress = clusterInfo.getBrokerAddrTable().get(brokerName).selectBrokerAddr();
+                SubscriptionGroupConfig subscriptionGroupConfig = mqAdminExt.examineSubscriptionGroupConfig(brokerAddress, group);
+                if (subscriptionGroupConfig == null) {
+                    continue;
+                }
+                consumerConfigInfoList.add(new ConsumerConfigInfo(Lists.newArrayList(brokerName), subscriptionGroupConfig));
+            }
+        }
+        catch (Exception e) {
+            throw propagate(e);
+        }
+        return consumerConfigInfoList;
+    }
+
+    @Override
+    @MultiMQAdminCmdMethod
+    public boolean deleteSubGroup(DeleteSubGroupRequest deleteSubGroupRequest) {
+        try {
+            ClusterInfo clusterInfo = mqAdminExt.examineBrokerClusterInfo();
+            for (String brokerName : deleteSubGroupRequest.getBrokerNameList()) {
+                logger.info("addr={} groupName={}", clusterInfo.getBrokerAddrTable().get(brokerName).selectBrokerAddr(), deleteSubGroupRequest.getGroupName());
+                mqAdminExt.deleteSubscriptionGroup(clusterInfo.getBrokerAddrTable().get(brokerName).selectBrokerAddr(), deleteSubGroupRequest.getGroupName());
+            }
+        }
+        catch (Exception e) {
+            throw propagate(e);
+        }
+        return true;
+    }
+
+    @Override
+    public boolean createAndUpdateSubscriptionGroupConfig(ConsumerConfigInfo consumerConfigInfo) {
+        try {
+            ClusterInfo clusterInfo = mqAdminExt.examineBrokerClusterInfo();
+            for (String brokerName : changeToBrokerNameSet(clusterInfo.getClusterAddrTable(),
+                consumerConfigInfo.getClusterNameList(), consumerConfigInfo.getBrokerNameList())) {
+                mqAdminExt.createAndUpdateSubscriptionGroupConfig(clusterInfo.getBrokerAddrTable().get(brokerName).selectBrokerAddr(), consumerConfigInfo.getSubscriptionGroupConfig());
+            }
+        }
+        catch (Exception err) {
+            throw Throwables.propagate(err);
+        }
+        return true;
+    }
+
+    @Override
+    @MultiMQAdminCmdMethod
+    public Set<String> fetchBrokerNameSetBySubscriptionGroup(String group) {
+        Set<String> brokerNameSet = Sets.newHashSet();
+        try {
+            List<ConsumerConfigInfo> consumerConfigInfoList = examineSubscriptionGroupConfig(group);
+            for (ConsumerConfigInfo consumerConfigInfo : consumerConfigInfoList) {
+                brokerNameSet.addAll(consumerConfigInfo.getBrokerNameList());
+            }
+        }
+        catch (Exception e) {
+            throw Throwables.propagate(e);
+        }
+        return brokerNameSet;
+
+    }
+
+    @Override
+    public ConsumerConnection getConsumerConnection(String consumerGroup) {
+        try {
+            return mqAdminExt.examineConsumerConnectionInfo(consumerGroup);
+        }
+        catch (Exception e) {
+            throw Throwables.propagate(e);
+        }
+    }
+
+    @Override
+    public ConsumerRunningInfo getConsumerRunningInfo(String consumerGroup, String clientId, boolean jstack) {
+        try {
+            return mqAdminExt.getConsumerRunningInfo(consumerGroup, clientId, jstack);
+        }
+        catch (Exception e) {
+            throw Throwables.propagate(e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/e218eef9/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/impl/DashboardCollectServiceImpl.java
----------------------------------------------------------------------
diff --git a/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/impl/DashboardCollectServiceImpl.java b/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/impl/DashboardCollectServiceImpl.java
new file mode 100644
index 0000000..d32a344
--- /dev/null
+++ b/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/impl/DashboardCollectServiceImpl.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.console.service.impl;
+
+import com.alibaba.fastjson.JSONArray;
+import com.alibaba.fastjson.JSONObject;
+import com.google.common.base.Charsets;
+import com.google.common.base.Throwables;
+import com.google.common.base.Ticker;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.io.Files;
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.Resource;
+import org.apache.rocketmq.console.config.RMQConfigure;
+import org.apache.rocketmq.console.exception.ServiceException;
+import org.apache.rocketmq.console.service.DashboardCollectService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Service;
+
+@Service
+public class DashboardCollectServiceImpl implements DashboardCollectService {
+
+    @Resource
+    private RMQConfigure rmqConfigure;
+
+    private final static Logger log = LoggerFactory.getLogger(DashboardCollectServiceImpl.class);
+
+    private LoadingCache<String, List<String>> brokerMap = CacheBuilder.newBuilder()
+        .maximumSize(1000)
+        .concurrencyLevel(10)
+        .recordStats()
+        .ticker(Ticker.systemTicker())
+        .removalListener(new RemovalListener<Object, Object>() {
+            @Override
+            public void onRemoval(RemovalNotification<Object, Object> notification) {
+                log.debug(notification.getKey() + " was removed, cause is " + notification.getCause());
+            }
+        })
+        .build(
+            new CacheLoader<String, List<String>>() {
+                @Override
+                public List<String> load(String key) {
+                    List<String> list = Lists.newArrayList();
+                    return list;
+                }
+            }
+        );
+
+    private LoadingCache<String, List<String>> topicMap = CacheBuilder.newBuilder()
+        .maximumSize(1000)
+        .concurrencyLevel(10)
+        .recordStats()
+        .ticker(Ticker.systemTicker())
+        .removalListener(new RemovalListener<Object, Object>() {
+            @Override
+            public void onRemoval(RemovalNotification<Object, Object> notification) {
+                log.debug(notification.getKey() + " was removed, cause is " + notification.getCause());
+            }
+        })
+        .build(
+            new CacheLoader<String, List<String>>() {
+                @Override
+                public List<String> load(String key) {
+                    List<String> list = Lists.newArrayList();
+                    return list;
+                }
+            }
+        );
+
+    @Override
+    public LoadingCache<String, List<String>> getBrokerMap() {
+        return brokerMap;
+    }
+    @Override
+    public LoadingCache<String, List<String>> getTopicMap() {
+        return topicMap;
+    }
+
+    @Override
+    public Map<String, List<String>> jsonDataFile2map(File file) {
+        List<String> strings;
+        try {
+            strings = Files.readLines(file, Charsets.UTF_8);
+        }
+        catch (IOException e) {
+            throw Throwables.propagate(e);
+        }
+        StringBuffer sb = new StringBuffer();
+        for (String string : strings) {
+            sb.append(string);
+        }
+        JSONObject json = (JSONObject) JSONObject.parse(sb.toString());
+        Set<Map.Entry<String, Object>> entries = json.entrySet();
+        Map<String, List<String>> map = Maps.newHashMap();
+        for (Map.Entry<String, Object> entry : entries) {
+            JSONArray tpsArray = (JSONArray) entry.getValue();
+            if (tpsArray == null) {
+                continue;
+            }
+            Object[] tpsStrArray = tpsArray.toArray();
+            List<String> tpsList = Lists.newArrayList();
+            for (Object tpsObj : tpsStrArray) {
+                tpsList.add("" + tpsObj);
+            }
+            map.put(entry.getKey(), tpsList);
+        }
+        return map;
+    }
+
+    @Override
+    public Map<String, List<String>> getBrokerCache(String date) {
+        String dataLocationPath = rmqConfigure.getConsoleCollectData();
+        File file = new File(dataLocationPath + date + ".json");
+        if (!file.exists()) {
+            throw Throwables.propagate(new ServiceException(1, "This date have't data!"));
+        }
+        return jsonDataFile2map(file);
+    }
+
+    @Override
+    public Map<String, List<String>> getTopicCache(String date) {
+        String dataLocationPath = rmqConfigure.getConsoleCollectData();
+        File file = new File(dataLocationPath + date + "_topic" + ".json");
+        if (!file.exists()) {
+            throw Throwables.propagate(new ServiceException(1, "This date have't data!"));
+        }
+        return jsonDataFile2map(file);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/e218eef9/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/impl/DashboardServiceImpl.java
----------------------------------------------------------------------
diff --git a/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/impl/DashboardServiceImpl.java b/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/impl/DashboardServiceImpl.java
new file mode 100644
index 0000000..3189093
--- /dev/null
+++ b/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/impl/DashboardServiceImpl.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.console.service.impl;
+
+import com.google.common.collect.Lists;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Resource;
+import org.apache.rocketmq.console.service.DashboardCollectService;
+import org.apache.rocketmq.console.service.DashboardService;
+import org.springframework.stereotype.Service;
+
+@Service
+public class DashboardServiceImpl implements DashboardService {
+
+    @Resource
+    private DashboardCollectService dashboardCollectService;
+    /**
+     * @param date format yyyy-MM-dd
+     */
+    @Override
+    public Map<String, List<String>> queryBrokerData(String date) {
+        return dashboardCollectService.getBrokerCache(date);
+    }
+
+    @Override
+    public Map<String, List<String>> queryTopicData(String date) {
+        return dashboardCollectService.getTopicCache(date);
+    }
+
+    /**
+     * @param date format yyyy-MM-dd
+     * @param topicName
+     */
+    @Override
+    public List<String> queryTopicData(String date, String topicName) {
+        if (null != dashboardCollectService.getTopicCache(date)) {
+            return dashboardCollectService.getTopicCache(date).get(topicName);
+        }
+        return null;
+    }
+
+    @Override
+    public List<String> queryTopicCurrentData() {
+        Date date = new Date();
+        DateFormat format = new SimpleDateFormat("yyyy-MM-dd");
+        Map<String, List<String>> topicCache = dashboardCollectService.getTopicCache(format.format(date));
+        List<String> result = Lists.newArrayList();
+        for (Map.Entry<String, List<String>> entry : topicCache.entrySet()) {
+            List<String> value = entry.getValue();
+            result.add(entry.getKey() + "," + value.get(value.size() - 1).split(",")[4]);
+        }
+        return result;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/e218eef9/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/impl/MessageServiceImpl.java
----------------------------------------------------------------------
diff --git a/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/impl/MessageServiceImpl.java b/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/impl/MessageServiceImpl.java
new file mode 100644
index 0000000..0205a69
--- /dev/null
+++ b/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/impl/MessageServiceImpl.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.console.service.impl;
+
+import com.google.common.base.Function;
+import com.google.common.base.Predicate;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.List;
+import java.util.Set;
+import javax.annotation.Resource;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
+import org.apache.rocketmq.client.consumer.PullResult;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.Pair;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.body.Connection;
+import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
+import org.apache.rocketmq.common.protocol.body.ConsumerConnection;
+import org.apache.rocketmq.console.model.MessageView;
+import org.apache.rocketmq.console.service.MessageService;
+import org.apache.rocketmq.tools.admin.MQAdminExt;
+import org.apache.rocketmq.tools.admin.api.MessageTrack;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Service;
+
+@Service
+public class MessageServiceImpl implements MessageService {
+
+    private Logger logger = LoggerFactory.getLogger(MessageServiceImpl.class);
+    /**
+     * @see org.apache.rocketmq.store.config.MessageStoreConfig maxMsgsNumBatch = 64;
+     * @see org.apache.rocketmq.store.index.IndexService maxNum = Math.min(maxNum, this.defaultMessageStore.getMessageStoreConfig().getMaxMsgsNumBatch());
+     */
+    private final static int QUERY_MESSAGE_MAX_NUM = 64;
+    @Resource
+    private MQAdminExt mqAdminExt;
+
+    public Pair<MessageView, List<MessageTrack>> viewMessage(String subject, final String msgId) {
+        try {
+
+            MessageExt messageExt = mqAdminExt.viewMessage(subject, msgId);
+            List<MessageTrack> messageTrackList = messageTrackDetail(messageExt);
+            return new Pair<>(MessageView.fromMessageExt(messageExt), messageTrackList);
+        }
+        catch (Exception e) {
+            throw Throwables.propagate(e);
+        }
+    }
+
+    @Override
+    public List<MessageView> queryMessageByTopicAndKey(String topic, String key) {
+        try {
+            return Lists.transform(mqAdminExt.queryMessage(topic, key, QUERY_MESSAGE_MAX_NUM, 0, System.currentTimeMillis()).getMessageList(), new Function<MessageExt, MessageView>() {
+                @Override
+                public MessageView apply(MessageExt messageExt) {
+                    return MessageView.fromMessageExt(messageExt);
+                }
+            });
+        }
+        catch (Exception err) {
+            throw Throwables.propagate(err);
+        }
+    }
+
+    @Override
+    public List<MessageView> queryMessageByTopic(String topic, final long begin, final long end) {
+        DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(MixAll.TOOLS_CONSUMER_GROUP, null);
+        List<MessageView> messageViewList = Lists.newArrayList();
+        try {
+            String subExpression = "*";
+            consumer.start();
+            Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues(topic);
+            for (MessageQueue mq : mqs) {
+                long minOffset = consumer.searchOffset(mq, begin);
+                long maxOffset = consumer.searchOffset(mq, end);
+                READQ:
+                for (long offset = minOffset; offset <= maxOffset; ) {
+                    try {
+                        if (messageViewList.size() > 2000) {
+                            break;
+                        }
+                        PullResult pullResult = consumer.pull(mq, subExpression, offset, 32);
+                        offset = pullResult.getNextBeginOffset();
+                        switch (pullResult.getPullStatus()) {
+                            case FOUND:
+
+                                List<MessageView> messageViewListByQuery = Lists.transform(pullResult.getMsgFoundList(), new Function<MessageExt, MessageView>() {
+                                    @Override
+                                    public MessageView apply(MessageExt messageExt) {
+                                        messageExt.setBody(null);
+                                        return MessageView.fromMessageExt(messageExt);
+                                    }
+                                });
+                                List<MessageView> filteredList = Lists.newArrayList(Iterables.filter(messageViewListByQuery, new Predicate<MessageView>() {
+                                    @Override
+                                    public boolean apply(MessageView messageView) {
+                                        if (messageView.getStoreTimestamp() < begin || messageView.getStoreTimestamp() > end) {
+                                            logger.info("begin={} end={} time not in range {} {}", begin, end, messageView.getStoreTimestamp(), new Date(messageView.getStoreTimestamp()).toString());
+                                        }
+                                        return messageView.getStoreTimestamp() >= begin && messageView.getStoreTimestamp() <= end;
+                                    }
+                                }));
+                                messageViewList.addAll(filteredList);
+                                break;
+                            case NO_MATCHED_MSG:
+                            case NO_NEW_MSG:
+                            case OFFSET_ILLEGAL:
+                                break READQ;
+                        }
+                    }
+                    catch (Exception e) {
+                        break;
+                    }
+                }
+            }
+            Collections.sort(messageViewList, new Comparator<MessageView>() {
+                @Override
+                public int compare(MessageView o1, MessageView o2) {
+                    if (o1.getStoreTimestamp() - o2.getStoreTimestamp() == 0) {
+                        return 0;
+                    }
+                    return (o1.getStoreTimestamp() > o2.getStoreTimestamp()) ? -1 : 1;
+                }
+            });
+            return messageViewList;
+        }
+        catch (Exception e) {
+            throw Throwables.propagate(e);
+        }
+        finally {
+            consumer.shutdown();
+        }
+    }
+
+    @Override
+    public List<MessageTrack> messageTrackDetail(MessageExt msg) {
+        try {
+            return mqAdminExt.messageTrackDetail(msg);
+        }
+        catch (Exception e) {
+            logger.error("op=messageTrackDetailError", e);
+            return Collections.emptyList();
+        }
+    }
+
+
+    @Override
+    public ConsumeMessageDirectlyResult consumeMessageDirectly(String topic, String msgId, String consumerGroup,
+        String clientId) {
+        if (StringUtils.isNotBlank(clientId)) {
+            try {
+                return mqAdminExt.consumeMessageDirectly(consumerGroup, clientId, topic, msgId);
+            }
+            catch (Exception e) {
+                throw Throwables.propagate(e);
+            }
+        }
+
+        try {
+            ConsumerConnection consumerConnection = mqAdminExt.examineConsumerConnectionInfo(consumerGroup);
+            for (Connection connection : consumerConnection.getConnectionSet()) {
+                if (StringUtils.isBlank(connection.getClientId())) {
+                    continue;
+                }
+                logger.info("clientId={}", connection.getClientId());
+                return mqAdminExt.consumeMessageDirectly(consumerGroup, connection.getClientId(), topic, msgId);
+            }
+        }
+        catch (Exception e) {
+            throw Throwables.propagate(e);
+        }
+        throw new IllegalStateException("NO CONSUMER");
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/e218eef9/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/impl/MonitorServiceImpl.java
----------------------------------------------------------------------
diff --git a/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/impl/MonitorServiceImpl.java b/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/impl/MonitorServiceImpl.java
new file mode 100644
index 0000000..d3a109d
--- /dev/null
+++ b/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/impl/MonitorServiceImpl.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.console.service.impl;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.google.common.base.Throwables;
+import java.io.File;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import javax.annotation.PostConstruct;
+import javax.annotation.Resource;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.console.config.RMQConfigure;
+import org.apache.rocketmq.console.model.ConsumerMonitorConfig;
+import org.apache.rocketmq.console.service.MonitorService;
+import org.apache.rocketmq.console.util.JsonUtil;
+import org.springframework.stereotype.Service;
+
+@Service
+public class MonitorServiceImpl implements MonitorService {
+
+
+    @Resource
+    private RMQConfigure rmqConfigure;
+
+    private Map<String, ConsumerMonitorConfig> configMap = new ConcurrentHashMap<>();
+
+    @Override
+    public boolean createOrUpdateConsumerMonitor(String name, ConsumerMonitorConfig config) {
+        configMap.put(name, config);// todo if write map success but write file fail
+        writeToFile(getConsumerMonitorConfigDataPath(), configMap);
+        return true;
+    }
+
+    @Override
+    public Map<String, ConsumerMonitorConfig> queryConsumerMonitorConfig() {
+        return configMap;
+    }
+
+    @Override
+    public ConsumerMonitorConfig queryConsumerMonitorConfigByGroupName(String consumeGroupName) {
+        return configMap.get(consumeGroupName);
+    }
+
+    @Override
+    public boolean deleteConsumerMonitor(String consumeGroupName) {
+        configMap.remove(consumeGroupName);
+        writeToFile(getConsumerMonitorConfigDataPath(), configMap);
+        return true;
+    }
+
+    //rocketmq.console.data.path/monitor/consumerMonitorConfig.json
+    private String getConsumerMonitorConfigDataPath() {
+        return rmqConfigure.getRocketMqConsoleDataPath() + File.separatorChar + "monitor" + File.separatorChar + "consumerMonitorConfig.json";
+    }
+
+    private String getConsumerMonitorConfigDataPathBackUp() {
+        return getConsumerMonitorConfigDataPath() + ".bak";
+    }
+
+    private void writeToFile(String path, Object data) {
+        writeDataJsonToFile(path, JsonUtil.obj2String(data));
+    }
+
+    private void writeDataJsonToFile(String path, String dataStr) {
+        try {
+            MixAll.string2File(dataStr, path);
+        }
+        catch (Exception e) {
+            throw Throwables.propagate(e);
+        }
+    }
+
+    @PostConstruct
+    private void loadData() {
+        String content = MixAll.file2String(getConsumerMonitorConfigDataPath());
+        if (content == null) {
+            content = MixAll.file2String(getConsumerMonitorConfigDataPathBackUp());
+        }
+        if (content == null) {
+            return;
+        }
+        configMap = JsonUtil.string2Obj(content, new TypeReference<ConcurrentHashMap<String, ConsumerMonitorConfig>>() {
+        });
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/e218eef9/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/impl/OpsServiceImpl.java
----------------------------------------------------------------------
diff --git a/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/impl/OpsServiceImpl.java b/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/impl/OpsServiceImpl.java
new file mode 100644
index 0000000..84e6d2f
--- /dev/null
+++ b/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/impl/OpsServiceImpl.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.console.service.impl;
+
+import com.google.common.base.Splitter;
+import com.google.common.collect.Maps;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Resource;
+import org.apache.rocketmq.console.config.RMQConfigure;
+import org.apache.rocketmq.console.service.AbstractCommonService;
+import org.apache.rocketmq.console.service.OpsService;
+import org.apache.rocketmq.console.service.checker.CheckerType;
+import org.apache.rocketmq.console.service.checker.RocketMqChecker;
+import org.springframework.stereotype.Service;
+
+@Service
+public class OpsServiceImpl extends AbstractCommonService implements OpsService {
+
+    @Resource
+    private RMQConfigure rMQConfigure;
+
+    @Resource
+    private List<RocketMqChecker> rocketMqCheckerList;
+
+    @Override
+    public Map<String, Object> homePageInfo() {
+        Map<String, Object> homePageInfoMap = Maps.newHashMap();
+        homePageInfoMap.put("namesvrAddrList", Splitter.on(";").splitToList(rMQConfigure.getNamesrvAddr()));
+        homePageInfoMap.put("useVIPChannel", Boolean.valueOf(rMQConfigure.getIsVIPChannel()));
+        return homePageInfoMap;
+    }
+
+    @Override
+    public void updateNameSvrAddrList(String nameSvrAddrList) {
+        rMQConfigure.setNamesrvAddr(nameSvrAddrList);
+    }
+
+    @Override
+    public String getNameSvrList() {
+        return rMQConfigure.getNamesrvAddr();
+    }
+
+    @Override
+    public Map<CheckerType, Object> rocketMqStatusCheck() {
+        Map<CheckerType, Object> checkResultMap = Maps.newHashMap();
+        for (RocketMqChecker rocketMqChecker : rocketMqCheckerList) {
+            checkResultMap.put(rocketMqChecker.checkerType(), rocketMqChecker.doCheck());
+        }
+        return checkResultMap;
+    }
+
+    @Override public boolean updateIsVIPChannel(String useVIPChannel) {
+        rMQConfigure.setIsVIPChannel(useVIPChannel);
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/e218eef9/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/impl/ProducerServiceImpl.java
----------------------------------------------------------------------
diff --git a/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/impl/ProducerServiceImpl.java b/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/impl/ProducerServiceImpl.java
new file mode 100644
index 0000000..3e46958
--- /dev/null
+++ b/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/impl/ProducerServiceImpl.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.console.service.impl;
+
+import com.google.common.base.Throwables;
+import javax.annotation.Resource;
+import org.apache.rocketmq.common.protocol.body.ProducerConnection;
+import org.apache.rocketmq.console.service.ProducerService;
+import org.apache.rocketmq.tools.admin.MQAdminExt;
+import org.springframework.stereotype.Service;
+
+@Service
+public class ProducerServiceImpl implements ProducerService {
+    @Resource
+    private MQAdminExt mqAdminExt;
+
+    @Override
+    public ProducerConnection getProducerConnection(String producerGroup, String topic) {
+        try {
+            return mqAdminExt.examineProducerConnectionInfo(producerGroup, topic);
+        }
+        catch (Exception e) {
+            throw Throwables.propagate(e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/e218eef9/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/impl/TopicServiceImpl.java
----------------------------------------------------------------------
diff --git a/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/impl/TopicServiceImpl.java b/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/impl/TopicServiceImpl.java
new file mode 100644
index 0000000..117bcfd
--- /dev/null
+++ b/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/impl/TopicServiceImpl.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.console.service.impl;
+
+import com.google.common.base.Throwables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.admin.TopicStatsTable;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.protocol.body.ClusterInfo;
+import org.apache.rocketmq.common.protocol.body.GroupList;
+import org.apache.rocketmq.common.protocol.body.TopicList;
+import org.apache.rocketmq.common.protocol.route.BrokerData;
+import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.console.config.RMQConfigure;
+import org.apache.rocketmq.console.model.request.SendTopicMessageRequest;
+import org.apache.rocketmq.console.model.request.TopicConfigInfo;
+import org.apache.rocketmq.console.service.AbstractCommonService;
+import org.apache.rocketmq.console.service.TopicService;
+import org.apache.rocketmq.tools.command.CommandUtil;
+import org.springframework.beans.BeanUtils;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+@Service
+public class TopicServiceImpl extends AbstractCommonService implements TopicService {
+
+    @Autowired
+    private RMQConfigure rMQConfigure;
+
+    @Override
+    public TopicList fetchAllTopicList() {
+        try {
+            return mqAdminExt.fetchAllTopicList();
+        }
+        catch (Exception e) {
+            throw Throwables.propagate(e);
+        }
+    }
+
+    @Override
+    public TopicStatsTable stats(String topic) {
+        try {
+            return mqAdminExt.examineTopicStats(topic);
+        }
+        catch (Exception e) {
+            throw Throwables.propagate(e);
+        }
+    }
+
+    @Override
+    public TopicRouteData route(String topic) {
+        try {
+            return mqAdminExt.examineTopicRouteInfo(topic);
+        }
+        catch (Exception ex) {
+            throw Throwables.propagate(ex);
+        }
+    }
+
+    @Override
+    public GroupList queryTopicConsumerInfo(String topic) {
+        try {
+            return mqAdminExt.queryTopicConsumeByWho(topic);
+        }
+        catch (Exception e) {
+            throw Throwables.propagate(e);
+        }
+    }
+
+    @Override
+    public void createOrUpdate(TopicConfigInfo topicCreateOrUpdateRequest) {
+        TopicConfig topicConfig = new TopicConfig();
+        BeanUtils.copyProperties(topicCreateOrUpdateRequest, topicConfig);
+        try {
+            ClusterInfo clusterInfo = mqAdminExt.examineBrokerClusterInfo();
+            for (String brokerName : changeToBrokerNameSet(clusterInfo.getClusterAddrTable(),
+                topicCreateOrUpdateRequest.getClusterNameList(), topicCreateOrUpdateRequest.getBrokerNameList())) {
+                mqAdminExt.createAndUpdateTopicConfig(clusterInfo.getBrokerAddrTable().get(brokerName).selectBrokerAddr(), topicConfig);
+            }
+        }
+        catch (Exception err) {
+            throw Throwables.propagate(err);
+        }
+    }
+
+    @Override
+    public TopicConfig examineTopicConfig(String topic, String brokerName) {
+        ClusterInfo clusterInfo = null;
+        try {
+            clusterInfo = mqAdminExt.examineBrokerClusterInfo();
+        }
+        catch (Exception e) {
+            throw Throwables.propagate(e);
+        }
+        return mqAdminExt.examineTopicConfig(clusterInfo.getBrokerAddrTable().get(brokerName).selectBrokerAddr(), topic);
+    }
+
+    @Override
+    public List<TopicConfigInfo> examineTopicConfig(String topic) {
+        List<TopicConfigInfo> topicConfigInfoList = Lists.newArrayList();
+        TopicRouteData topicRouteData = route(topic);
+        for (BrokerData brokerData : topicRouteData.getBrokerDatas()) {
+            TopicConfigInfo topicConfigInfo = new TopicConfigInfo();
+            TopicConfig topicConfig = examineTopicConfig(topic, brokerData.getBrokerName());
+            BeanUtils.copyProperties(topicConfig, topicConfigInfo);
+            topicConfigInfo.setBrokerNameList(Lists.newArrayList(brokerData.getBrokerName()));
+            topicConfigInfoList.add(topicConfigInfo);
+        }
+        return topicConfigInfoList;
+    }
+
+    @Override
+    public boolean deleteTopic(String topic, String clusterName) {
+        try {
+            if (StringUtils.isBlank(clusterName)) {
+                return deleteTopic(topic);
+            }
+            Set<String> masterSet = CommandUtil.fetchMasterAddrByClusterName(mqAdminExt, clusterName);
+            mqAdminExt.deleteTopicInBroker(masterSet, topic);
+            Set<String> nameServerSet = null;
+            if (StringUtils.isNotBlank(rMQConfigure.getNamesrvAddr())) {
+                String[] ns = rMQConfigure.getNamesrvAddr().split(";");
+                nameServerSet = new HashSet<String>(Arrays.asList(ns));
+            }
+            mqAdminExt.deleteTopicInNameServer(nameServerSet, topic);
+        }
+        catch (Exception err) {
+            throw Throwables.propagate(err);
+        }
+        return true;
+    }
+
+    @Override
+    public boolean deleteTopic(String topic) {
+        ClusterInfo clusterInfo = null;
+        try {
+            clusterInfo = mqAdminExt.examineBrokerClusterInfo();
+        }
+        catch (Exception err) {
+            throw Throwables.propagate(err);
+        }
+        for (String clusterName : clusterInfo.getClusterAddrTable().keySet()) {
+            deleteTopic(topic, clusterName);
+        }
+        return true;
+    }
+
+    @Override
+    public boolean deleteTopicInBroker(String brokerName, String topic) {
+
+        try {
+            ClusterInfo clusterInfo = null;
+            try {
+                clusterInfo = mqAdminExt.examineBrokerClusterInfo();
+            }
+            catch (Exception e) {
+                throw Throwables.propagate(e);
+            }
+            mqAdminExt.deleteTopicInBroker(Sets.newHashSet(clusterInfo.getBrokerAddrTable().get(brokerName).selectBrokerAddr()), topic);
+        }
+        catch (Exception e) {
+            throw Throwables.propagate(e);
+        }
+        return true;
+    }
+
+    @Override
+    public SendResult sendTopicMessageRequest(SendTopicMessageRequest sendTopicMessageRequest) {
+        DefaultMQProducer producer = new DefaultMQProducer(MixAll.SELF_TEST_PRODUCER_GROUP);
+        producer.setInstanceName(String.valueOf(System.currentTimeMillis()));
+        producer.setNamesrvAddr(rMQConfigure.getNamesrvAddr());
+        try {
+            producer.start();
+            Message msg = new Message(sendTopicMessageRequest.getTopic(),
+                sendTopicMessageRequest.getTag(),
+                sendTopicMessageRequest.getKey(),
+                sendTopicMessageRequest.getMessageBody().getBytes()
+            );
+            return producer.send(msg);
+        }
+        catch (Exception e) {
+            throw Throwables.propagate(e);
+        }
+        finally {
+            producer.shutdown();
+        }
+    }
+
+}