You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/03/30 06:08:48 UTC
[incubator-inlong] branch master updated: [INLONG-3448][Manager] Limit the number of Agent pull tasks (#3456)
This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 8dc73be [INLONG-3448][Manager] Limit the number of Agent pull tasks (#3456)
8dc73be is described below
commit 8dc73be8de98bdc7c9843654d13020d9963004f5
Author: healchow <he...@gmail.com>
AuthorDate: Wed Mar 30 14:08:42 2022 +0800
[INLONG-3448][Manager] Limit the number of Agent pull tasks (#3456)
---
.../inlong/manager/service/core/impl/AgentServiceImpl.java | 12 ++++++++++--
1 file changed, 10 insertions(+), 2 deletions(-)
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
index 0ae5b5a..e79e7c9 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
@@ -25,6 +25,7 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.common.constant.Constants;
import org.apache.inlong.common.db.CommandEntity;
import org.apache.inlong.common.enums.ComponentTypeEnum;
+import org.apache.inlong.common.enums.PullJobTypeEnum;
import org.apache.inlong.common.pojo.agent.CmdConfig;
import org.apache.inlong.common.pojo.agent.DataConfig;
import org.apache.inlong.common.pojo.agent.TaskRequest;
@@ -67,6 +68,7 @@ import org.springframework.transaction.annotation.Transactional;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
@@ -171,8 +173,14 @@ public class AgentServiceImpl implements AgentService {
}
// Query the tasks that needed to add or active - without agentIp and uuid
- List<Integer> needAddStatusList = Arrays.asList(SourceState.TO_BE_ISSUED_ADD.getCode(),
- SourceState.TO_BE_ISSUED_ACTIVE.getCode());
+ List<Integer> needAddStatusList;
+ if (PullJobTypeEnum.NEVER != PullJobTypeEnum.getPullJobType(request.getPullJobType())) {
+ needAddStatusList = Arrays.asList(SourceState.TO_BE_ISSUED_ADD.getCode(),
+ SourceState.TO_BE_ISSUED_ACTIVE.getCode());
+ } else {
+ LOGGER.warn("agent pull job type is [NEVER], just pull to be active tasks");
+ needAddStatusList = Collections.singletonList(SourceState.TO_BE_ISSUED_ACTIVE.getCode());
+ }
List<StreamSourceEntity> entityList = sourceMapper.selectByStatus(needAddStatusList);
String agentIp = request.getAgentIp();