You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tubemq.apache.org by go...@apache.org on 2020/12/25 08:52:49 UTC
[incubator-tubemq] branch TUBEMQ-421 updated: [TUBEMQ-480] batch
set brokers to read or write mode and query brokers run info or config
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch TUBEMQ-421
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git
The following commit(s) were added to refs/heads/TUBEMQ-421 by this push:
new 746b8c1 [TUBEMQ-480] batch set brokers to read or write mode and query brokers run info or config
746b8c1 is described below
commit 746b8c182ce0570e8c694e506c8c4f3f5a5c60e9
Author: EMsnap <zp...@connect.ust.hk>
AuthorDate: Thu Dec 24 15:43:13 2020 +0800
[TUBEMQ-480] batch set brokers to read or write mode and query brokers run info or config
---
.../controller/cluster/ClusterController.java | 7 ++-
.../manager/controller/node/NodeController.java | 71 +++++++++++++++++++---
.../apache/tubemq/manager/utils/MasterUtils.java | 36 +++++++++++
3 files changed, 103 insertions(+), 11 deletions(-)
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/cluster/ClusterController.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/cluster/ClusterController.java
index 4b58313..562813c 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/cluster/ClusterController.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/cluster/ClusterController.java
@@ -43,17 +43,22 @@ import org.springframework.web.bind.annotation.RestController;
public class ClusterController {
private final Gson gson = new Gson();
+
@Autowired
private NodeRepository nodeRepository;
@Autowired
public MasterUtils masterUtil;
+ /**
+ * query cluster info
+ */
@RequestMapping(value = "/query", method = RequestMethod.GET,
produces = MediaType.APPLICATION_JSON_VALUE)
public @ResponseBody String queryInfo(
@RequestParam Map<String, String> queryBody) throws Exception {
- return gson.toJson(masterUtil.redirectToMaster(queryBody));
+ String url = masterUtil.getQueryUrl(queryBody);
+ return queryMaster(url);
}
/**
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/NodeController.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/NodeController.java
index 8a6e586..ab5567d 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/NodeController.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/NodeController.java
@@ -24,7 +24,6 @@ import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.tubemq.manager.controller.TubeMQResult;
import org.apache.tubemq.manager.controller.node.request.AddBrokersReq;
-import org.apache.tubemq.manager.controller.node.request.BrokerConf;
import org.apache.tubemq.manager.entry.NodeEntry;
import org.apache.tubemq.manager.repository.NodeRepository;
import org.apache.tubemq.manager.service.NodeService;
@@ -33,8 +32,6 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.*;
-import java.util.ArrayList;
-import java.util.List;
import java.util.Map;
import static org.apache.tubemq.manager.controller.TubeMQResult.getErrorResult;
@@ -62,20 +59,44 @@ public class NodeController {
@Autowired
MasterUtils masterUtil;
- @RequestMapping(value = "/query", method = RequestMethod.GET,
+ @RequestMapping(value = "/query/clusterInfo", method = RequestMethod.GET,
produces = MediaType.APPLICATION_JSON_VALUE)
public @ResponseBody String queryInfo(@RequestParam String type, @RequestParam String method,
@RequestParam(required = false) Integer clusterId) {
-
if (method.equals(ADMIN_QUERY_CLUSTER_INFO) && type.equals(OP_QUERY)) {
return nodeService.queryClusterInfo(clusterId);
}
-
return gson.toJson(getErrorResult(NO_SUCH_METHOD));
}
/**
+ * query brokers' run status
+ * this method supports batch operation
+ */
+ @RequestMapping(value = "/query/brokerStatus", method = RequestMethod.GET,
+ produces = MediaType.APPLICATION_JSON_VALUE)
+ public @ResponseBody String queryBrokerDetail(
+ @RequestParam Map<String, String> queryBody) throws Exception {
+ String url = masterUtil.getQueryUrl(queryBody);
+ return queryMaster(url);
+ }
+
+
+ /**
+ * query brokers' configuration
+ * this method supports batch operation
+ */
+ @RequestMapping(value = "/query/brokerConfig", method = RequestMethod.GET,
+ produces = MediaType.APPLICATION_JSON_VALUE)
+ public @ResponseBody String queryBrokerConfig(
+ @RequestParam Map<String, String> queryBody) throws Exception {
+ String url = masterUtil.getQueryUrl(queryBody);
+ return queryMaster(url);
+ }
+
+
+ /**
* add brokers to cluster, need to check token and
* make sure user has authorization to modify it.
*/
@@ -102,8 +123,8 @@ public class NodeController {
/**
- * add brokers to cluster, need to check token and
- * make sure user has authorization to modify it.
+ * online brokers in cluster
+ * this method supports batch operation
*/
@RequestMapping(value = "/online", method = RequestMethod.GET)
public @ResponseBody String onlineBrokers(
@@ -111,14 +132,20 @@ public class NodeController {
return gson.toJson(masterUtil.redirectToMaster(queryBody));
}
-
+ /**
+ * reload brokers in cluster
+ * this method supports batch operation
+ */
@RequestMapping(value = "/reload", method = RequestMethod.GET)
public @ResponseBody String reloadBrokers(
@RequestParam Map<String, String> queryBody) throws Exception {
return gson.toJson(masterUtil.redirectToMaster(queryBody));
}
-
+ /**
+ * delete brokers in cluster
+ * this method supports batch operation
+ */
@RequestMapping(value = "/delete", method = RequestMethod.GET)
public @ResponseBody String deleteBrokers(
@RequestParam Map<String, String> queryBody) throws Exception {
@@ -126,6 +153,30 @@ public class NodeController {
return gson.toJson(result);
}
+ /**
+ * change brokers' read mode in cluster
+ * this method supports batch operation
+ */
+ @RequestMapping(value = "/setRead", method = RequestMethod.GET)
+ public @ResponseBody String setBrokersRead(
+ @RequestParam Map<String, String> queryBody) throws Exception {
+ TubeMQResult result = masterUtil.redirectToMaster(queryBody);
+ return gson.toJson(result);
+ }
+
+ /**
+ * change brokers' write mode in cluster
+ * this method supports batch operation
+ */
+ @RequestMapping(value = "/setWrite", method = RequestMethod.GET)
+ public @ResponseBody String setBrokersWrite(
+ @RequestParam Map<String, String> queryBody) throws Exception {
+ TubeMQResult result = masterUtil.redirectToMaster(queryBody);
+ return gson.toJson(result);
+ }
+
+
+
private TubeMQResult addBrokersToCluster(AddBrokersReq req, NodeEntry masterEntry) throws Exception {
String url = SCHEMA + masterEntry.getIp() + ":" + masterEntry.getWebPort()
+ "/" + TUBE_REQUEST_PATH + "?" + convertReqToQueryStr(req);
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/utils/MasterUtils.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/utils/MasterUtils.java
index 95def0c..811dbca 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/utils/MasterUtils.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/utils/MasterUtils.java
@@ -25,12 +25,14 @@ import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
+import org.apache.http.util.EntityUtils;
import org.apache.tubemq.manager.controller.TubeMQResult;
import org.apache.tubemq.manager.entry.NodeEntry;
import org.apache.tubemq.manager.repository.NodeRepository;
import org.apache.tubemq.manager.service.tube.TubeHttpResponse;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
+import org.springframework.web.bind.annotation.RequestBody;
import java.io.InputStreamReader;
import java.net.URLEncoder;
@@ -88,6 +90,27 @@ public class MasterUtils {
return defaultResult;
}
+ /**
+ * query master to get node info
+ * @param url
+ * @return query info
+ */
+ public static String queryMaster(String url) {
+ log.info("start to request {}", url);
+ HttpGet httpGet = new HttpGet(url);
+ TubeMQResult defaultResult = new TubeMQResult();
+ try (CloseableHttpResponse response = httpclient.execute(httpGet)) {
+ // return result json to response
+ return EntityUtils.toString(response.getEntity());
+ } catch (Exception ex) {
+ log.error("exception caught while requesting broker status", ex);
+ defaultResult.setErrCode(-1);
+ defaultResult.setResult(false);
+ defaultResult.setErrMsg(ex.getMessage());
+ }
+ return gson.toJson(defaultResult);
+ }
+
@@ -96,8 +119,21 @@ public class MasterUtils {
queryBody.remove("clusterId");
NodeEntry nodeEntry =
nodeRepository.findNodeEntryByClusterIdIsAndMasterIsTrue(clusterId);
+ if (nodeEntry == null) {
+ return TubeMQResult.getErrorResult("ClusterId doesn't exist");
+ }
String url = SCHEMA + nodeEntry.getIp() + ":" + nodeEntry.getWebPort()
+ "/" + TUBE_REQUEST_PATH + "?" + covertMapToQueryString(queryBody);
return requestMaster(url);
}
+
+ public String getQueryUrl(Map<String, String> queryBody) throws Exception {
+ int clusterId = Integer.parseInt(queryBody.get("clusterId"));
+ queryBody.remove("clusterId");
+ NodeEntry nodeEntry =
+ nodeRepository.findNodeEntryByClusterIdIsAndMasterIsTrue(clusterId);
+ return SCHEMA + nodeEntry.getIp() + ":" + nodeEntry.getWebPort()
+ + "/" + TUBE_REQUEST_PATH + "?" + covertMapToQueryString(queryBody);
+ }
+
}