You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by zh...@apache.org on 2022/08/02 11:09:46 UTC

[dolphinscheduler] 08/11: [Fix-11007] [Master] fix forced_success bug (#11088)

This is an automated email from the ASF dual-hosted git repository.

zhongjiajie pushed a commit to branch 3.0.0-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git

commit 68a73b04a26bf68419307a826a5043ba1524d534
Author: JinYong Li <42...@users.noreply.github.com>
AuthorDate: Sat Jul 30 23:28:31 2022 +0800

    [Fix-11007] [Master] fix forced_success bug (#11088)
    
    * fix forced_success bug
    
    * add comments
    
    * add transactional
    
    * refactor code
    
    Co-authored-by: JinyLeeChina <ji...@foxmail.com>
    
    (cherry picked from commit e5cca0e79bfe16d07e931bcc68c279643ad45fab)
---
 .../api/service/impl/TaskInstanceServiceImpl.java  |  3 ++
 .../service/process/ProcessService.java            |  2 ++
 .../service/process/ProcessServiceImpl.java        | 38 ++++++++++++++++++++--
 3 files changed, 40 insertions(+), 3 deletions(-)

diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java
index cfcd95d88c..45edd9730e 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java
@@ -46,6 +46,7 @@ import java.util.stream.Collectors;
 
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
+import org.springframework.transaction.annotation.Transactional;
 
 import com.baomidou.mybatisplus.core.metadata.IPage;
 import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
@@ -163,6 +164,7 @@ public class TaskInstanceServiceImpl extends BaseServiceImpl implements TaskInst
      * @param taskInstanceId task instance id
      * @return the result code and msg
      */
+    @Transactional
     @Override
     public Map<String, Object> forceTaskSuccess(User loginUser, long projectCode, Integer taskInstanceId) {
         Project project = projectMapper.queryByCode(projectCode);
@@ -195,6 +197,7 @@ public class TaskInstanceServiceImpl extends BaseServiceImpl implements TaskInst
         task.setState(ExecutionStatus.FORCED_SUCCESS);
         int changedNum = taskInstanceMapper.updateById(task);
         if (changedNum > 0) {
+            processService.forceProcessInstanceSuccessByTaskInstanceId(taskInstanceId);
             putMsg(result, Status.SUCCESS);
         } else {
             putMsg(result, Status.FORCE_TASK_SUCCESS_ERROR);
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index f61676e693..86f6eb0160 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -295,4 +295,6 @@ public interface ProcessService {
                               org.apache.dolphinscheduler.remote.command.CommandType taskType);
 
     ProcessInstance loadNextProcess4Serial(long code, int state, int id);
+
+    void forceProcessInstanceSuccessByTaskInstanceId(Integer taskInstanceId);
 }
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
index 2f0e1c077a..14c998517e 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
@@ -1308,10 +1308,10 @@ public class ProcessServiceImpl implements ProcessService {
      *
      * @param parentInstance parentInstance
      * @param parentTask     parentTask
+     * @param processMap     processMap
      * @return process instance map
      */
-    private ProcessInstanceMap setProcessInstanceMap(ProcessInstance parentInstance, TaskInstance parentTask) {
-        ProcessInstanceMap processMap = findWorkProcessMapByParent(parentInstance.getId(), parentTask.getId());
+    private ProcessInstanceMap setProcessInstanceMap(ProcessInstance parentInstance, TaskInstance parentTask, ProcessInstanceMap processMap) {
         if (processMap != null) {
             return processMap;
         }
@@ -1375,11 +1375,16 @@ public class ProcessServiceImpl implements ProcessService {
             // recover failover tolerance would not create a new command when the sub command already have been created
             return;
         }
-        instanceMap = setProcessInstanceMap(parentProcessInstance, task);
+        instanceMap = setProcessInstanceMap(parentProcessInstance, task, instanceMap);
         ProcessInstance childInstance = null;
         if (instanceMap.getProcessInstanceId() != 0) {
             childInstance = findProcessInstanceById(instanceMap.getProcessInstanceId());
         }
+        if (childInstance != null && childInstance.getState() == ExecutionStatus.SUCCESS
+            && CommandType.START_FAILURE_TASK_PROCESS == parentProcessInstance.getCommandType()) {
+            logger.info("sub process instance {} status is success, so skip creating command", childInstance.getId());
+            return;
+        }
         Command subProcessCommand = createSubProcessCommand(parentProcessInstance, childInstance, instanceMap, task);
         updateSubProcessDefinitionByParent(parentProcessInstance, subProcessCommand.getProcessDefinitionCode());
         initSubInstanceState(childInstance);
@@ -3050,4 +3055,31 @@ public class ProcessServiceImpl implements ProcessService {
             throw new ServiceException("delete command fail, id:" + commandId);
         }
     }
+
+    @Override
+    public void forceProcessInstanceSuccessByTaskInstanceId(Integer taskInstanceId) {
+        TaskInstance task = taskInstanceMapper.selectById(taskInstanceId);
+        if (task == null) {
+            return;
+        }
+        ProcessInstance processInstance = findProcessInstanceDetailById(task.getProcessInstanceId());
+        if (processInstance != null && (processInstance.getState().typeIsFailure() || processInstance.getState().typeIsCancel())) {
+            List<TaskInstance> validTaskList = findValidTaskListByProcessId(processInstance.getId());
+            List<Long> instanceTaskCodeList = validTaskList.stream().map(TaskInstance::getTaskCode).collect(Collectors.toList());
+            List<ProcessTaskRelation> taskRelations = findRelationByCode(processInstance.getProcessDefinitionCode(),
+                processInstance.getProcessDefinitionVersion());
+            List<TaskDefinitionLog> taskDefinitionLogs = genTaskDefineList(taskRelations);
+            List<Long> definiteTaskCodeList = taskDefinitionLogs.stream().filter(definitionLog -> definitionLog.getFlag() == Flag.YES)
+                .map(TaskDefinitionLog::getCode).collect(Collectors.toList());
+            // only all tasks have instances
+            if (org.apache.dolphinscheduler.common.utils.CollectionUtils.equalLists(instanceTaskCodeList, definiteTaskCodeList)) {
+                List<Integer> failTaskList = validTaskList.stream().filter(instance -> instance.getState().typeIsFailure() || instance.getState().typeIsCancel())
+                    .map(TaskInstance::getId).collect(Collectors.toList());
+                if (failTaskList.size() == 1 && failTaskList.contains(taskInstanceId)) {
+                    processInstance.setState(ExecutionStatus.SUCCESS);
+                    updateProcessInstance(processInstance);
+                }
+            }
+        }
+    }
 }
\ No newline at end of file