You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this rendering.
Posted to commits@dolphinscheduler.apache.org by ca...@apache.org on 2022/09/15 02:39:06 UTC

[dolphinscheduler] branch 3.1.0-prepare updated: [Bug] [Master] WorkflowExecuteRunnable will face a infinite loop #11838 (#11864) (#11950)

This is an automated email from the ASF dual-hosted git repository.

caishunfeng pushed a commit to branch 3.1.0-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/3.1.0-prepare by this push:
     new 27b69e608a [Bug] [Master] WorkflowExecuteRunnable will face a infinite loop #11838 (#11864) (#11950)
27b69e608a is described below

commit 27b69e608a6e4dd43483922d53afea60352a7b06
Author: caishunfeng <ca...@gmail.com>
AuthorDate: Thu Sep 15 10:39:00 2022 +0800

    [Bug] [Master] WorkflowExecuteRunnable will face a infinite loop #11838 (#11864) (#11950)
    
    * [Bug] [Master] WorkflowExecuteRunnable will face a infinite loop #11838
    
    Co-authored-by: Yann Ann <29...@qq.com>
---
 .../master/event/TaskTimeoutStateEventHandler.java | 37 +++++++++++++++-------
 .../master/runner/WorkflowExecuteRunnable.java     |  1 -
 2 files changed, 25 insertions(+), 13 deletions(-)

diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskTimeoutStateEventHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskTimeoutStateEventHandler.java
index 22f3f9dd8f..1678594323 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskTimeoutStateEventHandler.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskTimeoutStateEventHandler.java
@@ -28,35 +28,48 @@ import org.apache.dolphinscheduler.server.master.runner.task.ITaskProcessor;
 import org.apache.dolphinscheduler.server.master.runner.task.TaskAction;
 
 import java.util.Map;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 @AutoService(StateEventHandler.class)
 public class TaskTimeoutStateEventHandler implements StateEventHandler {
 
+    private static final Logger logger = LoggerFactory.getLogger(TaskTimeoutStateEventHandler.class);
+
     @Override
     public boolean handleStateEvent(WorkflowExecuteRunnable workflowExecuteRunnable,
-                                    StateEvent stateEvent) throws StateEventHandleError {
+        StateEvent stateEvent) throws StateEventHandleError {
         TaskStateEvent taskStateEvent = (TaskStateEvent) stateEvent;
 
         TaskMetrics.incTaskInstanceByState("timeout");
         workflowExecuteRunnable.checkTaskInstanceByStateEvent(taskStateEvent);
 
         TaskInstance taskInstance =
-                workflowExecuteRunnable.getTaskInstance(taskStateEvent.getTaskInstanceId()).orElseThrow(
-                        () -> new StateEventHandleError(String.format(
-                                "Cannot find the task instance from workflow execute runnable, taskInstanceId: %s",
-                                taskStateEvent.getTaskInstanceId())));
+            workflowExecuteRunnable.getTaskInstance(taskStateEvent.getTaskInstanceId()).orElseThrow(
+                () -> new StateEventHandleError(String.format(
+                    "Cannot find the task instance from workflow execute runnable, taskInstanceId: %s",
+                    taskStateEvent.getTaskInstanceId())));
 
         if (TimeoutFlag.CLOSE == taskInstance.getTaskDefine().getTimeoutFlag()) {
             return true;
         }
-        TaskTimeoutStrategy taskTimeoutStrategy = taskInstance.getTaskDefine().getTimeoutNotifyStrategy();
-        Map<Long, ITaskProcessor> activeTaskProcessMap = workflowExecuteRunnable.getActiveTaskProcessMap();
-        if (TaskTimeoutStrategy.FAILED == taskTimeoutStrategy
-                || TaskTimeoutStrategy.WARNFAILED == taskTimeoutStrategy) {
-            ITaskProcessor taskProcessor = activeTaskProcessMap.get(taskInstance.getTaskCode());
-            taskProcessor.action(TaskAction.TIMEOUT);
+        TaskTimeoutStrategy taskTimeoutStrategy = taskInstance.getTaskDefine()
+            .getTimeoutNotifyStrategy();
+        Map<Long, ITaskProcessor> activeTaskProcessMap = workflowExecuteRunnable
+            .getActiveTaskProcessMap();
+        if ((TaskTimeoutStrategy.FAILED == taskTimeoutStrategy
+            || TaskTimeoutStrategy.WARNFAILED == taskTimeoutStrategy)) {
+            if (activeTaskProcessMap.containsKey(taskInstance.getTaskCode())) {
+                ITaskProcessor taskProcessor = activeTaskProcessMap.get(taskInstance.getTaskCode());
+                taskProcessor.action(TaskAction.TIMEOUT);
+            } else {
+                logger.warn(
+                    "cannot find the task processor for task {}, so skip task processor action.",
+                    taskInstance.getTaskCode());
+            }
         }
-        if (TaskTimeoutStrategy.WARN == taskTimeoutStrategy || TaskTimeoutStrategy.WARNFAILED == taskTimeoutStrategy) {
+        if (TaskTimeoutStrategy.WARN == taskTimeoutStrategy
+            || TaskTimeoutStrategy.WARNFAILED == taskTimeoutStrategy) {
             workflowExecuteRunnable.processTimeout();
             workflowExecuteRunnable.taskTimeout(taskInstance);
         }
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
index a868f9fa51..75ed5cf205 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
@@ -300,7 +300,6 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
             } finally {
                 LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
             }
-
         }
     }