You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/03/12 03:00:40 UTC

[incubator-inlong] branch master updated: [INLONG-3073][Manager] Get MQ cluster by the type and mq_set_name (#3074)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 44770fb  [INLONG-3073][Manager] Get MQ cluster by the type and mq_set_name (#3074)
44770fb is described below

commit 44770fbccdbe00989f5b565305a5b1c09806965b
Author: healchow <he...@gmail.com>
AuthorDate: Sat Mar 12 11:00:32 2022 +0800

    [INLONG-3073][Manager] Get MQ cluster by the type and mq_set_name (#3074)
---
 .../org/apache/inlong/manager/common/enums/Constant.java  |  7 +++----
 .../manager/dao/mapper/ThirdPartyClusterEntityMapper.java |  3 +++
 .../resources/mappers/ThirdPartyClusterEntityMapper.xml   | 15 +++++++++++++++
 .../service/core/impl/ThirdPartyClusterServiceImpl.java   | 11 ++++++++---
 .../service/thirdparty/sort/util/SourceInfoUtils.java     |  3 +--
 .../src/main/resources/sql/apache_inlong_manager.sql      |  3 ++-
 inlong-manager/manager-web/sql/apache_inlong_manager.sql  |  5 +++--
 7 files changed, 35 insertions(+), 12 deletions(-)

diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/Constant.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/Constant.java
index 9666853..91925c7 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/Constant.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/Constant.java
@@ -60,16 +60,15 @@ public class Constant {
     public static final String FILE_FORMAT_PARQUET = "Parquet";
 
     public static final String MIDDLEWARE_TUBE = "TUBE";
-
     public static final String MIDDLEWARE_PULSAR = "PULSAR";
-
+    public static final String MIDDLEWARE_TDMQ_PULSAR = "TDMQ_PULSAR";
     public static final String MIDDLEWARE_NONE = "NONE";
 
-    public static final String MIDDLEWARE_TDMQ = "TDMQ";
-
     public static final String SCHEMA_M0_DAY = "m0_day";
 
+    public static final String CLUSTER_TUBE = "TUBE";
     public static final String CLUSTER_PULSAR = "PULSAR";
+    public static final String CLUSTER_TDMQ_PULSAR = "TDMQ_PULSAR";
     public static final String CLUSTER_DATA_PROXY = "DATA_PROXY";
 
     public static final String ID_IS_EMPTY = "primary key is empty";
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/ThirdPartyClusterEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/ThirdPartyClusterEntityMapper.java
index 71f07ff..fe13d80 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/ThirdPartyClusterEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/ThirdPartyClusterEntityMapper.java
@@ -39,6 +39,9 @@ public interface ThirdPartyClusterEntityMapper {
 
     List<ThirdPartyClusterEntity> selectByType(@Param("type") String type);
 
+    List<ThirdPartyClusterEntity> selectMqCluster(@Param("mqSetName") String mqSetName,
+            @Param("typeList") List<String> typeList);
+
     ThirdPartyClusterEntity selectByName(@Param("name") String name);
 
     int updateByPrimaryKeySelective(ThirdPartyClusterEntity record);
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/ThirdPartyClusterEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/ThirdPartyClusterEntityMapper.xml
index 8b871ec..f67ae62 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/ThirdPartyClusterEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/ThirdPartyClusterEntityMapper.xml
@@ -225,6 +225,21 @@
         where is_deleted = 0
         and name = #{name, jdbcType=VARCHAR}
     </select>
+    <select id="selectMqCluster" resultType="org.apache.inlong.manager.dao.entity.ThirdPartyClusterEntity">
+        select
+        <include refid="Base_Column_List"/>
+        from third_party_cluster
+        <where>
+            is_deleted = 0
+            and mq_set_name = #{mqSetName, jdbcType=VARCHAR}
+            <if test="typeList != null and typeList.size()>0">
+                and `type` in
+                <foreach item="item" index="index" collection="typeList" open="(" close=")" separator=",">
+                    #{item}
+                </foreach>
+            </if>
+        </where>
+    </select>
 
     <update id="updateByPrimaryKeySelective"
             parameterType="org.apache.inlong.manager.dao.entity.ThirdPartyClusterEntity">
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ThirdPartyClusterServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ThirdPartyClusterServiceImpl.java
index f10276f..b7e5cb7 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ThirdPartyClusterServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ThirdPartyClusterServiceImpl.java
@@ -55,6 +55,7 @@ import org.springframework.stereotype.Service;
 import org.springframework.transaction.annotation.Transactional;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Date;
 import java.util.List;
 import java.util.Map;
@@ -84,6 +85,9 @@ public class ThirdPartyClusterServiceImpl implements ThirdPartyClusterService {
     public Integer save(ClusterRequest request, String operator) {
         LOGGER.info("begin to insert a cluster info cluster={}", request);
         Preconditions.checkNotNull(request, "cluster is empty");
+        ThirdPartyClusterEntity exist = thirdPartyClusterMapper.selectByName(request.getName());
+        Preconditions.checkTrue(exist == null, "cluster name already exist");
+
         ThirdPartyClusterEntity entity = CommonBeanUtils.copyProperties(request, ThirdPartyClusterEntity::new);
         if (operator != null) {
             entity.setCreator(operator);
@@ -279,9 +283,10 @@ public class ThirdPartyClusterServiceImpl implements ThirdPartyClusterService {
 
         // construct pulsarSet info
         List<ThirdPartyClusterInfo> mqSet = new ArrayList<>();
-        ClusterPageRequest request = new ClusterPageRequest();
-        request.setMqSetName(clusterEntity.getMqSetName());
-        List<ThirdPartyClusterEntity> clusterList = thirdPartyClusterMapper.selectByCondition(request);
+        List<String> clusterType = Arrays.asList(Constant.CLUSTER_TUBE, Constant.CLUSTER_PULSAR,
+                Constant.CLUSTER_TDMQ_PULSAR);
+        List<ThirdPartyClusterEntity> clusterList = thirdPartyClusterMapper.selectMqCluster(
+                clusterEntity.getMqSetName(), clusterType);
         for (ThirdPartyClusterEntity cluster : clusterList) {
             ThirdPartyClusterInfo clusterInfo = new ThirdPartyClusterInfo();
             clusterInfo.setUrl(cluster.getUrl());
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SourceInfoUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SourceInfoUtils.java
index e43eb73..2df1936 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SourceInfoUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SourceInfoUtils.java
@@ -37,7 +37,6 @@ import org.apache.inlong.sort.protocol.source.TDMQPulsarSourceInfo;
 import org.apache.inlong.sort.protocol.source.TubeSourceInfo;
 
 import java.util.List;
-import java.util.Locale;
 
 /**
  * Utils for source info
@@ -100,7 +99,7 @@ public class SourceInfoUtils {
         final String consumerGroup = clusterBean.getAppName() + "_" + topicName + "_consumer_group";
         FieldInfo[] fieldInfosArr = fieldInfos.toArray(new FieldInfo[0]);
         String type = pulsarCluster.getType();
-        if (StringUtils.isNotEmpty(type) && type.toUpperCase(Locale.ROOT).contains(Constant.MIDDLEWARE_TDMQ)) {
+        if (StringUtils.isNotEmpty(type) && Constant.MIDDLEWARE_TDMQ_PULSAR.equals(type)) {
             return new TDMQPulsarSourceInfo(pulsarCluster.getBrokerServiceUrl(),
                     fullTopicName, consumerGroup, pulsarCluster.getToken(), deserializationInfo, fieldInfosArr);
         } else {
diff --git a/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql b/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql
index a3d8079..b83f493 100644
--- a/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql
+++ b/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql
@@ -165,7 +165,8 @@ CREATE TABLE `third_party_cluster`
     `modifier`    varchar(64)       DEFAULT NULL COMMENT 'Modifier name',
     `create_time` timestamp    NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
     `modify_time` timestamp    NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
-    PRIMARY KEY (`id`)
+    PRIMARY KEY (`id`),
+    UNIQUE KEY `unique_cluster_name` (`name`, `is_deleted`)
 );
 
 -- ----------------------------
diff --git a/inlong-manager/manager-web/sql/apache_inlong_manager.sql b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
index bb6cc78..e3746c1 100644
--- a/inlong-manager/manager-web/sql/apache_inlong_manager.sql
+++ b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
@@ -176,9 +176,10 @@ CREATE TABLE `third_party_cluster`
     `modifier`    varchar(64)       DEFAULT NULL COMMENT 'Modifier name',
     `create_time` timestamp    NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
     `modify_time` timestamp    NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
-    PRIMARY KEY (`id`)
+    PRIMARY KEY (`id`),
+    UNIQUE KEY `unique_cluster_name` (`name`, `is_deleted`)
 ) ENGINE = InnoDB
-  DEFAULT CHARSET = utf8mb4 COMMENT ='MQ Cluster Information Table';
+  DEFAULT CHARSET = utf8mb4 COMMENT ='Cluster Information Table';
 
 -- ----------------------------
 -- Table structure for common_db_server