You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ki...@apache.org on 2021/10/15 10:04:10 UTC
[dolphinscheduler] branch dev updated: [BUG-6543][Master] process instance state is always running when failure task (#6547)
This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 52a550b [BUG-6543][Master] process instance state is always running when failure task (#6547)
52a550b is described below
commit 52a550b6aefa5652f17b7bbc9e056a83efa23c8a
Author: OS <29...@users.noreply.github.com>
AuthorDate: Fri Oct 15 18:04:04 2021 +0800
[BUG-6543][Master] process instance state is always running when failure task (#6547)
* fix-6543: process instance state is always running when failure task exists
* code style
* code style
---
.../dolphinscheduler/dao/entity/TaskInstance.java | 23 ++++++++
.../server/master/runner/EventExecuteService.java | 2 +-
.../master/runner/StateWheelExecuteThread.java | 4 ++
.../master/runner/WorkflowExecuteThread.java | 62 ++++++++++------------
.../service/process/ProcessService.java | 2 +
.../queue/PeerTaskInstancePriorityQueue.java | 8 +--
6 files changed, 63 insertions(+), 38 deletions(-)
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
index ac18975..4076900 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
@@ -17,6 +17,8 @@
package org.apache.dolphinscheduler.dao.entity;
+import static org.apache.dolphinscheduler.common.Constants.SEC_2_MINUTES_TIME_UNIT;
+
import com.fasterxml.jackson.core.type.TypeReference;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
@@ -25,6 +27,7 @@ import org.apache.dolphinscheduler.common.enums.Priority;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.task.dependent.DependentParameters;
import org.apache.dolphinscheduler.common.task.switchtask.SwitchParameters;
+import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import java.io.Serializable;
@@ -591,6 +594,26 @@ public class TaskInstance implements Serializable {
}
}
+ /**
+ * whether the retry interval is timed out
+ *
+ * @return Boolean
+ */
+ public boolean retryTaskIntervalOverTime() {
+ if (getState() != ExecutionStatus.FAILURE) {
+ return true;
+ }
+ if (getId() == 0
+ || getMaxRetryTimes() == 0
+ || getRetryInterval() == 0) {
+ return true;
+ }
+ Date now = new Date();
+ long failedTimeInterval = DateUtils.differSec(now, getEndTime());
+ // task retry does not over time, return false
+ return getRetryInterval() * SEC_2_MINUTES_TIME_UNIT < failedTimeInterval;
+ }
+
public Priority getTaskInstancePriority() {
return taskInstancePriority;
}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/EventExecuteService.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/EventExecuteService.java
index 3356842..7c4b321 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/EventExecuteService.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/EventExecuteService.java
@@ -122,7 +122,7 @@ public class EventExecuteService extends Thread {
continue;
}
int processInstanceId = workflowExecuteThread.getProcessInstance().getId();
- logger.info("handle process instance : {} events, count:{}",
+ logger.info("handle process instance : {} , events count:{}",
processInstanceId,
workflowExecuteThread.eventSize());
logger.info("already exists handler process size:{}", this.eventHandlerMap.size());
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
index f205e2d..f2b10f7 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
@@ -96,6 +96,10 @@ public class StateWheelExecuteThread extends Thread {
return;
}
}
+ if (taskInstance.taskCanRetry() && taskInstance.retryTaskIntervalOverTime()) {
+ processDependCheck(taskInstance);
+ taskInstanceCheckList.remove(taskInstance.getId());
+ }
if (taskInstance.isSubProcess() || taskInstance.isDependTask()) {
processDependCheck(taskInstance);
}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
index 3525ae4..c501262 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
@@ -349,7 +349,7 @@ public class WorkflowExecuteThread implements Runnable {
private boolean taskStateChangeHandler(StateEvent stateEvent) {
TaskInstance task = processService.findTaskInstanceById(stateEvent.getTaskInstanceId());
- if (stateEvent.getExecutionStatus().typeIsFinished()) {
+ if (task.getState().typeIsFinished()) {
taskFinished(task);
} else if (activeTaskProcessorMaps.containsKey(stateEvent.getTaskInstanceId())) {
ITaskProcessor iTaskProcessor = activeTaskProcessorMaps.get(stateEvent.getTaskInstanceId());
@@ -372,6 +372,18 @@ public class WorkflowExecuteThread implements Runnable {
task.getState());
if (task.taskCanRetry()) {
addTaskToStandByList(task);
+ if (!task.retryTaskIntervalOverTime()) {
+ logger.info("failure task will be submitted: process id: {}, task instance id: {} state:{} retry times:{} / {}, interval:{}",
+ processInstance.getId(),
+ task.getId(),
+ task.getState(),
+ task.getRetryTimes(),
+ task.getMaxRetryTimes(),
+ task.getRetryInterval());
+ this.addTimeoutCheck(task);
+ } else {
+ submitStandByTask();
+ }
return;
}
ProcessInstance processInstance = processService.findProcessInstanceById(this.processInstance.getId());
@@ -650,18 +662,20 @@ public class WorkflowExecuteThread implements Runnable {
}
private void addTimeoutCheck(TaskInstance taskInstance) {
-
+ if (taskTimeoutCheckList.containsKey(taskInstance.getId())) {
+ return;
+ }
TaskDefinition taskDefinition = processService.findTaskDefinition(
taskInstance.getTaskCode(),
taskInstance.getTaskDefinitionVersion()
);
taskInstance.setTaskDefine(taskDefinition);
- if (TimeoutFlag.OPEN == taskDefinition.getTimeoutFlag()) {
- this.taskTimeoutCheckList.put(taskInstance.getId(), taskInstance);
- return;
- }
- if (taskInstance.isDependTask() || taskInstance.isSubProcess()) {
+ if (TimeoutFlag.OPEN == taskDefinition.getTimeoutFlag() || taskInstance.taskCanRetry()) {
this.taskTimeoutCheckList.put(taskInstance.getId(), taskInstance);
+ } else {
+ if (taskInstance.isDependTask() || taskInstance.isSubProcess()) {
+ this.taskTimeoutCheckList.put(taskInstance.getId(), taskInstance);
+ }
}
}
@@ -1131,7 +1145,9 @@ public class WorkflowExecuteThread implements Runnable {
private void addTaskToStandByList(TaskInstance taskInstance) {
logger.info("add task to stand by list: {}", taskInstance.getName());
try {
- readyToSubmitTaskQueue.put(taskInstance);
+ if (!readyToSubmitTaskQueue.contains(taskInstance)) {
+ readyToSubmitTaskQueue.put(taskInstance);
+ }
} catch (Exception e) {
logger.error("add task instance to readyToSubmitTaskQueue error, taskName: {}", taskInstance.getName(), e);
}
@@ -1191,7 +1207,6 @@ public class WorkflowExecuteThread implements Runnable {
this.addStateEvent(stateEvent);
}
}
-
}
public boolean workFlowFinish() {
@@ -1199,29 +1214,6 @@ public class WorkflowExecuteThread implements Runnable {
}
/**
- * whether the retry interval is timed out
- *
- * @param taskInstance task instance
- * @return Boolean
- */
- private boolean retryTaskIntervalOverTime(TaskInstance taskInstance) {
- if (taskInstance.getState() != ExecutionStatus.FAILURE) {
- return true;
- }
- if (taskInstance.getId() == 0
- ||
- taskInstance.getMaxRetryTimes() == 0
- ||
- taskInstance.getRetryInterval() == 0) {
- return true;
- }
- Date now = new Date();
- long failedTimeInterval = DateUtils.differSec(now, taskInstance.getEndTime());
- // task retry does not over time, return false
- return taskInstance.getRetryInterval() * SEC_2_MINUTES_TIME_UNIT < failedTimeInterval;
- }
-
- /**
* handling the list of tasks to be submitted
*/
private void submitStandByTask() {
@@ -1252,12 +1244,16 @@ public class WorkflowExecuteThread implements Runnable {
}
DependResult dependResult = getDependResultForTask(task);
if (DependResult.SUCCESS == dependResult) {
- if (retryTaskIntervalOverTime(task)) {
+ if (task.retryTaskIntervalOverTime()) {
+ int originalId = task.getId();
TaskInstance taskInstance = submitTaskExec(task);
if (taskInstance == null) {
this.taskFailedSubmit = true;
} else {
removeTaskFromStandbyList(task);
+ if (taskInstance.getId() != originalId) {
+ activeTaskProcessorMaps.remove(originalId);
+ }
}
}
} else if (DependResult.FAILED == dependResult) {
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index 2f1f632..22dd7e8 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -1334,6 +1334,8 @@ public class ProcessService {
taskInstance.setRetryTimes(taskInstance.getRetryTimes() + 1);
}
taskInstance.setSubmitTime(null);
+ taskInstance.setLogPath(null);
+ taskInstance.setExecutePath(null);
taskInstance.setStartTime(null);
taskInstance.setEndTime(null);
taskInstance.setFlag(Flag.YES);
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java
index 59a0fe2..b558d42 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java
@@ -111,15 +111,15 @@ public class PeerTaskInstancePriorityQueue implements TaskPriorityQueue<TaskInst
* @return true is contains
*/
public boolean contains(TaskInstance taskInstance) {
- return queue.contains(taskInstance);
+ return this.contains(taskInstance.getTaskCode(), taskInstance.getTaskDefinitionVersion());
}
- public boolean contains(int taskId) {
-
+ public boolean contains(long taskCode, int taskVersion) {
Iterator<TaskInstance> iterator = this.queue.iterator();
while (iterator.hasNext()) {
TaskInstance taskInstance = iterator.next();
- if (taskId == taskInstance.getId()) {
+ if (taskCode == taskInstance.getTaskCode()
+ && taskVersion == taskInstance.getTaskDefinitionVersion()) {
return true;
}
}