You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ca...@apache.org on 2022/10/19 01:36:53 UTC
[dolphinscheduler] branch dev updated: make sure all failed task will save in errorTaskMap (#12424)
This is an automated email from the ASF dual-hosted git repository.
caishunfeng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 38b643f69b make sure all failed task will save in errorTaskMap (#12424)
38b643f69b is described below
commit 38b643f69b65f4de9dd43809404470934bfadc7b
Author: Yann Ann <xi...@gmail.com>
AuthorDate: Wed Oct 19 09:36:47 2022 +0800
make sure all failed task will save in errorTaskMap (#12424)
---
.../master/runner/WorkflowExecuteRunnable.java | 23 ++++++++--------------
1 file changed, 8 insertions(+), 15 deletions(-)
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
index 962ed187d6..403135f3bd 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
@@ -391,13 +391,13 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
retryTaskInstance(taskInstance);
} else if (taskInstance.getState().isFailure()) {
completeTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId());
+ errorTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId());
// There are child nodes and the failure policy is: CONTINUE
if (processInstance.getFailureStrategy() == FailureStrategy.CONTINUE && DagHelper.haveAllNodeAfterNode(
Long.toString(taskInstance.getTaskCode()),
dag)) {
submitPostNode(Long.toString(taskInstance.getTaskCode()));
} else {
- errorTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId());
if (processInstance.getFailureStrategy() == FailureStrategy.END) {
killAllTasks();
}
@@ -421,7 +421,6 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
/**
* release task group
*
- * @param taskInstance
*/
public void releaseTaskGroup(TaskInstance taskInstance) {
logger.info("Release task group");
@@ -448,7 +447,6 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
/**
* crate new task instance to retry, different objects from the original
*
- * @param taskInstance
*/
private void retryTaskInstance(TaskInstance taskInstance) throws StateEventHandleException {
if (!taskInstance.taskCanRetry()) {
@@ -662,10 +660,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
}
private boolean needComplementProcess() {
- if (processInstance.isComplementData() && Flag.NO == processInstance.getIsSubProcess()) {
- return true;
- }
- return false;
+ return processInstance.isComplementData() && Flag.NO == processInstance.getIsSubProcess();
}
/**
@@ -1069,7 +1064,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
/**
* clone a new taskInstance for retry and reset some logic fields
*
- * @return
+ * @return taskInstance
*/
public TaskInstance cloneRetryTaskInstance(TaskInstance taskInstance) {
TaskNode taskNode = dag.getNode(Long.toString(taskInstance.getTaskCode()));
@@ -1097,7 +1092,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
/**
* clone a new taskInstance for tolerant and reset some logic fields
*
- * @return
+ * @return taskInstance
*/
public TaskInstance cloneTolerantTaskInstance(TaskInstance taskInstance) {
TaskNode taskNode = dag.getNode(Long.toString(taskInstance.getTaskCode()));
@@ -1119,9 +1114,9 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
/**
* new a taskInstance
*
- * @param processInstance
- * @param taskNode
- * @return
+ * @param processInstance process instance
+ * @param taskNode task node
+ * @return task instance
*/
public TaskInstance newTaskInstance(ProcessInstance processInstance, TaskNode taskNode) {
TaskInstance taskInstance = new TaskInstance();
@@ -1445,9 +1440,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
long taskCode = Long.parseLong(dependNodeName);
Integer taskInstanceId = completeTaskMap.get(taskCode);
TaskExecutionStatus depTaskState = taskInstanceMap.get(taskInstanceId).getState();
- if (depTaskState.isFailure()) {
- return false;
- }
+ return !depTaskState.isFailure();
}
return true;
}