You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ca...@apache.org on 2022/05/03 14:52:53 UTC

[dolphinscheduler] branch dev updated: [Fix-9868] A task flow definition isolates the runs of different execution strategies by version numbers. (#9869)

This is an automated email from the ASF dual-hosted git repository.

caishunfeng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new fb0f96ed94 [Fix-9868] A task flow definition isolates the runs of different execution strategies by version numbers. (#9869)
fb0f96ed94 is described below

commit fb0f96ed946c89323ba347eb9d538a22b9be6e73
Author: WangJPLeo <10...@users.noreply.github.com>
AuthorDate: Tue May 3 22:52:42 2022 +0800

    [Fix-9868] A task flow definition isolates the runs of different execution strategies by version numbers. (#9869)
    
    * The task flow definition cached by the thread should be the latest version.
    
    * Coverage on New Code
    
    * Coverage on New Code
    
    * Coverage on New Code
    
    * Use an existing method.
    
    * Increase unit test coverage.
    
    * Task flow definitions enforce policy isolation.
---
 .../dolphinscheduler/dao/mapper/ProcessInstanceMapper.java   |  3 ++-
 .../dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml    |  3 ++-
 .../server/master/runner/WorkflowExecuteThread.java          |  2 +-
 .../dolphinscheduler/service/process/ProcessServiceImpl.java | 12 ++++++------
 .../dolphinscheduler/service/process/ProcessServiceTest.java |  6 +++---
 5 files changed, 14 insertions(+), 12 deletions(-)

diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java
index 1a639df8b4..8c0abf3dce 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java
@@ -247,7 +247,8 @@ public interface ProcessInstanceMapper extends BaseMapper<ProcessInstance> {
     List<ProcessInstance> queryByProcessDefineCodeAndStatus(@Param("processDefinitionCode") Long processDefinitionCode,
                                                             @Param("states") int[] states);
 
-    List<ProcessInstance> queryByProcessDefineCodeAndStatusAndNextId(@Param("processDefinitionCode") Long processDefinitionCode,
+    List<ProcessInstance> queryByProcessDefineCodeAndProcessDefinitionVersionAndStatusAndNextId(@Param("processDefinitionCode") Long processDefinitionCode,
+                                                                               @Param("processDefinitionVersion") int processDefinitionVersion,
                                                                      @Param("states") int[] states, @Param("id") int id);
 
     int updateGlobalParamsById(@Param("globalParams") String globalParams,
diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml
index 2e49ce9b5b..d77e3cc4d8 100644
--- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml
+++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml
@@ -231,11 +231,12 @@
         </foreach>
         order by id asc
     </select>
-    <select id="queryByProcessDefineCodeAndStatusAndNextId" resultType="org.apache.dolphinscheduler.dao.entity.ProcessInstance">
+    <select id="queryByProcessDefineCodeAndProcessDefinitionVersionAndStatusAndNextId" resultType="org.apache.dolphinscheduler.dao.entity.ProcessInstance">
         select
         <include refid="baseSql"/>
         from t_ds_process_instance
         where process_definition_code=#{processDefinitionCode}
+          and process_definition_version = #{processDefinitionVersion}
         and state in
         <foreach collection="states" item="i" open="(" close=")" separator=",">
             #{i}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
index 1db5d81b6d..b7af67e675 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
@@ -802,7 +802,7 @@ public class WorkflowExecuteThread {
     /**
      * process end handle
      */
-    private void endProcess() {
+    public void endProcess() {
         this.stateEvents.clear();
         if (processDefinition.getExecutionType().typeIsSerialWait()) {
             checkSerialProcess(processDefinition);
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
index 58183f2c88..53c20a5ae8 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
@@ -313,8 +313,8 @@ public class ProcessServiceImpl implements ProcessService {
         //when we get the running instance(or waiting instance) only get the priority instance(by id)
         if (processDefinition.getExecutionType().typeIsSerialWait()) {
             while (true) {
-                List<ProcessInstance> runningProcessInstances = this.processInstanceMapper.queryByProcessDefineCodeAndStatusAndNextId(processInstance.getProcessDefinitionCode(),
-                    Constants.RUNNING_PROCESS_STATE, processInstance.getId());
+                List<ProcessInstance> runningProcessInstances = this.processInstanceMapper.queryByProcessDefineCodeAndProcessDefinitionVersionAndStatusAndNextId(processInstance.getProcessDefinitionCode(),
+                        processInstance.getProcessDefinitionVersion(), Constants.RUNNING_PROCESS_STATE, processInstance.getId());
                 if (CollectionUtils.isEmpty(runningProcessInstances)) {
                     processInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS);
                     saveProcessInstance(processInstance);
@@ -326,15 +326,15 @@ public class ProcessServiceImpl implements ProcessService {
                 }
             }
         } else if (processDefinition.getExecutionType().typeIsSerialDiscard()) {
-            List<ProcessInstance> runningProcessInstances = this.processInstanceMapper.queryByProcessDefineCodeAndStatusAndNextId(processInstance.getProcessDefinitionCode(),
-                Constants.RUNNING_PROCESS_STATE, processInstance.getId());
+            List<ProcessInstance> runningProcessInstances = this.processInstanceMapper.queryByProcessDefineCodeAndProcessDefinitionVersionAndStatusAndNextId(processInstance.getProcessDefinitionCode(),
+                    processInstance.getProcessDefinitionVersion(), Constants.RUNNING_PROCESS_STATE, processInstance.getId());
             if (CollectionUtils.isEmpty(runningProcessInstances)) {
                 processInstance.setState(ExecutionStatus.STOP);
                 saveProcessInstance(processInstance);
             }
         } else if (processDefinition.getExecutionType().typeIsSerialPriority()) {
-            List<ProcessInstance> runningProcessInstances = this.processInstanceMapper.queryByProcessDefineCodeAndStatusAndNextId(processInstance.getProcessDefinitionCode(),
-                Constants.RUNNING_PROCESS_STATE, processInstance.getId());
+            List<ProcessInstance> runningProcessInstances = this.processInstanceMapper.queryByProcessDefineCodeAndProcessDefinitionVersionAndStatusAndNextId(processInstance.getProcessDefinitionCode(),
+                    processInstance.getProcessDefinitionVersion(), Constants.RUNNING_PROCESS_STATE, processInstance.getId());
             if (CollectionUtils.isNotEmpty(runningProcessInstances)) {
                 for (ProcessInstance info : runningProcessInstances) {
                     info.setCommandType(CommandType.STOP);
diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
index 22575681e1..47bdca17d1 100644
--- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
+++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
@@ -398,7 +398,7 @@ public class ProcessServiceTest {
         command6.setCommandParam("{\"ProcessInstanceId\":223}");
         command6.setCommandType(CommandType.RECOVER_SERIAL_WAIT);
         command6.setProcessDefinitionVersion(1);
-        Mockito.when(processInstanceMapper.queryByProcessDefineCodeAndStatusAndNextId(11L, Constants.RUNNING_PROCESS_STATE, 223)).thenReturn(lists);
+        Mockito.when(processInstanceMapper.queryByProcessDefineCodeAndProcessDefinitionVersionAndStatusAndNextId(11L, 1, Constants.RUNNING_PROCESS_STATE, 223)).thenReturn(lists);
         Mockito.when(processInstanceMapper.updateNextProcessIdById(223, 222)).thenReturn(true);
         Mockito.when(commandMapper.deleteById(6)).thenReturn(1);
         ProcessInstance processInstance6 = processService.handleCommand(logger, host, command6);
@@ -419,7 +419,7 @@ public class ProcessServiceTest {
         command7.setCommandType(CommandType.RECOVER_SERIAL_WAIT);
         command7.setProcessDefinitionVersion(1);
         Mockito.when(commandMapper.deleteById(7)).thenReturn(1);
-        Mockito.when(processInstanceMapper.queryByProcessDefineCodeAndStatusAndNextId(11L, Constants.RUNNING_PROCESS_STATE, 224)).thenReturn(null);
+        Mockito.when(processInstanceMapper.queryByProcessDefineCodeAndProcessDefinitionVersionAndStatusAndNextId(11L, 1, Constants.RUNNING_PROCESS_STATE, 224)).thenReturn(null);
         ProcessInstance processInstance8 = processService.handleCommand(logger, host, command7);
         Assert.assertTrue(processInstance8 == null);
 
@@ -441,7 +441,7 @@ public class ProcessServiceTest {
         command9.setCommandType(CommandType.RECOVER_SERIAL_WAIT);
         command9.setProcessDefinitionVersion(1);
         Mockito.when(processInstanceMapper.queryDetailById(225)).thenReturn(processInstance9);
-        Mockito.when(processInstanceMapper.queryByProcessDefineCodeAndStatusAndNextId(12L, Constants.RUNNING_PROCESS_STATE, 0)).thenReturn(lists);
+        Mockito.when(processInstanceMapper.queryByProcessDefineCodeAndProcessDefinitionVersionAndStatusAndNextId(12L, 1, Constants.RUNNING_PROCESS_STATE, 0)).thenReturn(lists);
         Mockito.when(processInstanceMapper.updateById(processInstance)).thenReturn(1);
         Mockito.when(commandMapper.deleteById(9)).thenReturn(1);
         ProcessInstance processInstance10 = processService.handleCommand(logger, host, command9);