You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ca...@apache.org on 2022/09/19 08:34:16 UTC

[dolphinscheduler] 05/07: [Bug][Dependent]: Id also clone due to duplicate when use dependent mode. (#11929)

This is an automated email from the ASF dual-hosted git repository.

caishunfeng pushed a commit to branch 3.1.0-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git

commit 714e258be6e3f666e1c5711fd6575ebbb987c8d3
Author: Stalary <st...@163.com>
AuthorDate: Thu Sep 15 10:00:38 2022 +0800

    [Bug][Dependent]: Id also clone due to duplicate when use dependent mode. (#11929)
---
 .../api/service/impl/ExecutorServiceImpl.java      |  6 +-
 .../api/service/ExecutorServiceTest.java           | 64 +++++++++++++++++++++-
 2 files changed, 65 insertions(+), 5 deletions(-)

diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
index 957447dd68..5755362d99 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
@@ -889,7 +889,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
     /**
      * create complement dependent command
      */
-    protected int createComplementDependentCommand(List<Schedule> schedules, Command command) {
+    public int createComplementDependentCommand(List<Schedule> schedules, Command command) {
         int dependentProcessDefinitionCreateCount = 0;
         Command dependentCommand;
 
@@ -903,9 +903,11 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
         List<DependentProcessDefinition> dependentProcessDefinitionList =
                 getComplementDependentDefinitionList(dependentCommand.getProcessDefinitionCode(),
                         CronUtils.getMaxCycle(schedules.get(0).getCrontab()), dependentCommand.getWorkerGroup());
-
         dependentCommand.setTaskDependType(TaskDependType.TASK_POST);
         for (DependentProcessDefinition dependentProcessDefinition : dependentProcessDefinitionList) {
+            // Because the id is an Integer, mybatis-plus fills in the auto-increment id on the command,
+            // which causes a duplicate id when the command is cloned, so reset it to null first.
+            dependentCommand.setId(null);
             dependentCommand.setProcessDefinitionCode(dependentProcessDefinition.getProcessDefinitionCode());
             dependentCommand.setWorkerGroup(dependentProcessDefinition.getWorkerGroup());
             Map<String, String> cmdParam = JSONUtils.toMap(dependentCommand.getCommandParam());
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java
index 9affbd2c2f..59418e2171 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java
@@ -20,18 +20,30 @@ package org.apache.dolphinscheduler.api.service;
 import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.RERUN;
 import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_START;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.argThat;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
 import org.apache.dolphinscheduler.api.enums.ExecuteType;
 import org.apache.dolphinscheduler.api.enums.Status;
+import org.apache.dolphinscheduler.api.permission.ResourcePermissionCheckService;
 import org.apache.dolphinscheduler.api.service.impl.BaseServiceImpl;
 import org.apache.dolphinscheduler.api.service.impl.ExecutorServiceImpl;
 import org.apache.dolphinscheduler.api.service.impl.ProjectServiceImpl;
 import org.apache.dolphinscheduler.common.Constants;
-import org.apache.dolphinscheduler.common.enums.*;
+import org.apache.dolphinscheduler.common.enums.CommandType;
+import org.apache.dolphinscheduler.common.enums.ComplementDependentMode;
+import org.apache.dolphinscheduler.common.enums.FailureStrategy;
+import org.apache.dolphinscheduler.common.enums.Priority;
+import org.apache.dolphinscheduler.common.enums.ReleaseState;
+import org.apache.dolphinscheduler.common.enums.RunMode;
+import org.apache.dolphinscheduler.common.enums.TaskGroupQueueStatus;
+import org.apache.dolphinscheduler.common.enums.WarningType;
+import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
 import org.apache.dolphinscheduler.common.model.Server;
 import org.apache.dolphinscheduler.dao.entity.Command;
+import org.apache.dolphinscheduler.dao.entity.DependentProcessDefinition;
 import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.dao.entity.Project;
@@ -45,16 +57,17 @@ import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
 import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
 import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
 import org.apache.dolphinscheduler.dao.mapper.TaskGroupQueueMapper;
-import org.apache.dolphinscheduler.api.permission.ResourcePermissionCheckService;
 import org.apache.dolphinscheduler.service.process.ProcessService;
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Date;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 
+import org.assertj.core.util.Lists;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -177,7 +190,8 @@ public class ExecutorServiceTest {
                 .thenReturn(checkProjectAndAuth());
         Mockito.when(processDefinitionMapper.queryByCode(processDefinitionCode)).thenReturn(processDefinition);
         Mockito.when(processService.getTenantForProcess(tenantId, userId)).thenReturn(new Tenant());
-        Mockito.when(processService.createCommand(any(Command.class))).thenReturn(1);
+        doReturn(1).when(processService).createCommand(argThat(c -> c.getId() == null));
+        doReturn(0).when(processService).createCommand(argThat(c -> c.getId() != null));
         Mockito.when(monitorService.getServerListFromRegistry(true)).thenReturn(getMasterServersList());
         Mockito.when(processService.findProcessInstanceDetailById(processInstanceId)).thenReturn(processInstance);
         Mockito.when(processService.findProcessDefinition(1L, 1)).thenReturn(processDefinition);
@@ -236,6 +250,50 @@ public class ExecutorServiceTest {
 
     }
 
+    @Test
+    public void testComplementWithDependentMode() {
+        Schedule schedule = new Schedule();
+        schedule.setStartTime(new Date());
+        schedule.setEndTime(new Date());
+        schedule.setCrontab("0 0 7 * * ? *");
+        schedule.setFailureStrategy(FailureStrategy.CONTINUE);
+        schedule.setReleaseState(ReleaseState.OFFLINE);
+        schedule.setWarningType(WarningType.NONE);
+        schedule.setCreateTime(new Date());
+        schedule.setUpdateTime(new Date());
+        List<Schedule> schedules = Lists.newArrayList(schedule);
+        Mockito.when(processService.queryReleaseSchedulerListByProcessDefinitionCode(
+                processDefinitionCode))
+                .thenReturn(schedules);
+
+        DependentProcessDefinition dependentProcessDefinition = new DependentProcessDefinition();
+        dependentProcessDefinition.setProcessDefinitionCode(2);
+        dependentProcessDefinition.setProcessDefinitionVersion(1);
+        dependentProcessDefinition.setTaskDefinitionCode(1);
+        dependentProcessDefinition.setWorkerGroup(Constants.DEFAULT_WORKER_GROUP);
+        dependentProcessDefinition.setTaskParams(
+                "{\"localParams\":[],\"resourceList\":[],\"dependence\":{\"relation\":\"AND\",\"dependTaskList\":[{\"relation\":\"AND\",\"dependItemList\":[{\"depTaskCode\":2,\"status\":\"SUCCESS\"}]}]},\"conditionResult\":{\"successNode\":[1],\"failedNode\":[1]}}");
+        Mockito.when(processService.queryDependentProcessDefinitionByProcessDefinitionCode(processDefinitionCode))
+                .thenReturn(Lists.newArrayList(dependentProcessDefinition));
+
+        Map<Long, String> processDefinitionWorkerGroupMap = new HashMap<>();
+        processDefinitionWorkerGroupMap.put(1L, Constants.DEFAULT_WORKER_GROUP);
+        Mockito.when(processService.queryWorkerGroupByProcessDefinitionCodes(Lists.newArrayList(1L)))
+                .thenReturn(processDefinitionWorkerGroupMap);
+
+        Command command = new Command();
+        command.setId(1);
+        command.setCommandType(CommandType.COMPLEMENT_DATA);
+        command.setCommandParam(
+                "{\"StartNodeList\":\"1\",\"complementStartDate\":\"2020-01-01 00:00:00\",\"complementEndDate\":\"2020-01-31 23:00:00\"}");
+        command.setWorkerGroup(Constants.DEFAULT_WORKER_GROUP);
+        command.setProcessDefinitionCode(processDefinitionCode);
+        command.setExecutorId(1);
+
+        int count = executorService.createComplementDependentCommand(schedules, command);
+        Assert.assertEquals(1, count);
+    }
+
     /**
      * date error
      */