You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2017/06/15 02:53:19 UTC
[49/51] [partial] incubator-rocketmq-externals git commit: Release
rocketmq-console 1.0.0 version
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/e218eef9/rocketmq-console/src/main/java/org/apache/rocketmq/console/controller/NamesvrController.java
----------------------------------------------------------------------
diff --git a/rocketmq-console/src/main/java/org/apache/rocketmq/console/controller/NamesvrController.java b/rocketmq-console/src/main/java/org/apache/rocketmq/console/controller/NamesvrController.java
new file mode 100644
index 0000000..ad6c25a
--- /dev/null
+++ b/rocketmq-console/src/main/java/org/apache/rocketmq/console/controller/NamesvrController.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.console.controller;
+
+import javax.annotation.Resource;
+import org.apache.rocketmq.console.aspect.admin.annotation.OriginalControllerReturnValue;
+import org.apache.rocketmq.console.service.OpsService;
+import org.springframework.stereotype.Controller;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestMethod;
+import org.springframework.web.bind.annotation.ResponseBody;
+
+@Controller
+@RequestMapping("/rocketmq")
+public class NamesvrController {
+ @Resource
+ private OpsService opsService;
+
+ @RequestMapping(value = "/nsaddr", method = RequestMethod.GET)
+ @ResponseBody
+ @OriginalControllerReturnValue
+ public Object nsaddr() {
+ return opsService.getNameSvrList();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/e218eef9/rocketmq-console/src/main/java/org/apache/rocketmq/console/controller/OpsController.java
----------------------------------------------------------------------
diff --git a/rocketmq-console/src/main/java/org/apache/rocketmq/console/controller/OpsController.java b/rocketmq-console/src/main/java/org/apache/rocketmq/console/controller/OpsController.java
new file mode 100644
index 0000000..d82862f
--- /dev/null
+++ b/rocketmq-console/src/main/java/org/apache/rocketmq/console/controller/OpsController.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.console.controller;
+
+import javax.annotation.Resource;
+import org.apache.rocketmq.console.service.OpsService;
+import org.springframework.stereotype.Controller;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestMethod;
+import org.springframework.web.bind.annotation.RequestParam;
+import org.springframework.web.bind.annotation.ResponseBody;
+
+@Controller
+@RequestMapping("/ops")
+public class OpsController {
+
+ @Resource
+ private OpsService opsService;
+
+ @RequestMapping(value = "/homePage.query", method = RequestMethod.GET)
+ @ResponseBody
+ public Object homePage() {
+ return opsService.homePageInfo();
+ }
+
+ @RequestMapping(value = "/updateNameSvrAddr.do", method = RequestMethod.POST)
+ @ResponseBody
+ public Object updateNameSvrAddr(@RequestParam String nameSvrAddrList) {
+ opsService.updateNameSvrAddrList(nameSvrAddrList);
+ return true;
+ }
+
+ @RequestMapping(value = "/updateIsVIPChannel.do", method = RequestMethod.POST)
+ @ResponseBody
+ public Object updateIsVIPChannel(@RequestParam String useVIPChannel) {
+ opsService.updateIsVIPChannel(useVIPChannel);
+ return true;
+ }
+
+
+ @RequestMapping(value = "/rocketMqStatus.query", method = RequestMethod.GET)
+ @ResponseBody
+ public Object clusterStatus() {
+ return opsService.rocketMqStatusCheck();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/e218eef9/rocketmq-console/src/main/java/org/apache/rocketmq/console/controller/ProducerController.java
----------------------------------------------------------------------
diff --git a/rocketmq-console/src/main/java/org/apache/rocketmq/console/controller/ProducerController.java b/rocketmq-console/src/main/java/org/apache/rocketmq/console/controller/ProducerController.java
new file mode 100644
index 0000000..1a69de5
--- /dev/null
+++ b/rocketmq-console/src/main/java/org/apache/rocketmq/console/controller/ProducerController.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.console.controller;
+
+import javax.annotation.Resource;
+import org.apache.rocketmq.common.protocol.body.ProducerConnection;
+import org.apache.rocketmq.console.model.ConnectionInfo;
+import org.apache.rocketmq.console.service.ProducerService;
+import org.springframework.stereotype.Controller;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestMethod;
+import org.springframework.web.bind.annotation.RequestParam;
+import org.springframework.web.bind.annotation.ResponseBody;
+
+/**
+ * Web controller mapped under {@code /producer} for inspecting producer
+ * connections.
+ */
+@Controller
+@RequestMapping("/producer")
+public class ProducerController {
+
+    @Resource
+    private ProducerService producerService;
+
+    /**
+     * Handles {@code GET /producer/producerConnection.query}.
+     *
+     * <p>The connection set of the looked-up {@link ProducerConnection} is replaced
+     * by the set built by {@link ConnectionInfo#buildConnectionInfoHashSet}, so each
+     * entry returned is a {@link ConnectionInfo}.
+     *
+     * @param producerGroup required request parameter passed to the service
+     * @param topic         required request parameter passed to the service
+     * @return the {@link ProducerConnection} returned by the service, with its
+     *         connection set swapped as described above
+     */
+    @ResponseBody
+    @RequestMapping(method = RequestMethod.GET, value = "/producerConnection.query")
+    public Object producerConnection(@RequestParam String producerGroup, @RequestParam String topic) {
+        ProducerConnection found = producerService.getProducerConnection(producerGroup, topic);
+        found.setConnectionSet(
+            ConnectionInfo.buildConnectionInfoHashSet(found.getConnectionSet()));
+        return found;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/e218eef9/rocketmq-console/src/main/java/org/apache/rocketmq/console/controller/TestController.java
----------------------------------------------------------------------
diff --git a/rocketmq-console/src/main/java/org/apache/rocketmq/console/controller/TestController.java b/rocketmq-console/src/main/java/org/apache/rocketmq/console/controller/TestController.java
new file mode 100644
index 0000000..d7af1ad
--- /dev/null
+++ b/rocketmq-console/src/main/java/org/apache/rocketmq/console/controller/TestController.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.console.controller;
+
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import java.util.List;
+import javax.annotation.Resource;
+import org.apache.rocketmq.console.config.RMQConfigure;
+import org.apache.rocketmq.console.util.JsonUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Controller;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestMethod;
+import org.springframework.web.bind.annotation.ResponseBody;
+
+@Controller
+@RequestMapping("/test")
+public class TestController {
+ private Logger logger = LoggerFactory.getLogger(TestController.class);
+ private String testTopic = "TestTopic";
+
+ @Resource
+ private RMQConfigure rMQConfigure;
+
+ @RequestMapping(value = "/runTask.do", method = RequestMethod.GET)
+ @ResponseBody
+ public Object list() throws MQClientException, RemotingException, InterruptedException {
+ DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(testTopic + "Group");
+ consumer.setNamesrvAddr(rMQConfigure.getNamesrvAddr());
+ consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
+ consumer.subscribe(testTopic, "*");
+ consumer.registerMessageListener(new MessageListenerConcurrently() {
+
+ @Override
+ public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
+ ConsumeConcurrentlyContext context) {
+ logger.info("receiveMessage msgSize={}", msgs.size());
+ return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+ }
+ });
+ consumer.start();
+ final DefaultMQProducer producer = new DefaultMQProducer(testTopic + "Group");
+ producer.setInstanceName(String.valueOf(System.currentTimeMillis()));
+ producer.setNamesrvAddr(rMQConfigure.getNamesrvAddr());
+ producer.start();
+
+ new Thread(new Runnable() {
+
+ @Override public void run() {
+
+ int i = 0;
+ while (true) {
+ try {
+ Message msg = new Message(testTopic,
+ "TagA" + i,
+ "KEYS" + i,
+ ("Hello RocketMQ " + i).getBytes()
+ );
+ Thread.sleep(1000L);
+ SendResult sendResult = producer.send(msg);
+ logger.info("sendMessage={}", JsonUtil.obj2String(sendResult));
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ try {
+ Thread.sleep(1000);
+ }
+ catch (Exception ignore) {
+ }
+ }
+ }
+ }
+ }).start();
+ return true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/e218eef9/rocketmq-console/src/main/java/org/apache/rocketmq/console/controller/TopicController.java
----------------------------------------------------------------------
diff --git a/rocketmq-console/src/main/java/org/apache/rocketmq/console/controller/TopicController.java b/rocketmq-console/src/main/java/org/apache/rocketmq/console/controller/TopicController.java
new file mode 100644
index 0000000..90819af
--- /dev/null
+++ b/rocketmq-console/src/main/java/org/apache/rocketmq/console/controller/TopicController.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.console.controller;
+
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.rocketmq.console.model.request.SendTopicMessageRequest;
+import org.apache.rocketmq.console.model.request.TopicConfigInfo;
+import org.apache.rocketmq.console.service.ConsumerService;
+import org.apache.rocketmq.console.service.TopicService;
+import org.apache.rocketmq.console.util.JsonUtil;
+import com.google.common.base.Preconditions;
+import org.apache.commons.collections.CollectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Controller;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestMethod;
+import org.springframework.web.bind.annotation.RequestParam;
+
+import javax.annotation.Resource;
+import org.springframework.web.bind.annotation.ResponseBody;
+
+@Controller
+@RequestMapping("/topic")
+public class TopicController {
+ private Logger logger = LoggerFactory.getLogger(TopicController.class);
+
+ @Resource
+ private TopicService topicService;
+
+ @Resource
+ private ConsumerService consumerService;
+
+ @RequestMapping(value = "/list.query", method = RequestMethod.GET)
+ @ResponseBody
+ public Object list() throws MQClientException, RemotingException, InterruptedException {
+ return topicService.fetchAllTopicList();
+ }
+
+ @RequestMapping(value = "/stats.query", method = RequestMethod.GET)
+ @ResponseBody
+ public Object stats(@RequestParam String topic) {
+ return topicService.stats(topic);
+ }
+
+ @RequestMapping(value = "/route.query", method = RequestMethod.GET)
+ @ResponseBody
+ public Object route(@RequestParam String topic) {
+ return topicService.route(topic);
+ }
+
+
+ @RequestMapping(value = "/createOrUpdate.do", method = { RequestMethod.POST})
+ @ResponseBody
+ public Object topicCreateOrUpdateRequest(@RequestBody TopicConfigInfo topicCreateOrUpdateRequest) {
+ Preconditions.checkArgument(CollectionUtils.isNotEmpty(topicCreateOrUpdateRequest.getBrokerNameList()) || CollectionUtils.isNotEmpty(topicCreateOrUpdateRequest.getClusterNameList()),
+ "clusterName or brokerName can not be all blank");
+ logger.info("op=look topicCreateOrUpdateRequest={}", JsonUtil.obj2String(topicCreateOrUpdateRequest));
+ topicService.createOrUpdate(topicCreateOrUpdateRequest);
+ return true;
+ }
+
+ @RequestMapping(value = "/queryConsumerByTopic.query")
+ @ResponseBody
+ public Object queryConsumerByTopic(@RequestParam String topic) {
+ return consumerService.queryConsumeStatsListByTopicName(topic);
+ }
+
+ @RequestMapping(value = "/queryTopicConsumerInfo.query")
+ @ResponseBody
+ public Object queryTopicConsumerInfo(@RequestParam String topic) {
+ return topicService.queryTopicConsumerInfo(topic);
+ }
+
+ @RequestMapping(value = "/examineTopicConfig.query")
+ @ResponseBody
+ public Object examineTopicConfig(@RequestParam String topic,
+ @RequestParam(required = false) String brokerName) throws RemotingException, MQClientException, InterruptedException {
+ return topicService.examineTopicConfig(topic);
+ }
+
+ @RequestMapping(value = "/sendTopicMessage.do", method = {RequestMethod.POST})
+ @ResponseBody
+ public Object sendTopicMessage(
+ @RequestBody SendTopicMessageRequest sendTopicMessageRequest) throws RemotingException, MQClientException, InterruptedException {
+ return topicService.sendTopicMessageRequest(sendTopicMessageRequest);
+ }
+
+ @RequestMapping(value = "/deleteTopic.do", method = {RequestMethod.POST})
+ @ResponseBody
+ public Object delete(@RequestParam(required = false) String clusterName, @RequestParam String topic) {
+ return topicService.deleteTopic(topic, clusterName);
+ }
+
+ @RequestMapping(value = "/deleteTopicByBroker.do", method = {RequestMethod.POST})
+ @ResponseBody
+ public Object deleteTopicByBroker(@RequestParam String brokerName, @RequestParam String topic) {
+ return topicService.deleteTopicInBroker(brokerName, topic);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/e218eef9/rocketmq-console/src/main/java/org/apache/rocketmq/console/exception/ServiceException.java
----------------------------------------------------------------------
diff --git a/rocketmq-console/src/main/java/org/apache/rocketmq/console/exception/ServiceException.java b/rocketmq-console/src/main/java/org/apache/rocketmq/console/exception/ServiceException.java
new file mode 100644
index 0000000..7ad166b
--- /dev/null
+++ b/rocketmq-console/src/main/java/org/apache/rocketmq/console/exception/ServiceException.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.console.exception;
+
+/**
+ * Unchecked exception that carries an integer code alongside its message.
+ */
+public class ServiceException extends RuntimeException {
+    private static final long serialVersionUID = 9213584003139969215L;
+
+    /** Code supplied at construction time; never reassigned afterwards. */
+    private final int code;
+
+    /**
+     * Creates the exception.
+     *
+     * @param code    code later reported by {@link #getCode()}
+     * @param message detail message later reported by {@link #getMessage()}
+     */
+    public ServiceException(int code, String message) {
+        super(message);
+        this.code = code;
+    }
+
+    /**
+     * @return the code given to the constructor
+     */
+    public int getCode() {
+        return this.code;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/e218eef9/rocketmq-console/src/main/java/org/apache/rocketmq/console/model/ConnectionInfo.java
----------------------------------------------------------------------
diff --git a/rocketmq-console/src/main/java/org/apache/rocketmq/console/model/ConnectionInfo.java b/rocketmq-console/src/main/java/org/apache/rocketmq/console/model/ConnectionInfo.java
new file mode 100644
index 0000000..6e8dd19
--- /dev/null
+++ b/rocketmq-console/src/main/java/org/apache/rocketmq/console/model/ConnectionInfo.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.console.model;
+
+import com.google.common.collect.Sets;
+import java.util.Collection;
+import java.util.HashSet;
+import org.apache.rocketmq.common.MQVersion;
+import org.apache.rocketmq.common.protocol.body.Connection;
+
+/**
+ * A {@link Connection} extended with a textual description of its version.
+ */
+public class ConnectionInfo extends Connection {
+    private String versionDesc;
+
+    public String getVersionDesc() {
+        return versionDesc;
+    }
+
+    public void setVersionDesc(String versionDesc) {
+        this.versionDesc = versionDesc;
+    }
+
+    /**
+     * Copies client id, client address, language and version from the given
+     * connection and sets the version description obtained from
+     * {@link MQVersion#getVersionDesc(int)}.
+     *
+     * @param connection connection to copy from; must not be {@code null}
+     * @return a new {@code ConnectionInfo} carrying the copied values
+     */
+    public static ConnectionInfo buildConnectionInfo(Connection connection) {
+        ConnectionInfo info = new ConnectionInfo();
+        info.setClientId(connection.getClientId());
+        info.setClientAddr(connection.getClientAddr());
+        info.setLanguage(connection.getLanguage());
+        info.setVersion(connection.getVersion());
+        String description = MQVersion.getVersionDesc(connection.getVersion());
+        info.setVersionDesc(description);
+        return info;
+    }
+
+    /**
+     * Converts every connection in the collection with
+     * {@link #buildConnectionInfo(Connection)} and gathers the results in a set.
+     *
+     * @param connectionList connections to convert; must not be {@code null}
+     * @return a new set holding one {@code ConnectionInfo} per input element
+     */
+    public static HashSet<Connection> buildConnectionInfoHashSet(Collection<Connection> connectionList) {
+        HashSet<Connection> converted = Sets.newHashSet();
+        for (Connection each : connectionList) {
+            ConnectionInfo info = buildConnectionInfo(each);
+            converted.add(info);
+        }
+        return converted;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/e218eef9/rocketmq-console/src/main/java/org/apache/rocketmq/console/model/ConsumerGroupRollBackStat.java
----------------------------------------------------------------------
diff --git a/rocketmq-console/src/main/java/org/apache/rocketmq/console/model/ConsumerGroupRollBackStat.java b/rocketmq-console/src/main/java/org/apache/rocketmq/console/model/ConsumerGroupRollBackStat.java
new file mode 100644
index 0000000..3ddfe07
--- /dev/null
+++ b/rocketmq-console/src/main/java/org/apache/rocketmq/console/model/ConsumerGroupRollBackStat.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.console.model;
+
+import org.apache.rocketmq.common.admin.RollbackStats;
+import com.google.common.collect.Lists;
+
+import java.util.List;
+
+/**
+ * Holder for a status flag, an optional error message and a list of
+ * {@link RollbackStats}.
+ */
+public class ConsumerGroupRollBackStat {
+    private boolean status;
+    private String errMsg;
+    /** Starts out as an empty, mutable list. */
+    private List<RollbackStats> rollbackStatsList = Lists.newArrayList();
+
+    /**
+     * Creates an instance with the given status and no error message.
+     *
+     * @param status initial status flag
+     */
+    public ConsumerGroupRollBackStat(boolean status) {
+        this(status, null);
+    }
+
+    /**
+     * Creates an instance with the given status and error message.
+     *
+     * @param status initial status flag
+     * @param errMsg initial error message, may be {@code null}
+     */
+    public ConsumerGroupRollBackStat(boolean status, String errMsg) {
+        this.status = status;
+        this.errMsg = errMsg;
+    }
+
+    public boolean isStatus() {
+        return this.status;
+    }
+
+    public void setStatus(boolean status) {
+        this.status = status;
+    }
+
+    public String getErrMsg() {
+        return this.errMsg;
+    }
+
+    public void setErrMsg(String errMsg) {
+        this.errMsg = errMsg;
+    }
+
+    public List<RollbackStats> getRollbackStatsList() {
+        return this.rollbackStatsList;
+    }
+
+    public void setRollbackStatsList(List<RollbackStats> rollbackStatsList) {
+        this.rollbackStatsList = rollbackStatsList;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/e218eef9/rocketmq-console/src/main/java/org/apache/rocketmq/console/model/ConsumerMonitorConfig.java
----------------------------------------------------------------------
diff --git a/rocketmq-console/src/main/java/org/apache/rocketmq/console/model/ConsumerMonitorConfig.java b/rocketmq-console/src/main/java/org/apache/rocketmq/console/model/ConsumerMonitorConfig.java
new file mode 100644
index 0000000..9124f00
--- /dev/null
+++ b/rocketmq-console/src/main/java/org/apache/rocketmq/console/model/ConsumerMonitorConfig.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.console.model;
+
+/**
+ * Plain holder for two integer settings, {@code minCount} and
+ * {@code maxDiffTotal}.
+ */
+public class ConsumerMonitorConfig {
+    private int minCount;
+    private int maxDiffTotal;
+
+    /**
+     * Creates a config with both values at zero.
+     */
+    public ConsumerMonitorConfig() {
+        this(0, 0);
+    }
+
+    /**
+     * Creates a config with the given values.
+     *
+     * @param minCount     initial value reported by {@link #getMinCount()}
+     * @param maxDiffTotal initial value reported by {@link #getMaxDiffTotal()}
+     */
+    public ConsumerMonitorConfig(int minCount, int maxDiffTotal) {
+        this.minCount = minCount;
+        this.maxDiffTotal = maxDiffTotal;
+    }
+
+    public int getMinCount() {
+        return this.minCount;
+    }
+
+    public int getMaxDiffTotal() {
+        return this.maxDiffTotal;
+    }
+
+    public void setMinCount(int minCount) {
+        this.minCount = minCount;
+    }
+
+    public void setMaxDiffTotal(int maxDiffTotal) {
+        this.maxDiffTotal = maxDiffTotal;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/e218eef9/rocketmq-console/src/main/java/org/apache/rocketmq/console/model/GroupConsumeInfo.java
----------------------------------------------------------------------
diff --git a/rocketmq-console/src/main/java/org/apache/rocketmq/console/model/GroupConsumeInfo.java b/rocketmq-console/src/main/java/org/apache/rocketmq/console/model/GroupConsumeInfo.java
new file mode 100644
index 0000000..27c5f92
--- /dev/null
+++ b/rocketmq-console/src/main/java/org/apache/rocketmq/console/model/GroupConsumeInfo.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.console.model;
+
+import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
+import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
+
+/**
+ * Summary values for one consumer group.
+ *
+ * <p>The natural ordering sorts by {@code count} descending and breaks ties by
+ * {@code diffTotal} descending. It looks only at those two fields, and this class
+ * does not override {@code equals}.
+ */
+public class GroupConsumeInfo implements Comparable<GroupConsumeInfo> {
+    private String group;
+    private String version;
+    private int count;
+    private ConsumeType consumeType;
+    private MessageModel messageModel;
+    private int consumeTps;
+    // NOTE(review): -1 presumably marks "not yet computed"; confirm with the code that fills this in.
+    private long diffTotal = -1;
+
+    public String getGroup() {
+        return group;
+    }
+
+    public void setGroup(String group) {
+        this.group = group;
+    }
+
+    public int getCount() {
+        return count;
+    }
+
+    public void setCount(int count) {
+        this.count = count;
+    }
+
+    public ConsumeType getConsumeType() {
+        return consumeType;
+    }
+
+    public void setConsumeType(ConsumeType consumeType) {
+        this.consumeType = consumeType;
+    }
+
+    public MessageModel getMessageModel() {
+        return messageModel;
+    }
+
+    public void setMessageModel(MessageModel messageModel) {
+        this.messageModel = messageModel;
+    }
+
+    public long getDiffTotal() {
+        return diffTotal;
+    }
+
+    public void setDiffTotal(long diffTotal) {
+        this.diffTotal = diffTotal;
+    }
+
+    /**
+     * Orders by {@code count} descending, then by {@code diffTotal} descending.
+     *
+     * <p>Values are compared rather than subtracted: an {@code int} subtraction can
+     * overflow, and narrowing a {@code long} difference to {@code int} can flip or
+     * zero the sign (a difference of 2^32 would compare as equal).
+     *
+     * @param o the other instance; must not be {@code null}
+     * @return a negative, zero or positive value as this instance sorts before,
+     *         with, or after {@code o}
+     */
+    @Override
+    public int compareTo(GroupConsumeInfo o) {
+        if (this.count != o.count) {
+            // Arguments are swapped on purpose: larger counts sort first.
+            return Integer.compare(o.count, this.count);
+        }
+
+        // Arguments are swapped on purpose: larger totals sort first.
+        return Long.compare(o.diffTotal, this.diffTotal);
+    }
+
+    public int getConsumeTps() {
+        return consumeTps;
+    }
+
+    public void setConsumeTps(int consumeTps) {
+        this.consumeTps = consumeTps;
+    }
+
+    public String getVersion() {
+        return version;
+    }
+
+    public void setVersion(String version) {
+        this.version = version;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/e218eef9/rocketmq-console/src/main/java/org/apache/rocketmq/console/model/MessageView.java
----------------------------------------------------------------------
diff --git a/rocketmq-console/src/main/java/org/apache/rocketmq/console/model/MessageView.java b/rocketmq-console/src/main/java/org/apache/rocketmq/console/model/MessageView.java
new file mode 100644
index 0000000..4011cad
--- /dev/null
+++ b/rocketmq-console/src/main/java/org/apache/rocketmq/console/model/MessageView.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.console.model;
+
+import org.apache.rocketmq.common.message.MessageExt;
+import com.google.common.base.Charsets;
+import org.springframework.beans.BeanUtils;
+
+import java.net.SocketAddress;
+import java.util.Map;
+
+public class MessageView {
+
+ /** from MessageExt **/
+ private int queueId;
+ private int storeSize;
+ private long queueOffset;
+ private int sysFlag;
+ private long bornTimestamp;
+ private SocketAddress bornHost;
+ private long storeTimestamp;
+ private SocketAddress storeHost;
+ private String msgId;
+ private long commitLogOffset;
+ private int bodyCRC;
+ private int reconsumeTimes;
+ private long preparedTransactionOffset;
+ /**from MessageExt**/
+
+ /** from Message **/
+ private String topic;
+ private int flag;
+ private Map<String, String> properties;
+ private String messageBody; // body
+
+ /** from Message **/
+
+ public static MessageView fromMessageExt(MessageExt messageExt) {
+ MessageView messageView = new MessageView();
+ BeanUtils.copyProperties(messageExt, messageView);
+ if (messageExt.getBody() != null) {
+ messageView.setMessageBody(new String(messageExt.getBody(), Charsets.UTF_8));
+ }
+ return messageView;
+ }
+
+ public String getTopic() {
+ return topic;
+ }
+
+ public void setTopic(String topic) {
+ this.topic = topic;
+ }
+
+ public int getFlag() {
+ return flag;
+ }
+
+ public void setFlag(int flag) {
+ this.flag = flag;
+ }
+
+ public Map<String, String> getProperties() {
+ return properties;
+ }
+
+ public void setProperties(Map<String, String> properties) {
+ this.properties = properties;
+ }
+
+ public int getQueueId() {
+ return queueId;
+ }
+
+ public void setQueueId(int queueId) {
+ this.queueId = queueId;
+ }
+
+ public int getStoreSize() {
+ return storeSize;
+ }
+
+ public void setStoreSize(int storeSize) {
+ this.storeSize = storeSize;
+ }
+
+ public long getQueueOffset() {
+ return queueOffset;
+ }
+
+ public void setQueueOffset(long queueOffset) {
+ this.queueOffset = queueOffset;
+ }
+
+ public int getSysFlag() {
+ return sysFlag;
+ }
+
+ public void setSysFlag(int sysFlag) {
+ this.sysFlag = sysFlag;
+ }
+
+ public long getBornTimestamp() {
+ return bornTimestamp;
+ }
+
+ public void setBornTimestamp(long bornTimestamp) {
+ this.bornTimestamp = bornTimestamp;
+ }
+
+ public SocketAddress getBornHost() {
+ return bornHost;
+ }
+
+ public void setBornHost(SocketAddress bornHost) {
+ this.bornHost = bornHost;
+ }
+
+ public long getStoreTimestamp() {
+ return storeTimestamp;
+ }
+
+ public void setStoreTimestamp(long storeTimestamp) {
+ this.storeTimestamp = storeTimestamp;
+ }
+
+ public SocketAddress getStoreHost() {
+ return storeHost;
+ }
+
+ public void setStoreHost(SocketAddress storeHost) {
+ this.storeHost = storeHost;
+ }
+
+ public String getMsgId() {
+ return msgId;
+ }
+
+ public void setMsgId(String msgId) {
+ this.msgId = msgId;
+ }
+
+ public long getCommitLogOffset() {
+ return commitLogOffset;
+ }
+
+ public void setCommitLogOffset(long commitLogOffset) {
+ this.commitLogOffset = commitLogOffset;
+ }
+
+ public int getBodyCRC() {
+ return bodyCRC;
+ }
+
+ public void setBodyCRC(int bodyCRC) {
+ this.bodyCRC = bodyCRC;
+ }
+
+ public int getReconsumeTimes() {
+ return reconsumeTimes;
+ }
+
+ public void setReconsumeTimes(int reconsumeTimes) {
+ this.reconsumeTimes = reconsumeTimes;
+ }
+
+ public long getPreparedTransactionOffset() {
+ return preparedTransactionOffset;
+ }
+
+ public void setPreparedTransactionOffset(long preparedTransactionOffset) {
+ this.preparedTransactionOffset = preparedTransactionOffset;
+ }
+
+ public String getMessageBody() {
+ return messageBody;
+ }
+
+ public void setMessageBody(String messageBody) {
+ this.messageBody = messageBody;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/e218eef9/rocketmq-console/src/main/java/org/apache/rocketmq/console/model/QueueStatInfo.java
----------------------------------------------------------------------
diff --git a/rocketmq-console/src/main/java/org/apache/rocketmq/console/model/QueueStatInfo.java b/rocketmq-console/src/main/java/org/apache/rocketmq/console/model/QueueStatInfo.java
new file mode 100644
index 0000000..a6d67a8
--- /dev/null
+++ b/rocketmq-console/src/main/java/org/apache/rocketmq/console/model/QueueStatInfo.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.console.model;
+
+import org.apache.rocketmq.common.admin.OffsetWrapper;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.springframework.beans.BeanUtils;
+
+/**
+ * Per-queue statistics bean: broker name, queue id, broker and consumer offsets, client
+ * info and last timestamp.
+ */
+public class QueueStatInfo {
+    private String brokerName;
+    private int queueId;
+    private long brokerOffset;
+    private long consumerOffset;
+    private String clientInfo;
+    private long lastTimestamp;
+
+    /**
+     * Builds an instance from one offset-table entry.
+     *
+     * <p>Properties are copied with {@code BeanUtils.copyProperties}, first from the queue
+     * key and then from the offset wrapper, so a property present on both would end up
+     * with the wrapper's value.
+     *
+     * @param key the message queue the entry belongs to
+     * @param value the offsets recorded for that queue
+     * @return a new, populated {@code QueueStatInfo}
+     */
+    public static QueueStatInfo fromOffsetTableEntry(MessageQueue key, OffsetWrapper value) {
+        QueueStatInfo stat = new QueueStatInfo();
+        BeanUtils.copyProperties(key, stat);
+        BeanUtils.copyProperties(value, stat);
+        return stat;
+    }
+
+    // ---- plain accessors ----
+
+    public String getClientInfo() {
+        return clientInfo;
+    }
+
+    public void setClientInfo(String clientInfo) {
+        this.clientInfo = clientInfo;
+    }
+
+    public String getBrokerName() {
+        return brokerName;
+    }
+
+    public void setBrokerName(String brokerName) {
+        this.brokerName = brokerName;
+    }
+
+    public int getQueueId() {
+        return queueId;
+    }
+
+    public void setQueueId(int queueId) {
+        this.queueId = queueId;
+    }
+
+    public long getBrokerOffset() {
+        return brokerOffset;
+    }
+
+    public void setBrokerOffset(long brokerOffset) {
+        this.brokerOffset = brokerOffset;
+    }
+
+    public long getConsumerOffset() {
+        return consumerOffset;
+    }
+
+    public void setConsumerOffset(long consumerOffset) {
+        this.consumerOffset = consumerOffset;
+    }
+
+    public long getLastTimestamp() {
+        return lastTimestamp;
+    }
+
+    public void setLastTimestamp(long lastTimestamp) {
+        this.lastTimestamp = lastTimestamp;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/e218eef9/rocketmq-console/src/main/java/org/apache/rocketmq/console/model/TopicConsumerInfo.java
----------------------------------------------------------------------
diff --git a/rocketmq-console/src/main/java/org/apache/rocketmq/console/model/TopicConsumerInfo.java b/rocketmq-console/src/main/java/org/apache/rocketmq/console/model/TopicConsumerInfo.java
new file mode 100644
index 0000000..bd6d8e2
--- /dev/null
+++ b/rocketmq-console/src/main/java/org/apache/rocketmq/console/model/TopicConsumerInfo.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.console.model;
+
+import com.google.common.collect.Lists;
+
+import java.util.List;
+
+public class TopicConsumerInfo {
+ private String topic;
+ private long diffTotal;
+ private long lastTimestamp;
+ private List<QueueStatInfo> queueStatInfoList = Lists.newArrayList();
+
+ public TopicConsumerInfo(String topic) {
+ this.topic = topic;
+ }
+
+ public String getTopic() {
+ return topic;
+ }
+
+ public void setTopic(String topic) {
+ this.topic = topic;
+ }
+
+ public long getDiffTotal() {
+ return diffTotal;
+ }
+
+ public void setDiffTotal(long diffTotal) {
+ this.diffTotal = diffTotal;
+ }
+
+ public List<QueueStatInfo> getQueueStatInfoList() {
+ return queueStatInfoList;
+ }
+
+ public long getLastTimestamp() {
+ return lastTimestamp;
+ }
+
+ public void appendQueueStatInfo(QueueStatInfo queueStatInfo) {
+ queueStatInfoList.add(queueStatInfo);
+ diffTotal = diffTotal + (queueStatInfo.getBrokerOffset() - queueStatInfo.getConsumerOffset());
+ lastTimestamp = Math.max(lastTimestamp, queueStatInfo.getLastTimestamp());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/e218eef9/rocketmq-console/src/main/java/org/apache/rocketmq/console/model/request/ConsumerConfigInfo.java
----------------------------------------------------------------------
diff --git a/rocketmq-console/src/main/java/org/apache/rocketmq/console/model/request/ConsumerConfigInfo.java b/rocketmq-console/src/main/java/org/apache/rocketmq/console/model/request/ConsumerConfigInfo.java
new file mode 100644
index 0000000..cee155c
--- /dev/null
+++ b/rocketmq-console/src/main/java/org/apache/rocketmq/console/model/request/ConsumerConfigInfo.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.console.model.request;
+
+import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
+
+import java.util.List;
+
+/**
+ * Bean pairing a {@link SubscriptionGroupConfig} with the cluster names and broker names
+ * it is associated with.
+ */
+public class ConsumerConfigInfo {
+    private List<String> clusterNameList;
+
+    private List<String> brokerNameList;
+    private SubscriptionGroupConfig subscriptionGroupConfig;
+
+    /** Creates an empty instance; every field starts as {@code null}. */
+    public ConsumerConfigInfo() {
+    }
+
+    /**
+     * Creates an instance with broker names and a configuration; {@code clusterNameList}
+     * is left {@code null}.
+     *
+     * @param brokerNameList the broker names
+     * @param subscriptionGroupConfig the subscription group configuration
+     */
+    public ConsumerConfigInfo(List<String> brokerNameList, SubscriptionGroupConfig subscriptionGroupConfig) {
+        this.brokerNameList = brokerNameList;
+        this.subscriptionGroupConfig = subscriptionGroupConfig;
+    }
+
+    public List<String> getClusterNameList() {
+        return clusterNameList;
+    }
+
+    public void setClusterNameList(List<String> clusterNameList) {
+        this.clusterNameList = clusterNameList;
+    }
+
+    public List<String> getBrokerNameList() {
+        return brokerNameList;
+    }
+
+    public void setBrokerNameList(List<String> brokerNameList) {
+        this.brokerNameList = brokerNameList;
+    }
+
+    public SubscriptionGroupConfig getSubscriptionGroupConfig() {
+        return subscriptionGroupConfig;
+    }
+
+    public void setSubscriptionGroupConfig(SubscriptionGroupConfig subscriptionGroupConfig) {
+        this.subscriptionGroupConfig = subscriptionGroupConfig;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/e218eef9/rocketmq-console/src/main/java/org/apache/rocketmq/console/model/request/DeleteSubGroupRequest.java
----------------------------------------------------------------------
diff --git a/rocketmq-console/src/main/java/org/apache/rocketmq/console/model/request/DeleteSubGroupRequest.java b/rocketmq-console/src/main/java/org/apache/rocketmq/console/model/request/DeleteSubGroupRequest.java
new file mode 100644
index 0000000..152256b
--- /dev/null
+++ b/rocketmq-console/src/main/java/org/apache/rocketmq/console/model/request/DeleteSubGroupRequest.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.console.model.request;
+
+import java.util.List;
+
+/**
+ * Request bean carrying a group name and a list of broker names.
+ */
+public class DeleteSubGroupRequest {
+    private String groupName;
+    private List<String> brokerNameList;
+
+    /** @return the group name, or {@code null} if it was never set */
+    public String getGroupName() {
+        return groupName;
+    }
+
+    /** @param groupName the group name to store */
+    public void setGroupName(String groupName) {
+        this.groupName = groupName;
+    }
+
+    /** @return the stored list reference (not a copy), or {@code null} if it was never set */
+    public List<String> getBrokerNameList() {
+        return brokerNameList;
+    }
+
+    /** @param brokerNameList the list of broker names; stored by reference */
+    public void setBrokerNameList(List<String> brokerNameList) {
+        this.brokerNameList = brokerNameList;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/e218eef9/rocketmq-console/src/main/java/org/apache/rocketmq/console/model/request/ResetOffsetRequest.java
----------------------------------------------------------------------
diff --git a/rocketmq-console/src/main/java/org/apache/rocketmq/console/model/request/ResetOffsetRequest.java b/rocketmq-console/src/main/java/org/apache/rocketmq/console/model/request/ResetOffsetRequest.java
new file mode 100644
index 0000000..22263af
--- /dev/null
+++ b/rocketmq-console/src/main/java/org/apache/rocketmq/console/model/request/ResetOffsetRequest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.console.model.request;
+
+import java.util.List;
+
+/**
+ * Request bean carrying a list of consumer groups, a topic, a reset time and a force flag.
+ */
+public class ResetOffsetRequest {
+    private List<String> consumerGroupList;
+    private String topic;
+    private long resetTime;
+    private boolean force;
+
+    /** @return the stored list reference (not a copy), or {@code null} if it was never set */
+    public List<String> getConsumerGroupList() {
+        return consumerGroupList;
+    }
+
+    /** @param consumerGroupList the consumer groups; stored by reference */
+    public void setConsumerGroupList(List<String> consumerGroupList) {
+        this.consumerGroupList = consumerGroupList;
+    }
+
+    /** @return the topic, or {@code null} if it was never set */
+    public String getTopic() {
+        return topic;
+    }
+
+    /** @param topic the topic to store */
+    public void setTopic(String topic) {
+        this.topic = topic;
+    }
+
+    /** @return the reset time; 0 if it was never set */
+    public long getResetTime() {
+        return resetTime;
+    }
+
+    /** @param resetTime the reset time to store */
+    public void setResetTime(long resetTime) {
+        this.resetTime = resetTime;
+    }
+
+    /** @return the force flag; {@code false} if it was never set */
+    public boolean isForce() {
+        return force;
+    }
+
+    /** @param force the force flag to store */
+    public void setForce(boolean force) {
+        this.force = force;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/e218eef9/rocketmq-console/src/main/java/org/apache/rocketmq/console/model/request/SendTopicMessageRequest.java
----------------------------------------------------------------------
diff --git a/rocketmq-console/src/main/java/org/apache/rocketmq/console/model/request/SendTopicMessageRequest.java b/rocketmq-console/src/main/java/org/apache/rocketmq/console/model/request/SendTopicMessageRequest.java
new file mode 100644
index 0000000..c7ffa8a
--- /dev/null
+++ b/rocketmq-console/src/main/java/org/apache/rocketmq/console/model/request/SendTopicMessageRequest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.console.model.request;
+
+/**
+ * Request bean carrying a topic, a key, a tag and a message body, all as strings.
+ */
+public class SendTopicMessageRequest {
+    private String topic;
+    private String key;
+    private String tag;
+    private String messageBody;
+
+    /** @return the topic, or {@code null} if it was never set */
+    public String getTopic() {
+        return topic;
+    }
+
+    /** @param topic the topic to store */
+    public void setTopic(String topic) {
+        this.topic = topic;
+    }
+
+    /** @return the key, or {@code null} if it was never set */
+    public String getKey() {
+        return key;
+    }
+
+    /** @param key the key to store */
+    public void setKey(String key) {
+        this.key = key;
+    }
+
+    /** @return the tag, or {@code null} if it was never set */
+    public String getTag() {
+        return tag;
+    }
+
+    /** @param tag the tag to store */
+    public void setTag(String tag) {
+        this.tag = tag;
+    }
+
+    /** @return the message body, or {@code null} if it was never set */
+    public String getMessageBody() {
+        return messageBody;
+    }
+
+    /** @param messageBody the message body to store */
+    public void setMessageBody(String messageBody) {
+        this.messageBody = messageBody;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/e218eef9/rocketmq-console/src/main/java/org/apache/rocketmq/console/model/request/TopicConfigInfo.java
----------------------------------------------------------------------
diff --git a/rocketmq-console/src/main/java/org/apache/rocketmq/console/model/request/TopicConfigInfo.java b/rocketmq-console/src/main/java/org/apache/rocketmq/console/model/request/TopicConfigInfo.java
new file mode 100644
index 0000000..e1f56d4
--- /dev/null
+++ b/rocketmq-console/src/main/java/org/apache/rocketmq/console/model/request/TopicConfigInfo.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.console.model.request;
+
+import com.google.common.base.Objects;
+
+import java.util.List;
+
+/**
+ * Bean combining topic settings (name, queue numbers, permission, order flag) with the
+ * cluster names and broker names they relate to.
+ *
+ * <p>{@link #equals(Object)} and {@link #hashCode()} consider only the topic settings;
+ * {@code clusterNameList} and {@code brokerNameList} do not take part in either.
+ */
+public class TopicConfigInfo {
+
+    private List<String> clusterNameList;
+    private List<String> brokerNameList;
+
+    // ---- topicConfig ----
+    private String topicName;
+    private int writeQueueNums;
+    private int readQueueNums;
+    private int perm;
+    private boolean order;
+
+    public List<String> getClusterNameList() {
+        return clusterNameList;
+    }
+
+    public void setClusterNameList(List<String> clusterNameList) {
+        this.clusterNameList = clusterNameList;
+    }
+
+    public List<String> getBrokerNameList() {
+        return brokerNameList;
+    }
+
+    public void setBrokerNameList(List<String> brokerNameList) {
+        this.brokerNameList = brokerNameList;
+    }
+
+    public String getTopicName() {
+        return topicName;
+    }
+
+    public void setTopicName(String topicName) {
+        this.topicName = topicName;
+    }
+
+    public int getWriteQueueNums() {
+        return writeQueueNums;
+    }
+
+    public void setWriteQueueNums(int writeQueueNums) {
+        this.writeQueueNums = writeQueueNums;
+    }
+
+    public int getReadQueueNums() {
+        return readQueueNums;
+    }
+
+    public void setReadQueueNums(int readQueueNums) {
+        this.readQueueNums = readQueueNums;
+    }
+
+    public int getPerm() {
+        return perm;
+    }
+
+    public void setPerm(int perm) {
+        this.perm = perm;
+    }
+
+    public boolean isOrder() {
+        return order;
+    }
+
+    public void setOrder(boolean order) {
+        this.order = order;
+    }
+
+    /**
+     * Two instances are equal when they are of exactly the same class and agree on
+     * {@code writeQueueNums}, {@code readQueueNums}, {@code perm}, {@code order} and
+     * {@code topicName}. The cluster and broker name lists are ignored.
+     */
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        TopicConfigInfo other = (TopicConfigInfo) o;
+        if (writeQueueNums != other.writeQueueNums) {
+            return false;
+        }
+        if (readQueueNums != other.readQueueNums) {
+            return false;
+        }
+        if (perm != other.perm) {
+            return false;
+        }
+        if (order != other.order) {
+            return false;
+        }
+        return Objects.equal(topicName, other.topicName);
+    }
+
+    /** Hashes the same fields that {@link #equals(Object)} compares. */
+    @Override
+    public int hashCode() {
+        return Objects.hashCode(topicName, writeQueueNums, readQueueNums, perm, order);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/e218eef9/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/AbstractCommonService.java
----------------------------------------------------------------------
diff --git a/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/AbstractCommonService.java b/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/AbstractCommonService.java
new file mode 100644
index 0000000..53a0e21
--- /dev/null
+++ b/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/AbstractCommonService.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.console.service;
+
+import org.apache.rocketmq.tools.admin.MQAdminExt;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Sets;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Set;
+import javax.annotation.Resource;
+import org.apache.commons.collections.CollectionUtils;
+
+/**
+ * Base class for services that need the injected {@link MQAdminExt} and a helper for
+ * turning cluster names plus broker names into one set of broker names.
+ */
+public abstract class AbstractCommonService {
+    @Resource
+    protected MQAdminExt mqAdminExt;
+
+    /**
+     * Collects broker names from the given clusters and from an explicit broker list.
+     *
+     * <p>For every name in {@code clusterNameList} the set stored under that name in
+     * {@code clusterAddrTable} is added; afterwards every entry of {@code brokerNameList}
+     * is added. A {@code null} or empty list contributes nothing.
+     *
+     * <p>NOTE(review): a cluster name that has no entry in {@code clusterAddrTable} makes
+     * {@code get} return {@code null}, and adding that to the result fails with a
+     * {@link NullPointerException}, which is rethrown through
+     * {@code Throwables.propagate} — confirm whether callers rely on this.
+     *
+     * @param clusterAddrTable cluster name to broker-name set
+     * @param clusterNameList clusters whose brokers should be included; may be null or empty
+     * @param brokerNameList additional broker names to include; may be null or empty
+     * @return a new mutable set with the collected broker names (possibly empty)
+     */
+    protected final Set<String> changeToBrokerNameSet(HashMap<String, Set<String>> clusterAddrTable,
+        List<String> clusterNameList, List<String> brokerNameList) {
+        Set<String> collected = Sets.newHashSet();
+
+        if (CollectionUtils.isNotEmpty(clusterNameList)) {
+            try {
+                for (String clusterName : clusterNameList) {
+                    Set<String> brokersOfCluster = clusterAddrTable.get(clusterName);
+                    collected.addAll(brokersOfCluster);
+                }
+            }
+            catch (Exception e) {
+                throw Throwables.propagate(e);
+            }
+        }
+
+        if (CollectionUtils.isNotEmpty(brokerNameList)) {
+            collected.addAll(brokerNameList);
+        }
+
+        return collected;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/e218eef9/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/ClusterService.java
----------------------------------------------------------------------
diff --git a/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/ClusterService.java b/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/ClusterService.java
new file mode 100644
index 0000000..43489fb
--- /dev/null
+++ b/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/ClusterService.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.console.service;
+
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Service contract for cluster-related queries.
+ */
+public interface ClusterService {
+ /**
+  * Returns cluster data as a map.
+  * NOTE(review): the keys and the value types of the map are not visible from this
+  * interface — confirm against the implementation.
+  */
+ Map<String, Object> list();
+
+ /**
+  * Returns the configuration of one broker as {@link Properties}.
+  *
+  * @param brokerAddr presumably the broker's address — TODO confirm the expected format
+  */
+ Properties getBrokerConfig(String brokerAddr);
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/e218eef9/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/ConsumerService.java
----------------------------------------------------------------------
diff --git a/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/ConsumerService.java b/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/ConsumerService.java
new file mode 100644
index 0000000..1d9ac12
--- /dev/null
+++ b/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/ConsumerService.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.console.service;
+
+import org.apache.rocketmq.common.protocol.body.ConsumerConnection;
+import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
+import org.apache.rocketmq.console.model.ConsumerGroupRollBackStat;
+import org.apache.rocketmq.console.model.GroupConsumeInfo;
+import org.apache.rocketmq.console.model.TopicConsumerInfo;
+import org.apache.rocketmq.console.model.request.ConsumerConfigInfo;
+import org.apache.rocketmq.console.model.request.DeleteSubGroupRequest;
+import org.apache.rocketmq.console.model.request.ResetOffsetRequest;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Service contract for consumer-group queries and operations. The semantics noted on each
+ * method are derived from its signature only; implementations are not visible here.
+ */
+public interface ConsumerService {
+ /** Returns one {@link GroupConsumeInfo} per consumer group. */
+ List<GroupConsumeInfo> queryGroupList();
+
+ /** Returns the {@link GroupConsumeInfo} for the given consumer group. */
+ GroupConsumeInfo queryGroup(String consumerGroup);
+
+
+ /** Returns consume statistics for the given group as a list of {@link TopicConsumerInfo}. */
+ List<TopicConsumerInfo> queryConsumeStatsListByGroupName(String groupName);
+
+ /** Returns consume statistics selected by topic and group name. */
+ List<TopicConsumerInfo> queryConsumeStatsList(String topic, String groupName);
+
+ /**
+  * Returns consume statistics for the given topic as a map.
+  * NOTE(review): the map key is presumably the consumer group name — confirm.
+  */
+ Map<String, TopicConsumerInfo> queryConsumeStatsListByTopicName(String topic);
+
+ /** Resets offsets as described by the request; the result is keyed by consumer group. */
+ Map<String /*consumerGroup*/, ConsumerGroupRollBackStat> resetOffset(ResetOffsetRequest resetOffsetRequest);
+
+ /** Returns the subscription group configurations found for the given group. */
+ List<ConsumerConfigInfo> examineSubscriptionGroupConfig(String group);
+
+ /**
+  * Deletes a subscription group as described by the request.
+  * NOTE(review): the meaning of the boolean result is not visible here — presumably
+  * success; confirm.
+  */
+ boolean deleteSubGroup(DeleteSubGroupRequest deleteSubGroupRequest);
+
+ /**
+  * Creates or updates a subscription group configuration.
+  * NOTE(review): the meaning of the boolean result is not visible here — presumably
+  * success; confirm.
+  */
+ boolean createAndUpdateSubscriptionGroupConfig(ConsumerConfigInfo consumerConfigInfo);
+
+ /** Returns the set of broker names associated with the given subscription group. */
+ Set<String> fetchBrokerNameSetBySubscriptionGroup(String group);
+
+ /** Returns the {@link ConsumerConnection} of the given consumer group. */
+ ConsumerConnection getConsumerConnection(String consumerGroup);
+
+ /**
+  * Returns the {@link ConsumerRunningInfo} of one client in a consumer group.
+  *
+  * @param consumerGroup the consumer group
+  * @param clientId the client within that group
+  * @param jstack presumably whether a thread dump should be included — TODO confirm
+  */
+ ConsumerRunningInfo getConsumerRunningInfo(String consumerGroup, String clientId, boolean jstack);
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/e218eef9/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/DashboardCollectService.java
----------------------------------------------------------------------
diff --git a/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/DashboardCollectService.java b/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/DashboardCollectService.java
new file mode 100644
index 0000000..27a3645
--- /dev/null
+++ b/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/DashboardCollectService.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.console.service;
+
+import com.google.common.cache.LoadingCache;
+import java.io.File;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Service contract exposing collected dashboard data, both as loading caches and as
+ * per-date maps.
+ */
+public interface DashboardCollectService {
+ // TODO: move this work to org.apache.rocketmq.console.task.DashboardCollectTask;
+ // the code can then be refactored.
+ /** Returns the cache holding broker data (string key to list of strings). */
+ LoadingCache<String, List<String>> getBrokerMap();
+
+ /** Returns the cache holding topic data (string key to list of strings). */
+ LoadingCache<String, List<String>> getTopicMap();
+
+ /**
+  * Reads the given file into a map of string to list of strings.
+  * NOTE(review): judging by the method name the file holds JSON — confirm.
+  */
+ Map<String, List<String>> jsonDataFile2map(File file);
+
+ /**
+  * Returns the broker data stored for a date.
+  *
+  * @param date the date to look up — NOTE(review): format not visible here; confirm
+  */
+ Map<String, List<String>> getBrokerCache(String date);
+
+ /**
+  * Returns the topic data stored for a date.
+  *
+  * @param date the date to look up — NOTE(review): format not visible here; confirm
+  */
+ Map<String, List<String>> getTopicCache(String date);
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/e218eef9/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/DashboardService.java
----------------------------------------------------------------------
diff --git a/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/DashboardService.java b/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/DashboardService.java
new file mode 100644
index 0000000..a4cf798
--- /dev/null
+++ b/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/DashboardService.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.console.service;
+
+import java.util.List;
+import java.util.Map;
+/** Query contract for dashboard data; the layout of the returned maps and lists is defined by implementations. */
+public interface DashboardService {
+ /**
+ * @param date the date to query, formatted as yyyy-MM-dd
+ */
+ Map<String, List<String>> queryBrokerData(String date);
+
+ /**
+ * @param date the date to query, formatted as yyyy-MM-dd
+ */
+ Map<String, List<String>> queryTopicData(String date);
+
+ /**
+ * @param date the date to query, formatted as yyyy-MM-dd
+ * @param topicName name of the topic to look up (presumably narrows the result to that topic - confirm)
+ */
+ List<String> queryTopicData(String date, String topicName);
+ /** Takes no date argument; per the name it returns the current topic data - confirm against the implementation. */
+ List<String> queryTopicCurrentData();
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/e218eef9/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/MessageService.java
----------------------------------------------------------------------
diff --git a/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/MessageService.java b/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/MessageService.java
new file mode 100644
index 0000000..e56b4d8
--- /dev/null
+++ b/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/MessageService.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.console.service;
+
+import org.apache.rocketmq.common.Pair;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
+import org.apache.rocketmq.tools.admin.api.MessageTrack;
+import org.apache.rocketmq.console.model.MessageView;
+
+import java.util.List;
+/** Message lookup, tracking and direct-consumption contract; implementations are not part of this file. */
+public interface MessageService {
+ /**
+ * @param subject NOTE(review): presumably the topic name - confirm against callers
+ * @param msgId id of the message to view
+ */
+ Pair<MessageView, List<MessageTrack>> viewMessage(String subject, final String msgId);
+ /** Looks up messages by topic and key, per the method name; matching rules are not visible here. */
+ List<MessageView> queryMessageByTopicAndKey(final String topic, final String key);
+
+ /**
+ * @param topic topic to query
+ * @param begin lower bound of the query range; NOTE(review): unit is presumably epoch milliseconds - confirm
+ * @param end upper bound of the query range, in the same unit as {@code begin}
+ * @see org.apache.rocketmq.tools.command.message.PrintMessageSubCommand
+ */
+ List<MessageView> queryMessageByTopic(final String topic, final long begin,
+ final long end);
+
+ List<MessageTrack> messageTrackDetail(MessageExt msg);
+
+ ConsumeMessageDirectlyResult consumeMessageDirectly(String topic, String msgId, String consumerGroup,
+ String clientId);
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/e218eef9/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/MonitorService.java
----------------------------------------------------------------------
diff --git a/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/MonitorService.java b/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/MonitorService.java
new file mode 100644
index 0000000..4bf659c
--- /dev/null
+++ b/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/MonitorService.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.console.service;
+
+import java.util.Map;
+import org.apache.rocketmq.console.model.ConsumerMonitorConfig;
+/** Create/update, query and delete contract for {@link ConsumerMonitorConfig} entries; storage is implementation-defined. */
+public interface MonitorService {
+ boolean createOrUpdateConsumerMonitor(String name, ConsumerMonitorConfig config);
+ /** NOTE(review): the map keys are presumably consumer group names - confirm against the implementation. */
+ Map<String, ConsumerMonitorConfig> queryConsumerMonitorConfig();
+ /** Single-entry lookup by consumer group name; the result for an unknown group is not visible here. */
+ ConsumerMonitorConfig queryConsumerMonitorConfigByGroupName(String consumeGroupName);
+ /** Deletes the monitor for the given group, per the name; the meaning of the boolean result is not visible here. */
+ boolean deleteConsumerMonitor(String consumeGroupName);
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/e218eef9/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/OpsService.java
----------------------------------------------------------------------
diff --git a/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/OpsService.java b/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/OpsService.java
new file mode 100644
index 0000000..d3bd68c
--- /dev/null
+++ b/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/OpsService.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.console.service;
+
+import java.util.Map;
+import org.apache.rocketmq.console.service.checker.CheckerType;
+/** Operations contract: home-page info, name-server address list, VIP-channel flag and status checks. */
+public interface OpsService {
+ Map<String, Object> homePageInfo();
+ /** NOTE(review): the expected separator/format of {@code nameSvrAddrList} is not visible here - confirm. */
+ void updateNameSvrAddrList(String nameSvrAddrList);
+
+ String getNameSvrList();
+ /** The result is keyed by {@link CheckerType}; the values are untyped ({@code Object}). */
+ Map<CheckerType,Object> rocketMqStatusCheck();
+ /** The flag is passed as a String, not a boolean; the accepted values are not visible here. */
+ boolean updateIsVIPChannel(String useVIPChannel);
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/e218eef9/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/ProducerService.java
----------------------------------------------------------------------
diff --git a/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/ProducerService.java b/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/ProducerService.java
new file mode 100644
index 0000000..cda7c48
--- /dev/null
+++ b/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/ProducerService.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.console.service;
+
+import org.apache.rocketmq.common.protocol.body.ProducerConnection;
+/** Lookup contract for a {@link ProducerConnection}, addressed by producer group and topic. */
+public interface ProducerService {
+ ProducerConnection getProducerConnection(String producerGroup, String topic);
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/e218eef9/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/TopicService.java
----------------------------------------------------------------------
diff --git a/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/TopicService.java b/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/TopicService.java
new file mode 100644
index 0000000..41a6b3b
--- /dev/null
+++ b/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/TopicService.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.console.service;
+
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.admin.TopicStatsTable;
+import org.apache.rocketmq.common.protocol.body.GroupList;
+import org.apache.rocketmq.common.protocol.body.TopicList;
+import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.console.model.request.SendTopicMessageRequest;
+import org.apache.rocketmq.console.model.request.TopicConfigInfo;
+
+import java.util.List;
+/** Topic administration contract (list, stats, route, config, delete, send); implemented elsewhere. */
+public interface TopicService {
+ TopicList fetchAllTopicList();
+
+ TopicStatsTable stats(String topic);
+
+ TopicRouteData route(String topic);
+
+ GroupList queryTopicConsumerInfo(String topic);
+
+ void createOrUpdate(TopicConfigInfo topicCreateOrUpdateRequest);
+ /** Overload that returns a single {@link TopicConfig} for one topic and broker name. */
+ TopicConfig examineTopicConfig(String topic, String brokerName);
+ /** Overload without a broker name; returns a list of {@link TopicConfigInfo} (presumably one per broker - confirm). */
+ List<TopicConfigInfo> examineTopicConfig(String topic);
+ /** NOTE(review): how {@code clusterName} limits the deletion is not visible here - confirm. */
+ boolean deleteTopic(String topic, String clusterName);
+
+ boolean deleteTopic(String topic);
+ /** Argument order is broker name first, topic second - the reverse of the topic-first methods above. */
+ boolean deleteTopicInBroker(String brokerName, String topic);
+
+ SendResult sendTopicMessageRequest(SendTopicMessageRequest sendTopicMessageRequest);
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/e218eef9/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/checker/CheckerType.java
----------------------------------------------------------------------
diff --git a/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/checker/CheckerType.java b/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/checker/CheckerType.java
new file mode 100644
index 0000000..8a85008
--- /dev/null
+++ b/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/checker/CheckerType.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.console.service.checker;
+/** Return type of RocketMqChecker#checkerType() and key type of the map returned by OpsService#rocketMqStatusCheck(). */
+public enum CheckerType {
+ CLUSTER_HEALTH_CHECK,
+ TOPIC_ONLY_ONE_BROKER_CHECK
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/e218eef9/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/checker/RocketMqChecker.java
----------------------------------------------------------------------
diff --git a/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/checker/RocketMqChecker.java b/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/checker/RocketMqChecker.java
new file mode 100644
index 0000000..3b8f58d
--- /dev/null
+++ b/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/checker/RocketMqChecker.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.console.service.checker;
+/** One status check with an untyped result. The explicit {@code public} modifiers are redundant in an interface. */
+public interface RocketMqChecker {
+ public Object doCheck();
+ /** Tells callers which {@link CheckerType} this checker stands for. */
+ public CheckerType checkerType();
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/e218eef9/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/checker/impl/ClusterHealthCheckerImpl.java
----------------------------------------------------------------------
diff --git a/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/checker/impl/ClusterHealthCheckerImpl.java b/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/checker/impl/ClusterHealthCheckerImpl.java
new file mode 100644
index 0000000..5c2c893
--- /dev/null
+++ b/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/checker/impl/ClusterHealthCheckerImpl.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.console.service.checker.impl;
+
+import org.apache.rocketmq.console.service.checker.CheckerType;
+import org.apache.rocketmq.console.service.checker.RocketMqChecker;
+import org.springframework.stereotype.Service;
+/** Spring {@code @Service} checker for {@link CheckerType#CLUSTER_HEALTH_CHECK}; {@code doCheck()} is currently a stub. */
+@Service
+public class ClusterHealthCheckerImpl implements RocketMqChecker {
+ @Override
+ public Object doCheck() {
+ return null; // stub: no check logic is implemented here, callers always receive null
+ }
+
+ @Override
+ public CheckerType checkerType() {
+ return CheckerType.CLUSTER_HEALTH_CHECK;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/e218eef9/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/checker/impl/TopicOnlyOneBrokerCheckerImpl.java
----------------------------------------------------------------------
diff --git a/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/checker/impl/TopicOnlyOneBrokerCheckerImpl.java b/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/checker/impl/TopicOnlyOneBrokerCheckerImpl.java
new file mode 100644
index 0000000..0f06a13
--- /dev/null
+++ b/rocketmq-console/src/main/java/org/apache/rocketmq/console/service/checker/impl/TopicOnlyOneBrokerCheckerImpl.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.console.service.checker.impl;
+
+import org.apache.rocketmq.console.service.checker.CheckerType;
+import org.apache.rocketmq.console.service.checker.RocketMqChecker;
+import org.springframework.stereotype.Service;
+/** Spring {@code @Service} checker for {@link CheckerType#TOPIC_ONLY_ONE_BROKER_CHECK}; {@code doCheck()} is currently a stub. */
+@Service
+public class TopicOnlyOneBrokerCheckerImpl implements RocketMqChecker {
+ @Override
+ public Object doCheck() {
+ return null; // stub: no check logic is implemented here, callers always receive null
+ }
+
+ @Override
+ public CheckerType checkerType() {
+ return CheckerType.TOPIC_ONLY_ONE_BROKER_CHECK;
+ }
+}