You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ca...@apache.org on 2022/01/25 01:38:08 UTC
[dolphinscheduler] branch dev updated: [Bug-7865][MasterServer] retry logic optimization (#8156)
This is an automated email from the ASF dual-hosted git repository.
caishunfeng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new b5fa54b [Bug-7865][MasterServer] retry logic optimization (#8156)
b5fa54b is described below
commit b5fa54b6bec9ca08528952acc20f6787d0e4f74e
Author: wind <ca...@users.noreply.github.com>
AuthorDate: Tue Jan 25 09:37:57 2022 +0800
[Bug-7865][MasterServer] retry logic optimization (#8156)
* submit task optimization
* cloneAndReset
* update
* tolerant task restart when init
* fix sonar check
* fix test
* delete unused file
* taskInstance key
* code style
* skip complete when retry
Co-authored-by: caishunfeng <53...@qq.com>
---
.../dolphinscheduler/common/enums/StateEvent.java | 10 +
.../common/enums/StateEventType.java | 1 +
.../dolphinscheduler/dao/entity/TaskInstance.java | 13 +-
.../master/runner/StateWheelExecuteThread.java | 205 ++++++---
.../master/runner/WorkflowExecuteThread.java | 508 ++++++++++++++-------
.../master/runner/WorkflowExecuteThreadPool.java | 4 +
.../master/runner/task/BaseTaskProcessor.java | 4 +-
.../master/runner/task/ConditionTaskProcessor.java | 5 -
.../master/runner/task/DependentTaskProcessor.java | 5 -
.../server/master/runner/task/ITaskProcessor.java | 3 +-
.../master/runner/task/SubTaskProcessor.java | 5 -
.../master/runner/task/SwitchTaskProcessor.java | 7 +-
.../server/master/runner/task/TaskInstanceKey.java | 83 ++++
.../server/master/WorkflowExecuteThreadTest.java | 10 +-
.../service/process/ProcessService.java | 30 +-
15 files changed, 605 insertions(+), 288 deletions(-)
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/StateEvent.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/StateEvent.java
index f24b3c1..7cc806a 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/StateEvent.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/StateEvent.java
@@ -35,6 +35,8 @@ public class StateEvent {
private int taskInstanceId;
+ private long taskCode;
+
private int processInstanceId;
private String context;
@@ -53,6 +55,10 @@ public class StateEvent {
return taskInstanceId;
}
+ public long getTaskCode() {
+ return taskCode;
+ }
+
public int getProcessInstanceId() {
return processInstanceId;
}
@@ -73,6 +79,10 @@ public class StateEvent {
this.taskInstanceId = taskInstanceId;
}
+ public void setTaskCode(long taskCode) {
+ this.taskCode = taskCode;
+ }
+
public Channel getChannel() {
return channel;
}
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/StateEventType.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/StateEventType.java
index fc795d4..c758bc7 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/StateEventType.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/StateEventType.java
@@ -26,6 +26,7 @@ public enum StateEventType {
PROCESS_TIMEOUT(2, "process timeout"),
TASK_TIMEOUT(3, "task timeout"),
WAIT_TASK_GROUP(4, "wait task group"),
+ TASK_RETRY(5, "task retry")
;
StateEventType(int code, String descp) {
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
index 976060c..b4e52c0 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
@@ -29,6 +29,8 @@ import org.apache.dolphinscheduler.common.task.switchtask.SwitchParameters;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.commons.lang3.SerializationUtils;
+
import java.io.Serializable;
import java.util.Date;
import java.util.Map;
@@ -599,7 +601,8 @@ public class TaskInstance implements Serializable {
}
/**
- * determine if you can try again
+ * determine if a task instance can retry
+ * if subProcess,
*
* @return can try result
*/
@@ -609,10 +612,8 @@ public class TaskInstance implements Serializable {
}
if (this.getState() == ExecutionStatus.NEED_FAULT_TOLERANCE) {
return true;
- } else {
- return (this.getState().typeIsFailure()
- && this.getRetryTimes() < this.getMaxRetryTimes());
}
+ return this.getState() == ExecutionStatus.FAILURE && (this.getRetryTimes() < this.getMaxRetryTimes());
}
/**
@@ -624,9 +625,7 @@ public class TaskInstance implements Serializable {
if (getState() != ExecutionStatus.FAILURE) {
return true;
}
- if (getId() == 0
- || getMaxRetryTimes() == 0
- || getRetryInterval() == 0) {
+ if (getMaxRetryTimes() == 0 || getRetryInterval() == 0) {
return true;
}
Date now = new Date();
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
index 8c3712f..3abfdd2 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
@@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.server.master.runner;
+import org.apache.commons.lang3.StringUtils;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.StateEvent;
@@ -30,6 +31,7 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
+import org.apache.dolphinscheduler.server.master.runner.task.TaskInstanceKey;
import org.apache.hadoop.util.ThreadUtil;
import java.util.Map.Entry;
@@ -42,8 +44,11 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
- * 1. timeout check wheel
- * 2. dependent task check wheel
+ * Check thread
+ * 1. timeout task check
+ * 2. dependent task state check
+ * 3. retry task check
+ * 4. timeout process check
*/
@Component
public class StateWheelExecuteThread extends Thread {
@@ -56,14 +61,19 @@ public class StateWheelExecuteThread extends Thread {
private ConcurrentLinkedQueue<Integer> processInstanceTimeoutCheckList = new ConcurrentLinkedQueue<>();
/**
- * task time out check list, key is taskInstanceId, value is processInstanceId
+ * task time out check list
*/
- private ConcurrentHashMap<Integer, Integer> taskInstanceTimeoutCheckList = new ConcurrentHashMap<>();
+ private ConcurrentLinkedQueue<TaskInstanceKey> taskInstanceTimeoutCheckList = new ConcurrentLinkedQueue<>();
/**
- * task retry check list, key is taskInstanceId, value is processInstanceId
+ * task retry check list
*/
- private ConcurrentHashMap<Integer, Integer> taskInstanceRetryCheckList = new ConcurrentHashMap<>();
+ private ConcurrentLinkedQueue<TaskInstanceKey> taskInstanceRetryCheckList = new ConcurrentLinkedQueue<>();
+
+ /**
+ * task state check list
+ */
+ private ConcurrentLinkedQueue<TaskInstanceKey> taskInstanceStateCheckList = new ConcurrentLinkedQueue<>();
@Autowired
private MasterConfig masterConfig;
@@ -80,6 +90,7 @@ public class StateWheelExecuteThread extends Thread {
try {
checkTask4Timeout();
checkTask4Retry();
+ checkTask4State();
checkProcess4Timeout();
} catch (Exception e) {
logger.error("state wheel thread check error:", e);
@@ -96,8 +107,39 @@ public class StateWheelExecuteThread extends Thread {
processInstanceTimeoutCheckList.remove(processInstance.getId());
}
- public void addTask4TimeoutCheck(TaskInstance taskInstance) {
- if (taskInstanceTimeoutCheckList.containsKey(taskInstance.getId())) {
+ private void checkProcess4Timeout() {
+ if (processInstanceTimeoutCheckList.isEmpty()) {
+ return;
+ }
+ for (Integer processInstanceId : processInstanceTimeoutCheckList) {
+ if (processInstanceId == null) {
+ continue;
+ }
+ WorkflowExecuteThread workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
+ if (workflowExecuteThread == null) {
+ logger.warn("can not find workflowExecuteThread, this check event will remove, processInstanceId:{}", processInstanceId);
+ processInstanceTimeoutCheckList.remove(processInstanceId);
+ continue;
+ }
+ ProcessInstance processInstance = workflowExecuteThread.getProcessInstance();
+ if (processInstance == null) {
+ continue;
+ }
+ long timeRemain = DateUtils.getRemainTime(processInstance.getStartTime(), (long) processInstance.getTimeout() * Constants.SEC_2_MINUTES_TIME_UNIT);
+ if (timeRemain < 0) {
+ addProcessTimeoutEvent(processInstance);
+ processInstanceTimeoutCheckList.remove(processInstance.getId());
+ }
+ }
+ }
+
+ public void addTask4TimeoutCheck(ProcessInstance processInstance, TaskInstance taskInstance) {
+ TaskInstanceKey taskInstanceKey = TaskInstanceKey.getTaskInstanceKey(processInstance, taskInstance);
+ if (taskInstanceKey == null) {
+ logger.error("taskInstanceKey is null");
+ return;
+ }
+ if (taskInstanceTimeoutCheckList.contains(taskInstanceKey)) {
return;
}
TaskDefinition taskDefinition = taskInstance.getTaskDefine();
@@ -106,19 +148,29 @@ public class StateWheelExecuteThread extends Thread {
return;
}
if (TimeoutFlag.OPEN == taskDefinition.getTimeoutFlag()) {
- taskInstanceTimeoutCheckList.put(taskInstance.getId(), taskInstance.getProcessInstanceId());
+ taskInstanceTimeoutCheckList.add(taskInstanceKey);
}
if (taskInstance.isDependTask() || taskInstance.isSubProcess()) {
- taskInstanceTimeoutCheckList.put(taskInstance.getId(), taskInstance.getProcessInstanceId());
+ taskInstanceTimeoutCheckList.add(taskInstanceKey);
}
}
- public void removeTask4TimeoutCheck(TaskInstance taskInstance) {
- taskInstanceTimeoutCheckList.remove(taskInstance.getId());
+ public void removeTask4TimeoutCheck(ProcessInstance processInstance, TaskInstance taskInstance) {
+ TaskInstanceKey taskInstanceKey = TaskInstanceKey.getTaskInstanceKey(processInstance, taskInstance);
+ if (taskInstanceKey == null) {
+ logger.error("taskInstanceKey is null");
+ return;
+ }
+ taskInstanceTimeoutCheckList.remove(taskInstanceKey);
}
- public void addTask4RetryCheck(TaskInstance taskInstance) {
- if (taskInstanceRetryCheckList.containsKey(taskInstance.getId())) {
+ public void addTask4RetryCheck(ProcessInstance processInstance, TaskInstance taskInstance) {
+ TaskInstanceKey taskInstanceKey = TaskInstanceKey.getTaskInstanceKey(processInstance, taskInstance);
+ if (taskInstanceKey == null) {
+ logger.error("taskInstanceKey is null");
+ return;
+ }
+ if (taskInstanceRetryCheckList.contains(taskInstanceKey)) {
return;
}
TaskDefinition taskDefinition = taskInstance.getTaskDefine();
@@ -126,43 +178,69 @@ public class StateWheelExecuteThread extends Thread {
logger.error("taskDefinition is null, taskId:{}", taskInstance.getId());
return;
}
- if (taskInstance.taskCanRetry()) {
- taskInstanceRetryCheckList.put(taskInstance.getId(), taskInstance.getProcessInstanceId());
+ logger.debug("addTask4RetryCheck, taskCode:{}, processInstanceId:{}", taskInstance.getTaskCode(), taskInstance.getProcessInstanceId());
+ taskInstanceRetryCheckList.add(taskInstanceKey);
+ }
+
+ public void removeTask4RetryCheck(ProcessInstance processInstance, TaskInstance taskInstance) {
+ TaskInstanceKey taskInstanceKey = TaskInstanceKey.getTaskInstanceKey(processInstance, taskInstance);
+ if (taskInstanceKey == null) {
+ logger.error("taskInstanceKey is null");
+ return;
}
+ taskInstanceRetryCheckList.remove(taskInstanceKey);
+ }
+ public void addTask4StateCheck(ProcessInstance processInstance, TaskInstance taskInstance) {
+ TaskInstanceKey taskInstanceKey = TaskInstanceKey.getTaskInstanceKey(processInstance, taskInstance);
+ if (taskInstanceKey == null) {
+ logger.error("taskInstanceKey is null");
+ return;
+ }
+ if (taskInstanceStateCheckList.contains(taskInstanceKey)) {
+ return;
+ }
if (taskInstance.isDependTask() || taskInstance.isSubProcess()) {
- taskInstanceRetryCheckList.put(taskInstance.getId(), taskInstance.getProcessInstanceId());
+ taskInstanceStateCheckList.add(taskInstanceKey);
}
}
- public void removeTask4RetryCheck(TaskInstance taskInstance) {
- taskInstanceRetryCheckList.remove(taskInstance.getId());
+ public void removeTask4StateCheck(ProcessInstance processInstance, TaskInstance taskInstance) {
+ TaskInstanceKey taskInstanceKey = TaskInstanceKey.getTaskInstanceKey(processInstance, taskInstance);
+ if (taskInstanceKey == null) {
+ logger.error("taskInstanceKey is null");
+ return;
+ }
+ taskInstanceStateCheckList.remove(taskInstanceKey);
}
private void checkTask4Timeout() {
if (taskInstanceTimeoutCheckList.isEmpty()) {
return;
}
- for (Entry<Integer, Integer> entry : taskInstanceTimeoutCheckList.entrySet()) {
- int processInstanceId = entry.getValue();
- int taskInstanceId = entry.getKey();
+ for (TaskInstanceKey taskInstanceKey : taskInstanceTimeoutCheckList) {
+ int processInstanceId = taskInstanceKey.getProcessInstanceId();
+ long taskCode = taskInstanceKey.getTaskCode();
WorkflowExecuteThread workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
if (workflowExecuteThread == null) {
- logger.warn("can not find workflowExecuteThread, this check event will remove, processInstanceId:{}, taskInstanceId:{}",
- processInstanceId, taskInstanceId);
- taskInstanceTimeoutCheckList.remove(taskInstanceId);
+ logger.warn("can not find workflowExecuteThread, this check event will remove, processInstanceId:{}, taskCode:{}",
+ processInstanceId, taskCode);
+ taskInstanceTimeoutCheckList.remove(taskInstanceKey);
continue;
}
- TaskInstance taskInstance = workflowExecuteThread.getTaskInstance(taskInstanceId);
+ TaskInstance taskInstance = workflowExecuteThread.getActiveTaskInstanceByTaskCode(taskCode);
if (taskInstance == null) {
+ logger.warn("can not find taskInstance from workflowExecuteThread, this check event will remove, processInstanceId:{}, taskCode:{}",
+ processInstanceId, taskCode);
+ taskInstanceTimeoutCheckList.remove(taskInstanceKey);
continue;
}
if (TimeoutFlag.OPEN == taskInstance.getTaskDefine().getTimeoutFlag()) {
long timeRemain = DateUtils.getRemainTime(taskInstance.getStartTime(), (long) taskInstance.getTaskDefine().getTimeout() * Constants.SEC_2_MINUTES_TIME_UNIT);
if (timeRemain < 0) {
addTaskTimeoutEvent(taskInstance);
- taskInstanceTimeoutCheckList.remove(taskInstance.getId());
+ taskInstanceTimeoutCheckList.remove(taskInstanceKey);
}
}
}
@@ -172,54 +250,63 @@ public class StateWheelExecuteThread extends Thread {
if (taskInstanceRetryCheckList.isEmpty()) {
return;
}
- for (Entry<Integer, Integer> entry : taskInstanceRetryCheckList.entrySet()) {
- int processInstanceId = entry.getValue();
- int taskInstanceId = entry.getKey();
+ for (TaskInstanceKey taskInstanceKey : taskInstanceRetryCheckList) {
+ int processInstanceId = taskInstanceKey.getProcessInstanceId();
+ long taskCode = taskInstanceKey.getTaskCode();
WorkflowExecuteThread workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
if (workflowExecuteThread == null) {
- logger.warn("can not find workflowExecuteThread, this check event will remove, processInstanceId:{}, taskInstanceId:{}",
- processInstanceId, taskInstanceId);
- taskInstanceRetryCheckList.remove(taskInstanceId);
+ logger.warn("can not find workflowExecuteThread, this check event will remove, processInstanceId:{}, taskCode:{}",
+ processInstanceId, taskCode);
+ taskInstanceRetryCheckList.remove(taskInstanceKey);
continue;
}
- TaskInstance taskInstance = workflowExecuteThread.getTaskInstance(taskInstanceId);
+ TaskInstance taskInstance = workflowExecuteThread.getRetryTaskInstanceByTaskCode(taskCode);
if (taskInstance == null) {
+ logger.warn("can not find taskInstance from workflowExecuteThread, this check event will remove, processInstanceId:{}, taskCode:{}",
+ processInstanceId, taskCode);
+ taskInstanceRetryCheckList.remove(taskInstanceKey);
continue;
}
- if (!taskInstance.getState().typeIsFinished() && (taskInstance.isSubProcess() || taskInstance.isDependTask())) {
- addTaskStateChangeEvent(taskInstance);
- } else if (taskInstance.taskCanRetry() && taskInstance.retryTaskIntervalOverTime()) {
- addTaskStateChangeEvent(taskInstance);
- taskInstanceRetryCheckList.remove(taskInstance.getId());
+ if (taskInstance.retryTaskIntervalOverTime()) {
+ // reset taskInstance endTime and state
+ // todo related function: TaskInstance.retryTaskIntervalOverTime, WorkflowExecuteThread.cloneRetryTaskInstance
+ taskInstance.setEndTime(null);
+ taskInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS);
+
+ addTaskRetryEvent(taskInstance);
+ taskInstanceRetryCheckList.remove(taskInstanceKey);
}
}
}
- private void checkProcess4Timeout() {
- if (processInstanceTimeoutCheckList.isEmpty()) {
+ private void checkTask4State() {
+ if (taskInstanceStateCheckList.isEmpty()) {
return;
}
- for (Integer processInstanceId : processInstanceTimeoutCheckList) {
- if (processInstanceId == null) {
- continue;
- }
+ for (TaskInstanceKey taskInstanceKey : taskInstanceStateCheckList) {
+ int processInstanceId = taskInstanceKey.getProcessInstanceId();
+ long taskCode = taskInstanceKey.getTaskCode();
+
WorkflowExecuteThread workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
if (workflowExecuteThread == null) {
- logger.warn("can not find workflowExecuteThread, this check event will remove, processInstanceId:{}", processInstanceId);
- processInstanceTimeoutCheckList.remove(processInstanceId);
+ logger.warn("can not find workflowExecuteThread, this check event will remove, processInstanceId:{}, taskCode:{}",
+ processInstanceId, taskCode);
+ taskInstanceStateCheckList.remove(taskInstanceKey);
continue;
}
- ProcessInstance processInstance = workflowExecuteThread.getProcessInstance();
- if (processInstance == null) {
+ TaskInstance taskInstance = workflowExecuteThread.getActiveTaskInstanceByTaskCode(taskCode);
+ if (taskInstance == null) {
+ logger.warn("can not find taskInstance from workflowExecuteThread, this check event will remove, processInstanceId:{}, taskCode:{}",
+ processInstanceId, taskCode);
+ taskInstanceStateCheckList.remove(taskInstanceKey);
continue;
}
- long timeRemain = DateUtils.getRemainTime(processInstance.getStartTime(), (long) processInstance.getTimeout() * Constants.SEC_2_MINUTES_TIME_UNIT);
- if (timeRemain < 0) {
- addProcessTimeoutEvent(processInstance);
- processInstanceTimeoutCheckList.remove(processInstance.getId());
+ if (taskInstance.getState().typeIsFinished()) {
+ continue;
}
+ addTaskStateChangeEvent(taskInstance);
}
}
@@ -228,6 +315,17 @@ public class StateWheelExecuteThread extends Thread {
stateEvent.setType(StateEventType.TASK_STATE_CHANGE);
stateEvent.setProcessInstanceId(taskInstance.getProcessInstanceId());
stateEvent.setTaskInstanceId(taskInstance.getId());
+ stateEvent.setTaskCode(taskInstance.getTaskCode());
+ stateEvent.setExecutionStatus(ExecutionStatus.RUNNING_EXECUTION);
+ workflowExecuteThreadPool.submitStateEvent(stateEvent);
+ }
+
+ private void addTaskRetryEvent(TaskInstance taskInstance) {
+ StateEvent stateEvent = new StateEvent();
+ stateEvent.setType(StateEventType.TASK_RETRY);
+ stateEvent.setProcessInstanceId(taskInstance.getProcessInstanceId());
+ stateEvent.setTaskInstanceId(taskInstance.getId());
+ stateEvent.setTaskCode(taskInstance.getTaskCode());
stateEvent.setExecutionStatus(ExecutionStatus.RUNNING_EXECUTION);
workflowExecuteThreadPool.submitStateEvent(stateEvent);
}
@@ -237,6 +335,7 @@ public class StateWheelExecuteThread extends Thread {
stateEvent.setType(StateEventType.TASK_TIMEOUT);
stateEvent.setProcessInstanceId(taskInstance.getProcessInstanceId());
stateEvent.setTaskInstanceId(taskInstance.getId());
+ stateEvent.setTaskCode(taskInstance.getTaskCode());
workflowExecuteThreadPool.submitStateEvent(stateEvent);
}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
index 73e655f..2bc16a3 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
@@ -159,34 +159,38 @@ public class WorkflowExecuteThread {
private Map<Integer, TaskInstance> taskInstanceMap = new ConcurrentHashMap<>();
/**
- * running TaskNode, taskId as key
+ * running taskProcessor, taskCode as key, taskProcessor as value
+ * only one taskProcessor per taskCode
*/
- private final Map<Integer, ITaskProcessor> activeTaskProcessorMaps = new ConcurrentHashMap<>();
+ private final Map<Long, ITaskProcessor> activeTaskProcessorMaps = new ConcurrentHashMap<>();
/**
* valid task map, taskCode as key, taskId as value
+ * in a DAG, only one taskInstance per taskCode is valid
*/
- private Map<String, Integer> validTaskMap = new ConcurrentHashMap<>();
+ private Map<Long, Integer> validTaskMap = new ConcurrentHashMap<>();
/**
- * error task map, taskCode as key, taskId as value
+ * error task map, taskCode as key, taskInstanceId as value
+ * in a DAG, only one taskInstance per taskCode is valid
*/
- private Map<String, Integer> errorTaskMap = new ConcurrentHashMap<>();
+ private Map<Long, Integer> errorTaskMap = new ConcurrentHashMap<>();
/**
- * complete task map, taskCode as key, taskId as value
+ * complete task map, taskCode as key, taskInstanceId as value
+ * in a DAG, only one taskInstance per taskCode is valid
*/
- private Map<String, Integer> completeTaskMap = new ConcurrentHashMap<>();
+ private Map<Long, Integer> completeTaskMap = new ConcurrentHashMap<>();
/**
* depend failed task map, taskCode as key, taskId as value
*/
- private Map<String, Integer> dependFailedTaskMap = new ConcurrentHashMap<>();
+ private Map<Long, Integer> dependFailedTaskMap = new ConcurrentHashMap<>();
/**
* forbidden task map, code as key
*/
- private Map<String, TaskNode> forbiddenTaskMap = new ConcurrentHashMap<>();
+ private Map<Long, TaskNode> forbiddenTaskMap = new ConcurrentHashMap<>();
/**
* skip task map, code as key
@@ -209,6 +213,12 @@ public class WorkflowExecuteThread {
private PeerTaskInstancePriorityQueue readyToSubmitTaskQueue = new PeerTaskInstancePriorityQueue();
/**
+ * wait to retry taskInstance map, taskCode as key, taskInstance as value
+ * before retry, the taskInstance id is 0
+ */
+ private Map<Long, TaskInstance> waitToRetryTaskInstanceMap = new ConcurrentHashMap<>();
+
+ /**
* state wheel execute thread
*/
private StateWheelExecuteThread stateWheelExecuteThread;
@@ -317,6 +327,9 @@ public class WorkflowExecuteThread {
case WAIT_TASK_GROUP:
result = checkForceStartAndWakeUp(stateEvent);
break;
+ case TASK_RETRY:
+ result = taskRetryEventHandler(stateEvent);
+ break;
default:
break;
}
@@ -330,8 +343,8 @@ public class WorkflowExecuteThread {
private boolean checkForceStartAndWakeUp(StateEvent stateEvent) {
TaskGroupQueue taskGroupQueue = this.processService.loadTaskGroupQueue(stateEvent.getTaskInstanceId());
if (taskGroupQueue.getForceStart() == Flag.YES.getCode()) {
- ITaskProcessor taskProcessor = activeTaskProcessorMaps.get(stateEvent.getTaskInstanceId());
TaskInstance taskInstance = this.processService.findTaskInstanceById(stateEvent.getTaskInstanceId());
+ ITaskProcessor taskProcessor = activeTaskProcessorMaps.get(taskInstance.getTaskCode());
ProcessInstance processInstance = this.processService.findProcessInstanceById(taskInstance.getProcessInstanceId());
taskProcessor.init(taskInstance, processInstance);
taskProcessor.action(TaskAction.DISPATCH);
@@ -341,8 +354,8 @@ public class WorkflowExecuteThread {
if (taskGroupQueue.getInQueue() == Flag.YES.getCode()) {
boolean acquireTaskGroup = processService.acquireTaskGroupAgain(taskGroupQueue);
if (acquireTaskGroup) {
- ITaskProcessor taskProcessor = activeTaskProcessorMaps.get(stateEvent.getTaskInstanceId());
TaskInstance taskInstance = this.processService.findTaskInstanceById(stateEvent.getTaskInstanceId());
+ ITaskProcessor taskProcessor = activeTaskProcessorMaps.get(taskInstance.getTaskCode());
ProcessInstance processInstance = this.processService.findProcessInstanceById(taskInstance.getProcessInstanceId());
taskProcessor.init(taskInstance, processInstance);
taskProcessor.action(TaskAction.DISPATCH);
@@ -363,7 +376,7 @@ public class WorkflowExecuteThread {
}
TaskTimeoutStrategy taskTimeoutStrategy = taskInstance.getTaskDefine().getTimeoutNotifyStrategy();
if (TaskTimeoutStrategy.FAILED == taskTimeoutStrategy) {
- ITaskProcessor taskProcessor = activeTaskProcessorMaps.get(stateEvent.getTaskInstanceId());
+ ITaskProcessor taskProcessor = activeTaskProcessorMaps.get(taskInstance.getTaskCode());
taskProcessor.action(TaskAction.TIMEOUT);
} else {
processAlertManager.sendTaskTimeoutAlert(processInstance, taskInstance, taskInstance.getTaskDefine());
@@ -387,79 +400,57 @@ public class WorkflowExecuteThread {
return true;
}
- if (task.getState().typeIsFinished() && !completeTaskMap.containsKey(Long.toString(task.getTaskCode()))) {
+ if (task.getState().typeIsFinished()) {
+ if (completeTaskMap.containsKey(task.getTaskCode()) && completeTaskMap.get(task.getTaskCode()) == task.getId()) {
+ return true;
+ }
taskFinished(task);
if (task.getTaskGroupId() > 0) {
- //release task group
- TaskInstance nextTaskInstance = this.processService.releaseTaskGroup(task);
- if (nextTaskInstance != null) {
- if (nextTaskInstance.getProcessInstanceId() == task.getProcessInstanceId()) {
- StateEvent nextEvent = new StateEvent();
- nextEvent.setProcessInstanceId(this.processInstance.getId());
- nextEvent.setTaskInstanceId(nextTaskInstance.getId());
- nextEvent.setType(StateEventType.WAIT_TASK_GROUP);
- this.stateEvents.add(nextEvent);
- } else {
- ProcessInstance processInstance = this.processService.findProcessInstanceById(nextTaskInstance.getProcessInstanceId());
- this.processService.sendStartTask2Master(processInstance, nextTaskInstance.getId(),
- org.apache.dolphinscheduler.remote.command.CommandType.TASK_WAKEUP_EVENT_REQUEST);
- }
- }
+ releaseTaskGroup(task);
}
- } else if (activeTaskProcessorMaps.containsKey(stateEvent.getTaskInstanceId())) {
- ITaskProcessor iTaskProcessor = activeTaskProcessorMaps.get(stateEvent.getTaskInstanceId());
+ return true;
+ }
+ if (activeTaskProcessorMaps.containsKey(task.getTaskCode())) {
+ ITaskProcessor iTaskProcessor = activeTaskProcessorMaps.get(task.getTaskCode());
iTaskProcessor.action(TaskAction.RUN);
- if (iTaskProcessor.taskState().typeIsFinished()) {
- task = processService.findTaskInstanceById(stateEvent.getTaskInstanceId());
+ if (iTaskProcessor.taskInstance().getState().typeIsFinished()) {
taskFinished(task);
}
- } else {
- logger.error("state handler error: {}", stateEvent);
+ return true;
}
+ logger.error("state handler error: {}", stateEvent);
+
return true;
}
- private void taskFinished(TaskInstance task) {
- logger.info("work flow {} task {} state:{} ",
+ private void taskFinished(TaskInstance taskInstance) {
+ logger.info("work flow {} task id:{} code:{} state:{} ",
processInstance.getId(),
- task.getId(),
- task.getState());
- if (task.taskCanRetry()) {
- addTaskToStandByList(task);
- if (!task.retryTaskIntervalOverTime()) {
- logger.info("failure task will be submitted: process id: {}, task instance id: {} state:{} retry times:{} / {}, interval:{}",
- processInstance.getId(),
- task.getId(),
- task.getState(),
- task.getRetryTimes(),
- task.getMaxRetryTimes(),
- task.getRetryInterval());
- stateWheelExecuteThread.addTask4TimeoutCheck(task);
- stateWheelExecuteThread.addTask4RetryCheck(task);
- } else {
- submitStandByTask();
- stateWheelExecuteThread.removeTask4TimeoutCheck(task);
- stateWheelExecuteThread.removeTask4RetryCheck(task);
- }
- return;
- }
+ taskInstance.getId(),
+ taskInstance.getTaskCode(),
+ taskInstance.getState());
- completeTaskMap.put(Long.toString(task.getTaskCode()), task.getId());
- activeTaskProcessorMaps.remove(task.getId());
- stateWheelExecuteThread.removeTask4TimeoutCheck(task);
- stateWheelExecuteThread.removeTask4RetryCheck(task);
+ activeTaskProcessorMaps.remove(taskInstance.getTaskCode());
+ stateWheelExecuteThread.removeTask4TimeoutCheck(processInstance, taskInstance);
+ stateWheelExecuteThread.removeTask4RetryCheck(processInstance, taskInstance);
+ stateWheelExecuteThread.removeTask4StateCheck(processInstance, taskInstance);
- if (task.getState().typeIsSuccess()) {
- processInstance.setVarPool(task.getVarPool());
+ if (taskInstance.getState().typeIsSuccess()) {
+ completeTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId());
+ processInstance.setVarPool(taskInstance.getVarPool());
processService.saveProcessInstance(processInstance);
- submitPostNode(Long.toString(task.getTaskCode()));
- } else if (task.getState().typeIsFailure()) {
- if (task.isConditionsTask()
- || DagHelper.haveConditionsAfterNode(Long.toString(task.getTaskCode()), dag)) {
- submitPostNode(Long.toString(task.getTaskCode()));
+ submitPostNode(Long.toString(taskInstance.getTaskCode()));
+ } else if (taskInstance.taskCanRetry()) {
+ // retry task
+ retryTaskInstance(taskInstance);
+ } else if (taskInstance.getState().typeIsFailure()) {
+ completeTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId());
+ if (taskInstance.isConditionsTask()
+ || DagHelper.haveConditionsAfterNode(Long.toString(taskInstance.getTaskCode()), dag)) {
+ submitPostNode(Long.toString(taskInstance.getTaskCode()));
} else {
- errorTaskMap.put(Long.toString(task.getTaskCode()), task.getId());
+ errorTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId());
if (processInstance.getFailureStrategy() == FailureStrategy.END) {
killAllTasks();
}
@@ -469,6 +460,73 @@ public class WorkflowExecuteThread {
}
/**
+ * release task group
+ * @param taskInstance
+ */
+ private void releaseTaskGroup(TaskInstance taskInstance) {
+ if (taskInstance.getTaskGroupId() > 0) {
+ TaskInstance nextTaskInstance = this.processService.releaseTaskGroup(taskInstance);
+ if (nextTaskInstance != null) {
+ if (nextTaskInstance.getProcessInstanceId() == taskInstance.getProcessInstanceId()) {
+ StateEvent nextEvent = new StateEvent();
+ nextEvent.setProcessInstanceId(this.processInstance.getId());
+ nextEvent.setTaskInstanceId(nextTaskInstance.getId());
+ nextEvent.setType(StateEventType.WAIT_TASK_GROUP);
+ this.stateEvents.add(nextEvent);
+ } else {
+ ProcessInstance processInstance = this.processService.findProcessInstanceById(nextTaskInstance.getProcessInstanceId());
+ this.processService.sendStartTask2Master(processInstance, nextTaskInstance.getId(),
+ org.apache.dolphinscheduler.remote.command.CommandType.TASK_WAKEUP_EVENT_REQUEST);
+ }
+ }
+ }
+ }
+
+ /**
+ * create a new task instance to retry; it is a different object from the original
+ * @param taskInstance
+ */
+ private void retryTaskInstance(TaskInstance taskInstance) {
+ if (!taskInstance.taskCanRetry()) {
+ return;
+ }
+ TaskInstance newTaskInstance = cloneRetryTaskInstance(taskInstance);
+ if (newTaskInstance == null) {
+ logger.error("retry fail, new taskInstancce is null, task code:{}, task id:{}", taskInstance.getTaskCode(), taskInstance.getId());
+ return;
+ }
+ waitToRetryTaskInstanceMap.put(newTaskInstance.getTaskCode(), newTaskInstance);
+ if (!taskInstance.retryTaskIntervalOverTime()) {
+ logger.info("failure task will be submitted: process id: {}, task instance code: {} state:{} retry times:{} / {}, interval:{}",
+ processInstance.getId(),
+ newTaskInstance.getTaskCode(),
+ newTaskInstance.getState(),
+ newTaskInstance.getRetryTimes(),
+ newTaskInstance.getMaxRetryTimes(),
+ newTaskInstance.getRetryInterval());
+ stateWheelExecuteThread.addTask4TimeoutCheck(processInstance, newTaskInstance);
+ stateWheelExecuteThread.addTask4RetryCheck(processInstance, newTaskInstance);
+ } else {
+ addTaskToStandByList(newTaskInstance);
+ submitStandByTask();
+ waitToRetryTaskInstanceMap.remove(newTaskInstance.getTaskCode());
+ }
+ }
+
+ /**
+ * handle task retry event
+ * @param stateEvent
+ * @return
+ */
+ private boolean taskRetryEventHandler(StateEvent stateEvent) {
+ TaskInstance taskInstance = waitToRetryTaskInstanceMap.get(stateEvent.getTaskCode());
+ addTaskToStandByList(taskInstance);
+ submitStandByTask();
+ waitToRetryTaskInstanceMap.remove(stateEvent.getTaskCode());
+ return true;
+ }
+
+ /**
* update process instance
*/
public void refreshProcessInstance(int processInstanceId) {
@@ -492,9 +550,9 @@ public class WorkflowExecuteThread {
processService.packageTaskInstance(taskInstance, processInstance);
taskInstanceMap.put(taskInstance.getId(), taskInstance);
- validTaskMap.remove(Long.toString(taskInstance.getTaskCode()));
+ validTaskMap.remove(taskInstance.getTaskCode());
if (Flag.YES == taskInstance.getFlag()) {
- validTaskMap.put(Long.toString(taskInstance.getTaskCode()), taskInstance.getId());
+ validTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId());
}
}
@@ -561,6 +619,20 @@ public class WorkflowExecuteThread {
return null;
}
+    /**
+     * Look up the task instance held by the task processor registered for a task code.
+     *
+     * @param taskCode task code used as key of activeTaskProcessorMaps
+     * @return the processor's task instance, or null when no processor is registered for the code
+     */
+    public TaskInstance getActiveTaskInstanceByTaskCode(long taskCode) {
+        // one get() instead of containsKey() + get(): the entry cannot disappear between the two calls
+        ITaskProcessor taskProcessor = activeTaskProcessorMaps.get(taskCode);
+        return taskProcessor == null ? null : taskProcessor.taskInstance();
+    }
+
+    /**
+     * Look up the task instance that is waiting to be retried for a task code.
+     *
+     * @param taskCode task code used as key of waitToRetryTaskInstanceMap
+     * @return the waiting task instance, or null when none is stored for the code
+     */
+    public TaskInstance getRetryTaskInstanceByTaskCode(long taskCode) {
+        // get() already yields null for an absent key
+        return waitToRetryTaskInstanceMap.get(taskCode);
+    }
+
private boolean processStateChangeHandler(StateEvent stateEvent) {
try {
logger.info("process:{} state {} change to {}", processInstance.getId(), processInstance.getState(), stateEvent.getExecutionStatus());
@@ -736,7 +808,7 @@ public class WorkflowExecuteThread {
taskNodeList.forEach(taskNode -> {
if (taskNode.isForbidden()) {
- forbiddenTaskMap.put(Long.toString(taskNode.getCode()), taskNode);
+ forbiddenTaskMap.put(taskNode.getCode(), taskNode);
}
});
@@ -767,17 +839,39 @@ public class WorkflowExecuteThread {
if (!isNewProcessInstance()) {
List<TaskInstance> validTaskInstanceList = processService.findValidTaskListByProcessId(processInstance.getId());
for (TaskInstance task : validTaskInstanceList) {
- validTaskMap.put(Long.toString(task.getTaskCode()), task.getId());
+ if (validTaskMap.containsKey(task.getTaskCode())) {
+ int oldTaskInstanceId = validTaskMap.get(task.getTaskCode());
+ TaskInstance oldTaskInstance = taskInstanceMap.get(oldTaskInstanceId);
+ if (!oldTaskInstance.getState().typeIsFinished() && task.getState().typeIsFinished()) {
+ task.setFlag(Flag.NO);
+ processService.updateTaskInstance(task);
+ continue;
+ }
+ logger.warn("have same taskCode taskInstance when init task queue, taskCode:{}", task.getTaskCode());
+ }
+
+ validTaskMap.put(task.getTaskCode(), task.getId());
taskInstanceMap.put(task.getId(), task);
if (task.isTaskComplete()) {
- completeTaskMap.put(Long.toString(task.getTaskCode()), task.getId());
+ completeTaskMap.put(task.getTaskCode(), task.getId());
+ continue;
}
if (task.isConditionsTask() || DagHelper.haveConditionsAfterNode(Long.toString(task.getTaskCode()), dag)) {
continue;
}
- if (task.getState().typeIsFailure() && !task.taskCanRetry()) {
- errorTaskMap.put(Long.toString(task.getTaskCode()), task.getId());
+ if (task.taskCanRetry()) {
+ if (task.getState() == ExecutionStatus.NEED_FAULT_TOLERANCE) {
+ // tolerantTaskInstance add to standby list directly
+ TaskInstance tolerantTaskInstance = cloneTolerantTaskInstance(task);
+ addTaskToStandByList(tolerantTaskInstance);
+ } else {
+ retryTaskInstance(task);
+ }
+ continue;
+ }
+ if (task.getState().typeIsFailure()) {
+ errorTaskMap.put(task.getTaskCode(), task.getId());
}
}
}
@@ -832,19 +926,32 @@ public class WorkflowExecuteThread {
taskInstance.getId(), taskInstance.getName());
return null;
}
- validTaskMap.put(Long.toString(taskInstance.getTaskCode()), taskInstance.getId());
+
+ // in a dag, only one taskInstance is valid per taskCode, so need to set the old taskInstance invalid
+ if (validTaskMap.containsKey(taskInstance.getTaskCode())) {
+ int oldTaskInstanceId = validTaskMap.get(taskInstance.getTaskCode());
+ if (taskInstance.getId() != oldTaskInstanceId) {
+ TaskInstance oldTaskInstance = taskInstanceMap.get(oldTaskInstanceId);
+ oldTaskInstance.setFlag(Flag.NO);
+ processService.updateTaskInstance(oldTaskInstance);
+ validTaskMap.remove(taskInstance.getTaskCode());
+ activeTaskProcessorMaps.remove(taskInstance.getTaskCode());
+ }
+ }
+
+ validTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId());
taskInstanceMap.put(taskInstance.getId(), taskInstance);
- activeTaskProcessorMaps.put(taskInstance.getId(), taskProcessor);
+ activeTaskProcessorMaps.put(taskInstance.getTaskCode(), taskProcessor);
taskProcessor.action(TaskAction.RUN);
- stateWheelExecuteThread.addTask4TimeoutCheck(taskInstance);
- stateWheelExecuteThread.addTask4RetryCheck(taskInstance);
+ stateWheelExecuteThread.addTask4TimeoutCheck(processInstance, taskInstance);
+ stateWheelExecuteThread.addTask4StateCheck(processInstance, taskInstance);
- if (taskProcessor.taskState().typeIsFinished()) {
+ if (taskProcessor.taskInstance().getState().typeIsFinished()) {
StateEvent stateEvent = new StateEvent();
stateEvent.setProcessInstanceId(this.processInstance.getId());
stateEvent.setTaskInstanceId(taskInstance.getId());
- stateEvent.setExecutionStatus(taskProcessor.taskState());
+ stateEvent.setExecutionStatus(taskProcessor.taskInstance().getState());
stateEvent.setType(StateEventType.TASK_STATE_CHANGE);
this.stateEvents.add(stateEvent);
}
@@ -898,78 +1005,129 @@ public class WorkflowExecuteThread {
*/
private TaskInstance createTaskInstance(ProcessInstance processInstance, TaskNode taskNode) {
TaskInstance taskInstance = findTaskIfExists(taskNode.getCode(), taskNode.getVersion());
- if (taskInstance == null) {
- taskInstance = new TaskInstance();
- taskInstance.setTaskCode(taskNode.getCode());
- taskInstance.setTaskDefinitionVersion(taskNode.getVersion());
- // task name
- taskInstance.setName(taskNode.getName());
- // task instance state
- taskInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS);
- // process instance id
- taskInstance.setProcessInstanceId(processInstance.getId());
- // task instance type
- taskInstance.setTaskType(taskNode.getType().toUpperCase());
- // task instance whether alert
- taskInstance.setAlertFlag(Flag.NO);
-
- // task instance start time
- taskInstance.setStartTime(null);
-
- // task instance flag
- taskInstance.setFlag(Flag.YES);
-
- // task dry run flag
- taskInstance.setDryRun(processInstance.getDryRun());
-
- // task instance retry times
- taskInstance.setRetryTimes(0);
-
- // max task instance retry times
- taskInstance.setMaxRetryTimes(taskNode.getMaxRetryTimes());
-
- // retry task instance interval
- taskInstance.setRetryInterval(taskNode.getRetryInterval());
-
- //set task param
- taskInstance.setTaskParams(taskNode.getTaskParams());
-
- //set task group and priority
- taskInstance.setTaskGroupId(taskNode.getTaskGroupId());
- taskInstance.setTaskGroupPriority(taskNode.getTaskGroupPriority());
-
- // task instance priority
- if (taskNode.getTaskInstancePriority() == null) {
- taskInstance.setTaskInstancePriority(Priority.MEDIUM);
- } else {
- taskInstance.setTaskInstancePriority(taskNode.getTaskInstancePriority());
- }
+ if (taskInstance != null) {
+ return taskInstance;
+ }
+
+ return newTaskInstance(processInstance, taskNode);
+ }
+
+    /**
+     * Build the task instance used for a retry: a new instance created from the DAG node of the given
+     * task, carrying over its definitions, state and end time, with the retry counter increased by one.
+     *
+     * @param taskInstance the task instance to be retried
+     * @return the new task instance, or null when the DAG holds no node for the task code
+     */
+    public TaskInstance cloneRetryTaskInstance(TaskInstance taskInstance) {
+        long taskCode = taskInstance.getTaskCode();
+        TaskNode node = dag.getNode(Long.toString(taskCode));
+        if (node == null) {
+            logger.error("taskNode is null, code:{}", taskCode);
+            return null;
+        }
+        TaskInstance retryInstance = newTaskInstance(processInstance, node);
+        retryInstance.setTaskDefine(taskInstance.getTaskDefine());
+        retryInstance.setProcessDefine(taskInstance.getProcessDefine());
+        retryInstance.setProcessInstance(processInstance);
+        // this is one more attempt than the original made
+        retryInstance.setRetryTimes(taskInstance.getRetryTimes() + 1);
+        // todo: related function TaskInstance.retryTaskIntervalOverTime;
+        // presumably it relies on the state and end time copied below - confirm
+        retryInstance.setState(taskInstance.getState());
+        retryInstance.setEndTime(taskInstance.getEndTime());
+        return retryInstance;
+    }
- String processWorkerGroup = processInstance.getWorkerGroup();
- processWorkerGroup = StringUtils.isBlank(processWorkerGroup) ? DEFAULT_WORKER_GROUP : processWorkerGroup;
- String taskWorkerGroup = StringUtils.isBlank(taskNode.getWorkerGroup()) ? processWorkerGroup : taskNode.getWorkerGroup();
+    /**
+     * Build the task instance used for fault tolerance: a new instance created from the DAG node of the
+     * given task, carrying over its definitions, state and retry counter (the counter is not increased).
+     *
+     * @param taskInstance the task instance to be taken over
+     * @return the new task instance, or null when the DAG holds no node for the task code
+     */
+    public TaskInstance cloneTolerantTaskInstance(TaskInstance taskInstance) {
+        long taskCode = taskInstance.getTaskCode();
+        TaskNode node = dag.getNode(Long.toString(taskCode));
+        if (node == null) {
+            logger.error("taskNode is null, code:{}", taskCode);
+            return null;
+        }
+        TaskInstance tolerantInstance = newTaskInstance(processInstance, node);
+        tolerantInstance.setTaskDefine(taskInstance.getTaskDefine());
+        tolerantInstance.setProcessDefine(taskInstance.getProcessDefine());
+        tolerantInstance.setProcessInstance(processInstance);
+        // unlike a retry clone, the retry counter is copied unchanged
+        tolerantInstance.setRetryTimes(taskInstance.getRetryTimes());
+        tolerantInstance.setState(taskInstance.getState());
+        return tolerantInstance;
+    }
- Long processEnvironmentCode = Objects.isNull(processInstance.getEnvironmentCode()) ? -1 : processInstance.getEnvironmentCode();
- Long taskEnvironmentCode = Objects.isNull(taskNode.getEnvironmentCode()) ? processEnvironmentCode : taskNode.getEnvironmentCode();
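+ /**
+ * create a new task instance for the given task node of a process instance
+ * @param processInstance the process instance the new task instance belongs to
+ * @param taskNode the task node whose settings are copied to the new task instance
+ * @return the new task instance
+ */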
+ public TaskInstance newTaskInstance(ProcessInstance processInstance, TaskNode taskNode) {
+ TaskInstance taskInstance = new TaskInstance();
+ taskInstance.setTaskCode(taskNode.getCode());
+ taskInstance.setTaskDefinitionVersion(taskNode.getVersion());
+ // task name
+ taskInstance.setName(taskNode.getName());
+ // task instance state
+ taskInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS);
+ // process instance id
+ taskInstance.setProcessInstanceId(processInstance.getId());
+ // task instance type
+ taskInstance.setTaskType(taskNode.getType().toUpperCase());
+ // task instance whether alert
+ taskInstance.setAlertFlag(Flag.NO);
+
+ // task instance start time
+ taskInstance.setStartTime(null);
+
+ // task instance flag
+ taskInstance.setFlag(Flag.YES);
+
+ // task dry run flag
+ taskInstance.setDryRun(processInstance.getDryRun());
+
+ // task instance retry times
+ taskInstance.setRetryTimes(0);
+
+ // max task instance retry times
+ taskInstance.setMaxRetryTimes(taskNode.getMaxRetryTimes());
+
+ // retry task instance interval
+ taskInstance.setRetryInterval(taskNode.getRetryInterval());
+
+ //set task param
+ taskInstance.setTaskParams(taskNode.getTaskParams());
+
+ //set task group and priority
+ taskInstance.setTaskGroupId(taskNode.getTaskGroupId());
+ taskInstance.setTaskGroupPriority(taskNode.getTaskGroupPriority());
+
+ // task instance priority
+ if (taskNode.getTaskInstancePriority() == null) {
+ taskInstance.setTaskInstancePriority(Priority.MEDIUM);
+ } else {
+ taskInstance.setTaskInstancePriority(taskNode.getTaskInstancePriority());
+ }
- if (!processWorkerGroup.equals(DEFAULT_WORKER_GROUP) && taskWorkerGroup.equals(DEFAULT_WORKER_GROUP)) {
- taskInstance.setWorkerGroup(processWorkerGroup);
- taskInstance.setEnvironmentCode(processEnvironmentCode);
- } else {
- taskInstance.setWorkerGroup(taskWorkerGroup);
- taskInstance.setEnvironmentCode(taskEnvironmentCode);
- }
+ String processWorkerGroup = processInstance.getWorkerGroup();
+ processWorkerGroup = StringUtils.isBlank(processWorkerGroup) ? DEFAULT_WORKER_GROUP : processWorkerGroup;
+ String taskWorkerGroup = StringUtils.isBlank(taskNode.getWorkerGroup()) ? processWorkerGroup : taskNode.getWorkerGroup();
- if (!taskInstance.getEnvironmentCode().equals(-1L)) {
- Environment environment = processService.findEnvironmentByCode(taskInstance.getEnvironmentCode());
- if (Objects.nonNull(environment) && StringUtils.isNotEmpty(environment.getConfig())) {
- taskInstance.setEnvironmentConfig(environment.getConfig());
- }
- }
- // delay execution time
- taskInstance.setDelayTime(taskNode.getDelayTime());
+ Long processEnvironmentCode = Objects.isNull(processInstance.getEnvironmentCode()) ? -1 : processInstance.getEnvironmentCode();
+ Long taskEnvironmentCode = Objects.isNull(taskNode.getEnvironmentCode()) ? processEnvironmentCode : taskNode.getEnvironmentCode();
+
+ if (!processWorkerGroup.equals(DEFAULT_WORKER_GROUP) && taskWorkerGroup.equals(DEFAULT_WORKER_GROUP)) {
+ taskInstance.setWorkerGroup(processWorkerGroup);
+ taskInstance.setEnvironmentCode(processEnvironmentCode);
+ } else {
+ taskInstance.setWorkerGroup(taskWorkerGroup);
+ taskInstance.setEnvironmentCode(taskEnvironmentCode);
}
+ if (!taskInstance.getEnvironmentCode().equals(-1L)) {
+ Environment environment = processService.findEnvironmentByCode(taskInstance.getEnvironmentCode());
+ if (Objects.nonNull(environment) && StringUtils.isNotEmpty(environment.getConfig())) {
+ taskInstance.setEnvironmentConfig(environment.getConfig());
+ }
+ }
+ // delay execution time
+ taskInstance.setDelayTime(taskNode.getDelayTime());
return taskInstance;
}
@@ -978,7 +1136,7 @@ public class WorkflowExecuteThread {
Map<String, TaskInstance> allTaskInstance = new HashMap<>();
if (CollectionUtils.isNotEmpty(preTask)) {
for (String preTaskCode : preTask) {
- Integer taskId = completeTaskMap.get(preTaskCode);
+ Integer taskId = completeTaskMap.get(Long.parseLong(preTaskCode));
if (taskId == null) {
continue;
}
@@ -1073,7 +1231,7 @@ public class WorkflowExecuteThread {
continue;
}
- if (completeTaskMap.containsKey(Long.toString(task.getTaskCode()))) {
+ if (task.getId() > 0 && completeTaskMap.containsKey(task.getTaskCode())) {
logger.info("task {} has already run success", task.getName());
continue;
}
@@ -1106,10 +1264,11 @@ public class WorkflowExecuteThread {
for (String depsNode : indirectDepCodeList) {
if (dag.containsNode(depsNode) && !skipTaskNodeMap.containsKey(depsNode)) {
// dependencies must be fully completed
- if (!completeTaskMap.containsKey(depsNode)) {
+ Long despNodeTaskCode = Long.parseLong(depsNode);
+ if (!completeTaskMap.containsKey(despNodeTaskCode)) {
return DependResult.WAITING;
}
- Integer depsTaskId = completeTaskMap.get(depsNode);
+ Integer depsTaskId = completeTaskMap.get(despNodeTaskCode);
ExecutionStatus depTaskState = taskInstanceMap.get(depsTaskId).getState();
if (depTaskState.typeIsPause() || depTaskState.typeIsCancel()) {
return DependResult.NON_EXEC;
@@ -1138,7 +1297,7 @@ public class WorkflowExecuteThread {
TaskNode taskNode = dag.getNode(taskCode);
List<String> depCodeList = taskNode.getDepList();
for (String depsNode : depCodeList) {
- if (forbiddenTaskMap.containsKey(depsNode)) {
+ if (forbiddenTaskMap.containsKey(Long.parseLong(depsNode))) {
setIndirectDepList(depsNode, indirectDepCodeList);
} else {
indirectDepCodeList.add(depsNode);
@@ -1157,7 +1316,8 @@ public class WorkflowExecuteThread {
return false;
}
} else {
- Integer taskInstanceId = completeTaskMap.get(dependNodeName);
+ long taskCode = Long.parseLong(dependNodeName);
+ Integer taskInstanceId = completeTaskMap.get(taskCode);
ExecutionStatus depTaskState = taskInstanceMap.get(taskInstanceId).getState();
if (depTaskState.typeIsFailure()) {
return false;
@@ -1228,7 +1388,9 @@ public class WorkflowExecuteThread {
return true;
}
if (processInstance.getFailureStrategy() == FailureStrategy.CONTINUE) {
- return readyToSubmitTaskQueue.size() == 0 && activeTaskProcessorMaps.size() == 0;
+ return readyToSubmitTaskQueue.size() == 0
+ && activeTaskProcessorMaps.size() == 0
+ && waitToRetryTaskInstanceMap.size() == 0;
}
}
return false;
@@ -1310,7 +1472,7 @@ public class WorkflowExecuteThread {
// success
if (state == ExecutionStatus.RUNNING_EXECUTION) {
List<TaskInstance> killTasks = getCompleteTaskByState(ExecutionStatus.KILL);
- if (readyToSubmitTaskQueue.size() > 0) {
+ if (readyToSubmitTaskQueue.size() > 0 || waitToRetryTaskInstanceMap.size() > 0) {
//tasks currently pending submission, no retries, indicating that depend is waiting to complete
return ExecutionStatus.RUNNING_EXECUTION;
} else if (CollectionUtils.isNotEmpty(killTasks)) {
@@ -1445,19 +1607,23 @@ public class WorkflowExecuteThread {
private void killAllTasks() {
logger.info("kill called on process instance id: {}, num: {}", processInstance.getId(),
activeTaskProcessorMaps.size());
- for (int taskId : activeTaskProcessorMaps.keySet()) {
- TaskInstance taskInstance = processService.findTaskInstanceById(taskId);
+ for (long taskCode : activeTaskProcessorMaps.keySet()) {
+ ITaskProcessor taskProcessor = activeTaskProcessorMaps.get(taskCode);
+ Integer taskInstanceId = validTaskMap.get(taskCode);
+ if (taskInstanceId == null || taskInstanceId.equals(0)) {
+ continue;
+ }
+ TaskInstance taskInstance = processService.findTaskInstanceById(taskInstanceId);
if (taskInstance == null || taskInstance.getState().typeIsFinished()) {
continue;
}
- ITaskProcessor taskProcessor = activeTaskProcessorMaps.get(taskId);
taskProcessor.action(TaskAction.STOP);
- if (taskProcessor.taskState().typeIsFinished()) {
+ if (taskProcessor.taskInstance().getState().typeIsFinished()) {
StateEvent stateEvent = new StateEvent();
stateEvent.setType(StateEventType.TASK_STATE_CHANGE);
stateEvent.setProcessInstanceId(this.processInstance.getId());
stateEvent.setTaskInstanceId(taskInstance.getId());
- stateEvent.setExecutionStatus(taskProcessor.taskState());
+ stateEvent.setExecutionStatus(taskProcessor.taskInstance().getState());
this.addStateEvent(stateEvent);
}
}
@@ -1485,7 +1651,7 @@ public class WorkflowExecuteThread {
task.setState(retryTask.getState());
logger.info("task: {} has been forced success, put it into complete task list and stop retrying", task.getName());
removeTaskFromStandbyList(task);
- completeTaskMap.put(Long.toString(task.getTaskCode()), task.getId());
+ completeTaskMap.put(task.getTaskCode(), task.getId());
taskInstanceMap.put(task.getId(), task);
submitPostNode(Long.toString(task.getTaskCode()));
continue;
@@ -1499,21 +1665,15 @@ public class WorkflowExecuteThread {
}
DependResult dependResult = getDependResultForTask(task);
if (DependResult.SUCCESS == dependResult) {
- if (task.retryTaskIntervalOverTime()) {
- int originalId = task.getId();
- TaskInstance taskInstance = submitTaskExec(task);
- if (taskInstance == null) {
- this.taskFailedSubmit = true;
- } else {
- removeTaskFromStandbyList(task);
- if (taskInstance.getId() != originalId) {
- activeTaskProcessorMaps.remove(originalId);
- }
- }
+ TaskInstance taskInstance = submitTaskExec(task);
+ if (taskInstance == null) {
+ this.taskFailedSubmit = true;
+ } else {
+ removeTaskFromStandbyList(task);
}
} else if (DependResult.FAILED == dependResult) {
// if the dependency fails, the current node is not submitted and the state changes to failure.
- dependFailedTaskMap.put(Long.toString(task.getTaskCode()), task.getId());
+ dependFailedTaskMap.put(task.getTaskCode(), task.getId());
removeTaskFromStandbyList(task);
logger.info("task {},id:{} depend result : {}", task.getName(), task.getId(), dependResult);
} else if (DependResult.NON_EXEC == dependResult) {
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java
index 4587055..f9fb9c9 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java
@@ -62,6 +62,9 @@ public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor {
@Autowired
private StateEventCallbackService stateEventCallbackService;
+ @Autowired
+ private StateWheelExecuteThread stateWheelExecuteThread;
+
/**
* multi-thread filter, avoid handling workflow at the same time
*/
@@ -119,6 +122,7 @@ public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor {
@Override
public void onSuccess(Object result) {
if (workflowExecuteThread.workFlowFinish()) {
+ stateWheelExecuteThread.removeProcess4TimeoutCheck(workflowExecuteThread.getProcessInstance());
processInstanceExecCacheManager.removeByProcessInstanceId(processInstanceId);
notifyProcessChanged(workflowExecuteThread.getProcessInstance());
logger.info("process instance {} finished.", processInstanceId);
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
index 6e67d48..deb0166 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
@@ -204,8 +204,8 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
return null;
}
- public ExecutionStatus taskState() {
- return this.taskInstance.getState();
+ public TaskInstance taskInstance() {
+ return this.taskInstance;
}
/**
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java
index 9646285..bb639a3 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java
@@ -72,11 +72,6 @@ public class ConditionTaskProcessor extends BaseTaskProcessor {
}
@Override
- public ExecutionStatus taskState() {
- return this.taskInstance.getState();
- }
-
- @Override
public boolean runTask() {
if (conditionResult.equals(DependResult.WAITING)) {
setConditionResult();
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java
index 0f89a44..a2bbae7 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java
@@ -90,11 +90,6 @@ public class DependentTaskProcessor extends BaseTaskProcessor {
}
@Override
- public ExecutionStatus taskState() {
- return this.taskInstance.getState();
- }
-
- @Override
public boolean runTask() {
if (!allDependentItemFinished) {
allDependentItemFinished = allDependentTaskFinish();
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ITaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ITaskProcessor.java
index d1f3c4c..d7d241b 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ITaskProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ITaskProcessor.java
@@ -17,7 +17,6 @@
package org.apache.dolphinscheduler.server.master.runner.task;
-import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
@@ -32,6 +31,6 @@ public interface ITaskProcessor {
String getType();
- ExecutionStatus taskState();
+ TaskInstance taskInstance();
}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java
index 5a63556..19217af 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java
@@ -65,11 +65,6 @@ public class SubTaskProcessor extends BaseTaskProcessor {
}
@Override
- public ExecutionStatus taskState() {
- return this.taskInstance.getState();
- }
-
- @Override
public boolean runTask() {
try {
this.runLock.lock();
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java
index 912822b..b194eac 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java
@@ -77,7 +77,7 @@ public class SwitchTaskProcessor extends BaseTaskProcessor {
@Override
public boolean runTask() {
try {
- if (!this.taskState().typeIsFinished() && setSwitchResult()) {
+ if (!this.taskInstance().getState().typeIsFinished() && setSwitchResult()) {
endTaskState();
}
} catch (Exception e) {
@@ -120,11 +120,6 @@ public class SwitchTaskProcessor extends BaseTaskProcessor {
return TaskType.SWITCH.getDesc();
}
- @Override
- public ExecutionStatus taskState() {
- return this.taskInstance.getState();
- }
-
private boolean setSwitchResult() {
List<TaskInstance> taskInstances = processService.findValidTaskListByProcessId(
taskInstance.getProcessInstanceId()
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskInstanceKey.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskInstanceKey.java
new file mode 100644
index 0000000..a5d7707
--- /dev/null
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskInstanceKey.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.runner.task;
+
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+
+import java.util.Objects;
+
+/**
+ * Immutable key identifying a task instance by process instance id, task code and task version.
+ * equals and hashCode cover all three fields, so the key can be used in hash-based collections.
+ */
+public class TaskInstanceKey {
+    // final: a key must not change after it has been stored in a hash-based collection
+    private final int processInstanceId;
+    private final long taskCode;
+    private final int taskVersion;
+
+    /**
+     * @param processInstanceId id of the process instance
+     * @param taskCode code of the task
+     * @param taskVersion version of the task definition
+     */
+    public TaskInstanceKey(int processInstanceId, long taskCode, int taskVersion) {
+        this.processInstanceId = processInstanceId;
+        this.taskCode = taskCode;
+        this.taskVersion = taskVersion;
+    }
+
+    public int getProcessInstanceId() {
+        return processInstanceId;
+    }
+
+    public long getTaskCode() {
+        return taskCode;
+    }
+
+    public int getTaskVersion() {
+        return taskVersion;
+    }
+
+    /**
+     * Build the key from a process instance and one of its task instances.
+     *
+     * @param processInstance supplies the process instance id
+     * @param taskInstance supplies the task code and the task definition version
+     * @return the key, or null when either argument is null
+     */
+    public static TaskInstanceKey getTaskInstanceKey(ProcessInstance processInstance, TaskInstance taskInstance) {
+        if (processInstance == null || taskInstance == null) {
+            return null;
+        }
+        return new TaskInstanceKey(processInstance.getId(), taskInstance.getTaskCode(), taskInstance.getTaskDefinitionVersion());
+    }
+
+    @Override
+    public String toString() {
+        return "TaskKey{"
+            + "processInstanceId=" + processInstanceId
+            + ", taskCode=" + taskCode
+            + ", taskVersion=" + taskVersion
+            + '}';
+    }
+
+    /**
+     * Two keys are equal when they are of the same class and all three fields match.
+     */
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        TaskInstanceKey other = (TaskInstanceKey) o;
+        return processInstanceId == other.processInstanceId
+            && taskCode == other.taskCode
+            && taskVersion == other.taskVersion;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(processInstanceId, taskCode, taskVersion);
+    }
+}
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java
index 3d18498..5907a81 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java
+++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java
@@ -156,7 +156,7 @@ public class WorkflowExecuteThreadTest {
Map<String, String> cmdParam = new HashMap<>();
cmdParam.put(CMD_PARAM_RECOVERY_START_NODE_STRING, "1,2,3,4");
Mockito.when(processService.findTaskInstanceByIdList(
- Arrays.asList(taskInstance1.getId(), taskInstance2.getId(), taskInstance3.getId(), taskInstance4.getId()))
+ Arrays.asList(taskInstance1.getId(), taskInstance2.getId(), taskInstance3.getId(), taskInstance4.getId()))
).thenReturn(Arrays.asList(taskInstance1, taskInstance2, taskInstance3, taskInstance4));
Class<WorkflowExecuteThread> masterExecThreadClass = WorkflowExecuteThread.class;
Method method = masterExecThreadClass.getDeclaredMethod("getStartTaskInstanceList", String.class);
@@ -198,9 +198,9 @@ public class WorkflowExecuteThreadTest {
taskInstanceMap.put(taskInstance1.getId(), taskInstance1);
taskInstanceMap.put(taskInstance2.getId(), taskInstance2);
- Map<String, Integer> completeTaskList = new ConcurrentHashMap<>();
- completeTaskList.put(Long.toString(taskInstance1.getTaskCode()), taskInstance1.getId());
- completeTaskList.put(Long.toString(taskInstance1.getTaskCode()), taskInstance2.getId());
+ Map<Long, Integer> completeTaskList = new ConcurrentHashMap<>();
+ completeTaskList.put(taskInstance1.getTaskCode(), taskInstance1.getId());
+ completeTaskList.put(taskInstance2.getTaskCode(), taskInstance2.getId());
Class<WorkflowExecuteThread> masterExecThreadClass = WorkflowExecuteThread.class;
@@ -216,7 +216,7 @@ public class WorkflowExecuteThreadTest {
Assert.assertNotNull(taskInstance.getVarPool());
taskInstance2.setVarPool("[{\"direct\":\"OUT\",\"prop\":\"test1\",\"type\":\"VARCHAR\",\"value\":\"2\"}]");
- completeTaskList.put(Long.toString(taskInstance2.getTaskCode()), taskInstance2.getId());
+ completeTaskList.put(taskInstance2.getTaskCode(), taskInstance2.getId());
completeTaskMapField.setAccessible(true);
completeTaskMapField.set(workflowExecuteThread, completeTaskList);
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index 5cebcf7..6d49860 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -114,6 +114,7 @@ import org.apache.dolphinscheduler.spi.enums.ResourceType;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang3.SerializationUtils;
import java.util.ArrayList;
import java.util.Arrays;
@@ -1378,30 +1379,11 @@ public class ProcessService {
*/
public TaskInstance submitTaskInstanceToDB(TaskInstance taskInstance, ProcessInstance processInstance) {
ExecutionStatus processInstanceState = processInstance.getState();
-
- if (taskInstance.getState().typeIsFailure()) {
- if (taskInstance.isSubProcess()) {
- taskInstance.setRetryTimes(taskInstance.getRetryTimes() + 1);
- } else {
- if (processInstanceState != ExecutionStatus.READY_STOP
- && processInstanceState != ExecutionStatus.READY_PAUSE) {
- // failure task set invalid
- taskInstance.setFlag(Flag.NO);
- updateTaskInstance(taskInstance);
- // crate new task instance
- if (taskInstance.getState() != ExecutionStatus.NEED_FAULT_TOLERANCE) {
- taskInstance.setRetryTimes(taskInstance.getRetryTimes() + 1);
- }
- taskInstance.setSubmitTime(null);
- taskInstance.setLogPath(null);
- taskInstance.setExecutePath(null);
- taskInstance.setStartTime(null);
- taskInstance.setEndTime(null);
- taskInstance.setFlag(Flag.YES);
- taskInstance.setHost(null);
- taskInstance.setId(0);
- }
- }
+ if (processInstanceState.typeIsFinished()
+ || processInstanceState == ExecutionStatus.READY_PAUSE
+ || processInstanceState == ExecutionStatus.READY_STOP) {
+ logger.warn("processInstance {} was {}, skip submit task", processInstance.getProcessDefinitionCode(), processInstanceState);
+ return null;
}
taskInstance.setExecutorId(processInstance.getExecutorId());
taskInstance.setProcessInstancePriority(processInstance.getProcessInstancePriority());