You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ca...@apache.org on 2022/01/25 01:38:08 UTC

[dolphinscheduler] branch dev updated: [Bug-7865][MasterServer] retry logic optimization (#8156)

This is an automated email from the ASF dual-hosted git repository.

caishunfeng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new b5fa54b  [Bug-7865][MasterServer] retry logic optimization (#8156)
b5fa54b is described below

commit b5fa54b6bec9ca08528952acc20f6787d0e4f74e
Author: wind <ca...@users.noreply.github.com>
AuthorDate: Tue Jan 25 09:37:57 2022 +0800

    [Bug-7865][MasterServer] retry logic optimization (#8156)
    
    * submit task optimization
    
    * cloneAndReset
    
    * update
    
    * tolerant task restart when init
    
    * fix sonar check
    
    * fix test
    
    * delete unused file
    
    * taskInstance key
    
    * code style
    
    * skip complete when retry
    
    Co-authored-by: caishunfeng <53...@qq.com>
---
 .../dolphinscheduler/common/enums/StateEvent.java  |  10 +
 .../common/enums/StateEventType.java               |   1 +
 .../dolphinscheduler/dao/entity/TaskInstance.java  |  13 +-
 .../master/runner/StateWheelExecuteThread.java     | 205 ++++++---
 .../master/runner/WorkflowExecuteThread.java       | 508 ++++++++++++++-------
 .../master/runner/WorkflowExecuteThreadPool.java   |   4 +
 .../master/runner/task/BaseTaskProcessor.java      |   4 +-
 .../master/runner/task/ConditionTaskProcessor.java |   5 -
 .../master/runner/task/DependentTaskProcessor.java |   5 -
 .../server/master/runner/task/ITaskProcessor.java  |   3 +-
 .../master/runner/task/SubTaskProcessor.java       |   5 -
 .../master/runner/task/SwitchTaskProcessor.java    |   7 +-
 .../server/master/runner/task/TaskInstanceKey.java |  83 ++++
 .../server/master/WorkflowExecuteThreadTest.java   |  10 +-
 .../service/process/ProcessService.java            |  30 +-
 15 files changed, 605 insertions(+), 288 deletions(-)

diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/StateEvent.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/StateEvent.java
index f24b3c1..7cc806a 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/StateEvent.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/StateEvent.java
@@ -35,6 +35,8 @@ public class StateEvent {
 
     private int taskInstanceId;
 
+    private long taskCode;
+
     private int processInstanceId;
 
     private String context;
@@ -53,6 +55,10 @@ public class StateEvent {
         return taskInstanceId;
     }
 
+    public long getTaskCode() {
+        return taskCode;
+    }
+
     public int getProcessInstanceId() {
         return processInstanceId;
     }
@@ -73,6 +79,10 @@ public class StateEvent {
         this.taskInstanceId = taskInstanceId;
     }
 
+    public void setTaskCode(long taskCode) {
+        this.taskCode = taskCode;
+    }
+
     public Channel getChannel() {
         return channel;
     }
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/StateEventType.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/StateEventType.java
index fc795d4..c758bc7 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/StateEventType.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/StateEventType.java
@@ -26,6 +26,7 @@ public enum StateEventType {
     PROCESS_TIMEOUT(2, "process timeout"),
     TASK_TIMEOUT(3, "task timeout"),
     WAIT_TASK_GROUP(4, "wait task group"),
+    TASK_RETRY(5, "task retry")
     ;
 
     StateEventType(int code, String descp) {
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
index 976060c..b4e52c0 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
@@ -29,6 +29,8 @@ import org.apache.dolphinscheduler.common.task.switchtask.SwitchParameters;
 import org.apache.dolphinscheduler.common.utils.DateUtils;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
 
+import org.apache.commons.lang3.SerializationUtils;
+
 import java.io.Serializable;
 import java.util.Date;
 import java.util.Map;
@@ -599,7 +601,8 @@ public class TaskInstance implements Serializable {
     }
 
     /**
-     * determine if you can try again
+     * determine if a task instance can retry
+     * if subProcess,
      *
      * @return can try result
      */
@@ -609,10 +612,8 @@ public class TaskInstance implements Serializable {
         }
         if (this.getState() == ExecutionStatus.NEED_FAULT_TOLERANCE) {
             return true;
-        } else {
-            return (this.getState().typeIsFailure()
-                    && this.getRetryTimes() < this.getMaxRetryTimes());
         }
+        return this.getState() == ExecutionStatus.FAILURE && (this.getRetryTimes() < this.getMaxRetryTimes());
     }
 
     /**
@@ -624,9 +625,7 @@ public class TaskInstance implements Serializable {
         if (getState() != ExecutionStatus.FAILURE) {
             return true;
         }
-        if (getId() == 0
-                || getMaxRetryTimes() == 0
-                || getRetryInterval() == 0) {
+        if (getMaxRetryTimes() == 0 || getRetryInterval() == 0) {
             return true;
         }
         Date now = new Date();
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
index 8c3712f..3abfdd2 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
@@ -17,6 +17,7 @@
 
 package org.apache.dolphinscheduler.server.master.runner;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.common.enums.StateEvent;
@@ -30,6 +31,7 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
 
+import org.apache.dolphinscheduler.server.master.runner.task.TaskInstanceKey;
 import org.apache.hadoop.util.ThreadUtil;
 
 import java.util.Map.Entry;
@@ -42,8 +44,11 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
 /**
- * 1. timeout check wheel
- * 2. dependent task check wheel
+ * Check thread
+ * 1. timeout task check
+ * 2. dependent task state check
+ * 3. retry task check
+ * 4. timeout process check
  */
 @Component
 public class StateWheelExecuteThread extends Thread {
@@ -56,14 +61,19 @@ public class StateWheelExecuteThread extends Thread {
     private ConcurrentLinkedQueue<Integer> processInstanceTimeoutCheckList = new ConcurrentLinkedQueue<>();
 
     /**
-     * task time out check list, key is taskInstanceId, value is processInstanceId
+     * task time out check list
      */
-    private ConcurrentHashMap<Integer, Integer> taskInstanceTimeoutCheckList = new ConcurrentHashMap<>();
+    private ConcurrentLinkedQueue<TaskInstanceKey> taskInstanceTimeoutCheckList = new ConcurrentLinkedQueue<>();
 
     /**
-     * task retry check list, key is taskInstanceId, value is processInstanceId
+     * task retry check list
      */
-    private ConcurrentHashMap<Integer, Integer> taskInstanceRetryCheckList = new ConcurrentHashMap<>();
+    private ConcurrentLinkedQueue<TaskInstanceKey> taskInstanceRetryCheckList = new ConcurrentLinkedQueue<>();
+
+    /**
+     * task state check list
+     */
+    private ConcurrentLinkedQueue<TaskInstanceKey> taskInstanceStateCheckList = new ConcurrentLinkedQueue<>();
 
     @Autowired
     private MasterConfig masterConfig;
@@ -80,6 +90,7 @@ public class StateWheelExecuteThread extends Thread {
             try {
                 checkTask4Timeout();
                 checkTask4Retry();
+                checkTask4State();
                 checkProcess4Timeout();
             } catch (Exception e) {
                 logger.error("state wheel thread check error:", e);
@@ -96,8 +107,39 @@ public class StateWheelExecuteThread extends Thread {
         processInstanceTimeoutCheckList.remove(processInstance.getId());
     }
 
-    public void addTask4TimeoutCheck(TaskInstance taskInstance) {
-        if (taskInstanceTimeoutCheckList.containsKey(taskInstance.getId())) {
+    private void checkProcess4Timeout() {
+        if (processInstanceTimeoutCheckList.isEmpty()) {
+            return;
+        }
+        for (Integer processInstanceId : processInstanceTimeoutCheckList) {
+            if (processInstanceId == null) {
+                continue;
+            }
+            WorkflowExecuteThread workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
+            if (workflowExecuteThread == null) {
+                logger.warn("can not find workflowExecuteThread, this check event will remove, processInstanceId:{}", processInstanceId);
+                processInstanceTimeoutCheckList.remove(processInstanceId);
+                continue;
+            }
+            ProcessInstance processInstance = workflowExecuteThread.getProcessInstance();
+            if (processInstance == null) {
+                continue;
+            }
+            long timeRemain = DateUtils.getRemainTime(processInstance.getStartTime(), (long) processInstance.getTimeout() * Constants.SEC_2_MINUTES_TIME_UNIT);
+            if (timeRemain < 0) {
+                addProcessTimeoutEvent(processInstance);
+                processInstanceTimeoutCheckList.remove(processInstance.getId());
+            }
+        }
+    }
+
+    public void addTask4TimeoutCheck(ProcessInstance processInstance, TaskInstance taskInstance) {
+        TaskInstanceKey taskInstanceKey = TaskInstanceKey.getTaskInstanceKey(processInstance, taskInstance);
+        if (taskInstanceKey == null) {
+            logger.error("taskInstanceKey is null");
+            return;
+        }
+        if (taskInstanceTimeoutCheckList.contains(taskInstanceKey)) {
             return;
         }
         TaskDefinition taskDefinition = taskInstance.getTaskDefine();
@@ -106,19 +148,29 @@ public class StateWheelExecuteThread extends Thread {
             return;
         }
         if (TimeoutFlag.OPEN == taskDefinition.getTimeoutFlag()) {
-            taskInstanceTimeoutCheckList.put(taskInstance.getId(), taskInstance.getProcessInstanceId());
+            taskInstanceTimeoutCheckList.add(taskInstanceKey);
         }
         if (taskInstance.isDependTask() || taskInstance.isSubProcess()) {
-            taskInstanceTimeoutCheckList.put(taskInstance.getId(), taskInstance.getProcessInstanceId());
+            taskInstanceTimeoutCheckList.add(taskInstanceKey);
         }
     }
 
-    public void removeTask4TimeoutCheck(TaskInstance taskInstance) {
-        taskInstanceTimeoutCheckList.remove(taskInstance.getId());
+    public void removeTask4TimeoutCheck(ProcessInstance processInstance, TaskInstance taskInstance) {
+        TaskInstanceKey taskInstanceKey = TaskInstanceKey.getTaskInstanceKey(processInstance, taskInstance);
+        if (taskInstanceKey == null) {
+            logger.error("taskInstanceKey is null");
+            return;
+        }
+        taskInstanceTimeoutCheckList.remove(taskInstanceKey);
     }
 
-    public void addTask4RetryCheck(TaskInstance taskInstance) {
-        if (taskInstanceRetryCheckList.containsKey(taskInstance.getId())) {
+    public void addTask4RetryCheck(ProcessInstance processInstance, TaskInstance taskInstance) {
+        TaskInstanceKey taskInstanceKey = TaskInstanceKey.getTaskInstanceKey(processInstance, taskInstance);
+        if (taskInstanceKey == null) {
+            logger.error("taskInstanceKey is null");
+            return;
+        }
+        if (taskInstanceRetryCheckList.contains(taskInstanceKey)) {
             return;
         }
         TaskDefinition taskDefinition = taskInstance.getTaskDefine();
@@ -126,43 +178,69 @@ public class StateWheelExecuteThread extends Thread {
             logger.error("taskDefinition is null, taskId:{}", taskInstance.getId());
             return;
         }
-        if (taskInstance.taskCanRetry()) {
-            taskInstanceRetryCheckList.put(taskInstance.getId(), taskInstance.getProcessInstanceId());
+        logger.debug("addTask4RetryCheck, taskCode:{}, processInstanceId:{}", taskInstance.getTaskCode(), taskInstance.getProcessInstanceId());
+        taskInstanceRetryCheckList.add(taskInstanceKey);
+    }
+
+    public void removeTask4RetryCheck(ProcessInstance processInstance, TaskInstance taskInstance) {
+        TaskInstanceKey taskInstanceKey = TaskInstanceKey.getTaskInstanceKey(processInstance, taskInstance);
+        if (taskInstanceKey == null) {
+            logger.error("taskInstanceKey is null");
+            return;
         }
+        taskInstanceRetryCheckList.remove(taskInstanceKey);
+    }
 
+    public void addTask4StateCheck(ProcessInstance processInstance, TaskInstance taskInstance) {
+        TaskInstanceKey taskInstanceKey = TaskInstanceKey.getTaskInstanceKey(processInstance, taskInstance);
+        if (taskInstanceKey == null) {
+            logger.error("taskInstanceKey is null");
+            return;
+        }
+        if (taskInstanceStateCheckList.contains(taskInstanceKey)) {
+            return;
+        }
         if (taskInstance.isDependTask() || taskInstance.isSubProcess()) {
-            taskInstanceRetryCheckList.put(taskInstance.getId(), taskInstance.getProcessInstanceId());
+            taskInstanceStateCheckList.add(taskInstanceKey);
         }
     }
 
-    public void removeTask4RetryCheck(TaskInstance taskInstance) {
-        taskInstanceRetryCheckList.remove(taskInstance.getId());
+    public void removeTask4StateCheck(ProcessInstance processInstance, TaskInstance taskInstance) {
+        TaskInstanceKey taskInstanceKey = TaskInstanceKey.getTaskInstanceKey(processInstance, taskInstance);
+        if (taskInstanceKey == null) {
+            logger.error("taskInstanceKey is null");
+            return;
+        }
+        taskInstanceStateCheckList.remove(taskInstanceKey);
     }
 
     private void checkTask4Timeout() {
         if (taskInstanceTimeoutCheckList.isEmpty()) {
             return;
         }
-        for (Entry<Integer, Integer> entry : taskInstanceTimeoutCheckList.entrySet()) {
-            int processInstanceId = entry.getValue();
-            int taskInstanceId = entry.getKey();
+        for (TaskInstanceKey taskInstanceKey : taskInstanceTimeoutCheckList) {
+            int processInstanceId = taskInstanceKey.getProcessInstanceId();
+            long taskCode = taskInstanceKey.getTaskCode();
 
             WorkflowExecuteThread workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
             if (workflowExecuteThread == null) {
-                logger.warn("can not find workflowExecuteThread, this check event will remove, processInstanceId:{}, taskInstanceId:{}",
-                        processInstanceId, taskInstanceId);
-                taskInstanceTimeoutCheckList.remove(taskInstanceId);
+                logger.warn("can not find workflowExecuteThread, this check event will remove, processInstanceId:{}, taskCode:{}",
+                        processInstanceId, taskCode);
+                taskInstanceTimeoutCheckList.remove(taskInstanceKey);
                 continue;
             }
-            TaskInstance taskInstance = workflowExecuteThread.getTaskInstance(taskInstanceId);
+            TaskInstance taskInstance = workflowExecuteThread.getActiveTaskInstanceByTaskCode(taskCode);
             if (taskInstance == null) {
+                logger.warn("can not find taskInstance from workflowExecuteThread, this check event will remove, processInstanceId:{}, taskCode:{}",
+                        processInstanceId, taskCode);
+                taskInstanceTimeoutCheckList.remove(taskInstanceKey);
                 continue;
             }
             if (TimeoutFlag.OPEN == taskInstance.getTaskDefine().getTimeoutFlag()) {
                 long timeRemain = DateUtils.getRemainTime(taskInstance.getStartTime(), (long) taskInstance.getTaskDefine().getTimeout() * Constants.SEC_2_MINUTES_TIME_UNIT);
                 if (timeRemain < 0) {
                     addTaskTimeoutEvent(taskInstance);
-                    taskInstanceTimeoutCheckList.remove(taskInstance.getId());
+                    taskInstanceTimeoutCheckList.remove(taskInstanceKey);
                 }
             }
         }
@@ -172,54 +250,63 @@ public class StateWheelExecuteThread extends Thread {
         if (taskInstanceRetryCheckList.isEmpty()) {
             return;
         }
-        for (Entry<Integer, Integer> entry : taskInstanceRetryCheckList.entrySet()) {
-            int processInstanceId = entry.getValue();
-            int taskInstanceId = entry.getKey();
+        for (TaskInstanceKey taskInstanceKey : taskInstanceRetryCheckList) {
+            int processInstanceId = taskInstanceKey.getProcessInstanceId();
+            long taskCode = taskInstanceKey.getTaskCode();
 
             WorkflowExecuteThread workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
             if (workflowExecuteThread == null) {
-                logger.warn("can not find workflowExecuteThread, this check event will remove, processInstanceId:{}, taskInstanceId:{}",
-                        processInstanceId, taskInstanceId);
-                taskInstanceRetryCheckList.remove(taskInstanceId);
+                logger.warn("can not find workflowExecuteThread, this check event will remove, processInstanceId:{}, taskCode:{}",
+                        processInstanceId, taskCode);
+                taskInstanceRetryCheckList.remove(taskInstanceKey);
                 continue;
             }
-            TaskInstance taskInstance = workflowExecuteThread.getTaskInstance(taskInstanceId);
+            TaskInstance taskInstance = workflowExecuteThread.getRetryTaskInstanceByTaskCode(taskCode);
             if (taskInstance == null) {
+                logger.warn("can not find taskInstance from workflowExecuteThread, this check event will remove, processInstanceId:{}, taskCode:{}",
+                        processInstanceId, taskCode);
+                taskInstanceRetryCheckList.remove(taskInstanceKey);
                 continue;
             }
 
-            if (!taskInstance.getState().typeIsFinished() && (taskInstance.isSubProcess() || taskInstance.isDependTask())) {
-                addTaskStateChangeEvent(taskInstance);
-            } else if (taskInstance.taskCanRetry() && taskInstance.retryTaskIntervalOverTime()) {
-                addTaskStateChangeEvent(taskInstance);
-                taskInstanceRetryCheckList.remove(taskInstance.getId());
+            if (taskInstance.retryTaskIntervalOverTime()) {
+                // reset taskInstance endTime and state
+                // todo related function: TaskInstance.retryTaskIntervalOverTime, WorkflowExecuteThread.cloneRetryTaskInstance
+                taskInstance.setEndTime(null);
+                taskInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS);
+
+                addTaskRetryEvent(taskInstance);
+                taskInstanceRetryCheckList.remove(taskInstanceKey);
             }
         }
     }
 
-    private void checkProcess4Timeout() {
-        if (processInstanceTimeoutCheckList.isEmpty()) {
+    private void checkTask4State() {
+        if (taskInstanceStateCheckList.isEmpty()) {
             return;
         }
-        for (Integer processInstanceId : processInstanceTimeoutCheckList) {
-            if (processInstanceId == null) {
-                continue;
-            }
+        for (TaskInstanceKey taskInstanceKey : taskInstanceStateCheckList) {
+            int processInstanceId = taskInstanceKey.getProcessInstanceId();
+            long taskCode = taskInstanceKey.getTaskCode();
+
             WorkflowExecuteThread workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
             if (workflowExecuteThread == null) {
-                logger.warn("can not find workflowExecuteThread, this check event will remove, processInstanceId:{}", processInstanceId);
-                processInstanceTimeoutCheckList.remove(processInstanceId);
+                logger.warn("can not find workflowExecuteThread, this check event will remove, processInstanceId:{}, taskCode:{}",
+                        processInstanceId, taskCode);
+                taskInstanceStateCheckList.remove(taskInstanceKey);
                 continue;
             }
-            ProcessInstance processInstance = workflowExecuteThread.getProcessInstance();
-            if (processInstance == null) {
+            TaskInstance taskInstance = workflowExecuteThread.getActiveTaskInstanceByTaskCode(taskCode);
+            if (taskInstance == null) {
+                logger.warn("can not find taskInstance from workflowExecuteThread, this check event will remove, processInstanceId:{}, taskCode:{}",
+                        processInstanceId, taskCode);
+                taskInstanceStateCheckList.remove(taskInstanceKey);
                 continue;
             }
-            long timeRemain = DateUtils.getRemainTime(processInstance.getStartTime(), (long) processInstance.getTimeout() * Constants.SEC_2_MINUTES_TIME_UNIT);
-            if (timeRemain < 0) {
-                addProcessTimeoutEvent(processInstance);
-                processInstanceTimeoutCheckList.remove(processInstance.getId());
+            if (taskInstance.getState().typeIsFinished()) {
+                continue;
             }
+            addTaskStateChangeEvent(taskInstance);
         }
     }
 
@@ -228,6 +315,17 @@ public class StateWheelExecuteThread extends Thread {
         stateEvent.setType(StateEventType.TASK_STATE_CHANGE);
         stateEvent.setProcessInstanceId(taskInstance.getProcessInstanceId());
         stateEvent.setTaskInstanceId(taskInstance.getId());
+        stateEvent.setTaskCode(taskInstance.getTaskCode());
+        stateEvent.setExecutionStatus(ExecutionStatus.RUNNING_EXECUTION);
+        workflowExecuteThreadPool.submitStateEvent(stateEvent);
+    }
+
+    private void addTaskRetryEvent(TaskInstance taskInstance) {
+        StateEvent stateEvent = new StateEvent();
+        stateEvent.setType(StateEventType.TASK_RETRY);
+        stateEvent.setProcessInstanceId(taskInstance.getProcessInstanceId());
+        stateEvent.setTaskInstanceId(taskInstance.getId());
+        stateEvent.setTaskCode(taskInstance.getTaskCode());
         stateEvent.setExecutionStatus(ExecutionStatus.RUNNING_EXECUTION);
         workflowExecuteThreadPool.submitStateEvent(stateEvent);
     }
@@ -237,6 +335,7 @@ public class StateWheelExecuteThread extends Thread {
         stateEvent.setType(StateEventType.TASK_TIMEOUT);
         stateEvent.setProcessInstanceId(taskInstance.getProcessInstanceId());
         stateEvent.setTaskInstanceId(taskInstance.getId());
+        stateEvent.setTaskCode(taskInstance.getTaskCode());
         workflowExecuteThreadPool.submitStateEvent(stateEvent);
     }
 
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
index 73e655f..2bc16a3 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
@@ -159,34 +159,38 @@ public class WorkflowExecuteThread {
     private Map<Integer, TaskInstance> taskInstanceMap = new ConcurrentHashMap<>();
 
     /**
-     * running TaskNode, taskId as key
+     * running taskProcessor, taskCode as key, taskProcessor as value
+     * only one taskProcessor per taskCode
      */
-    private final Map<Integer, ITaskProcessor> activeTaskProcessorMaps = new ConcurrentHashMap<>();
+    private final Map<Long, ITaskProcessor> activeTaskProcessorMaps = new ConcurrentHashMap<>();
 
     /**
      * valid task map, taskCode as key, taskId as value
+     * in a DAG, only one taskInstance per taskCode is valid
      */
-    private Map<String, Integer> validTaskMap = new ConcurrentHashMap<>();
+    private Map<Long, Integer> validTaskMap = new ConcurrentHashMap<>();
 
     /**
-     * error task map, taskCode as key, taskId as value
+     * error task map, taskCode as key, taskInstanceId as value
+     * in a DAG, only one taskInstance per taskCode is valid
      */
-    private Map<String, Integer> errorTaskMap = new ConcurrentHashMap<>();
+    private Map<Long, Integer> errorTaskMap = new ConcurrentHashMap<>();
 
     /**
-     * complete task map, taskCode as key, taskId as value
+     * complete task map, taskCode as key, taskInstanceId as value
+     * in a DAG, only one taskInstance per taskCode is valid
      */
-    private Map<String, Integer> completeTaskMap = new ConcurrentHashMap<>();
+    private Map<Long, Integer> completeTaskMap = new ConcurrentHashMap<>();
 
     /**
      * depend failed task map, taskCode as key, taskId as value
      */
-    private Map<String, Integer> dependFailedTaskMap = new ConcurrentHashMap<>();
+    private Map<Long, Integer> dependFailedTaskMap = new ConcurrentHashMap<>();
 
     /**
      * forbidden task map, code as key
      */
-    private Map<String, TaskNode> forbiddenTaskMap = new ConcurrentHashMap<>();
+    private Map<Long, TaskNode> forbiddenTaskMap = new ConcurrentHashMap<>();
 
     /**
      * skip task map, code as key
@@ -209,6 +213,12 @@ public class WorkflowExecuteThread {
     private PeerTaskInstancePriorityQueue readyToSubmitTaskQueue = new PeerTaskInstancePriorityQueue();
 
     /**
+     * wait to retry taskInstance map, taskCode as key, taskInstance as value
+     * before retry, the taskInstance id is 0
+     */
+    private Map<Long, TaskInstance> waitToRetryTaskInstanceMap = new ConcurrentHashMap<>();
+
+    /**
      * state wheel execute thread
      */
     private StateWheelExecuteThread stateWheelExecuteThread;
@@ -317,6 +327,9 @@ public class WorkflowExecuteThread {
             case WAIT_TASK_GROUP:
                 result = checkForceStartAndWakeUp(stateEvent);
                 break;
+            case TASK_RETRY:
+                result = taskRetryEventHandler(stateEvent);
+                break;
             default:
                 break;
         }
@@ -330,8 +343,8 @@ public class WorkflowExecuteThread {
     private boolean checkForceStartAndWakeUp(StateEvent stateEvent) {
         TaskGroupQueue taskGroupQueue = this.processService.loadTaskGroupQueue(stateEvent.getTaskInstanceId());
         if (taskGroupQueue.getForceStart() == Flag.YES.getCode()) {
-            ITaskProcessor taskProcessor = activeTaskProcessorMaps.get(stateEvent.getTaskInstanceId());
             TaskInstance taskInstance = this.processService.findTaskInstanceById(stateEvent.getTaskInstanceId());
+            ITaskProcessor taskProcessor = activeTaskProcessorMaps.get(taskInstance.getTaskCode());
             ProcessInstance processInstance = this.processService.findProcessInstanceById(taskInstance.getProcessInstanceId());
             taskProcessor.init(taskInstance, processInstance);
             taskProcessor.action(TaskAction.DISPATCH);
@@ -341,8 +354,8 @@ public class WorkflowExecuteThread {
         if (taskGroupQueue.getInQueue() == Flag.YES.getCode()) {
             boolean acquireTaskGroup = processService.acquireTaskGroupAgain(taskGroupQueue);
             if (acquireTaskGroup) {
-                ITaskProcessor taskProcessor = activeTaskProcessorMaps.get(stateEvent.getTaskInstanceId());
                 TaskInstance taskInstance = this.processService.findTaskInstanceById(stateEvent.getTaskInstanceId());
+                ITaskProcessor taskProcessor = activeTaskProcessorMaps.get(taskInstance.getTaskCode());
                 ProcessInstance processInstance = this.processService.findProcessInstanceById(taskInstance.getProcessInstanceId());
                 taskProcessor.init(taskInstance, processInstance);
                 taskProcessor.action(TaskAction.DISPATCH);
@@ -363,7 +376,7 @@ public class WorkflowExecuteThread {
         }
         TaskTimeoutStrategy taskTimeoutStrategy = taskInstance.getTaskDefine().getTimeoutNotifyStrategy();
         if (TaskTimeoutStrategy.FAILED == taskTimeoutStrategy) {
-            ITaskProcessor taskProcessor = activeTaskProcessorMaps.get(stateEvent.getTaskInstanceId());
+            ITaskProcessor taskProcessor = activeTaskProcessorMaps.get(taskInstance.getTaskCode());
             taskProcessor.action(TaskAction.TIMEOUT);
         } else {
             processAlertManager.sendTaskTimeoutAlert(processInstance, taskInstance, taskInstance.getTaskDefine());
@@ -387,79 +400,57 @@ public class WorkflowExecuteThread {
             return true;
         }
 
-        if (task.getState().typeIsFinished() && !completeTaskMap.containsKey(Long.toString(task.getTaskCode()))) {
+        if (task.getState().typeIsFinished()) {
+            if (completeTaskMap.containsKey(task.getTaskCode()) && completeTaskMap.get(task.getTaskCode()) == task.getId()) {
+                return true;
+            }
             taskFinished(task);
             if (task.getTaskGroupId() > 0) {
-                //release task group
-                TaskInstance nextTaskInstance = this.processService.releaseTaskGroup(task);
-                if (nextTaskInstance != null) {
-                    if (nextTaskInstance.getProcessInstanceId() == task.getProcessInstanceId()) {
-                        StateEvent nextEvent = new StateEvent();
-                        nextEvent.setProcessInstanceId(this.processInstance.getId());
-                        nextEvent.setTaskInstanceId(nextTaskInstance.getId());
-                        nextEvent.setType(StateEventType.WAIT_TASK_GROUP);
-                        this.stateEvents.add(nextEvent);
-                    } else {
-                        ProcessInstance processInstance = this.processService.findProcessInstanceById(nextTaskInstance.getProcessInstanceId());
-                        this.processService.sendStartTask2Master(processInstance, nextTaskInstance.getId(),
-                            org.apache.dolphinscheduler.remote.command.CommandType.TASK_WAKEUP_EVENT_REQUEST);
-                    }
-                }
+                releaseTaskGroup(task);
             }
-        } else if (activeTaskProcessorMaps.containsKey(stateEvent.getTaskInstanceId())) {
-            ITaskProcessor iTaskProcessor = activeTaskProcessorMaps.get(stateEvent.getTaskInstanceId());
+            return true;
+        }
+        if (activeTaskProcessorMaps.containsKey(task.getTaskCode())) {
+            ITaskProcessor iTaskProcessor = activeTaskProcessorMaps.get(task.getTaskCode());
             iTaskProcessor.action(TaskAction.RUN);
 
-            if (iTaskProcessor.taskState().typeIsFinished()) {
-                task = processService.findTaskInstanceById(stateEvent.getTaskInstanceId());
+            if (iTaskProcessor.taskInstance().getState().typeIsFinished()) {
                 taskFinished(task);
             }
-        } else {
-            logger.error("state handler error: {}", stateEvent);
+            return true;
         }
+        logger.error("state handler error: {}", stateEvent);
+
         return true;
     }
 
-    private void taskFinished(TaskInstance task) {
-        logger.info("work flow {} task {} state:{} ",
+    private void taskFinished(TaskInstance taskInstance) {
+        logger.info("work flow {} task id:{} code:{} state:{} ",
             processInstance.getId(),
-            task.getId(),
-            task.getState());
-        if (task.taskCanRetry()) {
-            addTaskToStandByList(task);
-            if (!task.retryTaskIntervalOverTime()) {
-                logger.info("failure task will be submitted: process id: {}, task instance id: {} state:{} retry times:{} / {}, interval:{}",
-                    processInstance.getId(),
-                    task.getId(),
-                    task.getState(),
-                    task.getRetryTimes(),
-                    task.getMaxRetryTimes(),
-                    task.getRetryInterval());
-                stateWheelExecuteThread.addTask4TimeoutCheck(task);
-                stateWheelExecuteThread.addTask4RetryCheck(task);
-            } else {
-                submitStandByTask();
-                stateWheelExecuteThread.removeTask4TimeoutCheck(task);
-                stateWheelExecuteThread.removeTask4RetryCheck(task);
-            }
-            return;
-        }
+                taskInstance.getId(),
+                taskInstance.getTaskCode(),
+                taskInstance.getState());
 
-        completeTaskMap.put(Long.toString(task.getTaskCode()), task.getId());
-        activeTaskProcessorMaps.remove(task.getId());
-        stateWheelExecuteThread.removeTask4TimeoutCheck(task);
-        stateWheelExecuteThread.removeTask4RetryCheck(task);
+        activeTaskProcessorMaps.remove(taskInstance.getTaskCode());
+        stateWheelExecuteThread.removeTask4TimeoutCheck(processInstance, taskInstance);
+        stateWheelExecuteThread.removeTask4RetryCheck(processInstance, taskInstance);
+        stateWheelExecuteThread.removeTask4StateCheck(processInstance, taskInstance);
 
-        if (task.getState().typeIsSuccess()) {
-            processInstance.setVarPool(task.getVarPool());
+        if (taskInstance.getState().typeIsSuccess()) {
+            completeTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId());
+            processInstance.setVarPool(taskInstance.getVarPool());
             processService.saveProcessInstance(processInstance);
-            submitPostNode(Long.toString(task.getTaskCode()));
-        } else if (task.getState().typeIsFailure()) {
-            if (task.isConditionsTask()
-                || DagHelper.haveConditionsAfterNode(Long.toString(task.getTaskCode()), dag)) {
-                submitPostNode(Long.toString(task.getTaskCode()));
+            submitPostNode(Long.toString(taskInstance.getTaskCode()));
+        } else if (taskInstance.taskCanRetry()) {
+            // retry task
+            retryTaskInstance(taskInstance);
+        } else if (taskInstance.getState().typeIsFailure()) {
+            completeTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId());
+            if (taskInstance.isConditionsTask()
+                || DagHelper.haveConditionsAfterNode(Long.toString(taskInstance.getTaskCode()), dag)) {
+                submitPostNode(Long.toString(taskInstance.getTaskCode()));
             } else {
-                errorTaskMap.put(Long.toString(task.getTaskCode()), task.getId());
+                errorTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId());
                 if (processInstance.getFailureStrategy() == FailureStrategy.END) {
                     killAllTasks();
                 }
@@ -469,6 +460,73 @@ public class WorkflowExecuteThread {
     }
 
     /**
+     * release the task group of the given task instance and trigger the task instance returned by processService.releaseTaskGroup, if any
+     * @param taskInstance the task instance whose task group is released; nothing is done unless its task group id is greater than 0
+     */
+    private void releaseTaskGroup(TaskInstance taskInstance) {
+        if (taskInstance.getTaskGroupId() > 0) {
+            TaskInstance nextTaskInstance = this.processService.releaseTaskGroup(taskInstance);
+            if (nextTaskInstance != null) {
+                if (nextTaskInstance.getProcessInstanceId() == taskInstance.getProcessInstanceId()) { // next task is in the same process instance
+                    StateEvent nextEvent = new StateEvent();
+                    nextEvent.setProcessInstanceId(this.processInstance.getId());
+                    nextEvent.setTaskInstanceId(nextTaskInstance.getId());
+                    nextEvent.setType(StateEventType.WAIT_TASK_GROUP);
+                    this.stateEvents.add(nextEvent); // queued on this thread's own state events
+                } else { // next task belongs to another process instance: send it a wakeup request instead
+                    ProcessInstance processInstance = this.processService.findProcessInstanceById(nextTaskInstance.getProcessInstanceId()); // local variable shadows the processInstance field
+                    this.processService.sendStartTask2Master(processInstance, nextTaskInstance.getId(),
+                            org.apache.dolphinscheduler.remote.command.CommandType.TASK_WAKEUP_EVENT_REQUEST);
+                }
+            }
+        }
+    }
+
+    /**
+     * create a new task instance to retry; it is a different object from the original one
+     * @param taskInstance the task instance to retry; nothing is done when its taskCanRetry() returns false
+     */
+    private void retryTaskInstance(TaskInstance taskInstance) {
+        if (!taskInstance.taskCanRetry()) {
+            return;
+        }
+        TaskInstance newTaskInstance =  cloneRetryTaskInstance(taskInstance);
+        if (newTaskInstance == null) {
+            logger.error("retry fail, new taskInstancce is null, task code:{}, task id:{}", taskInstance.getTaskCode(), taskInstance.getId());
+            return;
+        }
+        waitToRetryTaskInstanceMap.put(newTaskInstance.getTaskCode(), newTaskInstance);
+        if (!taskInstance.retryTaskIntervalOverTime()) { // checked on the original instance, not on the clone
+            logger.info("failure task will be submitted: process id: {}, task instance code: {} state:{} retry times:{} / {}, interval:{}",
+                    processInstance.getId(),
+                    newTaskInstance.getTaskCode(),
+                    newTaskInstance.getState(),
+                    newTaskInstance.getRetryTimes(),
+                    newTaskInstance.getMaxRetryTimes(),
+                    newTaskInstance.getRetryInterval());
+            stateWheelExecuteThread.addTask4TimeoutCheck(processInstance, newTaskInstance); // the clone stays in waitToRetryTaskInstanceMap
+            stateWheelExecuteThread.addTask4RetryCheck(processInstance, newTaskInstance);
+        } else { // submit the clone right away and drop it from the wait-to-retry map
+            addTaskToStandByList(newTaskInstance);
+            submitStandByTask();
+            waitToRetryTaskInstanceMap.remove(newTaskInstance.getTaskCode());
+        }
+    }
+
+    /**
+     * handle task retry event: add the task instance waiting for retry to the standby list, submit the standby tasks, then drop it from waitToRetryTaskInstanceMap
+     * @param stateEvent the retry event; its task code is the key looked up in waitToRetryTaskInstanceMap
+     * @return always true
+     */
+    private boolean taskRetryEventHandler(StateEvent stateEvent) {
+        TaskInstance taskInstance = waitToRetryTaskInstanceMap.get(stateEvent.getTaskCode()); // NOTE(review): null when no entry exists for this task code - confirm addTaskToStandByList tolerates null
+        addTaskToStandByList(taskInstance);
+        submitStandByTask();
+        waitToRetryTaskInstanceMap.remove(stateEvent.getTaskCode());
+        return true;
+    }
+
+    /**
      * update process instance
      */
     public void refreshProcessInstance(int processInstanceId) {
@@ -492,9 +550,9 @@ public class WorkflowExecuteThread {
         processService.packageTaskInstance(taskInstance, processInstance);
         taskInstanceMap.put(taskInstance.getId(), taskInstance);
 
-        validTaskMap.remove(Long.toString(taskInstance.getTaskCode()));
+        validTaskMap.remove(taskInstance.getTaskCode());
         if (Flag.YES == taskInstance.getFlag()) {
-            validTaskMap.put(Long.toString(taskInstance.getTaskCode()), taskInstance.getId());
+            validTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId());
         }
     }
 
@@ -561,6 +619,20 @@ public class WorkflowExecuteThread {
         return null;
     }
 
+    public TaskInstance getActiveTaskInstanceByTaskCode(long taskCode) {
+        // look the processor up once: with containsKey followed by get, an entry removed
+        // between the two calls makes get return null and the chained call throw
+        ITaskProcessor taskProcessor = activeTaskProcessorMaps.get(taskCode);
+        return taskProcessor == null ? null : taskProcessor.taskInstance();
+    }
+
+    public TaskInstance getRetryTaskInstanceByTaskCode(long taskCode) {
+        // the map lookup already yields null for a task code that is not waiting
+        // for retry, so no separate containsKey guard is needed
+        TaskInstance retryTaskInstance = waitToRetryTaskInstanceMap.get(taskCode);
+        return retryTaskInstance;
+    }
+
     private boolean processStateChangeHandler(StateEvent stateEvent) {
         try {
             logger.info("process:{} state {} change to {}", processInstance.getId(), processInstance.getState(), stateEvent.getExecutionStatus());
@@ -736,7 +808,7 @@ public class WorkflowExecuteThread {
 
         taskNodeList.forEach(taskNode -> {
             if (taskNode.isForbidden()) {
-                forbiddenTaskMap.put(Long.toString(taskNode.getCode()), taskNode);
+                forbiddenTaskMap.put(taskNode.getCode(), taskNode);
             }
         });
 
@@ -767,17 +839,39 @@ public class WorkflowExecuteThread {
         if (!isNewProcessInstance()) {
             List<TaskInstance> validTaskInstanceList = processService.findValidTaskListByProcessId(processInstance.getId());
             for (TaskInstance task : validTaskInstanceList) {
-                validTaskMap.put(Long.toString(task.getTaskCode()), task.getId());
+                if (validTaskMap.containsKey(task.getTaskCode())) {
+                    int oldTaskInstanceId = validTaskMap.get(task.getTaskCode());
+                    TaskInstance oldTaskInstance = taskInstanceMap.get(oldTaskInstanceId);
+                    if (!oldTaskInstance.getState().typeIsFinished() && task.getState().typeIsFinished()) {
+                        task.setFlag(Flag.NO);
+                        processService.updateTaskInstance(task);
+                        continue;
+                    }
+                    logger.warn("have same taskCode taskInstance when init task queue, taskCode:{}", task.getTaskCode());
+                }
+
+                validTaskMap.put(task.getTaskCode(), task.getId());
                 taskInstanceMap.put(task.getId(), task);
 
                 if (task.isTaskComplete()) {
-                    completeTaskMap.put(Long.toString(task.getTaskCode()), task.getId());
+                    completeTaskMap.put(task.getTaskCode(), task.getId());
+                    continue;
                 }
                 if (task.isConditionsTask() || DagHelper.haveConditionsAfterNode(Long.toString(task.getTaskCode()), dag)) {
                     continue;
                 }
-                if (task.getState().typeIsFailure() && !task.taskCanRetry()) {
-                    errorTaskMap.put(Long.toString(task.getTaskCode()), task.getId());
+                if (task.taskCanRetry()) {
+                    if (task.getState() == ExecutionStatus.NEED_FAULT_TOLERANCE) {
+                        // add the tolerant taskInstance to the standby list directly
+                        TaskInstance tolerantTaskInstance = cloneTolerantTaskInstance(task);
+                        addTaskToStandByList(tolerantTaskInstance);
+                    } else {
+                        retryTaskInstance(task);
+                    }
+                    continue;
+                }
+                if (task.getState().typeIsFailure()) {
+                    errorTaskMap.put(task.getTaskCode(), task.getId());
                 }
             }
         }
@@ -832,19 +926,32 @@ public class WorkflowExecuteThread {
                     taskInstance.getId(), taskInstance.getName());
                 return null;
             }
-            validTaskMap.put(Long.toString(taskInstance.getTaskCode()), taskInstance.getId());
+
+            // in a dag, only one taskInstance is valid per taskCode, so need to set the old taskInstance invalid
+            if (validTaskMap.containsKey(taskInstance.getTaskCode())) {
+                int oldTaskInstanceId = validTaskMap.get(taskInstance.getTaskCode());
+                if (taskInstance.getId() != oldTaskInstanceId) {
+                    TaskInstance oldTaskInstance = taskInstanceMap.get(oldTaskInstanceId);
+                    oldTaskInstance.setFlag(Flag.NO);
+                    processService.updateTaskInstance(oldTaskInstance);
+                    validTaskMap.remove(taskInstance.getTaskCode());
+                    activeTaskProcessorMaps.remove(taskInstance.getTaskCode());
+                }
+            }
+
+            validTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId());
             taskInstanceMap.put(taskInstance.getId(), taskInstance);
-            activeTaskProcessorMaps.put(taskInstance.getId(), taskProcessor);
+            activeTaskProcessorMaps.put(taskInstance.getTaskCode(), taskProcessor);
             taskProcessor.action(TaskAction.RUN);
 
-            stateWheelExecuteThread.addTask4TimeoutCheck(taskInstance);
-            stateWheelExecuteThread.addTask4RetryCheck(taskInstance);
+            stateWheelExecuteThread.addTask4TimeoutCheck(processInstance, taskInstance);
+            stateWheelExecuteThread.addTask4StateCheck(processInstance, taskInstance);
 
-            if (taskProcessor.taskState().typeIsFinished()) {
+            if (taskProcessor.taskInstance().getState().typeIsFinished()) {
                 StateEvent stateEvent = new StateEvent();
                 stateEvent.setProcessInstanceId(this.processInstance.getId());
                 stateEvent.setTaskInstanceId(taskInstance.getId());
-                stateEvent.setExecutionStatus(taskProcessor.taskState());
+                stateEvent.setExecutionStatus(taskProcessor.taskInstance().getState());
                 stateEvent.setType(StateEventType.TASK_STATE_CHANGE);
                 this.stateEvents.add(stateEvent);
             }
@@ -898,78 +1005,129 @@ public class WorkflowExecuteThread {
      */
     private TaskInstance createTaskInstance(ProcessInstance processInstance, TaskNode taskNode) {
         TaskInstance taskInstance = findTaskIfExists(taskNode.getCode(), taskNode.getVersion());
-        if (taskInstance == null) {
-            taskInstance = new TaskInstance();
-            taskInstance.setTaskCode(taskNode.getCode());
-            taskInstance.setTaskDefinitionVersion(taskNode.getVersion());
-            // task name
-            taskInstance.setName(taskNode.getName());
-            // task instance state
-            taskInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS);
-            // process instance id
-            taskInstance.setProcessInstanceId(processInstance.getId());
-            // task instance type
-            taskInstance.setTaskType(taskNode.getType().toUpperCase());
-            // task instance whether alert
-            taskInstance.setAlertFlag(Flag.NO);
-
-            // task instance start time
-            taskInstance.setStartTime(null);
-
-            // task instance flag
-            taskInstance.setFlag(Flag.YES);
-
-            // task dry run flag
-            taskInstance.setDryRun(processInstance.getDryRun());
-
-            // task instance retry times
-            taskInstance.setRetryTimes(0);
-
-            // max task instance retry times
-            taskInstance.setMaxRetryTimes(taskNode.getMaxRetryTimes());
-
-            // retry task instance interval
-            taskInstance.setRetryInterval(taskNode.getRetryInterval());
-
-            //set task param
-            taskInstance.setTaskParams(taskNode.getTaskParams());
-
-            //set task group and priority
-            taskInstance.setTaskGroupId(taskNode.getTaskGroupId());
-            taskInstance.setTaskGroupPriority(taskNode.getTaskGroupPriority());
-
-            // task instance priority
-            if (taskNode.getTaskInstancePriority() == null) {
-                taskInstance.setTaskInstancePriority(Priority.MEDIUM);
-            } else {
-                taskInstance.setTaskInstancePriority(taskNode.getTaskInstancePriority());
-            }
+        if (taskInstance != null) {
+            return taskInstance;
+        }
+
+        return newTaskInstance(processInstance, taskNode);
+    }
+
+    /**
+     * clone a new taskInstance for retry from the given taskInstance and reset some logic fields
+     * @return the new taskInstance with retry times increased by one, or null when the dag has no node for the task code
+     */
+    public TaskInstance cloneRetryTaskInstance(TaskInstance taskInstance) {
+        TaskNode taskNode = dag.getNode(Long.toString(taskInstance.getTaskCode()));
+        if (taskNode == null) {
+            logger.error("taskNode is null, code:{}", taskInstance.getTaskCode());
+            return null;
+        }
+        TaskInstance newTaskInstance =  newTaskInstance(processInstance, taskNode);
+        newTaskInstance.setTaskDefine(taskInstance.getTaskDefine());
+        newTaskInstance.setProcessDefine(taskInstance.getProcessDefine());
+        newTaskInstance.setProcessInstance(processInstance);
+        newTaskInstance.setRetryTimes(taskInstance.getRetryTimes() + 1);
+        // todo related function: TaskInstance.retryTaskIntervalOverTime (presumably it reads the state and end time copied below - confirm)
+        newTaskInstance.setState(taskInstance.getState());
+        newTaskInstance.setEndTime(taskInstance.getEndTime());
+        return newTaskInstance;
+    }
 
-            String processWorkerGroup = processInstance.getWorkerGroup();
-            processWorkerGroup = StringUtils.isBlank(processWorkerGroup) ? DEFAULT_WORKER_GROUP : processWorkerGroup;
-            String taskWorkerGroup = StringUtils.isBlank(taskNode.getWorkerGroup()) ? processWorkerGroup : taskNode.getWorkerGroup();
+    /**
+     * clone a new taskInstance for fault tolerance from the given taskInstance and reset some logic fields
+     * @return the new taskInstance, or null when the dag has no node for the task code
+     */
+    public TaskInstance cloneTolerantTaskInstance(TaskInstance taskInstance) {
+        TaskNode taskNode = dag.getNode(Long.toString(taskInstance.getTaskCode()));
+        if (taskNode == null) {
+            logger.error("taskNode is null, code:{}", taskInstance.getTaskCode());
+            return null;
+        }
+        TaskInstance newTaskInstance =  newTaskInstance(processInstance, taskNode);
+        newTaskInstance.setTaskDefine(taskInstance.getTaskDefine());
+        newTaskInstance.setProcessDefine(taskInstance.getProcessDefine());
+        newTaskInstance.setProcessInstance(processInstance);
+        newTaskInstance.setRetryTimes(taskInstance.getRetryTimes()); // retry times are carried over unchanged
+        newTaskInstance.setState(taskInstance.getState()); // the original state is copied; the end time is not
+        return newTaskInstance;
+    }
 
-            Long processEnvironmentCode = Objects.isNull(processInstance.getEnvironmentCode()) ? -1 : processInstance.getEnvironmentCode();
-            Long taskEnvironmentCode = Objects.isNull(taskNode.getEnvironmentCode()) ? processEnvironmentCode : taskNode.getEnvironmentCode();
+    /**
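+     * create a new taskInstance initialized from the given task node, in state SUBMITTED_SUCCESS with retry times 0
+     * @param processInstance the process instance that supplies the process instance id, dry run flag and default worker group / environment code
+     * @param taskNode the dag node that supplies the task code, version, name, type, params, retry settings, task group, priority and delay time
+     * @return the new taskInstance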
+     */
+    public TaskInstance newTaskInstance(ProcessInstance processInstance, TaskNode taskNode) {
+        TaskInstance taskInstance = new TaskInstance();
+        taskInstance.setTaskCode(taskNode.getCode());
+        taskInstance.setTaskDefinitionVersion(taskNode.getVersion());
+        // task name
+        taskInstance.setName(taskNode.getName());
+        // task instance state
+        taskInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS);
+        // process instance id
+        taskInstance.setProcessInstanceId(processInstance.getId());
+        // task instance type
+        taskInstance.setTaskType(taskNode.getType().toUpperCase());
+        // task instance whether alert
+        taskInstance.setAlertFlag(Flag.NO);
+
+        // task instance start time
+        taskInstance.setStartTime(null);
+
+        // task instance flag
+        taskInstance.setFlag(Flag.YES);
+
+        // task dry run flag
+        taskInstance.setDryRun(processInstance.getDryRun());
+
+        // task instance retry times
+        taskInstance.setRetryTimes(0);
+
+        // max task instance retry times
+        taskInstance.setMaxRetryTimes(taskNode.getMaxRetryTimes());
+
+        // retry task instance interval
+        taskInstance.setRetryInterval(taskNode.getRetryInterval());
+
+        //set task param
+        taskInstance.setTaskParams(taskNode.getTaskParams());
+
+        //set task group and priority
+        taskInstance.setTaskGroupId(taskNode.getTaskGroupId());
+        taskInstance.setTaskGroupPriority(taskNode.getTaskGroupPriority());
+
+        // task instance priority
+        if (taskNode.getTaskInstancePriority() == null) {
+            taskInstance.setTaskInstancePriority(Priority.MEDIUM);
+        } else {
+            taskInstance.setTaskInstancePriority(taskNode.getTaskInstancePriority());
+        }
 
-            if (!processWorkerGroup.equals(DEFAULT_WORKER_GROUP) && taskWorkerGroup.equals(DEFAULT_WORKER_GROUP)) {
-                taskInstance.setWorkerGroup(processWorkerGroup);
-                taskInstance.setEnvironmentCode(processEnvironmentCode);
-            } else {
-                taskInstance.setWorkerGroup(taskWorkerGroup);
-                taskInstance.setEnvironmentCode(taskEnvironmentCode);
-            }
+        String processWorkerGroup = processInstance.getWorkerGroup();
+        processWorkerGroup = StringUtils.isBlank(processWorkerGroup) ? DEFAULT_WORKER_GROUP : processWorkerGroup;
+        String taskWorkerGroup = StringUtils.isBlank(taskNode.getWorkerGroup()) ? processWorkerGroup : taskNode.getWorkerGroup();
 
-            if (!taskInstance.getEnvironmentCode().equals(-1L)) {
-                Environment environment = processService.findEnvironmentByCode(taskInstance.getEnvironmentCode());
-                if (Objects.nonNull(environment) && StringUtils.isNotEmpty(environment.getConfig())) {
-                    taskInstance.setEnvironmentConfig(environment.getConfig());
-                }
-            }
-            // delay execution time
-            taskInstance.setDelayTime(taskNode.getDelayTime());
+        Long processEnvironmentCode = Objects.isNull(processInstance.getEnvironmentCode()) ? -1 : processInstance.getEnvironmentCode();
+        Long taskEnvironmentCode = Objects.isNull(taskNode.getEnvironmentCode()) ? processEnvironmentCode : taskNode.getEnvironmentCode();
+
+        if (!processWorkerGroup.equals(DEFAULT_WORKER_GROUP) && taskWorkerGroup.equals(DEFAULT_WORKER_GROUP)) {
+            taskInstance.setWorkerGroup(processWorkerGroup);
+            taskInstance.setEnvironmentCode(processEnvironmentCode);
+        } else {
+            taskInstance.setWorkerGroup(taskWorkerGroup);
+            taskInstance.setEnvironmentCode(taskEnvironmentCode);
         }
 
+        if (!taskInstance.getEnvironmentCode().equals(-1L)) {
+            Environment environment = processService.findEnvironmentByCode(taskInstance.getEnvironmentCode());
+            if (Objects.nonNull(environment) && StringUtils.isNotEmpty(environment.getConfig())) {
+                taskInstance.setEnvironmentConfig(environment.getConfig());
+            }
+        }
+        // delay execution time
+        taskInstance.setDelayTime(taskNode.getDelayTime());
         return taskInstance;
     }
 
@@ -978,7 +1136,7 @@ public class WorkflowExecuteThread {
         Map<String, TaskInstance> allTaskInstance = new HashMap<>();
         if (CollectionUtils.isNotEmpty(preTask)) {
             for (String preTaskCode : preTask) {
-                Integer taskId = completeTaskMap.get(preTaskCode);
+                Integer taskId = completeTaskMap.get(Long.parseLong(preTaskCode));
                 if (taskId == null) {
                     continue;
                 }
@@ -1073,7 +1231,7 @@ public class WorkflowExecuteThread {
                 continue;
             }
 
-            if (completeTaskMap.containsKey(Long.toString(task.getTaskCode()))) {
+            if (task.getId() > 0 && completeTaskMap.containsKey(task.getTaskCode())) {
                 logger.info("task {} has already run success", task.getName());
                 continue;
             }
@@ -1106,10 +1264,11 @@ public class WorkflowExecuteThread {
         for (String depsNode : indirectDepCodeList) {
             if (dag.containsNode(depsNode) && !skipTaskNodeMap.containsKey(depsNode)) {
                 // dependencies must be fully completed
-                if (!completeTaskMap.containsKey(depsNode)) {
+                Long despNodeTaskCode = Long.parseLong(depsNode);
+                if (!completeTaskMap.containsKey(despNodeTaskCode)) {
                     return DependResult.WAITING;
                 }
-                Integer depsTaskId = completeTaskMap.get(depsNode);
+                Integer depsTaskId = completeTaskMap.get(despNodeTaskCode);
                 ExecutionStatus depTaskState = taskInstanceMap.get(depsTaskId).getState();
                 if (depTaskState.typeIsPause() || depTaskState.typeIsCancel()) {
                     return DependResult.NON_EXEC;
@@ -1138,7 +1297,7 @@ public class WorkflowExecuteThread {
         TaskNode taskNode = dag.getNode(taskCode);
         List<String> depCodeList = taskNode.getDepList();
         for (String depsNode : depCodeList) {
-            if (forbiddenTaskMap.containsKey(depsNode)) {
+            if (forbiddenTaskMap.containsKey(Long.parseLong(depsNode))) {
                 setIndirectDepList(depsNode, indirectDepCodeList);
             } else {
                 indirectDepCodeList.add(depsNode);
@@ -1157,7 +1316,8 @@ public class WorkflowExecuteThread {
                 return false;
             }
         } else {
-            Integer taskInstanceId = completeTaskMap.get(dependNodeName);
+            long taskCode = Long.parseLong(dependNodeName);
+            Integer taskInstanceId = completeTaskMap.get(taskCode);
             ExecutionStatus depTaskState = taskInstanceMap.get(taskInstanceId).getState();
             if (depTaskState.typeIsFailure()) {
                 return false;
@@ -1228,7 +1388,9 @@ public class WorkflowExecuteThread {
                 return true;
             }
             if (processInstance.getFailureStrategy() == FailureStrategy.CONTINUE) {
-                return readyToSubmitTaskQueue.size() == 0 && activeTaskProcessorMaps.size() == 0;
+                return readyToSubmitTaskQueue.size() == 0
+                        && activeTaskProcessorMaps.size() == 0
+                        && waitToRetryTaskInstanceMap.size() == 0;
             }
         }
         return false;
@@ -1310,7 +1472,7 @@ public class WorkflowExecuteThread {
         // success
         if (state == ExecutionStatus.RUNNING_EXECUTION) {
             List<TaskInstance> killTasks = getCompleteTaskByState(ExecutionStatus.KILL);
-            if (readyToSubmitTaskQueue.size() > 0) {
+            if (readyToSubmitTaskQueue.size() > 0 || waitToRetryTaskInstanceMap.size() > 0) {
                 //tasks currently pending submission, no retries, indicating that depend is waiting to complete
                 return ExecutionStatus.RUNNING_EXECUTION;
             } else if (CollectionUtils.isNotEmpty(killTasks)) {
@@ -1445,19 +1607,23 @@ public class WorkflowExecuteThread {
     private void killAllTasks() {
         logger.info("kill called on process instance id: {}, num: {}", processInstance.getId(),
             activeTaskProcessorMaps.size());
-        for (int taskId : activeTaskProcessorMaps.keySet()) {
-            TaskInstance taskInstance = processService.findTaskInstanceById(taskId);
+        for (long taskCode : activeTaskProcessorMaps.keySet()) {
+            ITaskProcessor taskProcessor = activeTaskProcessorMaps.get(taskCode);
+            Integer taskInstanceId = validTaskMap.get(taskCode);
+            if (taskInstanceId == null || taskInstanceId.equals(0)) {
+                continue;
+            }
+            TaskInstance taskInstance = processService.findTaskInstanceById(taskInstanceId);
             if (taskInstance == null || taskInstance.getState().typeIsFinished()) {
                 continue;
             }
-            ITaskProcessor taskProcessor = activeTaskProcessorMaps.get(taskId);
             taskProcessor.action(TaskAction.STOP);
-            if (taskProcessor.taskState().typeIsFinished()) {
+            if (taskProcessor.taskInstance().getState().typeIsFinished()) {
                 StateEvent stateEvent = new StateEvent();
                 stateEvent.setType(StateEventType.TASK_STATE_CHANGE);
                 stateEvent.setProcessInstanceId(this.processInstance.getId());
                 stateEvent.setTaskInstanceId(taskInstance.getId());
-                stateEvent.setExecutionStatus(taskProcessor.taskState());
+                stateEvent.setExecutionStatus(taskProcessor.taskInstance().getState());
                 this.addStateEvent(stateEvent);
             }
         }
@@ -1485,7 +1651,7 @@ public class WorkflowExecuteThread {
                         task.setState(retryTask.getState());
                         logger.info("task: {} has been forced success, put it into complete task list and stop retrying", task.getName());
                         removeTaskFromStandbyList(task);
-                        completeTaskMap.put(Long.toString(task.getTaskCode()), task.getId());
+                        completeTaskMap.put(task.getTaskCode(), task.getId());
                         taskInstanceMap.put(task.getId(), task);
                         submitPostNode(Long.toString(task.getTaskCode()));
                         continue;
@@ -1499,21 +1665,15 @@ public class WorkflowExecuteThread {
                 }
                 DependResult dependResult = getDependResultForTask(task);
                 if (DependResult.SUCCESS == dependResult) {
-                    if (task.retryTaskIntervalOverTime()) {
-                        int originalId = task.getId();
-                        TaskInstance taskInstance = submitTaskExec(task);
-                        if (taskInstance == null) {
-                            this.taskFailedSubmit = true;
-                        } else {
-                            removeTaskFromStandbyList(task);
-                            if (taskInstance.getId() != originalId) {
-                                activeTaskProcessorMaps.remove(originalId);
-                            }
-                        }
+                    TaskInstance taskInstance = submitTaskExec(task);
+                    if (taskInstance == null) {
+                        this.taskFailedSubmit = true;
+                    } else {
+                        removeTaskFromStandbyList(task);
                     }
                 } else if (DependResult.FAILED == dependResult) {
                     // if the dependency fails, the current node is not submitted and the state changes to failure.
-                    dependFailedTaskMap.put(Long.toString(task.getTaskCode()), task.getId());
+                    dependFailedTaskMap.put(task.getTaskCode(), task.getId());
                     removeTaskFromStandbyList(task);
                     logger.info("task {},id:{} depend result : {}", task.getName(), task.getId(), dependResult);
                 } else if (DependResult.NON_EXEC == dependResult) {
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java
index 4587055..f9fb9c9 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java
@@ -62,6 +62,9 @@ public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor {
     @Autowired
     private StateEventCallbackService stateEventCallbackService;
 
+    @Autowired
+    private StateWheelExecuteThread stateWheelExecuteThread;
+
     /**
      * multi-thread filter, avoid handling workflow at the same time
      */
@@ -119,6 +122,7 @@ public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor {
             @Override
             public void onSuccess(Object result) {
                 if (workflowExecuteThread.workFlowFinish()) {
+                    stateWheelExecuteThread.removeProcess4TimeoutCheck(workflowExecuteThread.getProcessInstance());
                     processInstanceExecCacheManager.removeByProcessInstanceId(processInstanceId);
                     notifyProcessChanged(workflowExecuteThread.getProcessInstance());
                     logger.info("process instance {} finished.", processInstanceId);
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
index 6e67d48..deb0166 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
@@ -204,8 +204,8 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
         return null;
     }
 
-    public ExecutionStatus taskState() {
-        return this.taskInstance.getState();
+    public TaskInstance taskInstance() {
+        return this.taskInstance;
     }
 
     /**
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java
index 9646285..bb639a3 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java
@@ -72,11 +72,6 @@ public class ConditionTaskProcessor extends BaseTaskProcessor {
     }
 
     @Override
-    public ExecutionStatus taskState() {
-        return this.taskInstance.getState();
-    }
-
-    @Override
     public boolean runTask() {
         if (conditionResult.equals(DependResult.WAITING)) {
             setConditionResult();
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java
index 0f89a44..a2bbae7 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java
@@ -90,11 +90,6 @@ public class DependentTaskProcessor extends BaseTaskProcessor {
     }
 
     @Override
-    public ExecutionStatus taskState() {
-        return this.taskInstance.getState();
-    }
-
-    @Override
     public boolean runTask() {
         if (!allDependentItemFinished) {
             allDependentItemFinished = allDependentTaskFinish();
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ITaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ITaskProcessor.java
index d1f3c4c..d7d241b 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ITaskProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ITaskProcessor.java
@@ -17,7 +17,6 @@
 
 package org.apache.dolphinscheduler.server.master.runner.task;
 
-import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 
@@ -32,6 +31,6 @@ public interface ITaskProcessor {
 
     String getType();
 
-    ExecutionStatus taskState();
+    TaskInstance taskInstance();
 
 }
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java
index 5a63556..19217af 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java
@@ -65,11 +65,6 @@ public class SubTaskProcessor extends BaseTaskProcessor {
     }
 
     @Override
-    public ExecutionStatus taskState() {
-        return this.taskInstance.getState();
-    }
-
-    @Override
     public boolean runTask() {
         try {
             this.runLock.lock();
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java
index 912822b..b194eac 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java
@@ -77,7 +77,7 @@ public class SwitchTaskProcessor extends BaseTaskProcessor {
     @Override
     public boolean runTask() {
         try {
-            if (!this.taskState().typeIsFinished() && setSwitchResult()) {
+            if (!this.taskInstance().getState().typeIsFinished() && setSwitchResult()) {
                 endTaskState();
             }
         } catch (Exception e) {
@@ -120,11 +120,6 @@ public class SwitchTaskProcessor extends BaseTaskProcessor {
         return TaskType.SWITCH.getDesc();
     }
 
-    @Override
-    public ExecutionStatus taskState() {
-        return this.taskInstance.getState();
-    }
-
     private boolean setSwitchResult() {
         List<TaskInstance> taskInstances = processService.findValidTaskListByProcessId(
                 taskInstance.getProcessInstanceId()
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskInstanceKey.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskInstanceKey.java
new file mode 100644
index 0000000..a5d7707
--- /dev/null
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskInstanceKey.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.runner.task;
+
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+
+import java.util.Objects;
+
+/**
+ * Identifies a task instance by the triple (processInstanceId, taskCode, taskVersion).
+ */
+public class TaskInstanceKey {
+    private int processInstanceId; // filled from ProcessInstance.getId() by getTaskInstanceKey
+    private long taskCode; // filled from TaskInstance.getTaskCode() by getTaskInstanceKey
+    private int taskVersion; // filled from TaskInstance.getTaskDefinitionVersion() by getTaskInstanceKey
+
+    public TaskInstanceKey(int processInstanceId, long taskCode, int taskVersion) {
+        this.processInstanceId = processInstanceId;
+        this.taskCode = taskCode;
+        this.taskVersion = taskVersion;
+    }
+
+    public int getProcessInstanceId() {
+        return processInstanceId;
+    }
+
+    public long getTaskCode() {
+        return taskCode;
+    }
+
+    public int getTaskVersion() {
+        return taskVersion;
+    }
+
+    public static TaskInstanceKey getTaskInstanceKey(ProcessInstance processInstance, TaskInstance taskInstance) {
+        if (processInstance == null || taskInstance == null) {
+            return null; // no key is built unless both instances are present; callers must handle null
+        }
+        return new TaskInstanceKey(processInstance.getId(), taskInstance.getTaskCode(), taskInstance.getTaskDefinitionVersion());
+    }
+
+    @Override
+    public String toString() {
+        return "TaskKey{" + // NOTE(review): label reads TaskKey although the class is TaskInstanceKey
+                "processInstanceId=" + processInstanceId +
+                ", taskCode=" + taskCode +
+                ", taskVersion=" + taskVersion +
+                '}';
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) { // exact class match only, so a subclass instance never equals this
+            return false;
+        }
+        TaskInstanceKey taskInstanceKey = (TaskInstanceKey) o;
+        return processInstanceId == taskInstanceKey.processInstanceId && taskCode == taskInstanceKey.taskCode && taskVersion == taskInstanceKey.taskVersion;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(processInstanceId, taskCode, taskVersion); // hashes the same three fields that equals compares
+    }
+}
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java
index 3d18498..5907a81 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java
+++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java
@@ -156,7 +156,7 @@ public class WorkflowExecuteThreadTest {
             Map<String, String> cmdParam = new HashMap<>();
             cmdParam.put(CMD_PARAM_RECOVERY_START_NODE_STRING, "1,2,3,4");
             Mockito.when(processService.findTaskInstanceByIdList(
-                Arrays.asList(taskInstance1.getId(), taskInstance2.getId(), taskInstance3.getId(), taskInstance4.getId()))
+                    Arrays.asList(taskInstance1.getId(), taskInstance2.getId(), taskInstance3.getId(), taskInstance4.getId()))
             ).thenReturn(Arrays.asList(taskInstance1, taskInstance2, taskInstance3, taskInstance4));
             Class<WorkflowExecuteThread> masterExecThreadClass = WorkflowExecuteThread.class;
             Method method = masterExecThreadClass.getDeclaredMethod("getStartTaskInstanceList", String.class);
@@ -198,9 +198,9 @@ public class WorkflowExecuteThreadTest {
             taskInstanceMap.put(taskInstance1.getId(), taskInstance1);
             taskInstanceMap.put(taskInstance2.getId(), taskInstance2);
 
-            Map<String, Integer> completeTaskList = new ConcurrentHashMap<>();
-            completeTaskList.put(Long.toString(taskInstance1.getTaskCode()), taskInstance1.getId());
-            completeTaskList.put(Long.toString(taskInstance1.getTaskCode()), taskInstance2.getId());
+            Map<Long, Integer> completeTaskList = new ConcurrentHashMap<>();
+            completeTaskList.put(taskInstance1.getTaskCode(), taskInstance1.getId());
+            completeTaskList.put(taskInstance2.getTaskCode(), taskInstance2.getId());
 
             Class<WorkflowExecuteThread> masterExecThreadClass = WorkflowExecuteThread.class;
 
@@ -216,7 +216,7 @@ public class WorkflowExecuteThreadTest {
             Assert.assertNotNull(taskInstance.getVarPool());
 
             taskInstance2.setVarPool("[{\"direct\":\"OUT\",\"prop\":\"test1\",\"type\":\"VARCHAR\",\"value\":\"2\"}]");
-            completeTaskList.put(Long.toString(taskInstance2.getTaskCode()), taskInstance2.getId());
+            completeTaskList.put(taskInstance2.getTaskCode(), taskInstance2.getId());
 
             completeTaskMapField.setAccessible(true);
             completeTaskMapField.set(workflowExecuteThread, completeTaskList);
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index 5cebcf7..6d49860 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -114,6 +114,7 @@ import org.apache.dolphinscheduler.spi.enums.ResourceType;
 
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang3.SerializationUtils;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -1378,30 +1379,11 @@ public class ProcessService {
      */
     public TaskInstance submitTaskInstanceToDB(TaskInstance taskInstance, ProcessInstance processInstance) {
         ExecutionStatus processInstanceState = processInstance.getState();
-
-        if (taskInstance.getState().typeIsFailure()) {
-            if (taskInstance.isSubProcess()) {
-                taskInstance.setRetryTimes(taskInstance.getRetryTimes() + 1);
-            } else {
-                if (processInstanceState != ExecutionStatus.READY_STOP
-                        && processInstanceState != ExecutionStatus.READY_PAUSE) {
-                    // failure task set invalid
-                    taskInstance.setFlag(Flag.NO);
-                    updateTaskInstance(taskInstance);
-                    // crate new task instance
-                    if (taskInstance.getState() != ExecutionStatus.NEED_FAULT_TOLERANCE) {
-                        taskInstance.setRetryTimes(taskInstance.getRetryTimes() + 1);
-                    }
-                    taskInstance.setSubmitTime(null);
-                    taskInstance.setLogPath(null);
-                    taskInstance.setExecutePath(null);
-                    taskInstance.setStartTime(null);
-                    taskInstance.setEndTime(null);
-                    taskInstance.setFlag(Flag.YES);
-                    taskInstance.setHost(null);
-                    taskInstance.setId(0);
-                }
-            }
+        if (processInstanceState.typeIsFinished()
+                || processInstanceState == ExecutionStatus.READY_PAUSE
+                || processInstanceState == ExecutionStatus.READY_STOP) {
+            logger.warn("processInstance {} was {}, skip submit task", processInstance.getProcessDefinitionCode(), processInstanceState);
+            return null;
         }
         taskInstance.setExecutorId(processInstance.getExecutorId());
         taskInstance.setProcessInstancePriority(processInstance.getProcessInstancePriority());