You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/08/26 04:59:30 UTC

[inlong] 08/09: [INLONG-5691][Manager] Remove the append operation for agent IPs (#5692)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit 1f97f202108731d7bf31381d3f3e404fd46fabc2
Author: woofyzhao <49...@qq.com>
AuthorDate: Thu Aug 25 16:04:36 2022 +0800

    [INLONG-5691][Manager] Remove the append operation for agent IPs (#5692)
---
 .../manager/dao/mapper/StreamSourceEntityMapper.java |  2 --
 .../resources/mappers/StreamSourceEntityMapper.xml   |  5 -----
 .../manager/service/core/impl/AgentServiceImpl.java  | 20 ++++++++------------
 .../src/main/resources/h2/apache_inlong_manager.sql  |  5 +++--
 .../manager-web/sql/apache_inlong_manager.sql        |  5 +++--
 5 files changed, 14 insertions(+), 23 deletions(-)

diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
index eabd61220..3b0144d3d 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
@@ -129,8 +129,6 @@ public interface StreamSourceEntityMapper {
 
     int updateSnapshot(StreamSourceEntity entity);
 
-    int appendAgentIp(@Param("id") Integer id, @Param("agentIp") String agentIp);
-
     /**
      * Physical delete stream sources.
      */
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
index 32f8c56ef..c1b955f57 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
@@ -390,11 +390,6 @@
             modify_time = modify_time
         where id = #{id,jdbcType=INTEGER}
     </update>
-    <update id="appendAgentIp">
-        update stream_source
-        set agent_ip = IF(agent_ip is NULL or agent_ip = '', #{agentIp,jdbcType=VARCHAR}, CONCAT(agent_ip, ',', #{agentIp}))
-        where id = #{id,jdbcType=INTEGER}
-    </update>
 
     <delete id="deleteByRelatedId">
         delete
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
index 94ec64d61..91b3f7238 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
@@ -30,12 +30,12 @@ import org.apache.inlong.common.pojo.agent.DataConfig;
 import org.apache.inlong.common.pojo.agent.TaskRequest;
 import org.apache.inlong.common.pojo.agent.TaskResult;
 import org.apache.inlong.common.pojo.agent.TaskSnapshotRequest;
-import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.consts.SourceType;
 import org.apache.inlong.manager.common.enums.SourceStatus;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
 import org.apache.inlong.manager.common.util.JsonUtils;
+import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.dao.entity.InlongStreamEntity;
 import org.apache.inlong.manager.dao.entity.StreamSourceEntity;
 import org.apache.inlong.manager.dao.mapper.DataSourceCmdConfigEntityMapper;
@@ -212,6 +212,8 @@ public class AgentServiceImpl implements AgentService {
         final String agentIp = taskRequest.getAgentIp();
         final String agentClusterName = taskRequest.getClusterName();
         final String uuid = taskRequest.getUuid();
+        Preconditions.checkTrue(StringUtils.isNotBlank(agentIp) || StringUtils.isNotBlank(agentClusterName),
+                "both agent ip and cluster name are blank when fetching file task");
         List<StreamSourceEntity> sourceEntities = sourceMapper.selectByAgentIpOrCluster(needAddStatusList,
                 Lists.newArrayList(SourceType.FILE), agentIp, agentClusterName,TASK_FETCH_SIZE * 10);
         List<DataConfig> fileTasks = Lists.newArrayList();
@@ -235,18 +237,15 @@ public class AgentServiceImpl implements AgentService {
                 continue;
             }
 
-            // Cluster name is not blank, split task if necessary
-            // The agent ip field of the entity holds the ip list of the agents that has already been issued
+            // Cluster name is not blank, split subtask if necessary
+            // The template task's id is assigned to the subtask's template id field
             if (StringUtils.isNotBlank(destClusterName) && destClusterName.equals(agentClusterName)
                     && Objects.isNull(sourceEntity.getTemplateId())) {
 
                 // Is the task already fetched by this agent ?
-                if (StringUtils.isNotBlank(sourceEntity.getAgentIp())) {
-                    if (Arrays.asList(sourceEntity.getAgentIp().split(InlongConstants.COMMA)).contains(agentIp)) {
-                        LOGGER.debug("Task={} has already been fetched by agentIP={}", sourceEntity.getExtParams(),
-                                agentIp);
-                        continue;
-                    }
+                List<StreamSourceEntity> subSources = sourceMapper.selectByTemplateId(sourceEntity.getId());
+                if (subSources.stream().anyMatch(subSource -> subSource.getAgentIp().equals(agentIp))) {
+                    continue;
                 }
 
                 // If not, clone a sub task for the new agent
@@ -264,9 +263,6 @@ public class AgentServiceImpl implements AgentService {
                 if (sourceMapper.insert(fileEntity) > 0) {
                     fileTasks.add(getDataConfig(fileEntity, op));
                 }
-
-                // Append new agent ip
-                sourceMapper.appendAgentIp(sourceEntity.getId(), agentIp);
             }
             if (fileTasks.size() >= TASK_FETCH_SIZE) {
                 break;
diff --git a/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql b/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
index 2d7753dfb..99a22413c 100644
--- a/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
+++ b/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
@@ -382,7 +382,7 @@ CREATE TABLE IF NOT EXISTS `stream_source`
     `source_name`         varchar(128) NOT NULL DEFAULT '' COMMENT 'source_name',
     `source_type`         varchar(20)           DEFAULT '0' COMMENT 'Source type, including: FILE, DB, etc',
     `template_id`         int(11)      DEFAULT NULL COMMENT 'Id of the template task this agent belongs to',
-    `agent_ip`            varchar(40)           DEFAULT NULL COMMENT 'Ip of the agent running the task',
+    `agent_ip`            varchar(40)           DEFAULT NULL COMMENT 'Ip of the agent running the task, NULL if this is a template task',
     `uuid`                varchar(30)           DEFAULT NULL COMMENT 'Mac uuid of the agent running the task',
     `data_node_name`      varchar(128)          DEFAULT NULL COMMENT 'Node name, which links to data_node table',
     `inlong_cluster_name` varchar(128)          DEFAULT NULL COMMENT 'Cluster name of the agent running the task',
@@ -401,7 +401,8 @@ CREATE TABLE IF NOT EXISTS `stream_source`
     PRIMARY KEY (`id`),
     UNIQUE KEY `unique_source_name` (`inlong_group_id`, `inlong_stream_id`, `source_name`, `is_deleted`),
     KEY `source_status_index` (`status`, `is_deleted`),
-    KEY `source_agent_ip_index` (`agent_ip`, `is_deleted`)
+    KEY `source_agent_ip_index` (`agent_ip`, `is_deleted`),
+    KEY `template_id_index` (`template_id`)
 );
 
 -- ----------------------------
diff --git a/inlong-manager/manager-web/sql/apache_inlong_manager.sql b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
index 0c249a2ff..e7bbf585b 100644
--- a/inlong-manager/manager-web/sql/apache_inlong_manager.sql
+++ b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
@@ -403,7 +403,7 @@ CREATE TABLE IF NOT EXISTS `stream_source`
     `source_name`         varchar(128) NOT NULL DEFAULT '' COMMENT 'source_name',
     `source_type`         varchar(20)           DEFAULT '0' COMMENT 'Source type, including: FILE, DB, etc',
     `template_id`         int(11)      DEFAULT NULL COMMENT 'Id of the template task this agent belongs to',
-    `agent_ip`            varchar(40)           DEFAULT NULL COMMENT 'Ip of the agent running the task',
+    `agent_ip`            varchar(40)           DEFAULT NULL COMMENT 'Ip of the agent running the task, NULL if this is a template task',
     `uuid`                varchar(30)           DEFAULT NULL COMMENT 'Mac uuid of the agent running the task',
     `data_node_name`      varchar(128)          DEFAULT NULL COMMENT 'Node name, which links to data_node table',
     `inlong_cluster_name` varchar(128)          DEFAULT NULL COMMENT 'Cluster name of the agent running the task',
@@ -422,7 +422,8 @@ CREATE TABLE IF NOT EXISTS `stream_source`
     PRIMARY KEY (`id`),
     UNIQUE KEY `unique_source_name` (`inlong_group_id`, `inlong_stream_id`, `source_name`, `is_deleted`),
     KEY `source_status_index` (`status`, `is_deleted`),
-    KEY `source_agent_ip_index` (`agent_ip`, `is_deleted`)
+    KEY `source_agent_ip_index` (`agent_ip`, `is_deleted`),
+    KEY `template_id_index` (`template_id`)
 ) ENGINE = InnoDB
   DEFAULT CHARSET = utf8mb4 COMMENT ='Stream source table';