You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ji...@apache.org on 2022/11/02 03:55:23 UTC
[dolphinscheduler] branch 2.0.8-prepare updated: [Bugfix-12568] [Master] The retry task was submitted in advance will block other task (#12570)
This is an automated email from the ASF dual-hosted git repository.
jinyleechina pushed a commit to branch 2.0.8-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/2.0.8-prepare by this push:
new ddfdc5dc62 [Bugfix-12568] [Master] The retry task was submitted in advance will block other task (#12570)
ddfdc5dc62 is described below
commit ddfdc5dc6258e764d075b1a81d4a4d0a8aa5ef47
Author: JinYong Li <42...@users.noreply.github.com>
AuthorDate: Wed Nov 2 11:55:17 2022 +0800
[Bugfix-12568] [Master] The retry task was submitted in advance will block other task (#12570)
* fix 12568
* fix 12568
* code style
Co-authored-by: JinyLeeChina <ji...@foxmail.com>
---
.../server/master/runner/WorkflowExecuteThread.java | 7 +++++++
1 file changed, 7 insertions(+)
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
index 4d84227c40..89090cb639 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
@@ -1452,6 +1452,7 @@ public class WorkflowExecuteThread implements Runnable {
private void submitStandByTask() {
try {
int length = readyToSubmitTaskQueue.size();
+ List<TaskInstance> skipSubmitInstances = new ArrayList<>();
for (int i = 0; i < length; i++) {
TaskInstance task = readyToSubmitTaskQueue.peek();
if (task == null) {
@@ -1472,6 +1473,8 @@ public class WorkflowExecuteThread implements Runnable {
long failedTimeInterval = DateUtils.differSec(new Date(), retryTask.getEndTime());
if ((long) retryTask.getRetryInterval() * SEC_2_MINUTES_TIME_UNIT > failedTimeInterval) {
logger.info("task name: {} retry waiting has not exceeded the interval time, and skip submission this time, task id:{}", task.getName(), task.getId());
+ readyToSubmitTaskQueue.remove(task);
+ skipSubmitInstances.add(task);
continue;
}
}
@@ -1512,6 +1515,10 @@ public class WorkflowExecuteThread implements Runnable {
logger.info("remove task {},id:{} , because depend result : {}", task.getName(), task.getId(), dependResult);
}
}
+ for (TaskInstance task : skipSubmitInstances) {
+ readyToSubmitTaskQueue.put(task);
+ }
+ skipSubmitInstances.clear();
} catch (Exception e) {
logger.error("submit standby task error", e);
}