You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tubemq.apache.org by go...@apache.org on 2020/12/24 10:53:25 UTC

[incubator-tubemq] branch TUBEMQ-421 updated (37739df -> 5c69de9)

This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a change to branch TUBEMQ-421
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git.


    from 37739df  [TUBEMQ-439] Add cluster info query (#334)
     new 512b160  [TUBEMQ-485] add one or more broker in cluster
     new 5c69de9  [TUBEMQ-478] delete\reload\online brokers in cluster

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../tubemq/manager/controller/TubeMQResult.java    |   2 +-
 .../controller/cluster/ClusterController.java      |  53 ++---------
 .../manager/controller/node/NodeController.java    |  87 ++++++++++++++++-
 .../controller/node/request/AddBrokersReq.java     |  26 ++++--
 .../node/request/BrokerConf.java}                  |  52 +++++------
 .../apache/tubemq/manager/service/NodeService.java |   1 +
 .../apache/tubemq/manager/utils/ConvertUtils.java  |  53 +++++++++++
 .../apache/tubemq/manager/utils/MasterUtils.java   | 103 +++++++++++++++++++++
 .../manager/controller/TestBusinessController.java |   2 +
 .../manager/controller/TestNodeController.java     |   9 +-
 10 files changed, 299 insertions(+), 89 deletions(-)
 copy tubemq-client/src/main/java/org/apache/tubemq/client/consumer/MessageListener.java => tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/AddBrokersReq.java (68%)
 copy tubemq-manager/src/main/java/org/apache/tubemq/manager/{entry/NodeEntry.java => controller/node/request/BrokerConf.java} (55%)
 create mode 100644 tubemq-manager/src/main/java/org/apache/tubemq/manager/utils/ConvertUtils.java
 create mode 100644 tubemq-manager/src/main/java/org/apache/tubemq/manager/utils/MasterUtils.java


[incubator-tubemq] 01/02: [TUBEMQ-485] add one or more broker in cluster

Posted by go...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch TUBEMQ-421
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git

commit 512b1600afb3d9d9eb6e39ae16a0db33be3cd86c
Author: EMsnap <zp...@connect.ust.hk>
AuthorDate: Wed Dec 23 10:31:47 2020 +0800

    [TUBEMQ-485] add one or more broker in cluster
---
 .../manager/controller/node/NodeController.java    | 99 +++++++++++++++++++++-
 .../controller/node/request/AddBrokersReq.java     | 46 ++++++++++
 .../controller/node/request/BrokerConf.java        | 47 ++++++++++
 .../apache/tubemq/manager/service/NodeService.java |  8 ++
 .../apache/tubemq/manager/utils/ConvertUtils.java  | 53 ++++++++++++
 .../manager/controller/TestBusinessController.java |  2 +
 .../manager/controller/TestNodeController.java     | 49 ++++++++++-
 7 files changed, 300 insertions(+), 4 deletions(-)

diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/NodeController.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/NodeController.java
index fe64b7b..7b108e7 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/NodeController.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/NodeController.java
@@ -19,12 +19,30 @@ package org.apache.tubemq.manager.controller.node;
 
 import com.google.gson.Gson;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.util.EntityUtils;
 import org.apache.tubemq.manager.controller.TubeMQResult;
+import org.apache.tubemq.manager.controller.node.request.AddBrokersReq;
+import org.apache.tubemq.manager.controller.node.request.BrokerConf;
+import org.apache.tubemq.manager.entry.NodeEntry;
+import org.apache.tubemq.manager.repository.NodeRepository;
 import org.apache.tubemq.manager.service.NodeService;
+import org.apache.tubemq.manager.service.tube.TubeHttpResponse;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.http.MediaType;
 import org.springframework.web.bind.annotation.*;
 
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.tubemq.manager.service.TubeMQHttpConst.SCHEMA;
+import static org.apache.tubemq.manager.utils.ConvertUtils.convertReqToQueryStr;
+
 @RestController
 @RequestMapping(path = "/v1/node")
 @Slf4j
@@ -33,12 +51,17 @@ public class NodeController {
     public static final String NO_SUCH_METHOD = "no such method";
     public static final String OP_QUERY = "op_query";
     public static final String ADMIN_QUERY_CLUSTER_INFO = "admin_query_cluster_info";
-
+    public static final String TUBE_REQUEST_PATH = "webapi.htm";
     private final Gson gson = new Gson();
+    private static final CloseableHttpClient httpclient = HttpClients.createDefault();
 
     @Autowired
     NodeService nodeService;
 
+    @Autowired
+    NodeRepository nodeRepository;
+
+
     @RequestMapping(value = "/query", method = RequestMethod.GET,
             produces = MediaType.APPLICATION_JSON_VALUE)
     public @ResponseBody String queryInfo(@RequestParam String type, @RequestParam String method,
@@ -51,4 +74,78 @@ public class NodeController {
         return gson.toJson(TubeMQResult.getErrorResult(NO_SUCH_METHOD));
     }
 
+
+    /**
+     * add brokers to cluster, need to check token and
+     * make sure user has authorization to modify it.
+     */
+    @RequestMapping(value = "/add", method = RequestMethod.POST)
+    public @ResponseBody String addBrokersToCluster(
+            @RequestBody AddBrokersReq req) throws Exception {
+        String token = req.getConfModAuthToken();
+        int clusterId = req.getClusterId();
+
+        if (StringUtils.isNotBlank(token)) {
+            NodeEntry masterEntry = nodeRepository.findNodeEntryByClusterIdIsAndMasterIsTrue(
+                    clusterId);
+            return addToMasterAndRepo(req, masterEntry);
+        } else {
+            TubeMQResult result = new TubeMQResult();
+            result.setErrCode(-1);
+            result.setResult(false);
+            result.setErrMsg("token is not correct");
+            return gson.toJson(result);
+        }
+
+    }
+
+    private String addToMasterAndRepo(AddBrokersReq req, NodeEntry masterEntry) throws Exception {
+
+        String url = SCHEMA + masterEntry.getIp() + ":" + masterEntry.getWebPort()
+                + "/" + TUBE_REQUEST_PATH + "?" + convertReqToQueryStr(req);
+
+        log.info("start to request {}", url);
+        HttpGet httpGet = new HttpGet(url);
+        TubeMQResult defaultResult = new TubeMQResult();
+
+        try (CloseableHttpResponse response = httpclient.execute(httpGet)) {
+            TubeHttpResponse result =
+                    gson.fromJson(new InputStreamReader(response.getEntity().getContent()),
+                            TubeHttpResponse.class);
+            if (result.getCode() == 0 && result.getErrCode() == 0) {
+                // save brokers to db when success
+                saveAllBrokers(req.getBrokerJsonSet(), req.getClusterId(), masterEntry);
+            } else {
+                return result.getErrMsg();
+            }
+        } catch (Exception ex) {
+            log.error("exception caught while requesting broker status", ex);
+            defaultResult.setErrCode(-1);
+            defaultResult.setResult(false);
+            defaultResult.setErrMsg(ex.getMessage());
+        }
+
+        return gson.toJson(defaultResult);
+    }
+
+
+
+
+    private void saveAllBrokers(List<BrokerConf> brokerConfList, int clusterId, NodeEntry masterEntry) {
+        List<NodeEntry> nodeEntries = new ArrayList<>();
+        for (BrokerConf brokerConf : brokerConfList) {
+            NodeEntry node = new NodeEntry();
+            node.setBroker(true);
+            node.setClusterId(clusterId);
+            node.setClusterName(masterEntry.getClusterName());
+            node.setBrokerId(brokerConf.getBrokerId());
+            node.setMaster(false);
+            node.setIp(brokerConf.getBrokerIp());
+            node.setStandby(false);
+            node.setPort(brokerConf.getBrokerPort());
+            nodeEntries.add(node);
+        }
+        nodeService.saveNodes(nodeEntries);
+    }
+
 }
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/AddBrokersReq.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/AddBrokersReq.java
new file mode 100644
index 0000000..13d10c4
--- /dev/null
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/AddBrokersReq.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tubemq.manager.controller.node.request;
+
+
+import lombok.Data;
+
+import java.util.List;
+
+@Data
+public class AddBrokersReq {
+
+    public String confModAuthToken;
+
+    public String createUser;
+
+    public int clusterId;
+
+    /**
+     * admin_bath_add_broker_configure
+     */
+    public String method;
+
+    /**
+     * op_modify
+     */
+    public String type;
+
+    public List<BrokerConf> brokerJsonSet;
+
+}
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BrokerConf.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BrokerConf.java
new file mode 100644
index 0000000..fc48d47
--- /dev/null
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BrokerConf.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tubemq.manager.controller.node.request;
+
+
+import lombok.Data;
+
+import java.security.SecureRandom;
+
+@Data
+public class BrokerConf {
+
+    public String brokerIp;
+    public Integer brokerPort;
+    public Integer brokerId;
+    public String deleteWhen;
+    public Integer numPartitions;
+    public Integer unflushThreshold;
+    public Integer unflushIntegererval;
+    public Integer unflushDataHold;
+    public boolean acceptPublish;
+    public boolean acceptSubscribe;
+    public String createUser;
+    public Integer brokerTLSPort;
+    public Integer numTopicStores;
+    public Integer memCacheMsgCntInK;
+    public Integer memCacheMsgSizeInMB;
+    public Integer memCacheFlushIntegervl;
+
+}
+
+
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/NodeService.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/NodeService.java
index 2e819cc..a9d39b3 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/NodeService.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/NodeService.java
@@ -292,6 +292,14 @@ public class NodeService {
     }
 
 
+
+    public void saveNodes(List<NodeEntry> nodes) {
+        nodeRepository.saveAll(nodes);
+    }
+
+
+
+
     public void close() throws IOException {
         httpclient.close();
     }
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/utils/ConvertUtils.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/utils/ConvertUtils.java
new file mode 100644
index 0000000..126b347
--- /dev/null
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/utils/ConvertUtils.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tubemq.manager.utils;
+
+import com.google.gson.Gson;
+import org.apache.commons.lang3.StringUtils;
+
+import java.lang.reflect.Field;
+import java.net.URLEncoder;
+import java.util.ArrayList;
+import java.util.List;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+public class ConvertUtils {
+
+    public static Gson gson = new Gson();
+
+    public static String convertReqToQueryStr(Object req) throws Exception {
+        List<String> queryList = new ArrayList<>();
+        Class<?> clz = req.getClass();
+        Field[] fields = clz.getDeclaredFields();
+        for (Field field : fields) {
+            field.setAccessible(true);
+            Object o = field.get(req);
+            String value;
+            // convert list to json string
+            if (o instanceof List) {
+                value = gson.toJson(o);
+            } else {
+                value = o.toString();
+            }
+            queryList.add(field.getName() + "=" + URLEncoder.encode(
+                    value, UTF_8.toString()));
+        }
+        return StringUtils.join(queryList, "&");
+    }
+}
diff --git a/tubemq-manager/src/test/java/org/apache/tubemq/manager/controller/TestBusinessController.java b/tubemq-manager/src/test/java/org/apache/tubemq/manager/controller/TestBusinessController.java
index 0dbec60..1b3c0ed 100644
--- a/tubemq-manager/src/test/java/org/apache/tubemq/manager/controller/TestBusinessController.java
+++ b/tubemq-manager/src/test/java/org/apache/tubemq/manager/controller/TestBusinessController.java
@@ -96,4 +96,6 @@ public class TestBusinessController {
         assertThat(Objects.requireNonNull(responseEntity.getBody()).getErrCode()).isEqualTo(-1);
         assertTrue(responseEntity.getBody().getErrMsg().contains("exception for test"));
     }
+
+
 }
diff --git a/tubemq-manager/src/test/java/org/apache/tubemq/manager/controller/TestNodeController.java b/tubemq-manager/src/test/java/org/apache/tubemq/manager/controller/TestNodeController.java
index 0b8958e..935bb62 100644
--- a/tubemq-manager/src/test/java/org/apache/tubemq/manager/controller/TestNodeController.java
+++ b/tubemq-manager/src/test/java/org/apache/tubemq/manager/controller/TestNodeController.java
@@ -29,6 +29,7 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc;
 import org.springframework.boot.test.context.SpringBootTest;
 import org.springframework.boot.test.mock.mockito.MockBean;
+import org.springframework.http.MediaType;
 import org.springframework.test.context.junit4.SpringRunner;
 import org.springframework.test.web.servlet.MockMvc;
 import org.springframework.test.web.servlet.MvcResult;
@@ -37,6 +38,7 @@ import org.springframework.test.web.servlet.RequestBuilder;
 import java.util.List;
 
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.when;
 import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get;
 import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.post;
@@ -94,6 +96,14 @@ public class TestNodeController {
                     "\"standby\":[],\"broker\":[]}}]," +
                     "\"errMsg\":\"\",\"errCode\":0,\"result\":true}";
 
+    private NodeEntry getNodeEntry() {
+        NodeEntry nodeEntry = new NodeEntry();
+        nodeEntry.setMaster(true);
+        nodeEntry.setIp("127.0.0.1");
+        nodeEntry.setWebPort(8084);
+        return nodeEntry;
+    }
+
     @Test
     public void testClusterInfo() throws Exception {
         List<NodeEntry> nodeEntries = getOneNodeEntry();
@@ -123,6 +133,39 @@ public class TestNodeController {
                 result.getResponse().getContentType());
     }
 
-
-
-}
+    @Test
+    public void testAddBrokersToCluster() throws Exception {
+        String jsonStr = "{\n" +
+                "\t\"confModAuthToken\": \"abc\",\n" +
+                "\t\"createUser\": \"test\",\n" +
+                "\t\"clusterId\": 1,\n" +
+                "\t\"method\": \"admin_bath_add_broker_configure\",\n" +
+                "\t\"type\": \"op_modify\",\n" +
+                "\t\"brokerJsonSet\": [{\n" +
+                "\t\t\"brokerId\": 234,\n" +
+                "\t\t\"brokerIp\": \"127.0 .0 .1\",\n" +
+                "\t\t\"brokerPort\": 8124,\n" +
+                "\t\t\"numPartitions\": 3,\n" +
+                "\t\t\"unflushThreshold\": 55,\n" +
+                "\t\t\"unflushInterval\": 10000,\n" +
+                "\t\t\"deleteWhen\": \"0 0 6,18 * * ?\",\n" +
+                "\t\t\"deletePolicy\": \"delete,168\",\n" +
+                "\t\t\"acceptPublish\": \"true\",\n" +
+                "\t\t\"acceptSubscribe\": \"true\",\n" +
+                "\t\t\"createUser\": \"gosonzhang\",\n" +
+                "\t\t\"createDate\": \"20151116142135\",\n" +
+                "\t\t\"modifyUser\": \"gosonzhang\",\n" +
+                "\t\t\"modifyDate\": \"20151117161515\"\n" +
+                "\t}]\n" +
+                "\n" +
+                "}";
+        NodeEntry nodeEntry = getNodeEntry();
+        doReturn(nodeEntry).when(nodeRepository).findNodeEntryByClusterIdIsAndMasterIsTrue(any(Integer.class));
+        RequestBuilder request = post("/v1/node/add")
+                .contentType(MediaType.APPLICATION_JSON).content(jsonStr);
+        MvcResult result = mockMvc.perform(request).andReturn();
+        String resultStr = result.getResponse().getContentAsString();
+        log.info("result json string is {}, response type is {}", resultStr,
+                result.getResponse().getContentType());
+        }
+    }


[incubator-tubemq] 02/02: [TUBEMQ-478] delete\reload\online brokers in cluster

Posted by go...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch TUBEMQ-421
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git

commit 5c69de9b540150de1d40a58363805fc259dffceb
Author: EMsnap <zp...@connect.ust.hk>
AuthorDate: Thu Dec 24 11:56:39 2020 +0800

    [TUBEMQ-478] delete\reload\online brokers in cluster
---
 .../tubemq/manager/controller/TubeMQResult.java    |   2 +-
 .../controller/cluster/ClusterController.java      |  53 ++---------
 .../manager/controller/node/NodeController.java    |  82 +++++++---------
 .../apache/tubemq/manager/service/NodeService.java |   7 --
 .../apache/tubemq/manager/utils/MasterUtils.java   | 103 +++++++++++++++++++++
 .../manager/controller/TestNodeController.java     |  50 +---------
 6 files changed, 149 insertions(+), 148 deletions(-)

diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/TubeMQResult.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/TubeMQResult.java
index 939d0a8..6e4a970 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/TubeMQResult.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/TubeMQResult.java
@@ -32,7 +32,7 @@ public class TubeMQResult {
     private boolean result = true;
 
     public static TubeMQResult getErrorResult(String errorMsg) {
-        return TubeMQResult.builder().errCode(1)
+        return TubeMQResult.builder().errCode(-1)
                 .errMsg(errorMsg).result(false).build();
     }
 }
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/cluster/ClusterController.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/cluster/ClusterController.java
index 53e1aa3..4b58313 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/cluster/ClusterController.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/cluster/ClusterController.java
@@ -17,24 +17,17 @@
 
 package org.apache.tubemq.manager.controller.cluster;
 
-import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.apache.tubemq.manager.service.TubeMQHttpConst.SCHEMA;
+import static org.apache.tubemq.manager.utils.MasterUtils.*;
 
 import com.google.gson.Gson;
-import java.net.URLEncoder;
-import java.util.ArrayList;
-import java.util.List;
 import java.util.Map;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.http.client.methods.CloseableHttpResponse;
-import org.apache.http.client.methods.HttpGet;
-import org.apache.http.impl.client.CloseableHttpClient;
-import org.apache.http.impl.client.HttpClients;
-import org.apache.http.util.EntityUtils;
 import org.apache.tubemq.manager.controller.TubeMQResult;
 import org.apache.tubemq.manager.entry.NodeEntry;
 import org.apache.tubemq.manager.repository.NodeRepository;
+import org.apache.tubemq.manager.utils.MasterUtils;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.http.MediaType;
 import org.springframework.web.bind.annotation.RequestBody;
@@ -49,52 +42,18 @@ import org.springframework.web.bind.annotation.RestController;
 @Slf4j
 public class ClusterController {
 
-    private final CloseableHttpClient httpclient = HttpClients.createDefault();
     private final Gson gson = new Gson();
-
-    private static final String TUBE_REQUEST_PATH = "webapi.htm";
-
     @Autowired
     private NodeRepository nodeRepository;
 
-
-    private String covertMapToQueryString(Map<String, String> requestMap) throws Exception {
-        List<String> queryList = new ArrayList<>();
-
-        for (Map.Entry<String, String> entry : requestMap.entrySet()) {
-            queryList.add(entry.getKey() + "=" + URLEncoder.encode(
-                    entry.getValue(), UTF_8.toString()));
-        }
-        return StringUtils.join(queryList, "&");
-    }
-
-    private String queryMaster(String url) {
-        log.info("start to request {}", url);
-        HttpGet httpGet = new HttpGet(url);
-        TubeMQResult defaultResult = new TubeMQResult();
-        try (CloseableHttpResponse response = httpclient.execute(httpGet)) {
-            // return result json to response
-            return EntityUtils.toString(response.getEntity());
-        } catch (Exception ex) {
-            log.error("exception caught while requesting broker status", ex);
-            defaultResult.setErrCode(-1);
-            defaultResult.setResult(false);
-            defaultResult.setErrMsg(ex.getMessage());
-        }
-        return gson.toJson(defaultResult);
-    }
+    @Autowired
+    public MasterUtils masterUtil;
 
     @RequestMapping(value = "/query", method = RequestMethod.GET,
             produces = MediaType.APPLICATION_JSON_VALUE)
     public @ResponseBody String queryInfo(
             @RequestParam Map<String, String> queryBody) throws Exception {
-        int clusterId = Integer.parseInt(queryBody.get("clusterId"));
-        queryBody.remove("clusterId");
-        NodeEntry nodeEntry =
-                nodeRepository.findNodeEntryByClusterIdIsAndMasterIsTrue(clusterId);
-        String url = SCHEMA + nodeEntry.getIp() + ":" + nodeEntry.getWebPort()
-                + "/" + TUBE_REQUEST_PATH + "?" + covertMapToQueryString(queryBody);
-        return queryMaster(url);
+        return gson.toJson(masterUtil.redirectToMaster(queryBody));
     }
 
     /**
@@ -114,7 +73,7 @@ public class ClusterController {
                     clusterId);
             String url = SCHEMA + nodeEntry.getIp() + ":" + nodeEntry.getWebPort()
                     + "/" + TUBE_REQUEST_PATH + "?" + covertMapToQueryString(requestBody);
-            return queryMaster(url);
+            return gson.toJson(requestMaster(url));
         } else {
             TubeMQResult result = new TubeMQResult();
             result.setErrCode(-1);
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/NodeController.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/NodeController.java
index 7b108e7..8a6e586 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/NodeController.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/NodeController.java
@@ -20,28 +20,27 @@ package org.apache.tubemq.manager.controller.node;
 import com.google.gson.Gson;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.http.client.methods.CloseableHttpResponse;
-import org.apache.http.client.methods.HttpGet;
 import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.http.impl.client.HttpClients;
-import org.apache.http.util.EntityUtils;
 import org.apache.tubemq.manager.controller.TubeMQResult;
 import org.apache.tubemq.manager.controller.node.request.AddBrokersReq;
 import org.apache.tubemq.manager.controller.node.request.BrokerConf;
 import org.apache.tubemq.manager.entry.NodeEntry;
 import org.apache.tubemq.manager.repository.NodeRepository;
 import org.apache.tubemq.manager.service.NodeService;
-import org.apache.tubemq.manager.service.tube.TubeHttpResponse;
+import org.apache.tubemq.manager.utils.MasterUtils;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.http.MediaType;
 import org.springframework.web.bind.annotation.*;
 
-import java.io.InputStreamReader;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 
+import static org.apache.tubemq.manager.controller.TubeMQResult.getErrorResult;
 import static org.apache.tubemq.manager.service.TubeMQHttpConst.SCHEMA;
 import static org.apache.tubemq.manager.utils.ConvertUtils.convertReqToQueryStr;
+import static org.apache.tubemq.manager.utils.MasterUtils.*;
 
 @RestController
 @RequestMapping(path = "/v1/node")
@@ -51,7 +50,6 @@ public class NodeController {
     public static final String NO_SUCH_METHOD = "no such method";
     public static final String OP_QUERY = "op_query";
     public static final String ADMIN_QUERY_CLUSTER_INFO = "admin_query_cluster_info";
-    public static final String TUBE_REQUEST_PATH = "webapi.htm";
     private final Gson gson = new Gson();
     private static final CloseableHttpClient httpclient = HttpClients.createDefault();
 
@@ -61,6 +59,8 @@ public class NodeController {
     @Autowired
     NodeRepository nodeRepository;
 
+    @Autowired
+    MasterUtils masterUtil;
 
     @RequestMapping(value = "/query", method = RequestMethod.GET,
             produces = MediaType.APPLICATION_JSON_VALUE)
@@ -71,7 +71,7 @@ public class NodeController {
             return nodeService.queryClusterInfo(clusterId);
         }
 
-        return gson.toJson(TubeMQResult.getErrorResult(NO_SUCH_METHOD));
+        return gson.toJson(getErrorResult(NO_SUCH_METHOD));
     }
 
 
@@ -88,7 +88,8 @@ public class NodeController {
         if (StringUtils.isNotBlank(token)) {
             NodeEntry masterEntry = nodeRepository.findNodeEntryByClusterIdIsAndMasterIsTrue(
                     clusterId);
-            return addToMasterAndRepo(req, masterEntry);
+            TubeMQResult result = addBrokersToCluster(req, masterEntry);
+            return gson.toJson(result);
         } else {
             TubeMQResult result = new TubeMQResult();
             result.setErrCode(-1);
@@ -99,53 +100,38 @@ public class NodeController {
 
     }
 
-    private String addToMasterAndRepo(AddBrokersReq req, NodeEntry masterEntry) throws Exception {
 
-        String url = SCHEMA + masterEntry.getIp() + ":" + masterEntry.getWebPort()
-                + "/" + TUBE_REQUEST_PATH + "?" + convertReqToQueryStr(req);
+    /**
+     * add brokers to cluster, need to check token and
+     * make sure user has authorization to modify it.
+     */
+    @RequestMapping(value = "/online", method = RequestMethod.GET)
+    public @ResponseBody String onlineBrokers(
+            @RequestParam Map<String, String> queryBody) throws Exception {
+        return gson.toJson(masterUtil.redirectToMaster(queryBody));
+    }
 
-        log.info("start to request {}", url);
-        HttpGet httpGet = new HttpGet(url);
-        TubeMQResult defaultResult = new TubeMQResult();
-
-        try (CloseableHttpResponse response = httpclient.execute(httpGet)) {
-            TubeHttpResponse result =
-                    gson.fromJson(new InputStreamReader(response.getEntity().getContent()),
-                            TubeHttpResponse.class);
-            if (result.getCode() == 0 && result.getErrCode() == 0) {
-                // save brokers to db when success
-                saveAllBrokers(req.getBrokerJsonSet(), req.getClusterId(), masterEntry);
-            } else {
-                return result.getErrMsg();
-            }
-        } catch (Exception ex) {
-            log.error("exception caught while requesting broker status", ex);
-            defaultResult.setErrCode(-1);
-            defaultResult.setResult(false);
-            defaultResult.setErrMsg(ex.getMessage());
-        }
 
-        return gson.toJson(defaultResult);
+    @RequestMapping(value = "/reload", method = RequestMethod.GET)
+    public @ResponseBody String reloadBrokers(
+            @RequestParam Map<String, String> queryBody) throws Exception {
+        return gson.toJson(masterUtil.redirectToMaster(queryBody));
     }
 
 
+    @RequestMapping(value = "/delete", method = RequestMethod.GET)
+    public @ResponseBody String deleteBrokers(
+            @RequestParam Map<String, String> queryBody) throws Exception {
+        TubeMQResult result = masterUtil.redirectToMaster(queryBody);
+        return gson.toJson(result);
+    }
 
-
-    private void saveAllBrokers(List<BrokerConf> brokerConfList, int clusterId, NodeEntry masterEntry) {
-        List<NodeEntry> nodeEntries = new ArrayList<>();
-        for (BrokerConf brokerConf : brokerConfList) {
-            NodeEntry node = new NodeEntry();
-            node.setBroker(true);
-            node.setClusterId(clusterId);
-            node.setClusterName(masterEntry.getClusterName());
-            node.setBrokerId(brokerConf.getBrokerId());
-            node.setMaster(false);
-            node.setIp(brokerConf.getBrokerIp());
-            node.setStandby(false);
-            node.setPort(brokerConf.getBrokerPort());
-            nodeEntries.add(node);
-        }
-        nodeService.saveNodes(nodeEntries);
+    private TubeMQResult addBrokersToCluster(AddBrokersReq req, NodeEntry masterEntry) throws Exception {
+        String url = SCHEMA + masterEntry.getIp() + ":" + masterEntry.getWebPort()
+                + "/" + TUBE_REQUEST_PATH + "?" + convertReqToQueryStr(req);
+        TubeMQResult tubeMQResult = requestMaster(url);
+        return tubeMQResult;
     }
 
+
 }
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/NodeService.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/NodeService.java
index a9d39b3..836fe8a 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/NodeService.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/NodeService.java
@@ -293,13 +293,6 @@ public class NodeService {
 
 
 
-    public void saveNodes(List<NodeEntry> nodes) {
-        nodeRepository.saveAll(nodes);
-    }
-
-
-
-
     public void close() throws IOException {
         httpclient.close();
     }
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/utils/MasterUtils.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/utils/MasterUtils.java
new file mode 100644
index 0000000..95def0c
--- /dev/null
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/utils/MasterUtils.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.tubemq.manager.utils;
+
+import com.google.gson.Gson;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.tubemq.manager.controller.TubeMQResult;
+import org.apache.tubemq.manager.entry.NodeEntry;
+import org.apache.tubemq.manager.repository.NodeRepository;
+import org.apache.tubemq.manager.service.tube.TubeHttpResponse;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import java.io.InputStreamReader;
+import java.net.URLEncoder;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.tubemq.manager.controller.TubeMQResult.getErrorResult;
+import static org.apache.tubemq.manager.service.TubeMQHttpConst.SCHEMA;
+
+
+@Slf4j
+@Component
+public class MasterUtils {
+
+    private static CloseableHttpClient httpclient = HttpClients.createDefault();
+    private static Gson gson = new Gson();
+    public static final String TUBE_REQUEST_PATH = "webapi.htm";
+
+    @Autowired
+    NodeRepository nodeRepository;
+
+    public static String covertMapToQueryString(Map<String, String> requestMap) throws Exception {
+        List<String> queryList = new ArrayList<>();
+
+        for (Map.Entry<String, String> entry : requestMap.entrySet()) {
+            queryList.add(entry.getKey() + "=" + URLEncoder.encode(
+                    entry.getValue(), UTF_8.toString()));
+        }
+        return StringUtils.join(queryList, "&");
+    }
+
+
+
+    public static TubeMQResult requestMaster(String url) throws Exception {
+
+        log.info("start to request {}", url);
+        HttpGet httpGet = new HttpGet(url);
+        TubeMQResult defaultResult = new TubeMQResult();
+
+        try (CloseableHttpResponse response = httpclient.execute(httpGet)) {
+            TubeHttpResponse tubeResponse =
+                    gson.fromJson(new InputStreamReader(response.getEntity().getContent()),
+                            TubeHttpResponse.class);
+            if (tubeResponse.getCode() == 0 && tubeResponse.getErrCode() == 0) {
+                return defaultResult;
+            } else {
+                defaultResult = getErrorResult(tubeResponse.getErrMsg());
+            }
+        } catch (Exception ex) {
+            log.error("exception caught while requesting broker status", ex);
+            defaultResult = getErrorResult(ex.getMessage());
+        }
+        return defaultResult;
+    }
+
+
+
+
+    public TubeMQResult redirectToMaster(Map<String, String> queryBody) throws Exception {
+        int clusterId = Integer.parseInt(queryBody.get("clusterId"));
+        queryBody.remove("clusterId");
+        NodeEntry nodeEntry =
+                nodeRepository.findNodeEntryByClusterIdIsAndMasterIsTrue(clusterId);
+        String url = SCHEMA + nodeEntry.getIp() + ":" + nodeEntry.getWebPort()
+                + "/" + TUBE_REQUEST_PATH + "?" + covertMapToQueryString(queryBody);
+        return requestMaster(url);
+    }
+}
diff --git a/tubemq-manager/src/test/java/org/apache/tubemq/manager/controller/TestNodeController.java b/tubemq-manager/src/test/java/org/apache/tubemq/manager/controller/TestNodeController.java
index 935bb62..9b8ce64 100644
--- a/tubemq-manager/src/test/java/org/apache/tubemq/manager/controller/TestNodeController.java
+++ b/tubemq-manager/src/test/java/org/apache/tubemq/manager/controller/TestNodeController.java
@@ -30,13 +30,17 @@ import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMock
 import org.springframework.boot.test.context.SpringBootTest;
 import org.springframework.boot.test.mock.mockito.MockBean;
 import org.springframework.http.MediaType;
+import org.springframework.http.ResponseEntity;
 import org.springframework.test.context.junit4.SpringRunner;
 import org.springframework.test.web.servlet.MockMvc;
 import org.springframework.test.web.servlet.MvcResult;
 import org.springframework.test.web.servlet.RequestBuilder;
 
 import java.util.List;
+import java.util.Objects;
 
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.when;
@@ -96,14 +100,6 @@ public class TestNodeController {
                     "\"standby\":[],\"broker\":[]}}]," +
                     "\"errMsg\":\"\",\"errCode\":0,\"result\":true}";
 
-    private NodeEntry getNodeEntry() {
-        NodeEntry nodeEntry = new NodeEntry();
-        nodeEntry.setMaster(true);
-        nodeEntry.setIp("127.0.0.1");
-        nodeEntry.setWebPort(8084);
-        return nodeEntry;
-    }
-
     @Test
     public void testClusterInfo() throws Exception {
         List<NodeEntry> nodeEntries = getOneNodeEntry();
@@ -132,40 +128,4 @@ public class TestNodeController {
         log.info("result json string is {}, response type is {}", resultStr,
                 result.getResponse().getContentType());
     }
-
-    @Test
-    public void testAddBrokersToCluster() throws Exception {
-        String jsonStr = "{\n" +
-                "\t\"confModAuthToken\": \"abc\",\n" +
-                "\t\"createUser\": \"test\",\n" +
-                "\t\"clusterId\": 1,\n" +
-                "\t\"method\": \"admin_bath_add_broker_configure\",\n" +
-                "\t\"type\": \"op_modify\",\n" +
-                "\t\"brokerJsonSet\": [{\n" +
-                "\t\t\"brokerId\": 234,\n" +
-                "\t\t\"brokerIp\": \"127.0 .0 .1\",\n" +
-                "\t\t\"brokerPort\": 8124,\n" +
-                "\t\t\"numPartitions\": 3,\n" +
-                "\t\t\"unflushThreshold\": 55,\n" +
-                "\t\t\"unflushInterval\": 10000,\n" +
-                "\t\t\"deleteWhen\": \"0 0 6,18 * * ?\",\n" +
-                "\t\t\"deletePolicy\": \"delete,168\",\n" +
-                "\t\t\"acceptPublish\": \"true\",\n" +
-                "\t\t\"acceptSubscribe\": \"true\",\n" +
-                "\t\t\"createUser\": \"gosonzhang\",\n" +
-                "\t\t\"createDate\": \"20151116142135\",\n" +
-                "\t\t\"modifyUser\": \"gosonzhang\",\n" +
-                "\t\t\"modifyDate\": \"20151117161515\"\n" +
-                "\t}]\n" +
-                "\n" +
-                "}";
-        NodeEntry nodeEntry = getNodeEntry();
-        doReturn(nodeEntry).when(nodeRepository).findNodeEntryByClusterIdIsAndMasterIsTrue(any(Integer.class));
-        RequestBuilder request = post("/v1/node/add")
-                .contentType(MediaType.APPLICATION_JSON).content(jsonStr);
-        MvcResult result = mockMvc.perform(request).andReturn();
-        String resultStr = result.getResponse().getContentAsString();
-        log.info("result json string is {}, response type is {}", resultStr,
-                result.getResponse().getContentType());
-        }
-    }
+}