You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ca...@apache.org on 2022/01/07 02:42:06 UTC

[dolphinscheduler] branch 2.0.3-prepare updated: [Bug-7788][MasterServer] fix submit duplicate tasks sometimes when retry (#7808) (#7866)

This is an automated email from the ASF dual-hosted git repository.

caishunfeng pushed a commit to branch 2.0.3-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/2.0.3-prepare by this push:
     new 5592e7b  [Bug-7788][MasterServer] fix submit duplicate tasks sometimes when retry (#7808) (#7866)
5592e7b is described below

commit 5592e7bb7b770c8d1503630c044cbb3f0b10b340
Author: wind <ca...@users.noreply.github.com>
AuthorDate: Fri Jan 7 10:42:01 2022 +0800

    [Bug-7788][MasterServer] fix submit duplicate tasks sometimes when retry (#7808) (#7866)
    
    * [Bug-7788] fix submit duplicate tasks sometimes when retry
    
    * add exist check when add task to standby list
    
    * update
    
    * put queue contain judge first
    
    Co-authored-by: caishunfeng <53...@qq.com>
    
    Co-authored-by: caishunfeng <53...@qq.com>
---
 .../master/runner/WorkflowExecuteThread.java       | 31 +++++++++++++++++++---
 1 file changed, 27 insertions(+), 4 deletions(-)

diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
index 085c1d5..92862ae 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
@@ -76,6 +76,7 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
@@ -1171,13 +1172,35 @@ public class WorkflowExecuteThread implements Runnable {
      * @param taskInstance task instance
      */
     private void addTaskToStandByList(TaskInstance taskInstance) {
-        logger.info("add task to stand by list, task name: {} , task id:{}", taskInstance.getName(), taskInstance.getId());
         try {
-            if (!readyToSubmitTaskQueue.contains(taskInstance)) {
-                readyToSubmitTaskQueue.put(taskInstance);
+            if (readyToSubmitTaskQueue.contains(taskInstance)) {
+                logger.warn("task was found in ready submit queue, task code:{}", taskInstance.getTaskCode());
+                return;
             }
+            // need to check whether a task with the same task code is already active
+            boolean active = false;
+            Map<Integer, TaskInstance> taskInstanceMap = taskInstanceHashMap.column(taskInstance.getTaskCode());
+            if (taskInstanceMap != null && taskInstanceMap.size() > 0) {
+                for (Entry<Integer, TaskInstance> entry : taskInstanceMap.entrySet()) {
+                    Integer taskInstanceId = entry.getKey();
+                    if (activeTaskProcessorMaps.containsKey(taskInstanceId)) {
+                        TaskInstance latestTaskInstance = processService.findTaskInstanceById(taskInstanceId);
+                        if (latestTaskInstance != null && !latestTaskInstance.getState().typeIsFailure()) {
+                            active = true;
+                            break;
+                        }
+                    }
+                }
+            }
+            if (active) {
+                logger.warn("task was found in active task list, task code:{}", taskInstance.getTaskCode());
+                return;
+            }
+            logger.info("add task to stand by list, task name:{}, task id:{}, task code:{}",
+                    taskInstance.getName(), taskInstance.getId(), taskInstance.getTaskCode());
+            readyToSubmitTaskQueue.put(taskInstance);
         } catch (Exception e) {
-            logger.error("add task instance to readyToSubmitTaskQueue, taskName: {}, task id: {}", taskInstance.getName(), taskInstance.getId(), e);
+            logger.error("add task instance to readyToSubmitTaskQueue, taskName:{}, task id:{}", taskInstance.getName(), taskInstance.getId(), e);
         }
     }