You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/05/31 06:25:50 UTC
[incubator-inlong] branch master updated: [INLONG-4164][Manager] Migrate the use of third_party_cluster table to inlong_cluster table (#4165)
This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 3c8b24d30 [INLONG-4164][Manager] Migrate the use of third_party_cluster table to inlong_cluster table (#4165)
3c8b24d30 is described below
commit 3c8b24d301f2c45e73903a77d3482df7068158d5
Author: fuweng11 <76...@users.noreply.github.com>
AuthorDate: Tue May 31 14:25:44 2022 +0800
[INLONG-4164][Manager] Migrate the use of third_party_cluster table to inlong_cluster table (#4165)
---
.../dao/mapper/InlongClusterNodeEntityMapper.java | 3 +
.../mappers/InlongClusterNodeEntityMapper.xml | 8 +-
.../manager/service/core/InlongClusterService.java | 27 ++++++
.../core/impl/InlongClusterServiceImpl.java | 95 ++++++++++++++++++++++
.../core/impl/InlongClusterServiceTest.java | 28 +++++++
5 files changed, 160 insertions(+), 1 deletion(-)
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongClusterNodeEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongClusterNodeEntityMapper.java
index 88b25731f..9847f05b8 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongClusterNodeEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongClusterNodeEntityMapper.java
@@ -17,6 +17,7 @@
package org.apache.inlong.manager.dao.mapper;
+import org.apache.ibatis.annotations.Param;
import org.apache.inlong.manager.common.pojo.cluster.ClusterNodeRequest;
import org.apache.inlong.manager.common.pojo.cluster.InlongClusterPageRequest;
import org.apache.inlong.manager.dao.entity.InlongClusterNodeEntity;
@@ -37,6 +38,8 @@ public interface InlongClusterNodeEntityMapper {
List<InlongClusterNodeEntity> selectByCondition(InlongClusterPageRequest request);
+ /**
+ * Select the cluster nodes whose parent id equals the given value; the mapped SQL skips deleted rows.
+ *
+ * @param parentId id of the parent cluster
+ * @return matching cluster nodes
+ */
+ List<InlongClusterNodeEntity> selectByParentId(@Param("parentId") Integer parentId);
+
int updateById(InlongClusterNodeEntity record);
int updateByIdSelective(InlongClusterNodeEntity record);
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterNodeEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterNodeEntityMapper.xml
index 4457863c9..732ed3c8a 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterNodeEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterNodeEntityMapper.xml
@@ -169,7 +169,13 @@
</where>
order by modify_time desc
</select>
-
+ <!-- Select the non-deleted cluster nodes under the given parent id. -->
+ <!-- NOTE(review): there is no ORDER BY, so the row order is left to the database; confirm callers do not depend on it. -->
+ <select id="selectByParentId" resultType="org.apache.inlong.manager.dao.entity.InlongClusterNodeEntity">
+ select
+ <include refid="Base_Column_List"/>
+ from inlong_cluster_node
+ where is_deleted = 0
+ and parent_id = #{parentId, jdbcType=INTEGER}
+ </select>
<update id="updateByIdSelective"
parameterType="org.apache.inlong.manager.dao.entity.InlongClusterNodeEntity">
update inlong_cluster_node
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/InlongClusterService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/InlongClusterService.java
index 6c036b26f..53b228585 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/InlongClusterService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/InlongClusterService.java
@@ -18,11 +18,15 @@
package org.apache.inlong.manager.service.core;
import com.github.pagehelper.PageInfo;
+import org.apache.inlong.common.pojo.dataproxy.DataProxyConfig;
import org.apache.inlong.manager.common.pojo.cluster.ClusterNodeRequest;
import org.apache.inlong.manager.common.pojo.cluster.ClusterNodeResponse;
import org.apache.inlong.manager.common.pojo.cluster.InlongClusterPageRequest;
import org.apache.inlong.manager.common.pojo.cluster.InlongClusterRequest;
import org.apache.inlong.manager.common.pojo.cluster.InlongClusterResponse;
+import org.apache.inlong.manager.common.pojo.dataproxy.DataProxyResponse;
+
+import java.util.List;
/**
* Inlong cluster service layer interface
@@ -54,6 +58,14 @@ public interface InlongClusterService {
*/
PageInfo<InlongClusterResponse> list(InlongClusterPageRequest request);
+    /**
+     * Query the node address list of all clusters with the given type.
+     *
+     * @param type cluster type, must not be null
+     * @return cluster node address list, each entry formatted as {@code ip:port}
+     */
+    List<String> listNodeIpByType(String type);
+
/**
* Update cluster information
*
@@ -115,4 +127,19 @@ public interface InlongClusterService {
*/
Boolean deleteNode(Integer id, String operator);
+ /**
+ * Query the data proxy node list by the given cluster name.
+ *
+ * @param clusterName Name of the data proxy cluster; when blank, the query is not restricted by name.
+ * @return Data proxy info list, one entry (cluster id, ip, port) per cluster node.
+ */
+ List<DataProxyResponse> getIpList(String clusterName);
+
+ /**
+ * Query the data proxy configs of the inlong groups whose status is config successful.
+ *
+ * @return data proxy config list
+ */
+ List<DataProxyConfig> getConfig();
+
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongClusterServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongClusterServiceImpl.java
index 76d25cd9c..7615b7fb2 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongClusterServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongClusterServiceImpl.java
@@ -20,26 +20,38 @@ package org.apache.inlong.manager.service.core.impl;
import com.github.pagehelper.Page;
import com.github.pagehelper.PageHelper;
import com.github.pagehelper.PageInfo;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.common.pojo.dataproxy.DataProxyConfig;
+import org.apache.inlong.manager.common.beans.ClusterBean;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.enums.GlobalConstants;
+import org.apache.inlong.manager.common.enums.GroupStatus;
+import org.apache.inlong.manager.common.enums.MQType;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.pojo.cluster.ClusterNodeRequest;
import org.apache.inlong.manager.common.pojo.cluster.ClusterNodeResponse;
import org.apache.inlong.manager.common.pojo.cluster.InlongClusterPageRequest;
import org.apache.inlong.manager.common.pojo.cluster.InlongClusterRequest;
import org.apache.inlong.manager.common.pojo.cluster.InlongClusterResponse;
+import org.apache.inlong.manager.common.pojo.dataproxy.DataProxyResponse;
+import org.apache.inlong.manager.common.settings.InlongGroupSettings;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.InlongClusterEntity;
import org.apache.inlong.manager.dao.entity.InlongClusterNodeEntity;
+import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
+import org.apache.inlong.manager.dao.entity.InlongStreamEntity;
import org.apache.inlong.manager.dao.mapper.InlongClusterEntityMapper;
import org.apache.inlong.manager.dao.mapper.InlongClusterNodeEntityMapper;
+import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
+import org.apache.inlong.manager.dao.mapper.InlongStreamEntityMapper;
import org.apache.inlong.manager.service.core.InlongClusterService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
+import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Objects;
@@ -51,11 +63,18 @@ import java.util.Objects;
public class InlongClusterServiceImpl implements InlongClusterService {
private static final Logger LOGGER = LoggerFactory.getLogger(InlongClusterServiceImpl.class);
+ public static final String SCHEMA_M0_DAY = "m0_day";
@Autowired
private InlongClusterEntityMapper clusterMapper;
@Autowired
private InlongClusterNodeEntityMapper clusterNodeMapper;
+ @Autowired
+ private InlongGroupEntityMapper groupMapper;
+ @Autowired
+ private InlongStreamEntityMapper streamMapper;
+ @Autowired
+ private ClusterBean clusterBean;
@Override
public Integer save(InlongClusterRequest request, String operator) {
@@ -110,6 +129,22 @@ public class InlongClusterServiceImpl implements InlongClusterService {
return page;
}
+    @Override
+    public List<String> listNodeIpByType(String type) {
+        Preconditions.checkNotNull(type, "inlong cluster type is null");
+
+        // look up every cluster of the requested type
+        InlongClusterPageRequest pageRequest = new InlongClusterPageRequest();
+        pageRequest.setType(type);
+        List<InlongClusterEntity> clusters = clusterMapper.selectByCondition(pageRequest);
+
+        // flatten the nodes of those clusters into "ip:port" addresses
+        List<String> addresses = new ArrayList<>();
+        for (InlongClusterEntity cluster : clusters) {
+            for (InlongClusterNodeEntity node : clusterNodeMapper.selectByParentId(cluster.getId())) {
+                String address = String.format("%s:%d", node.getIp(), node.getPort());
+                addresses.add(address);
+            }
+        }
+        return addresses;
+    }
+
@Override
public Boolean update(InlongClusterRequest request, String operator) {
LOGGER.debug("begin to update inlong cluster={}", request);
@@ -261,4 +296,64 @@ public class InlongClusterServiceImpl implements InlongClusterService {
return true;
}
+    @Override
+    public List<DataProxyResponse> getIpList(String clusterName) {
+        LOGGER.debug("begin to list data proxy by clusterName={}", clusterName);
+
+        // always filter by the data proxy cluster type; filter by name only when one was given
+        InlongClusterPageRequest pageRequest = new InlongClusterPageRequest();
+        pageRequest.setType(InlongGroupSettings.CLUSTER_DATA_PROXY);
+        if (!StringUtils.isBlank(clusterName)) {
+            pageRequest.setName(clusterName);
+        }
+        List<InlongClusterEntity> clusters = clusterMapper.selectByCondition(pageRequest);
+        Preconditions.checkNotEmpty(clusters,
+                "data proxy cluster not found by type=" + InlongGroupSettings.CLUSTER_DATA_PROXY);
+
+        // one response per node, tagged with the id of the cluster the node belongs to
+        List<DataProxyResponse> result = new ArrayList<>();
+        for (InlongClusterEntity cluster : clusters) {
+            Integer parentId = cluster.getId();
+            for (InlongClusterNodeEntity node : clusterNodeMapper.selectByParentId(parentId)) {
+                DataProxyResponse item = new DataProxyResponse();
+                item.setId(parentId);
+                item.setIp(node.getIp());
+                item.setPort(node.getPort());
+                result.add(item);
+            }
+        }
+
+        LOGGER.debug("success to list data proxy cluster={}", result);
+        return result;
+    }
+
+    /**
+     * Query the data proxy configs of all inlong groups whose status is config successful.
+     *
+     * <p>A TUBE group yields one config that maps the group id to the group's MQ resource.
+     * A PULSAR / TDMQ_PULSAR group yields one config per stream, keyed by {@code groupId/streamId}.
+     * Groups with any other MQ type, and Pulsar groups without streams, yield no config.
+     *
+     * @return data proxy config list, never null
+     */
+    @Override
+    public List<DataProxyConfig> getConfig() {
+        // get all configs with inlong group status of 130, that is, config successful
+        // TODO Optimize query conditions
+        List<InlongGroupEntity> groupEntityList = groupMapper.selectAll(GroupStatus.CONFIG_SUCCESSFUL.getCode());
+        List<DataProxyConfig> configList = new ArrayList<>();
+        for (InlongGroupEntity groupEntity : groupEntityList) {
+            String groupId = groupEntity.getInlongGroupId();
+            String bizResource = groupEntity.getMqResource();
+
+            MQType mqType = MQType.forType(groupEntity.getMqType());
+            if (mqType == MQType.TUBE) {
+                configList.add(buildDataProxyConfig(groupId, bizResource));
+            } else if (mqType == MQType.PULSAR || mqType == MQType.TDMQ_PULSAR) {
+                // the tenant and the group's MQ resource are the same for every stream of the group
+                String topicPrefix = "persistent://" + clusterBean.getDefaultTenant() + "/" + bizResource + "/";
+                List<InlongStreamEntity> streamList = streamMapper.selectByGroupId(groupId);
+                for (InlongStreamEntity stream : streamList) {
+                    // a fresh config per stream: reusing one instance would keep only the last stream
+                    String streamKey = groupId + "/" + stream.getInlongStreamId();
+                    configList.add(buildDataProxyConfig(streamKey, topicPrefix + stream.getMqResource()));
+                }
+            }
+        }
+
+        return configList;
+    }
+
+    /**
+     * Build a data proxy config with the default schema for the given id and topic.
+     */
+    private DataProxyConfig buildDataProxyConfig(String inlongGroupId, String topic) {
+        DataProxyConfig config = new DataProxyConfig();
+        config.setM(SCHEMA_M0_DAY);
+        config.setInlongGroupId(inlongGroupId);
+        config.setTopic(topic);
+        return config;
+    }
+
}
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/InlongClusterServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/InlongClusterServiceTest.java
index 529880b17..5952fc801 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/InlongClusterServiceTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/InlongClusterServiceTest.java
@@ -242,4 +242,32 @@ public class InlongClusterServiceTest extends ServiceBaseTest {
Assert.assertTrue(success);
}
+    /**
+     * Saves one data proxy cluster with two nodes and checks that getIpList returns both of them.
+     */
+    @Test
+    public void testGetDataProxyIp() {
+        String clusterTag = "default_cluster";
+        String extTag = "ext_1";
+        String ip = "127.0.0.1";
+        Integer port1 = 46800;
+        Integer port2 = 46801;
+
+        Integer id = this.saveCluster(CLUSTER_NAME, InlongGroupSettings.CLUSTER_DATA_PROXY, clusterTag, extTag);
+        Assert.assertNotNull(id);
+
+        // save cluster node
+        Integer nodeId1 = this.saveClusterNode(id, InlongGroupSettings.CLUSTER_DATA_PROXY, ip, port1);
+        Assert.assertNotNull(nodeId1);
+
+        Integer nodeId2 = this.saveClusterNode(id, InlongGroupSettings.CLUSTER_DATA_PROXY, ip, port2);
+        Assert.assertNotNull(nodeId2);
+
+        // Get the data proxy node list. The node query declares no ORDER BY, so check that both
+        // ports are present instead of relying on the order of the returned rows.
+        List<DataProxyResponse> ipList = inlongClusterService.getIpList(CLUSTER_NAME);
+        Assert.assertEquals(2, ipList.size());
+        boolean foundPort1 = false;
+        boolean foundPort2 = false;
+        for (DataProxyResponse node : ipList) {
+            if (port1.equals(node.getPort())) {
+                foundPort1 = true;
+            } else if (port2.equals(node.getPort())) {
+                foundPort2 = true;
+            }
+        }
+        Assert.assertTrue("node with port " + port1 + " not returned", foundPort1);
+        Assert.assertTrue("node with port " + port2 + " not returned", foundPort2);
+
+        this.deleteClusterNode(nodeId1);
+        this.deleteClusterNode(nodeId2);
+        this.deleteCluster(id);
+    }
+
}