You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ca...@apache.org on 2023/03/29 10:18:13 UTC
[dolphinscheduler] branch dev updated: [improvement] support self-dependent (#13818)
This is an automated email from the ASF dual-hosted git repository.
caishunfeng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 9d9ae9ad54 [improvement] support self-dependent (#13818)
9d9ae9ad54 is described below
commit 9d9ae9ad540a9f2a8bf93345d13418ef417f7227
Author: caishunfeng <ca...@gmail.com>
AuthorDate: Wed Mar 29 18:18:03 2023 +0800
[improvement] support self-dependent (#13818)
---
.../dao/mapper/ProcessInstanceMapper.java | 16 +++++
.../dao/repository/ProcessInstanceDao.java | 48 ++++++++++++++
.../repository/impl/ProcessInstanceDaoImpl.java | 73 ++++++++++++++++++++++
.../dao/mapper/ProcessInstanceMapper.xml | 19 ++++++
.../master/runner/task/DependentTaskProcessor.java | 3 +-
.../server/master/utils/DependentExecute.java | 70 ++++++++++++++++++---
.../server/master/DependentTaskTest.java | 60 +++++++++++++++---
.../service/process/ProcessService.java | 8 ---
.../service/process/ProcessServiceImpl.java | 49 ---------------
.../service/process/ProcessServiceTest.java | 19 ------
10 files changed, 273 insertions(+), 92 deletions(-)
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java
index 7807998b16..ea9fe453b8 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java
@@ -227,6 +227,22 @@ public interface ProcessInstanceMapper extends BaseMapper<ProcessInstance> {
@Param("endTime") Date endTime,
@Param("testFlag") int testFlag);
+ /**
+ * query first schedule process instance
+ *
+ * @param definitionCode definitionCode
+ * @return process instance
+ */
+ ProcessInstance queryFirstScheduleProcessInstance(@Param("processDefinitionCode") Long definitionCode);
+
+ /**
+ * query the first process instance by start time (not limited to manually started instances)
+ *
+ * @param definitionCode definitionCode
+ * @return process instance
+ */
+ ProcessInstance queryFirstStartProcessInstance(@Param("processDefinitionCode") Long definitionCode);
+
/**
* query top n process instance order by running duration
*
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java
index b249e265e7..81064e7f84 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java
@@ -18,7 +18,11 @@
package org.apache.dolphinscheduler.dao.repository;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.plugin.task.api.model.DateInterval;
+import org.apache.ibatis.annotations.Param;
+
+import java.util.Date;
import java.util.List;
public interface ProcessInstanceDao {
@@ -39,4 +43,48 @@ public interface ProcessInstanceDao {
void deleteById(Integer workflowInstanceId);
ProcessInstance queryByWorkflowInstanceId(Integer workflowInstanceId);
+
+ /**
+ * find last scheduler process instance in the date interval
+ *
+ * @param definitionCode definitionCode
+ * @param dateInterval dateInterval
+ * @return process instance
+ */
+ ProcessInstance findLastSchedulerProcessInterval(Long definitionCode, DateInterval dateInterval, int testFlag);
+
+ /**
+ * find last manual process instance interval
+ *
+ * @param definitionCode process definition code
+ * @param dateInterval dateInterval
+ * @return process instance
+ */
+ ProcessInstance findLastManualProcessInterval(Long definitionCode, DateInterval dateInterval, int testFlag);
+
+ /**
+ * find last running process instance
+ *
+ * @param definitionCode process definition code
+ * @param startTime start time
+ * @param endTime end time
+ * @return process instance
+ */
+ ProcessInstance findLastRunningProcess(Long definitionCode, Date startTime, Date endTime, int testFlag);
+
+ /**
+ * query first schedule process instance
+ *
+ * @param definitionCode definitionCode
+ * @return process instance
+ */
+ ProcessInstance queryFirstScheduleProcessInstance(@Param("processDefinitionCode") Long definitionCode);
+
+ /**
+ * query the first process instance by start time (not limited to manually started instances)
+ *
+ * @param definitionCode definitionCode
+ * @return process instance
+ */
+ ProcessInstance queryFirstStartProcessInstance(@Param("processDefinitionCode") Long definitionCode);
}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessInstanceDaoImpl.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessInstanceDaoImpl.java
index ae36ca924e..d3d6f66280 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessInstanceDaoImpl.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessInstanceDaoImpl.java
@@ -17,12 +17,15 @@
package org.apache.dolphinscheduler.dao.repository.impl;
+import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
+import org.apache.dolphinscheduler.plugin.task.api.model.DateInterval;
import org.apache.commons.collections4.CollectionUtils;
+import java.util.Date;
import java.util.List;
import lombok.NonNull;
@@ -74,4 +77,74 @@ public class ProcessInstanceDaoImpl implements ProcessInstanceDao {
public ProcessInstance queryByWorkflowInstanceId(Integer workflowInstanceId) {
return processInstanceMapper.selectById(workflowInstanceId);
}
+
+ /**
+ * find last scheduler process instance in the date interval
+ *
+ * @param definitionCode definitionCode
+ * @param dateInterval dateInterval
+ * @return process instance
+ */
+ @Override
+ public ProcessInstance findLastSchedulerProcessInterval(Long definitionCode, DateInterval dateInterval,
+ int testFlag) {
+ return processInstanceMapper.queryLastSchedulerProcess(definitionCode,
+ dateInterval.getStartTime(),
+ dateInterval.getEndTime(),
+ testFlag);
+ }
+
+ /**
+ * find last manual process instance interval
+ *
+ * @param definitionCode process definition code
+ * @param dateInterval dateInterval
+ * @return process instance
+ */
+ @Override
+ public ProcessInstance findLastManualProcessInterval(Long definitionCode, DateInterval dateInterval, int testFlag) {
+ return processInstanceMapper.queryLastManualProcess(definitionCode,
+ dateInterval.getStartTime(),
+ dateInterval.getEndTime(),
+ testFlag);
+ }
+
+ /**
+ * find last running process instance
+ *
+ * @param definitionCode process definition code
+ * @param startTime start time
+ * @param endTime end time
+ * @return process instance
+ */
+ @Override
+ public ProcessInstance findLastRunningProcess(Long definitionCode, Date startTime, Date endTime, int testFlag) {
+ return processInstanceMapper.queryLastRunningProcess(definitionCode,
+ startTime,
+ endTime,
+ testFlag,
+ WorkflowExecutionStatus.getNeedFailoverWorkflowInstanceState());
+ }
+
+ /**
+ * query first schedule process instance
+ *
+ * @param definitionCode definitionCode
+ * @return process instance
+ */
+ @Override
+ public ProcessInstance queryFirstScheduleProcessInstance(Long definitionCode) {
+ return processInstanceMapper.queryFirstScheduleProcessInstance(definitionCode);
+ }
+
+ /**
+ * query the first process instance by start time (not limited to manually started instances)
+ *
+ * @param definitionCode definitionCode
+ * @return process instance
+ */
+ @Override
+ public ProcessInstance queryFirstStartProcessInstance(Long definitionCode) {
+ return processInstanceMapper.queryFirstStartProcessInstance(definitionCode);
+ }
}
diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml
index 7c64c035c6..77f7b4e3ee 100644
--- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml
+++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml
@@ -242,6 +242,25 @@
</if>
order by end_time desc limit 1
</select>
+
+ <select id="queryFirstScheduleProcessInstance" resultType="org.apache.dolphinscheduler.dao.entity.ProcessInstance">
+ select
+ <include refid="baseSql"/>
+ from t_ds_process_instance
+ where process_definition_code = #{processDefinitionCode} and schedule_time is not null
+ order by schedule_time
+ limit 1
+ </select>
+
+ <select id="queryFirstStartProcessInstance" resultType="org.apache.dolphinscheduler.dao.entity.ProcessInstance">
+ select
+ <include refid="baseSql"/>
+ from t_ds_process_instance
+ where process_definition_code = #{processDefinitionCode} and start_time is not null
+ order by start_time
+ limit 1
+ </select>
+
<select id="queryByProcessDefineCodeAndStatus" resultType="org.apache.dolphinscheduler.dao.entity.ProcessInstance">
select
<include refid="baseSql"/>
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java
index 5b97a88696..372305c996 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java
@@ -222,7 +222,8 @@ public class DependentTaskProcessor extends BaseTaskProcessor {
dependentItem.getKey());
}
}
- this.dependentTaskList.add(new DependentExecute(taskModel.getDependItemList(), taskModel.getRelation()));
+ this.dependentTaskList.add(new DependentExecute(taskModel.getDependItemList(), taskModel.getRelation(),
+ processInstance, taskInstance));
}
}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/DependentExecute.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/DependentExecute.java
index bdf6a2957f..38a71d00ec 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/DependentExecute.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/DependentExecute.java
@@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.server.master.utils;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult;
import org.apache.dolphinscheduler.plugin.task.api.enums.DependentRelation;
@@ -28,7 +29,6 @@ import org.apache.dolphinscheduler.plugin.task.api.model.DateInterval;
import org.apache.dolphinscheduler.plugin.task.api.model.DependentItem;
import org.apache.dolphinscheduler.plugin.task.api.utils.DependentUtils;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
-import org.apache.dolphinscheduler.service.process.ProcessService;
import java.util.ArrayList;
import java.util.Date;
@@ -36,15 +36,15 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import lombok.extern.slf4j.Slf4j;
+
/**
* dependent item execute
*/
+@Slf4j
public class DependentExecute {
- /**
- * process service
- */
- private final ProcessService processService = SpringApplicationContext.getBean(ProcessService.class);
+ private final ProcessInstanceDao processInstanceDao = SpringApplicationContext.getBean(ProcessInstanceDao.class);
private final TaskInstanceDao taskInstanceDao = SpringApplicationContext.getBean(TaskInstanceDao.class);
@@ -58,6 +58,10 @@ public class DependentExecute {
*/
private DependentRelation relation;
+ private ProcessInstance processInstance;
+
+ private TaskInstance taskInstance;
+
/**
* depend result
*/
@@ -74,9 +78,12 @@ public class DependentExecute {
* @param itemList item list
* @param relation relation
*/
- public DependentExecute(List<DependentItem> itemList, DependentRelation relation) {
+ public DependentExecute(List<DependentItem> itemList, DependentRelation relation, ProcessInstance processInstance,
+ TaskInstance taskInstance) {
this.dependItemList = itemList;
this.relation = relation;
+ this.processInstance = processInstance;
+ this.taskInstance = taskInstance;
}
/**
@@ -185,10 +192,10 @@ public class DependentExecute {
private ProcessInstance findLastProcessInterval(Long definitionCode, DateInterval dateInterval, int testFlag) {
ProcessInstance lastSchedulerProcess =
- processService.findLastSchedulerProcessInterval(definitionCode, dateInterval, testFlag);
+ processInstanceDao.findLastSchedulerProcessInterval(definitionCode, dateInterval, testFlag);
ProcessInstance lastManualProcess =
- processService.findLastManualProcessInterval(definitionCode, dateInterval, testFlag);
+ processInstanceDao.findLastManualProcessInterval(definitionCode, dateInterval, testFlag);
if (lastManualProcess == null) {
return lastSchedulerProcess;
@@ -243,6 +250,15 @@ public class DependentExecute {
List<DependResult> dependResultList = new ArrayList<>();
for (DependentItem dependentItem : dependItemList) {
+ if (isSelfDependent(dependentItem) && isFirstProcessInstance(dependentItem)) {
+ // if self-dependent, default success at first time
+ dependResultMap.put(dependentItem.getKey(), DependResult.SUCCESS);
+ dependResultList.add(DependResult.SUCCESS);
+ log.info(
+ "This dependent item is self-dependent and run at first time, default success, processDefinitionCode:{}, depTaskCode:{}",
+ dependentItem.getDefinitionCode(), dependentItem.getDepTaskCode());
+ continue;
+ }
DependResult dependResult = getDependResultForItem(dependentItem, currentTime, testFlag);
if (dependResult != DependResult.WAITING) {
dependResultMap.put(dependentItem.getKey(), dependResult);
@@ -272,4 +288,42 @@ public class DependentExecute {
return dependResultMap;
}
+ /**
+ * Checks whether the dependent item refers to the workflow definition of the current process instance.
+ * @param dependentItem dependent item whose definition code and dep task code are inspected
+ * @return true if the definition codes match and the dep task code is DEPENDENT_ALL_TASK_CODE or the current task's code
+ */
+ public boolean isSelfDependent(DependentItem dependentItem) {
+ if (processInstance.getProcessDefinitionCode().equals(dependentItem.getDefinitionCode())) {
+ if (dependentItem.getDepTaskCode() == Constants.DEPENDENT_ALL_TASK_CODE) {
+ return true;
+ }
+ if (dependentItem.getDepTaskCode() == taskInstance.getTaskCode()) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Checks whether the current process instance is the first instance of the item's definition:
+ * looks up the earliest instance by schedule time, falling back to the earliest by start time.
+ * @param dependentItem dependent item whose definition code is used for the lookup
+ * @return true if the first instance found has the same id as the current process instance, false otherwise
+ */
+ public boolean isFirstProcessInstance(DependentItem dependentItem) {
+ ProcessInstance firstProcessInstance =
+ processInstanceDao.queryFirstScheduleProcessInstance(dependentItem.getDefinitionCode());
+ if (firstProcessInstance == null) {
+ firstProcessInstance = processInstanceDao.queryFirstStartProcessInstance(dependentItem.getDefinitionCode());
+ if (firstProcessInstance == null) {
+ log.warn("First process instance is null, processDefinitionCode:{}", dependentItem.getDefinitionCode());
+ return false;
+ }
+ }
+ // Compare ids by value. If getId() returns a boxed Integer, '==' compares references and
+ // fails for values outside the Integer cache. NOTE(review): confirm getId()'s return type;
+ // Objects.equals behaves correctly for both int and Integer.
+ return java.util.Objects.equals(firstProcessInstance.getId(), processInstance.getId());
+ }
}
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/DependentTaskTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/DependentTaskTest.java
index 0cfed4545c..5a5f04a474 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/DependentTaskTest.java
+++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/DependentTaskTest.java
@@ -25,6 +25,7 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
import org.apache.dolphinscheduler.dao.repository.TaskDefinitionDao;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult;
@@ -34,14 +35,17 @@ import org.apache.dolphinscheduler.plugin.task.api.model.DependentItem;
import org.apache.dolphinscheduler.plugin.task.api.model.DependentTaskModel;
import org.apache.dolphinscheduler.plugin.task.api.parameters.DependentParameters;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
+import org.apache.dolphinscheduler.server.master.utils.DependentExecute;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.model.TaskNode;
import org.apache.dolphinscheduler.service.process.ProcessService;
import java.time.Duration;
+import java.util.ArrayList;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
@@ -72,6 +76,8 @@ public class DependentTaskTest {
private ProcessService processService;
+ private ProcessInstanceDao processInstanceDao;
+
private TaskInstanceDao taskInstanceDao;
private TaskDefinitionDao taskDefinitionDao;
@@ -101,6 +107,9 @@ public class DependentTaskTest {
processService = Mockito.mock(ProcessService.class);
Mockito.when(applicationContext.getBean(ProcessService.class)).thenReturn(processService);
+ processInstanceDao = Mockito.mock(ProcessInstanceDao.class);
+ Mockito.when(applicationContext.getBean(ProcessInstanceDao.class)).thenReturn(processInstanceDao);
+
taskInstanceDao = Mockito.mock(TaskInstanceDao.class);
Mockito.when(applicationContext.getBean(TaskInstanceDao.class)).thenReturn(taskInstanceDao);
@@ -161,7 +170,7 @@ public class DependentTaskTest {
ProcessInstance dependentProcessInstance =
getProcessInstanceForFindLastRunningProcess(200, WorkflowExecutionStatus.FAILURE);
// for DependentExecute.findLastProcessInterval
- Mockito.when(processService
+ Mockito.when(processInstanceDao
.findLastRunningProcess(Mockito.eq(2L), Mockito.any(), Mockito.any(), Mockito.anyInt()))
.thenReturn(dependentProcessInstance);
@@ -183,7 +192,7 @@ public class DependentTaskTest {
ProcessInstance dependentProcessInstance =
getProcessInstanceForFindLastRunningProcess(200, WorkflowExecutionStatus.SUCCESS);
// for DependentExecute.findLastProcessInterval
- Mockito.when(processService
+ Mockito.when(processInstanceDao
.findLastRunningProcess(Mockito.eq(2L), Mockito.any(), Mockito.any(), Mockito.anyInt()))
.thenReturn(dependentProcessInstance);
@@ -231,10 +240,10 @@ public class DependentTaskTest {
getProcessInstanceForFindLastRunningProcess(300, WorkflowExecutionStatus.SUCCESS);
// for DependentExecute.findLastProcessInterval
- Mockito.when(processService
+ Mockito.when(processInstanceDao
.findLastRunningProcess(Mockito.eq(2L), Mockito.any(), Mockito.any(), Mockito.anyInt()))
.thenReturn(processInstance200);
- Mockito.when(processService
+ Mockito.when(processInstanceDao
.findLastRunningProcess(Mockito.eq(3L), Mockito.any(), Mockito.any(), Mockito.anyInt()))
.thenReturn(processInstance300);
@@ -281,7 +290,7 @@ public class DependentTaskTest {
public void testDependentOnAllSuccess() {
testDependentOnAllInit();
// for DependentExecute.findLastProcessInterval
- Mockito.when(processService
+ Mockito.when(processInstanceDao
.findLastRunningProcess(Mockito.eq(2L), Mockito.any(), Mockito.any(), Mockito.anyInt()))
.thenReturn(getProcessInstanceForFindLastRunningProcess(200, WorkflowExecutionStatus.SUCCESS));
@@ -291,7 +300,7 @@ public class DependentTaskTest {
public void testDependentOnAllFailure() {
testDependentOnAllInit();
// for DependentExecute.findLastProcessInterval
- Mockito.when(processService
+ Mockito.when(processInstanceDao
.findLastRunningProcess(Mockito.eq(2L), Mockito.any(), Mockito.any(), Mockito.anyInt()))
.thenReturn(getProcessInstanceForFindLastRunningProcess(200, WorkflowExecutionStatus.FAILURE));
@@ -325,7 +334,7 @@ public class DependentTaskTest {
ProcessInstance dependentProcessInstance =
getProcessInstanceForFindLastRunningProcess(200, WorkflowExecutionStatus.RUNNING_EXECUTION);
// for DependentExecute.findLastProcessInterval
- Mockito.when(processService
+ Mockito.when(processInstanceDao
.findLastRunningProcess(Mockito.eq(2L), Mockito.any(), Mockito.any(), Mockito.anyInt()))
.thenReturn(dependentProcessInstance);
@@ -345,9 +354,45 @@ public class DependentTaskTest {
}
+ @Test
+ public void testIsSelfDependent() {
+ DependentExecute dependentExecute =
+ new DependentExecute(new ArrayList<>(), DependentRelation.AND, processInstance, taskInstance);
+ DependentItem dependentItem = new DependentItem();
+ dependentItem.setDefinitionCode(processInstance.getProcessDefinitionCode());
+ dependentItem.setDepTaskCode(Constants.DEPENDENT_ALL_TASK_CODE);
+ Assertions.assertTrue(dependentExecute.isSelfDependent(dependentItem));
+ dependentItem.setDepTaskCode(taskInstance.getTaskCode());
+ Assertions.assertTrue(dependentExecute.isSelfDependent(dependentItem));
+
+ // no self task: own workflow definition, but a task code that is not this task's
+ dependentItem.setDepTaskCode(12345678);
+ Assertions.assertFalse(dependentExecute.isSelfDependent(dependentItem));
+
+ // no self wf: this task's own code, but a different workflow definition code
+ dependentItem.setDefinitionCode(12345678L);
+ dependentItem.setDepTaskCode(taskInstance.getTaskCode());
+ Assertions.assertFalse(dependentExecute.isSelfDependent(dependentItem));
+ }
+
+ @Test
+ public void testIsFirstProcessInstance() {
+ Mockito.when(processInstanceDao.queryFirstScheduleProcessInstance(processInstance.getProcessDefinitionCode()))
+ .thenReturn(processInstance);
+ DependentExecute dependentExecute =
+ new DependentExecute(new ArrayList<>(), DependentRelation.AND, processInstance, taskInstance);
+ DependentItem dependentItem = new DependentItem();
+ dependentItem.setDefinitionCode(processInstance.getProcessDefinitionCode());
+ Assertions.assertTrue(dependentExecute.isFirstProcessInstance(dependentItem));
+
+ dependentItem.setDefinitionCode(12345678L);
+ Assertions.assertFalse(dependentExecute.isFirstProcessInstance(dependentItem));
+ }
+
private ProcessInstance getProcessInstance() {
ProcessInstance processInstance = new ProcessInstance();
processInstance.setId(100);
+ processInstance.setProcessDefinitionCode(10000L);
processInstance.setState(WorkflowExecutionStatus.RUNNING_EXECUTION);
return processInstance;
}
@@ -355,6 +400,7 @@ public class DependentTaskTest {
private TaskInstance getTaskInstance() {
TaskInstance taskInstance = new TaskInstance();
taskInstance.setId(1000);
+ taskInstance.setTaskCode(10000L);
return taskInstance;
}
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index 80bb6e3711..bfacd6fc0e 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -49,12 +49,10 @@ import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.dolphinscheduler.dao.entity.UdfFunc;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
-import org.apache.dolphinscheduler.plugin.task.api.model.DateInterval;
import org.apache.dolphinscheduler.service.exceptions.CronParseException;
import org.apache.dolphinscheduler.service.model.TaskNode;
import org.apache.dolphinscheduler.spi.enums.ResourceType;
-import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -136,12 +134,6 @@ public interface ProcessService {
List<Schedule> selectAllByProcessDefineCode(long[] codes);
- ProcessInstance findLastSchedulerProcessInterval(Long definitionCode, DateInterval dateInterval, int testFlag);
-
- ProcessInstance findLastManualProcessInterval(Long definitionCode, DateInterval dateInterval, int testFlag);
-
- ProcessInstance findLastRunningProcess(Long definitionCode, Date startTime, Date endTime, int testFlag);
-
String queryUserQueueByProcessInstance(ProcessInstance processInstance);
ProjectUser queryProjectWithUserByProcessInstanceId(int processInstanceId);
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
index 281c57e2fc..676207c3bf 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
@@ -118,7 +118,6 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager;
import org.apache.dolphinscheduler.plugin.task.api.enums.Direct;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.plugin.task.api.enums.dp.DqTaskState;
-import org.apache.dolphinscheduler.plugin.task.api.model.DateInterval;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
@@ -1683,54 +1682,6 @@ public class ProcessServiceImpl implements ProcessService {
return scheduleMapper.selectAllByProcessDefineArray(codes);
}
- /**
- * find last scheduler process instance in the date interval
- *
- * @param definitionCode definitionCode
- * @param dateInterval dateInterval
- * @return process instance
- */
- @Override
- public ProcessInstance findLastSchedulerProcessInterval(Long definitionCode, DateInterval dateInterval,
- int testFlag) {
- return processInstanceMapper.queryLastSchedulerProcess(definitionCode,
- dateInterval.getStartTime(),
- dateInterval.getEndTime(),
- testFlag);
- }
-
- /**
- * find last manual process instance interval
- *
- * @param definitionCode process definition code
- * @param dateInterval dateInterval
- * @return process instance
- */
- @Override
- public ProcessInstance findLastManualProcessInterval(Long definitionCode, DateInterval dateInterval, int testFlag) {
- return processInstanceMapper.queryLastManualProcess(definitionCode,
- dateInterval.getStartTime(),
- dateInterval.getEndTime(),
- testFlag);
- }
-
- /**
- * find last running process instance
- *
- * @param definitionCode process definition code
- * @param startTime start time
- * @param endTime end time
- * @return process instance
- */
- @Override
- public ProcessInstance findLastRunningProcess(Long definitionCode, Date startTime, Date endTime, int testFlag) {
- return processInstanceMapper.queryLastRunningProcess(definitionCode,
- startTime,
- endTime,
- testFlag,
- WorkflowExecutionStatus.getNeedFailoverWorkflowInstanceState());
- }
-
/**
* query user queue by process instance
*
diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
index fdaf75ab90..f2b7dbe489 100644
--- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
+++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
@@ -76,7 +76,6 @@ import org.apache.dolphinscheduler.plugin.task.api.enums.dp.ExecuteSqlType;
import org.apache.dolphinscheduler.plugin.task.api.enums.dp.InputType;
import org.apache.dolphinscheduler.plugin.task.api.enums.dp.OptionSourceType;
import org.apache.dolphinscheduler.plugin.task.api.enums.dp.ValueType;
-import org.apache.dolphinscheduler.plugin.task.api.model.DateInterval;
import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
import org.apache.dolphinscheduler.service.cron.CronUtilsTest;
import org.apache.dolphinscheduler.service.exceptions.CronParseException;
@@ -750,24 +749,6 @@ public class ProcessServiceTest {
}
- @Test
- public void testFindLastManualProcessInterval() {
- long definitionCode = 1L;
- DateInterval dateInterval = new DateInterval(new Date(), new Date());
- int testFlag = 1;
-
- // find test lastManualProcessInterval
- ProcessInstance lastManualProcessInterval =
- processService.findLastManualProcessInterval(definitionCode, dateInterval, testFlag);
- Assertions.assertNull(lastManualProcessInterval);
-
- // find online lastManualProcessInterval
- testFlag = 0;
- lastManualProcessInterval =
- processService.findLastManualProcessInterval(definitionCode, dateInterval, testFlag);
- Assertions.assertNull(lastManualProcessInterval);
- }
-
@Test
public void testQueryTestDataSourceId() {
Integer onlineDataSourceId = 1;