You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tubemq.apache.org by go...@apache.org on 2020/12/25 08:52:49 UTC

[incubator-tubemq] branch TUBEMQ-421 updated: [TUBEMQ-480] batch set brokers to read or write mode and query brokers run info or config

This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch TUBEMQ-421
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git


The following commit(s) were added to refs/heads/TUBEMQ-421 by this push:
     new 746b8c1  [TUBEMQ-480] batch set brokers to read or write mode and query brokers run info or config
746b8c1 is described below

commit 746b8c182ce0570e8c694e506c8c4f3f5a5c60e9
Author: EMsnap <zp...@connect.ust.hk>
AuthorDate: Thu Dec 24 15:43:13 2020 +0800

    [TUBEMQ-480] batch set brokers to read or write mode and query brokers run info or config
---
 .../controller/cluster/ClusterController.java      |  7 ++-
 .../manager/controller/node/NodeController.java    | 71 +++++++++++++++++++---
 .../apache/tubemq/manager/utils/MasterUtils.java   | 36 +++++++++++
 3 files changed, 103 insertions(+), 11 deletions(-)

diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/cluster/ClusterController.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/cluster/ClusterController.java
index 4b58313..562813c 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/cluster/ClusterController.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/cluster/ClusterController.java
@@ -43,17 +43,22 @@ import org.springframework.web.bind.annotation.RestController;
 public class ClusterController {
 
     private final Gson gson = new Gson();
+
     @Autowired
     private NodeRepository nodeRepository;
 
     @Autowired
     public MasterUtils masterUtil;
 
+    /**
+     * Query cluster info: builds the request URL with masterUtil.getQueryUrl and returns queryMaster(url).
+     */
     @RequestMapping(value = "/query", method = RequestMethod.GET,
             produces = MediaType.APPLICATION_JSON_VALUE)
     public @ResponseBody String queryInfo(
             @RequestParam Map<String, String> queryBody) throws Exception {
-        return gson.toJson(masterUtil.redirectToMaster(queryBody));
+        String url = masterUtil.getQueryUrl(queryBody);
+        return queryMaster(url);
     }
 
     /**
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/NodeController.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/NodeController.java
index 8a6e586..ab5567d 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/NodeController.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/NodeController.java
@@ -24,7 +24,6 @@ import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.http.impl.client.HttpClients;
 import org.apache.tubemq.manager.controller.TubeMQResult;
 import org.apache.tubemq.manager.controller.node.request.AddBrokersReq;
-import org.apache.tubemq.manager.controller.node.request.BrokerConf;
 import org.apache.tubemq.manager.entry.NodeEntry;
 import org.apache.tubemq.manager.repository.NodeRepository;
 import org.apache.tubemq.manager.service.NodeService;
@@ -33,8 +32,6 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.http.MediaType;
 import org.springframework.web.bind.annotation.*;
 
-import java.util.ArrayList;
-import java.util.List;
 import java.util.Map;
 
 import static org.apache.tubemq.manager.controller.TubeMQResult.getErrorResult;
@@ -62,20 +59,44 @@ public class NodeController {
     @Autowired
     MasterUtils masterUtil;
 
-    @RequestMapping(value = "/query", method = RequestMethod.GET,
+    @RequestMapping(value = "/query/clusterInfo", method = RequestMethod.GET,
             produces = MediaType.APPLICATION_JSON_VALUE)
     public @ResponseBody String queryInfo(@RequestParam String type, @RequestParam String method,
             @RequestParam(required = false) Integer clusterId) {
-
         if (method.equals(ADMIN_QUERY_CLUSTER_INFO) && type.equals(OP_QUERY)) {
             return nodeService.queryClusterInfo(clusterId);
         }
-
         return gson.toJson(getErrorResult(NO_SUCH_METHOD));
     }
 
 
     /**
+     * Query brokers' run status: returns queryMaster's result for the URL built by masterUtil.getQueryUrl.
+     * This endpoint supports batch operation.
+     */
+    @RequestMapping(value = "/query/brokerStatus", method = RequestMethod.GET,
+            produces = MediaType.APPLICATION_JSON_VALUE)
+    public @ResponseBody String queryBrokerDetail(
+            @RequestParam Map<String, String> queryBody) throws Exception {
+        String url = masterUtil.getQueryUrl(queryBody);
+        return queryMaster(url);
+    }
+
+
+    /**
+     * Query brokers' configuration: returns queryMaster's result for the URL built by masterUtil.getQueryUrl.
+     * This endpoint supports batch operation.
+     */
+    @RequestMapping(value = "/query/brokerConfig", method = RequestMethod.GET,
+            produces = MediaType.APPLICATION_JSON_VALUE)
+    public @ResponseBody String queryBrokerConfig(
+            @RequestParam Map<String, String> queryBody) throws Exception {
+        String url = masterUtil.getQueryUrl(queryBody);
+        return queryMaster(url);
+    }
+
+
+    /**
      * add brokers to cluster, need to check token and
      * make sure user has authorization to modify it.
      */
@@ -102,8 +123,8 @@ public class NodeController {
 
 
     /**
-     * add brokers to cluster, need to check token and
-     * make sure user has authorization to modify it.
+     * online brokers in cluster
+     * this method supports batch operation
      */
     @RequestMapping(value = "/online", method = RequestMethod.GET)
     public @ResponseBody String onlineBrokers(
@@ -111,14 +132,20 @@ public class NodeController {
         return gson.toJson(masterUtil.redirectToMaster(queryBody));
     }
 
-
+    /**
+     * Reload brokers in cluster: passes the request parameters to masterUtil.redirectToMaster, returns the result as JSON.
+     * This endpoint supports batch operation.
+     */
     @RequestMapping(value = "/reload", method = RequestMethod.GET)
     public @ResponseBody String reloadBrokers(
             @RequestParam Map<String, String> queryBody) throws Exception {
         return gson.toJson(masterUtil.redirectToMaster(queryBody));
     }
 
-
+    /**
+     * delete brokers in cluster
+     * this method supports batch operation
+     */
     @RequestMapping(value = "/delete", method = RequestMethod.GET)
     public @ResponseBody String deleteBrokers(
             @RequestParam Map<String, String> queryBody) throws Exception {
@@ -126,6 +153,30 @@ public class NodeController {
         return gson.toJson(result);
     }
 
+    /**
+     * Change brokers' read mode: passes the request parameters to masterUtil.redirectToMaster, returns the result as JSON.
+     * This endpoint supports batch operation.
+     */
+    @RequestMapping(value = "/setRead", method = RequestMethod.GET)
+    public @ResponseBody String setBrokersRead(
+            @RequestParam Map<String, String> queryBody) throws Exception {
+        TubeMQResult result = masterUtil.redirectToMaster(queryBody);
+        return gson.toJson(result);
+    }
+
+    /**
+     * Change brokers' write mode: passes the request parameters to masterUtil.redirectToMaster, returns the result as JSON.
+     * This endpoint supports batch operation.
+     */
+    @RequestMapping(value = "/setWrite", method = RequestMethod.GET)
+    public @ResponseBody String setBrokersWrite(
+            @RequestParam Map<String, String> queryBody) throws Exception {
+        TubeMQResult result = masterUtil.redirectToMaster(queryBody);
+        return gson.toJson(result);
+    }
+
+
+
     private TubeMQResult addBrokersToCluster(AddBrokersReq req, NodeEntry masterEntry) throws Exception {
         String url = SCHEMA + masterEntry.getIp() + ":" + masterEntry.getWebPort()
                 + "/" + TUBE_REQUEST_PATH + "?" + convertReqToQueryStr(req);
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/utils/MasterUtils.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/utils/MasterUtils.java
index 95def0c..811dbca 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/utils/MasterUtils.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/utils/MasterUtils.java
@@ -25,12 +25,14 @@ import org.apache.http.client.methods.CloseableHttpResponse;
 import org.apache.http.client.methods.HttpGet;
 import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.http.impl.client.HttpClients;
+import org.apache.http.util.EntityUtils;
 import org.apache.tubemq.manager.controller.TubeMQResult;
 import org.apache.tubemq.manager.entry.NodeEntry;
 import org.apache.tubemq.manager.repository.NodeRepository;
 import org.apache.tubemq.manager.service.tube.TubeHttpResponse;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
+import org.springframework.web.bind.annotation.RequestBody;
 
 import java.io.InputStreamReader;
 import java.net.URLEncoder;
@@ -88,6 +90,27 @@ public class MasterUtils {
         return defaultResult;
     }
 
+    /**
+     * Issue an HTTP GET to the given url and return the response body as a string.
+     * @param url the complete request URL to fetch
+     * @return the response entity text; if any exception is caught, a JSON-serialized TubeMQResult with errCode -1
+     */
+    public static String queryMaster(String url) {
+        log.info("start to request {}", url);
+        HttpGet httpGet = new HttpGet(url);
+        TubeMQResult defaultResult = new TubeMQResult();
+        try (CloseableHttpResponse response = httpclient.execute(httpGet)) {
+            // pass the response body back to the caller as-is, without parsing it
+            return EntityUtils.toString(response.getEntity());
+        } catch (Exception ex) {
+            log.error("exception caught while requesting broker status", ex);
+            defaultResult.setErrCode(-1);
+            defaultResult.setResult(false);
+            defaultResult.setErrMsg(ex.getMessage());
+        }
+        return gson.toJson(defaultResult);
+    }
+
 
 
 
@@ -96,8 +119,21 @@ public class MasterUtils {
         queryBody.remove("clusterId");
         NodeEntry nodeEntry =
                 nodeRepository.findNodeEntryByClusterIdIsAndMasterIsTrue(clusterId);
+        if (nodeEntry == null) {
+            return TubeMQResult.getErrorResult("ClusterId doesn't exist");
+        }
         String url = SCHEMA + nodeEntry.getIp() + ":" + nodeEntry.getWebPort()
                 + "/" + TUBE_REQUEST_PATH + "?" + covertMapToQueryString(queryBody);
         return requestMaster(url);
     }
+    /** Build the request URL for the master node of the cluster named by the "clusterId" entry of queryBody. */
+    public String getQueryUrl(Map<String, String> queryBody) throws Exception {
+        int clusterId = Integer.parseInt(queryBody.get("clusterId")); // NumberFormatException if "clusterId" is absent or not an integer
+        queryBody.remove("clusterId"); // mutates the caller's map; clusterId is not in the map given to covertMapToQueryString
+        NodeEntry nodeEntry =
+                nodeRepository.findNodeEntryByClusterIdIsAndMasterIsTrue(clusterId);
+        return SCHEMA + nodeEntry.getIp() + ":" + nodeEntry.getWebPort() // NOTE(review): nodeEntry is not null-checked here - confirm unknown clusterId is handled
+                + "/" + TUBE_REQUEST_PATH + "?" + covertMapToQueryString(queryBody);
+    }
+
 }