You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ca...@apache.org on 2022/10/26 03:52:05 UTC
[dolphinscheduler] branch dev updated: [Refactor]migrate some task dao functions from ProcessServiceImpl to dao modules. (#12505)
This is an automated email from the ASF dual-hosted git repository.
caishunfeng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 04aa125ba2 [Refactor]migrate some task dao functions from ProcessServiceImpl to dao modules. (#12505)
04aa125ba2 is described below
commit 04aa125ba2b7762ccee5e9ee80ad14f3fe83caf3
Author: Yann Ann <xi...@gmail.com>
AuthorDate: Wed Oct 26 11:51:58 2022 +0800
[Refactor]migrate some task dao functions from ProcessServiceImpl to dao modules. (#12505)
* migrate some task DAO to dao modules.
* Update dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessInstanceMapDaoImpl.java
Co-authored-by: caishunfeng <ca...@gmail.com>
---
.../api/service/impl/LoggerServiceImpl.java | 12 +-
.../service/impl/ProcessDefinitionServiceImpl.java | 27 +-
.../service/impl/ProcessInstanceServiceImpl.java | 8 +-
.../api/service/LoggerServiceTest.java | 16 +-
.../api/service/ProcessDefinitionServiceTest.java | 9 +-
.../api/service/ProcessInstanceServiceTest.java | 12 +-
.../dao/repository/ProcessInstanceMapDao.java | 49 +++
.../dao/repository/TaskDefinitionDao.java | 44 +++
.../dao/repository/TaskDefinitionLogDao.java | 44 +++
.../dao/repository/TaskInstanceDao.java | 90 +++++
.../repository/impl/ProcessInstanceMapDaoImpl.java | 52 +++
.../dao/repository/impl/TaskDefinitionDaoImpl.java | 87 +++++
.../repository/impl/TaskDefinitionLogDaoImpl.java | 91 +++++
.../dao/repository/impl/TaskInstanceDaoImpl.java | 170 +++++++++
.../master/consumer/TaskPriorityQueueConsumer.java | 9 +-
.../server/master/event/TaskDelayEventHandler.java | 6 +-
.../master/event/TaskDispatchEventHandler.java | 6 +-
.../master/event/TaskResultEventHandler.java | 6 +-
.../master/event/TaskRunningEventHandler.java | 6 +-
.../processor/TaskExecuteStartProcessor.java | 6 +-
.../master/runner/MasterSchedulerBootstrap.java | 12 +-
.../master/runner/StreamTaskExecuteRunnable.java | 12 +-
.../master/runner/WorkflowExecuteRunnable.java | 35 +-
.../master/runner/task/BaseTaskProcessor.java | 6 +-
.../master/runner/task/BlockingTaskProcessor.java | 10 +-
.../master/runner/task/CommonTaskProcessor.java | 4 +-
.../master/runner/task/ConditionTaskProcessor.java | 10 +-
.../master/runner/task/DependentTaskProcessor.java | 8 +-
.../master/runner/task/SubTaskProcessor.java | 4 +-
.../master/runner/task/SwitchTaskProcessor.java | 10 +-
.../master/service/MasterFailoverService.java | 12 +-
.../master/service/WorkerFailoverService.java | 9 +-
.../server/master/utils/DependentExecute.java | 5 +-
.../server/master/BlockingTaskTest.java | 24 +-
.../server/master/ConditionsTaskTest.java | 23 +-
.../server/master/DependentTaskTest.java | 31 +-
.../server/master/SubProcessTaskTest.java | 8 +-
.../server/master/SwitchTaskTest.java | 21 +-
.../consumer/TaskPriorityQueueConsumerTest.java | 12 +-
.../master/runner/WorkflowExecuteRunnableTest.java | 12 +-
.../runner/task/CommonTaskProcessorTest.java | 16 +-
.../server/master/service/FailoverServiceTest.java | 11 +-
.../service/process/ProcessService.java | 35 --
.../service/process/ProcessServiceImpl.java | 378 +++------------------
.../service/process/ProcessServiceTest.java | 32 +-
45 files changed, 972 insertions(+), 518 deletions(-)
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/LoggerServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/LoggerServiceImpl.java
index eecb40cc63..239c9669f1 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/LoggerServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/LoggerServiceImpl.java
@@ -33,9 +33,9 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
+import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.service.log.LogClient;
-import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.commons.lang3.StringUtils;
@@ -60,7 +60,7 @@ public class LoggerServiceImpl extends BaseServiceImpl implements LoggerService
private static final String LOG_HEAD_FORMAT = "[LOG-PATH]: %s, [HOST]: %s%s";
@Autowired
- private ProcessService processService;
+ private TaskInstanceDao taskInstanceDao;
@Autowired
private LogClient logClient;
@@ -86,7 +86,7 @@ public class LoggerServiceImpl extends BaseServiceImpl implements LoggerService
@SuppressWarnings("unchecked")
public Result<ResponseTaskLog> queryLog(int taskInstId, int skipLineNum, int limit) {
- TaskInstance taskInstance = processService.findTaskInstanceById(taskInstId);
+ TaskInstance taskInstance = taskInstanceDao.findTaskInstanceById(taskInstId);
if (taskInstance == null) {
logger.error("Task instance does not exist, taskInstanceId:{}.", taskInstId);
@@ -111,7 +111,7 @@ public class LoggerServiceImpl extends BaseServiceImpl implements LoggerService
*/
@Override
public byte[] getLogBytes(int taskInstId) {
- TaskInstance taskInstance = processService.findTaskInstanceById(taskInstId);
+ TaskInstance taskInstance = taskInstanceDao.findTaskInstanceById(taskInstId);
if (taskInstance == null || StringUtils.isBlank(taskInstance.getHost())) {
throw new ServiceException("task instance is null or host is null");
}
@@ -138,7 +138,7 @@ public class LoggerServiceImpl extends BaseServiceImpl implements LoggerService
return result;
}
// check whether the task instance can be found
- TaskInstance task = processService.findTaskInstanceById(taskInstId);
+ TaskInstance task = taskInstanceDao.findTaskInstanceById(taskInstId);
if (task == null || StringUtils.isBlank(task.getHost())) {
putMsg(result, Status.TASK_INSTANCE_NOT_FOUND);
return result;
@@ -171,7 +171,7 @@ public class LoggerServiceImpl extends BaseServiceImpl implements LoggerService
throw new ServiceException("user has no permission");
}
// check whether the task instance can be found
- TaskInstance task = processService.findTaskInstanceById(taskInstId);
+ TaskInstance task = taskInstanceDao.findTaskInstanceById(taskInstId);
if (task == null || StringUtils.isBlank(task.getHost())) {
throw new ServiceException("task instance is null or host is null");
}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
index ca6ccf7d65..acb05b897e 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
@@ -103,6 +103,7 @@ import org.apache.dolphinscheduler.dao.mapper.TenantMapper;
import org.apache.dolphinscheduler.dao.mapper.UserMapper;
import org.apache.dolphinscheduler.dao.model.PageListingResult;
import org.apache.dolphinscheduler.dao.repository.ProcessDefinitionDao;
+import org.apache.dolphinscheduler.dao.repository.TaskDefinitionLogDao;
import org.apache.dolphinscheduler.plugin.task.api.enums.SqlType;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy;
import org.apache.dolphinscheduler.plugin.task.api.parameters.ParametersNode;
@@ -200,6 +201,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
@Autowired
private ProcessService processService;
+ @Autowired
+ private TaskDefinitionLogDao taskDefinitionLogDao;
+
@Autowired
private ProcessTaskRelationMapper processTaskRelationMapper;
@@ -388,9 +392,10 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
if (insertVersion == 0) {
logger.error("Save process definition error, processCode:{}.", processDefinition.getCode());
throw new ServiceException(Status.CREATE_PROCESS_DEFINITION_ERROR);
- } else
+ } else {
logger.info("Save process definition complete, processCode:{}, processVersion:{}.",
processDefinition.getCode(), insertVersion);
+ }
int insertResult = processService.saveTaskRelation(loginUser, processDefinition.getProjectCode(),
processDefinition.getCode(),
insertVersion, taskRelationList, taskDefinitionLogs, Boolean.TRUE);
@@ -398,9 +403,10 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
logger.error("Save process task relations error, projectCode:{}, processCode:{}, processVersion:{}.",
processDefinition.getProjectCode(), processDefinition.getCode(), insertVersion);
throw new ServiceException(Status.CREATE_PROCESS_TASK_RELATION_ERROR);
- } else
+ } else {
logger.info("Save process task relations complete, projectCode:{}, processCode:{}, processVersion:{}.",
processDefinition.getProjectCode(), processDefinition.getCode(), insertVersion);
+ }
saveOtherRelation(loginUser, processDefinition, result, otherParamsJson);
@@ -885,9 +891,10 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
logger.error("Update process definition error, processCode:{}.", processDefinition.getCode());
putMsg(result, Status.UPDATE_PROCESS_DEFINITION_ERROR);
throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR);
- } else
+ } else {
logger.info("Update process definition complete, processCode:{}, processVersion:{}.",
processDefinition.getCode(), insertVersion);
+ }
taskUsedInOtherTaskValid(processDefinition, taskRelationList);
int insertResult = processService.saveTaskRelation(loginUser, processDefinition.getProjectCode(),
@@ -1136,9 +1143,10 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
projectCode, code, schedule.getId());
putMsg(result, Status.OFFLINE_SCHEDULE_ERROR);
throw new ServiceException(Status.OFFLINE_SCHEDULE_ERROR);
- } else
+ } else {
logger.info("Set schedule offline, projectCode:{}, processDefinitionCode:{}, scheduleId:{}",
projectCode, code, schedule.getId());
+ }
schedulerService.deleteSchedule(project.getId(), schedule.getId());
}
}
@@ -1184,8 +1192,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
if (CollectionUtils.isNotEmpty(dagDataSchedules)) {
logger.info("Start download process definition file, processDefinitionCodes:{}.", defineCodeSet);
downloadProcessDefinitionFile(response, dagDataSchedules);
- } else
+ } else {
logger.error("There is no exported process dag data.");
+ }
}
/**
@@ -1856,7 +1865,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
processDefinitions.get(0).getVersion());
// query task definition log
- List<TaskDefinitionLog> taskDefinitionLogsList = processService.genTaskDefineList(processTaskRelations);
+ List<TaskDefinitionLog> taskDefinitionLogsList =
+ taskDefinitionLogDao.getTaskDefineLogList(processTaskRelations);
List<DependentSimplifyDefinition> taskDefinitionList = new ArrayList<>();
for (TaskDefinitionLog taskDefinitionLog : taskDefinitionLogsList) {
@@ -1906,7 +1916,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
List<ProcessInstance> processInstanceList = processInstanceService.queryByProcessDefineCode(code, limit);
processInstanceList.forEach(processInstance -> processInstance
.setDuration(DateUtils.format2Duration(processInstance.getStartTime(), processInstance.getEndTime())));
- List<TaskDefinitionLog> taskDefinitionList = processService.genTaskDefineList(processTaskRelationMapper
+ List<TaskDefinitionLog> taskDefinitionList = taskDefinitionLogDao.getTaskDefineLogList(processTaskRelationMapper
.queryByProcessCode(processDefinition.getProjectCode(), processDefinition.getCode()));
Map<Long, TaskDefinitionLog> taskDefinitionMap = taskDefinitionList.stream()
.collect(Collectors.toMap(TaskDefinitionLog::getCode, taskDefinitionLog -> taskDefinitionLog));
@@ -2144,7 +2154,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
String otherParamsJson = doOtherOperateProcess(loginUser, processDefinition);
if (isCopy) {
logger.info("Copy process definition...");
- List<TaskDefinitionLog> taskDefinitionLogs = processService.genTaskDefineList(processTaskRelations);
+ List<TaskDefinitionLog> taskDefinitionLogs =
+ taskDefinitionLogDao.getTaskDefineLogList(processTaskRelations);
Map<Long, Long> taskCodeMap = new HashMap<>();
for (TaskDefinitionLog taskDefinitionLog : taskDefinitionLogs) {
try {
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
index ce1747ca85..3d350151aa 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
@@ -71,6 +71,7 @@ import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.TenantMapper;
import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
+import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.dao.utils.WorkflowUtils;
import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
@@ -129,6 +130,9 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
@Autowired
ProcessService processService;
+ @Autowired
+ TaskInstanceDao taskInstanceDao;
+
@Autowired
ProcessInstanceMapper processInstanceMapper;
@@ -366,7 +370,7 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
return result;
}
List<TaskInstance> taskInstanceList =
- processService.findValidTaskListByProcessId(processId, processInstance.getTestFlag());
+ taskInstanceDao.findValidTaskListByProcessId(processId, processInstance.getTestFlag());
addDependResultForTaskList(taskInstanceList);
Map<String, Object> resultMap = new HashMap<>();
resultMap.put(PROCESS_INSTANCE_STATE, processInstance.getState().toString());
@@ -444,7 +448,7 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
return result;
}
- TaskInstance taskInstance = processService.findTaskInstanceById(taskId);
+ TaskInstance taskInstance = taskInstanceDao.findTaskInstanceById(taskId);
if (taskInstance == null) {
logger.error("Task instance does not exist, projectCode:{}, taskInstanceId{}.", projectCode, taskId);
putMsg(result, Status.TASK_INSTANCE_NOT_EXISTS, taskId);
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/LoggerServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/LoggerServiceTest.java
index 747903da38..ca34d74730 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/LoggerServiceTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/LoggerServiceTest.java
@@ -31,8 +31,8 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
+import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.service.log.LogClient;
-import org.apache.dolphinscheduler.service.process.ProcessService;
import java.text.MessageFormat;
import java.util.HashMap;
@@ -63,7 +63,7 @@ public class LoggerServiceTest {
private LoggerServiceImpl loggerService;
@Mock
- private ProcessService processService;
+ private TaskInstanceDao taskInstanceDao;
@Mock
private ProjectMapper projectMapper;
@@ -81,7 +81,7 @@ public class LoggerServiceTest {
public void testQueryDataSourceList() {
TaskInstance taskInstance = new TaskInstance();
- Mockito.when(processService.findTaskInstanceById(1)).thenReturn(taskInstance);
+ Mockito.when(taskInstanceDao.findTaskInstanceById(1)).thenReturn(taskInstance);
Result result = loggerService.queryLog(2, 1, 1);
// TASK_INSTANCE_NOT_FOUND
Assertions.assertEquals(Status.TASK_INSTANCE_NOT_FOUND.getCode(), result.getCode().intValue());
@@ -98,7 +98,7 @@ public class LoggerServiceTest {
// SUCCESS
taskInstance.setHost("127.0.0.1:8080");
taskInstance.setLogPath("/temp/log");
- Mockito.when(processService.findTaskInstanceById(1)).thenReturn(taskInstance);
+ Mockito.when(taskInstanceDao.findTaskInstanceById(1)).thenReturn(taskInstance);
result = loggerService.queryLog(1, 1, 1);
Assertions.assertEquals(Status.SUCCESS.getCode(), result.getCode().intValue());
}
@@ -107,7 +107,7 @@ public class LoggerServiceTest {
public void testGetLogBytes() {
TaskInstance taskInstance = new TaskInstance();
- Mockito.when(processService.findTaskInstanceById(1)).thenReturn(taskInstance);
+ Mockito.when(taskInstanceDao.findTaskInstanceById(1)).thenReturn(taskInstance);
// task instance is null
try {
@@ -146,7 +146,7 @@ public class LoggerServiceTest {
Map<String, Object> result = new HashMap<>();
putMsg(result, Status.SUCCESS, projectCode);
TaskInstance taskInstance = new TaskInstance();
- Mockito.when(processService.findTaskInstanceById(1)).thenReturn(taskInstance);
+ Mockito.when(taskInstanceDao.findTaskInstanceById(1)).thenReturn(taskInstance);
TaskDefinition taskDefinition = new TaskDefinition();
taskDefinition.setProjectCode(projectCode);
taskDefinition.setCode(1L);
@@ -156,7 +156,7 @@ public class LoggerServiceTest {
taskInstance.setHost("127.0.0.1:8080");
taskInstance.setLogPath("/temp/log");
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode, VIEW_LOG)).thenReturn(result);
- Mockito.when(processService.findTaskInstanceById(1)).thenReturn(taskInstance);
+ Mockito.when(taskInstanceDao.findTaskInstanceById(1)).thenReturn(taskInstance);
Mockito.when(taskDefinitionMapper.queryByCode(taskInstance.getTaskCode())).thenReturn(taskDefinition);
result = loggerService.queryLog(loginUser, projectCode, 1, 1, 1);
Assertions.assertEquals(Status.SUCCESS.getCode(), ((Status) result.get(Constants.STATUS)).getCode());
@@ -184,7 +184,7 @@ public class LoggerServiceTest {
taskInstance.setLogPath("/temp/log");
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode, DOWNLOAD_LOG))
.thenReturn(result);
- Mockito.when(processService.findTaskInstanceById(1)).thenReturn(taskInstance);
+ Mockito.when(taskInstanceDao.findTaskInstanceById(1)).thenReturn(taskInstance);
Mockito.when(taskDefinitionMapper.queryByCode(taskInstance.getTaskCode())).thenReturn(taskDefinition);
Mockito.when(logClient.getLogBytes(Mockito.anyString(), Mockito.anyInt(), Mockito.anyString()))
.thenReturn(new byte[0]);
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
index 05c9a0b044..31502b917e 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
@@ -28,6 +28,7 @@ import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationCon
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_UPDATE;
import static org.apache.dolphinscheduler.common.constants.Constants.DEFAULT;
import static org.apache.dolphinscheduler.common.constants.Constants.EMPTY_STRING;
+import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.isA;
import org.apache.dolphinscheduler.api.dto.workflow.WorkflowCreateRequest;
@@ -67,6 +68,7 @@ import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
import org.apache.dolphinscheduler.dao.mapper.TenantMapper;
import org.apache.dolphinscheduler.dao.model.PageListingResult;
import org.apache.dolphinscheduler.dao.repository.ProcessDefinitionDao;
+import org.apache.dolphinscheduler.dao.repository.TaskDefinitionLogDao;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.spi.enums.DbType;
@@ -149,6 +151,9 @@ public class ProcessDefinitionServiceTest extends BaseServiceTestTool {
@Mock
private ProcessService processService;
+ @Mock
+ private TaskDefinitionLogDao taskDefinitionLogDao;
+
@Mock
private ProcessInstanceService processInstanceService;
@@ -756,8 +761,10 @@ public class ProcessDefinitionServiceTest extends BaseServiceTestTool {
result.put(Constants.STATUS, Status.SUCCESS);
Mockito.when(projectMapper.queryByCode(1)).thenReturn(project1);
Mockito.when(projectService.checkProjectAndAuth(user, project1, 1, WORKFLOW_TREE_VIEW)).thenReturn(result);
-
Mockito.when(processService.genDagGraph(processDefinition)).thenReturn(new DAG<>());
+ Mockito.when(processTaskRelationMapper.queryByProcessCode(projectCode, processDefinitionCode))
+ .thenReturn(getProcessTaskRelation());
+ Mockito.when(taskDefinitionLogDao.getTaskDefineLogList(any())).thenReturn(new ArrayList<>());
Map<String, Object> taskNotNuLLRes =
processDefinitionService.viewTree(user, processDefinition.getProjectCode(), 46, 10);
Assertions.assertEquals(Status.SUCCESS, taskNotNuLLRes.get(Constants.STATUS));
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java
index feefd1f6f2..f6bbefd8e4 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java
@@ -57,6 +57,7 @@ import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.TenantMapper;
import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
+import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.service.expand.CuringParamsService;
@@ -104,6 +105,9 @@ public class ProcessInstanceServiceTest {
@Mock
ProcessService processService;
+ @Mock
+ TaskInstanceDao taskInstanceDao;
+
@Mock
ProcessInstanceDao processInstanceDao;
@@ -408,7 +412,7 @@ public class ProcessInstanceServiceTest {
when(projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_INSTANCE)).thenReturn(result);
when(processService.findProcessInstanceDetailById(processInstance.getId()))
.thenReturn(Optional.of(processInstance));
- when(processService.findValidTaskListByProcessId(processInstance.getId(), processInstance.getTestFlag()))
+ when(taskInstanceDao.findValidTaskListByProcessId(processInstance.getId(), processInstance.getTestFlag()))
.thenReturn(taskInstanceList);
when(loggerService.queryLog(taskInstance.getId(), 0, 4098)).thenReturn(res);
Map<String, Object> successRes = processInstanceService.queryTaskListByProcessId(loginUser, projectCode, 1);
@@ -451,7 +455,7 @@ public class ProcessInstanceServiceTest {
putMsg(result, Status.SUCCESS, projectCode);
when(projectMapper.queryByCode(projectCode)).thenReturn(project);
when(projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_INSTANCE)).thenReturn(result);
- when(processService.findTaskInstanceById(1)).thenReturn(null);
+ when(taskInstanceDao.findTaskInstanceById(1)).thenReturn(null);
Map<String, Object> taskNullRes =
processInstanceService.querySubProcessInstanceByTaskId(loginUser, projectCode, 1);
Assertions.assertEquals(Status.TASK_INSTANCE_NOT_EXISTS, taskNullRes.get(Constants.STATUS));
@@ -461,7 +465,7 @@ public class ProcessInstanceServiceTest {
taskInstance.setTaskType("HTTP");
taskInstance.setProcessInstanceId(1);
putMsg(result, Status.SUCCESS, projectCode);
- when(processService.findTaskInstanceById(1)).thenReturn(taskInstance);
+ when(taskInstanceDao.findTaskInstanceById(1)).thenReturn(taskInstance);
TaskDefinition taskDefinition = new TaskDefinition();
taskDefinition.setProjectCode(projectCode);
when(taskDefinitionMapper.queryByCode(taskInstance.getTaskCode())).thenReturn(taskDefinition);
@@ -481,7 +485,7 @@ public class ProcessInstanceServiceTest {
subTask.setTaskType("SUB_PROCESS");
subTask.setProcessInstanceId(1);
putMsg(result, Status.SUCCESS, projectCode);
- when(processService.findTaskInstanceById(subTask.getId())).thenReturn(subTask);
+ when(taskInstanceDao.findTaskInstanceById(subTask.getId())).thenReturn(subTask);
when(processService.findSubProcessInstance(subTask.getProcessInstanceId(), subTask.getId())).thenReturn(null);
Map<String, Object> subprocessNotExistRes =
processInstanceService.querySubProcessInstanceByTaskId(loginUser, projectCode, 1);
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceMapDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceMapDao.java
new file mode 100644
index 0000000000..5465c6d40d
--- /dev/null
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceMapDao.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.dao.repository;
+
+import org.apache.dolphinscheduler.dao.entity.ProcessInstanceMap;
+
+/**
+ * Process Instance Map DAO
+ */
+public interface ProcessInstanceMapDao {
+
+ /**
+ * Update process instance map
+ * @param processInstanceMap process instance map
+ * @return result
+ */
+ int updateWorkProcessInstanceMap(ProcessInstanceMap processInstanceMap);
+
+ /**
+ * Create process instance map to DB.
+ * @param processInstanceMap process instance map
+ * @return result
+ */
+ int createWorkProcessInstanceMap(ProcessInstanceMap processInstanceMap);
+
+ /**
+ * find work process map by parent process id and parent task id.
+ * @param parentWorkProcessId parentWorkProcessId
+ * @param parentTaskId parentTaskId
+ * @return process instance map
+ */
+ ProcessInstanceMap findWorkProcessMapByParent(Integer parentWorkProcessId, Integer parentTaskId);
+
+}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskDefinitionDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskDefinitionDao.java
new file mode 100644
index 0000000000..9aaed9e576
--- /dev/null
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskDefinitionDao.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.dao.repository;
+
+import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
+
+import java.util.List;
+
+/**
+ * Task Definition DAO
+ */
+public interface TaskDefinitionDao {
+
+ /**
+ * Get list of task definition by process definition code
+ * @param processDefinitionCode process definition code
+ * @return list of task definition
+ */
+ List<TaskDefinition> getTaskDefinitionListByDefinition(long processDefinitionCode);
+
+ /**
+ * Query task definition by code and version
+ * @param taskCode task code
+ * @param taskDefinitionVersion task definition version
+ * @return task definition
+ */
+ TaskDefinition findTaskDefinition(long taskCode, int taskDefinitionVersion);
+
+}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskDefinitionLogDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskDefinitionLogDao.java
new file mode 100644
index 0000000000..8b6f290f49
--- /dev/null
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskDefinitionLogDao.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.dao.repository;
+
+import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
+import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
+
+import java.util.List;
+
+/**
+ * Task Definition Log DAO
+ */
+public interface TaskDefinitionLogDao {
+
+ /**
+ * Get task definition log list
+ * @param processTaskRelations list of process task relation
+ * @return list of task definition log
+ */
+ List<TaskDefinitionLog> getTaskDefineLogList(List<ProcessTaskRelation> processTaskRelations);
+
+ /**
+ * Query task definition log list by process task relation list
+ * @param processTaskRelations list of task relation
+ * @return list of task definition log
+ */
+ List<TaskDefinitionLog> getTaskDefineLogListByRelation(List<ProcessTaskRelation> processTaskRelations);
+
+}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskInstanceDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskInstanceDao.java
new file mode 100644
index 0000000000..2b2adc9a3f
--- /dev/null
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskInstanceDao.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.dao.repository;
+
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+
+import java.util.List;
+
+/**
+ * Task Instance DAO.
+ * <p>
+ * Persistence operations for {@link TaskInstance}; the behaviour notes on the methods below describe
+ * {@code TaskInstanceDaoImpl}.
+ */
+public interface TaskInstanceDao {
+
+ /**
+ * Update or Insert task instance to DB.
+ * ID is null -> Insert
+ * ID is not null -> Update
+ * @param taskInstance task instance
+ * @return true when the chosen insert or update reported a count greater than 0
+ */
+ boolean upsertTaskInstance(TaskInstance taskInstance);
+
+ /**
+ * Insert task instance to DB.
+ * @param taskInstance task instance
+ * @return true when the mapper insert reported a count greater than 0
+ */
+ boolean insertTaskInstance(TaskInstance taskInstance);
+
+ /**
+ * Update task instance to DB.
+ * @param taskInstance task instance
+ * @return true when the mapper update-by-id reported a count greater than 0
+ */
+ boolean updateTaskInstance(TaskInstance taskInstance);
+
+ /**
+ * Submit a task instance to DB.
+ * <p>
+ * The implementation fills in the executor id, the submit state and the (first) submit time from the
+ * process instance before saving through {@link #upsertTaskInstance(TaskInstance)}.
+ * @param taskInstance task instance
+ * @param processInstance process instance
+ * @return the saved task instance, or null when the process instance is finished or in READY_STOP
+ * (the task is skipped) or when saving fails
+ */
+ TaskInstance submitTaskInstanceToDB(TaskInstance taskInstance, ProcessInstance processInstance);
+
+ /**
+ * Query list of valid task instance by process instance id.
+ * The implementation queries the mapper with {@code Flag.YES}.
+ * @param processInstanceId processInstanceId
+ * @param testFlag test flag
+ * @return list of valid task instance
+ */
+ List<TaskInstance> findValidTaskListByProcessId(Integer processInstanceId, int testFlag);
+
+ /**
+ * find previous task list by work process id.
+ * The implementation loads the process instance to obtain its test flag and queries the mapper with
+ * {@code Flag.NO} - presumably the task instances that are no longer flagged as valid; verify against the mapper.
+ * @param processInstanceId processInstanceId
+ * @return task instance list
+ */
+ List<TaskInstance> findPreviousTaskListByWorkProcessId(Integer processInstanceId);
+
+ /**
+ * find task instance by id
+ * @param taskId task id
+ * @return task instance as returned by the mapper's select-by-id
+ */
+ TaskInstance findTaskInstanceById(Integer taskId);
+
+ /**
+ * find task instance list by id list
+ * @param idList task id list
+ * @return task instance list; a new empty list when {@code idList} is empty
+ */
+ List<TaskInstance> findTaskInstanceByIdList(List<Integer> idList);
+
+}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessInstanceMapDaoImpl.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessInstanceMapDaoImpl.java
new file mode 100644
index 0000000000..94dbc075bd
--- /dev/null
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessInstanceMapDaoImpl.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.dao.repository.impl;
+
+import org.apache.dolphinscheduler.dao.entity.ProcessInstanceMap;
+import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapMapper;
+import org.apache.dolphinscheduler.dao.repository.ProcessInstanceMapDao;
+
+import lombok.NonNull;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Repository;
+
+/**
+ * {@link ProcessInstanceMapDao} implementation that forwards every call to {@link ProcessInstanceMapMapper}.
+ */
+@Repository
+public class ProcessInstanceMapDaoImpl implements ProcessInstanceMapDao {
+
+    @Autowired
+    private ProcessInstanceMapMapper processInstanceMapMapper;
+
+    /**
+     * Updates the given mapping by its id.
+     *
+     * @param processInstanceMap mapping to update
+     * @return the value reported by the mapper's {@code updateById}
+     */
+    @Override
+    public int updateWorkProcessInstanceMap(ProcessInstanceMap processInstanceMap) {
+        final int updated = processInstanceMapMapper.updateById(processInstanceMap);
+        return updated;
+    }
+
+    /**
+     * Inserts the given mapping.
+     *
+     * @param processInstanceMap mapping to insert, annotated with Lombok's {@code @NonNull}
+     * @return the value reported by the mapper's {@code insert}
+     */
+    @Override
+    public int createWorkProcessInstanceMap(@NonNull ProcessInstanceMap processInstanceMap) {
+        final int inserted = processInstanceMapMapper.insert(processInstanceMap);
+        return inserted;
+    }
+
+    /**
+     * Looks up the mapping that belongs to a parent process instance and parent task.
+     *
+     * @param parentWorkProcessId id of the parent process instance
+     * @param parentTaskId id of the parent task
+     * @return the mapper's {@code queryByParentId} result
+     */
+    @Override
+    public ProcessInstanceMap findWorkProcessMapByParent(Integer parentWorkProcessId, Integer parentTaskId) {
+        final ProcessInstanceMap mapping =
+                processInstanceMapMapper.queryByParentId(parentWorkProcessId, parentTaskId);
+        return mapping;
+    }
+}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskDefinitionDaoImpl.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskDefinitionDaoImpl.java
new file mode 100644
index 0000000000..33b6033454
--- /dev/null
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskDefinitionDaoImpl.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.dao.repository.impl;
+
+import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
+import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog;
+import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
+import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
+import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
+import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationLogMapper;
+import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper;
+import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
+import org.apache.dolphinscheduler.dao.repository.TaskDefinitionDao;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Repository;
+
+import com.google.common.collect.Lists;
+
+/**
+ * {@link TaskDefinitionDao} implementation that resolves task definitions through the process definition,
+ * process task relation log and task definition log mappers.
+ */
+@Repository
+public class TaskDefinitionDaoImpl implements TaskDefinitionDao {
+
+    private final Logger logger = LoggerFactory.getLogger(TaskDefinitionDaoImpl.class);
+
+    @Autowired
+    private ProcessDefinitionMapper processDefinitionMapper;
+
+    @Autowired
+    private ProcessTaskRelationLogMapper processTaskRelationLogMapper;
+
+    @Autowired
+    private TaskDefinitionLogMapper taskDefinitionLogMapper;
+
+    @Autowired
+    private TaskDefinitionMapper taskDefinitionMapper;
+
+    /**
+     * Lists the task definitions used as post tasks by a process definition.
+     *
+     * @param processDefinitionCode code of the process definition
+     * @return a new list with the task definition logs of all post tasks of the definition's version;
+     *         empty when the definition is unknown (an error is logged) or has no post task with code > 0
+     */
+    @Override
+    public List<TaskDefinition> getTaskDefinitionListByDefinition(long processDefinitionCode) {
+        ProcessDefinition definition = processDefinitionMapper.queryByCode(processDefinitionCode);
+        if (definition == null) {
+            logger.error("Cannot find process definition, code: {}", processDefinitionCode);
+            return Lists.newArrayList();
+        }
+
+        List<ProcessTaskRelationLog> relationLogs = processTaskRelationLogMapper
+                .queryByProcessCodeAndVersion(definition.getCode(), definition.getVersion());
+
+        // Only relations with a post task code greater than 0 contribute a (code, version) reference.
+        Set<TaskDefinition> postTasks = new HashSet<>();
+        for (ProcessTaskRelationLog relationLog : relationLogs) {
+            if (relationLog.getPostTaskCode() > 0) {
+                postTasks.add(new TaskDefinition(relationLog.getPostTaskCode(), relationLog.getPostTaskVersion()));
+            }
+        }
+        if (postTasks.isEmpty()) {
+            return Lists.newArrayList();
+        }
+
+        List<TaskDefinitionLog> resolved = taskDefinitionLogMapper.queryByTaskDefinitions(postTasks);
+        return Lists.newArrayList(resolved);
+    }
+
+    /**
+     * Finds one task definition by code and version in the task definition log table.
+     *
+     * @param taskCode task definition code
+     * @param taskDefinitionVersion task definition version
+     * @return the mapper's {@code queryByDefinitionCodeAndVersion} result
+     */
+    @Override
+    public TaskDefinition findTaskDefinition(long taskCode, int taskDefinitionVersion) {
+        return taskDefinitionLogMapper.queryByDefinitionCodeAndVersion(taskCode, taskDefinitionVersion);
+    }
+
+}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskDefinitionLogDaoImpl.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskDefinitionLogDaoImpl.java
new file mode 100644
index 0000000000..efd000a410
--- /dev/null
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskDefinitionLogDaoImpl.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.dao.repository.impl;
+
+import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
+import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
+import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
+import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper;
+import org.apache.dolphinscheduler.dao.repository.TaskDefinitionDao;
+import org.apache.dolphinscheduler.dao.repository.TaskDefinitionLogDao;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Repository;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Task Definition Log DAO implementation
+ */
+@Repository
+public class TaskDefinitionLogDaoImpl implements TaskDefinitionLogDao {
+
+    private final Logger logger = LoggerFactory.getLogger(TaskDefinitionLogDaoImpl.class);
+
+    @Autowired
+    private TaskDefinitionDao taskDefinitionDao;
+
+    @Autowired
+    private TaskDefinitionLogMapper taskDefinitionLogMapper;
+
+    /**
+     * Resolves all pre and post tasks referenced by the relations with a single mapper query.
+     *
+     * @param processTaskRelations relations whose pre/post task references are resolved
+     * @return the mapper's result for the collected (code, version) references, or a new empty list
+     *         when no relation references a task with code > 0
+     */
+    @Override
+    public List<TaskDefinitionLog> getTaskDefineLogList(List<ProcessTaskRelation> processTaskRelations) {
+        Set<TaskDefinition> referencedTasks = new HashSet<>();
+        processTaskRelations.forEach(relation -> {
+            if (relation.getPreTaskCode() > 0) {
+                referencedTasks.add(new TaskDefinition(relation.getPreTaskCode(), relation.getPreTaskVersion()));
+            }
+            if (relation.getPostTaskCode() > 0) {
+                referencedTasks.add(new TaskDefinition(relation.getPostTaskCode(), relation.getPostTaskVersion()));
+            }
+        });
+        return referencedTasks.isEmpty()
+                ? Lists.newArrayList()
+                : taskDefinitionLogMapper.queryByTaskDefinitions(referencedTasks);
+    }
+
+    /**
+     * Resolves the pre and post tasks referenced by the relations one lookup at a time.
+     * <p>
+     * References are keyed by task code, so when the same code occurs more than once the version
+     * seen last is the one looked up.
+     *
+     * @param processTaskRelations relations whose pre/post task references are resolved
+     * @return one element per distinct task code with code > 0, each being the result of
+     *         {@link TaskDefinitionDao#findTaskDefinition(long, int)} cast to {@link TaskDefinitionLog}
+     */
+    @Override
+    public List<TaskDefinitionLog> getTaskDefineLogListByRelation(List<ProcessTaskRelation> processTaskRelations) {
+        Map<Long, Integer> versionByTaskCode = new HashMap<>();
+        for (ProcessTaskRelation relation : processTaskRelations) {
+            if (relation.getPreTaskCode() > 0) {
+                versionByTaskCode.put(relation.getPreTaskCode(), relation.getPreTaskVersion());
+            }
+            if (relation.getPostTaskCode() > 0) {
+                versionByTaskCode.put(relation.getPostTaskCode(), relation.getPostTaskVersion());
+            }
+        }
+
+        List<TaskDefinitionLog> resolved = new ArrayList<>();
+        for (Map.Entry<Long, Integer> reference : versionByTaskCode.entrySet()) {
+            resolved.add((TaskDefinitionLog) taskDefinitionDao.findTaskDefinition(reference.getKey(),
+                    reference.getValue()));
+        }
+        return resolved;
+    }
+}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskInstanceDaoImpl.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskInstanceDaoImpl.java
new file mode 100644
index 0000000000..648085cbd3
--- /dev/null
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskInstanceDaoImpl.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.dao.repository.impl;
+
+import org.apache.dolphinscheduler.common.enums.FailureStrategy;
+import org.apache.dolphinscheduler.common.enums.Flag;
+import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
+import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
+import org.apache.dolphinscheduler.dao.repository.ProcessInstanceMapDao;
+import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
+import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
+
+import org.apache.commons.collections.CollectionUtils;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Repository;
+
+/**
+ * Task Instance DAO implementation, backed by {@link TaskInstanceMapper} and {@link ProcessInstanceMapper}.
+ */
+@Repository
+public class TaskInstanceDaoImpl implements TaskInstanceDao {
+
+    private final Logger logger = LoggerFactory.getLogger(TaskInstanceDaoImpl.class);
+
+    @Autowired
+    private TaskInstanceMapper taskInstanceMapper;
+
+    @Autowired
+    private ProcessInstanceMapper processInstanceMapper;
+
+    @Autowired
+    private ProcessInstanceMapDao processInstanceMapDao;
+
+    /**
+     * Updates the task instance when it already has an id, inserts it otherwise.
+     *
+     * @param taskInstance task instance to save
+     * @return result of the chosen {@link #updateTaskInstance} or {@link #insertTaskInstance} call
+     */
+    @Override
+    public boolean upsertTaskInstance(TaskInstance taskInstance) {
+        if (taskInstance.getId() != null) {
+            return updateTaskInstance(taskInstance);
+        } else {
+            return insertTaskInstance(taskInstance);
+        }
+    }
+
+    /**
+     * Inserts the task instance.
+     *
+     * @param taskInstance task instance to insert
+     * @return true when the mapper reported a count greater than 0
+     */
+    @Override
+    public boolean insertTaskInstance(TaskInstance taskInstance) {
+        int count = taskInstanceMapper.insert(taskInstance);
+        return count > 0;
+    }
+
+    /**
+     * Updates the task instance by its id.
+     *
+     * @param taskInstance task instance to update
+     * @return true when the mapper reported a count greater than 0
+     */
+    @Override
+    public boolean updateTaskInstance(TaskInstance taskInstance) {
+        int count = taskInstanceMapper.updateById(taskInstance);
+        return count > 0;
+    }
+
+    /**
+     * Prepares a task instance for submission (executor id, state, submit times) and saves it.
+     *
+     * @param taskInstance task instance to submit; modified in place
+     * @param processInstance process instance the task belongs to
+     * @return the saved task instance, or null when the process instance is finished or in READY_STOP
+     *         (a warning is logged and nothing is saved) or when saving fails
+     */
+    @Override
+    public TaskInstance submitTaskInstanceToDB(TaskInstance taskInstance, ProcessInstance processInstance) {
+        WorkflowExecutionStatus processInstanceState = processInstance.getState();
+        if (processInstanceState.isFinished() || processInstanceState == WorkflowExecutionStatus.READY_STOP) {
+            logger.warn("processInstance: {} state was: {}, skip submit this task, taskCode: {}",
+                    processInstance.getId(),
+                    processInstanceState,
+                    taskInstance.getTaskCode());
+            return null;
+        }
+        if (processInstanceState == WorkflowExecutionStatus.READY_PAUSE) {
+            // Not redundant with the setState below: forcing PAUSE first keeps getSubmitTaskState from
+            // preserving a RUNNING_EXECUTION / DELAY_EXECUTION / KILL / DISPATCH state while pausing.
+            taskInstance.setState(TaskExecutionStatus.PAUSE);
+        }
+        taskInstance.setExecutorId(processInstance.getExecutorId());
+        taskInstance.setState(getSubmitTaskState(taskInstance, processInstance));
+        if (taskInstance.getSubmitTime() == null) {
+            taskInstance.setSubmitTime(new Date());
+        }
+        if (taskInstance.getFirstSubmitTime() == null) {
+            taskInstance.setFirstSubmitTime(taskInstance.getSubmitTime());
+        }
+        boolean saveResult = upsertTaskInstance(taskInstance);
+        if (!saveResult) {
+            return null;
+        }
+        return taskInstance;
+    }
+
+    /**
+     * Decides the state a task instance is saved with on submission.
+     * <p>
+     * A task already in RUNNING_EXECUTION, DELAY_EXECUTION, KILL or DISPATCH keeps its state. Otherwise the
+     * result is PAUSE for a READY_PAUSE process instance, KILL for a READY_STOP process instance or when
+     * {@link #checkProcessStrategy} fails, and SUBMITTED_SUCCESS in every other case.
+     *
+     * @param taskInstance task instance being submitted
+     * @param processInstance process instance the task belongs to
+     * @return state to store on the task instance
+     */
+    private TaskExecutionStatus getSubmitTaskState(TaskInstance taskInstance, ProcessInstance processInstance) {
+        TaskExecutionStatus state = taskInstance.getState();
+        if (state == TaskExecutionStatus.RUNNING_EXECUTION
+                || state == TaskExecutionStatus.DELAY_EXECUTION
+                || state == TaskExecutionStatus.KILL
+                || state == TaskExecutionStatus.DISPATCH) {
+            return state;
+        }
+
+        if (processInstance.getState() == WorkflowExecutionStatus.READY_PAUSE) {
+            state = TaskExecutionStatus.PAUSE;
+        } else if (processInstance.getState() == WorkflowExecutionStatus.READY_STOP
+                || !checkProcessStrategy(taskInstance, processInstance)) {
+            state = TaskExecutionStatus.KILL;
+        } else {
+            state = TaskExecutionStatus.SUBMITTED_SUCCESS;
+        }
+        return state;
+    }
+
+    /**
+     * Checks whether the failure strategy of the process instance still allows submitting tasks.
+     *
+     * @param taskInstance task instance being submitted; supplies the process instance id and test flag
+     * @param processInstance process instance whose failure strategy is evaluated
+     * @return true for the CONTINUE strategy; otherwise false as soon as one valid task instance of the
+     *         process instance is in FAILURE with its retry times at or above its maximum, true if none is
+     */
+    private boolean checkProcessStrategy(TaskInstance taskInstance, ProcessInstance processInstance) {
+        FailureStrategy failureStrategy = processInstance.getFailureStrategy();
+        if (failureStrategy == FailureStrategy.CONTINUE) {
+            return true;
+        }
+        List<TaskInstance> taskInstances =
+                this.findValidTaskListByProcessId(taskInstance.getProcessInstanceId(), taskInstance.getTestFlag());
+
+        for (TaskInstance task : taskInstances) {
+            if (task.getState() == TaskExecutionStatus.FAILURE
+                    && task.getRetryTimes() >= task.getMaxRetryTimes()) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /**
+     * Queries the task instances of a process instance with {@code Flag.YES}.
+     *
+     * @param processInstanceId process instance id
+     * @param testFlag test flag to filter by
+     * @return the mapper's result
+     */
+    @Override
+    public List<TaskInstance> findValidTaskListByProcessId(Integer processInstanceId, int testFlag) {
+        return taskInstanceMapper.findValidTaskListByProcessId(processInstanceId, Flag.YES, testFlag);
+    }
+
+    /**
+     * Queries the task instances of a process instance with {@code Flag.NO}, using the test flag stored
+     * on the process instance.
+     *
+     * @param processInstanceId process instance id
+     * @return the mapper's result, or a new empty list when no process instance with that id exists
+     */
+    @Override
+    public List<TaskInstance> findPreviousTaskListByWorkProcessId(Integer processInstanceId) {
+        ProcessInstance processInstance = processInstanceMapper.selectById(processInstanceId);
+        if (processInstance == null) {
+            // Without the process instance there is no test flag to query with; report "no tasks"
+            // instead of failing with a NullPointerException.
+            logger.warn("Cannot find process instance, id: {}", processInstanceId);
+            return new ArrayList<>();
+        }
+        return taskInstanceMapper.findValidTaskListByProcessId(processInstanceId, Flag.NO,
+                processInstance.getTestFlag());
+    }
+
+    /**
+     * Loads one task instance by id.
+     *
+     * @param taskId task instance id
+     * @return the mapper's {@code selectById} result
+     */
+    @Override
+    public TaskInstance findTaskInstanceById(Integer taskId) {
+        return taskInstanceMapper.selectById(taskId);
+    }
+
+    /**
+     * Loads several task instances by id.
+     *
+     * @param idList task instance ids
+     * @return the mapper's {@code selectBatchIds} result, or a new empty list when {@code idList} is empty
+     */
+    @Override
+    public List<TaskInstance> findTaskInstanceByIdList(List<Integer> idList) {
+        if (CollectionUtils.isEmpty(idList)) {
+            return new ArrayList<>();
+        }
+        return taskInstanceMapper.selectBatchIds(idList);
+    }
+
+}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
index 87c802b163..c12c1d62e4 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
@@ -22,6 +22,7 @@ import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.TaskDispatchCommand;
@@ -36,7 +37,6 @@ import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskEventService;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
import org.apache.dolphinscheduler.service.exceptions.TaskPriorityQueueException;
-import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.queue.TaskPriority;
import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue;
@@ -75,11 +75,8 @@ public class TaskPriorityQueueConsumer extends BaseDaemonThread {
@Autowired
private TaskPriorityQueue<TaskPriority> taskPriorityQueue;
- /**
- * processService
- */
@Autowired
- private ProcessService processService;
+ private TaskInstanceDao taskInstanceDao;
/**
* executor dispatcher
@@ -264,7 +261,7 @@ public class TaskPriorityQueueConsumer extends BaseDaemonThread {
* @return taskInstance is final state
*/
public boolean taskInstanceIsFinalState(int taskInstanceId) {
- TaskInstance taskInstance = processService.findTaskInstanceById(taskInstanceId);
+ TaskInstance taskInstance = taskInstanceDao.findTaskInstanceById(taskInstanceId);
return taskInstance.getState().isFinished();
}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskDelayEventHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskDelayEventHandler.java
index 51a0830561..18168056f0 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskDelayEventHandler.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskDelayEventHandler.java
@@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.server.master.event;
import org.apache.dolphinscheduler.common.enums.StateEventType;
import org.apache.dolphinscheduler.common.enums.TaskEventType;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.dao.utils.TaskInstanceUtils;
import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningAckMessage;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
@@ -46,6 +47,9 @@ public class TaskDelayEventHandler implements TaskEventHandler {
@Autowired
private ProcessService processService;
+ @Autowired
+ private TaskInstanceDao taskInstanceDao;
+
@Autowired
private WorkflowExecuteThreadPool workflowExecuteThreadPool;
@@ -85,7 +89,7 @@ public class TaskDelayEventHandler implements TaskEventHandler {
taskInstance.setExecutePath(taskEvent.getExecutePath());
taskInstance.setPid(taskEvent.getProcessId());
taskInstance.setAppLink(taskEvent.getAppIds());
- if (!processService.updateTaskInstance(taskInstance)) {
+ if (!taskInstanceDao.updateTaskInstance(taskInstance)) {
throw new TaskEventHandleError("Handle task delay event error, update taskInstance to db failed");
}
sendAckToWorker(taskEvent);
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskDispatchEventHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskDispatchEventHandler.java
index 83d11db80d..08c13f2de1 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskDispatchEventHandler.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskDispatchEventHandler.java
@@ -19,12 +19,12 @@ package org.apache.dolphinscheduler.server.master.event;
import org.apache.dolphinscheduler.common.enums.TaskEventType;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.dao.utils.TaskInstanceUtils;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
-import org.apache.dolphinscheduler.service.process.ProcessService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,7 +40,7 @@ public class TaskDispatchEventHandler implements TaskEventHandler {
private ProcessInstanceExecCacheManager processInstanceExecCacheManager;
@Autowired
- private ProcessService processService;
+ private TaskInstanceDao taskInstanceDao;
@Override
public void handleTaskEvent(TaskEvent taskEvent) throws TaskEventHandleError {
@@ -68,7 +68,7 @@ public class TaskDispatchEventHandler implements TaskEventHandler {
taskInstance.setState(TaskExecutionStatus.DISPATCH);
taskInstance.setHost(taskEvent.getWorkerAddress());
try {
- if (!processService.updateTaskInstance(taskInstance)) {
+ if (!taskInstanceDao.updateTaskInstance(taskInstance)) {
throw new TaskEventHandleError("Handle task dispatch event error, update taskInstance to db failed");
}
} catch (Exception ex) {
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskResultEventHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskResultEventHandler.java
index 5f36248d9e..a185037ae4 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskResultEventHandler.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskResultEventHandler.java
@@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.server.master.event;
import org.apache.dolphinscheduler.common.enums.StateEventType;
import org.apache.dolphinscheduler.common.enums.TaskEventType;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.dao.utils.TaskInstanceUtils;
import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
@@ -50,6 +51,9 @@ public class TaskResultEventHandler implements TaskEventHandler {
@Autowired
private ProcessService processService;
+ @Autowired
+ private TaskInstanceDao taskInstanceDao;
+
@Autowired
private MasterConfig masterConfig;
@@ -92,7 +96,7 @@ public class TaskResultEventHandler implements TaskEventHandler {
taskInstance.setEndTime(taskEvent.getEndTime());
taskInstance.setVarPool(taskEvent.getVarPool());
processService.changeOutParam(taskInstance);
- processService.updateTaskInstance(taskInstance);
+ taskInstanceDao.updateTaskInstance(taskInstance);
sendAckToWorker(taskEvent);
} catch (Exception ex) {
TaskInstanceUtils.copyTaskInstance(oldTaskInstance, taskInstance);
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskRunningEventHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskRunningEventHandler.java
index 3dc86b3a37..85a10da40d 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskRunningEventHandler.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskRunningEventHandler.java
@@ -20,13 +20,13 @@ package org.apache.dolphinscheduler.server.master.event;
import org.apache.dolphinscheduler.common.enums.StateEventType;
import org.apache.dolphinscheduler.common.enums.TaskEventType;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.dao.utils.TaskInstanceUtils;
import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningAckMessage;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool;
-import org.apache.dolphinscheduler.service.process.ProcessService;
import java.util.Optional;
@@ -47,7 +47,7 @@ public class TaskRunningEventHandler implements TaskEventHandler {
private WorkflowExecuteThreadPool workflowExecuteThreadPool;
@Autowired
- private ProcessService processService;
+ private TaskInstanceDao taskInstanceDao;
@Override
public void handleTaskEvent(TaskEvent taskEvent) throws TaskEventHandleError {
@@ -84,7 +84,7 @@ public class TaskRunningEventHandler implements TaskEventHandler {
taskInstance.setExecutePath(taskEvent.getExecutePath());
taskInstance.setPid(taskEvent.getProcessId());
taskInstance.setAppLink(taskEvent.getAppIds());
- if (!processService.updateTaskInstance(taskInstance)) {
+ if (!taskInstanceDao.updateTaskInstance(taskInstance)) {
throw new TaskEventHandleError("Handle task running event error, update taskInstance to db failed");
}
sendAckToWorker(taskEvent);
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskExecuteStartProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskExecuteStartProcessor.java
index c1520b07df..cf80bfbd19 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskExecuteStartProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskExecuteStartProcessor.java
@@ -19,13 +19,13 @@ package org.apache.dolphinscheduler.server.master.processor;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
+import org.apache.dolphinscheduler.dao.repository.TaskDefinitionDao;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.TaskExecuteStartCommand;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.server.master.runner.StreamTaskExecuteRunnable;
import org.apache.dolphinscheduler.server.master.runner.StreamTaskExecuteThreadPool;
-import org.apache.dolphinscheduler.service.process.ProcessService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -47,7 +47,7 @@ public class TaskExecuteStartProcessor implements NettyRequestProcessor {
private StreamTaskExecuteThreadPool streamTaskExecuteThreadPool;
@Autowired
- private ProcessService processService;
+ private TaskDefinitionDao taskDefinitionDao;
@Override
public void process(Channel channel, Command command) {
@@ -57,7 +57,7 @@ public class TaskExecuteStartProcessor implements NettyRequestProcessor {
JSONUtils.parseObject(command.getBody(), TaskExecuteStartCommand.class);
logger.info("taskExecuteStartCommand: {}", taskExecuteStartCommand);
- TaskDefinition taskDefinition = processService.findTaskDefinition(
+ TaskDefinition taskDefinition = taskDefinitionDao.findTaskDefinition(
taskExecuteStartCommand.getTaskDefinitionCode(), taskExecuteStartCommand.getTaskDefinitionVersion());
if (taskDefinition == null) {
logger.error("Task definition can not be found, taskDefinitionCode:{}, taskDefinitionVersion:{}",
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java
index 7debb1f895..236dde1760 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java
@@ -27,6 +27,8 @@ import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
+import org.apache.dolphinscheduler.dao.repository.TaskDefinitionLogDao;
+import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager;
@@ -73,6 +75,12 @@ public class MasterSchedulerBootstrap extends BaseDaemonThread implements AutoCl
@Autowired
private ProcessInstanceDao processInstanceDao;
+ @Autowired
+ private TaskInstanceDao taskInstanceDao;
+
+ @Autowired
+ private TaskDefinitionLogDao taskDefinitionLogDao;
+
@Autowired
private MasterConfig masterConfig;
@@ -183,7 +191,9 @@ public class MasterSchedulerBootstrap extends BaseDaemonThread implements AutoCl
processAlertManager,
masterConfig,
stateWheelExecuteThread,
- curingGlobalParamsService);
+ curingGlobalParamsService,
+ taskInstanceDao,
+ taskDefinitionLogDao);
processInstanceExecCacheManager.cache(processInstance.getId(), workflowRunnable);
workflowEventQueue.addEvent(new WorkflowEvent(WorkflowEventType.START_WORKFLOW,
processInstance.getId()));
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StreamTaskExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StreamTaskExecuteRunnable.java
index 15abef270d..ddecc35043 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StreamTaskExecuteRunnable.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StreamTaskExecuteRunnable.java
@@ -31,6 +31,7 @@ import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
+import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.plugin.task.api.TaskChannel;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
@@ -89,6 +90,8 @@ public class StreamTaskExecuteRunnable implements Runnable {
protected ProcessService processService;
+ protected TaskInstanceDao taskInstanceDao;
+
protected ExecutorDispatcher dispatcher;
protected ProcessTaskRelationMapper processTaskRelationMapper;
@@ -118,6 +121,7 @@ public class StreamTaskExecuteRunnable implements Runnable {
this.dispatcher = SpringApplicationContext.getBean(ExecutorDispatcher.class);
this.taskPluginManager = SpringApplicationContext.getBean(TaskPluginManager.class);
this.processTaskRelationMapper = SpringApplicationContext.getBean(ProcessTaskRelationMapper.class);
+ this.taskInstanceDao = SpringApplicationContext.getBean(TaskInstanceDao.class);
this.streamTaskInstanceExecCacheManager =
SpringApplicationContext.getBean(StreamTaskInstanceExecCacheManager.class);
this.taskDefinition = taskDefinition;
@@ -133,7 +137,7 @@ public class StreamTaskExecuteRunnable implements Runnable {
// submit task
processService.updateTaskDefinitionResources(taskDefinition);
taskInstance = newTaskInstance(taskDefinition);
- processService.saveTaskInstance(taskInstance);
+ taskInstanceDao.upsertTaskInstance(taskInstance);
// add cache
streamTaskInstanceExecCacheManager.cache(taskInstance.getId(), this);
@@ -148,7 +152,7 @@ public class StreamTaskExecuteRunnable implements Runnable {
TaskExecutionContext taskExecutionContext = getTaskExecutionContext(taskInstance);
if (taskExecutionContext == null) {
taskInstance.setState(TaskExecutionStatus.FAILURE);
- processService.saveTaskInstance(taskInstance);
+ taskInstanceDao.upsertTaskInstance(taskInstance);
return;
}
@@ -175,7 +179,7 @@ public class StreamTaskExecuteRunnable implements Runnable {
// set task instance fail
taskInstance.setState(TaskExecutionStatus.FAILURE);
- processService.saveTaskInstance(taskInstance);
+ taskInstanceDao.upsertTaskInstance(taskInstance);
return;
}
@@ -416,7 +420,7 @@ public class StreamTaskExecuteRunnable implements Runnable {
taskInstance.setEndTime(taskEvent.getEndTime());
taskInstance.setVarPool(taskEvent.getVarPool());
processService.changeOutParam(taskInstance);
- processService.updateTaskInstance(taskInstance);
+ taskInstanceDao.updateTaskInstance(taskInstance);
// send ack
sendAckToWorker(taskEvent);
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
index 175416949f..6bca587ac0 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
@@ -58,6 +58,8 @@ import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
import org.apache.dolphinscheduler.dao.entity.TaskGroupQueue;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
+import org.apache.dolphinscheduler.dao.repository.TaskDefinitionLogDao;
+import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult;
import org.apache.dolphinscheduler.plugin.task.api.enums.Direct;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
@@ -134,6 +136,10 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
private ProcessInstanceDao processInstanceDao;
+ private TaskInstanceDao taskInstanceDao;
+
+ private TaskDefinitionLogDao taskDefinitionLogDao;
+
private final ProcessAlertManager processAlertManager;
private final NettyExecutorManager nettyExecutorManager;
@@ -245,7 +251,9 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
@NonNull ProcessAlertManager processAlertManager,
@NonNull MasterConfig masterConfig,
@NonNull StateWheelExecuteThread stateWheelExecuteThread,
- @NonNull CuringParamsService curingParamsService) {
+ @NonNull CuringParamsService curingParamsService,
+ @NonNull TaskInstanceDao taskInstanceDao,
+ @NonNull TaskDefinitionLogDao taskDefinitionLogDao) {
this.processService = processService;
this.commandService = commandService;
this.processInstanceDao = processInstanceDao;
@@ -254,6 +262,8 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
this.processAlertManager = processAlertManager;
this.stateWheelExecuteThread = stateWheelExecuteThread;
this.curingParamsService = curingParamsService;
+ this.taskInstanceDao = taskInstanceDao;
+ this.taskDefinitionLogDao = taskDefinitionLogDao;
this.masterAddress = NetUtils.getAddr(masterConfig.getListenPort());
TaskMetrics.registerTaskPrepared(readyToSubmitTaskQueue::size);
}
@@ -347,7 +357,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
public boolean checkForceStartAndWakeUp(StateEvent stateEvent) {
TaskGroupQueue taskGroupQueue = this.processService.loadTaskGroupQueue(stateEvent.getTaskInstanceId());
if (taskGroupQueue.getForceStart() == Flag.YES.getCode()) {
- TaskInstance taskInstance = this.processService.findTaskInstanceById(stateEvent.getTaskInstanceId());
+ TaskInstance taskInstance = this.taskInstanceDao.findTaskInstanceById(stateEvent.getTaskInstanceId());
ITaskProcessor taskProcessor = activeTaskProcessorMaps.get(taskInstance.getTaskCode());
taskProcessor.action(TaskAction.DISPATCH);
this.processService.updateTaskGroupQueueStatus(taskGroupQueue.getTaskId(),
@@ -357,7 +367,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
if (taskGroupQueue.getInQueue() == Flag.YES.getCode()) {
boolean acquireTaskGroup = processService.robTaskGroupResource(taskGroupQueue);
if (acquireTaskGroup) {
- TaskInstance taskInstance = this.processService.findTaskInstanceById(stateEvent.getTaskInstanceId());
+ TaskInstance taskInstance = this.taskInstanceDao.findTaskInstanceById(stateEvent.getTaskInstanceId());
ITaskProcessor taskProcessor = activeTaskProcessorMaps.get(taskInstance.getTaskCode());
taskProcessor.action(TaskAction.DISPATCH);
return true;
@@ -502,7 +512,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
*/
public void refreshTaskInstance(int taskInstanceId) {
logger.info("task instance update: {} ", taskInstanceId);
- TaskInstance taskInstance = processService.findTaskInstanceById(taskInstanceId);
+ TaskInstance taskInstance = taskInstanceDao.findTaskInstanceById(taskInstanceId);
if (taskInstance == null) {
logger.error("can not find task instance, id:{}", taskInstanceId);
return;
@@ -775,7 +785,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
List<ProcessTaskRelation> processTaskRelations =
processService.findRelationByCode(processDefinition.getCode(), processDefinition.getVersion());
List<TaskDefinitionLog> taskDefinitionLogs =
- processService.getTaskDefineLogListByRelation(processTaskRelations);
+ taskDefinitionLogDao.getTaskDefineLogListByRelation(processTaskRelations);
List<TaskNode> taskNodeList = processService.transformTask(processTaskRelations, taskDefinitionLogs);
forbiddenTaskMap.clear();
@@ -815,7 +825,8 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
processInstance.getRunTimes(),
processInstance.getRecovery());
List<TaskInstance> validTaskInstanceList =
- processService.findValidTaskListByProcessId(processInstance.getId(), processInstance.getTestFlag());
+ taskInstanceDao.findValidTaskListByProcessId(processInstance.getId(),
+ processInstance.getTestFlag());
for (TaskInstance task : validTaskInstanceList) {
try {
LoggerUtils.setWorkflowAndTaskInstanceIDMDC(task.getProcessInstanceId(), task.getId());
@@ -831,7 +842,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
TaskInstance oldTaskInstance = taskInstanceMap.get(oldTaskInstanceId);
if (!oldTaskInstance.getState().isFinished() && task.getState().isFinished()) {
task.setFlag(Flag.NO);
- processService.updateTaskInstance(task);
+ taskInstanceDao.updateTaskInstance(task);
continue;
}
}
@@ -852,7 +863,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
if (task.getState().isNeedFaultTolerance()) {
logger.info("TaskInstance needs fault tolerance, will be added to standby list.");
task.setFlag(Flag.NO);
- processService.updateTaskInstance(task);
+ taskInstanceDao.updateTaskInstance(task);
// tolerantTaskInstance add to standby list directly
TaskInstance tolerantTaskInstance = cloneTolerantTaskInstance(task);
@@ -953,7 +964,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
if (taskInstance.getId() != oldTaskInstanceId) {
TaskInstance oldTaskInstance = taskInstanceMap.get(oldTaskInstanceId);
oldTaskInstance.setFlag(Flag.NO);
- processService.updateTaskInstance(oldTaskInstance);
+ taskInstanceDao.updateTaskInstance(oldTaskInstance);
validTaskMap.remove(taskInstance.getTaskCode());
activeTaskProcessorMaps.remove(taskInstance.getTaskCode());
}
@@ -1780,7 +1791,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
if (taskInstanceId == null || taskInstanceId.equals(0)) {
continue;
}
- TaskInstance taskInstance = processService.findTaskInstanceById(taskInstanceId);
+ TaskInstance taskInstance = taskInstanceDao.findTaskInstanceById(taskInstanceId);
if (taskInstance == null || taskInstance.getState().isFinished()) {
continue;
}
@@ -1813,7 +1824,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
}
// stop tasks which is retrying if forced success happens
if (task.taskCanRetry()) {
- TaskInstance retryTask = processService.findTaskInstanceById(task.getId());
+ TaskInstance retryTask = taskInstanceDao.findTaskInstanceById(task.getId());
if (retryTask != null && retryTask.getState().isForceSuccess()) {
task.setState(retryTask.getState());
logger.info(
@@ -1888,7 +1899,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
.map(Integer::valueOf)
.collect(Collectors.toList());
if (CollectionUtils.isNotEmpty(startTaskInstanceIds)) {
- return processService.findTaskInstanceByIdList(startTaskInstanceIds);
+ return taskInstanceDao.findTaskInstanceByIdList(startTaskInstanceIds);
}
}
return Collections.emptyList();
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
index 029ae7471a..ff39833142 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
@@ -49,6 +49,7 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.dolphinscheduler.dao.entity.UdfFunc;
import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
+import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.plugin.task.api.DataQualityTaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.K8sTaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.TaskChannel;
@@ -123,6 +124,8 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
protected ProcessService processService;
+ protected TaskInstanceDao taskInstanceDao;
+
protected ProcessInstanceDao processInstanceDao;
protected MasterConfig masterConfig;
@@ -140,6 +143,7 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
masterConfig = SpringApplicationContext.getBean(MasterConfig.class);
taskPluginManager = SpringApplicationContext.getBean(TaskPluginManager.class);
curingParamsService = SpringApplicationContext.getBean(CuringParamsService.class);
+ taskInstanceDao = SpringApplicationContext.getBean(TaskInstanceDao.class);
this.taskInstance = taskInstance;
this.processInstance = processInstance;
this.maxRetryTimes = masterConfig.getTaskCommitRetryTimes();
@@ -306,7 +310,7 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
if (verifyTenantIsNull(tenant, taskInstance)) {
logger.info("Task state changes to {}", TaskExecutionStatus.FAILURE);
taskInstance.setState(TaskExecutionStatus.FAILURE);
- processService.saveTaskInstance(taskInstance);
+ taskInstanceDao.upsertTaskInstance(taskInstance);
return null;
}
// set queue for process instance, user-specified queue takes precedence over tenant queue
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BlockingTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BlockingTaskProcessor.java
index f7eac540d9..0b4848b1df 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BlockingTaskProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BlockingTaskProcessor.java
@@ -77,7 +77,7 @@ public class BlockingTaskProcessor extends BaseTaskProcessor {
this.taskInstance.setHost(NetUtils.getAddr(masterConfig.getListenPort()));
this.taskInstance.setState(TaskExecutionStatus.RUNNING_EXECUTION);
this.taskInstance.setStartTime(new Date());
- this.processService.saveTaskInstance(taskInstance);
+ this.taskInstanceDao.upsertTaskInstance(taskInstance);
this.dependentParameters = taskInstance.getDependency();
this.blockingParam = JSONUtils.parseObject(taskInstance.getTaskParams(), BlockingParameters.class);
}
@@ -87,7 +87,7 @@ public class BlockingTaskProcessor extends BaseTaskProcessor {
// todo: task cannot be pause
taskInstance.setState(TaskExecutionStatus.PAUSE);
taskInstance.setEndTime(new Date());
- processService.saveTaskInstance(taskInstance);
+ taskInstanceDao.upsertTaskInstance(taskInstance);
logger.info("blocking task has been paused");
return true;
}
@@ -96,7 +96,7 @@ public class BlockingTaskProcessor extends BaseTaskProcessor {
protected boolean killTask() {
taskInstance.setState(TaskExecutionStatus.KILL);
taskInstance.setEndTime(new Date());
- processService.saveTaskInstance(taskInstance);
+ taskInstanceDao.upsertTaskInstance(taskInstance);
logger.info("blocking task has been killed");
return true;
}
@@ -171,7 +171,7 @@ public class BlockingTaskProcessor extends BaseTaskProcessor {
private void setConditionResult() {
- List<TaskInstance> taskInstances = processService
+ List<TaskInstance> taskInstances = taskInstanceDao
.findValidTaskListByProcessId(taskInstance.getProcessInstanceId(), processInstance.getTestFlag());
for (TaskInstance task : taskInstances) {
completeTaskList.putIfAbsent(task.getTaskCode(), task.getState());
@@ -204,7 +204,7 @@ public class BlockingTaskProcessor extends BaseTaskProcessor {
}
taskInstance.setState(TaskExecutionStatus.SUCCESS);
taskInstance.setEndTime(new Date());
- processService.updateTaskInstance(taskInstance);
+ taskInstanceDao.updateTaskInstance(taskInstance);
logger.info("blocking task execute complete, blocking:{}", isBlocked);
}
}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
index aa57548a06..cddc86ddaf 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
@@ -146,7 +146,7 @@ public class CommonTaskProcessor extends BaseTaskProcessor {
public boolean killTask() {
try {
- taskInstance = processService.findTaskInstanceById(taskInstance.getId());
+ taskInstance = taskInstanceDao.findTaskInstanceById(taskInstance.getId());
if (taskInstance == null) {
return true;
}
@@ -156,7 +156,7 @@ public class CommonTaskProcessor extends BaseTaskProcessor {
// we don't wait the kill response
taskInstance.setState(TaskExecutionStatus.KILL);
taskInstance.setEndTime(new Date());
- processService.updateTaskInstance(taskInstance);
+ taskInstanceDao.updateTaskInstance(taskInstance);
if (StringUtils.isNotEmpty(taskInstance.getHost())) {
killRemoteTask();
}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java
index f04e950223..23e0e65071 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java
@@ -99,7 +99,7 @@ public class ConditionTaskProcessor extends BaseTaskProcessor {
protected boolean pauseTask() {
this.taskInstance.setState(TaskExecutionStatus.PAUSE);
this.taskInstance.setEndTime(new Date());
- processService.saveTaskInstance(taskInstance);
+ taskInstanceDao.upsertTaskInstance(taskInstance);
return true;
}
@@ -120,7 +120,7 @@ public class ConditionTaskProcessor extends BaseTaskProcessor {
protected boolean killTask() {
this.taskInstance.setState(TaskExecutionStatus.KILL);
this.taskInstance.setEndTime(new Date());
- processService.saveTaskInstance(taskInstance);
+ taskInstanceDao.upsertTaskInstance(taskInstance);
return true;
}
@@ -138,13 +138,13 @@ public class ConditionTaskProcessor extends BaseTaskProcessor {
this.taskInstance.setHost(NetUtils.getAddr(masterConfig.getListenPort()));
taskInstance.setState(TaskExecutionStatus.RUNNING_EXECUTION);
taskInstance.setStartTime(new Date());
- this.processService.saveTaskInstance(taskInstance);
+ this.taskInstanceDao.upsertTaskInstance(taskInstance);
this.dependentParameters = taskInstance.getDependency();
}
private void setConditionResult() {
- List<TaskInstance> taskInstances = processService
+ List<TaskInstance> taskInstances = taskInstanceDao
.findValidTaskListByProcessId(taskInstance.getProcessInstanceId(), processInstance.getTestFlag());
for (TaskInstance task : taskInstances) {
completeTaskList.putIfAbsent(task.getTaskCode(), task.getState());
@@ -194,6 +194,6 @@ public class ConditionTaskProcessor extends BaseTaskProcessor {
(conditionResult == DependResult.SUCCESS) ? TaskExecutionStatus.SUCCESS : TaskExecutionStatus.FAILURE;
taskInstance.setState(status);
taskInstance.setEndTime(new Date());
- processService.updateTaskInstance(taskInstance);
+ taskInstanceDao.updateTaskInstance(taskInstance);
}
}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java
index ddcdc33c58..2edd0e97ea 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java
@@ -114,7 +114,7 @@ public class DependentTaskProcessor extends BaseTaskProcessor {
taskInstance.setHost(NetUtils.getAddr(masterConfig.getListenPort()));
taskInstance.setState(TaskExecutionStatus.RUNNING_EXECUTION);
taskInstance.setStartTime(new Date());
- processService.updateTaskInstance(taskInstance);
+ taskInstanceDao.updateTaskInstance(taskInstance);
initDependParameters();
logger.info("Success initialize dependent task parameters, the dependent data is: {}", dependentDate);
return true;
@@ -231,7 +231,7 @@ public class DependentTaskProcessor extends BaseTaskProcessor {
protected boolean pauseTask() {
this.taskInstance.setState(TaskExecutionStatus.PAUSE);
this.taskInstance.setEndTime(new Date());
- processService.saveTaskInstance(taskInstance);
+ taskInstanceDao.upsertTaskInstance(taskInstance);
return true;
}
@@ -239,7 +239,7 @@ public class DependentTaskProcessor extends BaseTaskProcessor {
protected boolean killTask() {
this.taskInstance.setState(TaskExecutionStatus.KILL);
this.taskInstance.setEndTime(new Date());
- processService.saveTaskInstance(taskInstance);
+ taskInstanceDao.upsertTaskInstance(taskInstance);
return true;
}
@@ -290,7 +290,7 @@ public class DependentTaskProcessor extends BaseTaskProcessor {
status = (result == DependResult.SUCCESS) ? TaskExecutionStatus.SUCCESS : TaskExecutionStatus.FAILURE;
taskInstance.setState(status);
taskInstance.setEndTime(new Date());
- processService.saveTaskInstance(taskInstance);
+ taskInstanceDao.upsertTaskInstance(taskInstance);
}
@Override
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java
index fe1896ec85..1828e269ea 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java
@@ -133,7 +133,7 @@ public class SubTaskProcessor extends BaseTaskProcessor {
taskInstance.setState(TaskExecutionStatus.of(subProcessInstance.getState().getCode()));
taskInstance.setEndTime(new Date());
dealFinish();
- processService.saveTaskInstance(taskInstance);
+ taskInstanceDao.upsertTaskInstance(taskInstance);
}
}
@@ -201,7 +201,7 @@ public class SubTaskProcessor extends BaseTaskProcessor {
taskInstance.setHost(NetUtils.getAddr(masterConfig.getListenPort()));
taskInstance.setState(TaskExecutionStatus.RUNNING_EXECUTION);
taskInstance.setStartTime(new Date());
- processService.updateTaskInstance(taskInstance);
+ taskInstanceDao.updateTaskInstance(taskInstance);
logger.info("set sub work flow {} task {} state: {}",
processInstance.getId(),
taskInstance.getId(),
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java
index de571cb657..dfd5a02fb7 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java
@@ -80,7 +80,7 @@ public class SwitchTaskProcessor extends BaseTaskProcessor {
taskInstance.setHost(NetUtils.getAddr(masterConfig.getListenPort()));
taskInstance.setState(TaskExecutionStatus.RUNNING_EXECUTION);
taskInstance.setStartTime(new Date());
- processService.updateTaskInstance(taskInstance);
+ taskInstanceDao.updateTaskInstance(taskInstance);
if (!this.taskInstance().getState().isFinished()) {
setSwitchResult();
@@ -104,7 +104,7 @@ public class SwitchTaskProcessor extends BaseTaskProcessor {
protected boolean pauseTask() {
this.taskInstance.setState(TaskExecutionStatus.PAUSE);
this.taskInstance.setEndTime(new Date());
- processService.saveTaskInstance(taskInstance);
+ taskInstanceDao.upsertTaskInstance(taskInstance);
return true;
}
@@ -112,7 +112,7 @@ public class SwitchTaskProcessor extends BaseTaskProcessor {
protected boolean killTask() {
this.taskInstance.setState(TaskExecutionStatus.KILL);
this.taskInstance.setEndTime(new Date());
- processService.saveTaskInstance(taskInstance);
+ taskInstanceDao.upsertTaskInstance(taskInstance);
return true;
}
@@ -127,7 +127,7 @@ public class SwitchTaskProcessor extends BaseTaskProcessor {
}
private boolean setSwitchResult() {
- List<TaskInstance> taskInstances = processService.findValidTaskListByProcessId(
+ List<TaskInstance> taskInstances = taskInstanceDao.findValidTaskListByProcessId(
taskInstance.getProcessInstanceId(), processInstance.getTestFlag());
Map<String, TaskExecutionStatus> completeTaskList = new HashMap<>();
for (TaskInstance task : taskInstances) {
@@ -188,7 +188,7 @@ public class SwitchTaskProcessor extends BaseTaskProcessor {
(conditionResult == DependResult.SUCCESS) ? TaskExecutionStatus.SUCCESS : TaskExecutionStatus.FAILURE;
taskInstance.setEndTime(new Date());
taskInstance.setState(status);
- processService.updateTaskInstance(taskInstance);
+ taskInstanceDao.updateTaskInstance(taskInstance);
}
public String setTaskParams(String content, String rgex) {
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java
index cf28a6bc8e..5d6048c208 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java
@@ -23,6 +23,7 @@ import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.remote.command.TaskKillRequestCommand;
@@ -75,12 +76,15 @@ public class MasterFailoverService {
private final LogClient logClient;
+ private final TaskInstanceDao taskInstanceDao;
+
public MasterFailoverService(@NonNull RegistryClient registryClient,
@NonNull MasterConfig masterConfig,
@NonNull ProcessService processService,
@NonNull NettyExecutorManager nettyExecutorManager,
@NonNull ProcessInstanceExecCacheManager processInstanceExecCacheManager,
- @NonNull LogClient logClient) {
+ @NonNull LogClient logClient,
+ @NonNull TaskInstanceDao taskInstanceDao) {
this.registryClient = registryClient;
this.masterConfig = masterConfig;
this.processService = processService;
@@ -88,7 +92,7 @@ public class MasterFailoverService {
this.localAddress = masterConfig.getMasterAddress();
this.processInstanceExecCacheManager = processInstanceExecCacheManager;
this.logClient = logClient;
-
+ this.taskInstanceDao = taskInstanceDao;
}
/**
@@ -164,7 +168,7 @@ public class MasterFailoverService {
processInstance.setProcessDefinition(processDefinition);
int processInstanceId = processInstance.getId();
List<TaskInstance> taskInstanceList =
- processService.findValidTaskListByProcessId(processInstanceId, processInstance.getTestFlag());
+ taskInstanceDao.findValidTaskListByProcessId(processInstanceId, processInstance.getTestFlag());
for (TaskInstance taskInstance : taskInstanceList) {
try {
LoggerUtils.setTaskInstanceIdMDC(taskInstance.getId());
@@ -249,7 +253,7 @@ public class MasterFailoverService {
}
taskInstance.setState(TaskExecutionStatus.NEED_FAULT_TOLERANCE);
- processService.saveTaskInstance(taskInstance);
+ taskInstanceDao.upsertTaskInstance(taskInstance);
}
private void sendKillCommandToWorker(@NonNull TaskInstance taskInstance) {
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/WorkerFailoverService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/WorkerFailoverService.java
index 18de752b19..30522b8e4a 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/WorkerFailoverService.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/WorkerFailoverService.java
@@ -24,6 +24,7 @@ import org.apache.dolphinscheduler.common.enums.StateEventType;
import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.server.master.builder.TaskExecutionContextBuilder;
@@ -73,12 +74,15 @@ public class WorkerFailoverService {
private final LogClient logClient;
private final String localAddress;
+ private final TaskInstanceDao taskInstanceDao;
+
public WorkerFailoverService(@NonNull RegistryClient registryClient,
@NonNull MasterConfig masterConfig,
@NonNull ProcessService processService,
@NonNull WorkflowExecuteThreadPool workflowExecuteThreadPool,
@NonNull ProcessInstanceExecCacheManager cacheManager,
- @NonNull LogClient logClient) {
+ @NonNull LogClient logClient,
+ @NonNull TaskInstanceDao taskInstanceDao) {
this.registryClient = registryClient;
this.masterConfig = masterConfig;
this.processService = processService;
@@ -86,6 +90,7 @@ public class WorkerFailoverService {
this.cacheManager = cacheManager;
this.logClient = logClient;
this.localAddress = masterConfig.getMasterAddress();
+ this.taskInstanceDao = taskInstanceDao;
}
/**
@@ -183,7 +188,7 @@ public class WorkerFailoverService {
taskInstance.setState(TaskExecutionStatus.NEED_FAULT_TOLERANCE);
taskInstance.setFlag(Flag.NO);
- processService.saveTaskInstance(taskInstance);
+ taskInstanceDao.upsertTaskInstance(taskInstance);
TaskStateEvent stateEvent = TaskStateEvent.builder()
.processInstanceId(processInstance.getId())
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/DependentExecute.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/DependentExecute.java
index 3932dd693f..a28ee36cb0 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/DependentExecute.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/DependentExecute.java
@@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.server.master.utils;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult;
import org.apache.dolphinscheduler.plugin.task.api.enums.DependentRelation;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
@@ -48,6 +49,8 @@ public class DependentExecute {
*/
private final ProcessService processService = SpringApplicationContext.getBean(ProcessService.class);
+ private final TaskInstanceDao taskInstanceDao = SpringApplicationContext.getBean(TaskInstanceDao.class);
+
/**
* depend item list
*/
@@ -154,7 +157,7 @@ public class DependentExecute {
DependResult result;
TaskInstance taskInstance = null;
List<TaskInstance> taskInstanceList =
- processService.findValidTaskListByProcessId(processInstance.getId(), testFlag);
+ taskInstanceDao.findValidTaskListByProcessId(processInstance.getId(), testFlag);
for (TaskInstance task : taskInstanceList) {
if (task.getTaskCode() == taskCode) {
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/BlockingTaskTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/BlockingTaskTest.java
index 1429ec1567..c36f38d8f2 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/BlockingTaskTest.java
+++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/BlockingTaskTest.java
@@ -25,6 +25,8 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.dao.repository.TaskDefinitionDao;
+import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.plugin.task.api.enums.DependentRelation;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy;
@@ -62,6 +64,10 @@ public class BlockingTaskTest {
private ProcessService processService;
+ private TaskInstanceDao taskInstanceDao;
+
+ private TaskDefinitionDao taskDefinitionDao;
+
private ProcessInstance processInstance;
private MasterConfig config;
@@ -80,9 +86,14 @@ public class BlockingTaskTest {
// mock process service
processService = Mockito.mock(ProcessService.class);
-
Mockito.when(SpringApplicationContext.getBean(ProcessService.class)).thenReturn(processService);
+ taskInstanceDao = Mockito.mock(TaskInstanceDao.class);
+ Mockito.when(SpringApplicationContext.getBean(TaskInstanceDao.class)).thenReturn(taskInstanceDao);
+
+ taskDefinitionDao = Mockito.mock(TaskDefinitionDao.class);
+ Mockito.when(SpringApplicationContext.getBean(TaskDefinitionDao.class)).thenReturn(taskDefinitionDao);
+
// mock process instance
processInstance = getProcessInstance();
Mockito.when(processService
@@ -93,7 +104,7 @@ public class BlockingTaskTest {
taskDefinition.setTimeoutFlag(TimeoutFlag.OPEN);
taskDefinition.setTimeoutNotifyStrategy(TaskTimeoutStrategy.WARN);
taskDefinition.setTimeout(0);
- Mockito.when(processService.findTaskDefinition(1L, 1))
+ Mockito.when(taskDefinitionDao.findTaskDefinition(1L, 1))
.thenReturn(taskDefinition);
}
@@ -187,24 +198,23 @@ public class BlockingTaskTest {
.submitTask(processInstance, taskInstance))
.thenReturn(taskInstance);
- Mockito.when(processService
+ Mockito.when(taskInstanceDao
.findTaskInstanceById(taskInstance.getId()))
.thenReturn(taskInstance);
// for BlockingTaskExecThread.initTaskParameters
- Mockito.when(processService
- .saveTaskInstance(taskInstance))
+ Mockito.when(taskInstanceDao.upsertTaskInstance(taskInstance))
.thenReturn(true);
// for BlockingTaskExecThread.updateTaskState
- Mockito.when(processService
+ Mockito.when(taskInstanceDao
.updateTaskInstance(taskInstance))
.thenReturn(true);
// for BlockingTaskExecThread.waitTaskQuit
List<TaskInstance> conditions = getTaskInstanceForValidTaskList(expectResults);
Mockito.when(
- processService.findValidTaskListByProcessId(processInstance.getId(), processInstance.getTestFlag()))
+ taskInstanceDao.findValidTaskListByProcessId(processInstance.getId(), processInstance.getTestFlag()))
.thenReturn(conditions);
return taskInstance;
}
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/ConditionsTaskTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/ConditionsTaskTest.java
index 035a811f76..ae8b6022a0 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/ConditionsTaskTest.java
+++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/ConditionsTaskTest.java
@@ -23,6 +23,8 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.dao.repository.TaskDefinitionDao;
+import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.plugin.task.api.enums.DependentRelation;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy;
@@ -61,6 +63,10 @@ public class ConditionsTaskTest {
private ProcessInstance processInstance;
+ private TaskInstanceDao taskInstanceDao;
+
+ private TaskDefinitionDao taskDefinitionDao;
+
@BeforeEach
public void before() {
ApplicationContext applicationContext = Mockito.mock(ApplicationContext.class);
@@ -75,6 +81,12 @@ public class ConditionsTaskTest {
processService = Mockito.mock(ProcessService.class);
Mockito.when(applicationContext.getBean(ProcessService.class)).thenReturn(processService);
+ taskInstanceDao = Mockito.mock(TaskInstanceDao.class);
+ Mockito.when(applicationContext.getBean(TaskInstanceDao.class)).thenReturn(taskInstanceDao);
+
+ taskDefinitionDao = Mockito.mock(TaskDefinitionDao.class);
+ Mockito.when(SpringApplicationContext.getBean(TaskDefinitionDao.class)).thenReturn(taskDefinitionDao);
+
processInstance = getProcessInstance();
Mockito.when(processService
.findProcessInstanceById(processInstance.getId()))
@@ -84,7 +96,7 @@ public class ConditionsTaskTest {
taskDefinition.setTimeoutFlag(TimeoutFlag.OPEN);
taskDefinition.setTimeoutNotifyStrategy(TaskTimeoutStrategy.WARN);
taskDefinition.setTimeout(0);
- Mockito.when(processService.findTaskDefinition(1L, 1))
+ Mockito.when(taskDefinitionDao.findTaskDefinition(1L, 1))
.thenReturn(taskDefinition);
}
@@ -96,22 +108,21 @@ public class ConditionsTaskTest {
.submitTask(processInstance, taskInstance))
.thenReturn(taskInstance);
// for MasterBaseTaskExecThread.call
- Mockito.when(processService
+ Mockito.when(taskInstanceDao
.findTaskInstanceById(taskInstance.getId()))
.thenReturn(taskInstance);
// for ConditionsTaskExecThread.initTaskParameters
- Mockito.when(processService
- .saveTaskInstance(taskInstance))
+ Mockito.when(taskInstanceDao.upsertTaskInstance(taskInstance))
.thenReturn(true);
// for ConditionsTaskExecThread.updateTaskState
- Mockito.when(processService
+ Mockito.when(taskInstanceDao
.updateTaskInstance(taskInstance))
.thenReturn(true);
// for ConditionsTaskExecThread.waitTaskQuit
List<TaskInstance> conditions = Stream.of(
getTaskInstanceForValidTaskList(expectResult)).collect(Collectors.toList());
- Mockito.when(processService
+ Mockito.when(taskInstanceDao
.findValidTaskListByProcessId(processInstance.getId(), processInstance.getTestFlag()))
.thenReturn(conditions);
return taskInstance;
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/DependentTaskTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/DependentTaskTest.java
index e20e7c5572..0cfed4545c 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/DependentTaskTest.java
+++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/DependentTaskTest.java
@@ -25,6 +25,8 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.dao.repository.TaskDefinitionDao;
+import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult;
import org.apache.dolphinscheduler.plugin.task.api.enums.DependentRelation;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
@@ -70,6 +72,10 @@ public class DependentTaskTest {
private ProcessService processService;
+ private TaskInstanceDao taskInstanceDao;
+
+ private TaskDefinitionDao taskDefinitionDao;
+
/**
* the dependent task to be tested
* ProcessDefinition id=1
@@ -95,6 +101,12 @@ public class DependentTaskTest {
processService = Mockito.mock(ProcessService.class);
Mockito.when(applicationContext.getBean(ProcessService.class)).thenReturn(processService);
+ taskInstanceDao = Mockito.mock(TaskInstanceDao.class);
+ Mockito.when(applicationContext.getBean(TaskInstanceDao.class)).thenReturn(taskInstanceDao);
+
+ taskDefinitionDao = Mockito.mock(TaskDefinitionDao.class);
+ Mockito.when(SpringApplicationContext.getBean(TaskDefinitionDao.class)).thenReturn(taskDefinitionDao);
+
processInstance = getProcessInstance();
taskInstance = getTaskInstance();
@@ -110,20 +122,19 @@ public class DependentTaskTest {
.thenAnswer(i -> taskInstance);
// for DependentTaskExecThread.initTaskParameters
- Mockito.when(processService
+ Mockito.when(taskInstanceDao
.updateTaskInstance(Mockito.any()))
.thenReturn(true);
// for DependentTaskExecThread.updateTaskState
- Mockito.when(processService
- .saveTaskInstance(Mockito.any()))
+ Mockito.when(taskInstanceDao.upsertTaskInstance(Mockito.any()))
.thenReturn(true);
// for DependentTaskExecThread.waitTaskQuit
- Mockito.when(processService
+ Mockito.when(taskInstanceDao
.findTaskInstanceById(1000))
.thenAnswer(i -> taskInstance);
- Mockito.when(processService.findTaskDefinition(TASK_CODE, TASK_VERSION))
+ Mockito.when(taskDefinitionDao.findTaskDefinition(TASK_CODE, TASK_VERSION))
.thenReturn(getTaskDefinition());
}
@@ -155,7 +166,7 @@ public class DependentTaskTest {
.thenReturn(dependentProcessInstance);
// for DependentExecute.getDependTaskResult
- Mockito.when(processService
+ Mockito.when(taskInstanceDao
.findValidTaskListByProcessId(200, 0))
.thenReturn(Stream.of(
getTaskInstanceForValidTaskList(2000, TaskExecutionStatus.SUCCESS, DEPEND_TASK_CODE_A,
@@ -177,7 +188,7 @@ public class DependentTaskTest {
.thenReturn(dependentProcessInstance);
// for DependentExecute.getDependTaskResult
- Mockito.when(processService
+ Mockito.when(taskInstanceDao
.findValidTaskListByProcessId(200, 0))
.thenReturn(Stream.of(
getTaskInstanceForValidTaskList(2000, TaskExecutionStatus.FAILURE, DEPEND_TASK_CODE_A,
@@ -228,13 +239,13 @@ public class DependentTaskTest {
.thenReturn(processInstance300);
// for DependentExecute.getDependTaskResult
- Mockito.when(processService
+ Mockito.when(taskInstanceDao
.findValidTaskListByProcessId(200, 0))
.thenReturn(Stream.of(
getTaskInstanceForValidTaskList(2000, TaskExecutionStatus.FAILURE, DEPEND_TASK_CODE_A,
processInstance200))
.collect(Collectors.toList()));
- Mockito.when(processService
+ Mockito.when(taskInstanceDao
.findValidTaskListByProcessId(300, 0))
.thenReturn(Stream.of(
getTaskInstanceForValidTaskList(3000, TaskExecutionStatus.SUCCESS, DEPEND_TASK_CODE_B,
@@ -321,7 +332,7 @@ public class DependentTaskTest {
// DependentTaskExecThread taskExecThread = new DependentTaskExecThread(taskInstance);
// for DependentExecute.getDependTaskResult
- Mockito.when(processService
+ Mockito.when(taskInstanceDao
.findValidTaskListByProcessId(200, 0))
.thenAnswer(i -> {
processInstance.setState(WorkflowExecutionStatus.READY_STOP);
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/SubProcessTaskTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/SubProcessTaskTest.java
index 5694988847..4c2fafd63c 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/SubProcessTaskTest.java
+++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/SubProcessTaskTest.java
@@ -24,6 +24,7 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.plugin.task.api.enums.Direct;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy;
@@ -55,6 +56,8 @@ public class SubProcessTaskTest {
private ProcessService processService;
+ private TaskInstanceDao taskInstanceDao;
+
private ProcessInstance processInstance;
private MockedStatic<ServerLifeCycleManager> mockedStaticServerLifeCycleManager;
@@ -72,11 +75,14 @@ public class SubProcessTaskTest {
processService = Mockito.mock(ProcessService.class);
Mockito.when(SpringApplicationContext.getBean(ProcessService.class)).thenReturn(processService);
+ taskInstanceDao = Mockito.mock(TaskInstanceDao.class);
+ Mockito.when(SpringApplicationContext.getBean(TaskInstanceDao.class)).thenReturn(taskInstanceDao);
+
mockedStaticServerLifeCycleManager = Mockito.mockStatic(ServerLifeCycleManager.class);
Mockito.when(ServerLifeCycleManager.isStopped()).thenReturn(false);
processInstance = getProcessInstance();
- Mockito.when(processService
+ Mockito.when(taskInstanceDao
.updateTaskInstance(Mockito.any()))
.thenReturn(true);
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/SwitchTaskTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/SwitchTaskTest.java
index f5cd14decd..9bc05d283d 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/SwitchTaskTest.java
+++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/SwitchTaskTest.java
@@ -24,6 +24,8 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.dao.repository.TaskDefinitionDao;
+import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy;
import org.apache.dolphinscheduler.plugin.task.api.model.SwitchResultVo;
import org.apache.dolphinscheduler.plugin.task.api.parameters.SwitchParameters;
@@ -53,6 +55,10 @@ public class SwitchTaskTest {
private ProcessInstance processInstance;
+ private TaskInstanceDao taskInstanceDao;
+
+ private TaskDefinitionDao taskDefinitionDao;
+
@BeforeEach
public void before() {
ApplicationContext applicationContext = Mockito.mock(ApplicationContext.class);
@@ -67,6 +73,12 @@ public class SwitchTaskTest {
processService = Mockito.mock(ProcessService.class);
Mockito.when(applicationContext.getBean(ProcessService.class)).thenReturn(processService);
+ taskInstanceDao = Mockito.mock(TaskInstanceDao.class);
+ Mockito.when(applicationContext.getBean(TaskInstanceDao.class)).thenReturn(taskInstanceDao);
+
+ taskDefinitionDao = Mockito.mock(TaskDefinitionDao.class);
+ Mockito.when(SpringApplicationContext.getBean(TaskDefinitionDao.class)).thenReturn(taskDefinitionDao);
+
processInstance = getProcessInstance();
Mockito.when(processService
.findProcessInstanceById(processInstance.getId()))
@@ -78,7 +90,7 @@ public class SwitchTaskTest {
taskDefinition.setTimeoutFlag(TimeoutFlag.OPEN);
taskDefinition.setTimeoutNotifyStrategy(TaskTimeoutStrategy.WARN);
taskDefinition.setTimeout(0);
- Mockito.when(processService.findTaskDefinition(1L, 1))
+ Mockito.when(taskDefinitionDao.findTaskDefinition(1L, 1))
.thenReturn(taskDefinition);
TaskInstance taskInstance = getTaskInstance(getTaskNode(), processInstance);
@@ -87,15 +99,14 @@ public class SwitchTaskTest {
.submitTask(processInstance, taskInstance))
.thenReturn(taskInstance);
// for MasterBaseTaskExecThread.call
- Mockito.when(processService
+ Mockito.when(taskInstanceDao
.findTaskInstanceById(taskInstance.getId()))
.thenReturn(taskInstance);
// for SwitchTaskExecThread.initTaskParameters
- Mockito.when(processService
- .saveTaskInstance(taskInstance))
+ Mockito.when(taskInstanceDao.upsertTaskInstance(taskInstance))
.thenReturn(true);
// for SwitchTaskExecThread.updateTaskState
- Mockito.when(processService
+ Mockito.when(taskInstanceDao
.updateTaskInstance(taskInstance))
.thenReturn(true);
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java
index bbd4a321cd..0af8f73e7b 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java
+++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java
@@ -27,6 +27,7 @@ import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.Tenant;
+import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.server.master.dispatch.ExecutorDispatcher;
import org.apache.dolphinscheduler.service.process.ProcessService;
@@ -60,6 +61,9 @@ public class TaskPriorityQueueConsumerTest {
@Autowired
private ProcessService processService;
+ @Autowired
+ private TaskInstanceDao taskInstanceDao;
+
@Autowired
private ExecutorDispatcher dispatcher;
@@ -236,7 +240,7 @@ public class TaskPriorityQueueConsumerTest {
taskInstance.setWorkerGroup("default");
taskInstance.setExecutorId(2);
- Mockito.doReturn(taskInstance).when(processService).findTaskInstanceById(1);
+ Mockito.doReturn(taskInstance).when(taskInstanceDao).findTaskInstanceById(1);
Boolean state = taskPriorityQueueConsumer.taskInstanceIsFinalState(1);
Assertions.assertNotNull(state);
@@ -264,7 +268,7 @@ public class TaskPriorityQueueConsumerTest {
processDefinition.setUserId(2);
taskInstance.setProcessDefine(processDefinition);
- Mockito.doReturn(taskInstance).when(processService).findTaskInstanceById(1);
+ Mockito.doReturn(taskInstance).when(taskInstanceDao).findTaskInstanceById(1);
TaskPriority taskPriority = new TaskPriority(2, 1, 2, 1, 1, "NoWorkGroup");
taskPriorityQueue.put(taskPriority);
@@ -302,7 +306,7 @@ public class TaskPriorityQueueConsumerTest {
taskDefinition.setTimeoutFlag(TimeoutFlag.OPEN);
taskInstance.setTaskDefine(taskDefinition);
- Mockito.doReturn(taskInstance).when(processService).findTaskInstanceById(1);
+ Mockito.doReturn(taskInstance).when(taskInstanceDao).findTaskInstanceById(1);
TaskPriority taskPriority = new TaskPriority();
taskPriority.setTaskId(1);
@@ -333,7 +337,7 @@ public class TaskPriorityQueueConsumerTest {
processDefinition.setUserId(2);
taskInstance.setProcessDefine(processDefinition);
- Mockito.doReturn(taskInstance).when(processService).findTaskInstanceById(1);
+ Mockito.doReturn(taskInstance).when(taskInstanceDao).findTaskInstanceById(1);
TaskPriority taskPriority = new TaskPriority(2, 1, 2, 1, 1, "NoWorkGroup");
taskPriorityQueue.put(taskPriority);
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java
index 4cf5900f99..f76109325c 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java
+++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java
@@ -31,6 +31,8 @@ import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
+import org.apache.dolphinscheduler.dao.repository.TaskDefinitionLogDao;
+import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager;
import org.apache.dolphinscheduler.service.alert.ProcessAlertManager;
@@ -71,6 +73,9 @@ public class WorkflowExecuteRunnableTest {
private ProcessInstance processInstance;
+ private TaskInstanceDao taskInstanceDao;
+
+ private TaskDefinitionLogDao taskDefinitionLogDao;
private ProcessService processService;
private CommandService commandService;
@@ -96,6 +101,8 @@ public class WorkflowExecuteRunnableTest {
commandService = Mockito.mock(CommandService.class);
processInstanceDao = Mockito.mock(ProcessInstanceDao.class);
processInstance = Mockito.mock(ProcessInstance.class);
+ taskInstanceDao = Mockito.mock(TaskInstanceDao.class);
+ taskDefinitionLogDao = Mockito.mock(TaskDefinitionLogDao.class);
Map<String, String> cmdParam = new HashMap<>();
cmdParam.put(CMD_PARAM_COMPLEMENT_DATA_START_DATE, "2020-01-01 00:00:00");
cmdParam.put(CMD_PARAM_COMPLEMENT_DATA_END_DATE, "2020-01-20 23:00:00");
@@ -111,7 +118,8 @@ public class WorkflowExecuteRunnableTest {
workflowExecuteThread = Mockito.spy(
new WorkflowExecuteRunnable(processInstance, commandService, processService, processInstanceDao,
nettyExecutorManager,
- processAlertManager, config, stateWheelExecuteThread, curingGlobalParamsService));
+ processAlertManager, config, stateWheelExecuteThread, curingGlobalParamsService,
+ taskInstanceDao, taskDefinitionLogDao));
Field dag = WorkflowExecuteRunnable.class.getDeclaredField("dag");
dag.setAccessible(true);
dag.set(workflowExecuteThread, new DAG());
@@ -146,7 +154,7 @@ public class WorkflowExecuteRunnableTest {
taskInstance4.setId(4);
Map<String, String> cmdParam = new HashMap<>();
cmdParam.put(CMD_PARAM_RECOVERY_START_NODE_STRING, "1,2,3,4");
- Mockito.when(processService.findTaskInstanceByIdList(
+ Mockito.when(taskInstanceDao.findTaskInstanceByIdList(
Arrays.asList(taskInstance1.getId(), taskInstance2.getId(), taskInstance3.getId(),
taskInstance4.getId())))
.thenReturn(Arrays.asList(taskInstance1, taskInstance2, taskInstance3, taskInstance4));
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessorTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessorTest.java
index 0217748fa4..0f96a98f85 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessorTest.java
+++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessorTest.java
@@ -28,6 +28,8 @@ import org.apache.dolphinscheduler.dao.entity.Resource;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.Tenant;
+import org.apache.dolphinscheduler.dao.repository.TaskDefinitionDao;
+import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy;
@@ -56,6 +58,10 @@ public class CommonTaskProcessorTest {
private ProcessService processService;
+ private TaskInstanceDao taskInstanceDao;
+
+ private TaskDefinitionDao taskDefinitionDao;
+
private CommonTaskProcessor commonTaskProcessor;
@BeforeEach
@@ -69,6 +75,12 @@ public class CommonTaskProcessorTest {
processService = Mockito.mock(ProcessService.class);
Mockito.when(applicationContext.getBean(ProcessService.class)).thenReturn(processService);
+ taskInstanceDao = Mockito.mock(TaskInstanceDao.class);
+ Mockito.when(applicationContext.getBean(TaskInstanceDao.class)).thenReturn(taskInstanceDao);
+
+ taskDefinitionDao = Mockito.mock(TaskDefinitionDao.class);
+ Mockito.when(SpringApplicationContext.getBean(TaskDefinitionDao.class)).thenReturn(taskDefinitionDao);
+
commonTaskProcessor = Mockito.mock(CommonTaskProcessor.class);
Mockito.when(applicationContext.getBean(CommonTaskProcessor.class)).thenReturn(commonTaskProcessor);
@@ -76,7 +88,7 @@ public class CommonTaskProcessorTest {
taskDefinition.setTimeoutFlag(TimeoutFlag.OPEN);
taskDefinition.setTimeoutNotifyStrategy(TaskTimeoutStrategy.WARN);
taskDefinition.setTimeout(0);
- Mockito.when(processService.findTaskDefinition(1L, 1))
+ Mockito.when(taskDefinitionDao.findTaskDefinition(1L, 1))
.thenReturn(taskDefinition);
}
@@ -107,7 +119,7 @@ public class CommonTaskProcessorTest {
TaskDefinition taskDefinition = new TaskDefinition();
taskDefinition.setTimeoutFlag(TimeoutFlag.OPEN);
taskInstance.setTaskDefine(taskDefinition);
- Mockito.doReturn(taskInstance).when(processService).findTaskInstanceById(1);
+ Mockito.doReturn(taskInstance).when(taskInstanceDao).findTaskInstanceById(1);
TaskExecutionContext taskExecutionContext = commonTaskProcessor.getTaskExecutionContext(taskInstance);
Assertions.assertNull(taskExecutionContext);
}
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java
index f7b41467bd..2e9dc23936 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java
+++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java
@@ -29,6 +29,7 @@ import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
@@ -73,6 +74,9 @@ public class FailoverServiceTest {
@Mock
private ProcessService processService;
+ @Mock
+ private TaskInstanceDao taskInstanceDao;
+
@Mock
private WorkflowExecuteThreadPool workflowExecuteThreadPool;
@@ -108,13 +112,14 @@ public class FailoverServiceTest {
given(masterConfig.getMasterAddress()).willReturn(testMasterHost);
MasterFailoverService masterFailoverService =
new MasterFailoverService(registryClient, masterConfig, processService, nettyExecutorManager,
- processInstanceExecCacheManager, logClient);
+ processInstanceExecCacheManager, logClient, taskInstanceDao);
WorkerFailoverService workerFailoverService = new WorkerFailoverService(registryClient,
masterConfig,
processService,
workflowExecuteThreadPool,
cacheManager,
- logClient);
+ logClient,
+ taskInstanceDao);
failoverService = new FailoverService(masterFailoverService, workerFailoverService);
@@ -150,7 +155,7 @@ public class FailoverServiceTest {
given(processService.queryNeedFailoverProcessInstances(Mockito.anyString()))
.willReturn(Arrays.asList(processInstance));
doNothing().when(processService).processNeedFailoverProcessInstances(Mockito.any(ProcessInstance.class));
- given(processService.findValidTaskListByProcessId(Mockito.anyInt(), Mockito.anyInt()))
+ given(taskInstanceDao.findValidTaskListByProcessId(Mockito.anyInt(), Mockito.anyInt()))
.willReturn(Lists.newArrayList(masterTaskInstance, workerTaskInstance));
Thread.sleep(1000);
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index 6f85ed3e0b..76a6e6ea52 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -35,7 +35,6 @@ import org.apache.dolphinscheduler.dao.entity.Environment;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
-import org.apache.dolphinscheduler.dao.entity.ProcessInstanceMap;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog;
import org.apache.dolphinscheduler.dao.entity.Project;
@@ -70,8 +69,6 @@ public interface ProcessService {
Optional<ProcessInstance> findProcessInstanceDetailById(int processId);
- List<TaskDefinition> getTaskNodeListByDefinition(long defineCode);
-
ProcessInstance findProcessInstanceById(int processId);
ProcessDefinition findProcessDefineById(int processDefinitionId);
@@ -104,36 +101,12 @@ public interface ProcessService {
void createSubWorkProcess(ProcessInstance parentProcessInstance, TaskInstance task);
- TaskInstance submitTaskInstanceToDB(TaskInstance taskInstance, ProcessInstance processInstance);
-
- TaskExecutionStatus getSubmitTaskState(TaskInstance taskInstance, ProcessInstance processInstance);
-
- boolean saveTaskInstance(TaskInstance taskInstance);
-
- boolean createTaskInstance(TaskInstance taskInstance);
-
- boolean updateTaskInstance(TaskInstance taskInstance);
-
- TaskInstance findTaskInstanceById(Integer taskId);
-
- List<TaskInstance> findTaskInstanceByIdList(List<Integer> idList);
-
void packageTaskInstance(TaskInstance taskInstance, ProcessInstance processInstance);
void updateTaskDefinitionResources(TaskDefinition taskDefinition);
List<Integer> findTaskIdByInstanceState(int instanceId, TaskExecutionStatus state);
- List<TaskInstance> findValidTaskListByProcessId(Integer processInstanceId, int testFlag);
-
- List<TaskInstance> findPreviousTaskListByWorkProcessId(Integer processInstanceId);
-
- int updateWorkProcessInstanceMap(ProcessInstanceMap processInstanceMap);
-
- int createWorkProcessInstanceMap(ProcessInstanceMap processInstanceMap);
-
- ProcessInstanceMap findWorkProcessMapByParent(Integer parentWorkProcessId, Integer parentTaskId);
-
int deleteWorkProcessMapByParentId(int parentWorkProcessId);
ProcessInstance findSubProcessInstance(Integer parentProcessId, Integer parentTaskId);
@@ -157,8 +130,6 @@ public interface ProcessService {
@Transactional
void processNeedFailoverProcessInstances(ProcessInstance processInstance);
- List<TaskInstance> queryNeedFailoverTaskInstances(String host);
-
DataSource findDataSourceById(int id);
ProcessInstance findProcessInstanceByTaskId(int taskId);
@@ -216,12 +187,6 @@ public interface ProcessService {
DagData genDagData(ProcessDefinition processDefinition);
- List<TaskDefinitionLog> genTaskDefineList(List<ProcessTaskRelation> processTaskRelations);
-
- List<TaskDefinitionLog> getTaskDefineLogListByRelation(List<ProcessTaskRelation> processTaskRelations);
-
- TaskDefinition findTaskDefinition(long taskCode, int taskDefinitionVersion);
-
List<ProcessTaskRelation> findRelationByCode(long processDefinitionCode, int processDefinitionVersion);
List<TaskNode> transformTask(List<ProcessTaskRelation> taskRelationList,
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
index dd65097585..ea3f5fd7a1 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
@@ -35,7 +35,6 @@ import org.apache.dolphinscheduler.common.constants.CommandKeyConstants;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.enums.AuthorizationType;
import org.apache.dolphinscheduler.common.enums.CommandType;
-import org.apache.dolphinscheduler.common.enums.FailureStrategy;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.ReleaseState;
import org.apache.dolphinscheduler.common.enums.TaskDependType;
@@ -110,6 +109,10 @@ import org.apache.dolphinscheduler.dao.mapper.UdfFuncMapper;
import org.apache.dolphinscheduler.dao.mapper.UserMapper;
import org.apache.dolphinscheduler.dao.mapper.WorkFlowLineageMapper;
import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
+import org.apache.dolphinscheduler.dao.repository.ProcessInstanceMapDao;
+import org.apache.dolphinscheduler.dao.repository.TaskDefinitionDao;
+import org.apache.dolphinscheduler.dao.repository.TaskDefinitionLogDao;
+import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.dao.utils.DqRuleUtils;
import org.apache.dolphinscheduler.plugin.task.api.enums.Direct;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
@@ -193,6 +196,18 @@ public class ProcessServiceImpl implements ProcessService {
@Autowired
private ProcessInstanceDao processInstanceDao;
+ @Autowired
+ private TaskDefinitionDao taskDefinitionDao;
+
+ @Autowired
+ private TaskInstanceDao taskInstanceDao;
+
+ @Autowired
+ private TaskDefinitionLogDao taskDefinitionLogDao;
+
+ @Autowired
+ private ProcessInstanceMapDao processInstanceMapDao;
+
@Autowired
private DataSourceMapper dataSourceMapper;
@@ -399,32 +414,6 @@ public class ProcessServiceImpl implements ProcessService {
return Optional.ofNullable(processInstanceMapper.queryDetailById(processId));
}
- /**
- * get task node list by definitionId
- */
- @Override
- public List<TaskDefinition> getTaskNodeListByDefinition(long defineCode) {
- ProcessDefinition processDefinition = processDefineMapper.queryByCode(defineCode);
- if (processDefinition == null) {
- logger.error("process define not exists");
- return Lists.newArrayList();
- }
- List<ProcessTaskRelationLog> processTaskRelations = processTaskRelationLogMapper
- .queryByProcessCodeAndVersion(processDefinition.getCode(), processDefinition.getVersion());
- Set<TaskDefinition> taskDefinitionSet = new HashSet<>();
- for (ProcessTaskRelationLog processTaskRelation : processTaskRelations) {
- if (processTaskRelation.getPostTaskCode() > 0) {
- taskDefinitionSet.add(new TaskDefinition(processTaskRelation.getPostTaskCode(),
- processTaskRelation.getPostTaskVersion()));
- }
- }
- if (taskDefinitionSet.isEmpty()) {
- return Lists.newArrayList();
- }
- List<TaskDefinitionLog> taskDefinitionLogs = taskDefinitionLogMapper.queryByTaskDefinitions(taskDefinitionSet);
- return Lists.newArrayList(taskDefinitionLogs);
- }
-
/**
* find process instance by id
*
@@ -516,7 +505,7 @@ public class ProcessServiceImpl implements ProcessService {
public void removeTaskLogFile(Integer processInstanceId) {
ProcessInstance processInstance = processInstanceMapper.selectById(processInstanceId);
List<TaskInstance> taskInstanceList =
- findValidTaskListByProcessId(processInstanceId, processInstance.getTestFlag());
+ taskInstanceDao.findValidTaskListByProcessId(processInstanceId, processInstance.getTestFlag());
if (CollectionUtils.isEmpty(taskInstanceList)) {
return;
}
@@ -538,7 +527,7 @@ public class ProcessServiceImpl implements ProcessService {
public void deleteWorkTaskInstanceByProcessInstanceId(int processInstanceId) {
ProcessInstance processInstance = processInstanceMapper.selectById(processInstanceId);
List<TaskInstance> taskInstanceList =
- findValidTaskListByProcessId(processInstanceId, processInstance.getTestFlag());
+ taskInstanceDao.findValidTaskListByProcessId(processInstanceId, processInstance.getTestFlag());
if (CollectionUtils.isEmpty(taskInstanceList)) {
return;
}
@@ -560,7 +549,7 @@ public class ProcessServiceImpl implements ProcessService {
*/
@Override
public void recurseFindSubProcess(long parentCode, List<Long> ids) {
- List<TaskDefinition> taskNodeList = this.getTaskNodeListByDefinition(parentCode);
+ List<TaskDefinition> taskNodeList = taskDefinitionDao.getTaskDefinitionListByDefinition(parentCode);
if (taskNodeList != null && !taskNodeList.isEmpty()) {
@@ -862,7 +851,7 @@ public class ProcessServiceImpl implements ProcessService {
failedList.addAll(killedList);
failedList.addAll(toleranceList);
for (Integer taskId : failedList) {
- initTaskInstance(this.findTaskInstanceById(taskId));
+ initTaskInstance(taskInstanceDao.findTaskInstanceById(taskId));
}
cmdParam.put(CommandKeyConstants.CMD_PARAM_RECOVERY_START_NODE_STRING,
String.join(Constants.COMMA, convertIntListToString(failedList)));
@@ -880,7 +869,7 @@ public class ProcessServiceImpl implements ProcessService {
TaskExecutionStatus.KILL);
for (Integer taskId : stopNodeList) {
// initialize the pause state
- initTaskInstance(this.findTaskInstanceById(taskId));
+ initTaskInstance(taskInstanceDao.findTaskInstanceById(taskId));
}
cmdParam.put(CommandKeyConstants.CMD_PARAM_RECOVERY_START_NODE_STRING,
String.join(Constants.COMMA, convertIntListToString(stopNodeList)));
@@ -897,10 +886,11 @@ public class ProcessServiceImpl implements ProcessService {
// delete all the valid tasks when complement data if id is not null
if (processInstance.getId() != null) {
List<TaskInstance> taskInstanceList =
- this.findValidTaskListByProcessId(processInstance.getId(), processInstance.getTestFlag());
+ taskInstanceDao.findValidTaskListByProcessId(processInstance.getId(),
+ processInstance.getTestFlag());
for (TaskInstance taskInstance : taskInstanceList) {
taskInstance.setFlag(Flag.NO);
- this.updateTaskInstance(taskInstance);
+ taskInstanceDao.updateTaskInstance(taskInstance);
}
}
break;
@@ -912,10 +902,11 @@ public class ProcessServiceImpl implements ProcessService {
}
// delete all the valid tasks when repeat running
List<TaskInstance> validTaskList =
- findValidTaskListByProcessId(processInstance.getId(), processInstance.getTestFlag());
+ taskInstanceDao.findValidTaskListByProcessId(processInstance.getId(),
+ processInstance.getTestFlag());
for (TaskInstance taskInstance : validTaskList) {
taskInstance.setFlag(Flag.NO);
- updateTaskInstance(taskInstance);
+ taskInstanceDao.updateTaskInstance(taskInstance);
}
processInstance.setStartTime(new Date());
processInstance.setRestartTime(processInstance.getStartTime());
@@ -1067,7 +1058,7 @@ public class ProcessServiceImpl implements ProcessService {
// update sub process id to process map table
processInstanceMap.setProcessInstanceId(subProcessInstance.getId());
- this.updateWorkProcessInstanceMap(processInstanceMap);
+ processInstanceMapDao.updateWorkProcessInstanceMap(processInstanceMap);
}
/**
@@ -1129,11 +1120,11 @@ public class ProcessServiceImpl implements ProcessService {
if (!taskInstance.isSubProcess()
&& (taskInstance.getState().isKill() || taskInstance.getState().isFailure())) {
taskInstance.setFlag(Flag.NO);
- updateTaskInstance(taskInstance);
+ taskInstanceDao.updateTaskInstance(taskInstance);
return;
}
taskInstance.setState(TaskExecutionStatus.SUBMITTED_SUCCESS);
- updateTaskInstance(taskInstance);
+ taskInstanceDao.updateTaskInstance(taskInstance);
}
/**
@@ -1183,7 +1174,7 @@ public class ProcessServiceImpl implements ProcessService {
taskInstance.getProcessInstanceId(),
processInstance.getState());
// submit to db
- TaskInstance task = submitTaskInstanceToDB(taskInstance, processInstance);
+ TaskInstance task = taskInstanceDao.submitTaskInstanceToDB(taskInstance, processInstance);
if (task == null) {
logger.error("Save taskInstance to db error, task name:{}, process id:{} state: {} ",
taskInstance.getName(),
@@ -1227,7 +1218,7 @@ public class ProcessServiceImpl implements ProcessService {
processMap = findPreviousTaskProcessMap(parentInstance, parentTask);
if (processMap != null) {
processMap.setParentTaskInstanceId(parentTask.getId());
- updateWorkProcessInstanceMap(processMap);
+ processInstanceMapDao.updateWorkProcessInstanceMap(processMap);
return processMap;
}
}
@@ -1235,7 +1226,7 @@ public class ProcessServiceImpl implements ProcessService {
processMap = new ProcessInstanceMap();
processMap.setParentProcessInstanceId(parentInstance.getId());
processMap.setParentTaskInstanceId(parentTask.getId());
- createWorkProcessInstanceMap(processMap);
+ processInstanceMapDao.createWorkProcessInstanceMap(processMap);
return processMap;
}
@@ -1250,11 +1241,13 @@ public class ProcessServiceImpl implements ProcessService {
TaskInstance parentTask) {
Integer preTaskId = 0;
- List<TaskInstance> preTaskList = this.findPreviousTaskListByWorkProcessId(parentProcessInstance.getId());
+ List<TaskInstance> preTaskList =
+ taskInstanceDao.findPreviousTaskListByWorkProcessId(parentProcessInstance.getId());
for (TaskInstance task : preTaskList) {
if (task.getName().equals(parentTask.getName())) {
preTaskId = task.getId();
- ProcessInstanceMap map = findWorkProcessMapByParent(parentProcessInstance.getId(), preTaskId);
+ ProcessInstanceMap map =
+ processInstanceMapDao.findWorkProcessMapByParent(parentProcessInstance.getId(), preTaskId);
if (map != null) {
return map;
}
@@ -1277,7 +1270,8 @@ public class ProcessServiceImpl implements ProcessService {
return;
}
// check create sub work flow firstly
- ProcessInstanceMap instanceMap = findWorkProcessMapByParent(parentProcessInstance.getId(), task.getId());
+ ProcessInstanceMap instanceMap =
+ processInstanceMapDao.findWorkProcessMapByParent(parentProcessInstance.getId(), task.getId());
if (null != instanceMap
&& CommandType.RECOVER_TOLERANCE_FAULT_PROCESS == parentProcessInstance.getCommandType()) {
// recover failover tolerance would not create a new command when the sub command already have been created
@@ -1347,165 +1341,6 @@ public class ProcessServiceImpl implements ProcessService {
}
}
- /**
- * submit task to mysql
- *
- * @param taskInstance taskInstance
- * @param processInstance processInstance
- * @return task instance
- */
- @Override
- public TaskInstance submitTaskInstanceToDB(TaskInstance taskInstance, ProcessInstance processInstance) {
- WorkflowExecutionStatus processInstanceState = processInstance.getState();
- if (processInstanceState.isFinished() || processInstanceState == WorkflowExecutionStatus.READY_STOP) {
- logger.warn("processInstance: {} state was: {}, skip submit this task, taskCode: {}",
- processInstance.getId(),
- processInstanceState,
- taskInstance.getTaskCode());
- return null;
- }
- if (processInstanceState == WorkflowExecutionStatus.READY_PAUSE) {
- taskInstance.setState(TaskExecutionStatus.PAUSE);
- }
- taskInstance.setExecutorId(processInstance.getExecutorId());
- taskInstance.setState(getSubmitTaskState(taskInstance, processInstance));
- if (taskInstance.getSubmitTime() == null) {
- taskInstance.setSubmitTime(new Date());
- }
- if (taskInstance.getFirstSubmitTime() == null) {
- taskInstance.setFirstSubmitTime(taskInstance.getSubmitTime());
- }
- boolean saveResult = saveTaskInstance(taskInstance);
- if (!saveResult) {
- return null;
- }
- return taskInstance;
- }
-
- /**
- * get submit task instance state by the work process state
- * cannot modify the task state when running/kill/submit success, or this
- * task instance is already exists in task queue .
- * return pause if work process state is ready pause
- * return stop if work process state is ready stop
- * if all of above are not satisfied, return submit success
- *
- * @param taskInstance taskInstance
- * @param processInstance processInstance
- * @return process instance state
- */
- @Override
- public TaskExecutionStatus getSubmitTaskState(TaskInstance taskInstance, ProcessInstance processInstance) {
- TaskExecutionStatus state = taskInstance.getState();
- // running, delayed or killed
- // the task already exists in task queue
- // return state
- if (state == TaskExecutionStatus.RUNNING_EXECUTION
- || state == TaskExecutionStatus.DELAY_EXECUTION
- || state == TaskExecutionStatus.KILL
- || state == TaskExecutionStatus.DISPATCH) {
- return state;
- }
- // return pasue /stop if process instance state is ready pause / stop
- // or return submit success
- if (processInstance.getState() == WorkflowExecutionStatus.READY_PAUSE) {
- state = TaskExecutionStatus.PAUSE;
- } else if (processInstance.getState() == WorkflowExecutionStatus.READY_STOP
- || !checkProcessStrategy(taskInstance, processInstance)) {
- state = TaskExecutionStatus.KILL;
- } else {
- state = TaskExecutionStatus.SUBMITTED_SUCCESS;
- }
- return state;
- }
-
- /**
- * check process instance strategy
- *
- * @param taskInstance taskInstance
- * @return check strategy result
- */
- private boolean checkProcessStrategy(TaskInstance taskInstance, ProcessInstance processInstance) {
- FailureStrategy failureStrategy = processInstance.getFailureStrategy();
- if (failureStrategy == FailureStrategy.CONTINUE) {
- return true;
- }
- List<TaskInstance> taskInstances =
- this.findValidTaskListByProcessId(taskInstance.getProcessInstanceId(), taskInstance.getTestFlag());
-
- for (TaskInstance task : taskInstances) {
- if (task.getState() == TaskExecutionStatus.FAILURE
- && task.getRetryTimes() >= task.getMaxRetryTimes()) {
- return false;
- }
- }
- return true;
- }
-
- /**
- * insert or update task instance
- *
- * @param taskInstance taskInstance
- * @return save task instance result
- */
- @Override
- public boolean saveTaskInstance(TaskInstance taskInstance) {
- if (taskInstance.getId() != null) {
- return updateTaskInstance(taskInstance);
- } else {
- return createTaskInstance(taskInstance);
- }
- }
-
- /**
- * insert task instance
- *
- * @param taskInstance taskInstance
- * @return create task instance result
- */
- @Override
- public boolean createTaskInstance(TaskInstance taskInstance) {
- int count = taskInstanceMapper.insert(taskInstance);
- return count > 0;
- }
-
- /**
- * update task instance
- *
- * @param taskInstance taskInstance
- * @return update task instance result
- */
- @Override
- public boolean updateTaskInstance(TaskInstance taskInstance) {
- int count = taskInstanceMapper.updateById(taskInstance);
- return count > 0;
- }
-
- /**
- * find task instance by id
- *
- * @param taskId task id
- * @return task instance
- */
- @Override
- public TaskInstance findTaskInstanceById(Integer taskId) {
- return taskInstanceMapper.selectById(taskId);
- }
-
- /**
- * find task instance list by id list
- *
- * @param idList task id list
- * @return task instance list
- */
- @Override
- public List<TaskInstance> findTaskInstanceByIdList(List<Integer> idList) {
- if (CollectionUtils.isEmpty(idList)) {
- return new ArrayList<>();
- }
- return taskInstanceMapper.selectBatchIds(idList);
- }
-
/**
* package task instance
*/
@@ -1514,7 +1349,7 @@ public class ProcessServiceImpl implements ProcessService {
taskInstance.setProcessInstance(processInstance);
taskInstance.setProcessDefine(processInstance.getProcessDefinition());
taskInstance.setProcessInstancePriority(processInstance.getProcessInstancePriority());
- TaskDefinition taskDefinition = this.findTaskDefinition(
+ TaskDefinition taskDefinition = taskDefinitionDao.findTaskDefinition(
taskInstance.getTaskCode(),
taskInstance.getTaskDefinitionVersion());
this.updateTaskDefinitionResources(taskDefinition);
@@ -1603,69 +1438,6 @@ public class ProcessServiceImpl implements ProcessService {
return taskInstanceMapper.queryTaskByProcessIdAndState(instanceId, state.getCode());
}
- /**
- * find valid task list by process definition id
- *
- * @param processInstanceId processInstanceId
- * @param testFlag testFlag
- * @return task instance list
- */
- @Override
- public List<TaskInstance> findValidTaskListByProcessId(Integer processInstanceId, int testFlag) {
- return taskInstanceMapper.findValidTaskListByProcessId(processInstanceId, Flag.YES, testFlag);
- }
-
- /**
- * find previous task list by work process id
- *
- * @param processInstanceId processInstanceId
- * @return task instance list
- */
- @Override
- public List<TaskInstance> findPreviousTaskListByWorkProcessId(Integer processInstanceId) {
- ProcessInstance processInstance = processInstanceMapper.selectById(processInstanceId);
- return taskInstanceMapper.findValidTaskListByProcessId(processInstanceId, Flag.NO,
- processInstance.getTestFlag());
- }
-
- /**
- * update work process instance map
- *
- * @param processInstanceMap processInstanceMap
- * @return update process instance result
- */
- @Override
- public int updateWorkProcessInstanceMap(ProcessInstanceMap processInstanceMap) {
- return processInstanceMapMapper.updateById(processInstanceMap);
- }
-
- /**
- * create work process instance map
- *
- * @param processInstanceMap processInstanceMap
- * @return create process instance result
- */
- @Override
- public int createWorkProcessInstanceMap(ProcessInstanceMap processInstanceMap) {
- int count = 0;
- if (processInstanceMap != null) {
- return processInstanceMapMapper.insert(processInstanceMap);
- }
- return count;
- }
-
- /**
- * find work process map by parent process id and parent task id.
- *
- * @param parentWorkProcessId parentWorkProcessId
- * @param parentTaskId parentTaskId
- * @return process instance map
- */
- @Override
- public ProcessInstanceMap findWorkProcessMapByParent(Integer parentWorkProcessId, Integer parentTaskId) {
- return processInstanceMapMapper.queryByParentId(parentWorkProcessId, parentTaskId);
- }
-
/**
* delete work process map by parent process id
*
@@ -1861,18 +1633,6 @@ public class ProcessServiceImpl implements ProcessService {
commandService.createCommand(cmd);
}
- /**
- * query all need failover task instances by host
- *
- * @param host host
- * @return task instance list
- */
- @Override
- public List<TaskInstance> queryNeedFailoverTaskInstances(String host) {
- return taskInstanceMapper.queryByHostAndStatus(host,
- TaskExecutionStatus.getNeedFailoverWorkflowInstanceState());
- }
-
/**
* find data source by id
*
@@ -2252,8 +2012,8 @@ public class ProcessServiceImpl implements ProcessService {
if (params != null && CollectionUtils.isNotEmpty(params.getResourceFilesList())) {
resourceIds = params.getResourceFilesList().stream()
- .filter(t -> t.getId() != null)
.map(ResourceInfo::getId)
+ .filter(Objects::nonNull)
.collect(toSet());
}
if (CollectionUtils.isEmpty(resourceIds)) {
@@ -2477,57 +2237,12 @@ public class ProcessServiceImpl implements ProcessService {
public DagData genDagData(ProcessDefinition processDefinition) {
List<ProcessTaskRelation> taskRelations =
this.findRelationByCode(processDefinition.getCode(), processDefinition.getVersion());
- List<TaskDefinitionLog> taskDefinitionLogList = genTaskDefineList(taskRelations);
+ List<TaskDefinitionLog> taskDefinitionLogList = taskDefinitionLogDao.getTaskDefineLogList(taskRelations);
List<TaskDefinition> taskDefinitions =
taskDefinitionLogList.stream().map(t -> (TaskDefinition) t).collect(Collectors.toList());
return new DagData(processDefinition, taskRelations, taskDefinitions);
}
- @Override
- public List<TaskDefinitionLog> genTaskDefineList(List<ProcessTaskRelation> processTaskRelations) {
- Set<TaskDefinition> taskDefinitionSet = new HashSet<>();
- for (ProcessTaskRelation processTaskRelation : processTaskRelations) {
- if (processTaskRelation.getPreTaskCode() > 0) {
- taskDefinitionSet.add(new TaskDefinition(processTaskRelation.getPreTaskCode(),
- processTaskRelation.getPreTaskVersion()));
- }
- if (processTaskRelation.getPostTaskCode() > 0) {
- taskDefinitionSet.add(new TaskDefinition(processTaskRelation.getPostTaskCode(),
- processTaskRelation.getPostTaskVersion()));
- }
- }
- if (taskDefinitionSet.isEmpty()) {
- return Lists.newArrayList();
- }
- return taskDefinitionLogMapper.queryByTaskDefinitions(taskDefinitionSet);
- }
-
- @Override
- public List<TaskDefinitionLog> getTaskDefineLogListByRelation(List<ProcessTaskRelation> processTaskRelations) {
- List<TaskDefinitionLog> taskDefinitionLogs = new ArrayList<>();
- Map<Long, Integer> taskCodeVersionMap = new HashMap<>();
- for (ProcessTaskRelation processTaskRelation : processTaskRelations) {
- if (processTaskRelation.getPreTaskCode() > 0) {
- taskCodeVersionMap.put(processTaskRelation.getPreTaskCode(), processTaskRelation.getPreTaskVersion());
- }
- if (processTaskRelation.getPostTaskCode() > 0) {
- taskCodeVersionMap.put(processTaskRelation.getPostTaskCode(), processTaskRelation.getPostTaskVersion());
- }
- }
- taskCodeVersionMap.forEach((code, version) -> {
- taskDefinitionLogs.add((TaskDefinitionLog) this.findTaskDefinition(code, version));
- });
- return taskDefinitionLogs;
- }
-
- /**
- * find task definition by code and version
- */
- @Override
- public TaskDefinition findTaskDefinition(long taskCode, int taskDefinitionVersion) {
- return taskDefinitionLogMapper.queryByDefinitionCodeAndVersion(taskCode, taskDefinitionVersion);
- }
-
/**
* find process task relation list by process
*/
@@ -2571,7 +2286,7 @@ public class ProcessServiceImpl implements ProcessService {
});
}
if (CollectionUtils.isEmpty(taskDefinitionLogs)) {
- taskDefinitionLogs = genTaskDefineList(taskRelationList);
+ taskDefinitionLogs = taskDefinitionLogDao.getTaskDefineLogList(taskRelationList);
}
Map<Long, TaskDefinitionLog> taskDefinitionLogMap = taskDefinitionLogs.stream()
.collect(Collectors.toMap(TaskDefinitionLog::getCode, taskDefinitionLog -> taskDefinitionLog));
@@ -2626,7 +2341,7 @@ public class ProcessServiceImpl implements ProcessService {
return processTaskMap;
}
ProcessInstance fatherProcess = this.findProcessInstanceById(processInstanceMap.getParentProcessInstanceId());
- TaskInstance fatherTask = this.findTaskInstanceById(processInstanceMap.getParentTaskInstanceId());
+ TaskInstance fatherTask = taskInstanceDao.findTaskInstanceById(processInstanceMap.getParentTaskInstanceId());
if (fatherProcess != null) {
processTaskMap.put(fatherProcess, fatherTask);
@@ -2936,12 +2651,13 @@ public class ProcessServiceImpl implements ProcessService {
if (processInstance != null
&& (processInstance.getState().isFailure() || processInstance.getState().isStop())) {
List<TaskInstance> validTaskList =
- findValidTaskListByProcessId(processInstance.getId(), processInstance.getTestFlag());
+ taskInstanceDao.findValidTaskListByProcessId(processInstance.getId(),
+ processInstance.getTestFlag());
List<Long> instanceTaskCodeList =
validTaskList.stream().map(TaskInstance::getTaskCode).collect(Collectors.toList());
List<ProcessTaskRelation> taskRelations = findRelationByCode(processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion());
- List<TaskDefinitionLog> taskDefinitionLogs = genTaskDefineList(taskRelations);
+ List<TaskDefinitionLog> taskDefinitionLogs = taskDefinitionLogDao.getTaskDefineLogList(taskRelations);
List<Long> definiteTaskCodeList =
taskDefinitionLogs.stream().filter(definitionLog -> definitionLog.getFlag() == Flag.YES)
.map(TaskDefinitionLog::getCode).collect(Collectors.toList());
diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
index 96c153ec10..5ad590a24f 100644
--- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
+++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
@@ -66,6 +66,9 @@ import org.apache.dolphinscheduler.dao.mapper.TaskGroupQueueMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.UserMapper;
import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
+import org.apache.dolphinscheduler.dao.repository.TaskDefinitionDao;
+import org.apache.dolphinscheduler.dao.repository.TaskDefinitionLogDao;
+import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.plugin.task.api.enums.dp.DqTaskState;
import org.apache.dolphinscheduler.plugin.task.api.enums.dp.ExecuteSqlType;
import org.apache.dolphinscheduler.plugin.task.api.enums.dp.InputType;
@@ -121,6 +124,16 @@ public class ProcessServiceTest {
private ProcessInstanceMapper processInstanceMapper;
@Mock
private ProcessInstanceDao processInstanceDao;
+
+ @Mock
+ private TaskInstanceDao taskInstanceDao;
+
+ @Mock
+ private TaskDefinitionLogDao taskDefinitionLogDao;
+
+ @Mock
+ private TaskDefinitionDao taskDefinitionDao;
+
@Mock
private UserMapper userMapper;
@Mock
@@ -652,7 +665,7 @@ public class ProcessServiceTest {
taskDefinitionLogs.add(taskDefinition);
taskDefinitionLogs.add(td2);
- Mockito.when(taskDefinitionLogMapper.queryByTaskDefinitions(any())).thenReturn(taskDefinitionLogs);
+ Mockito.when(taskDefinitionLogDao.getTaskDefineLogList(any())).thenReturn(taskDefinitionLogs);
Mockito.when(processTaskRelationLogMapper.queryByProcessCodeAndVersion(Mockito.anyLong(), Mockito.anyInt()))
.thenReturn(list);
@@ -729,23 +742,6 @@ public class ProcessServiceTest {
}
- @Test
- public void testFindTaskInstanceByIdList() {
- List<Integer> emptyList = new ArrayList<>();
- Mockito.when(taskInstanceMapper.selectBatchIds(emptyList)).thenReturn(new ArrayList<>());
- Assertions.assertEquals(0, processService.findTaskInstanceByIdList(emptyList).size());
-
- List<Integer> idList = Collections.singletonList(1);
- TaskInstance instance = new TaskInstance();
- instance.setId(1);
-
- Mockito.when(taskInstanceMapper.selectBatchIds(idList)).thenReturn(Collections.singletonList(instance));
- List<TaskInstance> taskInstanceByIdList = processService.findTaskInstanceByIdList(idList);
-
- Assertions.assertEquals(1, taskInstanceByIdList.size());
- Assertions.assertEquals(instance.getId(), taskInstanceByIdList.get(0).getId());
- }
-
@Test
public void testFindLastManualProcessInterval() {
long definitionCode = 1L;