You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/09/14 07:32:00 UTC

[inlong] branch master updated: [INLONG-5885][Manager] Refactor the task issue logic to simply code complexity (#5886)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 69e17cbcb [INLONG-5885][Manager] Refactor the task issue logic to simply code complexity (#5886)
69e17cbcb is described below

commit 69e17cbcb3ac1bbceeb5336147c266987c4b4dcd
Author: woofyzhao <49...@qq.com>
AuthorDate: Wed Sep 14 15:31:55 2022 +0800

    [INLONG-5885][Manager] Refactor the task issue logic to simply code complexity (#5886)
    
    Co-authored-by: healchow <he...@gmail.com>
---
 .../service/core/impl/AgentServiceImpl.java        | 102 +++++++++++----------
 .../inlong/manager/service/ServiceBaseTest.java    |   1 +
 .../service/core/impl/AgentServiceTest.java        |  68 ++++++++++++++
 3 files changed, 123 insertions(+), 48 deletions(-)

diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
index 2258581a9..111035c87 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
@@ -97,7 +97,7 @@ public class AgentServiceImpl implements AgentService {
 
         // Update task status, other tasks with status 20x will change to 30x in next request
         if (CollectionUtils.isEmpty(request.getCommandInfo())) {
-            LOGGER.warn("task result was empty, just return");
+            LOGGER.info("task result was empty in request: {}, just return", request);
             return;
         }
         for (CommandEntity command : request.getCommandInfo()) {
@@ -211,61 +211,17 @@ public class AgentServiceImpl implements AgentService {
         }
         final String agentIp = taskRequest.getAgentIp();
         final String agentClusterName = taskRequest.getClusterName();
-        final String uuid = taskRequest.getUuid();
         Preconditions.checkTrue(StringUtils.isNotBlank(agentIp) || StringUtils.isNotBlank(agentClusterName),
                 "both agent ip and cluster name are blank when fetching file task");
         List<StreamSourceEntity> sourceEntities = sourceMapper.selectByAgentIpOrCluster(needAddStatusList,
                 Lists.newArrayList(SourceType.FILE), agentIp, agentClusterName);
         List<DataConfig> fileTasks = Lists.newArrayList();
         for (StreamSourceEntity sourceEntity : sourceEntities) {
-            FileSourceDTO fileSourceDTO = FileSourceDTO.getFromJson(sourceEntity.getExtParams());
-            final String destIp = sourceEntity.getAgentIp();
-            final String destClusterName = sourceEntity.getInlongClusterName();
-
-            // Use ip directly if it is not empty
-            if (StringUtils.isNotBlank(destIp)) {
-                if (destIp.equals(agentIp)) {
-                    int op = getOp(sourceEntity.getStatus());
-                    int nextStatus = getNextStatus(sourceEntity.getStatus());
-                    sourceEntity.setUuid(uuid);
-                    sourceEntity.setStatus(nextStatus);
-                    if (sourceMapper.updateByPrimaryKeySelective(sourceEntity) == 1) {
-                        sourceEntity.setVersion(sourceEntity.getVersion() + 1);
-                        fileTasks.add(getDataConfig(sourceEntity, op));
-                    }
-                }
+            DataConfig taskConfig = getFileTaskFromEntity(taskRequest, sourceEntity);
+            if (taskConfig == null) {
                 continue;
             }
-
-            // Cluster name is not blank, split subtask if necessary
-            // The template task's id is assigned to the subtask's template id field
-            if (StringUtils.isNotBlank(destClusterName) && destClusterName.equals(agentClusterName)
-                    && Objects.isNull(sourceEntity.getTemplateId())) {
-
-                // Is the task already fetched by this agent ?
-                List<StreamSourceEntity> subSources = sourceMapper.selectByTemplateId(sourceEntity.getId());
-                if (subSources.stream().anyMatch(subSource -> subSource.getAgentIp().equals(agentIp))) {
-                    continue;
-                }
-
-                // If not, clone a sub task for the new agent
-                // Note that a new source name with random suffix is generated to adhere to the unique constraint
-                StreamSourceEntity fileEntity = CommonBeanUtils.copyProperties(sourceEntity, StreamSourceEntity::new);
-                FileSourceDTO childFileSourceDTO = CommonBeanUtils.copyProperties(fileSourceDTO, FileSourceDTO::new);
-                fileEntity.setExtParams(JsonUtils.toJsonString(childFileSourceDTO));
-                fileEntity.setAgentIp(agentIp);
-                fileEntity.setUuid(uuid);
-                fileEntity.setSourceName(fileEntity.getSourceName() + "-" + RandomStringUtils.randomAlphanumeric(10));
-                fileEntity.setTemplateId(sourceEntity.getId());
-                int op = getOp(fileEntity.getStatus());
-                int nextStatus = getNextStatus(fileEntity.getStatus());
-                fileEntity.setStatus(nextStatus);
-                if (sourceMapper.insert(fileEntity) > 0) {
-                    // refresh entity version and others.
-                    fileEntity = sourceMapper.selectById(fileEntity.getId());
-                    fileTasks.add(getDataConfig(fileEntity, op));
-                }
-            }
+            fileTasks.add(taskConfig);
             if (fileTasks.size() >= TASK_FETCH_SIZE) {
                 break;
             }
@@ -273,6 +229,56 @@ public class AgentServiceImpl implements AgentService {
         return fileTasks;
     }
 
+    private DataConfig getFileTaskFromEntity(TaskRequest taskRequest, StreamSourceEntity sourceEntity) {
+        final String agentIp = taskRequest.getAgentIp();
+        final String uuid = taskRequest.getUuid();
+
+        // fetch task by designated agent ip
+        final String destIp = sourceEntity.getAgentIp();
+        if (StringUtils.isNotBlank(destIp) && destIp.equals(agentIp)) {
+            int op = getOp(sourceEntity.getStatus());
+            int nextStatus = getNextStatus(sourceEntity.getStatus());
+            sourceEntity.setUuid(uuid);
+            sourceEntity.setStatus(nextStatus);
+            if (sourceMapper.updateByPrimaryKeySelective(sourceEntity) == 1) {
+                sourceEntity.setVersion(sourceEntity.getVersion() + 1);
+                return getDataConfig(sourceEntity, op);
+            }
+        }
+
+        // fetch task by cluster name and template source
+        String destClusterName = sourceEntity.getInlongClusterName();
+        boolean isTemplateTask = sourceEntity.getTemplateId() == null
+                && StringUtils.isNotBlank(destClusterName)
+                && destClusterName.equals(taskRequest.getClusterName());
+        if (isTemplateTask) {
+            // is the task already fetched by this agent ?
+            List<StreamSourceEntity> subSources = sourceMapper.selectByTemplateId(sourceEntity.getId());
+            if (subSources.stream().anyMatch(subSource -> subSource.getAgentIp().equals(agentIp))) {
+                return null;
+            }
+
+            // if not, clone a subtask for this Agent.
+            // note: a new source name with random suffix is generated to adhere to the unique constraint
+            StreamSourceEntity fileEntity = CommonBeanUtils.copyProperties(sourceEntity, StreamSourceEntity::new);
+            fileEntity.setExtParams(sourceEntity.getExtParams());
+            fileEntity.setAgentIp(agentIp);
+            fileEntity.setUuid(uuid);
+            fileEntity.setSourceName(fileEntity.getSourceName() + "-" + RandomStringUtils.randomAlphanumeric(10));
+            fileEntity.setTemplateId(sourceEntity.getId());
+            int nextStatus = getNextStatus(fileEntity.getStatus());
+            fileEntity.setStatus(nextStatus);
+
+            // create new sub source task
+            sourceMapper.insert(fileEntity);
+
+            // select again to refresh entity version and others.
+            return getDataConfig(sourceMapper.selectById(fileEntity.getId()), getOp(fileEntity.getStatus()));
+        }
+
+        return null;
+    }
+
     private List<DataConfig> fetchIssuedTasks(TaskRequest taskRequest) {
         final String agentIp = taskRequest.getAgentIp();
         final String uuid = taskRequest.getUuid();
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/ServiceBaseTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/ServiceBaseTest.java
index b6f7a5507..0407c8bcc 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/ServiceBaseTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/ServiceBaseTest.java
@@ -50,6 +50,7 @@ public class ServiceBaseTest extends BaseTest {
     public static final String GLOBAL_GROUP_ID = "global_group";
     public static final String GLOBAL_STREAM_ID = "global_stream";
     public static final String GLOBAL_OPERATOR = "admin";
+    public static final String GLOBAL_CLUSTER_NAME = "global_cluster";
     private static final Logger LOGGER = LoggerFactory.getLogger(ServiceBaseTest.class);
 
     @Autowired
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/AgentServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/AgentServiceTest.java
index 0ddbe6791..d50acf694 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/AgentServiceTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/AgentServiceTest.java
@@ -17,9 +17,19 @@
 
 package org.apache.inlong.manager.service.core.impl;
 
+import com.google.common.collect.Lists;
+import org.apache.inlong.common.constant.Constants;
+import org.apache.inlong.common.db.CommandEntity;
+import org.apache.inlong.common.enums.PullJobTypeEnum;
+import org.apache.inlong.common.pojo.agent.DataConfig;
+import org.apache.inlong.common.pojo.agent.TaskRequest;
+import org.apache.inlong.common.pojo.agent.TaskResult;
 import org.apache.inlong.common.pojo.agent.TaskSnapshotMessage;
 import org.apache.inlong.common.pojo.agent.TaskSnapshotRequest;
 import org.apache.inlong.manager.common.consts.SourceType;
+import org.apache.inlong.manager.common.enums.SourceStatus;
+import org.apache.inlong.manager.pojo.source.StreamSource;
+import org.apache.inlong.manager.pojo.source.file.FileSourceRequest;
 import org.apache.inlong.manager.pojo.source.mysql.MySQLBinlogSourceRequest;
 import org.apache.inlong.manager.service.ServiceBaseTest;
 import org.apache.inlong.manager.service.core.AgentService;
@@ -56,6 +66,20 @@ class AgentServiceTest extends ServiceBaseTest {
         return sourceService.save(sourceInfo, GLOBAL_OPERATOR);
     }
 
+    /**
+     * Save template source
+     */
+    public Integer saveTemplateSource() {
+        streamServiceTest.saveInlongStream(GLOBAL_GROUP_ID, GLOBAL_STREAM_ID, GLOBAL_OPERATOR);
+        FileSourceRequest sourceInfo = new FileSourceRequest();
+        sourceInfo.setInlongGroupId(GLOBAL_GROUP_ID);
+        sourceInfo.setInlongStreamId(GLOBAL_STREAM_ID);
+        sourceInfo.setSourceType(SourceType.FILE);
+        sourceInfo.setSourceName("template_source_in_agent_service_test");
+        sourceInfo.setInlongClusterName(GLOBAL_CLUSTER_NAME);
+        return sourceService.save(sourceInfo, GLOBAL_OPERATOR);
+    }
+
     /**
      * Test report snapshot.
      */
@@ -78,4 +102,48 @@ class AgentServiceTest extends ServiceBaseTest {
         sourceService.delete(id, GLOBAL_OPERATOR);
     }
 
+    /**
+     * Test sub-source task status manipulation.
+     */
+    @Test
+    void testGetAndReportSubSourceTask() {
+        // create template source for cluster agents and approve
+        final Integer templateId = this.saveTemplateSource();
+        sourceService.updateStatus(GLOBAL_GROUP_ID, GLOBAL_STREAM_ID, SourceStatus.TO_BE_ISSUED_ADD.getCode(),
+                GLOBAL_OPERATOR);
+
+        // get sub-source task
+        TaskRequest getRequest = new TaskRequest();
+        getRequest.setAgentIp("127.0.0.1");
+        getRequest.setClusterName(GLOBAL_CLUSTER_NAME);
+        getRequest.setPullJobType(PullJobTypeEnum.NEW.getType());
+        TaskResult result = agentService.getTaskResult(getRequest);
+        Assertions.assertEquals(1, result.getDataConfigs().size());
+        DataConfig subSourceTask = result.getDataConfigs().get(0);
+        // new sub-source version must be 1
+        Assertions.assertEquals(1, subSourceTask.getVersion());
+        // sub-source's id must be different from its template source
+        Assertions.assertNotEquals(templateId, subSourceTask.getTaskId());
+        // operation is to add new task
+        Assertions.assertEquals(SourceStatus.BEEN_ISSUED_ADD.getCode() % 100,
+                Integer.valueOf(subSourceTask.getOp()));
+
+        // report sub-source status
+        CommandEntity reportTask = new CommandEntity();
+        reportTask.setTaskId(subSourceTask.getTaskId());
+        reportTask.setVersion(subSourceTask.getVersion());
+        reportTask.setCommandResult(Constants.RESULT_SUCCESS);
+        TaskRequest reportRequest = new TaskRequest();
+        reportRequest.setAgentIp("127.0.0.1");
+        reportRequest.setCommandInfo(Lists.newArrayList(reportTask));
+        agentService.report(reportRequest);
+
+        // check sub-source task status
+        StreamSource subSource = sourceService.get(subSourceTask.getTaskId());
+        Assertions.assertEquals(SourceStatus.SOURCE_NORMAL.getCode(), subSource.getStatus());
+
+        sourceService.delete(templateId, GLOBAL_OPERATOR);
+        sourceService.delete(subSource.getId(), GLOBAL_OPERATOR);
+    }
+
 }