You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/09/14 07:45:33 UTC
[inlong] branch release-1.3.0 updated: [INLONG-5885][Manager] Refactor the task issue logic to simplify code complexity (#5886)
This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/release-1.3.0 by this push:
new 649d09508 [INLONG-5885][Manager] Refactor the task issue logic to simplify code complexity (#5886)
649d09508 is described below
commit 649d0950805799efd8922ec41e999e0b08cd6235
Author: woofyzhao <49...@qq.com>
AuthorDate: Wed Sep 14 15:31:55 2022 +0800
[INLONG-5885][Manager] Refactor the task issue logic to simplify code complexity (#5886)
Co-authored-by: healchow <he...@gmail.com>
---
.../service/core/impl/AgentServiceImpl.java | 102 +++++++++++----------
.../inlong/manager/service/ServiceBaseTest.java | 1 +
.../service/core/impl/AgentServiceTest.java | 68 ++++++++++++++
3 files changed, 123 insertions(+), 48 deletions(-)
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
index 2258581a9..111035c87 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
@@ -97,7 +97,7 @@ public class AgentServiceImpl implements AgentService {
// Update task status, other tasks with status 20x will change to 30x in next request
if (CollectionUtils.isEmpty(request.getCommandInfo())) {
- LOGGER.warn("task result was empty, just return");
+ LOGGER.info("task result was empty in request: {}, just return", request);
return;
}
for (CommandEntity command : request.getCommandInfo()) {
@@ -211,61 +211,17 @@ public class AgentServiceImpl implements AgentService {
}
final String agentIp = taskRequest.getAgentIp();
final String agentClusterName = taskRequest.getClusterName();
- final String uuid = taskRequest.getUuid();
Preconditions.checkTrue(StringUtils.isNotBlank(agentIp) || StringUtils.isNotBlank(agentClusterName),
"both agent ip and cluster name are blank when fetching file task");
List<StreamSourceEntity> sourceEntities = sourceMapper.selectByAgentIpOrCluster(needAddStatusList,
Lists.newArrayList(SourceType.FILE), agentIp, agentClusterName);
List<DataConfig> fileTasks = Lists.newArrayList();
for (StreamSourceEntity sourceEntity : sourceEntities) {
- FileSourceDTO fileSourceDTO = FileSourceDTO.getFromJson(sourceEntity.getExtParams());
- final String destIp = sourceEntity.getAgentIp();
- final String destClusterName = sourceEntity.getInlongClusterName();
-
- // Use ip directly if it is not empty
- if (StringUtils.isNotBlank(destIp)) {
- if (destIp.equals(agentIp)) {
- int op = getOp(sourceEntity.getStatus());
- int nextStatus = getNextStatus(sourceEntity.getStatus());
- sourceEntity.setUuid(uuid);
- sourceEntity.setStatus(nextStatus);
- if (sourceMapper.updateByPrimaryKeySelective(sourceEntity) == 1) {
- sourceEntity.setVersion(sourceEntity.getVersion() + 1);
- fileTasks.add(getDataConfig(sourceEntity, op));
- }
- }
+ DataConfig taskConfig = getFileTaskFromEntity(taskRequest, sourceEntity);
+ if (taskConfig == null) {
continue;
}
-
- // Cluster name is not blank, split subtask if necessary
- // The template task's id is assigned to the subtask's template id field
- if (StringUtils.isNotBlank(destClusterName) && destClusterName.equals(agentClusterName)
- && Objects.isNull(sourceEntity.getTemplateId())) {
-
- // Is the task already fetched by this agent ?
- List<StreamSourceEntity> subSources = sourceMapper.selectByTemplateId(sourceEntity.getId());
- if (subSources.stream().anyMatch(subSource -> subSource.getAgentIp().equals(agentIp))) {
- continue;
- }
-
- // If not, clone a sub task for the new agent
- // Note that a new source name with random suffix is generated to adhere to the unique constraint
- StreamSourceEntity fileEntity = CommonBeanUtils.copyProperties(sourceEntity, StreamSourceEntity::new);
- FileSourceDTO childFileSourceDTO = CommonBeanUtils.copyProperties(fileSourceDTO, FileSourceDTO::new);
- fileEntity.setExtParams(JsonUtils.toJsonString(childFileSourceDTO));
- fileEntity.setAgentIp(agentIp);
- fileEntity.setUuid(uuid);
- fileEntity.setSourceName(fileEntity.getSourceName() + "-" + RandomStringUtils.randomAlphanumeric(10));
- fileEntity.setTemplateId(sourceEntity.getId());
- int op = getOp(fileEntity.getStatus());
- int nextStatus = getNextStatus(fileEntity.getStatus());
- fileEntity.setStatus(nextStatus);
- if (sourceMapper.insert(fileEntity) > 0) {
- // refresh entity version and others.
- fileEntity = sourceMapper.selectById(fileEntity.getId());
- fileTasks.add(getDataConfig(fileEntity, op));
- }
- }
+ fileTasks.add(taskConfig);
if (fileTasks.size() >= TASK_FETCH_SIZE) {
break;
}
@@ -273,6 +229,56 @@ public class AgentServiceImpl implements AgentService {
return fileTasks;
}
+ private DataConfig getFileTaskFromEntity(TaskRequest taskRequest, StreamSourceEntity sourceEntity) {
+ final String agentIp = taskRequest.getAgentIp();
+ final String uuid = taskRequest.getUuid();
+
+ // fetch task by designated agent ip
+ final String destIp = sourceEntity.getAgentIp();
+ if (StringUtils.isNotBlank(destIp) && destIp.equals(agentIp)) {
+ int op = getOp(sourceEntity.getStatus());
+ int nextStatus = getNextStatus(sourceEntity.getStatus());
+ sourceEntity.setUuid(uuid);
+ sourceEntity.setStatus(nextStatus);
+ if (sourceMapper.updateByPrimaryKeySelective(sourceEntity) == 1) {
+ sourceEntity.setVersion(sourceEntity.getVersion() + 1);
+ return getDataConfig(sourceEntity, op);
+ }
+ }
+
+ // fetch task by cluster name and template source
+ String destClusterName = sourceEntity.getInlongClusterName();
+ boolean isTemplateTask = sourceEntity.getTemplateId() == null
+ && StringUtils.isNotBlank(destClusterName)
+ && destClusterName.equals(taskRequest.getClusterName());
+ if (isTemplateTask) {
+ // is the task already fetched by this agent ?
+ List<StreamSourceEntity> subSources = sourceMapper.selectByTemplateId(sourceEntity.getId());
+ if (subSources.stream().anyMatch(subSource -> subSource.getAgentIp().equals(agentIp))) {
+ return null;
+ }
+
+ // if not, clone a subtask for this Agent.
+ // note: a new source name with random suffix is generated to adhere to the unique constraint
+ StreamSourceEntity fileEntity = CommonBeanUtils.copyProperties(sourceEntity, StreamSourceEntity::new);
+ fileEntity.setExtParams(sourceEntity.getExtParams());
+ fileEntity.setAgentIp(agentIp);
+ fileEntity.setUuid(uuid);
+ fileEntity.setSourceName(fileEntity.getSourceName() + "-" + RandomStringUtils.randomAlphanumeric(10));
+ fileEntity.setTemplateId(sourceEntity.getId());
+ int nextStatus = getNextStatus(fileEntity.getStatus());
+ fileEntity.setStatus(nextStatus);
+
+ // create new sub source task
+ sourceMapper.insert(fileEntity);
+
+ // select again to refresh entity version and others.
+ return getDataConfig(sourceMapper.selectById(fileEntity.getId()), getOp(fileEntity.getStatus()));
+ }
+
+ return null;
+ }
+
private List<DataConfig> fetchIssuedTasks(TaskRequest taskRequest) {
final String agentIp = taskRequest.getAgentIp();
final String uuid = taskRequest.getUuid();
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/ServiceBaseTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/ServiceBaseTest.java
index b6f7a5507..0407c8bcc 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/ServiceBaseTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/ServiceBaseTest.java
@@ -50,6 +50,7 @@ public class ServiceBaseTest extends BaseTest {
public static final String GLOBAL_GROUP_ID = "global_group";
public static final String GLOBAL_STREAM_ID = "global_stream";
public static final String GLOBAL_OPERATOR = "admin";
+ public static final String GLOBAL_CLUSTER_NAME = "global_cluster";
private static final Logger LOGGER = LoggerFactory.getLogger(ServiceBaseTest.class);
@Autowired
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/AgentServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/AgentServiceTest.java
index 0ddbe6791..d50acf694 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/AgentServiceTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/AgentServiceTest.java
@@ -17,9 +17,19 @@
package org.apache.inlong.manager.service.core.impl;
+import com.google.common.collect.Lists;
+import org.apache.inlong.common.constant.Constants;
+import org.apache.inlong.common.db.CommandEntity;
+import org.apache.inlong.common.enums.PullJobTypeEnum;
+import org.apache.inlong.common.pojo.agent.DataConfig;
+import org.apache.inlong.common.pojo.agent.TaskRequest;
+import org.apache.inlong.common.pojo.agent.TaskResult;
import org.apache.inlong.common.pojo.agent.TaskSnapshotMessage;
import org.apache.inlong.common.pojo.agent.TaskSnapshotRequest;
import org.apache.inlong.manager.common.consts.SourceType;
+import org.apache.inlong.manager.common.enums.SourceStatus;
+import org.apache.inlong.manager.pojo.source.StreamSource;
+import org.apache.inlong.manager.pojo.source.file.FileSourceRequest;
import org.apache.inlong.manager.pojo.source.mysql.MySQLBinlogSourceRequest;
import org.apache.inlong.manager.service.ServiceBaseTest;
import org.apache.inlong.manager.service.core.AgentService;
@@ -56,6 +66,20 @@ class AgentServiceTest extends ServiceBaseTest {
return sourceService.save(sourceInfo, GLOBAL_OPERATOR);
}
+ /**
+ * Save template source
+ */
+ public Integer saveTemplateSource() {
+ streamServiceTest.saveInlongStream(GLOBAL_GROUP_ID, GLOBAL_STREAM_ID, GLOBAL_OPERATOR);
+ FileSourceRequest sourceInfo = new FileSourceRequest();
+ sourceInfo.setInlongGroupId(GLOBAL_GROUP_ID);
+ sourceInfo.setInlongStreamId(GLOBAL_STREAM_ID);
+ sourceInfo.setSourceType(SourceType.FILE);
+ sourceInfo.setSourceName("template_source_in_agent_service_test");
+ sourceInfo.setInlongClusterName(GLOBAL_CLUSTER_NAME);
+ return sourceService.save(sourceInfo, GLOBAL_OPERATOR);
+ }
+
/**
* Test report snapshot.
*/
@@ -78,4 +102,48 @@ class AgentServiceTest extends ServiceBaseTest {
sourceService.delete(id, GLOBAL_OPERATOR);
}
+ /**
+ * Test sub-source task status manipulation.
+ */
+ @Test
+ void testGetAndReportSubSourceTask() {
+ // create template source for cluster agents and approve
+ final Integer templateId = this.saveTemplateSource();
+ sourceService.updateStatus(GLOBAL_GROUP_ID, GLOBAL_STREAM_ID, SourceStatus.TO_BE_ISSUED_ADD.getCode(),
+ GLOBAL_OPERATOR);
+
+ // get sub-source task
+ TaskRequest getRequest = new TaskRequest();
+ getRequest.setAgentIp("127.0.0.1");
+ getRequest.setClusterName(GLOBAL_CLUSTER_NAME);
+ getRequest.setPullJobType(PullJobTypeEnum.NEW.getType());
+ TaskResult result = agentService.getTaskResult(getRequest);
+ Assertions.assertEquals(1, result.getDataConfigs().size());
+ DataConfig subSourceTask = result.getDataConfigs().get(0);
+ // new sub-source version must be 1
+ Assertions.assertEquals(1, subSourceTask.getVersion());
+ // sub-source's id must be different from its template source
+ Assertions.assertNotEquals(templateId, subSourceTask.getTaskId());
+ // operation is to add new task
+ Assertions.assertEquals(SourceStatus.BEEN_ISSUED_ADD.getCode() % 100,
+ Integer.valueOf(subSourceTask.getOp()));
+
+ // report sub-source status
+ CommandEntity reportTask = new CommandEntity();
+ reportTask.setTaskId(subSourceTask.getTaskId());
+ reportTask.setVersion(subSourceTask.getVersion());
+ reportTask.setCommandResult(Constants.RESULT_SUCCESS);
+ TaskRequest reportRequest = new TaskRequest();
+ reportRequest.setAgentIp("127.0.0.1");
+ reportRequest.setCommandInfo(Lists.newArrayList(reportTask));
+ agentService.report(reportRequest);
+
+ // check sub-source task status
+ StreamSource subSource = sourceService.get(subSourceTask.getTaskId());
+ Assertions.assertEquals(SourceStatus.SOURCE_NORMAL.getCode(), subSource.getStatus());
+
+ sourceService.delete(templateId, GLOBAL_OPERATOR);
+ sourceService.delete(subSource.getId(), GLOBAL_OPERATOR);
+ }
+
}