You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2017/06/15 02:53:19 UTC

[49/51] [partial] incubator-rocketmq-externals git commit: Release rocketmq-console 1.0.0 version

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/e218eef9/rocketmq-console/src/main/java/org/apache/rocketmq/console/controller/NamesvrController.java
----------------------------------------------------------------------
diff --git a/rocketmq-console/src/main/java/org/apache/rocketmq/console/controller/NamesvrController.java b/rocketmq-console/src/main/java/org/apache/rocketmq/console/controller/NamesvrController.java
new file mode 100644
index 0000000..ad6c25a
--- /dev/null
+++ b/rocketmq-console/src/main/java/org/apache/rocketmq/console/controller/NamesvrController.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.console.controller;
+
+import javax.annotation.Resource;
+import org.apache.rocketmq.console.aspect.admin.annotation.OriginalControllerReturnValue;
+import org.apache.rocketmq.console.service.OpsService;
+import org.springframework.stereotype.Controller;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestMethod;
+import org.springframework.web.bind.annotation.ResponseBody;
+
+/**
+ * Spring MVC controller mapped under {@code /rocketmq}; its only handler delegates to
+ * {@link OpsService}.
+ */
+@Controller
+@RequestMapping("/rocketmq")
+public class NamesvrController {
+
+    @Resource
+    private OpsService opsService;
+
+    /**
+     * Serves {@code GET /rocketmq/nsaddr}.
+     *
+     * <p>NOTE(review): {@code @OriginalControllerReturnValue} presumably asks the console's
+     * response-handling aspect to pass the value through unwrapped - confirm against the aspect.
+     *
+     * @return whatever {@code opsService.getNameSvrList()} yields, written as the response body
+     */
+    @OriginalControllerReturnValue
+    @ResponseBody
+    @RequestMapping(method = RequestMethod.GET, value = "/nsaddr")
+    public Object nsaddr() {
+        final Object nameSvrList = opsService.getNameSvrList();
+        return nameSvrList;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/e218eef9/rocketmq-console/src/main/java/org/apache/rocketmq/console/controller/OpsController.java
----------------------------------------------------------------------
diff --git a/rocketmq-console/src/main/java/org/apache/rocketmq/console/controller/OpsController.java b/rocketmq-console/src/main/java/org/apache/rocketmq/console/controller/OpsController.java
new file mode 100644
index 0000000..d82862f
--- /dev/null
+++ b/rocketmq-console/src/main/java/org/apache/rocketmq/console/controller/OpsController.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.console.controller;
+
+import javax.annotation.Resource;
+import org.apache.rocketmq.console.service.OpsService;
+import org.springframework.stereotype.Controller;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestMethod;
+import org.springframework.web.bind.annotation.RequestParam;
+import org.springframework.web.bind.annotation.ResponseBody;
+
+/**
+ * Spring MVC controller for the {@code /ops} endpoints; every handler forwards to
+ * {@link OpsService}.
+ */
+@Controller
+@RequestMapping("/ops")
+public class OpsController {
+
+    @Resource
+    private OpsService opsService;
+
+    /**
+     * Serves {@code GET /ops/homePage.query}.
+     *
+     * @return the result of {@code opsService.homePageInfo()}
+     */
+    @ResponseBody
+    @RequestMapping(method = RequestMethod.GET, value = "/homePage.query")
+    public Object homePage() {
+        final Object homePageInfo = opsService.homePageInfo();
+        return homePageInfo;
+    }
+
+    /**
+     * Serves {@code POST /ops/updateNameSvrAddr.do} by handing the raw parameter to
+     * {@code opsService.updateNameSvrAddrList}.
+     *
+     * @param nameSvrAddrList required request parameter, passed through unchanged
+     * @return always {@code true} when the service call returns normally
+     */
+    @ResponseBody
+    @RequestMapping(method = RequestMethod.POST, value = "/updateNameSvrAddr.do")
+    public Object updateNameSvrAddr(@RequestParam String nameSvrAddrList) {
+        opsService.updateNameSvrAddrList(nameSvrAddrList);
+        return Boolean.TRUE;
+    }
+
+    /**
+     * Serves {@code POST /ops/updateIsVIPChannel.do} by handing the raw parameter to
+     * {@code opsService.updateIsVIPChannel}.
+     *
+     * @param useVIPChannel required request parameter, passed through as a string
+     * @return always {@code true} when the service call returns normally
+     */
+    @ResponseBody
+    @RequestMapping(method = RequestMethod.POST, value = "/updateIsVIPChannel.do")
+    public Object updateIsVIPChannel(@RequestParam String useVIPChannel) {
+        opsService.updateIsVIPChannel(useVIPChannel);
+        return Boolean.TRUE;
+    }
+
+    /**
+     * Serves {@code GET /ops/rocketMqStatus.query}.
+     *
+     * @return the result of {@code opsService.rocketMqStatusCheck()}
+     */
+    @ResponseBody
+    @RequestMapping(method = RequestMethod.GET, value = "/rocketMqStatus.query")
+    public Object clusterStatus() {
+        final Object statusReport = opsService.rocketMqStatusCheck();
+        return statusReport;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/e218eef9/rocketmq-console/src/main/java/org/apache/rocketmq/console/controller/ProducerController.java
----------------------------------------------------------------------
diff --git a/rocketmq-console/src/main/java/org/apache/rocketmq/console/controller/ProducerController.java b/rocketmq-console/src/main/java/org/apache/rocketmq/console/controller/ProducerController.java
new file mode 100644
index 0000000..1a69de5
--- /dev/null
+++ b/rocketmq-console/src/main/java/org/apache/rocketmq/console/controller/ProducerController.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.console.controller;
+
+import javax.annotation.Resource;
+import org.apache.rocketmq.common.protocol.body.ProducerConnection;
+import org.apache.rocketmq.console.model.ConnectionInfo;
+import org.apache.rocketmq.console.service.ProducerService;
+import org.springframework.stereotype.Controller;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestMethod;
+import org.springframework.web.bind.annotation.RequestParam;
+import org.springframework.web.bind.annotation.ResponseBody;
+
+/**
+ * Spring MVC controller for the {@code /producer} endpoints, backed by {@link ProducerService}.
+ */
+@Controller
+@RequestMapping("/producer")
+public class ProducerController {
+
+    @Resource
+    private ProducerService producerService;
+
+    /**
+     * Serves {@code GET /producer/producerConnection.query}.
+     *
+     * @param producerGroup required request parameter forwarded to the service
+     * @param topic required request parameter forwarded to the service
+     * @return the {@link ProducerConnection} from the service, with its connection set rebuilt
+     *         through {@link ConnectionInfo#buildConnectionInfoHashSet}
+     */
+    @ResponseBody
+    @RequestMapping(method = {RequestMethod.GET}, value = "/producerConnection.query")
+    public Object producerConnection(@RequestParam String producerGroup, @RequestParam String topic) {
+        final ProducerConnection connection = producerService.getProducerConnection(producerGroup, topic);
+        // Replace the raw connections with ConnectionInfo copies before the object is serialized.
+        connection.setConnectionSet(
+            ConnectionInfo.buildConnectionInfoHashSet(connection.getConnectionSet()));
+        return connection;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/e218eef9/rocketmq-console/src/main/java/org/apache/rocketmq/console/controller/TestController.java
----------------------------------------------------------------------
diff --git a/rocketmq-console/src/main/java/org/apache/rocketmq/console/controller/TestController.java b/rocketmq-console/src/main/java/org/apache/rocketmq/console/controller/TestController.java
new file mode 100644
index 0000000..d7af1ad
--- /dev/null
+++ b/rocketmq-console/src/main/java/org/apache/rocketmq/console/controller/TestController.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.console.controller;
+
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import java.util.List;
+import javax.annotation.Resource;
+import org.apache.rocketmq.console.config.RMQConfigure;
+import org.apache.rocketmq.console.util.JsonUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Controller;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestMethod;
+import org.springframework.web.bind.annotation.ResponseBody;
+
+@Controller
+@RequestMapping("/test")
+public class TestController {
+    private Logger logger = LoggerFactory.getLogger(TestController.class);
+    private String testTopic = "TestTopic";
+
+    @Resource
+    private RMQConfigure rMQConfigure;
+
+    @RequestMapping(value = "/runTask.do", method = RequestMethod.GET)
+    @ResponseBody
+    public Object list() throws MQClientException, RemotingException, InterruptedException {
+        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(testTopic + "Group");
+        consumer.setNamesrvAddr(rMQConfigure.getNamesrvAddr());
+        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
+        consumer.subscribe(testTopic, "*");
+        consumer.registerMessageListener(new MessageListenerConcurrently() {
+
+            @Override
+            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
+                ConsumeConcurrentlyContext context) {
+                logger.info("receiveMessage msgSize={}", msgs.size());
+                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+            }
+        });
+        consumer.start();
+        final DefaultMQProducer producer = new DefaultMQProducer(testTopic + "Group");
+        producer.setInstanceName(String.valueOf(System.currentTimeMillis()));
+        producer.setNamesrvAddr(rMQConfigure.getNamesrvAddr());
+        producer.start();
+
+        new Thread(new Runnable() {
+
+            @Override public void run() {
+
+                int i = 0;
+                while (true) {
+                    try {
+                        Message msg = new Message(testTopic,
+                            "TagA" + i,
+                            "KEYS" + i,
+                            ("Hello RocketMQ " + i).getBytes()
+                        );
+                        Thread.sleep(1000L);
+                        SendResult sendResult = producer.send(msg);
+                        logger.info("sendMessage={}", JsonUtil.obj2String(sendResult));
+                    }
+                    catch (Exception e) {
+                        e.printStackTrace();
+                        try {
+                            Thread.sleep(1000);
+                        }
+                        catch (Exception ignore) {
+                        }
+                    }
+                }
+            }
+        }).start();
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/e218eef9/rocketmq-console/src/main/java/org/apache/rocketmq/console/controller/TopicController.java
----------------------------------------------------------------------
diff --git a/rocketmq-console/src/main/java/org/apache/rocketmq/console/controller/TopicController.java b/rocketmq-console/src/main/java/org/apache/rocketmq/console/controller/TopicController.java
new file mode 100644
index 0000000..90819af
--- /dev/null
+++ b/rocketmq-console/src/main/java/org/apache/rocketmq/console/controller/TopicController.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.console.controller;
+
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.rocketmq.console.model.request.SendTopicMessageRequest;
+import org.apache.rocketmq.console.model.request.TopicConfigInfo;
+import org.apache.rocketmq.console.service.ConsumerService;
+import org.apache.rocketmq.console.service.TopicService;
+import org.apache.rocketmq.console.util.JsonUtil;
+import com.google.common.base.Preconditions;
+import org.apache.commons.collections.CollectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Controller;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestMethod;
+import org.springframework.web.bind.annotation.RequestParam;
+
+import javax.annotation.Resource;
+import org.springframework.web.bind.annotation.ResponseBody;
+
+/**
+ * Spring MVC controller for the {@code /topic} endpoints. Handlers delegate to
+ * {@link TopicService}, except the consumer-by-topic query, which uses {@link ConsumerService}.
+ */
+@Controller
+@RequestMapping("/topic")
+public class TopicController {
+    private Logger logger = LoggerFactory.getLogger(TopicController.class);
+
+    @Resource
+    private TopicService topicService;
+
+    @Resource
+    private ConsumerService consumerService;
+
+    /**
+     * Serves {@code GET /topic/list.query}.
+     *
+     * @return the result of {@code topicService.fetchAllTopicList()}
+     */
+    @ResponseBody
+    @RequestMapping(method = RequestMethod.GET, value = "/list.query")
+    public Object list() throws MQClientException, RemotingException, InterruptedException {
+        return topicService.fetchAllTopicList();
+    }
+
+    /**
+     * Serves {@code GET /topic/stats.query}.
+     *
+     * @param topic required request parameter forwarded to {@code topicService.stats}
+     * @return the service result
+     */
+    @ResponseBody
+    @RequestMapping(method = RequestMethod.GET, value = "/stats.query")
+    public Object stats(@RequestParam String topic) {
+        return topicService.stats(topic);
+    }
+
+    /**
+     * Serves {@code GET /topic/route.query}.
+     *
+     * @param topic required request parameter forwarded to {@code topicService.route}
+     * @return the service result
+     */
+    @ResponseBody
+    @RequestMapping(method = RequestMethod.GET, value = "/route.query")
+    public Object route(@RequestParam String topic) {
+        return topicService.route(topic);
+    }
+
+    /**
+     * Serves {@code POST /topic/createOrUpdate.do}. The request must name at least one broker or
+     * one cluster; it is logged and then handed to {@code topicService.createOrUpdate}.
+     *
+     * @param topicCreateOrUpdateRequest request body describing the topic
+     * @return always {@code true} when the service call returns normally
+     * @throws IllegalArgumentException if both the broker and cluster name lists are empty
+     */
+    @ResponseBody
+    @RequestMapping(method = {RequestMethod.POST}, value = "/createOrUpdate.do")
+    public Object topicCreateOrUpdateRequest(@RequestBody TopicConfigInfo topicCreateOrUpdateRequest) {
+        // Short-circuit order is kept: the cluster list is only consulted when no broker is named.
+        final boolean targetGiven = CollectionUtils.isNotEmpty(topicCreateOrUpdateRequest.getBrokerNameList())
+            || CollectionUtils.isNotEmpty(topicCreateOrUpdateRequest.getClusterNameList());
+        Preconditions.checkArgument(targetGiven, "clusterName or brokerName can not be all blank");
+        logger.info("op=look topicCreateOrUpdateRequest={}", JsonUtil.obj2String(topicCreateOrUpdateRequest));
+        topicService.createOrUpdate(topicCreateOrUpdateRequest);
+        return Boolean.TRUE;
+    }
+
+    /**
+     * Serves {@code /topic/queryConsumerByTopic.query} for any HTTP method.
+     *
+     * @param topic required request parameter
+     * @return the result of {@code consumerService.queryConsumeStatsListByTopicName(topic)}
+     */
+    @ResponseBody
+    @RequestMapping("/queryConsumerByTopic.query")
+    public Object queryConsumerByTopic(@RequestParam String topic) {
+        return consumerService.queryConsumeStatsListByTopicName(topic);
+    }
+
+    /**
+     * Serves {@code /topic/queryTopicConsumerInfo.query} for any HTTP method.
+     *
+     * @param topic required request parameter
+     * @return the result of {@code topicService.queryTopicConsumerInfo(topic)}
+     */
+    @ResponseBody
+    @RequestMapping("/queryTopicConsumerInfo.query")
+    public Object queryTopicConsumerInfo(@RequestParam String topic) {
+        return topicService.queryTopicConsumerInfo(topic);
+    }
+
+    /**
+     * Serves {@code /topic/examineTopicConfig.query} for any HTTP method.
+     *
+     * <p>NOTE(review): {@code brokerName} is accepted but never passed to the service - confirm
+     * whether a broker-scoped lookup was intended.
+     *
+     * @param topic required request parameter forwarded to the service
+     * @param brokerName optional request parameter, currently unused
+     * @return the result of {@code topicService.examineTopicConfig(topic)}
+     */
+    @ResponseBody
+    @RequestMapping("/examineTopicConfig.query")
+    public Object examineTopicConfig(@RequestParam String topic,
+        @RequestParam(required = false) String brokerName) throws RemotingException, MQClientException, InterruptedException {
+        return topicService.examineTopicConfig(topic);
+    }
+
+    /**
+     * Serves {@code POST /topic/sendTopicMessage.do}.
+     *
+     * @param sendTopicMessageRequest request body forwarded to the service
+     * @return the result of {@code topicService.sendTopicMessageRequest}
+     */
+    @ResponseBody
+    @RequestMapping(method = RequestMethod.POST, value = "/sendTopicMessage.do")
+    public Object sendTopicMessage(
+        @RequestBody SendTopicMessageRequest sendTopicMessageRequest) throws RemotingException, MQClientException, InterruptedException {
+        return topicService.sendTopicMessageRequest(sendTopicMessageRequest);
+    }
+
+    /**
+     * Serves {@code POST /topic/deleteTopic.do}.
+     *
+     * @param clusterName optional request parameter; passed to the service as given (may be null)
+     * @param topic required request parameter
+     * @return the result of {@code topicService.deleteTopic(topic, clusterName)}
+     */
+    @ResponseBody
+    @RequestMapping(method = RequestMethod.POST, value = "/deleteTopic.do")
+    public Object delete(@RequestParam(required = false) String clusterName, @RequestParam String topic) {
+        return topicService.deleteTopic(topic, clusterName);
+    }
+
+    /**
+     * Serves {@code POST /topic/deleteTopicByBroker.do}.
+     *
+     * @param brokerName required request parameter
+     * @param topic required request parameter
+     * @return the result of {@code topicService.deleteTopicInBroker(brokerName, topic)}
+     */
+    @ResponseBody
+    @RequestMapping(method = RequestMethod.POST, value = "/deleteTopicByBroker.do")
+    public Object deleteTopicByBroker(@RequestParam String brokerName, @RequestParam String topic) {
+        return topicService.deleteTopicInBroker(brokerName, topic);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/e218eef9/rocketmq-console/src/main/java/org/apache/rocketmq/console/exception/ServiceException.java
----------------------------------------------------------------------
diff --git a/rocketmq-console/src/main/java/org/apache/rocketmq/console/exception/ServiceException.java b/rocketmq-console/src/main/java/org/apache/rocketmq/console/exception/ServiceException.java
new file mode 100644
index 0000000..7ad166b
--- /dev/null
+++ b/rocketmq-console/src/main/java/org/apache/rocketmq/console/exception/ServiceException.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.console.exception;
+
+/**
+ * Unchecked exception that carries an integer code alongside its message.
+ */
+public class ServiceException extends RuntimeException {
+    private static final long serialVersionUID = 9213584003139969215L;
+
+    /** Code supplied at construction time; this class attaches no meaning to it. */
+    private int code;
+
+    /**
+     * Creates the exception.
+     *
+     * @param code value later returned by {@link #getCode()}
+     * @param message detail message passed to {@link RuntimeException}
+     */
+    public ServiceException(int code, String message) {
+        super(message);
+        this.code = code;
+    }
+
+    /**
+     * @return the code given to the constructor
+     */
+    public int getCode() {
+        return this.code;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/e218eef9/rocketmq-console/src/main/java/org/apache/rocketmq/console/model/ConnectionInfo.java
----------------------------------------------------------------------
diff --git a/rocketmq-console/src/main/java/org/apache/rocketmq/console/model/ConnectionInfo.java b/rocketmq-console/src/main/java/org/apache/rocketmq/console/model/ConnectionInfo.java
new file mode 100644
index 0000000..6e8dd19
--- /dev/null
+++ b/rocketmq-console/src/main/java/org/apache/rocketmq/console/model/ConnectionInfo.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.console.model;
+
+import com.google.common.collect.Sets;
+import java.util.Collection;
+import java.util.HashSet;
+import org.apache.rocketmq.common.MQVersion;
+import org.apache.rocketmq.common.protocol.body.Connection;
+
+/**
+ * A {@link Connection} that additionally carries a textual description of its version.
+ */
+public class ConnectionInfo extends Connection {
+    private String versionDesc;
+
+    public String getVersionDesc() {
+        return this.versionDesc;
+    }
+
+    public void setVersionDesc(String versionDesc) {
+        this.versionDesc = versionDesc;
+    }
+
+    /**
+     * Copies client id, client address, language and version from {@code connection} and fills
+     * in the description returned by {@code MQVersion.getVersionDesc} for that version.
+     *
+     * @param connection source connection; must not be null
+     * @return a new {@code ConnectionInfo} mirroring {@code connection}
+     */
+    public static ConnectionInfo buildConnectionInfo(Connection connection) {
+        final ConnectionInfo info = new ConnectionInfo();
+        info.setClientId(connection.getClientId());
+        info.setClientAddr(connection.getClientAddr());
+        info.setLanguage(connection.getLanguage());
+        info.setVersion(connection.getVersion());
+        info.setVersionDesc(MQVersion.getVersionDesc(connection.getVersion()));
+        return info;
+    }
+
+    /**
+     * Converts every element of {@code connectionList} with
+     * {@link #buildConnectionInfo(Connection)} and collects the results.
+     *
+     * @param connectionList connections to convert; must not be null
+     * @return a new {@code HashSet} holding one {@code ConnectionInfo} per converted element
+     */
+    public static HashSet<Connection> buildConnectionInfoHashSet(Collection<Connection> connectionList) {
+        final HashSet<Connection> converted = new HashSet<Connection>();
+        for (Connection each : connectionList) {
+            final ConnectionInfo info = buildConnectionInfo(each);
+            converted.add(info);
+        }
+        return converted;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/e218eef9/rocketmq-console/src/main/java/org/apache/rocketmq/console/model/ConsumerGroupRollBackStat.java
----------------------------------------------------------------------
diff --git a/rocketmq-console/src/main/java/org/apache/rocketmq/console/model/ConsumerGroupRollBackStat.java b/rocketmq-console/src/main/java/org/apache/rocketmq/console/model/ConsumerGroupRollBackStat.java
new file mode 100644
index 0000000..3ddfe07
--- /dev/null
+++ b/rocketmq-console/src/main/java/org/apache/rocketmq/console/model/ConsumerGroupRollBackStat.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.console.model;
+
+import org.apache.rocketmq.common.admin.RollbackStats;
+import com.google.common.collect.Lists;
+
+import java.util.List;
+
+/**
+ * Bean holding a status flag, an optional error message and a list of {@link RollbackStats}.
+ */
+public class ConsumerGroupRollBackStat {
+    private boolean status;
+    private String errMsg;
+    /** Starts out as an empty, mutable list. */
+    private List<RollbackStats> rollbackStatsList = Lists.newArrayList();
+
+    /**
+     * Creates an instance with the given status and no error message.
+     *
+     * @param status initial status flag
+     */
+    public ConsumerGroupRollBackStat(boolean status) {
+        this(status, null);
+    }
+
+    /**
+     * Creates an instance with the given status and error message.
+     *
+     * @param status initial status flag
+     * @param errMsg initial error message; may be null
+     */
+    public ConsumerGroupRollBackStat(boolean status, String errMsg) {
+        this.status = status;
+        this.errMsg = errMsg;
+    }
+
+    public boolean isStatus() {
+        return this.status;
+    }
+
+    public void setStatus(boolean status) {
+        this.status = status;
+    }
+
+    public String getErrMsg() {
+        return this.errMsg;
+    }
+
+    public void setErrMsg(String errMsg) {
+        this.errMsg = errMsg;
+    }
+
+    public List<RollbackStats> getRollbackStatsList() {
+        return this.rollbackStatsList;
+    }
+
+    public void setRollbackStatsList(List<RollbackStats> rollbackStatsList) {
+        this.rollbackStatsList = rollbackStatsList;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/e218eef9/rocketmq-console/src/main/java/org/apache/rocketmq/console/model/ConsumerMonitorConfig.java
----------------------------------------------------------------------
diff --git a/rocketmq-console/src/main/java/org/apache/rocketmq/console/model/ConsumerMonitorConfig.java b/rocketmq-console/src/main/java/org/apache/rocketmq/console/model/ConsumerMonitorConfig.java
new file mode 100644
index 0000000..9124f00
--- /dev/null
+++ b/rocketmq-console/src/main/java/org/apache/rocketmq/console/model/ConsumerMonitorConfig.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.console.model;
+
+/**
+ * Bean with two integer settings, {@code minCount} and {@code maxDiffTotal}.
+ *
+ * <p>NOTE(review): presumably these are alert thresholds for a consumer group (minimum instance
+ * count and maximum total lag) - confirm against the monitoring code that reads them.
+ */
+public class ConsumerMonitorConfig {
+    private int minCount;
+    private int maxDiffTotal;
+
+    /** Creates a config with both values left at zero. */
+    public ConsumerMonitorConfig() {
+    }
+
+    /**
+     * Creates a config with both values set.
+     *
+     * @param minCount initial {@code minCount}
+     * @param maxDiffTotal initial {@code maxDiffTotal}
+     */
+    public ConsumerMonitorConfig(int minCount, int maxDiffTotal) {
+        this.maxDiffTotal = maxDiffTotal;
+        this.minCount = minCount;
+    }
+
+    public int getMinCount() {
+        return this.minCount;
+    }
+
+    public void setMinCount(int minCount) {
+        this.minCount = minCount;
+    }
+
+    public int getMaxDiffTotal() {
+        return this.maxDiffTotal;
+    }
+
+    public void setMaxDiffTotal(int maxDiffTotal) {
+        this.maxDiffTotal = maxDiffTotal;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/e218eef9/rocketmq-console/src/main/java/org/apache/rocketmq/console/model/GroupConsumeInfo.java
----------------------------------------------------------------------
diff --git a/rocketmq-console/src/main/java/org/apache/rocketmq/console/model/GroupConsumeInfo.java b/rocketmq-console/src/main/java/org/apache/rocketmq/console/model/GroupConsumeInfo.java
new file mode 100644
index 0000000..27c5f92
--- /dev/null
+++ b/rocketmq-console/src/main/java/org/apache/rocketmq/console/model/GroupConsumeInfo.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.console.model;
+
+import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
+import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
+
+/**
+ * Per-consumer-group summary bean: group name, version, count, consume type, message model,
+ * consume TPS and total diff. Instances sort by {@code count} descending, then by
+ * {@code diffTotal} descending.
+ */
+public class GroupConsumeInfo implements Comparable<GroupConsumeInfo> {
+    private String group;
+    private String version;
+    private int count;
+    private ConsumeType consumeType;
+    private MessageModel messageModel;
+    private int consumeTps;
+    /** Defaults to -1 until a value is set. */
+    private long diffTotal = -1;
+
+    public String getGroup() {
+        return group;
+    }
+
+    public void setGroup(String group) {
+        this.group = group;
+    }
+
+    public int getCount() {
+        return count;
+    }
+
+    public void setCount(int count) {
+        this.count = count;
+    }
+
+    public ConsumeType getConsumeType() {
+        return consumeType;
+    }
+
+    public void setConsumeType(ConsumeType consumeType) {
+        this.consumeType = consumeType;
+    }
+
+    public MessageModel getMessageModel() {
+        return messageModel;
+    }
+
+    public void setMessageModel(MessageModel messageModel) {
+        this.messageModel = messageModel;
+    }
+
+    public long getDiffTotal() {
+        return diffTotal;
+    }
+
+    public void setDiffTotal(long diffTotal) {
+        this.diffTotal = diffTotal;
+    }
+
+    /**
+     * Orders by {@code count} descending and breaks ties by {@code diffTotal} descending.
+     *
+     * <p>Uses {@code Integer.compare}/{@code Long.compare} rather than subtraction: an int
+     * difference can overflow, and narrowing a long difference to int can drop or flip the sign.
+     *
+     * @param o the instance to compare against; must not be null
+     * @return a negative value, zero or a positive value as this instance sorts before, equal to
+     *         or after {@code o}
+     */
+    @Override
+    public int compareTo(GroupConsumeInfo o) {
+        if (this.count != o.count) {
+            return Integer.compare(o.count, this.count);
+        }
+
+        return Long.compare(o.diffTotal, this.diffTotal);
+    }
+
+    public int getConsumeTps() {
+        return consumeTps;
+    }
+
+    public void setConsumeTps(int consumeTps) {
+        this.consumeTps = consumeTps;
+    }
+
+    public String getVersion() {
+        return version;
+    }
+
+    public void setVersion(String version) {
+        this.version = version;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/e218eef9/rocketmq-console/src/main/java/org/apache/rocketmq/console/model/MessageView.java
----------------------------------------------------------------------
diff --git a/rocketmq-console/src/main/java/org/apache/rocketmq/console/model/MessageView.java b/rocketmq-console/src/main/java/org/apache/rocketmq/console/model/MessageView.java
new file mode 100644
index 0000000..4011cad
--- /dev/null
+++ b/rocketmq-console/src/main/java/org/apache/rocketmq/console/model/MessageView.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.console.model;
+
+import org.apache.rocketmq.common.message.MessageExt;
+import com.google.common.base.Charsets;
+import org.springframework.beans.BeanUtils;
+
+import java.net.SocketAddress;
+import java.util.Map;
+
+/**
+ * Mutable bean exposing the fields of a {@code MessageExt} together with its body decoded
+ * as text. Instances are normally created through {@link #fromMessageExt(MessageExt)}.
+ */
+public class MessageView {
+
+    // ---- properties named after MessageExt ----
+    private int queueId;
+    private int storeSize;
+    private long queueOffset;
+    private int sysFlag;
+    private long bornTimestamp;
+    private SocketAddress bornHost;
+    private long storeTimestamp;
+    private SocketAddress storeHost;
+    private String msgId;
+    private long commitLogOffset;
+    private int bodyCRC;
+    private int reconsumeTimes;
+    private long preparedTransactionOffset;
+
+    // ---- properties named after Message ----
+    private String topic;
+    private int flag;
+    private Map<String, String> properties;
+    // Body as text; fromMessageExt fills it by decoding the raw bytes as UTF-8.
+    private String messageBody;
+
+    /**
+     * Creates a view of the given message.
+     *
+     * <p>Bean properties are transferred with {@code BeanUtils.copyProperties}; the body is
+     * then decoded as UTF-8 into {@code messageBody}. When the message has no body,
+     * {@code messageBody} is left {@code null}.
+     *
+     * @param messageExt the message to expose
+     * @return a newly created view
+     */
+    public static MessageView fromMessageExt(MessageExt messageExt) {
+        MessageView view = new MessageView();
+        BeanUtils.copyProperties(messageExt, view);
+        byte[] rawBody = messageExt.getBody();
+        if (rawBody != null) {
+            view.setMessageBody(new String(rawBody, Charsets.UTF_8));
+        }
+        return view;
+    }
+
+    public String getTopic() {
+        return this.topic;
+    }
+
+    public void setTopic(String topic) {
+        this.topic = topic;
+    }
+
+    public int getFlag() {
+        return this.flag;
+    }
+
+    public void setFlag(int flag) {
+        this.flag = flag;
+    }
+
+    public Map<String, String> getProperties() {
+        return this.properties;
+    }
+
+    public void setProperties(Map<String, String> properties) {
+        this.properties = properties;
+    }
+
+    public int getQueueId() {
+        return this.queueId;
+    }
+
+    public void setQueueId(int queueId) {
+        this.queueId = queueId;
+    }
+
+    public int getStoreSize() {
+        return this.storeSize;
+    }
+
+    public void setStoreSize(int storeSize) {
+        this.storeSize = storeSize;
+    }
+
+    public long getQueueOffset() {
+        return this.queueOffset;
+    }
+
+    public void setQueueOffset(long queueOffset) {
+        this.queueOffset = queueOffset;
+    }
+
+    public int getSysFlag() {
+        return this.sysFlag;
+    }
+
+    public void setSysFlag(int sysFlag) {
+        this.sysFlag = sysFlag;
+    }
+
+    public long getBornTimestamp() {
+        return this.bornTimestamp;
+    }
+
+    public void setBornTimestamp(long bornTimestamp) {
+        this.bornTimestamp = bornTimestamp;
+    }
+
+    public SocketAddress getBornHost() {
+        return this.bornHost;
+    }
+
+    public void setBornHost(SocketAddress bornHost) {
+        this.bornHost = bornHost;
+    }
+
+    public long getStoreTimestamp() {
+        return this.storeTimestamp;
+    }
+
+    public void setStoreTimestamp(long storeTimestamp) {
+        this.storeTimestamp = storeTimestamp;
+    }
+
+    public SocketAddress getStoreHost() {
+        return this.storeHost;
+    }
+
+    public void setStoreHost(SocketAddress storeHost) {
+        this.storeHost = storeHost;
+    }
+
+    public String getMsgId() {
+        return this.msgId;
+    }
+
+    public void setMsgId(String msgId) {
+        this.msgId = msgId;
+    }
+
+    public long getCommitLogOffset() {
+        return this.commitLogOffset;
+    }
+
+    public void setCommitLogOffset(long commitLogOffset) {
+        this.commitLogOffset = commitLogOffset;
+    }
+
+    public int getBodyCRC() {
+        return this.bodyCRC;
+    }
+
+    public void setBodyCRC(int bodyCRC) {
+        this.bodyCRC = bodyCRC;
+    }
+
+    public int getReconsumeTimes() {
+        return this.reconsumeTimes;
+    }
+
+    public void setReconsumeTimes(int reconsumeTimes) {
+        this.reconsumeTimes = reconsumeTimes;
+    }
+
+    public long getPreparedTransactionOffset() {
+        return this.preparedTransactionOffset;
+    }
+
+    public void setPreparedTransactionOffset(long preparedTransactionOffset) {
+        this.preparedTransactionOffset = preparedTransactionOffset;
+    }
+
+    public String getMessageBody() {
+        return this.messageBody;
+    }
+
+    public void setMessageBody(String messageBody) {
+        this.messageBody = messageBody;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/e218eef9/rocketmq-console/src/main/java/org/apache/rocketmq/console/model/QueueStatInfo.java
----------------------------------------------------------------------
diff --git a/rocketmq-console/src/main/java/org/apache/rocketmq/console/model/QueueStatInfo.java b/rocketmq-console/src/main/java/org/apache/rocketmq/console/model/QueueStatInfo.java
new file mode 100644
index 0000000..a6d67a8
--- /dev/null
+++ b/rocketmq-console/src/main/java/org/apache/rocketmq/console/model/QueueStatInfo.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.console.model;
+
+import org.apache.rocketmq.common.admin.OffsetWrapper;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.springframework.beans.BeanUtils;
+
+/**
+ * Mutable bean holding the offsets of one queue: broker name, queue id, broker offset,
+ * consumer offset, last timestamp and a free-form client info string.
+ */
+public class QueueStatInfo {
+    private String brokerName;
+    private int queueId;
+    private long brokerOffset;
+    private long consumerOffset;
+    // Not touched by fromOffsetTableEntry unless one of its arguments exposes a clientInfo
+    // property; NOTE(review): presumably filled in by the caller afterwards - confirm.
+    private String clientInfo;
+    private long lastTimestamp;
+
+    /**
+     * Builds an instance from one offset-table entry by copying the same-named bean
+     * properties of {@code key} first and of {@code value} second
+     * (via {@code BeanUtils.copyProperties}).
+     *
+     * @param key   the queue the entry belongs to
+     * @param value the offsets recorded for that queue
+     * @return a newly created instance
+     */
+    public static QueueStatInfo fromOffsetTableEntry(MessageQueue key, OffsetWrapper value) {
+        QueueStatInfo info = new QueueStatInfo();
+        BeanUtils.copyProperties(key, info);
+        BeanUtils.copyProperties(value, info);
+        return info;
+    }
+
+    public String getClientInfo() {
+        return this.clientInfo;
+    }
+
+    public void setClientInfo(String clientInfo) {
+        this.clientInfo = clientInfo;
+    }
+
+    public String getBrokerName() {
+        return this.brokerName;
+    }
+
+    public void setBrokerName(String brokerName) {
+        this.brokerName = brokerName;
+    }
+
+    public int getQueueId() {
+        return this.queueId;
+    }
+
+    public void setQueueId(int queueId) {
+        this.queueId = queueId;
+    }
+
+    public long getBrokerOffset() {
+        return this.brokerOffset;
+    }
+
+    public void setBrokerOffset(long brokerOffset) {
+        this.brokerOffset = brokerOffset;
+    }
+
+    public long getConsumerOffset() {
+        return this.consumerOffset;
+    }
+
+    public void setConsumerOffset(long consumerOffset) {
+        this.consumerOffset = consumerOffset;
+    }
+
+    public long getLastTimestamp() {
+        return this.lastTimestamp;
+    }
+
+    public void setLastTimestamp(long lastTimestamp) {
+        this.lastTimestamp = lastTimestamp;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/e218eef9/rocketmq-console/src/main/java/org/apache/rocketmq/console/model/TopicConsumerInfo.java
----------------------------------------------------------------------
diff --git a/rocketmq-console/src/main/java/org/apache/rocketmq/console/model/TopicConsumerInfo.java b/rocketmq-console/src/main/java/org/apache/rocketmq/console/model/TopicConsumerInfo.java
new file mode 100644
index 0000000..bd6d8e2
--- /dev/null
+++ b/rocketmq-console/src/main/java/org/apache/rocketmq/console/model/TopicConsumerInfo.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.console.model;
+
+import com.google.common.collect.Lists;
+
+import java.util.List;
+
+/**
+ * Per-topic aggregate of {@link QueueStatInfo} entries. Entries are added with
+ * {@link #appendQueueStatInfo(QueueStatInfo)}, which also maintains the totals.
+ */
+public class TopicConsumerInfo {
+    private String topic;
+    // Sum of (brokerOffset - consumerOffset) over appended entries, unless overwritten
+    // through setDiffTotal.
+    private long diffTotal;
+    // Largest lastTimestamp seen among appended entries; starts at 0.
+    private long lastTimestamp;
+    private List<QueueStatInfo> queueStatInfoList = Lists.newArrayList();
+
+    /**
+     * @param topic the topic this aggregate belongs to
+     */
+    public TopicConsumerInfo(String topic) {
+        this.topic = topic;
+    }
+
+    public String getTopic() {
+        return this.topic;
+    }
+
+    public void setTopic(String topic) {
+        this.topic = topic;
+    }
+
+    public long getDiffTotal() {
+        return this.diffTotal;
+    }
+
+    public void setDiffTotal(long diffTotal) {
+        this.diffTotal = diffTotal;
+    }
+
+    /**
+     * @return the live internal list (not a copy) of appended entries
+     */
+    public List<QueueStatInfo> getQueueStatInfoList() {
+        return this.queueStatInfoList;
+    }
+
+    public long getLastTimestamp() {
+        return this.lastTimestamp;
+    }
+
+    /**
+     * Adds one queue entry and folds it into the aggregate: {@code diffTotal} grows by the
+     * entry's broker offset minus its consumer offset, and {@code lastTimestamp} becomes the
+     * larger of its current value and the entry's timestamp.
+     *
+     * @param queueStatInfo the entry to add
+     */
+    public void appendQueueStatInfo(QueueStatInfo queueStatInfo) {
+        this.queueStatInfoList.add(queueStatInfo);
+        long backlog = queueStatInfo.getBrokerOffset() - queueStatInfo.getConsumerOffset();
+        this.diffTotal += backlog;
+        long entryTimestamp = queueStatInfo.getLastTimestamp();
+        if (entryTimestamp > this.lastTimestamp) {
+            this.lastTimestamp = entryTimestamp;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/e218eef9/rocketmq-console/src/main/java/org/apache/rocketmq/console/model/request/ConsumerConfigInfo.java
----------------------------------------------------------------------
diff --git a/rocketmq-console/src/main/java/org/apache/rocketmq/console/model/request/ConsumerConfigInfo.java b/rocketmq-console/src/main/java/org/apache/rocketmq/console/model/request/ConsumerConfigInfo.java
new file mode 100644
index 0000000..cee155c
--- /dev/null
+++ b/rocketmq-console/src/main/java/org/apache/rocketmq/console/model/request/ConsumerConfigInfo.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.console.model.request;
+
+import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
+
+import java.util.List;
+
+/**
+ * Mutable bean pairing a {@code SubscriptionGroupConfig} with the cluster names and
+ * broker names it is associated with.
+ */
+public class ConsumerConfigInfo {
+    private List<String> clusterNameList;
+
+    private List<String> brokerNameList;
+    private SubscriptionGroupConfig subscriptionGroupConfig;
+
+    /** Creates an instance with every field left {@code null}. */
+    public ConsumerConfigInfo() {
+    }
+
+    /**
+     * Creates an instance with the broker names and configuration set;
+     * {@code clusterNameList} stays {@code null}.
+     *
+     * @param brokerNameList          broker names to store
+     * @param subscriptionGroupConfig configuration to store
+     */
+    public ConsumerConfigInfo(List<String> brokerNameList, SubscriptionGroupConfig subscriptionGroupConfig) {
+        this.subscriptionGroupConfig = subscriptionGroupConfig;
+        this.brokerNameList = brokerNameList;
+    }
+
+    public List<String> getClusterNameList() {
+        return this.clusterNameList;
+    }
+
+    public void setClusterNameList(List<String> clusterNameList) {
+        this.clusterNameList = clusterNameList;
+    }
+
+    public List<String> getBrokerNameList() {
+        return this.brokerNameList;
+    }
+
+    public void setBrokerNameList(List<String> brokerNameList) {
+        this.brokerNameList = brokerNameList;
+    }
+
+    public SubscriptionGroupConfig getSubscriptionGroupConfig() {
+        return this.subscriptionGroupConfig;
+    }
+
+    public void setSubscriptionGroupConfig(SubscriptionGroupConfig subscriptionGroupConfig) {
+        this.subscriptionGroupConfig = subscriptionGroupConfig;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/e218eef9/rocketmq-console/src/main/java/org/apache/rocketmq/console/model/request/DeleteSubGroupRequest.java
----------------------------------------------------------------------
diff --git a/rocketmq-console/src/main/java/org/apache/rocketmq/console/model/request/DeleteSubGroupRequest.java b/rocketmq-console/src/main/java/org/apache/rocketmq/console/model/request/DeleteSubGroupRequest.java
new file mode 100644
index 0000000..152256b
--- /dev/null
+++ b/rocketmq-console/src/main/java/org/apache/rocketmq/console/model/request/DeleteSubGroupRequest.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.console.model.request;
+
+import java.util.List;
+
+/**
+ * Mutable request bean carrying a group name and a list of broker names.
+ * Both fields are {@code null} until set.
+ */
+public class DeleteSubGroupRequest {
+    private String groupName;
+    private List<String> brokerNameList;
+
+    /**
+     * @return the group name, or {@code null} when not set
+     */
+    public String getGroupName() {
+        return this.groupName;
+    }
+
+    public void setGroupName(String groupName) {
+        this.groupName = groupName;
+    }
+
+    /**
+     * @return the stored list reference itself (no copy), or {@code null} when not set
+     */
+    public List<String> getBrokerNameList() {
+        return this.brokerNameList;
+    }
+
+    public void setBrokerNameList(List<String> brokerNameList) {
+        this.brokerNameList = brokerNameList;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/e218eef9/rocketmq-console/src/main/java/org/apache/rocketmq/console/model/request/ResetOffsetRequest.java
----------------------------------------------------------------------
diff --git a/rocketmq-console/src/main/java/org/apache/rocketmq/console/model/request/ResetOffsetRequest.java b/rocketmq-console/src/main/java/org/apache/rocketmq/console/model/request/ResetOffsetRequest.java
new file mode 100644
index 0000000..22263af
--- /dev/null
+++ b/rocketmq-console/src/main/java/org/apache/rocketmq/console/model/request/ResetOffsetRequest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.console.model.request;
+
+import java.util.List;
+
+/**
+ * Mutable request bean carrying the consumer groups, the topic, a reset time and a
+ * {@code force} flag.
+ */
+public class ResetOffsetRequest {
+    private List<String> consumerGroupList;
+    private String topic;
+    // NOTE(review): unit is not stated in this class; presumably epoch milliseconds - confirm.
+    private long resetTime;
+    private boolean force;
+
+    /**
+     * @return the stored list reference itself (no copy), or {@code null} when not set
+     */
+    public List<String> getConsumerGroupList() {
+        return this.consumerGroupList;
+    }
+
+    public void setConsumerGroupList(List<String> consumerGroupList) {
+        this.consumerGroupList = consumerGroupList;
+    }
+
+    public String getTopic() {
+        return this.topic;
+    }
+
+    public void setTopic(String topic) {
+        this.topic = topic;
+    }
+
+    public long getResetTime() {
+        return this.resetTime;
+    }
+
+    public void setResetTime(long resetTime) {
+        this.resetTime = resetTime;
+    }
+
+    /**
+     * @return the {@code force} flag; {@code false} until set
+     */
+    public boolean isForce() {
+        return this.force;
+    }
+
+    public void setForce(boolean force) {
+        this.force = force;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/e218eef9/rocketmq-console/src/main/java/org/apache/rocketmq/console/model/request/SendTopicMessageRequest.java
----------------------------------------------------------------------
diff --git a/rocketmq-console/src/main/java/org/apache/rocketmq/console/model/request/SendTopicMessageRequest.java b/rocketmq-console/src/main/java/org/apache/rocketmq/console/model/request/SendTopicMessageRequest.java
new file mode 100644
index 0000000..c7ffa8a
--- /dev/null
+++ b/rocketmq-console/src/main/java/org/apache/rocketmq/console/model/request/SendTopicMessageRequest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.console.model.request;
+
+/**
+ * Mutable request bean carrying a topic, key, tag and message body as plain strings.
+ * Every field is {@code null} until set.
+ */
+public class SendTopicMessageRequest {
+    private String topic;
+    private String key;
+    private String tag;
+    private String messageBody;
+
+    public String getTopic() {
+        return this.topic;
+    }
+
+    public void setTopic(String topic) {
+        this.topic = topic;
+    }
+
+    public String getKey() {
+        return this.key;
+    }
+
+    public void setKey(String key) {
+        this.key = key;
+    }
+
+    public String getTag() {
+        return this.tag;
+    }
+
+    public void setTag(String tag) {
+        this.tag = tag;
+    }
+
+    public String getMessageBody() {
+        return this.messageBody;
+    }
+
+    public void setMessageBody(String messageBody) {
+        this.messageBody = messageBody;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/e218eef9/rocketmq-console/src/main/java/org/apache/rocketmq/console/model/request/TopicConfigInfo.java
----------------------------------------------------------------------
diff --git a/rocketmq-console/src/main/java/org/apache/rocketmq/console/model/request/TopicConfigInfo.java b/rocketmq-console/src/main/java/org/apache/rocketmq/console/model/request/TopicConfigInfo.java
new file mode 100644
index 0000000..e1f56d4
--- /dev/null
+++ b/rocketmq-console/src/main/java/org/apache/rocketmq/console/model/request/TopicConfigInfo.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.console.model.request;
+
+import com.google.common.base.Objects;
+
+import java.util.List;
+
+/**
+ * Mutable bean combining a topic's configuration values with the cluster names and
+ * broker names it is associated with.
+ *
+ * <p>{@link #equals(Object)} and {@link #hashCode()} look only at the topic configuration
+ * values; {@code clusterNameList} and {@code brokerNameList} do not take part.
+ */
+public class TopicConfigInfo {
+
+    private List<String> clusterNameList;
+    private List<String> brokerNameList;
+
+    // ---- topic configuration values (the only fields used by equals/hashCode) ----
+    private String topicName;
+    private int writeQueueNums;
+    private int readQueueNums;
+    private int perm;
+    private boolean order;
+
+    public List<String> getClusterNameList() {
+        return this.clusterNameList;
+    }
+
+    public void setClusterNameList(List<String> clusterNameList) {
+        this.clusterNameList = clusterNameList;
+    }
+
+    public List<String> getBrokerNameList() {
+        return this.brokerNameList;
+    }
+
+    public void setBrokerNameList(List<String> brokerNameList) {
+        this.brokerNameList = brokerNameList;
+    }
+
+    public String getTopicName() {
+        return this.topicName;
+    }
+
+    public void setTopicName(String topicName) {
+        this.topicName = topicName;
+    }
+
+    public int getWriteQueueNums() {
+        return this.writeQueueNums;
+    }
+
+    public void setWriteQueueNums(int writeQueueNums) {
+        this.writeQueueNums = writeQueueNums;
+    }
+
+    public int getReadQueueNums() {
+        return this.readQueueNums;
+    }
+
+    public void setReadQueueNums(int readQueueNums) {
+        this.readQueueNums = readQueueNums;
+    }
+
+    public int getPerm() {
+        return this.perm;
+    }
+
+    public void setPerm(int perm) {
+        this.perm = perm;
+    }
+
+    public boolean isOrder() {
+        return this.order;
+    }
+
+    public void setOrder(boolean order) {
+        this.order = order;
+    }
+
+    /**
+     * Two instances are equal when they have the same runtime class and the same
+     * {@code topicName}, {@code writeQueueNums}, {@code readQueueNums}, {@code perm}
+     * and {@code order}.
+     */
+    @Override
+    public boolean equals(Object o) {
+        if (o == this) {
+            return true;
+        }
+        if (o == null) {
+            return false;
+        }
+        if (getClass() != o.getClass()) {
+            return false;
+        }
+        TopicConfigInfo other = (TopicConfigInfo) o;
+        // Cheap primitive comparisons first, the string comparison last.
+        if (writeQueueNums != other.writeQueueNums
+            || readQueueNums != other.readQueueNums
+            || perm != other.perm
+            || order != other.order) {
+            return false;
+        }
+        return Objects.equal(topicName, other.topicName);
+    }
+
+    /** Hashes exactly the fields that {@link #equals(Object)} compares. */
+    @Override
+    public int hashCode() {
+        return Objects.hashCode(topicName, writeQueueNums, readQueueNums, perm, order);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/e218eef9/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/AbstractCommonService.java
----------------------------------------------------------------------
diff --git a/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/AbstractCommonService.java b/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/AbstractCommonService.java
new file mode 100644
index 0000000..53a0e21
--- /dev/null
+++ b/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/AbstractCommonService.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.console.service;
+
+import org.apache.rocketmq.tools.admin.MQAdminExt;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Sets;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Set;
+import javax.annotation.Resource;
+import org.apache.commons.collections.CollectionUtils;
+
+/**
+ * Base class for services that need the injected {@code MQAdminExt} client and the
+ * cluster-to-broker name resolution helper below.
+ */
+public abstract class AbstractCommonService {
+    /** Admin client, injected through {@code @Resource} and visible to subclasses. */
+    @Resource
+    protected MQAdminExt mqAdminExt;
+
+    /**
+     * Resolves cluster names to broker names and merges them with an explicit broker list.
+     *
+     * @param clusterAddrTable lookup table from cluster name to that cluster's broker names
+     * @param clusterNameList  clusters whose brokers are all included; may be null or empty
+     * @param brokerNameList   additional broker names; may be null or empty
+     * @return a new mutable set holding the union; empty when both lists are null or empty
+     * @throws NullPointerException if a name in {@code clusterNameList} has no entry in
+     *         {@code clusterAddrTable} (the lookup yields {@code null}, which {@code addAll}
+     *         rejects; the exception is rethrown through {@code Throwables.propagate})
+     */
+    protected final Set<String> changeToBrokerNameSet(HashMap<String, Set<String>> clusterAddrTable,
+        List<String> clusterNameList, List<String> brokerNameList) {
+        Set<String> brokerNames = Sets.newHashSet();
+        boolean clustersGiven = CollectionUtils.isNotEmpty(clusterNameList);
+        if (clustersGiven) {
+            try {
+                for (String clusterName : clusterNameList) {
+                    Set<String> clusterBrokers = clusterAddrTable.get(clusterName);
+                    brokerNames.addAll(clusterBrokers);
+                }
+            }
+            catch (Exception e) {
+                throw Throwables.propagate(e);
+            }
+        }
+        boolean brokersGiven = CollectionUtils.isNotEmpty(brokerNameList);
+        if (brokersGiven) {
+            brokerNames.addAll(brokerNameList);
+        }
+        return brokerNames;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/e218eef9/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/ClusterService.java
----------------------------------------------------------------------
diff --git a/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/ClusterService.java b/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/ClusterService.java
new file mode 100644
index 0000000..43489fb
--- /dev/null
+++ b/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/ClusterService.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.console.service;
+
+import java.util.Map;
+import java.util.Properties;
+
+public interface ClusterService { // cluster-level queries; no implementation is visible in this chunk
+    Map<String, Object> list(); // NOTE(review): the map's keys and value types are defined by the implementation - confirm there
+    // NOTE(review): brokerAddr is presumably a broker's network address; returns that broker's settings as Properties - confirm.
+    Properties getBrokerConfig(String brokerAddr);
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/e218eef9/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/ConsumerService.java
----------------------------------------------------------------------
diff --git a/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/ConsumerService.java b/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/ConsumerService.java
new file mode 100644
index 0000000..1d9ac12
--- /dev/null
+++ b/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/ConsumerService.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.console.service;
+
+import org.apache.rocketmq.common.protocol.body.ConsumerConnection;
+import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
+import org.apache.rocketmq.console.model.ConsumerGroupRollBackStat;
+import org.apache.rocketmq.console.model.GroupConsumeInfo;
+import org.apache.rocketmq.console.model.TopicConsumerInfo;
+import org.apache.rocketmq.console.model.request.ConsumerConfigInfo;
+import org.apache.rocketmq.console.model.request.DeleteSubGroupRequest;
+import org.apache.rocketmq.console.model.request.ResetOffsetRequest;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public interface ConsumerService { // consumer-group queries and admin operations; notes below are inferred from signatures only
+    List<GroupConsumeInfo> queryGroupList(); // NOTE(review): which groups are listed is decided by the implementation
+    // NOTE(review): presumably the single-group counterpart of queryGroupList - confirm.
+    GroupConsumeInfo queryGroup(String consumerGroup);
+    // The three queryConsumeStats* methods differ in the filter they take:
+    // group only, topic plus group, or topic only (the latter returns a map keyed by String).
+    List<TopicConsumerInfo> queryConsumeStatsListByGroupName(String groupName);
+    // NOTE(review): whether topic may be null/empty here is not visible - confirm.
+    List<TopicConsumerInfo> queryConsumeStatsList(String topic, String groupName);
+    // NOTE(review): the map key is presumably the consumer group name - confirm.
+    Map<String, TopicConsumerInfo> queryConsumeStatsListByTopicName(String topic);
+    // Returns one ConsumerGroupRollBackStat per consumer group (see the key comment in the signature).
+    Map<String /*consumerGroup*/, ConsumerGroupRollBackStat> resetOffset(ResetOffsetRequest resetOffsetRequest);
+    // NOTE(review): presumably one ConsumerConfigInfo per broker holding the group - confirm.
+    List<ConsumerConfigInfo> examineSubscriptionGroupConfig(String group);
+    // NOTE(review): meaning of the boolean result is not specified here - confirm.
+    boolean deleteSubGroup(DeleteSubGroupRequest deleteSubGroupRequest);
+    // NOTE(review): meaning of the boolean result is not specified here - confirm.
+    boolean createAndUpdateSubscriptionGroupConfig(ConsumerConfigInfo consumerConfigInfo);
+    // Returns broker names as a Set of String for the given group.
+    Set<String> fetchBrokerNameSetBySubscriptionGroup(String group);
+    // Returns the ConsumerConnection for the given consumer group.
+    ConsumerConnection getConsumerConnection(String consumerGroup);
+    // NOTE(review): jstack presumably asks for a thread dump to be included - confirm.
+    ConsumerRunningInfo getConsumerRunningInfo(String consumerGroup, String clientId, boolean jstack);
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/e218eef9/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/DashboardCollectService.java
----------------------------------------------------------------------
diff --git a/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/DashboardCollectService.java b/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/DashboardCollectService.java
new file mode 100644
index 0000000..27a3645
--- /dev/null
+++ b/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/DashboardCollectService.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.console.service;
+
+import com.google.common.cache.LoadingCache;
+import java.io.File;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Access to the broker and topic data collected for the dashboard, exposed
+ * both as Guava {@link LoadingCache} instances and as plain maps.
+ *
+ * <p>Only signatures are declared here; the meaning of the map keys and list
+ * entries is not visible in this file.
+ */
+public interface DashboardCollectService {
+    // TODO: move this work into org.apache.rocketmq.console.task.DashboardCollectTask;
+    // the code can then be refactored.
+    /**
+     * @return the cache holding collected broker data
+     */
+    LoadingCache<String, List<String>> getBrokerMap();
+
+    /**
+     * @return the cache holding collected topic data
+     */
+    LoadingCache<String, List<String>> getTopicMap();
+
+    /**
+     * @param file data file to read; per the method name, presumably JSON content
+     * @return the file's content as a map of string lists
+     */
+    Map<String, List<String>> jsonDataFile2map(File file);
+
+    /**
+     * @param date date selecting the data to return;
+     *             NOTE(review): format presumably yyyy-MM-dd, confirm
+     * @return broker data for that date
+     */
+    Map<String, List<String>> getBrokerCache(String date);
+
+    /**
+     * @param date date selecting the data to return;
+     *             NOTE(review): format presumably yyyy-MM-dd, confirm
+     * @return topic data for that date
+     */
+    Map<String, List<String>> getTopicCache(String date);
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/e218eef9/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/DashboardService.java
----------------------------------------------------------------------
diff --git a/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/DashboardService.java b/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/DashboardService.java
new file mode 100644
index 0000000..a4cf798
--- /dev/null
+++ b/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/DashboardService.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.console.service;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Read-only queries for the broker and topic data shown on the dashboard.
+ *
+ * <p>Only signatures are declared here; the layout of the returned strings is
+ * not visible in this file.
+ */
+public interface DashboardService {
+    /**
+     * Returns broker data for one day.
+     *
+     * @param date day to query, formatted as yyyy-MM-dd
+     * @return broker data for that day as a map of string lists
+     */
+    Map<String, List<String>> queryBrokerData(String date);
+
+    /**
+     * Returns topic data for one day.
+     *
+     * @param date day to query, formatted as yyyy-MM-dd
+     * @return topic data for that day as a map of string lists
+     */
+    Map<String, List<String>> queryTopicData(String date);
+
+    /**
+     * Returns the data of a single topic for one day.
+     *
+     * @param date day to query, formatted as yyyy-MM-dd
+     * @param topicName name of the topic whose data is wanted
+     * @return the data for that topic as a list of strings
+     */
+    List<String> queryTopicData(String date, String topicName);
+
+    /**
+     * @return topic data as a list of strings; per the method name, presumably
+     *         the current (latest) values rather than those of a given day
+     */
+    List<String> queryTopicCurrentData();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/e218eef9/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/MessageService.java
----------------------------------------------------------------------
diff --git a/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/MessageService.java b/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/MessageService.java
new file mode 100644
index 0000000..e56b4d8
--- /dev/null
+++ b/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/MessageService.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.console.service;
+
+import org.apache.rocketmq.common.Pair;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
+import org.apache.rocketmq.tools.admin.api.MessageTrack;
+import org.apache.rocketmq.console.model.MessageView;
+
+import java.util.List;
+
+/**
+ * Service-layer contract for looking up messages, tracing their consumption
+ * and asking a consumer to consume a message directly.
+ *
+ * <p>Only signatures are declared here. Notes marked "presumably" are inferred
+ * from names and types and should be confirmed against the implementation.
+ */
+public interface MessageService {
+    /**
+     * Looks up one message together with its tracks.
+     *
+     * @param subject NOTE(review): presumably the topic the message belongs to, confirm
+     * @param msgId id of the message to view
+     * @return a pair of the message view and its list of {@link MessageTrack}
+     */
+    Pair<MessageView, List<MessageTrack>> viewMessage(String subject, final String msgId);
+
+    /**
+     * @param topic topic name
+     * @param key message key to search for
+     * @return the matching messages as a list of {@link MessageView}
+     */
+    List<MessageView> queryMessageByTopicAndKey(final String topic, final String key);
+
+    /**
+     * Queries the messages of a topic between two bounds.
+     *
+     * <p>Reference left by the original author:
+     * {@code org.apache.rocketmq.tools.command.message.PrintMessageSubCommand}.
+     *
+     * @param topic topic name
+     * @param begin lower bound; NOTE(review): presumably a timestamp in milliseconds, confirm
+     * @param end upper bound, in the same unit as {@code begin}
+     * @return the matching messages as a list of {@link MessageView}
+     */
+    List<MessageView> queryMessageByTopic(final String topic, final long begin,
+        final long end);
+
+    /**
+     * @param msg message to trace
+     * @return the list of {@link MessageTrack} for that message
+     */
+    List<MessageTrack> messageTrackDetail(MessageExt msg);
+
+    /**
+     * @param topic topic name
+     * @param msgId id of the message to consume
+     * @param consumerGroup consumer group that should consume the message
+     * @param clientId id of the consumer client; NOTE(review): whether it may be
+     *                 {@code null} or empty is not visible here
+     * @return the {@link ConsumeMessageDirectlyResult} of the request
+     */
+    ConsumeMessageDirectlyResult consumeMessageDirectly(String topic, String msgId, String consumerGroup,
+        String clientId);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/e218eef9/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/MonitorService.java
----------------------------------------------------------------------
diff --git a/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/MonitorService.java b/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/MonitorService.java
new file mode 100644
index 0000000..4bf659c
--- /dev/null
+++ b/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/MonitorService.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.console.service;
+
+import java.util.Map;
+import org.apache.rocketmq.console.model.ConsumerMonitorConfig;
+
+/**
+ * Create, read, update and delete operations on {@link ConsumerMonitorConfig}
+ * entries.
+ *
+ * <p>Only signatures are declared here; where the configuration is stored is
+ * not visible in this file.
+ */
+public interface MonitorService {
+    /**
+     * @param name NOTE(review): presumably the consumer group name the config applies to, confirm
+     * @param config monitor configuration to store
+     * @return a boolean result; presumably {@code true} on success
+     */
+    boolean createOrUpdateConsumerMonitor(String name, ConsumerMonitorConfig config);
+
+    /**
+     * @return the monitor configurations as a map;
+     *         NOTE(review): the key is presumably the consumer group name, confirm
+     */
+    Map<String, ConsumerMonitorConfig> queryConsumerMonitorConfig();
+
+    /**
+     * @param consumeGroupName consumer group name
+     * @return the monitor configuration for that group
+     */
+    ConsumerMonitorConfig queryConsumerMonitorConfigByGroupName(String consumeGroupName);
+
+    /**
+     * @param consumeGroupName consumer group whose monitor configuration is to be deleted
+     * @return a boolean result; presumably {@code true} on success
+     */
+    boolean deleteConsumerMonitor(String consumeGroupName);
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/e218eef9/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/OpsService.java
----------------------------------------------------------------------
diff --git a/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/OpsService.java b/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/OpsService.java
new file mode 100644
index 0000000..d3bd68c
--- /dev/null
+++ b/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/OpsService.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.console.service;
+
+import java.util.Map;
+import org.apache.rocketmq.console.service.checker.CheckerType;
+
+/**
+ * Operations-level service: home page data, name server address management,
+ * status checks and the VIP channel switch.
+ *
+ * <p>Only signatures are declared here. Notes marked "presumably" are inferred
+ * from names and types and should be confirmed against the implementation.
+ */
+public interface OpsService {
+    /**
+     * @return the values shown on the home page, as a map from name to value
+     */
+    Map<String, Object> homePageInfo();
+
+    /**
+     * @param nameSvrAddrList new name server address list, passed as a single string;
+     *                        NOTE(review): the separator between addresses is not visible here
+     */
+    void updateNameSvrAddrList(String nameSvrAddrList);
+
+    /**
+     * @return the name server address list as a single string
+     */
+    String getNameSvrList();
+
+    /**
+     * @return check results keyed by {@link CheckerType}; presumably one entry
+     *         per registered checker
+     */
+    Map<CheckerType, Object> rocketMqStatusCheck();
+
+    /**
+     * @param useVIPChannel new value of the VIP channel switch, passed as a string;
+     *                      NOTE(review): presumably "true" or "false", confirm
+     * @return a boolean result; presumably {@code true} on success
+     */
+    boolean updateIsVIPChannel(String useVIPChannel);
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/e218eef9/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/ProducerService.java
----------------------------------------------------------------------
diff --git a/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/ProducerService.java b/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/ProducerService.java
new file mode 100644
index 0000000..cda7c48
--- /dev/null
+++ b/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/ProducerService.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.console.service;
+
+import org.apache.rocketmq.common.protocol.body.ProducerConnection;
+
+/**
+ * Service-layer contract for inspecting producer connections.
+ */
+public interface ProducerService {
+    /**
+     * @param producerGroup producer group name
+     * @param topic topic name
+     * @return the {@link ProducerConnection} for that group and topic
+     */
+    ProducerConnection getProducerConnection(String producerGroup, String topic);
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/e218eef9/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/TopicService.java
----------------------------------------------------------------------
diff --git a/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/TopicService.java b/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/TopicService.java
new file mode 100644
index 0000000..41a6b3b
--- /dev/null
+++ b/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/TopicService.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.console.service;
+
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.admin.TopicStatsTable;
+import org.apache.rocketmq.common.protocol.body.GroupList;
+import org.apache.rocketmq.common.protocol.body.TopicList;
+import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.console.model.request.SendTopicMessageRequest;
+import org.apache.rocketmq.console.model.request.TopicConfigInfo;
+
+import java.util.List;
+
+/**
+ * Service-layer contract for listing, inspecting, configuring and deleting
+ * topics, and for sending a message to a topic.
+ *
+ * <p>Only signatures are declared here. Notes marked "presumably" are inferred
+ * from names and types and should be confirmed against the implementation.
+ */
+public interface TopicService {
+    /**
+     * @return the {@link TopicList}; per the method name, covering all topics
+     */
+    TopicList fetchAllTopicList();
+
+    /**
+     * @param topic topic name
+     * @return the {@link TopicStatsTable} for that topic
+     */
+    TopicStatsTable stats(String topic);
+
+    /**
+     * @param topic topic name
+     * @return the {@link TopicRouteData} for that topic
+     */
+    TopicRouteData route(String topic);
+
+    /**
+     * @param topic topic name
+     * @return a {@link GroupList}; presumably the consumer groups of that topic
+     */
+    GroupList queryTopicConsumerInfo(String topic);
+
+    /**
+     * @param topicCreateOrUpdateRequest topic configuration to create or update
+     */
+    void createOrUpdate(TopicConfigInfo topicCreateOrUpdateRequest);
+
+    /**
+     * @param topic topic name
+     * @param brokerName name of the broker to read the configuration from
+     * @return the {@link TopicConfig} of the topic on that broker
+     */
+    TopicConfig examineTopicConfig(String topic, String brokerName);
+
+    /**
+     * @param topic topic name
+     * @return a list of {@link TopicConfigInfo}; presumably one entry per broker
+     *         holding the topic
+     */
+    List<TopicConfigInfo> examineTopicConfig(String topic);
+
+    /**
+     * @param topic topic name
+     * @param clusterName name of the cluster to delete the topic from
+     * @return a boolean result; presumably {@code true} on success
+     */
+    boolean deleteTopic(String topic, String clusterName);
+
+    /**
+     * @param topic topic name; NOTE(review): which cluster(s) this overload
+     *              targets is not visible here
+     * @return a boolean result; presumably {@code true} on success
+     */
+    boolean deleteTopic(String topic);
+
+    /**
+     * @param brokerName name of the broker to delete the topic from
+     * @param topic topic name
+     * @return a boolean result; presumably {@code true} on success
+     */
+    boolean deleteTopicInBroker(String brokerName, String topic);
+
+    /**
+     * @param sendTopicMessageRequest describes the message to send
+     * @return the {@link SendResult} of the send
+     */
+    SendResult sendTopicMessageRequest(SendTopicMessageRequest sendTopicMessageRequest);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/e218eef9/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/checker/CheckerType.java
----------------------------------------------------------------------
diff --git a/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/checker/CheckerType.java b/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/checker/CheckerType.java
new file mode 100644
index 0000000..8a85008
--- /dev/null
+++ b/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/checker/CheckerType.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.console.service.checker;
+
+/**
+ * Identifies the kind of check a {@code RocketMqChecker} performs; also the key
+ * type of the map returned by {@code OpsService.rocketMqStatusCheck()}.
+ */
+public enum CheckerType {
+    /** Type reported by {@code ClusterHealthCheckerImpl}. */
+    CLUSTER_HEALTH_CHECK,
+    /** Type reported by {@code TopicOnlyOneBrokerCheckerImpl}. */
+    TOPIC_ONLY_ONE_BROKER_CHECK
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/e218eef9/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/checker/RocketMqChecker.java
----------------------------------------------------------------------
diff --git a/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/checker/RocketMqChecker.java b/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/checker/RocketMqChecker.java
new file mode 100644
index 0000000..3b8f58d
--- /dev/null
+++ b/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/checker/RocketMqChecker.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.console.service.checker;
+
+/**
+ * A single status check that can be run against RocketMQ.
+ *
+ * <p>Each implementation identifies itself through {@link #checkerType()}.
+ */
+public interface RocketMqChecker {
+    /**
+     * Runs the check.
+     *
+     * @return the check result; its type is left open ({@code Object}) and the
+     *         implementations in this package currently return {@code null}
+     */
+    Object doCheck();
+
+    /**
+     * @return the {@link CheckerType} identifying this checker
+     */
+    CheckerType checkerType();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/e218eef9/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/checker/impl/ClusterHealthCheckerImpl.java
----------------------------------------------------------------------
diff --git a/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/checker/impl/ClusterHealthCheckerImpl.java b/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/checker/impl/ClusterHealthCheckerImpl.java
new file mode 100644
index 0000000..5c2c893
--- /dev/null
+++ b/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/checker/impl/ClusterHealthCheckerImpl.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.console.service.checker.impl;
+
+import org.apache.rocketmq.console.service.checker.CheckerType;
+import org.apache.rocketmq.console.service.checker.RocketMqChecker;
+import org.springframework.stereotype.Service;
+
+/**
+ * {@link RocketMqChecker} for {@link CheckerType#CLUSTER_HEALTH_CHECK},
+ * registered as a Spring {@code @Service}.
+ */
+@Service
+public class ClusterHealthCheckerImpl implements RocketMqChecker {
+    /**
+     * @return {@link CheckerType#CLUSTER_HEALTH_CHECK}
+     */
+    @Override
+    public CheckerType checkerType() {
+        return CheckerType.CLUSTER_HEALTH_CHECK;
+    }
+
+    /**
+     * Stub: no check is carried out yet.
+     *
+     * @return always {@code null}
+     */
+    @Override
+    public Object doCheck() {
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/e218eef9/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/checker/impl/TopicOnlyOneBrokerCheckerImpl.java
----------------------------------------------------------------------
diff --git a/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/checker/impl/TopicOnlyOneBrokerCheckerImpl.java b/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/checker/impl/TopicOnlyOneBrokerCheckerImpl.java
new file mode 100644
index 0000000..0f06a13
--- /dev/null
+++ b/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/checker/impl/TopicOnlyOneBrokerCheckerImpl.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.console.service.checker.impl;
+
+import org.apache.rocketmq.console.service.checker.CheckerType;
+import org.apache.rocketmq.console.service.checker.RocketMqChecker;
+import org.springframework.stereotype.Service;
+
+/**
+ * {@link RocketMqChecker} for {@link CheckerType#TOPIC_ONLY_ONE_BROKER_CHECK},
+ * registered as a Spring {@code @Service}.
+ */
+@Service
+public class TopicOnlyOneBrokerCheckerImpl implements RocketMqChecker {
+    /**
+     * @return {@link CheckerType#TOPIC_ONLY_ONE_BROKER_CHECK}
+     */
+    @Override
+    public CheckerType checkerType() {
+        return CheckerType.TOPIC_ONLY_ONE_BROKER_CHECK;
+    }
+
+    /**
+     * Stub: no check is carried out yet.
+     *
+     * @return always {@code null}
+     */
+    @Override
+    public Object doCheck() {
+        return null;
+    }
+}