You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ca...@apache.org on 2022/10/26 03:52:05 UTC

[dolphinscheduler] branch dev updated: [Refactor]migrate some task dao functions from ProcessServiceImpl to dao modules. (#12505)

This is an automated email from the ASF dual-hosted git repository.

caishunfeng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 04aa125ba2 [Refactor]migrate some task dao functions from ProcessServiceImpl to dao modules. (#12505)
04aa125ba2 is described below

commit 04aa125ba2b7762ccee5e9ee80ad14f3fe83caf3
Author: Yann Ann <xi...@gmail.com>
AuthorDate: Wed Oct 26 11:51:58 2022 +0800

    [Refactor]migrate some task dao functions from ProcessServiceImpl to dao modules. (#12505)
    
    * migrate some task DAO to dao modules.
    
    * Update dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessInstanceMapDaoImpl.java
    
    Co-authored-by: caishunfeng <ca...@gmail.com>
---
 .../api/service/impl/LoggerServiceImpl.java        |  12 +-
 .../service/impl/ProcessDefinitionServiceImpl.java |  27 +-
 .../service/impl/ProcessInstanceServiceImpl.java   |   8 +-
 .../api/service/LoggerServiceTest.java             |  16 +-
 .../api/service/ProcessDefinitionServiceTest.java  |   9 +-
 .../api/service/ProcessInstanceServiceTest.java    |  12 +-
 .../dao/repository/ProcessInstanceMapDao.java      |  49 +++
 .../dao/repository/TaskDefinitionDao.java          |  44 +++
 .../dao/repository/TaskDefinitionLogDao.java       |  44 +++
 .../dao/repository/TaskInstanceDao.java            |  90 +++++
 .../repository/impl/ProcessInstanceMapDaoImpl.java |  52 +++
 .../dao/repository/impl/TaskDefinitionDaoImpl.java |  87 +++++
 .../repository/impl/TaskDefinitionLogDaoImpl.java  |  91 +++++
 .../dao/repository/impl/TaskInstanceDaoImpl.java   | 170 +++++++++
 .../master/consumer/TaskPriorityQueueConsumer.java |   9 +-
 .../server/master/event/TaskDelayEventHandler.java |   6 +-
 .../master/event/TaskDispatchEventHandler.java     |   6 +-
 .../master/event/TaskResultEventHandler.java       |   6 +-
 .../master/event/TaskRunningEventHandler.java      |   6 +-
 .../processor/TaskExecuteStartProcessor.java       |   6 +-
 .../master/runner/MasterSchedulerBootstrap.java    |  12 +-
 .../master/runner/StreamTaskExecuteRunnable.java   |  12 +-
 .../master/runner/WorkflowExecuteRunnable.java     |  35 +-
 .../master/runner/task/BaseTaskProcessor.java      |   6 +-
 .../master/runner/task/BlockingTaskProcessor.java  |  10 +-
 .../master/runner/task/CommonTaskProcessor.java    |   4 +-
 .../master/runner/task/ConditionTaskProcessor.java |  10 +-
 .../master/runner/task/DependentTaskProcessor.java |   8 +-
 .../master/runner/task/SubTaskProcessor.java       |   4 +-
 .../master/runner/task/SwitchTaskProcessor.java    |  10 +-
 .../master/service/MasterFailoverService.java      |  12 +-
 .../master/service/WorkerFailoverService.java      |   9 +-
 .../server/master/utils/DependentExecute.java      |   5 +-
 .../server/master/BlockingTaskTest.java            |  24 +-
 .../server/master/ConditionsTaskTest.java          |  23 +-
 .../server/master/DependentTaskTest.java           |  31 +-
 .../server/master/SubProcessTaskTest.java          |   8 +-
 .../server/master/SwitchTaskTest.java              |  21 +-
 .../consumer/TaskPriorityQueueConsumerTest.java    |  12 +-
 .../master/runner/WorkflowExecuteRunnableTest.java |  12 +-
 .../runner/task/CommonTaskProcessorTest.java       |  16 +-
 .../server/master/service/FailoverServiceTest.java |  11 +-
 .../service/process/ProcessService.java            |  35 --
 .../service/process/ProcessServiceImpl.java        | 378 +++------------------
 .../service/process/ProcessServiceTest.java        |  32 +-
 45 files changed, 972 insertions(+), 518 deletions(-)

diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/LoggerServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/LoggerServiceImpl.java
index eecb40cc63..239c9669f1 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/LoggerServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/LoggerServiceImpl.java
@@ -33,9 +33,9 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.dao.entity.User;
 import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
 import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
+import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
 import org.apache.dolphinscheduler.remote.utils.Host;
 import org.apache.dolphinscheduler.service.log.LogClient;
-import org.apache.dolphinscheduler.service.process.ProcessService;
 
 import org.apache.commons.lang3.StringUtils;
 
@@ -60,7 +60,7 @@ public class LoggerServiceImpl extends BaseServiceImpl implements LoggerService
     private static final String LOG_HEAD_FORMAT = "[LOG-PATH]: %s, [HOST]:  %s%s";
 
     @Autowired
-    private ProcessService processService;
+    private TaskInstanceDao taskInstanceDao;
 
     @Autowired
     private LogClient logClient;
@@ -86,7 +86,7 @@ public class LoggerServiceImpl extends BaseServiceImpl implements LoggerService
     @SuppressWarnings("unchecked")
     public Result<ResponseTaskLog> queryLog(int taskInstId, int skipLineNum, int limit) {
 
-        TaskInstance taskInstance = processService.findTaskInstanceById(taskInstId);
+        TaskInstance taskInstance = taskInstanceDao.findTaskInstanceById(taskInstId);
 
         if (taskInstance == null) {
             logger.error("Task instance does not exist, taskInstanceId:{}.", taskInstId);
@@ -111,7 +111,7 @@ public class LoggerServiceImpl extends BaseServiceImpl implements LoggerService
      */
     @Override
     public byte[] getLogBytes(int taskInstId) {
-        TaskInstance taskInstance = processService.findTaskInstanceById(taskInstId);
+        TaskInstance taskInstance = taskInstanceDao.findTaskInstanceById(taskInstId);
         if (taskInstance == null || StringUtils.isBlank(taskInstance.getHost())) {
             throw new ServiceException("task instance is null or host is null");
         }
@@ -138,7 +138,7 @@ public class LoggerServiceImpl extends BaseServiceImpl implements LoggerService
             return result;
         }
         // check whether the task instance can be found
-        TaskInstance task = processService.findTaskInstanceById(taskInstId);
+        TaskInstance task = taskInstanceDao.findTaskInstanceById(taskInstId);
         if (task == null || StringUtils.isBlank(task.getHost())) {
             putMsg(result, Status.TASK_INSTANCE_NOT_FOUND);
             return result;
@@ -171,7 +171,7 @@ public class LoggerServiceImpl extends BaseServiceImpl implements LoggerService
             throw new ServiceException("user has no permission");
         }
         // check whether the task instance can be found
-        TaskInstance task = processService.findTaskInstanceById(taskInstId);
+        TaskInstance task = taskInstanceDao.findTaskInstanceById(taskInstId);
         if (task == null || StringUtils.isBlank(task.getHost())) {
             throw new ServiceException("task instance is null or host is null");
         }
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
index ca6ccf7d65..acb05b897e 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
@@ -103,6 +103,7 @@ import org.apache.dolphinscheduler.dao.mapper.TenantMapper;
 import org.apache.dolphinscheduler.dao.mapper.UserMapper;
 import org.apache.dolphinscheduler.dao.model.PageListingResult;
 import org.apache.dolphinscheduler.dao.repository.ProcessDefinitionDao;
+import org.apache.dolphinscheduler.dao.repository.TaskDefinitionLogDao;
 import org.apache.dolphinscheduler.plugin.task.api.enums.SqlType;
 import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy;
 import org.apache.dolphinscheduler.plugin.task.api.parameters.ParametersNode;
@@ -200,6 +201,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
     @Autowired
     private ProcessService processService;
 
+    @Autowired
+    private TaskDefinitionLogDao taskDefinitionLogDao;
+
     @Autowired
     private ProcessTaskRelationMapper processTaskRelationMapper;
 
@@ -388,9 +392,10 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
         if (insertVersion == 0) {
             logger.error("Save process definition error, processCode:{}.", processDefinition.getCode());
             throw new ServiceException(Status.CREATE_PROCESS_DEFINITION_ERROR);
-        } else
+        } else {
             logger.info("Save process definition complete, processCode:{}, processVersion:{}.",
                     processDefinition.getCode(), insertVersion);
+        }
         int insertResult = processService.saveTaskRelation(loginUser, processDefinition.getProjectCode(),
                 processDefinition.getCode(),
                 insertVersion, taskRelationList, taskDefinitionLogs, Boolean.TRUE);
@@ -398,9 +403,10 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
             logger.error("Save process task relations error, projectCode:{}, processCode:{}, processVersion:{}.",
                     processDefinition.getProjectCode(), processDefinition.getCode(), insertVersion);
             throw new ServiceException(Status.CREATE_PROCESS_TASK_RELATION_ERROR);
-        } else
+        } else {
             logger.info("Save process task relations complete, projectCode:{}, processCode:{}, processVersion:{}.",
                     processDefinition.getProjectCode(), processDefinition.getCode(), insertVersion);
+        }
 
         saveOtherRelation(loginUser, processDefinition, result, otherParamsJson);
 
@@ -885,9 +891,10 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
                 logger.error("Update process definition error, processCode:{}.", processDefinition.getCode());
                 putMsg(result, Status.UPDATE_PROCESS_DEFINITION_ERROR);
                 throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR);
-            } else
+            } else {
                 logger.info("Update process definition complete, processCode:{}, processVersion:{}.",
                         processDefinition.getCode(), insertVersion);
+            }
 
             taskUsedInOtherTaskValid(processDefinition, taskRelationList);
             int insertResult = processService.saveTaskRelation(loginUser, processDefinition.getProjectCode(),
@@ -1136,9 +1143,10 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
                                     projectCode, code, schedule.getId());
                             putMsg(result, Status.OFFLINE_SCHEDULE_ERROR);
                             throw new ServiceException(Status.OFFLINE_SCHEDULE_ERROR);
-                        } else
+                        } else {
                             logger.info("Set schedule offline, projectCode:{}, processDefinitionCode:{}, scheduleId:{}",
                                     projectCode, code, schedule.getId());
+                        }
                         schedulerService.deleteSchedule(project.getId(), schedule.getId());
                     }
                 }
@@ -1184,8 +1192,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
         if (CollectionUtils.isNotEmpty(dagDataSchedules)) {
             logger.info("Start download process definition file, processDefinitionCodes:{}.", defineCodeSet);
             downloadProcessDefinitionFile(response, dagDataSchedules);
-        } else
+        } else {
             logger.error("There is no exported process dag data.");
+        }
     }
 
     /**
@@ -1856,7 +1865,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
                         processDefinitions.get(0).getVersion());
 
         // query task definition log
-        List<TaskDefinitionLog> taskDefinitionLogsList = processService.genTaskDefineList(processTaskRelations);
+        List<TaskDefinitionLog> taskDefinitionLogsList =
+                taskDefinitionLogDao.getTaskDefineLogList(processTaskRelations);
 
         List<DependentSimplifyDefinition> taskDefinitionList = new ArrayList<>();
         for (TaskDefinitionLog taskDefinitionLog : taskDefinitionLogsList) {
@@ -1906,7 +1916,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
         List<ProcessInstance> processInstanceList = processInstanceService.queryByProcessDefineCode(code, limit);
         processInstanceList.forEach(processInstance -> processInstance
                 .setDuration(DateUtils.format2Duration(processInstance.getStartTime(), processInstance.getEndTime())));
-        List<TaskDefinitionLog> taskDefinitionList = processService.genTaskDefineList(processTaskRelationMapper
+        List<TaskDefinitionLog> taskDefinitionList = taskDefinitionLogDao.getTaskDefineLogList(processTaskRelationMapper
                 .queryByProcessCode(processDefinition.getProjectCode(), processDefinition.getCode()));
         Map<Long, TaskDefinitionLog> taskDefinitionMap = taskDefinitionList.stream()
                 .collect(Collectors.toMap(TaskDefinitionLog::getCode, taskDefinitionLog -> taskDefinitionLog));
@@ -2144,7 +2154,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
             String otherParamsJson = doOtherOperateProcess(loginUser, processDefinition);
             if (isCopy) {
                 logger.info("Copy process definition...");
-                List<TaskDefinitionLog> taskDefinitionLogs = processService.genTaskDefineList(processTaskRelations);
+                List<TaskDefinitionLog> taskDefinitionLogs =
+                        taskDefinitionLogDao.getTaskDefineLogList(processTaskRelations);
                 Map<Long, Long> taskCodeMap = new HashMap<>();
                 for (TaskDefinitionLog taskDefinitionLog : taskDefinitionLogs) {
                     try {
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
index ce1747ca85..3d350151aa 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
@@ -71,6 +71,7 @@ import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
 import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
 import org.apache.dolphinscheduler.dao.mapper.TenantMapper;
 import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
+import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
 import org.apache.dolphinscheduler.dao.utils.WorkflowUtils;
 import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult;
 import org.apache.dolphinscheduler.plugin.task.api.model.Property;
@@ -129,6 +130,9 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
     @Autowired
     ProcessService processService;
 
+    @Autowired
+    TaskInstanceDao taskInstanceDao;
+
     @Autowired
     ProcessInstanceMapper processInstanceMapper;
 
@@ -366,7 +370,7 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
             return result;
         }
         List<TaskInstance> taskInstanceList =
-                processService.findValidTaskListByProcessId(processId, processInstance.getTestFlag());
+                taskInstanceDao.findValidTaskListByProcessId(processId, processInstance.getTestFlag());
         addDependResultForTaskList(taskInstanceList);
         Map<String, Object> resultMap = new HashMap<>();
         resultMap.put(PROCESS_INSTANCE_STATE, processInstance.getState().toString());
@@ -444,7 +448,7 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
             return result;
         }
 
-        TaskInstance taskInstance = processService.findTaskInstanceById(taskId);
+        TaskInstance taskInstance = taskInstanceDao.findTaskInstanceById(taskId);
         if (taskInstance == null) {
             logger.error("Task instance does not exist, projectCode:{}, taskInstanceId{}.", projectCode, taskId);
             putMsg(result, Status.TASK_INSTANCE_NOT_EXISTS, taskId);
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/LoggerServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/LoggerServiceTest.java
index 747903da38..ca34d74730 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/LoggerServiceTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/LoggerServiceTest.java
@@ -31,8 +31,8 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.dao.entity.User;
 import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
 import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
+import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
 import org.apache.dolphinscheduler.service.log.LogClient;
-import org.apache.dolphinscheduler.service.process.ProcessService;
 
 import java.text.MessageFormat;
 import java.util.HashMap;
@@ -63,7 +63,7 @@ public class LoggerServiceTest {
     private LoggerServiceImpl loggerService;
 
     @Mock
-    private ProcessService processService;
+    private TaskInstanceDao taskInstanceDao;
 
     @Mock
     private ProjectMapper projectMapper;
@@ -81,7 +81,7 @@ public class LoggerServiceTest {
     public void testQueryDataSourceList() {
 
         TaskInstance taskInstance = new TaskInstance();
-        Mockito.when(processService.findTaskInstanceById(1)).thenReturn(taskInstance);
+        Mockito.when(taskInstanceDao.findTaskInstanceById(1)).thenReturn(taskInstance);
         Result result = loggerService.queryLog(2, 1, 1);
         // TASK_INSTANCE_NOT_FOUND
         Assertions.assertEquals(Status.TASK_INSTANCE_NOT_FOUND.getCode(), result.getCode().intValue());
@@ -98,7 +98,7 @@ public class LoggerServiceTest {
         // SUCCESS
         taskInstance.setHost("127.0.0.1:8080");
         taskInstance.setLogPath("/temp/log");
-        Mockito.when(processService.findTaskInstanceById(1)).thenReturn(taskInstance);
+        Mockito.when(taskInstanceDao.findTaskInstanceById(1)).thenReturn(taskInstance);
         result = loggerService.queryLog(1, 1, 1);
         Assertions.assertEquals(Status.SUCCESS.getCode(), result.getCode().intValue());
     }
@@ -107,7 +107,7 @@ public class LoggerServiceTest {
     public void testGetLogBytes() {
 
         TaskInstance taskInstance = new TaskInstance();
-        Mockito.when(processService.findTaskInstanceById(1)).thenReturn(taskInstance);
+        Mockito.when(taskInstanceDao.findTaskInstanceById(1)).thenReturn(taskInstance);
 
         // task instance is null
         try {
@@ -146,7 +146,7 @@ public class LoggerServiceTest {
         Map<String, Object> result = new HashMap<>();
         putMsg(result, Status.SUCCESS, projectCode);
         TaskInstance taskInstance = new TaskInstance();
-        Mockito.when(processService.findTaskInstanceById(1)).thenReturn(taskInstance);
+        Mockito.when(taskInstanceDao.findTaskInstanceById(1)).thenReturn(taskInstance);
         TaskDefinition taskDefinition = new TaskDefinition();
         taskDefinition.setProjectCode(projectCode);
         taskDefinition.setCode(1L);
@@ -156,7 +156,7 @@ public class LoggerServiceTest {
         taskInstance.setHost("127.0.0.1:8080");
         taskInstance.setLogPath("/temp/log");
         Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode, VIEW_LOG)).thenReturn(result);
-        Mockito.when(processService.findTaskInstanceById(1)).thenReturn(taskInstance);
+        Mockito.when(taskInstanceDao.findTaskInstanceById(1)).thenReturn(taskInstance);
         Mockito.when(taskDefinitionMapper.queryByCode(taskInstance.getTaskCode())).thenReturn(taskDefinition);
         result = loggerService.queryLog(loginUser, projectCode, 1, 1, 1);
         Assertions.assertEquals(Status.SUCCESS.getCode(), ((Status) result.get(Constants.STATUS)).getCode());
@@ -184,7 +184,7 @@ public class LoggerServiceTest {
         taskInstance.setLogPath("/temp/log");
         Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode, DOWNLOAD_LOG))
                 .thenReturn(result);
-        Mockito.when(processService.findTaskInstanceById(1)).thenReturn(taskInstance);
+        Mockito.when(taskInstanceDao.findTaskInstanceById(1)).thenReturn(taskInstance);
         Mockito.when(taskDefinitionMapper.queryByCode(taskInstance.getTaskCode())).thenReturn(taskDefinition);
         Mockito.when(logClient.getLogBytes(Mockito.anyString(), Mockito.anyInt(), Mockito.anyString()))
                 .thenReturn(new byte[0]);
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
index 05c9a0b044..31502b917e 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
@@ -28,6 +28,7 @@ import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationCon
 import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_UPDATE;
 import static org.apache.dolphinscheduler.common.constants.Constants.DEFAULT;
 import static org.apache.dolphinscheduler.common.constants.Constants.EMPTY_STRING;
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.isA;
 
 import org.apache.dolphinscheduler.api.dto.workflow.WorkflowCreateRequest;
@@ -67,6 +68,7 @@ import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
 import org.apache.dolphinscheduler.dao.mapper.TenantMapper;
 import org.apache.dolphinscheduler.dao.model.PageListingResult;
 import org.apache.dolphinscheduler.dao.repository.ProcessDefinitionDao;
+import org.apache.dolphinscheduler.dao.repository.TaskDefinitionLogDao;
 import org.apache.dolphinscheduler.service.process.ProcessService;
 import org.apache.dolphinscheduler.spi.enums.DbType;
 
@@ -149,6 +151,9 @@ public class ProcessDefinitionServiceTest extends BaseServiceTestTool {
     @Mock
     private ProcessService processService;
 
+    @Mock
+    private TaskDefinitionLogDao taskDefinitionLogDao;
+
     @Mock
     private ProcessInstanceService processInstanceService;
 
@@ -756,8 +761,10 @@ public class ProcessDefinitionServiceTest extends BaseServiceTestTool {
         result.put(Constants.STATUS, Status.SUCCESS);
         Mockito.when(projectMapper.queryByCode(1)).thenReturn(project1);
         Mockito.when(projectService.checkProjectAndAuth(user, project1, 1, WORKFLOW_TREE_VIEW)).thenReturn(result);
-
         Mockito.when(processService.genDagGraph(processDefinition)).thenReturn(new DAG<>());
+        Mockito.when(processTaskRelationMapper.queryByProcessCode(projectCode, processDefinitionCode))
+                .thenReturn(getProcessTaskRelation());
+        Mockito.when(taskDefinitionLogDao.getTaskDefineLogList(any())).thenReturn(new ArrayList<>());
         Map<String, Object> taskNotNuLLRes =
                 processDefinitionService.viewTree(user, processDefinition.getProjectCode(), 46, 10);
         Assertions.assertEquals(Status.SUCCESS, taskNotNuLLRes.get(Constants.STATUS));
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java
index feefd1f6f2..f6bbefd8e4 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java
@@ -57,6 +57,7 @@ import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
 import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
 import org.apache.dolphinscheduler.dao.mapper.TenantMapper;
 import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
+import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
 import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult;
 import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
 import org.apache.dolphinscheduler.service.expand.CuringParamsService;
@@ -104,6 +105,9 @@ public class ProcessInstanceServiceTest {
     @Mock
     ProcessService processService;
 
+    @Mock
+    TaskInstanceDao taskInstanceDao;
+
     @Mock
     ProcessInstanceDao processInstanceDao;
 
@@ -408,7 +412,7 @@ public class ProcessInstanceServiceTest {
         when(projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_INSTANCE)).thenReturn(result);
         when(processService.findProcessInstanceDetailById(processInstance.getId()))
                 .thenReturn(Optional.of(processInstance));
-        when(processService.findValidTaskListByProcessId(processInstance.getId(), processInstance.getTestFlag()))
+        when(taskInstanceDao.findValidTaskListByProcessId(processInstance.getId(), processInstance.getTestFlag()))
                 .thenReturn(taskInstanceList);
         when(loggerService.queryLog(taskInstance.getId(), 0, 4098)).thenReturn(res);
         Map<String, Object> successRes = processInstanceService.queryTaskListByProcessId(loginUser, projectCode, 1);
@@ -451,7 +455,7 @@ public class ProcessInstanceServiceTest {
         putMsg(result, Status.SUCCESS, projectCode);
         when(projectMapper.queryByCode(projectCode)).thenReturn(project);
         when(projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_INSTANCE)).thenReturn(result);
-        when(processService.findTaskInstanceById(1)).thenReturn(null);
+        when(taskInstanceDao.findTaskInstanceById(1)).thenReturn(null);
         Map<String, Object> taskNullRes =
                 processInstanceService.querySubProcessInstanceByTaskId(loginUser, projectCode, 1);
         Assertions.assertEquals(Status.TASK_INSTANCE_NOT_EXISTS, taskNullRes.get(Constants.STATUS));
@@ -461,7 +465,7 @@ public class ProcessInstanceServiceTest {
         taskInstance.setTaskType("HTTP");
         taskInstance.setProcessInstanceId(1);
         putMsg(result, Status.SUCCESS, projectCode);
-        when(processService.findTaskInstanceById(1)).thenReturn(taskInstance);
+        when(taskInstanceDao.findTaskInstanceById(1)).thenReturn(taskInstance);
         TaskDefinition taskDefinition = new TaskDefinition();
         taskDefinition.setProjectCode(projectCode);
         when(taskDefinitionMapper.queryByCode(taskInstance.getTaskCode())).thenReturn(taskDefinition);
@@ -481,7 +485,7 @@ public class ProcessInstanceServiceTest {
         subTask.setTaskType("SUB_PROCESS");
         subTask.setProcessInstanceId(1);
         putMsg(result, Status.SUCCESS, projectCode);
-        when(processService.findTaskInstanceById(subTask.getId())).thenReturn(subTask);
+        when(taskInstanceDao.findTaskInstanceById(subTask.getId())).thenReturn(subTask);
         when(processService.findSubProcessInstance(subTask.getProcessInstanceId(), subTask.getId())).thenReturn(null);
         Map<String, Object> subprocessNotExistRes =
                 processInstanceService.querySubProcessInstanceByTaskId(loginUser, projectCode, 1);
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceMapDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceMapDao.java
new file mode 100644
index 0000000000..5465c6d40d
--- /dev/null
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceMapDao.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.dao.repository;
+
+import org.apache.dolphinscheduler.dao.entity.ProcessInstanceMap;
+
+/**
+ * DAO for {@link ProcessInstanceMap} records.
+ */
+public interface ProcessInstanceMapDao {
+
+    /**
+     * Updates an existing process instance map.
+     * @param processInstanceMap the process instance map to update
+     * @return update result (presumably the affected row count; TODO confirm against the implementation)
+     */
+    int updateWorkProcessInstanceMap(ProcessInstanceMap processInstanceMap);
+
+    /**
+     * Creates a new process instance map in the DB.
+     * @param processInstanceMap the process instance map to create
+     * @return create result (presumably the affected row count; TODO confirm against the implementation)
+     */
+    int createWorkProcessInstanceMap(ProcessInstanceMap processInstanceMap);
+
+    /**
+     * Finds the process instance map by parent process id and parent task id.
+     * @param parentWorkProcessId id of the parent process
+     * @param parentTaskId        id of the parent task
+     * @return the matching process instance map
+     */
+    ProcessInstanceMap findWorkProcessMapByParent(Integer parentWorkProcessId, Integer parentTaskId);
+
+}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskDefinitionDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskDefinitionDao.java
new file mode 100644
index 0000000000..9aaed9e576
--- /dev/null
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskDefinitionDao.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.dao.repository;
+
+import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
+
+import java.util.List;
+
+/**
+ * Task Definition DAO
+ */
+public interface TaskDefinitionDao {
+
+    /**
+     * Get list of task definition by process definition code
+     * @param processDefinitionCode process definition code
+     * @return list of task definition
+     */
+    List<TaskDefinition> getTaskDefinitionListByDefinition(long processDefinitionCode);
+
+    /**
+     * Query task definition by code and version
+     * @param taskCode task code
+     * @param taskDefinitionVersion task definition version
+     * @return task definition
+     */
+    TaskDefinition findTaskDefinition(long taskCode, int taskDefinitionVersion);
+
+}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskDefinitionLogDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskDefinitionLogDao.java
new file mode 100644
index 0000000000..8b6f290f49
--- /dev/null
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskDefinitionLogDao.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.dao.repository;
+
+import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
+import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
+
+import java.util.List;
+
+/**
+ * Task Definition Log DAO
+ */
+public interface TaskDefinitionLogDao {
+
+    /**
+     * Get task definition log list
+     * @param processTaskRelations list of process task relation
+     * @return list of task definition log
+     */
+    List<TaskDefinitionLog> getTaskDefineLogList(List<ProcessTaskRelation> processTaskRelations);
+
+    /**
+     * Query task definition log list by process task relation list
+     * @param processTaskRelations list of task relation
+     * @return list of task definition log
+     */
+    List<TaskDefinitionLog> getTaskDefineLogListByRelation(List<ProcessTaskRelation> processTaskRelations);
+
+}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskInstanceDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskInstanceDao.java
new file mode 100644
index 0000000000..2b2adc9a3f
--- /dev/null
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskInstanceDao.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.dao.repository;
+
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+
+import java.util.List;
+
+/**
+ * Task Instance DAO
+ */
+public interface TaskInstanceDao {
+
+    /**
+     * Update or Insert task instance to DB.
+     * ID is null -> Insert
+     * ID is not null -> Update
+     * @param taskInstance task instance
+     * @return result
+     */
+    boolean upsertTaskInstance(TaskInstance taskInstance);
+
+    /**
+     * Insert task instance to DB.
+     * @param taskInstance task instance
+     * @return result
+     */
+    boolean insertTaskInstance(TaskInstance taskInstance);
+
+    /**
+     * Update task instance to DB.
+     * @param taskInstance task instance
+     * @return result
+     */
+    boolean updateTaskInstance(TaskInstance taskInstance);
+
+    /**
+     * Submit a task instance to DB.
+     * @param taskInstance task instance
+     * @param processInstance process instance
+     * @return task instance
+     */
+    TaskInstance submitTaskInstanceToDB(TaskInstance taskInstance, ProcessInstance processInstance);
+
+    /**
+     * Query list of valid task instance by process instance id
+     * @param processInstanceId processInstanceId
+     * @param testFlag test flag
+     * @return list of valid task instance
+     */
+    List<TaskInstance> findValidTaskListByProcessId(Integer processInstanceId, int testFlag);
+
+    /**
+     * find previous task list by work process id
+     * @param processInstanceId processInstanceId
+     * @return task instance list
+     */
+    List<TaskInstance> findPreviousTaskListByWorkProcessId(Integer processInstanceId);
+
+    /**
+     * find task instance by id
+     * @param taskId task id
+     * @return task instance
+     */
+    TaskInstance findTaskInstanceById(Integer taskId);
+
+    /**
+     * find task instance list by id list
+     * @param idList task id list
+     * @return task instance list
+     */
+    List<TaskInstance> findTaskInstanceByIdList(List<Integer> idList);
+
+}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessInstanceMapDaoImpl.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessInstanceMapDaoImpl.java
new file mode 100644
index 0000000000..94dbc075bd
--- /dev/null
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessInstanceMapDaoImpl.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.dao.repository.impl;
+
+import org.apache.dolphinscheduler.dao.entity.ProcessInstanceMap;
+import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapMapper;
+import org.apache.dolphinscheduler.dao.repository.ProcessInstanceMapDao;
+
+import lombok.NonNull;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Repository;
+
+/**
+ * Process Instance Map Dao implementation
+ */
+@Repository
+public class ProcessInstanceMapDaoImpl implements ProcessInstanceMapDao {
+
+    @Autowired
+    private ProcessInstanceMapMapper processInstanceMapMapper;
+
+    @Override
+    public int updateWorkProcessInstanceMap(ProcessInstanceMap processInstanceMap) {
+        return processInstanceMapMapper.updateById(processInstanceMap);
+    }
+
+    @Override
+    public int createWorkProcessInstanceMap(@NonNull ProcessInstanceMap processInstanceMap) {
+        return processInstanceMapMapper.insert(processInstanceMap);
+    }
+
+    @Override
+    public ProcessInstanceMap findWorkProcessMapByParent(Integer parentWorkProcessId, Integer parentTaskId) {
+        return processInstanceMapMapper.queryByParentId(parentWorkProcessId, parentTaskId);
+    }
+}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskDefinitionDaoImpl.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskDefinitionDaoImpl.java
new file mode 100644
index 0000000000..33b6033454
--- /dev/null
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskDefinitionDaoImpl.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.dao.repository.impl;
+
+import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
+import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog;
+import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
+import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
+import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
+import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationLogMapper;
+import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper;
+import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
+import org.apache.dolphinscheduler.dao.repository.TaskDefinitionDao;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Repository;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Task Definition DAO Implementation
+ */
+@Repository
+public class TaskDefinitionDaoImpl implements TaskDefinitionDao {
+
+    private final Logger logger = LoggerFactory.getLogger(TaskDefinitionDaoImpl.class);
+
+    @Autowired
+    private ProcessDefinitionMapper processDefinitionMapper;
+
+    @Autowired
+    private ProcessTaskRelationLogMapper processTaskRelationLogMapper;
+
+    @Autowired
+    private TaskDefinitionLogMapper taskDefinitionLogMapper;
+
+    @Autowired
+    private TaskDefinitionMapper taskDefinitionMapper;
+
+    @Override
+    public List<TaskDefinition> getTaskDefinitionListByDefinition(long processDefinitionCode) {
+        ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(processDefinitionCode);
+        if (processDefinition == null) {
+            logger.error("Cannot find process definition, code: {}", processDefinitionCode);
+            return Lists.newArrayList();
+        }
+
+        List<ProcessTaskRelationLog> processTaskRelations = processTaskRelationLogMapper
+                .queryByProcessCodeAndVersion(processDefinition.getCode(), processDefinition.getVersion());
+        Set<TaskDefinition> taskDefinitionSet = new HashSet<>();
+        processTaskRelations.stream().filter(p -> p.getPostTaskCode() > 0)
+                .forEach(p -> taskDefinitionSet.add(new TaskDefinition(p.getPostTaskCode(), p.getPostTaskVersion())));
+
+        if (taskDefinitionSet.isEmpty()) {
+            return Lists.newArrayList();
+        }
+        List<TaskDefinitionLog> taskDefinitionLogs = taskDefinitionLogMapper.queryByTaskDefinitions(taskDefinitionSet);
+        return Lists.newArrayList(taskDefinitionLogs);
+    }
+
+    @Override
+    public TaskDefinition findTaskDefinition(long taskCode, int taskDefinitionVersion) {
+        return taskDefinitionLogMapper.queryByDefinitionCodeAndVersion(taskCode, taskDefinitionVersion);
+    }
+
+}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskDefinitionLogDaoImpl.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskDefinitionLogDaoImpl.java
new file mode 100644
index 0000000000..efd000a410
--- /dev/null
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskDefinitionLogDaoImpl.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.dao.repository.impl;
+
+import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
+import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
+import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
+import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper;
+import org.apache.dolphinscheduler.dao.repository.TaskDefinitionDao;
+import org.apache.dolphinscheduler.dao.repository.TaskDefinitionLogDao;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Repository;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Task Definition Log DAO implementation
+ */
+@Repository
+public class TaskDefinitionLogDaoImpl implements TaskDefinitionLogDao {
+
+    private final Logger logger = LoggerFactory.getLogger(TaskDefinitionLogDaoImpl.class);
+
+    @Autowired
+    private TaskDefinitionDao taskDefinitionDao;
+
+    @Autowired
+    private TaskDefinitionLogMapper taskDefinitionLogMapper;
+
+    @Override
+    public List<TaskDefinitionLog> getTaskDefineLogList(List<ProcessTaskRelation> processTaskRelations) {
+        Set<TaskDefinition> taskDefinitionSet = new HashSet<>();
+        for (ProcessTaskRelation processTaskRelation : processTaskRelations) {
+            if (processTaskRelation.getPreTaskCode() > 0) {
+                taskDefinitionSet.add(new TaskDefinition(processTaskRelation.getPreTaskCode(),
+                        processTaskRelation.getPreTaskVersion()));
+            }
+            if (processTaskRelation.getPostTaskCode() > 0) {
+                taskDefinitionSet.add(new TaskDefinition(processTaskRelation.getPostTaskCode(),
+                        processTaskRelation.getPostTaskVersion()));
+            }
+        }
+        if (taskDefinitionSet.isEmpty()) {
+            return Lists.newArrayList();
+        }
+        return taskDefinitionLogMapper.queryByTaskDefinitions(taskDefinitionSet);
+    }
+
+    @Override
+    public List<TaskDefinitionLog> getTaskDefineLogListByRelation(List<ProcessTaskRelation> processTaskRelations) {
+        List<TaskDefinitionLog> taskDefinitionLogs = new ArrayList<>();
+        Map<Long, Integer> taskCodeVersionMap = new HashMap<>();
+        for (ProcessTaskRelation processTaskRelation : processTaskRelations) {
+            if (processTaskRelation.getPreTaskCode() > 0) {
+                taskCodeVersionMap.put(processTaskRelation.getPreTaskCode(), processTaskRelation.getPreTaskVersion());
+            }
+            if (processTaskRelation.getPostTaskCode() > 0) {
+                taskCodeVersionMap.put(processTaskRelation.getPostTaskCode(), processTaskRelation.getPostTaskVersion());
+            }
+        }
+        taskCodeVersionMap.forEach((code, version) -> {
+            taskDefinitionLogs.add((TaskDefinitionLog) taskDefinitionDao.findTaskDefinition(code, version));
+        });
+        return taskDefinitionLogs;
+    }
+}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskInstanceDaoImpl.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskInstanceDaoImpl.java
new file mode 100644
index 0000000000..648085cbd3
--- /dev/null
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskInstanceDaoImpl.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.dao.repository.impl;
+
+import org.apache.dolphinscheduler.common.enums.FailureStrategy;
+import org.apache.dolphinscheduler.common.enums.Flag;
+import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
+import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
+import org.apache.dolphinscheduler.dao.repository.ProcessInstanceMapDao;
+import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
+import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
+
+import org.apache.commons.collections.CollectionUtils;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Repository;
+
+/**
+ * Task Instance DAO implementation
+ */
+@Repository
+public class TaskInstanceDaoImpl implements TaskInstanceDao {
+
+    private final Logger logger = LoggerFactory.getLogger(TaskInstanceDaoImpl.class);
+
+    @Autowired
+    private TaskInstanceMapper taskInstanceMapper;
+
+    @Autowired
+    private ProcessInstanceMapper processInstanceMapper;
+
+    @Autowired
+    private ProcessInstanceMapDao processInstanceMapDao;
+
+    @Override
+    public boolean upsertTaskInstance(TaskInstance taskInstance) {
+        if (taskInstance.getId() != null) {
+            return updateTaskInstance(taskInstance);
+        } else {
+            return insertTaskInstance(taskInstance);
+        }
+    }
+
+    @Override
+    public boolean insertTaskInstance(TaskInstance taskInstance) {
+        int count = taskInstanceMapper.insert(taskInstance);
+        return count > 0;
+    }
+
+    @Override
+    public boolean updateTaskInstance(TaskInstance taskInstance) {
+        int count = taskInstanceMapper.updateById(taskInstance);
+        return count > 0;
+    }
+
+    @Override
+    public TaskInstance submitTaskInstanceToDB(TaskInstance taskInstance, ProcessInstance processInstance) {
+        WorkflowExecutionStatus processInstanceState = processInstance.getState();
+        if (processInstanceState.isFinished() || processInstanceState == WorkflowExecutionStatus.READY_STOP) {
+            logger.warn("processInstance: {} state was: {}, skip submit this task, taskCode: {}",
+                    processInstance.getId(),
+                    processInstanceState,
+                    taskInstance.getTaskCode());
+            return null;
+        }
+        if (processInstanceState == WorkflowExecutionStatus.READY_PAUSE) {
+            taskInstance.setState(TaskExecutionStatus.PAUSE);
+        }
+        taskInstance.setExecutorId(processInstance.getExecutorId());
+        taskInstance.setState(getSubmitTaskState(taskInstance, processInstance));
+        if (taskInstance.getSubmitTime() == null) {
+            taskInstance.setSubmitTime(new Date());
+        }
+        if (taskInstance.getFirstSubmitTime() == null) {
+            taskInstance.setFirstSubmitTime(taskInstance.getSubmitTime());
+        }
+        boolean saveResult = upsertTaskInstance(taskInstance);
+        if (!saveResult) {
+            return null;
+        }
+        return taskInstance;
+    }
+
+    private TaskExecutionStatus getSubmitTaskState(TaskInstance taskInstance, ProcessInstance processInstance) {
+        TaskExecutionStatus state = taskInstance.getState();
+        if (state == TaskExecutionStatus.RUNNING_EXECUTION
+                || state == TaskExecutionStatus.DELAY_EXECUTION
+                || state == TaskExecutionStatus.KILL
+                || state == TaskExecutionStatus.DISPATCH) {
+            return state;
+        }
+
+        if (processInstance.getState() == WorkflowExecutionStatus.READY_PAUSE) {
+            state = TaskExecutionStatus.PAUSE;
+        } else if (processInstance.getState() == WorkflowExecutionStatus.READY_STOP
+                || !checkProcessStrategy(taskInstance, processInstance)) {
+            state = TaskExecutionStatus.KILL;
+        } else {
+            state = TaskExecutionStatus.SUBMITTED_SUCCESS;
+        }
+        return state;
+    }
+
+    private boolean checkProcessStrategy(TaskInstance taskInstance, ProcessInstance processInstance) {
+        FailureStrategy failureStrategy = processInstance.getFailureStrategy();
+        if (failureStrategy == FailureStrategy.CONTINUE) {
+            return true;
+        }
+        List<TaskInstance> taskInstances =
+                this.findValidTaskListByProcessId(taskInstance.getProcessInstanceId(), taskInstance.getTestFlag());
+
+        for (TaskInstance task : taskInstances) {
+            if (task.getState() == TaskExecutionStatus.FAILURE
+                    && task.getRetryTimes() >= task.getMaxRetryTimes()) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    @Override
+    public List<TaskInstance> findValidTaskListByProcessId(Integer processInstanceId, int testFlag) {
+        return taskInstanceMapper.findValidTaskListByProcessId(processInstanceId, Flag.YES, testFlag);
+    }
+
+    @Override
+    public List<TaskInstance> findPreviousTaskListByWorkProcessId(Integer processInstanceId) {
+        ProcessInstance processInstance = processInstanceMapper.selectById(processInstanceId);
+        return taskInstanceMapper.findValidTaskListByProcessId(processInstanceId, Flag.NO,
+                processInstance.getTestFlag());
+    }
+
+    @Override
+    public TaskInstance findTaskInstanceById(Integer taskId) {
+        return taskInstanceMapper.selectById(taskId);
+    }
+
+    @Override
+    public List<TaskInstance> findTaskInstanceByIdList(List<Integer> idList) {
+        if (CollectionUtils.isEmpty(idList)) {
+            return new ArrayList<>();
+        }
+        return taskInstanceMapper.selectBatchIds(idList);
+    }
+
+}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
index 87c802b163..c12c1d62e4 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
@@ -22,6 +22,7 @@ import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
 import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
 import org.apache.dolphinscheduler.common.thread.ThreadUtils;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
 import org.apache.dolphinscheduler.remote.command.Command;
 import org.apache.dolphinscheduler.remote.command.TaskDispatchCommand;
@@ -36,7 +37,6 @@ import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent;
 import org.apache.dolphinscheduler.server.master.processor.queue.TaskEventService;
 import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
 import org.apache.dolphinscheduler.service.exceptions.TaskPriorityQueueException;
-import org.apache.dolphinscheduler.service.process.ProcessService;
 import org.apache.dolphinscheduler.service.queue.TaskPriority;
 import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue;
 
@@ -75,11 +75,8 @@ public class TaskPriorityQueueConsumer extends BaseDaemonThread {
     @Autowired
     private TaskPriorityQueue<TaskPriority> taskPriorityQueue;
 
-    /**
-     * processService
-     */
     @Autowired
-    private ProcessService processService;
+    private TaskInstanceDao taskInstanceDao;
 
     /**
      * executor dispatcher
@@ -264,7 +261,7 @@ public class TaskPriorityQueueConsumer extends BaseDaemonThread {
      * @return taskInstance is final state
      */
     public boolean taskInstanceIsFinalState(int taskInstanceId) {
-        TaskInstance taskInstance = processService.findTaskInstanceById(taskInstanceId);
+        TaskInstance taskInstance = taskInstanceDao.findTaskInstanceById(taskInstanceId);
         return taskInstance.getState().isFinished();
     }
 
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskDelayEventHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskDelayEventHandler.java
index 51a0830561..18168056f0 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskDelayEventHandler.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskDelayEventHandler.java
@@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.server.master.event;
 import org.apache.dolphinscheduler.common.enums.StateEventType;
 import org.apache.dolphinscheduler.common.enums.TaskEventType;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
 import org.apache.dolphinscheduler.dao.utils.TaskInstanceUtils;
 import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningAckMessage;
 import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
@@ -46,6 +47,9 @@ public class TaskDelayEventHandler implements TaskEventHandler {
     @Autowired
     private ProcessService processService;
 
+    @Autowired
+    private TaskInstanceDao taskInstanceDao;
+
     @Autowired
     private WorkflowExecuteThreadPool workflowExecuteThreadPool;
 
@@ -85,7 +89,7 @@ public class TaskDelayEventHandler implements TaskEventHandler {
             taskInstance.setExecutePath(taskEvent.getExecutePath());
             taskInstance.setPid(taskEvent.getProcessId());
             taskInstance.setAppLink(taskEvent.getAppIds());
-            if (!processService.updateTaskInstance(taskInstance)) {
+            if (!taskInstanceDao.updateTaskInstance(taskInstance)) {
                 throw new TaskEventHandleError("Handle task delay event error, update taskInstance to db failed");
             }
             sendAckToWorker(taskEvent);
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskDispatchEventHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskDispatchEventHandler.java
index 83d11db80d..08c13f2de1 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskDispatchEventHandler.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskDispatchEventHandler.java
@@ -19,12 +19,12 @@ package org.apache.dolphinscheduler.server.master.event;
 
 import org.apache.dolphinscheduler.common.enums.TaskEventType;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
 import org.apache.dolphinscheduler.dao.utils.TaskInstanceUtils;
 import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
 import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
 import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent;
 import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
-import org.apache.dolphinscheduler.service.process.ProcessService;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -40,7 +40,7 @@ public class TaskDispatchEventHandler implements TaskEventHandler {
     private ProcessInstanceExecCacheManager processInstanceExecCacheManager;
 
     @Autowired
-    private ProcessService processService;
+    private TaskInstanceDao taskInstanceDao;
 
     @Override
     public void handleTaskEvent(TaskEvent taskEvent) throws TaskEventHandleError {
@@ -68,7 +68,7 @@ public class TaskDispatchEventHandler implements TaskEventHandler {
         taskInstance.setState(TaskExecutionStatus.DISPATCH);
         taskInstance.setHost(taskEvent.getWorkerAddress());
         try {
-            if (!processService.updateTaskInstance(taskInstance)) {
+            if (!taskInstanceDao.updateTaskInstance(taskInstance)) {
                 throw new TaskEventHandleError("Handle task dispatch event error, update taskInstance to db failed");
             }
         } catch (Exception ex) {
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskResultEventHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskResultEventHandler.java
index 5f36248d9e..a185037ae4 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskResultEventHandler.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskResultEventHandler.java
@@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.server.master.event;
 import org.apache.dolphinscheduler.common.enums.StateEventType;
 import org.apache.dolphinscheduler.common.enums.TaskEventType;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
 import org.apache.dolphinscheduler.dao.utils.TaskInstanceUtils;
 import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
 import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
@@ -50,6 +51,9 @@ public class TaskResultEventHandler implements TaskEventHandler {
     @Autowired
     private ProcessService processService;
 
+    @Autowired
+    private TaskInstanceDao taskInstanceDao;
+
     @Autowired
     private MasterConfig masterConfig;
 
@@ -92,7 +96,7 @@ public class TaskResultEventHandler implements TaskEventHandler {
             taskInstance.setEndTime(taskEvent.getEndTime());
             taskInstance.setVarPool(taskEvent.getVarPool());
             processService.changeOutParam(taskInstance);
-            processService.updateTaskInstance(taskInstance);
+            taskInstanceDao.updateTaskInstance(taskInstance);
             sendAckToWorker(taskEvent);
         } catch (Exception ex) {
             TaskInstanceUtils.copyTaskInstance(oldTaskInstance, taskInstance);
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskRunningEventHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskRunningEventHandler.java
index 3dc86b3a37..85a10da40d 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskRunningEventHandler.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskRunningEventHandler.java
@@ -20,13 +20,13 @@ package org.apache.dolphinscheduler.server.master.event;
 import org.apache.dolphinscheduler.common.enums.StateEventType;
 import org.apache.dolphinscheduler.common.enums.TaskEventType;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
 import org.apache.dolphinscheduler.dao.utils.TaskInstanceUtils;
 import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningAckMessage;
 import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
 import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent;
 import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
 import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool;
-import org.apache.dolphinscheduler.service.process.ProcessService;
 
 import java.util.Optional;
 
@@ -47,7 +47,7 @@ public class TaskRunningEventHandler implements TaskEventHandler {
     private WorkflowExecuteThreadPool workflowExecuteThreadPool;
 
     @Autowired
-    private ProcessService processService;
+    private TaskInstanceDao taskInstanceDao;
 
     @Override
     public void handleTaskEvent(TaskEvent taskEvent) throws TaskEventHandleError {
@@ -84,7 +84,7 @@ public class TaskRunningEventHandler implements TaskEventHandler {
             taskInstance.setExecutePath(taskEvent.getExecutePath());
             taskInstance.setPid(taskEvent.getProcessId());
             taskInstance.setAppLink(taskEvent.getAppIds());
-            if (!processService.updateTaskInstance(taskInstance)) {
+            if (!taskInstanceDao.updateTaskInstance(taskInstance)) {
                 throw new TaskEventHandleError("Handle task running event error, update taskInstance to db failed");
             }
             sendAckToWorker(taskEvent);
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskExecuteStartProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskExecuteStartProcessor.java
index c1520b07df..cf80bfbd19 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskExecuteStartProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskExecuteStartProcessor.java
@@ -19,13 +19,13 @@ package org.apache.dolphinscheduler.server.master.processor;
 
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
+import org.apache.dolphinscheduler.dao.repository.TaskDefinitionDao;
 import org.apache.dolphinscheduler.remote.command.Command;
 import org.apache.dolphinscheduler.remote.command.CommandType;
 import org.apache.dolphinscheduler.remote.command.TaskExecuteStartCommand;
 import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
 import org.apache.dolphinscheduler.server.master.runner.StreamTaskExecuteRunnable;
 import org.apache.dolphinscheduler.server.master.runner.StreamTaskExecuteThreadPool;
-import org.apache.dolphinscheduler.service.process.ProcessService;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -47,7 +47,7 @@ public class TaskExecuteStartProcessor implements NettyRequestProcessor {
     private StreamTaskExecuteThreadPool streamTaskExecuteThreadPool;
 
     @Autowired
-    private ProcessService processService;
+    private TaskDefinitionDao taskDefinitionDao;
 
     @Override
     public void process(Channel channel, Command command) {
@@ -57,7 +57,7 @@ public class TaskExecuteStartProcessor implements NettyRequestProcessor {
                 JSONUtils.parseObject(command.getBody(), TaskExecuteStartCommand.class);
         logger.info("taskExecuteStartCommand: {}", taskExecuteStartCommand);
 
-        TaskDefinition taskDefinition = processService.findTaskDefinition(
+        TaskDefinition taskDefinition = taskDefinitionDao.findTaskDefinition(
                 taskExecuteStartCommand.getTaskDefinitionCode(), taskExecuteStartCommand.getTaskDefinitionVersion());
         if (taskDefinition == null) {
             logger.error("Task definition can not be found, taskDefinitionCode:{}, taskDefinitionVersion:{}",
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java
index 7debb1f895..236dde1760 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java
@@ -27,6 +27,8 @@ import org.apache.dolphinscheduler.common.utils.OSUtils;
 import org.apache.dolphinscheduler.dao.entity.Command;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
+import org.apache.dolphinscheduler.dao.repository.TaskDefinitionLogDao;
+import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
 import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
 import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager;
@@ -73,6 +75,12 @@ public class MasterSchedulerBootstrap extends BaseDaemonThread implements AutoCl
     @Autowired
     private ProcessInstanceDao processInstanceDao;
 
+    @Autowired
+    private TaskInstanceDao taskInstanceDao;
+
+    @Autowired
+    private TaskDefinitionLogDao taskDefinitionLogDao;
+
     @Autowired
     private MasterConfig masterConfig;
 
@@ -183,7 +191,9 @@ public class MasterSchedulerBootstrap extends BaseDaemonThread implements AutoCl
                                 processAlertManager,
                                 masterConfig,
                                 stateWheelExecuteThread,
-                                curingGlobalParamsService);
+                                curingGlobalParamsService,
+                                taskInstanceDao,
+                                taskDefinitionLogDao);
                         processInstanceExecCacheManager.cache(processInstance.getId(), workflowRunnable);
                         workflowEventQueue.addEvent(new WorkflowEvent(WorkflowEventType.START_WORKFLOW,
                                 processInstance.getId()));
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StreamTaskExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StreamTaskExecuteRunnable.java
index 15abef270d..ddecc35043 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StreamTaskExecuteRunnable.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StreamTaskExecuteRunnable.java
@@ -31,6 +31,7 @@ import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.dao.entity.Tenant;
 import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
+import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
 import org.apache.dolphinscheduler.plugin.task.api.TaskChannel;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
 import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
@@ -89,6 +90,8 @@ public class StreamTaskExecuteRunnable implements Runnable {
 
     protected ProcessService processService;
 
+    protected TaskInstanceDao taskInstanceDao;
+
     protected ExecutorDispatcher dispatcher;
 
     protected ProcessTaskRelationMapper processTaskRelationMapper;
@@ -118,6 +121,7 @@ public class StreamTaskExecuteRunnable implements Runnable {
         this.dispatcher = SpringApplicationContext.getBean(ExecutorDispatcher.class);
         this.taskPluginManager = SpringApplicationContext.getBean(TaskPluginManager.class);
         this.processTaskRelationMapper = SpringApplicationContext.getBean(ProcessTaskRelationMapper.class);
+        this.taskInstanceDao = SpringApplicationContext.getBean(TaskInstanceDao.class);
         this.streamTaskInstanceExecCacheManager =
                 SpringApplicationContext.getBean(StreamTaskInstanceExecCacheManager.class);
         this.taskDefinition = taskDefinition;
@@ -133,7 +137,7 @@ public class StreamTaskExecuteRunnable implements Runnable {
         // submit task
         processService.updateTaskDefinitionResources(taskDefinition);
         taskInstance = newTaskInstance(taskDefinition);
-        processService.saveTaskInstance(taskInstance);
+        taskInstanceDao.upsertTaskInstance(taskInstance);
 
         // add cache
         streamTaskInstanceExecCacheManager.cache(taskInstance.getId(), this);
@@ -148,7 +152,7 @@ public class StreamTaskExecuteRunnable implements Runnable {
         TaskExecutionContext taskExecutionContext = getTaskExecutionContext(taskInstance);
         if (taskExecutionContext == null) {
             taskInstance.setState(TaskExecutionStatus.FAILURE);
-            processService.saveTaskInstance(taskInstance);
+            taskInstanceDao.upsertTaskInstance(taskInstance);
             return;
         }
 
@@ -175,7 +179,7 @@ public class StreamTaskExecuteRunnable implements Runnable {
 
             // set task instance fail
             taskInstance.setState(TaskExecutionStatus.FAILURE);
-            processService.saveTaskInstance(taskInstance);
+            taskInstanceDao.upsertTaskInstance(taskInstance);
             return;
         }
 
@@ -416,7 +420,7 @@ public class StreamTaskExecuteRunnable implements Runnable {
         taskInstance.setEndTime(taskEvent.getEndTime());
         taskInstance.setVarPool(taskEvent.getVarPool());
         processService.changeOutParam(taskInstance);
-        processService.updateTaskInstance(taskInstance);
+        taskInstanceDao.updateTaskInstance(taskInstance);
 
         // send ack
         sendAckToWorker(taskEvent);
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
index 175416949f..6bca587ac0 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
@@ -58,6 +58,8 @@ import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
 import org.apache.dolphinscheduler.dao.entity.TaskGroupQueue;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
+import org.apache.dolphinscheduler.dao.repository.TaskDefinitionLogDao;
+import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
 import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult;
 import org.apache.dolphinscheduler.plugin.task.api.enums.Direct;
 import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
@@ -134,6 +136,10 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
 
     private ProcessInstanceDao processInstanceDao;
 
+    private TaskInstanceDao taskInstanceDao;
+
+    private TaskDefinitionLogDao taskDefinitionLogDao;
+
     private final ProcessAlertManager processAlertManager;
 
     private final NettyExecutorManager nettyExecutorManager;
@@ -245,7 +251,9 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
                                    @NonNull ProcessAlertManager processAlertManager,
                                    @NonNull MasterConfig masterConfig,
                                    @NonNull StateWheelExecuteThread stateWheelExecuteThread,
-                                   @NonNull CuringParamsService curingParamsService) {
+                                   @NonNull CuringParamsService curingParamsService,
+                                   @NonNull TaskInstanceDao taskInstanceDao,
+                                   @NonNull TaskDefinitionLogDao taskDefinitionLogDao) {
         this.processService = processService;
         this.commandService = commandService;
         this.processInstanceDao = processInstanceDao;
@@ -254,6 +262,8 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
         this.processAlertManager = processAlertManager;
         this.stateWheelExecuteThread = stateWheelExecuteThread;
         this.curingParamsService = curingParamsService;
+        this.taskInstanceDao = taskInstanceDao;
+        this.taskDefinitionLogDao = taskDefinitionLogDao;
         this.masterAddress = NetUtils.getAddr(masterConfig.getListenPort());
         TaskMetrics.registerTaskPrepared(readyToSubmitTaskQueue::size);
     }
@@ -347,7 +357,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
     public boolean checkForceStartAndWakeUp(StateEvent stateEvent) {
         TaskGroupQueue taskGroupQueue = this.processService.loadTaskGroupQueue(stateEvent.getTaskInstanceId());
         if (taskGroupQueue.getForceStart() == Flag.YES.getCode()) {
-            TaskInstance taskInstance = this.processService.findTaskInstanceById(stateEvent.getTaskInstanceId());
+            TaskInstance taskInstance = this.taskInstanceDao.findTaskInstanceById(stateEvent.getTaskInstanceId());
             ITaskProcessor taskProcessor = activeTaskProcessorMaps.get(taskInstance.getTaskCode());
             taskProcessor.action(TaskAction.DISPATCH);
             this.processService.updateTaskGroupQueueStatus(taskGroupQueue.getTaskId(),
@@ -357,7 +367,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
         if (taskGroupQueue.getInQueue() == Flag.YES.getCode()) {
             boolean acquireTaskGroup = processService.robTaskGroupResource(taskGroupQueue);
             if (acquireTaskGroup) {
-                TaskInstance taskInstance = this.processService.findTaskInstanceById(stateEvent.getTaskInstanceId());
+                TaskInstance taskInstance = this.taskInstanceDao.findTaskInstanceById(stateEvent.getTaskInstanceId());
                 ITaskProcessor taskProcessor = activeTaskProcessorMaps.get(taskInstance.getTaskCode());
                 taskProcessor.action(TaskAction.DISPATCH);
                 return true;
@@ -502,7 +512,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
      */
     public void refreshTaskInstance(int taskInstanceId) {
         logger.info("task instance update: {} ", taskInstanceId);
-        TaskInstance taskInstance = processService.findTaskInstanceById(taskInstanceId);
+        TaskInstance taskInstance = taskInstanceDao.findTaskInstanceById(taskInstanceId);
         if (taskInstance == null) {
             logger.error("can not find task instance, id:{}", taskInstanceId);
             return;
@@ -775,7 +785,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
         List<ProcessTaskRelation> processTaskRelations =
                 processService.findRelationByCode(processDefinition.getCode(), processDefinition.getVersion());
         List<TaskDefinitionLog> taskDefinitionLogs =
-                processService.getTaskDefineLogListByRelation(processTaskRelations);
+                taskDefinitionLogDao.getTaskDefineLogListByRelation(processTaskRelations);
         List<TaskNode> taskNodeList = processService.transformTask(processTaskRelations, taskDefinitionLogs);
         forbiddenTaskMap.clear();
 
@@ -815,7 +825,8 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
                     processInstance.getRunTimes(),
                     processInstance.getRecovery());
             List<TaskInstance> validTaskInstanceList =
-                    processService.findValidTaskListByProcessId(processInstance.getId(), processInstance.getTestFlag());
+                    taskInstanceDao.findValidTaskListByProcessId(processInstance.getId(),
+                            processInstance.getTestFlag());
             for (TaskInstance task : validTaskInstanceList) {
                 try {
                     LoggerUtils.setWorkflowAndTaskInstanceIDMDC(task.getProcessInstanceId(), task.getId());
@@ -831,7 +842,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
                         TaskInstance oldTaskInstance = taskInstanceMap.get(oldTaskInstanceId);
                         if (!oldTaskInstance.getState().isFinished() && task.getState().isFinished()) {
                             task.setFlag(Flag.NO);
-                            processService.updateTaskInstance(task);
+                            taskInstanceDao.updateTaskInstance(task);
                             continue;
                         }
                     }
@@ -852,7 +863,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
                         if (task.getState().isNeedFaultTolerance()) {
                             logger.info("TaskInstance needs fault tolerance, will be added to standby list.");
                             task.setFlag(Flag.NO);
-                            processService.updateTaskInstance(task);
+                            taskInstanceDao.updateTaskInstance(task);
 
                             // tolerantTaskInstance add to standby list directly
                             TaskInstance tolerantTaskInstance = cloneTolerantTaskInstance(task);
@@ -953,7 +964,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
                 if (taskInstance.getId() != oldTaskInstanceId) {
                     TaskInstance oldTaskInstance = taskInstanceMap.get(oldTaskInstanceId);
                     oldTaskInstance.setFlag(Flag.NO);
-                    processService.updateTaskInstance(oldTaskInstance);
+                    taskInstanceDao.updateTaskInstance(oldTaskInstance);
                     validTaskMap.remove(taskInstance.getTaskCode());
                     activeTaskProcessorMaps.remove(taskInstance.getTaskCode());
                 }
@@ -1780,7 +1791,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
             if (taskInstanceId == null || taskInstanceId.equals(0)) {
                 continue;
             }
-            TaskInstance taskInstance = processService.findTaskInstanceById(taskInstanceId);
+            TaskInstance taskInstance = taskInstanceDao.findTaskInstanceById(taskInstanceId);
             if (taskInstance == null || taskInstance.getState().isFinished()) {
                 continue;
             }
@@ -1813,7 +1824,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
             }
             // stop tasks which is retrying if forced success happens
             if (task.taskCanRetry()) {
-                TaskInstance retryTask = processService.findTaskInstanceById(task.getId());
+                TaskInstance retryTask = taskInstanceDao.findTaskInstanceById(task.getId());
                 if (retryTask != null && retryTask.getState().isForceSuccess()) {
                     task.setState(retryTask.getState());
                     logger.info(
@@ -1888,7 +1899,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
                     .map(Integer::valueOf)
                     .collect(Collectors.toList());
             if (CollectionUtils.isNotEmpty(startTaskInstanceIds)) {
-                return processService.findTaskInstanceByIdList(startTaskInstanceIds);
+                return taskInstanceDao.findTaskInstanceByIdList(startTaskInstanceIds);
             }
         }
         return Collections.emptyList();
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
index 029ae7471a..ff39833142 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
@@ -49,6 +49,7 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.dao.entity.Tenant;
 import org.apache.dolphinscheduler.dao.entity.UdfFunc;
 import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
+import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
 import org.apache.dolphinscheduler.plugin.task.api.DataQualityTaskExecutionContext;
 import org.apache.dolphinscheduler.plugin.task.api.K8sTaskExecutionContext;
 import org.apache.dolphinscheduler.plugin.task.api.TaskChannel;
@@ -123,6 +124,8 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
 
     protected ProcessService processService;
 
+    protected TaskInstanceDao taskInstanceDao;
+
     protected ProcessInstanceDao processInstanceDao;
 
     protected MasterConfig masterConfig;
@@ -140,6 +143,7 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
         masterConfig = SpringApplicationContext.getBean(MasterConfig.class);
         taskPluginManager = SpringApplicationContext.getBean(TaskPluginManager.class);
         curingParamsService = SpringApplicationContext.getBean(CuringParamsService.class);
+        taskInstanceDao = SpringApplicationContext.getBean(TaskInstanceDao.class);
         this.taskInstance = taskInstance;
         this.processInstance = processInstance;
         this.maxRetryTimes = masterConfig.getTaskCommitRetryTimes();
@@ -306,7 +310,7 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
         if (verifyTenantIsNull(tenant, taskInstance)) {
             logger.info("Task state changes to {}", TaskExecutionStatus.FAILURE);
             taskInstance.setState(TaskExecutionStatus.FAILURE);
-            processService.saveTaskInstance(taskInstance);
+            taskInstanceDao.upsertTaskInstance(taskInstance);
             return null;
         }
         // set queue for process instance, user-specified queue takes precedence over tenant queue
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BlockingTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BlockingTaskProcessor.java
index f7eac540d9..0b4848b1df 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BlockingTaskProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BlockingTaskProcessor.java
@@ -77,7 +77,7 @@ public class BlockingTaskProcessor extends BaseTaskProcessor {
         this.taskInstance.setHost(NetUtils.getAddr(masterConfig.getListenPort()));
         this.taskInstance.setState(TaskExecutionStatus.RUNNING_EXECUTION);
         this.taskInstance.setStartTime(new Date());
-        this.processService.saveTaskInstance(taskInstance);
+        this.taskInstanceDao.upsertTaskInstance(taskInstance);
         this.dependentParameters = taskInstance.getDependency();
         this.blockingParam = JSONUtils.parseObject(taskInstance.getTaskParams(), BlockingParameters.class);
     }
@@ -87,7 +87,7 @@ public class BlockingTaskProcessor extends BaseTaskProcessor {
         // todo: task cannot be pause
         taskInstance.setState(TaskExecutionStatus.PAUSE);
         taskInstance.setEndTime(new Date());
-        processService.saveTaskInstance(taskInstance);
+        taskInstanceDao.upsertTaskInstance(taskInstance);
         logger.info("blocking task has been paused");
         return true;
     }
@@ -96,7 +96,7 @@ public class BlockingTaskProcessor extends BaseTaskProcessor {
     protected boolean killTask() {
         taskInstance.setState(TaskExecutionStatus.KILL);
         taskInstance.setEndTime(new Date());
-        processService.saveTaskInstance(taskInstance);
+        taskInstanceDao.upsertTaskInstance(taskInstance);
         logger.info("blocking task has been killed");
         return true;
     }
@@ -171,7 +171,7 @@ public class BlockingTaskProcessor extends BaseTaskProcessor {
 
     private void setConditionResult() {
 
-        List<TaskInstance> taskInstances = processService
+        List<TaskInstance> taskInstances = taskInstanceDao
                 .findValidTaskListByProcessId(taskInstance.getProcessInstanceId(), processInstance.getTestFlag());
         for (TaskInstance task : taskInstances) {
             completeTaskList.putIfAbsent(task.getTaskCode(), task.getState());
@@ -204,7 +204,7 @@ public class BlockingTaskProcessor extends BaseTaskProcessor {
         }
         taskInstance.setState(TaskExecutionStatus.SUCCESS);
         taskInstance.setEndTime(new Date());
-        processService.updateTaskInstance(taskInstance);
+        taskInstanceDao.updateTaskInstance(taskInstance);
         logger.info("blocking task execute complete, blocking:{}", isBlocked);
     }
 }
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
index aa57548a06..cddc86ddaf 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
@@ -146,7 +146,7 @@ public class CommonTaskProcessor extends BaseTaskProcessor {
     public boolean killTask() {
 
         try {
-            taskInstance = processService.findTaskInstanceById(taskInstance.getId());
+            taskInstance = taskInstanceDao.findTaskInstanceById(taskInstance.getId());
             if (taskInstance == null) {
                 return true;
             }
@@ -156,7 +156,7 @@ public class CommonTaskProcessor extends BaseTaskProcessor {
             // we don't wait the kill response
             taskInstance.setState(TaskExecutionStatus.KILL);
             taskInstance.setEndTime(new Date());
-            processService.updateTaskInstance(taskInstance);
+            taskInstanceDao.updateTaskInstance(taskInstance);
             if (StringUtils.isNotEmpty(taskInstance.getHost())) {
                 killRemoteTask();
             }
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java
index f04e950223..23e0e65071 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java
@@ -99,7 +99,7 @@ public class ConditionTaskProcessor extends BaseTaskProcessor {
     protected boolean pauseTask() {
         this.taskInstance.setState(TaskExecutionStatus.PAUSE);
         this.taskInstance.setEndTime(new Date());
-        processService.saveTaskInstance(taskInstance);
+        taskInstanceDao.upsertTaskInstance(taskInstance);
         return true;
     }
 
@@ -120,7 +120,7 @@ public class ConditionTaskProcessor extends BaseTaskProcessor {
     protected boolean killTask() {
         this.taskInstance.setState(TaskExecutionStatus.KILL);
         this.taskInstance.setEndTime(new Date());
-        processService.saveTaskInstance(taskInstance);
+        taskInstanceDao.upsertTaskInstance(taskInstance);
         return true;
     }
 
@@ -138,13 +138,13 @@ public class ConditionTaskProcessor extends BaseTaskProcessor {
         this.taskInstance.setHost(NetUtils.getAddr(masterConfig.getListenPort()));
         taskInstance.setState(TaskExecutionStatus.RUNNING_EXECUTION);
         taskInstance.setStartTime(new Date());
-        this.processService.saveTaskInstance(taskInstance);
+        this.taskInstanceDao.upsertTaskInstance(taskInstance);
         this.dependentParameters = taskInstance.getDependency();
     }
 
     private void setConditionResult() {
 
-        List<TaskInstance> taskInstances = processService
+        List<TaskInstance> taskInstances = taskInstanceDao
                 .findValidTaskListByProcessId(taskInstance.getProcessInstanceId(), processInstance.getTestFlag());
         for (TaskInstance task : taskInstances) {
             completeTaskList.putIfAbsent(task.getTaskCode(), task.getState());
@@ -194,6 +194,6 @@ public class ConditionTaskProcessor extends BaseTaskProcessor {
                 (conditionResult == DependResult.SUCCESS) ? TaskExecutionStatus.SUCCESS : TaskExecutionStatus.FAILURE;
         taskInstance.setState(status);
         taskInstance.setEndTime(new Date());
-        processService.updateTaskInstance(taskInstance);
+        taskInstanceDao.updateTaskInstance(taskInstance);
     }
 }
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java
index ddcdc33c58..2edd0e97ea 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java
@@ -114,7 +114,7 @@ public class DependentTaskProcessor extends BaseTaskProcessor {
             taskInstance.setHost(NetUtils.getAddr(masterConfig.getListenPort()));
             taskInstance.setState(TaskExecutionStatus.RUNNING_EXECUTION);
             taskInstance.setStartTime(new Date());
-            processService.updateTaskInstance(taskInstance);
+            taskInstanceDao.updateTaskInstance(taskInstance);
             initDependParameters();
             logger.info("Success initialize dependent task parameters, the dependent data is: {}", dependentDate);
             return true;
@@ -231,7 +231,7 @@ public class DependentTaskProcessor extends BaseTaskProcessor {
     protected boolean pauseTask() {
         this.taskInstance.setState(TaskExecutionStatus.PAUSE);
         this.taskInstance.setEndTime(new Date());
-        processService.saveTaskInstance(taskInstance);
+        taskInstanceDao.upsertTaskInstance(taskInstance);
         return true;
     }
 
@@ -239,7 +239,7 @@ public class DependentTaskProcessor extends BaseTaskProcessor {
     protected boolean killTask() {
         this.taskInstance.setState(TaskExecutionStatus.KILL);
         this.taskInstance.setEndTime(new Date());
-        processService.saveTaskInstance(taskInstance);
+        taskInstanceDao.upsertTaskInstance(taskInstance);
         return true;
     }
 
@@ -290,7 +290,7 @@ public class DependentTaskProcessor extends BaseTaskProcessor {
         status = (result == DependResult.SUCCESS) ? TaskExecutionStatus.SUCCESS : TaskExecutionStatus.FAILURE;
         taskInstance.setState(status);
         taskInstance.setEndTime(new Date());
-        processService.saveTaskInstance(taskInstance);
+        taskInstanceDao.upsertTaskInstance(taskInstance);
     }
 
     @Override
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java
index fe1896ec85..1828e269ea 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java
@@ -133,7 +133,7 @@ public class SubTaskProcessor extends BaseTaskProcessor {
             taskInstance.setState(TaskExecutionStatus.of(subProcessInstance.getState().getCode()));
             taskInstance.setEndTime(new Date());
             dealFinish();
-            processService.saveTaskInstance(taskInstance);
+            taskInstanceDao.upsertTaskInstance(taskInstance);
         }
     }
 
@@ -201,7 +201,7 @@ public class SubTaskProcessor extends BaseTaskProcessor {
         taskInstance.setHost(NetUtils.getAddr(masterConfig.getListenPort()));
         taskInstance.setState(TaskExecutionStatus.RUNNING_EXECUTION);
         taskInstance.setStartTime(new Date());
-        processService.updateTaskInstance(taskInstance);
+        taskInstanceDao.updateTaskInstance(taskInstance);
         logger.info("set sub work flow {} task {} state: {}",
                 processInstance.getId(),
                 taskInstance.getId(),
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java
index de571cb657..dfd5a02fb7 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java
@@ -80,7 +80,7 @@ public class SwitchTaskProcessor extends BaseTaskProcessor {
         taskInstance.setHost(NetUtils.getAddr(masterConfig.getListenPort()));
         taskInstance.setState(TaskExecutionStatus.RUNNING_EXECUTION);
         taskInstance.setStartTime(new Date());
-        processService.updateTaskInstance(taskInstance);
+        taskInstanceDao.updateTaskInstance(taskInstance);
 
         if (!this.taskInstance().getState().isFinished()) {
             setSwitchResult();
@@ -104,7 +104,7 @@ public class SwitchTaskProcessor extends BaseTaskProcessor {
     protected boolean pauseTask() {
         this.taskInstance.setState(TaskExecutionStatus.PAUSE);
         this.taskInstance.setEndTime(new Date());
-        processService.saveTaskInstance(taskInstance);
+        taskInstanceDao.upsertTaskInstance(taskInstance);
         return true;
     }
 
@@ -112,7 +112,7 @@ public class SwitchTaskProcessor extends BaseTaskProcessor {
     protected boolean killTask() {
         this.taskInstance.setState(TaskExecutionStatus.KILL);
         this.taskInstance.setEndTime(new Date());
-        processService.saveTaskInstance(taskInstance);
+        taskInstanceDao.upsertTaskInstance(taskInstance);
         return true;
     }
 
@@ -127,7 +127,7 @@ public class SwitchTaskProcessor extends BaseTaskProcessor {
     }
 
     private boolean setSwitchResult() {
-        List<TaskInstance> taskInstances = processService.findValidTaskListByProcessId(
+        List<TaskInstance> taskInstances = taskInstanceDao.findValidTaskListByProcessId(
                 taskInstance.getProcessInstanceId(), processInstance.getTestFlag());
         Map<String, TaskExecutionStatus> completeTaskList = new HashMap<>();
         for (TaskInstance task : taskInstances) {
@@ -188,7 +188,7 @@ public class SwitchTaskProcessor extends BaseTaskProcessor {
                 (conditionResult == DependResult.SUCCESS) ? TaskExecutionStatus.SUCCESS : TaskExecutionStatus.FAILURE;
         taskInstance.setEndTime(new Date());
         taskInstance.setState(status);
-        processService.updateTaskInstance(taskInstance);
+        taskInstanceDao.updateTaskInstance(taskInstance);
     }
 
     public String setTaskParams(String content, String rgex) {
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java
index cf28a6bc8e..5d6048c208 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java
@@ -23,6 +23,7 @@ import org.apache.dolphinscheduler.common.model.Server;
 import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
 import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
 import org.apache.dolphinscheduler.remote.command.TaskKillRequestCommand;
@@ -75,12 +76,15 @@ public class MasterFailoverService {
 
     private final LogClient logClient;
 
+    private final TaskInstanceDao taskInstanceDao;
+
     public MasterFailoverService(@NonNull RegistryClient registryClient,
                                  @NonNull MasterConfig masterConfig,
                                  @NonNull ProcessService processService,
                                  @NonNull NettyExecutorManager nettyExecutorManager,
                                  @NonNull ProcessInstanceExecCacheManager processInstanceExecCacheManager,
-                                 @NonNull LogClient logClient) {
+                                 @NonNull LogClient logClient,
+                                 @NonNull TaskInstanceDao taskInstanceDao) {
         this.registryClient = registryClient;
         this.masterConfig = masterConfig;
         this.processService = processService;
@@ -88,7 +92,7 @@ public class MasterFailoverService {
         this.localAddress = masterConfig.getMasterAddress();
         this.processInstanceExecCacheManager = processInstanceExecCacheManager;
         this.logClient = logClient;
-
+        this.taskInstanceDao = taskInstanceDao;
     }
 
     /**
@@ -164,7 +168,7 @@ public class MasterFailoverService {
                 processInstance.setProcessDefinition(processDefinition);
                 int processInstanceId = processInstance.getId();
                 List<TaskInstance> taskInstanceList =
-                        processService.findValidTaskListByProcessId(processInstanceId, processInstance.getTestFlag());
+                        taskInstanceDao.findValidTaskListByProcessId(processInstanceId, processInstance.getTestFlag());
                 for (TaskInstance taskInstance : taskInstanceList) {
                     try {
                         LoggerUtils.setTaskInstanceIdMDC(taskInstance.getId());
@@ -249,7 +253,7 @@ public class MasterFailoverService {
         }
 
         taskInstance.setState(TaskExecutionStatus.NEED_FAULT_TOLERANCE);
-        processService.saveTaskInstance(taskInstance);
+        taskInstanceDao.upsertTaskInstance(taskInstance);
     }
 
     private void sendKillCommandToWorker(@NonNull TaskInstance taskInstance) {
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/WorkerFailoverService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/WorkerFailoverService.java
index 18de752b19..30522b8e4a 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/WorkerFailoverService.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/WorkerFailoverService.java
@@ -24,6 +24,7 @@ import org.apache.dolphinscheduler.common.enums.StateEventType;
 import org.apache.dolphinscheduler.common.model.Server;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
 import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
 import org.apache.dolphinscheduler.server.master.builder.TaskExecutionContextBuilder;
@@ -73,12 +74,15 @@ public class WorkerFailoverService {
     private final LogClient logClient;
     private final String localAddress;
 
+    private final TaskInstanceDao taskInstanceDao;
+
     public WorkerFailoverService(@NonNull RegistryClient registryClient,
                                  @NonNull MasterConfig masterConfig,
                                  @NonNull ProcessService processService,
                                  @NonNull WorkflowExecuteThreadPool workflowExecuteThreadPool,
                                  @NonNull ProcessInstanceExecCacheManager cacheManager,
-                                 @NonNull LogClient logClient) {
+                                 @NonNull LogClient logClient,
+                                 @NonNull TaskInstanceDao taskInstanceDao) {
         this.registryClient = registryClient;
         this.masterConfig = masterConfig;
         this.processService = processService;
@@ -86,6 +90,7 @@ public class WorkerFailoverService {
         this.cacheManager = cacheManager;
         this.logClient = logClient;
         this.localAddress = masterConfig.getMasterAddress();
+        this.taskInstanceDao = taskInstanceDao;
     }
 
     /**
@@ -183,7 +188,7 @@ public class WorkerFailoverService {
 
         taskInstance.setState(TaskExecutionStatus.NEED_FAULT_TOLERANCE);
         taskInstance.setFlag(Flag.NO);
-        processService.saveTaskInstance(taskInstance);
+        taskInstanceDao.upsertTaskInstance(taskInstance);
 
         TaskStateEvent stateEvent = TaskStateEvent.builder()
                 .processInstanceId(processInstance.getId())
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/DependentExecute.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/DependentExecute.java
index 3932dd693f..a28ee36cb0 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/DependentExecute.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/DependentExecute.java
@@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.server.master.utils;
 import org.apache.dolphinscheduler.common.constants.Constants;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
 import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult;
 import org.apache.dolphinscheduler.plugin.task.api.enums.DependentRelation;
 import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
@@ -48,6 +49,8 @@ public class DependentExecute {
      */
     private final ProcessService processService = SpringApplicationContext.getBean(ProcessService.class);
 
+    private final TaskInstanceDao taskInstanceDao = SpringApplicationContext.getBean(TaskInstanceDao.class);
+
     /**
      * depend item list
      */
@@ -154,7 +157,7 @@ public class DependentExecute {
         DependResult result;
         TaskInstance taskInstance = null;
         List<TaskInstance> taskInstanceList =
-                processService.findValidTaskListByProcessId(processInstance.getId(), testFlag);
+                taskInstanceDao.findValidTaskListByProcessId(processInstance.getId(), testFlag);
 
         for (TaskInstance task : taskInstanceList) {
             if (task.getTaskCode() == taskCode) {
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/BlockingTaskTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/BlockingTaskTest.java
index 1429ec1567..c36f38d8f2 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/BlockingTaskTest.java
+++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/BlockingTaskTest.java
@@ -25,6 +25,8 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.dao.repository.TaskDefinitionDao;
+import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
 import org.apache.dolphinscheduler.plugin.task.api.enums.DependentRelation;
 import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
 import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy;
@@ -62,6 +64,10 @@ public class BlockingTaskTest {
 
     private ProcessService processService;
 
+    private TaskInstanceDao taskInstanceDao;
+
+    private TaskDefinitionDao taskDefinitionDao;
+
     private ProcessInstance processInstance;
 
     private MasterConfig config;
@@ -80,9 +86,14 @@ public class BlockingTaskTest {
 
         // mock process service
         processService = Mockito.mock(ProcessService.class);
-
         Mockito.when(SpringApplicationContext.getBean(ProcessService.class)).thenReturn(processService);
 
+        taskInstanceDao = Mockito.mock(TaskInstanceDao.class);
+        Mockito.when(SpringApplicationContext.getBean(TaskInstanceDao.class)).thenReturn(taskInstanceDao);
+
+        taskDefinitionDao = Mockito.mock(TaskDefinitionDao.class);
+        Mockito.when(SpringApplicationContext.getBean(TaskDefinitionDao.class)).thenReturn(taskDefinitionDao);
+
         // mock process instance
         processInstance = getProcessInstance();
         Mockito.when(processService
@@ -93,7 +104,7 @@ public class BlockingTaskTest {
         taskDefinition.setTimeoutFlag(TimeoutFlag.OPEN);
         taskDefinition.setTimeoutNotifyStrategy(TaskTimeoutStrategy.WARN);
         taskDefinition.setTimeout(0);
-        Mockito.when(processService.findTaskDefinition(1L, 1))
+        Mockito.when(taskDefinitionDao.findTaskDefinition(1L, 1))
                 .thenReturn(taskDefinition);
     }
 
@@ -187,24 +198,23 @@ public class BlockingTaskTest {
                 .submitTask(processInstance, taskInstance))
                 .thenReturn(taskInstance);
 
-        Mockito.when(processService
+        Mockito.when(taskInstanceDao
                 .findTaskInstanceById(taskInstance.getId()))
                 .thenReturn(taskInstance);
 
         // for BlockingTaskExecThread.initTaskParameters
-        Mockito.when(processService
-                .saveTaskInstance(taskInstance))
+        Mockito.when(taskInstanceDao.upsertTaskInstance(taskInstance))
                 .thenReturn(true);
 
         // for BlockingTaskExecThread.updateTaskState
-        Mockito.when(processService
+        Mockito.when(taskInstanceDao
                 .updateTaskInstance(taskInstance))
                 .thenReturn(true);
 
         // for BlockingTaskExecThread.waitTaskQuit
         List<TaskInstance> conditions = getTaskInstanceForValidTaskList(expectResults);
         Mockito.when(
-                processService.findValidTaskListByProcessId(processInstance.getId(), processInstance.getTestFlag()))
+                taskInstanceDao.findValidTaskListByProcessId(processInstance.getId(), processInstance.getTestFlag()))
                 .thenReturn(conditions);
         return taskInstance;
     }
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/ConditionsTaskTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/ConditionsTaskTest.java
index 035a811f76..ae8b6022a0 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/ConditionsTaskTest.java
+++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/ConditionsTaskTest.java
@@ -23,6 +23,8 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.dao.repository.TaskDefinitionDao;
+import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
 import org.apache.dolphinscheduler.plugin.task.api.enums.DependentRelation;
 import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
 import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy;
@@ -61,6 +63,10 @@ public class ConditionsTaskTest {
 
     private ProcessInstance processInstance;
 
+    private TaskInstanceDao taskInstanceDao;
+
+    private TaskDefinitionDao taskDefinitionDao;
+
     @BeforeEach
     public void before() {
         ApplicationContext applicationContext = Mockito.mock(ApplicationContext.class);
@@ -75,6 +81,12 @@ public class ConditionsTaskTest {
         processService = Mockito.mock(ProcessService.class);
         Mockito.when(applicationContext.getBean(ProcessService.class)).thenReturn(processService);
 
+        taskInstanceDao = Mockito.mock(TaskInstanceDao.class);
+        Mockito.when(applicationContext.getBean(TaskInstanceDao.class)).thenReturn(taskInstanceDao);
+
+        taskDefinitionDao = Mockito.mock(TaskDefinitionDao.class);
+        Mockito.when(SpringApplicationContext.getBean(TaskDefinitionDao.class)).thenReturn(taskDefinitionDao);
+
         processInstance = getProcessInstance();
         Mockito.when(processService
                 .findProcessInstanceById(processInstance.getId()))
@@ -84,7 +96,7 @@ public class ConditionsTaskTest {
         taskDefinition.setTimeoutFlag(TimeoutFlag.OPEN);
         taskDefinition.setTimeoutNotifyStrategy(TaskTimeoutStrategy.WARN);
         taskDefinition.setTimeout(0);
-        Mockito.when(processService.findTaskDefinition(1L, 1))
+        Mockito.when(taskDefinitionDao.findTaskDefinition(1L, 1))
                 .thenReturn(taskDefinition);
     }
 
@@ -96,22 +108,21 @@ public class ConditionsTaskTest {
                 .submitTask(processInstance, taskInstance))
                 .thenReturn(taskInstance);
         // for MasterBaseTaskExecThread.call
-        Mockito.when(processService
+        Mockito.when(taskInstanceDao
                 .findTaskInstanceById(taskInstance.getId()))
                 .thenReturn(taskInstance);
         // for ConditionsTaskExecThread.initTaskParameters
-        Mockito.when(processService
-                .saveTaskInstance(taskInstance))
+        Mockito.when(taskInstanceDao.upsertTaskInstance(taskInstance))
                 .thenReturn(true);
         // for ConditionsTaskExecThread.updateTaskState
-        Mockito.when(processService
+        Mockito.when(taskInstanceDao
                 .updateTaskInstance(taskInstance))
                 .thenReturn(true);
 
         // for ConditionsTaskExecThread.waitTaskQuit
         List<TaskInstance> conditions = Stream.of(
                 getTaskInstanceForValidTaskList(expectResult)).collect(Collectors.toList());
-        Mockito.when(processService
+        Mockito.when(taskInstanceDao
                 .findValidTaskListByProcessId(processInstance.getId(), processInstance.getTestFlag()))
                 .thenReturn(conditions);
         return taskInstance;
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/DependentTaskTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/DependentTaskTest.java
index e20e7c5572..0cfed4545c 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/DependentTaskTest.java
+++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/DependentTaskTest.java
@@ -25,6 +25,8 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.dao.repository.TaskDefinitionDao;
+import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
 import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult;
 import org.apache.dolphinscheduler.plugin.task.api.enums.DependentRelation;
 import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
@@ -70,6 +72,10 @@ public class DependentTaskTest {
 
     private ProcessService processService;
 
+    private TaskInstanceDao taskInstanceDao;
+
+    private TaskDefinitionDao taskDefinitionDao;
+
     /**
      * the dependent task to be tested
      * ProcessDefinition  id=1
@@ -95,6 +101,12 @@ public class DependentTaskTest {
         processService = Mockito.mock(ProcessService.class);
         Mockito.when(applicationContext.getBean(ProcessService.class)).thenReturn(processService);
 
+        taskInstanceDao = Mockito.mock(TaskInstanceDao.class);
+        Mockito.when(applicationContext.getBean(TaskInstanceDao.class)).thenReturn(taskInstanceDao);
+
+        taskDefinitionDao = Mockito.mock(TaskDefinitionDao.class);
+        Mockito.when(SpringApplicationContext.getBean(TaskDefinitionDao.class)).thenReturn(taskDefinitionDao);
+
         processInstance = getProcessInstance();
         taskInstance = getTaskInstance();
 
@@ -110,20 +122,19 @@ public class DependentTaskTest {
                 .thenAnswer(i -> taskInstance);
 
         // for DependentTaskExecThread.initTaskParameters
-        Mockito.when(processService
+        Mockito.when(taskInstanceDao
                 .updateTaskInstance(Mockito.any()))
                 .thenReturn(true);
         // for DependentTaskExecThread.updateTaskState
-        Mockito.when(processService
-                .saveTaskInstance(Mockito.any()))
+        Mockito.when(taskInstanceDao.upsertTaskInstance(Mockito.any()))
                 .thenReturn(true);
 
         // for DependentTaskExecThread.waitTaskQuit
-        Mockito.when(processService
+        Mockito.when(taskInstanceDao
                 .findTaskInstanceById(1000))
                 .thenAnswer(i -> taskInstance);
 
-        Mockito.when(processService.findTaskDefinition(TASK_CODE, TASK_VERSION))
+        Mockito.when(taskDefinitionDao.findTaskDefinition(TASK_CODE, TASK_VERSION))
                 .thenReturn(getTaskDefinition());
     }
 
@@ -155,7 +166,7 @@ public class DependentTaskTest {
                 .thenReturn(dependentProcessInstance);
 
         // for DependentExecute.getDependTaskResult
-        Mockito.when(processService
+        Mockito.when(taskInstanceDao
                 .findValidTaskListByProcessId(200, 0))
                 .thenReturn(Stream.of(
                         getTaskInstanceForValidTaskList(2000, TaskExecutionStatus.SUCCESS, DEPEND_TASK_CODE_A,
@@ -177,7 +188,7 @@ public class DependentTaskTest {
                 .thenReturn(dependentProcessInstance);
 
         // for DependentExecute.getDependTaskResult
-        Mockito.when(processService
+        Mockito.when(taskInstanceDao
                 .findValidTaskListByProcessId(200, 0))
                 .thenReturn(Stream.of(
                         getTaskInstanceForValidTaskList(2000, TaskExecutionStatus.FAILURE, DEPEND_TASK_CODE_A,
@@ -228,13 +239,13 @@ public class DependentTaskTest {
                 .thenReturn(processInstance300);
 
         // for DependentExecute.getDependTaskResult
-        Mockito.when(processService
+        Mockito.when(taskInstanceDao
                 .findValidTaskListByProcessId(200, 0))
                 .thenReturn(Stream.of(
                         getTaskInstanceForValidTaskList(2000, TaskExecutionStatus.FAILURE, DEPEND_TASK_CODE_A,
                                 processInstance200))
                         .collect(Collectors.toList()));
-        Mockito.when(processService
+        Mockito.when(taskInstanceDao
                 .findValidTaskListByProcessId(300, 0))
                 .thenReturn(Stream.of(
                         getTaskInstanceForValidTaskList(3000, TaskExecutionStatus.SUCCESS, DEPEND_TASK_CODE_B,
@@ -321,7 +332,7 @@ public class DependentTaskTest {
         // DependentTaskExecThread taskExecThread = new DependentTaskExecThread(taskInstance);
 
         // for DependentExecute.getDependTaskResult
-        Mockito.when(processService
+        Mockito.when(taskInstanceDao
                 .findValidTaskListByProcessId(200, 0))
                 .thenAnswer(i -> {
                     processInstance.setState(WorkflowExecutionStatus.READY_STOP);
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/SubProcessTaskTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/SubProcessTaskTest.java
index 5694988847..4c2fafd63c 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/SubProcessTaskTest.java
+++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/SubProcessTaskTest.java
@@ -24,6 +24,7 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
 import org.apache.dolphinscheduler.plugin.task.api.enums.Direct;
 import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
 import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy;
@@ -55,6 +56,8 @@ public class SubProcessTaskTest {
 
     private ProcessService processService;
 
+    private TaskInstanceDao taskInstanceDao;
+
     private ProcessInstance processInstance;
 
     private MockedStatic<ServerLifeCycleManager> mockedStaticServerLifeCycleManager;
@@ -72,11 +75,14 @@ public class SubProcessTaskTest {
         processService = Mockito.mock(ProcessService.class);
         Mockito.when(SpringApplicationContext.getBean(ProcessService.class)).thenReturn(processService);
 
+        taskInstanceDao = Mockito.mock(TaskInstanceDao.class);
+        Mockito.when(SpringApplicationContext.getBean(TaskInstanceDao.class)).thenReturn(taskInstanceDao);
+
         mockedStaticServerLifeCycleManager = Mockito.mockStatic(ServerLifeCycleManager.class);
         Mockito.when(ServerLifeCycleManager.isStopped()).thenReturn(false);
 
         processInstance = getProcessInstance();
-        Mockito.when(processService
+        Mockito.when(taskInstanceDao
                 .updateTaskInstance(Mockito.any()))
                 .thenReturn(true);
 
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/SwitchTaskTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/SwitchTaskTest.java
index f5cd14decd..9bc05d283d 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/SwitchTaskTest.java
+++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/SwitchTaskTest.java
@@ -24,6 +24,8 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.dao.repository.TaskDefinitionDao;
+import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
 import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy;
 import org.apache.dolphinscheduler.plugin.task.api.model.SwitchResultVo;
 import org.apache.dolphinscheduler.plugin.task.api.parameters.SwitchParameters;
@@ -53,6 +55,10 @@ public class SwitchTaskTest {
 
     private ProcessInstance processInstance;
 
+    private TaskInstanceDao taskInstanceDao;
+
+    private TaskDefinitionDao taskDefinitionDao;
+
     @BeforeEach
     public void before() {
         ApplicationContext applicationContext = Mockito.mock(ApplicationContext.class);
@@ -67,6 +73,12 @@ public class SwitchTaskTest {
         processService = Mockito.mock(ProcessService.class);
         Mockito.when(applicationContext.getBean(ProcessService.class)).thenReturn(processService);
 
+        taskInstanceDao = Mockito.mock(TaskInstanceDao.class);
+        Mockito.when(applicationContext.getBean(TaskInstanceDao.class)).thenReturn(taskInstanceDao);
+
+        taskDefinitionDao = Mockito.mock(TaskDefinitionDao.class);
+        Mockito.when(SpringApplicationContext.getBean(TaskDefinitionDao.class)).thenReturn(taskDefinitionDao);
+
         processInstance = getProcessInstance();
         Mockito.when(processService
                 .findProcessInstanceById(processInstance.getId()))
@@ -78,7 +90,7 @@ public class SwitchTaskTest {
         taskDefinition.setTimeoutFlag(TimeoutFlag.OPEN);
         taskDefinition.setTimeoutNotifyStrategy(TaskTimeoutStrategy.WARN);
         taskDefinition.setTimeout(0);
-        Mockito.when(processService.findTaskDefinition(1L, 1))
+        Mockito.when(taskDefinitionDao.findTaskDefinition(1L, 1))
                 .thenReturn(taskDefinition);
         TaskInstance taskInstance = getTaskInstance(getTaskNode(), processInstance);
 
@@ -87,15 +99,14 @@ public class SwitchTaskTest {
                 .submitTask(processInstance, taskInstance))
                 .thenReturn(taskInstance);
         // for MasterBaseTaskExecThread.call
-        Mockito.when(processService
+        Mockito.when(taskInstanceDao
                 .findTaskInstanceById(taskInstance.getId()))
                 .thenReturn(taskInstance);
         // for SwitchTaskExecThread.initTaskParameters
-        Mockito.when(processService
-                .saveTaskInstance(taskInstance))
+        Mockito.when(taskInstanceDao.upsertTaskInstance(taskInstance))
                 .thenReturn(true);
         // for SwitchTaskExecThread.updateTaskState
-        Mockito.when(processService
+        Mockito.when(taskInstanceDao
                 .updateTaskInstance(taskInstance))
                 .thenReturn(true);
 
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java
index bbd4a321cd..0af8f73e7b 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java
+++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java
@@ -27,6 +27,7 @@ import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.dao.entity.Tenant;
+import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
 import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
 import org.apache.dolphinscheduler.server.master.dispatch.ExecutorDispatcher;
 import org.apache.dolphinscheduler.service.process.ProcessService;
@@ -60,6 +61,9 @@ public class TaskPriorityQueueConsumerTest {
     @Autowired
     private ProcessService processService;
 
+    @Autowired
+    private TaskInstanceDao taskInstanceDao;
+
     @Autowired
     private ExecutorDispatcher dispatcher;
 
@@ -236,7 +240,7 @@ public class TaskPriorityQueueConsumerTest {
         taskInstance.setWorkerGroup("default");
         taskInstance.setExecutorId(2);
 
-        Mockito.doReturn(taskInstance).when(processService).findTaskInstanceById(1);
+        Mockito.doReturn(taskInstance).when(taskInstanceDao).findTaskInstanceById(1);
 
         Boolean state = taskPriorityQueueConsumer.taskInstanceIsFinalState(1);
         Assertions.assertNotNull(state);
@@ -264,7 +268,7 @@ public class TaskPriorityQueueConsumerTest {
         processDefinition.setUserId(2);
         taskInstance.setProcessDefine(processDefinition);
 
-        Mockito.doReturn(taskInstance).when(processService).findTaskInstanceById(1);
+        Mockito.doReturn(taskInstance).when(taskInstanceDao).findTaskInstanceById(1);
 
         TaskPriority taskPriority = new TaskPriority(2, 1, 2, 1, 1, "NoWorkGroup");
         taskPriorityQueue.put(taskPriority);
@@ -302,7 +306,7 @@ public class TaskPriorityQueueConsumerTest {
         taskDefinition.setTimeoutFlag(TimeoutFlag.OPEN);
         taskInstance.setTaskDefine(taskDefinition);
 
-        Mockito.doReturn(taskInstance).when(processService).findTaskInstanceById(1);
+        Mockito.doReturn(taskInstance).when(taskInstanceDao).findTaskInstanceById(1);
 
         TaskPriority taskPriority = new TaskPriority();
         taskPriority.setTaskId(1);
@@ -333,7 +337,7 @@ public class TaskPriorityQueueConsumerTest {
         processDefinition.setUserId(2);
         taskInstance.setProcessDefine(processDefinition);
 
-        Mockito.doReturn(taskInstance).when(processService).findTaskInstanceById(1);
+        Mockito.doReturn(taskInstance).when(taskInstanceDao).findTaskInstanceById(1);
 
         TaskPriority taskPriority = new TaskPriority(2, 1, 2, 1, 1, "NoWorkGroup");
         taskPriorityQueue.put(taskPriority);
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java
index 4cf5900f99..f76109325c 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java
+++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java
@@ -31,6 +31,8 @@ import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.dao.entity.Schedule;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
+import org.apache.dolphinscheduler.dao.repository.TaskDefinitionLogDao;
+import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
 import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager;
 import org.apache.dolphinscheduler.service.alert.ProcessAlertManager;
@@ -71,6 +73,9 @@ public class WorkflowExecuteRunnableTest {
 
     private ProcessInstance processInstance;
 
+    private TaskInstanceDao taskInstanceDao;
+
+    private TaskDefinitionLogDao taskDefinitionLogDao;
     private ProcessService processService;
 
     private CommandService commandService;
@@ -96,6 +101,8 @@ public class WorkflowExecuteRunnableTest {
         commandService = Mockito.mock(CommandService.class);
         processInstanceDao = Mockito.mock(ProcessInstanceDao.class);
         processInstance = Mockito.mock(ProcessInstance.class);
+        taskInstanceDao = Mockito.mock(TaskInstanceDao.class);
+        taskDefinitionLogDao = Mockito.mock(TaskDefinitionLogDao.class);
         Map<String, String> cmdParam = new HashMap<>();
         cmdParam.put(CMD_PARAM_COMPLEMENT_DATA_START_DATE, "2020-01-01 00:00:00");
         cmdParam.put(CMD_PARAM_COMPLEMENT_DATA_END_DATE, "2020-01-20 23:00:00");
@@ -111,7 +118,8 @@ public class WorkflowExecuteRunnableTest {
         workflowExecuteThread = Mockito.spy(
                 new WorkflowExecuteRunnable(processInstance, commandService, processService, processInstanceDao,
                         nettyExecutorManager,
-                        processAlertManager, config, stateWheelExecuteThread, curingGlobalParamsService));
+                        processAlertManager, config, stateWheelExecuteThread, curingGlobalParamsService,
+                        taskInstanceDao, taskDefinitionLogDao));
         Field dag = WorkflowExecuteRunnable.class.getDeclaredField("dag");
         dag.setAccessible(true);
         dag.set(workflowExecuteThread, new DAG());
@@ -146,7 +154,7 @@ public class WorkflowExecuteRunnableTest {
             taskInstance4.setId(4);
             Map<String, String> cmdParam = new HashMap<>();
             cmdParam.put(CMD_PARAM_RECOVERY_START_NODE_STRING, "1,2,3,4");
-            Mockito.when(processService.findTaskInstanceByIdList(
+            Mockito.when(taskInstanceDao.findTaskInstanceByIdList(
                     Arrays.asList(taskInstance1.getId(), taskInstance2.getId(), taskInstance3.getId(),
                             taskInstance4.getId())))
                     .thenReturn(Arrays.asList(taskInstance1, taskInstance2, taskInstance3, taskInstance4));
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessorTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessorTest.java
index 0217748fa4..0f96a98f85 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessorTest.java
+++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessorTest.java
@@ -28,6 +28,8 @@ import org.apache.dolphinscheduler.dao.entity.Resource;
 import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.dao.entity.Tenant;
+import org.apache.dolphinscheduler.dao.repository.TaskDefinitionDao;
+import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
 import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
 import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy;
@@ -56,6 +58,10 @@ public class CommonTaskProcessorTest {
 
     private ProcessService processService;
 
+    private TaskInstanceDao taskInstanceDao;
+
+    private TaskDefinitionDao taskDefinitionDao;
+
     private CommonTaskProcessor commonTaskProcessor;
 
     @BeforeEach
@@ -69,6 +75,12 @@ public class CommonTaskProcessorTest {
         processService = Mockito.mock(ProcessService.class);
         Mockito.when(applicationContext.getBean(ProcessService.class)).thenReturn(processService);
 
+        taskInstanceDao = Mockito.mock(TaskInstanceDao.class);
+        Mockito.when(applicationContext.getBean(TaskInstanceDao.class)).thenReturn(taskInstanceDao);
+
+        taskDefinitionDao = Mockito.mock(TaskDefinitionDao.class);
+        Mockito.when(SpringApplicationContext.getBean(TaskDefinitionDao.class)).thenReturn(taskDefinitionDao);
+
         commonTaskProcessor = Mockito.mock(CommonTaskProcessor.class);
         Mockito.when(applicationContext.getBean(CommonTaskProcessor.class)).thenReturn(commonTaskProcessor);
 
@@ -76,7 +88,7 @@ public class CommonTaskProcessorTest {
         taskDefinition.setTimeoutFlag(TimeoutFlag.OPEN);
         taskDefinition.setTimeoutNotifyStrategy(TaskTimeoutStrategy.WARN);
         taskDefinition.setTimeout(0);
-        Mockito.when(processService.findTaskDefinition(1L, 1))
+        Mockito.when(taskDefinitionDao.findTaskDefinition(1L, 1))
                 .thenReturn(taskDefinition);
     }
 
@@ -107,7 +119,7 @@ public class CommonTaskProcessorTest {
         TaskDefinition taskDefinition = new TaskDefinition();
         taskDefinition.setTimeoutFlag(TimeoutFlag.OPEN);
         taskInstance.setTaskDefine(taskDefinition);
-        Mockito.doReturn(taskInstance).when(processService).findTaskInstanceById(1);
+        Mockito.doReturn(taskInstance).when(taskInstanceDao).findTaskInstanceById(1);
         TaskExecutionContext taskExecutionContext = commonTaskProcessor.getTaskExecutionContext(taskInstance);
         Assertions.assertNull(taskExecutionContext);
     }
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java
index f7b41467bd..2e9dc23936 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java
+++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java
@@ -29,6 +29,7 @@ import org.apache.dolphinscheduler.common.model.Server;
 import org.apache.dolphinscheduler.common.utils.NetUtils;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
 import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
 import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
@@ -73,6 +74,9 @@ public class FailoverServiceTest {
     @Mock
     private ProcessService processService;
 
+    @Mock
+    private TaskInstanceDao taskInstanceDao;
+
     @Mock
     private WorkflowExecuteThreadPool workflowExecuteThreadPool;
 
@@ -108,13 +112,14 @@ public class FailoverServiceTest {
         given(masterConfig.getMasterAddress()).willReturn(testMasterHost);
         MasterFailoverService masterFailoverService =
                 new MasterFailoverService(registryClient, masterConfig, processService, nettyExecutorManager,
-                        processInstanceExecCacheManager, logClient);
+                        processInstanceExecCacheManager, logClient, taskInstanceDao);
         WorkerFailoverService workerFailoverService = new WorkerFailoverService(registryClient,
                 masterConfig,
                 processService,
                 workflowExecuteThreadPool,
                 cacheManager,
-                logClient);
+                logClient,
+                taskInstanceDao);
 
         failoverService = new FailoverService(masterFailoverService, workerFailoverService);
 
@@ -150,7 +155,7 @@ public class FailoverServiceTest {
         given(processService.queryNeedFailoverProcessInstances(Mockito.anyString()))
                 .willReturn(Arrays.asList(processInstance));
         doNothing().when(processService).processNeedFailoverProcessInstances(Mockito.any(ProcessInstance.class));
-        given(processService.findValidTaskListByProcessId(Mockito.anyInt(), Mockito.anyInt()))
+        given(taskInstanceDao.findValidTaskListByProcessId(Mockito.anyInt(), Mockito.anyInt()))
                 .willReturn(Lists.newArrayList(masterTaskInstance, workerTaskInstance));
 
         Thread.sleep(1000);
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index 6f85ed3e0b..76a6e6ea52 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -35,7 +35,6 @@ import org.apache.dolphinscheduler.dao.entity.Environment;
 import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
 import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
-import org.apache.dolphinscheduler.dao.entity.ProcessInstanceMap;
 import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
 import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog;
 import org.apache.dolphinscheduler.dao.entity.Project;
@@ -70,8 +69,6 @@ public interface ProcessService {
 
     Optional<ProcessInstance> findProcessInstanceDetailById(int processId);
 
-    List<TaskDefinition> getTaskNodeListByDefinition(long defineCode);
-
     ProcessInstance findProcessInstanceById(int processId);
 
     ProcessDefinition findProcessDefineById(int processDefinitionId);
@@ -104,36 +101,12 @@ public interface ProcessService {
 
     void createSubWorkProcess(ProcessInstance parentProcessInstance, TaskInstance task);
 
-    TaskInstance submitTaskInstanceToDB(TaskInstance taskInstance, ProcessInstance processInstance);
-
-    TaskExecutionStatus getSubmitTaskState(TaskInstance taskInstance, ProcessInstance processInstance);
-
-    boolean saveTaskInstance(TaskInstance taskInstance);
-
-    boolean createTaskInstance(TaskInstance taskInstance);
-
-    boolean updateTaskInstance(TaskInstance taskInstance);
-
-    TaskInstance findTaskInstanceById(Integer taskId);
-
-    List<TaskInstance> findTaskInstanceByIdList(List<Integer> idList);
-
     void packageTaskInstance(TaskInstance taskInstance, ProcessInstance processInstance);
 
     void updateTaskDefinitionResources(TaskDefinition taskDefinition);
 
     List<Integer> findTaskIdByInstanceState(int instanceId, TaskExecutionStatus state);
 
-    List<TaskInstance> findValidTaskListByProcessId(Integer processInstanceId, int testFlag);
-
-    List<TaskInstance> findPreviousTaskListByWorkProcessId(Integer processInstanceId);
-
-    int updateWorkProcessInstanceMap(ProcessInstanceMap processInstanceMap);
-
-    int createWorkProcessInstanceMap(ProcessInstanceMap processInstanceMap);
-
-    ProcessInstanceMap findWorkProcessMapByParent(Integer parentWorkProcessId, Integer parentTaskId);
-
     int deleteWorkProcessMapByParentId(int parentWorkProcessId);
 
     ProcessInstance findSubProcessInstance(Integer parentProcessId, Integer parentTaskId);
@@ -157,8 +130,6 @@ public interface ProcessService {
     @Transactional
     void processNeedFailoverProcessInstances(ProcessInstance processInstance);
 
-    List<TaskInstance> queryNeedFailoverTaskInstances(String host);
-
     DataSource findDataSourceById(int id);
 
     ProcessInstance findProcessInstanceByTaskId(int taskId);
@@ -216,12 +187,6 @@ public interface ProcessService {
 
     DagData genDagData(ProcessDefinition processDefinition);
 
-    List<TaskDefinitionLog> genTaskDefineList(List<ProcessTaskRelation> processTaskRelations);
-
-    List<TaskDefinitionLog> getTaskDefineLogListByRelation(List<ProcessTaskRelation> processTaskRelations);
-
-    TaskDefinition findTaskDefinition(long taskCode, int taskDefinitionVersion);
-
     List<ProcessTaskRelation> findRelationByCode(long processDefinitionCode, int processDefinitionVersion);
 
     List<TaskNode> transformTask(List<ProcessTaskRelation> taskRelationList,
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
index dd65097585..ea3f5fd7a1 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
@@ -35,7 +35,6 @@ import org.apache.dolphinscheduler.common.constants.CommandKeyConstants;
 import org.apache.dolphinscheduler.common.constants.Constants;
 import org.apache.dolphinscheduler.common.enums.AuthorizationType;
 import org.apache.dolphinscheduler.common.enums.CommandType;
-import org.apache.dolphinscheduler.common.enums.FailureStrategy;
 import org.apache.dolphinscheduler.common.enums.Flag;
 import org.apache.dolphinscheduler.common.enums.ReleaseState;
 import org.apache.dolphinscheduler.common.enums.TaskDependType;
@@ -110,6 +109,10 @@ import org.apache.dolphinscheduler.dao.mapper.UdfFuncMapper;
 import org.apache.dolphinscheduler.dao.mapper.UserMapper;
 import org.apache.dolphinscheduler.dao.mapper.WorkFlowLineageMapper;
 import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
+import org.apache.dolphinscheduler.dao.repository.ProcessInstanceMapDao;
+import org.apache.dolphinscheduler.dao.repository.TaskDefinitionDao;
+import org.apache.dolphinscheduler.dao.repository.TaskDefinitionLogDao;
+import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
 import org.apache.dolphinscheduler.dao.utils.DqRuleUtils;
 import org.apache.dolphinscheduler.plugin.task.api.enums.Direct;
 import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
@@ -193,6 +196,18 @@ public class ProcessServiceImpl implements ProcessService {
     @Autowired
     private ProcessInstanceDao processInstanceDao;
 
+    @Autowired
+    private TaskDefinitionDao taskDefinitionDao;
+
+    @Autowired
+    private TaskInstanceDao taskInstanceDao;
+
+    @Autowired
+    private TaskDefinitionLogDao taskDefinitionLogDao;
+
+    @Autowired
+    private ProcessInstanceMapDao processInstanceMapDao;
+
     @Autowired
     private DataSourceMapper dataSourceMapper;
 
@@ -399,32 +414,6 @@ public class ProcessServiceImpl implements ProcessService {
         return Optional.ofNullable(processInstanceMapper.queryDetailById(processId));
     }
 
-    /**
-     * get task node list by definitionId
-     */
-    @Override
-    public List<TaskDefinition> getTaskNodeListByDefinition(long defineCode) {
-        ProcessDefinition processDefinition = processDefineMapper.queryByCode(defineCode);
-        if (processDefinition == null) {
-            logger.error("process define not exists");
-            return Lists.newArrayList();
-        }
-        List<ProcessTaskRelationLog> processTaskRelations = processTaskRelationLogMapper
-                .queryByProcessCodeAndVersion(processDefinition.getCode(), processDefinition.getVersion());
-        Set<TaskDefinition> taskDefinitionSet = new HashSet<>();
-        for (ProcessTaskRelationLog processTaskRelation : processTaskRelations) {
-            if (processTaskRelation.getPostTaskCode() > 0) {
-                taskDefinitionSet.add(new TaskDefinition(processTaskRelation.getPostTaskCode(),
-                        processTaskRelation.getPostTaskVersion()));
-            }
-        }
-        if (taskDefinitionSet.isEmpty()) {
-            return Lists.newArrayList();
-        }
-        List<TaskDefinitionLog> taskDefinitionLogs = taskDefinitionLogMapper.queryByTaskDefinitions(taskDefinitionSet);
-        return Lists.newArrayList(taskDefinitionLogs);
-    }
-
     /**
      * find process instance by id
      *
@@ -516,7 +505,7 @@ public class ProcessServiceImpl implements ProcessService {
     public void removeTaskLogFile(Integer processInstanceId) {
         ProcessInstance processInstance = processInstanceMapper.selectById(processInstanceId);
         List<TaskInstance> taskInstanceList =
-                findValidTaskListByProcessId(processInstanceId, processInstance.getTestFlag());
+                taskInstanceDao.findValidTaskListByProcessId(processInstanceId, processInstance.getTestFlag());
         if (CollectionUtils.isEmpty(taskInstanceList)) {
             return;
         }
@@ -538,7 +527,7 @@ public class ProcessServiceImpl implements ProcessService {
     public void deleteWorkTaskInstanceByProcessInstanceId(int processInstanceId) {
         ProcessInstance processInstance = processInstanceMapper.selectById(processInstanceId);
         List<TaskInstance> taskInstanceList =
-                findValidTaskListByProcessId(processInstanceId, processInstance.getTestFlag());
+                taskInstanceDao.findValidTaskListByProcessId(processInstanceId, processInstance.getTestFlag());
         if (CollectionUtils.isEmpty(taskInstanceList)) {
             return;
         }
@@ -560,7 +549,7 @@ public class ProcessServiceImpl implements ProcessService {
      */
     @Override
     public void recurseFindSubProcess(long parentCode, List<Long> ids) {
-        List<TaskDefinition> taskNodeList = this.getTaskNodeListByDefinition(parentCode);
+        List<TaskDefinition> taskNodeList = taskDefinitionDao.getTaskDefinitionListByDefinition(parentCode);
 
         if (taskNodeList != null && !taskNodeList.isEmpty()) {
 
@@ -862,7 +851,7 @@ public class ProcessServiceImpl implements ProcessService {
                 failedList.addAll(killedList);
                 failedList.addAll(toleranceList);
                 for (Integer taskId : failedList) {
-                    initTaskInstance(this.findTaskInstanceById(taskId));
+                    initTaskInstance(taskInstanceDao.findTaskInstanceById(taskId));
                 }
                 cmdParam.put(CommandKeyConstants.CMD_PARAM_RECOVERY_START_NODE_STRING,
                         String.join(Constants.COMMA, convertIntListToString(failedList)));
@@ -880,7 +869,7 @@ public class ProcessServiceImpl implements ProcessService {
                         TaskExecutionStatus.KILL);
                 for (Integer taskId : stopNodeList) {
                     // initialize the pause state
-                    initTaskInstance(this.findTaskInstanceById(taskId));
+                    initTaskInstance(taskInstanceDao.findTaskInstanceById(taskId));
                 }
                 cmdParam.put(CommandKeyConstants.CMD_PARAM_RECOVERY_START_NODE_STRING,
                         String.join(Constants.COMMA, convertIntListToString(stopNodeList)));
@@ -897,10 +886,11 @@ public class ProcessServiceImpl implements ProcessService {
                 // delete all the valid tasks when complement data if id is not null
                 if (processInstance.getId() != null) {
                     List<TaskInstance> taskInstanceList =
-                            this.findValidTaskListByProcessId(processInstance.getId(), processInstance.getTestFlag());
+                            taskInstanceDao.findValidTaskListByProcessId(processInstance.getId(),
+                                    processInstance.getTestFlag());
                     for (TaskInstance taskInstance : taskInstanceList) {
                         taskInstance.setFlag(Flag.NO);
-                        this.updateTaskInstance(taskInstance);
+                        taskInstanceDao.updateTaskInstance(taskInstance);
                     }
                 }
                 break;
@@ -912,10 +902,11 @@ public class ProcessServiceImpl implements ProcessService {
                 }
                 // delete all the valid tasks when repeat running
                 List<TaskInstance> validTaskList =
-                        findValidTaskListByProcessId(processInstance.getId(), processInstance.getTestFlag());
+                        taskInstanceDao.findValidTaskListByProcessId(processInstance.getId(),
+                                processInstance.getTestFlag());
                 for (TaskInstance taskInstance : validTaskList) {
                     taskInstance.setFlag(Flag.NO);
-                    updateTaskInstance(taskInstance);
+                    taskInstanceDao.updateTaskInstance(taskInstance);
                 }
                 processInstance.setStartTime(new Date());
                 processInstance.setRestartTime(processInstance.getStartTime());
@@ -1067,7 +1058,7 @@ public class ProcessServiceImpl implements ProcessService {
         // update sub process id to process map table
         processInstanceMap.setProcessInstanceId(subProcessInstance.getId());
 
-        this.updateWorkProcessInstanceMap(processInstanceMap);
+        processInstanceMapDao.updateWorkProcessInstanceMap(processInstanceMap);
     }
 
     /**
@@ -1129,11 +1120,11 @@ public class ProcessServiceImpl implements ProcessService {
         if (!taskInstance.isSubProcess()
                 && (taskInstance.getState().isKill() || taskInstance.getState().isFailure())) {
             taskInstance.setFlag(Flag.NO);
-            updateTaskInstance(taskInstance);
+            taskInstanceDao.updateTaskInstance(taskInstance);
             return;
         }
         taskInstance.setState(TaskExecutionStatus.SUBMITTED_SUCCESS);
-        updateTaskInstance(taskInstance);
+        taskInstanceDao.updateTaskInstance(taskInstance);
     }
 
     /**
@@ -1183,7 +1174,7 @@ public class ProcessServiceImpl implements ProcessService {
                 taskInstance.getProcessInstanceId(),
                 processInstance.getState());
         // submit to db
-        TaskInstance task = submitTaskInstanceToDB(taskInstance, processInstance);
+        TaskInstance task = taskInstanceDao.submitTaskInstanceToDB(taskInstance, processInstance);
         if (task == null) {
             logger.error("Save taskInstance to db error, task name:{}, process id:{} state: {} ",
                     taskInstance.getName(),
@@ -1227,7 +1218,7 @@ public class ProcessServiceImpl implements ProcessService {
             processMap = findPreviousTaskProcessMap(parentInstance, parentTask);
             if (processMap != null) {
                 processMap.setParentTaskInstanceId(parentTask.getId());
-                updateWorkProcessInstanceMap(processMap);
+                processInstanceMapDao.updateWorkProcessInstanceMap(processMap);
                 return processMap;
             }
         }
@@ -1235,7 +1226,7 @@ public class ProcessServiceImpl implements ProcessService {
         processMap = new ProcessInstanceMap();
         processMap.setParentProcessInstanceId(parentInstance.getId());
         processMap.setParentTaskInstanceId(parentTask.getId());
-        createWorkProcessInstanceMap(processMap);
+        processInstanceMapDao.createWorkProcessInstanceMap(processMap);
         return processMap;
     }
 
@@ -1250,11 +1241,13 @@ public class ProcessServiceImpl implements ProcessService {
                                                           TaskInstance parentTask) {
 
         Integer preTaskId = 0;
-        List<TaskInstance> preTaskList = this.findPreviousTaskListByWorkProcessId(parentProcessInstance.getId());
+        List<TaskInstance> preTaskList =
+                taskInstanceDao.findPreviousTaskListByWorkProcessId(parentProcessInstance.getId());
         for (TaskInstance task : preTaskList) {
             if (task.getName().equals(parentTask.getName())) {
                 preTaskId = task.getId();
-                ProcessInstanceMap map = findWorkProcessMapByParent(parentProcessInstance.getId(), preTaskId);
+                ProcessInstanceMap map =
+                        processInstanceMapDao.findWorkProcessMapByParent(parentProcessInstance.getId(), preTaskId);
                 if (map != null) {
                     return map;
                 }
@@ -1277,7 +1270,8 @@ public class ProcessServiceImpl implements ProcessService {
             return;
         }
         // check create sub work flow firstly
-        ProcessInstanceMap instanceMap = findWorkProcessMapByParent(parentProcessInstance.getId(), task.getId());
+        ProcessInstanceMap instanceMap =
+                processInstanceMapDao.findWorkProcessMapByParent(parentProcessInstance.getId(), task.getId());
         if (null != instanceMap
                 && CommandType.RECOVER_TOLERANCE_FAULT_PROCESS == parentProcessInstance.getCommandType()) {
             // recover failover tolerance would not create a new command when the sub command already have been created
@@ -1347,165 +1341,6 @@ public class ProcessServiceImpl implements ProcessService {
         }
     }
 
-    /**
-     * submit task to mysql
-     *
-     * @param taskInstance    taskInstance
-     * @param processInstance processInstance
-     * @return task instance
-     */
-    @Override
-    public TaskInstance submitTaskInstanceToDB(TaskInstance taskInstance, ProcessInstance processInstance) {
-        WorkflowExecutionStatus processInstanceState = processInstance.getState();
-        if (processInstanceState.isFinished() || processInstanceState == WorkflowExecutionStatus.READY_STOP) {
-            logger.warn("processInstance: {} state was: {}, skip submit this task, taskCode: {}",
-                    processInstance.getId(),
-                    processInstanceState,
-                    taskInstance.getTaskCode());
-            return null;
-        }
-        if (processInstanceState == WorkflowExecutionStatus.READY_PAUSE) {
-            taskInstance.setState(TaskExecutionStatus.PAUSE);
-        }
-        taskInstance.setExecutorId(processInstance.getExecutorId());
-        taskInstance.setState(getSubmitTaskState(taskInstance, processInstance));
-        if (taskInstance.getSubmitTime() == null) {
-            taskInstance.setSubmitTime(new Date());
-        }
-        if (taskInstance.getFirstSubmitTime() == null) {
-            taskInstance.setFirstSubmitTime(taskInstance.getSubmitTime());
-        }
-        boolean saveResult = saveTaskInstance(taskInstance);
-        if (!saveResult) {
-            return null;
-        }
-        return taskInstance;
-    }
-
-    /**
-     * get submit task instance state by the work process state
-     * cannot modify the task state when running/kill/submit success, or this
-     * task instance is already exists in task queue .
-     * return pause if work process state is ready pause
-     * return stop if work process state is ready stop
-     * if all of above are not satisfied, return submit success
-     *
-     * @param taskInstance    taskInstance
-     * @param processInstance processInstance
-     * @return process instance state
-     */
-    @Override
-    public TaskExecutionStatus getSubmitTaskState(TaskInstance taskInstance, ProcessInstance processInstance) {
-        TaskExecutionStatus state = taskInstance.getState();
-        // running, delayed or killed
-        // the task already exists in task queue
-        // return state
-        if (state == TaskExecutionStatus.RUNNING_EXECUTION
-                || state == TaskExecutionStatus.DELAY_EXECUTION
-                || state == TaskExecutionStatus.KILL
-                || state == TaskExecutionStatus.DISPATCH) {
-            return state;
-        }
-        // return pasue /stop if process instance state is ready pause / stop
-        // or return submit success
-        if (processInstance.getState() == WorkflowExecutionStatus.READY_PAUSE) {
-            state = TaskExecutionStatus.PAUSE;
-        } else if (processInstance.getState() == WorkflowExecutionStatus.READY_STOP
-                || !checkProcessStrategy(taskInstance, processInstance)) {
-            state = TaskExecutionStatus.KILL;
-        } else {
-            state = TaskExecutionStatus.SUBMITTED_SUCCESS;
-        }
-        return state;
-    }
-
-    /**
-     * check process instance strategy
-     *
-     * @param taskInstance taskInstance
-     * @return check strategy result
-     */
-    private boolean checkProcessStrategy(TaskInstance taskInstance, ProcessInstance processInstance) {
-        FailureStrategy failureStrategy = processInstance.getFailureStrategy();
-        if (failureStrategy == FailureStrategy.CONTINUE) {
-            return true;
-        }
-        List<TaskInstance> taskInstances =
-                this.findValidTaskListByProcessId(taskInstance.getProcessInstanceId(), taskInstance.getTestFlag());
-
-        for (TaskInstance task : taskInstances) {
-            if (task.getState() == TaskExecutionStatus.FAILURE
-                    && task.getRetryTimes() >= task.getMaxRetryTimes()) {
-                return false;
-            }
-        }
-        return true;
-    }
-
-    /**
-     * insert or update task instance
-     *
-     * @param taskInstance taskInstance
-     * @return save task instance result
-     */
-    @Override
-    public boolean saveTaskInstance(TaskInstance taskInstance) {
-        if (taskInstance.getId() != null) {
-            return updateTaskInstance(taskInstance);
-        } else {
-            return createTaskInstance(taskInstance);
-        }
-    }
-
-    /**
-     * insert task instance
-     *
-     * @param taskInstance taskInstance
-     * @return create task instance result
-     */
-    @Override
-    public boolean createTaskInstance(TaskInstance taskInstance) {
-        int count = taskInstanceMapper.insert(taskInstance);
-        return count > 0;
-    }
-
-    /**
-     * update task instance
-     *
-     * @param taskInstance taskInstance
-     * @return update task instance result
-     */
-    @Override
-    public boolean updateTaskInstance(TaskInstance taskInstance) {
-        int count = taskInstanceMapper.updateById(taskInstance);
-        return count > 0;
-    }
-
-    /**
-     * find task instance by id
-     *
-     * @param taskId task id
-     * @return task instance
-     */
-    @Override
-    public TaskInstance findTaskInstanceById(Integer taskId) {
-        return taskInstanceMapper.selectById(taskId);
-    }
-
-    /**
-     * find task instance list by id list
-     *
-     * @param idList task id list
-     * @return task instance list
-     */
-    @Override
-    public List<TaskInstance> findTaskInstanceByIdList(List<Integer> idList) {
-        if (CollectionUtils.isEmpty(idList)) {
-            return new ArrayList<>();
-        }
-        return taskInstanceMapper.selectBatchIds(idList);
-    }
-
     /**
      * package task instance
      */
@@ -1514,7 +1349,7 @@ public class ProcessServiceImpl implements ProcessService {
         taskInstance.setProcessInstance(processInstance);
         taskInstance.setProcessDefine(processInstance.getProcessDefinition());
         taskInstance.setProcessInstancePriority(processInstance.getProcessInstancePriority());
-        TaskDefinition taskDefinition = this.findTaskDefinition(
+        TaskDefinition taskDefinition = taskDefinitionDao.findTaskDefinition(
                 taskInstance.getTaskCode(),
                 taskInstance.getTaskDefinitionVersion());
         this.updateTaskDefinitionResources(taskDefinition);
@@ -1603,69 +1438,6 @@ public class ProcessServiceImpl implements ProcessService {
         return taskInstanceMapper.queryTaskByProcessIdAndState(instanceId, state.getCode());
     }
 
-    /**
-     * find valid task list by process definition id
-     *
-     * @param processInstanceId processInstanceId
-     * @param testFlag          testFlag
-     * @return task instance list
-     */
-    @Override
-    public List<TaskInstance> findValidTaskListByProcessId(Integer processInstanceId, int testFlag) {
-        return taskInstanceMapper.findValidTaskListByProcessId(processInstanceId, Flag.YES, testFlag);
-    }
-
-    /**
-     * find previous task list by work process id
-     *
-     * @param processInstanceId processInstanceId
-     * @return task instance list
-     */
-    @Override
-    public List<TaskInstance> findPreviousTaskListByWorkProcessId(Integer processInstanceId) {
-        ProcessInstance processInstance = processInstanceMapper.selectById(processInstanceId);
-        return taskInstanceMapper.findValidTaskListByProcessId(processInstanceId, Flag.NO,
-                processInstance.getTestFlag());
-    }
-
-    /**
-     * update work process instance map
-     *
-     * @param processInstanceMap processInstanceMap
-     * @return update process instance result
-     */
-    @Override
-    public int updateWorkProcessInstanceMap(ProcessInstanceMap processInstanceMap) {
-        return processInstanceMapMapper.updateById(processInstanceMap);
-    }
-
-    /**
-     * create work process instance map
-     *
-     * @param processInstanceMap processInstanceMap
-     * @return create process instance result
-     */
-    @Override
-    public int createWorkProcessInstanceMap(ProcessInstanceMap processInstanceMap) {
-        int count = 0;
-        if (processInstanceMap != null) {
-            return processInstanceMapMapper.insert(processInstanceMap);
-        }
-        return count;
-    }
-
-    /**
-     * find work process map by parent process id and parent task id.
-     *
-     * @param parentWorkProcessId parentWorkProcessId
-     * @param parentTaskId        parentTaskId
-     * @return process instance map
-     */
-    @Override
-    public ProcessInstanceMap findWorkProcessMapByParent(Integer parentWorkProcessId, Integer parentTaskId) {
-        return processInstanceMapMapper.queryByParentId(parentWorkProcessId, parentTaskId);
-    }
-
     /**
      * delete work process map by parent process id
      *
@@ -1861,18 +1633,6 @@ public class ProcessServiceImpl implements ProcessService {
         commandService.createCommand(cmd);
     }
 
-    /**
-     * query all need failover task instances by host
-     *
-     * @param host host
-     * @return task instance list
-     */
-    @Override
-    public List<TaskInstance> queryNeedFailoverTaskInstances(String host) {
-        return taskInstanceMapper.queryByHostAndStatus(host,
-                TaskExecutionStatus.getNeedFailoverWorkflowInstanceState());
-    }
-
     /**
      * find data source by id
      *
@@ -2252,8 +2012,8 @@ public class ProcessServiceImpl implements ProcessService {
 
         if (params != null && CollectionUtils.isNotEmpty(params.getResourceFilesList())) {
             resourceIds = params.getResourceFilesList().stream()
-                    .filter(t -> t.getId() != null)
                     .map(ResourceInfo::getId)
+                    .filter(Objects::nonNull)
                     .collect(toSet());
         }
         if (CollectionUtils.isEmpty(resourceIds)) {
@@ -2477,57 +2237,12 @@ public class ProcessServiceImpl implements ProcessService {
     public DagData genDagData(ProcessDefinition processDefinition) {
         List<ProcessTaskRelation> taskRelations =
                 this.findRelationByCode(processDefinition.getCode(), processDefinition.getVersion());
-        List<TaskDefinitionLog> taskDefinitionLogList = genTaskDefineList(taskRelations);
+        List<TaskDefinitionLog> taskDefinitionLogList = taskDefinitionLogDao.getTaskDefineLogList(taskRelations);
         List<TaskDefinition> taskDefinitions =
                 taskDefinitionLogList.stream().map(t -> (TaskDefinition) t).collect(Collectors.toList());
         return new DagData(processDefinition, taskRelations, taskDefinitions);
     }
 
-    @Override
-    public List<TaskDefinitionLog> genTaskDefineList(List<ProcessTaskRelation> processTaskRelations) {
-        Set<TaskDefinition> taskDefinitionSet = new HashSet<>();
-        for (ProcessTaskRelation processTaskRelation : processTaskRelations) {
-            if (processTaskRelation.getPreTaskCode() > 0) {
-                taskDefinitionSet.add(new TaskDefinition(processTaskRelation.getPreTaskCode(),
-                        processTaskRelation.getPreTaskVersion()));
-            }
-            if (processTaskRelation.getPostTaskCode() > 0) {
-                taskDefinitionSet.add(new TaskDefinition(processTaskRelation.getPostTaskCode(),
-                        processTaskRelation.getPostTaskVersion()));
-            }
-        }
-        if (taskDefinitionSet.isEmpty()) {
-            return Lists.newArrayList();
-        }
-        return taskDefinitionLogMapper.queryByTaskDefinitions(taskDefinitionSet);
-    }
-
-    @Override
-    public List<TaskDefinitionLog> getTaskDefineLogListByRelation(List<ProcessTaskRelation> processTaskRelations) {
-        List<TaskDefinitionLog> taskDefinitionLogs = new ArrayList<>();
-        Map<Long, Integer> taskCodeVersionMap = new HashMap<>();
-        for (ProcessTaskRelation processTaskRelation : processTaskRelations) {
-            if (processTaskRelation.getPreTaskCode() > 0) {
-                taskCodeVersionMap.put(processTaskRelation.getPreTaskCode(), processTaskRelation.getPreTaskVersion());
-            }
-            if (processTaskRelation.getPostTaskCode() > 0) {
-                taskCodeVersionMap.put(processTaskRelation.getPostTaskCode(), processTaskRelation.getPostTaskVersion());
-            }
-        }
-        taskCodeVersionMap.forEach((code, version) -> {
-            taskDefinitionLogs.add((TaskDefinitionLog) this.findTaskDefinition(code, version));
-        });
-        return taskDefinitionLogs;
-    }
-
-    /**
-     * find task definition by code and version
-     */
-    @Override
-    public TaskDefinition findTaskDefinition(long taskCode, int taskDefinitionVersion) {
-        return taskDefinitionLogMapper.queryByDefinitionCodeAndVersion(taskCode, taskDefinitionVersion);
-    }
-
     /**
      * find process task relation list by process
      */
@@ -2571,7 +2286,7 @@ public class ProcessServiceImpl implements ProcessService {
             });
         }
         if (CollectionUtils.isEmpty(taskDefinitionLogs)) {
-            taskDefinitionLogs = genTaskDefineList(taskRelationList);
+            taskDefinitionLogs = taskDefinitionLogDao.getTaskDefineLogList(taskRelationList);
         }
         Map<Long, TaskDefinitionLog> taskDefinitionLogMap = taskDefinitionLogs.stream()
                 .collect(Collectors.toMap(TaskDefinitionLog::getCode, taskDefinitionLog -> taskDefinitionLog));
@@ -2626,7 +2341,7 @@ public class ProcessServiceImpl implements ProcessService {
             return processTaskMap;
         }
         ProcessInstance fatherProcess = this.findProcessInstanceById(processInstanceMap.getParentProcessInstanceId());
-        TaskInstance fatherTask = this.findTaskInstanceById(processInstanceMap.getParentTaskInstanceId());
+        TaskInstance fatherTask = taskInstanceDao.findTaskInstanceById(processInstanceMap.getParentTaskInstanceId());
 
         if (fatherProcess != null) {
             processTaskMap.put(fatherProcess, fatherTask);
@@ -2936,12 +2651,13 @@ public class ProcessServiceImpl implements ProcessService {
         if (processInstance != null
                 && (processInstance.getState().isFailure() || processInstance.getState().isStop())) {
             List<TaskInstance> validTaskList =
-                    findValidTaskListByProcessId(processInstance.getId(), processInstance.getTestFlag());
+                    taskInstanceDao.findValidTaskListByProcessId(processInstance.getId(),
+                            processInstance.getTestFlag());
             List<Long> instanceTaskCodeList =
                     validTaskList.stream().map(TaskInstance::getTaskCode).collect(Collectors.toList());
             List<ProcessTaskRelation> taskRelations = findRelationByCode(processInstance.getProcessDefinitionCode(),
                     processInstance.getProcessDefinitionVersion());
-            List<TaskDefinitionLog> taskDefinitionLogs = genTaskDefineList(taskRelations);
+            List<TaskDefinitionLog> taskDefinitionLogs = taskDefinitionLogDao.getTaskDefineLogList(taskRelations);
             List<Long> definiteTaskCodeList =
                     taskDefinitionLogs.stream().filter(definitionLog -> definitionLog.getFlag() == Flag.YES)
                             .map(TaskDefinitionLog::getCode).collect(Collectors.toList());
diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
index 96c153ec10..5ad590a24f 100644
--- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
+++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
@@ -66,6 +66,9 @@ import org.apache.dolphinscheduler.dao.mapper.TaskGroupQueueMapper;
 import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
 import org.apache.dolphinscheduler.dao.mapper.UserMapper;
 import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
+import org.apache.dolphinscheduler.dao.repository.TaskDefinitionDao;
+import org.apache.dolphinscheduler.dao.repository.TaskDefinitionLogDao;
+import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
 import org.apache.dolphinscheduler.plugin.task.api.enums.dp.DqTaskState;
 import org.apache.dolphinscheduler.plugin.task.api.enums.dp.ExecuteSqlType;
 import org.apache.dolphinscheduler.plugin.task.api.enums.dp.InputType;
@@ -121,6 +124,16 @@ public class ProcessServiceTest {
     private ProcessInstanceMapper processInstanceMapper;
     @Mock
     private ProcessInstanceDao processInstanceDao;
+
+    @Mock
+    private TaskInstanceDao taskInstanceDao;
+
+    @Mock
+    private TaskDefinitionLogDao taskDefinitionLogDao;
+
+    @Mock
+    private TaskDefinitionDao taskDefinitionDao;
+
     @Mock
     private UserMapper userMapper;
     @Mock
@@ -652,7 +665,7 @@ public class ProcessServiceTest {
         taskDefinitionLogs.add(taskDefinition);
         taskDefinitionLogs.add(td2);
 
-        Mockito.when(taskDefinitionLogMapper.queryByTaskDefinitions(any())).thenReturn(taskDefinitionLogs);
+        Mockito.when(taskDefinitionLogDao.getTaskDefineLogList(any())).thenReturn(taskDefinitionLogs);
         Mockito.when(processTaskRelationLogMapper.queryByProcessCodeAndVersion(Mockito.anyLong(), Mockito.anyInt()))
                 .thenReturn(list);
 
@@ -729,23 +742,6 @@ public class ProcessServiceTest {
 
     }
 
-    @Test
-    public void testFindTaskInstanceByIdList() {
-        List<Integer> emptyList = new ArrayList<>();
-        Mockito.when(taskInstanceMapper.selectBatchIds(emptyList)).thenReturn(new ArrayList<>());
-        Assertions.assertEquals(0, processService.findTaskInstanceByIdList(emptyList).size());
-
-        List<Integer> idList = Collections.singletonList(1);
-        TaskInstance instance = new TaskInstance();
-        instance.setId(1);
-
-        Mockito.when(taskInstanceMapper.selectBatchIds(idList)).thenReturn(Collections.singletonList(instance));
-        List<TaskInstance> taskInstanceByIdList = processService.findTaskInstanceByIdList(idList);
-
-        Assertions.assertEquals(1, taskInstanceByIdList.size());
-        Assertions.assertEquals(instance.getId(), taskInstanceByIdList.get(0).getId());
-    }
-
     @Test
     public void testFindLastManualProcessInterval() {
         long definitionCode = 1L;