You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/03/20 11:09:16 UTC

[incubator-inlong] branch master updated: [INLONG-3252][Manager] Remove repeated-read transaction in the getting source method (#3253)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 86319d8  [INLONG-3252][Manager] Remove repeated-read transaction in the getting source method (#3253)
86319d8 is described below

commit 86319d8fbfca7da9e63e2733cc30052f23daa114
Author: kipshi <48...@users.noreply.github.com>
AuthorDate: Sun Mar 20 19:09:10 2022 +0800

    [INLONG-3252][Manager] Remove repeated-read transaction in the getting source method (#3253)
---
 .../dao/mapper/StreamSourceEntityMapper.java       |  7 +++--
 .../resources/mappers/StreamSourceEntityMapper.xml | 18 ++++++-----
 .../service/core/impl/AgentServiceImpl.java        |  6 ++--
 .../service/core/impl/InlongGroupServiceImpl.java  |  3 --
 .../service/core/impl/InlongStreamServiceImpl.java | 35 ++--------------------
 .../source/AbstractStreamSourceOperation.java      |  1 -
 .../service/source/SourceSnapshotOperation.java    |  9 ++++--
 .../service/source/StreamSourceServiceImpl.java    |  2 +-
 8 files changed, 28 insertions(+), 53 deletions(-)

diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
index 9cc4144..b70998b 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
@@ -22,6 +22,7 @@ import org.apache.inlong.manager.common.pojo.source.SourcePageRequest;
 import org.apache.inlong.manager.dao.entity.StreamSourceEntity;
 import org.springframework.stereotype.Repository;
 
+import java.util.Date;
 import java.util.List;
 
 @Repository
@@ -89,7 +90,8 @@ public interface StreamSourceEntityMapper {
      *
      * @apiNote Should not change the modify_time
      */
-    int updateStatus(@Param("id") Integer id, @Param("nextStatus") Integer nextStatus);
+    int updateStatus(@Param("id") Integer id, @Param("nextStatus") Integer nextStatus,
+            @Param("modifyTime") Date modifyTime);
 
     /**
      * Update the status to `nextStatus` by the given group id and stream id.
@@ -104,7 +106,8 @@ public interface StreamSourceEntityMapper {
      *
      * @apiNote Should not change the modify_time
      */
-    int updateIpAndUuid(@Param("id") Integer id, @Param("agentIp") String agentIp, @Param("uuid") String uuid);
+    int updateIpAndUuid(@Param("id") Integer id, @Param("agentIp") String agentIp, @Param("uuid") String uuid,
+            @Param("modifyTime") Date modifyTime);
 
     int updateSnapshot(StreamSourceEntity entity);
 
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
index b12f803..81d476a 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
@@ -434,15 +434,16 @@
     <update id="updateStatus">
         update stream_source
         set previous_status = status,
-            status          = #{nextStatus, jdbcType=INTEGER},
-            modify_time     = modify_time
+            status          = #{nextStatus, jdbcType=INTEGER}
+        <if test="modifyTime != null">
+            ,modify_time = #{modifyTime,jdbcType=TIMESTAMP}
+        </if>
         where id = #{id, jdbcType=INTEGER}
     </update>
     <update id="updateStatusByRelatedId">
         update stream_source
         set previous_status = status,
-        status = #{nextStatus, jdbcType=INTEGER},
-        modify_time = modify_time
+        status = #{nextStatus, jdbcType=INTEGER}
         <where>
             inlong_group_id = #{groupId, jdbcType=VARCHAR}
             <if test="streamId != null">
@@ -453,15 +454,16 @@
     <update id="updateIpAndUuid">
         update stream_source
         set agent_ip    = #{agentIp,jdbcType=VARCHAR},
-            uuid        = #{uuid,jdbcType=VARCHAR},
-            modify_time = modify_time
+            uuid        = #{uuid,jdbcType=VARCHAR}
+        <if test="modifyTime != null">
+            ,modify_time = #{modifyTime,jdbcType=TIMESTAMP}
+        </if>
         where id = #{id, jdbcType=INTEGER}
     </update>
     <update id="updateSnapshot" parameterType="org.apache.inlong.manager.dao.entity.StreamSourceEntity">
         update stream_source
         set snapshot    = #{snapshot,jdbcType=LONGVARCHAR},
-            report_time = #{reportTime,jdbcType=TIMESTAMP},
-            modify_time = modify_time
+            report_time = #{reportTime,jdbcType=TIMESTAMP}
         where id = #{id,jdbcType=INTEGER}
     </update>
 
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
index ed42ff4..0f4691d 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
@@ -163,7 +163,7 @@ public class AgentServiceImpl implements AgentService {
                     nextStatus = SourceState.SOURCE_FAILED.getCode();
                 }
 
-                sourceMapper.updateStatus(taskId, nextStatus);
+                sourceMapper.updateStatus(taskId, nextStatus, current.getModifyTime());
                 LOGGER.info("update stream source status to [{}] for id [{}] ", nextStatus, taskId);
             }
             // Other tasks with status 20x will change to 30x in next getTaskResult method
@@ -197,7 +197,7 @@ public class AgentServiceImpl implements AgentService {
             int op = status % MODULUS_100;
             if (status / MODULUS_100 == UNISSUED_STATUS) {
                 int nextStatus = ISSUED_STATUS * MODULUS_100 + op;
-                sourceMapper.updateStatus(id, nextStatus);
+                sourceMapper.updateStatus(id, nextStatus, entity.getModifyTime());
                 LOGGER.info("update stream source status to [{}] for id [{}] ", nextStatus, id);
             } else {
                 LOGGER.info("skip task status not in 20x, id={}", id);
@@ -212,7 +212,7 @@ public class AgentServiceImpl implements AgentService {
 
         // Update agentIp and uuid for the added and active tasks
         for (StreamSourceEntity entity : addList) {
-            sourceMapper.updateIpAndUuid(entity.getId(), agentIp, uuid);
+            sourceMapper.updateIpAndUuid(entity.getId(), agentIp, uuid, entity.getModifyTime());
             LOGGER.info("update stream source ip to [{}], uuid to [{}] for id [{}] ", agentIp, uuid, entity.getId());
         }
 
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongGroupServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongGroupServiceImpl.java
index cff6760..bbf7eeb 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongGroupServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongGroupServiceImpl.java
@@ -230,9 +230,6 @@ public class InlongGroupServiceImpl implements InlongGroupService {
         // Check whether the current status can be modified
         this.checkGroupCanUpdate(entity, groupRequest, operator);
         CommonBeanUtils.copyProperties(groupRequest, entity, true);
-        if (GroupState.CONFIG_FAILED.getCode().equals(entity.getStatus())) {
-            entity.setStatus(GroupState.TO_BE_SUBMIT.getCode());
-        }
 
         entity.setModifier(operator);
         groupMapper.updateByIdentifierSelective(entity);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongStreamServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongStreamServiceImpl.java
index ef6ba02..cc3d97f 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongStreamServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongStreamServiceImpl.java
@@ -29,9 +29,7 @@ import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.pojo.sink.SinkBriefResponse;
 import org.apache.inlong.manager.common.pojo.sink.SinkRequest;
 import org.apache.inlong.manager.common.pojo.sink.SinkResponse;
-import org.apache.inlong.manager.common.pojo.source.SourceDbBasicInfo;
 import org.apache.inlong.manager.common.pojo.source.SourceDbDetailInfo;
-import org.apache.inlong.manager.common.pojo.source.SourceFileBasicInfo;
 import org.apache.inlong.manager.common.pojo.source.SourceFileDetailInfo;
 import org.apache.inlong.manager.common.pojo.source.SourceResponse;
 import org.apache.inlong.manager.common.pojo.stream.FullPageUpdateRequest;
@@ -70,7 +68,6 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Date;
 import java.util.List;
-import java.util.Locale;
 import java.util.stream.Collectors;
 
 /**
@@ -493,41 +490,15 @@ public class InlongStreamServiceImpl implements InlongStreamService {
             FullStreamResponse pageInfo = new FullStreamResponse();
             pageInfo.setStreamInfo(streamInfo);
 
-            // 3. Query the basic and detailed information of the data source
-            String dataSourceType = streamInfo.getDataSourceType();
-            if (StringUtils.isEmpty(dataSourceType)) {
-                continue;
-            }
-            switch (dataSourceType.toUpperCase(Locale.ROOT)) {
-                case Constant.DATA_SOURCE_FILE:
-                    SourceFileBasicInfo fileBasicInfo = sourceFileService.getBasicByIdentifier(groupId, streamId);
-                    pageInfo.setFileBasicInfo(fileBasicInfo);
-                    List<SourceFileDetailInfo> fileDetailInfoList = sourceFileService.listDetailByIdentifier(groupId,
-                            streamId);
-                    pageInfo.setFileDetailInfoList(fileDetailInfoList);
-                    break;
-                case Constant.DATA_SOURCE_DB:
-                    SourceDbBasicInfo dbBasicInfo = sourceDbService.getBasicByIdentifier(groupId, streamId);
-                    pageInfo.setDbBasicInfo(dbBasicInfo);
-                    List<SourceDbDetailInfo> dbDetailInfoList = sourceDbService.listDetailByIdentifier(groupId,
-                            streamId);
-                    pageInfo.setDbDetailInfoList(dbDetailInfoList);
-                    break;
-                case Constant.DATA_SOURCE_AUTO_PUSH:
-                    break;
-                default:
-                    throw new BusinessException(ErrorCodeEnum.SOURCE_TYPE_NOT_SUPPORTED);
-            }
-
-            // 4. Query stream sources information
+            // 3. Query stream sources information
             List<SourceResponse> sourceList = sourceService.listSource(groupId, streamId);
             pageInfo.setSourceInfo(sourceList);
 
-            // 5. Query various stream sinks and its extended information, field information
+            // 4. Query various stream sinks and its extended information, field information
             List<SinkResponse> sinkList = sinkService.listSink(groupId, streamId);
             pageInfo.setSinkInfo(sinkList);
 
-            // 6. Add a single result to the paginated list
+            // 5. Add a single result to the paginated list
             responseList.add(pageInfo);
         }
 
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractStreamSourceOperation.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractStreamSourceOperation.java
index 27460db..9bc2f7a 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractStreamSourceOperation.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractStreamSourceOperation.java
@@ -68,7 +68,6 @@ public abstract class AbstractStreamSourceOperation implements StreamSourceOpera
     protected abstract SourceResponse getResponse();
 
     @Override
-    @Transactional(isolation = Isolation.REPEATABLE_READ)
     public SourceResponse getById(@NotNull Integer id) {
         StreamSourceEntity entity = sourceMapper.selectById(id);
         Preconditions.checkNotNull(entity, ErrorCodeEnum.SOURCE_INFO_NOT_FOUND.getMessage());
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/SourceSnapshotOperation.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/SourceSnapshotOperation.java
index e9fadfe..213a0e1 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/SourceSnapshotOperation.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/SourceSnapshotOperation.java
@@ -132,7 +132,8 @@ public class SourceSnapshotOperation implements AutoCloseable {
                 // Update the status from temporary to normal
                 Integer status = idStatusMap.get(id);
                 if (SourceState.TEMP_TO_NORMAL.contains(status)) {
-                    sourceMapper.updateStatus(id, SourceState.SOURCE_NORMAL.getCode());
+                    StreamSourceEntity source = sourceMapper.selectByIdForUpdate(id);
+                    sourceMapper.updateStatus(id, SourceState.SOURCE_NORMAL.getCode(), source.getModifyTime());
                 }
             }
 
@@ -143,10 +144,12 @@ public class SourceSnapshotOperation implements AutoCloseable {
                 Integer cacheId = entry.getKey();
                 Integer cacheStatus = entry.getValue();
                 if (!currentTaskIdSet.contains(cacheId)) {
+                    StreamSourceEntity source = sourceMapper.selectByIdForUpdate(cacheId);
                     if (Objects.equal(cacheStatus, SourceState.BEEN_ISSUED_DELETE.getCode())) {
-                        sourceMapper.updateStatus(cacheId, SourceState.SOURCE_DISABLE.getCode());
+                        sourceMapper.updateStatus(cacheId, SourceState.SOURCE_DISABLE.getCode(),
+                                source.getModifyTime());
                     } else if (Objects.equal(cacheStatus, SourceState.BEEN_ISSUED_FROZEN.getCode())) {
-                        sourceMapper.updateStatus(cacheId, SourceState.SOURCE_FROZEN.getCode());
+                        sourceMapper.updateStatus(cacheId, SourceState.SOURCE_FROZEN.getCode(), source.getModifyTime());
                     }
                 }
             }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
index 3b5c092..b6ec0f1 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
@@ -170,7 +170,7 @@ public class StreamSourceServiceImpl implements StreamSourceService {
     @Override
     @Transactional(rollbackFor = Throwable.class)
     public boolean updateStatus(Integer id, Integer targetStatus, String operator) {
-        sourceMapper.updateStatus(id, targetStatus);
+        sourceMapper.updateStatus(id, targetStatus, null);
         LOGGER.info("success to update source status={} for id={} by {}", targetStatus, id, operator);
         return true;
     }