You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/04/07 01:41:02 UTC

[incubator-inlong] branch master updated: [INLONG-3514][TubeMQ] Cluster query adds name and IP attributes (#3522)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 82db828e2 [INLONG-3514][TubeMQ] Cluster query adds name and IP attributes (#3522)
82db828e2 is described below

commit 82db828e26154fcefe537e3741f9c283d49845df
Author: bluewang <88...@users.noreply.github.com>
AuthorDate: Thu Apr 7 09:40:58 2022 +0800

    [INLONG-3514][TubeMQ] Cluster query adds name and IP attributes (#3522)
---
 .../controller/cluster/ClusterController.java      | 81 +++++++++++++++++-----
 .../manager/controller/cluster/vo/ClusterVo.java   |  5 +-
 .../manager/repository/ClusterRepository.java      |  7 ++
 .../manager/repository/MasterRepository.java       |  8 +++
 .../tubemq/manager/service/ClusterServiceImpl.java |  6 ++
 .../tubemq/manager/service/MasterServiceImpl.java  | 19 +++--
 .../manager/service/interfaces/ClusterService.java | 10 +++
 .../manager/service/interfaces/MasterService.java  |  7 ++
 .../inlong/tubemq/manager/utils/ConvertUtils.java  |  4 +-
 9 files changed, 122 insertions(+), 25 deletions(-)

diff --git a/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/controller/cluster/ClusterController.java b/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/controller/cluster/ClusterController.java
index 357b5d3f2..5530e4691 100644
--- a/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/controller/cluster/ClusterController.java
+++ b/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/controller/cluster/ClusterController.java
@@ -43,6 +43,7 @@ import org.apache.inlong.tubemq.manager.service.interfaces.MasterService;
 import org.apache.inlong.tubemq.manager.utils.ConvertUtils;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.http.MediaType;
+import org.springframework.util.CollectionUtils;
 import org.springframework.web.bind.annotation.PostMapping;
 import org.springframework.web.bind.annotation.RequestBody;
 import org.springframework.web.bind.annotation.RequestMapping;
@@ -62,6 +63,7 @@ import static org.apache.inlong.tubemq.manager.service.TubeConst.SUCCESS_CODE;
 public class ClusterController {
 
     private final Gson gson = new Gson();
+    private final TubeMQResult result = new TubeMQResult();
 
     @Autowired
     private ClusterService clusterService;
@@ -127,22 +129,30 @@ public class ClusterController {
      */
     @RequestMapping(value = "", method = RequestMethod.GET,
             produces = MediaType.APPLICATION_JSON_VALUE)
-    public TubeMQResult queryCluster(@RequestParam(required = false) Integer clusterId) {
-        // return all clusters if no clusterId passed
-        if (clusterId == null) {
+    public TubeMQResult queryCluster(@RequestParam(required = false) Integer clusterId,
+            @RequestParam(required = false) String clusterName, @RequestParam(required = false) String masterIp) {
+        TubeMQResult result = new TubeMQResult();
+        if (clusterId == null && clusterName == null && masterIp == null) {
             return queryAllClusterVo();
         }
-
-        ClusterEntry clusterEntry = clusterService.getOneCluster(clusterId);
-        if (clusterEntry == null) {
-            return TubeMQResult.errorResult("no such cluster with id " + clusterId);
+        if (clusterId != null) {
+            ClusterEntry clusterEntry = clusterService.getOneCluster(clusterId);
+            if (clusterEntry == null) {
+                return TubeMQResult.errorResult("no such cluster with id " + clusterId);
+            }
+            List<MasterEntry> masterNodes = masterService.getMasterNodes(clusterEntry.getClusterId());
+            ClusterVo allCount = getCountInCluster(clusterId);
+            result.setData(Lists.newArrayList(ConvertUtils.convertToClusterVo(clusterEntry, masterNodes, allCount)));
+            return  result;
+        }
+        if (clusterName != null) {
+            result = queryClusterByClusterName(clusterName);
+            return result;
+        }
+        if (masterIp != null) {
+            result = queryClusterByMasterIp(masterIp);
+            return result;
         }
-
-        MasterEntry masterNode = masterService.getMasterNode(clusterEntry.getClusterId());
-
-        ClusterVo allCount = getAllCount(clusterId);
-        TubeMQResult result = new TubeMQResult();
-        result.setData(Lists.newArrayList(ConvertUtils.convertToClusterVo(clusterEntry, masterNode, allCount)));
         return result;
     }
 
@@ -156,9 +166,9 @@ public class ClusterController {
         List<ClusterEntry> allClusters = clusterService.getAllClusters();
         List<ClusterVo> clusterVos = Lists.newArrayList();
         for (ClusterEntry cluster : allClusters) {
-            MasterEntry masterNode = masterService.getMasterNode(cluster.getClusterId());
-            ClusterVo allCount = getAllCount(Integer.valueOf((int) cluster.getClusterId()));
-            ClusterVo clusterVo = ConvertUtils.convertToClusterVo(cluster, masterNode, allCount);
+            List<MasterEntry> masterNodes = masterService.getMasterNodes(cluster.getClusterId());
+            ClusterVo allCount = getCountInCluster(Integer.valueOf((int) cluster.getClusterId()));
+            ClusterVo clusterVo = ConvertUtils.convertToClusterVo(cluster, masterNodes, allCount);
             clusterVos.add(clusterVo);
         }
         result.setData(clusterVos);
@@ -199,7 +209,7 @@ public class ClusterController {
      *
      * @param clusterId
      */
-    public ClusterVo getAllCount(Integer clusterId) {
+    public ClusterVo getCountInCluster(Integer clusterId) {
         ClusterVo clusterVo = new ClusterVo();
         int brokerSize = getBrokerSize(clusterId);
         ClusterVo countVo = getTopicAndPartitionCount(clusterId);
@@ -314,4 +324,41 @@ public class ClusterController {
         return storeCount;
     }
 
+    /**
+     * query cluster by cluster name
+     */
+    public TubeMQResult queryClusterByClusterName(String clusterName) {
+        ClusterEntry clusterEntry = clusterService.getOneCluster(clusterName);
+        if (clusterEntry == null) {
+            return TubeMQResult.errorResult("no such cluster with name " + clusterName);
+        }
+        List<MasterEntry> masterNodes = masterService.getMasterNodes(clusterEntry.getClusterId());
+        ClusterVo allCount = getCountInCluster((int) clusterEntry.getClusterId());
+        result.setData(Lists.newArrayList(ConvertUtils.convertToClusterVo(clusterEntry, masterNodes, allCount)));
+        return result;
+    }
+
+    /**
+     * query cluster by cluster masterIp
+     */
+    public TubeMQResult queryClusterByMasterIp(String masterIp) {
+        List<ClusterEntry> clusterEntryList = Lists.newArrayList();
+        List<MasterEntry> masterNodes = masterService.getMasterNodes(masterIp);
+        if (CollectionUtils.isEmpty(masterNodes)) {
+            return TubeMQResult.errorResult("no such cluster with ip " + masterIp);
+        }
+        for (MasterEntry masterNode : masterNodes) {
+            ClusterEntry cluster = clusterService.getOneCluster(masterNode.getClusterId());
+            clusterEntryList.add(cluster);
+        }
+        List<ClusterVo> clusterVos = Lists.newArrayList();
+        for (ClusterEntry clusterEntry : clusterEntryList) {
+            ClusterVo allCount = getCountInCluster((int) clusterEntry.getClusterId());
+            ClusterVo clusterVo = ConvertUtils.convertToClusterVo(clusterEntry, masterNodes, allCount);
+            clusterVos.add(clusterVo);
+        }
+        result.setData(clusterVos);
+        return result;
+    }
+
 }
diff --git a/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/controller/cluster/vo/ClusterVo.java b/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/controller/cluster/vo/ClusterVo.java
index 500873c37..8f74a7d3f 100644
--- a/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/controller/cluster/vo/ClusterVo.java
+++ b/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/controller/cluster/vo/ClusterVo.java
@@ -19,13 +19,16 @@ package org.apache.inlong.tubemq.manager.controller.cluster.vo;
 
 import lombok.Data;
 import lombok.NoArgsConstructor;
+import org.apache.inlong.tubemq.manager.entry.MasterEntry;
+
+import java.util.List;
 
 @Data
 @NoArgsConstructor
 public class ClusterVo {
     private Long clusterId;
     private String clusterName;
-    private String masterIp;
+    private List<MasterEntry> masterEntries;
     private int reloadBrokerSize;
     private int brokerCount;
     private int topicCount;
diff --git a/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/repository/ClusterRepository.java b/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/repository/ClusterRepository.java
index 274dd1e38..c0b6745d2 100644
--- a/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/repository/ClusterRepository.java
+++ b/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/repository/ClusterRepository.java
@@ -37,4 +37,11 @@ public interface ClusterRepository extends JpaRepository<ClusterEntry, Long> {
      * @return
      */
     Integer deleteByClusterId(Long clusterId);
+
+    /**
+     * find clusterEntry by clusterName
+     * @param clusterName
+     * @return
+     */
+    ClusterEntry findClusterEntryByClusterName(String clusterName);
 }
diff --git a/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/repository/MasterRepository.java b/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/repository/MasterRepository.java
index 862ebe711..e0b653664 100644
--- a/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/repository/MasterRepository.java
+++ b/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/repository/MasterRepository.java
@@ -48,4 +48,12 @@ public interface MasterRepository extends JpaRepository<MasterEntry, Long> {
      * @return
      */
     List<MasterEntry> findAll();
+
+    /**
+     *
+     * find all nodes in ip
+     *
+     * @return
+     */
+    List<MasterEntry> findMasterEntryByIpEquals(String masterIp);
 }
diff --git a/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/service/ClusterServiceImpl.java b/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/service/ClusterServiceImpl.java
index 1a0d1720d..4fcd2963b 100644
--- a/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/service/ClusterServiceImpl.java
+++ b/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/service/ClusterServiceImpl.java
@@ -76,6 +76,12 @@ public class ClusterServiceImpl implements ClusterService {
                 .findClusterEntryByClusterId(clusterId);
     }
 
+    @Override
+    public ClusterEntry getOneCluster(String clusterName) {
+        return clusterRepository
+                .findClusterEntryByClusterName(clusterName);
+    }
+
     @Override
     public List<ClusterEntry> getAllClusters() {
         return clusterRepository.findAll();
diff --git a/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/service/MasterServiceImpl.java b/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/service/MasterServiceImpl.java
index f9ebd183d..3d6379544 100644
--- a/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/service/MasterServiceImpl.java
+++ b/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/service/MasterServiceImpl.java
@@ -147,15 +147,25 @@ public class MasterServiceImpl implements MasterService {
         if (clusterId == null) {
             return null;
         }
-        List<MasterEntry> masters = masterRepository
-                .findMasterEntriesByClusterIdEquals(
-                        clusterId);
+        List<MasterEntry> masters = masterRepository.findMasterEntriesByClusterIdEquals(clusterId);
         if (CollectionUtils.isEmpty(masters)) {
             throw new RuntimeException("cluster id " + clusterId + "no master node, please check");
         }
         return masters;
     }
 
+    @Override
+    public List<MasterEntry> getMasterNodes(String masterIp) {
+        if (masterIp == null) {
+            return null;
+        }
+        List<MasterEntry> masters = masterRepository.findMasterEntryByIpEquals(masterIp);
+        if (CollectionUtils.isEmpty(masters)) {
+            throw new RuntimeException("master ip " + masterIp + "no master node, please check");
+        }
+        return masters;
+    }
+
     @Override
     public String getQueryUrl(Map<String, String> queryBody) throws Exception {
         int clusterId = Integer.parseInt(queryBody.get("clusterId"));
@@ -174,8 +184,7 @@ public class MasterServiceImpl implements MasterService {
 
     @Override
     public String getQueryCountUrl(Integer clusterId, String method) {
-        MasterEntry masterEntry =
-                masterRepository.findMasterEntryByClusterIdEquals(clusterId);
+        MasterEntry masterEntry = getMasterNode(Long.valueOf(clusterId));
         return TubeConst.SCHEMA + masterEntry.getIp() + ":" + masterEntry.getWebPort()
                 + method + "&" + "clusterId=" + clusterId;
     }
diff --git a/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/service/interfaces/ClusterService.java b/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/service/interfaces/ClusterService.java
index a1c755489..a9a1d2c41 100644
--- a/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/service/interfaces/ClusterService.java
+++ b/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/service/interfaces/ClusterService.java
@@ -51,6 +51,15 @@ public interface ClusterService {
      */
     ClusterEntry getOneCluster(long clusterId);
 
+    /**
+     *
+     * get one cluster by cluster name
+     *
+     * @param clusterName
+     * @return
+     */
+    ClusterEntry getOneCluster(String clusterName);
+
     /**
      * get all clusters
      *
@@ -65,4 +74,5 @@ public interface ClusterService {
      * @return
      */
     TubeMQResult modifyCluster(ClusterDto clusterDto);
+
 }
diff --git a/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/service/interfaces/MasterService.java b/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/service/interfaces/MasterService.java
index c94b7af5e..0ac583b24 100644
--- a/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/service/interfaces/MasterService.java
+++ b/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/service/interfaces/MasterService.java
@@ -75,6 +75,13 @@ public interface MasterService {
      */
     List<MasterEntry> getMasterNodes(Long clusterId);
 
+    /**
+     * get master in master ip
+     * @param masterIp
+     * @return
+     */
+    List<MasterEntry> getMasterNodes(String masterIp);
+
     /**
      * use queryBody to generate queryUrl for master query
      *
diff --git a/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/utils/ConvertUtils.java b/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/utils/ConvertUtils.java
index 65b7baeff..14cbb9cb4 100644
--- a/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/utils/ConvertUtils.java
+++ b/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/utils/ConvertUtils.java
@@ -112,10 +112,10 @@ public class ConvertUtils {
     }
 
     public static ClusterVo convertToClusterVo(ClusterEntry clusterEntry,
-                                               MasterEntry masterEntry, ClusterVo clusterVo) {
+                                               List<MasterEntry> masterEntries, ClusterVo clusterVo) {
         ClusterVo cluster = new ClusterVo();
         cluster.setClusterId(clusterEntry.getClusterId());
-        cluster.setMasterIp(masterEntry.getIp());
+        cluster.setMasterEntries(masterEntries);
         cluster.setClusterName(clusterEntry.getClusterName());
         cluster.setReloadBrokerSize(clusterEntry.getReloadBrokerSize());
         cluster.setBrokerCount(clusterVo.getBrokerCount());