You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ca...@apache.org on 2022/09/15 02:39:06 UTC
[dolphinscheduler] branch 3.1.0-prepare updated: [Bug] [Master] WorkflowExecuteRunnable will face a infinite loop #11838 (#11864) (#11950)
This is an automated email from the ASF dual-hosted git repository.
caishunfeng pushed a commit to branch 3.1.0-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/3.1.0-prepare by this push:
new 27b69e608a [Bug] [Master] WorkflowExecuteRunnable will face a infinite loop #11838 (#11864) (#11950)
27b69e608a is described below
commit 27b69e608a6e4dd43483922d53afea60352a7b06
Author: caishunfeng <ca...@gmail.com>
AuthorDate: Thu Sep 15 10:39:00 2022 +0800
[Bug] [Master] WorkflowExecuteRunnable will face a infinite loop #11838 (#11864) (#11950)
* [Bug] [Master] WorkflowExecuteRunnable will face a infinite loop #11838
Co-authored-by: Yann Ann <29...@qq.com>
---
.../master/event/TaskTimeoutStateEventHandler.java | 37 +++++++++++++++-------
.../master/runner/WorkflowExecuteRunnable.java | 1 -
2 files changed, 25 insertions(+), 13 deletions(-)
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskTimeoutStateEventHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskTimeoutStateEventHandler.java
index 22f3f9dd8f..1678594323 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskTimeoutStateEventHandler.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskTimeoutStateEventHandler.java
@@ -28,35 +28,48 @@ import org.apache.dolphinscheduler.server.master.runner.task.ITaskProcessor;
import org.apache.dolphinscheduler.server.master.runner.task.TaskAction;
import java.util.Map;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
@AutoService(StateEventHandler.class)
public class TaskTimeoutStateEventHandler implements StateEventHandler {
+ private static final Logger logger = LoggerFactory.getLogger(TaskTimeoutStateEventHandler.class);
+
@Override
public boolean handleStateEvent(WorkflowExecuteRunnable workflowExecuteRunnable,
- StateEvent stateEvent) throws StateEventHandleError {
+ StateEvent stateEvent) throws StateEventHandleError {
TaskStateEvent taskStateEvent = (TaskStateEvent) stateEvent;
TaskMetrics.incTaskInstanceByState("timeout");
workflowExecuteRunnable.checkTaskInstanceByStateEvent(taskStateEvent);
TaskInstance taskInstance =
- workflowExecuteRunnable.getTaskInstance(taskStateEvent.getTaskInstanceId()).orElseThrow(
- () -> new StateEventHandleError(String.format(
- "Cannot find the task instance from workflow execute runnable, taskInstanceId: %s",
- taskStateEvent.getTaskInstanceId())));
+ workflowExecuteRunnable.getTaskInstance(taskStateEvent.getTaskInstanceId()).orElseThrow(
+ () -> new StateEventHandleError(String.format(
+ "Cannot find the task instance from workflow execute runnable, taskInstanceId: %s",
+ taskStateEvent.getTaskInstanceId())));
if (TimeoutFlag.CLOSE == taskInstance.getTaskDefine().getTimeoutFlag()) {
return true;
}
- TaskTimeoutStrategy taskTimeoutStrategy = taskInstance.getTaskDefine().getTimeoutNotifyStrategy();
- Map<Long, ITaskProcessor> activeTaskProcessMap = workflowExecuteRunnable.getActiveTaskProcessMap();
- if (TaskTimeoutStrategy.FAILED == taskTimeoutStrategy
- || TaskTimeoutStrategy.WARNFAILED == taskTimeoutStrategy) {
- ITaskProcessor taskProcessor = activeTaskProcessMap.get(taskInstance.getTaskCode());
- taskProcessor.action(TaskAction.TIMEOUT);
+ TaskTimeoutStrategy taskTimeoutStrategy = taskInstance.getTaskDefine()
+ .getTimeoutNotifyStrategy();
+ Map<Long, ITaskProcessor> activeTaskProcessMap = workflowExecuteRunnable
+ .getActiveTaskProcessMap();
+ if ((TaskTimeoutStrategy.FAILED == taskTimeoutStrategy
+ || TaskTimeoutStrategy.WARNFAILED == taskTimeoutStrategy)) {
+ if (activeTaskProcessMap.containsKey(taskInstance.getTaskCode())) {
+ ITaskProcessor taskProcessor = activeTaskProcessMap.get(taskInstance.getTaskCode());
+ taskProcessor.action(TaskAction.TIMEOUT);
+ } else {
+ logger.warn(
+ "cannot find the task processor for task {}, so skip task processor action.",
+ taskInstance.getTaskCode());
+ }
}
- if (TaskTimeoutStrategy.WARN == taskTimeoutStrategy || TaskTimeoutStrategy.WARNFAILED == taskTimeoutStrategy) {
+ if (TaskTimeoutStrategy.WARN == taskTimeoutStrategy
+ || TaskTimeoutStrategy.WARNFAILED == taskTimeoutStrategy) {
workflowExecuteRunnable.processTimeout();
workflowExecuteRunnable.taskTimeout(taskInstance);
}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
index a868f9fa51..75ed5cf205 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
@@ -300,7 +300,6 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
} finally {
LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
}
-
}
}