You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/03/30 06:08:48 UTC

[incubator-inlong] branch master updated: [INLONG-3448][Manager] Limit the number of Agent pull tasks (#3456)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 8dc73be  [INLONG-3448][Manager] Limit the number of Agent pull tasks (#3456)
8dc73be is described below

commit 8dc73be8de98bdc7c9843654d13020d9963004f5
Author: healchow <he...@gmail.com>
AuthorDate: Wed Mar 30 14:08:42 2022 +0800

    [INLONG-3448][Manager] Limit the number of Agent pull tasks (#3456)
---
 .../inlong/manager/service/core/impl/AgentServiceImpl.java   | 12 ++++++++++--
 1 file changed, 10 insertions(+), 2 deletions(-)

diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
index 0ae5b5a..e79e7c9 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
@@ -25,6 +25,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.common.constant.Constants;
 import org.apache.inlong.common.db.CommandEntity;
 import org.apache.inlong.common.enums.ComponentTypeEnum;
+import org.apache.inlong.common.enums.PullJobTypeEnum;
 import org.apache.inlong.common.pojo.agent.CmdConfig;
 import org.apache.inlong.common.pojo.agent.DataConfig;
 import org.apache.inlong.common.pojo.agent.TaskRequest;
@@ -67,6 +68,7 @@ import org.springframework.transaction.annotation.Transactional;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
@@ -171,8 +173,14 @@ public class AgentServiceImpl implements AgentService {
         }
 
         // Query the tasks that needed to add or active - without agentIp and uuid
-        List<Integer> needAddStatusList = Arrays.asList(SourceState.TO_BE_ISSUED_ADD.getCode(),
-                SourceState.TO_BE_ISSUED_ACTIVE.getCode());
+        List<Integer> needAddStatusList;
+        if (PullJobTypeEnum.NEVER != PullJobTypeEnum.getPullJobType(request.getPullJobType())) {
+            needAddStatusList = Arrays.asList(SourceState.TO_BE_ISSUED_ADD.getCode(),
+                    SourceState.TO_BE_ISSUED_ACTIVE.getCode());
+        } else {
+            LOGGER.warn("agent pull job type is [NEVER], just pull to be active tasks");
+            needAddStatusList = Collections.singletonList(SourceState.TO_BE_ISSUED_ACTIVE.getCode());
+        }
         List<StreamSourceEntity> entityList = sourceMapper.selectByStatus(needAddStatusList);
 
         String agentIp = request.getAgentIp();