You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by le...@apache.org on 2021/02/07 08:15:43 UTC
[incubator-dolphinscheduler] branch json_split updated:
[Feature][JsonSplit] refactor process definition update (#4708)
This is an automated email from the ASF dual-hosted git repository.
leonbao pushed a commit to branch json_split
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git
The following commit(s) were added to refs/heads/json_split by this push:
new f31eee4 [Feature][JsonSplit] refactor process definition update (#4708)
f31eee4 is described below
commit f31eee43411575e12f4f8b0ef27fc64f7932f296
Author: bao liang <29...@users.noreply.github.com>
AuthorDate: Sun Feb 7 16:15:35 2021 +0800
[Feature][JsonSplit] refactor process definition update (#4708)
* add code in task_instance and process instance
* delete process_definition_id in t_ds_task_instance
* add task_code task_definition_version process_definition_code in task instance
* add task_code task_definition_version process_definition_code in task instance
* refactor process instance
* refactor process instance update
* refactor json-split for process definition and task definition
refactor process instance update
* refactor json-split for process definition and task definition
refactor process instance update
* code style
* code style
* code style
* code style
* refactor code
---
.../api/service/ProcessInstanceService.java | 146 +++++++-----
.../service/impl/ProcessDefinitionServiceImpl.java | 154 ++-----------
.../service/impl/TaskDefinitionServiceImpl.java | 63 +-----
.../dao/entity/ProcessDefinition.java | 2 +
.../dao/entity/ProcessInstance.java | 37 +++
.../dolphinscheduler/dao/entity/TaskInstance.java | 43 ++++
.../dao/mapper/ProcessDefinitionLogMapper.java | 11 +-
.../dao/mapper/ProcessDefinitionLogMapper.xml | 10 +-
.../dao/mapper/ProcessInstanceMapper.xml | 4 +-
.../dao/mapper/TaskInstanceMapper.xml | 10 +-
.../service/process/ProcessService.java | 248 +++++++++++++++++++++
sql/dolphinscheduler-postgre.sql | 2 +-
sql/dolphinscheduler_mysql.sql | 4 +-
13 files changed, 466 insertions(+), 268 deletions(-)
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java
index e74b2b4..7976882 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java
@@ -59,6 +59,7 @@ import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
import org.apache.dolphinscheduler.dao.utils.DagHelper;
import org.apache.dolphinscheduler.service.process.ProcessService;
+
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
@@ -190,6 +191,7 @@ public class ProcessInstanceService extends BaseService {
ProcessDefinition processDefinition = processService.findProcessDefineById(processInstance.getProcessDefinitionId());
processInstance.setWarningGroupId(processDefinition.getWarningGroupId());
+ processInstance.setProcessDefinitionId(processDefinition.getId());
result.put(DATA_LIST, processInstance);
putMsg(result, Status.SUCCESS);
@@ -406,91 +408,123 @@ public class ProcessInstanceService extends BaseService {
Flag flag, String locations, String connects) throws ParseException {
Map<String, Object> result = new HashMap<>();
Project project = projectMapper.queryByName(projectName);
-
//check project permission
Map<String, Object> checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName);
Status resultEnum = (Status) checkResult.get(Constants.STATUS);
if (resultEnum != Status.SUCCESS) {
return checkResult;
}
-
//check process instance exists
ProcessInstance processInstance = processService.findProcessInstanceDetailById(processInstanceId);
if (processInstance == null) {
putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, processInstanceId);
return result;
}
-
//check process instance status
if (!processInstance.getState().typeIsFinished()) {
putMsg(result, Status.PROCESS_INSTANCE_STATE_OPERATION_ERROR,
processInstance.getName(), processInstance.getState().toString(), "update");
return result;
}
- Date schedule = null;
- schedule = processInstance.getScheduleTime();
- if (scheduleTime != null) {
- schedule = DateUtils.getScheduleDate(scheduleTime);
+ ProcessDefinition processDefinition = processService.findProcessDefinition(processInstance.getProcessDefinitionCode(),
+ processInstance.getProcessDefinitionVersion());
+ ProcessData processData = JSONUtils.parseObject(processInstanceJson, ProcessData.class);
+ //check workflow json is valid
+ result = processDefinitionService.checkProcessNodeList(processData, processInstanceJson);
+ if (result.get(Constants.STATUS) != Status.SUCCESS) {
+ return result;
}
- processInstance.setScheduleTime(schedule);
- processInstance.setLocations(locations);
- processInstance.setConnects(connects);
- String globalParams = null;
- String originDefParams = null;
- int timeout = processInstance.getTimeout();
- ProcessDefinition processDefinition = processService.findProcessDefineById(processInstance.getProcessDefinitionId());
- if (StringUtils.isNotEmpty(processInstanceJson)) {
- ProcessData processData = JSONUtils.parseObject(processInstanceJson, ProcessData.class);
- //check workflow json is valid
- Map<String, Object> checkFlowJson = processDefinitionService.checkProcessNodeList(processData, processInstanceJson);
- if (checkFlowJson.get(Constants.STATUS) != Status.SUCCESS) {
- return result;
- }
-
- originDefParams = JSONUtils.toJsonString(processData.getGlobalParams());
- List<Property> globalParamList = processData.getGlobalParams();
- Map<String, String> globalParamMap = Optional.ofNullable(globalParamList).orElse(Collections.emptyList()).stream().collect(Collectors.toMap(Property::getProp, Property::getValue));
- globalParams = ParameterUtils.curingGlobalParams(globalParamMap, globalParamList,
- processInstance.getCmdTypeIfComplement(), schedule);
- timeout = processData.getTimeout();
- processInstance.setTimeout(timeout);
- Tenant tenant = processService.getTenantForProcess(processData.getTenantId(),
- processDefinition.getUserId());
- if (tenant != null) {
- processInstance.setTenantCode(tenant.getTenantCode());
- }
- // get the processinstancejson before saving,and then save the name and taskid
- String oldJson = processInstance.getProcessInstanceJson();
- if (StringUtils.isNotEmpty(oldJson)) {
- processInstanceJson = processService.changeJson(processData,oldJson);
- }
- processInstance.setProcessInstanceJson(processInstanceJson);
- processInstance.setGlobalParams(globalParams);
+ Tenant tenant = processService.getTenantForProcess(processData.getTenantId(),
+ processDefinition.getUserId());
+ // get the processinstancejson before saving,and then save the name and taskid
+ String oldJson = processInstance.getProcessInstanceJson();
+ if (StringUtils.isNotEmpty(oldJson)) {
+ processInstanceJson = processService.changeJson(processData, oldJson);
}
-
+ setProcessInstance(processInstance, tenant, scheduleTime, locations,
+ connects, processInstanceJson, processData);
int update = processService.updateProcessInstance(processInstance);
int updateDefine = 1;
if (Boolean.TRUE.equals(syncDefine)) {
- processDefinition.setProcessDefinitionJson(processInstanceJson);
- processDefinition.setGlobalParams(originDefParams);
- processDefinition.setLocations(locations);
- processDefinition.setConnects(connects);
- processDefinition.setTimeout(timeout);
- processDefinition.setUpdateTime(new Date());
-
- // add process definition version
- int version = processDefinitionVersionService.addProcessDefinitionVersion(processDefinition);
- processDefinition.setVersion(version);
- updateDefine = processDefineMapper.updateById(processDefinition);
+ updateDefine = syncDefinition(loginUser, project, processInstanceJson, locations, connects,
+ processInstance, processDefinition, processData);
}
if (update > 0 && updateDefine > 0) {
putMsg(result, Status.SUCCESS);
} else {
putMsg(result, Status.UPDATE_PROCESS_INSTANCE_ERROR);
}
-
return result;
+ }
+ /**
+ * sync definition according process instance
+ *
+ * @param loginUser
+ * @param project
+ * @param processInstanceJson
+ * @param locations
+ * @param connects
+ * @param processInstance
+ * @param processDefinition
+ * @param processData
+ * @return
+ */
+ private int syncDefinition(User loginUser, Project project, String processInstanceJson, String locations, String connects,
+ ProcessInstance processInstance, ProcessDefinition processDefinition,
+ ProcessData processData) {
+
+ String originDefParams = JSONUtils.toJsonString(processData.getGlobalParams());
+ processDefinition.setProcessDefinitionJson(processInstanceJson);
+ processDefinition.setGlobalParams(originDefParams);
+ processDefinition.setLocations(locations);
+ processDefinition.setConnects(connects);
+ processDefinition.setTimeout(processInstance.getTimeout());
+ processDefinition.setUpdateTime(new Date());
+
+ int updateDefine = processService.saveProcessDefinition(loginUser, project, processDefinition.getName(),
+ processDefinition.getDescription(), locations, connects,
+ processData, processDefinition);
+ return updateDefine;
+ }
+
+ /**
+ * update process instance attributes
+ *
+ * @param processInstance
+ * @param tenant
+ * @param scheduleTime
+ * @param locations
+ * @param connects
+ * @param processInstanceJson
+ * @param processData
+ * @return false if check failed or
+ */
+ private void setProcessInstance(ProcessInstance processInstance, Tenant tenant,
+ String scheduleTime, String locations, String connects, String processInstanceJson,
+ ProcessData processData) {
+
+ Date schedule = processInstance.getScheduleTime();
+ if (scheduleTime != null) {
+ schedule = DateUtils.getScheduleDate(scheduleTime);
+ }
+ processInstance.setScheduleTime(schedule);
+ processInstance.setLocations(locations);
+ processInstance.setConnects(connects);
+ if (StringUtils.isNotEmpty(processInstanceJson)) {
+ return;
+ }
+ List<Property> globalParamList = processData.getGlobalParams();
+ Map<String, String> globalParamMap = Optional.ofNullable(globalParamList).orElse(Collections.emptyList()).stream().collect(Collectors.toMap(Property::getProp, Property::getValue));
+ String globalParams = ParameterUtils.curingGlobalParams(globalParamMap, globalParamList,
+ processInstance.getCmdTypeIfComplement(), schedule);
+ int timeout = processData.getTimeout();
+ processInstance.setTimeout(timeout);
+ if (tenant != null) {
+ processInstance.setTenantCode(tenant.getTenantCode());
+ }
+ processInstance.setProcessInstanceJson(processInstanceJson);
+ processInstance.setGlobalParams(globalParams);
}
/**
@@ -705,13 +739,9 @@ public class ProcessInstanceService extends BaseService {
private static DAG<String, TaskNode, TaskNodeRelation> processInstance2DAG(ProcessInstance processInstance) {
String processDefinitionJson = processInstance.getProcessInstanceJson();
-
ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class);
-
List<TaskNode> taskNodeList = processData.getTasks();
-
ProcessDag processDag = DagHelper.getProcessDag(taskNodeList);
-
return DagHelper.buildDagGraph(processDag);
}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
index f704971..03515e3 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
@@ -38,7 +38,6 @@ import org.apache.dolphinscheduler.api.utils.exportprocess.TaskNodeParamFactory;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.AuthorizationType;
import org.apache.dolphinscheduler.common.enums.FailureStrategy;
-import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.Priority;
import org.apache.dolphinscheduler.common.enums.ReleaseState;
import org.apache.dolphinscheduler.common.enums.TaskType;
@@ -48,7 +47,6 @@ import org.apache.dolphinscheduler.common.graph.DAG;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
import org.apache.dolphinscheduler.common.process.ProcessDag;
-import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.thread.Stopper;
@@ -66,24 +64,24 @@ import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.Schedule;
-import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionLogMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
+import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
import org.apache.dolphinscheduler.dao.utils.DagHelper;
import org.apache.dolphinscheduler.service.permission.PermissionCheck;
import org.apache.dolphinscheduler.service.process.ProcessService;
+
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
@@ -158,12 +156,15 @@ public class ProcessDefinitionServiceImpl extends BaseService implements
@Autowired
private ProcessService processService;
+ @Autowired
+ private TaskDefinitionMapper taskDefinitionMapper;
+
/**
* create process definition
*
* @param loginUser login user
* @param projectName project name
- * @param name process definition name
+ * @param processDefinitionName process definition name
* @param processDefinitionJson process definition json
* @param desc description
* @param locations locations for nodes
@@ -174,7 +175,7 @@ public class ProcessDefinitionServiceImpl extends BaseService implements
@Transactional(rollbackFor = Exception.class)
public Map<String, Object> createProcessDefinition(User loginUser,
String projectName,
- String name,
+ String processDefinitionName,
String processDefinitionJson,
String desc,
String locations,
@@ -190,8 +191,6 @@ public class ProcessDefinitionServiceImpl extends BaseService implements
}
ProcessDefinition processDefinition = new ProcessDefinition();
- Date now = new Date();
-
ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class);
Map<String, Object> checkProcessJson = checkProcessNodeList(processData, processDefinitionJson);
if (checkProcessJson.get(Constants.STATUS) != Status.SUCCESS) {
@@ -206,68 +205,10 @@ public class ProcessDefinitionServiceImpl extends BaseService implements
putMsg(result, Status.CREATE_PROCESS_DEFINITION);
return result;
}
-
- processDefinition.setName(name);
- processDefinition.setVersion(1);
- processDefinition.setReleaseState(ReleaseState.OFFLINE);
- processDefinition.setUserId(loginUser.getId());
- processDefinition.setDescription(desc);
- processDefinition.setLocations(locations);
- processDefinition.setConnects(connects);
- processDefinition.setTimeout(processData.getTimeout());
- processDefinition.setTenantId(processData.getTenantId());
- processDefinition.setModifyBy(loginUser.getUserName());
-
- //custom global params
- List<Property> globalParamsList = processData.getGlobalParams();
- if (CollectionUtils.isNotEmpty(globalParamsList)) {
- Set<Property> globalParamsSet = new HashSet<>(globalParamsList);
- globalParamsList = new ArrayList<>(globalParamsSet);
- processDefinition.setGlobalParamList(globalParamsList);
- }
- processDefinition.setCreateTime(now);
- processDefinition.setUpdateTime(now);
- processDefinition.setFlag(Flag.YES);
-
- // save the new process definition
- processDefinitionMapper.insert(processDefinition);
-
- // parse and save the taskDefinition and processTaskRelation
- try {
- List<TaskNode> taskNodeList = (processData.getTasks() == null) ? new ArrayList<>() : processData.getTasks();
-
- for (TaskNode task : taskNodeList) {
- taskDefinitionService.createTaskDefinition(loginUser, projectName, JSONUtils.toJsonString(task));
- }
-
- DAG<String, TaskNode, TaskNodeRelation> dag = genDagGraph(processDefinition);
- Collection<String> beginNode = dag.getBeginNode();
- Collection<String> endNode = dag.getEndNode();
-
- // TODO: query taskCode by projectCode and taskName
-
- processTaskRelationService.createProcessTaskRelation(
- loginUser,
- name,
- projectName,
- processDefinitionCode,
- 0L,
- 0L,
- "0",
- "");
-
- } catch (Exception e) {
- putMsg(result, Status.CREATE_PROCESS_DEFINITION);
- return result;
- }
-
- // save process definition log
- ProcessDefinitionLog processDefinitionLog = JSONUtils.parseObject(
- JSONUtils.toJsonString(processDefinition), ProcessDefinitionLog.class);
-
- processDefinitionLog.setOperator(loginUser.getId());
- processDefinitionLog.setOperateTime(now);
- processDefinitionLogMapper.insert(processDefinitionLog);
+ ProcessDefinitionLog processDefinitionLog = processService.insertProcessDefinitionLog(loginUser, processDefinitionCode, processDefinitionName, processData,
+ project, desc, locations, connects);
+ processService.switchVersion(processDefinition, processDefinitionLog);
+ processService.createTaskAndRelation(loginUser, projectName, "", processDefinition, processData);
// return processDefinition object with ID
result.put(Constants.DATA_LIST, processDefinition.getId());
@@ -275,6 +216,7 @@ public class ProcessDefinitionServiceImpl extends BaseService implements
return result;
}
+
/**
* get resource ids
*
@@ -475,7 +417,6 @@ public class ProcessDefinitionServiceImpl extends BaseService implements
putMsg(result, Status.PROCESS_DEFINE_NOT_ALLOWED_EDIT, processDefinition.getName());
return result;
}
-
if (!name.equals(processDefinition.getName())) {
// check whether the new process define name exist
ProcessDefinition definition = processDefinitionMapper.verifyByDefineName(project.getId(), name);
@@ -487,55 +428,11 @@ public class ProcessDefinitionServiceImpl extends BaseService implements
// get the processdefinitionjson before saving,and then save the name and taskid
String oldJson = processDefinition.getProcessDefinitionJson();
processDefinitionJson = processService.changeJson(processData, oldJson);
-
- // update TaskDefinition
ProcessData newProcessData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class);
- List<TaskNode> taskNodeList = (newProcessData.getTasks() == null) ? new ArrayList<>() : newProcessData.getTasks();
+ int saveResult = processService.saveProcessDefinition(loginUser, project, name, desc,
+ locations, connects, newProcessData, processDefinition);
- for (TaskNode task : taskNodeList) {
- // TODO update by code directly
- Map<String, Object> stringObjectMap = taskDefinitionService.queryTaskDefinitionByName(loginUser, projectName, task.getName());
- TaskDefinition taskDefinition = (TaskDefinition) stringObjectMap.get(Constants.DATA_LIST);
- taskDefinitionService.updateTaskDefinition(loginUser, projectName, taskDefinition.getCode(), JSONUtils.toJsonString(task));
- }
-
- List<ProcessDefinitionLog> processDefinitionLogs = processDefinitionLogMapper.queryByDefinitionCode(processDefinition.getCode());
- int version = getNextVersion(processDefinitionLogs);
-
- Date now = new Date();
- processDefinition.setVersion(version);
- processDefinition.setName(name);
- processDefinition.setReleaseState(ReleaseState.OFFLINE);
- processDefinition.setProjectCode(project.getCode());
- processDefinition.setDescription(desc);
- processDefinition.setLocations(locations);
- processDefinition.setConnects(connects);
- processDefinition.setTimeout(processData.getTimeout());
- processDefinition.setTenantId(processData.getTenantId());
-
- //custom global params
- List<Property> globalParamsList = new ArrayList<>();
- if (CollectionUtils.isNotEmpty(processData.getGlobalParams())) {
- Set<Property> userDefParamsSet = new HashSet<>(processData.getGlobalParams());
- globalParamsList = new ArrayList<>(userDefParamsSet);
- }
- processDefinition.setGlobalParamList(globalParamsList);
- processDefinition.setUpdateTime(now);
- processDefinition.setFlag(Flag.YES);
-
-
- processDefinition.setVersion(version);
- int update = processDefinitionMapper.updateById(processDefinition);
-
- // save processDefinitionLog
- ProcessDefinitionLog processDefinitionLog = JSONUtils.parseObject(
- JSONUtils.toJsonString(processDefinition), ProcessDefinitionLog.class);
-
- processDefinitionLog.setOperator(loginUser.getId());
- processDefinitionLog.setOperateTime(now);
- int insert = processDefinitionLogMapper.insert(processDefinitionLog);
-
- if (update > 0 && insert > 0) {
+ if (saveResult > 0) {
putMsg(result, Status.SUCCESS);
result.put(Constants.DATA_LIST, processDefinition);
} else {
@@ -544,13 +441,6 @@ public class ProcessDefinitionServiceImpl extends BaseService implements
return result;
}
- private int getNextVersion(List<ProcessDefinitionLog> processDefinitionLogs) {
- return processDefinitionLogs
- .stream()
- .map(ProcessDefinitionLog::getVersion)
- .max((x, y) -> x > y ? x : y)
- .orElse(0) + 1;
- }
/**
* verify process definition name unique
@@ -1732,18 +1622,8 @@ public class ProcessDefinitionServiceImpl extends BaseService implements
, version);
return result;
}
-
- processDefinition.setVersion(processDefinitionLog.getVersion());
- processDefinition.setDescription(processDefinitionLog.getDescription());
- processDefinition.setLocations(processDefinitionLog.getLocations());
- processDefinition.setConnects(processDefinitionLog.getConnects());
- processDefinition.setTimeout(processDefinitionLog.getTimeout());
- processDefinition.setGlobalParams(processDefinitionLog.getGlobalParams());
- processDefinition.setUpdateTime(new Date());
- processDefinition.setWarningGroupId(processDefinitionLog.getWarningGroupId());
- processDefinition.setResourceIds(processDefinitionLog.getResourceIds());
-
- if (processDefinitionMapper.updateById(processDefinition) > 0) {
+ int switchVersion = processService.switchVersion(processDefinition, processDefinitionLog);
+ if (switchVersion > 0) {
putMsg(result, Status.SUCCESS);
} else {
putMsg(result, Status.SWITCH_PROCESS_DEFINITION_VERSION_ERROR);
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
index ed50788..e140052 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
@@ -47,6 +47,7 @@ import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
+import org.apache.dolphinscheduler.service.process.ProcessService;
import java.util.Date;
import java.util.HashMap;
@@ -88,6 +89,9 @@ public class TaskDefinitionServiceImpl extends BaseService implements
@Autowired
private ProcessDefinitionMapper processDefinitionMapper;
+ @Autowired
+ private ProcessService processService;
+
/**
* create task definition
*
@@ -144,7 +148,7 @@ public class TaskDefinitionServiceImpl extends BaseService implements
taskNode.getTaskTimeoutParameter().getInterval(),
now,
now);
- taskDefinition.setResourceIds(getResourceIds(taskDefinition));
+ taskDefinition.setResourceIds(processService.getResourceIds(taskDefinition));
// save the new task definition
taskDefinitionMapper.insert(taskDefinition);
// save task definition log
@@ -160,30 +164,6 @@ public class TaskDefinitionServiceImpl extends BaseService implements
}
/**
- * get resource ids
- *
- * @param taskDefinition taskDefinition
- * @return resource ids
- */
- private String getResourceIds(TaskDefinition taskDefinition) {
- Set<Integer> resourceIds = null;
- // TODO modify taskDefinition.getTaskType()
- AbstractParameters params = TaskParametersUtils.getParameters(taskDefinition.getTaskType().getDescp(), taskDefinition.getTaskParams());
-
- if (params != null && CollectionUtils.isNotEmpty(params.getResourceFilesList())) {
- resourceIds = params.getResourceFilesList().
- stream()
- .filter(t -> t.getId() != 0)
- .map(ResourceInfo::getId)
- .collect(Collectors.toSet());
- }
- if (CollectionUtils.isEmpty(resourceIds)) {
- return StringUtils.EMPTY;
- }
- return StringUtils.join(resourceIds, ",");
- }
-
- /**
* query task definition
*
* @param loginUser login user
@@ -276,38 +256,7 @@ public class TaskDefinitionServiceImpl extends BaseService implements
return result;
}
- List<TaskDefinitionLog> taskDefinitionLogs = taskDefinitionLogMapper.queryByDefinitionCode(taskCode);
- int version = taskDefinitionLogs
- .stream()
- .map(TaskDefinitionLog::getVersion)
- .max((x, y) -> x > y ? x : y)
- .orElse(0) + 1;
- Date now = new Date();
- taskDefinition.setVersion(version);
- taskDefinition.setCode(taskCode);
- taskDefinition.setName(taskNode.getName());
- taskDefinition.setDescription(taskNode.getDesc());
- taskDefinition.setProjectCode(project.getCode());
- taskDefinition.setUserId(loginUser.getId());
- taskDefinition.setTaskType(TaskType.of(taskNode.getType()));
- taskDefinition.setTaskParams(taskNode.getParams());
- taskDefinition.setFlag(taskNode.isForbidden() ? Flag.NO : Flag.YES);
- taskDefinition.setTaskPriority(taskNode.getTaskInstancePriority());
- taskDefinition.setWorkerGroup(taskNode.getWorkerGroup());
- taskDefinition.setFailRetryTimes(taskNode.getMaxRetryTimes());
- taskDefinition.setFailRetryInterval(taskNode.getRetryInterval());
- taskDefinition.setTimeoutFlag(taskNode.getTaskTimeoutParameter().getEnable() ? TimeoutFlag.OPEN : TimeoutFlag.CLOSE);
- taskDefinition.setTaskTimeoutStrategy(taskNode.getTaskTimeoutParameter().getStrategy());
- taskDefinition.setTimeout(taskNode.getTaskTimeoutParameter().getInterval());
- taskDefinition.setUpdateTime(now);
- taskDefinition.setResourceIds(getResourceIds(taskDefinition));
- taskDefinitionMapper.updateById(taskDefinition);
- // save task definition log
- TaskDefinitionLog taskDefinitionLog = new TaskDefinitionLog();
- taskDefinitionLog.set(taskDefinition);
- taskDefinitionLog.setOperator(loginUser.getId());
- taskDefinitionLog.setOperateTime(now);
- taskDefinitionLogMapper.insert(taskDefinitionLog);
+ processService.updateTaskDefinition(loginUser, project.getCode(), taskNode, taskDefinition);
result.put(Constants.DATA_LIST, taskCode);
putMsg(result, Status.SUCCESS);
return result;
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessDefinition.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessDefinition.java
index 6779258..5271392 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessDefinition.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessDefinition.java
@@ -184,6 +184,8 @@ public class ProcessDefinition {
@TableField(exist = false)
private int warningGroupId;
+ public ProcessDefinition(){}
+
public String getName() {
return name;
}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java
index f1d43a3..aa6b4b8 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java
@@ -47,10 +47,23 @@ public class ProcessInstance {
*/
@TableId(value = "id", type = IdType.AUTO)
private int id;
+
/**
* process definition id
+ * TODO delete
*/
private int processDefinitionId;
+
+ /**
+ * process definition code
+ */
+ private Long processDefinitionCode;
+
+ /**
+ * process definition version
+ */
+ private int processDefinitionVersion;
+
/**
* process state
*/
@@ -145,6 +158,7 @@ public class ProcessInstance {
/**
* process instance json
+ * TODO delete
*/
private String processInstanceJson;
@@ -579,6 +593,22 @@ public class ProcessInstance {
this.tenantId = tenantId;
}
+ public Long getProcessDefinitionCode() {
+ return processDefinitionCode;
+ }
+
+ public void setProcessDefinitionCode(Long processDefinitionCode) {
+ this.processDefinitionCode = processDefinitionCode;
+ }
+
+ public int getProcessDefinitionVersion() {
+ return processDefinitionVersion;
+ }
+
+ public void setProcessDefinitionVersion(int processDefinitionVersion) {
+ this.processDefinitionVersion = processDefinitionVersion;
+ }
+
@Override
public String toString() {
return "ProcessInstance{"
@@ -651,6 +681,12 @@ public class ProcessInstance {
+ timeout
+ ", tenantId="
+ tenantId
+ + ", processDefinitionCode='"
+ + processDefinitionCode
+ + '\''
+ + ", processDefinitionVersion='"
+ + processDefinitionVersion
+ + '\''
+ '}';
}
@@ -672,4 +708,5 @@ public class ProcessInstance {
public int hashCode() {
return Objects.hash(id);
}
+
}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
index b13ca87..e35cf8f 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
@@ -58,7 +58,9 @@ public class TaskInstance implements Serializable {
/**
* process definition id
+ * TODO delete
*/
+ @TableField(exist = false)
private int processDefinitionId;
/**
@@ -67,6 +69,21 @@ public class TaskInstance implements Serializable {
private int processInstanceId;
/**
+ * task code
+ */
+ private long taskCode;
+
+ /**
+ * process definition code
+ */
+ private long processDefinitionCode;
+
+ /**
+ * task defintion version
+ */
+ private String taskDefinitionVersion;
+
+ /**
* process instance name
*/
@TableField(exist = false)
@@ -74,7 +91,9 @@ public class TaskInstance implements Serializable {
/**
* task json
+ * TODO delete
*/
+ @TableField(exist = false)
private String taskJson;
/**
@@ -601,4 +620,28 @@ public class TaskInstance implements Serializable {
+ ", delayTime=" + delayTime
+ '}';
}
+
+ public long getTaskCode() {
+ return taskCode;
+ }
+
+ public void setTaskCode(long taskCode) {
+ this.taskCode = taskCode;
+ }
+
+ public long getProcessDefinitionCode() {
+ return processDefinitionCode;
+ }
+
+ public void setProcessDefinitionCode(long processDefinitionCode) {
+ this.processDefinitionCode = processDefinitionCode;
+ }
+
+ public String getTaskDefinitionVersion() {
+ return taskDefinitionVersion;
+ }
+
+ public void setTaskDefinitionVersion(String taskDefinitionVersion) {
+ this.taskDefinitionVersion = taskDefinitionVersion;
+ }
}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapper.java
index 80a426a..18c3b96 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapper.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapper.java
@@ -46,12 +46,19 @@ public interface ProcessDefinitionLogMapper extends BaseMapper<ProcessDefinition
List<ProcessDefinitionLog> queryByDefinitionCode(@Param("processDefinitionCode") long processDefinitionCode);
/**
+ * query max version for definition
+ * @param processDefinitionCode
+ * @return
+ */
+ int queryMaxVersionForDefinition(@Param("processDefinitionCode") long processDefinitionCode);
+
+ /**
* query the certain process definition version info by process definition code and version number
*
* @param processDefinitionCode process definition code
* @param version version number
* @return the process definition version info
*/
- ProcessDefinitionLog queryByDefinitionCodeAndVersion(@Param("processDefinitionCode") Long processDefinitionCode, @Param("version") long version);
-
+ ProcessDefinitionLog queryByDefinitionCodeAndVersion(@Param("processDefinitionCode") Long processDefinitionCode,
+ @Param("version") long version);
}
diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapper.xml
index 80cca29..60108fb 100644
--- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapper.xml
+++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapper.xml
@@ -25,7 +25,6 @@
warning_group_id, timeout, tenant_id,operator, operate_time, create_time,
update_time
</sql>
-
<select id="queryByDefinitionName" resultType="org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog">
select pd.id, pd.code, pd.name, pd.version, pd.description, pd.project_code,
pd.release_state, pd.user_id,pd.global_params, pd.flag, pd.locations, pd.connects,
@@ -37,15 +36,13 @@
WHERE p.code = #{projectCode}
and pd.name = #{processDefinitionName}
</select>
-
<select id="queryByDefinitionCode" resultType="org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog">
select
<include refid="baseSql"/>
from t_ds_process_definition_log
WHERE pd.code = #{processDefinitionCode}
</select>
-
- <select id="queryByProcessDefinitionCodeAndVersion"
+ <select id="queryByDefinitionCodeAndVersion"
resultType="org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog">
select
<include refid="baseSql"/>
@@ -53,5 +50,10 @@
where code = #{processDefinitionCode}
and version = #{version}
</select>
+ <select id="queryMaxVersionForDefinition" resultType="java.lang.Integer">
+ select max(version)
+ from t_ds_process_definition_log
+ where code = #{processDefinitionCode}
+ </select>
</mapper>
diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml
index f661635..39ccdeb 100644
--- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml
+++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml
@@ -19,7 +19,7 @@
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper">
<sql id="baseSql">
- id, name, process_definition_id, state, recovery, start_time, end_time, run_times,host,
+ id, name, process_definition_id, process_definition_version, process_definition_code, state, recovery, start_time, end_time, run_times,host,
command_type, command_param, task_depend_type, max_try_times, failure_strategy, warning_type,
warning_group_id, schedule_time, command_start_time, global_params, process_instance_json, flag,
update_time, is_sub_process, executor_id, locations, connects, history_cmd, dependence_schedule_times,
@@ -88,7 +88,7 @@
</select>
<select id="queryProcessInstanceListPaging" resultType="org.apache.dolphinscheduler.dao.entity.ProcessInstance">
- select instance.id, instance.process_definition_id, instance.command_type, instance.executor_id,
+ select instance.id, instance.process_definition_id, instance.command_type, instance.executor_id, instance.process_definition_version,
instance.name, instance.state, instance.schedule_time, instance.start_time, instance.end_time,
instance.run_times, instance.recovery, instance.host
from t_ds_process_instance instance
diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml
index 71bd251..43cfe39 100644
--- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml
+++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml
@@ -19,13 +19,13 @@
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper">
<sql id="baseSql">
- id, name, task_type, process_definition_id, process_instance_id, task_json, state, submit_time,
+ id, name, task_type, process_instance_id, task_code, task_definition_version, process_definition_code, state, submit_time,
start_time, end_time, host, execute_path, log_path, alert_flag, retry_times, pid, app_link,
flag, retry_interval, max_retry_times, task_instance_priority, worker_group, executor_id,
first_submit_time, delay_time, var_pool
</sql>
<sql id="baseSqlV2">
- ${alias}.id, ${alias}.name, ${alias}.task_type, ${alias}.process_definition_id, ${alias}.process_instance_id, ${alias}.task_json, ${alias}.state, ${alias}.submit_time,
+ ${alias}.id, ${alias}.name, ${alias}.task_type, ${alias}.task_code, ${alias}.task_definition_version, ${alias}.process_definition_code, ${alias}.process_instance_id, ${alias}.state, ${alias}.submit_time,
${alias}.start_time, ${alias}.end_time, ${alias}.host, ${alias}.execute_path, ${alias}.log_path, ${alias}.alert_flag, ${alias}.retry_times, ${alias}.pid, ${alias}.app_link,
${alias}.flag, ${alias}.retry_interval, ${alias}.max_retry_times, ${alias}.task_instance_priority, ${alias}.worker_group, ${alias}.executor_id,
${alias}.first_submit_time, ${alias}.delay_time, ${alias}.var_pool
@@ -72,7 +72,7 @@
<select id="countTaskInstanceStateByUser" resultType="org.apache.dolphinscheduler.dao.entity.ExecuteStatusCount">
select state, count(0) as count
from t_ds_task_instance t
- left join t_ds_process_definition d on d.id=t.process_definition_id
+ left join t_ds_process_definition d on d.code=t.process_definition_code
left join t_ds_project p on p.id=d.project_id
where 1=1
<if test="projectIds != null and projectIds.length != 0">
@@ -98,7 +98,7 @@
<select id="countTask" resultType="java.lang.Integer">
select count(1) as count
from t_ds_task_instance task,t_ds_process_definition process
- where task.process_definition_id=process.id
+ where task.process_definition_code=process.code
<if test="projectIds != null and projectIds.length != 0">
and process.project_id in
<foreach collection="projectIds" index="index" item="i" open="(" separator="," close=")">
@@ -120,7 +120,7 @@
,
process.name as process_instance_name
from t_ds_task_instance instance
- left join t_ds_process_definition define on instance.process_definition_id = define.id
+ left join t_ds_process_definition define on instance.process_definition_code = define.code
left join t_ds_process_instance process on process.id=instance.process_instance_id
where define.project_id = #{projectId}
<if test="startTime != null">
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index b9065ec..4acf6fe 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -35,13 +35,17 @@ import org.apache.dolphinscheduler.common.enums.CycleEnum;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.FailureStrategy;
import org.apache.dolphinscheduler.common.enums.Flag;
+import org.apache.dolphinscheduler.common.enums.ReleaseState;
import org.apache.dolphinscheduler.common.enums.ResourceType;
import org.apache.dolphinscheduler.common.enums.TaskDependType;
import org.apache.dolphinscheduler.common.enums.TaskType;
+import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.common.model.DateInterval;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.process.Property;
+import org.apache.dolphinscheduler.common.process.ResourceInfo;
+import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.task.conditions.ConditionsParameters;
import org.apache.dolphinscheduler.common.task.subprocess.SubProcessParameters;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
@@ -49,18 +53,22 @@ import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
+import org.apache.dolphinscheduler.common.utils.TaskParametersUtils;
import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.CycleDependency;
import org.apache.dolphinscheduler.dao.entity.DataSource;
import org.apache.dolphinscheduler.dao.entity.ErrorCommand;
import org.apache.dolphinscheduler.dao.entity.ProcessData;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
+import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.ProcessInstanceMap;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.ProjectUser;
import org.apache.dolphinscheduler.dao.entity.Resource;
import org.apache.dolphinscheduler.dao.entity.Schedule;
+import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
+import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.dolphinscheduler.dao.entity.UdfFunc;
@@ -68,12 +76,15 @@ import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.CommandMapper;
import org.apache.dolphinscheduler.dao.mapper.DataSourceMapper;
import org.apache.dolphinscheduler.dao.mapper.ErrorCommandMapper;
+import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionLogMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.ResourceMapper;
import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
+import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper;
+import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.TenantMapper;
import org.apache.dolphinscheduler.dao.mapper.UdfFuncMapper;
@@ -126,6 +137,11 @@ public class ProcessService {
private ProcessDefinitionMapper processDefineMapper;
@Autowired
+ private ProcessDefinitionLogMapper processDefineLogMapper;
+
+
+
+ @Autowired
private ProcessInstanceMapper processInstanceMapper;
@Autowired
@@ -158,6 +174,15 @@ public class ProcessService {
@Autowired
private ProjectMapper projectMapper;
+ @Autowired
+ private TaskDefinitionMapper taskDefinitionMapper;
+
+ @Autowired
+ private TaskDefinitionLogMapper taskDefinitionLogMapper;
+
+ @Autowired
+ private ProcessDefinitionLogMapper processDefinitionLogMapper;
+
/**
* handle Command (construct ProcessInstance from Command) , wrapped in transaction
*
@@ -340,6 +365,37 @@ public class ProcessService {
}
/**
+ * find process define by id.
+ *
+ * @param processDefinitionCode processDefinitionCode
+ * @return process definition
+ */
+ public ProcessDefinition findProcessDefinition(Long processDefinitionCode, int version) {
+ ProcessDefinition processDefinition = processDefineMapper.queryByCode(processDefinitionCode);
+ if (processDefinition.getVersion() != version) {
+ ProcessDefinitionLog log = processDefineLogMapper.queryByDefinitionCodeAndVersion(processDefinitionCode, version);
+ processDefinition = convertFromLog(log);
+ }
+ return processDefinition;
+ }
+
+ /**
+ * covert log to process definition
+ * @param processDefinitionLog
+ * @return
+ */
+ public ProcessDefinition convertFromLog(ProcessDefinitionLog processDefinitionLog) {
+ ProcessDefinition definition = null;
+ if (null != processDefinitionLog) {
+ definition = JSONUtils.parseObject(JSONUtils.toJsonString(processDefinitionLog), ProcessDefinition.class);
+ }
+ if (null != definition) {
+ definition.setId(0);
+ }
+ return definition;
+ }
+
+ /**
* delete work process instance by id
*
* @param processInstanceId processInstanceId
@@ -2055,4 +2111,196 @@ public class ProcessService {
}
return JSONUtils.toJsonString(processData);
}
+
+ /**
+ * switch process definition version to process definition log version
+ *
+ * @param processDefinition
+ * @param processDefinitionLog
+ * @return
+ */
+ public int switchVersion(ProcessDefinition processDefinition, ProcessDefinitionLog processDefinitionLog) {
+ if (null == processDefinition || null == processDefinitionLog) {
+ return Constants.EXIT_CODE_FAILURE;
+ }
+
+ ProcessDefinition tmpDefinition = JSONUtils.parseObject(JSONUtils.toJsonString(processDefinitionLog),
+ ProcessDefinition.class);
+ tmpDefinition.setId(processDefinition.getId());
+ tmpDefinition.setReleaseState(ReleaseState.OFFLINE);
+ tmpDefinition.setFlag(Flag.YES);
+
+ int switchResult = 0;
+ if (0 == processDefinition.getId()) {
+
+ switchResult = processDefineMapper.insert(tmpDefinition);
+ } else {
+ switchResult = processDefineMapper.updateById(tmpDefinition);
+ }
+ //TODO... switch task relations
+ return switchResult;
+ }
+
+ /**
+ * update task definition
+ *
+ * @param operator
+ * @param projectCode
+ * @param taskNode
+ * @param taskDefinition
+ * @return
+ */
+ public int updateTaskDefinition(User operator, Long projectCode, TaskNode taskNode, TaskDefinition taskDefinition) {
+
+ List<TaskDefinitionLog> taskDefinitionLogs = taskDefinitionLogMapper.queryByDefinitionCode(taskDefinition.getCode());
+ int version = taskDefinitionLogs
+ .stream()
+ .map(TaskDefinitionLog::getVersion)
+ .max((x, y) -> x > y ? x : y)
+ .orElse(0) + 1;
+ Date now = new Date();
+ taskDefinition.setVersion(version);
+ taskDefinition.setCode(taskDefinition.getCode());
+ taskDefinition.setName(taskNode.getName());
+ taskDefinition.setDescription(taskNode.getDesc());
+ taskDefinition.setProjectCode(projectCode);
+ taskDefinition.setUserId(operator.getId());
+ taskDefinition.setTaskType(TaskType.of(taskNode.getType()));
+ taskDefinition.setTaskParams(taskNode.getParams());
+ taskDefinition.setFlag(taskNode.isForbidden() ? Flag.NO : Flag.YES);
+ taskDefinition.setTaskPriority(taskNode.getTaskInstancePriority());
+ taskDefinition.setWorkerGroup(taskNode.getWorkerGroup());
+ taskDefinition.setFailRetryTimes(taskNode.getMaxRetryTimes());
+ taskDefinition.setFailRetryInterval(taskNode.getRetryInterval());
+ taskDefinition.setTimeoutFlag(taskNode.getTaskTimeoutParameter().getEnable() ? TimeoutFlag.OPEN : TimeoutFlag.CLOSE);
+ taskDefinition.setTaskTimeoutStrategy(taskNode.getTaskTimeoutParameter().getStrategy());
+ taskDefinition.setTimeout(taskNode.getTaskTimeoutParameter().getInterval());
+ taskDefinition.setUpdateTime(now);
+ taskDefinition.setResourceIds(getResourceIds(taskDefinition));
+ int update = taskDefinitionMapper.updateById(taskDefinition);
+ // save task definition log
+ TaskDefinitionLog taskDefinitionLog = new TaskDefinitionLog();
+ taskDefinitionLog.set(taskDefinition);
+ taskDefinitionLog.setOperator(operator.getId());
+ taskDefinitionLog.setOperateTime(now);
+ int insert = taskDefinitionLogMapper.insert(taskDefinitionLog);
+ return insert & update;
+ }
+
+ /**
+ * get resource ids
+ *
+ * @param taskDefinition taskDefinition
+ * @return resource ids
+ */
+ public String getResourceIds(TaskDefinition taskDefinition) {
+ Set<Integer> resourceIds = null;
+ // TODO modify taskDefinition.getTaskType()
+ AbstractParameters params = TaskParametersUtils.getParameters(taskDefinition.getTaskType().getDescp(), taskDefinition.getTaskParams());
+
+ if (params != null && CollectionUtils.isNotEmpty(params.getResourceFilesList())) {
+ resourceIds = params.getResourceFilesList().
+ stream()
+ .filter(t -> t.getId() != 0)
+ .map(ResourceInfo::getId)
+ .collect(Collectors.toSet());
+ }
+ if (CollectionUtils.isEmpty(resourceIds)) {
+ return StringUtils.EMPTY;
+ }
+ return StringUtils.join(resourceIds, ",");
+ }
+
+ /**
+ * @param operator
+ * @param name
+ * @param desc
+ * @param locations
+ * @param connects
+ * @param project
+ * @param processData
+ * @param processDefinition
+ * @return
+ */
+ public int saveProcessDefinition(User operator, Project project, String name, String desc, String locations,
+ String connects, ProcessData processData, ProcessDefinition processDefinition) {
+ List<TaskNode> taskNodeList = (processData.getTasks() == null) ? new ArrayList<>() : processData.getTasks();
+ for (TaskNode task : taskNodeList) {
+ // TODO update by code directly
+ TaskDefinition taskDefinition = taskDefinitionMapper.queryByDefinitionName(project.getCode(), task.getName());
+ updateTaskDefinition(operator, project.getCode(), task, taskDefinition);
+ }
+ createTaskAndRelation(operator, project.getName(), "", processDefinition, processData);
+ ProcessDefinitionLog processDefinitionLog = insertProcessDefinitionLog(operator, processDefinition.getCode(),
+ name, processData, project, desc, locations, connects);
+ return switchVersion(processDefinition, processDefinitionLog);
+ }
+
+ /**
+ * @param operator
+ * @param processDefinitionCode
+ * @param processDefinitionName
+ * @param processData
+ * @param project
+ * @param desc
+ * @param locations
+ * @param connects
+ * @return
+ */
+ public ProcessDefinitionLog insertProcessDefinitionLog(User operator, Long processDefinitionCode, String processDefinitionName,
+ ProcessData processData, Project project,
+ String desc, String locations, String connects) {
+ ProcessDefinitionLog processDefinitionLog = new ProcessDefinitionLog();
+ int version = processDefinitionLogMapper.queryMaxVersionForDefinition(processDefinitionLog.getCode());
+ processDefinitionLog.setCode(processDefinitionCode);
+ processDefinitionLog.setVersion(version);
+ processDefinitionLog.setName(processDefinitionName);
+ processDefinitionLog.setReleaseState(ReleaseState.OFFLINE);
+ processDefinitionLog.setProjectCode(project.getCode());
+ processDefinitionLog.setDescription(desc);
+ processDefinitionLog.setLocations(locations);
+ processDefinitionLog.setConnects(connects);
+ processDefinitionLog.setTimeout(processData.getTimeout());
+ processDefinitionLog.setTenantId(processData.getTenantId());
+ processDefinitionLog.setOperator(operator.getId());
+ Date updateTime = new Date();
+ processDefinitionLog.setOperateTime(updateTime);
+ processDefinitionLog.setUpdateTime(updateTime);
+
+ //custom global params
+ List<Property> globalParamsList = new ArrayList<>();
+ if (CollectionUtils.isNotEmpty(processData.getGlobalParams())) {
+ Set<Property> userDefParamsSet = new HashSet<>(processData.getGlobalParams());
+ globalParamsList = new ArrayList<>(userDefParamsSet);
+ }
+ processDefinitionLog.setGlobalParamList(globalParamsList);
+ processDefinitionLog.setFlag(Flag.YES);
+ int insert = processDefinitionLogMapper.insert(processDefinitionLog);
+ if (insert > 0) {
+ return processDefinitionLog;
+ }
+ return null;
+ }
+
+ /**
+ * create task defintion and task relations
+ *
+ * @param loginUser
+ * @param projectName
+ * @param relationName
+ * @param processDefinition
+ * @param processData
+ * @return
+ */
+ public void createTaskAndRelation(User loginUser, String projectName, String relationName,
+ ProcessDefinition processDefinition,
+ ProcessData processData) {
+ List<TaskNode> taskNodeList = (processData.getTasks() == null) ? new ArrayList<>() : processData.getTasks();
+ for (TaskNode task : taskNodeList) {
+ //TODO... task code exists, update task
+ //createTaskDefinition(loginUser, projectName, JSONUtils.toJsonString(task));
+ }
+ // TODO: query taskCode by projectCode and taskName
+ }
+
}
diff --git a/sql/dolphinscheduler-postgre.sql b/sql/dolphinscheduler-postgre.sql
index 430f03a..d1439f6 100644
--- a/sql/dolphinscheduler-postgre.sql
+++ b/sql/dolphinscheduler-postgre.sql
@@ -674,10 +674,10 @@ CREATE TABLE t_ds_task_instance (
id int NOT NULL ,
name varchar(255) DEFAULT NULL ,
task_type varchar(64) DEFAULT NULL ,
+ task_code bigint NOT NULL,
task_definition_version int DEFAULT NULL ,
process_definition_code bigint DEFAULT NULL ,
process_instance_id int DEFAULT NULL ,
- task_json text ,
state int DEFAULT NULL ,
submit_time timestamp DEFAULT NULL ,
start_time timestamp DEFAULT NULL ,
diff --git a/sql/dolphinscheduler_mysql.sql b/sql/dolphinscheduler_mysql.sql
index bdc5dd8..a9d4752 100644
--- a/sql/dolphinscheduler_mysql.sql
+++ b/sql/dolphinscheduler_mysql.sql
@@ -810,11 +810,11 @@ DROP TABLE IF EXISTS `t_ds_task_instance`;
CREATE TABLE `t_ds_task_instance` (
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'key',
`name` varchar(255) DEFAULT NULL COMMENT 'task name',
- `task_definition_version` int(11) DEFAULT NULL COMMENT 'task definition version',
`task_type` varchar(64) DEFAULT NULL COMMENT 'task type',
+ `task_code` bigint(20) NOT NULL COMMENT 'task definition code',
+ `task_definition_version` int(11) DEFAULT NULL COMMENT 'task definition version',
`process_definition_code` bigint(20) NOT NULL COMMENT 'process definition code',
`process_instance_id` int(11) DEFAULT NULL COMMENT 'process instance id',
- `task_json` longtext COMMENT 'task content json',
`state` tinyint(4) DEFAULT NULL COMMENT 'Status: 0 commit succeeded, 1 running, 2 prepare to pause, 3 pause, 4 prepare to stop, 5 stop, 6 fail, 7 succeed, 8 need fault tolerance, 9 kill, 10 wait for thread, 11 wait for dependency to complete',
`submit_time` datetime DEFAULT NULL COMMENT 'task submit time',
`start_time` datetime DEFAULT NULL COMMENT 'task start time',