You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/04/07 01:41:02 UTC
[incubator-inlong] branch master updated: [INLONG-3514][TubeMQ] Cluster query adds name and IP attributes (#3522)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 82db828e2 [INLONG-3514][TubeMQ] Cluster query adds name and IP attributes (#3522)
82db828e2 is described below
commit 82db828e26154fcefe537e3741f9c283d49845df
Author: bluewang <88...@users.noreply.github.com>
AuthorDate: Thu Apr 7 09:40:58 2022 +0800
[INLONG-3514][TubeMQ] Cluster query adds name and IP attributes (#3522)
---
.../controller/cluster/ClusterController.java | 81 +++++++++++++++++-----
.../manager/controller/cluster/vo/ClusterVo.java | 5 +-
.../manager/repository/ClusterRepository.java | 7 ++
.../manager/repository/MasterRepository.java | 8 +++
.../tubemq/manager/service/ClusterServiceImpl.java | 6 ++
.../tubemq/manager/service/MasterServiceImpl.java | 19 +++--
.../manager/service/interfaces/ClusterService.java | 10 +++
.../manager/service/interfaces/MasterService.java | 7 ++
.../inlong/tubemq/manager/utils/ConvertUtils.java | 4 +-
9 files changed, 122 insertions(+), 25 deletions(-)
diff --git a/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/controller/cluster/ClusterController.java b/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/controller/cluster/ClusterController.java
index 357b5d3f2..5530e4691 100644
--- a/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/controller/cluster/ClusterController.java
+++ b/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/controller/cluster/ClusterController.java
@@ -43,6 +43,7 @@ import org.apache.inlong.tubemq.manager.service.interfaces.MasterService;
import org.apache.inlong.tubemq.manager.utils.ConvertUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
+import org.springframework.util.CollectionUtils;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
@@ -62,6 +63,7 @@ import static org.apache.inlong.tubemq.manager.service.TubeConst.SUCCESS_CODE;
public class ClusterController {
private final Gson gson = new Gson();
+ private final TubeMQResult result = new TubeMQResult();
@Autowired
private ClusterService clusterService;
@@ -127,22 +129,30 @@ public class ClusterController {
*/
@RequestMapping(value = "", method = RequestMethod.GET,
produces = MediaType.APPLICATION_JSON_VALUE)
- public TubeMQResult queryCluster(@RequestParam(required = false) Integer clusterId) {
- // return all clusters if no clusterId passed
- if (clusterId == null) {
+ public TubeMQResult queryCluster(@RequestParam(required = false) Integer clusterId,
+ @RequestParam(required = false) String clusterName, @RequestParam(required = false) String masterIp) {
+ TubeMQResult result = new TubeMQResult();
+ if (clusterId == null && clusterName == null && masterIp == null) {
return queryAllClusterVo();
}
-
- ClusterEntry clusterEntry = clusterService.getOneCluster(clusterId);
- if (clusterEntry == null) {
- return TubeMQResult.errorResult("no such cluster with id " + clusterId);
+ if (clusterId != null) {
+ ClusterEntry clusterEntry = clusterService.getOneCluster(clusterId);
+ if (clusterEntry == null) {
+ return TubeMQResult.errorResult("no such cluster with id " + clusterId);
+ }
+ List<MasterEntry> masterNodes = masterService.getMasterNodes(clusterEntry.getClusterId());
+ ClusterVo allCount = getCountInCluster(clusterId);
+ result.setData(Lists.newArrayList(ConvertUtils.convertToClusterVo(clusterEntry, masterNodes, allCount)));
+ return result;
+ }
+ if (clusterName != null) {
+ result = queryClusterByClusterName(clusterName);
+ return result;
+ }
+ if (masterIp != null) {
+ result = queryClusterByMasterIp(masterIp);
+ return result;
}
-
- MasterEntry masterNode = masterService.getMasterNode(clusterEntry.getClusterId());
-
- ClusterVo allCount = getAllCount(clusterId);
- TubeMQResult result = new TubeMQResult();
- result.setData(Lists.newArrayList(ConvertUtils.convertToClusterVo(clusterEntry, masterNode, allCount)));
return result;
}
@@ -156,9 +166,9 @@ public class ClusterController {
List<ClusterEntry> allClusters = clusterService.getAllClusters();
List<ClusterVo> clusterVos = Lists.newArrayList();
for (ClusterEntry cluster : allClusters) {
- MasterEntry masterNode = masterService.getMasterNode(cluster.getClusterId());
- ClusterVo allCount = getAllCount(Integer.valueOf((int) cluster.getClusterId()));
- ClusterVo clusterVo = ConvertUtils.convertToClusterVo(cluster, masterNode, allCount);
+ List<MasterEntry> masterNodes = masterService.getMasterNodes(cluster.getClusterId());
+ ClusterVo allCount = getCountInCluster(Integer.valueOf((int) cluster.getClusterId()));
+ ClusterVo clusterVo = ConvertUtils.convertToClusterVo(cluster, masterNodes, allCount);
clusterVos.add(clusterVo);
}
result.setData(clusterVos);
@@ -199,7 +209,7 @@ public class ClusterController {
*
* @param clusterId
*/
- public ClusterVo getAllCount(Integer clusterId) {
+ public ClusterVo getCountInCluster(Integer clusterId) {
ClusterVo clusterVo = new ClusterVo();
int brokerSize = getBrokerSize(clusterId);
ClusterVo countVo = getTopicAndPartitionCount(clusterId);
@@ -314,4 +324,41 @@ public class ClusterController {
return storeCount;
}
+ /**
+ * query cluster by cluster name
+ */
+ public TubeMQResult queryClusterByClusterName(String clusterName) {
+ ClusterEntry clusterEntry = clusterService.getOneCluster(clusterName);
+ if (clusterEntry == null) {
+ return TubeMQResult.errorResult("no such cluster with name " + clusterName);
+ }
+ List<MasterEntry> masterNodes = masterService.getMasterNodes(clusterEntry.getClusterId());
+ ClusterVo allCount = getCountInCluster((int) clusterEntry.getClusterId());
+ result.setData(Lists.newArrayList(ConvertUtils.convertToClusterVo(clusterEntry, masterNodes, allCount)));
+ return result;
+ }
+
+ /**
+ * query cluster by cluster masterIp
+ */
+ public TubeMQResult queryClusterByMasterIp(String masterIp) {
+ List<ClusterEntry> clusterEntryList = Lists.newArrayList();
+ List<MasterEntry> masterNodes = masterService.getMasterNodes(masterIp);
+ if (CollectionUtils.isEmpty(masterNodes)) {
+ return TubeMQResult.errorResult("no such cluster with ip " + masterIp);
+ }
+ for (MasterEntry masterNode : masterNodes) {
+ ClusterEntry cluster = clusterService.getOneCluster(masterNode.getClusterId());
+ clusterEntryList.add(cluster);
+ }
+ List<ClusterVo> clusterVos = Lists.newArrayList();
+ for (ClusterEntry clusterEntry : clusterEntryList) {
+ ClusterVo allCount = getCountInCluster((int) clusterEntry.getClusterId());
+ ClusterVo clusterVo = ConvertUtils.convertToClusterVo(clusterEntry, masterNodes, allCount);
+ clusterVos.add(clusterVo);
+ }
+ result.setData(clusterVos);
+ return result;
+ }
+
}
diff --git a/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/controller/cluster/vo/ClusterVo.java b/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/controller/cluster/vo/ClusterVo.java
index 500873c37..8f74a7d3f 100644
--- a/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/controller/cluster/vo/ClusterVo.java
+++ b/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/controller/cluster/vo/ClusterVo.java
@@ -19,13 +19,16 @@ package org.apache.inlong.tubemq.manager.controller.cluster.vo;
import lombok.Data;
import lombok.NoArgsConstructor;
+import org.apache.inlong.tubemq.manager.entry.MasterEntry;
+
+import java.util.List;
@Data
@NoArgsConstructor
public class ClusterVo {
private Long clusterId;
private String clusterName;
- private String masterIp;
+ private List<MasterEntry> masterEntries;
private int reloadBrokerSize;
private int brokerCount;
private int topicCount;
diff --git a/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/repository/ClusterRepository.java b/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/repository/ClusterRepository.java
index 274dd1e38..c0b6745d2 100644
--- a/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/repository/ClusterRepository.java
+++ b/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/repository/ClusterRepository.java
@@ -37,4 +37,11 @@ public interface ClusterRepository extends JpaRepository<ClusterEntry, Long> {
* @return
*/
Integer deleteByClusterId(Long clusterId);
+
+ /**
+ * find clusterEntry by clusterName
+ * @param clusterName
+ * @return
+ */
+ ClusterEntry findClusterEntryByClusterName(String clusterName);
}
diff --git a/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/repository/MasterRepository.java b/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/repository/MasterRepository.java
index 862ebe711..e0b653664 100644
--- a/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/repository/MasterRepository.java
+++ b/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/repository/MasterRepository.java
@@ -48,4 +48,12 @@ public interface MasterRepository extends JpaRepository<MasterEntry, Long> {
* @return
*/
List<MasterEntry> findAll();
+
+ /**
+ *
+ * find all nodes in ip
+ *
+ * @return
+ */
+ List<MasterEntry> findMasterEntryByIpEquals(String masterIp);
}
diff --git a/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/service/ClusterServiceImpl.java b/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/service/ClusterServiceImpl.java
index 1a0d1720d..4fcd2963b 100644
--- a/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/service/ClusterServiceImpl.java
+++ b/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/service/ClusterServiceImpl.java
@@ -76,6 +76,12 @@ public class ClusterServiceImpl implements ClusterService {
.findClusterEntryByClusterId(clusterId);
}
+ @Override
+ public ClusterEntry getOneCluster(String clusterName) {
+ return clusterRepository
+ .findClusterEntryByClusterName(clusterName);
+ }
+
@Override
public List<ClusterEntry> getAllClusters() {
return clusterRepository.findAll();
diff --git a/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/service/MasterServiceImpl.java b/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/service/MasterServiceImpl.java
index f9ebd183d..3d6379544 100644
--- a/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/service/MasterServiceImpl.java
+++ b/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/service/MasterServiceImpl.java
@@ -147,15 +147,25 @@ public class MasterServiceImpl implements MasterService {
if (clusterId == null) {
return null;
}
- List<MasterEntry> masters = masterRepository
- .findMasterEntriesByClusterIdEquals(
- clusterId);
+ List<MasterEntry> masters = masterRepository.findMasterEntriesByClusterIdEquals(clusterId);
if (CollectionUtils.isEmpty(masters)) {
throw new RuntimeException("cluster id " + clusterId + "no master node, please check");
}
return masters;
}
+ @Override
+ public List<MasterEntry> getMasterNodes(String masterIp) {
+ if (masterIp == null) {
+ return null;
+ }
+ List<MasterEntry> masters = masterRepository.findMasterEntryByIpEquals(masterIp);
+ if (CollectionUtils.isEmpty(masters)) {
+ throw new RuntimeException("master ip " + masterIp + "no master node, please check");
+ }
+ return masters;
+ }
+
@Override
public String getQueryUrl(Map<String, String> queryBody) throws Exception {
int clusterId = Integer.parseInt(queryBody.get("clusterId"));
@@ -174,8 +184,7 @@ public class MasterServiceImpl implements MasterService {
@Override
public String getQueryCountUrl(Integer clusterId, String method) {
- MasterEntry masterEntry =
- masterRepository.findMasterEntryByClusterIdEquals(clusterId);
+ MasterEntry masterEntry = getMasterNode(Long.valueOf(clusterId));
return TubeConst.SCHEMA + masterEntry.getIp() + ":" + masterEntry.getWebPort()
+ method + "&" + "clusterId=" + clusterId;
}
diff --git a/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/service/interfaces/ClusterService.java b/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/service/interfaces/ClusterService.java
index a1c755489..a9a1d2c41 100644
--- a/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/service/interfaces/ClusterService.java
+++ b/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/service/interfaces/ClusterService.java
@@ -51,6 +51,15 @@ public interface ClusterService {
*/
ClusterEntry getOneCluster(long clusterId);
+ /**
+ *
+ * get one cluster by cluster name
+ *
+ * @param clusterName
+ * @return
+ */
+ ClusterEntry getOneCluster(String clusterName);
+
/**
* get all clusters
*
@@ -65,4 +74,5 @@ public interface ClusterService {
* @return
*/
TubeMQResult modifyCluster(ClusterDto clusterDto);
+
}
diff --git a/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/service/interfaces/MasterService.java b/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/service/interfaces/MasterService.java
index c94b7af5e..0ac583b24 100644
--- a/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/service/interfaces/MasterService.java
+++ b/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/service/interfaces/MasterService.java
@@ -75,6 +75,13 @@ public interface MasterService {
*/
List<MasterEntry> getMasterNodes(Long clusterId);
+ /**
+ * get master in master ip
+ * @param masterIp
+ * @return
+ */
+ List<MasterEntry> getMasterNodes(String masterIp);
+
/**
* use queryBody to generate queryUrl for master query
*
diff --git a/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/utils/ConvertUtils.java b/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/utils/ConvertUtils.java
index 65b7baeff..14cbb9cb4 100644
--- a/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/utils/ConvertUtils.java
+++ b/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/utils/ConvertUtils.java
@@ -112,10 +112,10 @@ public class ConvertUtils {
}
public static ClusterVo convertToClusterVo(ClusterEntry clusterEntry,
- MasterEntry masterEntry, ClusterVo clusterVo) {
+ List<MasterEntry> masterEntries, ClusterVo clusterVo) {
ClusterVo cluster = new ClusterVo();
cluster.setClusterId(clusterEntry.getClusterId());
- cluster.setMasterIp(masterEntry.getIp());
+ cluster.setMasterEntries(masterEntries);
cluster.setClusterName(clusterEntry.getClusterName());
cluster.setReloadBrokerSize(clusterEntry.getReloadBrokerSize());
cluster.setBrokerCount(clusterVo.getBrokerCount());