You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by le...@apache.org on 2022/01/07 01:45:01 UTC

[dolphinscheduler] branch revert-7808-fix_retry created (now 0ce3840)

This is an automated email from the ASF dual-hosted git repository.

leonbao pushed a change to branch revert-7808-fix_retry
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git.


      at 0ce3840  Revert "[Bug-7788][MasterServer] fix submit duplicate tasks sometimes when retry (#7808)"

This branch includes the following new commits:

     new 0ce3840  Revert "[Bug-7788][MasterServer] fix submit duplicate tasks sometimes when retry (#7808)"

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


[dolphinscheduler] 01/01: Revert "[Bug-7788][MasterServer] fix submit duplicate tasks sometimes when retry (#7808)"

Posted by le...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

leonbao pushed a commit to branch revert-7808-fix_retry
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git

commit 0ce38403d1f7412a353ad234c6519cef8d121b69
Author: BaoLiang <29...@users.noreply.github.com>
AuthorDate: Fri Jan 7 09:44:56 2022 +0800

    Revert "[Bug-7788][MasterServer] fix submit duplicate tasks sometimes when retry (#7808)"
    
    This reverts commit ff3f5717c4247e3be10ef087983ac6b7129519f6.
---
 .../master/runner/WorkflowExecuteThread.java       | 31 +++-------------------
 1 file changed, 4 insertions(+), 27 deletions(-)

diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
index 92862ae..085c1d5 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
@@ -76,7 +76,6 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
@@ -1172,35 +1171,13 @@ public class WorkflowExecuteThread implements Runnable {
      * @param taskInstance task instance
      */
     private void addTaskToStandByList(TaskInstance taskInstance) {
+        logger.info("add task to stand by list, task name: {} , task id:{}", taskInstance.getName(), taskInstance.getId());
         try {
-            if (readyToSubmitTaskQueue.contains(taskInstance)) {
-                logger.warn("task was found in ready submit queue, task code:{}", taskInstance.getTaskCode());
-                return;
+            if (!readyToSubmitTaskQueue.contains(taskInstance)) {
+                readyToSubmitTaskQueue.put(taskInstance);
             }
-            // need to check if the tasks with same task code is active
-            boolean active = false;
-            Map<Integer, TaskInstance> taskInstanceMap = taskInstanceHashMap.column(taskInstance.getTaskCode());
-            if (taskInstanceMap != null && taskInstanceMap.size() > 0) {
-                for (Entry<Integer, TaskInstance> entry : taskInstanceMap.entrySet()) {
-                    Integer taskInstanceId = entry.getKey();
-                    if (activeTaskProcessorMaps.containsKey(taskInstanceId)) {
-                        TaskInstance latestTaskInstance = processService.findTaskInstanceById(taskInstanceId);
-                        if (latestTaskInstance != null && !latestTaskInstance.getState().typeIsFailure()) {
-                            active = true;
-                            break;
-                        }
-                    }
-                }
-            }
-            if (active) {
-                logger.warn("task was found in active task list, task code:{}", taskInstance.getTaskCode());
-                return;
-            }
-            logger.info("add task to stand by list, task name:{}, task id:{}, task code:{}",
-                    taskInstance.getName(), taskInstance.getId(), taskInstance.getTaskCode());
-            readyToSubmitTaskQueue.put(taskInstance);
         } catch (Exception e) {
-            logger.error("add task instance to readyToSubmitTaskQueue, taskName:{}, task id:{}", taskInstance.getName(), taskInstance.getId(), e);
+            logger.error("add task instance to readyToSubmitTaskQueue, taskName: {}, task id: {}", taskInstance.getName(), taskInstance.getId(), e);
         }
     }