You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ca...@apache.org on 2022/02/15 11:04:30 UTC
[dolphinscheduler] branch dev updated: [Fix-8337][Master] Process instance can not be kill when task is failure and can be retry (#8387)
This is an automated email from the ASF dual-hosted git repository.
caishunfeng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 08f642c [Fix-8337][Master] Process instance can not be kill when task is failure and can be retry (#8387)
08f642c is described below
commit 08f642c756707c37aca7b6fc9a3d1b381f667742
Author: xiangzihao <46...@qq.com>
AuthorDate: Tue Feb 15 19:04:19 2022 +0800
[Fix-8337][Master] Process instance can not be kill when task is failure and can be retry (#8387)
* fix bug_8337
* fix bug_8337
* fix bug_8337
* fix bug_8337
* fix bug_8337
* fix bug_8337
* fix bug_8337
* test
* fix dev_bug_8337
* fix dev_bug_8337
* fix dev_bug_8337
* fix bug_8337
---
.../master/runner/StateWheelExecuteThread.java | 19 +++++++++
.../master/runner/WorkflowExecuteThread.java | 47 +++++++++++++++++++---
.../service/process/ProcessService.java | 2 +-
.../queue/PeerTaskInstancePriorityQueue.java | 8 ++++
4 files changed, 69 insertions(+), 7 deletions(-)
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
index 3abfdd2..f19dfbe 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
@@ -250,18 +250,29 @@ public class StateWheelExecuteThread extends Thread {
if (taskInstanceRetryCheckList.isEmpty()) {
return;
}
+
for (TaskInstanceKey taskInstanceKey : taskInstanceRetryCheckList) {
int processInstanceId = taskInstanceKey.getProcessInstanceId();
long taskCode = taskInstanceKey.getTaskCode();
WorkflowExecuteThread workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
+
if (workflowExecuteThread == null) {
logger.warn("can not find workflowExecuteThread, this check event will remove, processInstanceId:{}, taskCode:{}",
processInstanceId, taskCode);
taskInstanceRetryCheckList.remove(taskInstanceKey);
continue;
}
+
TaskInstance taskInstance = workflowExecuteThread.getRetryTaskInstanceByTaskCode(taskCode);
+ ProcessInstance processInstance = workflowExecuteThread.getProcessInstance();
+
+ if (processInstance.getState() == ExecutionStatus.READY_STOP) {
+ addProcessStopEvent(processInstance);
+ taskInstanceRetryCheckList.remove(taskInstanceKey);
+ break;
+ }
+
if (taskInstance == null) {
logger.warn("can not find taskInstance from workflowExecuteThread, this check event will remove, processInstanceId:{}, taskCode:{}",
processInstanceId, taskCode);
@@ -320,6 +331,14 @@ public class StateWheelExecuteThread extends Thread {
workflowExecuteThreadPool.submitStateEvent(stateEvent);
}
+ private void addProcessStopEvent(ProcessInstance processInstance) {
+ StateEvent stateEvent = new StateEvent();
+ stateEvent.setType(StateEventType.PROCESS_STATE_CHANGE);
+ stateEvent.setProcessInstanceId(processInstance.getId());
+ stateEvent.setExecutionStatus(ExecutionStatus.STOP);
+ workflowExecuteThreadPool.submitStateEvent(stateEvent);
+ }
+
private void addTaskRetryEvent(TaskInstance taskInstance) {
StateEvent stateEvent = new StateEvent();
stateEvent.setType(StateEventType.TASK_RETRY);
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
index 2bc16a3..676bb03 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
@@ -441,7 +441,7 @@ public class WorkflowExecuteThread {
processInstance.setVarPool(taskInstance.getVarPool());
processService.saveProcessInstance(processInstance);
submitPostNode(Long.toString(taskInstance.getTaskCode()));
- } else if (taskInstance.taskCanRetry()) {
+ } else if (taskInstance.taskCanRetry() && processInstance.getState() != ExecutionStatus.READY_STOP) {
// retry task
retryTaskInstance(taskInstance);
} else if (taskInstance.getState().typeIsFailure()) {
@@ -490,7 +490,7 @@ public class WorkflowExecuteThread {
if (!taskInstance.taskCanRetry()) {
return;
}
- TaskInstance newTaskInstance = cloneRetryTaskInstance(taskInstance);
+ TaskInstance newTaskInstance = cloneRetryTaskInstance(taskInstance);
if (newTaskInstance == null) {
logger.error("retry fail, new taskInstancce is null, task code:{}, task id:{}", taskInstance.getTaskCode(), taskInstance.getId());
return;
@@ -577,6 +577,7 @@ public class WorkflowExecuteThread {
logger.error("task instance id null, state event:{}", stateEvent);
return false;
}
+
if (!taskInstanceMap.containsKey(stateEvent.getTaskInstanceId())) {
logger.error("mismatch task instance id, event:{}", stateEvent);
return false;
@@ -636,6 +637,12 @@ public class WorkflowExecuteThread {
private boolean processStateChangeHandler(StateEvent stateEvent) {
try {
logger.info("process:{} state {} change to {}", processInstance.getId(), processInstance.getState(), stateEvent.getExecutionStatus());
+
+ if (stateEvent.getExecutionStatus() == ExecutionStatus.STOP) {
+ this.updateProcessInstanceState(stateEvent);
+ return true;
+ }
+
if (processComplementData()) {
return true;
}
@@ -1441,10 +1448,6 @@ public class WorkflowExecuteThread {
// active task and retry task exists
return runningState(state);
}
- // process failure
- if (processFailed()) {
- return ExecutionStatus.FAILURE;
- }
// waiting thread
if (hasWaitingThreadTask()) {
@@ -1460,8 +1463,10 @@ public class WorkflowExecuteThread {
if (state == ExecutionStatus.READY_STOP) {
List<TaskInstance> stopList = getCompleteTaskByState(ExecutionStatus.STOP);
List<TaskInstance> killList = getCompleteTaskByState(ExecutionStatus.KILL);
+ List<TaskInstance> faillist = getCompleteTaskByState(ExecutionStatus.FAILURE);
if (CollectionUtils.isNotEmpty(stopList)
|| CollectionUtils.isNotEmpty(killList)
+ || CollectionUtils.isNotEmpty(faillist)
|| !isComplementEnd()) {
return ExecutionStatus.STOP;
} else {
@@ -1469,6 +1474,11 @@ public class WorkflowExecuteThread {
}
}
+ // process failure
+ if (processFailed()) {
+ return ExecutionStatus.FAILURE;
+ }
+
// success
if (state == ExecutionStatus.RUNNING_EXECUTION) {
List<TaskInstance> killTasks = getCompleteTaskByState(ExecutionStatus.KILL);
@@ -1535,6 +1545,26 @@ public class WorkflowExecuteThread {
}
/**
+ * set the process instance state to the stateEvent's execution status and persist it when the state has changed
+ */
+ private void updateProcessInstanceState(StateEvent stateEvent) {
+ ExecutionStatus state = stateEvent.getExecutionStatus();
+ if (processInstance.getState() != state) {
+ logger.info(
+ "work flow process instance [id: {}, name:{}], state change from {} to {}, cmd type: {}",
+ processInstance.getId(), processInstance.getName(),
+ processInstance.getState(), state,
+ processInstance.getCommandType());
+
+ processInstance.setState(state);
+ if (state.typeIsFinished()) {
+ processInstance.setEndTime(new Date());
+ }
+ processService.updateProcessInstance(processInstance);
+ }
+ }
+
+ /**
* get task dependency result
*
* @param taskInstance task instance
@@ -1607,6 +1637,11 @@ public class WorkflowExecuteThread {
private void killAllTasks() {
logger.info("kill called on process instance id: {}, num: {}", processInstance.getId(),
activeTaskProcessorMaps.size());
+
+ if (readyToSubmitTaskQueue.size() > 0) {
+ readyToSubmitTaskQueue.clear();
+ }
+
for (long taskCode : activeTaskProcessorMaps.keySet()) {
ITaskProcessor taskProcessor = activeTaskProcessorMaps.get(taskCode);
Integer taskInstanceId = validTaskMap.get(taskCode);
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index d6251d5..56906cd 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -1162,7 +1162,7 @@ public class ProcessService {
logger.error("task commit to db failed , taskId {} has already retry {} times, please check the database", taskInstance.getId(), retryTimes);
Thread.sleep(commitInterval);
} catch (Exception e) {
- logger.error("task commit to mysql failed", e);
+ logger.error("task commit to db failed", e);
}
retryTimes += 1;
}
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java
index b558d42..7502607 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java
@@ -105,6 +105,14 @@ public class PeerTaskInstancePriorityQueue implements TaskPriorityQueue<TaskInst
}
/**
+ * remove all task instances from the queue
+ *
+ */
+ public void clear() {
+ queue.clear();
+ }
+
+ /**
* whether contains the task instance
*
* @param taskInstance task instance