You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/03/17 06:31:59 UTC

[incubator-inlong] branch master updated: [INLONG-3179][Manager] Check the source name can not be the same when saving stream source (#3180)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 865a301  [INLONG-3179][Manager] Check the source name can not be the same when saving stream source (#3180)
865a301 is described below

commit 865a3019a0d5e70245de58d07dc165dcb9e6b575
Author: healchow <he...@gmail.com>
AuthorDate: Thu Mar 17 14:31:55 2022 +0800

    [INLONG-3179][Manager] Check the source name can not be the same when saving stream source (#3180)
---
 .../dao/mapper/StreamSourceEntityMapper.java       | 27 +++++-----------------
 .../resources/mappers/StreamSourceEntityMapper.xml | 10 +++++---
 .../source/AbstractStreamSourceOperation.java      | 13 +++++++++++
 .../service/source/StreamSourceServiceImpl.java    |  4 ++--
 .../main/resources/sql/apache_inlong_manager.sql   |  6 ++---
 .../manager-web/sql/apache_inlong_manager.sql      |  6 ++---
 6 files changed, 34 insertions(+), 32 deletions(-)

diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
index 76be150..b18ef34 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
@@ -35,40 +35,25 @@ public interface StreamSourceEntityMapper {
 
     /**
      * According to the inlong group id and inlong stream id, query the number of valid source
-     *
-     * @param groupId inlong group id
-     * @param streamId inlong stream id
-     * @return Source entity size
      */
     int selectCount(@Param("groupId") String groupId, @Param("streamId") String streamId);
 
     /**
      * Paging query source list based on conditions
-     *
-     * @param request Paging query conditions
-     * @return Source entity list
      */
     List<StreamSourceEntity> selectByCondition(@Param("request") SourcePageRequest request);
 
     /**
-     * Query valid source list by the given group id and stream id.
-     *
-     * @param groupId Inlong group id.
-     * @param streamId Inlong stream id.
-     * @return Source entity list.
+     * Query valid source list by the given group id, stream id and source name.
      */
-    List<StreamSourceEntity> selectByRelatedId(@Param("groupId") String groupId, @Param("streamId") String streamId);
+    List<StreamSourceEntity> selectByRelatedIdForUpdate(@Param("groupId") String groupId,
+            @Param("streamId") String streamId, @Param("sourceName") String sourceName);
 
     /**
      * According to the group id, stream id and source type, query valid source entity list.
-     *
-     * @param groupId Inlong group id.
-     * @param streamId Inlong stream id.
-     * @param sourceType Source type.
-     * @return Source entity list.
      */
-    List<StreamSourceEntity> selectByIdAndType(@Param("groupId") String groupId, @Param("streamId") String streamId,
-            @Param("sourceType") String sourceType);
+    List<StreamSourceEntity> selectByRelatedIdAndTypeForUpdate(@Param("groupId") String groupId,
+            @Param("streamId") String streamId, @Param("sourceType") String sourceType);
 
     /**
      * Query the tasks that need to be added.
@@ -78,7 +63,7 @@ public interface StreamSourceEntityMapper {
     /**
      * Query the sources with status 20x by the given agent IP and agent UUID.
      */
-    List<StreamSourceEntity> selectByStatusAndIpForUpdate(@Param("list") List<Integer> list,
+    List<StreamSourceEntity> selectByStatusAndIpForUpdate(@Param("statusList") List<Integer> statusList,
             @Param("agentIp") String agentIp, @Param("uuid") String uuid);
 
     /**
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
index 1e9124c..874f919 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
@@ -251,7 +251,7 @@
         </where>
     </select>
 
-    <select id="selectByRelatedId" resultType="org.apache.inlong.manager.dao.entity.StreamSourceEntity">
+    <select id="selectByRelatedIdForUpdate" resultType="org.apache.inlong.manager.dao.entity.StreamSourceEntity">
         select
         <include refid="Base_Column_List"/>
         from stream_source
@@ -261,9 +261,13 @@
             <if test="streamId != null and streamId != ''">
                 and inlong_stream_id = #{streamId, jdbcType=VARCHAR}
             </if>
+            <if test="sourceName != null and sourceName != ''">
+                and source_name = #{sourceName, jdbcType=VARCHAR},
+            </if>
+            for update
         </where>
     </select>
-    <select id="selectByIdAndType" resultType="org.apache.inlong.manager.dao.entity.StreamSourceEntity">
+    <select id="selectByRelatedIdAndTypeForUpdate" resultType="org.apache.inlong.manager.dao.entity.StreamSourceEntity">
         select
         <include refid="Base_Column_List"/>
         from stream_source
@@ -299,7 +303,7 @@
         <where>
             is_deleted = 0
             and status in
-            <foreach item="item" index="index" collection="list" open="(" close=")" separator=",">
+            <foreach item="item" index="index" collection="statusList" open="(" close=")" separator=",">
                 #{item}
             </foreach>
             and agent_ip = #{agentIp, jdbcType=VARCHAR}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractStreamSourceOperation.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractStreamSourceOperation.java
index 1ebfdf4..67235b2 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractStreamSourceOperation.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractStreamSourceOperation.java
@@ -17,10 +17,12 @@
 
 package org.apache.inlong.manager.service.source;
 
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.inlong.manager.common.enums.Constant;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.enums.GroupState;
 import org.apache.inlong.manager.common.enums.SourceState;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.pojo.source.SourceRequest;
 import org.apache.inlong.manager.common.pojo.source.SourceResponse;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
@@ -35,6 +37,7 @@ import org.springframework.transaction.annotation.Transactional;
 
 import javax.validation.constraints.NotNull;
 import java.util.Date;
+import java.util.List;
 
 public abstract class AbstractStreamSourceOperation implements StreamSourceOperation {
 
@@ -94,7 +97,17 @@ public abstract class AbstractStreamSourceOperation implements StreamSourceOpera
     }
 
     @Override
+    @Transactional(rollbackFor = Throwable.class)
     public Integer saveOpt(SourceRequest request, Integer groupStatus, String operator) {
+        String groupId = request.getInlongGroupId();
+        String streamId = request.getInlongStreamId();
+        String sourceName = request.getSourceName();
+        List<StreamSourceEntity> existList = sourceMapper.selectByRelatedIdForUpdate(groupId, streamId, sourceName);
+        if (CollectionUtils.isNotEmpty(existList)) {
+            String err = "stream source already exists with groupId=%s, streamId=%s, sourceName=%s";
+            throw new BusinessException(String.format(err, groupId, streamId, sourceName));
+        }
+
         StreamSourceEntity entity = CommonBeanUtils.copyProperties(request, StreamSourceEntity::new);
         if (GroupState.forCode(groupStatus).equals(GroupState.CONFIG_SUCCESSFUL)) {
             entity.setStatus(SourceState.TO_BE_ISSUED_ADD.getCode());
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
index 0210737..b93e9f9 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
@@ -109,7 +109,7 @@ public class StreamSourceServiceImpl implements StreamSourceService {
         LOGGER.debug("begin to list source by groupId={}, streamId={}", groupId, streamId);
         Preconditions.checkNotNull(groupId, Constant.GROUP_ID_IS_EMPTY);
 
-        List<StreamSourceEntity> entityList = sourceMapper.selectByRelatedId(groupId, streamId);
+        List<StreamSourceEntity> entityList = sourceMapper.selectByRelatedIdForUpdate(groupId, streamId, null);
         if (CollectionUtils.isEmpty(entityList)) {
             return Collections.emptyList();
         }
@@ -253,7 +253,7 @@ public class StreamSourceServiceImpl implements StreamSourceService {
             nextStatus = SourceState.SOURCE_DISABLE.getCode();
         }
         Date now = new Date();
-        List<StreamSourceEntity> entityList = sourceMapper.selectByRelatedId(groupId, streamId);
+        List<StreamSourceEntity> entityList = sourceMapper.selectByRelatedIdForUpdate(groupId, streamId, null);
         if (CollectionUtils.isNotEmpty(entityList)) {
             for (StreamSourceEntity entity : entityList) {
                 Integer id = entity.getId();
diff --git a/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql b/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql
index 891a330..3afa66a 100644
--- a/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql
+++ b/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql
@@ -386,7 +386,7 @@ CREATE TABLE `inlong_stream_ext`
     `modify_time`      timestamp    NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
     PRIMARY KEY (`id`),
     KEY `index_stream_id` (`inlong_stream_id`),
-    UNIQUE KEY `group_stream_key_idx` (`inlong_group_id`, `inlong_stream_id`, `key_name`)
+    UNIQUE KEY `unique_group_stream_key` (`inlong_group_id`, `inlong_stream_id`, `key_name`)
 );
 
 -- ----------------------------
@@ -582,7 +582,7 @@ CREATE TABLE `stream_source`
     `create_time`      timestamp    NULL     DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
     `modify_time`      timestamp    NULL     DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
     PRIMARY KEY (`id`),
-    UNIQUE KEY `source_idx` (`inlong_group_id`, `inlong_stream_id`, `source_name`, `is_deleted`)
+    UNIQUE KEY `unique_source_name` (`inlong_group_id`, `inlong_stream_id`, `source_name`, `is_deleted`)
 );
 
 -- ----------------------------
@@ -608,7 +608,7 @@ CREATE TABLE `stream_sink`
     `create_time`            timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
     `modify_time`            timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
     PRIMARY KEY (`id`),
-    UNIQUE KEY `sink_idx` (`inlong_group_id`, `inlong_stream_id`, `sink_name`, `is_deleted`)
+    UNIQUE KEY `unique_sink_name` (`inlong_group_id`, `inlong_stream_id`, `sink_name`, `is_deleted`)
 );
 
 -- ----------------------------
diff --git a/inlong-manager/manager-web/sql/apache_inlong_manager.sql b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
index d7a90bb..8a87283 100644
--- a/inlong-manager/manager-web/sql/apache_inlong_manager.sql
+++ b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
@@ -407,7 +407,7 @@ CREATE TABLE `inlong_stream_ext`
     `modify_time`      timestamp    NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
     PRIMARY KEY (`id`),
     KEY `index_stream_id` (`inlong_stream_id`),
-    UNIQUE KEY `group_stream_key_idx` (`inlong_group_id`, `inlong_stream_id`, `key_name`)
+    UNIQUE KEY `unique_group_stream_key` (`inlong_group_id`, `inlong_stream_id`, `key_name`)
 ) ENGINE = InnoDB
   DEFAULT CHARSET = utf8mb4 COMMENT ='Inlong stream extension table';
 
@@ -611,7 +611,7 @@ CREATE TABLE `stream_source`
     `create_time`      timestamp    NULL     DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
     `modify_time`      timestamp    NULL     DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
     PRIMARY KEY (`id`),
-    UNIQUE KEY `source_idx` (`inlong_group_id`, `inlong_stream_id`, `source_name`, `is_deleted`)
+    UNIQUE KEY `unique_source_name` (`inlong_group_id`, `inlong_stream_id`, `source_name`, `is_deleted`)
 ) ENGINE = InnoDB
   DEFAULT CHARSET = utf8mb4 COMMENT ='Stream source table';
 
@@ -638,7 +638,7 @@ CREATE TABLE `stream_sink`
     `create_time`            timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
     `modify_time`            timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
     PRIMARY KEY (`id`),
-    UNIQUE KEY `sink_idx` (`inlong_group_id`, `inlong_stream_id`, `sink_name`, `is_deleted`)
+    UNIQUE KEY `unique_sink_name` (`inlong_group_id`, `inlong_stream_id`, `sink_name`, `is_deleted`)
 ) ENGINE = InnoDB
   DEFAULT CHARSET = utf8mb4 COMMENT ='Stream sink table';