You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ke...@apache.org on 2021/10/08 01:56:00 UTC
[dolphinscheduler] branch dev updated: [Fix-5875][API] When I saved
a task that had the same name as a task in another flow,
the service would throw DuplicateKeyException (#6430)
This is an automated email from the ASF dual-hosted git repository.
kerwin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new fac6b4a [Fix-5875][API] When I saved a task that had the same name as a task in another flow, the service would throw DuplicateKeyException (#6430)
fac6b4a is described below
commit fac6b4afd3d185aeb09c9cb17de1549539b61c3f
Author: Hua Jiang <ji...@163.com>
AuthorDate: Fri Oct 8 09:55:54 2021 +0800
[Fix-5875][API] When I saved a task that had the same name as a task in another flow, the service would throw DuplicateKeyException (#6430)
* using the task code in the dag
---
.../service/impl/ProcessDefinitionServiceImpl.java | 22 +--
.../service/impl/ProcessInstanceServiceImpl.java | 2 +-
.../dao/mapper/TaskInstanceMapper.java | 3 +
.../dolphinscheduler/dao/utils/DagHelper.java | 148 ++++++++++++---------
.../dao/mapper/TaskInstanceMapper.xml | 9 ++
.../dao/mapper/TaskInstanceMapperTest.java | 21 +++
.../dolphinscheduler/dao/utils/DagHelperTest.java | 13 ++
.../master/runner/WorkflowExecuteThread.java | 72 +++++-----
.../service/process/ProcessService.java | 2 +-
sql/dolphinscheduler_h2.sql | 3 +-
sql/dolphinscheduler_mysql.sql | 3 +-
sql/dolphinscheduler_postgre.sql | 3 +-
.../1.4.0_schema/mysql/dolphinscheduler_ddl.sql | 22 ++-
.../postgresql/dolphinscheduler_ddl.sql | 17 +++
14 files changed, 220 insertions(+), 120 deletions(-)
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
index fc2a2ac..a138451 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
@@ -1155,18 +1155,18 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
Iterator<Map.Entry<String, List<TreeViewDto>>> iter = runningNodeMap.entrySet().iterator();
while (iter.hasNext()) {
Map.Entry<String, List<TreeViewDto>> en = iter.next();
- String nodeName = en.getKey();
+ String nodeCode = en.getKey();
parentTreeViewDtoList = en.getValue();
TreeViewDto treeViewDto = new TreeViewDto();
- treeViewDto.setName(nodeName);
- TaskNode taskNode = dag.getNode(nodeName);
+ TaskNode taskNode = dag.getNode(nodeCode);
treeViewDto.setType(taskNode.getType());
treeViewDto.setCode(taskNode.getCode());
+ treeViewDto.setName(taskNode.getName());
//set treeViewDto instances
for (int i = limit - 1; i >= 0; i--) {
ProcessInstance processInstance = processInstanceList.get(i);
- TaskInstance taskInstance = taskInstanceMapper.queryByInstanceIdAndName(processInstance.getId(), nodeName);
+ TaskInstance taskInstance = taskInstanceMapper.queryByInstanceIdAndCode(processInstance.getId(), Long.parseLong(nodeCode));
if (taskInstance == null) {
treeViewDto.getInstances().add(new Instance(-1, "not running", 0, "null"));
} else {
@@ -1188,18 +1188,18 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
for (TreeViewDto pTreeViewDto : parentTreeViewDtoList) {
pTreeViewDto.getChildren().add(treeViewDto);
}
- postNodeList = dag.getSubsequentNodes(nodeName);
+ postNodeList = dag.getSubsequentNodes(nodeCode);
if (CollectionUtils.isNotEmpty(postNodeList)) {
- for (String nextNodeName : postNodeList) {
- List<TreeViewDto> treeViewDtoList = waitingRunningNodeMap.get(nextNodeName);
+ for (String nextNodeCode : postNodeList) {
+ List<TreeViewDto> treeViewDtoList = waitingRunningNodeMap.get(nextNodeCode);
if (CollectionUtils.isEmpty(treeViewDtoList)) {
treeViewDtoList = new ArrayList<>();
}
treeViewDtoList.add(treeViewDto);
- waitingRunningNodeMap.put(nextNodeName, treeViewDtoList);
+ waitingRunningNodeMap.put(nextNodeCode, treeViewDtoList);
}
}
- runningNodeMap.remove(nodeName);
+ runningNodeMap.remove(nodeCode);
}
if (waitingRunningNodeMap.size() == 0) {
break;
@@ -1224,14 +1224,14 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
DAG<String, TaskNode, String> graph = new DAG<>();
// Fill the vertices
for (TaskNode taskNodeResponse : taskNodeResponseList) {
- graph.addNode(taskNodeResponse.getName(), taskNodeResponse);
+ graph.addNode(Long.toString(taskNodeResponse.getCode()), taskNodeResponse);
}
// Fill edge relations
for (TaskNode taskNodeResponse : taskNodeResponseList) {
List<String> preTasks = JSONUtils.toList(taskNodeResponse.getPreTasks(), String.class);
if (CollectionUtils.isNotEmpty(preTasks)) {
for (String preTask : preTasks) {
- if (!graph.addEdge(preTask, taskNodeResponse.getName())) {
+ if (!graph.addEdge(preTask, Long.toString(taskNodeResponse.getCode()))) {
return true;
}
}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
index 194a992..3df2060 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
@@ -707,7 +707,7 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
List<Task> taskList = new ArrayList<>();
for (String node : nodeList) {
- TaskInstance taskInstance = taskInstanceMapper.queryByInstanceIdAndName(processInstanceId, node);
+ TaskInstance taskInstance = taskInstanceMapper.queryByInstanceIdAndCode(processInstanceId, Long.parseLong(node));
if (taskInstance == null) {
continue;
}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java
index 898708e..795004d 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java
@@ -51,6 +51,9 @@ public interface TaskInstanceMapper extends BaseMapper<TaskInstance> {
TaskInstance queryByInstanceIdAndName(@Param("processInstanceId") int processInstanceId,
@Param("name") String name);
+ TaskInstance queryByInstanceIdAndCode(@Param("processInstanceId") int processInstanceId,
+ @Param("taskCode") Long taskCode);
+
Integer countTask(@Param("projectCodes") Long[] projectCodes,
@Param("taskIds") int[] taskIds);
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java
index de27f17..e692645 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java
@@ -63,9 +63,9 @@ public class DagHelper {
String preTasks = taskNode.getPreTasks();
List<String> preTaskList = JSONUtils.toList(preTasks, String.class);
if (preTaskList != null) {
- for (String depNodeName : preTaskList) {
- if (null != findNodeByName(taskNodeList, depNodeName)) {
- nodeRelationList.add(new TaskNodeRelation(depNodeName, taskNode.getName()));
+ for (String depNodeCode : preTaskList) {
+ if (null != findNodeByCode(taskNodeList, depNodeCode)) {
+ nodeRelationList.add(new TaskNodeRelation(depNodeCode, Long.toString(taskNode.getCode())));
}
}
}
@@ -78,12 +78,12 @@ public class DagHelper {
*
* @param taskNodeList taskNodeList
* @param startNodeNameList startNodeNameList
- * @param recoveryNodeNameList recoveryNodeNameList
+ * @param recoveryNodeCodeList recoveryNodeCodeList
* @param taskDependType taskDependType
* @return task node list
*/
public static List<TaskNode> generateFlowNodeListByStartNode(List<TaskNode> taskNodeList, List<String> startNodeNameList,
- List<String> recoveryNodeNameList, TaskDependType taskDependType) {
+ List<String> recoveryNodeCodeList, TaskDependType taskDependType) {
List<TaskNode> destFlowNodeList = new ArrayList<>();
List<String> startNodeList = startNodeNameList;
@@ -92,32 +92,34 @@ public class DagHelper {
logger.error("start node list is empty! cannot continue run the process ");
return destFlowNodeList;
}
+
List<TaskNode> destTaskNodeList = new ArrayList<>();
List<TaskNode> tmpTaskNodeList = new ArrayList<>();
+
if (taskDependType == TaskDependType.TASK_POST
- && CollectionUtils.isNotEmpty(recoveryNodeNameList)) {
- startNodeList = recoveryNodeNameList;
+ && CollectionUtils.isNotEmpty(recoveryNodeCodeList)) {
+ startNodeList = recoveryNodeCodeList;
}
if (CollectionUtils.isEmpty(startNodeList)) {
// no special designation start nodes
tmpTaskNodeList = taskNodeList;
} else {
// specified start nodes or resume execution
- for (String startNodeName : startNodeList) {
- TaskNode startNode = findNodeByName(taskNodeList, startNodeName);
+ for (String startNodeCode : startNodeList) {
+ TaskNode startNode = findNodeByCode(taskNodeList, startNodeCode);
List<TaskNode> childNodeList = new ArrayList<>();
if (startNode == null) {
logger.error("start node name [{}] is not in task node list [{}] ",
- startNodeName,
+ startNodeCode,
taskNodeList
);
continue;
} else if (TaskDependType.TASK_POST == taskDependType) {
- List<String> visitedNodeNameList = new ArrayList<>();
- childNodeList = getFlowNodeListPost(startNode, taskNodeList, visitedNodeNameList);
+ List<String> visitedNodeCodeList = new ArrayList<>();
+ childNodeList = getFlowNodeListPost(startNode, taskNodeList, visitedNodeCodeList);
} else if (TaskDependType.TASK_PRE == taskDependType) {
- List<String> visitedNodeNameList = new ArrayList<>();
- childNodeList = getFlowNodeListPre(startNode, recoveryNodeNameList, taskNodeList, visitedNodeNameList);
+ List<String> visitedNodeCodeList = new ArrayList<>();
+ childNodeList = getFlowNodeListPre(startNode, recoveryNodeCodeList, taskNodeList, visitedNodeCodeList);
} else {
childNodeList.add(startNode);
}
@@ -126,7 +128,7 @@ public class DagHelper {
}
for (TaskNode taskNode : tmpTaskNodeList) {
- if (null == findNodeByName(destTaskNodeList, taskNode.getName())) {
+ if (null == findNodeByCode(destTaskNodeList, Long.toString(taskNode.getCode()))) {
destTaskNodeList.add(taskNode);
}
}
@@ -141,17 +143,17 @@ public class DagHelper {
* @param taskNodeList taskNodeList
* @return task node list
*/
- private static List<TaskNode> getFlowNodeListPost(TaskNode startNode, List<TaskNode> taskNodeList, List<String> visitedNodeNameList) {
+ private static List<TaskNode> getFlowNodeListPost(TaskNode startNode, List<TaskNode> taskNodeList, List<String> visitedNodeCodeList) {
List<TaskNode> resultList = new ArrayList<>();
for (TaskNode taskNode : taskNodeList) {
List<String> depList = taskNode.getDepList();
- if (null != depList && null != startNode && depList.contains(startNode.getName()) && !visitedNodeNameList.contains(taskNode.getName())) {
- resultList.addAll(getFlowNodeListPost(taskNode, taskNodeList, visitedNodeNameList));
+ if (null != depList && null != startNode && depList.contains(Long.toString(startNode.getCode())) && !visitedNodeCodeList.contains(Long.toString(taskNode.getCode()))) {
+ resultList.addAll(getFlowNodeListPost(taskNode, taskNodeList, visitedNodeCodeList));
}
}
// why add (startNode != null) condition? for SonarCloud Quality Gate passed
if (null != startNode) {
- visitedNodeNameList.add(startNode.getName());
+ visitedNodeCodeList.add(Long.toString(startNode.getCode()));
}
resultList.add(startNode);
@@ -163,11 +165,11 @@ public class DagHelper {
* find all nodes that start nodes depend on.
*
* @param startNode startNode
- * @param recoveryNodeNameList recoveryNodeNameList
+ * @param recoveryNodeCodeList recoveryNodeCodeList
* @param taskNodeList taskNodeList
* @return task node list
*/
- private static List<TaskNode> getFlowNodeListPre(TaskNode startNode, List<String> recoveryNodeNameList, List<TaskNode> taskNodeList, List<String> visitedNodeNameList) {
+ private static List<TaskNode> getFlowNodeListPre(TaskNode startNode, List<String> recoveryNodeCodeList, List<TaskNode> taskNodeList, List<String> visitedNodeCodeList) {
List<TaskNode> resultList = new ArrayList<>();
@@ -179,17 +181,17 @@ public class DagHelper {
if (CollectionUtils.isEmpty(depList)) {
return resultList;
}
- for (String depNodeName : depList) {
- TaskNode start = findNodeByName(taskNodeList, depNodeName);
- if (recoveryNodeNameList.contains(depNodeName)) {
+ for (String depNodeCode : depList) {
+ TaskNode start = findNodeByCode(taskNodeList, depNodeCode);
+ if (recoveryNodeCodeList.contains(depNodeCode)) {
resultList.add(start);
- } else if (!visitedNodeNameList.contains(depNodeName)) {
- resultList.addAll(getFlowNodeListPre(start, recoveryNodeNameList, taskNodeList, visitedNodeNameList));
+ } else if (!visitedNodeCodeList.contains(depNodeCode)) {
+ resultList.addAll(getFlowNodeListPre(start, recoveryNodeCodeList, taskNodeList, visitedNodeCodeList));
}
}
// why add (startNode != null) condition? for SonarCloud Quality Gate passed
if (null != startNode) {
- visitedNodeNameList.add(startNode.getName());
+ visitedNodeCodeList.add(Long.toString(startNode.getCode()));
}
return resultList;
}
@@ -199,17 +201,17 @@ public class DagHelper {
*
* @param totalTaskNodeList totalTaskNodeList
* @param startNodeNameList startNodeNameList
- * @param recoveryNodeNameList recoveryNodeNameList
+ * @param recoveryNodeCodeList recoveryNodeCodeList
* @param depNodeType depNodeType
* @return process dag
* @throws Exception if error throws Exception
*/
public static ProcessDag generateFlowDag(List<TaskNode> totalTaskNodeList,
List<String> startNodeNameList,
- List<String> recoveryNodeNameList,
+ List<String> recoveryNodeCodeList,
TaskDependType depNodeType) throws Exception {
- List<TaskNode> destTaskNodeList = generateFlowNodeListByStartNode(totalTaskNodeList, startNodeNameList, recoveryNodeNameList, depNodeType);
+ List<TaskNode> destTaskNodeList = generateFlowNodeListByStartNode(totalTaskNodeList, startNodeNameList, recoveryNodeCodeList, depNodeType);
if (destTaskNodeList.isEmpty()) {
return null;
}
@@ -237,6 +239,22 @@ public class DagHelper {
}
/**
+ * find node by node code
+ *
+ * @param nodeDetails nodeDetails
+ * @param nodeCode nodeCode
+ * @return task node
+ */
+ public static TaskNode findNodeByCode(List<TaskNode> nodeDetails, String nodeCode) {
+ for (TaskNode taskNode : nodeDetails) {
+ if (Long.toString(taskNode.getCode()).equals(nodeCode)) {
+ return taskNode;
+ }
+ }
+ return null;
+ }
+
+ /**
* the task can be submit when all the depends nodes are forbidden or complete
*
* @param taskNode taskNode
@@ -252,11 +270,11 @@ public class DagHelper {
if (dependList == null) {
return true;
}
- for (String dependNodeName : dependList) {
- TaskNode dependNode = dag.getNode(dependNodeName);
- if (dependNode == null || completeTaskList.containsKey(dependNodeName)
+ for (String dependNodeCode : dependList) {
+ TaskNode dependNode = dag.getNode(dependNodeCode);
+ if (dependNode == null || completeTaskList.containsKey(dependNodeCode)
|| dependNode.isForbidden()
- || skipTaskNodeList.containsKey(dependNodeName)) {
+ || skipTaskNodeList.containsKey(dependNodeCode)) {
continue;
} else {
return false;
@@ -272,22 +290,23 @@ public class DagHelper {
*
* @return successor nodes
*/
- public static Set<String> parsePostNodes(String preNodeName,
+ public static Set<String> parsePostNodes(String preNodeCode,
Map<String, TaskNode> skipTaskNodeList,
DAG<String, TaskNode, TaskNodeRelation> dag,
Map<String, TaskInstance> completeTaskList) {
Set<String> postNodeList = new HashSet<>();
Collection<String> startVertexes = new ArrayList<>();
- if (preNodeName == null) {
+
+ if (preNodeCode == null) {
startVertexes = dag.getBeginNode();
- } else if (dag.getNode(preNodeName).isConditionsTask()) {
- List<String> conditionTaskList = parseConditionTask(preNodeName, skipTaskNodeList, dag, completeTaskList);
+ } else if (dag.getNode(preNodeCode).isConditionsTask()) {
+ List<String> conditionTaskList = parseConditionTask(preNodeCode, skipTaskNodeList, dag, completeTaskList);
startVertexes.addAll(conditionTaskList);
- } else if (dag.getNode(preNodeName).isSwitchTask()) {
- List<String> conditionTaskList = parseSwitchTask(preNodeName, skipTaskNodeList, dag, completeTaskList);
+ } else if (dag.getNode(preNodeCode).isSwitchTask()) {
+ List<String> conditionTaskList = parseSwitchTask(preNodeCode, skipTaskNodeList, dag, completeTaskList);
startVertexes.addAll(conditionTaskList);
} else {
- startVertexes = dag.getSubsequentNodes(preNodeName);
+ startVertexes = dag.getSubsequentNodes(preNodeCode);
}
for (String subsequent : startVertexes) {
TaskNode taskNode = dag.getNode(subsequent);
@@ -329,19 +348,19 @@ public class DagHelper {
* parse condition task find the branch process
* set skip flag for another one.
*/
- public static List<String> parseConditionTask(String nodeName,
+ public static List<String> parseConditionTask(String nodeCode,
Map<String, TaskNode> skipTaskNodeList,
DAG<String, TaskNode, TaskNodeRelation> dag,
Map<String, TaskInstance> completeTaskList) {
List<String> conditionTaskList = new ArrayList<>();
- TaskNode taskNode = dag.getNode(nodeName);
+ TaskNode taskNode = dag.getNode(nodeCode);
if (!taskNode.isConditionsTask()) {
return conditionTaskList;
}
- if (!completeTaskList.containsKey(nodeName)) {
+ if (!completeTaskList.containsKey(nodeCode)) {
return conditionTaskList;
}
- TaskInstance taskInstance = completeTaskList.get(nodeName);
+ TaskInstance taskInstance = completeTaskList.get(nodeCode);
ConditionsParameters conditionsParameters =
JSONUtils.parseObject(taskNode.getConditionResult(), ConditionsParameters.class);
List<String> skipNodeList = new ArrayList<>();
@@ -352,7 +371,7 @@ public class DagHelper {
conditionTaskList = conditionsParameters.getFailedNode();
skipNodeList = conditionsParameters.getSuccessNode();
} else {
- conditionTaskList.add(nodeName);
+ conditionTaskList.add(nodeCode);
}
for (String failedNode : skipNodeList) {
setTaskNodeSkip(failedNode, dag, completeTaskList, skipTaskNodeList);
@@ -364,19 +383,19 @@ public class DagHelper {
* parse condition task find the branch process
* set skip flag for another one.
*
- * @param nodeName
+ * @param nodeCode
* @return
*/
- public static List<String> parseSwitchTask(String nodeName,
+ public static List<String> parseSwitchTask(String nodeCode,
Map<String, TaskNode> skipTaskNodeList,
DAG<String, TaskNode, TaskNodeRelation> dag,
Map<String, TaskInstance> completeTaskList) {
List<String> conditionTaskList = new ArrayList<>();
- TaskNode taskNode = dag.getNode(nodeName);
+ TaskNode taskNode = dag.getNode(nodeCode);
if (!taskNode.isSwitchTask()) {
return conditionTaskList;
}
- if (!completeTaskList.containsKey(nodeName)) {
+ if (!completeTaskList.containsKey(nodeCode)) {
return conditionTaskList;
}
conditionTaskList = skipTaskNode4Switch(taskNode, skipTaskNodeList, completeTaskList, dag);
@@ -386,6 +405,7 @@ public class DagHelper {
private static List<String> skipTaskNode4Switch(TaskNode taskNode, Map<String, TaskNode> skipTaskNodeList,
Map<String, TaskInstance> completeTaskList,
DAG<String, TaskNode, TaskNodeRelation> dag) {
+
SwitchParameters switchParameters = completeTaskList.get(taskNode.getName()).getSwitchDependency();
int resultConditionLocation = switchParameters.getResultConditionLocation();
List<SwitchResultVo> conditionResultVoList = switchParameters.getDependTaskList();
@@ -406,15 +426,15 @@ public class DagHelper {
/**
* set task node and the post nodes skip flag
*/
- private static void setTaskNodeSkip(String skipNodeName,
+ private static void setTaskNodeSkip(String skipNodeCode,
DAG<String, TaskNode, TaskNodeRelation> dag,
Map<String, TaskInstance> completeTaskList,
Map<String, TaskNode> skipTaskNodeList) {
- if (!dag.containsNode(skipNodeName)) {
+ if (!dag.containsNode(skipNodeCode)) {
return;
}
- skipTaskNodeList.putIfAbsent(skipNodeName, dag.getNode(skipNodeName));
- Collection<String> postNodeList = dag.getSubsequentNodes(skipNodeName);
+ skipTaskNodeList.putIfAbsent(skipNodeCode, dag.getNode(skipNodeCode));
+ Collection<String> postNodeList = dag.getSubsequentNodes(skipNodeCode);
for (String post : postNodeList) {
TaskNode postNode = dag.getNode(post);
if (isTaskNodeNeedSkip(postNode, skipTaskNodeList)) {
@@ -436,7 +456,7 @@ public class DagHelper {
//add vertex
if (CollectionUtils.isNotEmpty(processDag.getNodes())) {
for (TaskNode node : processDag.getNodes()) {
- dag.addNode(node.getName(), node);
+ dag.addNode(Long.toString(node.getCode()), node);
}
}
@@ -466,7 +486,7 @@ public class DagHelper {
// If the dependency is not empty
if (preTasksList != null) {
for (String depNode : preTasksList) {
- taskNodeRelations.add(new TaskNodeRelation(depNode, taskNode.getName()));
+ taskNodeRelations.add(new TaskNodeRelation(depNode, Long.toString(taskNode.getCode())));
}
}
}
@@ -500,7 +520,7 @@ public class DagHelper {
&& taskNodeMap.containsKey(preTaskCode) && taskNodeMap.containsKey(postTaskCode)) {
TaskNode preNode = taskNodeMap.get(preTaskCode);
TaskNode postNode = taskNodeMap.get(postTaskCode);
- taskNodeRelations.add(new TaskNodeRelation(preNode.getName(), postNode.getName()));
+ taskNodeRelations.add(new TaskNodeRelation(Long.toString(preNode.getCode()), Long.toString(postNode.getCode())));
}
}
ProcessDag processDag = new ProcessDag();
@@ -512,18 +532,18 @@ public class DagHelper {
/**
* is there have conditions after the parent node
*/
- public static boolean haveConditionsAfterNode(String parentNodeName,
+ public static boolean haveConditionsAfterNode(String parentNodeCode,
DAG<String, TaskNode, TaskNodeRelation> dag
) {
boolean result = false;
- Set<String> subsequentNodes = dag.getSubsequentNodes(parentNodeName);
+ Set<String> subsequentNodes = dag.getSubsequentNodes(parentNodeCode);
if (CollectionUtils.isEmpty(subsequentNodes)) {
return result;
}
- for (String nodeName : subsequentNodes) {
- TaskNode taskNode = dag.getNode(nodeName);
+ for (String nodeCode : subsequentNodes) {
+ TaskNode taskNode = dag.getNode(nodeCode);
List<String> preTasksList = JSONUtils.toList(taskNode.getPreTasks(), String.class);
- if (preTasksList.contains(parentNodeName) && taskNode.isConditionsTask()) {
+ if (preTasksList.contains(parentNodeCode) && taskNode.isConditionsTask()) {
return true;
}
}
@@ -533,13 +553,13 @@ public class DagHelper {
/**
* is there have conditions after the parent node
*/
- public static boolean haveConditionsAfterNode(String parentNodeName, List<TaskNode> taskNodes) {
+ public static boolean haveConditionsAfterNode(String parentNodeCode, List<TaskNode> taskNodes) {
if (CollectionUtils.isEmpty(taskNodes)) {
return false;
}
for (TaskNode taskNode : taskNodes) {
List<String> preTasksList = JSONUtils.toList(taskNode.getPreTasks(), String.class);
- if (preTasksList.contains(parentNodeName) && taskNode.isConditionsTask()) {
+ if (preTasksList.contains(parentNodeCode) && taskNode.isConditionsTask()) {
return true;
}
}
diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml
index 0964d1c..f41b58a 100644
--- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml
+++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml
@@ -95,6 +95,15 @@
and flag = 1
limit 1
</select>
+ <select id="queryByInstanceIdAndCode" resultType="org.apache.dolphinscheduler.dao.entity.TaskInstance">
+ select
+ <include refid="baseSql"/>
+ from t_ds_task_instance
+ where process_instance_id = #{processInstanceId}
+ and task_code = #{taskCode}
+ and flag = 1
+ limit 1
+ </select>
<select id="countTask" resultType="java.lang.Integer">
select count(1) as count
from t_ds_task_instance task,t_ds_task_definition_log define
diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapperTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapperTest.java
index 003f8a9..1f89382 100644
--- a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapperTest.java
+++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapperTest.java
@@ -277,6 +277,27 @@ public class TaskInstanceMapperTest {
}
/**
+ * test query by task instance id and code
+ */
+ @Test
+ public void testQueryByInstanceIdAndCode() {
+ // insert ProcessInstance
+ ProcessInstance processInstance = insertProcessInstance();
+
+ // insert taskInstance
+ TaskInstance task = insertTaskInstance(processInstance.getId());
+ task.setHost("111.111.11.11");
+ taskInstanceMapper.updateById(task);
+
+ TaskInstance taskInstance = taskInstanceMapper.queryByInstanceIdAndCode(
+ task.getProcessInstanceId(),
+ task.getTaskCode()
+ );
+ taskInstanceMapper.deleteById(task.getId());
+ Assert.assertNotEquals(taskInstance, null);
+ }
+
+ /**
* test count task instance
*/
@Test
diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/utils/DagHelperTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/utils/DagHelperTest.java
index 18c17fe..8df92dd 100644
--- a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/utils/DagHelperTest.java
+++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/utils/DagHelperTest.java
@@ -291,12 +291,14 @@ public class DagHelperTest {
TaskNode node1 = new TaskNode();
node1.setId("1");
node1.setName("1");
+ node1.setCode(1);
node1.setType(TaskType.SHELL.getDesc());
taskNodeList.add(node1);
TaskNode node2 = new TaskNode();
node2.setId("2");
node2.setName("2");
+ node2.setCode(2);
node2.setType(TaskType.SHELL.getDesc());
List<String> dep2 = new ArrayList<>();
dep2.add("1");
@@ -306,12 +308,14 @@ public class DagHelperTest {
TaskNode node4 = new TaskNode();
node4.setId("4");
node4.setName("4");
+ node4.setCode(4);
node4.setType(TaskType.SHELL.getDesc());
taskNodeList.add(node4);
TaskNode node3 = new TaskNode();
node3.setId("3");
node3.setName("3");
+ node3.setCode(3);
node3.setType(TaskType.SHELL.getDesc());
List<String> dep3 = new ArrayList<>();
dep3.add("2");
@@ -322,6 +326,7 @@ public class DagHelperTest {
TaskNode node5 = new TaskNode();
node5.setId("5");
node5.setName("5");
+ node5.setCode(5);
node5.setType(TaskType.SHELL.getDesc());
List<String> dep5 = new ArrayList<>();
dep5.add("3");
@@ -332,6 +337,7 @@ public class DagHelperTest {
TaskNode node6 = new TaskNode();
node6.setId("6");
node6.setName("6");
+ node6.setCode(6);
node6.setType(TaskType.SHELL.getDesc());
List<String> dep6 = new ArrayList<>();
dep6.add("3");
@@ -341,6 +347,7 @@ public class DagHelperTest {
TaskNode node7 = new TaskNode();
node7.setId("7");
node7.setName("7");
+ node7.setCode(7);
node7.setType(TaskType.SHELL.getDesc());
List<String> dep7 = new ArrayList<>();
dep7.add("5");
@@ -350,6 +357,7 @@ public class DagHelperTest {
TaskNode node8 = new TaskNode();
node8.setId("8");
node8.setName("8");
+ node8.setCode(8);
node8.setType(TaskType.SHELL.getDesc());
List<String> dep8 = new ArrayList<>();
dep8.add("2");
@@ -381,12 +389,14 @@ public class DagHelperTest {
TaskNode node = new TaskNode();
node.setId("0");
node.setName("0");
+ node.setCode(0);
node.setType("SHELL");
taskNodeList.add(node);
TaskNode node1 = new TaskNode();
node1.setId("1");
node1.setName("1");
+ node1.setCode(1);
node1.setType("switch");
node1.setDependence(JSONUtils.toJsonString(getSwitchNode()));
taskNodeList.add(node1);
@@ -394,6 +404,7 @@ public class DagHelperTest {
TaskNode node2 = new TaskNode();
node2.setId("2");
node2.setName("2");
+ node2.setCode(2);
node2.setType("SHELL");
List<String> dep2 = new ArrayList<>();
dep2.add("1");
@@ -403,6 +414,7 @@ public class DagHelperTest {
TaskNode node4 = new TaskNode();
node4.setId("4");
node4.setName("4");
+ node4.setCode(4);
node4.setType("SHELL");
List<String> dep4 = new ArrayList<>();
dep4.add("1");
@@ -412,6 +424,7 @@ public class DagHelperTest {
TaskNode node5 = new TaskNode();
node5.setId("4");
node5.setName("4");
+ node5.setCode(4);
node5.setType("SHELL");
List<String> dep5 = new ArrayList<>();
dep5.add("1");
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
index eae6abe..7ed2679 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
@@ -375,19 +375,19 @@ public class WorkflowExecuteThread implements Runnable {
return;
}
ProcessInstance processInstance = processService.findProcessInstanceById(this.processInstance.getId());
- completeTaskList.put(task.getName(), task);
+ completeTaskList.put(Long.toString(task.getTaskCode()), task);
activeTaskProcessorMaps.remove(task.getId());
taskTimeoutCheckList.remove(task.getId());
if (task.getState().typeIsSuccess()) {
processInstance.setVarPool(task.getVarPool());
processService.saveProcessInstance(processInstance);
- submitPostNode(task.getName());
+ submitPostNode(Long.toString(task.getTaskCode()));
} else if (task.getState().typeIsFailure()) {
if (task.isConditionsTask()
- || DagHelper.haveConditionsAfterNode(task.getName(), dag)) {
- submitPostNode(task.getName());
+ || DagHelper.haveConditionsAfterNode(Long.toString(task.getTaskCode()), dag)) {
+ submitPostNode(Long.toString(task.getTaskCode()));
} else {
- errorTaskList.put(task.getName(), task);
+ errorTaskList.put(Long.toString(task.getTaskCode()), task);
if (processInstance.getFailureStrategy() == FailureStrategy.END) {
killAllTasks();
}
@@ -522,16 +522,18 @@ public class WorkflowExecuteThread implements Runnable {
List<TaskNode> taskNodeList =
processService.transformTask(processService.findRelationByCode(processDefinition.getProjectCode(), processDefinition.getCode()), Lists.newArrayList());
forbiddenTaskList.clear();
+
taskNodeList.forEach(taskNode -> {
if (taskNode.isForbidden()) {
- forbiddenTaskList.put(taskNode.getName(), taskNode);
+ forbiddenTaskList.put(Long.toString(taskNode.getCode()), taskNode);
}
});
+
// generate process to get DAG info
- List<String> recoveryNameList = getRecoveryNodeNameList();
+ List<String> recoveryNodeCodeList = getRecoveryNodeCodeList();
List<String> startNodeNameList = parseStartNodeName(processInstance.getCommandParam());
ProcessDag processDag = generateFlowDag(taskNodeList,
- startNodeNameList, recoveryNameList, processInstance.getTaskDependType());
+ startNodeNameList, recoveryNodeCodeList, processInstance.getTaskDependType());
if (processDag == null) {
logger.error("processDag is null");
return;
@@ -553,13 +555,13 @@ public class WorkflowExecuteThread implements Runnable {
List<TaskInstance> taskInstanceList = processService.findValidTaskListByProcessId(processInstance.getId());
for (TaskInstance task : taskInstanceList) {
if (task.isTaskComplete()) {
- completeTaskList.put(task.getName(), task);
+ completeTaskList.put(Long.toString(task.getTaskCode()), task);
}
- if (task.isConditionsTask() || DagHelper.haveConditionsAfterNode(task.getName(), dag)) {
+ if (task.isConditionsTask() || DagHelper.haveConditionsAfterNode(Long.toString(task.getTaskCode()), dag)) {
continue;
}
if (task.getState().typeIsFailure() && !task.taskCanRetry()) {
- errorTaskList.put(task.getName(), task);
+ errorTaskList.put(Long.toString(task.getTaskCode()), task);
}
}
@@ -806,8 +808,8 @@ public class WorkflowExecuteThread implements Runnable {
}
}
- private void submitPostNode(String parentNodeName) {
- Set<String> submitTaskNodeList = DagHelper.parsePostNodes(parentNodeName, skipTaskNodeList, dag, completeTaskList);
+ private void submitPostNode(String parentNodeCode) {
+ Set<String> submitTaskNodeList = DagHelper.parsePostNodes(parentNodeCode, skipTaskNodeList, dag, completeTaskList);
List<TaskInstance> taskInstances = new ArrayList<>();
for (String taskNode : submitTaskNodeList) {
TaskNode taskNodeObject = dag.getNode(taskNode);
@@ -825,7 +827,7 @@ public class WorkflowExecuteThread implements Runnable {
continue;
}
- if (completeTaskList.containsKey(task.getName())) {
+ if (completeTaskList.containsKey(Long.toString(task.getTaskCode()))) {
logger.info("task {} has already run success", task.getName());
continue;
}
@@ -844,16 +846,16 @@ public class WorkflowExecuteThread implements Runnable {
*
* @return DependResult
*/
- private DependResult isTaskDepsComplete(String taskName) {
+ private DependResult isTaskDepsComplete(String taskCode) {
Collection<String> startNodes = dag.getBeginNode();
// if vertex,returns true directly
- if (startNodes.contains(taskName)) {
+ if (startNodes.contains(taskCode)) {
return DependResult.SUCCESS;
}
- TaskNode taskNode = dag.getNode(taskName);
- List<String> depNameList = taskNode.getDepList();
- for (String depsNode : depNameList) {
+ TaskNode taskNode = dag.getNode(taskCode);
+ List<String> depCodeList = taskNode.getDepList();
+ for (String depsNode : depCodeList) {
if (!dag.containsNode(depsNode)
|| forbiddenTaskList.containsKey(depsNode)
|| skipTaskNodeList.containsKey(depsNode)) {
@@ -871,11 +873,11 @@ public class WorkflowExecuteThread implements Runnable {
if (taskNode.isConditionsTask()) {
continue;
}
- if (!dependTaskSuccess(depsNode, taskName)) {
+ if (!dependTaskSuccess(depsNode, taskCode)) {
return DependResult.FAILED;
}
}
- logger.info("taskName: {} completeDependTaskList: {}", taskName, Arrays.toString(completeTaskList.keySet().toArray()));
+ logger.info("taskCode: {} completeDependTaskList: {}", taskCode, Arrays.toString(completeTaskList.keySet().toArray()));
return DependResult.SUCCESS;
}
@@ -1109,7 +1111,7 @@ public class WorkflowExecuteThread implements Runnable {
* @return DependResult
*/
private DependResult getDependResultForTask(TaskInstance taskInstance) {
- return isTaskDepsComplete(taskInstance.getName());
+ return isTaskDepsComplete(Long.toString(taskInstance.getTaskCode()));
}
/**
@@ -1228,15 +1230,15 @@ public class WorkflowExecuteThread implements Runnable {
task.setState(retryTask.getState());
logger.info("task: {} has been forced success, put it into complete task list and stop retrying", task.getName());
removeTaskFromStandbyList(task);
- completeTaskList.put(task.getName(), task);
- submitPostNode(task.getName());
+ completeTaskList.put(Long.toString(task.getTaskCode()), task);
+ submitPostNode(Long.toString(task.getTaskCode()));
continue;
}
}
//init varPool only this task is the first time running
if (task.isFirstRun()) {
//get pre task ,get all the task varPool to this task
- Set<String> preTask = dag.getPreviousNodes(task.getName());
+ Set<String> preTask = dag.getPreviousNodes(Long.toString(task.getTaskCode()));
getPreVarPool(task, preTask);
}
DependResult dependResult = getDependResultForTask(task);
@@ -1251,7 +1253,7 @@ public class WorkflowExecuteThread implements Runnable {
}
} else if (DependResult.FAILED == dependResult) {
// if the dependency fails, the current node is not submitted and the state changes to failure.
- dependFailedTask.put(task.getName(), task);
+ dependFailedTask.put(Long.toString(task.getTaskCode()), task);
removeTaskFromStandbyList(task);
logger.info("task {},id:{} depend result : {}", task.getName(), task.getId(), dependResult);
} else if (DependResult.NON_EXEC == dependResult) {
@@ -1331,19 +1333,19 @@ public class WorkflowExecuteThread implements Runnable {
}
/**
- * generate start node name list from parsing command param;
+ * generate start node code list from parsing command param;
* if "StartNodeIdList" exists in command param, return StartNodeIdList
*
- * @return recovery node name list
+ * @return recovery node code list
*/
- private List<String> getRecoveryNodeNameList() {
- List<String> recoveryNodeNameList = new ArrayList<>();
+ private List<String> getRecoveryNodeCodeList() {
+ List<String> recoveryNodeCodeList = new ArrayList<>();
if (CollectionUtils.isNotEmpty(recoverNodeIdList)) {
for (TaskInstance task : recoverNodeIdList) {
- recoveryNodeNameList.add(task.getName());
+ recoveryNodeCodeList.add(Long.toString(task.getTaskCode()));
}
}
- return recoveryNodeNameList;
+ return recoveryNodeCodeList;
}
/**
@@ -1351,15 +1353,15 @@ public class WorkflowExecuteThread implements Runnable {
*
* @param totalTaskNodeList total task node list
* @param startNodeNameList start node name list
- * @param recoveryNodeNameList recovery node name list
+ * @param recoveryNodeCodeList recovery node code list
* @param depNodeType depend node type
* @return ProcessDag process dag
* @throws Exception exception
*/
public ProcessDag generateFlowDag(List<TaskNode> totalTaskNodeList,
List<String> startNodeNameList,
- List<String> recoveryNodeNameList,
+ List<String> recoveryNodeCodeList,
TaskDependType depNodeType) throws Exception {
- return DagHelper.generateFlowDag(totalTaskNodeList, startNodeNameList, recoveryNodeNameList, depNodeType);
+ return DagHelper.generateFlowDag(totalTaskNodeList, startNodeNameList, recoveryNodeCodeList, depNodeType);
}
}
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index 4f5058c..6821967 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -2498,7 +2498,7 @@ public class ProcessService {
taskDefinitionLog.getTimeoutNotifyStrategy(),
taskDefinitionLog.getTimeout())));
taskNode.setDelayTime(taskDefinitionLog.getDelayTime());
- taskNode.setPreTasks(JSONUtils.toJsonString(code.getValue().stream().map(taskDefinitionLogMap::get).map(TaskDefinition::getName).collect(Collectors.toList())));
+ taskNode.setPreTasks(JSONUtils.toJsonString(code.getValue().stream().map(taskDefinitionLogMap::get).map(TaskDefinition::getCode).collect(Collectors.toList())));
taskNodeList.add(taskNode);
}
}
diff --git a/sql/dolphinscheduler_h2.sql b/sql/dolphinscheduler_h2.sql
index ffa3a3a..934c141 100644
--- a/sql/dolphinscheduler_h2.sql
+++ b/sql/dolphinscheduler_h2.sql
@@ -472,8 +472,7 @@ CREATE TABLE t_ds_task_definition
resource_ids varchar(255) DEFAULT NULL,
create_time datetime NOT NULL,
update_time datetime DEFAULT NULL,
- PRIMARY KEY (id, code),
- UNIQUE KEY task_unique (name,project_code) USING BTREE
+ PRIMARY KEY (id, code)
);
-- ----------------------------
diff --git a/sql/dolphinscheduler_mysql.sql b/sql/dolphinscheduler_mysql.sql
index 8aa1519..82f41a2 100644
--- a/sql/dolphinscheduler_mysql.sql
+++ b/sql/dolphinscheduler_mysql.sql
@@ -473,8 +473,7 @@ CREATE TABLE `t_ds_task_definition` (
`resource_ids` varchar(255) DEFAULT NULL COMMENT 'resource id, separated by comma',
`create_time` datetime NOT NULL COMMENT 'create time',
`update_time` datetime DEFAULT NULL COMMENT 'update time',
- PRIMARY KEY (`id`,`code`),
- UNIQUE KEY `task_unique` (`name`,`project_code`) USING BTREE
+ PRIMARY KEY (`id`,`code`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
-- ----------------------------
diff --git a/sql/dolphinscheduler_postgre.sql b/sql/dolphinscheduler_postgre.sql
index 27f5259..40a3535 100644
--- a/sql/dolphinscheduler_postgre.sql
+++ b/sql/dolphinscheduler_postgre.sql
@@ -382,8 +382,7 @@ CREATE TABLE t_ds_task_definition (
resource_ids varchar(255) DEFAULT NULL ,
create_time timestamp DEFAULT NULL ,
update_time timestamp DEFAULT NULL ,
- PRIMARY KEY (id) ,
- CONSTRAINT task_definition_unique UNIQUE (name, project_code)
+ PRIMARY KEY (id)
) ;
create index task_definition_index on t_ds_task_definition (project_code,id);
diff --git a/sql/upgrade/1.4.0_schema/mysql/dolphinscheduler_ddl.sql b/sql/upgrade/1.4.0_schema/mysql/dolphinscheduler_ddl.sql
index 1dc9097..5034ffe 100644
--- a/sql/upgrade/1.4.0_schema/mysql/dolphinscheduler_ddl.sql
+++ b/sql/upgrade/1.4.0_schema/mysql/dolphinscheduler_ddl.sql
@@ -337,6 +337,25 @@ delimiter ;
CALL uc_dolphin_T_t_ds_schedules_A_add_timezone();
DROP PROCEDURE uc_dolphin_T_t_ds_schedules_A_add_timezone;
+-- uc_dolphin_T_t_ds_task_definition_A_drop_UN_taskName: drop the unique index task_unique (name, project_code) if it exists
+drop PROCEDURE if EXISTS uc_dolphin_T_t_ds_task_definition_A_drop_UN_taskName;
+delimiter d//
+CREATE PROCEDURE uc_dolphin_T_t_ds_task_definition_A_drop_UN_taskName()
+BEGIN
+    IF EXISTS (SELECT 1 FROM information_schema.STATISTICS
+        WHERE TABLE_NAME='t_ds_task_definition'
+        AND TABLE_SCHEMA=(SELECT DATABASE())
+        AND INDEX_NAME ='task_unique') THEN
+        ALTER TABLE t_ds_task_definition drop INDEX `task_unique`;
+    END IF;
+END;
+
+d//
+
+delimiter ;
+CALL uc_dolphin_T_t_ds_task_definition_A_drop_UN_taskName();
+DROP PROCEDURE uc_dolphin_T_t_ds_task_definition_A_drop_UN_taskName;
+
-- ----------------------------
-- Table structure for t_ds_environment
-- ----------------------------
@@ -382,8 +401,7 @@ CREATE TABLE `t_ds_task_definition` (
`resource_ids` varchar(255) DEFAULT NULL COMMENT 'resource id, separated by comma',
`create_time` datetime NOT NULL COMMENT 'create time',
`update_time` datetime DEFAULT NULL COMMENT 'update time',
- PRIMARY KEY (`id`,`code`),
- UNIQUE KEY `task_unique` (`name`,`project_code`) USING BTREE
+ PRIMARY KEY (`id`,`code`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
-- ----------------------------
diff --git a/sql/upgrade/1.4.0_schema/postgresql/dolphinscheduler_ddl.sql b/sql/upgrade/1.4.0_schema/postgresql/dolphinscheduler_ddl.sql
index ce9a3f2..4296e2b 100644
--- a/sql/upgrade/1.4.0_schema/postgresql/dolphinscheduler_ddl.sql
+++ b/sql/upgrade/1.4.0_schema/postgresql/dolphinscheduler_ddl.sql
@@ -379,6 +379,23 @@ CREATE TABLE t_ds_environment_worker_group_relation (
CONSTRAINT environment_worker_group_unique UNIQUE (environment_code,worker_group)
);
+-- uc_dolphin_T_t_ds_task_definition_A_drop_UN_taskName: drop the unique constraint task_definition_unique (name, project_code) if it exists
+delimiter d//
+CREATE OR REPLACE FUNCTION uc_dolphin_T_t_ds_task_definition_A_drop_UN_taskName() RETURNS void AS $$
+BEGIN
+    IF EXISTS (SELECT 1 FROM pg_stat_all_indexes
+        WHERE relname='t_ds_task_definition'
+        AND indexrelname ='task_definition_unique') THEN
+        ALTER TABLE t_ds_task_definition drop CONSTRAINT task_definition_unique;
+    END IF;
+END;
+$$ LANGUAGE plpgsql;
+d//
+
+delimiter ;
+SELECT uc_dolphin_T_t_ds_task_definition_A_drop_UN_taskName();
+DROP FUNCTION IF EXISTS uc_dolphin_T_t_ds_task_definition_A_drop_UN_taskName();
+
-- ----------------------------
-- These columns will not be used in the new version,if you determine that the historical data is useless, you can delete it using the sql below
-- ----------------------------