You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/05/31 06:25:50 UTC

[incubator-inlong] branch master updated: [INLONG-4164][Manager] Migrate the use of third_party_cluster table to inlong_cluster table (#4165)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 3c8b24d30 [INLONG-4164][Manager] Migrate the use of third_party_cluster table to inlong_cluster table (#4165)
3c8b24d30 is described below

commit 3c8b24d301f2c45e73903a77d3482df7068158d5
Author: fuweng11 <76...@users.noreply.github.com>
AuthorDate: Tue May 31 14:25:44 2022 +0800

    [INLONG-4164][Manager] Migrate the use of third_party_cluster table to inlong_cluster table (#4165)
---
 .../dao/mapper/InlongClusterNodeEntityMapper.java  |  3 +
 .../mappers/InlongClusterNodeEntityMapper.xml      |  8 +-
 .../manager/service/core/InlongClusterService.java | 27 ++++++
 .../core/impl/InlongClusterServiceImpl.java        | 95 ++++++++++++++++++++++
 .../core/impl/InlongClusterServiceTest.java        | 28 +++++++
 5 files changed, 160 insertions(+), 1 deletion(-)

diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongClusterNodeEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongClusterNodeEntityMapper.java
index 88b25731f..9847f05b8 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongClusterNodeEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongClusterNodeEntityMapper.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.manager.dao.mapper;
 
+import org.apache.ibatis.annotations.Param;
 import org.apache.inlong.manager.common.pojo.cluster.ClusterNodeRequest;
 import org.apache.inlong.manager.common.pojo.cluster.InlongClusterPageRequest;
 import org.apache.inlong.manager.dao.entity.InlongClusterNodeEntity;
@@ -37,6 +38,8 @@ public interface InlongClusterNodeEntityMapper {
 
     List<InlongClusterNodeEntity> selectByCondition(InlongClusterPageRequest request);
 
+    List<InlongClusterNodeEntity> selectByParentId(@Param("parentId") Integer parentId);
+
     int updateById(InlongClusterNodeEntity record);
 
     int updateByIdSelective(InlongClusterNodeEntity record);
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterNodeEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterNodeEntityMapper.xml
index 4457863c9..732ed3c8a 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterNodeEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterNodeEntityMapper.xml
@@ -169,7 +169,13 @@
         </where>
         order by modify_time desc
     </select>
-
+    <select id="selectByParentId" resultType="org.apache.inlong.manager.dao.entity.InlongClusterNodeEntity">
+        select
+        <include refid="Base_Column_List"/>
+        from inlong_cluster_node
+        where is_deleted = 0
+        and parent_id = #{parentId, jdbcType=INTEGER}
+    </select>
     <update id="updateByIdSelective"
             parameterType="org.apache.inlong.manager.dao.entity.InlongClusterNodeEntity">
         update inlong_cluster_node
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/InlongClusterService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/InlongClusterService.java
index 6c036b26f..53b228585 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/InlongClusterService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/InlongClusterService.java
@@ -18,11 +18,15 @@
 package org.apache.inlong.manager.service.core;
 
 import com.github.pagehelper.PageInfo;
+import org.apache.inlong.common.pojo.dataproxy.DataProxyConfig;
 import org.apache.inlong.manager.common.pojo.cluster.ClusterNodeRequest;
 import org.apache.inlong.manager.common.pojo.cluster.ClusterNodeResponse;
 import org.apache.inlong.manager.common.pojo.cluster.InlongClusterPageRequest;
 import org.apache.inlong.manager.common.pojo.cluster.InlongClusterRequest;
 import org.apache.inlong.manager.common.pojo.cluster.InlongClusterResponse;
+import org.apache.inlong.manager.common.pojo.dataproxy.DataProxyResponse;
+
+import java.util.List;
 
 /**
  * Inlong cluster service layer interface
@@ -54,6 +58,14 @@ public interface InlongClusterService {
      */
     PageInfo<InlongClusterResponse> list(InlongClusterPageRequest request);
 
+    /**
+     * Query ip list by cluster type
+     *
+     * @param type cluster type
+     * @return clustre node ip list
+     */
+    public List<String> listNodeIpByType(String type);
+
     /**
      * Update cluster information
      *
@@ -115,4 +127,19 @@ public interface InlongClusterService {
      */
     Boolean deleteNode(Integer id, String operator);
 
+    /**
+     * Query data proxy ip list by the given cluster name.
+     *
+     * @param clusterName Cluster name.
+     * @return Data proxy info list.
+     */
+    List<DataProxyResponse> getIpList(String clusterName);
+
+    /**
+     * query data proxy config by cluster id
+     *
+     * @return data proxy config
+     */
+    List<DataProxyConfig> getConfig();
+
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongClusterServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongClusterServiceImpl.java
index 76d25cd9c..7615b7fb2 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongClusterServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongClusterServiceImpl.java
@@ -20,26 +20,38 @@ package org.apache.inlong.manager.service.core.impl;
 import com.github.pagehelper.Page;
 import com.github.pagehelper.PageHelper;
 import com.github.pagehelper.PageInfo;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.common.pojo.dataproxy.DataProxyConfig;
+import org.apache.inlong.manager.common.beans.ClusterBean;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.enums.GlobalConstants;
+import org.apache.inlong.manager.common.enums.GroupStatus;
+import org.apache.inlong.manager.common.enums.MQType;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.pojo.cluster.ClusterNodeRequest;
 import org.apache.inlong.manager.common.pojo.cluster.ClusterNodeResponse;
 import org.apache.inlong.manager.common.pojo.cluster.InlongClusterPageRequest;
 import org.apache.inlong.manager.common.pojo.cluster.InlongClusterRequest;
 import org.apache.inlong.manager.common.pojo.cluster.InlongClusterResponse;
+import org.apache.inlong.manager.common.pojo.dataproxy.DataProxyResponse;
+import org.apache.inlong.manager.common.settings.InlongGroupSettings;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
 import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.dao.entity.InlongClusterEntity;
 import org.apache.inlong.manager.dao.entity.InlongClusterNodeEntity;
+import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
+import org.apache.inlong.manager.dao.entity.InlongStreamEntity;
 import org.apache.inlong.manager.dao.mapper.InlongClusterEntityMapper;
 import org.apache.inlong.manager.dao.mapper.InlongClusterNodeEntityMapper;
+import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
+import org.apache.inlong.manager.dao.mapper.InlongStreamEntityMapper;
 import org.apache.inlong.manager.service.core.InlongClusterService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
+import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
 import java.util.Objects;
@@ -51,11 +63,18 @@ import java.util.Objects;
 public class InlongClusterServiceImpl implements InlongClusterService {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(InlongClusterServiceImpl.class);
+    public static final String SCHEMA_M0_DAY = "m0_day";
 
     @Autowired
     private InlongClusterEntityMapper clusterMapper;
     @Autowired
     private InlongClusterNodeEntityMapper clusterNodeMapper;
+    @Autowired
+    private InlongGroupEntityMapper groupMapper;
+    @Autowired
+    private InlongStreamEntityMapper streamMapper;
+    @Autowired
+    private ClusterBean clusterBean;
 
     @Override
     public Integer save(InlongClusterRequest request, String operator) {
@@ -110,6 +129,22 @@ public class InlongClusterServiceImpl implements InlongClusterService {
         return page;
     }
 
+    @Override
+    public List<String> listNodeIpByType(String type) {
+        Preconditions.checkNotNull(type, "inlong cluster type is null");
+        InlongClusterPageRequest request = new InlongClusterPageRequest();
+        request.setType(type);
+        List<InlongClusterEntity> clusterList = clusterMapper.selectByCondition(request);
+        List<String> ipList = new ArrayList<>();
+        for (InlongClusterEntity clusterEntity : clusterList) {
+            List<InlongClusterNodeEntity> nodeList = clusterNodeMapper.selectByParentId(clusterEntity.getId());
+            for (InlongClusterNodeEntity nodeEntity : nodeList) {
+                ipList.add(String.format("%s:%d", nodeEntity.getIp(), nodeEntity.getPort()));
+            }
+        }
+        return ipList;
+    }
+
     @Override
     public Boolean update(InlongClusterRequest request, String operator) {
         LOGGER.debug("begin to update inlong cluster={}", request);
@@ -261,4 +296,64 @@ public class InlongClusterServiceImpl implements InlongClusterService {
         return true;
     }
 
+    @Override
+    public List<DataProxyResponse> getIpList(String clusterName) {
+        LOGGER.debug("begin to list data proxy by clusterName={}", clusterName);
+        InlongClusterPageRequest request = new InlongClusterPageRequest();
+        request.setType(InlongGroupSettings.CLUSTER_DATA_PROXY);
+        if (StringUtils.isNotBlank(clusterName)) {
+            request.setName(clusterName);
+        }
+        List<InlongClusterEntity> clusterList = clusterMapper.selectByCondition(request);
+        Preconditions.checkNotEmpty(clusterList,
+                "data proxy cluster not found by type=" + InlongGroupSettings.CLUSTER_DATA_PROXY);
+
+        List<DataProxyResponse> responseList = new ArrayList<>();
+        for (InlongClusterEntity clusterEntity : clusterList) {
+            Integer clusterId = clusterEntity.getId();
+            List<InlongClusterNodeEntity> nodeList = clusterNodeMapper.selectByParentId(clusterId);
+            for (InlongClusterNodeEntity nodeEntity : nodeList) {
+                DataProxyResponse response = new DataProxyResponse();
+                response.setId(clusterId);
+                response.setIp(nodeEntity.getIp());
+                response.setPort(nodeEntity.getPort());
+                responseList.add(response);
+            }
+        }
+
+        LOGGER.debug("success to list data proxy cluster={}", responseList);
+        return responseList;
+    }
+
+    @Override
+    public List<DataProxyConfig> getConfig() {
+        // get all configs with inlong group status of 130, that is, config successful
+        // TODO Optimize query conditions
+        List<InlongGroupEntity> groupEntityList = groupMapper.selectAll(GroupStatus.CONFIG_SUCCESSFUL.getCode());
+        List<DataProxyConfig> configList = new ArrayList<>();
+        for (InlongGroupEntity groupEntity : groupEntityList) {
+            String groupId = groupEntity.getInlongGroupId();
+            String bizResource = groupEntity.getMqResource();
+
+            DataProxyConfig config = new DataProxyConfig();
+            config.setM(SCHEMA_M0_DAY);
+            MQType mqType = MQType.forType(groupEntity.getMqType());
+            if (mqType == MQType.TUBE) {
+                config.setInlongGroupId(groupId);
+                config.setTopic(bizResource);
+            } else if (mqType == MQType.PULSAR || mqType == MQType.TDMQ_PULSAR) {
+                List<InlongStreamEntity> streamList = streamMapper.selectByGroupId(groupId);
+                for (InlongStreamEntity stream : streamList) {
+                    String topic = stream.getMqResource();
+                    String streamId = stream.getInlongStreamId();
+                    config.setInlongGroupId(groupId + "/" + streamId);
+                    config.setTopic("persistent://" + clusterBean.getDefaultTenant() + "/" + bizResource + "/" + topic);
+                }
+            }
+            configList.add(config);
+        }
+
+        return configList;
+    }
+
 }
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/InlongClusterServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/InlongClusterServiceTest.java
index 529880b17..5952fc801 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/InlongClusterServiceTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/InlongClusterServiceTest.java
@@ -242,4 +242,32 @@ public class InlongClusterServiceTest extends ServiceBaseTest {
         Assert.assertTrue(success);
     }
 
+    @Test
+    public void testGetDataProxyIp() {
+        String clusterTag = "default_cluster";
+        String extTag = "ext_1";
+        String ip = "127.0.0.1";
+        Integer port1 = 46800;
+        Integer port2 = 46801;
+
+        Integer id = this.saveCluster(CLUSTER_NAME, InlongGroupSettings.CLUSTER_DATA_PROXY, clusterTag, extTag);
+        Assert.assertNotNull(id);
+
+        // save cluster node
+        Integer nodeId1 = this.saveClusterNode(id, InlongGroupSettings.CLUSTER_DATA_PROXY, ip, port1);
+        Assert.assertNotNull(nodeId1);
+
+        Integer nodeId2 = this.saveClusterNode(id, InlongGroupSettings.CLUSTER_DATA_PROXY, ip, port2);
+        Assert.assertNotNull(nodeId2);
+
+        // Get the data proxy cluster ip list, the first port should is p1, second port is p2
+        List<DataProxyResponse> ipList = inlongClusterService.getIpList(CLUSTER_NAME);
+        Assert.assertEquals(ipList.size(), 2);
+        Assert.assertEquals(port1, ipList.get(0).getPort());
+        Assert.assertEquals(port2, ipList.get(1).getPort());
+        this.deleteClusterNode(nodeId1);
+        this.deleteClusterNode(nodeId2);
+        this.deleteCluster(id);
+    }
+
 }