You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/03/23 08:39:39 UTC

[incubator-inlong] branch master updated: [INLONG-3312][Manager] Optimize the delete operation for stream source (#3320)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 7bf33f7  [INLONG-3312][Manager] Optimize the delete operation for stream source (#3320)
7bf33f7 is described below

commit 7bf33f791d40837acdc7424af57d801b7a5e1713
Author: healchow <he...@gmail.com>
AuthorDate: Wed Mar 23 16:39:35 2022 +0800

    [INLONG-3312][Manager] Optimize the delete operation for stream source (#3320)
---
 .../dao/mapper/StreamSourceEntityMapper.java       | 33 ++----------
 .../resources/mappers/StreamSourceEntityMapper.xml | 56 ++-----------------
 .../service/core/impl/AgentServiceImpl.java        | 23 ++++----
 .../manager/service/sink/StreamSinkService.java    | 18 +++----
 .../service/sink/StreamSinkServiceImpl.java        | 62 +++++++++-------------
 .../source/AbstractStreamSourceOperation.java      |  5 +-
 .../service/source/SourceSnapshotOperation.java    |  3 +-
 .../service/source/StreamSourceService.java        | 10 ----
 .../service/source/StreamSourceServiceImpl.java    | 40 ++++----------
 9 files changed, 73 insertions(+), 177 deletions(-)

diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
index 0eee410..9603275 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
@@ -22,7 +22,6 @@ import org.apache.inlong.manager.common.pojo.source.SourcePageRequest;
 import org.apache.inlong.manager.dao.entity.StreamSourceEntity;
 import org.springframework.stereotype.Repository;
 
-import java.util.Date;
 import java.util.List;
 
 @Repository
@@ -36,8 +35,6 @@ public interface StreamSourceEntityMapper {
 
     /**
      * Query un-deleted sources by the given agentIp.
-     *
-     * @apiNote Sources with is_deleted > 0 need to be filtered.
      */
     List<StreamSourceEntity> selectByAgentIp(@Param("agentIp") String agentIp);
 
@@ -58,30 +55,14 @@ public interface StreamSourceEntityMapper {
             @Param("sourceName") String sourceName);
 
     /**
-     * According to the group id, stream id and source type, query valid source entity list.
-     */
-    List<StreamSourceEntity> selectByRelatedIdForUpdate(@Param("groupId") String groupId,
-            @Param("streamId") String streamId, @Param("sourceType") String sourceType);
-
-    /**
-     * Query the tasks that need to be added for update.
-     */
-    List<StreamSourceEntity> selectByStatusForUpdate(@Param("list") List<Integer> list);
-
-
-    /**
-     * Query the tasks that need to be added.
+     * Query the tasks by the given status list.
      */
     List<StreamSourceEntity> selectByStatus(@Param("list") List<Integer> list);
 
     /**
-     * Query the sources with status 20x by the given agent IP and agent UUID for update.
-     */
-    List<StreamSourceEntity> selectByStatusAndIpForUpdate(@Param("statusList") List<Integer> statusList,
-            @Param("agentIp") String agentIp, @Param("uuid") String uuid);
-
-    /**
      * Query the sources with status 20x by the given agent IP and agent UUID.
+     *
+     * @apiNote Sources with is_deleted > 0 need to be filtered.
      */
     List<StreamSourceEntity> selectByStatusAndIp(@Param("statusList") List<Integer> statusList,
             @Param("agentIp") String agentIp, @Param("uuid") String uuid);
@@ -97,11 +78,9 @@ public interface StreamSourceEntityMapper {
 
     /**
      * Update the status to `nextStatus` by the given id.
-     *
-     * @apiNote Should not change the modify_time
      */
     int updateStatus(@Param("id") Integer id, @Param("nextStatus") Integer nextStatus,
-            @Param("modifyTime") Date modifyTime);
+            @Param("changeTime") Boolean changeModifyTime);
 
     /**
      * Update the status to `nextStatus` by the given group id and stream id.
@@ -113,11 +92,9 @@ public interface StreamSourceEntityMapper {
 
     /**
      * Update the agentIp and uuid.
-     *
-     * @apiNote Should not change the modify_time
      */
     int updateIpAndUuid(@Param("id") Integer id, @Param("agentIp") String agentIp, @Param("uuid") String uuid,
-            @Param("modifyTime") Date modifyTime);
+            @Param("changeTime") Boolean changeModifyTime);
 
     int updateSnapshot(StreamSourceEntity entity);
 
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
index be1c3c0..963e4bb 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
@@ -146,35 +146,6 @@
             </if>
         </where>
     </select>
-    <select id="selectByRelatedIdForUpdate" resultType="org.apache.inlong.manager.dao.entity.StreamSourceEntity">
-        select
-        <include refid="Base_Column_List"/>
-        from stream_source
-        <where>
-            is_deleted = 0
-            and inlong_group_id = #{groupId, jdbcType=VARCHAR}
-            <if test="streamId != null and streamId != ''">
-                and inlong_stream_id = #{streamId, jdbcType=VARCHAR}
-            </if>
-            <if test="sourceType != null and sourceType != ''">
-                and source_type = #{sourceType, jdbcType=VARCHAR}
-            </if>
-            for update
-        </where>
-    </select>
-    <select id="selectByStatusForUpdate" resultType="org.apache.inlong.manager.dao.entity.StreamSourceEntity">
-        select
-        <include refid="Base_Column_List"/>
-        from stream_source
-        <where>
-            is_deleted = 0
-            and status in
-            <foreach item="item" index="index" collection="list" open="(" close=")" separator=",">
-                #{item}
-            </foreach>
-            limit 2 for update
-        </where>
-    </select>
     <select id="selectByStatus" resultType="org.apache.inlong.manager.dao.entity.StreamSourceEntity">
         select
         <include refid="Base_Column_List"/>
@@ -185,23 +156,7 @@
             <foreach item="item" index="index" collection="list" open="(" close=")" separator=",">
                 #{item}
             </foreach>
-        </where>
-    </select>
-    <select id="selectByStatusAndIpForUpdate" resultType="org.apache.inlong.manager.dao.entity.StreamSourceEntity">
-        select
-        <include refid="Base_Column_List"/>
-        from stream_source
-        <where>
-            is_deleted = 0
-            and status in
-            <foreach item="item" index="index" collection="statusList" open="(" close=")" separator=",">
-                #{item}
-            </foreach>
-            and agent_ip = #{agentIp, jdbcType=VARCHAR}
-            <if test="uuid != null and uuid != ''">
-                and uuid = #{uuid, jdbcType=VARCHAR}
-            </if>
-            for update
+            limit 2
         </where>
     </select>
     <select id="selectByStatusAndIp" resultType="org.apache.inlong.manager.dao.entity.StreamSourceEntity">
@@ -209,7 +164,6 @@
         <include refid="Base_Column_List"/>
         from stream_source
         <where>
-            is_deleted = 0
             and status in
             <foreach item="item" index="index" collection="statusList" open="(" close=")" separator=",">
                 #{item}
@@ -333,8 +287,8 @@
         update stream_source
         set previous_status = status,
         status = #{nextStatus, jdbcType=INTEGER}
-        <if test="modifyTime != null">
-            , modify_time = #{modifyTime,jdbcType=TIMESTAMP}
+        <if test="changeTime == false">
+            , modify_time = modify_time
         </if>
         where id = #{id, jdbcType=INTEGER}
     </update>
@@ -353,8 +307,8 @@
         update stream_source
         set agent_ip = #{agentIp,jdbcType=VARCHAR},
         uuid = #{uuid,jdbcType=VARCHAR}
-        <if test="modifyTime != null">
-            , modify_time = #{modifyTime,jdbcType=TIMESTAMP}
+        <if test="changeTime == false">
+            , modify_time = modify_time
         </if>
         where id = #{id, jdbcType=INTEGER}
     </update>
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
index 7f67bed..3ecde6c 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
@@ -155,8 +155,8 @@ public class AgentServiceImpl implements AgentService {
                 nextStatus = SourceState.SOURCE_FAILED.getCode();
             }
 
-            sourceMapper.updateStatus(taskId, nextStatus, current.getModifyTime());
-            LOGGER.info("update stream source status to [{}] for id [{}] ", nextStatus, taskId);
+            sourceMapper.updateStatus(taskId, nextStatus, false);
+            LOGGER.info("update stream source status to [{}] for id [{}]", nextStatus, taskId);
         }
     }
 
@@ -172,9 +172,9 @@ public class AgentServiceImpl implements AgentService {
         }
 
         // Query the tasks that needed to add or active - without agentIp and uuid
-        List<Integer> addedStatusList = Arrays.asList(SourceState.TO_BE_ISSUED_ADD.getCode(),
+        List<Integer> needAddStatusList = Arrays.asList(SourceState.TO_BE_ISSUED_ADD.getCode(),
                 SourceState.TO_BE_ISSUED_ACTIVE.getCode());
-        List<StreamSourceEntity> entityList = sourceMapper.selectByStatus(addedStatusList);
+        List<StreamSourceEntity> entityList = sourceMapper.selectByStatus(needAddStatusList);
 
         String agentIp = request.getAgentIp();
         String uuid = request.getUuid();
@@ -183,8 +183,8 @@ public class AgentServiceImpl implements AgentService {
                 SourceState.TO_BE_ISSUED_RETRY.getCode(), SourceState.TO_BE_ISSUED_BACKTRACK.getCode(),
                 SourceState.TO_BE_ISSUED_FROZEN.getCode(), SourceState.TO_BE_ISSUED_CHECK.getCode(),
                 SourceState.TO_BE_ISSUED_REDO_METRIC.getCode(), SourceState.TO_BE_ISSUED_MAKEUP.getCode());
-        List<StreamSourceEntity> addedList = sourceMapper.selectByStatusAndIp(statusList, agentIp, uuid);
-        entityList.addAll(addedList);
+        List<StreamSourceEntity> needIssuedList = sourceMapper.selectByStatusAndIp(statusList, agentIp, uuid);
+        entityList.addAll(needIssuedList);
 
         List<DataConfig> dataConfigs = Lists.newArrayList();
         for (StreamSourceEntity entity : entityList) {
@@ -195,7 +195,7 @@ public class AgentServiceImpl implements AgentService {
             int op = status % MODULUS_100;
             if (status / MODULUS_100 == UNISSUED_STATUS) {
                 int nextStatus = ISSUED_STATUS * MODULUS_100 + op;
-                sourceMapper.updateStatus(id, nextStatus, entity.getModifyTime());
+                sourceMapper.updateStatus(id, nextStatus, false);
                 LOGGER.info("update stream source status to [{}] for id [{}] ", nextStatus, id);
             } else {
                 LOGGER.info("skip task status not in 20x, id={}", id);
@@ -211,7 +211,7 @@ public class AgentServiceImpl implements AgentService {
         // Update agentIp and uuid for the added and active tasks
         for (StreamSourceEntity entity : entityList) {
             if (StringUtils.isEmpty(entity.getAgentIp())) {
-                sourceMapper.updateIpAndUuid(entity.getId(), agentIp, uuid, entity.getModifyTime());
+                sourceMapper.updateIpAndUuid(entity.getId(), agentIp, uuid, false);
                 LOGGER.info("update stream source ip to [{}], uuid to [{}] for id [{}]", agentIp, uuid, entity.getId());
             }
         }
@@ -239,7 +239,12 @@ public class AgentServiceImpl implements AgentService {
         dataConfig.setInlongGroupId(groupId);
         dataConfig.setInlongStreamId(streamId);
         InlongStreamEntity streamEntity = streamMapper.selectByIdentifier(groupId, streamId);
-        dataConfig.setSyncSend(streamEntity.getSyncSend());
+        if (streamEntity != null) {
+            dataConfig.setSyncSend(streamEntity.getSyncSend());
+        } else {
+            dataConfig.setSyncSend(0);
+            LOGGER.warn("set syncSend=[0] as the stream not exists for groupId={}, streamId={}", groupId, streamId);
+        }
         return dataConfig;
     }
 
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkService.java
index dc83bb2..b04784b 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkService.java
@@ -95,6 +95,15 @@ public interface StreamSinkService {
     Boolean update(SinkRequest sinkRequest, String operator);
 
     /**
+     * Modify sink data status.
+     *
+     * @param id Sink id.
+     * @param status Target status.
+     * @param log Modify the log.
+     */
+    void updateStatus(int id, int status, String log);
+
+    /**
      * Delete the stream sink by the given id and sink type.
      *
      * @param id The primary key of the sink.
@@ -105,15 +114,6 @@ public interface StreamSinkService {
     Boolean delete(Integer id, String sinkType, String operator);
 
     /**
-     * Modify sink data status.
-     *
-     * @param id Sink id.
-     * @param status Target status.
-     * @param log Modify the log.
-     */
-    void updateStatus(int id, int status, String log);
-
-    /**
      * Logically delete stream sink with the given conditions.
      *
      * @param groupId InLong group id to which the data source belongs.
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
index 7c9290d..f030959 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
@@ -95,9 +95,7 @@ public class StreamSinkServiceImpl implements StreamSinkService {
     @Override
     @Transactional(rollbackFor = Throwable.class)
     public Integer save(SinkRequest request, String operator) {
-        if (LOGGER.isDebugEnabled()) {
-            LOGGER.debug("begin to save sink info=" + request);
-        }
+        LOGGER.info("begin to save sink info: {}", request);
         this.checkParams(request);
 
         // Check if it can be added
@@ -120,16 +118,15 @@ public class StreamSinkServiceImpl implements StreamSinkService {
             executorService.execute(new WorkflowStartRunnable(operator, groupEntity, streamId));
         }
 
-        LOGGER.info("success to save sink info");
+        LOGGER.info("success to save sink info: {}", request);
         return id;
     }
 
     @Override
     public SinkResponse get(Integer id, String sinkType) {
-        LOGGER.debug("begin to get sink by id={}, sinkType={}", id, sinkType);
         StreamSinkOperation operation = operationFactory.getInstance(SinkType.forType(sinkType));
         SinkResponse sinkResponse = operation.getById(sinkType, id);
-        LOGGER.debug("success to get sink info");
+        LOGGER.debug("success to get sink by id={}, sinkType={}", id, sinkType);
         return sinkResponse;
     }
 
@@ -142,9 +139,7 @@ public class StreamSinkServiceImpl implements StreamSinkService {
 
     @Override
     public List<SinkResponse> listSink(String groupId, String streamId) {
-        LOGGER.debug("begin to list sink by groupId={}, streamId={}", groupId, streamId);
         Preconditions.checkNotNull(groupId, Constant.GROUP_ID_IS_EMPTY);
-
         List<StreamSinkEntity> entityList = sinkMapper.selectByRelatedId(groupId, streamId);
         if (CollectionUtils.isEmpty(entityList)) {
             return Collections.emptyList();
@@ -152,30 +147,25 @@ public class StreamSinkServiceImpl implements StreamSinkService {
         List<SinkResponse> responseList = new ArrayList<>();
         entityList.forEach(entity -> responseList.add(this.get(entity.getId(), entity.getSinkType())));
 
-        LOGGER.info("success to list sink");
+        LOGGER.debug("success to list sink by groupId={}, streamId={}", groupId, streamId);
         return responseList;
     }
 
     @Override
     public List<SinkBriefResponse> listBrief(String groupId, String streamId) {
-        LOGGER.debug("begin to list sink summary by groupId=" + groupId + ", streamId=" + streamId);
         Preconditions.checkNotNull(groupId, Constant.GROUP_ID_IS_EMPTY);
         Preconditions.checkNotNull(streamId, Constant.STREAM_ID_IS_EMPTY);
 
         // Query all sink information and encapsulate it in the result set
         List<SinkBriefResponse> summaryList = sinkMapper.selectSummary(groupId, streamId);
 
-        LOGGER.debug("success to list sink summary");
+        LOGGER.debug("success to list sink summary by groupId=" + groupId + ", streamId=" + streamId);
         return summaryList;
     }
 
     @Override
     public PageInfo<? extends SinkListResponse> listByCondition(SinkPageRequest request) {
-        if (LOGGER.isDebugEnabled()) {
-            LOGGER.debug("begin to list sink page by " + request);
-        }
         Preconditions.checkNotNull(request.getInlongGroupId(), Constant.GROUP_ID_IS_EMPTY);
-
         PageHelper.startPage(request.getPageNum(), request.getPageSize());
         List<StreamSinkEntity> entityPage = sinkMapper.selectByCondition(request);
         Map<SinkType, Page<StreamSinkEntity>> sinkMap = Maps.newHashMap();
@@ -193,14 +183,14 @@ public class StreamSinkServiceImpl implements StreamSinkService {
         // Encapsulate the paging query results into the PageInfo object to obtain related paging information
         PageInfo<? extends SinkListResponse> pageInfo = PageInfo.of(sinkListResponses);
 
-        LOGGER.debug("success to list sink page");
+        LOGGER.debug("success to list sink page, result size {}", pageInfo.getSize());
         return pageInfo;
     }
 
     @Override
     @Transactional(rollbackFor = Throwable.class)
     public Boolean update(SinkRequest request, String operator) {
-        LOGGER.info("begin to update sink info={}", request);
+        LOGGER.info("begin to update sink info: {}", request);
         this.checkParams(request);
         Preconditions.checkNotNull(request.getId(), Constant.ID_IS_EMPTY);
 
@@ -219,10 +209,21 @@ public class StreamSinkServiceImpl implements StreamSinkService {
         if (EntityStatus.GROUP_CONFIG_SUCCESSFUL.getCode().equals(groupEntity.getStatus())) {
             executorService.execute(new WorkflowStartRunnable(operator, groupEntity, streamId));
         }
-        LOGGER.info("success to update sink info");
+        LOGGER.info("success to update sink info: {}", request);
         return true;
     }
 
+    @Override
+    public void updateStatus(int id, int status, String log) {
+        StreamSinkEntity entity = new StreamSinkEntity();
+        entity.setId(id);
+        entity.setStatus(status);
+        entity.setOperateLog(log);
+        sinkMapper.updateStatus(entity);
+
+        LOGGER.info("success to update sink status={} for id={} with log: {}", status, id, log);
+    }
+
     @Transactional(rollbackFor = Throwable.class)
     @Override
     public Boolean delete(Integer id, String sinkType, String operator) {
@@ -240,23 +241,13 @@ public class StreamSinkServiceImpl implements StreamSinkService {
         entity.setModifier(operator);
         entity.setModifyTime(new Date());
         sinkMapper.updateByPrimaryKeySelective(entity);
-
         sinkFieldMapper.logicDeleteAll(id);
 
-        LOGGER.info("success to delete sink info");
+        LOGGER.info("success to delete sink info: {}", entity);
         return true;
     }
 
     @Override
-    public void updateStatus(int id, int status, String log) {
-        StreamSinkEntity entity = new StreamSinkEntity();
-        entity.setId(id);
-        entity.setStatus(status);
-        entity.setOperateLog(log);
-        sinkMapper.updateStatus(entity);
-    }
-
-    @Override
     @Transactional(rollbackFor = Throwable.class)
     public Boolean logicDeleteAll(String groupId, String streamId, String operator) {
         LOGGER.info("begin to logic delete all sink info by groupId={}, streamId={}", groupId, streamId);
@@ -322,21 +313,18 @@ public class StreamSinkServiceImpl implements StreamSinkService {
 
     @Override
     public List<String> getSinkTypeList(String groupId, String streamId) {
-        LOGGER.debug("begin to get sink type list by groupId={}, streamId={}", groupId, streamId);
         if (StringUtils.isEmpty(streamId)) {
             return Collections.emptyList();
         }
 
         List<String> resultList = sinkMapper.selectSinkType(groupId, streamId);
-        LOGGER.debug("success to get sink type list, result sinkType={}", resultList);
+        LOGGER.debug("success to get sink type by groupId={}, streamId={}, result={}", groupId, streamId, resultList);
         return resultList;
     }
 
     @Override
     public Boolean updateAfterApprove(List<SinkApproveDTO> approveList, String operator) {
-        if (LOGGER.isDebugEnabled()) {
-            LOGGER.debug("begin to update sink after approve={}", approveList);
-        }
+        LOGGER.info("begin to update sink after approve: {}", approveList);
         if (CollectionUtils.isEmpty(approveList)) {
             return true;
         }
@@ -358,7 +346,7 @@ public class StreamSinkServiceImpl implements StreamSinkService {
             sinkMapper.updateByPrimaryKeySelective(entity);
         }
 
-        LOGGER.info("success to update sink after approve");
+        LOGGER.info("success to update sink after approve: {}", approveList);
         return true;
     }
 
@@ -392,13 +380,13 @@ public class StreamSinkServiceImpl implements StreamSinkService {
         @Override
         public void run() {
             String groupId = inlongGroupEntity.getInlongGroupId();
-            LOGGER.info("begin start inlong stream workflow, groupId={}, streamId={}", groupId, streamId);
+            LOGGER.info("begin start inlong stream workflow for groupId={}, streamId={}", groupId, streamId);
 
             InlongGroupInfo groupInfo = CommonBeanUtils.copyProperties(inlongGroupEntity, InlongGroupInfo::new);
             GroupResourceProcessForm form = genGroupResourceProcessForm(groupInfo, streamId);
 
             workflowService.start(ProcessName.CREATE_STREAM_RESOURCE, operator, form);
-            LOGGER.info("success start inlong stream workflow, groupId={}, streamId={}", groupId, streamId);
+            LOGGER.info("success start inlong stream workflow for groupId={}, streamId={}", groupId, streamId);
         }
 
         /**
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractStreamSourceOperation.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractStreamSourceOperation.java
index 918ce16..c3eafd9 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractStreamSourceOperation.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractStreamSourceOperation.java
@@ -17,7 +17,6 @@
 
 package org.apache.inlong.manager.service.source;
 
-import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.inlong.manager.common.enums.Constant;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
@@ -41,7 +40,9 @@ import javax.validation.constraints.NotNull;
 import java.util.Date;
 import java.util.List;
 
-@Slf4j
+/**
+ * Default operation of stream source.
+ */
 public abstract class AbstractStreamSourceOperation implements StreamSourceOperation {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(AbstractStreamSourceOperation.class);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/SourceSnapshotOperation.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/SourceSnapshotOperation.java
index 713ebdd..ae5e6ee 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/SourceSnapshotOperation.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/SourceSnapshotOperation.java
@@ -142,8 +142,7 @@ public class SourceSnapshotOperation implements AutoCloseable {
                 Integer status = idStatusMap.get(id);
                 if (SourceState.TEMP_TO_NORMAL.contains(status)) {
                     isInvalid = true;
-                    StreamSourceEntity source = sourceMapper.selectByIdForUpdate(id);
-                    sourceMapper.updateStatus(id, SourceState.SOURCE_NORMAL.getCode(), source.getModifyTime());
+                    sourceMapper.updateStatus(id, SourceState.SOURCE_NORMAL.getCode(), false);
                 }
             }
 
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceService.java
index 5f876b3..2cd3de8 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceService.java
@@ -84,16 +84,6 @@ public interface StreamSourceService {
     boolean update(SourceRequest sourceRequest, String operator);
 
     /**
-     * Update source status.
-     *
-     * @param id The source id.
-     * @param targetStatus The target status.
-     * @param operator The operator name.
-     * @return whether succeed
-     */
-    boolean updateStatus(Integer id, Integer targetStatus, String operator);
-
-    /**
      * Update source status by the given groupId and streamId
      *
      * @param groupId The belongs group id.
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
index 3de257a..40bf56b 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
@@ -71,9 +71,7 @@ public class StreamSourceServiceImpl implements StreamSourceService {
     @Override
     @Transactional(rollbackFor = Throwable.class, propagation = Propagation.REQUIRES_NEW)
     public Integer save(SourceRequest request, String operator) {
-        if (LOGGER.isDebugEnabled()) {
-            LOGGER.debug("begin to save source info=" + request);
-        }
+        LOGGER.info("begin to save source info: {}", request);
         this.checkParams(request);
 
         // Check if it can be added
@@ -85,16 +83,15 @@ public class StreamSourceServiceImpl implements StreamSourceService {
         StreamSourceOperation operation = operationFactory.getInstance(SourceType.forType(sourceType));
         int id = operation.saveOpt(request, groupEntity.getStatus(), operator);
 
-        LOGGER.info("success to save source info");
+        LOGGER.info("success to save source info: {}", request);
         return id;
     }
 
     @Override
     public SourceResponse get(Integer id, String sourceType) {
-        LOGGER.debug("begin to get source by id={}, sourceType={}", id, sourceType);
         StreamSourceOperation operation = operationFactory.getInstance(SourceType.forType(sourceType));
         SourceResponse sourceResponse = operation.getById(id);
-        LOGGER.debug("success to get source info");
+        LOGGER.debug("success to get source by id={}", id);
         return sourceResponse;
     }
 
@@ -107,27 +104,20 @@ public class StreamSourceServiceImpl implements StreamSourceService {
 
     @Override
     public List<SourceResponse> listSource(String groupId, String streamId) {
-        LOGGER.debug("begin to list source by groupId={}, streamId={}", groupId, streamId);
         Preconditions.checkNotNull(groupId, Constant.GROUP_ID_IS_EMPTY);
-
         List<StreamSourceEntity> entityList = sourceMapper.selectByRelatedId(groupId, streamId, null);
         if (CollectionUtils.isEmpty(entityList)) {
             return Collections.emptyList();
         }
         List<SourceResponse> responseList = new ArrayList<>();
         entityList.forEach(entity -> responseList.add(this.get(entity.getId(), entity.getSourceType())));
-
-        LOGGER.info("success to list source");
+        LOGGER.debug("success to list source by groupId={}, streamId={}", groupId, streamId);
         return responseList;
     }
 
     @Override
     public PageInfo<? extends SourceListResponse> listByCondition(SourcePageRequest request) {
-        if (LOGGER.isDebugEnabled()) {
-            LOGGER.debug("begin to list source page by " + request);
-        }
         Preconditions.checkNotNull(request.getInlongGroupId(), Constant.GROUP_ID_IS_EMPTY);
-
         PageHelper.startPage(request.getPageNum(), request.getPageSize());
         List<StreamSourceEntity> entityPage = sourceMapper.selectByCondition(request);
 
@@ -145,7 +135,8 @@ public class StreamSourceServiceImpl implements StreamSourceService {
             sourceListResponses.addAll(pageInfo.getList());
         }
         PageInfo<? extends SourceListResponse> pageInfo = PageInfo.of(sourceListResponses);
-        LOGGER.debug("success to list source page");
+
+        LOGGER.debug("success to list source page, result size {}", pageInfo.getSize());
         return pageInfo;
     }
 
@@ -153,7 +144,7 @@ public class StreamSourceServiceImpl implements StreamSourceService {
     @Transactional(rollbackFor = Throwable.class, propagation = Propagation.REQUIRES_NEW,
             isolation = Isolation.READ_COMMITTED)
     public boolean update(SourceRequest request, String operator) {
-        LOGGER.debug("begin to update source info=" + request);
+        LOGGER.info("begin to update source info: {}", request);
         this.checkParams(request);
         Preconditions.checkNotNull(request.getId(), Constant.ID_IS_EMPTY);
 
@@ -165,16 +156,7 @@ public class StreamSourceServiceImpl implements StreamSourceService {
         StreamSourceOperation operation = operationFactory.getInstance(SourceType.forType(sourceType));
         operation.updateOpt(request, groupEntity.getStatus(), operator);
 
-        LOGGER.info("success to update source info");
-        return true;
-    }
-
-    @Override
-    @Transactional(rollbackFor = Throwable.class, propagation = Propagation.REQUIRES_NEW,
-            isolation = Isolation.READ_COMMITTED)
-    public boolean updateStatus(Integer id, Integer targetStatus, String operator) {
-        sourceMapper.updateStatus(id, targetStatus, null);
-        LOGGER.info("success to update source status={} for id={} by {}", targetStatus, id, operator);
+        LOGGER.info("success to update source info: {}", request);
         return true;
     }
 
@@ -204,7 +186,7 @@ public class StreamSourceServiceImpl implements StreamSourceService {
         CommonBeanUtils.copyProperties(entity, sourceRequest, true);
         operation.deleteOpt(sourceRequest, operator);
 
-        LOGGER.info("success to delete source info:{}", entity);
+        LOGGER.info("success to delete source info: {}", entity);
         return true;
     }
 
@@ -222,7 +204,7 @@ public class StreamSourceServiceImpl implements StreamSourceService {
         CommonBeanUtils.copyProperties(entity, sourceRequest, true);
         operation.restartOpt(sourceRequest, operator);
 
-        LOGGER.info("success to restart source info:{}", entity);
+        LOGGER.info("success to restart source info: {}", entity);
         return true;
     }
 
@@ -240,7 +222,7 @@ public class StreamSourceServiceImpl implements StreamSourceService {
         CommonBeanUtils.copyProperties(entity, sourceRequest, true);
         operation.stopOpt(sourceRequest, operator);
 
-        LOGGER.info("success to stop source info:{}", entity);
+        LOGGER.info("success to stop source info: {}", entity);
         return true;
     }