You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/02/09 03:55:45 UTC
[incubator-inlong] branch master updated: [INLONG-2353] Tube manager cluster adds support for multi-master configuration (#2360)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 98a2753 [INLONG-2353] Tube manager cluster adds support for multi-master configuration (#2360)
98a2753 is described below
commit 98a27539504fe303837dbd63b74fcec6c05dd9d9
Author: bluewang <88...@users.noreply.github.com>
AuthorDate: Wed Feb 9 11:55:42 2022 +0800
[INLONG-2353] Tube manager cluster adds support for multi-master configuration (#2360)
Co-authored-by: v_lizhwang <v_...@tencent.com>
---
.../tubemq/manager/controller/TubeMQResult.java | 6 +++++
.../controller/cluster/ClusterController.java | 12 ++++++---
.../controller/cluster/request/AddClusterReq.java | 7 +++--
.../tubemq/manager/service/ClusterServiceImpl.java | 16 +++++++-----
.../tubemq/manager/service/MasterServiceImpl.java | 30 +++++++++++++++++-----
.../manager/service/interfaces/MasterService.java | 8 ++++++
.../manager/controller/TestClusterController.java | 6 +++--
7 files changed, 65 insertions(+), 20 deletions(-)
diff --git a/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/controller/TubeMQResult.java b/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/controller/TubeMQResult.java
index 3cfa0ee..38580b6 100644
--- a/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/controller/TubeMQResult.java
+++ b/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/controller/TubeMQResult.java
@@ -34,6 +34,8 @@ public class TubeMQResult {
private boolean result = true;
private Object data;
+ public static final int ERR_CODE = -1;
+
private static Gson json = new Gson();
public static TubeMQResult errorResult(String errorMsg) {
@@ -61,4 +63,8 @@ public class TubeMQResult {
.result(true).data(data).build();
}
+ public boolean isError() {
+ return ERR_CODE == errCode;
+ }
+
}
diff --git a/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/controller/cluster/ClusterController.java b/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/controller/cluster/ClusterController.java
index 2c2b755..59c30d1 100644
--- a/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/controller/cluster/ClusterController.java
+++ b/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/controller/cluster/ClusterController.java
@@ -47,6 +47,8 @@ import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;
+import static org.apache.inlong.tubemq.manager.service.TubeConst.SUCCESS_CODE;
+
@RestController
@RequestMapping(path = "/v1/cluster")
@Slf4j
@@ -96,10 +98,14 @@ public class ClusterController {
if (!req.legal()) {
return TubeMQResult.errorResult(TubeMQErrorConst.PARAM_ILLEGAL);
}
- TubeMQResult checkResult = masterService.checkMasterNodeStatus(req.getMasterIp(), req.getMasterWebPort());
- if (checkResult.getErrCode() != TubeConst.SUCCESS_CODE) {
- return TubeMQResult.errorResult("please check master ip and webPort");
+ List<String> masterIps = req.getMasterIps();
+ for (String masterIp : masterIps) {
+ TubeMQResult checkResult = masterService.checkMasterNodeStatus(masterIp, req.getMasterWebPort());
+ if (checkResult.getErrCode() != SUCCESS_CODE) {
+ return TubeMQResult.errorResult("please check master ip and webPort");
+ }
}
+
// 2. add cluster and master node
clusterService.addClusterAndMasterNode(req);
return new TubeMQResult();
diff --git a/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/controller/cluster/request/AddClusterReq.java b/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/controller/cluster/request/AddClusterReq.java
index bef03dc..2eafbac 100644
--- a/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/controller/cluster/request/AddClusterReq.java
+++ b/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/controller/cluster/request/AddClusterReq.java
@@ -18,11 +18,14 @@
package org.apache.inlong.tubemq.manager.controller.cluster.request;
import lombok.Data;
+import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
+import java.util.List;
+
@Data
public class AddClusterReq {
- private String masterIp;
+ private List<String> masterIps;
private String clusterName;
private Integer masterPort;
private Integer masterWebPort;
@@ -30,6 +33,6 @@ public class AddClusterReq {
private String token;
public boolean legal() {
- return StringUtils.isNotBlank(masterIp) && masterPort != null && StringUtils.isNotBlank(token);
+ return CollectionUtils.isNotEmpty(masterIps) && masterPort != null && StringUtils.isNotBlank(token);
}
}
diff --git a/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/service/ClusterServiceImpl.java b/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/service/ClusterServiceImpl.java
index 41b7016..83f2209 100644
--- a/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/service/ClusterServiceImpl.java
+++ b/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/service/ClusterServiceImpl.java
@@ -95,13 +95,15 @@ public class ClusterServiceImpl implements ClusterService {
if (clusterEntry == null) {
return;
}
- MasterEntry masterEntry = new MasterEntry();
- masterEntry.setPort(req.getMasterPort());
- masterEntry.setClusterId(clusterEntry.getClusterId());
- masterEntry.setWebPort(req.getMasterWebPort());
- masterEntry.setIp(req.getMasterIp());
- masterEntry.setToken(req.getToken());
- nodeService.addNode(masterEntry);
+ for (String masterIp : req.getMasterIps()) {
+ MasterEntry masterEntry = new MasterEntry();
+ masterEntry.setPort(req.getMasterPort());
+ masterEntry.setClusterId(clusterEntry.getClusterId());
+ masterEntry.setWebPort(req.getMasterWebPort());
+ masterEntry.setIp(masterIp);
+ masterEntry.setToken(req.getToken());
+ nodeService.addNode(masterEntry);
+ }
}
}
diff --git a/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/service/MasterServiceImpl.java b/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/service/MasterServiceImpl.java
index ba3635c..917c1a4 100644
--- a/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/service/MasterServiceImpl.java
+++ b/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/service/MasterServiceImpl.java
@@ -23,9 +23,11 @@ import com.google.gson.Gson;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
+import java.util.List;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections4.CollectionUtils;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
@@ -128,14 +130,30 @@ public class MasterServiceImpl implements MasterService {
if (clusterId == null) {
return null;
}
- MasterEntry master = masterRepository
- .findMasterEntryByClusterIdEquals(
+ List<MasterEntry> masters = getMasterNodes(clusterId);
+
+ for (MasterEntry masterEntry : masters) {
+ if (!checkMasterNodeStatus(masterEntry.getIp(),
+ masterEntry.getWebPort()).isError()) {
+ return masterEntry;
+ }
+ }
+
+ throw new RuntimeException("cluster id " + clusterId + "no master node, please check");
+ }
+
+ @Override
+ public List<MasterEntry> getMasterNodes(Long clusterId) {
+ if (clusterId == null) {
+ return null;
+ }
+ List<MasterEntry> masters = masterRepository
+ .findMasterEntriesByClusterIdEquals(
clusterId);
- if (master == null) {
- throw new RuntimeException("cluster id "
- + clusterId + "no master node, please check");
+ if (CollectionUtils.isEmpty(masters)) {
+ throw new RuntimeException("cluster id " + clusterId + "no master node, please check");
}
- return master;
+ return masters;
}
@Override
diff --git a/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/service/interfaces/MasterService.java b/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/service/interfaces/MasterService.java
index 33cbf22..a7e57e5 100644
--- a/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/service/interfaces/MasterService.java
+++ b/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/service/interfaces/MasterService.java
@@ -17,6 +17,7 @@
package org.apache.inlong.tubemq.manager.service.interfaces;
+import java.util.List;
import java.util.Map;
import org.apache.inlong.tubemq.manager.controller.TubeMQResult;
@@ -68,6 +69,13 @@ public interface MasterService {
MasterEntry getMasterNode(Long clusterId);
/**
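+ * get all master nodes in one cluster
+ * @param clusterId id of the cluster whose master nodes are queried
+ * @return the master node entries found for the given cluster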
+ */
+ List<MasterEntry> getMasterNodes(Long clusterId);
+
+ /**
* use queryBody to generate queryUrl for master query
*
* @param queryBody
diff --git a/inlong-tubemq/tubemq-manager/src/test/java/org/apache/inlong/tubemq/manager/controller/TestClusterController.java b/inlong-tubemq/tubemq-manager/src/test/java/org/apache/inlong/tubemq/manager/controller/TestClusterController.java
index 9ac7583..a813167 100644
--- a/inlong-tubemq/tubemq-manager/src/test/java/org/apache/inlong/tubemq/manager/controller/TestClusterController.java
+++ b/inlong-tubemq/tubemq-manager/src/test/java/org/apache/inlong/tubemq/manager/controller/TestClusterController.java
@@ -46,6 +46,8 @@ import org.springframework.test.web.servlet.MockMvc;
import org.springframework.test.web.servlet.MvcResult;
import org.springframework.test.web.servlet.RequestBuilder;
+import java.util.Collections;
+
@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
@@ -157,7 +159,7 @@ public class TestClusterController {
AddClusterReq req = new AddClusterReq();
req.setClusterName("test");
- req.setMasterIp("127.0.0.1");
+ req.setMasterIps(Collections.singletonList("127.0.0.1"));
req.setMasterWebPort(8080);
req.setMasterPort(8089);
req.setToken("abc");
@@ -172,7 +174,7 @@ public class TestClusterController {
.contentType(MediaType.APPLICATION_JSON).content(gson.toJson(req));
MvcResult result = mockMvc.perform(request).andReturn();
String resultStr = result.getResponse().getContentAsString();
- String expectRes = "{\"errMsg\":\"\",\"errCode\":0,\"result\":true,\"data\":null}";
+ String expectRes = "{\"errMsg\":\"\",\"errCode\":0,\"result\":true,\"data\":null,\"error\":false}";
Assert.assertEquals(resultStr, expectRes);
}
}