You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ca...@apache.org on 2023/03/29 10:18:13 UTC

[dolphinscheduler] branch dev updated: [improvement] support self-dependent (#13818)

This is an automated email from the ASF dual-hosted git repository.

caishunfeng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 9d9ae9ad54 [improvement] support self-dependent (#13818)
9d9ae9ad54 is described below

commit 9d9ae9ad540a9f2a8bf93345d13418ef417f7227
Author: caishunfeng <ca...@gmail.com>
AuthorDate: Wed Mar 29 18:18:03 2023 +0800

    [improvement] support self-dependent (#13818)
---
 .../dao/mapper/ProcessInstanceMapper.java          | 16 +++++
 .../dao/repository/ProcessInstanceDao.java         | 48 ++++++++++++++
 .../repository/impl/ProcessInstanceDaoImpl.java    | 73 ++++++++++++++++++++++
 .../dao/mapper/ProcessInstanceMapper.xml           | 19 ++++++
 .../master/runner/task/DependentTaskProcessor.java |  3 +-
 .../server/master/utils/DependentExecute.java      | 70 ++++++++++++++++++---
 .../server/master/DependentTaskTest.java           | 60 +++++++++++++++---
 .../service/process/ProcessService.java            |  8 ---
 .../service/process/ProcessServiceImpl.java        | 49 ---------------
 .../service/process/ProcessServiceTest.java        | 19 ------
 10 files changed, 273 insertions(+), 92 deletions(-)

diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java
index 7807998b16..ea9fe453b8 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java
@@ -227,6 +227,22 @@ public interface ProcessInstanceMapper extends BaseMapper<ProcessInstance> {
                                            @Param("endTime") Date endTime,
                                            @Param("testFlag") int testFlag);
 
+    /**
+     * query first schedule process instance
+     *
+     * @param definitionCode definitionCode
+     * @return process instance
+     */
+    ProcessInstance queryFirstScheduleProcessInstance(@Param("processDefinitionCode") Long definitionCode);
+
+    /**
+     * query the first process instance ordered by start time
+     *
+     * @param definitionCode definitionCode
+     * @return process instance
+     */
+    ProcessInstance queryFirstStartProcessInstance(@Param("processDefinitionCode") Long definitionCode);
+
     /**
      * query top n process instance order by running duration
      *
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java
index b249e265e7..81064e7f84 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java
@@ -18,7 +18,11 @@
 package org.apache.dolphinscheduler.dao.repository;
 
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.plugin.task.api.model.DateInterval;
 
+import org.apache.ibatis.annotations.Param;
+
+import java.util.Date;
 import java.util.List;
 
 public interface ProcessInstanceDao {
@@ -39,4 +43,48 @@ public interface ProcessInstanceDao {
     void deleteById(Integer workflowInstanceId);
 
     ProcessInstance queryByWorkflowInstanceId(Integer workflowInstanceId);
+
+    /**
+     * find last scheduler process instance in the date interval
+     *
+     * @param definitionCode definitionCode
+     * @param dateInterval   dateInterval
+     * @return process instance
+     */
+    ProcessInstance findLastSchedulerProcessInterval(Long definitionCode, DateInterval dateInterval, int testFlag);
+
+    /**
+     * find last manual process instance in the date interval
+     *
+     * @param definitionCode process definition code
+     * @param dateInterval   dateInterval
+     * @return process instance
+     */
+    ProcessInstance findLastManualProcessInterval(Long definitionCode, DateInterval dateInterval, int testFlag);
+
+    /**
+     * find last running process instance
+     *
+     * @param definitionCode process definition code
+     * @param startTime      start time
+     * @param endTime        end time
+     * @return process instance
+     */
+    ProcessInstance findLastRunningProcess(Long definitionCode, Date startTime, Date endTime, int testFlag);
+
+    /**
+     * query first schedule process instance
+     *
+     * @param definitionCode definitionCode
+     * @return process instance
+     */
+    ProcessInstance queryFirstScheduleProcessInstance(@Param("processDefinitionCode") Long definitionCode);
+
+    /**
+     * query the first process instance ordered by start time
+     *
+     * @param definitionCode definitionCode
+     * @return process instance
+     */
+    ProcessInstance queryFirstStartProcessInstance(@Param("processDefinitionCode") Long definitionCode);
 }
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessInstanceDaoImpl.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessInstanceDaoImpl.java
index ae36ca924e..d3d6f66280 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessInstanceDaoImpl.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessInstanceDaoImpl.java
@@ -17,12 +17,15 @@
 
 package org.apache.dolphinscheduler.dao.repository.impl;
 
+import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
 import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
+import org.apache.dolphinscheduler.plugin.task.api.model.DateInterval;
 
 import org.apache.commons.collections4.CollectionUtils;
 
+import java.util.Date;
 import java.util.List;
 
 import lombok.NonNull;
@@ -74,4 +77,74 @@ public class ProcessInstanceDaoImpl implements ProcessInstanceDao {
     public ProcessInstance queryByWorkflowInstanceId(Integer workflowInstanceId) {
         return processInstanceMapper.selectById(workflowInstanceId);
     }
+
+    /**
+     * find last scheduler process instance in the date interval
+     *
+     * @param definitionCode definitionCode
+     * @param dateInterval   dateInterval
+     * @return process instance
+     */
+    @Override
+    public ProcessInstance findLastSchedulerProcessInterval(Long definitionCode, DateInterval dateInterval,
+                                                            int testFlag) {
+        return processInstanceMapper.queryLastSchedulerProcess(definitionCode,
+                dateInterval.getStartTime(),
+                dateInterval.getEndTime(),
+                testFlag);
+    }
+
+    /**
+     * find last manual process instance in the date interval
+     *
+     * @param definitionCode process definition code
+     * @param dateInterval   dateInterval
+     * @return process instance
+     */
+    @Override
+    public ProcessInstance findLastManualProcessInterval(Long definitionCode, DateInterval dateInterval, int testFlag) {
+        return processInstanceMapper.queryLastManualProcess(definitionCode,
+                dateInterval.getStartTime(),
+                dateInterval.getEndTime(),
+                testFlag);
+    }
+
+    /**
+     * find last running process instance
+     *
+     * @param definitionCode process definition code
+     * @param startTime      start time
+     * @param endTime        end time
+     * @return process instance
+     */
+    @Override
+    public ProcessInstance findLastRunningProcess(Long definitionCode, Date startTime, Date endTime, int testFlag) {
+        return processInstanceMapper.queryLastRunningProcess(definitionCode,
+                startTime,
+                endTime,
+                testFlag,
+                WorkflowExecutionStatus.getNeedFailoverWorkflowInstanceState());
+    }
+
+    /**
+     * query first schedule process instance
+     *
+     * @param definitionCode definitionCode
+     * @return process instance
+     */
+    @Override
+    public ProcessInstance queryFirstScheduleProcessInstance(Long definitionCode) {
+        return processInstanceMapper.queryFirstScheduleProcessInstance(definitionCode);
+    }
+
+    /**
+     * query the first process instance ordered by start time
+     *
+     * @param definitionCode definitionCode
+     * @return process instance
+     */
+    @Override
+    public ProcessInstance queryFirstStartProcessInstance(Long definitionCode) {
+        return processInstanceMapper.queryFirstStartProcessInstance(definitionCode);
+    }
 }
diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml
index 7c64c035c6..77f7b4e3ee 100644
--- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml
+++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml
@@ -242,6 +242,25 @@
         </if>
         order by end_time desc limit 1
     </select>
+
+    <select id="queryFirstScheduleProcessInstance" resultType="org.apache.dolphinscheduler.dao.entity.ProcessInstance">
+        select
+        <include refid="baseSql"/>
+        from t_ds_process_instance
+        where process_definition_code = #{processDefinitionCode} and schedule_time is not null
+        order by schedule_time
+        limit 1
+    </select>
+
+    <select id="queryFirstStartProcessInstance" resultType="org.apache.dolphinscheduler.dao.entity.ProcessInstance">
+        select
+        <include refid="baseSql"/>
+        from t_ds_process_instance
+        where process_definition_code = #{processDefinitionCode} and start_time is not null
+        order by start_time
+        limit 1
+    </select>
+
     <select id="queryByProcessDefineCodeAndStatus" resultType="org.apache.dolphinscheduler.dao.entity.ProcessInstance">
         select
         <include refid="baseSql"/>
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java
index 5b97a88696..372305c996 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java
@@ -222,7 +222,8 @@ public class DependentTaskProcessor extends BaseTaskProcessor {
                             dependentItem.getKey());
                 }
             }
-            this.dependentTaskList.add(new DependentExecute(taskModel.getDependItemList(), taskModel.getRelation()));
+            this.dependentTaskList.add(new DependentExecute(taskModel.getDependItemList(), taskModel.getRelation(),
+                    processInstance, taskInstance));
         }
     }
 
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/DependentExecute.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/DependentExecute.java
index bdf6a2957f..38a71d00ec 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/DependentExecute.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/DependentExecute.java
@@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.server.master.utils;
 import org.apache.dolphinscheduler.common.constants.Constants;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
 import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
 import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult;
 import org.apache.dolphinscheduler.plugin.task.api.enums.DependentRelation;
@@ -28,7 +29,6 @@ import org.apache.dolphinscheduler.plugin.task.api.model.DateInterval;
 import org.apache.dolphinscheduler.plugin.task.api.model.DependentItem;
 import org.apache.dolphinscheduler.plugin.task.api.utils.DependentUtils;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
-import org.apache.dolphinscheduler.service.process.ProcessService;
 
 import java.util.ArrayList;
 import java.util.Date;
@@ -36,15 +36,15 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import lombok.extern.slf4j.Slf4j;
+
 /**
  * dependent item execute
  */
+@Slf4j
 public class DependentExecute {
 
-    /**
-     * process service
-     */
-    private final ProcessService processService = SpringApplicationContext.getBean(ProcessService.class);
+    private final ProcessInstanceDao processInstanceDao = SpringApplicationContext.getBean(ProcessInstanceDao.class);
 
     private final TaskInstanceDao taskInstanceDao = SpringApplicationContext.getBean(TaskInstanceDao.class);
 
@@ -58,6 +58,10 @@ public class DependentExecute {
      */
     private DependentRelation relation;
 
+    private ProcessInstance processInstance;
+
+    private TaskInstance taskInstance;
+
     /**
      * depend result
      */
@@ -74,9 +78,12 @@ public class DependentExecute {
      * @param itemList item list
      * @param relation relation
      */
-    public DependentExecute(List<DependentItem> itemList, DependentRelation relation) {
+    public DependentExecute(List<DependentItem> itemList, DependentRelation relation, ProcessInstance processInstance,
+                            TaskInstance taskInstance) {
         this.dependItemList = itemList;
         this.relation = relation;
+        this.processInstance = processInstance;
+        this.taskInstance = taskInstance;
     }
 
     /**
@@ -185,10 +192,10 @@ public class DependentExecute {
     private ProcessInstance findLastProcessInterval(Long definitionCode, DateInterval dateInterval, int testFlag) {
 
         ProcessInstance lastSchedulerProcess =
-                processService.findLastSchedulerProcessInterval(definitionCode, dateInterval, testFlag);
+                processInstanceDao.findLastSchedulerProcessInterval(definitionCode, dateInterval, testFlag);
 
         ProcessInstance lastManualProcess =
-                processService.findLastManualProcessInterval(definitionCode, dateInterval, testFlag);
+                processInstanceDao.findLastManualProcessInterval(definitionCode, dateInterval, testFlag);
 
         if (lastManualProcess == null) {
             return lastSchedulerProcess;
@@ -243,6 +250,15 @@ public class DependentExecute {
         List<DependResult> dependResultList = new ArrayList<>();
 
         for (DependentItem dependentItem : dependItemList) {
+            if (isSelfDependent(dependentItem) && isFirstProcessInstance(dependentItem)) {
+                // a self-dependent item on the first process instance defaults to success
+                dependResultMap.put(dependentItem.getKey(), DependResult.SUCCESS);
+                dependResultList.add(DependResult.SUCCESS);
+                log.info(
+                        "This dependent item is self-dependent and run at first time, default success, processDefinitionCode:{}, depTaskCode:{}",
+                        dependentItem.getDefinitionCode(), dependentItem.getDepTaskCode());
+                continue;
+            }
             DependResult dependResult = getDependResultForItem(dependentItem, currentTime, testFlag);
             if (dependResult != DependResult.WAITING) {
                 dependResultMap.put(dependentItem.getKey(), dependResult);
@@ -272,4 +288,42 @@ public class DependentExecute {
         return dependResultMap;
     }
 
+    /**
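+     * check whether the dependent item points at the current workflow itself
+     * @param dependentItem dependent item to check
+     * @return true if it has the current workflow's definition code and depends on all tasks or on the current task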
+     */
+    public boolean isSelfDependent(DependentItem dependentItem) {
+        if (processInstance.getProcessDefinitionCode().equals(dependentItem.getDefinitionCode())) {
+            if (dependentItem.getDepTaskCode() == Constants.DEPENDENT_ALL_TASK_CODE) {
+                return true;
+            }
+            if (dependentItem.getDepTaskCode() == taskInstance.getTaskCode()) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /**
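+     * check whether the current process instance is the first instance of the dependent workflow
+     * the first instance is queried by schedule time, falling back to start time when that query returns nothing
+     * @param dependentItem dependent item whose definition code is used for the query
+     * @return true if the first instance found has the same id as the current process instance, false otherwise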
+     */
+    public boolean isFirstProcessInstance(DependentItem dependentItem) {
+        ProcessInstance firstProcessInstance =
+                processInstanceDao.queryFirstScheduleProcessInstance(dependentItem.getDefinitionCode());
+        if (firstProcessInstance == null) {
+            firstProcessInstance = processInstanceDao.queryFirstStartProcessInstance(dependentItem.getDefinitionCode());
+            if (firstProcessInstance == null) {
+                log.warn("First process instance is null, processDefinitionCode:{}", dependentItem.getDefinitionCode());
+                return false;
+            }
+        }
+        if (firstProcessInstance.getId() == processInstance.getId()) {
+            return true;
+        }
+        return false;
+    }
 }
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/DependentTaskTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/DependentTaskTest.java
index 0cfed4545c..5a5f04a474 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/DependentTaskTest.java
+++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/DependentTaskTest.java
@@ -25,6 +25,7 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
 import org.apache.dolphinscheduler.dao.repository.TaskDefinitionDao;
 import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
 import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult;
@@ -34,14 +35,17 @@ import org.apache.dolphinscheduler.plugin.task.api.model.DependentItem;
 import org.apache.dolphinscheduler.plugin.task.api.model.DependentTaskModel;
 import org.apache.dolphinscheduler.plugin.task.api.parameters.DependentParameters;
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
+import org.apache.dolphinscheduler.server.master.utils.DependentExecute;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
 import org.apache.dolphinscheduler.service.model.TaskNode;
 import org.apache.dolphinscheduler.service.process.ProcessService;
 
 import java.time.Duration;
+import java.util.ArrayList;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
@@ -72,6 +76,8 @@ public class DependentTaskTest {
 
     private ProcessService processService;
 
+    private ProcessInstanceDao processInstanceDao;
+
     private TaskInstanceDao taskInstanceDao;
 
     private TaskDefinitionDao taskDefinitionDao;
@@ -101,6 +107,9 @@ public class DependentTaskTest {
         processService = Mockito.mock(ProcessService.class);
         Mockito.when(applicationContext.getBean(ProcessService.class)).thenReturn(processService);
 
+        processInstanceDao = Mockito.mock(ProcessInstanceDao.class);
+        Mockito.when(applicationContext.getBean(ProcessInstanceDao.class)).thenReturn(processInstanceDao);
+
         taskInstanceDao = Mockito.mock(TaskInstanceDao.class);
         Mockito.when(applicationContext.getBean(TaskInstanceDao.class)).thenReturn(taskInstanceDao);
 
@@ -161,7 +170,7 @@ public class DependentTaskTest {
         ProcessInstance dependentProcessInstance =
                 getProcessInstanceForFindLastRunningProcess(200, WorkflowExecutionStatus.FAILURE);
         // for DependentExecute.findLastProcessInterval
-        Mockito.when(processService
+        Mockito.when(processInstanceDao
                 .findLastRunningProcess(Mockito.eq(2L), Mockito.any(), Mockito.any(), Mockito.anyInt()))
                 .thenReturn(dependentProcessInstance);
 
@@ -183,7 +192,7 @@ public class DependentTaskTest {
         ProcessInstance dependentProcessInstance =
                 getProcessInstanceForFindLastRunningProcess(200, WorkflowExecutionStatus.SUCCESS);
         // for DependentExecute.findLastProcessInterval
-        Mockito.when(processService
+        Mockito.when(processInstanceDao
                 .findLastRunningProcess(Mockito.eq(2L), Mockito.any(), Mockito.any(), Mockito.anyInt()))
                 .thenReturn(dependentProcessInstance);
 
@@ -231,10 +240,10 @@ public class DependentTaskTest {
                 getProcessInstanceForFindLastRunningProcess(300, WorkflowExecutionStatus.SUCCESS);
 
         // for DependentExecute.findLastProcessInterval
-        Mockito.when(processService
+        Mockito.when(processInstanceDao
                 .findLastRunningProcess(Mockito.eq(2L), Mockito.any(), Mockito.any(), Mockito.anyInt()))
                 .thenReturn(processInstance200);
-        Mockito.when(processService
+        Mockito.when(processInstanceDao
                 .findLastRunningProcess(Mockito.eq(3L), Mockito.any(), Mockito.any(), Mockito.anyInt()))
                 .thenReturn(processInstance300);
 
@@ -281,7 +290,7 @@ public class DependentTaskTest {
     public void testDependentOnAllSuccess() {
         testDependentOnAllInit();
         // for DependentExecute.findLastProcessInterval
-        Mockito.when(processService
+        Mockito.when(processInstanceDao
                 .findLastRunningProcess(Mockito.eq(2L), Mockito.any(), Mockito.any(), Mockito.anyInt()))
                 .thenReturn(getProcessInstanceForFindLastRunningProcess(200, WorkflowExecutionStatus.SUCCESS));
 
@@ -291,7 +300,7 @@ public class DependentTaskTest {
     public void testDependentOnAllFailure() {
         testDependentOnAllInit();
         // for DependentExecute.findLastProcessInterval
-        Mockito.when(processService
+        Mockito.when(processInstanceDao
                 .findLastRunningProcess(Mockito.eq(2L), Mockito.any(), Mockito.any(), Mockito.anyInt()))
                 .thenReturn(getProcessInstanceForFindLastRunningProcess(200, WorkflowExecutionStatus.FAILURE));
 
@@ -325,7 +334,7 @@ public class DependentTaskTest {
         ProcessInstance dependentProcessInstance =
                 getProcessInstanceForFindLastRunningProcess(200, WorkflowExecutionStatus.RUNNING_EXECUTION);
         // for DependentExecute.findLastProcessInterval
-        Mockito.when(processService
+        Mockito.when(processInstanceDao
                 .findLastRunningProcess(Mockito.eq(2L), Mockito.any(), Mockito.any(), Mockito.anyInt()))
                 .thenReturn(dependentProcessInstance);
 
@@ -345,9 +354,45 @@ public class DependentTaskTest {
 
     }
 
+    @Test
+    public void testIsSelfDependent() {
+        DependentExecute dependentExecute =
+                new DependentExecute(new ArrayList<>(), DependentRelation.AND, processInstance, taskInstance);
+        DependentItem dependentItem = new DependentItem();
+        dependentItem.setDefinitionCode(processInstance.getProcessDefinitionCode());
+        dependentItem.setDepTaskCode(Constants.DEPENDENT_ALL_TASK_CODE);
+        Assertions.assertTrue(dependentExecute.isSelfDependent(dependentItem));
+
+        dependentItem.setDepTaskCode(taskInstance.getTaskCode());
+        Assertions.assertTrue(dependentExecute.isSelfDependent(dependentItem));
+
+        // depends on a different task of the same workflow
+        dependentItem.setDepTaskCode(12345678);
+        Assertions.assertFalse(dependentExecute.isSelfDependent(dependentItem));
+
+        // NOTE(review): meant to cover a different workflow, but the line below still sets the current definition code
+        dependentItem.setDefinitionCode(processInstance.getProcessDefinitionCode());
+        Assertions.assertFalse(dependentExecute.isSelfDependent(dependentItem));
+    }
+
+    @Test
+    public void testIsFirstProcessInstance() {
+        Mockito.when(processInstanceDao.queryFirstScheduleProcessInstance(processInstance.getProcessDefinitionCode()))
+                .thenReturn(processInstance);
+        DependentExecute dependentExecute =
+                new DependentExecute(new ArrayList<>(), DependentRelation.AND, processInstance, taskInstance);
+        DependentItem dependentItem = new DependentItem();
+        dependentItem.setDefinitionCode(processInstance.getProcessDefinitionCode());
+        Assertions.assertTrue(dependentExecute.isFirstProcessInstance(dependentItem));
+
+        dependentItem.setDefinitionCode(12345678L);
+        Assertions.assertFalse(dependentExecute.isFirstProcessInstance(dependentItem));
+    }
+
     private ProcessInstance getProcessInstance() {
         ProcessInstance processInstance = new ProcessInstance();
         processInstance.setId(100);
+        processInstance.setProcessDefinitionCode(10000L);
         processInstance.setState(WorkflowExecutionStatus.RUNNING_EXECUTION);
         return processInstance;
     }
@@ -355,6 +400,7 @@ public class DependentTaskTest {
     private TaskInstance getTaskInstance() {
         TaskInstance taskInstance = new TaskInstance();
         taskInstance.setId(1000);
+        taskInstance.setTaskCode(10000L);
         return taskInstance;
     }
 
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index 80bb6e3711..bfacd6fc0e 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -49,12 +49,10 @@ import org.apache.dolphinscheduler.dao.entity.Tenant;
 import org.apache.dolphinscheduler.dao.entity.UdfFunc;
 import org.apache.dolphinscheduler.dao.entity.User;
 import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
-import org.apache.dolphinscheduler.plugin.task.api.model.DateInterval;
 import org.apache.dolphinscheduler.service.exceptions.CronParseException;
 import org.apache.dolphinscheduler.service.model.TaskNode;
 import org.apache.dolphinscheduler.spi.enums.ResourceType;
 
-import java.util.Date;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -136,12 +134,6 @@ public interface ProcessService {
 
     List<Schedule> selectAllByProcessDefineCode(long[] codes);
 
-    ProcessInstance findLastSchedulerProcessInterval(Long definitionCode, DateInterval dateInterval, int testFlag);
-
-    ProcessInstance findLastManualProcessInterval(Long definitionCode, DateInterval dateInterval, int testFlag);
-
-    ProcessInstance findLastRunningProcess(Long definitionCode, Date startTime, Date endTime, int testFlag);
-
     String queryUserQueueByProcessInstance(ProcessInstance processInstance);
 
     ProjectUser queryProjectWithUserByProcessInstanceId(int processInstanceId);
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
index 281c57e2fc..676207c3bf 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
@@ -118,7 +118,6 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager;
 import org.apache.dolphinscheduler.plugin.task.api.enums.Direct;
 import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
 import org.apache.dolphinscheduler.plugin.task.api.enums.dp.DqTaskState;
-import org.apache.dolphinscheduler.plugin.task.api.model.DateInterval;
 import org.apache.dolphinscheduler.plugin.task.api.model.Property;
 import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
 import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
@@ -1683,54 +1682,6 @@ public class ProcessServiceImpl implements ProcessService {
         return scheduleMapper.selectAllByProcessDefineArray(codes);
     }
 
-    /**
-     * find last scheduler process instance in the date interval
-     *
-     * @param definitionCode definitionCode
-     * @param dateInterval   dateInterval
-     * @return process instance
-     */
-    @Override
-    public ProcessInstance findLastSchedulerProcessInterval(Long definitionCode, DateInterval dateInterval,
-                                                            int testFlag) {
-        return processInstanceMapper.queryLastSchedulerProcess(definitionCode,
-                dateInterval.getStartTime(),
-                dateInterval.getEndTime(),
-                testFlag);
-    }
-
-    /**
-     * find last manual process instance interval
-     *
-     * @param definitionCode process definition code
-     * @param dateInterval   dateInterval
-     * @return process instance
-     */
-    @Override
-    public ProcessInstance findLastManualProcessInterval(Long definitionCode, DateInterval dateInterval, int testFlag) {
-        return processInstanceMapper.queryLastManualProcess(definitionCode,
-                dateInterval.getStartTime(),
-                dateInterval.getEndTime(),
-                testFlag);
-    }
-
-    /**
-     * find last running process instance
-     *
-     * @param definitionCode process definition code
-     * @param startTime      start time
-     * @param endTime        end time
-     * @return process instance
-     */
-    @Override
-    public ProcessInstance findLastRunningProcess(Long definitionCode, Date startTime, Date endTime, int testFlag) {
-        return processInstanceMapper.queryLastRunningProcess(definitionCode,
-                startTime,
-                endTime,
-                testFlag,
-                WorkflowExecutionStatus.getNeedFailoverWorkflowInstanceState());
-    }
-
     /**
      * query user queue by process instance
      *
diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
index fdaf75ab90..f2b7dbe489 100644
--- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
+++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
@@ -76,7 +76,6 @@ import org.apache.dolphinscheduler.plugin.task.api.enums.dp.ExecuteSqlType;
 import org.apache.dolphinscheduler.plugin.task.api.enums.dp.InputType;
 import org.apache.dolphinscheduler.plugin.task.api.enums.dp.OptionSourceType;
 import org.apache.dolphinscheduler.plugin.task.api.enums.dp.ValueType;
-import org.apache.dolphinscheduler.plugin.task.api.model.DateInterval;
 import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
 import org.apache.dolphinscheduler.service.cron.CronUtilsTest;
 import org.apache.dolphinscheduler.service.exceptions.CronParseException;
@@ -750,24 +749,6 @@ public class ProcessServiceTest {
 
     }
 
-    @Test
-    public void testFindLastManualProcessInterval() {
-        long definitionCode = 1L;
-        DateInterval dateInterval = new DateInterval(new Date(), new Date());
-        int testFlag = 1;
-
-        // find test lastManualProcessInterval
-        ProcessInstance lastManualProcessInterval =
-                processService.findLastManualProcessInterval(definitionCode, dateInterval, testFlag);
-        Assertions.assertNull(lastManualProcessInterval);
-
-        // find online lastManualProcessInterval
-        testFlag = 0;
-        lastManualProcessInterval =
-                processService.findLastManualProcessInterval(definitionCode, dateInterval, testFlag);
-        Assertions.assertNull(lastManualProcessInterval);
-    }
-
     @Test
     public void testQueryTestDataSourceId() {
         Integer onlineDataSourceId = 1;