You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ca...@apache.org on 2022/05/03 14:52:53 UTC
[dolphinscheduler] branch dev updated: [Fix-9868] A task flow definition isolates the runs of different execution strategies by version numbers. (#9869)
This is an automated email from the ASF dual-hosted git repository.
caishunfeng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new fb0f96ed94 [Fix-9868] A task flow definition isolates the runs of different execution strategies by version numbers. (#9869)
fb0f96ed94 is described below
commit fb0f96ed946c89323ba347eb9d538a22b9be6e73
Author: WangJPLeo <10...@users.noreply.github.com>
AuthorDate: Tue May 3 22:52:42 2022 +0800
[Fix-9868] A task flow definition isolates the runs of different execution strategies by version numbers. (#9869)
* The thread cache task flow definition should get the latest version.
* Coverage on New Code
* Coverage on New Code
* Coverage on New Code
* use an existing method.
* Increase unit test coverage.
* Task flow definitions enforce policy isolation.
---
.../dolphinscheduler/dao/mapper/ProcessInstanceMapper.java | 3 ++-
.../dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml | 3 ++-
.../server/master/runner/WorkflowExecuteThread.java | 2 +-
.../dolphinscheduler/service/process/ProcessServiceImpl.java | 12 ++++++------
.../dolphinscheduler/service/process/ProcessServiceTest.java | 6 +++---
5 files changed, 14 insertions(+), 12 deletions(-)
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java
index 1a639df8b4..8c0abf3dce 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java
@@ -247,7 +247,8 @@ public interface ProcessInstanceMapper extends BaseMapper<ProcessInstance> {
List<ProcessInstance> queryByProcessDefineCodeAndStatus(@Param("processDefinitionCode") Long processDefinitionCode,
@Param("states") int[] states);
- List<ProcessInstance> queryByProcessDefineCodeAndStatusAndNextId(@Param("processDefinitionCode") Long processDefinitionCode,
+ List<ProcessInstance> queryByProcessDefineCodeAndProcessDefinitionVersionAndStatusAndNextId(@Param("processDefinitionCode") Long processDefinitionCode,
+ @Param("processDefinitionVersion") int processDefinitionVersion,
@Param("states") int[] states, @Param("id") int id);
int updateGlobalParamsById(@Param("globalParams") String globalParams,
diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml
index 2e49ce9b5b..d77e3cc4d8 100644
--- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml
+++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml
@@ -231,11 +231,12 @@
</foreach>
order by id asc
</select>
- <select id="queryByProcessDefineCodeAndStatusAndNextId" resultType="org.apache.dolphinscheduler.dao.entity.ProcessInstance">
+ <select id="queryByProcessDefineCodeAndProcessDefinitionVersionAndStatusAndNextId" resultType="org.apache.dolphinscheduler.dao.entity.ProcessInstance">
select
<include refid="baseSql"/>
from t_ds_process_instance
where process_definition_code=#{processDefinitionCode}
+ and process_definition_version = #{processDefinitionVersion}
and state in
<foreach collection="states" item="i" open="(" close=")" separator=",">
#{i}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
index 1db5d81b6d..b7af67e675 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
@@ -802,7 +802,7 @@ public class WorkflowExecuteThread {
/**
* process end handle
*/
- private void endProcess() {
+ public void endProcess() {
this.stateEvents.clear();
if (processDefinition.getExecutionType().typeIsSerialWait()) {
checkSerialProcess(processDefinition);
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
index 58183f2c88..53c20a5ae8 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
@@ -313,8 +313,8 @@ public class ProcessServiceImpl implements ProcessService {
//when we get the running instance(or waiting instance) only get the priority instance(by id)
if (processDefinition.getExecutionType().typeIsSerialWait()) {
while (true) {
- List<ProcessInstance> runningProcessInstances = this.processInstanceMapper.queryByProcessDefineCodeAndStatusAndNextId(processInstance.getProcessDefinitionCode(),
- Constants.RUNNING_PROCESS_STATE, processInstance.getId());
+ List<ProcessInstance> runningProcessInstances = this.processInstanceMapper.queryByProcessDefineCodeAndProcessDefinitionVersionAndStatusAndNextId(processInstance.getProcessDefinitionCode(),
+ processInstance.getProcessDefinitionVersion(), Constants.RUNNING_PROCESS_STATE, processInstance.getId());
if (CollectionUtils.isEmpty(runningProcessInstances)) {
processInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS);
saveProcessInstance(processInstance);
@@ -326,15 +326,15 @@ public class ProcessServiceImpl implements ProcessService {
}
}
} else if (processDefinition.getExecutionType().typeIsSerialDiscard()) {
- List<ProcessInstance> runningProcessInstances = this.processInstanceMapper.queryByProcessDefineCodeAndStatusAndNextId(processInstance.getProcessDefinitionCode(),
- Constants.RUNNING_PROCESS_STATE, processInstance.getId());
+ List<ProcessInstance> runningProcessInstances = this.processInstanceMapper.queryByProcessDefineCodeAndProcessDefinitionVersionAndStatusAndNextId(processInstance.getProcessDefinitionCode(),
+ processInstance.getProcessDefinitionVersion(), Constants.RUNNING_PROCESS_STATE, processInstance.getId());
if (CollectionUtils.isEmpty(runningProcessInstances)) {
processInstance.setState(ExecutionStatus.STOP);
saveProcessInstance(processInstance);
}
} else if (processDefinition.getExecutionType().typeIsSerialPriority()) {
- List<ProcessInstance> runningProcessInstances = this.processInstanceMapper.queryByProcessDefineCodeAndStatusAndNextId(processInstance.getProcessDefinitionCode(),
- Constants.RUNNING_PROCESS_STATE, processInstance.getId());
+ List<ProcessInstance> runningProcessInstances = this.processInstanceMapper.queryByProcessDefineCodeAndProcessDefinitionVersionAndStatusAndNextId(processInstance.getProcessDefinitionCode(),
+ processInstance.getProcessDefinitionVersion(), Constants.RUNNING_PROCESS_STATE, processInstance.getId());
if (CollectionUtils.isNotEmpty(runningProcessInstances)) {
for (ProcessInstance info : runningProcessInstances) {
info.setCommandType(CommandType.STOP);
diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
index 22575681e1..47bdca17d1 100644
--- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
+++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
@@ -398,7 +398,7 @@ public class ProcessServiceTest {
command6.setCommandParam("{\"ProcessInstanceId\":223}");
command6.setCommandType(CommandType.RECOVER_SERIAL_WAIT);
command6.setProcessDefinitionVersion(1);
- Mockito.when(processInstanceMapper.queryByProcessDefineCodeAndStatusAndNextId(11L, Constants.RUNNING_PROCESS_STATE, 223)).thenReturn(lists);
+ Mockito.when(processInstanceMapper.queryByProcessDefineCodeAndProcessDefinitionVersionAndStatusAndNextId(11L, 1, Constants.RUNNING_PROCESS_STATE, 223)).thenReturn(lists);
Mockito.when(processInstanceMapper.updateNextProcessIdById(223, 222)).thenReturn(true);
Mockito.when(commandMapper.deleteById(6)).thenReturn(1);
ProcessInstance processInstance6 = processService.handleCommand(logger, host, command6);
@@ -419,7 +419,7 @@ public class ProcessServiceTest {
command7.setCommandType(CommandType.RECOVER_SERIAL_WAIT);
command7.setProcessDefinitionVersion(1);
Mockito.when(commandMapper.deleteById(7)).thenReturn(1);
- Mockito.when(processInstanceMapper.queryByProcessDefineCodeAndStatusAndNextId(11L, Constants.RUNNING_PROCESS_STATE, 224)).thenReturn(null);
+ Mockito.when(processInstanceMapper.queryByProcessDefineCodeAndProcessDefinitionVersionAndStatusAndNextId(11L, 1, Constants.RUNNING_PROCESS_STATE, 224)).thenReturn(null);
ProcessInstance processInstance8 = processService.handleCommand(logger, host, command7);
Assert.assertTrue(processInstance8 == null);
@@ -441,7 +441,7 @@ public class ProcessServiceTest {
command9.setCommandType(CommandType.RECOVER_SERIAL_WAIT);
command9.setProcessDefinitionVersion(1);
Mockito.when(processInstanceMapper.queryDetailById(225)).thenReturn(processInstance9);
- Mockito.when(processInstanceMapper.queryByProcessDefineCodeAndStatusAndNextId(12L, Constants.RUNNING_PROCESS_STATE, 0)).thenReturn(lists);
+ Mockito.when(processInstanceMapper.queryByProcessDefineCodeAndProcessDefinitionVersionAndStatusAndNextId(12L, 1, Constants.RUNNING_PROCESS_STATE, 0)).thenReturn(lists);
Mockito.when(processInstanceMapper.updateById(processInstance)).thenReturn(1);
Mockito.when(commandMapper.deleteById(9)).thenReturn(1);
ProcessInstance processInstance10 = processService.handleCommand(logger, host, command9);