You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ki...@apache.org on 2021/10/15 10:04:10 UTC

[dolphinscheduler] branch dev updated: [BUG-6543][Master] process instance state is always running when failure task (#6547)

This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 52a550b  [BUG-6543][Master] process instance state is always running when failure task (#6547)
52a550b is described below

commit 52a550b6aefa5652f17b7bbc9e056a83efa23c8a
Author: OS <29...@users.noreply.github.com>
AuthorDate: Fri Oct 15 18:04:04 2021 +0800

    [BUG-6543][Master] process instance state is always running when failure task (#6547)
    
    * fix-6543: process instance state is always running when failure task exists
    
    * code style
    
    * code style
---
 .../dolphinscheduler/dao/entity/TaskInstance.java  | 23 ++++++++
 .../server/master/runner/EventExecuteService.java  |  2 +-
 .../master/runner/StateWheelExecuteThread.java     |  4 ++
 .../master/runner/WorkflowExecuteThread.java       | 62 ++++++++++------------
 .../service/process/ProcessService.java            |  2 +
 .../queue/PeerTaskInstancePriorityQueue.java       |  8 +--
 6 files changed, 63 insertions(+), 38 deletions(-)

diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
index ac18975..4076900 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
@@ -17,6 +17,8 @@
 
 package org.apache.dolphinscheduler.dao.entity;
 
+import static org.apache.dolphinscheduler.common.Constants.SEC_2_MINUTES_TIME_UNIT;
+
 import com.fasterxml.jackson.core.type.TypeReference;
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
@@ -25,6 +27,7 @@ import org.apache.dolphinscheduler.common.enums.Priority;
 import org.apache.dolphinscheduler.common.enums.TaskType;
 import org.apache.dolphinscheduler.common.task.dependent.DependentParameters;
 import org.apache.dolphinscheduler.common.task.switchtask.SwitchParameters;
+import org.apache.dolphinscheduler.common.utils.DateUtils;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
 
 import java.io.Serializable;
@@ -591,6 +594,26 @@ public class TaskInstance implements Serializable {
         }
     }
 
+    /**
+     * whether the retry interval has elapsed since the task failed
+     *
+     * @return Boolean
+     */
+    public boolean retryTaskIntervalOverTime() {
+        if (getState() != ExecutionStatus.FAILURE) {
+            return true;
+        }
+        if (getId() == 0
+                || getMaxRetryTimes() == 0
+                || getRetryInterval() == 0) {
+            return true;
+        }
+        Date now = new Date();
+        long failedTimeInterval = DateUtils.differSec(now, getEndTime());
+        // returns false while the retry interval has not yet elapsed
+        return getRetryInterval() * SEC_2_MINUTES_TIME_UNIT < failedTimeInterval;
+    }
+
     public Priority getTaskInstancePriority() {
         return taskInstancePriority;
     }
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/EventExecuteService.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/EventExecuteService.java
index 3356842..7c4b321 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/EventExecuteService.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/EventExecuteService.java
@@ -122,7 +122,7 @@ public class EventExecuteService extends Thread {
                 continue;
             }
             int processInstanceId = workflowExecuteThread.getProcessInstance().getId();
-            logger.info("handle process instance : {} events, count:{}",
+            logger.info("handle process instance : {} , events count:{}",
                     processInstanceId,
                     workflowExecuteThread.eventSize());
             logger.info("already exists handler process size:{}", this.eventHandlerMap.size());
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
index f205e2d..f2b10f7 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
@@ -96,6 +96,10 @@ public class StateWheelExecuteThread extends Thread {
                     return;
                 }
             }
+            if (taskInstance.taskCanRetry() && taskInstance.retryTaskIntervalOverTime()) {
+                processDependCheck(taskInstance);
+                taskInstanceCheckList.remove(taskInstance.getId());
+            }
             if (taskInstance.isSubProcess() || taskInstance.isDependTask()) {
                 processDependCheck(taskInstance);
             }
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
index 3525ae4..c501262 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
@@ -349,7 +349,7 @@ public class WorkflowExecuteThread implements Runnable {
 
     private boolean taskStateChangeHandler(StateEvent stateEvent) {
         TaskInstance task = processService.findTaskInstanceById(stateEvent.getTaskInstanceId());
-        if (stateEvent.getExecutionStatus().typeIsFinished()) {
+        if (task.getState().typeIsFinished()) {
             taskFinished(task);
         } else if (activeTaskProcessorMaps.containsKey(stateEvent.getTaskInstanceId())) {
             ITaskProcessor iTaskProcessor = activeTaskProcessorMaps.get(stateEvent.getTaskInstanceId());
@@ -372,6 +372,18 @@ public class WorkflowExecuteThread implements Runnable {
                 task.getState());
         if (task.taskCanRetry()) {
             addTaskToStandByList(task);
+            if (!task.retryTaskIntervalOverTime()) {
+                logger.info("failure task will be submitted: process id: {}, task instance id: {} state:{} retry times:{} / {}, interval:{}",
+                        processInstance.getId(),
+                        task.getId(),
+                        task.getState(),
+                        task.getRetryTimes(),
+                        task.getMaxRetryTimes(),
+                        task.getRetryInterval());
+                this.addTimeoutCheck(task);
+            } else {
+                submitStandByTask();
+            }
             return;
         }
         ProcessInstance processInstance = processService.findProcessInstanceById(this.processInstance.getId());
@@ -650,18 +662,20 @@ public class WorkflowExecuteThread implements Runnable {
     }
 
     private void addTimeoutCheck(TaskInstance taskInstance) {
-
+        if (taskTimeoutCheckList.containsKey(taskInstance.getId())) {
+            return;
+        }
         TaskDefinition taskDefinition = processService.findTaskDefinition(
                 taskInstance.getTaskCode(),
                 taskInstance.getTaskDefinitionVersion()
         );
         taskInstance.setTaskDefine(taskDefinition);
-        if (TimeoutFlag.OPEN == taskDefinition.getTimeoutFlag()) {
-            this.taskTimeoutCheckList.put(taskInstance.getId(), taskInstance);
-            return;
-        }
-        if (taskInstance.isDependTask() || taskInstance.isSubProcess()) {
+        if (TimeoutFlag.OPEN == taskDefinition.getTimeoutFlag() || taskInstance.taskCanRetry()) {
             this.taskTimeoutCheckList.put(taskInstance.getId(), taskInstance);
+        } else {
+            if (taskInstance.isDependTask() || taskInstance.isSubProcess()) {
+                this.taskTimeoutCheckList.put(taskInstance.getId(), taskInstance);
+            }
         }
     }
 
@@ -1131,7 +1145,9 @@ public class WorkflowExecuteThread implements Runnable {
     private void addTaskToStandByList(TaskInstance taskInstance) {
         logger.info("add task to stand by list: {}", taskInstance.getName());
         try {
-            readyToSubmitTaskQueue.put(taskInstance);
+            if (!readyToSubmitTaskQueue.contains(taskInstance)) {
+                readyToSubmitTaskQueue.put(taskInstance);
+            }
         } catch (Exception e) {
             logger.error("add task instance to readyToSubmitTaskQueue error, taskName: {}", taskInstance.getName(), e);
         }
@@ -1191,7 +1207,6 @@ public class WorkflowExecuteThread implements Runnable {
                 this.addStateEvent(stateEvent);
             }
         }
-
     }
 
     public boolean workFlowFinish() {
@@ -1199,29 +1214,6 @@ public class WorkflowExecuteThread implements Runnable {
     }
 
     /**
-     * whether the retry interval is timed out
-     *
-     * @param taskInstance task instance
-     * @return Boolean
-     */
-    private boolean retryTaskIntervalOverTime(TaskInstance taskInstance) {
-        if (taskInstance.getState() != ExecutionStatus.FAILURE) {
-            return true;
-        }
-        if (taskInstance.getId() == 0
-                ||
-                taskInstance.getMaxRetryTimes() == 0
-                ||
-                taskInstance.getRetryInterval() == 0) {
-            return true;
-        }
-        Date now = new Date();
-        long failedTimeInterval = DateUtils.differSec(now, taskInstance.getEndTime());
-        // task retry does not over time, return false
-        return taskInstance.getRetryInterval() * SEC_2_MINUTES_TIME_UNIT < failedTimeInterval;
-    }
-
-    /**
      * handling the list of tasks to be submitted
      */
     private void submitStandByTask() {
@@ -1252,12 +1244,16 @@ public class WorkflowExecuteThread implements Runnable {
                 }
                 DependResult dependResult = getDependResultForTask(task);
                 if (DependResult.SUCCESS == dependResult) {
-                    if (retryTaskIntervalOverTime(task)) {
+                    if (task.retryTaskIntervalOverTime()) {
+                        int originalId = task.getId();
                         TaskInstance taskInstance = submitTaskExec(task);
                         if (taskInstance == null) {
                             this.taskFailedSubmit = true;
                         } else {
                             removeTaskFromStandbyList(task);
+                            if (taskInstance.getId() != originalId) {
+                                activeTaskProcessorMaps.remove(originalId);
+                            }
                         }
                     }
                 } else if (DependResult.FAILED == dependResult) {
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index 2f1f632..22dd7e8 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -1334,6 +1334,8 @@ public class ProcessService {
                         taskInstance.setRetryTimes(taskInstance.getRetryTimes() + 1);
                     }
                     taskInstance.setSubmitTime(null);
+                    taskInstance.setLogPath(null);
+                    taskInstance.setExecutePath(null);
                     taskInstance.setStartTime(null);
                     taskInstance.setEndTime(null);
                     taskInstance.setFlag(Flag.YES);
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java
index 59a0fe2..b558d42 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java
@@ -111,15 +111,15 @@ public class PeerTaskInstancePriorityQueue implements TaskPriorityQueue<TaskInst
      * @return true is contains
      */
     public boolean contains(TaskInstance taskInstance) {
-        return queue.contains(taskInstance);
+        return this.contains(taskInstance.getTaskCode(), taskInstance.getTaskDefinitionVersion());
     }
 
-    public boolean contains(int taskId) {
-
+    public boolean contains(long taskCode, int taskVersion) {
         Iterator<TaskInstance> iterator = this.queue.iterator();
         while (iterator.hasNext()) {
             TaskInstance taskInstance = iterator.next();
-            if (taskId == taskInstance.getId()) {
+            if (taskCode == taskInstance.getTaskCode()
+                    && taskVersion == taskInstance.getTaskDefinitionVersion()) {
                 return true;
             }
         }