You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tubemq.apache.org by go...@apache.org on 2020/12/24 10:53:27 UTC
[incubator-tubemq] 02/02: [TUBEMQ-478] delete\reload\online brokers
in cluster
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch TUBEMQ-421
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git
commit 5c69de9b540150de1d40a58363805fc259dffceb
Author: EMsnap <zp...@connect.ust.hk>
AuthorDate: Thu Dec 24 11:56:39 2020 +0800
[TUBEMQ-478] delete\reload\online brokers in cluster
---
.../tubemq/manager/controller/TubeMQResult.java | 2 +-
.../controller/cluster/ClusterController.java | 53 ++---------
.../manager/controller/node/NodeController.java | 82 +++++++---------
.../apache/tubemq/manager/service/NodeService.java | 7 --
.../apache/tubemq/manager/utils/MasterUtils.java | 103 +++++++++++++++++++++
.../manager/controller/TestNodeController.java | 50 +---------
6 files changed, 149 insertions(+), 148 deletions(-)
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/TubeMQResult.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/TubeMQResult.java
index 939d0a8..6e4a970 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/TubeMQResult.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/TubeMQResult.java
@@ -32,7 +32,7 @@ public class TubeMQResult {
private boolean result = true;
public static TubeMQResult getErrorResult(String errorMsg) {
- return TubeMQResult.builder().errCode(1)
+ return TubeMQResult.builder().errCode(-1)
.errMsg(errorMsg).result(false).build();
}
}
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/cluster/ClusterController.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/cluster/ClusterController.java
index 53e1aa3..4b58313 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/cluster/ClusterController.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/cluster/ClusterController.java
@@ -17,24 +17,17 @@
package org.apache.tubemq.manager.controller.cluster;
-import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.tubemq.manager.service.TubeMQHttpConst.SCHEMA;
+import static org.apache.tubemq.manager.utils.MasterUtils.*;
import com.google.gson.Gson;
-import java.net.URLEncoder;
-import java.util.ArrayList;
-import java.util.List;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
-import org.apache.http.client.methods.CloseableHttpResponse;
-import org.apache.http.client.methods.HttpGet;
-import org.apache.http.impl.client.CloseableHttpClient;
-import org.apache.http.impl.client.HttpClients;
-import org.apache.http.util.EntityUtils;
import org.apache.tubemq.manager.controller.TubeMQResult;
import org.apache.tubemq.manager.entry.NodeEntry;
import org.apache.tubemq.manager.repository.NodeRepository;
+import org.apache.tubemq.manager.utils.MasterUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.RequestBody;
@@ -49,52 +42,18 @@ import org.springframework.web.bind.annotation.RestController;
@Slf4j
public class ClusterController {
- private final CloseableHttpClient httpclient = HttpClients.createDefault();
private final Gson gson = new Gson();
-
- private static final String TUBE_REQUEST_PATH = "webapi.htm";
-
@Autowired
private NodeRepository nodeRepository;
-
- private String covertMapToQueryString(Map<String, String> requestMap) throws Exception {
- List<String> queryList = new ArrayList<>();
-
- for (Map.Entry<String, String> entry : requestMap.entrySet()) {
- queryList.add(entry.getKey() + "=" + URLEncoder.encode(
- entry.getValue(), UTF_8.toString()));
- }
- return StringUtils.join(queryList, "&");
- }
-
- private String queryMaster(String url) {
- log.info("start to request {}", url);
- HttpGet httpGet = new HttpGet(url);
- TubeMQResult defaultResult = new TubeMQResult();
- try (CloseableHttpResponse response = httpclient.execute(httpGet)) {
- // return result json to response
- return EntityUtils.toString(response.getEntity());
- } catch (Exception ex) {
- log.error("exception caught while requesting broker status", ex);
- defaultResult.setErrCode(-1);
- defaultResult.setResult(false);
- defaultResult.setErrMsg(ex.getMessage());
- }
- return gson.toJson(defaultResult);
- }
+ @Autowired
+ public MasterUtils masterUtil;
@RequestMapping(value = "/query", method = RequestMethod.GET,
produces = MediaType.APPLICATION_JSON_VALUE)
public @ResponseBody String queryInfo(
@RequestParam Map<String, String> queryBody) throws Exception {
- int clusterId = Integer.parseInt(queryBody.get("clusterId"));
- queryBody.remove("clusterId");
- NodeEntry nodeEntry =
- nodeRepository.findNodeEntryByClusterIdIsAndMasterIsTrue(clusterId);
- String url = SCHEMA + nodeEntry.getIp() + ":" + nodeEntry.getWebPort()
- + "/" + TUBE_REQUEST_PATH + "?" + covertMapToQueryString(queryBody);
- return queryMaster(url);
+ return gson.toJson(masterUtil.redirectToMaster(queryBody));
}
/**
@@ -114,7 +73,7 @@ public class ClusterController {
clusterId);
String url = SCHEMA + nodeEntry.getIp() + ":" + nodeEntry.getWebPort()
+ "/" + TUBE_REQUEST_PATH + "?" + covertMapToQueryString(requestBody);
- return queryMaster(url);
+ return gson.toJson(requestMaster(url));
} else {
TubeMQResult result = new TubeMQResult();
result.setErrCode(-1);
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/NodeController.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/NodeController.java
index 7b108e7..8a6e586 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/NodeController.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/NodeController.java
@@ -20,28 +20,27 @@ package org.apache.tubemq.manager.controller.node;
import com.google.gson.Gson;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
-import org.apache.http.client.methods.CloseableHttpResponse;
-import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
-import org.apache.http.util.EntityUtils;
import org.apache.tubemq.manager.controller.TubeMQResult;
import org.apache.tubemq.manager.controller.node.request.AddBrokersReq;
import org.apache.tubemq.manager.controller.node.request.BrokerConf;
import org.apache.tubemq.manager.entry.NodeEntry;
import org.apache.tubemq.manager.repository.NodeRepository;
import org.apache.tubemq.manager.service.NodeService;
-import org.apache.tubemq.manager.service.tube.TubeHttpResponse;
+import org.apache.tubemq.manager.utils.MasterUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.*;
-import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
+import static org.apache.tubemq.manager.controller.TubeMQResult.getErrorResult;
import static org.apache.tubemq.manager.service.TubeMQHttpConst.SCHEMA;
import static org.apache.tubemq.manager.utils.ConvertUtils.convertReqToQueryStr;
+import static org.apache.tubemq.manager.utils.MasterUtils.*;
@RestController
@RequestMapping(path = "/v1/node")
@@ -51,7 +50,6 @@ public class NodeController {
public static final String NO_SUCH_METHOD = "no such method";
public static final String OP_QUERY = "op_query";
public static final String ADMIN_QUERY_CLUSTER_INFO = "admin_query_cluster_info";
- public static final String TUBE_REQUEST_PATH = "webapi.htm";
private final Gson gson = new Gson();
private static final CloseableHttpClient httpclient = HttpClients.createDefault();
@@ -61,6 +59,8 @@ public class NodeController {
@Autowired
NodeRepository nodeRepository;
+ @Autowired
+ MasterUtils masterUtil;
@RequestMapping(value = "/query", method = RequestMethod.GET,
produces = MediaType.APPLICATION_JSON_VALUE)
@@ -71,7 +71,7 @@ public class NodeController {
return nodeService.queryClusterInfo(clusterId);
}
- return gson.toJson(TubeMQResult.getErrorResult(NO_SUCH_METHOD));
+ return gson.toJson(getErrorResult(NO_SUCH_METHOD));
}
@@ -88,7 +88,8 @@ public class NodeController {
if (StringUtils.isNotBlank(token)) {
NodeEntry masterEntry = nodeRepository.findNodeEntryByClusterIdIsAndMasterIsTrue(
clusterId);
- return addToMasterAndRepo(req, masterEntry);
+ TubeMQResult result = addBrokersToCluster(req, masterEntry);
+ return gson.toJson(result);
} else {
TubeMQResult result = new TubeMQResult();
result.setErrCode(-1);
@@ -99,53 +100,38 @@ public class NodeController {
}
- private String addToMasterAndRepo(AddBrokersReq req, NodeEntry masterEntry) throws Exception {
- String url = SCHEMA + masterEntry.getIp() + ":" + masterEntry.getWebPort()
- + "/" + TUBE_REQUEST_PATH + "?" + convertReqToQueryStr(req);
+ /**
+ * add brokers to cluster, need to check token and
+ * make sure user has authorization to modify it.
+ */
+ @RequestMapping(value = "/online", method = RequestMethod.GET)
+ public @ResponseBody String onlineBrokers(
+ @RequestParam Map<String, String> queryBody) throws Exception {
+ return gson.toJson(masterUtil.redirectToMaster(queryBody));
+ }
- log.info("start to request {}", url);
- HttpGet httpGet = new HttpGet(url);
- TubeMQResult defaultResult = new TubeMQResult();
-
- try (CloseableHttpResponse response = httpclient.execute(httpGet)) {
- TubeHttpResponse result =
- gson.fromJson(new InputStreamReader(response.getEntity().getContent()),
- TubeHttpResponse.class);
- if (result.getCode() == 0 && result.getErrCode() == 0) {
- // save brokers to db when success
- saveAllBrokers(req.getBrokerJsonSet(), req.getClusterId(), masterEntry);
- } else {
- return result.getErrMsg();
- }
- } catch (Exception ex) {
- log.error("exception caught while requesting broker status", ex);
- defaultResult.setErrCode(-1);
- defaultResult.setResult(false);
- defaultResult.setErrMsg(ex.getMessage());
- }
- return gson.toJson(defaultResult);
+ @RequestMapping(value = "/reload", method = RequestMethod.GET)
+ public @ResponseBody String reloadBrokers(
+ @RequestParam Map<String, String> queryBody) throws Exception {
+ return gson.toJson(masterUtil.redirectToMaster(queryBody));
}
+ @RequestMapping(value = "/delete", method = RequestMethod.GET)
+ public @ResponseBody String deleteBrokers(
+ @RequestParam Map<String, String> queryBody) throws Exception {
+ TubeMQResult result = masterUtil.redirectToMaster(queryBody);
+ return gson.toJson(result);
+ }
-
- private void saveAllBrokers(List<BrokerConf> brokerConfList, int clusterId, NodeEntry masterEntry) {
- List<NodeEntry> nodeEntries = new ArrayList<>();
- for (BrokerConf brokerConf : brokerConfList) {
- NodeEntry node = new NodeEntry();
- node.setBroker(true);
- node.setClusterId(clusterId);
- node.setClusterName(masterEntry.getClusterName());
- node.setBrokerId(brokerConf.getBrokerId());
- node.setMaster(false);
- node.setIp(brokerConf.getBrokerIp());
- node.setStandby(false);
- node.setPort(brokerConf.getBrokerPort());
- nodeEntries.add(node);
- }
- nodeService.saveNodes(nodeEntries);
+ private TubeMQResult addBrokersToCluster(AddBrokersReq req, NodeEntry masterEntry) throws Exception {
+ String url = SCHEMA + masterEntry.getIp() + ":" + masterEntry.getWebPort()
+ + "/" + TUBE_REQUEST_PATH + "?" + convertReqToQueryStr(req);
+ TubeMQResult tubeMQResult = requestMaster(url);
+ return tubeMQResult;
}
+
}
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/NodeService.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/NodeService.java
index a9d39b3..836fe8a 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/NodeService.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/NodeService.java
@@ -293,13 +293,6 @@ public class NodeService {
- public void saveNodes(List<NodeEntry> nodes) {
- nodeRepository.saveAll(nodes);
- }
-
-
-
-
public void close() throws IOException {
httpclient.close();
}
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/utils/MasterUtils.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/utils/MasterUtils.java
new file mode 100644
index 0000000..95def0c
--- /dev/null
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/utils/MasterUtils.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.tubemq.manager.utils;
+
+import com.google.gson.Gson;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.tubemq.manager.controller.TubeMQResult;
+import org.apache.tubemq.manager.entry.NodeEntry;
+import org.apache.tubemq.manager.repository.NodeRepository;
+import org.apache.tubemq.manager.service.tube.TubeHttpResponse;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import java.io.InputStreamReader;
+import java.net.URLEncoder;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.tubemq.manager.controller.TubeMQResult.getErrorResult;
+import static org.apache.tubemq.manager.service.TubeMQHttpConst.SCHEMA;
+
+
+@Slf4j
+@Component
+public class MasterUtils {
+
+ private static CloseableHttpClient httpclient = HttpClients.createDefault();
+ private static Gson gson = new Gson();
+ public static final String TUBE_REQUEST_PATH = "webapi.htm";
+
+ @Autowired
+ NodeRepository nodeRepository;
+
+ public static String covertMapToQueryString(Map<String, String> requestMap) throws Exception {
+ List<String> queryList = new ArrayList<>();
+
+ for (Map.Entry<String, String> entry : requestMap.entrySet()) {
+ queryList.add(entry.getKey() + "=" + URLEncoder.encode(
+ entry.getValue(), UTF_8.toString()));
+ }
+ return StringUtils.join(queryList, "&");
+ }
+
+
+
+ public static TubeMQResult requestMaster(String url) throws Exception {
+
+ log.info("start to request {}", url);
+ HttpGet httpGet = new HttpGet(url);
+ TubeMQResult defaultResult = new TubeMQResult();
+
+ try (CloseableHttpResponse response = httpclient.execute(httpGet)) {
+ TubeHttpResponse tubeResponse =
+ gson.fromJson(new InputStreamReader(response.getEntity().getContent()),
+ TubeHttpResponse.class);
+ if (tubeResponse.getCode() == 0 && tubeResponse.getErrCode() == 0) {
+ return defaultResult;
+ } else {
+ defaultResult = getErrorResult(tubeResponse.getErrMsg());
+ }
+ } catch (Exception ex) {
+ log.error("exception caught while requesting broker status", ex);
+ defaultResult = getErrorResult(ex.getMessage());
+ }
+ return defaultResult;
+ }
+
+
+
+
+ public TubeMQResult redirectToMaster(Map<String, String> queryBody) throws Exception {
+ int clusterId = Integer.parseInt(queryBody.get("clusterId"));
+ queryBody.remove("clusterId");
+ NodeEntry nodeEntry =
+ nodeRepository.findNodeEntryByClusterIdIsAndMasterIsTrue(clusterId);
+ String url = SCHEMA + nodeEntry.getIp() + ":" + nodeEntry.getWebPort()
+ + "/" + TUBE_REQUEST_PATH + "?" + covertMapToQueryString(queryBody);
+ return requestMaster(url);
+ }
+}
diff --git a/tubemq-manager/src/test/java/org/apache/tubemq/manager/controller/TestNodeController.java b/tubemq-manager/src/test/java/org/apache/tubemq/manager/controller/TestNodeController.java
index 935bb62..9b8ce64 100644
--- a/tubemq-manager/src/test/java/org/apache/tubemq/manager/controller/TestNodeController.java
+++ b/tubemq-manager/src/test/java/org/apache/tubemq/manager/controller/TestNodeController.java
@@ -30,13 +30,17 @@ import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMock
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.http.MediaType;
+import org.springframework.http.ResponseEntity;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.test.web.servlet.MockMvc;
import org.springframework.test.web.servlet.MvcResult;
import org.springframework.test.web.servlet.RequestBuilder;
import java.util.List;
+import java.util.Objects;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.when;
@@ -96,14 +100,6 @@ public class TestNodeController {
"\"standby\":[],\"broker\":[]}}]," +
"\"errMsg\":\"\",\"errCode\":0,\"result\":true}";
- private NodeEntry getNodeEntry() {
- NodeEntry nodeEntry = new NodeEntry();
- nodeEntry.setMaster(true);
- nodeEntry.setIp("127.0.0.1");
- nodeEntry.setWebPort(8084);
- return nodeEntry;
- }
-
@Test
public void testClusterInfo() throws Exception {
List<NodeEntry> nodeEntries = getOneNodeEntry();
@@ -132,40 +128,4 @@ public class TestNodeController {
log.info("result json string is {}, response type is {}", resultStr,
result.getResponse().getContentType());
}
-
- @Test
- public void testAddBrokersToCluster() throws Exception {
- String jsonStr = "{\n" +
- "\t\"confModAuthToken\": \"abc\",\n" +
- "\t\"createUser\": \"test\",\n" +
- "\t\"clusterId\": 1,\n" +
- "\t\"method\": \"admin_bath_add_broker_configure\",\n" +
- "\t\"type\": \"op_modify\",\n" +
- "\t\"brokerJsonSet\": [{\n" +
- "\t\t\"brokerId\": 234,\n" +
- "\t\t\"brokerIp\": \"127.0 .0 .1\",\n" +
- "\t\t\"brokerPort\": 8124,\n" +
- "\t\t\"numPartitions\": 3,\n" +
- "\t\t\"unflushThreshold\": 55,\n" +
- "\t\t\"unflushInterval\": 10000,\n" +
- "\t\t\"deleteWhen\": \"0 0 6,18 * * ?\",\n" +
- "\t\t\"deletePolicy\": \"delete,168\",\n" +
- "\t\t\"acceptPublish\": \"true\",\n" +
- "\t\t\"acceptSubscribe\": \"true\",\n" +
- "\t\t\"createUser\": \"gosonzhang\",\n" +
- "\t\t\"createDate\": \"20151116142135\",\n" +
- "\t\t\"modifyUser\": \"gosonzhang\",\n" +
- "\t\t\"modifyDate\": \"20151117161515\"\n" +
- "\t}]\n" +
- "\n" +
- "}";
- NodeEntry nodeEntry = getNodeEntry();
- doReturn(nodeEntry).when(nodeRepository).findNodeEntryByClusterIdIsAndMasterIsTrue(any(Integer.class));
- RequestBuilder request = post("/v1/node/add")
- .contentType(MediaType.APPLICATION_JSON).content(jsonStr);
- MvcResult result = mockMvc.perform(request).andReturn();
- String resultStr = result.getResponse().getContentAsString();
- log.info("result json string is {}, response type is {}", resultStr,
- result.getResponse().getContentType());
- }
- }
+}