You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ca...@apache.org on 2022/02/15 11:04:30 UTC

[dolphinscheduler] branch dev updated: [Fix-8337][Master] Process instance can not be kill when task is failure and can be retry (#8387)

This is an automated email from the ASF dual-hosted git repository.

caishunfeng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 08f642c  [Fix-8337][Master] Process instance can not be kill when task is failure and can be retry (#8387)
08f642c is described below

commit 08f642c756707c37aca7b6fc9a3d1b381f667742
Author: xiangzihao <46...@qq.com>
AuthorDate: Tue Feb 15 19:04:19 2022 +0800

    [Fix-8337][Master] Process instance can not be kill when task is failure and can be retry (#8387)
    
    * fix bug_8337
    
    * fix bug_8337
    
    * fix bug_8337
    
    * fix bug_8337
    
    * fix bug_8337
    
    * fix bug_8337
    
    * fix bug_8337
    
    * test
    
    * fix dev_bug_8337
    
    * fix dev_bug_8337
    
    * fix dev_bug_8337
    
    * fix bug_8337
---
 .../master/runner/StateWheelExecuteThread.java     | 19 +++++++++
 .../master/runner/WorkflowExecuteThread.java       | 47 +++++++++++++++++++---
 .../service/process/ProcessService.java            |  2 +-
 .../queue/PeerTaskInstancePriorityQueue.java       |  8 ++++
 4 files changed, 69 insertions(+), 7 deletions(-)

diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
index 3abfdd2..f19dfbe 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
@@ -250,18 +250,29 @@ public class StateWheelExecuteThread extends Thread {
         if (taskInstanceRetryCheckList.isEmpty()) {
             return;
         }
+
         for (TaskInstanceKey taskInstanceKey : taskInstanceRetryCheckList) {
             int processInstanceId = taskInstanceKey.getProcessInstanceId();
             long taskCode = taskInstanceKey.getTaskCode();
 
             WorkflowExecuteThread workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
+
             if (workflowExecuteThread == null) {
                 logger.warn("can not find workflowExecuteThread, this check event will remove, processInstanceId:{}, taskCode:{}",
                         processInstanceId, taskCode);
                 taskInstanceRetryCheckList.remove(taskInstanceKey);
                 continue;
             }
+
             TaskInstance taskInstance = workflowExecuteThread.getRetryTaskInstanceByTaskCode(taskCode);
+            ProcessInstance processInstance = workflowExecuteThread.getProcessInstance();
+
+            if (processInstance.getState() == ExecutionStatus.READY_STOP) {
+                addProcessStopEvent(processInstance);
+                taskInstanceRetryCheckList.remove(taskInstanceKey);
+                break;
+            }
+
             if (taskInstance == null) {
                 logger.warn("can not find taskInstance from workflowExecuteThread, this check event will remove, processInstanceId:{}, taskCode:{}",
                         processInstanceId, taskCode);
@@ -320,6 +331,14 @@ public class StateWheelExecuteThread extends Thread {
         workflowExecuteThreadPool.submitStateEvent(stateEvent);
     }
 
+    private void addProcessStopEvent(ProcessInstance processInstance) {
+        StateEvent stateEvent = new StateEvent();
+        stateEvent.setType(StateEventType.PROCESS_STATE_CHANGE);
+        stateEvent.setProcessInstanceId(processInstance.getId());
+        stateEvent.setExecutionStatus(ExecutionStatus.STOP);
+        workflowExecuteThreadPool.submitStateEvent(stateEvent);
+    }
+
     private void addTaskRetryEvent(TaskInstance taskInstance) {
         StateEvent stateEvent = new StateEvent();
         stateEvent.setType(StateEventType.TASK_RETRY);
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
index 2bc16a3..676bb03 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
@@ -441,7 +441,7 @@ public class WorkflowExecuteThread {
             processInstance.setVarPool(taskInstance.getVarPool());
             processService.saveProcessInstance(processInstance);
             submitPostNode(Long.toString(taskInstance.getTaskCode()));
-        } else if (taskInstance.taskCanRetry()) {
+        } else if (taskInstance.taskCanRetry() && processInstance.getState() != ExecutionStatus.READY_STOP) {
             // retry task
             retryTaskInstance(taskInstance);
         } else if (taskInstance.getState().typeIsFailure()) {
@@ -490,7 +490,7 @@ public class WorkflowExecuteThread {
         if (!taskInstance.taskCanRetry()) {
             return;
         }
-        TaskInstance newTaskInstance =  cloneRetryTaskInstance(taskInstance);
+        TaskInstance newTaskInstance = cloneRetryTaskInstance(taskInstance);
         if (newTaskInstance == null) {
             logger.error("retry fail, new taskInstancce is null, task code:{}, task id:{}", taskInstance.getTaskCode(), taskInstance.getId());
             return;
@@ -577,6 +577,7 @@ public class WorkflowExecuteThread {
             logger.error("task instance id null, state event:{}", stateEvent);
             return false;
         }
+
         if (!taskInstanceMap.containsKey(stateEvent.getTaskInstanceId())) {
             logger.error("mismatch task instance id, event:{}", stateEvent);
             return false;
@@ -636,6 +637,12 @@ public class WorkflowExecuteThread {
     private boolean processStateChangeHandler(StateEvent stateEvent) {
         try {
             logger.info("process:{} state {} change to {}", processInstance.getId(), processInstance.getState(), stateEvent.getExecutionStatus());
+
+            if (stateEvent.getExecutionStatus() == ExecutionStatus.STOP) {
+                this.updateProcessInstanceState(stateEvent);
+                return true;
+            }
+
             if (processComplementData()) {
                 return true;
             }
@@ -1441,10 +1448,6 @@ public class WorkflowExecuteThread {
             // active task and retry task exists
             return runningState(state);
         }
-        // process failure
-        if (processFailed()) {
-            return ExecutionStatus.FAILURE;
-        }
 
         // waiting thread
         if (hasWaitingThreadTask()) {
@@ -1460,8 +1463,10 @@ public class WorkflowExecuteThread {
         if (state == ExecutionStatus.READY_STOP) {
             List<TaskInstance> stopList = getCompleteTaskByState(ExecutionStatus.STOP);
             List<TaskInstance> killList = getCompleteTaskByState(ExecutionStatus.KILL);
+            List<TaskInstance> faillist = getCompleteTaskByState(ExecutionStatus.FAILURE);
             if (CollectionUtils.isNotEmpty(stopList)
                 || CollectionUtils.isNotEmpty(killList)
+                || CollectionUtils.isNotEmpty(faillist)
                 || !isComplementEnd()) {
                 return ExecutionStatus.STOP;
             } else {
@@ -1469,6 +1474,11 @@ public class WorkflowExecuteThread {
             }
         }
 
+        // process failure
+        if (processFailed()) {
+            return ExecutionStatus.FAILURE;
+        }
+
         // success
         if (state == ExecutionStatus.RUNNING_EXECUTION) {
             List<TaskInstance> killTasks = getCompleteTaskByState(ExecutionStatus.KILL);
@@ -1535,6 +1545,26 @@ public class WorkflowExecuteThread {
     }
 
     /**
+     * set the process instance state to the stateEvent's execution status and update it via processService
+     */
+    private void updateProcessInstanceState(StateEvent stateEvent) {
+        ExecutionStatus state = stateEvent.getExecutionStatus();
+        if (processInstance.getState() != state) {
+            logger.info(
+                    "work flow process instance [id: {}, name:{}], state change from {} to {}, cmd type: {}",
+                    processInstance.getId(), processInstance.getName(),
+                    processInstance.getState(), state,
+                    processInstance.getCommandType());
+
+            processInstance.setState(state);
+            if (state.typeIsFinished()) {
+                processInstance.setEndTime(new Date());
+            }
+            processService.updateProcessInstance(processInstance);
+        }
+    }
+
+    /**
      * get task dependency result
      *
      * @param taskInstance task instance
@@ -1607,6 +1637,11 @@ public class WorkflowExecuteThread {
     private void killAllTasks() {
         logger.info("kill called on process instance id: {}, num: {}", processInstance.getId(),
             activeTaskProcessorMaps.size());
+
+        if (readyToSubmitTaskQueue.size() > 0) {
+            readyToSubmitTaskQueue.clear();
+        }
+
         for (long taskCode : activeTaskProcessorMaps.keySet()) {
             ITaskProcessor taskProcessor = activeTaskProcessorMaps.get(taskCode);
             Integer taskInstanceId = validTaskMap.get(taskCode);
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index d6251d5..56906cd 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -1162,7 +1162,7 @@ public class ProcessService {
                 logger.error("task commit to db failed , taskId {} has already retry {} times, please check the database", taskInstance.getId(), retryTimes);
                 Thread.sleep(commitInterval);
             } catch (Exception e) {
-                logger.error("task commit to mysql failed", e);
+                logger.error("task commit to db failed", e);
             }
             retryTimes += 1;
         }
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java
index b558d42..7502607 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java
@@ -105,6 +105,14 @@ public class PeerTaskInstancePriorityQueue implements TaskPriorityQueue<TaskInst
     }
 
     /**
+     * remove all task instances from the queue
+     *
+     */
+    public void clear() {
+        queue.clear();
+    }
+
+    /**
      * whether contains the task instance
      *
      * @param taskInstance task instance