You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ji...@apache.org on 2022/11/02 03:55:23 UTC

[dolphinscheduler] branch 2.0.8-prepare updated: [Bugfix-12568] [Master] The retry task was submitted in advance will block other task (#12570)

This is an automated email from the ASF dual-hosted git repository.

jinyleechina pushed a commit to branch 2.0.8-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/2.0.8-prepare by this push:
     new ddfdc5dc62 [Bugfix-12568] [Master] The retry task was submitted in advance will block other task (#12570)
ddfdc5dc62 is described below

commit ddfdc5dc6258e764d075b1a81d4a4d0a8aa5ef47
Author: JinYong Li <42...@users.noreply.github.com>
AuthorDate: Wed Nov 2 11:55:17 2022 +0800

    [Bugfix-12568] [Master] The retry task was submitted in advance will block other task (#12570)
    
    * fix 12568
    
    * fix 12568
    
    * code style
    
    Co-authored-by: JinyLeeChina <ji...@foxmail.com>
---
 .../server/master/runner/WorkflowExecuteThread.java                | 7 +++++++
 1 file changed, 7 insertions(+)

diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
index 4d84227c40..89090cb639 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
@@ -1452,6 +1452,7 @@ public class WorkflowExecuteThread implements Runnable {
     private void submitStandByTask() {
         try {
             int length = readyToSubmitTaskQueue.size();
+            List<TaskInstance> skipSubmitInstances = new ArrayList<>();
             for (int i = 0; i < length; i++) {
                 TaskInstance task = readyToSubmitTaskQueue.peek();
                 if (task == null) {
@@ -1472,6 +1473,8 @@ public class WorkflowExecuteThread implements Runnable {
                         long failedTimeInterval = DateUtils.differSec(new Date(), retryTask.getEndTime());
                         if ((long) retryTask.getRetryInterval() * SEC_2_MINUTES_TIME_UNIT > failedTimeInterval) {
                             logger.info("task name: {} retry waiting has not exceeded the interval time, and skip submission this time, task id:{}", task.getName(), task.getId());
+                            readyToSubmitTaskQueue.remove(task);
+                            skipSubmitInstances.add(task);
                             continue;
                         }
                     }
@@ -1512,6 +1515,10 @@ public class WorkflowExecuteThread implements Runnable {
                     logger.info("remove task {},id:{} , because depend result : {}", task.getName(), task.getId(), dependResult);
                 }
             }
+            for (TaskInstance task : skipSubmitInstances) {
+                readyToSubmitTaskQueue.put(task);
+            }
+            skipSubmitInstances.clear();
         } catch (Exception e) {
             logger.error("submit standby task error", e);
         }