You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/03/07 10:26:32 UTC

[incubator-inlong] branch master updated: [INLONG-2974][Manager] Support agent to pull tasks without ip and uuid (#2976)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new dd6915e  [INLONG-2974][Manager] Support agent to pull tasks without ip and uuid (#2976)
dd6915e is described below

commit dd6915e7c72407f0d580a90cd01c50e0ea39f6c9
Author: healchow <he...@gmail.com>
AuthorDate: Mon Mar 7 18:26:29 2022 +0800

    [INLONG-2974][Manager] Support agent to pull tasks without ip and uuid (#2976)
    
    * [INLONG-2974][Manager] Support agent to pull tasks without ip and uuid
    
    * [INLONG-2974][Manager] Optimize query parameters
    
    * [INLONG-2974][Manager] Change the isolation level to REPEATABLE_READ
---
 .../dao/mapper/StreamSourceEntityMapper.java       | 15 ++++-
 .../resources/mappers/StreamSourceEntityMapper.xml | 27 +++++++-
 .../service/core/impl/AgentServiceImpl.java        | 72 +++++++++++++++-------
 3 files changed, 89 insertions(+), 25 deletions(-)

diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
index f1a9797..0bfb691 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
@@ -71,9 +71,15 @@ public interface StreamSourceEntityMapper {
             @Param("sourceType") String sourceType);
 
     /**
+     * Query up to two non-deleted sources whose status is in the given list, using "select ... for update".
+     */
+    List<StreamSourceEntity> selectByStatusForUpdate(@Param("list") List<Integer> list);
+
+    /**
      * Query the sources with status 20x by the given agent IP and agent UUID.
      */
-    List<StreamSourceEntity> selectByIpAndUuid(@Param("agentIp") String agentIp, @Param("uuid") String uuid);
+    List<StreamSourceEntity> selectByStatusAndIp(@Param("list") List<Integer> list,
+            @Param("agentIp") String agentIp, @Param("uuid") String uuid);
 
     /**
      * Get the distinct source type from the given groupId and streamId
@@ -106,6 +112,13 @@ public interface StreamSourceEntityMapper {
     int updateStatusByRelatedId(@Param("groupId") String groupId, @Param("streamId") String streamId,
             @Param("nextStatus") Integer nextStatus);
 
+    /**
+     * Update the agent_ip and uuid of the source with the given id.
+     *
+     * @apiNote The statement assigns modify_time to itself, so it should not change the modify_time
+     */
+    int updateIpAndUuid(@Param("id") Integer id, @Param("agentIp") String agentIp, @Param("uuid") String uuid);
+
     int updateSnapshot(StreamSourceEntity entity);
 
     int deleteByPrimaryKey(Integer id);
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
index c8909f7..595b69a 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
@@ -277,13 +277,29 @@
             </if>
         </where>
     </select>
-    <select id="selectByIpAndUuid" resultType="org.apache.inlong.manager.dao.entity.StreamSourceEntity">
+    <select id="selectByStatusForUpdate" resultType="org.apache.inlong.manager.dao.entity.StreamSourceEntity">
         select
         <include refid="Base_Column_List"/>
         from stream_source
         <where>
             is_deleted = 0
-            and status >= 200 and status &lt; 300
+            and status in
+            <foreach item="item" index="index" collection="list" open="(" close=")" separator=",">
+                #{item}
+            </foreach>
+            limit 2 for update
+        </where>
+    </select>
+    <select id="selectByStatusAndIp" resultType="org.apache.inlong.manager.dao.entity.StreamSourceEntity">
+        select
+        <include refid="Base_Column_List"/>
+        from stream_source
+        <where>
+            is_deleted = 0
+            and status in
+            <foreach item="item" index="index" collection="list" open="(" close=")" separator=",">
+                #{item}
+            </foreach>
             and agent_ip = #{agentIp, jdbcType=VARCHAR}
             <if test="uuid != null and uuid != ''">
                 and uuid = #{uuid, jdbcType=VARCHAR}
@@ -418,6 +434,13 @@
             </if>
         </where>
     </update>
+    <update id="updateIpAndUuid">
+        update stream_source
+        set agent_ip    = #{agentIp,jdbcType=VARCHAR},
+            uuid        = #{uuid,jdbcType=VARCHAR},
+            modify_time = modify_time <!-- self-assignment: this update should not change the modify_time -->
+        where id = #{id, jdbcType=INTEGER}
+    </update>
     <update id="updateSnapshot" parameterType="org.apache.inlong.manager.dao.entity.StreamSourceEntity">
         update stream_source
         set snapshot    = #{snapshot,jdbcType=LONGVARCHAR},
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
index a95e691..e882af5 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
@@ -60,12 +60,15 @@ import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Service;
+import org.springframework.transaction.annotation.Isolation;
+import org.springframework.transaction.annotation.Transactional;
 
 import java.time.Instant;
 import java.time.LocalDateTime;
 import java.time.ZoneId;
 import java.time.format.DateTimeFormatter;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
@@ -170,12 +173,24 @@ public class AgentServiceImpl implements AgentService {
     /**
      * Get task result by the request
      */
-    private TaskResult getTaskResult(TaskRequest request) {
-        // Query all tasks with status in 20x
+    @Transactional(isolation = Isolation.REPEATABLE_READ)
+    TaskResult getTaskResult(TaskRequest request) {
+        // Query the tasks that need to be added or activated - without filtering by agentIp and uuid
+        List<Integer> addedStatusList = Arrays.asList(SourceState.TO_BE_ISSUED_ADD.getCode(),
+                SourceState.TO_BE_ISSUED_ACTIVE.getCode());
+        List<StreamSourceEntity> addList = sourceMapper.selectByStatusForUpdate(addedStatusList);
+
+        // Query the other tasks by agentIp and uuid - TO_BE_ISSUED_ADD and TO_BE_ISSUED_ACTIVE are not included
+        List<Integer> statusList = Arrays.asList(SourceState.TO_BE_ISSUED_DELETE.getCode(),
+                SourceState.TO_BE_ISSUED_RETRY.getCode(), SourceState.TO_BE_ISSUED_BACKTRACK.getCode(),
+                SourceState.TO_BE_ISSUED_FROZEN.getCode(), SourceState.TO_BE_ISSUED_CHECK.getCode(),
+                SourceState.TO_BE_ISSUED_REDO_METRIC.getCode(), SourceState.TO_BE_ISSUED_MAKEUP.getCode());
         String agentIp = request.getAgentIp();
         String uuid = request.getUuid();
+        List<StreamSourceEntity> entityList = sourceMapper.selectByStatusAndIp(statusList, agentIp, uuid);
+        entityList.addAll(addList);
+
         List<DataConfig> dataConfigs = Lists.newArrayList();
-        List<StreamSourceEntity> entityList = sourceMapper.selectByIpAndUuid(agentIp, uuid);
         for (StreamSourceEntity entity : entityList) {
             // Change 20x to 30x
             int id = entity.getId();
@@ -188,33 +203,46 @@ public class AgentServiceImpl implements AgentService {
                 continue;
             }
 
-            DataConfig dataConfig = new DataConfig();
-            dataConfig.setIp(entity.getAgentIp());
-            dataConfig.setUuid(entity.getUuid());
-            dataConfig.setOp(String.valueOf(op));
-            dataConfig.setTaskId(entity.getId());
-            dataConfig.setTaskType(getTaskType(entity));
-            dataConfig.setTaskName(entity.getSourceName());
-            dataConfig.setSnapshot(entity.getSnapshot());
-            dataConfig.setExtParams(entity.getExtParams());
-            LocalDateTime dateTime = LocalDateTime.ofInstant(entity.getModifyTime().toInstant(),
-                    ZoneId.systemDefault());
-            dataConfig.setDeliveryTime(dateTime.format(TIME_FORMATTER));
-
-            String groupId = entity.getInlongGroupId();
-            String streamId = entity.getInlongStreamId();
-            dataConfig.setInlongGroupId(groupId);
-            dataConfig.setInlongStreamId(streamId);
-            InlongStreamEntity streamEntity = streamMapper.selectByIdentifier(groupId, streamId);
-            dataConfig.setSyncSend(streamEntity.getSyncSend());
+            DataConfig dataConfig = getDataConfig(entity, op);
             dataConfigs.add(dataConfig);
         }
         // Query pending special commands
         List<CmdConfig> cmdConfigs = getAgentCmdConfigs(request);
 
+        // Update agentIp and uuid for the added and active tasks
+        for (StreamSourceEntity entity : addList) {
+            sourceMapper.updateIpAndUuid(entity.getId(), agentIp, uuid);
+        }
+
         return TaskResult.builder().dataConfigs(dataConfigs).cmdConfigs(cmdConfigs).build();
     }
 
+    /**
+     * Build the DataConfig for one stream source entity; {@code op} is stored in its string form.
+     */
+    private DataConfig getDataConfig(StreamSourceEntity entity, int op) {
+        DataConfig dataConfig = new DataConfig();
+        dataConfig.setIp(entity.getAgentIp()); // ip and uuid come from the entity, not from the request
+        dataConfig.setUuid(entity.getUuid());
+        dataConfig.setOp(String.valueOf(op));
+        dataConfig.setTaskId(entity.getId());
+        dataConfig.setTaskType(getTaskType(entity));
+        dataConfig.setTaskName(entity.getSourceName());
+        dataConfig.setSnapshot(entity.getSnapshot());
+        dataConfig.setExtParams(entity.getExtParams());
+        LocalDateTime dateTime = LocalDateTime.ofInstant(entity.getModifyTime().toInstant(),
+                ZoneId.systemDefault());
+        dataConfig.setDeliveryTime(dateTime.format(TIME_FORMATTER)); // modify time, system default zone
+
+        String groupId = entity.getInlongGroupId();
+        String streamId = entity.getInlongStreamId();
+        dataConfig.setInlongGroupId(groupId);
+        dataConfig.setInlongStreamId(streamId);
+        InlongStreamEntity streamEntity = streamMapper.selectByIdentifier(groupId, streamId);
+        dataConfig.setSyncSend(streamEntity.getSyncSend()); // NOTE(review): assumes the stream was found
+        return dataConfig;
+    }
+
     private int getTaskType(StreamSourceEntity sourceEntity) {
         SourceType sourceType = SourceType.forType(sourceEntity.getSourceType());
         if (sourceType != SourceType.BINLOG) {