You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/03/17 06:31:59 UTC
[incubator-inlong] branch master updated: [INLONG-3179][Manager] Check the source name can not be the same when saving stream source (#3180)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 865a301 [INLONG-3179][Manager] Check the source name can not be the same when saving stream source (#3180)
865a301 is described below
commit 865a3019a0d5e70245de58d07dc165dcb9e6b575
Author: healchow <he...@gmail.com>
AuthorDate: Thu Mar 17 14:31:55 2022 +0800
[INLONG-3179][Manager] Check the source name can not be the same when saving stream source (#3180)
---
.../dao/mapper/StreamSourceEntityMapper.java | 27 +++++-----------------
.../resources/mappers/StreamSourceEntityMapper.xml | 10 +++++---
.../source/AbstractStreamSourceOperation.java | 13 +++++++++++
.../service/source/StreamSourceServiceImpl.java | 4 ++--
.../main/resources/sql/apache_inlong_manager.sql | 6 ++---
.../manager-web/sql/apache_inlong_manager.sql | 6 ++---
6 files changed, 34 insertions(+), 32 deletions(-)
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
index 76be150..b18ef34 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
@@ -35,40 +35,25 @@ public interface StreamSourceEntityMapper {
/**
* According to the inlong group id and inlong stream id, query the number of valid source
- *
- * @param groupId inlong group id
- * @param streamId inlong stream id
- * @return Source entity size
*/
int selectCount(@Param("groupId") String groupId, @Param("streamId") String streamId);
/**
* Paging query source list based on conditions
- *
- * @param request Paging query conditions
- * @return Source entity list
*/
List<StreamSourceEntity> selectByCondition(@Param("request") SourcePageRequest request);
/**
- * Query valid source list by the given group id and stream id.
- *
- * @param groupId Inlong group id.
- * @param streamId Inlong stream id.
- * @return Source entity list.
+ * Query valid source list by the given group id, stream id and source name.
*/
- List<StreamSourceEntity> selectByRelatedId(@Param("groupId") String groupId, @Param("streamId") String streamId);
+ List<StreamSourceEntity> selectByRelatedIdForUpdate(@Param("groupId") String groupId,
+ @Param("streamId") String streamId, @Param("sourceName") String sourceName);
/**
* According to the group id, stream id and source type, query valid source entity list.
- *
- * @param groupId Inlong group id.
- * @param streamId Inlong stream id.
- * @param sourceType Source type.
- * @return Source entity list.
*/
- List<StreamSourceEntity> selectByIdAndType(@Param("groupId") String groupId, @Param("streamId") String streamId,
- @Param("sourceType") String sourceType);
+ List<StreamSourceEntity> selectByRelatedIdAndTypeForUpdate(@Param("groupId") String groupId,
+ @Param("streamId") String streamId, @Param("sourceType") String sourceType);
/**
* Query the tasks that need to be added.
@@ -78,7 +63,7 @@ public interface StreamSourceEntityMapper {
/**
* Query the sources with status 20x by the given agent IP and agent UUID.
*/
- List<StreamSourceEntity> selectByStatusAndIpForUpdate(@Param("list") List<Integer> list,
+ List<StreamSourceEntity> selectByStatusAndIpForUpdate(@Param("statusList") List<Integer> statusList,
@Param("agentIp") String agentIp, @Param("uuid") String uuid);
/**
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
index 1e9124c..874f919 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
@@ -251,7 +251,7 @@
</where>
</select>
- <select id="selectByRelatedId" resultType="org.apache.inlong.manager.dao.entity.StreamSourceEntity">
+ <select id="selectByRelatedIdForUpdate" resultType="org.apache.inlong.manager.dao.entity.StreamSourceEntity">
select
<include refid="Base_Column_List"/>
from stream_source
@@ -261,9 +261,13 @@
<if test="streamId != null and streamId != ''">
and inlong_stream_id = #{streamId, jdbcType=VARCHAR}
</if>
+ <if test="sourceName != null and sourceName != ''">
+ and source_name = #{sourceName, jdbcType=VARCHAR},
+ </if>
+ for update
</where>
</select>
- <select id="selectByIdAndType" resultType="org.apache.inlong.manager.dao.entity.StreamSourceEntity">
+ <select id="selectByRelatedIdAndTypeForUpdate" resultType="org.apache.inlong.manager.dao.entity.StreamSourceEntity">
select
<include refid="Base_Column_List"/>
from stream_source
@@ -299,7 +303,7 @@
<where>
is_deleted = 0
and status in
- <foreach item="item" index="index" collection="list" open="(" close=")" separator=",">
+ <foreach item="item" index="index" collection="statusList" open="(" close=")" separator=",">
#{item}
</foreach>
and agent_ip = #{agentIp, jdbcType=VARCHAR}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractStreamSourceOperation.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractStreamSourceOperation.java
index 1ebfdf4..67235b2 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractStreamSourceOperation.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractStreamSourceOperation.java
@@ -17,10 +17,12 @@
package org.apache.inlong.manager.service.source;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.inlong.manager.common.enums.Constant;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.enums.GroupState;
import org.apache.inlong.manager.common.enums.SourceState;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.pojo.source.SourceRequest;
import org.apache.inlong.manager.common.pojo.source.SourceResponse;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
@@ -35,6 +37,7 @@ import org.springframework.transaction.annotation.Transactional;
import javax.validation.constraints.NotNull;
import java.util.Date;
+import java.util.List;
public abstract class AbstractStreamSourceOperation implements StreamSourceOperation {
@@ -94,7 +97,17 @@ public abstract class AbstractStreamSourceOperation implements StreamSourceOpera
}
@Override
+ @Transactional(rollbackFor = Throwable.class)
public Integer saveOpt(SourceRequest request, Integer groupStatus, String operator) {
+ String groupId = request.getInlongGroupId();
+ String streamId = request.getInlongStreamId();
+ String sourceName = request.getSourceName();
+ List<StreamSourceEntity> existList = sourceMapper.selectByRelatedIdForUpdate(groupId, streamId, sourceName);
+ if (CollectionUtils.isNotEmpty(existList)) {
+ String err = "stream source already exists with groupId=%s, streamId=%s, sourceName=%s";
+ throw new BusinessException(String.format(err, groupId, streamId, sourceName));
+ }
+
StreamSourceEntity entity = CommonBeanUtils.copyProperties(request, StreamSourceEntity::new);
if (GroupState.forCode(groupStatus).equals(GroupState.CONFIG_SUCCESSFUL)) {
entity.setStatus(SourceState.TO_BE_ISSUED_ADD.getCode());
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
index 0210737..b93e9f9 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
@@ -109,7 +109,7 @@ public class StreamSourceServiceImpl implements StreamSourceService {
LOGGER.debug("begin to list source by groupId={}, streamId={}", groupId, streamId);
Preconditions.checkNotNull(groupId, Constant.GROUP_ID_IS_EMPTY);
- List<StreamSourceEntity> entityList = sourceMapper.selectByRelatedId(groupId, streamId);
+ List<StreamSourceEntity> entityList = sourceMapper.selectByRelatedIdForUpdate(groupId, streamId, null);
if (CollectionUtils.isEmpty(entityList)) {
return Collections.emptyList();
}
@@ -253,7 +253,7 @@ public class StreamSourceServiceImpl implements StreamSourceService {
nextStatus = SourceState.SOURCE_DISABLE.getCode();
}
Date now = new Date();
- List<StreamSourceEntity> entityList = sourceMapper.selectByRelatedId(groupId, streamId);
+ List<StreamSourceEntity> entityList = sourceMapper.selectByRelatedIdForUpdate(groupId, streamId, null);
if (CollectionUtils.isNotEmpty(entityList)) {
for (StreamSourceEntity entity : entityList) {
Integer id = entity.getId();
diff --git a/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql b/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql
index 891a330..3afa66a 100644
--- a/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql
+++ b/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql
@@ -386,7 +386,7 @@ CREATE TABLE `inlong_stream_ext`
`modify_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
PRIMARY KEY (`id`),
KEY `index_stream_id` (`inlong_stream_id`),
- UNIQUE KEY `group_stream_key_idx` (`inlong_group_id`, `inlong_stream_id`, `key_name`)
+ UNIQUE KEY `unique_group_stream_key` (`inlong_group_id`, `inlong_stream_id`, `key_name`)
);
-- ----------------------------
@@ -582,7 +582,7 @@ CREATE TABLE `stream_source`
`create_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
`modify_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
PRIMARY KEY (`id`),
- UNIQUE KEY `source_idx` (`inlong_group_id`, `inlong_stream_id`, `source_name`, `is_deleted`)
+ UNIQUE KEY `unique_source_name` (`inlong_group_id`, `inlong_stream_id`, `source_name`, `is_deleted`)
);
-- ----------------------------
@@ -608,7 +608,7 @@ CREATE TABLE `stream_sink`
`create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
`modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
PRIMARY KEY (`id`),
- UNIQUE KEY `sink_idx` (`inlong_group_id`, `inlong_stream_id`, `sink_name`, `is_deleted`)
+ UNIQUE KEY `unique_sink_name` (`inlong_group_id`, `inlong_stream_id`, `sink_name`, `is_deleted`)
);
-- ----------------------------
diff --git a/inlong-manager/manager-web/sql/apache_inlong_manager.sql b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
index d7a90bb..8a87283 100644
--- a/inlong-manager/manager-web/sql/apache_inlong_manager.sql
+++ b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
@@ -407,7 +407,7 @@ CREATE TABLE `inlong_stream_ext`
`modify_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
PRIMARY KEY (`id`),
KEY `index_stream_id` (`inlong_stream_id`),
- UNIQUE KEY `group_stream_key_idx` (`inlong_group_id`, `inlong_stream_id`, `key_name`)
+ UNIQUE KEY `unique_group_stream_key` (`inlong_group_id`, `inlong_stream_id`, `key_name`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4 COMMENT ='Inlong stream extension table';
@@ -611,7 +611,7 @@ CREATE TABLE `stream_source`
`create_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
`modify_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
PRIMARY KEY (`id`),
- UNIQUE KEY `source_idx` (`inlong_group_id`, `inlong_stream_id`, `source_name`, `is_deleted`)
+ UNIQUE KEY `unique_source_name` (`inlong_group_id`, `inlong_stream_id`, `source_name`, `is_deleted`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4 COMMENT ='Stream source table';
@@ -638,7 +638,7 @@ CREATE TABLE `stream_sink`
`create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
`modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
PRIMARY KEY (`id`),
- UNIQUE KEY `sink_idx` (`inlong_group_id`, `inlong_stream_id`, `sink_name`, `is_deleted`)
+ UNIQUE KEY `unique_sink_name` (`inlong_group_id`, `inlong_stream_id`, `sink_name`, `is_deleted`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4 COMMENT ='Stream sink table';