You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tubemq.apache.org by yu...@apache.org on 2021/01/08 11:15:11 UTC
[incubator-tubemq] branch TUBEMQ-421 updated: [TUBEMQ-493] reorg
restful apis and reorg services (#383)
This is an automated email from the ASF dual-hosted git repository.
yuanbo pushed a commit to branch TUBEMQ-421
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git
The following commit(s) were added to refs/heads/TUBEMQ-421 by this push:
new a9566ff [TUBEMQ-493] reorg restful apis and reorg services (#383)
a9566ff is described below
commit a9566ff789c117f18bce3c954ae8f926690144f4
Author: EMsnap <zp...@connect.ust.hk>
AuthorDate: Fri Jan 8 19:15:05 2021 +0800
[TUBEMQ-493] reorg restful apis and reorg services (#383)
* [TUBEMQ-491] rebalance group or consumer
* [TUBEMQ-496] clean offset in consumer group
* [TUBEMQ-493] reorg restful apis and reorg services
* [TUBEMQ-493] reuse MasterService.baseRequestMaster and use unified path name
* [TUBEMQ-493] avoid using wildcard in import area.
* [TUBEMQ-493] avoid using wildcard in import area delete some code from other branch.
* [TUBEMQ-493] delete some personal property
---
.../controller/cluster/ClusterController.java | 8 +-
.../manager/controller/group/GroupController.java | 142 +++++++++++
.../request/AddBlackGroupReq.java} | 13 +-
.../request/DeleteBlackGroupReq.java} | 12 +-
.../request/DeleteOffsetReq.java} | 15 +-
.../manager/controller/node/NodeController.java | 164 ++++--------
.../controller/node/request/AddBrokersReq.java | 16 +-
.../controller/node/request/AddTopicReq.java | 24 +-
.../controller/node/request/BatchAddTopicReq.java | 3 +-
...dTopicReq.java => BrokerSetReadOrWriteReq.java} | 11 +-
...{BatchAddTopicReq.java => DeleteBrokerReq.java} | 10 +-
...ddTopicReq.java => OnlineOfflineBrokerReq.java} | 10 +-
...{BatchAddTopicReq.java => ReloadBrokerReq.java} | 9 +-
.../controller/topic/TopicWebController.java | 282 +++------------------
.../request/DeleteTopicReq.java} | 13 +-
.../request/ModifyTopicReq.java} | 24 +-
.../request/RebalanceConsumerReq.java} | 16 +-
.../request/RebalanceGroupReq.java} | 13 +-
.../request/SetAuthControlReq.java} | 13 +-
.../MasterService.java} | 40 ++-
.../apache/tubemq/manager/service/NodeService.java | 110 ++++----
.../tubemq/manager/service/TopicService.java | 206 +++++++++++++++
.../tubemq/manager/service/TubeMQHttpConst.java | 20 ++
.../tube/CleanOffsetResult.java} | 14 +-
.../tube/RebalanceGroupResult.java} | 12 +-
.../service/tube/TubeHttpGroupDetailInfo.java | 69 +++++
.../apache/tubemq/manager/utils/ConvertUtils.java | 59 +++--
.../src/main/resources/application.properties | 2 +-
28 files changed, 759 insertions(+), 571 deletions(-)
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/cluster/ClusterController.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/cluster/ClusterController.java
index 562813c..a2242a1 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/cluster/ClusterController.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/cluster/ClusterController.java
@@ -18,7 +18,7 @@
package org.apache.tubemq.manager.controller.cluster;
import static org.apache.tubemq.manager.service.TubeMQHttpConst.SCHEMA;
-import static org.apache.tubemq.manager.utils.MasterUtils.*;
+import static org.apache.tubemq.manager.service.MasterService.*;
import com.google.gson.Gson;
import java.util.Map;
@@ -27,7 +27,7 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.tubemq.manager.controller.TubeMQResult;
import org.apache.tubemq.manager.entry.NodeEntry;
import org.apache.tubemq.manager.repository.NodeRepository;
-import org.apache.tubemq.manager.utils.MasterUtils;
+import org.apache.tubemq.manager.service.MasterService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.RequestBody;
@@ -48,7 +48,7 @@ public class ClusterController {
private NodeRepository nodeRepository;
@Autowired
- public MasterUtils masterUtil;
+ public MasterService masterService;
/**
* query cluster info
@@ -57,7 +57,7 @@ public class ClusterController {
produces = MediaType.APPLICATION_JSON_VALUE)
public @ResponseBody String queryInfo(
@RequestParam Map<String, String> queryBody) throws Exception {
- String url = masterUtil.getQueryUrl(queryBody);
+ String url = masterService.getQueryUrl(queryBody);
return queryMaster(url);
}
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/group/GroupController.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/group/GroupController.java
new file mode 100644
index 0000000..12b4fc4
--- /dev/null
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/group/GroupController.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tubemq.manager.controller.group;
+
+
+
+import static org.apache.tubemq.manager.service.MasterService.requestMaster;
+import static org.apache.tubemq.manager.service.TubeMQHttpConst.ADD;
+import static org.apache.tubemq.manager.service.TubeMQHttpConst.CLONE;
+import static org.apache.tubemq.manager.service.TubeMQHttpConst.DELETE;
+import static org.apache.tubemq.manager.service.MasterService.queryMaster;
+import static org.apache.tubemq.manager.service.TubeMQHttpConst.REBALANCE_CONSUMER_GROUP;
+import static org.apache.tubemq.manager.service.TubeMQHttpConst.REBALANCE_CONSUMER;
+
+import com.google.gson.Gson;
+import java.util.Map;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.tubemq.manager.controller.TubeMQResult;
+import org.apache.tubemq.manager.controller.group.request.AddBlackGroupReq;
+import org.apache.tubemq.manager.controller.group.request.DeleteBlackGroupReq;
+import org.apache.tubemq.manager.controller.group.request.DeleteOffsetReq;
+import org.apache.tubemq.manager.controller.node.request.CloneOffsetReq;
+import org.apache.tubemq.manager.controller.topic.request.BatchAddGroupAuthReq;
+import org.apache.tubemq.manager.controller.topic.request.DeleteGroupReq;
+import org.apache.tubemq.manager.controller.topic.request.RebalanceConsumerReq;
+import org.apache.tubemq.manager.controller.topic.request.RebalanceGroupReq;
+import org.apache.tubemq.manager.service.TopicService;
+import org.apache.tubemq.manager.service.MasterService;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestParam;
+import org.springframework.web.bind.annotation.ResponseBody;
+import org.springframework.web.bind.annotation.RestController;
+
+@RestController
+@RequestMapping(path = "/v1/group")
+@Slf4j
+public class GroupController {
+ // REST endpoints under /v1/group; the POST handlers dispatch on the "method" request parameter.
+ public Gson gson = new Gson();
+
+ @Autowired
+ private MasterService masterService;
+
+ @Autowired
+ private TopicService topicService;
+
+ // Group operations: ADD, DELETE, REBALANCE_CONSUMER_GROUP, REBALANCE_CONSUMER; anything else yields an error result.
+ @PostMapping("")
+ public @ResponseBody TubeMQResult groupMethodProxy(
+ @RequestParam String method, @RequestBody String req) throws Exception {
+ switch (method) {
+ case ADD:
+ return masterService.baseRequestMaster(gson.fromJson(req, BatchAddGroupAuthReq.class));
+ case DELETE:
+ return masterService.baseRequestMaster(gson.fromJson(req, DeleteGroupReq.class));
+ case REBALANCE_CONSUMER_GROUP:
+ return topicService.rebalanceGroup(gson.fromJson(req, RebalanceGroupReq.class));
+ case REBALANCE_CONSUMER:
+ return masterService.baseRequestMaster(gson.fromJson(req, RebalanceConsumerReq.class));
+ default:
+ return TubeMQResult.getErrorResult("no such method");
+ }
+ }
+
+ /**
+ * query the consumer groups of a certain topic
+ * @param req query parameters, handed to masterService.getQueryUrl to build the request URL
+ * @return the string returned by queryMaster for that URL
+ * @throws Exception presumably when getQueryUrl or queryMaster fails (neither is shown here)
+ */
+ @GetMapping("/")
+ public @ResponseBody String queryConsumer(
+ @RequestParam Map<String, String> req) throws Exception {
+ String url = masterService.getQueryUrl(req);
+ return queryMaster(url);
+ }
+
+ // Offset operations: CLONE and DELETE are delegated to topicService; anything else yields an error result.
+ @PostMapping("/offset")
+ public @ResponseBody TubeMQResult offsetProxy(
+ @RequestParam String method, @RequestBody String req) {
+ switch (method) {
+ case CLONE:
+ return topicService.cloneOffsetToOtherGroups(gson.fromJson(req, CloneOffsetReq.class));
+ case DELETE:
+ return topicService.deleteOffset(gson.fromJson(req, DeleteOffsetReq.class));
+ default:
+ return TubeMQResult.getErrorResult("no such method");
+ }
+ }
+
+ // Black-group operations: ADD and DELETE. NOTE(review): method name is upper-case, unlike the sibling handlers.
+ @PostMapping("/blackGroup")
+ public @ResponseBody TubeMQResult BlackGroupProxy(
+ @RequestParam String method, @RequestBody String req) {
+ switch (method) {
+ case ADD:
+ return masterService.baseRequestMaster(gson.fromJson(req, AddBlackGroupReq.class));
+ case DELETE:
+ return masterService.baseRequestMaster(gson.fromJson(req, DeleteBlackGroupReq.class));
+ default:
+ return TubeMQResult.getErrorResult("no such method");
+ }
+ }
+
+
+ /**
+ * query the black list of a certain topic
+ * @param req query parameters, handed to masterService.getQueryUrl to build the request URL
+ * @return the string returned by queryMaster for that URL
+ * @throws Exception presumably when getQueryUrl or queryMaster fails (neither is shown here)
+ */
+ @GetMapping("/blackGroup")
+ public @ResponseBody String queryBlackGroup(
+ @RequestParam Map<String, String> req) throws Exception {
+ String url = masterService.getQueryUrl(req);
+ return queryMaster(url);
+ }
+
+
+
+
+}
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BatchAddTopicReq.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/group/request/AddBlackGroupReq.java
similarity index 73%
copy from tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BatchAddTopicReq.java
copy to tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/group/request/AddBlackGroupReq.java
index 0201643..71280e9 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BatchAddTopicReq.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/group/request/AddBlackGroupReq.java
@@ -15,14 +15,15 @@
* limitations under the License.
*/
-package org.apache.tubemq.manager.controller.node.request;
+package org.apache.tubemq.manager.controller.group.request;
-import java.util.List;
import lombok.Data;
+import org.apache.tubemq.manager.controller.node.request.BaseReq;
@Data
-public class BatchAddTopicReq {
- List<AddTopicReq> addTopicReqs;
- List<Integer> brokerIds;
- Integer clusterId;
+public class AddBlackGroupReq extends BaseReq {
+ private String groupName;
+ private String topicName;
+ private String confModAuthToken;
+ private String createUser;
}
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BatchAddTopicReq.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/group/request/DeleteBlackGroupReq.java
similarity index 75%
copy from tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BatchAddTopicReq.java
copy to tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/group/request/DeleteBlackGroupReq.java
index 0201643..bcb9079 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BatchAddTopicReq.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/group/request/DeleteBlackGroupReq.java
@@ -15,14 +15,14 @@
* limitations under the License.
*/
-package org.apache.tubemq.manager.controller.node.request;
+package org.apache.tubemq.manager.controller.group.request;
-import java.util.List;
import lombok.Data;
+import org.apache.tubemq.manager.controller.node.request.BaseReq;
@Data
-public class BatchAddTopicReq {
- List<AddTopicReq> addTopicReqs;
- List<Integer> brokerIds;
- Integer clusterId;
+public class DeleteBlackGroupReq extends BaseReq {
+ private String topicName;
+ private String confModAuthToken;
+ private String groupName;
}
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BatchAddTopicReq.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/group/request/DeleteOffsetReq.java
similarity index 73%
copy from tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BatchAddTopicReq.java
copy to tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/group/request/DeleteOffsetReq.java
index 0201643..c56041c 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BatchAddTopicReq.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/group/request/DeleteOffsetReq.java
@@ -1,4 +1,4 @@
-/*
+/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -15,14 +15,15 @@
* limitations under the License.
*/
-package org.apache.tubemq.manager.controller.node.request;
+package org.apache.tubemq.manager.controller.group.request;
-import java.util.List;
import lombok.Data;
+import org.apache.tubemq.manager.controller.node.request.BaseReq;
@Data
-public class BatchAddTopicReq {
- List<AddTopicReq> addTopicReqs;
- List<Integer> brokerIds;
- Integer clusterId;
+public class DeleteOffsetReq extends BaseReq {
+ private String groupName;
+ private String modifyUser;
+ private String topicName;
+ private Boolean onlyMemory;
}
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/NodeController.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/NodeController.java
index 2c81825..4689762 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/NodeController.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/NodeController.java
@@ -17,48 +17,46 @@
package org.apache.tubemq.manager.controller.node;
-import com.google.common.collect.Lists;
import com.google.gson.Gson;
import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
-import org.apache.http.util.EntityUtils;
import org.apache.tubemq.manager.controller.TubeMQResult;
import org.apache.tubemq.manager.controller.node.request.AddBrokersReq;
-import org.apache.tubemq.manager.controller.node.request.AddTopicReq;
+import org.apache.tubemq.manager.controller.node.request.BrokerSetReadOrWriteReq;
import org.apache.tubemq.manager.controller.node.request.CloneBrokersReq;
-import org.apache.tubemq.manager.controller.node.request.QueryBrokerCfgReq;
-import org.apache.tubemq.manager.entry.NodeEntry;
+import org.apache.tubemq.manager.controller.node.request.DeleteBrokerReq;
+import org.apache.tubemq.manager.controller.node.request.OnlineOfflineBrokerReq;
+import org.apache.tubemq.manager.controller.node.request.ReloadBrokerReq;
import org.apache.tubemq.manager.repository.NodeRepository;
import org.apache.tubemq.manager.service.NodeService;
-import org.apache.tubemq.manager.service.tube.*;
-import org.apache.tubemq.manager.utils.MasterUtils;
+import org.apache.tubemq.manager.service.MasterService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.*;
-import java.util.ArrayList;
-import java.util.List;
import java.util.Map;
import static org.apache.tubemq.manager.controller.TubeMQResult.getErrorResult;
-import static org.apache.tubemq.manager.controller.node.request.AddBrokersReq.getAddBrokerReq;
-import static org.apache.tubemq.manager.service.TubeMQHttpConst.*;
-import static org.apache.tubemq.manager.utils.ConvertUtils.convertReqToQueryStr;
-import static org.apache.tubemq.manager.utils.MasterUtils.*;
+import static org.apache.tubemq.manager.service.TubeMQHttpConst.ADD;
+import static org.apache.tubemq.manager.service.TubeMQHttpConst.ADMIN_QUERY_CLUSTER_INFO;
+import static org.apache.tubemq.manager.service.TubeMQHttpConst.CLONE;
+import static org.apache.tubemq.manager.service.TubeMQHttpConst.DELETE;
+import static org.apache.tubemq.manager.service.TubeMQHttpConst.NO_SUCH_METHOD;
+import static org.apache.tubemq.manager.service.TubeMQHttpConst.OFFLINE;
+import static org.apache.tubemq.manager.service.TubeMQHttpConst.ONLINE;
+import static org.apache.tubemq.manager.service.TubeMQHttpConst.OP_QUERY;
+import static org.apache.tubemq.manager.service.TubeMQHttpConst.RELOAD;
+import static org.apache.tubemq.manager.service.TubeMQHttpConst.SET_READ_OR_WRITE;
+import static org.apache.tubemq.manager.service.MasterService.*;
@RestController
@RequestMapping(path = "/v1/node")
@Slf4j
public class NodeController {
- public static final String NO_SUCH_METHOD = "no such method";
- public static final String OP_QUERY = "op_query";
- public static final String ADMIN_QUERY_CLUSTER_INFO = "admin_query_cluster_info";
+
private final Gson gson = new Gson();
- private static final CloseableHttpClient httpclient = HttpClients.createDefault();
@Autowired
NodeService nodeService;
@@ -67,7 +65,7 @@ public class NodeController {
NodeRepository nodeRepository;
@Autowired
- MasterUtils masterUtil;
+ MasterService masterService;
/**
* query brokers in certain cluster
@@ -91,11 +89,11 @@ public class NodeController {
* query brokers' run status
* this method supports batch operation
*/
- @RequestMapping(value = "/query/brokerStatus", method = RequestMethod.GET,
+ @RequestMapping(value = "/broker/status", method = RequestMethod.GET,
produces = MediaType.APPLICATION_JSON_VALUE)
public @ResponseBody String queryBrokerDetail(
@RequestParam Map<String, String> queryBody) throws Exception {
- String url = masterUtil.getQueryUrl(queryBody);
+ String url = masterService.getQueryUrl(queryBody);
return queryMaster(url);
}
@@ -104,115 +102,41 @@ public class NodeController {
* query brokers' configuration
* this method supports batch operation
*/
- @RequestMapping(value = "/query/brokerConfig", method = RequestMethod.GET,
+ @RequestMapping(value = "/broker/config", method = RequestMethod.GET,
produces = MediaType.APPLICATION_JSON_VALUE)
public @ResponseBody String queryBrokerConfig(
@RequestParam Map<String, String> queryBody) throws Exception {
- String url = masterUtil.getQueryUrl(queryBody);
+ String url = masterService.getQueryUrl(queryBody);
return queryMaster(url);
}
- /**
- * clone source broker to generate brokers with the same config and copy the topics in it.
- * @param req
- * @return
- * @throws Exception
- */
- @RequestMapping(value = "/clone", method = RequestMethod.POST)
- public @ResponseBody String cloneBrokers(
- @RequestBody CloneBrokersReq req) throws Exception {
- int clusterId = req.getClusterId();
- TubeMQResult tubeResult = nodeService.cloneBrokersWithTopic(req, clusterId);
- return gson.toJson(tubeResult);
- }
-
-
/**
- * add brokers to cluster, need to check token and
- * make sure user has authorization to modify it.
+ * broker method proxy
+ * dispatches each broker operation to its handler according to the "method" request parameter
*/
- @RequestMapping(value = "/add", method = RequestMethod.POST)
- public @ResponseBody String addBrokers(
- @RequestBody AddBrokersReq req) throws Exception {
- String token = req.getConfModAuthToken();
- int clusterId = req.getClusterId();
-
- if (StringUtils.isNotBlank(token)) {
- NodeEntry masterEntry = nodeRepository.findNodeEntryByClusterIdIsAndMasterIsTrue(
- clusterId);
- TubeMQResult result = addBrokersToCluster(req, masterEntry);
- return gson.toJson(result);
- } else {
- TubeMQResult result = new TubeMQResult();
- result.setErrCode(-1);
- result.setResult(false);
- result.setErrMsg("token is not correct");
- return gson.toJson(result);
+ @RequestMapping(value = "/broker")
+ public @ResponseBody
+ TubeMQResult brokerMethodProxy(
+ @RequestParam String method, @RequestBody String req) throws Exception {
+ switch (method) {
+ case CLONE:
+ return nodeService.cloneBrokersWithTopic(gson.fromJson(req, CloneBrokersReq.class));
+ case ADD:
+ return masterService.baseRequestMaster(gson.fromJson(req, AddBrokersReq.class));
+ case ONLINE:
+ case OFFLINE:
+ return masterService.baseRequestMaster(gson.fromJson(req, OnlineOfflineBrokerReq.class));
+ case RELOAD:
+ return masterService.baseRequestMaster(gson.fromJson(req, ReloadBrokerReq.class));
+ case DELETE:
+ return masterService.baseRequestMaster(gson.fromJson(req, DeleteBrokerReq.class));
+ case SET_READ_OR_WRITE:
+ return masterService.baseRequestMaster(gson.fromJson(req, BrokerSetReadOrWriteReq.class));
+ default:
+ return TubeMQResult.getErrorResult("no such method");
}
-
- }
-
-
- /**
- * online brokers in cluster
- * this method supports batch operation
- */
- @RequestMapping(value = "/online", method = RequestMethod.GET)
- public @ResponseBody String onlineBrokers(
- @RequestParam Map<String, String> queryBody) throws Exception {
- return gson.toJson(masterUtil.redirectToMaster(queryBody));
- }
-
- /**
- * reload brokers in cluster
- * this method supports batch operation
- */
- @RequestMapping(value = "/reload", method = RequestMethod.GET)
- public @ResponseBody String reloadBrokers(
- @RequestParam Map<String, String> queryBody) throws Exception {
- return gson.toJson(masterUtil.redirectToMaster(queryBody));
- }
-
- /**
- * delete brokers in cluster
- * this method supports batch operation
- */
- @RequestMapping(value = "/delete", method = RequestMethod.GET)
- public @ResponseBody String deleteBrokers(
- @RequestParam Map<String, String> queryBody) throws Exception {
- TubeMQResult result = masterUtil.redirectToMaster(queryBody);
- return gson.toJson(result);
- }
-
- /**
- * change brokers' read mode in cluster
- * this method supports batch operation
- */
- @RequestMapping(value = "/setRead", method = RequestMethod.GET)
- public @ResponseBody String setBrokersRead(
- @RequestParam Map<String, String> queryBody) throws Exception {
- TubeMQResult result = masterUtil.redirectToMaster(queryBody);
- return gson.toJson(result);
- }
-
- /**
- * change brokers' write mode in cluster
- * this method supports batch operation
- */
- @RequestMapping(value = "/setWrite", method = RequestMethod.GET)
- public @ResponseBody String setBrokersWrite(
- @RequestParam Map<String, String> queryBody) throws Exception {
- TubeMQResult result = masterUtil.redirectToMaster(queryBody);
- return gson.toJson(result);
- }
-
- private TubeMQResult addBrokersToCluster(AddBrokersReq req, NodeEntry masterEntry) throws Exception {
- String url = SCHEMA + masterEntry.getIp() + ":" + masterEntry.getWebPort()
- + "/" + TUBE_REQUEST_PATH + "?" + convertReqToQueryStr(req);
- TubeMQResult tubeMQResult = requestMaster(url);
- return tubeMQResult;
}
}
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/AddBrokersReq.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/AddBrokersReq.java
index 5bfb486..aea0a2e 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/AddBrokersReq.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/AddBrokersReq.java
@@ -17,8 +17,6 @@
package org.apache.tubemq.manager.controller.node.request;
-
-import lombok.Builder;
import lombok.Data;
import org.apache.tubemq.manager.service.tube.BrokerConf;
@@ -27,24 +25,12 @@ import java.util.List;
import static org.apache.tubemq.manager.service.TubeMQHttpConst.*;
@Data
-public class AddBrokersReq {
+public class AddBrokersReq extends BaseReq{
private String confModAuthToken;
private String createUser;
- private int clusterId;
-
- /**
- * admin_bath_add_broker_configure
- */
- private String method;
-
- /**
- * op_modify
- */
- private String type;
-
private List<BrokerConf> brokerJsonSet;
public static AddBrokersReq getAddBrokerReq(String token, int clusterId) {
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/AddTopicReq.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/AddTopicReq.java
index 1753be6..39f0504 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/AddTopicReq.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/AddTopicReq.java
@@ -22,17 +22,15 @@ import lombok.Data;
@Data
public class AddTopicReq extends BaseReq {
- public String createUser;
- public String deleteWhen;
- public Integer unflushThreshold;
- public Boolean acceptPublish;
- public Integer numPartitions;
- public Integer unflushInterval;
- public Boolean acceptSubscribe;
- public String method;
- public String type;
- public String brokerId;
- public String confModAuthToken;
- public String topicName;
- public String deletePolicy;
+ private String createUser;
+ private String deleteWhen;
+ private Integer unflushThreshold;
+ private Boolean acceptPublish;
+ private Integer numPartitions;
+ private Integer unflushInterval;
+ private Boolean acceptSubscribe;
+ private String brokerId;
+ private String confModAuthToken;
+ private String topicName;
+ private String deletePolicy;
}
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BatchAddTopicReq.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BatchAddTopicReq.java
index 0201643..08ec6ae 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BatchAddTopicReq.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BatchAddTopicReq.java
@@ -21,8 +21,7 @@ import java.util.List;
import lombok.Data;
@Data
-public class BatchAddTopicReq {
+public class BatchAddTopicReq extends BaseReq{
List<AddTopicReq> addTopicReqs;
List<Integer> brokerIds;
- Integer clusterId;
}
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BatchAddTopicReq.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BrokerSetReadOrWriteReq.java
similarity index 79%
copy from tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BatchAddTopicReq.java
copy to tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BrokerSetReadOrWriteReq.java
index 0201643..3b8fbb2 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BatchAddTopicReq.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BrokerSetReadOrWriteReq.java
@@ -17,12 +17,13 @@
package org.apache.tubemq.manager.controller.node.request;
-import java.util.List;
import lombok.Data;
@Data
-public class BatchAddTopicReq {
- List<AddTopicReq> addTopicReqs;
- List<Integer> brokerIds;
- Integer clusterId;
+public class BrokerSetReadOrWriteReq extends BaseReq{
+ private Boolean isAcceptPublish;
+ private Boolean isAcceptSubscribe;
+ private String modifyUser;
+ private String brokerId;
+ private String confModAuthToken;
}
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BatchAddTopicReq.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/DeleteBrokerReq.java
similarity index 83%
copy from tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BatchAddTopicReq.java
copy to tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/DeleteBrokerReq.java
index 0201643..df59bf6 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BatchAddTopicReq.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/DeleteBrokerReq.java
@@ -17,12 +17,12 @@
package org.apache.tubemq.manager.controller.node.request;
-import java.util.List;
import lombok.Data;
@Data
-public class BatchAddTopicReq {
- List<AddTopicReq> addTopicReqs;
- List<Integer> brokerIds;
- Integer clusterId;
+public class DeleteBrokerReq extends BaseReq{
+ private Boolean isReservedData;
+ private String modifyUser;
+ private String brokerId;
+ private String confModAuthToken;
}
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BatchAddTopicReq.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/OnlineOfflineBrokerReq.java
similarity index 85%
copy from tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BatchAddTopicReq.java
copy to tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/OnlineOfflineBrokerReq.java
index 0201643..1d14533 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BatchAddTopicReq.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/OnlineOfflineBrokerReq.java
@@ -17,12 +17,12 @@
package org.apache.tubemq.manager.controller.node.request;
-import java.util.List;
+
import lombok.Data;
@Data
-public class BatchAddTopicReq {
- List<AddTopicReq> addTopicReqs;
- List<Integer> brokerIds;
- Integer clusterId;
+public class OnlineOfflineBrokerReq extends BaseReq{
+ private Integer brokerId;
+ private String modifyUser;
+ private String confModAuthToken;
}
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BatchAddTopicReq.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/ReloadBrokerReq.java
similarity index 86%
copy from tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BatchAddTopicReq.java
copy to tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/ReloadBrokerReq.java
index 0201643..65f65b7 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BatchAddTopicReq.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/ReloadBrokerReq.java
@@ -17,12 +17,11 @@
package org.apache.tubemq.manager.controller.node.request;
-import java.util.List;
import lombok.Data;
@Data
-public class BatchAddTopicReq {
- List<AddTopicReq> addTopicReqs;
- List<Integer> brokerIds;
- Integer clusterId;
+public class ReloadBrokerReq extends BaseReq{
+ private Integer brokerId;
+ private String modifyUser;
+ private String confModAuthToken;
}
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/topic/TopicWebController.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/topic/TopicWebController.java
index 9511c96..b4c4fc0 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/topic/TopicWebController.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/topic/TopicWebController.java
@@ -18,27 +18,31 @@
package org.apache.tubemq.manager.controller.topic;
-import static org.apache.tubemq.manager.service.TubeMQHttpConst.SCHEMA;
-import static org.apache.tubemq.manager.utils.MasterUtils.TUBE_REQUEST_PATH;
-import static org.apache.tubemq.manager.utils.MasterUtils.queryMaster;
-import static org.apache.tubemq.manager.utils.MasterUtils.requestMaster;
+import static org.apache.tubemq.manager.service.TubeMQHttpConst.ADD;
+import static org.apache.tubemq.manager.service.TubeMQHttpConst.AUTH_CONTROL;
+import static org.apache.tubemq.manager.service.TubeMQHttpConst.CLONE;
+import static org.apache.tubemq.manager.service.TubeMQHttpConst.DELETE;
+import static org.apache.tubemq.manager.service.TubeMQHttpConst.MODIFY;
+import static org.apache.tubemq.manager.service.TubeMQHttpConst.REMOVE;
+import static org.apache.tubemq.manager.service.MasterService.queryMaster;
+import static org.apache.tubemq.manager.service.MasterService.requestMaster;
+
import com.google.gson.Gson;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.apache.tubemq.manager.controller.TubeMQResult;
import org.apache.tubemq.manager.controller.node.request.BatchAddTopicReq;
-import org.apache.tubemq.manager.controller.node.request.CloneOffsetReq;
import org.apache.tubemq.manager.controller.node.request.CloneTopicReq;
-import org.apache.tubemq.manager.controller.topic.request.BatchAddGroupAuthReq;
-import org.apache.tubemq.manager.controller.topic.request.DeleteGroupReq;
+import org.apache.tubemq.manager.controller.topic.request.DeleteTopicReq;
+import org.apache.tubemq.manager.controller.topic.request.ModifyTopicReq;
+import org.apache.tubemq.manager.controller.topic.request.RebalanceGroupReq;
+import org.apache.tubemq.manager.controller.topic.request.SetAuthControlReq;
import org.apache.tubemq.manager.entry.NodeEntry;
import org.apache.tubemq.manager.repository.NodeRepository;
-import org.apache.tubemq.manager.repository.TopicRepository;
import org.apache.tubemq.manager.service.NodeService;
-import org.apache.tubemq.manager.service.TopicBackendWorker;
-import org.apache.tubemq.manager.utils.ConvertUtils;
-import org.apache.tubemq.manager.utils.MasterUtils;
+import org.apache.tubemq.manager.service.MasterService;
+import org.apache.tubemq.manager.service.TopicService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
@@ -54,98 +58,35 @@ import org.springframework.web.bind.annotation.RestController;
public class TopicWebController {
@Autowired
- private TopicRepository topicRepository;
-
- @Autowired
- private TopicBackendWorker topicBackendWorker;
-
- @Autowired
private NodeService nodeService;
- @Autowired
- private NodeRepository nodeRepository;
-
public Gson gson = new Gson();
@Autowired
- private MasterUtils masterUtils;
-
- /**
- * add topic to brokers
- * @param req
- * @return
- */
- @PostMapping("/add")
- public TubeMQResult addTopic(@RequestBody BatchAddTopicReq req) {
- if (req.getClusterId() == null) {
- return TubeMQResult.getErrorResult("please input clusterId");
- }
- NodeEntry masterEntry = nodeRepository.findNodeEntryByClusterIdIsAndMasterIsTrue(
- req.getClusterId());
- if (masterEntry == null) {
- return TubeMQResult.getErrorResult("no such cluster");
- }
- return nodeService.addTopicsToBrokers(masterEntry, req.getBrokerIds(), req.getAddTopicReqs());
- }
-
- /**
- * given one topic, copy its config and clone to brokers
- * if no broker is is provided, topics will be cloned to all brokers in cluster
- * @param req
- * @return
- * @throws Exception
- */
- @PostMapping("/clone")
- public TubeMQResult cloneTopic(@RequestBody CloneTopicReq req) throws Exception {
- if (req.getClusterId() == null) {
- return TubeMQResult.getErrorResult("please input clusterId");
- }
- NodeEntry masterEntry = nodeRepository.findNodeEntryByClusterIdIsAndMasterIsTrue(
- req.getClusterId());
- if (masterEntry == null) {
- return TubeMQResult.getErrorResult("no such cluster");
+ private MasterService masterService;
+
+ /**
+ * topic method proxy
+ * dispatches each topic operation to its handler based on the method parameter
+ */
+ @RequestMapping(value = "")
+ public @ResponseBody TubeMQResult topicMethodProxy(
+ @RequestParam String method, @RequestBody String req) throws Exception {
+ switch (method) {
+ case ADD:
+ return nodeService.addTopic(gson.fromJson(req, BatchAddTopicReq.class));
+ case CLONE:
+ return nodeService.cloneTopicToBrokers(gson.fromJson(req, CloneTopicReq.class));
+ case AUTH_CONTROL:
+ return masterService.baseRequestMaster(gson.fromJson(req, SetAuthControlReq.class));
+ case MODIFY:
+ return masterService.baseRequestMaster(gson.fromJson(req, ModifyTopicReq.class));
+ case DELETE:
+ case REMOVE:
+ return masterService.baseRequestMaster(gson.fromJson(req, DeleteTopicReq.class));
+ default:
+ return TubeMQResult.getErrorResult("no such method");
}
- return nodeService.cloneTopicToBrokers(req, masterEntry);
- }
-
- /**
- * batch modify topic config
- * @param req
- * @return
- * @throws Exception
- */
- @PostMapping("/modify")
- public @ResponseBody String modifyTopics(
- @RequestParam Map<String, String> req) throws Exception {
- String url = masterUtils.getQueryUrl(req);
- return queryMaster(url);
- }
-
- /**
- * batch delete topic info
- * @param req
- * @return
- * @throws Exception
- */
- @PostMapping("/delete")
- public @ResponseBody String deleteTopics(
- @RequestParam Map<String, String> req) throws Exception {
- String url = masterUtils.getQueryUrl(req);
- return queryMaster(url);
- }
-
-
- /**
- * batch remove topics
- * @param req
- * @return
- * @throws Exception
- */
- @PostMapping("/remove")
- public @ResponseBody String removeTopics(
- @RequestParam Map<String, String> req) throws Exception {
- String url = masterUtils.getQueryUrl(req);
- return queryMaster(url);
}
/**
@@ -154,10 +95,10 @@ public class TopicWebController {
* @return
* @throws Exception
*/
- @PostMapping("/query/consumer-auth")
+ @GetMapping("/consumerAuth")
public @ResponseBody String queryConsumerAuth(
@RequestParam Map<String, String> req) throws Exception {
- String url = masterUtils.getQueryUrl(req);
+ String url = masterService.getQueryUrl(req);
return queryMaster(url);
}
@@ -167,150 +108,11 @@ public class TopicWebController {
* @return
* @throws Exception
*/
- @PostMapping("/query/topic-config")
+ @GetMapping("/topicConfig")
public @ResponseBody String queryTopicConfig(
@RequestParam Map<String, String> req) throws Exception {
- String url = masterUtils.getQueryUrl(req);
- return queryMaster(url);
- }
-
- /**
- * add group to black list for certain topic
- * @param req
- * @return
- * @throws Exception
- */
- @GetMapping("/add/blackGroup")
- public @ResponseBody TubeMQResult addBlackGroup(
- @RequestParam Map<String, String> req) throws Exception {
- String url = masterUtils.getQueryUrl(req);
- return requestMaster(url);
- }
-
- /**
- * delete group to black list for certain topic
- * @param req
- * @return
- * @throws Exception
- */
- @GetMapping("/delete/blackGroup")
- public @ResponseBody TubeMQResult deleteBlackGroup(
- @RequestParam Map<String, String> req) throws Exception {
- String url = masterUtils.getQueryUrl(req);
- return requestMaster(url);
- }
-
- /**
- * query the black list for certain topic
- * @param req
- * @return
- * @throws Exception
- */
- @GetMapping("/query/blackGroup")
- public @ResponseBody String queryBlackGroup(
- @RequestParam Map<String, String> req) throws Exception {
- String url = masterUtils.getQueryUrl(req);
- return queryMaster(url);
- }
-
- /**
- * batch add consumer group for certain topic
- * @param req
- * @return
- * @throws Exception
- */
- @PostMapping("/add/group")
- public @ResponseBody TubeMQResult addConsumer(
- @RequestBody BatchAddGroupAuthReq req) throws Exception {
- NodeEntry nodeEntry =
- nodeRepository.findNodeEntryByClusterIdIsAndMasterIsTrue(req.getClusterId());
- if (nodeEntry == null) {
- return TubeMQResult.getErrorResult("no such cluster");
- }
- String url = SCHEMA + nodeEntry.getIp() + ":" + nodeEntry.getWebPort()
- + "/" + TUBE_REQUEST_PATH + "?" + ConvertUtils.convertReqToQueryStr(req);
- return requestMaster(url);
- }
-
-
- /**
- * delete consumer group for certain topic
- * @param req
- * @return
- * @throws Exception
- */
- @PostMapping("/delete/group")
- public @ResponseBody TubeMQResult deleteConsumer(
- @RequestBody DeleteGroupReq req) throws Exception {
- NodeEntry nodeEntry =
- nodeRepository.findNodeEntryByClusterIdIsAndMasterIsTrue(req.getClusterId());
- if (nodeEntry == null) {
- return TubeMQResult.getErrorResult("no such cluster");
- }
- String url = SCHEMA + nodeEntry.getIp() + ":" + nodeEntry.getWebPort()
- + "/" + TUBE_REQUEST_PATH + "?" + ConvertUtils.convertReqToQueryStr(req);
- return requestMaster(url);
- }
-
- /**
- * enable auth control for topics
- * @param req
- * @return
- * @throws Exception
- */
- @GetMapping("/enable/auth-control")
- public @ResponseBody TubeMQResult enableAuthControl(
- @RequestParam Map<String, String> req) throws Exception {
- String url = masterUtils.getQueryUrl(req);
- return requestMaster(url);
- }
-
- /**
- * disable auth control for topics
- * @param req
- * @return
- * @throws Exception
- */
- @GetMapping("/disable/auth-control")
- public @ResponseBody TubeMQResult disableAuthControl(
- @RequestParam Map<String, String> req) throws Exception {
- String url = masterUtils.getQueryUrl(req);
- return requestMaster(url);
- }
-
- /**
- * query the consumer group for certain topic
- * @param req
- * @return
- * @throws Exception
- */
- @GetMapping("/query/group")
- public @ResponseBody String queryConsumer(
- @RequestParam Map<String, String> req) throws Exception {
- String url = masterUtils.getQueryUrl(req);
+ String url = masterService.getQueryUrl(req);
return queryMaster(url);
}
-
- /**
- * clone offset from one group to another
- * @param req
- * @return
- * @throws Exception
- */
- @PostMapping("/clone/offset")
- public @ResponseBody TubeMQResult cloneOffset(
- @RequestBody CloneOffsetReq req) throws Exception {
- if (req.getClusterId() == null) {
- return TubeMQResult.getErrorResult("please input clusterId");
- }
- NodeEntry masterEntry = nodeRepository.findNodeEntryByClusterIdIsAndMasterIsTrue(
- req.getClusterId());
- if (masterEntry == null) {
- return TubeMQResult.getErrorResult("no such cluster");
- }
- return nodeService.cloneOffsetToOtherGroups(req, masterEntry);
- }
-
-
}
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BatchAddTopicReq.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/topic/request/DeleteTopicReq.java
similarity index 73%
copy from tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BatchAddTopicReq.java
copy to tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/topic/request/DeleteTopicReq.java
index 0201643..2482773 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BatchAddTopicReq.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/topic/request/DeleteTopicReq.java
@@ -15,14 +15,15 @@
* limitations under the License.
*/
-package org.apache.tubemq.manager.controller.node.request;
+package org.apache.tubemq.manager.controller.topic.request;
-import java.util.List;
import lombok.Data;
+import org.apache.tubemq.manager.controller.node.request.BaseReq;
@Data
-public class BatchAddTopicReq {
- List<AddTopicReq> addTopicReqs;
- List<Integer> brokerIds;
- Integer clusterId;
+public class DeleteTopicReq extends BaseReq {
+ private String topicName;
+ private String brokerId;
+ private String modifyUser;
+ private String confModAuthToken;
}
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BatchAddTopicReq.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/topic/request/ModifyTopicReq.java
similarity index 54%
copy from tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BatchAddTopicReq.java
copy to tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/topic/request/ModifyTopicReq.java
index 0201643..4d19696 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BatchAddTopicReq.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/topic/request/ModifyTopicReq.java
@@ -15,14 +15,26 @@
* limitations under the License.
*/
-package org.apache.tubemq.manager.controller.node.request;
+package org.apache.tubemq.manager.controller.topic.request;
-import java.util.List;
import lombok.Data;
+import org.apache.tubemq.manager.controller.node.request.BaseReq;
@Data
-public class BatchAddTopicReq {
- List<AddTopicReq> addTopicReqs;
- List<Integer> brokerIds;
- Integer clusterId;
+public class ModifyTopicReq extends BaseReq {
+ private String modifyUser;
+ private String deleteWhen;
+ private Integer unflushThreshold;
+ private Boolean acceptPublish;
+ private Integer numPartitions;
+ private Integer unflushInterval;
+ private Boolean acceptSubscribe;
+ private String brokerId;
+ private String confModAuthToken;
+ private String topicName;
+ private String deletePolicy;
+ private Integer unflushDataHold;
+ private Integer numTopicStores;
+ private Integer memCacheMsgCntInK;
+ private Integer memCacheMsgSizeInMB;
}
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BatchAddTopicReq.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/topic/request/RebalanceConsumerReq.java
similarity index 71%
copy from tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BatchAddTopicReq.java
copy to tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/topic/request/RebalanceConsumerReq.java
index 0201643..040d5ea 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BatchAddTopicReq.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/topic/request/RebalanceConsumerReq.java
@@ -1,4 +1,4 @@
-/*
+/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -15,14 +15,16 @@
* limitations under the License.
*/
-package org.apache.tubemq.manager.controller.node.request;
+package org.apache.tubemq.manager.controller.topic.request;
-import java.util.List;
import lombok.Data;
+import org.apache.tubemq.manager.controller.node.request.BaseReq;
@Data
-public class BatchAddTopicReq {
- List<AddTopicReq> addTopicReqs;
- List<Integer> brokerIds;
- Integer clusterId;
+public class RebalanceConsumerReq extends BaseReq {
+ public String groupName;
+ public String confModAuthToken;
+ public Integer reJoinWait;
+ public String modifyUser;
+ public String consumerId;
}
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BatchAddTopicReq.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/topic/request/RebalanceGroupReq.java
similarity index 73%
copy from tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BatchAddTopicReq.java
copy to tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/topic/request/RebalanceGroupReq.java
index 0201643..92271ca 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BatchAddTopicReq.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/topic/request/RebalanceGroupReq.java
@@ -15,14 +15,15 @@
* limitations under the License.
*/
-package org.apache.tubemq.manager.controller.node.request;
+package org.apache.tubemq.manager.controller.topic.request;
-import java.util.List;
import lombok.Data;
+import org.apache.tubemq.manager.controller.node.request.BaseReq;
@Data
-public class BatchAddTopicReq {
- List<AddTopicReq> addTopicReqs;
- List<Integer> brokerIds;
- Integer clusterId;
+public class RebalanceGroupReq extends BaseReq {
+ public String groupName;
+ public String confModAuthToken;
+ public Integer reJoinWait;
+ public String modifyUser;
}
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BatchAddTopicReq.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/topic/request/SetAuthControlReq.java
similarity index 73%
copy from tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BatchAddTopicReq.java
copy to tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/topic/request/SetAuthControlReq.java
index 0201643..1dcb756 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BatchAddTopicReq.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/topic/request/SetAuthControlReq.java
@@ -15,14 +15,15 @@
* limitations under the License.
*/
-package org.apache.tubemq.manager.controller.node.request;
+package org.apache.tubemq.manager.controller.topic.request;
-import java.util.List;
import lombok.Data;
+import org.apache.tubemq.manager.controller.node.request.BaseReq;
@Data
-public class BatchAddTopicReq {
- List<AddTopicReq> addTopicReqs;
- List<Integer> brokerIds;
- Integer clusterId;
+public class SetAuthControlReq extends BaseReq {
+ private String confModAuthToken;
+ private String topicName;
+ private String createUser;
+ private Boolean isEnable;
}
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/utils/MasterUtils.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/MasterService.java
similarity index 79%
rename from tubemq-manager/src/main/java/org/apache/tubemq/manager/utils/MasterUtils.java
rename to tubemq-manager/src/main/java/org/apache/tubemq/manager/service/MasterService.java
index 5dea029..8f726e9 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/utils/MasterUtils.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/MasterService.java
@@ -16,7 +16,7 @@
*/
-package org.apache.tubemq.manager.utils;
+package org.apache.tubemq.manager.service;
import com.google.gson.Gson;
import lombok.extern.slf4j.Slf4j;
@@ -27,6 +27,8 @@ import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.apache.tubemq.manager.controller.TubeMQResult;
+import org.apache.tubemq.manager.controller.node.request.AddTopicReq;
+import org.apache.tubemq.manager.controller.node.request.BaseReq;
import org.apache.tubemq.manager.entry.NodeEntry;
import org.apache.tubemq.manager.repository.NodeRepository;
import org.apache.tubemq.manager.service.tube.TubeHttpResponse;
@@ -43,13 +45,14 @@ import java.util.Map;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.tubemq.manager.controller.TubeMQResult.getErrorResult;
import static org.apache.tubemq.manager.service.TubeMQHttpConst.SCHEMA;
+import static org.apache.tubemq.manager.service.TubeMQHttpConst.SUCCESS_CODE;
+import static org.apache.tubemq.manager.utils.ConvertUtils.convertReqToQueryStr;
@Slf4j
@Component
-public class MasterUtils {
+public class MasterService {
- public static final int SUCCESS_CODE = 0;
private static CloseableHttpClient httpclient = HttpClients.createDefault();
private static Gson gson = new Gson();
public static final String TUBE_REQUEST_PATH = "webapi.htm";
@@ -69,7 +72,7 @@ public class MasterUtils {
- public static TubeMQResult requestMaster(String url) throws Exception {
+ public static TubeMQResult requestMaster(String url) {
log.info("start to request {}", url);
HttpGet httpGet = new HttpGet(url);
@@ -113,21 +116,30 @@ public class MasterUtils {
}
+ public TubeMQResult baseRequestMaster(BaseReq req) {
+ if (req.getClusterId() == null) {
+ return TubeMQResult.getErrorResult("please input clusterId");
+ }
+ NodeEntry masterEntry = nodeRepository.findNodeEntryByClusterIdIsAndMasterIsTrue(
+ req.getClusterId());
+ if (masterEntry == null) {
+ return TubeMQResult.getErrorResult("no such cluster");
+ }
+ String url = SCHEMA + masterEntry.getIp() + ":" + masterEntry.getWebPort()
+ + "/" + TUBE_REQUEST_PATH + "?" + convertReqToQueryStr(req);
+ return requestMaster(url);
+ }
- public TubeMQResult redirectToMaster(Map<String, String> queryBody) throws Exception {
- int clusterId = Integer.parseInt(queryBody.get("clusterId"));
- queryBody.remove("clusterId");
- NodeEntry nodeEntry =
- nodeRepository.findNodeEntryByClusterIdIsAndMasterIsTrue(clusterId);
- if (nodeEntry == null) {
- return TubeMQResult.getErrorResult("ClusterId doesn't exist");
+ public NodeEntry getMasterNode(BaseReq req) {
+ if (req.getClusterId() == null) {
+ return null;
}
- String url = SCHEMA + nodeEntry.getIp() + ":" + nodeEntry.getWebPort()
- + "/" + TUBE_REQUEST_PATH + "?" + covertMapToQueryString(queryBody);
- return requestMaster(url);
+ return nodeRepository.findNodeEntryByClusterIdIsAndMasterIsTrue(
+ req.getClusterId());
}
+
public String getQueryUrl(Map<String, String> queryBody) throws Exception {
int clusterId = Integer.parseInt(queryBody.get("clusterId"));
queryBody.remove("clusterId");
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/NodeService.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/NodeService.java
index 927025a..ddcfc75 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/NodeService.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/NodeService.java
@@ -19,13 +19,16 @@ package org.apache.tubemq.manager.service;
import static org.apache.tubemq.manager.controller.node.request.AddBrokersReq.getAddBrokerReq;
+import static org.apache.tubemq.manager.service.MasterService.TUBE_REQUEST_PATH;
+import static org.apache.tubemq.manager.service.MasterService.queryMaster;
+import static org.apache.tubemq.manager.service.MasterService.requestMaster;
import static org.apache.tubemq.manager.service.TubeMQHttpConst.ADD_TUBE_TOPIC;
import static org.apache.tubemq.manager.service.TubeMQHttpConst.BROKER_RUN_STATUS;
+import static org.apache.tubemq.manager.service.TubeMQHttpConst.NO_SUCH_CLUSTER;
import static org.apache.tubemq.manager.service.TubeMQHttpConst.RELOAD_BROKER;
import static org.apache.tubemq.manager.service.TubeMQHttpConst.SCHEMA;
-import static org.apache.tubemq.manager.service.TubeMQHttpConst.TOPIC_CONFIG_INFO;
+import static org.apache.tubemq.manager.service.TubeMQHttpConst.SUCCESS_CODE;
import static org.apache.tubemq.manager.utils.ConvertUtils.convertReqToQueryStr;
-import static org.apache.tubemq.manager.utils.MasterUtils.*;
import com.google.common.collect.Lists;
import com.google.gson.Gson;
@@ -44,15 +47,13 @@ import org.apache.http.impl.client.HttpClients;
import org.apache.tubemq.manager.controller.TubeMQResult;
import org.apache.tubemq.manager.controller.node.request.AddBrokersReq;
import org.apache.tubemq.manager.controller.node.request.AddTopicReq;
+import org.apache.tubemq.manager.controller.node.request.BatchAddTopicReq;
import org.apache.tubemq.manager.controller.node.request.CloneBrokersReq;
-import org.apache.tubemq.manager.controller.node.request.CloneOffsetReq;
import org.apache.tubemq.manager.controller.node.request.CloneTopicReq;
import org.apache.tubemq.manager.controller.node.request.QueryBrokerCfgReq;
import org.apache.tubemq.manager.entry.NodeEntry;
import org.apache.tubemq.manager.repository.NodeRepository;
import org.apache.tubemq.manager.service.tube.*;
-import org.apache.tubemq.manager.service.tube.TubeHttpBrokerInfoList.BrokerInfo;
-import org.apache.tubemq.manager.service.tube.TubeHttpTopicInfoList.TopicInfoList.TopicInfo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
@@ -75,12 +76,15 @@ public class NodeService {
private final TopicBackendWorker worker;
- @Value("${manager.broker.webPort:8081}")
- private int brokerWebPort;
-
@Autowired
private NodeRepository nodeRepository;
+ @Autowired
+ private TopicService topicService;
+
+ @Autowired
+ private MasterService masterService;
+
public NodeService(TopicBackendWorker worker) {
this.worker = worker;
}
@@ -92,7 +96,7 @@ public class NodeService {
* @return
* @throws IOException
*/
- private TubeHttpBrokerInfoList requestClusterNodeStatus(NodeEntry nodeEntry) throws IOException {
+ private TubeHttpBrokerInfoList requestClusterNodeStatus(NodeEntry nodeEntry) {
String url = SCHEMA + nodeEntry.getIp() + ":" + nodeEntry.getWebPort() + BROKER_RUN_STATUS;
HttpGet httpget = new HttpGet(url);
try (CloseableHttpResponse response = httpclient.execute(httpget)) {
@@ -112,26 +116,17 @@ public class NodeService {
}
- private TubeHttpTopicInfoList requestTopicConfigInfo(NodeEntry nodeEntry, String topic) {
- String url = SCHEMA + nodeEntry.getIp() + ":" + nodeEntry.getWebPort()
- + TOPIC_CONFIG_INFO + "&topicName=" + topic;
- HttpGet httpget = new HttpGet(url);
- try (CloseableHttpResponse response = httpclient.execute(httpget)) {
- TubeHttpTopicInfoList topicInfoList =
- gson.fromJson(new InputStreamReader(response.getEntity().getContent()),
- TubeHttpTopicInfoList.class);
- if (topicInfoList.getErrCode() == SUCCESS_CODE) {
- return topicInfoList;
- }
- } catch (Exception ex) {
- log.error("exception caught while requesting broker status", ex);
- }
- return null;
- }
- public TubeMQResult cloneBrokersWithTopic(CloneBrokersReq req, int clusterId) throws Exception {
+ /**
+ * clone source broker to generate brokers with the same config and copy the topics in it.
+ * @param req
+ * @return
+ * @throws Exception
+ */
+ public TubeMQResult cloneBrokersWithTopic(CloneBrokersReq req) throws Exception {
+ int clusterId = req.getClusterId();
// 1. query source broker config
QueryBrokerCfgReq queryReq = QueryBrokerCfgReq.getReq(req.getSourceBrokerId());
NodeEntry masterEntry = nodeRepository.findNodeEntryByClusterIdIsAndMasterIsTrue(
@@ -222,14 +217,6 @@ public class NodeService {
}
- public TubeMQResult addBrokersToCluster(AddBrokersReq req, NodeEntry masterEntry) throws Exception {
- String url = SCHEMA + masterEntry.getIp() + ":" + masterEntry.getWebPort()
- + "/" + TUBE_REQUEST_PATH + "?" + convertReqToQueryStr(req);
- TubeMQResult tubeMQResult = requestMaster(url);
- return tubeMQResult;
- }
-
-
private boolean configBrokersForTopics(NodeEntry nodeEntry,
Set<String> topics, List<Integer> brokerList, int maxBrokers) {
List<Integer> finalBrokerList = brokerList.subList(0, maxBrokers);
@@ -293,7 +280,7 @@ public class NodeService {
// 1. check tubemq cluster by topic name, remove pending topic if has added.
Set<String> brandNewTopics = new HashSet<>();
for (String topic : pendingTopic.keySet()) {
- TubeHttpTopicInfoList topicInfoList = requestTopicConfigInfo(nodeEntry, topic);
+ TubeHttpTopicInfoList topicInfoList = topicService.requestTopicConfigInfo(nodeEntry, topic);
if (topicInfoList != null) {
// get broker list by topic request
List<Integer> topicBrokerList = topicInfoList.getTopicBrokerIdList();
@@ -421,10 +408,22 @@ public class NodeService {
return null;
}
- public TubeMQResult cloneTopicToBrokers(CloneTopicReq req, NodeEntry master) throws Exception {
+ /**
+ * given one topic, copy its config and clone to brokers
+ * if no broker is provided, topics will be cloned to all brokers in cluster
+ * @param req
+ * @return
+ * @throws Exception
+ */
+ public TubeMQResult cloneTopicToBrokers(CloneTopicReq req) throws Exception {
+
+ NodeEntry master = masterService.getMasterNode(req);
+ if (master == null) {
+ return TubeMQResult.getErrorResult(NO_SUCH_CLUSTER);
+ }
// 1 query topic config
- TubeHttpTopicInfoList topicInfoList = requestTopicConfigInfo(master, req.getSourceTopicName());
+ TubeHttpTopicInfoList topicInfoList = topicService.requestTopicConfigInfo(master, req.getSourceTopicName());
if (topicInfoList == null) {
return TubeMQResult.getErrorResult("no such topic");
@@ -449,33 +448,16 @@ public class NodeService {
}
- public TubeMQResult cloneOffsetToOtherGroups(CloneOffsetReq req, NodeEntry master)
- throws Exception {
-
- // 1. query the corresponding brokers having given topic
- TubeHttpTopicInfoList topicInfoList = requestTopicConfigInfo(master, req.getTopicName());
- TubeMQResult result = new TubeMQResult();
-
- if (topicInfoList == null) {
- return result;
- }
-
- List<TopicInfo> topicInfos = topicInfoList.getTopicInfo();
- // 2. for each broker, request to clone offset
- for (TopicInfo topicInfo : topicInfos) {
- String brokerIp = topicInfo.getBrokerIp();
- String url = SCHEMA + brokerIp + ":" + brokerWebPort
- + "/" + TUBE_REQUEST_PATH + "?" + convertReqToQueryStr(req);
- result = requestMaster(url);
- if (result.getErrCode() != SUCCESS_CODE) {
- return result;
- }
+ /**
+ * add topic to brokers
+ * @param req
+ * @return
+ */
+ public TubeMQResult addTopic(BatchAddTopicReq req) {
+ NodeEntry masterEntry = masterService.getMasterNode(req);
+ if (masterEntry == null) {
+ return TubeMQResult.getErrorResult(NO_SUCH_CLUSTER);
}
-
-
- return result;
+ return addTopicsToBrokers(masterEntry, req.getBrokerIds(), req.getAddTopicReqs());
}
-
-
-
}
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/TopicService.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/TopicService.java
new file mode 100644
index 0000000..1abff54
--- /dev/null
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/TopicService.java
@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tubemq.manager.service;
+
+
+import static org.apache.tubemq.manager.service.TubeMQHttpConst.QUERY_GROUP_DETAIL_INFO;
+import static org.apache.tubemq.manager.service.TubeMQHttpConst.SCHEMA;
+import static org.apache.tubemq.manager.service.TubeMQHttpConst.SUCCESS_CODE;
+import static org.apache.tubemq.manager.service.TubeMQHttpConst.TOPIC_CONFIG_INFO;
+import static org.apache.tubemq.manager.utils.ConvertUtils.convertReqToQueryStr;
+import static org.apache.tubemq.manager.utils.ConvertUtils.convertToRebalanceConsumerReq;
+import static org.apache.tubemq.manager.service.MasterService.TUBE_REQUEST_PATH;
+import static org.apache.tubemq.manager.service.MasterService.requestMaster;
+
+import com.google.gson.Gson;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Objects;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.tubemq.manager.controller.TubeMQResult;
+import org.apache.tubemq.manager.controller.group.request.DeleteOffsetReq;
+import org.apache.tubemq.manager.controller.node.request.CloneOffsetReq;
+import org.apache.tubemq.manager.controller.topic.request.BatchAddGroupAuthReq;
+import org.apache.tubemq.manager.controller.topic.request.DeleteGroupReq;
+import org.apache.tubemq.manager.controller.topic.request.RebalanceConsumerReq;
+import org.apache.tubemq.manager.controller.topic.request.RebalanceGroupReq;
+import org.apache.tubemq.manager.entry.NodeEntry;
+import org.apache.tubemq.manager.repository.NodeRepository;
+import org.apache.tubemq.manager.service.tube.CleanOffsetResult;
+import org.apache.tubemq.manager.service.tube.RebalanceGroupResult;
+import org.apache.tubemq.manager.service.tube.TubeHttpGroupDetailInfo;
+import org.apache.tubemq.manager.service.tube.TubeHttpTopicInfoList;
+import org.apache.tubemq.manager.service.tube.TubeHttpTopicInfoList.TopicInfoList.TopicInfo;
+import org.apache.tubemq.manager.utils.ConvertUtils;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+
+/**
+ * Topic service: queries topic configuration and consumer group details,
+ * clones and deletes consumer group offsets on brokers, and rebalances consumer groups.
+ */
+@Slf4j
+@Component
+public class TopicService {
+
+ private final CloseableHttpClient httpclient = HttpClients.createDefault();
+ private final Gson gson = new Gson();
+
+ @Value("${manager.broker.webPort:8081}")
+ private int brokerWebPort;
+
+ @Autowired
+ private MasterService masterService;
+
+ /**
+ * Queries a node for the detail info of a consumer group.
+ *
+ * @param nodeEntry node whose ip and web port are used to build the request url
+ * @param group consumer group name, sent as the consumeGroup query parameter
+ * @return the parsed group detail info when the node answers with errCode 0;
+ *         null when the request fails, the body is empty or an error code is returned
+ */
+ private TubeHttpGroupDetailInfo requestGroupRunInfo(NodeEntry nodeEntry, String group) {
+     String url = SCHEMA + nodeEntry.getIp() + ":" + nodeEntry.getWebPort()
+         + QUERY_GROUP_DETAIL_INFO + "&consumeGroup=" + group;
+     HttpGet httpget = new HttpGet(url);
+     // decode the body explicitly as UTF-8 rather than with the platform default charset
+     try (CloseableHttpResponse response = httpclient.execute(httpget);
+          InputStreamReader reader = new InputStreamReader(
+              response.getEntity().getContent(), StandardCharsets.UTF_8)) {
+         TubeHttpGroupDetailInfo groupDetailInfo =
+             gson.fromJson(reader, TubeHttpGroupDetailInfo.class);
+         // gson returns null for an empty body; treat that like any other failed query
+         if (groupDetailInfo != null && groupDetailInfo.getErrCode() == 0) {
+             return groupDetailInfo;
+         }
+     } catch (Exception ex) {
+         log.error("exception caught while requesting group status", ex);
+     }
+     return null;
+ }
+
+
+ /**
+ * Clones a consumer group's offset by sending the request to every broker
+ * that is listed for the requested topic.
+ *
+ * @param req clone request; used to look up the master node, supplies the topic name
+ *            and is converted into the query string sent to each broker
+ * @return an error result when no master node or no topic info is found; otherwise the
+ *         result of the first broker request that fails, or of the last request made
+ */
+ public TubeMQResult cloneOffsetToOtherGroups(CloneOffsetReq req) {
+
+     NodeEntry master = masterService.getMasterNode(req);
+     if (master == null) {
+         return TubeMQResult.getErrorResult("no such cluster");
+     }
+     // 1. query the corresponding brokers having given topic
+     TubeHttpTopicInfoList topicInfoList = requestTopicConfigInfo(master, req.getTopicName());
+     if (topicInfoList == null) {
+         // report the failed lookup explicitly (as deleteOffset does) instead of
+         // handing back a blank result that carries no error information
+         return TubeMQResult.getErrorResult("no such topic");
+     }
+
+     TubeMQResult result = new TubeMQResult();
+     List<TopicInfo> topicInfos = topicInfoList.getTopicInfo();
+     // 2. for each broker, request to clone offset; stop at the first failure
+     for (TopicInfo topicInfo : topicInfos) {
+         String brokerIp = topicInfo.getBrokerIp();
+         String url = SCHEMA + brokerIp + ":" + brokerWebPort
+             + "/" + TUBE_REQUEST_PATH + "?" + convertReqToQueryStr(req);
+         result = requestMaster(url);
+         if (result.getErrCode() != SUCCESS_CODE) {
+             return result;
+         }
+     }
+
+     return result;
+ }
+
+
+ public TubeHttpTopicInfoList requestTopicConfigInfo(NodeEntry nodeEntry, String topic) {
+ String url = SCHEMA + nodeEntry.getIp() + ":" + nodeEntry.getWebPort()
+ + TOPIC_CONFIG_INFO + "&topicName=" + topic;
+ HttpGet httpget = new HttpGet(url);
+ try (CloseableHttpResponse response = httpclient.execute(httpget)) {
+ TubeHttpTopicInfoList topicInfoList =
+ gson.fromJson(new InputStreamReader(response.getEntity().getContent()),
+ TubeHttpTopicInfoList.class);
+ if (topicInfoList.getErrCode() == SUCCESS_CODE) {
+ return topicInfoList;
+ }
+ } catch (Exception ex) {
+ log.error("exception caught while requesting broker status", ex);
+ }
+ return null;
+ }
+
+
+ public TubeMQResult rebalanceGroup(RebalanceGroupReq req) {
+
+ NodeEntry master = masterService.getMasterNode(req);
+ if (master == null) {
+ return TubeMQResult.getErrorResult("no such cluster");
+ }
+
+ // 1. get all consumer ids in group
+ List<String> consumerIds = Objects
+ .requireNonNull(requestGroupRunInfo(master, req.getGroupName())).getConsumerIds();
+ RebalanceGroupResult rebalanceGroupResult = new RebalanceGroupResult();
+
+ // 2. rebalance consumers in group
+ consumerIds.forEach(consumerId -> {
+ RebalanceConsumerReq rebalanceConsumerReq = convertToRebalanceConsumerReq(req,
+ consumerId);
+ String url = SCHEMA + master.getIp() + ":" + master.getWebPort()
+ + "/" + TUBE_REQUEST_PATH + "?" + convertReqToQueryStr(rebalanceConsumerReq);
+ TubeMQResult result = requestMaster(url);
+ if (result.getErrCode() != 0) {
+ rebalanceGroupResult.getFailConsumers().add(consumerId);
+ }
+ rebalanceGroupResult.getSuccessConsumers().add(consumerId);
+ });
+
+ TubeMQResult tubeResult = new TubeMQResult();
+ tubeResult.setData(gson.toJson(rebalanceGroupResult));
+
+ return tubeResult;
+ }
+
+
+ /**
+ * Deletes a consumer group's offset by sending the request to every broker that is
+ * listed for the requested topic, recording per broker whether the request succeeded.
+ *
+ * @param req delete request; used to look up the master node, supplies the topic name
+ *            and is converted into the query string sent to each broker
+ * @return an error result when no master node or no topic info is found; otherwise the
+ *         result of the last broker request (or a new result when no broker is listed),
+ *         with its data set to the JSON form of the failed and successful broker ips
+ */
+ public TubeMQResult deleteOffset(DeleteOffsetReq req) {
+     NodeEntry masterNode = masterService.getMasterNode(req);
+     if (masterNode == null) {
+         return TubeMQResult.getErrorResult("no such cluster");
+     }
+
+     // step 1: find the brokers that carry the topic
+     TubeHttpTopicInfoList topicConfig = requestTopicConfigInfo(masterNode, req.getTopicName());
+     if (topicConfig == null) {
+         return TubeMQResult.getErrorResult("no such topic");
+     }
+
+     TubeMQResult lastResult = new TubeMQResult();
+     CleanOffsetResult summary = new CleanOffsetResult();
+
+     // step 2: ask every broker to delete the offset and sort it into success or failure
+     for (TopicInfo brokerTopic : topicConfig.getTopicInfo()) {
+         String brokerIp = brokerTopic.getBrokerIp();
+         lastResult = requestMaster(SCHEMA + brokerIp + ":" + brokerWebPort
+             + "/" + TUBE_REQUEST_PATH + "?" + convertReqToQueryStr(req));
+         if (lastResult.getErrCode() == SUCCESS_CODE) {
+             summary.getSuccessBrokers().add(brokerIp);
+         } else {
+             summary.getFailBrokers().add(brokerIp);
+         }
+     }
+
+     // the per-broker summary travels in the data field of the last broker's result
+     lastResult.setData(gson.toJson(summary));
+     return lastResult;
+ }
+
+}
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/TubeMQHttpConst.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/TubeMQHttpConst.java
index 0fa83d6..04cf17b 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/TubeMQHttpConst.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/TubeMQHttpConst.java
@@ -23,6 +23,8 @@ public class TubeMQHttpConst {
"/webapi.htm?type=op_query&method=admin_query_broker_run_status";
public static final String TOPIC_CONFIG_INFO =
"/webapi.htm?type=op_query&method=admin_query_topic_info";
+ public static final String QUERY_GROUP_DETAIL_INFO =
+ "/webapi.htm?type=op_query&method=admin_query_consume_group_detail";
public static final String ADD_TUBE_TOPIC =
"/webapi.htm?type=op_modify&method=admin_add_new_topic_record";
public static final String RELOAD_BROKER =
@@ -33,4 +35,22 @@ public class TubeMQHttpConst {
public static final String BATCH_ADD_BROKER = "admin_bath_add_broker_configure";
public static final String WEB_API = "webapi";
public static final String BATCH_ADD_TOPIC = "admin_add_new_topic_record";
+ public static final String REBALANCE_GROUP = "admin_rebalance_group_allocate";
+ public static final String AUTH_CONTROL = "authControl";
+ public static final String MODIFY = "modify";
+ public static final String DELETE = "delete";
+ public static final String REMOVE = "remove";
+ public static final String NO_SUCH_METHOD = "no such method";
+ public static final String ADMIN_QUERY_CLUSTER_INFO = "admin_query_cluster_info";
+ public static final String CLONE = "clone";
+ public static final String ADD = "add";
+ public static final String ONLINE = "online";
+ public static final String RELOAD = "reload";
+ public static final String SET_READ_OR_WRITE = "setReadOrWrite";
+ public static final String OFFLINE = "offline";
+ public static final String REBALANCE_CONSUMER_GROUP = "rebalanceGroup";
+ public static final String REBALANCE_CONSUMER = "rebalanceConsumer";
+ public static final String NO_SUCH_CLUSTER = "no such cluster";
+ public static final Integer SUCCESS_CODE = 0;
+
}
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BatchAddTopicReq.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/CleanOffsetResult.java
similarity index 77%
copy from tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BatchAddTopicReq.java
copy to tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/CleanOffsetResult.java
index 0201643..9fe4e04 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BatchAddTopicReq.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/CleanOffsetResult.java
@@ -1,4 +1,4 @@
-/*
+/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -15,14 +15,16 @@
* limitations under the License.
*/
-package org.apache.tubemq.manager.controller.node.request;
+package org.apache.tubemq.manager.service.tube;
+
+import com.google.common.collect.Lists;
import java.util.List;
import lombok.Data;
@Data
-public class BatchAddTopicReq {
- List<AddTopicReq> addTopicReqs;
- List<Integer> brokerIds;
- Integer clusterId;
+public class CleanOffsetResult { // per-broker outcome of an offset delete request
+ private List<String> failBrokers = Lists.newArrayList(); // ips of brokers whose request failed
+
+ private List<String> successBrokers = Lists.newArrayList(); // ips of brokers whose request succeeded
 }
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BatchAddTopicReq.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/RebalanceGroupResult.java
similarity index 77%
copy from tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BatchAddTopicReq.java
copy to tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/RebalanceGroupResult.java
index 0201643..5db9ee3 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BatchAddTopicReq.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/RebalanceGroupResult.java
@@ -15,14 +15,16 @@
* limitations under the License.
*/
-package org.apache.tubemq.manager.controller.node.request;
+package org.apache.tubemq.manager.service.tube;
+
+import com.google.common.collect.Lists;
import java.util.List;
import lombok.Data;
@Data
-public class BatchAddTopicReq {
- List<AddTopicReq> addTopicReqs;
- List<Integer> brokerIds;
- Integer clusterId;
+public class RebalanceGroupResult { // per-consumer outcome of a group rebalance
+ public List<String> failConsumers = Lists.newArrayList(); // consumer ids recorded as failed
+
+ public List<String> successConsumers = Lists.newArrayList(); // consumer ids recorded as successful
 }
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/TubeHttpGroupDetailInfo.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/TubeHttpGroupDetailInfo.java
new file mode 100644
index 0000000..6ff4568
--- /dev/null
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/TubeHttpGroupDetailInfo.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tubemq.manager.service.tube;
+
+import com.google.common.collect.Lists;
+import java.util.List;
+import java.util.Map;
+import lombok.Data;
+import org.apache.tubemq.manager.entry.NodeEntry;
+import org.apache.tubemq.manager.service.tube.TubeHttpClusterInfoList.ClusterData;
+
+/**
+ * Detail info of a consumer group: the status fields of the response plus the
+ * consumers of the group and the partitions listed for each consumer.
+ */
+@Data
+public class TubeHttpGroupDetailInfo {
+
+    private String errMsg;
+
+    private int errCode;
+
+    private int count;
+
+    private List<String> topicSet;
+
+    private String consumeGroup;
+
+    private List<ConsumerInfo> data;
+
+    /**
+     * One consumer of the group together with its partition entries.
+     */
+    @Data
+    public static class ConsumerInfo {
+
+        private String consumerId;
+
+        private Integer parCount;
+
+        private List<PartitionInfo> parInfo;
+
+        /**
+         * One partition entry: broker address, topic and partition id.
+         */
+        @Data
+        public static class PartitionInfo {
+            private String brokerAddr;
+            private String topic;
+            private Integer partId;
+        }
+
+    }
+
+    /**
+     * Collects the ids of all consumers held in {@code data}.
+     *
+     * @return a new list with one consumer id per entry of {@code data};
+     *         empty when {@code data} has not been set
+     */
+    public List<String> getConsumerIds() {
+        List<String> consumerIds = Lists.newArrayList();
+        // data stays null when it was never populated (e.g. a response without a data
+        // field); answer with an empty list instead of a NullPointerException
+        if (data == null) {
+            return consumerIds;
+        }
+        for (ConsumerInfo consumerInfo : data) {
+            consumerIds.add(consumerInfo.getConsumerId());
+        }
+        return consumerIds;
+    }
+
+
+}
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/utils/ConvertUtils.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/utils/ConvertUtils.java
index f59bb73..58fdd15 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/utils/ConvertUtils.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/utils/ConvertUtils.java
@@ -18,20 +18,27 @@
package org.apache.tubemq.manager.utils;
import com.google.gson.Gson;
+import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import java.lang.reflect.Field;
import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.List;
+import org.apache.tubemq.manager.controller.topic.request.RebalanceConsumerReq;
+import org.apache.tubemq.manager.controller.topic.request.RebalanceGroupReq;
import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.tubemq.manager.service.TubeMQHttpConst.OP_MODIFY;
+import static org.apache.tubemq.manager.service.TubeMQHttpConst.OP_QUERY;
+import static org.apache.tubemq.manager.service.TubeMQHttpConst.REBALANCE_GROUP;
+@Slf4j
public class ConvertUtils {
public static Gson gson = new Gson();
- public static String convertReqToQueryStr(Object req) throws Exception {
+ public static String convertReqToQueryStr(Object req) {
List<String> queryList = new ArrayList<>();
Class<?> clz = req.getClass();
List fieldsList = new ArrayList<Field[]>();
@@ -42,26 +49,44 @@ public class ConvertUtils {
clz = clz.getSuperclass();
}
- for (Object fields:fieldsList) {
- Field[] f = (Field[]) fields;
- for (Field field : f) {
- field.setAccessible(true);
- Object o = field.get(req);
- String value;
- // convert list to json string
- if (o == null) {
- continue;
+ try {
+ for (Object fields:fieldsList) {
+ Field[] f = (Field[]) fields;
+ for (Field field : f) {
+ field.setAccessible(true);
+ Object o = field.get(req);
+ String value;
+ // convert list to json string
+ if (o == null) {
+ continue;
+ }
+ if (o instanceof List) {
+ value = gson.toJson(o);
+ } else {
+ value = o.toString();
+ }
+ queryList.add(field.getName() + "=" + URLEncoder.encode(
+ value, UTF_8.toString()));
}
- if (o instanceof List) {
- value = gson.toJson(o);
- } else {
- value = o.toString();
- }
- queryList.add(field.getName() + "=" + URLEncoder.encode(
- value, UTF_8.toString()));
}
+ } catch (Exception e) {
+ log.error("exception occurred while parsing object {}", gson.toJson(req), e);
+ return StringUtils.EMPTY;
}
return StringUtils.join(queryList, "&");
}
+
+
+ /**
+ * Builds the rebalance request for a single consumer from a group-level rebalance request.
+ *
+ * @param req group-level request whose auth token, group name, modify user and
+ *            re-join wait are copied
+ * @param consumerId id of the consumer the new request targets
+ * @return a new request with type OP_MODIFY and method REBALANCE_GROUP
+ */
+ public static RebalanceConsumerReq convertToRebalanceConsumerReq(RebalanceGroupReq req, String consumerId) {
+     RebalanceConsumerReq perConsumerReq = new RebalanceConsumerReq();
+
+     // fixed web-api routing fields
+     perConsumerReq.setType(OP_MODIFY);
+     perConsumerReq.setMethod(REBALANCE_GROUP);
+
+     // values carried over from the group-level request
+     perConsumerReq.setGroupName(req.getGroupName());
+     perConsumerReq.setConfModAuthToken(req.getConfModAuthToken());
+     perConsumerReq.setModifyUser(req.getModifyUser());
+     perConsumerReq.setReJoinWait(req.getReJoinWait());
+
+     // the one consumer this request is for
+     perConsumerReq.setConsumerId(consumerId);
+     return perConsumerReq;
+ }
}
diff --git a/tubemq-manager/src/main/resources/application.properties b/tubemq-manager/src/main/resources/application.properties
index 9afbdab..dee51b7 100644
--- a/tubemq-manager/src/main/resources/application.properties
+++ b/tubemq-manager/src/main/resources/application.properties
@@ -14,4 +14,4 @@
# limitations under the License.
spring.jpa.hibernate.ddl-auto=update
-# configuration for manager
\ No newline at end of file
+# configuration for manager