You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/03/12 03:00:40 UTC
[incubator-inlong] branch master updated: [INLONG-3073][Manager] Get MQ cluster by the type and mq_set_name (#3074)
This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 44770fb [INLONG-3073][Manager] Get MQ cluster by the type and mq_set_name (#3074)
44770fb is described below
commit 44770fbccdbe00989f5b565305a5b1c09806965b
Author: healchow <he...@gmail.com>
AuthorDate: Sat Mar 12 11:00:32 2022 +0800
[INLONG-3073][Manager] Get MQ cluster by the type and mq_set_name (#3074)
---
.../org/apache/inlong/manager/common/enums/Constant.java | 7 +++----
.../manager/dao/mapper/ThirdPartyClusterEntityMapper.java | 3 +++
.../resources/mappers/ThirdPartyClusterEntityMapper.xml | 15 +++++++++++++++
.../service/core/impl/ThirdPartyClusterServiceImpl.java | 11 ++++++++---
.../service/thirdparty/sort/util/SourceInfoUtils.java | 3 +--
.../src/main/resources/sql/apache_inlong_manager.sql | 3 ++-
inlong-manager/manager-web/sql/apache_inlong_manager.sql | 5 +++--
7 files changed, 35 insertions(+), 12 deletions(-)
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/Constant.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/Constant.java
index 9666853..91925c7 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/Constant.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/Constant.java
@@ -60,16 +60,15 @@ public class Constant {
public static final String FILE_FORMAT_PARQUET = "Parquet";
public static final String MIDDLEWARE_TUBE = "TUBE";
-
public static final String MIDDLEWARE_PULSAR = "PULSAR";
-
+ public static final String MIDDLEWARE_TDMQ_PULSAR = "TDMQ_PULSAR";
public static final String MIDDLEWARE_NONE = "NONE";
- public static final String MIDDLEWARE_TDMQ = "TDMQ";
-
public static final String SCHEMA_M0_DAY = "m0_day";
+ public static final String CLUSTER_TUBE = "TUBE";
public static final String CLUSTER_PULSAR = "PULSAR";
+ public static final String CLUSTER_TDMQ_PULSAR = "TDMQ_PULSAR";
public static final String CLUSTER_DATA_PROXY = "DATA_PROXY";
public static final String ID_IS_EMPTY = "primary key is empty";
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/ThirdPartyClusterEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/ThirdPartyClusterEntityMapper.java
index 71f07ff..fe13d80 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/ThirdPartyClusterEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/ThirdPartyClusterEntityMapper.java
@@ -39,6 +39,9 @@ public interface ThirdPartyClusterEntityMapper {
List<ThirdPartyClusterEntity> selectByType(@Param("type") String type);
+ List<ThirdPartyClusterEntity> selectMqCluster(@Param("mqSetName") String mqSetName,
+ @Param("typeList") List<String> typeList);
+
ThirdPartyClusterEntity selectByName(@Param("name") String name);
int updateByPrimaryKeySelective(ThirdPartyClusterEntity record);
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/ThirdPartyClusterEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/ThirdPartyClusterEntityMapper.xml
index 8b871ec..f67ae62 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/ThirdPartyClusterEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/ThirdPartyClusterEntityMapper.xml
@@ -225,6 +225,21 @@
where is_deleted = 0
and name = #{name, jdbcType=VARCHAR}
</select>
+ <select id="selectMqCluster" resultType="org.apache.inlong.manager.dao.entity.ThirdPartyClusterEntity">
+ select
+ <include refid="Base_Column_List"/>
+ from third_party_cluster
+ <where>
+ is_deleted = 0
+ and mq_set_name = #{mqSetName, jdbcType=VARCHAR}
+ <if test="typeList != null and typeList.size()>0">
+ and `type` in
+ <foreach item="item" index="index" collection="typeList" open="(" close=")" separator=",">
+ #{item}
+ </foreach>
+ </if>
+ </where>
+ </select>
<update id="updateByPrimaryKeySelective"
parameterType="org.apache.inlong.manager.dao.entity.ThirdPartyClusterEntity">
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ThirdPartyClusterServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ThirdPartyClusterServiceImpl.java
index f10276f..b7e5cb7 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ThirdPartyClusterServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ThirdPartyClusterServiceImpl.java
@@ -55,6 +55,7 @@ import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.Map;
@@ -84,6 +85,9 @@ public class ThirdPartyClusterServiceImpl implements ThirdPartyClusterService {
public Integer save(ClusterRequest request, String operator) {
LOGGER.info("begin to insert a cluster info cluster={}", request);
Preconditions.checkNotNull(request, "cluster is empty");
+ ThirdPartyClusterEntity exist = thirdPartyClusterMapper.selectByName(request.getName());
+ Preconditions.checkTrue(exist == null, "cluster name already exist");
+
ThirdPartyClusterEntity entity = CommonBeanUtils.copyProperties(request, ThirdPartyClusterEntity::new);
if (operator != null) {
entity.setCreator(operator);
@@ -279,9 +283,10 @@ public class ThirdPartyClusterServiceImpl implements ThirdPartyClusterService {
// construct pulsarSet info
List<ThirdPartyClusterInfo> mqSet = new ArrayList<>();
- ClusterPageRequest request = new ClusterPageRequest();
- request.setMqSetName(clusterEntity.getMqSetName());
- List<ThirdPartyClusterEntity> clusterList = thirdPartyClusterMapper.selectByCondition(request);
+ List<String> clusterType = Arrays.asList(Constant.CLUSTER_TUBE, Constant.CLUSTER_PULSAR,
+ Constant.CLUSTER_TDMQ_PULSAR);
+ List<ThirdPartyClusterEntity> clusterList = thirdPartyClusterMapper.selectMqCluster(
+ clusterEntity.getMqSetName(), clusterType);
for (ThirdPartyClusterEntity cluster : clusterList) {
ThirdPartyClusterInfo clusterInfo = new ThirdPartyClusterInfo();
clusterInfo.setUrl(cluster.getUrl());
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SourceInfoUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SourceInfoUtils.java
index e43eb73..2df1936 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SourceInfoUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SourceInfoUtils.java
@@ -37,7 +37,6 @@ import org.apache.inlong.sort.protocol.source.TDMQPulsarSourceInfo;
import org.apache.inlong.sort.protocol.source.TubeSourceInfo;
import java.util.List;
-import java.util.Locale;
/**
* Utils for source info
@@ -100,7 +99,7 @@ public class SourceInfoUtils {
final String consumerGroup = clusterBean.getAppName() + "_" + topicName + "_consumer_group";
FieldInfo[] fieldInfosArr = fieldInfos.toArray(new FieldInfo[0]);
String type = pulsarCluster.getType();
- if (StringUtils.isNotEmpty(type) && type.toUpperCase(Locale.ROOT).contains(Constant.MIDDLEWARE_TDMQ)) {
+ if (StringUtils.isNotEmpty(type) && Constant.MIDDLEWARE_TDMQ_PULSAR.equals(type)) {
return new TDMQPulsarSourceInfo(pulsarCluster.getBrokerServiceUrl(),
fullTopicName, consumerGroup, pulsarCluster.getToken(), deserializationInfo, fieldInfosArr);
} else {
diff --git a/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql b/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql
index a3d8079..b83f493 100644
--- a/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql
+++ b/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql
@@ -165,7 +165,8 @@ CREATE TABLE `third_party_cluster`
`modifier` varchar(64) DEFAULT NULL COMMENT 'Modifier name',
`create_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
`modify_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
- PRIMARY KEY (`id`)
+ PRIMARY KEY (`id`),
+ UNIQUE KEY `unique_cluster_name` (`name`, `is_deleted`)
);
-- ----------------------------
diff --git a/inlong-manager/manager-web/sql/apache_inlong_manager.sql b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
index bb6cc78..e3746c1 100644
--- a/inlong-manager/manager-web/sql/apache_inlong_manager.sql
+++ b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
@@ -176,9 +176,10 @@ CREATE TABLE `third_party_cluster`
`modifier` varchar(64) DEFAULT NULL COMMENT 'Modifier name',
`create_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
`modify_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
- PRIMARY KEY (`id`)
+ PRIMARY KEY (`id`),
+ UNIQUE KEY `unique_cluster_name` (`name`, `is_deleted`)
) ENGINE = InnoDB
- DEFAULT CHARSET = utf8mb4 COMMENT ='MQ Cluster Information Table';
+ DEFAULT CHARSET = utf8mb4 COMMENT ='Cluster Information Table';
-- ----------------------------
-- Table structure for common_db_server