You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by we...@apache.org on 2021/08/03 03:54:47 UTC
[dolphinscheduler] branch json_split_two updated: [Fix-5519] The
t_ds_command table and t_ds_error_command table,
process_definition_id change to process_definition_code (#5937)
This is an automated email from the ASF dual-hosted git repository.
wenhemin pushed a commit to branch json_split_two
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/json_split_two by this push:
new 92b92b5 [Fix-5519] The t_ds_command table and t_ds_error_command table, process_definition_id change to process_definition_code (#5937)
92b92b5 is described below
commit 92b92b5f5a6d5202f031c781001a14725fb6219e
Author: wen-hemin <39...@users.noreply.github.com>
AuthorDate: Tue Aug 3 11:54:39 2021 +0800
[Fix-5519] The t_ds_command table and t_ds_error_command table, process_definition_id change to process_definition_code (#5937)
* fix: the t_ds_schedules table, process_definition_id change to process_definition_code
* fix checkstyle
* fix: recovery code
* fix UT
* fix: The t_ds_command table and t_ds_error_command table, process_definition_id change to process_definition_code
* fix comment
* fix checkstyle
* fix: remove duplicated lines
* fix: remove TODO
Co-authored-by: wen-hemin <we...@apache.com>
---
.../api/service/impl/ExecutorServiceImpl.java | 26 +++---
.../dolphinscheduler/dao/entity/Command.java | 71 ++++++++--------
.../dolphinscheduler/dao/entity/ErrorCommand.java | 99 +++++++++-------------
.../dolphinscheduler/dao/mapper/CommandMapper.xml | 6 +-
.../dao/mapper/ErrorCommandMapper.xml | 4 +-
.../dao/mapper/CommandMapperTest.java | 73 ++++++++--------
.../dao/mapper/ErrorCommandMapperTest.java | 11 +--
.../server/master/MasterCommandTest.java | 22 ++---
.../service/process/ProcessService.java | 72 +++++++++++-----
.../service/quartz/ProcessScheduleJob.java | 2 +-
.../service/process/ProcessServiceTest.java | 60 ++++++++-----
sql/dolphinscheduler_mysql.sql | 4 +-
sql/dolphinscheduler_postgre.sql | 4 +-
13 files changed, 243 insertions(+), 211 deletions(-)
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
index 8978257..c3b422f 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
@@ -160,7 +160,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
/**
* create command
*/
- int create = this.createCommand(commandType, processDefinition.getId(),
+ int create = this.createCommand(commandType, processDefinition.getCode(),
taskDependType, failureStrategy, startNodeList, cronTime, warningType, loginUser.getId(),
warningGroupId, runMode, processInstancePriority, workerGroup, startParams);
@@ -276,13 +276,13 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
switch (executeType) {
case REPEAT_RUNNING:
- result = insertCommand(loginUser, processInstanceId, processDefinition.getId(), CommandType.REPEAT_RUNNING, startParams);
+ result = insertCommand(loginUser, processInstanceId, processDefinition.getCode(), CommandType.REPEAT_RUNNING, startParams);
break;
case RECOVER_SUSPENDED_PROCESS:
- result = insertCommand(loginUser, processInstanceId, processDefinition.getId(), CommandType.RECOVER_SUSPENDED_PROCESS, startParams);
+ result = insertCommand(loginUser, processInstanceId, processDefinition.getCode(), CommandType.RECOVER_SUSPENDED_PROCESS, startParams);
break;
case START_FAILURE_TASK_PROCESS:
- result = insertCommand(loginUser, processInstanceId, processDefinition.getId(), CommandType.START_FAILURE_TASK_PROCESS, startParams);
+ result = insertCommand(loginUser, processInstanceId, processDefinition.getCode(), CommandType.START_FAILURE_TASK_PROCESS, startParams);
break;
case STOP:
if (processInstance.getState() == ExecutionStatus.READY_STOP) {
@@ -394,11 +394,11 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
*
* @param loginUser login user
* @param instanceId instance id
- * @param processDefinitionId process definition id
+ * @param processDefinitionCode process definition code
* @param commandType command type
* @return insert result code
*/
- private Map<String, Object> insertCommand(User loginUser, Integer instanceId, Integer processDefinitionId, CommandType commandType, String startParams) {
+ private Map<String, Object> insertCommand(User loginUser, Integer instanceId, long processDefinitionCode, CommandType commandType, String startParams) {
Map<String, Object> result = new HashMap<>();
//To add startParams only when repeat running is needed
@@ -410,12 +410,12 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
Command command = new Command();
command.setCommandType(commandType);
- command.setProcessDefinitionId(processDefinitionId);
+ command.setProcessDefinitionCode(processDefinitionCode);
command.setCommandParam(JSONUtils.toJsonString(cmdParam));
command.setExecutorId(loginUser.getId());
if (!processService.verifyIsNeedCreateCommand(command)) {
- putMsg(result, Status.PROCESS_INSTANCE_EXECUTING_COMMAND, processDefinitionId);
+ putMsg(result, Status.PROCESS_INSTANCE_EXECUTING_COMMAND, processDefinitionCode);
return result;
}
@@ -475,7 +475,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
* create command
*
* @param commandType commandType
- * @param processDefineId processDefineId
+ * @param processDefineCode processDefineCode
* @param nodeDep nodeDep
* @param failureStrategy failureStrategy
* @param startNodeList startNodeList
@@ -488,7 +488,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
* @param workerGroup workerGroup
* @return command id
*/
- private int createCommand(CommandType commandType, int processDefineId,
+ private int createCommand(CommandType commandType, long processDefineCode,
TaskDependType nodeDep, FailureStrategy failureStrategy,
String startNodeList, String schedule, WarningType warningType,
int executorId, int warningGroupId,
@@ -506,7 +506,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
} else {
command.setCommandType(commandType);
}
- command.setProcessDefinitionId(processDefineId);
+ command.setProcessDefinitionCode(processDefineCode);
if (nodeDep != null) {
command.setTaskDependType(nodeDep);
}
@@ -549,7 +549,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
command.setCommandParam(JSONUtils.toJsonString(cmdParam));
return processService.createCommand(command);
} else if (runMode == RunMode.RUN_MODE_PARALLEL) {
- List<Schedule> schedules = processService.queryReleaseSchedulerListByProcessDefinitionCode(processDefineId); // TODO: next pr change to code
+ List<Schedule> schedules = processService.queryReleaseSchedulerListByProcessDefinitionCode(processDefineCode);
List<Date> listDate = new LinkedList<>();
if (!CollectionUtils.isEmpty(schedules)) {
for (Schedule item : schedules) {
@@ -580,7 +580,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
}
}
} else {
- logger.error("there is not valid schedule date for the process definition: id:{}", processDefineId);
+ logger.error("there is not valid schedule date for the process definition code:{}", processDefineCode);
}
} else {
command.setCommandParam(JSONUtils.toJsonString(cmdParam));
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Command.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Command.java
index cba0151..bef466e 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Command.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Command.java
@@ -14,15 +14,21 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.dolphinscheduler.dao.entity;
+import org.apache.dolphinscheduler.common.enums.CommandType;
+import org.apache.dolphinscheduler.common.enums.FailureStrategy;
+import org.apache.dolphinscheduler.common.enums.Priority;
+import org.apache.dolphinscheduler.common.enums.TaskDependType;
+import org.apache.dolphinscheduler.common.enums.WarningType;
+
+import java.util.Date;
+
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
-import org.apache.dolphinscheduler.common.enums.*;
-
-import java.util.Date;
/**
* command
@@ -33,7 +39,7 @@ public class Command {
/**
* id
*/
- @TableId(value="id", type=IdType.AUTO)
+ @TableId(value = "id", type = IdType.AUTO)
private int id;
/**
@@ -43,10 +49,10 @@ public class Command {
private CommandType commandType;
/**
- * process definition id
+ * process definition code
*/
- @TableField("process_definition_id")
- private int processDefinitionId;
+ @TableField("process_definition_code")
+ private long processDefinitionCode;
/**
* executor id
@@ -126,7 +132,7 @@ public class Command {
TaskDependType taskDependType,
FailureStrategy failureStrategy,
int executorId,
- int processDefinitionId,
+ long processDefinitionCode,
String commandParam,
WarningType warningType,
int warningGroupId,
@@ -135,7 +141,7 @@ public class Command {
Priority processInstancePriority) {
this.commandType = commandType;
this.executorId = executorId;
- this.processDefinitionId = processDefinitionId;
+ this.processDefinitionCode = processDefinitionCode;
this.commandParam = commandParam;
this.warningType = warningType;
this.warningGroupId = warningGroupId;
@@ -148,7 +154,6 @@ public class Command {
this.processInstancePriority = processInstancePriority;
}
-
public TaskDependType getTaskDependType() {
return taskDependType;
}
@@ -173,15 +178,14 @@ public class Command {
this.commandType = commandType;
}
- public int getProcessDefinitionId() {
- return processDefinitionId;
+ public long getProcessDefinitionCode() {
+ return processDefinitionCode;
}
- public void setProcessDefinitionId(int processDefinitionId) {
- this.processDefinitionId = processDefinitionId;
+ public void setProcessDefinitionCode(long processDefinitionCode) {
+ this.processDefinitionCode = processDefinitionCode;
}
-
public FailureStrategy getFailureStrategy() {
return failureStrategy;
}
@@ -276,7 +280,7 @@ public class Command {
if (id != command.id) {
return false;
}
- if (processDefinitionId != command.processDefinitionId) {
+ if (processDefinitionCode != command.processDefinitionCode) {
return false;
}
if (executorId != command.executorId) {
@@ -320,7 +324,7 @@ public class Command {
public int hashCode() {
int result = id;
result = 31 * result + (commandType != null ? commandType.hashCode() : 0);
- result = 31 * result + processDefinitionId;
+ result = 31 * result + Long.hashCode(processDefinitionCode);
result = 31 * result + executorId;
result = 31 * result + (commandParam != null ? commandParam.hashCode() : 0);
result = 31 * result + (taskDependType != null ? taskDependType.hashCode() : 0);
@@ -334,24 +338,25 @@ public class Command {
result = 31 * result + (workerGroup != null ? workerGroup.hashCode() : 0);
return result;
}
+
@Override
public String toString() {
- return "Command{" +
- "id=" + id +
- ", commandType=" + commandType +
- ", processDefinitionId=" + processDefinitionId +
- ", executorId=" + executorId +
- ", commandParam='" + commandParam + '\'' +
- ", taskDependType=" + taskDependType +
- ", failureStrategy=" + failureStrategy +
- ", warningType=" + warningType +
- ", warningGroupId=" + warningGroupId +
- ", scheduleTime=" + scheduleTime +
- ", startTime=" + startTime +
- ", processInstancePriority=" + processInstancePriority +
- ", updateTime=" + updateTime +
- ", workerGroup='" + workerGroup + '\'' +
- '}';
+ return "Command{"
+ + "id=" + id
+ + ", commandType=" + commandType
+ + ", processDefinitionCode=" + processDefinitionCode
+ + ", executorId=" + executorId
+ + ", commandParam='" + commandParam + '\''
+ + ", taskDependType=" + taskDependType
+ + ", failureStrategy=" + failureStrategy
+ + ", warningType=" + warningType
+ + ", warningGroupId=" + warningGroupId
+ + ", scheduleTime=" + scheduleTime
+ + ", startTime=" + startTime
+ + ", processInstancePriority=" + processInstancePriority
+ + ", updateTime=" + updateTime
+ + ", workerGroup='" + workerGroup + '\''
+ + '}';
}
}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ErrorCommand.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ErrorCommand.java
index 760bb23..c4a5e60 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ErrorCommand.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ErrorCommand.java
@@ -14,15 +14,21 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.dolphinscheduler.dao.entity;
+import org.apache.dolphinscheduler.common.enums.CommandType;
+import org.apache.dolphinscheduler.common.enums.FailureStrategy;
+import org.apache.dolphinscheduler.common.enums.Priority;
+import org.apache.dolphinscheduler.common.enums.TaskDependType;
+import org.apache.dolphinscheduler.common.enums.WarningType;
+
+import java.util.Date;
+
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import com.fasterxml.jackson.annotation.JsonFormat;
-import org.apache.dolphinscheduler.common.enums.*;
-
-import java.util.Date;
/**
* command
@@ -33,7 +39,7 @@ public class ErrorCommand {
/**
* id
*/
- @TableId(value="id", type = IdType.INPUT)
+ @TableId(value = "id", type = IdType.INPUT)
private int id;
/**
@@ -42,9 +48,9 @@ public class ErrorCommand {
private CommandType commandType;
/**
- * process definition id
+ * process definition code
*/
- private int processDefinitionId;
+ private long processDefinitionCode;
/**
* executor id
@@ -79,13 +85,13 @@ public class ErrorCommand {
/**
* schedule time
*/
- @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss",timezone="GMT+8")
+ @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss",timezone = "GMT+8")
private Date scheduleTime;
/**
* start time
*/
- @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss",timezone="GMT+8")
+ @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss",timezone = "GMT+8")
private Date startTime;
/**
@@ -96,7 +102,7 @@ public class ErrorCommand {
/**
* update time
*/
- @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss",timezone="GMT+8")
+ @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss",timezone = "GMT+8")
private Date updateTime;
/**
@@ -111,11 +117,11 @@ public class ErrorCommand {
public ErrorCommand(){}
- public ErrorCommand(Command command, String message){
+ public ErrorCommand(Command command, String message) {
this.id = command.getId();
this.commandType = command.getCommandType();
this.executorId = command.getExecutorId();
- this.processDefinitionId = command.getProcessDefinitionId();
+ this.processDefinitionCode = command.getProcessDefinitionCode();
this.commandParam = command.getCommandParam();
this.warningType = command.getWarningType();
this.warningGroupId = command.getWarningGroupId();
@@ -128,34 +134,6 @@ public class ErrorCommand {
this.message = message;
}
- public ErrorCommand(
- CommandType commandType,
- TaskDependType taskDependType,
- FailureStrategy failureStrategy,
- int executorId,
- int processDefinitionId,
- String commandParam,
- WarningType warningType,
- int warningGroupId,
- Date scheduleTime,
- Priority processInstancePriority,
- String message){
- this.commandType = commandType;
- this.executorId = executorId;
- this.processDefinitionId = processDefinitionId;
- this.commandParam = commandParam;
- this.warningType = warningType;
- this.warningGroupId = warningGroupId;
- this.scheduleTime = scheduleTime;
- this.taskDependType = taskDependType;
- this.failureStrategy = failureStrategy;
- this.startTime = new Date();
- this.updateTime = new Date();
- this.processInstancePriority = processInstancePriority;
- this.message = message;
- }
-
-
public TaskDependType getTaskDependType() {
return taskDependType;
}
@@ -180,15 +158,14 @@ public class ErrorCommand {
this.commandType = commandType;
}
- public int getProcessDefinitionId() {
- return processDefinitionId;
+ public long getProcessDefinitionCode() {
+ return processDefinitionCode;
}
- public void setProcessDefinitionId(int processDefinitionId) {
- this.processDefinitionId = processDefinitionId;
+ public void setProcessDefinitionCode(long processDefinitionCode) {
+ this.processDefinitionCode = processDefinitionCode;
}
-
public FailureStrategy getFailureStrategy() {
return failureStrategy;
}
@@ -279,22 +256,22 @@ public class ErrorCommand {
@Override
public String toString() {
- return "ErrorCommand{" +
- "id=" + id +
- ", commandType=" + commandType +
- ", processDefinitionId=" + processDefinitionId +
- ", executorId=" + executorId +
- ", commandParam='" + commandParam + '\'' +
- ", taskDependType=" + taskDependType +
- ", failureStrategy=" + failureStrategy +
- ", warningType=" + warningType +
- ", warningGroupId=" + warningGroupId +
- ", scheduleTime=" + scheduleTime +
- ", startTime=" + startTime +
- ", processInstancePriority=" + processInstancePriority +
- ", updateTime=" + updateTime +
- ", message='" + message + '\'' +
- ", workerGroup='" + workerGroup + '\'' +
- '}';
+ return "ErrorCommand{"
+ + "id=" + id
+ + ", commandType=" + commandType
+ + ", processDefinitionCode=" + processDefinitionCode
+ + ", executorId=" + executorId
+ + ", commandParam='" + commandParam + '\''
+ + ", taskDependType=" + taskDependType
+ + ", failureStrategy=" + failureStrategy
+ + ", warningType=" + warningType
+ + ", warningGroupId=" + warningGroupId
+ + ", scheduleTime=" + scheduleTime
+ + ", startTime=" + startTime
+ + ", processInstancePriority=" + processInstancePriority
+ + ", updateTime=" + updateTime
+ + ", message='" + message + '\''
+ + ", workerGroup='" + workerGroup + '\''
+ + '}';
}
}
diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/CommandMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/CommandMapper.xml
index c0728f2..38f5bff 100644
--- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/CommandMapper.xml
+++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/CommandMapper.xml
@@ -19,11 +19,11 @@
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="org.apache.dolphinscheduler.dao.mapper.CommandMapper">
<select id="getOneToRun" resultType="org.apache.dolphinscheduler.dao.entity.Command">
- select cmd.id, cmd.command_type, cmd.process_definition_id, cmd.command_param, cmd.task_depend_type, cmd.failure_strategy,
+ select cmd.id, cmd.command_type, cmd.process_definition_code, cmd.command_param, cmd.task_depend_type, cmd.failure_strategy,
cmd.warning_type, cmd.warning_group_id, cmd.schedule_time, cmd.start_time, cmd.executor_id, cmd.update_time,
cmd.process_instance_priority, cmd.worker_group
from t_ds_command cmd
- join t_ds_process_definition definition on cmd.process_definition_id = definition.id
+ join t_ds_process_definition definition on cmd.process_definition_code = definition.code
where definition.release_state = 1 AND definition.flag = 1
order by cmd.update_time asc
limit 1
@@ -31,7 +31,7 @@
<select id="countCommandState" resultType="org.apache.dolphinscheduler.dao.entity.CommandCount">
select cmd.command_type as command_type, count(1) as count
from t_ds_command cmd, t_ds_process_definition process
- where cmd.process_definition_id = process.id
+ where cmd.process_definition_code = process.code
<if test="projectCodeArray != null and projectCodeArray.length != 0">
and process.project_code in
<foreach collection="projectCodeArray" index="index" item="i" open="(" close=")" separator=",">
diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ErrorCommandMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ErrorCommandMapper.xml
index 5f93854..8179ff4 100644
--- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ErrorCommandMapper.xml
+++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ErrorCommandMapper.xml
@@ -21,7 +21,7 @@
<select id="countCommandState" resultType="org.apache.dolphinscheduler.dao.entity.CommandCount">
select cmd.command_type as command_type, count(1) as count
from t_ds_error_command cmd, t_ds_process_definition process
- where cmd.process_definition_id = process.id
+ where cmd.process_definition_code = process.code
<if test="projectCodeArray != null and projectCodeArray.length != 0">
and process.project_code in
<foreach collection="projectCodeArray" index="index" item="i" open="(" close=")" separator=",">
@@ -33,4 +33,4 @@
</if>
group by cmd.command_type
</select>
-</mapper>
\ No newline at end of file
+</mapper>
diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/CommandMapperTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/CommandMapperTest.java
index dd73bc3..dc9dbee 100644
--- a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/CommandMapperTest.java
+++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/CommandMapperTest.java
@@ -14,15 +14,34 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.dolphinscheduler.dao.mapper;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
+
import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.enums.CommandType;
+import org.apache.dolphinscheduler.common.enums.FailureStrategy;
+import org.apache.dolphinscheduler.common.enums.Flag;
+import org.apache.dolphinscheduler.common.enums.Priority;
+import org.apache.dolphinscheduler.common.enums.ReleaseState;
+import org.apache.dolphinscheduler.common.enums.TaskDependType;
+import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.CommandCount;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
-import org.apache.dolphinscheduler.common.enums.*;
-import org.junit.Assert;
+
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
@@ -31,14 +50,6 @@ import org.springframework.test.annotation.Rollback;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.transaction.annotation.Transactional;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static org.hamcrest.Matchers.*;
-import static org.junit.Assert.*;
-
/**
* command mapper test
*/
@@ -48,24 +59,21 @@ import static org.junit.Assert.*;
@Rollback(true)
public class CommandMapperTest {
-
@Autowired
CommandMapper commandMapper;
@Autowired
ProcessDefinitionMapper processDefinitionMapper;
-
/**
* test insert
*/
@Test
- public void testInsert(){
+ public void testInsert() {
Command command = createCommand();
assertThat(command.getId(),greaterThan(0));
}
-
/**
* test select by id
*/
@@ -76,14 +84,14 @@ public class CommandMapperTest {
Command actualCommand = commandMapper.selectById(expectedCommand.getId());
assertNotNull(actualCommand);
- assertEquals(expectedCommand.getProcessDefinitionId(), actualCommand.getProcessDefinitionId());
+ assertEquals(expectedCommand.getProcessDefinitionCode(), actualCommand.getProcessDefinitionCode());
}
/**
* test update
*/
@Test
- public void testUpdate(){
+ public void testUpdate() {
Command expectedCommand = createCommand();
@@ -103,7 +111,7 @@ public class CommandMapperTest {
* test delete
*/
@Test
- public void testDelete(){
+ public void testDelete() {
Command expectedCommand = createCommand();
commandMapper.deleteById(expectedCommand.getId());
@@ -124,7 +132,6 @@ public class CommandMapperTest {
Map<Integer, Command> commandMap = createCommandMap(count);
-
List<Command> actualCommands = commandMapper.selectList(null);
assertThat(actualCommands.size(), greaterThanOrEqualTo(count));
@@ -138,7 +145,7 @@ public class CommandMapperTest {
ProcessDefinition processDefinition = createProcessDefinition();
- Command expectedCommand = createCommand(CommandType.START_PROCESS,processDefinition.getId());
+ createCommand(CommandType.START_PROCESS, processDefinition.getCode());
Command actualCommand = commandMapper.getOneToRun();
@@ -154,7 +161,7 @@ public class CommandMapperTest {
ProcessDefinition processDefinition = createProcessDefinition();
- CommandCount expectedCommandCount = createCommandMap(count, CommandType.START_PROCESS, processDefinition.getId());
+ createCommandMap(count, CommandType.START_PROCESS, processDefinition.getCode());
Long[] projectCodeArray = {processDefinition.getProjectCode()};
@@ -167,23 +174,22 @@ public class CommandMapperTest {
assertThat(actualCommandCounts.size(),greaterThanOrEqualTo(1));
}
-
/**
* create command map
* @param count map count
* @param commandType comman type
- * @param processDefinitionId process definition id
+ * @param processDefinitionCode process definition code
* @return command map
*/
private CommandCount createCommandMap(
Integer count,
CommandType commandType,
- Integer processDefinitionId){
+ long processDefinitionCode) {
CommandCount commandCount = new CommandCount();
- for (int i = 0 ;i < count ;i++){
- createCommand(commandType,processDefinitionId);
+ for (int i = 0;i < count;i++) {
+ createCommand(commandType, processDefinitionCode);
}
commandCount.setCommandType(commandType);
commandCount.setCount(count);
@@ -195,7 +201,7 @@ public class CommandMapperTest {
* create process definition
* @return process definition
*/
- private ProcessDefinition createProcessDefinition(){
+ private ProcessDefinition createProcessDefinition() {
ProcessDefinition processDefinition = new ProcessDefinition();
processDefinition.setCode(1L);
processDefinition.setReleaseState(ReleaseState.ONLINE);
@@ -215,22 +221,21 @@ public class CommandMapperTest {
* @param count map count
* @return command map
*/
- private Map<Integer,Command> createCommandMap(Integer count){
+ private Map<Integer,Command> createCommandMap(Integer count) {
Map<Integer,Command> commandMap = new HashMap<>();
- for (int i = 0; i < count ;i++){
+ for (int i = 0;i < count;i++) {
Command command = createCommand();
commandMap.put(command.getId(),command);
}
return commandMap;
}
-
/**
* create command
* @return
*/
- private Command createCommand(){
+ private Command createCommand() {
return createCommand(CommandType.START_PROCESS,1);
}
@@ -238,11 +243,11 @@ public class CommandMapperTest {
* create command
* @return Command
*/
- private Command createCommand(CommandType commandType,Integer processDefinitionId){
+ private Command createCommand(CommandType commandType, long processDefinitionCode) {
Command command = new Command();
command.setCommandType(commandType);
- command.setProcessDefinitionId(processDefinitionId);
+ command.setProcessDefinitionCode(processDefinitionCode);
command.setExecutorId(4);
command.setCommandParam("test command param");
command.setTaskDependType(TaskDependType.TASK_ONLY);
@@ -259,6 +264,4 @@ public class CommandMapperTest {
return command;
}
-
-
-}
\ No newline at end of file
+}
diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ErrorCommandMapperTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ErrorCommandMapperTest.java
index 6e18aa9..58bd93a 100644
--- a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ErrorCommandMapperTest.java
+++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ErrorCommandMapperTest.java
@@ -21,6 +21,10 @@ import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.dao.entity.CommandCount;
import org.apache.dolphinscheduler.dao.entity.ErrorCommand;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
+
+import java.util.Date;
+import java.util.List;
+
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -30,9 +34,6 @@ import org.springframework.test.annotation.Rollback;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.transaction.annotation.Transactional;
-import java.util.Date;
-import java.util.List;
-
@RunWith(SpringRunner.class)
@SpringBootTest
@Transactional
@@ -81,7 +82,7 @@ public class ErrorCommandMapperTest {
processDefinition.setCreateTime(new Date());
processDefinitionMapper.insert(processDefinition);
- errorCommand.setProcessDefinitionId(processDefinition.getId());
+ errorCommand.setProcessDefinitionCode(processDefinition.getCode());
errorCommandMapper.updateById(errorCommand);
@@ -103,4 +104,4 @@ public class ErrorCommandMapperTest {
Assert.assertNotEquals(commandCounts.size(), 0);
Assert.assertNotEquals(commandCounts2.size(), 0);
}
-}
\ No newline at end of file
+}
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/MasterCommandTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/MasterCommandTest.java
index d402afc..2093d95 100644
--- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/MasterCommandTest.java
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/MasterCommandTest.java
@@ -18,26 +18,16 @@ package org.apache.dolphinscheduler.server.master;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.FailureStrategy;
-import org.apache.dolphinscheduler.common.enums.TaskDependType;
import org.apache.dolphinscheduler.common.enums.WarningType;
-import org.apache.dolphinscheduler.common.graph.DAG;
-import org.apache.dolphinscheduler.common.model.TaskNode;
-import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
-import org.apache.dolphinscheduler.common.process.ProcessDag;
import org.apache.dolphinscheduler.dao.entity.Command;
-import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.mapper.CommandMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
-import org.apache.dolphinscheduler.dao.utils.DagHelper;
+
import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Map;
-
/**
* master test
*/
@@ -56,7 +46,7 @@ public class MasterCommandTest {
Command cmd = new Command();
cmd.setCommandType(CommandType.START_FAILURE_TASK_PROCESS);
cmd.setCommandParam("{\"ProcessInstanceId\":325}");
- cmd.setProcessDefinitionId(63);
+ cmd.setProcessDefinitionCode(63);
commandMapper.insert(cmd);
@@ -66,7 +56,7 @@ public class MasterCommandTest {
public void RecoverSuspendCommand(){
Command cmd = new Command();
- cmd.setProcessDefinitionId(44);
+ cmd.setProcessDefinitionCode(44);
cmd.setCommandParam("{\"ProcessInstanceId\":290}");
cmd.setCommandType(CommandType.RECOVER_SUSPENDED_PROCESS);
@@ -80,7 +70,7 @@ public class MasterCommandTest {
public void startNewProcessCommand(){
Command cmd = new Command();
cmd.setCommandType(CommandType.START_PROCESS);
- cmd.setProcessDefinitionId(167);
+ cmd.setProcessDefinitionCode(167);
cmd.setFailureStrategy(FailureStrategy.CONTINUE);
cmd.setWarningType(WarningType.NONE);
cmd.setWarningGroupId(4);
@@ -94,7 +84,7 @@ public class MasterCommandTest {
Command cmd = new Command();
cmd.setCommandType(CommandType.RECOVER_TOLERANCE_FAULT_PROCESS);
cmd.setCommandParam("{\"ProcessInstanceId\":816}");
- cmd.setProcessDefinitionId(15);
+ cmd.setProcessDefinitionCode(15);
commandMapper.insert(cmd);
}
@@ -105,7 +95,7 @@ public class MasterCommandTest {
cmd.setCommandType(CommandType.START_PROCESS);
cmd.setFailureStrategy(FailureStrategy.CONTINUE);
cmd.setWarningType(WarningType.ALL);
- cmd.setProcessDefinitionId(72);
+ cmd.setProcessDefinitionCode(72);
cmd.setExecutorId(10);
commandMapper.insert(cmd);
}
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index 01e4678..bd5e4df 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -270,7 +270,7 @@ public class ProcessService {
* @return if thread is enough
*/
private boolean checkThreadNum(Command command, int validThreadNum) {
- int commandThreadCount = this.workProcessThreadNumCount(command.getProcessDefinitionId());
+ int commandThreadCount = this.workProcessThreadNumCount(command.getProcessDefinitionCode());
return validThreadNum >= commandThreadCount;
}
@@ -469,12 +469,14 @@ public class ProcessService {
/**
* calculate sub process number in the process define.
*
- * @param processDefinitionId processDefinitionId
+ * @param processDefinitionCode processDefinitionCode
* @return process thread num count
*/
- private Integer workProcessThreadNumCount(Integer processDefinitionId) {
+ private Integer workProcessThreadNumCount(long processDefinitionCode) {
+ ProcessDefinition processDefinition = processDefineMapper.queryByCode(processDefinitionCode);
+
List<Integer> ids = new ArrayList<>();
- recurseFindSubProcessId(processDefinitionId, ids);
+ recurseFindSubProcessId(processDefinition.getId(), ids);
return ids.size() + 1;
}
@@ -497,7 +499,6 @@ public class ProcessService {
ids.add(subProcessParam.getProcessDefinitionId());
recurseFindSubProcessId(subProcessParam.getProcessDefinitionId(), ids);
}
-
}
}
}
@@ -529,7 +530,7 @@ public class ProcessService {
processInstance.getTaskDependType(),
processInstance.getFailureStrategy(),
processInstance.getExecutorId(),
- processInstance.getProcessDefinition().getId(),
+ processInstance.getProcessDefinition().getCode(),
JSONUtils.toJsonString(cmdParam),
processInstance.getWarningType(),
processInstance.getWarningGroupId(),
@@ -713,13 +714,10 @@ public class ProcessService {
CommandType commandType = command.getCommandType();
Map<String, String> cmdParam = JSONUtils.toMap(command.getCommandParam());
- ProcessDefinition processDefinition = null;
- if (command.getProcessDefinitionId() != 0) {
- processDefinition = processDefineMapper.selectById(command.getProcessDefinitionId());
- if (processDefinition == null) {
- logger.error("cannot find the work process define! define id : {}", command.getProcessDefinitionId());
- return null;
- }
+ ProcessDefinition processDefinition = getProcessDefinitionByCommand(command.getProcessDefinitionCode(), cmdParam);
+ if (processDefinition == null) {
+ logger.error("cannot find the work process define! define code : {}", command.getProcessDefinitionCode());
+ return null;
}
if (cmdParam != null) {
@@ -741,6 +739,7 @@ public class ProcessService {
String pId = cmdParam.get(Constants.CMD_PARAM_RECOVERY_WAITING_THREAD);
processInstanceId = Integer.parseInt(pId);
}
+
if (processInstanceId == 0) {
processInstance = generateNewProcessInstance(processDefinition, command, cmdParam);
} else {
@@ -869,6 +868,40 @@ public class ProcessService {
}
/**
+ * get process definition by command
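+ * If the command parameters reference an existing process instance (recover process id, sub process or waiting thread), get the version of ProcessDefinition recorded on that ProcessInstance
+ * Otherwise, get the latest version of ProcessDefinition by its code
+ *
+ * @param processDefinitionCode process definition code, used when cmdParam references no process instance
+ * @param cmdParam command parameters, may be null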
+ * @return ProcessDefinition
+ */
+ private ProcessDefinition getProcessDefinitionByCommand(long processDefinitionCode, Map<String, String> cmdParam) {
+ if (cmdParam != null) {
+ int processInstanceId = 0;
+ if (cmdParam.containsKey(Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING)) {
+ processInstanceId = Integer.parseInt(cmdParam.get(Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING));
+ } else if (cmdParam.containsKey(Constants.CMD_PARAM_SUB_PROCESS)) {
+ processInstanceId = Integer.parseInt(cmdParam.get(Constants.CMD_PARAM_SUB_PROCESS));
+ } else if (cmdParam.containsKey(Constants.CMD_PARAM_RECOVERY_WAITING_THREAD)) {
+ processInstanceId = Integer.parseInt(cmdParam.get(Constants.CMD_PARAM_RECOVERY_WAITING_THREAD));
+ }
+
+ if (processInstanceId != 0) {
+ ProcessInstance processInstance = this.findProcessInstanceDetailById(processInstanceId);
+ if (processInstance == null) {
+ return null;
+ }
+
+ return processDefineLogMapper.queryByDefinitionCodeAndVersion(
+ processInstance.getProcessDefinitionCode(), processInstance.getProcessDefinitionVersion());
+ }
+ }
+
+ return processDefineMapper.queryByCode(processDefinitionCode);
+ }
+
+ /**
* return complement data if the process start with complement data
*
* @param processInstance processInstance
@@ -1103,7 +1136,7 @@ public class ProcessService {
childInstance = findProcessInstanceById(instanceMap.getProcessInstanceId());
}
Command subProcessCommand = createSubProcessCommand(parentProcessInstance, childInstance, instanceMap, task);
- updateSubProcessDefinitionByParent(parentProcessInstance, subProcessCommand.getProcessDefinitionId());
+ updateSubProcessDefinitionByParent(parentProcessInstance, subProcessCommand.getProcessDefinitionCode());
initSubInstanceState(childInstance);
createCommand(subProcessCommand);
logger.info("sub process command created: {} ", subProcessCommand);
@@ -1152,6 +1185,7 @@ public class ProcessService {
CommandType commandType = getSubCommandType(parentProcessInstance, childInstance);
Map<String, String> subProcessParam = JSONUtils.toMap(task.getTaskParams());
int childDefineId = Integer.parseInt(subProcessParam.get(Constants.CMD_PARAM_SUB_PROCESS_DEFINE_ID));
+ ProcessDefinition processDefinition = processDefineMapper.queryByDefineId(childDefineId);
Object localParams = subProcessParam.get(Constants.LOCAL_PARAMS);
List<Property> allParam = JSONUtils.toList(JSONUtils.toJsonString(localParams), Property.class);
@@ -1169,7 +1203,7 @@ public class ProcessService {
TaskDependType.TASK_POST,
parentProcessInstance.getFailureStrategy(),
parentProcessInstance.getExecutorId(),
- childDefineId,
+ processDefinition.getCode(),
processParam,
parentProcessInstance.getWarningType(),
parentProcessInstance.getWarningGroupId(),
@@ -1208,12 +1242,12 @@ public class ProcessService {
* update sub process definition
*
* @param parentProcessInstance parentProcessInstance
- * @param childDefinitionId childDefinitionId
+ * @param childDefinitionCode child process definition code
*/
- private void updateSubProcessDefinitionByParent(ProcessInstance parentProcessInstance, int childDefinitionId) {
+ private void updateSubProcessDefinitionByParent(ProcessInstance parentProcessInstance, long childDefinitionCode) {
ProcessDefinition fatherDefinition = this.findProcessDefinition(parentProcessInstance.getProcessDefinitionCode(),
parentProcessInstance.getProcessDefinitionVersion());
- ProcessDefinition childDefinition = this.findProcessDefineById(childDefinitionId);
+ ProcessDefinition childDefinition = this.findProcessDefinitionByCode(childDefinitionCode);
if (childDefinition != null && fatherDefinition != null) {
childDefinition.setWarningGroupId(fatherDefinition.getWarningGroupId());
processDefineMapper.updateById(childDefinition);
@@ -1693,7 +1727,7 @@ public class ProcessService {
//2 insert into recover command
Command cmd = new Command();
- cmd.setProcessDefinitionId(processDefinition.getId());
+ cmd.setProcessDefinitionCode(processDefinition.getCode());
cmd.setCommandParam(String.format("{\"%s\":%d}", Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING, processInstance.getId()));
cmd.setExecutorId(processInstance.getExecutorId());
cmd.setCommandType(CommandType.RECOVER_TOLERANCE_FAULT_PROCESS);
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/ProcessScheduleJob.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/ProcessScheduleJob.java
index bda8ad8..eacd8bc 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/ProcessScheduleJob.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/ProcessScheduleJob.java
@@ -93,7 +93,7 @@ public class ProcessScheduleJob implements Job {
command.setCommandType(CommandType.SCHEDULER);
command.setExecutorId(schedule.getUserId());
command.setFailureStrategy(schedule.getFailureStrategy());
- //command.setProcessDefinitionId(schedule.getProcessDefinitionCode()); TODO next pr
+ command.setProcessDefinitionCode(schedule.getProcessDefinitionCode());
command.setScheduleTime(scheduledFireTime);
command.setStartTime(fireTime);
command.setWarningGroupId(schedule.getWarningGroupId());
diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
index 9eeec78..413a9d8 100644
--- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
+++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
@@ -126,6 +126,9 @@ public class ProcessServiceTest {
//father history: start; child null == command type: start
parentInstance.setHistoryCmd("START_PROCESS");
parentInstance.setCommandType(CommandType.START_PROCESS);
+ ProcessDefinition processDefinition = new ProcessDefinition();
+ processDefinition.setCode(1L);
+ Mockito.when(processDefineMapper.queryByDefineId(100)).thenReturn(processDefinition);
command = processService.createSubProcessCommand(parentInstance, childInstance, instanceMap, task);
Assert.assertEquals(CommandType.START_PROCESS, command.getCommandType());
@@ -227,16 +230,15 @@ public class ProcessServiceTest {
String host = "127.0.0.1";
int validThreadNum = 1;
Command command = new Command();
- command.setProcessDefinitionId(222);
+ command.setProcessDefinitionCode(222);
command.setCommandType(CommandType.REPEAT_RUNNING);
command.setCommandParam("{\"" + CMD_PARAM_RECOVER_PROCESS_ID_STRING + "\":\"111\",\""
+ CMD_PARAM_SUB_PROCESS_DEFINE_ID + "\":\"222\"}");
- Mockito.when(processDefineMapper.selectById(command.getProcessDefinitionId())).thenReturn(null);
Assert.assertNull(processService.handleCommand(logger, host, validThreadNum, command));
//there is not enough thread for this command
Command command1 = new Command();
- command1.setProcessDefinitionId(123);
+ command1.setProcessDefinitionCode(123);
command1.setCommandParam("{\"ProcessInstanceId\":222}");
command1.setCommandType(CommandType.START_PROCESS);
ProcessDefinition processDefinition = new ProcessDefinition();
@@ -254,31 +256,35 @@ public class ProcessServiceTest {
processDefinition.setGlobalParams("[{\"prop\":\"startParam1\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"\"}]");
ProcessInstance processInstance = new ProcessInstance();
processInstance.setId(222);
- Mockito.when(processDefineMapper.selectById(command1.getProcessDefinitionId())).thenReturn(processDefinition);
+ processInstance.setProcessDefinitionCode(11L);
+ processInstance.setProcessDefinitionVersion(1);
+ Mockito.when(processDefineMapper.queryByCode(command1.getProcessDefinitionCode())).thenReturn(processDefinition);
+ Mockito.when(processDefineLogMapper.queryByDefinitionCodeAndVersion(processInstance.getProcessDefinitionCode(),
+ processInstance.getProcessDefinitionVersion())).thenReturn(new ProcessDefinitionLog(processDefinition));
Mockito.when(processInstanceMapper.queryDetailById(222)).thenReturn(processInstance);
Assert.assertNotNull(processService.handleCommand(logger, host, validThreadNum, command1));
Command command2 = new Command();
command2.setCommandParam("{\"ProcessInstanceId\":222,\"StartNodeIdList\":\"n1,n2\"}");
- command2.setProcessDefinitionId(123);
+ command2.setProcessDefinitionCode(123);
command2.setCommandType(CommandType.RECOVER_SUSPENDED_PROCESS);
Assert.assertNotNull(processService.handleCommand(logger, host, validThreadNum, command2));
Command command3 = new Command();
- command3.setProcessDefinitionId(123);
+ command3.setProcessDefinitionCode(123);
command3.setCommandParam("{\"WaitingThreadInstanceId\":222}");
command3.setCommandType(CommandType.START_FAILURE_TASK_PROCESS);
Assert.assertNotNull(processService.handleCommand(logger, host, validThreadNum, command3));
Command command4 = new Command();
- command4.setProcessDefinitionId(123);
+ command4.setProcessDefinitionCode(123);
command4.setCommandParam("{\"WaitingThreadInstanceId\":222,\"StartNodeIdList\":\"n1,n2\"}");
command4.setCommandType(CommandType.REPEAT_RUNNING);
Assert.assertNotNull(processService.handleCommand(logger, host, validThreadNum, command4));
Command command5 = new Command();
- command5.setProcessDefinitionId(123);
+ command5.setProcessDefinitionCode(123);
HashMap<String, String> startParams = new HashMap<>();
startParams.put("startParam1", "testStartParam1");
HashMap<String, String> commandParams = new HashMap<>();
@@ -317,20 +323,36 @@ public class ProcessServiceTest {
@Test
public void testRecurseFindSubProcessId() {
+ int parentProcessDefineId = 1;
+ long parentProcessDefineCode = 1L;
+ int parentProcessDefineVersion = 1;
+
ProcessDefinition processDefinition = new ProcessDefinition();
- processDefinition.setCode(10L);
- int parentId = 111;
- List<Integer> ids = new ArrayList<>();
- ProcessDefinition processDefinition2 = new ProcessDefinition();
- processDefinition2.setCode(11L);
- Mockito.when(processDefineMapper.selectById(parentId)).thenReturn(processDefinition);
+ processDefinition.setCode(parentProcessDefineCode);
+ processDefinition.setVersion(parentProcessDefineVersion);
+ Mockito.when(processDefineMapper.selectById(parentProcessDefineId)).thenReturn(processDefinition);
+
+ long postTaskCode = 2L;
+ int postTaskVersion = 2;
+
List<ProcessTaskRelationLog> relationLogList = new ArrayList<>();
- Mockito.when(processTaskRelationLogMapper.queryByProcessCodeAndVersion(Mockito.anyLong()
- , Mockito.anyInt()))
- .thenReturn(relationLogList);
+ ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog();
+ processTaskRelationLog.setPostTaskCode(postTaskCode);
+ processTaskRelationLog.setPostTaskVersion(postTaskVersion);
+ relationLogList.add(processTaskRelationLog);
+ Mockito.when(processTaskRelationLogMapper.queryByProcessCodeAndVersion(parentProcessDefineCode
+ , parentProcessDefineVersion)).thenReturn(relationLogList);
- processService.recurseFindSubProcessId(parentId, ids);
+ List<TaskDefinitionLog> taskDefinitionLogs = new ArrayList<>();
+ TaskDefinitionLog taskDefinitionLog1 = new TaskDefinitionLog();
+ taskDefinitionLog1.setTaskParams("{\"processDefinitionId\": 123}");
+ taskDefinitionLogs.add(taskDefinitionLog1);
+ Mockito.when(taskDefinitionLogMapper.queryByTaskDefinitions(Mockito.anySet())).thenReturn(taskDefinitionLogs);
+
+ List<Integer> ids = new ArrayList<>();
+ processService.recurseFindSubProcessId(parentProcessDefineId, ids);
+ Assert.assertEquals(1, ids.size());
}
@Test
@@ -499,7 +521,7 @@ public class ProcessServiceTest {
@Test
public void testCreateCommand() {
Command command = new Command();
- command.setProcessDefinitionId(123);
+ command.setProcessDefinitionCode(123);
command.setCommandParam("{\"ProcessInstanceId\":222}");
command.setCommandType(CommandType.START_PROCESS);
int mockResult = 1;
diff --git a/sql/dolphinscheduler_mysql.sql b/sql/dolphinscheduler_mysql.sql
index 11bdc8d..d056b23 100644
--- a/sql/dolphinscheduler_mysql.sql
+++ b/sql/dolphinscheduler_mysql.sql
@@ -319,7 +319,7 @@ DROP TABLE IF EXISTS `t_ds_command`;
CREATE TABLE `t_ds_command` (
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'key',
`command_type` tinyint(4) DEFAULT NULL COMMENT 'Command type: 0 start workflow, 1 start execution from current node, 2 resume fault-tolerant workflow, 3 resume pause process, 4 start execution from failed node, 5 complement, 6 schedule, 7 rerun, 8 pause, 9 stop, 10 resume waiting thread',
- `process_definition_id` int(11) DEFAULT NULL COMMENT 'process definition id',
+ `process_definition_code` bigint(20) DEFAULT NULL COMMENT 'process definition code',
`command_param` text COMMENT 'json command parameters',
`task_depend_type` tinyint(4) DEFAULT NULL COMMENT 'Node dependency type: 0 current node, 1 forward, 2 backward',
`failure_strategy` tinyint(4) DEFAULT '0' COMMENT 'Failed policy: 0 end, 1 continue',
@@ -367,7 +367,7 @@ CREATE TABLE `t_ds_error_command` (
`id` int(11) NOT NULL COMMENT 'key',
`command_type` tinyint(4) DEFAULT NULL COMMENT 'command type',
`executor_id` int(11) DEFAULT NULL COMMENT 'executor id',
- `process_definition_id` int(11) DEFAULT NULL COMMENT 'process definition id',
+ `process_definition_code` bigint(20) DEFAULT NULL COMMENT 'process definition code',
`command_param` text COMMENT 'json command parameters',
`task_depend_type` tinyint(4) DEFAULT NULL COMMENT 'task depend type',
`failure_strategy` tinyint(4) DEFAULT '0' COMMENT 'failure strategy',
diff --git a/sql/dolphinscheduler_postgre.sql b/sql/dolphinscheduler_postgre.sql
index eb97912..f396781 100644
--- a/sql/dolphinscheduler_postgre.sql
+++ b/sql/dolphinscheduler_postgre.sql
@@ -220,7 +220,7 @@ DROP TABLE IF EXISTS t_ds_command;
CREATE TABLE t_ds_command (
id int NOT NULL ,
command_type int DEFAULT NULL ,
- process_definition_id int DEFAULT NULL ,
+ process_definition_code bigint NOT NULL ,
command_param text ,
task_depend_type int DEFAULT NULL ,
failure_strategy int DEFAULT '0' ,
@@ -262,7 +262,7 @@ CREATE TABLE t_ds_error_command (
id int NOT NULL ,
command_type int DEFAULT NULL ,
executor_id int DEFAULT NULL ,
- process_definition_id int DEFAULT NULL ,
+ process_definition_code bigint NOT NULL ,
command_param text ,
task_depend_type int DEFAULT NULL ,
failure_strategy int DEFAULT '0' ,