You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ca...@apache.org on 2022/09/19 08:34:16 UTC
[dolphinscheduler] 05/07: [Bug][Dependent]: Id also clone due to duplicate when use dependent mode. (#11929)
This is an automated email from the ASF dual-hosted git repository.
caishunfeng pushed a commit to branch 3.1.0-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
commit 714e258be6e3f666e1c5711fd6575ebbb987c8d3
Author: Stalary <st...@163.com>
AuthorDate: Thu Sep 15 10:00:38 2022 +0800
[Bug][Dependent]: Id also clone due to duplicate when use dependent mode. (#11929)
---
.../api/service/impl/ExecutorServiceImpl.java | 6 +-
.../api/service/ExecutorServiceTest.java | 64 +++++++++++++++++++++-
2 files changed, 65 insertions(+), 5 deletions(-)
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
index 957447dd68..5755362d99 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
@@ -889,7 +889,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
/**
* create complement dependent command
*/
- protected int createComplementDependentCommand(List<Schedule> schedules, Command command) {
+ public int createComplementDependentCommand(List<Schedule> schedules, Command command) {
int dependentProcessDefinitionCreateCount = 0;
Command dependentCommand;
@@ -903,9 +903,11 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
List<DependentProcessDefinition> dependentProcessDefinitionList =
getComplementDependentDefinitionList(dependentCommand.getProcessDefinitionCode(),
CronUtils.getMaxCycle(schedules.get(0).getCrontab()), dependentCommand.getWorkerGroup());
-
dependentCommand.setTaskDependType(TaskDependType.TASK_POST);
for (DependentProcessDefinition dependentProcessDefinition : dependentProcessDefinitionList) {
+ // Because the id is an Integer, mybatis-plus fills in the auto-increment id,
+ // which causes a duplicate when the command is cloned, so reset it to null.
+ dependentCommand.setId(null);
dependentCommand.setProcessDefinitionCode(dependentProcessDefinition.getProcessDefinitionCode());
dependentCommand.setWorkerGroup(dependentProcessDefinition.getWorkerGroup());
Map<String, String> cmdParam = JSONUtils.toMap(dependentCommand.getCommandParam());
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java
index 9affbd2c2f..59418e2171 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java
@@ -20,18 +20,30 @@ package org.apache.dolphinscheduler.api.service;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.RERUN;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_START;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.argThat;
+import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import org.apache.dolphinscheduler.api.enums.ExecuteType;
import org.apache.dolphinscheduler.api.enums.Status;
+import org.apache.dolphinscheduler.api.permission.ResourcePermissionCheckService;
import org.apache.dolphinscheduler.api.service.impl.BaseServiceImpl;
import org.apache.dolphinscheduler.api.service.impl.ExecutorServiceImpl;
import org.apache.dolphinscheduler.api.service.impl.ProjectServiceImpl;
import org.apache.dolphinscheduler.common.Constants;
-import org.apache.dolphinscheduler.common.enums.*;
+import org.apache.dolphinscheduler.common.enums.CommandType;
+import org.apache.dolphinscheduler.common.enums.ComplementDependentMode;
+import org.apache.dolphinscheduler.common.enums.FailureStrategy;
+import org.apache.dolphinscheduler.common.enums.Priority;
+import org.apache.dolphinscheduler.common.enums.ReleaseState;
+import org.apache.dolphinscheduler.common.enums.RunMode;
+import org.apache.dolphinscheduler.common.enums.TaskGroupQueueStatus;
+import org.apache.dolphinscheduler.common.enums.WarningType;
+import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.dao.entity.Command;
+import org.apache.dolphinscheduler.dao.entity.DependentProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.Project;
@@ -45,16 +57,17 @@ import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskGroupQueueMapper;
-import org.apache.dolphinscheduler.api.permission.ResourcePermissionCheckService;
import org.apache.dolphinscheduler.service.process.ProcessService;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.Date;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import org.assertj.core.util.Lists;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -177,7 +190,8 @@ public class ExecutorServiceTest {
.thenReturn(checkProjectAndAuth());
Mockito.when(processDefinitionMapper.queryByCode(processDefinitionCode)).thenReturn(processDefinition);
Mockito.when(processService.getTenantForProcess(tenantId, userId)).thenReturn(new Tenant());
- Mockito.when(processService.createCommand(any(Command.class))).thenReturn(1);
+ doReturn(1).when(processService).createCommand(argThat(c -> c.getId() == null));
+ doReturn(0).when(processService).createCommand(argThat(c -> c.getId() != null));
Mockito.when(monitorService.getServerListFromRegistry(true)).thenReturn(getMasterServersList());
Mockito.when(processService.findProcessInstanceDetailById(processInstanceId)).thenReturn(processInstance);
Mockito.when(processService.findProcessDefinition(1L, 1)).thenReturn(processDefinition);
@@ -236,6 +250,50 @@ public class ExecutorServiceTest {
}
+ @Test
+ public void testComplementWithDependentMode() {
+ Schedule schedule = new Schedule();
+ schedule.setStartTime(new Date());
+ schedule.setEndTime(new Date());
+ schedule.setCrontab("0 0 7 * * ? *");
+ schedule.setFailureStrategy(FailureStrategy.CONTINUE);
+ schedule.setReleaseState(ReleaseState.OFFLINE);
+ schedule.setWarningType(WarningType.NONE);
+ schedule.setCreateTime(new Date());
+ schedule.setUpdateTime(new Date());
+ List<Schedule> schedules = Lists.newArrayList(schedule);
+ Mockito.when(processService.queryReleaseSchedulerListByProcessDefinitionCode(
+ processDefinitionCode))
+ .thenReturn(schedules);
+
+ DependentProcessDefinition dependentProcessDefinition = new DependentProcessDefinition();
+ dependentProcessDefinition.setProcessDefinitionCode(2);
+ dependentProcessDefinition.setProcessDefinitionVersion(1);
+ dependentProcessDefinition.setTaskDefinitionCode(1);
+ dependentProcessDefinition.setWorkerGroup(Constants.DEFAULT_WORKER_GROUP);
+ dependentProcessDefinition.setTaskParams(
+ "{\"localParams\":[],\"resourceList\":[],\"dependence\":{\"relation\":\"AND\",\"dependTaskList\":[{\"relation\":\"AND\",\"dependItemList\":[{\"depTaskCode\":2,\"status\":\"SUCCESS\"}]}]},\"conditionResult\":{\"successNode\":[1],\"failedNode\":[1]}}");
+ Mockito.when(processService.queryDependentProcessDefinitionByProcessDefinitionCode(processDefinitionCode))
+ .thenReturn(Lists.newArrayList(dependentProcessDefinition));
+
+ Map<Long, String> processDefinitionWorkerGroupMap = new HashMap<>();
+ processDefinitionWorkerGroupMap.put(1L, Constants.DEFAULT_WORKER_GROUP);
+ Mockito.when(processService.queryWorkerGroupByProcessDefinitionCodes(Lists.newArrayList(1L)))
+ .thenReturn(processDefinitionWorkerGroupMap);
+
+ Command command = new Command();
+ command.setId(1);
+ command.setCommandType(CommandType.COMPLEMENT_DATA);
+ command.setCommandParam(
+ "{\"StartNodeList\":\"1\",\"complementStartDate\":\"2020-01-01 00:00:00\",\"complementEndDate\":\"2020-01-31 23:00:00\"}");
+ command.setWorkerGroup(Constants.DEFAULT_WORKER_GROUP);
+ command.setProcessDefinitionCode(processDefinitionCode);
+ command.setExecutorId(1);
+
+ int count = executorService.createComplementDependentCommand(schedules, command);
+ Assert.assertEquals(1, count);
+ }
+
/**
* date error
*/