You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ca...@apache.org on 2022/08/13 06:19:20 UTC
[dolphinscheduler] branch dev updated: Set master's task running status in `runTask` to avoid the task group acquire failed, but the task status is in running (#11451)
This is an automated email from the ASF dual-hosted git repository.
caishunfeng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 05589606a2 Set master's task running status in `runTask` to avoid the task group acquire failed, but the task status is in running (#11451)
05589606a2 is described below
commit 05589606a2fe6b7287b146e82f927b1eceaed701
Author: Wenjun Ruan <we...@apache.org>
AuthorDate: Sat Aug 13 14:19:13 2022 +0800
Set the master's task running status in `runTask`, so that a task is not left in the running state when acquiring its task group has failed (#11451)
---
.../master/runner/task/BlockingTaskProcessor.java | 6 ++--
.../master/runner/task/ConditionTaskProcessor.java | 6 ++--
.../master/runner/task/SwitchTaskProcessor.java | 34 ++++++++++------------
3 files changed, 23 insertions(+), 23 deletions(-)
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BlockingTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BlockingTaskProcessor.java
index c6a1a99d88..96fed73c0d 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BlockingTaskProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BlockingTaskProcessor.java
@@ -114,19 +114,21 @@ public class BlockingTaskProcessor extends BaseTaskProcessor {
return false;
}
this.setTaskExecutionLogger();
- initTaskParameters();
- logger.info("blocking task start");
+ logger.info("blocking task submit success");
return true;
}
@Override
protected boolean runTask() {
+ logger.info("blocking task starting");
+ initTaskParameters();
if (conditionResult.equals(DependResult.WAITING)) {
setConditionResult();
endTask();
} else {
endTask();
}
+ logger.info("blocking task finished");
return true;
}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java
index 2e4f7bf1ac..ed6ec54baf 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java
@@ -67,19 +67,21 @@ public class ConditionTaskProcessor extends BaseTaskProcessor {
return false;
}
this.setTaskExecutionLogger();
- initTaskParameters();
- logger.info("condition task start");
+ logger.info("condition task submit success");
return true;
}
@Override
public boolean runTask() {
+ initTaskParameters();
+ logger.info("condition task start");
if (conditionResult.equals(DependResult.WAITING)) {
setConditionResult();
endTask();
} else {
endTask();
}
+ logger.info("condition task finished");
return true;
}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java
index 82802d8e61..2c9396b244 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java
@@ -17,8 +17,9 @@
package org.apache.dolphinscheduler.server.master.runner.task;
-import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_SWITCH;
-
+import com.google.auto.service.AutoService;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
@@ -30,9 +31,6 @@ import org.apache.dolphinscheduler.plugin.task.api.parameters.SwitchParameters;
import org.apache.dolphinscheduler.server.master.utils.SwitchTaskUtils;
import org.apache.dolphinscheduler.server.utils.LogUtils;
-import org.apache.commons.collections4.CollectionUtils;
-import org.apache.commons.lang3.StringUtils;
-
import java.util.Date;
import java.util.HashMap;
import java.util.List;
@@ -41,7 +39,7 @@ import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
-import com.google.auto.service.AutoService;
+import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_SWITCH;
/**
* switch task processor
@@ -65,6 +63,13 @@ public class SwitchTaskProcessor extends BaseTaskProcessor {
return false;
}
this.setTaskExecutionLogger();
+ logger.info("switch task submit success");
+ return true;
+ }
+
+ @Override
+ public boolean runTask() {
+ logger.info("switch task starting");
taskInstance.setLogPath(
LogUtils.getTaskLogPath(taskInstance.getFirstSubmitTime(), processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion(),
@@ -74,21 +79,12 @@ public class SwitchTaskProcessor extends BaseTaskProcessor {
taskInstance.setState(TaskExecutionStatus.RUNNING_EXECUTION);
taskInstance.setStartTime(new Date());
processService.updateTaskInstance(taskInstance);
- return true;
- }
- @Override
- public boolean runTask() {
- try {
- if (!this.taskInstance().getState().isFinished() && setSwitchResult()) {
- endTaskState();
- }
- } catch (Exception e) {
- logger.error("update work flow {} switch task {} state error:",
- this.processInstance.getId(),
- this.taskInstance.getId(),
- e);
+ if (!this.taskInstance().getState().isFinished()) {
+ setSwitchResult();
}
+ endTaskState();
+ logger.info("switch task finished");
return true;
}