You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tubemq.apache.org by go...@apache.org on 2020/12/24 10:53:27 UTC

[incubator-tubemq] 02/02: [TUBEMQ-478] delete\reload\online brokers in cluster

This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch TUBEMQ-421
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git

commit 5c69de9b540150de1d40a58363805fc259dffceb
Author: EMsnap <zp...@connect.ust.hk>
AuthorDate: Thu Dec 24 11:56:39 2020 +0800

    [TUBEMQ-478] delete\reload\online brokers in cluster
---
 .../tubemq/manager/controller/TubeMQResult.java    |   2 +-
 .../controller/cluster/ClusterController.java      |  53 ++---------
 .../manager/controller/node/NodeController.java    |  82 +++++++---------
 .../apache/tubemq/manager/service/NodeService.java |   7 --
 .../apache/tubemq/manager/utils/MasterUtils.java   | 103 +++++++++++++++++++++
 .../manager/controller/TestNodeController.java     |  50 +---------
 6 files changed, 149 insertions(+), 148 deletions(-)

diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/TubeMQResult.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/TubeMQResult.java
index 939d0a8..6e4a970 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/TubeMQResult.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/TubeMQResult.java
@@ -32,7 +32,7 @@ public class TubeMQResult {
     private boolean result = true;
 
     public static TubeMQResult getErrorResult(String errorMsg) {
-        return TubeMQResult.builder().errCode(1)
+        return TubeMQResult.builder().errCode(-1)
                 .errMsg(errorMsg).result(false).build();
     }
 }
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/cluster/ClusterController.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/cluster/ClusterController.java
index 53e1aa3..4b58313 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/cluster/ClusterController.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/cluster/ClusterController.java
@@ -17,24 +17,17 @@
 
 package org.apache.tubemq.manager.controller.cluster;
 
-import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.apache.tubemq.manager.service.TubeMQHttpConst.SCHEMA;
+import static org.apache.tubemq.manager.utils.MasterUtils.*;
 
 import com.google.gson.Gson;
-import java.net.URLEncoder;
-import java.util.ArrayList;
-import java.util.List;
 import java.util.Map;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.http.client.methods.CloseableHttpResponse;
-import org.apache.http.client.methods.HttpGet;
-import org.apache.http.impl.client.CloseableHttpClient;
-import org.apache.http.impl.client.HttpClients;
-import org.apache.http.util.EntityUtils;
 import org.apache.tubemq.manager.controller.TubeMQResult;
 import org.apache.tubemq.manager.entry.NodeEntry;
 import org.apache.tubemq.manager.repository.NodeRepository;
+import org.apache.tubemq.manager.utils.MasterUtils;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.http.MediaType;
 import org.springframework.web.bind.annotation.RequestBody;
@@ -49,52 +42,18 @@ import org.springframework.web.bind.annotation.RestController;
 @Slf4j
 public class ClusterController {
 
-    private final CloseableHttpClient httpclient = HttpClients.createDefault();
     private final Gson gson = new Gson();
-
-    private static final String TUBE_REQUEST_PATH = "webapi.htm";
-
     @Autowired
     private NodeRepository nodeRepository;
 
-
-    private String covertMapToQueryString(Map<String, String> requestMap) throws Exception {
-        List<String> queryList = new ArrayList<>();
-
-        for (Map.Entry<String, String> entry : requestMap.entrySet()) {
-            queryList.add(entry.getKey() + "=" + URLEncoder.encode(
-                    entry.getValue(), UTF_8.toString()));
-        }
-        return StringUtils.join(queryList, "&");
-    }
-
-    private String queryMaster(String url) {
-        log.info("start to request {}", url);
-        HttpGet httpGet = new HttpGet(url);
-        TubeMQResult defaultResult = new TubeMQResult();
-        try (CloseableHttpResponse response = httpclient.execute(httpGet)) {
-            // return result json to response
-            return EntityUtils.toString(response.getEntity());
-        } catch (Exception ex) {
-            log.error("exception caught while requesting broker status", ex);
-            defaultResult.setErrCode(-1);
-            defaultResult.setResult(false);
-            defaultResult.setErrMsg(ex.getMessage());
-        }
-        return gson.toJson(defaultResult);
-    }
+    @Autowired
+    public MasterUtils masterUtil;
 
     @RequestMapping(value = "/query", method = RequestMethod.GET,
             produces = MediaType.APPLICATION_JSON_VALUE)
     public @ResponseBody String queryInfo(
             @RequestParam Map<String, String> queryBody) throws Exception {
-        int clusterId = Integer.parseInt(queryBody.get("clusterId"));
-        queryBody.remove("clusterId");
-        NodeEntry nodeEntry =
-                nodeRepository.findNodeEntryByClusterIdIsAndMasterIsTrue(clusterId);
-        String url = SCHEMA + nodeEntry.getIp() + ":" + nodeEntry.getWebPort()
-                + "/" + TUBE_REQUEST_PATH + "?" + covertMapToQueryString(queryBody);
-        return queryMaster(url);
+        return gson.toJson(masterUtil.redirectToMaster(queryBody));
     }
 
     /**
@@ -114,7 +73,7 @@ public class ClusterController {
                     clusterId);
             String url = SCHEMA + nodeEntry.getIp() + ":" + nodeEntry.getWebPort()
                     + "/" + TUBE_REQUEST_PATH + "?" + covertMapToQueryString(requestBody);
-            return queryMaster(url);
+            return gson.toJson(requestMaster(url));
         } else {
             TubeMQResult result = new TubeMQResult();
             result.setErrCode(-1);
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/NodeController.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/NodeController.java
index 7b108e7..8a6e586 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/NodeController.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/NodeController.java
@@ -20,28 +20,27 @@ package org.apache.tubemq.manager.controller.node;
 import com.google.gson.Gson;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.http.client.methods.CloseableHttpResponse;
-import org.apache.http.client.methods.HttpGet;
 import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.http.impl.client.HttpClients;
-import org.apache.http.util.EntityUtils;
 import org.apache.tubemq.manager.controller.TubeMQResult;
 import org.apache.tubemq.manager.controller.node.request.AddBrokersReq;
 import org.apache.tubemq.manager.controller.node.request.BrokerConf;
 import org.apache.tubemq.manager.entry.NodeEntry;
 import org.apache.tubemq.manager.repository.NodeRepository;
 import org.apache.tubemq.manager.service.NodeService;
-import org.apache.tubemq.manager.service.tube.TubeHttpResponse;
+import org.apache.tubemq.manager.utils.MasterUtils;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.http.MediaType;
 import org.springframework.web.bind.annotation.*;
 
-import java.io.InputStreamReader;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 
+import static org.apache.tubemq.manager.controller.TubeMQResult.getErrorResult;
 import static org.apache.tubemq.manager.service.TubeMQHttpConst.SCHEMA;
 import static org.apache.tubemq.manager.utils.ConvertUtils.convertReqToQueryStr;
+import static org.apache.tubemq.manager.utils.MasterUtils.*;
 
 @RestController
 @RequestMapping(path = "/v1/node")
@@ -51,7 +50,6 @@ public class NodeController {
     public static final String NO_SUCH_METHOD = "no such method";
     public static final String OP_QUERY = "op_query";
     public static final String ADMIN_QUERY_CLUSTER_INFO = "admin_query_cluster_info";
-    public static final String TUBE_REQUEST_PATH = "webapi.htm";
     private final Gson gson = new Gson();
     private static final CloseableHttpClient httpclient = HttpClients.createDefault();
 
@@ -61,6 +59,8 @@ public class NodeController {
     @Autowired
     NodeRepository nodeRepository;
 
+    @Autowired
+    MasterUtils masterUtil;
 
     @RequestMapping(value = "/query", method = RequestMethod.GET,
             produces = MediaType.APPLICATION_JSON_VALUE)
@@ -71,7 +71,7 @@ public class NodeController {
             return nodeService.queryClusterInfo(clusterId);
         }
 
-        return gson.toJson(TubeMQResult.getErrorResult(NO_SUCH_METHOD));
+        return gson.toJson(getErrorResult(NO_SUCH_METHOD));
     }
 
 
@@ -88,7 +88,8 @@ public class NodeController {
         if (StringUtils.isNotBlank(token)) {
             NodeEntry masterEntry = nodeRepository.findNodeEntryByClusterIdIsAndMasterIsTrue(
                     clusterId);
-            return addToMasterAndRepo(req, masterEntry);
+            TubeMQResult result = addBrokersToCluster(req, masterEntry);
+            return gson.toJson(result);
         } else {
             TubeMQResult result = new TubeMQResult();
             result.setErrCode(-1);
@@ -99,53 +100,38 @@ public class NodeController {
 
     }
 
-    private String addToMasterAndRepo(AddBrokersReq req, NodeEntry masterEntry) throws Exception {
 
-        String url = SCHEMA + masterEntry.getIp() + ":" + masterEntry.getWebPort()
-                + "/" + TUBE_REQUEST_PATH + "?" + convertReqToQueryStr(req);
+    /**
+     * handle the /online request by forwarding the query parameters
+     * to the master node of the cluster given by clusterId.
+     */
+    @RequestMapping(value = "/online", method = RequestMethod.GET)
+    public @ResponseBody String onlineBrokers(
+            @RequestParam Map<String, String> queryBody) throws Exception {
+        return gson.toJson(masterUtil.redirectToMaster(queryBody));
+    }
 
-        log.info("start to request {}", url);
-        HttpGet httpGet = new HttpGet(url);
-        TubeMQResult defaultResult = new TubeMQResult();
-
-        try (CloseableHttpResponse response = httpclient.execute(httpGet)) {
-            TubeHttpResponse result =
-                    gson.fromJson(new InputStreamReader(response.getEntity().getContent()),
-                            TubeHttpResponse.class);
-            if (result.getCode() == 0 && result.getErrCode() == 0) {
-                // save brokers to db when success
-                saveAllBrokers(req.getBrokerJsonSet(), req.getClusterId(), masterEntry);
-            } else {
-                return result.getErrMsg();
-            }
-        } catch (Exception ex) {
-            log.error("exception caught while requesting broker status", ex);
-            defaultResult.setErrCode(-1);
-            defaultResult.setResult(false);
-            defaultResult.setErrMsg(ex.getMessage());
-        }
 
-        return gson.toJson(defaultResult);
+    @RequestMapping(value = "/reload", method = RequestMethod.GET)
+    public @ResponseBody String reloadBrokers(
+            @RequestParam Map<String, String> queryBody) throws Exception {
+        return gson.toJson(masterUtil.redirectToMaster(queryBody));
     }
 
 
+    @RequestMapping(value = "/delete", method = RequestMethod.GET)
+    public @ResponseBody String deleteBrokers(
+            @RequestParam Map<String, String> queryBody) throws Exception {
+        TubeMQResult result = masterUtil.redirectToMaster(queryBody);
+        return gson.toJson(result);
+    }
 
-
-    private void saveAllBrokers(List<BrokerConf> brokerConfList, int clusterId, NodeEntry masterEntry) {
-        List<NodeEntry> nodeEntries = new ArrayList<>();
-        for (BrokerConf brokerConf : brokerConfList) {
-            NodeEntry node = new NodeEntry();
-            node.setBroker(true);
-            node.setClusterId(clusterId);
-            node.setClusterName(masterEntry.getClusterName());
-            node.setBrokerId(brokerConf.getBrokerId());
-            node.setMaster(false);
-            node.setIp(brokerConf.getBrokerIp());
-            node.setStandby(false);
-            node.setPort(brokerConf.getBrokerPort());
-            nodeEntries.add(node);
-        }
-        nodeService.saveNodes(nodeEntries);
+    private TubeMQResult addBrokersToCluster(AddBrokersReq req, NodeEntry masterEntry) throws Exception {
+        String url = SCHEMA + masterEntry.getIp() + ":" + masterEntry.getWebPort()
+                + "/" + TUBE_REQUEST_PATH + "?" + convertReqToQueryStr(req);
+        TubeMQResult tubeMQResult = requestMaster(url);
+        return tubeMQResult;
     }
 
+
 }
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/NodeService.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/NodeService.java
index a9d39b3..836fe8a 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/NodeService.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/NodeService.java
@@ -293,13 +293,6 @@ public class NodeService {
 
 
 
-    public void saveNodes(List<NodeEntry> nodes) {
-        nodeRepository.saveAll(nodes);
-    }
-
-
-
-
     public void close() throws IOException {
         httpclient.close();
     }
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/utils/MasterUtils.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/utils/MasterUtils.java
new file mode 100644
index 0000000..95def0c
--- /dev/null
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/utils/MasterUtils.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.tubemq.manager.utils;
+
+import com.google.gson.Gson;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.tubemq.manager.controller.TubeMQResult;
+import org.apache.tubemq.manager.entry.NodeEntry;
+import org.apache.tubemq.manager.repository.NodeRepository;
+import org.apache.tubemq.manager.service.tube.TubeHttpResponse;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import java.io.InputStreamReader;
+import java.net.URLEncoder;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.tubemq.manager.controller.TubeMQResult.getErrorResult;
+import static org.apache.tubemq.manager.service.TubeMQHttpConst.SCHEMA;
+
+
+@Slf4j
+@Component
+public class MasterUtils {
+
+    private static CloseableHttpClient httpclient = HttpClients.createDefault();
+    private static Gson gson = new Gson();
+    public static final String TUBE_REQUEST_PATH = "webapi.htm";
+
+    @Autowired
+    NodeRepository nodeRepository;
+
+    public static String covertMapToQueryString(Map<String, String> requestMap) throws Exception {
+        List<String> queryList = new ArrayList<>();
+
+        for (Map.Entry<String, String> entry : requestMap.entrySet()) {
+            queryList.add(entry.getKey() + "=" + URLEncoder.encode(
+                    entry.getValue(), UTF_8.toString()));
+        }
+        return StringUtils.join(queryList, "&");
+    }
+
+
+
+    public static TubeMQResult requestMaster(String url) throws Exception {
+
+        log.info("start to request {}", url);
+        HttpGet httpGet = new HttpGet(url);
+        TubeMQResult defaultResult = new TubeMQResult();
+
+        try (CloseableHttpResponse response = httpclient.execute(httpGet)) {
+            TubeHttpResponse tubeResponse =
+                    gson.fromJson(new InputStreamReader(response.getEntity().getContent()),
+                            TubeHttpResponse.class);
+            if (tubeResponse.getCode() == 0 && tubeResponse.getErrCode() == 0) {
+                return defaultResult;
+            } else {
+                defaultResult = getErrorResult(tubeResponse.getErrMsg());
+            }
+        } catch (Exception ex) {
+            log.error("exception caught while requesting broker status", ex);
+            defaultResult = getErrorResult(ex.getMessage());
+        }
+        return defaultResult;
+    }
+
+
+
+
+    public TubeMQResult redirectToMaster(Map<String, String> queryBody) throws Exception {
+        int clusterId = Integer.parseInt(queryBody.get("clusterId"));
+        queryBody.remove("clusterId");
+        NodeEntry nodeEntry =
+                nodeRepository.findNodeEntryByClusterIdIsAndMasterIsTrue(clusterId);
+        String url = SCHEMA + nodeEntry.getIp() + ":" + nodeEntry.getWebPort()
+                + "/" + TUBE_REQUEST_PATH + "?" + covertMapToQueryString(queryBody);
+        return requestMaster(url);
+    }
+}
diff --git a/tubemq-manager/src/test/java/org/apache/tubemq/manager/controller/TestNodeController.java b/tubemq-manager/src/test/java/org/apache/tubemq/manager/controller/TestNodeController.java
index 935bb62..9b8ce64 100644
--- a/tubemq-manager/src/test/java/org/apache/tubemq/manager/controller/TestNodeController.java
+++ b/tubemq-manager/src/test/java/org/apache/tubemq/manager/controller/TestNodeController.java
@@ -30,13 +30,17 @@ import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMock
 import org.springframework.boot.test.context.SpringBootTest;
 import org.springframework.boot.test.mock.mockito.MockBean;
 import org.springframework.http.MediaType;
+import org.springframework.http.ResponseEntity;
 import org.springframework.test.context.junit4.SpringRunner;
 import org.springframework.test.web.servlet.MockMvc;
 import org.springframework.test.web.servlet.MvcResult;
 import org.springframework.test.web.servlet.RequestBuilder;
 
 import java.util.List;
+import java.util.Objects;
 
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.when;
@@ -96,14 +100,6 @@ public class TestNodeController {
                     "\"standby\":[],\"broker\":[]}}]," +
                     "\"errMsg\":\"\",\"errCode\":0,\"result\":true}";
 
-    private NodeEntry getNodeEntry() {
-        NodeEntry nodeEntry = new NodeEntry();
-        nodeEntry.setMaster(true);
-        nodeEntry.setIp("127.0.0.1");
-        nodeEntry.setWebPort(8084);
-        return nodeEntry;
-    }
-
     @Test
     public void testClusterInfo() throws Exception {
         List<NodeEntry> nodeEntries = getOneNodeEntry();
@@ -132,40 +128,4 @@ public class TestNodeController {
         log.info("result json string is {}, response type is {}", resultStr,
                 result.getResponse().getContentType());
     }
-
-    @Test
-    public void testAddBrokersToCluster() throws Exception {
-        String jsonStr = "{\n" +
-                "\t\"confModAuthToken\": \"abc\",\n" +
-                "\t\"createUser\": \"test\",\n" +
-                "\t\"clusterId\": 1,\n" +
-                "\t\"method\": \"admin_bath_add_broker_configure\",\n" +
-                "\t\"type\": \"op_modify\",\n" +
-                "\t\"brokerJsonSet\": [{\n" +
-                "\t\t\"brokerId\": 234,\n" +
-                "\t\t\"brokerIp\": \"127.0 .0 .1\",\n" +
-                "\t\t\"brokerPort\": 8124,\n" +
-                "\t\t\"numPartitions\": 3,\n" +
-                "\t\t\"unflushThreshold\": 55,\n" +
-                "\t\t\"unflushInterval\": 10000,\n" +
-                "\t\t\"deleteWhen\": \"0 0 6,18 * * ?\",\n" +
-                "\t\t\"deletePolicy\": \"delete,168\",\n" +
-                "\t\t\"acceptPublish\": \"true\",\n" +
-                "\t\t\"acceptSubscribe\": \"true\",\n" +
-                "\t\t\"createUser\": \"gosonzhang\",\n" +
-                "\t\t\"createDate\": \"20151116142135\",\n" +
-                "\t\t\"modifyUser\": \"gosonzhang\",\n" +
-                "\t\t\"modifyDate\": \"20151117161515\"\n" +
-                "\t}]\n" +
-                "\n" +
-                "}";
-        NodeEntry nodeEntry = getNodeEntry();
-        doReturn(nodeEntry).when(nodeRepository).findNodeEntryByClusterIdIsAndMasterIsTrue(any(Integer.class));
-        RequestBuilder request = post("/v1/node/add")
-                .contentType(MediaType.APPLICATION_JSON).content(jsonStr);
-        MvcResult result = mockMvc.perform(request).andReturn();
-        String resultStr = result.getResponse().getContentAsString();
-        log.info("result json string is {}, response type is {}", resultStr,
-                result.getResponse().getContentType());
-        }
-    }
+}