You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/03/16 10:08:39 UTC

[incubator-inlong] branch master updated: [INLONG-3160][Manager] Deleting stream source failed as the status was not allowed to delete (#3165)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new f07d2a5  [INLONG-3160][Manager] Deleting stream source failed as the status was not allowed to delete (#3165)
f07d2a5 is described below

commit f07d2a5f29a89c0291cc5ef47089145d4e36074a
Author: healchow <he...@gmail.com>
AuthorDate: Wed Mar 16 18:08:32 2022 +0800

    [INLONG-3160][Manager] Deleting stream source failed as the status was not allowed to delete (#3165)
---
 .../inlong/manager/common/enums/SourceState.java   | 79 ++++++++++++++--------
 .../dao/mapper/StreamSourceEntityMapper.java       |  6 +-
 .../resources/mappers/StreamSourceEntityMapper.xml | 22 ++++--
 .../service/core/impl/AgentServiceImpl.java        |  8 +--
 .../source/AbstractStreamSourceOperation.java      | 19 ++++--
 .../service/source/StreamSourceServiceImpl.java    | 29 ++++----
 6 files changed, 102 insertions(+), 61 deletions(-)

diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceState.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceState.java
index 9cd278a..0c16caf 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceState.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceState.java
@@ -20,6 +20,7 @@ package org.apache.inlong.manager.common.enums;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 
@@ -74,49 +75,73 @@ public enum SourceState {
      * The set of status allowed updating
      */
     public static final Set<Integer> ALLOWED_UPDATE = Sets.newHashSet(
-            SOURCE_NEW.getCode(), SOURCE_FAILED.getCode(), SOURCE_FROZEN.getCode());
+            SOURCE_NEW.getCode(), SOURCE_FAILED.getCode(), SOURCE_FROZEN.getCode(),
+            TO_BE_ISSUED_ADD.getCode(), TO_BE_ISSUED_DELETE.getCode(), TO_BE_ISSUED_RETRY.getCode(),
+            TO_BE_ISSUED_BACKTRACK.getCode(), TO_BE_ISSUED_FROZEN.getCode(), TO_BE_ISSUED_ACTIVE.getCode(),
+            TO_BE_ISSUED_CHECK.getCode(), TO_BE_ISSUED_REDO_METRIC.getCode(), TO_BE_ISSUED_MAKEUP.getCode());
 
-    private static final Map<SourceState, Set<SourceState>> SOURCE_FINITE_STATE_AUTOMATON = Maps.newHashMap();
+    public static final Set<SourceState> TOBE_ISSUED_SET = Sets.newHashSet(
+            TO_BE_ISSUED_ADD, TO_BE_ISSUED_DELETE, TO_BE_ISSUED_RETRY,
+            TO_BE_ISSUED_BACKTRACK, TO_BE_ISSUED_FROZEN, TO_BE_ISSUED_ACTIVE,
+            TO_BE_ISSUED_CHECK, TO_BE_ISSUED_REDO_METRIC, TO_BE_ISSUED_MAKEUP);
+
+    private static final Map<SourceState, Set<SourceState>> SOURCE_STATE_AUTOMATON = Maps.newHashMap();
 
     static {
         // new
-        SOURCE_FINITE_STATE_AUTOMATON.put(SOURCE_NEW, Sets.newHashSet(SOURCE_NEW, TO_BE_ISSUED_ADD));
+        SOURCE_STATE_AUTOMATON.put(SOURCE_NEW, Sets.newHashSet(SOURCE_NEW, TO_BE_ISSUED_ADD));
 
         // normal
-        SOURCE_FINITE_STATE_AUTOMATON.put(SOURCE_NORMAL,
+        SOURCE_STATE_AUTOMATON.put(SOURCE_NORMAL,
                 Sets.newHashSet(SOURCE_DISABLE, SOURCE_NORMAL, SOURCE_FAILED, TO_BE_ISSUED_DELETE,
                         TO_BE_ISSUED_RETRY, TO_BE_ISSUED_BACKTRACK, TO_BE_ISSUED_FROZEN, TO_BE_ISSUED_ACTIVE,
                         TO_BE_ISSUED_CHECK, TO_BE_ISSUED_REDO_METRIC, TO_BE_ISSUED_MAKEUP));
 
         // failed
-        SOURCE_FINITE_STATE_AUTOMATON.put(SOURCE_FAILED,
-                Sets.newHashSet(SOURCE_DISABLE, SOURCE_FAILED, TO_BE_ISSUED_RETRY));
+        SOURCE_STATE_AUTOMATON.put(SOURCE_FAILED, Sets.newHashSet(SOURCE_DISABLE, SOURCE_FAILED, TO_BE_ISSUED_RETRY));
 
         // frozen
-        SOURCE_FINITE_STATE_AUTOMATON.put(SOURCE_FROZEN,
-                Sets.newHashSet(SOURCE_DISABLE, SOURCE_FROZEN, TO_BE_ISSUED_ACTIVE));
+        SOURCE_STATE_AUTOMATON.put(SOURCE_FROZEN, Sets.newHashSet(SOURCE_DISABLE, SOURCE_FROZEN, TO_BE_ISSUED_ACTIVE));
 
         // [xxx] to be issued
-        SOURCE_FINITE_STATE_AUTOMATON.put(TO_BE_ISSUED_ADD, Sets.newHashSet(BEEN_ISSUED_ADD));
-        SOURCE_FINITE_STATE_AUTOMATON.put(TO_BE_ISSUED_DELETE, Sets.newHashSet(BEEN_ISSUED_DELETE));
-        SOURCE_FINITE_STATE_AUTOMATON.put(TO_BE_ISSUED_RETRY, Sets.newHashSet(BEEN_ISSUED_RETRY));
-        SOURCE_FINITE_STATE_AUTOMATON.put(TO_BE_ISSUED_BACKTRACK, Sets.newHashSet(BEEN_ISSUED_BACKTRACK));
-        SOURCE_FINITE_STATE_AUTOMATON.put(TO_BE_ISSUED_FROZEN, Sets.newHashSet(BEEN_ISSUED_FROZEN));
-        SOURCE_FINITE_STATE_AUTOMATON.put(TO_BE_ISSUED_ACTIVE, Sets.newHashSet(BEEN_ISSUED_ACTIVE));
-        SOURCE_FINITE_STATE_AUTOMATON.put(TO_BE_ISSUED_CHECK, Sets.newHashSet(BEEN_ISSUED_CHECK));
-        SOURCE_FINITE_STATE_AUTOMATON.put(TO_BE_ISSUED_REDO_METRIC, Sets.newHashSet(BEEN_ISSUED_REDO_METRIC));
-        SOURCE_FINITE_STATE_AUTOMATON.put(TO_BE_ISSUED_MAKEUP, Sets.newHashSet(BEEN_ISSUED_MAKEUP));
+        HashSet<SourceState> tobeAdd = Sets.newHashSet(BEEN_ISSUED_ADD);
+        tobeAdd.addAll(TOBE_ISSUED_SET);
+        SOURCE_STATE_AUTOMATON.put(TO_BE_ISSUED_ADD, tobeAdd);
+        HashSet<SourceState> tobeDelete = Sets.newHashSet(BEEN_ISSUED_DELETE);
+        tobeDelete.addAll(TOBE_ISSUED_SET);
+        SOURCE_STATE_AUTOMATON.put(TO_BE_ISSUED_DELETE, Sets.newHashSet(tobeDelete));
+        HashSet<SourceState> tobeRetry = Sets.newHashSet(BEEN_ISSUED_RETRY);
+        tobeRetry.addAll(TOBE_ISSUED_SET);
+        SOURCE_STATE_AUTOMATON.put(TO_BE_ISSUED_RETRY, Sets.newHashSet(tobeRetry));
+        HashSet<SourceState> tobeBacktrack = Sets.newHashSet(BEEN_ISSUED_BACKTRACK);
+        tobeBacktrack.addAll(TOBE_ISSUED_SET);
+        SOURCE_STATE_AUTOMATON.put(TO_BE_ISSUED_BACKTRACK, Sets.newHashSet(tobeBacktrack));
+        HashSet<SourceState> tobeFrozen = Sets.newHashSet(BEEN_ISSUED_FROZEN);
+        tobeFrozen.addAll(TOBE_ISSUED_SET);
+        SOURCE_STATE_AUTOMATON.put(TO_BE_ISSUED_FROZEN, Sets.newHashSet(tobeFrozen));
+        HashSet<SourceState> tobeActive = Sets.newHashSet(BEEN_ISSUED_ACTIVE);
+        tobeActive.addAll(TOBE_ISSUED_SET);
+        SOURCE_STATE_AUTOMATON.put(TO_BE_ISSUED_ACTIVE, Sets.newHashSet(tobeActive));
+        HashSet<SourceState> tobeCheck = Sets.newHashSet(BEEN_ISSUED_CHECK);
+        tobeCheck.addAll(TOBE_ISSUED_SET);
+        SOURCE_STATE_AUTOMATON.put(TO_BE_ISSUED_CHECK, Sets.newHashSet(tobeCheck));
+        HashSet<SourceState> tobeRedoMetric = Sets.newHashSet(BEEN_ISSUED_REDO_METRIC);
+        tobeRedoMetric.addAll(TOBE_ISSUED_SET);
+        SOURCE_STATE_AUTOMATON.put(TO_BE_ISSUED_REDO_METRIC, Sets.newHashSet(tobeRedoMetric));
+        HashSet<SourceState> tobeMakeup = Sets.newHashSet(BEEN_ISSUED_MAKEUP);
+        tobeMakeup.addAll(TOBE_ISSUED_SET);
+        SOURCE_STATE_AUTOMATON.put(TO_BE_ISSUED_MAKEUP, Sets.newHashSet(tobeMakeup));
 
         // [xxx] been issued
-        SOURCE_FINITE_STATE_AUTOMATON.put(BEEN_ISSUED_ADD, Sets.newHashSet(SOURCE_NORMAL, SOURCE_FAILED));
-        SOURCE_FINITE_STATE_AUTOMATON.put(BEEN_ISSUED_DELETE, Sets.newHashSet(SOURCE_NORMAL, SOURCE_FAILED));
-        SOURCE_FINITE_STATE_AUTOMATON.put(BEEN_ISSUED_RETRY, Sets.newHashSet(SOURCE_NORMAL, SOURCE_FAILED));
-        SOURCE_FINITE_STATE_AUTOMATON.put(BEEN_ISSUED_BACKTRACK, Sets.newHashSet(SOURCE_NORMAL, SOURCE_FAILED));
-        SOURCE_FINITE_STATE_AUTOMATON.put(BEEN_ISSUED_FROZEN, Sets.newHashSet(SOURCE_NORMAL, SOURCE_FAILED));
-        SOURCE_FINITE_STATE_AUTOMATON.put(BEEN_ISSUED_ACTIVE, Sets.newHashSet(SOURCE_NORMAL, SOURCE_FAILED));
-        SOURCE_FINITE_STATE_AUTOMATON.put(BEEN_ISSUED_CHECK, Sets.newHashSet(SOURCE_NORMAL, SOURCE_FAILED));
-        SOURCE_FINITE_STATE_AUTOMATON.put(BEEN_ISSUED_REDO_METRIC, Sets.newHashSet(SOURCE_NORMAL, SOURCE_FAILED));
-        SOURCE_FINITE_STATE_AUTOMATON.put(BEEN_ISSUED_MAKEUP, Sets.newHashSet(SOURCE_NORMAL, SOURCE_FAILED));
+        SOURCE_STATE_AUTOMATON.put(BEEN_ISSUED_ADD, Sets.newHashSet(SOURCE_NORMAL, SOURCE_FAILED));
+        SOURCE_STATE_AUTOMATON.put(BEEN_ISSUED_DELETE, Sets.newHashSet(SOURCE_NORMAL, SOURCE_FAILED));
+        SOURCE_STATE_AUTOMATON.put(BEEN_ISSUED_RETRY, Sets.newHashSet(SOURCE_NORMAL, SOURCE_FAILED));
+        SOURCE_STATE_AUTOMATON.put(BEEN_ISSUED_BACKTRACK, Sets.newHashSet(SOURCE_NORMAL, SOURCE_FAILED));
+        SOURCE_STATE_AUTOMATON.put(BEEN_ISSUED_FROZEN, Sets.newHashSet(SOURCE_NORMAL, SOURCE_FAILED));
+        SOURCE_STATE_AUTOMATON.put(BEEN_ISSUED_ACTIVE, Sets.newHashSet(SOURCE_NORMAL, SOURCE_FAILED));
+        SOURCE_STATE_AUTOMATON.put(BEEN_ISSUED_CHECK, Sets.newHashSet(SOURCE_NORMAL, SOURCE_FAILED));
+        SOURCE_STATE_AUTOMATON.put(BEEN_ISSUED_REDO_METRIC, Sets.newHashSet(SOURCE_NORMAL, SOURCE_FAILED));
+        SOURCE_STATE_AUTOMATON.put(BEEN_ISSUED_MAKEUP, Sets.newHashSet(SOURCE_NORMAL, SOURCE_FAILED));
     }
 
     private final Integer code;
@@ -143,7 +168,7 @@ public enum SourceState {
      * Whether the `next` state is valid according to the `current` state.
      */
     public static boolean isAllowedTransition(SourceState current, SourceState next) {
-        Set<SourceState> nextStates = SOURCE_FINITE_STATE_AUTOMATON.get(current);
+        Set<SourceState> nextStates = SOURCE_STATE_AUTOMATON.get(current);
         return nextStates != null && nextStates.contains(next);
     }
 
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
index 0bfb691..76be150 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
@@ -31,7 +31,7 @@ public interface StreamSourceEntityMapper {
 
     int insertSelective(StreamSourceEntity record);
 
-    StreamSourceEntity selectByPrimaryKey(Integer id);
+    StreamSourceEntity selectByIdForUpdate(Integer id);
 
     /**
      * According to the inlong group id and inlong stream id, query the number of valid source
@@ -78,7 +78,7 @@ public interface StreamSourceEntityMapper {
     /**
      * Query the sources with status 20x by the given agent IP and agent UUID.
      */
-    List<StreamSourceEntity> selectByStatusAndIp(@Param("list") List<Integer> list,
+    List<StreamSourceEntity> selectByStatusAndIpForUpdate(@Param("list") List<Integer> list,
             @Param("agentIp") String agentIp, @Param("uuid") String uuid);
 
     /**
@@ -121,6 +121,6 @@ public interface StreamSourceEntityMapper {
 
     int updateSnapshot(StreamSourceEntity entity);
 
-    int deleteByPrimaryKey(Integer id);
+    int deleteByRelatedId(@Param("groupId") String groupId, @Param("streamId") String streamId);
 
 }
\ No newline at end of file
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
index 595b69a..1e9124c 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
@@ -202,11 +202,12 @@
         </trim>
     </insert>
 
-    <select id="selectByPrimaryKey" parameterType="java.lang.Integer" resultMap="BaseResultMap">
+    <select id="selectByIdForUpdate" resultType="org.apache.inlong.manager.dao.entity.StreamSourceEntity">
         select
         <include refid="Base_Column_List"/>
         from stream_source
         where id = #{id,jdbcType=INTEGER}
+        for update
     </select>
     <select id="selectCount" resultType="java.lang.Integer">
         select count(1)
@@ -275,6 +276,7 @@
             <if test="sourceType != null and sourceType != ''">
                 and source_type = #{sourceType, jdbcType=VARCHAR}
             </if>
+            for update
         </where>
     </select>
     <select id="selectByStatusForUpdate" resultType="org.apache.inlong.manager.dao.entity.StreamSourceEntity">
@@ -290,7 +292,7 @@
             limit 2 for update
         </where>
     </select>
-    <select id="selectByStatusAndIp" resultType="org.apache.inlong.manager.dao.entity.StreamSourceEntity">
+    <select id="selectByStatusAndIpForUpdate" resultType="org.apache.inlong.manager.dao.entity.StreamSourceEntity">
         select
         <include refid="Base_Column_List"/>
         from stream_source
@@ -304,6 +306,7 @@
             <if test="uuid != null and uuid != ''">
                 and uuid = #{uuid, jdbcType=VARCHAR}
             </if>
+            for update
         </where>
     </select>
     <select id="selectSourceType" resultType="java.lang.String">
@@ -317,12 +320,16 @@
             <if test="streamId != null and streamId != ''">
                 and inlong_stream_id = #{streamId, jdbcType=VARCHAR}
             </if>
+            for update
         </where>
     </select>
     <select id="selectTempStatusSource" resultType="org.apache.inlong.manager.dao.entity.StreamSourceEntity">
         select id, agent_ip, uuid, status
         from stream_source
-        where status >= 200
+        <where>
+            status >= 200
+            for update
+        </where>
     </select>
 
     <update id="updateByPrimaryKeySelective" parameterType="org.apache.inlong.manager.dao.entity.StreamSourceEntity">
@@ -449,9 +456,14 @@
         where id = #{id,jdbcType=INTEGER}
     </update>
 
-    <delete id="deleteByPrimaryKey" parameterType="java.lang.Integer">
+    <delete id="deleteByRelatedId">
         delete
         from stream_source
-        where id = #{id,jdbcType=INTEGER}
+        where
+        is_deleted = 0
+        and inlong_group_id = #{groupId, jdbcType=VARCHAR}
+        <if test="streamId != null and streamId != ''">
+            and inlong_stream_id = #{streamId, jdbcType=VARCHAR}
+        </if>
     </delete>
 </mapper>
\ No newline at end of file
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
index a4fec35..f826977 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
@@ -110,6 +110,7 @@ public class AgentServiceImpl implements AgentService {
     }
 
     @Override
+    @Transactional(rollbackFor = Throwable.class, isolation = Isolation.REPEATABLE_READ)
     public TaskResult reportAndGetTask(TaskRequest request) {
         LOGGER.debug("begin to get agent task: {}", request);
         if (request == null || request.getAgentIp() == null) {
@@ -133,7 +134,7 @@ public class AgentServiceImpl implements AgentService {
 
         for (CommandEntity command : request.getCommandInfo()) {
             Integer taskId = command.getTaskId();
-            StreamSourceEntity current = sourceMapper.selectByPrimaryKey(taskId);
+            StreamSourceEntity current = sourceMapper.selectByIdForUpdate(taskId);
             if (current == null) {
                 continue;
             }
@@ -171,8 +172,7 @@ public class AgentServiceImpl implements AgentService {
     /**
      * Get task result by the request
      */
-    @Transactional(isolation = Isolation.REPEATABLE_READ)
-    TaskResult getTaskResult(TaskRequest request) {
+    private TaskResult getTaskResult(TaskRequest request) {
         // Query the tasks that needed to add or active - without agentIp and uuid
         List<Integer> addedStatusList = Arrays.asList(SourceState.TO_BE_ISSUED_ADD.getCode(),
                 SourceState.TO_BE_ISSUED_ACTIVE.getCode());
@@ -185,7 +185,7 @@ public class AgentServiceImpl implements AgentService {
                 SourceState.TO_BE_ISSUED_REDO_METRIC.getCode(), SourceState.TO_BE_ISSUED_MAKEUP.getCode());
         String agentIp = request.getAgentIp();
         String uuid = request.getUuid();
-        List<StreamSourceEntity> entityList = sourceMapper.selectByStatusAndIp(statusList, agentIp, uuid);
+        List<StreamSourceEntity> entityList = sourceMapper.selectByStatusAndIpForUpdate(statusList, agentIp, uuid);
         entityList.addAll(addList);
 
         List<DataConfig> dataConfigs = Lists.newArrayList();
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractStreamSourceOperation.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractStreamSourceOperation.java
index bbec00b..1ebfdf4 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractStreamSourceOperation.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractStreamSourceOperation.java
@@ -30,6 +30,8 @@ import org.apache.inlong.manager.dao.mapper.StreamSourceEntityMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.transaction.annotation.Isolation;
+import org.springframework.transaction.annotation.Transactional;
 
 import javax.validation.constraints.NotNull;
 import java.util.Date;
@@ -63,8 +65,9 @@ public abstract class AbstractStreamSourceOperation implements StreamSourceOpera
     protected abstract SourceResponse getResponse();
 
     @Override
+    @Transactional(isolation = Isolation.REPEATABLE_READ)
     public SourceResponse getById(@NotNull Integer id) {
-        StreamSourceEntity entity = sourceMapper.selectByPrimaryKey(id);
+        StreamSourceEntity entity = sourceMapper.selectByIdForUpdate(id);
         Preconditions.checkNotNull(entity, ErrorCodeEnum.SOURCE_INFO_NOT_FOUND.getMessage());
         String existType = entity.getSourceType();
         Preconditions.checkTrue(getSourceType().equals(existType),
@@ -73,12 +76,13 @@ public abstract class AbstractStreamSourceOperation implements StreamSourceOpera
     }
 
     @Override
+    @Transactional(rollbackFor = Throwable.class, isolation = Isolation.REPEATABLE_READ)
     public void updateOpt(SourceRequest request, String operator) {
-        StreamSourceEntity entity = sourceMapper.selectByPrimaryKey(request.getId());
+        StreamSourceEntity entity = sourceMapper.selectByIdForUpdate(request.getId());
         Preconditions.checkNotNull(entity, ErrorCodeEnum.SOURCE_INFO_NOT_FOUND.getMessage());
         if (!SourceState.ALLOWED_UPDATE.contains(entity.getStatus())) {
             throw new RuntimeException(String.format("Source=%s is not allowed to update, "
-                    + "please stop / frozen / delete it firstly", entity));
+                    + "please wait until its changed to final status or stop / frozen / delete it firstly", entity));
         }
 
         // Setting updated parameters of stream source entity.
@@ -110,8 +114,9 @@ public abstract class AbstractStreamSourceOperation implements StreamSourceOpera
     }
 
     @Override
+    @Transactional(rollbackFor = Throwable.class, isolation = Isolation.REPEATABLE_READ)
     public void stopOpt(SourceRequest request, String operator) {
-        StreamSourceEntity snapshot = sourceMapper.selectByPrimaryKey(request.getId());
+        StreamSourceEntity snapshot = sourceMapper.selectByIdForUpdate(request.getId());
         SourceState curState = SourceState.forCode(snapshot.getStatus());
         SourceState nextState = SourceState.TO_BE_ISSUED_FROZEN;
         if (!SourceState.isAllowedTransition(curState, nextState)) {
@@ -125,8 +130,9 @@ public abstract class AbstractStreamSourceOperation implements StreamSourceOpera
     }
 
     @Override
+    @Transactional(rollbackFor = Throwable.class, isolation = Isolation.REPEATABLE_READ)
     public void restartOpt(SourceRequest request, String operator) {
-        StreamSourceEntity snapshot = sourceMapper.selectByPrimaryKey(request.getId());
+        StreamSourceEntity snapshot = sourceMapper.selectByIdForUpdate(request.getId());
         SourceState curState = SourceState.forCode(snapshot.getStatus());
         SourceState nextState = SourceState.TO_BE_ISSUED_ACTIVE;
         if (!SourceState.isAllowedTransition(curState, nextState)) {
@@ -141,9 +147,10 @@ public abstract class AbstractStreamSourceOperation implements StreamSourceOpera
     }
 
     @Override
+    @Transactional(rollbackFor = Throwable.class, isolation = Isolation.REPEATABLE_READ)
     public void deleteOpt(SourceRequest request, String operator) {
         Integer id = request.getId();
-        StreamSourceEntity existEntity = sourceMapper.selectByPrimaryKey(id);
+        StreamSourceEntity existEntity = sourceMapper.selectByIdForUpdate(id);
         SourceState curState = SourceState.forCode(existEntity.getStatus());
         SourceState nextState = SourceState.TO_BE_ISSUED_DELETE;
         if (!SourceState.isAllowedTransition(curState, nextState)) {
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
index 62c9451..0210737 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
@@ -22,11 +22,6 @@ import com.github.pagehelper.PageHelper;
 import com.github.pagehelper.PageInfo;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Date;
-import java.util.List;
-import java.util.Map;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.common.enums.Constant;
@@ -48,8 +43,15 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
+import org.springframework.transaction.annotation.Isolation;
 import org.springframework.transaction.annotation.Transactional;
 
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+
 /**
  * Implementation of source service interface
  */
@@ -188,7 +190,7 @@ public class StreamSourceServiceImpl implements StreamSourceService {
         LOGGER.info("begin to delete source by id={}, sourceType={}", id, sourceType);
         Preconditions.checkNotNull(id, Constant.ID_IS_EMPTY);
 
-        StreamSourceEntity entity = sourceMapper.selectByPrimaryKey(id);
+        StreamSourceEntity entity = sourceMapper.selectByIdForUpdate(id);
         Preconditions.checkNotNull(entity, ErrorCodeEnum.SOURCE_INFO_NOT_FOUND.getMessage());
         commonOperateService.checkGroupStatus(entity.getInlongGroupId(), operator);
 
@@ -202,11 +204,10 @@ public class StreamSourceServiceImpl implements StreamSourceService {
     }
 
     @Override
+    @Transactional(rollbackFor = Throwable.class, isolation = Isolation.REPEATABLE_READ)
     public boolean restart(Integer id, String sourceType, String operator) {
         LOGGER.info("begin to restart source by id={}, sourceType={}", id, sourceType);
-        Preconditions.checkNotNull(id, Constant.ID_IS_EMPTY);
-
-        StreamSourceEntity entity = sourceMapper.selectByPrimaryKey(id);
+        StreamSourceEntity entity = sourceMapper.selectByIdForUpdate(id);
         Preconditions.checkNotNull(entity, ErrorCodeEnum.SOURCE_INFO_NOT_FOUND.getMessage());
         commonOperateService.checkGroupStatus(entity.getInlongGroupId(), operator);
 
@@ -220,9 +221,10 @@ public class StreamSourceServiceImpl implements StreamSourceService {
     }
 
     @Override
+    @Transactional(rollbackFor = Throwable.class, isolation = Isolation.REPEATABLE_READ)
     public boolean stop(Integer id, String sourceType, String operator) {
         LOGGER.info("begin to stop source by id={}, sourceType={}", id, sourceType);
-        StreamSourceEntity entity = sourceMapper.selectByPrimaryKey(id);
+        StreamSourceEntity entity = sourceMapper.selectByIdForUpdate(id);
         Preconditions.checkNotNull(entity, ErrorCodeEnum.SOURCE_INFO_NOT_FOUND.getMessage());
         commonOperateService.checkGroupStatus(entity.getInlongGroupId(), operator);
 
@@ -278,12 +280,7 @@ public class StreamSourceServiceImpl implements StreamSourceService {
 
         // Check if it can be deleted
         commonOperateService.checkGroupStatus(groupId, operator);
-
-        List<StreamSourceEntity> entityList = sourceMapper.selectByRelatedId(groupId, streamId);
-        if (CollectionUtils.isNotEmpty(entityList)) {
-            entityList.forEach(entity -> sourceMapper.deleteByPrimaryKey(entity.getId()));
-        }
-
+        sourceMapper.deleteByRelatedId(groupId, streamId);
         LOGGER.info("success to delete all source by groupId={}, streamId={}", groupId, streamId);
         return true;
     }