You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/11/15 04:38:33 UTC
[inlong] branch master updated: [INLONG-6533][Manager] Fix the problem of not distributing the task according to the cluster name (#6534)
This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new f40d290df [INLONG-6533][Manager] Fix the problem of not distributing the task according to the cluster name (#6534)
f40d290df is described below
commit f40d290dfbb461e17589ea9dc2c67b9c93486853
Author: fuweng11 <76...@users.noreply.github.com>
AuthorDate: Tue Nov 15 12:38:26 2022 +0800
[INLONG-6533][Manager] Fix the problem of not distributing the task according to the cluster name (#6534)
* Fix the problem of not distributing the task according to the cluster name
* Extract a constant to improve the code
Co-authored-by: healchow <he...@gmail.com>
---
.../dao/mapper/StreamSourceEntityMapper.java | 6 ++--
.../resources/mappers/StreamSourceEntityMapper.xml | 15 +++++----
.../service/core/impl/AgentServiceImpl.java | 36 ++++++++++++----------
3 files changed, 30 insertions(+), 27 deletions(-)
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
index aee466d88..caa80bcc9 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
@@ -81,12 +81,12 @@ public interface StreamSourceEntityMapper {
@Param("clusterName") String clusterName);
/**
- * Query the sources with status 20x by the given agent IP and agent UUID.
+ * Query the sources by the given status and Agent cluster info.
*
* @apiNote Sources with is_deleted > 0 should also be returned to agents to clear their local tasks.
*/
- List<StreamSourceEntity> selectByStatusAndIp(@Param("statusList") List<Integer> statusList,
- @Param("agentIp") String agentIp, @Param("uuid") String uuid);
+ List<StreamSourceEntity> selectByStatusAndCluster(@Param("statusList") List<Integer> statusList,
+ @Param("clusterName") String clusterName, @Param("agentIp") String agentIp, @Param("uuid") String uuid);
/**
* Select all sources by groupIds
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
index e1e7f56a8..97ef663fe 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
@@ -224,18 +224,17 @@
and (agent_ip = #{agentIp, jdbcType=VARCHAR} or inlong_cluster_name = #{clusterName, jdbcType=VARCHAR})
</where>
</select>
- <select id="selectByStatusAndIp" resultType="org.apache.inlong.manager.dao.entity.StreamSourceEntity">
+ <select id="selectByStatusAndCluster" resultType="org.apache.inlong.manager.dao.entity.StreamSourceEntity">
select
<include refid="Base_Column_List"/>
from stream_source
<where>
- agent_ip = #{agentIp, jdbcType=VARCHAR}
- <if test="statusList != null and statusList.size()>0">
- and status in
- <foreach item="item" index="index" collection="statusList" open="(" close=")" separator=",">
- #{item}
- </foreach>
- </if>
+ inlong_cluster_name = #{clusterName, jdbcType=VARCHAR}
+ and agent_ip = #{agentIp, jdbcType=VARCHAR}
+ and `status` in
+ <foreach item="item" index="index" collection="statusList" open="(" close=")" separator=",">
+ #{item}
+ </foreach>
<if test="uuid != null and uuid != ''">
and uuid = #{uuid, jdbcType=VARCHAR}
</if>
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
index d091bdf7e..29f0dcbff 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
@@ -71,6 +71,18 @@ public class AgentServiceImpl implements AgentService {
private static final int MODULUS_100 = 100;
private static final int TASK_FETCH_SIZE = 2;
+ /**
+ * Need issued status list, not included status with TO_BE_ISSUED_ADD and TO_BE_ISSUED_ACTIVE
+ */
+ private static final List<Integer> NEED_ISSUED_STATUS = Arrays.asList(
+ SourceStatus.TO_BE_ISSUED_DELETE.getCode(),
+ SourceStatus.TO_BE_ISSUED_RETRY.getCode(),
+ SourceStatus.TO_BE_ISSUED_BACKTRACK.getCode(),
+ SourceStatus.TO_BE_ISSUED_FROZEN.getCode(),
+ SourceStatus.TO_BE_ISSUED_CHECK.getCode(),
+ SourceStatus.TO_BE_ISSUED_REDO_METRIC.getCode(),
+ SourceStatus.TO_BE_ISSUED_MAKEUP.getCode());
+
@Autowired
private StreamSourceEntityMapper sourceMapper;
@Autowired
@@ -153,20 +165,17 @@ public class AgentServiceImpl implements AgentService {
@Transactional(rollbackFor = Throwable.class, isolation = Isolation.READ_COMMITTED,
propagation = Propagation.REQUIRES_NEW)
public TaskResult getTaskResult(TaskRequest request) {
- if (request == null || StringUtils.isBlank(request.getAgentIp())) {
+ if (StringUtils.isBlank(request.getClusterName()) || StringUtils.isBlank(request.getAgentIp())) {
throw new BusinessException("agent request or agent ip was empty, just return");
}
List<DataConfig> tasks = Lists.newArrayList();
// Query the tasks that needed to add or active - without agentIp and uuid
- List<DataConfig> nonFileTasks = fetchNonFileTasks(request);
- tasks.addAll(nonFileTasks);
+ tasks.addAll(fetchNonFileTasks(request));
// Query file collecting tasks
- List<DataConfig> fileTasks = fetchFileTasks(request);
- tasks.addAll(fileTasks);
- // Query other tasks by agentIp and uuid - not included status with TO_BE_ISSUED_ADD and TO_BE_ISSUED_ACTIVE
- List<DataConfig> needIssuedTasks = fetchIssuedTasks(request);
- tasks.addAll(needIssuedTasks);
+ tasks.addAll(fetchFileTasks(request));
+ // Query other tasks by agentIp and uuid
+ tasks.addAll(fetchNeedIssuedTasks(request));
// Query pending special commands
List<CmdConfig> cmdConfigs = getAgentCmdConfigs(request);
@@ -281,14 +290,9 @@ public class AgentServiceImpl implements AgentService {
return null;
}
- private List<DataConfig> fetchIssuedTasks(TaskRequest taskRequest) {
- final String agentIp = taskRequest.getAgentIp();
- final String uuid = taskRequest.getUuid();
- List<Integer> statusList = Arrays.asList(SourceStatus.TO_BE_ISSUED_DELETE.getCode(),
- SourceStatus.TO_BE_ISSUED_RETRY.getCode(), SourceStatus.TO_BE_ISSUED_BACKTRACK.getCode(),
- SourceStatus.TO_BE_ISSUED_FROZEN.getCode(), SourceStatus.TO_BE_ISSUED_CHECK.getCode(),
- SourceStatus.TO_BE_ISSUED_REDO_METRIC.getCode(), SourceStatus.TO_BE_ISSUED_MAKEUP.getCode());
- List<StreamSourceEntity> sourceEntities = sourceMapper.selectByStatusAndIp(statusList, agentIp, uuid);
+ private List<DataConfig> fetchNeedIssuedTasks(TaskRequest taskRequest) {
+ List<StreamSourceEntity> sourceEntities = sourceMapper.selectByStatusAndCluster(NEED_ISSUED_STATUS,
+ taskRequest.getClusterName(), taskRequest.getAgentIp(), taskRequest.getUuid());
List<DataConfig> issuedTasks = Lists.newArrayList();
for (StreamSourceEntity issuedTask : sourceEntities) {
int op = getOp(issuedTask.getStatus());