You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/02/09 03:55:45 UTC

[incubator-inlong] branch master updated: [INLONG-2353] Tube manager cluster adds support for multi-master configuration (#2360)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 98a2753  [INLONG-2353] Tube manager cluster adds support for multi-master configuration (#2360)
98a2753 is described below

commit 98a27539504fe303837dbd63b74fcec6c05dd9d9
Author: bluewang <88...@users.noreply.github.com>
AuthorDate: Wed Feb 9 11:55:42 2022 +0800

    [INLONG-2353] Tube manager cluster adds support for multi-master configuration (#2360)
    
    Co-authored-by: v_lizhwang <v_...@tencent.com>
---
 .../tubemq/manager/controller/TubeMQResult.java    |  6 +++++
 .../controller/cluster/ClusterController.java      | 12 ++++++---
 .../controller/cluster/request/AddClusterReq.java  |  7 +++--
 .../tubemq/manager/service/ClusterServiceImpl.java | 16 +++++++-----
 .../tubemq/manager/service/MasterServiceImpl.java  | 30 +++++++++++++++++-----
 .../manager/service/interfaces/MasterService.java  |  8 ++++++
 .../manager/controller/TestClusterController.java  |  6 +++--
 7 files changed, 65 insertions(+), 20 deletions(-)

diff --git a/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/controller/TubeMQResult.java b/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/controller/TubeMQResult.java
index 3cfa0ee..38580b6 100644
--- a/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/controller/TubeMQResult.java
+++ b/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/controller/TubeMQResult.java
@@ -34,6 +34,8 @@ public class TubeMQResult {
     private boolean result = true;
     private Object data;
 
+    public static final int ERR_CODE = -1;
+
     private static Gson json = new Gson();
 
     public static TubeMQResult errorResult(String errorMsg) {
@@ -61,4 +63,8 @@ public class TubeMQResult {
                 .result(true).data(data).build();
     }
 
+    public boolean isError() {
+        return ERR_CODE == errCode;
+    }
+
 }
diff --git a/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/controller/cluster/ClusterController.java b/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/controller/cluster/ClusterController.java
index 2c2b755..59c30d1 100644
--- a/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/controller/cluster/ClusterController.java
+++ b/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/controller/cluster/ClusterController.java
@@ -47,6 +47,8 @@ import org.springframework.web.bind.annotation.RequestParam;
 import org.springframework.web.bind.annotation.ResponseBody;
 import org.springframework.web.bind.annotation.RestController;
 
+import static org.apache.inlong.tubemq.manager.service.TubeConst.SUCCESS_CODE;
+
 @RestController
 @RequestMapping(path = "/v1/cluster")
 @Slf4j
@@ -96,10 +98,14 @@ public class ClusterController {
         if (!req.legal()) {
             return TubeMQResult.errorResult(TubeMQErrorConst.PARAM_ILLEGAL);
         }
-        TubeMQResult checkResult = masterService.checkMasterNodeStatus(req.getMasterIp(), req.getMasterWebPort());
-        if (checkResult.getErrCode() != TubeConst.SUCCESS_CODE) {
-            return TubeMQResult.errorResult("please check master ip and webPort");
+        List<String> masterIps = req.getMasterIps();
+        for (String masterIp : masterIps) {
+            TubeMQResult checkResult = masterService.checkMasterNodeStatus(masterIp, req.getMasterWebPort());
+            if (checkResult.getErrCode() != SUCCESS_CODE) {
+                return TubeMQResult.errorResult("please check master ip and webPort");
+            }
         }
+
         // 2. add cluster and master node
         clusterService.addClusterAndMasterNode(req);
         return new TubeMQResult();
diff --git a/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/controller/cluster/request/AddClusterReq.java b/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/controller/cluster/request/AddClusterReq.java
index bef03dc..2eafbac 100644
--- a/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/controller/cluster/request/AddClusterReq.java
+++ b/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/controller/cluster/request/AddClusterReq.java
@@ -18,11 +18,14 @@
 package org.apache.inlong.tubemq.manager.controller.cluster.request;
 
 import lombok.Data;
+import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 
+import java.util.List;
+
 @Data
 public class AddClusterReq {
-    private String masterIp;
+    private List<String> masterIps;
     private String clusterName;
     private Integer masterPort;
     private Integer masterWebPort;
@@ -30,6 +33,6 @@ public class AddClusterReq {
     private String token;
 
     public boolean legal() {
-        return StringUtils.isNotBlank(masterIp) && masterPort != null && StringUtils.isNotBlank(token);
+        return CollectionUtils.isNotEmpty(masterIps) && masterPort != null && StringUtils.isNotBlank(token);
     }
 }
diff --git a/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/service/ClusterServiceImpl.java b/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/service/ClusterServiceImpl.java
index 41b7016..83f2209 100644
--- a/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/service/ClusterServiceImpl.java
+++ b/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/service/ClusterServiceImpl.java
@@ -95,13 +95,15 @@ public class ClusterServiceImpl implements ClusterService {
         if (clusterEntry == null) {
             return;
         }
-        MasterEntry masterEntry = new MasterEntry();
-        masterEntry.setPort(req.getMasterPort());
-        masterEntry.setClusterId(clusterEntry.getClusterId());
-        masterEntry.setWebPort(req.getMasterWebPort());
-        masterEntry.setIp(req.getMasterIp());
-        masterEntry.setToken(req.getToken());
-        nodeService.addNode(masterEntry);
+        for (String masterIp : req.getMasterIps()) {
+            MasterEntry masterEntry = new MasterEntry();
+            masterEntry.setPort(req.getMasterPort());
+            masterEntry.setClusterId(clusterEntry.getClusterId());
+            masterEntry.setWebPort(req.getMasterWebPort());
+            masterEntry.setIp(masterIp);
+            masterEntry.setToken(req.getToken());
+            nodeService.addNode(masterEntry);
+        }
     }
 
 }
diff --git a/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/service/MasterServiceImpl.java b/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/service/MasterServiceImpl.java
index ba3635c..917c1a4 100644
--- a/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/service/MasterServiceImpl.java
+++ b/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/service/MasterServiceImpl.java
@@ -23,9 +23,11 @@ import com.google.gson.Gson;
 
 import java.io.InputStreamReader;
 import java.nio.charset.StandardCharsets;
+import java.util.List;
 import java.util.Map;
 
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections4.CollectionUtils;
 import org.apache.http.client.methods.CloseableHttpResponse;
 import org.apache.http.client.methods.HttpGet;
 import org.apache.http.impl.client.CloseableHttpClient;
@@ -128,14 +130,30 @@ public class MasterServiceImpl implements MasterService {
         if (clusterId == null) {
             return null;
         }
-        MasterEntry master = masterRepository
-                .findMasterEntryByClusterIdEquals(
+        List<MasterEntry> masters = getMasterNodes(clusterId);
+
+        for (MasterEntry masterEntry : masters) {
+            if (!checkMasterNodeStatus(masterEntry.getIp(),
+                    masterEntry.getWebPort()).isError()) {
+                return masterEntry;
+            }
+        }
+
+        throw new RuntimeException("cluster id " + clusterId + "no master node, please check");
+    }
+
+    @Override
+    public List<MasterEntry> getMasterNodes(Long clusterId) {
+        if (clusterId == null) {
+            return null;
+        }
+        List<MasterEntry> masters = masterRepository
+                .findMasterEntriesByClusterIdEquals(
                         clusterId);
-        if (master == null) {
-            throw new RuntimeException("cluster id "
-                    + clusterId + "no master node, please check");
+        if (CollectionUtils.isEmpty(masters)) {
+            throw new RuntimeException("cluster id " + clusterId + "no master node, please check");
         }
-        return master;
+        return masters;
     }
 
     @Override
diff --git a/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/service/interfaces/MasterService.java b/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/service/interfaces/MasterService.java
index 33cbf22..a7e57e5 100644
--- a/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/service/interfaces/MasterService.java
+++ b/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/service/interfaces/MasterService.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.tubemq.manager.service.interfaces;
 
+import java.util.List;
 import java.util.Map;
 
 import org.apache.inlong.tubemq.manager.controller.TubeMQResult;
@@ -68,6 +69,13 @@ public interface MasterService {
     MasterEntry getMasterNode(Long clusterId);
 
     /**
+     * get all master nodes in one cluster
+     * @param clusterId id of the cluster whose master nodes are queried
+     * @return the master entries found for that cluster id
+     */
+    List<MasterEntry> getMasterNodes(Long clusterId);
+
+    /**
      * use queryBody to generate queryUrl for master query
      *
      * @param queryBody
diff --git a/inlong-tubemq/tubemq-manager/src/test/java/org/apache/inlong/tubemq/manager/controller/TestClusterController.java b/inlong-tubemq/tubemq-manager/src/test/java/org/apache/inlong/tubemq/manager/controller/TestClusterController.java
index 9ac7583..a813167 100644
--- a/inlong-tubemq/tubemq-manager/src/test/java/org/apache/inlong/tubemq/manager/controller/TestClusterController.java
+++ b/inlong-tubemq/tubemq-manager/src/test/java/org/apache/inlong/tubemq/manager/controller/TestClusterController.java
@@ -46,6 +46,8 @@ import org.springframework.test.web.servlet.MockMvc;
 import org.springframework.test.web.servlet.MvcResult;
 import org.springframework.test.web.servlet.RequestBuilder;
 
+import java.util.Collections;
+
 @Slf4j
 @RunWith(SpringRunner.class)
 @SpringBootTest
@@ -157,7 +159,7 @@ public class TestClusterController {
 
         AddClusterReq req = new AddClusterReq();
         req.setClusterName("test");
-        req.setMasterIp("127.0.0.1");
+        req.setMasterIps(Collections.singletonList("127.0.0.1"));
         req.setMasterWebPort(8080);
         req.setMasterPort(8089);
         req.setToken("abc");
@@ -172,7 +174,7 @@ public class TestClusterController {
                 .contentType(MediaType.APPLICATION_JSON).content(gson.toJson(req));
         MvcResult result = mockMvc.perform(request).andReturn();
         String resultStr = result.getResponse().getContentAsString();
-        String expectRes = "{\"errMsg\":\"\",\"errCode\":0,\"result\":true,\"data\":null}";
+        String expectRes = "{\"errMsg\":\"\",\"errCode\":0,\"result\":true,\"data\":null,\"error\":false}";
         Assert.assertEquals(resultStr, expectRes);
     }
 }