You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/11/15 04:38:33 UTC

[inlong] branch master updated: [INLONG-6533][Manager] Fix the problem of not distributing the task according to the cluster name (#6534)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new f40d290df [INLONG-6533][Manager] Fix the problem of not distributing the task according to the cluster name (#6534)
f40d290df is described below

commit f40d290dfbb461e17589ea9dc2c67b9c93486853
Author: fuweng11 <76...@users.noreply.github.com>
AuthorDate: Tue Nov 15 12:38:26 2022 +0800

    [INLONG-6533][Manager] Fix the problem of not distributing the task according to the cluster name (#6534)
    
    * Fix the problem of not distributing the task according to the cluster name
    
    * Extract a constant to improve the code
    
    Co-authored-by: healchow <he...@gmail.com>
---
 .../dao/mapper/StreamSourceEntityMapper.java       |  6 ++--
 .../resources/mappers/StreamSourceEntityMapper.xml | 15 +++++----
 .../service/core/impl/AgentServiceImpl.java        | 36 ++++++++++++----------
 3 files changed, 30 insertions(+), 27 deletions(-)

diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
index aee466d88..caa80bcc9 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
@@ -81,12 +81,12 @@ public interface StreamSourceEntityMapper {
             @Param("clusterName") String clusterName);
 
     /**
-     * Query the sources with status 20x by the given agent IP and agent UUID.
+     * Query the sources by the given status and Agent cluster info.
      *
      * @apiNote Sources with is_deleted > 0 should also be returned to agents to clear their local tasks.
      */
-    List<StreamSourceEntity> selectByStatusAndIp(@Param("statusList") List<Integer> statusList,
-            @Param("agentIp") String agentIp, @Param("uuid") String uuid);
+    List<StreamSourceEntity> selectByStatusAndCluster(@Param("statusList") List<Integer> statusList,
+            @Param("clusterName") String clusterName, @Param("agentIp") String agentIp, @Param("uuid") String uuid);
 
     /**
      * Select all sources by groupIds
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
index e1e7f56a8..97ef663fe 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
@@ -224,18 +224,17 @@
             and (agent_ip = #{agentIp, jdbcType=VARCHAR} or inlong_cluster_name = #{clusterName, jdbcType=VARCHAR})
         </where>
     </select>
-    <select id="selectByStatusAndIp" resultType="org.apache.inlong.manager.dao.entity.StreamSourceEntity">
+    <select id="selectByStatusAndCluster" resultType="org.apache.inlong.manager.dao.entity.StreamSourceEntity">
         select
         <include refid="Base_Column_List"/>
         from stream_source
         <where>
-            agent_ip = #{agentIp, jdbcType=VARCHAR}
-            <if test="statusList != null and statusList.size()>0">
-                and status in
-                <foreach item="item" index="index" collection="statusList" open="(" close=")" separator=",">
-                    #{item}
-                </foreach>
-            </if>
+            inlong_cluster_name = #{clusterName, jdbcType=VARCHAR}
+            and agent_ip = #{agentIp, jdbcType=VARCHAR}
+            and `status` in
+            <foreach item="item" index="index" collection="statusList" open="(" close=")" separator=",">
+                #{item}
+            </foreach>
             <if test="uuid != null and uuid != ''">
                 and uuid = #{uuid, jdbcType=VARCHAR}
             </if>
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
index d091bdf7e..29f0dcbff 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
@@ -71,6 +71,18 @@ public class AgentServiceImpl implements AgentService {
     private static final int MODULUS_100 = 100;
     private static final int TASK_FETCH_SIZE = 2;
 
+    /**
+     * List of statuses that need to be issued, excluding TO_BE_ISSUED_ADD and TO_BE_ISSUED_ACTIVE
+     */
+    private static final List<Integer> NEED_ISSUED_STATUS = Arrays.asList(
+            SourceStatus.TO_BE_ISSUED_DELETE.getCode(),
+            SourceStatus.TO_BE_ISSUED_RETRY.getCode(),
+            SourceStatus.TO_BE_ISSUED_BACKTRACK.getCode(),
+            SourceStatus.TO_BE_ISSUED_FROZEN.getCode(),
+            SourceStatus.TO_BE_ISSUED_CHECK.getCode(),
+            SourceStatus.TO_BE_ISSUED_REDO_METRIC.getCode(),
+            SourceStatus.TO_BE_ISSUED_MAKEUP.getCode());
+
     @Autowired
     private StreamSourceEntityMapper sourceMapper;
     @Autowired
@@ -153,20 +165,17 @@ public class AgentServiceImpl implements AgentService {
     @Transactional(rollbackFor = Throwable.class, isolation = Isolation.READ_COMMITTED,
             propagation = Propagation.REQUIRES_NEW)
     public TaskResult getTaskResult(TaskRequest request) {
-        if (request == null || StringUtils.isBlank(request.getAgentIp())) {
+        if (StringUtils.isBlank(request.getClusterName()) || StringUtils.isBlank(request.getAgentIp())) {
             throw new BusinessException("agent request or agent ip was empty, just return");
         }
 
         List<DataConfig> tasks = Lists.newArrayList();
         // Query the tasks that needed to add or active - without agentIp and uuid
-        List<DataConfig> nonFileTasks = fetchNonFileTasks(request);
-        tasks.addAll(nonFileTasks);
+        tasks.addAll(fetchNonFileTasks(request));
         // Query file collecting tasks
-        List<DataConfig> fileTasks = fetchFileTasks(request);
-        tasks.addAll(fileTasks);
-        // Query other tasks by agentIp and uuid - not included status with TO_BE_ISSUED_ADD and TO_BE_ISSUED_ACTIVE
-        List<DataConfig> needIssuedTasks = fetchIssuedTasks(request);
-        tasks.addAll(needIssuedTasks);
+        tasks.addAll(fetchFileTasks(request));
+        // Query other tasks by cluster name, agentIp and uuid
+        tasks.addAll(fetchNeedIssuedTasks(request));
 
         // Query pending special commands
         List<CmdConfig> cmdConfigs = getAgentCmdConfigs(request);
@@ -281,14 +290,9 @@ public class AgentServiceImpl implements AgentService {
         return null;
     }
 
-    private List<DataConfig> fetchIssuedTasks(TaskRequest taskRequest) {
-        final String agentIp = taskRequest.getAgentIp();
-        final String uuid = taskRequest.getUuid();
-        List<Integer> statusList = Arrays.asList(SourceStatus.TO_BE_ISSUED_DELETE.getCode(),
-                SourceStatus.TO_BE_ISSUED_RETRY.getCode(), SourceStatus.TO_BE_ISSUED_BACKTRACK.getCode(),
-                SourceStatus.TO_BE_ISSUED_FROZEN.getCode(), SourceStatus.TO_BE_ISSUED_CHECK.getCode(),
-                SourceStatus.TO_BE_ISSUED_REDO_METRIC.getCode(), SourceStatus.TO_BE_ISSUED_MAKEUP.getCode());
-        List<StreamSourceEntity> sourceEntities = sourceMapper.selectByStatusAndIp(statusList, agentIp, uuid);
+    private List<DataConfig> fetchNeedIssuedTasks(TaskRequest taskRequest) {
+        List<StreamSourceEntity> sourceEntities = sourceMapper.selectByStatusAndCluster(NEED_ISSUED_STATUS,
+                taskRequest.getClusterName(), taskRequest.getAgentIp(), taskRequest.getUuid());
         List<DataConfig> issuedTasks = Lists.newArrayList();
         for (StreamSourceEntity issuedTask : sourceEntities) {
             int op = getOp(issuedTask.getStatus());