You are viewing a plain-text version of this content; the link to its canonical version was lost in the plain-text conversion.
Posted to commits@dolphinscheduler.apache.org by le...@apache.org on 2021/11/19 10:31:35 UTC

[dolphinscheduler] branch dev updated: [DS-6891][MasterServer] reduce db operation when process instance running (#6904)

This is an automated email from the ASF dual-hosted git repository.

leonbao pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 28872d8  [DS-6891][MasterServer] reduce db operation when process instance running (#6904)
28872d8 is described below

commit 28872d8706f7414868d6101600f9f8bbd3c9d15f
Author: wind <ca...@users.noreply.github.com>
AuthorDate: Fri Nov 19 18:31:24 2021 +0800

    [DS-6891][MasterServer] reduce db operation when process instance running (#6904)
    
    * remove taskInstanceCacheManager
    
    * remove taskInstanceCacheManager
    
    * [DS-6891][MasterServer] reduce db operation when process instance running
    
    * [DS-6891][MasterServer] reduce db operation when process instance running
    
    * fix test
    
    * fix Transactional method call
    
    * checkstyle
    
    Co-authored-by: caishunfeng <53...@qq.com>
---
 .../master/cache/TaskInstanceCacheManager.java     |  64 ----
 .../cache/impl/TaskInstanceCacheManagerImpl.java   | 155 --------
 .../master/consumer/TaskPriorityQueueConsumer.java |  28 +-
 .../server/master/processor/TaskAckProcessor.java  |  10 -
 .../master/processor/TaskResponseProcessor.java    |  10 -
 .../processor/queue/StateEventResponseService.java |   9 +
 .../processor/queue/TaskResponseService.java       | 108 ++++--
 .../master/runner/WorkflowExecuteThread.java       | 412 ++++++++++++++-------
 .../master/runner/task/BaseTaskProcessor.java      |   8 +-
 .../master/runner/task/CommonTaskProcessor.java    |   2 +-
 .../master/runner/task/ConditionTaskProcessor.java |   2 +-
 .../master/runner/task/DependentTaskProcessor.java |   3 +-
 .../master/runner/task/SubTaskProcessor.java       |   2 +-
 .../master/runner/task/SwitchTaskProcessor.java    |   2 +-
 .../server/master/ConditionsTaskTest.java          |   2 +-
 .../server/master/DependentTaskTest.java           |   9 +-
 .../server/master/SubProcessTaskTest.java          |  10 +-
 .../server/master/SwitchTaskTest.java              |   2 +-
 .../server/master/WorkflowExecuteThreadTest.java   |  39 +-
 .../impl/TaskInstanceCacheManagerImplTest.java     | 177 ---------
 .../consumer/TaskPriorityQueueConsumerTest.java    |   9 -
 .../master/processor/TaskAckProcessorTest.java     |  10 +-
 .../runner/task/CommonTaskProcessorTest.java       |   1 -
 .../server/registry/DependencyConfig.java          |   6 -
 .../processor/TaskCallbackServiceTestConfig.java   |   6 -
 .../service/alert/ProcessAlertManager.java         |  49 ++-
 .../service/process/ProcessService.java            |  91 ++---
 .../service/queue/TaskPriority.java                |  22 +-
 28 files changed, 504 insertions(+), 744 deletions(-)

diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/TaskInstanceCacheManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/TaskInstanceCacheManager.java
deleted file mode 100644
index 1388c5b..0000000
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/TaskInstanceCacheManager.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.server.master.cache;
-
-import org.apache.dolphinscheduler.dao.entity.TaskInstance;
-import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
-import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
-import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext;
-
-/**
- *  task instance state manager
- */
-public interface TaskInstanceCacheManager {
-
-    /**
-     * get taskInstance by taskInstance id
-     *
-     * @param taskInstanceId taskInstanceId
-     * @return taskInstance
-     */
-    TaskInstance getByTaskInstanceId(Integer taskInstanceId);
-
-    /**
-     * cache taskInstance
-     *
-     * @param taskExecutionContext taskExecutionContext
-     */
-    void cacheTaskInstance(TaskExecutionContext taskExecutionContext);
-
-    /**
-     * cache taskInstance
-     *
-     * @param taskAckCommand taskAckCommand
-     */
-    void cacheTaskInstance(TaskExecuteAckCommand taskAckCommand);
-
-    /**
-     * cache taskInstance
-     *
-     * @param taskExecuteResponseCommand taskExecuteResponseCommand
-     */
-    void cacheTaskInstance(TaskExecuteResponseCommand taskExecuteResponseCommand);
-
-    /**
-     * remove taskInstance by taskInstanceId
-     * @param taskInstanceId taskInstanceId
-     */
-    void removeByTaskInstanceId(Integer taskInstanceId);
-}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/impl/TaskInstanceCacheManagerImpl.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/impl/TaskInstanceCacheManagerImpl.java
deleted file mode 100644
index dd2d6eb..0000000
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/impl/TaskInstanceCacheManagerImpl.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.server.master.cache.impl;
-
-import static org.apache.dolphinscheduler.common.Constants.CACHE_REFRESH_TIME_MILLIS;
-
-import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
-import org.apache.dolphinscheduler.dao.entity.TaskInstance;
-import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
-import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
-import org.apache.dolphinscheduler.server.master.cache.TaskInstanceCacheManager;
-import org.apache.dolphinscheduler.service.process.ProcessService;
-import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext;
-
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.ConcurrentHashMap;
-
-import javax.annotation.PostConstruct;
-import javax.annotation.PreDestroy;
-
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
-
-/**
- *  taskInstance state manager
- */
-@Component
-public class TaskInstanceCacheManagerImpl implements TaskInstanceCacheManager {
-
-    /**
-     * taskInstance cache
-     */
-    private Map<Integer,TaskInstance> taskInstanceCache = new ConcurrentHashMap<>();
-
-    /**
-     * process service
-     */
-    @Autowired
-    private ProcessService processService;
-
-    /**
-     * taskInstance cache refresh timer
-     */
-    private Timer refreshTaskInstanceTimer = null;
-
-    @PostConstruct
-    public void init() {
-        //issue#5539 add thread to fetch task state from database in a fixed rate
-        this.refreshTaskInstanceTimer = new Timer(true);
-        refreshTaskInstanceTimer.scheduleAtFixedRate(
-                new RefreshTaskInstanceTimerTask(), CACHE_REFRESH_TIME_MILLIS, CACHE_REFRESH_TIME_MILLIS
-        );
-    }
-
-    @PreDestroy
-    public void close() {
-        this.refreshTaskInstanceTimer.cancel();
-    }
-
-    /**
-     * get taskInstance by taskInstance id
-     *
-     * @param taskInstanceId taskInstanceId
-     * @return taskInstance
-     */
-    @Override
-    public TaskInstance getByTaskInstanceId(Integer taskInstanceId) {
-        return taskInstanceCache.computeIfAbsent(taskInstanceId, k -> processService.findTaskInstanceById(taskInstanceId));
-    }
-
-    /**
-     * cache taskInstance
-     *
-     * @param taskExecutionContext taskExecutionContext
-     */
-    @Override
-    public void cacheTaskInstance(TaskExecutionContext taskExecutionContext) {
-        TaskInstance taskInstance = new TaskInstance();
-        taskInstance.setId(taskExecutionContext.getTaskInstanceId());
-        taskInstance.setName(taskExecutionContext.getTaskName());
-        taskInstance.setStartTime(taskExecutionContext.getStartTime());
-        taskInstance.setTaskType(taskExecutionContext.getTaskType());
-        taskInstance.setExecutePath(taskExecutionContext.getExecutePath());
-        taskInstanceCache.put(taskExecutionContext.getTaskInstanceId(), taskInstance);
-    }
-
-    /**
-     * cache taskInstance
-     *
-     * @param taskAckCommand taskAckCommand
-     */
-    @Override
-    public void cacheTaskInstance(TaskExecuteAckCommand taskAckCommand) {
-        TaskInstance taskInstance = new TaskInstance();
-        taskInstance.setState(ExecutionStatus.of(taskAckCommand.getStatus()));
-        taskInstance.setStartTime(taskAckCommand.getStartTime());
-        taskInstance.setHost(taskAckCommand.getHost());
-        taskInstance.setExecutePath(taskAckCommand.getExecutePath());
-        taskInstance.setLogPath(taskAckCommand.getLogPath());
-        taskInstanceCache.put(taskAckCommand.getTaskInstanceId(), taskInstance);
-    }
-
-    /**
-     * cache taskInstance
-     *
-     * @param taskExecuteResponseCommand taskExecuteResponseCommand
-     */
-    @Override
-    public void cacheTaskInstance(TaskExecuteResponseCommand taskExecuteResponseCommand) {
-        TaskInstance taskInstance = getByTaskInstanceId(taskExecuteResponseCommand.getTaskInstanceId());
-        taskInstance.setState(ExecutionStatus.of(taskExecuteResponseCommand.getStatus()));
-        taskInstance.setEndTime(taskExecuteResponseCommand.getEndTime());
-        taskInstanceCache.put(taskExecuteResponseCommand.getTaskInstanceId(), taskInstance);
-    }
-
-    /**
-     * remove taskInstance by taskInstanceId
-     * @param taskInstanceId taskInstanceId
-     */
-    @Override
-    public void removeByTaskInstanceId(Integer taskInstanceId) {
-        taskInstanceCache.remove(taskInstanceId);
-    }
-
-    class RefreshTaskInstanceTimerTask extends TimerTask {
-        @Override
-        public void run() {
-            for (Entry<Integer, TaskInstance> taskInstanceEntry : taskInstanceCache.entrySet()) {
-                TaskInstance taskInstance = processService.findTaskInstanceById(taskInstanceEntry.getKey());
-                if (null != taskInstance && taskInstance.getState() == ExecutionStatus.NEED_FAULT_TOLERANCE) {
-                    taskInstanceCache.computeIfPresent(taskInstanceEntry.getKey(), (k, v) -> taskInstance);
-                }
-            }
-
-        }
-    }
-}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
index 827759b..290164b 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
@@ -130,14 +130,16 @@ public class TaskPriorityQueueConsumer extends Thread {
             TaskExecutionContext context = taskPriority.getTaskExecutionContext();
             ExecutionContext executionContext = new ExecutionContext(context.toCommand(), ExecutorType.WORKER, context.getWorkerGroup());
 
-            if (taskInstanceIsFinalState(taskPriority.getTaskId())) {
-                // when task finish, ignore this task, there is no need to dispatch anymore
-                return true;
-            } else {
-                result = dispatcher.dispatch(executionContext);
+            if (isTaskNeedToCheck(taskPriority)) {
+                if (taskInstanceIsFinalState(taskPriority.getTaskId())) {
+                    // when task finish, ignore this task, there is no need to dispatch anymore
+                    return true;
+                }
             }
+
+            result = dispatcher.dispatch(executionContext);
         } catch (ExecuteException e) {
-            logger.error("dispatch error: {}", e.getMessage(),e);
+            logger.error("dispatch error: {}", e.getMessage(), e);
         }
         return result;
     }
@@ -153,4 +155,18 @@ public class TaskPriorityQueueConsumer extends Thread {
         TaskInstance taskInstance = processService.findTaskInstanceById(taskInstanceId);
         return taskInstance.getState().typeIsFinished();
     }
+
+    /**
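+     * check whether the task's state should be re-checked; if so, refresh its checkpoint to now
+     * @param taskPriority queued task whose checkpoint is compared against the current time (and updated when due)
+     * @return true if more than Constants.SECOND_TIME_MILLIS has elapsed since the last checkpoint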
+     */
+    private boolean isTaskNeedToCheck(TaskPriority taskPriority) {
+        long now = System.currentTimeMillis();
+        if (now - taskPriority.getCheckpoint() > Constants.SECOND_TIME_MILLIS) {
+            taskPriority.setCheckpoint(now);
+            return true;
+        }
+        return false;
+    }
 }
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java
index 8761232..27b8991 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java
@@ -24,8 +24,6 @@ import org.apache.dolphinscheduler.remote.command.CommandType;
 import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
 import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
 import org.apache.dolphinscheduler.remote.utils.ChannelUtils;
-import org.apache.dolphinscheduler.server.master.cache.TaskInstanceCacheManager;
-import org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheManagerImpl;
 import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseEvent;
 import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseService;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
@@ -49,14 +47,8 @@ public class TaskAckProcessor implements NettyRequestProcessor {
      */
     private final TaskResponseService taskResponseService;
 
-    /**
-     * taskInstance cache manager
-     */
-    private final TaskInstanceCacheManager taskInstanceCacheManager;
-
     public TaskAckProcessor() {
         this.taskResponseService = SpringApplicationContext.getBean(TaskResponseService.class);
-        this.taskInstanceCacheManager = SpringApplicationContext.getBean(TaskInstanceCacheManagerImpl.class);
     }
 
     /**
@@ -71,8 +63,6 @@ public class TaskAckProcessor implements NettyRequestProcessor {
         TaskExecuteAckCommand taskAckCommand = JSONUtils.parseObject(command.getBody(), TaskExecuteAckCommand.class);
         logger.info("taskAckCommand : {}", taskAckCommand);
 
-        taskInstanceCacheManager.cacheTaskInstance(taskAckCommand);
-
         String workerAddress = ChannelUtils.toAddress(channel).getAddress();
 
         ExecutionStatus ackStatus = ExecutionStatus.of(taskAckCommand.getStatus());
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
index 405e6be..aa324eb 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
@@ -23,8 +23,6 @@ import org.apache.dolphinscheduler.remote.command.Command;
 import org.apache.dolphinscheduler.remote.command.CommandType;
 import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
 import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
-import org.apache.dolphinscheduler.server.master.cache.TaskInstanceCacheManager;
-import org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheManagerImpl;
 import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseEvent;
 import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseService;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
@@ -48,14 +46,8 @@ public class TaskResponseProcessor implements NettyRequestProcessor {
      */
     private final TaskResponseService taskResponseService;
 
-    /**
-     * taskInstance cache manager
-     */
-    private final TaskInstanceCacheManager taskInstanceCacheManager;
-
     public TaskResponseProcessor() {
         this.taskResponseService = SpringApplicationContext.getBean(TaskResponseService.class);
-        this.taskInstanceCacheManager = SpringApplicationContext.getBean(TaskInstanceCacheManagerImpl.class);
     }
 
     /**
@@ -72,8 +64,6 @@ public class TaskResponseProcessor implements NettyRequestProcessor {
         TaskExecuteResponseCommand responseCommand = JSONUtils.parseObject(command.getBody(), TaskExecuteResponseCommand.class);
         logger.info("received command : {}", responseCommand);
 
-        taskInstanceCacheManager.cacheTaskInstance(responseCommand);
-
         // TaskResponseEvent
         TaskResponseEvent taskResponseEvent = TaskResponseEvent.newResult(ExecutionStatus.of(responseCommand.getStatus()),
                 responseCommand.getEndTime(),
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/StateEventResponseService.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/StateEventResponseService.java
index 72e2355..9a7fc59 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/StateEventResponseService.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/StateEventResponseService.java
@@ -133,6 +133,15 @@ public class StateEventResponseService {
             }
 
             WorkflowExecuteThread workflowExecuteThread = this.processInstanceExecCacheManager.getByProcessInstanceId(stateEvent.getProcessInstanceId());
+            switch (stateEvent.getType()) {
+                case TASK_STATE_CHANGE:
+                    workflowExecuteThread.refreshTaskInstance(stateEvent.getTaskInstanceId());
+                    break;
+                case PROCESS_STATE_CHANGE:
+                    workflowExecuteThread.refreshProcessInstance(stateEvent.getProcessInstanceId());
+                    break;
+                default:
+            }
             workflowExecuteThread.addStateEvent(stateEvent);
             writeResponse(stateEvent, ExecutionStatus.SUCCESS);
         } catch (Exception e) {
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java
index 9af1ae2..7a0af9d 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java
@@ -142,54 +142,28 @@ public class TaskResponseService {
      */
     private void persist(TaskResponseEvent taskResponseEvent) {
         Event event = taskResponseEvent.getEvent();
-        Channel channel = taskResponseEvent.getChannel();
+        int taskInstanceId = taskResponseEvent.getTaskInstanceId();
+        int processInstanceId = taskResponseEvent.getProcessInstanceId();
+
+        TaskInstance taskInstance;
+        WorkflowExecuteThread workflowExecuteThread = this.processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
+        if (workflowExecuteThread != null && workflowExecuteThread.checkTaskInstanceById(taskInstanceId)) {
+            taskInstance = workflowExecuteThread.getTaskInstance(taskInstanceId);
+        } else {
+            taskInstance = processService.findTaskInstanceById(taskInstanceId);
+        }
 
-        TaskInstance taskInstance = processService.findTaskInstanceById(taskResponseEvent.getTaskInstanceId());
         switch (event) {
             case ACK:
-                try {
-                    if (taskInstance != null) {
-                        ExecutionStatus status = taskInstance.getState().typeIsFinished() ? taskInstance.getState() : taskResponseEvent.getState();
-                        processService.changeTaskState(taskInstance, status,
-                                taskResponseEvent.getStartTime(),
-                                taskResponseEvent.getWorkerAddress(),
-                                taskResponseEvent.getExecutePath(),
-                                taskResponseEvent.getLogPath(),
-                                taskResponseEvent.getTaskInstanceId());
-                    }
-                    // if taskInstance is null (maybe deleted) . retry will be meaningless . so ack success
-                    DBTaskAckCommand taskAckCommand = new DBTaskAckCommand(ExecutionStatus.SUCCESS.getCode(), taskResponseEvent.getTaskInstanceId());
-                    channel.writeAndFlush(taskAckCommand.convert2Command());
-                } catch (Exception e) {
-                    logger.error("worker ack master error", e);
-                    DBTaskAckCommand taskAckCommand = new DBTaskAckCommand(ExecutionStatus.FAILURE.getCode(), -1);
-                    channel.writeAndFlush(taskAckCommand.convert2Command());
-                }
+                handleAckEvent(taskResponseEvent, taskInstance);
                 break;
             case RESULT:
-                try {
-                    if (taskInstance != null) {
-                        processService.changeTaskState(taskInstance, taskResponseEvent.getState(),
-                                taskResponseEvent.getEndTime(),
-                                taskResponseEvent.getProcessId(),
-                                taskResponseEvent.getAppIds(),
-                                taskResponseEvent.getTaskInstanceId(),
-                                taskResponseEvent.getVarPool()
-                        );
-                    }
-                    // if taskInstance is null (maybe deleted) . retry will be meaningless . so response success
-                    DBTaskResponseCommand taskResponseCommand = new DBTaskResponseCommand(ExecutionStatus.SUCCESS.getCode(), taskResponseEvent.getTaskInstanceId());
-                    channel.writeAndFlush(taskResponseCommand.convert2Command());
-                } catch (Exception e) {
-                    logger.error("worker response master error", e);
-                    DBTaskResponseCommand taskResponseCommand = new DBTaskResponseCommand(ExecutionStatus.FAILURE.getCode(), -1);
-                    channel.writeAndFlush(taskResponseCommand.convert2Command());
-                }
+                handleResultEvent(taskResponseEvent, taskInstance);
                 break;
             default:
                 throw new IllegalArgumentException("invalid event type : " + event);
         }
-        WorkflowExecuteThread workflowExecuteThread = this.processInstanceExecCacheManager.getByProcessInstanceId(taskResponseEvent.getProcessInstanceId());
+
         if (workflowExecuteThread != null) {
             StateEvent stateEvent = new StateEvent();
             stateEvent.setProcessInstanceId(taskResponseEvent.getProcessInstanceId());
@@ -200,7 +174,59 @@ public class TaskResponseService {
         }
     }
 
-    public BlockingQueue<TaskResponseEvent> getEventQueue() {
-        return eventQueue;
+    /**
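+     * handle an ACK event: record the worker's start info unless the task already finished, then reply with a DBTaskAckCommand
+     * @param taskResponseEvent the ACK event received from the worker; its channel is used for the reply
+     * @param taskInstance the task instance to update; may be null (e.g. deleted), in which case only the reply is sent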
+     */
+    private void handleAckEvent(TaskResponseEvent taskResponseEvent, TaskInstance taskInstance) {
+        Channel channel = taskResponseEvent.getChannel();
+        try {
+            if (taskInstance != null) {
+                if (taskInstance.getState().typeIsFinished()) {
+                    logger.warn("task is finish, ack is meaningless, taskInstanceId:{}, state:{}", taskInstance.getId(), taskInstance.getState());
+                } else {
+                    processService.changeTaskState(taskInstance, taskResponseEvent.getState(),
+                            taskResponseEvent.getStartTime(),
+                            taskResponseEvent.getWorkerAddress(),
+                            taskResponseEvent.getExecutePath(),
+                            taskResponseEvent.getLogPath()
+                    );
+                }
+            }
+            // if taskInstance is null (maybe deleted) or already finished, a retry would be meaningless, so ack success
+            DBTaskAckCommand taskAckCommand = new DBTaskAckCommand(ExecutionStatus.SUCCESS.getCode(), taskResponseEvent.getTaskInstanceId());
+            channel.writeAndFlush(taskAckCommand.convert2Command());
+        } catch (Exception e) {
+            logger.error("worker ack master error", e);
+            DBTaskAckCommand taskAckCommand = new DBTaskAckCommand(ExecutionStatus.FAILURE.getCode(), -1);
+            channel.writeAndFlush(taskAckCommand.convert2Command());
+        }
+    }
+
+    /**
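+     * handle a RESULT event: persist the final task state reported by the worker, then reply with a DBTaskResponseCommand
+     * @param taskResponseEvent the RESULT event received from the worker; its channel is used for the reply
+     * @param taskInstance the task instance to update; may be null (e.g. deleted), in which case only the reply is sent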
+     */
+    private void handleResultEvent(TaskResponseEvent taskResponseEvent, TaskInstance taskInstance) {
+        Channel channel = taskResponseEvent.getChannel();
+        try {
+            if (taskInstance != null) {
+                processService.changeTaskState(taskInstance, taskResponseEvent.getState(),
+                        taskResponseEvent.getEndTime(),
+                        taskResponseEvent.getProcessId(),
+                        taskResponseEvent.getAppIds(),
+                        taskResponseEvent.getVarPool()
+                );
+            }
+            // if taskInstance is null (maybe deleted), a retry would be meaningless, so respond success
+            DBTaskResponseCommand taskResponseCommand = new DBTaskResponseCommand(ExecutionStatus.SUCCESS.getCode(), taskResponseEvent.getTaskInstanceId());
+            channel.writeAndFlush(taskResponseCommand.convert2Command());
+        } catch (Exception e) {
+            logger.error("worker response master error", e);
+            DBTaskResponseCommand taskResponseCommand = new DBTaskResponseCommand(ExecutionStatus.FAILURE.getCode(), -1);
+            channel.writeAndFlush(taskResponseCommand.convert2Command());
+        }
     }
 }
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
index ad12abe..44eac21 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
@@ -86,9 +86,7 @@ import java.util.concurrent.ConcurrentLinkedQueue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.HashBasedTable;
 import com.google.common.collect.Lists;
-import com.google.common.collect.Table;
 
 /**
  * master exec thread,split dag
@@ -99,100 +97,116 @@ public class WorkflowExecuteThread implements Runnable {
      * logger of WorkflowExecuteThread
      */
     private static final Logger logger = LoggerFactory.getLogger(WorkflowExecuteThread.class);
+
     /**
-     * runing TaskNode
+     * master config
      */
-    private final Map<Integer, ITaskProcessor> activeTaskProcessorMaps = new ConcurrentHashMap<>();
+    private MasterConfig masterConfig;
 
     /**
-     * process instance
+     * process service
      */
-    private ProcessInstance processInstance;
+    private ProcessService processService;
+
     /**
-     * submit failure nodes
+     * alert manager
      */
-    private boolean taskFailedSubmit = false;
+    private ProcessAlertManager processAlertManager;
 
     /**
-     * recover node id list
+     * netty executor manager
      */
-    private List<TaskInstance> recoverNodeIdList = new ArrayList<>();
+    private NettyExecutorManager nettyExecutorManager;
 
     /**
-     * error task list
+     * process instance
      */
-    private Map<String, TaskInstance> errorTaskList = new ConcurrentHashMap<>();
+    private ProcessInstance processInstance;
 
     /**
-     * complete task list
+     * process definition
      */
-    private Map<String, TaskInstance> completeTaskList = new ConcurrentHashMap<>();
+    private ProcessDefinition processDefinition;
 
     /**
-     * ready to submit task queue
+     * the object of DAG
      */
-    private PeerTaskInstancePriorityQueue readyToSubmitTaskQueue = new PeerTaskInstancePriorityQueue();
+    private DAG<String, TaskNode, TaskNodeRelation> dag;
 
     /**
-     * depend failed task map
+     * key of workflow
      */
-    private Map<String, TaskInstance> dependFailedTask = new ConcurrentHashMap<>();
+    private String key;
 
     /**
-     * forbidden task map
+     * start flag; true once the process start nodes have been submitted completely
      */
-    private Map<String, TaskNode> forbiddenTaskList = new ConcurrentHashMap<>();
+    private boolean isStart = false;
 
     /**
-     * skip task map
+     * submit failure nodes
      */
-    private Map<String, TaskNode> skipTaskNodeList = new ConcurrentHashMap<>();
+    private boolean taskFailedSubmit = false;
 
     /**
-     * recover tolerance fault task list
+     * task instance map, keyed by task instance id
      */
-    private List<TaskInstance> recoverToleranceFaultTaskList = new ArrayList<>();
+    private Map<Integer, TaskInstance> taskInstanceMap = new ConcurrentHashMap<>();
 
     /**
-     * alert manager
+     * running TaskNode, taskId as key
      */
-    private ProcessAlertManager processAlertManager;
+    private final Map<Integer, ITaskProcessor> activeTaskProcessorMaps = new ConcurrentHashMap<>();
 
     /**
-     * the object of DAG
+     * valid task map, taskCode as key, taskId as value
      */
-    private DAG<String, TaskNode, TaskNodeRelation> dag;
+    private Map<String, Integer> validTaskMap = new ConcurrentHashMap<>();
 
     /**
-     * process service
+     * error task map, taskCode as key, taskId as value
      */
-    private ProcessService processService;
+    private Map<String, Integer> errorTaskMap = new ConcurrentHashMap<>();
 
     /**
-     * master config
+     * complete task map, taskCode as key, taskId as value
      */
-    private MasterConfig masterConfig;
+    private Map<String, Integer> completeTaskMap = new ConcurrentHashMap<>();
 
     /**
-     *
+     * depend failed task map, taskCode as key, taskId as value
      */
-    private NettyExecutorManager nettyExecutorManager;
+    private Map<String, Integer> dependFailedTaskMap = new ConcurrentHashMap<>();
 
-    private ConcurrentLinkedQueue<StateEvent> stateEvents = new ConcurrentLinkedQueue<>();
+    /**
+     * forbidden task map, code as key
+     */
+    private Map<String, TaskNode> forbiddenTaskMap = new ConcurrentHashMap<>();
 
-    private List<Date> complementListDate = Lists.newLinkedList();
+    /**
+     * skip task map, code as key
+     */
+    private Map<String, TaskNode> skipTaskNodeMap = new ConcurrentHashMap<>();
 
-    private Table<Integer, Long, TaskInstance> taskInstanceHashMap = HashBasedTable.create();
-    private ProcessDefinition processDefinition;
-    private String key;
+    /**
+     * complement date list
+     */
+    private List<Date> complementListDate = Lists.newLinkedList();
 
+    /**
+     * task timeout check list
+     */
     private ConcurrentHashMap<Integer, TaskInstance> taskTimeoutCheckList;
 
     /**
-     * start flag, true: start nodes submit completely
-     *
+     * state event queue
      */
-    private boolean isStart = false;
+    private ConcurrentLinkedQueue<StateEvent> stateEvents = new ConcurrentLinkedQueue<>();
+
+    /**
+     * ready to submit task queue
+     */
+    private PeerTaskInstancePriorityQueue readyToSubmitTaskQueue = new PeerTaskInstancePriorityQueue();
 
     /**
      * constructor of WorkflowExecuteThread
@@ -200,7 +214,6 @@ public class WorkflowExecuteThread implements Runnable {
      * @param processInstance processInstance
      * @param processService processService
      * @param nettyExecutorManager nettyExecutorManager
-     * @param taskTimeoutCheckList
      */
     public WorkflowExecuteThread(ProcessInstance processInstance
             , ProcessService processService
@@ -232,7 +245,6 @@ public class WorkflowExecuteThread implements Runnable {
 
     /**
      * the process start nodes are submitted completely.
-     * @return
      */
     public boolean isStart() {
         return this.isStart;
@@ -286,9 +298,10 @@ public class WorkflowExecuteThread implements Runnable {
     private boolean stateEventHandler(StateEvent stateEvent) {
         logger.info("process event: {}", stateEvent.toString());
 
-        if (!checkStateEvent(stateEvent)) {
+        if (!checkProcessInstance(stateEvent)) {
             return false;
         }
+        // the event targets this process instance; dispatch it by event type
         boolean result = false;
         switch (stateEvent.getType()) {
             case PROCESS_STATE_CHANGE:
@@ -314,16 +327,11 @@ public class WorkflowExecuteThread implements Runnable {
     }
 
     private boolean taskTimeout(StateEvent stateEvent) {
-
-        if (taskInstanceHashMap.containsRow(stateEvent.getTaskInstanceId())) {
+        if (!checkTaskInstanceByStateEvent(stateEvent)) {
             return true;
         }
 
-        TaskInstance taskInstance = taskInstanceHashMap
-                .row(stateEvent.getTaskInstanceId())
-                .values()
-                .iterator().next();
-
+        TaskInstance taskInstance = taskInstanceMap.get(stateEvent.getTaskInstanceId()); // presence checked above
         if (TimeoutFlag.CLOSE == taskInstance.getTaskDefine().getTimeoutFlag()) {
             return true;
         }
@@ -344,7 +352,16 @@ public class WorkflowExecuteThread implements Runnable {
     }
 
     private boolean taskStateChangeHandler(StateEvent stateEvent) {
-        TaskInstance task = processService.findTaskInstanceById(stateEvent.getTaskInstanceId());
+        if (!checkTaskInstanceByStateEvent(stateEvent)) {
+            return true;
+        }
+        // read the task from the in-memory map instead of querying the database
+        TaskInstance task = getTaskInstance(stateEvent.getTaskInstanceId());
+        if (task.getState() == null) {
+            logger.error("task state is null, state handler error: {}", stateEvent);
+            return true;
+        }
+
         if (task.getState().typeIsFinished()) {
             taskFinished(task);
         } else if (activeTaskProcessorMaps.containsKey(stateEvent.getTaskInstanceId())) {
@@ -356,7 +373,7 @@ public class WorkflowExecuteThread implements Runnable {
                 taskFinished(task);
             }
         } else {
-            logger.error("state handler error: {}", stateEvent.toString());
+            logger.error("state handler error: {}", stateEvent);
         }
         return true;
     }
@@ -382,10 +399,11 @@ public class WorkflowExecuteThread implements Runnable {
             }
             return;
         }
-        ProcessInstance processInstance = processService.findProcessInstanceById(this.processInstance.getId());
-        completeTaskList.put(Long.toString(task.getTaskCode()), task);
+        // record the finished task as taskCode -> task instance id
+        completeTaskMap.put(Long.toString(task.getTaskCode()), task.getId());
         activeTaskProcessorMaps.remove(task.getId());
         taskTimeoutCheckList.remove(task.getId());
+
         if (task.getState().typeIsSuccess()) {
             processInstance.setVarPool(task.getVarPool());
             processService.saveProcessInstance(processInstance);
@@ -395,7 +413,7 @@ public class WorkflowExecuteThread implements Runnable {
                     || DagHelper.haveConditionsAfterNode(Long.toString(task.getTaskCode()), dag)) {
                 submitPostNode(Long.toString(task.getTaskCode()));
             } else {
-                errorTaskList.put(Long.toString(task.getTaskCode()), task);
+                errorTaskMap.put(Long.toString(task.getTaskCode()), task.getId());
                 if (processInstance.getFailureStrategy() == FailureStrategy.END) {
                     killAllTasks();
                 }
@@ -404,20 +422,120 @@ public class WorkflowExecuteThread implements Runnable {
         this.updateProcessInstanceState();
     }
 
-    private boolean checkStateEvent(StateEvent stateEvent) {
+    /**
+     * reload the process instance and its process definition from the database
+     *
+     * @param processInstanceId id of the process instance to reload
+     */
+    public void refreshProcessInstance(int processInstanceId) {
+        logger.info("process instance update: {}", processInstanceId);
+        ProcessInstance latestProcessInstance = processService.findProcessInstanceById(processInstanceId);
+        if (latestProcessInstance == null) {
+            // keep the current in-memory instance instead of overwriting the field with null
+            logger.error("can not find process instance, id:{}", processInstanceId);
+            return;
+        }
+        processInstance = latestProcessInstance;
+        processDefinition = processService.findProcessDefinition(processInstance.getProcessDefinitionCode(),
+                processInstance.getProcessDefinitionVersion());
+        processInstance.setProcessDefinition(processDefinition);
+    }
+
+    /**
+     * reload a task instance from the database and refresh the in-memory caches
+     *
+     * @param taskInstanceId id of the task instance to reload
+     */
+    public void refreshTaskInstance(int taskInstanceId) {
+        logger.info("task instance update: {} ", taskInstanceId);
+        TaskInstance taskInstance = processService.findTaskInstanceById(taskInstanceId);
+        if (taskInstance == null) {
+            logger.error("can not find task instance, id:{}", taskInstanceId);
+            return;
+        }
+        processService.packageTaskInstance(taskInstance, processInstance);
+        taskInstanceMap.put(taskInstance.getId(), taskInstance);
+
+        // validTaskMap holds one id per task code: drop the old mapping and
+        // re-add it only when this instance is flagged as valid
+        validTaskMap.remove(Long.toString(taskInstance.getTaskCode()));
+        if (Flag.YES == taskInstance.getFlag()) {
+            validTaskMap.put(Long.toString(taskInstance.getTaskCode()), taskInstance.getId());
+        }
+    }
+
+    /**
+     * check process instance by state event
+     *
+     * @param stateEvent state event to check
+     * @return true if the event targets the process instance handled by this thread
+     */
+    public boolean checkProcessInstance(StateEvent stateEvent) {
         if (this.processInstance.getId() != stateEvent.getProcessInstanceId()) {
             logger.error("mismatch process instance id: {}, state event:{}",
                     this.processInstance.getId(),
-                    stateEvent.toString());
+                    stateEvent);
+            return false;
+        }
+        return true;
+    }
+
+    /**
+     * check if task instance exist by state event
+     *
+     * @param stateEvent state event to check
+     * @return true if the event carries a non-zero task instance id that is cached in memory
+     */
+    public boolean checkTaskInstanceByStateEvent(StateEvent stateEvent) {
+        if (stateEvent.getTaskInstanceId() == 0) {
+            logger.error("task instance id null, state event:{}", stateEvent);
+            return false;
+        }
+        if (!taskInstanceMap.containsKey(stateEvent.getTaskInstanceId())) {
+            logger.error("mismatch task instance id, event:{}", stateEvent);
             return false;
         }
         return true;
     }
 
+    /**
+     * check if task instance exist by task code
+     */
+    public boolean checkTaskInstanceByCode(long taskCode) {
+        if (taskInstanceMap == null || taskInstanceMap.size() == 0) {
+            return false;
+        }
+        for (TaskInstance taskInstance : taskInstanceMap.values()) {
+            if (taskInstance.getTaskCode() == taskCode) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * check if task instance exist by id
+     */
+    public boolean checkTaskInstanceById(int taskInstanceId) {
+        if (taskInstanceMap == null || taskInstanceMap.size() == 0) {
+            return false;
+        }
+        return taskInstanceMap.containsKey(taskInstanceId);
+    }
+
+    /**
+     * get task instance from memory
+     *
+     * @param taskInstanceId task instance id
+     * @return the cached task instance, or null if it is not cached
+     */
+    public TaskInstance getTaskInstance(int taskInstanceId) {
+        return taskInstanceMap.get(taskInstanceId);
+    }
+
     private boolean processStateChangeHandler(StateEvent stateEvent) {
         try {
             logger.info("process:{} state {} change to {}", processInstance.getId(), processInstance.getState(), stateEvent.getExecutionStatus());
-            processInstance = processService.findProcessInstanceById(this.processInstance.getId());
             if (processComplementData()) {
                 return true;
             }
@@ -477,7 +577,7 @@ public class WorkflowExecuteThread implements Runnable {
         processInstance.setStartTime(new Date());
         processInstance.setEndTime(null);
         processService.saveProcessInstance(processInstance);
-        this.taskInstanceHashMap.clear();
+        this.taskInstanceMap.clear();
         startProcess();
         return true;
     }
@@ -491,7 +591,7 @@ public class WorkflowExecuteThread implements Runnable {
     }
 
     private void startProcess() throws Exception {
-        if (this.taskInstanceHashMap.size() == 0) {
+        if (this.taskInstanceMap.size() == 0) {
             isStart = false;
             buildFlowDag();
             initTaskQueue();
@@ -505,25 +605,22 @@ public class WorkflowExecuteThread implements Runnable {
      */
     private void endProcess() {
         this.stateEvents.clear();
-        processInstance.setEndTime(new Date());
-        ProcessDefinition processDefinition = this.processService.findProcessDefinition(processInstance.getProcessDefinitionCode(),processInstance.getProcessDefinitionVersion());
         if (processDefinition.getExecutionType().typeIsSerialWait()) {
             checkSerialProcess(processDefinition);
         }
-        processService.updateProcessInstance(processInstance);
         if (processInstance.getState().typeIsWaitingThread()) {
             processService.createRecoveryWaitingThreadCommand(null, processInstance);
         }
-        List<TaskInstance> taskInstances = processService.findValidTaskListByProcessId(processInstance.getId());
-        ProjectUser projectUser = processService.queryProjectWithUserByProcessInstanceId(processInstance.getId());
-        processAlertManager.sendAlertProcessInstance(processInstance, taskInstances, projectUser);
+        if (processAlertManager.isNeedToSendWarning(processInstance)) {
+            ProjectUser projectUser = processService.queryProjectWithUserByProcessInstanceId(processInstance.getId());
+            processAlertManager.sendAlertProcessInstance(processInstance, getValidTaskList(), projectUser);
+        }
     }
 
     public void checkSerialProcess(ProcessDefinition processDefinition) {
-        this.processInstance = processService.findProcessInstanceById(processInstance.getId());
         int nextInstanceId = processInstance.getNextProcessInstanceId();
         if (nextInstanceId == 0) {
-            ProcessInstance nextProcessInstance = this.processService.loadNextProcess4Serial(processInstance.getProcessDefinition().getCode(),ExecutionStatus.SERIAL_WAIT.getCode());
+            ProcessInstance nextProcessInstance = this.processService.loadNextProcess4Serial(processInstance.getProcessDefinition().getCode(), ExecutionStatus.SERIAL_WAIT.getCode());
             if (nextProcessInstance == null) {
                 return;
             }
@@ -553,19 +650,21 @@ public class WorkflowExecuteThread implements Runnable {
         }
         processDefinition = processService.findProcessDefinition(processInstance.getProcessDefinitionCode(),
                 processInstance.getProcessDefinitionVersion());
-        recoverNodeIdList = getStartTaskInstanceList(processInstance.getCommandParam());
+        processInstance.setProcessDefinition(processDefinition);
+
+        List<TaskInstance> recoverNodeList = getStartTaskInstanceList(processInstance.getCommandParam());
         List<TaskNode> taskNodeList =
-            processService.transformTask(processService.findRelationByCode(processDefinition.getProjectCode(), processDefinition.getCode()), Lists.newArrayList());
-        forbiddenTaskList.clear();
+                processService.transformTask(processService.findRelationByCode(processDefinition.getProjectCode(), processDefinition.getCode()), Lists.newArrayList());
+        forbiddenTaskMap.clear();
 
         taskNodeList.forEach(taskNode -> {
             if (taskNode.isForbidden()) {
-                forbiddenTaskList.put(Long.toString(taskNode.getCode()), taskNode);
+                forbiddenTaskMap.put(Long.toString(taskNode.getCode()), taskNode);
             }
         });
 
         // generate process to get DAG info
-        List<String> recoveryNodeCodeList = getRecoveryNodeCodeList();
+        List<String> recoveryNodeCodeList = getRecoveryNodeCodeList(recoverNodeList);
         List<String> startNodeNameList = parseStartNodeName(processInstance.getCommandParam());
         ProcessDag processDag = generateFlowDag(taskNodeList,
                 startNodeNameList, recoveryNodeCodeList, processInstance.getTaskDependType());
@@ -584,19 +683,26 @@ public class WorkflowExecuteThread implements Runnable {
 
         taskFailedSubmit = false;
         activeTaskProcessorMaps.clear();
-        dependFailedTask.clear();
-        completeTaskList.clear();
-        errorTaskList.clear();
-        List<TaskInstance> taskInstanceList = processService.findValidTaskListByProcessId(processInstance.getId());
-        for (TaskInstance task : taskInstanceList) {
+        dependFailedTaskMap.clear();
+        completeTaskMap.clear();
+        errorTaskMap.clear();
+        // validTaskMap is rebuilt from the database below; drop ids from a previous run whose
+        // instances are no longer in taskInstanceMap, otherwise getValidTaskList() resolves them to null
+        validTaskMap.clear();
+
+        List<TaskInstance> validTaskInstanceList = processService.findValidTaskListByProcessId(processInstance.getId());
+        for (TaskInstance task : validTaskInstanceList) {
+            validTaskMap.put(Long.toString(task.getTaskCode()), task.getId());
+            taskInstanceMap.put(task.getId(), task);
+
             if (task.isTaskComplete()) {
-                completeTaskList.put(Long.toString(task.getTaskCode()), task);
+                completeTaskMap.put(Long.toString(task.getTaskCode()), task.getId());
             }
             if (task.isConditionsTask() || DagHelper.haveConditionsAfterNode(Long.toString(task.getTaskCode()), dag)) {
                 continue;
             }
             if (task.getState().typeIsFailure() && !task.taskCanRetry()) {
-                errorTaskList.put(Long.toString(task.getTaskCode()), task);
+                errorTaskMap.put(Long.toString(task.getTaskCode()), task.getId());
             }
         }
 
@@ -637,31 +740,32 @@ public class WorkflowExecuteThread implements Runnable {
                     && taskProcessor.getType().equalsIgnoreCase(Constants.COMMON_TASK_TYPE)) {
                 notifyProcessHostUpdate(taskInstance);
             }
+
+            // package task instance before submit
+            processService.packageTaskInstance(taskInstance, processInstance);
+
             boolean submit = taskProcessor.submit(taskInstance, processInstance, masterConfig.getTaskCommitRetryTimes(), masterConfig.getTaskCommitInterval());
-            if (submit) {
-                this.taskInstanceHashMap.put(taskInstance.getId(), taskInstance.getTaskCode(), taskInstance);
-                activeTaskProcessorMaps.put(taskInstance.getId(), taskProcessor);
-                taskProcessor.run();
-                addTimeoutCheck(taskInstance);
-                TaskDefinition taskDefinition = processService.findTaskDefinition(
-                        taskInstance.getTaskCode(),
-                        taskInstance.getTaskDefinitionVersion());
-                taskInstance.setTaskDefine(taskDefinition);
-                if (taskProcessor.taskState().typeIsFinished()) {
-                    StateEvent stateEvent = new StateEvent();
-                    stateEvent.setProcessInstanceId(this.processInstance.getId());
-                    stateEvent.setTaskInstanceId(taskInstance.getId());
-                    stateEvent.setExecutionStatus(taskProcessor.taskState());
-                    stateEvent.setType(StateEventType.TASK_STATE_CHANGE);
-                    this.stateEvents.add(stateEvent);
-                }
-                return taskInstance;
-            } else {
+            if (!submit) {
                 logger.error("process id:{} name:{} submit standby task id:{} name:{} failed!",
                         processInstance.getId(), processInstance.getName(),
                         taskInstance.getId(), taskInstance.getName());
                 return null;
             }
+            taskInstanceMap.put(taskInstance.getId(), taskInstance);
+            activeTaskProcessorMaps.put(taskInstance.getId(), taskProcessor);
+            taskProcessor.run();
+
+            addTimeoutCheck(taskInstance);
+            // if the processor already reports a finished state after run(), queue a TASK_STATE_CHANGE event
+            if (taskProcessor.taskState().typeIsFinished()) {
+                StateEvent stateEvent = new StateEvent();
+                stateEvent.setProcessInstanceId(this.processInstance.getId());
+                stateEvent.setTaskInstanceId(taskInstance.getId());
+                stateEvent.setExecutionStatus(taskProcessor.taskState());
+                stateEvent.setType(StateEventType.TASK_STATE_CHANGE);
+                this.stateEvents.add(stateEvent);
+            }
+            return taskInstance;
         } catch (Exception e) {
             logger.error("submit standby task error", e);
             return null;
@@ -688,11 +792,12 @@ public class WorkflowExecuteThread implements Runnable {
         if (taskTimeoutCheckList.containsKey(taskInstance.getId())) {
             return;
         }
-        TaskDefinition taskDefinition = processService.findTaskDefinition(
-                taskInstance.getTaskCode(),
-                taskInstance.getTaskDefinitionVersion()
-        );
-        taskInstance.setTaskDefine(taskDefinition);
+        TaskDefinition taskDefinition = taskInstance.getTaskDefine();
+        if (taskDefinition == null) {
+            logger.error("taskDefinition is null, taskId:{}", taskInstance.getId());
+            return;
+        }
+        // retryable tasks are tracked as well, even when the timeout flag is not OPEN
         if (TimeoutFlag.OPEN == taskDefinition.getTimeoutFlag() || taskInstance.taskCanRetry()) {
             this.taskTimeoutCheckList.put(taskInstance.getId(), taskInstance);
         } else {
@@ -711,8 +816,8 @@ public class WorkflowExecuteThread implements Runnable {
      * @return TaskInstance
      */
     private TaskInstance findTaskIfExists(Long taskCode, int taskVersion) {
-        List<TaskInstance> taskInstanceList = processService.findValidTaskListByProcessId(this.processInstance.getId());
-        for (TaskInstance taskInstance : taskInstanceList) {
+        List<TaskInstance> validTaskInstanceList = getValidTaskList();
+        for (TaskInstance taskInstance : validTaskInstanceList) {
             if (taskInstance.getTaskCode() == taskCode && taskInstance.getTaskDefinitionVersion() == taskVersion) {
                 return taskInstance;
             }
@@ -805,7 +910,11 @@ public class WorkflowExecuteThread implements Runnable {
         Map<String, TaskInstance> allTaskInstance = new HashMap<>();
         if (CollectionUtils.isNotEmpty(preTask)) {
             for (String preTaskCode : preTask) {
-                TaskInstance preTaskInstance = completeTaskList.get(preTaskCode);
+                Integer taskId = completeTaskMap.get(preTaskCode);
+                if (taskId == null) {
+                    continue;
+                }
+                TaskInstance preTaskInstance = taskInstanceMap.get(taskId);
                 if (preTaskInstance == null) {
                     continue;
                 }
@@ -854,12 +963,46 @@ public class WorkflowExecuteThread implements Runnable {
         }
     }
 
+    /**
+     * get complete task instance map, taskCode as key
+     *
+     * @return completed task instances that are still cached in memory
+     */
+    private Map<String, TaskInstance> getCompleteTaskInstanceMap() {
+        Map<String, TaskInstance> completeTaskInstanceMap = new HashMap<>();
+        for (Integer taskInstanceId : completeTaskMap.values()) {
+            TaskInstance taskInstance = taskInstanceMap.get(taskInstanceId);
+            // skip ids without a cached instance instead of failing with a NullPointerException
+            if (taskInstance != null) {
+                completeTaskInstanceMap.put(Long.toString(taskInstance.getTaskCode()), taskInstance);
+            }
+        }
+        return completeTaskInstanceMap;
+    }
+
+    /**
+     * get valid task list
+     *
+     * @return valid task instances that are still cached in memory; never contains null
+     */
+    private List<TaskInstance> getValidTaskList() {
+        List<TaskInstance> validTaskInstanceList = new ArrayList<>();
+        for (Integer taskInstanceId : validTaskMap.values()) {
+            TaskInstance taskInstance = taskInstanceMap.get(taskInstanceId);
+            // callers dereference every element, so an id without a cached instance must not add null
+            if (taskInstance != null) {
+                validTaskInstanceList.add(taskInstance);
+            }
+        }
+        return validTaskInstanceList;
+    }
+
     private void submitPostNode(String parentNodeCode) {
-        Set<String> submitTaskNodeList = DagHelper.parsePostNodes(parentNodeCode, skipTaskNodeList, dag, completeTaskList);
+        Set<String> submitTaskNodeList = DagHelper.parsePostNodes(parentNodeCode, skipTaskNodeMap, dag, getCompleteTaskInstanceMap());
         List<TaskInstance> taskInstances = new ArrayList<>();
         for (String taskNode : submitTaskNodeList) {
             TaskNode taskNodeObject = dag.getNode(taskNode);
-            if (taskInstanceHashMap.containsColumn(taskNodeObject.getCode())) {
+            if (checkTaskInstanceByCode(taskNodeObject.getCode())) {
                 continue;
             }
             TaskInstance task = createTaskInstance(processInstance, taskNodeObject);
@@ -873,15 +1005,16 @@ public class WorkflowExecuteThread implements Runnable {
                 continue;
             }
 
-            if (completeTaskList.containsKey(Long.toString(task.getTaskCode()))) {
+            if (completeTaskMap.containsKey(Long.toString(task.getTaskCode()))) {
                 logger.info("task {} has already run success", task.getName());
                 continue;
             }
             if (task.getState().typeIsPause() || task.getState().typeIsCancel()) {
                 logger.info("task {} stopped, the state is {}", task.getName(), task.getState());
-            } else {
-                addTaskToStandByList(task);
+                continue;
             }
+
+            addTaskToStandByList(task);
         }
         submitStandByTask();
         updateProcessInstanceState();
@@ -903,15 +1036,16 @@ public class WorkflowExecuteThread implements Runnable {
         List<String> depCodeList = taskNode.getDepList();
         for (String depsNode : depCodeList) {
             if (!dag.containsNode(depsNode)
-                    || forbiddenTaskList.containsKey(depsNode)
-                    || skipTaskNodeList.containsKey(depsNode)) {
+                    || forbiddenTaskMap.containsKey(depsNode)
+                    || skipTaskNodeMap.containsKey(depsNode)) {
                 continue;
             }
             // dependencies must be fully completed
-            if (!completeTaskList.containsKey(depsNode)) {
+            if (!completeTaskMap.containsKey(depsNode)) {
                 return DependResult.WAITING;
             }
-            ExecutionStatus depTaskState = completeTaskList.get(depsNode).getState();
+            Integer depsTaskId = completeTaskMap.get(depsNode);
+            ExecutionStatus depTaskState = taskInstanceMap.get(depsTaskId).getState();
             if (depTaskState.typeIsPause() || depTaskState.typeIsCancel()) {
                 return DependResult.NON_EXEC;
             }
@@ -923,7 +1057,7 @@ public class WorkflowExecuteThread implements Runnable {
                 return DependResult.FAILED;
             }
         }
-        logger.info("taskCode: {} completeDependTaskList: {}", taskCode, Arrays.toString(completeTaskList.keySet().toArray()));
+        logger.info("taskCode: {} completeDependTaskList: {}", taskCode, Arrays.toString(completeTaskMap.keySet().toArray()));
         return DependResult.SUCCESS;
     }
 
@@ -933,12 +1067,13 @@ public class WorkflowExecuteThread implements Runnable {
     private boolean dependTaskSuccess(String dependNodeName, String nextNodeName) {
         if (dag.getNode(dependNodeName).isConditionsTask()) {
             //condition task need check the branch to run
-            List<String> nextTaskList = DagHelper.parseConditionTask(dependNodeName, skipTaskNodeList, dag, completeTaskList);
+            List<String> nextTaskList = DagHelper.parseConditionTask(dependNodeName, skipTaskNodeMap, dag, getCompleteTaskInstanceMap());
             if (!nextTaskList.contains(nextNodeName)) {
                 return false;
             }
         } else {
-            ExecutionStatus depTaskState = completeTaskList.get(dependNodeName).getState();
+            Integer taskInstanceId = completeTaskMap.get(dependNodeName);
+            ExecutionStatus depTaskState = taskInstanceMap.get(taskInstanceId).getState();
             if (depTaskState.typeIsFailure()) {
                 return false;
             }
@@ -954,9 +1089,10 @@ public class WorkflowExecuteThread implements Runnable {
      */
     private List<TaskInstance> getCompleteTaskByState(ExecutionStatus state) {
         List<TaskInstance> resultList = new ArrayList<>();
-        for (Map.Entry<String, TaskInstance> entry : completeTaskList.entrySet()) {
-            if (entry.getValue().getState() == state) {
-                resultList.add(entry.getValue());
+        for (Integer taskInstanceId : completeTaskMap.values()) {
+            TaskInstance taskInstance = taskInstanceMap.get(taskInstanceId);
+            if (taskInstance != null && taskInstance.getState() == state) {
+                resultList.add(taskInstance);
             }
         }
         return resultList;
@@ -990,10 +1126,10 @@ public class WorkflowExecuteThread implements Runnable {
         if (this.taskFailedSubmit) {
             return true;
         }
-        if (this.errorTaskList.size() > 0) {
+        if (this.errorTaskMap.size() > 0) {
             return true;
         }
-        return this.dependFailedTask.size() > 0;
+        return this.dependFailedTaskMap.size() > 0;
     }
 
     /**
@@ -1049,7 +1185,6 @@ public class WorkflowExecuteThread implements Runnable {
     /**
      * generate the latest process instance status by the tasks state
      *
-     * @param instance
      * @return process instance execution status
      */
     private ExecutionStatus getProcessInstanceState(ProcessInstance instance) {
@@ -1130,8 +1265,7 @@ public class WorkflowExecuteThread implements Runnable {
      * after each batch of tasks is executed, the status of the process instance is updated
      */
     private void updateProcessInstanceState() {
-        ProcessInstance instance = processService.findProcessInstanceById(processInstance.getId());
-        ExecutionStatus state = getProcessInstanceState(instance);
+        ExecutionStatus state = getProcessInstanceState(processInstance); // in-memory instance, no database reload
         if (processInstance.getState() != state) {
             logger.info(
                     "work flow process instance [id: {}, name:{}], state change from {} to {}, cmd type: {}",
@@ -1139,9 +1273,12 @@ public class WorkflowExecuteThread implements Runnable {
                     processInstance.getState(), state,
                     processInstance.getCommandType());
 
-            instance.setState(state);
-            processService.updateProcessInstance(instance);
-            processInstance = instance;
+            processInstance.setState(state);
+            if (state.typeIsFinished()) {
+                processInstance.setEndTime(new Date());
+            }
+            processService.updateProcessInstance(processInstance);
+
             StateEvent stateEvent = new StateEvent();
             stateEvent.setExecutionStatus(processInstance.getState());
             stateEvent.setProcessInstanceId(this.processInstance.getId());
@@ -1254,7 +1391,8 @@ public class WorkflowExecuteThread implements Runnable {
                         task.setState(retryTask.getState());
                         logger.info("task: {} has been forced success, put it into complete task list and stop retrying", task.getName());
                         removeTaskFromStandbyList(task);
-                        completeTaskList.put(Long.toString(task.getTaskCode()), task);
+                        completeTaskMap.put(Long.toString(task.getTaskCode()), task.getId());
+                        taskInstanceMap.put(task.getId(), task); // completeTaskMap stores ids, so keep the instance resolvable
                         submitPostNode(Long.toString(task.getTaskCode()));
                         continue;
                     }
@@ -1281,7 +1419,7 @@ public class WorkflowExecuteThread implements Runnable {
                     }
                 } else if (DependResult.FAILED == dependResult) {
                     // if the dependency fails, the current node is not submitted and the state changes to failure.
-                    dependFailedTask.put(Long.toString(task.getTaskCode()), task);
+                    dependFailedTaskMap.put(Long.toString(task.getTaskCode()), task.getId());
                     removeTaskFromStandbyList(task);
                     logger.info("task {},id:{} depend result : {}", task.getName(), task.getId(), dependResult);
                 } else if (DependResult.NON_EXEC == dependResult) {
@@ -1366,10 +1504,11 @@ public class WorkflowExecuteThread implements Runnable {
      *
+     * @param recoverNodeList task instances whose task codes are collected; an empty list yields no codes
      * @return recovery node code list
      */
-    private List<String> getRecoveryNodeCodeList() {
+    private List<String> getRecoveryNodeCodeList(List<TaskInstance> recoverNodeList) {
         List<String> recoveryNodeCodeList = new ArrayList<>();
-        if (CollectionUtils.isNotEmpty(recoverNodeIdList)) {
-            for (TaskInstance task : recoverNodeIdList) {
+        if (CollectionUtils.isNotEmpty(recoverNodeList)) {
+            for (TaskInstance task : recoverNodeList) {
                 recoveryNodeCodeList.add(Long.toString(task.getTaskCode()));
             }
         }
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
index fb14d96..7194ff9 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
@@ -161,8 +161,6 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
      * @return TaskExecutionContext
      */
     protected TaskExecutionContext getTaskExecutionContext(TaskInstance taskInstance) {
-        processService.setTaskInstanceDetail(taskInstance);
-
         int userId = taskInstance.getProcessDefine() == null ? 0 : taskInstance.getProcessDefine().getUserId();
         Tenant tenant = processService.getTenantForProcess(taskInstance.getProcessInstance().getTenantId(), userId);
 
@@ -172,12 +170,12 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
                     taskInstance.getStartTime(),
                     taskInstance.getHost(),
                     null,
-                    null,
-                    taskInstance.getId());
+                    null
+            );
             return null;
         }
         // set queue for process instance, user-specified queue takes precedence over tenant queue
-        String userQueue = processService.queryUserQueueByProcessInstanceId(taskInstance.getProcessInstanceId());
+        String userQueue = processService.queryUserQueueByProcessInstance(taskInstance.getProcessInstance());
         taskInstance.getProcessInstance().setQueue(StringUtils.isEmpty(userQueue) ? tenant.getQueue() : userQueue);
         taskInstance.getProcessInstance().setTenantCode(tenant.getTenantCode());
         taskInstance.setResources(getResourceFullNames(taskInstance));
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
index ee1c548..7b193bf 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
@@ -65,7 +65,7 @@ public class CommonTaskProcessor extends BaseTaskProcessor {
     @Override
     public boolean submit(TaskInstance task, ProcessInstance processInstance, int maxRetryTimes, int commitInterval) {
         this.processInstance = processInstance;
-        this.taskInstance = processService.submitTask(task, maxRetryTimes, commitInterval);
+        this.taskInstance = processService.submitTaskWithRetry(processInstance, task, maxRetryTimes, commitInterval);
 
         if (this.taskInstance == null) {
             return false;
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java
index ee1cf82..ee03602 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java
@@ -72,7 +72,7 @@ public class ConditionTaskProcessor extends BaseTaskProcessor {
     @Override
     public boolean submit(TaskInstance task, ProcessInstance processInstance, int masterTaskCommitRetryTimes, int masterTaskCommitInterval) {
         this.processInstance = processInstance;
-        this.taskInstance = processService.submitTask(task, masterTaskCommitRetryTimes, masterTaskCommitInterval);
+        this.taskInstance = processService.submitTaskWithRetry(processInstance, task, masterTaskCommitRetryTimes, masterTaskCommitInterval);
 
         if (this.taskInstance == null) {
             return false;
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java
index 8c3a287..d873a81 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java
@@ -34,7 +34,6 @@ import org.apache.dolphinscheduler.server.master.config.MasterConfig;
 import org.apache.dolphinscheduler.server.utils.DependentExecute;
 import org.apache.dolphinscheduler.server.utils.LogUtils;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
-import org.apache.dolphinscheduler.service.process.ProcessService;
 
 import java.util.ArrayList;
 import java.util.Date;
@@ -81,7 +80,7 @@ public class DependentTaskProcessor extends BaseTaskProcessor {
     public boolean submit(TaskInstance task, ProcessInstance processInstance, int masterTaskCommitRetryTimes, int masterTaskCommitInterval) {
         this.processInstance = processInstance;
         this.taskInstance = task;
-        this.taskInstance = processService.submitTask(task, masterTaskCommitRetryTimes, masterTaskCommitInterval);
+        this.taskInstance = processService.submitTaskWithRetry(processInstance, task, masterTaskCommitRetryTimes, masterTaskCommitInterval);
 
         if (this.taskInstance == null) {
             return false;
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java
index e0cd3e8..421efd3 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java
@@ -54,7 +54,7 @@ public class SubTaskProcessor extends BaseTaskProcessor {
         taskDefinition = processService.findTaskDefinition(
                 task.getTaskCode(), task.getTaskDefinitionVersion()
         );
-        this.taskInstance = processService.submitTask(task, masterTaskCommitRetryTimes, masterTaskCommitInterval);
+        this.taskInstance = processService.submitTaskWithRetry(processInstance, task, masterTaskCommitRetryTimes, masterTaskCommitInterval);
 
         if (this.taskInstance == null) {
             return false;
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java
index 116a8d5..70013e1 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java
@@ -63,7 +63,7 @@ public class SwitchTaskProcessor extends BaseTaskProcessor {
     public boolean submit(TaskInstance taskInstance, ProcessInstance processInstance, int masterTaskCommitRetryTimes, int masterTaskCommitInterval) {
 
         this.processInstance = processInstance;
-        this.taskInstance = processService.submitTask(taskInstance, masterTaskCommitRetryTimes, masterTaskCommitInterval);
+        this.taskInstance = processService.submitTaskWithRetry(processInstance, taskInstance, masterTaskCommitRetryTimes, masterTaskCommitInterval);
 
         if (this.taskInstance == null) {
             return false;
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/ConditionsTaskTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/ConditionsTaskTest.java
index c8a4ae4..4207d30 100644
--- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/ConditionsTaskTest.java
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/ConditionsTaskTest.java
@@ -89,7 +89,7 @@ public class ConditionsTaskTest {
 
         // for MasterBaseTaskExecThread.submit
         Mockito.when(processService
-                .submitTask(taskInstance))
+                .submitTask(processInstance, taskInstance))
                 .thenReturn(taskInstance);
         // for MasterBaseTaskExecThread.call
         Mockito.when(processService
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/DependentTaskTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/DependentTaskTest.java
index 1fa37cc..74017f0 100644
--- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/DependentTaskTest.java
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/DependentTaskTest.java
@@ -93,6 +93,7 @@ public class DependentTaskTest {
         Mockito.when(applicationContext.getBean(ProcessService.class)).thenReturn(processService);
 
         processInstance = getProcessInstance();
+        taskInstance = getTaskInstance();
 
         // for MasterBaseTaskExecThread.call
         // for DependentTaskExecThread.waitTaskQuit
@@ -102,7 +103,7 @@ public class DependentTaskTest {
 
         // for MasterBaseTaskExecThread.submit
         Mockito.when(processService
-                .submitTask(Mockito.argThat(taskInstance -> taskInstance.getId() == 1000)))
+                .submitTask(processInstance, taskInstance))
                 .thenAnswer(i -> taskInstance);
 
         // for DependentTaskExecThread.initTaskParameters
@@ -346,6 +347,12 @@ public class DependentTaskTest {
         return processInstance;
     }
 
+    private TaskInstance getTaskInstance() {
+        TaskInstance taskInstance = new TaskInstance();
+        taskInstance.setId(1000);
+        return taskInstance;
+    }
+
     /**
      * task that dependent on others (and to be tested here)
      * notice: should be filled with setDependence() and be passed to setupTaskInstance()
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/SubProcessTaskTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/SubProcessTaskTest.java
index 5fcfc77..8af6190 100644
--- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/SubProcessTaskTest.java
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/SubProcessTaskTest.java
@@ -74,6 +74,8 @@ public class SubProcessTaskTest {
         Mockito.when(applicationContext.getBean(AlertDao.class)).thenReturn(alertDao);
 
         processInstance = getProcessInstance();
+        TaskInstance taskInstance = getTaskInstance();
+
         Mockito.when(processService
                 .findProcessInstanceById(processInstance.getId()))
                 .thenReturn(processInstance);
@@ -85,7 +87,7 @@ public class SubProcessTaskTest {
 
         // for MasterBaseTaskExecThread.submit
         Mockito.when(processService
-                .submitTask(Mockito.any()))
-                .thenAnswer(t -> t.getArgument(0));
+                .submitTask(processInstance, taskInstance))
+                .thenAnswer(t -> t.getArgument(1));
 
         TaskDefinition taskDefinition = new TaskDefinition();
@@ -147,6 +149,12 @@ public class SubProcessTaskTest {
         return processInstance;
     }
 
+    private TaskInstance getTaskInstance() {
+        TaskInstance taskInstance = new TaskInstance();
+        taskInstance.setId(1000);
+        return taskInstance;
+    }
+
     private ProcessInstance getSubProcessInstance(ExecutionStatus executionStatus) {
         ProcessInstance processInstance = new ProcessInstance();
         processInstance.setId(102);
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/SwitchTaskTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/SwitchTaskTest.java
index 3221edb..4516d9d 100644
--- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/SwitchTaskTest.java
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/SwitchTaskTest.java
@@ -81,7 +81,7 @@ public class SwitchTaskTest {
 
         // for MasterBaseTaskExecThread.submit
         Mockito.when(processService
-                .submitTask(taskInstance))
+                .submitTask(processInstance, taskInstance))
                 .thenReturn(taskInstance);
         // for MasterBaseTaskExecThread.call
         Mockito.when(processService
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java
index 911d0d7..87bc042 100644
--- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java
@@ -162,39 +162,52 @@ public class WorkflowExecuteThreadTest {
     public void testGetPreVarPool() {
         try {
             Set<String> preTaskName = new HashSet<>();
-            preTaskName.add("test1");
-            preTaskName.add("test2");
-            Map<String, TaskInstance> completeTaskList = new ConcurrentHashMap<>();
+            preTaskName.add(Long.toString(1));
+            preTaskName.add(Long.toString(2));
 
             TaskInstance taskInstance = new TaskInstance();
 
             TaskInstance taskInstance1 = new TaskInstance();
             taskInstance1.setId(1);
-            taskInstance1.setName("test1");
+            taskInstance1.setTaskCode(1);
             taskInstance1.setVarPool("[{\"direct\":\"OUT\",\"prop\":\"test1\",\"type\":\"VARCHAR\",\"value\":\"1\"}]");
             taskInstance1.setEndTime(new Date());
 
             TaskInstance taskInstance2 = new TaskInstance();
             taskInstance2.setId(2);
-            taskInstance2.setName("test2");
+            taskInstance2.setTaskCode(2);
             taskInstance2.setVarPool("[{\"direct\":\"OUT\",\"prop\":\"test2\",\"type\":\"VARCHAR\",\"value\":\"2\"}]");
             taskInstance2.setEndTime(new Date());
 
-            completeTaskList.put("test1", taskInstance1);
-            completeTaskList.put("test2", taskInstance2);
+            Map<Integer, TaskInstance> taskInstanceMap = new ConcurrentHashMap<>();
+            taskInstanceMap.put(taskInstance1.getId(), taskInstance1);
+            taskInstanceMap.put(taskInstance2.getId(), taskInstance2);
+
+            Map<String, Integer> completeTaskList = new ConcurrentHashMap<>();
+            completeTaskList.put(Long.toString(taskInstance1.getTaskCode()), taskInstance1.getId());
+            completeTaskList.put(Long.toString(taskInstance2.getTaskCode()), taskInstance2.getId());
 
             Class<WorkflowExecuteThread> masterExecThreadClass = WorkflowExecuteThread.class;
 
-            Field field = masterExecThreadClass.getDeclaredField("completeTaskList");
-            field.setAccessible(true);
-            field.set(workflowExecuteThread, completeTaskList);
+            Field completeTaskMapField = masterExecThreadClass.getDeclaredField("completeTaskMap");
+            completeTaskMapField.setAccessible(true);
+            completeTaskMapField.set(workflowExecuteThread, completeTaskList);
+
+            Field taskInstanceMapField = masterExecThreadClass.getDeclaredField("taskInstanceMap");
+            taskInstanceMapField.setAccessible(true);
+            taskInstanceMapField.set(workflowExecuteThread, taskInstanceMap);
 
             workflowExecuteThread.getPreVarPool(taskInstance, preTaskName);
             Assert.assertNotNull(taskInstance.getVarPool());
+
             taskInstance2.setVarPool("[{\"direct\":\"OUT\",\"prop\":\"test1\",\"type\":\"VARCHAR\",\"value\":\"2\"}]");
-            completeTaskList.put("test2", taskInstance2);
-            field.setAccessible(true);
-            field.set(workflowExecuteThread, completeTaskList);
+            completeTaskList.put(Long.toString(taskInstance2.getTaskCode()), taskInstance2.getId());
+
+            completeTaskMapField.setAccessible(true);
+            completeTaskMapField.set(workflowExecuteThread, completeTaskList);
+            taskInstanceMapField.setAccessible(true);
+            taskInstanceMapField.set(workflowExecuteThread, taskInstanceMap);
+
             workflowExecuteThread.getPreVarPool(taskInstance, preTaskName);
             Assert.assertNotNull(taskInstance.getVarPool());
         } catch (Exception e) {
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/cache/impl/TaskInstanceCacheManagerImplTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/cache/impl/TaskInstanceCacheManagerImplTest.java
deleted file mode 100644
index f609845..0000000
--- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/cache/impl/TaskInstanceCacheManagerImplTest.java
+++ /dev/null
@@ -1,177 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.server.master.cache.impl;
-
-import static org.apache.dolphinscheduler.common.Constants.CACHE_REFRESH_TIME_MILLIS;
-
-import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
-import org.apache.dolphinscheduler.common.enums.TaskType;
-import org.apache.dolphinscheduler.dao.entity.TaskInstance;
-import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
-import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
-import org.apache.dolphinscheduler.service.process.ProcessService;
-import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext;
-
-import java.util.Calendar;
-import java.util.Date;
-import java.util.concurrent.TimeUnit;
-
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.InjectMocks;
-import org.mockito.Mock;
-import org.mockito.Mockito;
-import org.mockito.junit.MockitoJUnitRunner;
-
-@RunWith(MockitoJUnitRunner.class)
-public class TaskInstanceCacheManagerImplTest {
-
-    @InjectMocks
-    private TaskInstanceCacheManagerImpl taskInstanceCacheManager;
-
-    @Mock(name = "processService")
-    private ProcessService processService;
-
-    @Before
-    public void before() {
-
-        TaskExecuteAckCommand taskExecuteAckCommand = new TaskExecuteAckCommand();
-        taskExecuteAckCommand.setStatus(1);
-        taskExecuteAckCommand.setExecutePath("/dolphinscheduler/worker");
-        taskExecuteAckCommand.setHost("worker007");
-        taskExecuteAckCommand.setLogPath("/temp/worker.log");
-        taskExecuteAckCommand.setStartTime(new Date(1970, Calendar.AUGUST,7));
-        taskExecuteAckCommand.setTaskInstanceId(0);
-
-        taskInstanceCacheManager.cacheTaskInstance(taskExecuteAckCommand);
-
-    }
-
-    @Test
-    public void testInit() throws InterruptedException {
-
-        TaskInstance taskInstance = new TaskInstance();
-        taskInstance.setId(0);
-        taskInstance.setState(ExecutionStatus.NEED_FAULT_TOLERANCE);
-        taskInstance.setExecutePath("/dolphinscheduler/worker");
-        taskInstance.setHost("worker007");
-        taskInstance.setLogPath("/temp/worker.log");
-        taskInstance.setProcessInstanceId(0);
-
-        Mockito.when(processService.findTaskInstanceById(0)).thenReturn(taskInstance);
-
-        taskInstanceCacheManager.init();
-        TimeUnit.MILLISECONDS.sleep(CACHE_REFRESH_TIME_MILLIS + 1000);
-
-        Assert.assertEquals(taskInstance.getState(), taskInstanceCacheManager.getByTaskInstanceId(0).getState());
-
-    }
-
-    @Test
-    public void getByTaskInstanceIdFromCache() {
-        TaskInstance instanceGot = taskInstanceCacheManager.getByTaskInstanceId(0);
-
-        TaskInstance taskInstance = new TaskInstance();
-        taskInstance.setId(0);
-        taskInstance.setState(ExecutionStatus.RUNNING_EXECUTION);
-        taskInstance.setExecutePath("/dolphinscheduler/worker");
-        taskInstance.setHost("worker007");
-        taskInstance.setLogPath("/temp/worker.log");
-        taskInstance.setStartTime(new Date(1970, Calendar.AUGUST,7));
-
-        Assert.assertEquals(taskInstance.toString(), instanceGot.toString());
-
-    }
-
-    @Test
-    public void getByTaskInstanceIdFromDatabase() {
-
-        TaskInstance taskInstance = new TaskInstance();
-        taskInstance.setId(1);
-        taskInstance.setState(ExecutionStatus.RUNNING_EXECUTION);
-        taskInstance.setExecutePath("/dolphinscheduler/worker");
-        taskInstance.setHost("worker007");
-        taskInstance.setLogPath("/temp/worker.log");
-        taskInstance.setStartTime(new Date(1970, Calendar.AUGUST,7));
-
-        Mockito.when(processService.findTaskInstanceById(1)).thenReturn(taskInstance);
-
-        TaskInstance instanceGot = taskInstanceCacheManager.getByTaskInstanceId(1);
-
-        Assert.assertEquals(taskInstance, instanceGot);
-
-    }
-
-    @Test
-    public void cacheTaskInstanceByTaskExecutionContext() {
-        TaskExecutionContext taskExecutionContext = new TaskExecutionContext();
-        taskExecutionContext.setTaskInstanceId(2);
-        taskExecutionContext.setTaskName("blackberrier test");
-        taskExecutionContext.setStartTime(new Date(1970, Calendar.AUGUST,7));
-        taskExecutionContext.setTaskType(TaskType.SPARK.getDesc());
-        taskExecutionContext.setExecutePath("/tmp");
-
-        taskInstanceCacheManager.cacheTaskInstance(taskExecutionContext);
-
-        TaskInstance taskInstance = taskInstanceCacheManager.getByTaskInstanceId(2);
-
-        Assert.assertEquals(taskInstance.getId(), 2);
-        Assert.assertEquals(taskInstance.getName(), "blackberrier test");
-        Assert.assertEquals(taskInstance.getStartTime(), new Date(1970, Calendar.AUGUST, 7));
-        Assert.assertEquals(taskInstance.getTaskType(), TaskType.SPARK.getDesc());
-        Assert.assertEquals(taskInstance.getExecutePath(), "/tmp");
-
-    }
-
-    @Test
-    public void testCacheTaskInstanceByTaskExecuteAckCommand() {
-        TaskInstance taskInstance = taskInstanceCacheManager.getByTaskInstanceId(0);
-
-        Assert.assertEquals(ExecutionStatus.RUNNING_EXECUTION, taskInstance.getState());
-        Assert.assertEquals(new Date(1970, Calendar.AUGUST, 7), taskInstance.getStartTime());
-        Assert.assertEquals("worker007", taskInstance.getHost());
-        Assert.assertEquals("/dolphinscheduler/worker", taskInstance.getExecutePath());
-        Assert.assertEquals("/temp/worker.log", taskInstance.getLogPath());
-
-    }
-
-    @Test
-    public void testCacheTaskInstanceByTaskExecuteResponseCommand() {
-        TaskExecuteResponseCommand responseCommand = new TaskExecuteResponseCommand();
-        responseCommand.setTaskInstanceId(0);
-        responseCommand.setStatus(9);
-        responseCommand.setEndTime(new Date(1970, Calendar.AUGUST, 8));
-
-        taskInstanceCacheManager.cacheTaskInstance(responseCommand);
-
-        TaskInstance taskInstance = taskInstanceCacheManager.getByTaskInstanceId(0);
-
-        Assert.assertEquals(new Date(1970, Calendar.AUGUST, 8), taskInstance.getEndTime());
-        Assert.assertEquals(ExecutionStatus.KILL, taskInstance.getState());
-
-    }
-
-    @Test
-    public void removeByTaskInstanceId() {
-        taskInstanceCacheManager.removeByTaskInstanceId(0);
-        Assert.assertNull(taskInstanceCacheManager.getByTaskInstanceId(0));
-
-    }
-}
\ No newline at end of file
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java
index f9d51a9..99b6f79 100644
--- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java
@@ -76,8 +76,6 @@ public class TaskPriorityQueueConsumerTest {
         tenant.setUpdateTime(new Date());
 
         Mockito.doReturn(tenant).when(processService).getTenantForProcess(1, 2);
-
-        Mockito.doReturn("default").when(processService).queryUserQueueByProcessInstanceId(1);
     }
 
     @Test
@@ -101,7 +99,6 @@ public class TaskPriorityQueueConsumerTest {
         processDefinition.setUserId(2);
         taskInstance.setProcessDefine(processDefinition);
 
-        Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1);
         TaskPriority taskPriority = new TaskPriority(2, 1, 2, 1, "default");
         taskPriorityQueue.put(taskPriority);
 
@@ -129,7 +126,6 @@ public class TaskPriorityQueueConsumerTest {
         ProcessDefinition processDefinition = new ProcessDefinition();
         processDefinition.setUserId(2);
         taskInstance.setProcessDefine(processDefinition);
-        Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1);
         TaskPriority taskPriority = new TaskPriority(2, 1, 2, 1, "default");
         taskPriorityQueue.put(taskPriority);
 
@@ -171,7 +167,6 @@ public class TaskPriorityQueueConsumerTest {
         ProcessDefinition processDefinition = new ProcessDefinition();
         processDefinition.setUserId(2);
         taskInstance.setProcessDefine(processDefinition);
-        Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1);
         TaskPriority taskPriority = new TaskPriority(2, 1, 2, 1, "default");
         taskPriorityQueue.put(taskPriority);
 
@@ -211,7 +206,6 @@ public class TaskPriorityQueueConsumerTest {
         ProcessDefinition processDefinition = new ProcessDefinition();
         processDefinition.setUserId(2);
         taskInstance.setProcessDefine(processDefinition);
-        Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1);
         TaskPriority taskPriority = new TaskPriority(2, 1, 2, 1, "default");
         taskPriorityQueue.put(taskPriority);
 
@@ -271,7 +265,6 @@ public class TaskPriorityQueueConsumerTest {
         processDefinition.setUserId(2);
         taskInstance.setProcessDefine(processDefinition);
 
-        Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1);
         Mockito.doReturn(taskInstance).when(processService).findTaskInstanceById(1);
 
         TaskPriority taskPriority = new TaskPriority(2, 1, 2, 1, "NoWorkGroup");
@@ -310,7 +303,6 @@ public class TaskPriorityQueueConsumerTest {
         taskDefinition.setTimeoutFlag(TimeoutFlag.OPEN);
         taskInstance.setTaskDefine(taskDefinition);
 
-        Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1);
         Mockito.doReturn(taskInstance).when(processService).findTaskInstanceById(1);
 
         TaskPriority taskPriority = new TaskPriority();
@@ -342,7 +334,6 @@ public class TaskPriorityQueueConsumerTest {
         processDefinition.setUserId(2);
         taskInstance.setProcessDefine(processDefinition);
 
-        Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1);
         Mockito.doReturn(taskInstance).when(processService).findTaskInstanceById(1);
 
         TaskPriority taskPriority = new TaskPriority(2, 1, 2, 1, "NoWorkGroup");
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessorTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessorTest.java
index e215d4c..823ffa2 100644
--- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessorTest.java
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessorTest.java
@@ -18,20 +18,16 @@
 package org.apache.dolphinscheduler.server.master.processor;
 
 import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
-import org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheManagerImpl;
 import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseEvent;
 import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseService;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
 import org.apache.dolphinscheduler.service.process.ProcessService;
 
-import java.net.InetSocketAddress;
 import java.util.Date;
 
-import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
-import org.mockito.Mockito;
 import org.powermock.api.mockito.PowerMockito;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
@@ -39,7 +35,7 @@ import org.powermock.modules.junit4.PowerMockRunner;
 import io.netty.channel.Channel;
 
 /**
- *  task ack processor test
+ * task ack processor test
  */
 @RunWith(PowerMockRunner.class)
 @PrepareForTest({SpringApplicationContext.class, TaskResponseEvent.class})
@@ -47,7 +43,6 @@ public class TaskAckProcessorTest {
 
     private TaskAckProcessor taskAckProcessor;
     private TaskResponseService taskResponseService;
-    private TaskInstanceCacheManagerImpl taskInstanceCacheManager;
     private ProcessService processService;
     private TaskExecuteAckCommand taskExecuteAckCommand;
     private TaskResponseEvent taskResponseEvent;
@@ -60,9 +55,6 @@ public class TaskAckProcessorTest {
         taskResponseService = PowerMockito.mock(TaskResponseService.class);
         PowerMockito.when(SpringApplicationContext.getBean(TaskResponseService.class)).thenReturn(taskResponseService);
 
-        taskInstanceCacheManager = PowerMockito.mock(TaskInstanceCacheManagerImpl.class);
-        PowerMockito.when(SpringApplicationContext.getBean(TaskInstanceCacheManagerImpl.class)).thenReturn(taskInstanceCacheManager);
-
         processService = PowerMockito.mock(ProcessService.class);
         PowerMockito.when(SpringApplicationContext.getBean(ProcessService.class)).thenReturn(processService);
 
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessorTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessorTest.java
index e7afa14..55828e1 100644
--- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessorTest.java
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessorTest.java
@@ -86,7 +86,6 @@ public class CommonTaskProcessorTest {
         taskDefinition.setTimeoutFlag(TimeoutFlag.OPEN);
         taskInstance.setTaskDefine(taskDefinition);
 
-        Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1);
         Mockito.doReturn(taskInstance).when(processService).findTaskInstanceById(1);
 
         TaskExecutionContext taskExecutionContext = commonTaskProcessor.getTaskExecutionContext(taskInstance);
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/registry/DependencyConfig.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/registry/DependencyConfig.java
index 4429e7c..8d1faa8 100644
--- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/registry/DependencyConfig.java
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/registry/DependencyConfig.java
@@ -36,7 +36,6 @@ import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
 import org.apache.dolphinscheduler.dao.mapper.TenantMapper;
 import org.apache.dolphinscheduler.dao.mapper.UdfFuncMapper;
 import org.apache.dolphinscheduler.dao.mapper.UserMapper;
-import org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheManagerImpl;
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
 import org.apache.dolphinscheduler.server.master.dispatch.host.HostManager;
 import org.apache.dolphinscheduler.server.master.dispatch.host.RandomHostManager;
@@ -68,11 +67,6 @@ public class DependencyConfig {
     }
 
     @Bean
-    public TaskInstanceCacheManagerImpl taskInstanceCacheManagerImpl() {
-        return Mockito.mock(TaskInstanceCacheManagerImpl.class);
-    }
-
-    @Bean
     public ProcessService processService() {
         return Mockito.mock(ProcessService.class);
     }
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTestConfig.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTestConfig.java
index f4876a6..0ac2372 100644
--- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTestConfig.java
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTestConfig.java
@@ -36,7 +36,6 @@ import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
 import org.apache.dolphinscheduler.dao.mapper.TenantMapper;
 import org.apache.dolphinscheduler.dao.mapper.UdfFuncMapper;
 import org.apache.dolphinscheduler.dao.mapper.UserMapper;
-import org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheManagerImpl;
 import org.apache.dolphinscheduler.service.process.ProcessService;
 
 import org.mockito.Mockito;
@@ -60,11 +59,6 @@ public class TaskCallbackServiceTestConfig {
     }
 
     @Bean
-    public TaskInstanceCacheManagerImpl taskInstanceCacheManagerImpl() {
-        return Mockito.mock(TaskInstanceCacheManagerImpl.class);
-    }
-
-    @Bean
     public ProcessService processService() {
         return Mockito.mock(ProcessService.class);
     }
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/ProcessAlertManager.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/ProcessAlertManager.java
index c2db565..669bd4e 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/ProcessAlertManager.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/ProcessAlertManager.java
@@ -205,43 +205,54 @@ public class ProcessAlertManager {
                                          List<TaskInstance> taskInstances,
                                          ProjectUser projectUser) {
 
-        if (Flag.YES == processInstance.getIsSubProcess()) {
+        if (!isNeedToSendWarning(processInstance)) {
             return;
         }
-        boolean sendWarnning = false;
+
+        Alert alert = new Alert();
+
+        String cmdName = getCommandCnName(processInstance.getCommandType());
+        String success = processInstance.getState().typeIsSuccess() ? "success" : "failed";
+        alert.setTitle(cmdName + " " + success);
+        String content = getContentProcessInstance(processInstance, taskInstances,projectUser);
+        alert.setContent(content);
+        alert.setAlertGroupId(processInstance.getWarningGroupId());
+        alert.setCreateTime(new Date());
+        alertDao.addAlert(alert);
+        logger.info("add alert to db , alert: {}", alert);
+    }
+
+    /**
+     * check if need to be send warning
+     *
+     * @param processInstance
+     * @return
+     */
+    public boolean isNeedToSendWarning(ProcessInstance processInstance) {
+        if (Flag.YES == processInstance.getIsSubProcess()) {
+            return false;
+        }
+        boolean sendWarning = false;
         WarningType warningType = processInstance.getWarningType();
         switch (warningType) {
             case ALL:
                 if (processInstance.getState().typeIsFinished()) {
-                    sendWarnning = true;
+                    sendWarning = true;
                 }
                 break;
             case SUCCESS:
                 if (processInstance.getState().typeIsSuccess()) {
-                    sendWarnning = true;
+                    sendWarning = true;
                 }
                 break;
             case FAILURE:
                 if (processInstance.getState().typeIsFailure()) {
-                    sendWarnning = true;
+                    sendWarning = true;
                 }
                 break;
             default:
         }
-        if (!sendWarnning) {
-            return;
-        }
-        Alert alert = new Alert();
-
-        String cmdName = getCommandCnName(processInstance.getCommandType());
-        String success = processInstance.getState().typeIsSuccess() ? "success" : "failed";
-        alert.setTitle(cmdName + " " + success);
-        String content = getContentProcessInstance(processInstance, taskInstances,projectUser);
-        alert.setContent(content);
-        alert.setAlertGroupId(processInstance.getWarningGroupId());
-        alert.setCreateTime(new Date());
-        alertDao.addAlert(alert);
-        logger.info("add alert to db , alert: {}", alert);
+        return sendWarning;
     }
 
     /**
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index bf0706c..fbf29b3 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -100,6 +100,7 @@ import org.apache.dolphinscheduler.dao.utils.DagHelper;
 import org.apache.dolphinscheduler.remote.command.StateEventChangeCommand;
 import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService;
 import org.apache.dolphinscheduler.remote.utils.Host;
+import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
 import org.apache.dolphinscheduler.service.exceptions.ServiceException;
 import org.apache.dolphinscheduler.service.log.LogClientService;
 import org.apache.dolphinscheduler.service.quartz.cron.CronUtils;
@@ -1087,24 +1088,17 @@ public class ProcessService {
     /**
      * retry submit task to db
      */
-    public TaskInstance submitTask(TaskInstance taskInstance, int commitRetryTimes, int commitInterval) {
-
+    public TaskInstance submitTaskWithRetry(ProcessInstance processInstance, TaskInstance taskInstance, int commitRetryTimes, int commitInterval) {
         int retryTimes = 1;
-        boolean submitDB = false;
         TaskInstance task = null;
         while (retryTimes <= commitRetryTimes) {
             try {
-                if (!submitDB) {
-                    // submit task to db
-                    task = submitTask(taskInstance);
-                    if (task != null && task.getId() != 0) {
-                        submitDB = true;
-                        break;
-                    }
-                }
-                if (!submitDB) {
-                    logger.error("task commit to db failed , taskId {} has already retry {} times, please check the database", taskInstance.getId(), retryTimes);
+                // submit task to db
+                task = SpringApplicationContext.getBean(ProcessService.class).submitTask(processInstance, taskInstance);
+                if (task != null && task.getId() != 0) {
+                    break;
                 }
+                logger.error("task commit to db failed , taskId {} has already retry {} times, please check the database", taskInstance.getId(), retryTimes);
                 Thread.sleep(commitInterval);
             } catch (Exception e) {
                 logger.error("task commit to mysql failed", e);
@@ -1118,12 +1112,12 @@ public class ProcessService {
      * submit task to db
      * submit sub process to command
      *
+     * @param processInstance processInstance
      * @param taskInstance taskInstance
      * @return task instance
      */
     @Transactional(rollbackFor = Exception.class)
-    public TaskInstance submitTask(TaskInstance taskInstance) {
-        ProcessInstance processInstance = this.findProcessInstanceDetailById(taskInstance.getProcessInstanceId());
+    public TaskInstance submitTask(ProcessInstance processInstance, TaskInstance taskInstance) {
         logger.info("start submit task : {}, instance id:{}, state: {}",
                 taskInstance.getName(), taskInstance.getProcessInstanceId(), processInstance.getState());
         //submit to db
@@ -1131,8 +1125,9 @@ public class ProcessService {
         if (task == null) {
             logger.error("end submit task to db error, task name:{}, process id:{} state: {} ",
                     taskInstance.getName(), taskInstance.getProcessInstance(), processInstance.getState());
-            return task;
+            return null;
         }
+
         if (!task.getState().typeIsFinished()) {
             createSubWorkProcess(processInstance, task);
         }
@@ -1383,7 +1378,7 @@ public class ProcessService {
         }
         taskInstance.setExecutorId(processInstance.getExecutorId());
         taskInstance.setProcessInstancePriority(processInstance.getProcessInstancePriority());
-        taskInstance.setState(getSubmitTaskState(taskInstance, processInstanceState));
+        taskInstance.setState(getSubmitTaskState(taskInstance, processInstance));
         if (taskInstance.getSubmitTime() == null) {
             taskInstance.setSubmitTime(new Date());
         }
@@ -1406,10 +1401,10 @@ public class ProcessService {
      * if all of above are not satisfied, return submit success
      *
      * @param taskInstance taskInstance
-     * @param processInstanceState processInstanceState
+     * @param processInstance processInstance
      * @return process instance state
      */
-    public ExecutionStatus getSubmitTaskState(TaskInstance taskInstance, ExecutionStatus processInstanceState) {
+    public ExecutionStatus getSubmitTaskState(TaskInstance taskInstance, ProcessInstance processInstance) {
         ExecutionStatus state = taskInstance.getState();
         // running, delayed or killed
         // the task already exists in task queue
@@ -1423,10 +1418,10 @@ public class ProcessService {
         }
         //return pasue /stop if process instance state is ready pause / stop
         // or return submit success
-        if (processInstanceState == ExecutionStatus.READY_PAUSE) {
+        if (processInstance.getState() == ExecutionStatus.READY_PAUSE) {
             state = ExecutionStatus.PAUSE;
-        } else if (processInstanceState == ExecutionStatus.READY_STOP
-                || !checkProcessStrategy(taskInstance)) {
+        } else if (processInstance.getState() == ExecutionStatus.READY_STOP
+                || !checkProcessStrategy(taskInstance, processInstance)) {
             state = ExecutionStatus.KILL;
         } else {
             state = ExecutionStatus.SUBMITTED_SUCCESS;
@@ -1440,8 +1435,7 @@ public class ProcessService {
      * @param taskInstance taskInstance
      * @return check strategy result
      */
-    private boolean checkProcessStrategy(TaskInstance taskInstance) {
-        ProcessInstance processInstance = this.findProcessInstanceById(taskInstance.getProcessInstanceId());
+    private boolean checkProcessStrategy(TaskInstance taskInstance, ProcessInstance processInstance) {
         FailureStrategy failureStrategy = processInstance.getFailureStrategy();
         if (failureStrategy == FailureStrategy.CONTINUE) {
             return true;
@@ -1535,39 +1529,15 @@ public class ProcessService {
     }
 
     /**
-     * package task instance，associate processInstance and processDefine
-     *
-     * @param taskInstId taskInstId
-     * @return task instance
+     * package task instance
      */
-    public TaskInstance getTaskInstanceDetailByTaskId(int taskInstId) {
-        // get task instance
-        TaskInstance taskInstance = findTaskInstanceById(taskInstId);
-        if (taskInstance == null) {
-            return null;
-        }
-        setTaskInstanceDetail(taskInstance);
-        return taskInstance;
-    }
-
-    /**
-     * package task instance，associate processInstance and processDefine
-     *
-     * @param taskInstance taskInstance
-     * @return task instance
-     */
-    public void setTaskInstanceDetail(TaskInstance taskInstance) {
-        // get process instance
-        ProcessInstance processInstance = findProcessInstanceDetailById(taskInstance.getProcessInstanceId());
-        // get process define
-        ProcessDefinition processDefine = findProcessDefinition(processInstance.getProcessDefinitionCode(),
-                processInstance.getProcessDefinitionVersion());
+    public void packageTaskInstance(TaskInstance taskInstance, ProcessInstance processInstance) {
         taskInstance.setProcessInstance(processInstance);
-        taskInstance.setProcessDefine(processDefine);
-        TaskDefinition taskDefinition = taskDefinitionLogMapper.queryByDefinitionCodeAndVersion(
+        taskInstance.setProcessDefine(processInstance.getProcessDefinition());
+        TaskDefinition taskDefinition = this.findTaskDefinition(
                 taskInstance.getTaskCode(),
                 taskInstance.getTaskDefinitionVersion());
-        updateTaskDefinitionResources(taskDefinition);
+        this.updateTaskDefinitionResources(taskDefinition);
         taskInstance.setTaskDefine(taskDefinition);
     }
 
@@ -1576,7 +1546,7 @@ public class ProcessService {
      *
      * @param taskDefinition the given {@link TaskDefinition}
      */
-    private void updateTaskDefinitionResources(TaskDefinition taskDefinition) {
+    public void updateTaskDefinitionResources(TaskDefinition taskDefinition) {
         Map<String, Object> taskParameters = JSONUtils.parseObject(
                 taskDefinition.getTaskParams(),
                 new TypeReference<Map<String, Object>>() {
@@ -1757,12 +1727,10 @@ public class ProcessService {
      * @param host host
      * @param executePath executePath
      * @param logPath logPath
-     * @param taskInstId taskInstId
      */
     public void changeTaskState(TaskInstance taskInstance, ExecutionStatus state, Date startTime, String host,
                                 String executePath,
-                                String logPath,
-                                int taskInstId) {
+                                String logPath) {
         taskInstance.setState(state);
         taskInstance.setStartTime(startTime);
         taskInstance.setHost(host);
@@ -1786,14 +1754,12 @@ public class ProcessService {
      *
      * @param state state
      * @param endTime endTime
-     * @param taskInstId taskInstId
      * @param varPool varPool
      */
     public void changeTaskState(TaskInstance taskInstance, ExecutionStatus state,
                                 Date endTime,
                                 int processId,
                                 String appIds,
-                                int taskInstId,
                                 String varPool) {
         taskInstance.setPid(processId);
         taskInstance.setAppLink(appIds);
@@ -2047,15 +2013,14 @@ public class ProcessService {
     }
 
     /**
-     * query user queue by process instance id
+     * query user queue by process instance
      *
-     * @param processInstanceId processInstanceId
+     * @param processInstance processInstance
      * @return queue
      */
-    public String queryUserQueueByProcessInstanceId(int processInstanceId) {
+    public String queryUserQueueByProcessInstance(ProcessInstance processInstance) {
 
         String queue = "";
-        ProcessInstance processInstance = processInstanceMapper.selectById(processInstanceId);
         if (processInstance == null) {
             return queue;
         }
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriority.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriority.java
index d78fb98..2cbd298 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriority.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriority.java
@@ -23,7 +23,7 @@ import java.util.Map;
 import java.util.Objects;
 
 /**
- *  task priority info
+ * task priority info
  */
 public class TaskPriority implements Comparable<TaskPriority> {
 
@@ -62,7 +62,14 @@ public class TaskPriority implements Comparable<TaskPriority> {
      */
     private Map<String, String> context;
 
-    public TaskPriority(){}
+    /**
+     * checkpoint
+     */
+    private long checkpoint;
+
+    public TaskPriority() {
+        this.checkpoint = System.currentTimeMillis();
+    }
 
     public TaskPriority(int processInstancePriority,
                         int processInstanceId,
@@ -73,6 +80,7 @@ public class TaskPriority implements Comparable<TaskPriority> {
         this.taskInstancePriority = taskInstancePriority;
         this.taskId = taskId;
         this.groupName = groupName;
+        this.checkpoint = System.currentTimeMillis();
     }
 
     public int getProcessInstancePriority() {
@@ -131,6 +139,14 @@ public class TaskPriority implements Comparable<TaskPriority> {
         this.taskExecutionContext = taskExecutionContext;
     }
 
+    public long getCheckpoint() {
+        return checkpoint;
+    }
+
+    public void setCheckpoint(long checkpoint) {
+        this.checkpoint = checkpoint;
+    }
+
     @Override
     public int compareTo(TaskPriority other) {
         if (this.getProcessInstancePriority() > other.getProcessInstancePriority()) {
@@ -174,7 +190,7 @@ public class TaskPriority implements Comparable<TaskPriority> {
         }
         TaskPriority that = (TaskPriority) o;
         return processInstancePriority == that.processInstancePriority
-                &&  processInstanceId == that.processInstanceId
+                && processInstanceId == that.processInstanceId
                 && taskInstancePriority == that.taskInstancePriority
                 && taskId == that.taskId
                 && Objects.equals(groupName, that.groupName);