You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by le...@apache.org on 2021/02/22 02:13:41 UTC
[incubator-dolphinscheduler] branch json_split updated:
[Feature][JsonSplit]refactor remove the json in process instance and
definition (#4828)
This is an automated email from the ASF dual-hosted git repository.
leonbao pushed a commit to branch json_split
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git
The following commit(s) were added to refs/heads/json_split by this push:
new 62e961e [Feature][JsonSplit]refactor remove the json in process instance and definition (#4828)
62e961e is described below
commit 62e961e3f1bab666781df982d2284720db434461
Author: bao liang <29...@users.noreply.github.com>
AuthorDate: Mon Feb 22 10:13:32 2021 +0800
[Feature][JsonSplit]refactor remove the json in process instance and definition (#4828)
* #4417 [JsonSplit] refactor process definition json
* [#4177] refactor json split. remove json in process definition and process instance
* [#4177] refactor json split. copy process definition need set task code 0L.
* code style
* code style
* code style
* code style
* update
* update
---
.../service/impl/ProcessDefinitionServiceImpl.java | 8 +-
.../impl/ProcessDefinitionVersionServiceImpl.java | 1 -
.../service/impl/ProcessInstanceServiceImpl.java | 6 -
.../impl/ProcessTaskRelationServiceImpl.java | 2 +-
.../service/impl/TaskDefinitionServiceImpl.java | 3 +-
.../dolphinscheduler/common/model/TaskNode.java | 665 +++++++++++----------
.../dolphinscheduler/dao/utils/DagHelper.java | 52 +-
.../server/master/runner/MasterExecThread.java | 17 +-
.../service/process/ProcessService.java | 118 +---
.../service/process/ProcessServiceTest.java | 104 ----
10 files changed, 406 insertions(+), 570 deletions(-)
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
index 2b9e7a3..f7ddf06 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
@@ -427,9 +427,6 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
return result;
}
}
- // get the processdefinitionjson before saving,and then save the name and taskid
- String oldJson = processDefinition.getProcessDefinitionJson();
- processDefinitionJson = processService.changeJson(processData, oldJson);
ProcessData newProcessData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class);
int saveResult = processService.saveProcessDefinition(loginUser, project, name, desc,
locations, connects, newProcessData, processDefinition);
@@ -1452,6 +1449,11 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processId);
return result;
} else {
+ ProcessData processData = JSONUtils.parseObject(processDefinition.getProcessDefinitionJson(), ProcessData.class);
+ List<TaskNode> taskNodeList = processData.getTasks();
+ taskNodeList.stream().forEach(taskNode -> {
+ taskNode.setCode(0L);
+ });
return createProcessDefinition(
loginUser,
targetProject.getName(),
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionVersionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionVersionServiceImpl.java
index 64dcd87..0764810 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionVersionServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionVersionServiceImpl.java
@@ -71,7 +71,6 @@ public class ProcessDefinitionVersionServiceImpl extends BaseServiceImpl impleme
.newBuilder()
.processDefinitionId(processDefinition.getId())
.version(version)
- .processDefinitionJson(processDefinition.getProcessDefinitionJson())
.description(processDefinition.getDescription())
.locations(processDefinition.getLocations())
.connects(processDefinition.getConnects())
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
index aeb23df..905b245 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
@@ -457,11 +457,6 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
}
Tenant tenant = processService.getTenantForProcess(processData.getTenantId(),
processDefinition.getUserId());
- // get the processinstancejson before saving,and then save the name and taskid
- String oldJson = processInstance.getProcessInstanceJson();
- if (StringUtils.isNotEmpty(oldJson)) {
- processInstanceJson = processService.changeJson(processData, oldJson);
- }
setProcessInstance(processInstance, tenant, scheduleTime, locations,
connects, processInstanceJson, processData);
int update = processService.updateProcessInstance(processInstance);
@@ -530,7 +525,6 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
processInstance.setProcessInstanceJson(processInstanceJson);
processInstance.setGlobalParams(globalParams);
}
-
/**
* query parent process instance detail info by sub process instance id
*
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java
index 5ea01bd..e57ef00 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java
@@ -41,7 +41,7 @@ import org.springframework.stereotype.Service;
* task definition service impl
*/
@Service
-public class ProcessTaskRelationServiceImpl extends BaseService implements
+public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements
ProcessTaskRelationService {
private static final Logger logger = LoggerFactory.getLogger(ProcessTaskRelationServiceImpl.class);
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
index 8b1e672..8e32133 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
@@ -56,8 +56,7 @@ import org.springframework.transaction.annotation.Transactional;
* task definition service impl
*/
@Service
-public class TaskDefinitionServiceImpl extends BaseService implements
- TaskDefinitionService {
+public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDefinitionService {
private static final Logger logger = LoggerFactory.getLogger(TaskDefinitionServiceImpl.class);
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/TaskNode.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/TaskNode.java
index cd3e573..0072b93 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/TaskNode.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/TaskNode.java
@@ -23,9 +23,11 @@ import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.*;
+
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+
import org.apache.dolphinscheduler.common.utils.StringUtils;
import java.io.IOException;
@@ -35,330 +37,299 @@ import java.util.Objects;
public class TaskNode {
- /**
- * task node id
- */
- private String id;
-
- /**
- * task node name
- */
- private String name;
-
- /**
- * task node description
- */
- private String desc;
-
- /**
- * task node type
- */
- private String type;
-
- /**
- * the run flag has two states, NORMAL or FORBIDDEN
- */
- private String runFlag;
-
- /**
- * the front field
- */
- private String loc;
-
- /**
- * maximum number of retries
- */
- private int maxRetryTimes;
-
- /**
- * Unit of retry interval: points
- */
- private int retryInterval;
-
- /**
- * params information
- */
- @JsonDeserialize(using = JSONUtils.JsonDataDeserializer.class)
- @JsonSerialize(using = JSONUtils.JsonDataSerializer.class)
- private String params;
-
- /**
- * inner dependency information
- */
- @JsonDeserialize(using = JSONUtils.JsonDataDeserializer.class)
- @JsonSerialize(using = JSONUtils.JsonDataSerializer.class)
- private String preTasks;
-
- /**
- * users store additional information
- */
- @JsonDeserialize(using = JSONUtils.JsonDataDeserializer.class)
- @JsonSerialize(using = JSONUtils.JsonDataSerializer.class)
- private String extras;
-
- /**
- * node dependency list
- */
- private List<String> depList;
-
- /**
- * outer dependency information
- */
- @JsonDeserialize(using = JSONUtils.JsonDataDeserializer.class)
- @JsonSerialize(using = JSONUtils.JsonDataSerializer.class)
- private String dependence;
-
-
- @JsonDeserialize(using = JSONUtils.JsonDataDeserializer.class)
- @JsonSerialize(using = JSONUtils.JsonDataSerializer.class)
- private String conditionResult;
-
- /**
- * task instance priority
- */
- private Priority taskInstancePriority;
-
- /**
- * worker group
- */
- private String workerGroup;
-
- /**
- * worker group id
- */
- private Integer workerGroupId;
-
-
- /**
- * task time out
- */
- @JsonDeserialize(using = JSONUtils.JsonDataDeserializer.class)
- @JsonSerialize(using = JSONUtils.JsonDataSerializer.class)
- private String timeout;
+ /**
+ * task node id
+ */
+ private String id;
+
+ /**
+ * task node code
+ */
+ private Long code;
+
+ /**
+ * task node version
+ */
+ private int version;
+
+ /**
+ * task node name
+ */
+ private String name;
+
+ /**
+ * task node description
+ */
+ private String desc;
+
+ /**
+ * task node type
+ */
+ private String type;
+
+ /**
+ * the run flag has two states, NORMAL or FORBIDDEN
+ */
+ private String runFlag;
+
+ /**
+ * the front field
+ */
+ private String loc;
+
+ /**
+ * maximum number of retries
+ */
+ private int maxRetryTimes;
+
+ /**
+ * Unit of retry interval: points
+ */
+ private int retryInterval;
+
+ /**
+ * params information
+ */
+ @JsonDeserialize(using = JSONUtils.JsonDataDeserializer.class)
+ @JsonSerialize(using = JSONUtils.JsonDataSerializer.class)
+ private String params;
+
+ /**
+ * inner dependency information
+ */
+ @JsonDeserialize(using = JSONUtils.JsonDataDeserializer.class)
+ @JsonSerialize(using = JSONUtils.JsonDataSerializer.class)
+ private String preTasks;
+
+ /**
+ * users store additional information
+ */
+ @JsonDeserialize(using = JSONUtils.JsonDataDeserializer.class)
+ @JsonSerialize(using = JSONUtils.JsonDataSerializer.class)
+ private String extras;
+
+ /**
+ * node dependency list
+ */
+ private List<String> depList;
+
+ /**
+ * outer dependency information
+ */
+ @JsonDeserialize(using = JSONUtils.JsonDataDeserializer.class)
+ @JsonSerialize(using = JSONUtils.JsonDataSerializer.class)
+ private String dependence;
+
+
+ @JsonDeserialize(using = JSONUtils.JsonDataDeserializer.class)
+ @JsonSerialize(using = JSONUtils.JsonDataSerializer.class)
+ private String conditionResult;
+
+ /**
+ * task instance priority
+ */
+ private Priority taskInstancePriority;
+
+ /**
+ * worker group
+ */
+ private String workerGroup;
+
+ /**
+ * worker group id
+ */
+ private Integer workerGroupId;
+
+
+ /**
+ * task time out
+ */
+ @JsonDeserialize(using = JSONUtils.JsonDataDeserializer.class)
+ @JsonSerialize(using = JSONUtils.JsonDataSerializer.class)
+ private String timeout;
/**
* delay execution time.
*/
private int delayTime;
- public String getId() {
- return id;
- }
-
- public void setId(String id) {
- this.id = id;
- }
-
- public String getName() {
- return name;
- }
-
- public void setName(String name) {
- this.name = name;
- }
-
- public String getDesc() {
- return desc;
- }
-
- public void setDesc(String desc) {
- this.desc = desc;
- }
-
- public String getType() {
- return type;
- }
-
- public void setType(String type) {
- this.type = type;
- }
-
- public String getParams() {
- return params;
- }
-
- public void setParams(String params) {
- this.params = params;
- }
-
- public String getPreTasks() {
- return preTasks;
- }
-
- public void setPreTasks(String preTasks) throws IOException {
- this.preTasks = preTasks;
- this.depList = JSONUtils.toList(preTasks, String.class);
- }
-
- public String getExtras() {
- return extras;
- }
-
- public void setExtras(String extras) {
- this.extras = extras;
- }
-
- public List<String> getDepList() {
- return depList;
- }
-
- public void setDepList(List<String> depList) throws JsonProcessingException {
- this.depList = depList;
- this.preTasks = JSONUtils.toJsonString(depList);
- }
-
- public String getLoc() {
- return loc;
- }
-
- public void setLoc(String loc) {
- this.loc = loc;
- }
-
- public String getRunFlag(){
- return runFlag;
- }
-
- public void setRunFlag(String runFlag) {
- this.runFlag = runFlag;
- }
-
- public Boolean isForbidden(){
- return (StringUtils.isNotEmpty(this.runFlag) &&
- this.runFlag.equals(Constants.FLOWNODE_RUN_FLAG_FORBIDDEN));
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- TaskNode taskNode = (TaskNode) o;
- return Objects.equals(name, taskNode.name) &&
- Objects.equals(desc, taskNode.desc) &&
- Objects.equals(type, taskNode.type) &&
- Objects.equals(params, taskNode.params) &&
- Objects.equals(preTasks, taskNode.preTasks) &&
- Objects.equals(extras, taskNode.extras) &&
- Objects.equals(runFlag, taskNode.runFlag) &&
- Objects.equals(dependence, taskNode.dependence) &&
- Objects.equals(workerGroup, taskNode.workerGroup) &&
- Objects.equals(conditionResult, taskNode.conditionResult) &&
-
- CollectionUtils.equalLists(depList, taskNode.depList);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(name, desc, type, params, preTasks, extras, depList, runFlag);
- }
-
- public String getDependence() {
- return dependence;
- }
-
- public void setDependence(String dependence) {
- this.dependence = dependence;
- }
-
- public int getMaxRetryTimes() {
- return maxRetryTimes;
- }
-
- public void setMaxRetryTimes(int maxRetryTimes) {
- this.maxRetryTimes = maxRetryTimes;
- }
-
- public int getRetryInterval() {
- return retryInterval;
- }
-
- public void setRetryInterval(int retryInterval) {
- this.retryInterval = retryInterval;
- }
-
- public Priority getTaskInstancePriority() {
- return taskInstancePriority;
- }
-
- public void setTaskInstancePriority(Priority taskInstancePriority) {
- this.taskInstancePriority = taskInstancePriority;
- }
-
- public String getTimeout() {
- return timeout;
- }
-
- public void setTimeout(String timeout) {
- this.timeout = timeout;
- }
-
- /**
- * get task time out parameter
- * @return task time out parameter
- */
- public TaskTimeoutParameter getTaskTimeoutParameter() {
- if(StringUtils.isNotEmpty(this.getTimeout())){
- String formatStr = String.format("%s,%s", TaskTimeoutStrategy.WARN.name(), TaskTimeoutStrategy.FAILED.name());
- String taskTimeout = this.getTimeout().replace(formatStr,TaskTimeoutStrategy.WARNFAILED.name());
- return JSONUtils.parseObject(taskTimeout,TaskTimeoutParameter.class);
- }
- return new TaskTimeoutParameter(false);
- }
-
- public boolean isConditionsTask(){
- return TaskType.CONDITIONS.toString().equalsIgnoreCase(this.getType());
- }
-
- @Override
- public String toString() {
- return "TaskNode{"
- + "id='" + id + '\''
- + ", name='" + name + '\''
- + ", desc='" + desc + '\''
- + ", type='" + type + '\''
- + ", runFlag='" + runFlag + '\''
- + ", loc='" + loc + '\''
- + ", maxRetryTimes=" + maxRetryTimes
- + ", retryInterval=" + retryInterval
- + ", params='" + params + '\''
- + ", preTasks='" + preTasks + '\''
- + ", extras='" + extras + '\''
- + ", depList=" + depList
- + ", dependence='" + dependence + '\''
- + ", taskInstancePriority=" + taskInstancePriority
- + ", timeout='" + timeout + '\''
- + ", workerGroup='" + workerGroup + '\''
- + ", delayTime=" + delayTime
- + '}';
- }
-
- public String getWorkerGroup() {
- return workerGroup;
- }
-
- public void setWorkerGroup(String workerGroup) {
- this.workerGroup = workerGroup;
- }
-
- public String getConditionResult() {
- return conditionResult;
- }
-
- public void setConditionResult(String conditionResult) {
- this.conditionResult = conditionResult;
- }
-
- public Integer getWorkerGroupId() {
- return workerGroupId;
- }
-
- public void setWorkerGroupId(Integer workerGroupId) {
- this.workerGroupId = workerGroupId;
- }
+ public String getId() {
+ return id;
+ }
+
+ public void setId(String id) {
+ this.id = id;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public String getDesc() {
+ return desc;
+ }
+
+ public void setDesc(String desc) {
+ this.desc = desc;
+ }
+
+ public String getType() {
+ return type;
+ }
+
+ public void setType(String type) {
+ this.type = type;
+ }
+
+ public String getParams() {
+ return params;
+ }
+
+ public void setParams(String params) {
+ this.params = params;
+ }
+
+ public String getPreTasks() {
+ return preTasks;
+ }
+
+ public void setPreTasks(String preTasks) throws IOException {
+ this.preTasks = preTasks;
+ this.depList = JSONUtils.toList(preTasks, String.class);
+ }
+
+ public String getExtras() {
+ return extras;
+ }
+
+ public void setExtras(String extras) {
+ this.extras = extras;
+ }
+
+ public List<String> getDepList() {
+ return depList;
+ }
+
+ public void setDepList(List<String> depList) throws JsonProcessingException {
+ this.depList = depList;
+ this.preTasks = JSONUtils.toJsonString(depList);
+ }
+
+ public String getLoc() {
+ return loc;
+ }
+
+ public void setLoc(String loc) {
+ this.loc = loc;
+ }
+
+ public String getRunFlag() {
+ return runFlag;
+ }
+
+ public void setRunFlag(String runFlag) {
+ this.runFlag = runFlag;
+ }
+
+ public Boolean isForbidden() {
+ return (StringUtils.isNotEmpty(this.runFlag)
+ && this.runFlag.equals(Constants.FLOWNODE_RUN_FLAG_FORBIDDEN));
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ TaskNode taskNode = (TaskNode) o;
+ return Objects.equals(name, taskNode.name)
+ && Objects.equals(desc, taskNode.desc)
+ && Objects.equals(type, taskNode.type)
+ && Objects.equals(params, taskNode.params)
+ && Objects.equals(preTasks, taskNode.preTasks)
+ && Objects.equals(extras, taskNode.extras)
+ && Objects.equals(runFlag, taskNode.runFlag)
+ && Objects.equals(dependence, taskNode.dependence)
+ && Objects.equals(workerGroup, taskNode.workerGroup)
+ && Objects.equals(conditionResult, taskNode.conditionResult)
+ && CollectionUtils.equalLists(depList, taskNode.depList);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(name, desc, type, params, preTasks, extras, depList, runFlag);
+ }
+
+ public String getDependence() {
+ return dependence;
+ }
+
+ public void setDependence(String dependence) {
+ this.dependence = dependence;
+ }
+
+ public int getMaxRetryTimes() {
+ return maxRetryTimes;
+ }
+
+ public void setMaxRetryTimes(int maxRetryTimes) {
+ this.maxRetryTimes = maxRetryTimes;
+ }
+
+ public int getRetryInterval() {
+ return retryInterval;
+ }
+
+ public void setRetryInterval(int retryInterval) {
+ this.retryInterval = retryInterval;
+ }
+
+ public Priority getTaskInstancePriority() {
+ return taskInstancePriority;
+ }
+
+ public void setTaskInstancePriority(Priority taskInstancePriority) {
+ this.taskInstancePriority = taskInstancePriority;
+ }
+
+ public String getTimeout() {
+ return timeout;
+ }
+
+ public void setTimeout(String timeout) {
+ this.timeout = timeout;
+ }
+
+ public String getWorkerGroup() {
+ return workerGroup;
+ }
+
+ public void setWorkerGroup(String workerGroup) {
+ this.workerGroup = workerGroup;
+ }
+
+ public String getConditionResult() {
+ return conditionResult;
+ }
+
+ public void setConditionResult(String conditionResult) {
+ this.conditionResult = conditionResult;
+ }
+
+ public Integer getWorkerGroupId() {
+ return workerGroupId;
+ }
+
+ public void setWorkerGroupId(Integer workerGroupId) {
+ this.workerGroupId = workerGroupId;
+ }
public int getDelayTime() {
return delayTime;
@@ -367,4 +338,62 @@ public class TaskNode {
public void setDelayTime(int delayTime) {
this.delayTime = delayTime;
}
+
+ public Long getCode() {
+ return code;
+ }
+
+ public void setCode(Long code) {
+ this.code = code;
+ }
+
+ public int getVersion() {
+ return version;
+ }
+
+ public void setVersion(int version) {
+ this.version = version;
+ }
+
+ /**
+ * get task time out parameter
+ *
+ * @return task time out parameter
+ */
+ public TaskTimeoutParameter getTaskTimeoutParameter() {
+ if (StringUtils.isNotEmpty(this.getTimeout())) {
+ String formatStr = String.format("%s,%s", TaskTimeoutStrategy.WARN.name(), TaskTimeoutStrategy.FAILED.name());
+ String taskTimeout = this.getTimeout().replace(formatStr, TaskTimeoutStrategy.WARNFAILED.name());
+ return JSONUtils.parseObject(taskTimeout, TaskTimeoutParameter.class);
+ }
+ return new TaskTimeoutParameter(false);
+ }
+
+ public boolean isConditionsTask() {
+ return TaskType.CONDITIONS.toString().equalsIgnoreCase(this.getType());
+ }
+
+ @Override
+ public String toString() {
+ return "TaskNode{"
+ + "id='" + id + '\''
+ + ", name='" + name + '\''
+ + ", desc='" + desc + '\''
+ + ", type='" + type + '\''
+ + ", runFlag='" + runFlag + '\''
+ + ", loc='" + loc + '\''
+ + ", maxRetryTimes=" + maxRetryTimes
+ + ", retryInterval=" + retryInterval
+ + ", params='" + params + '\''
+ + ", preTasks='" + preTasks + '\''
+ + ", extras='" + extras + '\''
+ + ", depList=" + depList
+ + ", dependence='" + dependence + '\''
+ + ", taskInstancePriority=" + taskInstancePriority
+ + ", timeout='" + timeout + '\''
+ + ", workerGroup='" + workerGroup + '\''
+ + ", delayTime=" + delayTime
+ + '}';
+ }
+
}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java
index f7eaabc..970fc47 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java
@@ -25,17 +25,13 @@ import org.apache.dolphinscheduler.common.process.ProcessDag;
import org.apache.dolphinscheduler.common.task.conditions.ConditionsParameters;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.*;
-import org.apache.dolphinscheduler.common.utils.StringUtils;
-import org.apache.dolphinscheduler.dao.entity.ProcessData;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
-import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
/**
* dag tools
@@ -193,24 +189,19 @@ public class DagHelper {
/**
* generate dag by start nodes and recovery nodes
*
- * @param processDefinitionJson processDefinitionJson
+ * @param totalTaskNodeList totalTaskNodeList
* @param startNodeNameList startNodeNameList
* @param recoveryNodeNameList recoveryNodeNameList
* @param depNodeType depNodeType
* @return process dag
* @throws Exception if error throws Exception
*/
- public static ProcessDag generateFlowDag(String processDefinitionJson,
+ public static ProcessDag generateFlowDag(List<TaskNode> totalTaskNodeList,
List<String> startNodeNameList,
List<String> recoveryNodeNameList,
TaskDependType depNodeType) throws Exception {
- ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class);
- List<TaskNode> taskNodeList = new ArrayList<>();
- if (null != processData) {
- taskNodeList = processData.getTasks();
- }
- List<TaskNode> destTaskNodeList = generateFlowNodeListByStartNode(taskNodeList, startNodeNameList, recoveryNodeNameList, depNodeType);
+ List<TaskNode> destTaskNodeList = generateFlowNodeListByStartNode(totalTaskNodeList, startNodeNameList, recoveryNodeNameList, depNodeType);
if (destTaskNodeList.isEmpty()) {
return null;
}
@@ -222,29 +213,6 @@ public class DagHelper {
}
/**
- * parse the forbidden task nodes in process definition.
- *
- * @param processDefinitionJson processDefinitionJson
- * @return task node map
- */
- public static Map<String, TaskNode> getForbiddenTaskNodeMaps(String processDefinitionJson) {
- Map<String, TaskNode> forbidTaskNodeMap = new ConcurrentHashMap<>();
- ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class);
-
- List<TaskNode> taskNodeList = new ArrayList<>();
- if (null != processData) {
- taskNodeList = processData.getTasks();
- }
- for (TaskNode node : taskNodeList) {
- if (node.isForbidden()) {
- forbidTaskNodeMap.putIfAbsent(node.getName(), node);
- }
- }
- return forbidTaskNodeMap;
- }
-
-
- /**
* find node by node name
*
* @param nodeDetails nodeDetails
@@ -470,18 +438,16 @@ public class DagHelper {
/**
* get process dag
*
- * @param taskDefinitions task definition
+ * @param taskNodeList task node list
* @return Process dag
*/
- public static ProcessDag getProcessDag(List<TaskDefinition> taskDefinitions,
+ public static ProcessDag getProcessDag(List<TaskNode> taskNodeList,
List<ProcessTaskRelation> processTaskRelations) {
Map<Long, TaskNode> taskNodeMap = new HashMap<>();
- List<TaskNode> taskNodeList = new ArrayList<>();
- for (TaskDefinition taskDefinition : taskDefinitions) {
- TaskNode taskNode = JSONUtils.parseObject(JSONUtils.toJsonString(taskDefinition), TaskNode.class);
- taskNodeMap.put(taskDefinition.getCode(), taskNode);
- taskNodeList.add(taskNode);
- }
+
+ taskNodeList.stream().forEach(taskNode -> {
+ taskNodeMap.putIfAbsent(taskNode.getCode(), taskNode);
+ });
List<TaskNodeRelation> taskNodeRelations = new ArrayList<>();
for (ProcessTaskRelation processTaskRelation : processTaskRelations) {
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
index 3b113b6..ff44842 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
@@ -383,12 +383,17 @@ public class MasterExecThread implements Runnable {
*/
private void buildFlowDag() throws Exception {
recoverNodeIdList = getStartTaskInstanceList(processInstance.getCommandParam());
-
- forbiddenTaskList = DagHelper.getForbiddenTaskNodeMaps(processInstance.getProcessInstanceJson());
+ List<TaskNode> taskNodeList = processService.getTaskNodeListByDefinitionId(processInstance.getProcessDefinitionId());
+ forbiddenTaskList.clear();
+ taskNodeList.stream().forEach(taskNode -> {
+ if (taskNode.isForbidden()) {
+ forbiddenTaskList.put(taskNode.getName(), taskNode);
+ }
+ });
// generate process to get DAG info
List<String> recoveryNameList = getRecoveryNodeNameList();
List<String> startNodeNameList = parseStartNodeName(processInstance.getCommandParam());
- ProcessDag processDag = generateFlowDag(processInstance.getProcessInstanceJson(),
+ ProcessDag processDag = generateFlowDag(taskNodeList,
startNodeNameList, recoveryNameList, processInstance.getTaskDependType());
if (processDag == null) {
logger.error("processDag is null");
@@ -1229,17 +1234,17 @@ public class MasterExecThread implements Runnable {
/**
* generate flow dag
*
- * @param processDefinitionJson process definition json
+ * @param totalTaskNodeList total task node list
* @param startNodeNameList start node name list
* @param recoveryNodeNameList recovery node name list
* @param depNodeType depend node type
* @return ProcessDag process dag
* @throws Exception exception
*/
- public ProcessDag generateFlowDag(String processDefinitionJson,
+ public ProcessDag generateFlowDag(List<TaskNode> totalTaskNodeList,
List<String> startNodeNameList,
List<String> recoveryNodeNameList,
TaskDependType depNodeType) throws Exception {
- return DagHelper.generateFlowDag(processDefinitionJson, startNodeNameList, recoveryNodeNameList, depNodeType);
+ return DagHelper.generateFlowDag(totalTaskNodeList, startNodeNameList, recoveryNodeNameList, depNodeType);
}
}
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index 472f98a..cc82844 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -341,20 +341,22 @@ public class ProcessService {
public List<TaskNode> getTaskNodeListByDefinitionId(Integer defineId) {
ProcessDefinition processDefinition = processDefineMapper.selectById(defineId);
if (processDefinition == null) {
- logger.info("process define not exists");
- return null;
- }
-
- String processDefinitionJson = processDefinition.getProcessDefinitionJson();
- ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class);
-
- //process data check
- if (null == processData) {
- logger.error("process data is null");
+ logger.error("process define not exists");
return new ArrayList<>();
}
- return processData.getTasks();
+ List<ProcessTaskRelation> processTaskRelations = getProcessTaskRelationList(processDefinition.getCode(), processDefinition.getVersion());
+ Map<Long, TaskDefinition> taskDefinitionMap = new HashMap<>();
+ for (ProcessTaskRelation processTaskRelation : processTaskRelations) {
+ if (taskDefinitionMap.containsKey(processTaskRelation.getPostTaskCode())) {
+ TaskDefinition taskDefinition = taskDefinitionMapper.queryByDefinitionCode(processTaskRelation.getPostTaskCode());
+ taskDefinitionMap.put(processTaskRelation.getPostTaskCode(), taskDefinition);
+ }
+ }
+ return taskDefinitionMap.entrySet()
+ .stream()
+ .map(e -> JSONUtils.parseObject(JSONUtils.toJsonString(e.getValue()), TaskNode.class))
+ .collect(Collectors.toList());
}
/**
@@ -495,13 +497,7 @@ public class ProcessService {
* @param ids ids
*/
public void recurseFindSubProcessId(int parentId, List<Integer> ids) {
- ProcessDefinition processDefinition = processDefineMapper.selectById(parentId);
- String processDefinitionJson = processDefinition.getProcessDefinitionJson();
-
- ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class);
-
- List<TaskNode> taskNodeList = processData.getTasks();
-
+ List<TaskNode> taskNodeList = this.getTaskNodeListByDefinitionId(parentId);
if (taskNodeList != null && !taskNodeList.isEmpty()) {
for (TaskNode taskNode : taskNodeList) {
@@ -650,8 +646,6 @@ public class ProcessService {
getCommandTypeIfComplement(processInstance, command),
processInstance.getScheduleTime()));
- //copy process define json to process instance
- processInstance.setProcessInstanceJson(processDefinition.getProcessDefinitionJson());
// set process instance priority
processInstance.setProcessInstancePriority(command.getProcessInstancePriority());
String workerGroup = StringUtils.isBlank(command.getWorkerGroup()) ? Constants.DEFAULT_WORKER_GROUP : command.getWorkerGroup();
@@ -2064,60 +2058,6 @@ public class ProcessService {
taskInstance.getId());
}
- /**
- * solve the branch rename bug
- *
- * @return String
- */
- public String changeJson(ProcessData processData, String oldJson) {
- ProcessData oldProcessData = JSONUtils.parseObject(oldJson, ProcessData.class);
- HashMap<String, String> oldNameTaskId = new HashMap<>();
- List<TaskNode> oldTasks = oldProcessData.getTasks();
- for (int i = 0; i < oldTasks.size(); i++) {
- TaskNode taskNode = oldTasks.get(i);
- String oldName = taskNode.getName();
- String oldId = taskNode.getId();
- oldNameTaskId.put(oldName, oldId);
- }
-
- // take the processdefinitionjson saved this time, and then save the taskid and name
- HashMap<String, String> newNameTaskId = new HashMap<>();
- List<TaskNode> newTasks = processData.getTasks();
- for (int i = 0; i < newTasks.size(); i++) {
- TaskNode taskNode = newTasks.get(i);
- String newId = taskNode.getId();
- String newName = taskNode.getName();
- newNameTaskId.put(newId, newName);
- }
-
- // replace the previous conditionresult with a new one
- List<TaskNode> tasks = processData.getTasks();
- for (int i = 0; i < tasks.size(); i++) {
- TaskNode taskNode = newTasks.get(i);
- String type = taskNode.getType();
- if (TaskType.CONDITIONS.getDescp().equalsIgnoreCase(type)) {
- ConditionsParameters conditionsParameters = JSONUtils.parseObject(taskNode.getConditionResult(), ConditionsParameters.class);
- String oldSuccessNodeName = conditionsParameters.getSuccessNode().get(0);
- String oldFailedNodeName = conditionsParameters.getFailedNode().get(0);
- String newSuccessNodeName = newNameTaskId.get(oldNameTaskId.get(oldSuccessNodeName));
- String newFailedNodeName = newNameTaskId.get(oldNameTaskId.get(oldFailedNodeName));
- if (newSuccessNodeName != null) {
- ArrayList<String> successNode = new ArrayList<>();
- successNode.add(newSuccessNodeName);
- conditionsParameters.setSuccessNode(successNode);
- }
- if (newFailedNodeName != null) {
- ArrayList<String> failedNode = new ArrayList<>();
- failedNode.add(newFailedNodeName);
- conditionsParameters.setFailedNode(failedNode);
- }
- String conditionResultStr = conditionsParameters.getConditionResult();
- taskNode.setConditionResult(conditionResultStr);
- tasks.set(i, taskNode);
- }
- }
- return JSONUtils.toJsonString(processData);
- }
/**
* switch process definition version to process definition log version
@@ -2391,24 +2331,30 @@ public class ProcessService {
* @return dag graph
*/
public DAG<String, TaskNode, TaskNodeRelation> genDagGraph(ProcessDefinition processDefinition) {
+ List<TaskNode> taskNodeList = this.getTaskNodeListByDefinitionId(processDefinition.getId());
+ List<ProcessTaskRelation> processTaskRelations = getProcessTaskRelationList(processDefinition.getCode(), processDefinition.getVersion());
+ ProcessDag processDag = DagHelper.getProcessDag(taskNodeList, processTaskRelations);
+ // Generate concrete Dag to be executed
+ return DagHelper.buildDagGraph(processDag);
+ }
+ /**
+ * get process task relation list
+ * this function can be query relation list from log record
+ *
+ * @param processCode
+ * @param processVersion
+ * @return
+ */
+ public List<ProcessTaskRelation> getProcessTaskRelationList(Long processCode, int processVersion) {
List<ProcessTaskRelationLog> taskRelationLogs = processTaskRelationLogMapper.queryByProcessCodeAndVersion(
- processDefinition.getCode(),
- processDefinition.getVersion());
+ processCode,
+ processVersion);
List<ProcessTaskRelation> processTaskRelations = new ArrayList<>();
- List<TaskDefinition> taskDefinitions = new ArrayList<>();
for (ProcessTaskRelationLog processTaskRelationLog : taskRelationLogs) {
processTaskRelations.add(JSONUtils.parseObject(JSONUtils.toJsonString(processTaskRelationLog), ProcessTaskRelation.class));
-
- TaskDefinitionLog taskDefinitionLog = taskDefinitionLogMapper.queryByDefinitionCodeAndVersion(
- processTaskRelationLog.getPostTaskCode(),
- processTaskRelationLog.getPostNodeVersion());
- taskDefinitions.add(JSONUtils.parseObject(JSONUtils.toJsonString(taskDefinitionLog), TaskDefinition.class));
}
-
- ProcessDag processDag = DagHelper.getProcessDag(taskDefinitions, processTaskRelations);
- // Generate concrete Dag to be executed
- return DagHelper.buildDagGraph(processDag);
+ return processTaskRelations;
}
}
diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
index b6d518c..cfa8caa 100644
--- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
+++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
@@ -338,108 +338,4 @@ public class ProcessServiceTest {
processService.recurseFindSubProcessId(parentId, ids);
}
-
- @Test
- public void testChangeJson() {
-
- ProcessData oldProcessData = new ProcessData();
- ConditionsParameters conditionsParameters = new ConditionsParameters();
- ArrayList<TaskNode> tasks = new ArrayList<>();
- TaskNode taskNode = new TaskNode();
- TaskNode taskNode11 = new TaskNode();
- TaskNode taskNode111 = new TaskNode();
- ArrayList<String> successNode = new ArrayList<>();
- ArrayList<String> faildNode = new ArrayList<>();
-
- taskNode.setName("bbb");
- taskNode.setType("SHELL");
- taskNode.setId("222");
-
- taskNode11.setName("vvv");
- taskNode11.setType("CONDITIONS");
- taskNode11.setId("444");
- successNode.add("bbb");
- faildNode.add("ccc");
-
- taskNode111.setName("ccc");
- taskNode111.setType("SHELL");
- taskNode111.setId("333");
-
- conditionsParameters.setSuccessNode(successNode);
- conditionsParameters.setFailedNode(faildNode);
- taskNode11.setConditionResult(conditionsParameters.getConditionResult());
- tasks.add(taskNode);
- tasks.add(taskNode11);
- tasks.add(taskNode111);
- oldProcessData.setTasks(tasks);
-
- ProcessData newProcessData = new ProcessData();
- ConditionsParameters conditionsParameters2 = new ConditionsParameters();
- TaskNode taskNode2 = new TaskNode();
- TaskNode taskNode22 = new TaskNode();
- TaskNode taskNode222 = new TaskNode();
- ArrayList<TaskNode> tasks2 = new ArrayList<>();
- ArrayList<String> successNode2 = new ArrayList<>();
- ArrayList<String> faildNode2 = new ArrayList<>();
-
- taskNode2.setName("bbbchange");
- taskNode2.setType("SHELL");
- taskNode2.setId("222");
-
- taskNode22.setName("vv");
- taskNode22.setType("CONDITIONS");
- taskNode22.setId("444");
- successNode2.add("bbb");
- faildNode2.add("ccc");
-
- taskNode222.setName("ccc");
- taskNode222.setType("SHELL");
- taskNode222.setId("333");
-
- conditionsParameters2.setSuccessNode(successNode2);
- conditionsParameters2.setFailedNode(faildNode2);
- taskNode22.setConditionResult(conditionsParameters2.getConditionResult());
- tasks2.add(taskNode2);
- tasks2.add(taskNode22);
- tasks2.add(taskNode222);
-
- newProcessData.setTasks(tasks2);
-
- ProcessData exceptProcessData = new ProcessData();
- ConditionsParameters conditionsParameters3 = new ConditionsParameters();
- TaskNode taskNode3 = new TaskNode();
- TaskNode taskNode33 = new TaskNode();
- TaskNode taskNode333 = new TaskNode();
- ArrayList<TaskNode> tasks3 = new ArrayList<>();
- ArrayList<String> successNode3 = new ArrayList<>();
- ArrayList<String> faildNode3 = new ArrayList<>();
-
- taskNode3.setName("bbbchange");
- taskNode3.setType("SHELL");
- taskNode3.setId("222");
-
- taskNode33.setName("vv");
- taskNode33.setType("CONDITIONS");
- taskNode33.setId("444");
- successNode3.add("bbbchange");
- faildNode3.add("ccc");
-
- taskNode333.setName("ccc");
- taskNode333.setType("SHELL");
- taskNode333.setId("333");
-
- conditionsParameters3.setSuccessNode(successNode3);
- conditionsParameters3.setFailedNode(faildNode3);
- taskNode33.setConditionResult(conditionsParameters3.getConditionResult());
- tasks3.add(taskNode3);
- tasks3.add(taskNode33);
- tasks3.add(taskNode333);
- exceptProcessData.setTasks(tasks3);
-
- String expect = JSONUtils.toJsonString(exceptProcessData);
- String oldJson = JSONUtils.toJsonString(oldProcessData);
-
- Assert.assertEquals(expect, processService.changeJson(newProcessData, oldJson));
-
- }
}