You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ji...@apache.org on 2022/09/20 12:06:44 UTC
[dolphinscheduler] branch 2.0.7-prepare updated: to #10692: fix Parameter transfer problem when recover failed task and output of subprogress (#10694)
This is an automated email from the ASF dual-hosted git repository.
jinyleechina pushed a commit to branch 2.0.7-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/2.0.7-prepare by this push:
new 9ca3962bf1 to #10692: fix Parameter transfer problem when recover failed task and output of subprogress (#10694)
9ca3962bf1 is described below
commit 9ca3962bf129a8f112605191e2c36b7ab235faf5
Author: zwZjut <zw...@163.com>
AuthorDate: Tue Sep 20 20:06:38 2022 +0800
to #10692: fix Parameter transfer problem when recover failed task and output of subprogress (#10694)
Co-authored-by: 宏豁 <ho...@alibaba-inc.com>
---
.../master/runner/WorkflowExecuteThread.java | 21 +++--
.../master/runner/task/SubTaskProcessor.java | 91 ++++++++++++++++++++++
2 files changed, 106 insertions(+), 6 deletions(-)
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
index 411c07b2a1..ae837dfe86 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
@@ -78,6 +78,7 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -461,7 +462,7 @@ public class WorkflowExecuteThread implements Runnable {
taskRetryCheckList.remove(task.getId());
depStateCheckList.remove(task.getId());
if (task.getState().typeIsSuccess()) {
- processInstance.setVarPool(task.getVarPool());
+ // processInstance.setVarPool(task.getVarPool());
processService.saveProcessInstance(processInstance);
submitPostNode(Long.toString(task.getTaskCode()));
} else if (task.getState().typeIsFailure()) {
@@ -925,11 +926,12 @@ public class WorkflowExecuteThread implements Runnable {
if (allProperty.size() > 0) {
taskInstance.setVarPool(JSONUtils.toJsonString(allProperty.values()));
}
- } else {
- if (StringUtils.isNotEmpty(processInstance.getVarPool())) {
- taskInstance.setVarPool(processInstance.getVarPool());
- }
}
+// else {
+// if (StringUtils.isNotEmpty(processInstance.getVarPool())) {
+// taskInstance.setVarPool(processInstance.getVarPool());
+// }
+// }
}
private void setVarPoolValue(Map<String, Property> allProperty, Map<String, TaskInstance> allTaskInstance, TaskInstance preTaskInstance, Property thisProperty) {
@@ -1466,7 +1468,14 @@ public class WorkflowExecuteThread implements Runnable {
//init varPool only this task is the first time running
if (task.isFirstRun()) {
//get pre task ,get all the task varPool to this task
- Set<String> preTask = dag.getPreviousNodes(Long.toString(task.getTaskCode()));
+ Set<String> preTask = new HashSet<>();
+ preTask.addAll(dag.getPreviousNodes(Long.toString(task.getTaskCode())));
+ TaskNode taskNode = dag.getNode(Long.toString(task.getTaskCode()));
+ if (null != taskNode && null != taskNode.getDepList() && !taskNode.getDepList().isEmpty()) {
+ logger.debug("in submitStandByTask: taskCode:{}, taskType: {}, preTasks: {}, depList:{}",
+ task.getTaskCode(), taskNode.getType(), taskNode.getPreTasks(), taskNode.getDepList());
+ preTask.addAll(taskNode.getDepList());
+ }
getPreVarPool(task, preTask);
}
DependResult dependResult = getDependResultForTask(task);
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java
index 29ecf7d1a4..280ef0fe27 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java
@@ -17,9 +17,18 @@
package org.apache.dolphinscheduler.server.master.runner.task;
+import static org.apache.dolphinscheduler.common.Constants.LOCAL_PARAMS;
+
+import org.apache.dolphinscheduler.common.enums.Direct;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.TaskTimeoutStrategy;
import org.apache.dolphinscheduler.common.enums.TaskType;
+import org.apache.dolphinscheduler.common.graph.DAG;
+import org.apache.dolphinscheduler.common.model.TaskNode;
+import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
+import org.apache.dolphinscheduler.common.process.Property;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
@@ -28,9 +37,19 @@ import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService;
import org.apache.dolphinscheduler.server.utils.LogUtils;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.Comparator;
import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
+
+import com.fasterxml.jackson.core.type.TypeReference;
/**
*
@@ -110,6 +129,77 @@ public class SubTaskProcessor extends BaseTaskProcessor {
}
}
+ private Map<String, Property> mergeEndNodeTaskInstanceVarPool(Set<String> taskCodes) {
+ List<TaskInstance> taskInstanceList = processService.findValidTaskListByProcessId(subProcessInstance.getId());
+ logger.info("in dealFinish1, mergeEndNodeTaskInstanceVarPool, taskInstanceList.size:{}, subProcessInstance.getId:{}", taskInstanceList.size(),subProcessInstance.getId());
+ // filter end nodes and sort by end time reversed
+ List<TaskInstance> endTaskInstancesSortedByEndTimeReversed = taskInstanceList.stream()
+ .filter(o -> taskCodes.contains(Long.toString(o.getTaskCode()))).
+ sorted(Comparator.comparing(TaskInstance::getEndTime).reversed()).collect(Collectors.toList());
+ logger.info("in dealFinish1, mergeEndNodeTaskInstanceVarPool, endTaskInstancesSortedByEndTimeReversed.size:{}", endTaskInstancesSortedByEndTimeReversed.size());
+ Map<String, Property> allProperties = new HashMap<>();
+ for (TaskInstance taskInstance : endTaskInstancesSortedByEndTimeReversed) {
+ String varPool = taskInstance.getVarPool();
+ if (org.apache.commons.lang.StringUtils.isNotEmpty(varPool)) {
+ List<Property> properties = JSONUtils.toList(varPool, Property.class);
+ properties.forEach(o -> {
+ allProperties.put(o.getProp(), o);
+ });
+ }
+ }
+ return allProperties;
+ }
+
+    /**
+     * Writes the output of the sub-process instance into this task instance's var pool.
+     *
+     * <p>Steps, as implemented below:
+     * <ol>
+     *   <li>load the process definition of the sub-process instance and build its DAG;</li>
+     *   <li>merge the var pools of the DAG's end-node task instances;</li>
+     *   <li>combine that result with the var pool already present on this task instance: when the
+     *       task params contain local params, the OUT local params plus the existing var pool
+     *       entries are kept and only their values are refreshed from the sub-process; otherwise
+     *       the existing var pool and the whole sub-process var pool are merged;</li>
+     *   <li>store the result on the task instance and call {@code processService.changeOutParam}.</li>
+     * </ol>
+     *
+     * <p>Returns without touching the task instance when the DAG has no end node.
+     *
+     * @throws RuntimeException if the process definition of the sub-process instance cannot be found
+     */
+    private void dealFinish1() {
+        // build dag
+        ProcessDefinition processDefinition = processService.findProcessDefinition(subProcessInstance.getProcessDefinitionCode(), subProcessInstance.getProcessDefinitionVersion());
+        if (null == processDefinition) {
+            logger.error("process definition not found in meta data, processDefinitionCode:{}, processDefinitionVersion:{}, processInstanceId:{}",
+                    subProcessInstance.getProcessDefinitionCode(), subProcessInstance.getProcessDefinitionVersion(), subProcessInstance.getId());
+            throw new RuntimeException(String.format("process definition code %s, version %s does not exist", subProcessInstance.getProcessDefinitionCode(), subProcessInstance.getProcessDefinitionVersion()));
+        }
+        subProcessInstance.setProcessDefinition(processDefinition);
+        DAG<String, TaskNode, TaskNodeRelation> dag = processService.genDagGraph(subProcessInstance.getProcessDefinition());
+        // get end nodes
+        Set<String> endTaskCodes = dag.getEndNode().stream().collect(Collectors.toSet());
+        logger.info("in dealFinish1, endTaskCodes:{}", endTaskCodes);
+        // Collectors.toSet() never yields null, so only emptiness has to be checked
+        if (endTaskCodes.isEmpty()) {
+            return;
+        }
+        // get var pool of the sub-process instance
+        Map<String, Property> varPoolPropertiesMap = mergeEndNodeTaskInstanceVarPool(endTaskCodes);
+        logger.debug("in dealFinish1, varPoolPropertiesMap:{}", varPoolPropertiesMap);
+        // merge var pool: 1. task instance var pool from pre task ; 2. var pool from sub-process
+        // filter by localParams
+        String taskVarPool = taskInstance.getVarPool();
+        Map<String, Property> taskVarPoolProperties = new HashMap<>();
+        if (StringUtils.isNotEmpty(taskVarPool)) {
+            // the merge function keeps the last entry instead of throwing IllegalStateException
+            // when the same property name occurs twice
+            taskVarPoolProperties = JSONUtils.toList(taskVarPool, Property.class).stream()
+                    .collect(Collectors.toMap(Property::getProp, p -> p, (first, second) -> second, HashMap::new));
+        }
+        Map<String, Object> taskParams = JSONUtils.parseObject(taskInstance.getTaskParams(), new TypeReference<Map<String, Object>>() {
+        });
+        // NOTE(review): parseObject presumably returns null for empty or unparsable task params;
+        // that case is treated like "no local params" instead of failing with a NullPointerException
+        Object localParams = taskParams == null ? null : taskParams.get(LOCAL_PARAMS);
+        Map<String, Property> outProperties = new HashMap<>();
+        if (localParams != null) {
+            List<Property> properties = JSONUtils.toList(JSONUtils.toJsonString(localParams), Property.class);
+            // HashMap::new guarantees a mutable map, which the putAll below relies on
+            outProperties = properties.stream()
+                    .filter(r -> Direct.OUT == r.getDirect())
+                    .collect(Collectors.toMap(Property::getProp, p -> p, (first, second) -> second, HashMap::new));
+            // put all task instance var pool from pre task
+            outProperties.putAll(taskVarPoolProperties);
+            // only refresh values of names that are already known; no new names are added here
+            for (Map.Entry<String, Property> o : outProperties.entrySet()) {
+                if (varPoolPropertiesMap.containsKey(o.getKey())) {
+                    o.getValue().setValue(varPoolPropertiesMap.get(o.getKey()).getValue());
+                }
+            }
+        } else {
+            // no local params: the sub-process var pool entries replace same-named existing ones
+            outProperties.putAll(taskVarPoolProperties);
+            outProperties.putAll(varPoolPropertiesMap);
+        }
+        taskInstance.setVarPool(JSONUtils.toJsonString(outProperties.values()));
+        logger.debug("in dealFinish1, varPool:{}", taskInstance.getVarPool());
+        //deal with localParam for show in the page
+        processService.changeOutParam(taskInstance);
+    }
+
@Override
protected boolean persistTask(TaskAction taskAction) {
switch (taskAction) {
@@ -175,6 +265,7 @@ public class SubTaskProcessor extends BaseTaskProcessor {
sendToSubProcess();
this.taskInstance.setState(ExecutionStatus.KILL);
this.taskInstance.setEndTime(new Date());
+ dealFinish1();
processService.saveTaskInstance(taskInstance);
return true;
}