You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/08/26 04:59:30 UTC
[inlong] 08/09: [INLONG-5691][Manager] Remove the append operation for agent IPs (#5692)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git
commit 1f97f202108731d7bf31381d3f3e404fd46fabc2
Author: woofyzhao <49...@qq.com>
AuthorDate: Thu Aug 25 16:04:36 2022 +0800
[INLONG-5691][Manager] Remove the append operation for agent IPs (#5692)
---
.../manager/dao/mapper/StreamSourceEntityMapper.java | 2 --
.../resources/mappers/StreamSourceEntityMapper.xml | 5 -----
.../manager/service/core/impl/AgentServiceImpl.java | 20 ++++++++------------
.../src/main/resources/h2/apache_inlong_manager.sql | 5 +++--
.../manager-web/sql/apache_inlong_manager.sql | 5 +++--
5 files changed, 14 insertions(+), 23 deletions(-)
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
index eabd61220..3b0144d3d 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
@@ -129,8 +129,6 @@ public interface StreamSourceEntityMapper {
int updateSnapshot(StreamSourceEntity entity);
- int appendAgentIp(@Param("id") Integer id, @Param("agentIp") String agentIp);
-
/**
* Physical delete stream sources.
*/
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
index 32f8c56ef..c1b955f57 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
@@ -390,11 +390,6 @@
modify_time = modify_time
where id = #{id,jdbcType=INTEGER}
</update>
- <update id="appendAgentIp">
- update stream_source
- set agent_ip = IF(agent_ip is NULL or agent_ip = '', #{agentIp,jdbcType=VARCHAR}, CONCAT(agent_ip, ',', #{agentIp}))
- where id = #{id,jdbcType=INTEGER}
- </update>
<delete id="deleteByRelatedId">
delete
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
index 94ec64d61..91b3f7238 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
@@ -30,12 +30,12 @@ import org.apache.inlong.common.pojo.agent.DataConfig;
import org.apache.inlong.common.pojo.agent.TaskRequest;
import org.apache.inlong.common.pojo.agent.TaskResult;
import org.apache.inlong.common.pojo.agent.TaskSnapshotRequest;
-import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.consts.SourceType;
import org.apache.inlong.manager.common.enums.SourceStatus;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.common.util.JsonUtils;
+import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.InlongStreamEntity;
import org.apache.inlong.manager.dao.entity.StreamSourceEntity;
import org.apache.inlong.manager.dao.mapper.DataSourceCmdConfigEntityMapper;
@@ -212,6 +212,8 @@ public class AgentServiceImpl implements AgentService {
final String agentIp = taskRequest.getAgentIp();
final String agentClusterName = taskRequest.getClusterName();
final String uuid = taskRequest.getUuid();
+ Preconditions.checkTrue(StringUtils.isNotBlank(agentIp) || StringUtils.isNotBlank(agentClusterName),
+ "both agent ip and cluster name are blank when fetching file task");
List<StreamSourceEntity> sourceEntities = sourceMapper.selectByAgentIpOrCluster(needAddStatusList,
Lists.newArrayList(SourceType.FILE), agentIp, agentClusterName,TASK_FETCH_SIZE * 10);
List<DataConfig> fileTasks = Lists.newArrayList();
@@ -235,18 +237,15 @@ public class AgentServiceImpl implements AgentService {
continue;
}
- // Cluster name is not blank, split task if necessary
- // The agent ip field of the entity holds the ip list of the agents that has already been issued
+ // Cluster name is not blank, split subtask if necessary
+ // The template task's id is assigned to the subtask's template id field
if (StringUtils.isNotBlank(destClusterName) && destClusterName.equals(agentClusterName)
&& Objects.isNull(sourceEntity.getTemplateId())) {
// Is the task already fetched by this agent ?
- if (StringUtils.isNotBlank(sourceEntity.getAgentIp())) {
- if (Arrays.asList(sourceEntity.getAgentIp().split(InlongConstants.COMMA)).contains(agentIp)) {
- LOGGER.debug("Task={} has already been fetched by agentIP={}", sourceEntity.getExtParams(),
- agentIp);
- continue;
- }
+ List<StreamSourceEntity> subSources = sourceMapper.selectByTemplateId(sourceEntity.getId());
+ if (subSources.stream().anyMatch(subSource -> subSource.getAgentIp().equals(agentIp))) {
+ continue;
}
// If not, clone a sub task for the new agent
@@ -264,9 +263,6 @@ public class AgentServiceImpl implements AgentService {
if (sourceMapper.insert(fileEntity) > 0) {
fileTasks.add(getDataConfig(fileEntity, op));
}
-
- // Append new agent ip
- sourceMapper.appendAgentIp(sourceEntity.getId(), agentIp);
}
if (fileTasks.size() >= TASK_FETCH_SIZE) {
break;
diff --git a/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql b/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
index 2d7753dfb..99a22413c 100644
--- a/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
+++ b/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
@@ -382,7 +382,7 @@ CREATE TABLE IF NOT EXISTS `stream_source`
`source_name` varchar(128) NOT NULL DEFAULT '' COMMENT 'source_name',
`source_type` varchar(20) DEFAULT '0' COMMENT 'Source type, including: FILE, DB, etc',
`template_id` int(11) DEFAULT NULL COMMENT 'Id of the template task this agent belongs to',
- `agent_ip` varchar(40) DEFAULT NULL COMMENT 'Ip of the agent running the task',
+ `agent_ip` varchar(40) DEFAULT NULL COMMENT 'Ip of the agent running the task, NULL if this is a template task',
`uuid` varchar(30) DEFAULT NULL COMMENT 'Mac uuid of the agent running the task',
`data_node_name` varchar(128) DEFAULT NULL COMMENT 'Node name, which links to data_node table',
`inlong_cluster_name` varchar(128) DEFAULT NULL COMMENT 'Cluster name of the agent running the task',
@@ -401,7 +401,8 @@ CREATE TABLE IF NOT EXISTS `stream_source`
PRIMARY KEY (`id`),
UNIQUE KEY `unique_source_name` (`inlong_group_id`, `inlong_stream_id`, `source_name`, `is_deleted`),
KEY `source_status_index` (`status`, `is_deleted`),
- KEY `source_agent_ip_index` (`agent_ip`, `is_deleted`)
+ KEY `source_agent_ip_index` (`agent_ip`, `is_deleted`),
+ KEY `template_id_index` (`template_id`)
);
-- ----------------------------
diff --git a/inlong-manager/manager-web/sql/apache_inlong_manager.sql b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
index 0c249a2ff..e7bbf585b 100644
--- a/inlong-manager/manager-web/sql/apache_inlong_manager.sql
+++ b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
@@ -403,7 +403,7 @@ CREATE TABLE IF NOT EXISTS `stream_source`
`source_name` varchar(128) NOT NULL DEFAULT '' COMMENT 'source_name',
`source_type` varchar(20) DEFAULT '0' COMMENT 'Source type, including: FILE, DB, etc',
`template_id` int(11) DEFAULT NULL COMMENT 'Id of the template task this agent belongs to',
- `agent_ip` varchar(40) DEFAULT NULL COMMENT 'Ip of the agent running the task',
+ `agent_ip` varchar(40) DEFAULT NULL COMMENT 'Ip of the agent running the task, NULL if this is a template task',
`uuid` varchar(30) DEFAULT NULL COMMENT 'Mac uuid of the agent running the task',
`data_node_name` varchar(128) DEFAULT NULL COMMENT 'Node name, which links to data_node table',
`inlong_cluster_name` varchar(128) DEFAULT NULL COMMENT 'Cluster name of the agent running the task',
@@ -422,7 +422,8 @@ CREATE TABLE IF NOT EXISTS `stream_source`
PRIMARY KEY (`id`),
UNIQUE KEY `unique_source_name` (`inlong_group_id`, `inlong_stream_id`, `source_name`, `is_deleted`),
KEY `source_status_index` (`status`, `is_deleted`),
- KEY `source_agent_ip_index` (`agent_ip`, `is_deleted`)
+ KEY `source_agent_ip_index` (`agent_ip`, `is_deleted`),
+ KEY `template_id_index` (`template_id`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4 COMMENT ='Stream source table';