Posted to commits@dolphinscheduler.apache.org by ji...@apache.org on 2022/09/20 12:06:44 UTC

[dolphinscheduler] branch 2.0.7-prepare updated: to #10692: fix Parameter transfer problem when recover failed task and output of subprogress (#10694)

This is an automated email from the ASF dual-hosted git repository.

jinyleechina pushed a commit to branch 2.0.7-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/2.0.7-prepare by this push:
     new 9ca3962bf1 to #10692: fix Parameter transfer problem when recover failed task and output of subprogress (#10694)
9ca3962bf1 is described below

commit 9ca3962bf129a8f112605191e2c36b7ab235faf5
Author: zwZjut <zw...@163.com>
AuthorDate: Tue Sep 20 20:06:38 2022 +0800

    to #10692: fix Parameter transfer problem when recover failed task and output of subprogress (#10694)
    
    Co-authored-by: 宏豁 <ho...@alibaba-inc.com>
---
 .../master/runner/WorkflowExecuteThread.java       | 21 +++--
 .../master/runner/task/SubTaskProcessor.java       | 91 ++++++++++++++++++++++
 2 files changed, 106 insertions(+), 6 deletions(-)

diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
index 411c07b2a1..ae837dfe86 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
@@ -78,6 +78,7 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Date;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -461,7 +462,7 @@ public class WorkflowExecuteThread implements Runnable {
         taskRetryCheckList.remove(task.getId());
         depStateCheckList.remove(task.getId());
         if (task.getState().typeIsSuccess()) {
-            processInstance.setVarPool(task.getVarPool());
+            // processInstance.setVarPool(task.getVarPool());
             processService.saveProcessInstance(processInstance);
             submitPostNode(Long.toString(task.getTaskCode()));
         } else if (task.getState().typeIsFailure()) {
@@ -925,11 +926,12 @@ public class WorkflowExecuteThread implements Runnable {
             if (allProperty.size() > 0) {
                 taskInstance.setVarPool(JSONUtils.toJsonString(allProperty.values()));
             }
-        } else {
-            if (StringUtils.isNotEmpty(processInstance.getVarPool())) {
-                taskInstance.setVarPool(processInstance.getVarPool());
-            }
         }
+//        else {
+//            if (StringUtils.isNotEmpty(processInstance.getVarPool())) {
+//                taskInstance.setVarPool(processInstance.getVarPool());
+//            }
+//        }
     }
 
     private void setVarPoolValue(Map<String, Property> allProperty, Map<String, TaskInstance> allTaskInstance, TaskInstance preTaskInstance, Property thisProperty) {
@@ -1466,7 +1468,14 @@ public class WorkflowExecuteThread implements Runnable {
                 //init varPool only this task is the first time running
                 if (task.isFirstRun()) {
                     //get pre task ,get all the task varPool to this task
-                    Set<String> preTask = dag.getPreviousNodes(Long.toString(task.getTaskCode()));
+                    Set<String> preTask = new HashSet<>();
+                    preTask.addAll(dag.getPreviousNodes(Long.toString(task.getTaskCode())));
+                    TaskNode taskNode = dag.getNode(Long.toString(task.getTaskCode()));
+                    if (null != taskNode && null != taskNode.getDepList() && !taskNode.getDepList().isEmpty()) {
+                        logger.debug("in submitStandByTask: taskCode:{}, taskType: {}, preTasks: {}, depList:{}",
+                                task.getTaskCode(), taskNode.getType(), taskNode.getPreTasks(), taskNode.getDepList());
+                        preTask.addAll(taskNode.getDepList());
+                    }
                     getPreVarPool(task, preTask);
                 }
                 DependResult dependResult = getDependResultForTask(task);
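
Note on the hunk above: when a stand-by task runs for the first time, its initial var pool is now assembled from the union of the DAG predecessors (dag.getPreviousNodes) and the node's explicit depList, presumably so that a recovered failed task still inherits parameters from dependencies that are not reflected as edges in the rebuilt DAG (#10692). A minimal sketch of that union, using simplified signatures rather than the actual DolphinScheduler classes:

    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    class PredecessorUnionSketch {
        // Union of the DAG predecessors and the node's declared depList; the result
        // is what getPreVarPool(task, preTask) receives in the patched code.
        static Set<String> collectPreTaskCodes(Set<String> dagPredecessors, List<String> depList) {
            Set<String> preTask = new HashSet<>(dagPredecessors);
            if (depList != null && !depList.isEmpty()) {
                preTask.addAll(depList);
            }
            return preTask;
        }
    }
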
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java
index 29ecf7d1a4..280ef0fe27 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java
@@ -17,9 +17,18 @@
 
 package org.apache.dolphinscheduler.server.master.runner.task;
 
+import static org.apache.dolphinscheduler.common.Constants.LOCAL_PARAMS;
+
+import org.apache.dolphinscheduler.common.enums.Direct;
 import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.common.enums.TaskTimeoutStrategy;
 import org.apache.dolphinscheduler.common.enums.TaskType;
+import org.apache.dolphinscheduler.common.graph.DAG;
+import org.apache.dolphinscheduler.common.model.TaskNode;
+import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
+import org.apache.dolphinscheduler.common.process.Property;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
@@ -28,9 +37,19 @@ import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService;
 import org.apache.dolphinscheduler.server.utils.LogUtils;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
 
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.Comparator;
 import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
+
+import com.fasterxml.jackson.core.type.TypeReference;
 
 /**
  *
@@ -110,6 +129,77 @@ public class SubTaskProcessor extends BaseTaskProcessor {
         }
     }
 
+    private  Map<String, Property> mergeEndNodeTaskInstanceVarPool(Set<String> taskCodes) {
+        List<TaskInstance> taskInstanceList = processService.findValidTaskListByProcessId(subProcessInstance.getId());
+        logger.info("in dealFinish1, mergeEndNodeTaskInstanceVarPool, taskInstanceList.size:{}, subProcessInstance.getId:{}", taskInstanceList.size(),subProcessInstance.getId());
+        // filter end nodes and sort by end time reversed
+        List<TaskInstance> endTaskInstancesSortedByEndTimeReversed = taskInstanceList.stream()
+                .filter(o -> taskCodes.contains(Long.toString(o.getTaskCode()))).
+                        sorted(Comparator.comparing(TaskInstance::getEndTime).reversed()).collect(Collectors.toList());
+        logger.info("in dealFinish1, mergeEndNodeTaskInstanceVarPool, endTaskInstancesSortedByEndTimeReversed.size:{}", endTaskInstancesSortedByEndTimeReversed.size());
+        Map<String, Property> allProperties = new HashMap<>();
+        for (TaskInstance taskInstance : endTaskInstancesSortedByEndTimeReversed) {
+            String varPool = taskInstance.getVarPool();
+            if (org.apache.commons.lang.StringUtils.isNotEmpty(varPool)) {
+                List<Property> properties = JSONUtils.toList(varPool, Property.class);
+                properties.forEach(o -> {
+                    allProperties.put(o.getProp(), o);
+                });
+            }
+        }
+        return allProperties;
+    }
+
+    private void dealFinish1() {
+        // build dag
+        ProcessDefinition processDefinition = processService.findProcessDefinition(subProcessInstance.getProcessDefinitionCode(), subProcessInstance.getProcessDefinitionVersion());
+        if (null == processDefinition) {
+            logger.error("process definition not found in meta data, processDefinitionCode:{}, processDefinitionVersion:{}, processInstanceId:{}",
+                    subProcessInstance.getProcessDefinitionCode(), subProcessInstance.getProcessDefinitionVersion(), subProcessInstance.getId());
+            throw new RuntimeException(String.format("process definition  code %s, version %s does not exist", subProcessInstance.getProcessDefinitionCode(), subProcessInstance.getProcessDefinitionVersion()));
+        }
+        subProcessInstance.setProcessDefinition(processDefinition);
+        DAG<String, TaskNode, TaskNodeRelation> dag = processService.genDagGraph(subProcessInstance.getProcessDefinition());
+        // get end nodes
+        Set<String> endTaskCodes = dag.getEndNode().stream().collect(Collectors.toSet());
+        logger.info("in dealFinish1, endTaskCodes:{}", endTaskCodes);
+        if (endTaskCodes == null || endTaskCodes.isEmpty()) {
+            return;
+        }
+        // get var pool of sub progress instance;
+        Map<String, Property> varPoolPropertiesMap = mergeEndNodeTaskInstanceVarPool(endTaskCodes);
+        logger.debug("in dealFinish1, varPoolPropertiesMap:{}", varPoolPropertiesMap);
+        // merge var pool: 1. task instance var pool from pre task ; 2. var pool from sub progress
+        // filter by localParams
+        String taskVarPool = taskInstance.getVarPool();
+        Map<String, Property> taskVarPoolProperties = new HashMap<>();
+        if (StringUtils.isNotEmpty(taskVarPool)) {
+            taskVarPoolProperties = JSONUtils.toList(taskVarPool, Property.class).stream().collect(Collectors.toMap(Property::getProp, (p) -> p));
+        }
+        Map<String, Object> taskParams = JSONUtils.parseObject(taskInstance.getTaskParams(), new TypeReference<Map<String, Object>>() {
+        });
+        Object localParams = taskParams.get(LOCAL_PARAMS);
+        Map<String, Property> outProperties = new HashMap<>();
+        if (localParams != null) {
+            List<Property> properties = JSONUtils.toList(JSONUtils.toJsonString(localParams), Property.class);
+            outProperties = properties.stream().filter(r -> Direct.OUT == r.getDirect()).collect(Collectors.toMap(Property::getProp, (p) -> p));
+            // put all task instance var pool from pre task
+            outProperties.putAll(taskVarPoolProperties);
+            for (Map.Entry<String, Property> o : outProperties.entrySet()) {
+                if (varPoolPropertiesMap.containsKey(o.getKey())) {
+                    o.getValue().setValue(varPoolPropertiesMap.get(o.getKey()).getValue());
+                }
+            }
+        } else {
+            outProperties.putAll(taskVarPoolProperties);
+            outProperties.putAll(varPoolPropertiesMap);
+        }
+        taskInstance.setVarPool(JSONUtils.toJsonString(outProperties.values()));
+        logger.debug("in dealFinish1, varPool:{}", taskInstance.getVarPool());
+        //deal with localParam for show in the page
+        processService.changeOutParam(taskInstance);
+    }
+
     @Override
     protected boolean persistTask(TaskAction taskAction) {
         switch (taskAction) {
@@ -175,6 +265,7 @@ public class SubTaskProcessor extends BaseTaskProcessor {
         sendToSubProcess();
         this.taskInstance.setState(ExecutionStatus.KILL);
         this.taskInstance.setEndTime(new Date());
+        dealFinish1();
         processService.saveTaskInstance(taskInstance);
         return true;
     }
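
Note on the SubTaskProcessor change: the new dealFinish1() rebuilds the sub-process DAG, collects the var pools of its end-node task instances (mergeEndNodeTaskInstanceVarPool), and writes those values back into the parent SUB_PROCESS task instance's var pool, replacing the values of parameters the task already carries (its OUT local params merged with its pre-task var pool). It is called just before processService.saveTaskInstance(taskInstance) in the hunk above, so the merged var pool is persisted and can flow to downstream tasks of the parent workflow. A minimal sketch of the merge step, with a simplified Property type standing in for org.apache.dolphinscheduler.common.process.Property:

    import java.util.HashMap;
    import java.util.Map;

    class SubProcessOutputSketch {
        // Simplified stand-in for the Property entity (prop name + value).
        static class Property {
            final String prop;
            String value;
            Property(String prop, String value) { this.prop = prop; this.value = value; }
        }

        // Overwrite the parent task's known parameters with the values produced inside
        // the sub-process; keys without a match are left untouched, mirroring the
        // localParams branch of dealFinish1. Property objects are mutated in place.
        static Map<String, Property> applySubProcessVarPool(Map<String, Property> outProperties,
                                                            Map<String, Property> subProcessVarPool) {
            Map<String, Property> merged = new HashMap<>(outProperties);
            for (Map.Entry<String, Property> entry : merged.entrySet()) {
                Property fromSubProcess = subProcessVarPool.get(entry.getKey());
                if (fromSubProcess != null) {
                    entry.getValue().value = fromSubProcess.value;
                }
            }
            return merged; // serialized via JSONUtils.toJsonString(...) into taskInstance.setVarPool(...)
        }
    }
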