You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/03/16 10:08:39 UTC
[incubator-inlong] branch master updated: [INLONG-3160][Manager] Deleting stream source failed as the status was not allowed to delete (#3165)
This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new f07d2a5 [INLONG-3160][Manager] Deleting stream source failed as the status was not allowed to delete (#3165)
f07d2a5 is described below
commit f07d2a5f29a89c0291cc5ef47089145d4e36074a
Author: healchow <he...@gmail.com>
AuthorDate: Wed Mar 16 18:08:32 2022 +0800
[INLONG-3160][Manager] Deleting stream source failed as the status was not allowed to delete (#3165)
---
.../inlong/manager/common/enums/SourceState.java | 79 ++++++++++++++--------
.../dao/mapper/StreamSourceEntityMapper.java | 6 +-
.../resources/mappers/StreamSourceEntityMapper.xml | 22 ++++--
.../service/core/impl/AgentServiceImpl.java | 8 +--
.../source/AbstractStreamSourceOperation.java | 19 ++++--
.../service/source/StreamSourceServiceImpl.java | 29 ++++----
6 files changed, 102 insertions(+), 61 deletions(-)
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceState.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceState.java
index 9cd278a..0c16caf 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceState.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceState.java
@@ -20,6 +20,7 @@ package org.apache.inlong.manager.common.enums;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
+import java.util.HashSet;
import java.util.Map;
import java.util.Set;
@@ -74,49 +75,73 @@ public enum SourceState {
* The set of statuses in which updating is allowed
*/
public static final Set<Integer> ALLOWED_UPDATE = Sets.newHashSet(
- SOURCE_NEW.getCode(), SOURCE_FAILED.getCode(), SOURCE_FROZEN.getCode());
+ SOURCE_NEW.getCode(), SOURCE_FAILED.getCode(), SOURCE_FROZEN.getCode(),
+ TO_BE_ISSUED_ADD.getCode(), TO_BE_ISSUED_DELETE.getCode(), TO_BE_ISSUED_RETRY.getCode(),
+ TO_BE_ISSUED_BACKTRACK.getCode(), TO_BE_ISSUED_FROZEN.getCode(), TO_BE_ISSUED_ACTIVE.getCode(),
+ TO_BE_ISSUED_CHECK.getCode(), TO_BE_ISSUED_REDO_METRIC.getCode(), TO_BE_ISSUED_MAKEUP.getCode());
- private static final Map<SourceState, Set<SourceState>> SOURCE_FINITE_STATE_AUTOMATON = Maps.newHashMap();
+ public static final Set<SourceState> TOBE_ISSUED_SET = Sets.newHashSet(
+ TO_BE_ISSUED_ADD, TO_BE_ISSUED_DELETE, TO_BE_ISSUED_RETRY,
+ TO_BE_ISSUED_BACKTRACK, TO_BE_ISSUED_FROZEN, TO_BE_ISSUED_ACTIVE,
+ TO_BE_ISSUED_CHECK, TO_BE_ISSUED_REDO_METRIC, TO_BE_ISSUED_MAKEUP);
+
+ private static final Map<SourceState, Set<SourceState>> SOURCE_STATE_AUTOMATON = Maps.newHashMap();
static {
// new
- SOURCE_FINITE_STATE_AUTOMATON.put(SOURCE_NEW, Sets.newHashSet(SOURCE_NEW, TO_BE_ISSUED_ADD));
+ SOURCE_STATE_AUTOMATON.put(SOURCE_NEW, Sets.newHashSet(SOURCE_NEW, TO_BE_ISSUED_ADD));
// normal
- SOURCE_FINITE_STATE_AUTOMATON.put(SOURCE_NORMAL,
+ SOURCE_STATE_AUTOMATON.put(SOURCE_NORMAL,
Sets.newHashSet(SOURCE_DISABLE, SOURCE_NORMAL, SOURCE_FAILED, TO_BE_ISSUED_DELETE,
TO_BE_ISSUED_RETRY, TO_BE_ISSUED_BACKTRACK, TO_BE_ISSUED_FROZEN, TO_BE_ISSUED_ACTIVE,
TO_BE_ISSUED_CHECK, TO_BE_ISSUED_REDO_METRIC, TO_BE_ISSUED_MAKEUP));
// failed
- SOURCE_FINITE_STATE_AUTOMATON.put(SOURCE_FAILED,
- Sets.newHashSet(SOURCE_DISABLE, SOURCE_FAILED, TO_BE_ISSUED_RETRY));
+ SOURCE_STATE_AUTOMATON.put(SOURCE_FAILED, Sets.newHashSet(SOURCE_DISABLE, SOURCE_FAILED, TO_BE_ISSUED_RETRY));
// frozen
- SOURCE_FINITE_STATE_AUTOMATON.put(SOURCE_FROZEN,
- Sets.newHashSet(SOURCE_DISABLE, SOURCE_FROZEN, TO_BE_ISSUED_ACTIVE));
+ SOURCE_STATE_AUTOMATON.put(SOURCE_FROZEN, Sets.newHashSet(SOURCE_DISABLE, SOURCE_FROZEN, TO_BE_ISSUED_ACTIVE));
// [xxx] to be issued
- SOURCE_FINITE_STATE_AUTOMATON.put(TO_BE_ISSUED_ADD, Sets.newHashSet(BEEN_ISSUED_ADD));
- SOURCE_FINITE_STATE_AUTOMATON.put(TO_BE_ISSUED_DELETE, Sets.newHashSet(BEEN_ISSUED_DELETE));
- SOURCE_FINITE_STATE_AUTOMATON.put(TO_BE_ISSUED_RETRY, Sets.newHashSet(BEEN_ISSUED_RETRY));
- SOURCE_FINITE_STATE_AUTOMATON.put(TO_BE_ISSUED_BACKTRACK, Sets.newHashSet(BEEN_ISSUED_BACKTRACK));
- SOURCE_FINITE_STATE_AUTOMATON.put(TO_BE_ISSUED_FROZEN, Sets.newHashSet(BEEN_ISSUED_FROZEN));
- SOURCE_FINITE_STATE_AUTOMATON.put(TO_BE_ISSUED_ACTIVE, Sets.newHashSet(BEEN_ISSUED_ACTIVE));
- SOURCE_FINITE_STATE_AUTOMATON.put(TO_BE_ISSUED_CHECK, Sets.newHashSet(BEEN_ISSUED_CHECK));
- SOURCE_FINITE_STATE_AUTOMATON.put(TO_BE_ISSUED_REDO_METRIC, Sets.newHashSet(BEEN_ISSUED_REDO_METRIC));
- SOURCE_FINITE_STATE_AUTOMATON.put(TO_BE_ISSUED_MAKEUP, Sets.newHashSet(BEEN_ISSUED_MAKEUP));
+ HashSet<SourceState> tobeAdd = Sets.newHashSet(BEEN_ISSUED_ADD);
+ tobeAdd.addAll(TOBE_ISSUED_SET);
+ SOURCE_STATE_AUTOMATON.put(TO_BE_ISSUED_ADD, tobeAdd);
+ HashSet<SourceState> tobeDelete = Sets.newHashSet(BEEN_ISSUED_DELETE);
+ tobeDelete.addAll(TOBE_ISSUED_SET);
+ SOURCE_STATE_AUTOMATON.put(TO_BE_ISSUED_DELETE, Sets.newHashSet(tobeDelete));
+ HashSet<SourceState> tobeRetry = Sets.newHashSet(BEEN_ISSUED_RETRY);
+ tobeRetry.addAll(TOBE_ISSUED_SET);
+ SOURCE_STATE_AUTOMATON.put(TO_BE_ISSUED_RETRY, Sets.newHashSet(tobeRetry));
+ HashSet<SourceState> tobeBacktrack = Sets.newHashSet(BEEN_ISSUED_BACKTRACK);
+ tobeBacktrack.addAll(TOBE_ISSUED_SET);
+ SOURCE_STATE_AUTOMATON.put(TO_BE_ISSUED_BACKTRACK, Sets.newHashSet(tobeBacktrack));
+ HashSet<SourceState> tobeFrozen = Sets.newHashSet(BEEN_ISSUED_FROZEN);
+ tobeFrozen.addAll(TOBE_ISSUED_SET);
+ SOURCE_STATE_AUTOMATON.put(TO_BE_ISSUED_FROZEN, Sets.newHashSet(tobeFrozen));
+ HashSet<SourceState> tobeActive = Sets.newHashSet(BEEN_ISSUED_ACTIVE);
+ tobeActive.addAll(TOBE_ISSUED_SET);
+ SOURCE_STATE_AUTOMATON.put(TO_BE_ISSUED_ACTIVE, Sets.newHashSet(tobeActive));
+ HashSet<SourceState> tobeCheck = Sets.newHashSet(BEEN_ISSUED_CHECK);
+ tobeCheck.addAll(TOBE_ISSUED_SET);
+ SOURCE_STATE_AUTOMATON.put(TO_BE_ISSUED_CHECK, Sets.newHashSet(tobeCheck));
+ HashSet<SourceState> tobeRedoMetric = Sets.newHashSet(BEEN_ISSUED_REDO_METRIC);
+ tobeRedoMetric.addAll(TOBE_ISSUED_SET);
+ SOURCE_STATE_AUTOMATON.put(TO_BE_ISSUED_REDO_METRIC, Sets.newHashSet(tobeRedoMetric));
+ HashSet<SourceState> tobeMakeup = Sets.newHashSet(BEEN_ISSUED_MAKEUP);
+ tobeMakeup.addAll(TOBE_ISSUED_SET);
+ SOURCE_STATE_AUTOMATON.put(TO_BE_ISSUED_MAKEUP, Sets.newHashSet(tobeMakeup));
// [xxx] been issued
- SOURCE_FINITE_STATE_AUTOMATON.put(BEEN_ISSUED_ADD, Sets.newHashSet(SOURCE_NORMAL, SOURCE_FAILED));
- SOURCE_FINITE_STATE_AUTOMATON.put(BEEN_ISSUED_DELETE, Sets.newHashSet(SOURCE_NORMAL, SOURCE_FAILED));
- SOURCE_FINITE_STATE_AUTOMATON.put(BEEN_ISSUED_RETRY, Sets.newHashSet(SOURCE_NORMAL, SOURCE_FAILED));
- SOURCE_FINITE_STATE_AUTOMATON.put(BEEN_ISSUED_BACKTRACK, Sets.newHashSet(SOURCE_NORMAL, SOURCE_FAILED));
- SOURCE_FINITE_STATE_AUTOMATON.put(BEEN_ISSUED_FROZEN, Sets.newHashSet(SOURCE_NORMAL, SOURCE_FAILED));
- SOURCE_FINITE_STATE_AUTOMATON.put(BEEN_ISSUED_ACTIVE, Sets.newHashSet(SOURCE_NORMAL, SOURCE_FAILED));
- SOURCE_FINITE_STATE_AUTOMATON.put(BEEN_ISSUED_CHECK, Sets.newHashSet(SOURCE_NORMAL, SOURCE_FAILED));
- SOURCE_FINITE_STATE_AUTOMATON.put(BEEN_ISSUED_REDO_METRIC, Sets.newHashSet(SOURCE_NORMAL, SOURCE_FAILED));
- SOURCE_FINITE_STATE_AUTOMATON.put(BEEN_ISSUED_MAKEUP, Sets.newHashSet(SOURCE_NORMAL, SOURCE_FAILED));
+ SOURCE_STATE_AUTOMATON.put(BEEN_ISSUED_ADD, Sets.newHashSet(SOURCE_NORMAL, SOURCE_FAILED));
+ SOURCE_STATE_AUTOMATON.put(BEEN_ISSUED_DELETE, Sets.newHashSet(SOURCE_NORMAL, SOURCE_FAILED));
+ SOURCE_STATE_AUTOMATON.put(BEEN_ISSUED_RETRY, Sets.newHashSet(SOURCE_NORMAL, SOURCE_FAILED));
+ SOURCE_STATE_AUTOMATON.put(BEEN_ISSUED_BACKTRACK, Sets.newHashSet(SOURCE_NORMAL, SOURCE_FAILED));
+ SOURCE_STATE_AUTOMATON.put(BEEN_ISSUED_FROZEN, Sets.newHashSet(SOURCE_NORMAL, SOURCE_FAILED));
+ SOURCE_STATE_AUTOMATON.put(BEEN_ISSUED_ACTIVE, Sets.newHashSet(SOURCE_NORMAL, SOURCE_FAILED));
+ SOURCE_STATE_AUTOMATON.put(BEEN_ISSUED_CHECK, Sets.newHashSet(SOURCE_NORMAL, SOURCE_FAILED));
+ SOURCE_STATE_AUTOMATON.put(BEEN_ISSUED_REDO_METRIC, Sets.newHashSet(SOURCE_NORMAL, SOURCE_FAILED));
+ SOURCE_STATE_AUTOMATON.put(BEEN_ISSUED_MAKEUP, Sets.newHashSet(SOURCE_NORMAL, SOURCE_FAILED));
}
private final Integer code;
@@ -143,7 +168,7 @@ public enum SourceState {
* Whether the `next` state is valid according to the `current` state.
*/
public static boolean isAllowedTransition(SourceState current, SourceState next) {
- Set<SourceState> nextStates = SOURCE_FINITE_STATE_AUTOMATON.get(current);
+ Set<SourceState> nextStates = SOURCE_STATE_AUTOMATON.get(current);
return nextStates != null && nextStates.contains(next);
}
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
index 0bfb691..76be150 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
@@ -31,7 +31,7 @@ public interface StreamSourceEntityMapper {
int insertSelective(StreamSourceEntity record);
- StreamSourceEntity selectByPrimaryKey(Integer id);
+ StreamSourceEntity selectByIdForUpdate(Integer id);
/**
* According to the inlong group id and inlong stream id, query the number of valid source
@@ -78,7 +78,7 @@ public interface StreamSourceEntityMapper {
/**
* Query the sources with status 20x by the given agent IP and agent UUID.
*/
- List<StreamSourceEntity> selectByStatusAndIp(@Param("list") List<Integer> list,
+ List<StreamSourceEntity> selectByStatusAndIpForUpdate(@Param("list") List<Integer> list,
@Param("agentIp") String agentIp, @Param("uuid") String uuid);
/**
@@ -121,6 +121,6 @@ public interface StreamSourceEntityMapper {
int updateSnapshot(StreamSourceEntity entity);
- int deleteByPrimaryKey(Integer id);
+ int deleteByRelatedId(@Param("groupId") String groupId, @Param("streamId") String streamId);
}
\ No newline at end of file
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
index 595b69a..1e9124c 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
@@ -202,11 +202,12 @@
</trim>
</insert>
- <select id="selectByPrimaryKey" parameterType="java.lang.Integer" resultMap="BaseResultMap">
+ <select id="selectByIdForUpdate" resultType="org.apache.inlong.manager.dao.entity.StreamSourceEntity">
select
<include refid="Base_Column_List"/>
from stream_source
where id = #{id,jdbcType=INTEGER}
+ for update
</select>
<select id="selectCount" resultType="java.lang.Integer">
select count(1)
@@ -275,6 +276,7 @@
<if test="sourceType != null and sourceType != ''">
and source_type = #{sourceType, jdbcType=VARCHAR}
</if>
+ for update
</where>
</select>
<select id="selectByStatusForUpdate" resultType="org.apache.inlong.manager.dao.entity.StreamSourceEntity">
@@ -290,7 +292,7 @@
limit 2 for update
</where>
</select>
- <select id="selectByStatusAndIp" resultType="org.apache.inlong.manager.dao.entity.StreamSourceEntity">
+ <select id="selectByStatusAndIpForUpdate" resultType="org.apache.inlong.manager.dao.entity.StreamSourceEntity">
select
<include refid="Base_Column_List"/>
from stream_source
@@ -304,6 +306,7 @@
<if test="uuid != null and uuid != ''">
and uuid = #{uuid, jdbcType=VARCHAR}
</if>
+ for update
</where>
</select>
<select id="selectSourceType" resultType="java.lang.String">
@@ -317,12 +320,16 @@
<if test="streamId != null and streamId != ''">
and inlong_stream_id = #{streamId, jdbcType=VARCHAR}
</if>
+ for update
</where>
</select>
<select id="selectTempStatusSource" resultType="org.apache.inlong.manager.dao.entity.StreamSourceEntity">
select id, agent_ip, uuid, status
from stream_source
- where status >= 200
+ <where>
+ status >= 200
+ for update
+ </where>
</select>
<update id="updateByPrimaryKeySelective" parameterType="org.apache.inlong.manager.dao.entity.StreamSourceEntity">
@@ -449,9 +456,14 @@
where id = #{id,jdbcType=INTEGER}
</update>
- <delete id="deleteByPrimaryKey" parameterType="java.lang.Integer">
+ <delete id="deleteByRelatedId">
delete
from stream_source
- where id = #{id,jdbcType=INTEGER}
+ where
+ is_deleted = 0
+ and inlong_group_id = #{groupId, jdbcType=VARCHAR}
+ <if test="streamId != null and streamId != ''">
+ and inlong_stream_id = #{streamId, jdbcType=VARCHAR}
+ </if>
</delete>
</mapper>
\ No newline at end of file
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
index a4fec35..f826977 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
@@ -110,6 +110,7 @@ public class AgentServiceImpl implements AgentService {
}
@Override
+ @Transactional(rollbackFor = Throwable.class, isolation = Isolation.REPEATABLE_READ)
public TaskResult reportAndGetTask(TaskRequest request) {
LOGGER.debug("begin to get agent task: {}", request);
if (request == null || request.getAgentIp() == null) {
@@ -133,7 +134,7 @@ public class AgentServiceImpl implements AgentService {
for (CommandEntity command : request.getCommandInfo()) {
Integer taskId = command.getTaskId();
- StreamSourceEntity current = sourceMapper.selectByPrimaryKey(taskId);
+ StreamSourceEntity current = sourceMapper.selectByIdForUpdate(taskId);
if (current == null) {
continue;
}
@@ -171,8 +172,7 @@ public class AgentServiceImpl implements AgentService {
/**
* Get task result by the request
*/
- @Transactional(isolation = Isolation.REPEATABLE_READ)
- TaskResult getTaskResult(TaskRequest request) {
+ private TaskResult getTaskResult(TaskRequest request) {
// Query the tasks that needed to add or active - without agentIp and uuid
List<Integer> addedStatusList = Arrays.asList(SourceState.TO_BE_ISSUED_ADD.getCode(),
SourceState.TO_BE_ISSUED_ACTIVE.getCode());
@@ -185,7 +185,7 @@ public class AgentServiceImpl implements AgentService {
SourceState.TO_BE_ISSUED_REDO_METRIC.getCode(), SourceState.TO_BE_ISSUED_MAKEUP.getCode());
String agentIp = request.getAgentIp();
String uuid = request.getUuid();
- List<StreamSourceEntity> entityList = sourceMapper.selectByStatusAndIp(statusList, agentIp, uuid);
+ List<StreamSourceEntity> entityList = sourceMapper.selectByStatusAndIpForUpdate(statusList, agentIp, uuid);
entityList.addAll(addList);
List<DataConfig> dataConfigs = Lists.newArrayList();
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractStreamSourceOperation.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractStreamSourceOperation.java
index bbec00b..1ebfdf4 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractStreamSourceOperation.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractStreamSourceOperation.java
@@ -30,6 +30,8 @@ import org.apache.inlong.manager.dao.mapper.StreamSourceEntityMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.transaction.annotation.Isolation;
+import org.springframework.transaction.annotation.Transactional;
import javax.validation.constraints.NotNull;
import java.util.Date;
@@ -63,8 +65,9 @@ public abstract class AbstractStreamSourceOperation implements StreamSourceOpera
protected abstract SourceResponse getResponse();
@Override
+ @Transactional(isolation = Isolation.REPEATABLE_READ)
public SourceResponse getById(@NotNull Integer id) {
- StreamSourceEntity entity = sourceMapper.selectByPrimaryKey(id);
+ StreamSourceEntity entity = sourceMapper.selectByIdForUpdate(id);
Preconditions.checkNotNull(entity, ErrorCodeEnum.SOURCE_INFO_NOT_FOUND.getMessage());
String existType = entity.getSourceType();
Preconditions.checkTrue(getSourceType().equals(existType),
@@ -73,12 +76,13 @@ public abstract class AbstractStreamSourceOperation implements StreamSourceOpera
}
@Override
+ @Transactional(rollbackFor = Throwable.class, isolation = Isolation.REPEATABLE_READ)
public void updateOpt(SourceRequest request, String operator) {
- StreamSourceEntity entity = sourceMapper.selectByPrimaryKey(request.getId());
+ StreamSourceEntity entity = sourceMapper.selectByIdForUpdate(request.getId());
Preconditions.checkNotNull(entity, ErrorCodeEnum.SOURCE_INFO_NOT_FOUND.getMessage());
if (!SourceState.ALLOWED_UPDATE.contains(entity.getStatus())) {
throw new RuntimeException(String.format("Source=%s is not allowed to update, "
- + "please stop / frozen / delete it firstly", entity));
+ + "please wait until its changed to final status or stop / frozen / delete it firstly", entity));
}
// Setting updated parameters of stream source entity.
@@ -110,8 +114,9 @@ public abstract class AbstractStreamSourceOperation implements StreamSourceOpera
}
@Override
+ @Transactional(rollbackFor = Throwable.class, isolation = Isolation.REPEATABLE_READ)
public void stopOpt(SourceRequest request, String operator) {
- StreamSourceEntity snapshot = sourceMapper.selectByPrimaryKey(request.getId());
+ StreamSourceEntity snapshot = sourceMapper.selectByIdForUpdate(request.getId());
SourceState curState = SourceState.forCode(snapshot.getStatus());
SourceState nextState = SourceState.TO_BE_ISSUED_FROZEN;
if (!SourceState.isAllowedTransition(curState, nextState)) {
@@ -125,8 +130,9 @@ public abstract class AbstractStreamSourceOperation implements StreamSourceOpera
}
@Override
+ @Transactional(rollbackFor = Throwable.class, isolation = Isolation.REPEATABLE_READ)
public void restartOpt(SourceRequest request, String operator) {
- StreamSourceEntity snapshot = sourceMapper.selectByPrimaryKey(request.getId());
+ StreamSourceEntity snapshot = sourceMapper.selectByIdForUpdate(request.getId());
SourceState curState = SourceState.forCode(snapshot.getStatus());
SourceState nextState = SourceState.TO_BE_ISSUED_ACTIVE;
if (!SourceState.isAllowedTransition(curState, nextState)) {
@@ -141,9 +147,10 @@ public abstract class AbstractStreamSourceOperation implements StreamSourceOpera
}
@Override
+ @Transactional(rollbackFor = Throwable.class, isolation = Isolation.REPEATABLE_READ)
public void deleteOpt(SourceRequest request, String operator) {
Integer id = request.getId();
- StreamSourceEntity existEntity = sourceMapper.selectByPrimaryKey(id);
+ StreamSourceEntity existEntity = sourceMapper.selectByIdForUpdate(id);
SourceState curState = SourceState.forCode(existEntity.getStatus());
SourceState nextState = SourceState.TO_BE_ISSUED_DELETE;
if (!SourceState.isAllowedTransition(curState, nextState)) {
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
index 62c9451..0210737 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
@@ -22,11 +22,6 @@ import com.github.pagehelper.PageHelper;
import com.github.pagehelper.PageInfo;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Date;
-import java.util.List;
-import java.util.Map;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.common.enums.Constant;
@@ -48,8 +43,15 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
+import org.springframework.transaction.annotation.Isolation;
import org.springframework.transaction.annotation.Transactional;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+
/**
* Implementation of source service interface
*/
@@ -188,7 +190,7 @@ public class StreamSourceServiceImpl implements StreamSourceService {
LOGGER.info("begin to delete source by id={}, sourceType={}", id, sourceType);
Preconditions.checkNotNull(id, Constant.ID_IS_EMPTY);
- StreamSourceEntity entity = sourceMapper.selectByPrimaryKey(id);
+ StreamSourceEntity entity = sourceMapper.selectByIdForUpdate(id);
Preconditions.checkNotNull(entity, ErrorCodeEnum.SOURCE_INFO_NOT_FOUND.getMessage());
commonOperateService.checkGroupStatus(entity.getInlongGroupId(), operator);
@@ -202,11 +204,10 @@ public class StreamSourceServiceImpl implements StreamSourceService {
}
@Override
+ @Transactional(rollbackFor = Throwable.class, isolation = Isolation.REPEATABLE_READ)
public boolean restart(Integer id, String sourceType, String operator) {
LOGGER.info("begin to restart source by id={}, sourceType={}", id, sourceType);
- Preconditions.checkNotNull(id, Constant.ID_IS_EMPTY);
-
- StreamSourceEntity entity = sourceMapper.selectByPrimaryKey(id);
+ StreamSourceEntity entity = sourceMapper.selectByIdForUpdate(id);
Preconditions.checkNotNull(entity, ErrorCodeEnum.SOURCE_INFO_NOT_FOUND.getMessage());
commonOperateService.checkGroupStatus(entity.getInlongGroupId(), operator);
@@ -220,9 +221,10 @@ public class StreamSourceServiceImpl implements StreamSourceService {
}
@Override
+ @Transactional(rollbackFor = Throwable.class, isolation = Isolation.REPEATABLE_READ)
public boolean stop(Integer id, String sourceType, String operator) {
LOGGER.info("begin to stop source by id={}, sourceType={}", id, sourceType);
- StreamSourceEntity entity = sourceMapper.selectByPrimaryKey(id);
+ StreamSourceEntity entity = sourceMapper.selectByIdForUpdate(id);
Preconditions.checkNotNull(entity, ErrorCodeEnum.SOURCE_INFO_NOT_FOUND.getMessage());
commonOperateService.checkGroupStatus(entity.getInlongGroupId(), operator);
@@ -278,12 +280,7 @@ public class StreamSourceServiceImpl implements StreamSourceService {
// Check if it can be deleted
commonOperateService.checkGroupStatus(groupId, operator);
-
- List<StreamSourceEntity> entityList = sourceMapper.selectByRelatedId(groupId, streamId);
- if (CollectionUtils.isNotEmpty(entityList)) {
- entityList.forEach(entity -> sourceMapper.deleteByPrimaryKey(entity.getId()));
- }
-
+ sourceMapper.deleteByRelatedId(groupId, streamId);
LOGGER.info("success to delete all source by groupId={}, streamId={}", groupId, streamId);
return true;
}