You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by GitBox <gi...@apache.org> on 2021/12/26 11:07:34 UTC

[GitHub] [dolphinscheduler] ruanwenjun commented on a change in pull request #7613: [Fix-7538] [server] Fix when there is a forbidden node in dag, the execution flow is abnormal

ruanwenjun commented on a change in pull request #7613:
URL: https://github.com/apache/dolphinscheduler/pull/7613#discussion_r775226946



##########
File path: dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
##########
@@ -1083,34 +1083,51 @@ private DependResult isTaskDepsComplete(String taskCode) {
             return DependResult.SUCCESS;
         }
         TaskNode taskNode = dag.getNode(taskCode);
-        List<String> depCodeList = taskNode.getDepList();
-        for (String depsNode : depCodeList) {
-            if (!dag.containsNode(depsNode)
-                    || forbiddenTaskMap.containsKey(depsNode)
-                    || skipTaskNodeMap.containsKey(depsNode)) {
-                continue;
-            }
-            // dependencies must be fully completed
-            if (!completeTaskMap.containsKey(depsNode)) {
-                return DependResult.WAITING;
-            }
-            Integer depsTaskId = completeTaskMap.get(depsNode);
-            ExecutionStatus depTaskState = taskInstanceMap.get(depsTaskId).getState();
-            if (depTaskState.typeIsPause() || depTaskState.typeIsCancel()) {
-                return DependResult.NON_EXEC;
-            }
-            // ignore task state if current task is condition
-            if (taskNode.isConditionsTask()) {
-                continue;
-            }
-            if (!dependTaskSuccess(depsNode, taskCode)) {
-                return DependResult.FAILED;
+        List<String> indirectDepCodeList = new ArrayList<>();
+        getIndirectDepList(taskCode, indirectDepCodeList);
+        for (String depsNode : indirectDepCodeList) {
+            if (dag.containsNode(depsNode) && !skipTaskNodeMap.containsKey(depsNode)) {
+                // dependencies must be fully completed
+                if (!completeTaskMap.containsKey(depsNode)) {
+                    return DependResult.WAITING;
+                }
+                Integer depsTaskId = completeTaskMap.get(depsNode);
+                ExecutionStatus depTaskState = taskInstanceMap.get(depsTaskId).getState();
+                if (depTaskState.typeIsPause() || depTaskState.typeIsCancel()) {
+                    return DependResult.NON_EXEC;
+                }
+                // ignore task state if current task is condition
+                if (taskNode.isConditionsTask()) {
+                    continue;
+                }
+                if (!dependTaskSuccess(depsNode, taskCode)) {
+                    return DependResult.FAILED;
+                }
             }
         }
         logger.info("taskCode: {} completeDependTaskList: {}", taskCode, Arrays.toString(completeTaskMap.keySet().toArray()));
         return DependResult.SUCCESS;
     }
 
+    /**
+     * This function is specially used to handle the dependency situation where the parent node is a prohibited node.
+     * When the parent node is a forbidden node, the dependency relationship should continue to be traced
+     *
+     * @param taskCode            taskCode
+     * @param indirectDepCodeList All indirectly dependent nodes
+     */
+    private void getIndirectDepList(String taskCode, List<String> indirectDepCodeList) {

Review comment:
       Suggest add a return result here, get method should not carry side effects.

##########
File path: dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
##########
@@ -1083,34 +1083,51 @@ private DependResult isTaskDepsComplete(String taskCode) {
             return DependResult.SUCCESS;
         }
         TaskNode taskNode = dag.getNode(taskCode);
-        List<String> depCodeList = taskNode.getDepList();
-        for (String depsNode : depCodeList) {
-            if (!dag.containsNode(depsNode)
-                    || forbiddenTaskMap.containsKey(depsNode)
-                    || skipTaskNodeMap.containsKey(depsNode)) {
-                continue;
-            }
-            // dependencies must be fully completed
-            if (!completeTaskMap.containsKey(depsNode)) {
-                return DependResult.WAITING;
-            }
-            Integer depsTaskId = completeTaskMap.get(depsNode);
-            ExecutionStatus depTaskState = taskInstanceMap.get(depsTaskId).getState();
-            if (depTaskState.typeIsPause() || depTaskState.typeIsCancel()) {
-                return DependResult.NON_EXEC;
-            }
-            // ignore task state if current task is condition
-            if (taskNode.isConditionsTask()) {
-                continue;
-            }
-            if (!dependTaskSuccess(depsNode, taskCode)) {
-                return DependResult.FAILED;
+        List<String> indirectDepCodeList = new ArrayList<>();
+        getIndirectDepList(taskCode, indirectDepCodeList);
+        for (String depsNode : indirectDepCodeList) {
+            if (dag.containsNode(depsNode) && !skipTaskNodeMap.containsKey(depsNode)) {
+                // dependencies must be fully completed
+                if (!completeTaskMap.containsKey(depsNode)) {
+                    return DependResult.WAITING;
+                }
+                Integer depsTaskId = completeTaskMap.get(depsNode);
+                ExecutionStatus depTaskState = taskInstanceMap.get(depsTaskId).getState();
+                if (depTaskState.typeIsPause() || depTaskState.typeIsCancel()) {
+                    return DependResult.NON_EXEC;
+                }
+                // ignore task state if current task is condition
+                if (taskNode.isConditionsTask()) {
+                    continue;
+                }
+                if (!dependTaskSuccess(depsNode, taskCode)) {
+                    return DependResult.FAILED;
+                }
             }
         }
         logger.info("taskCode: {} completeDependTaskList: {}", taskCode, Arrays.toString(completeTaskMap.keySet().toArray()));
         return DependResult.SUCCESS;
     }
 
+    /**
+     * This function is specially used to handle the dependency situation where the parent node is a prohibited node.
+     * When the parent node is a forbidden node, the dependency relationship should continue to be traced
+     *
+     * @param taskCode            taskCode
+     * @param indirectDepCodeList All indirectly dependent nodes
+     */
+    private void getIndirectDepList(String taskCode, List<String> indirectDepCodeList) {
+        TaskNode taskNode = dag.getNode(taskCode);
+        List<String> depCodeList = taskNode.getDepList();
+        for (String depsNode : depCodeList) {
+            if (forbiddenTaskMap.containsKey(depsNode)) {

Review comment:
       Does the `skip node` also has this bug? Can we return all the parent of the current node? I think this may not cause  a performance loss.

##########
File path: dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
##########
@@ -1083,34 +1083,51 @@ private DependResult isTaskDepsComplete(String taskCode) {
             return DependResult.SUCCESS;
         }
         TaskNode taskNode = dag.getNode(taskCode);
-        List<String> depCodeList = taskNode.getDepList();
-        for (String depsNode : depCodeList) {
-            if (!dag.containsNode(depsNode)
-                    || forbiddenTaskMap.containsKey(depsNode)
-                    || skipTaskNodeMap.containsKey(depsNode)) {
-                continue;
-            }
-            // dependencies must be fully completed
-            if (!completeTaskMap.containsKey(depsNode)) {
-                return DependResult.WAITING;
-            }
-            Integer depsTaskId = completeTaskMap.get(depsNode);
-            ExecutionStatus depTaskState = taskInstanceMap.get(depsTaskId).getState();
-            if (depTaskState.typeIsPause() || depTaskState.typeIsCancel()) {
-                return DependResult.NON_EXEC;
-            }
-            // ignore task state if current task is condition
-            if (taskNode.isConditionsTask()) {
-                continue;
-            }
-            if (!dependTaskSuccess(depsNode, taskCode)) {
-                return DependResult.FAILED;
+        List<String> indirectDepCodeList = new ArrayList<>();
+        getIndirectDepList(taskCode, indirectDepCodeList);
+        for (String depsNode : indirectDepCodeList) {
+            if (dag.containsNode(depsNode) && !skipTaskNodeMap.containsKey(depsNode)) {
+                // dependencies must be fully completed
+                if (!completeTaskMap.containsKey(depsNode)) {
+                    return DependResult.WAITING;
+                }
+                Integer depsTaskId = completeTaskMap.get(depsNode);
+                ExecutionStatus depTaskState = taskInstanceMap.get(depsTaskId).getState();
+                if (depTaskState.typeIsPause() || depTaskState.typeIsCancel()) {
+                    return DependResult.NON_EXEC;
+                }
+                // ignore task state if current task is condition
+                if (taskNode.isConditionsTask()) {
+                    continue;
+                }
+                if (!dependTaskSuccess(depsNode, taskCode)) {
+                    return DependResult.FAILED;
+                }
             }
         }
         logger.info("taskCode: {} completeDependTaskList: {}", taskCode, Arrays.toString(completeTaskMap.keySet().toArray()));
         return DependResult.SUCCESS;
     }
 
+    /**
+     * This function is specially used to handle the dependency situation where the parent node is a prohibited node.
+     * When the parent node is a forbidden node, the dependency relationship should continue to be traced
+     *
+     * @param taskCode            taskCode
+     * @param indirectDepCodeList All indirectly dependent nodes
+     */
+    private void getIndirectDepList(String taskCode, List<String> indirectDepCodeList) {

Review comment:
       ```suggestion
       private List<String> getIndirectDepList(String taskCode) {
   
   }
   ``` 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@dolphinscheduler.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org