You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ca...@apache.org on 2022/08/11 01:26:00 UTC
[dolphinscheduler] branch dev updated: Add pause to task instance status (#11390)
This is an automated email from the ASF dual-hosted git repository.
caishunfeng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 5fa3e7b1ed Add pause to task instance status (#11390)
5fa3e7b1ed is described below
commit 5fa3e7b1ed68e203056112941a5be0f1285a6a7c
Author: Wenjun Ruan <we...@apache.org>
AuthorDate: Thu Aug 11 09:25:53 2022 +0800
Add pause to task instance status (#11390)
---
.../server/master/metrics/TaskMetrics.java | 16 +++---
.../master/runner/WorkflowExecuteRunnable.java | 58 ++++++++++++----------
.../master/runner/task/BlockingTaskProcessor.java | 2 +-
.../master/runner/task/ConditionTaskProcessor.java | 2 +-
.../master/runner/task/DependentTaskProcessor.java | 2 +-
.../master/runner/task/SwitchTaskProcessor.java | 2 +-
.../server/master/BlockingTaskTest.java | 2 +-
.../command/WorkflowStateEventChangeCommand.java | 2 +
.../service/process/ProcessServiceImpl.java | 4 +-
.../plugin/task/api/enums/TaskExecutionStatus.java | 7 ++-
10 files changed, 53 insertions(+), 44 deletions(-)
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/metrics/TaskMetrics.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/metrics/TaskMetrics.java
index 6f9634fa5d..e1b1307b80 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/metrics/TaskMetrics.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/metrics/TaskMetrics.java
@@ -17,25 +17,24 @@
package org.apache.dolphinscheduler.server.master.metrics;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-import java.util.function.Supplier;
-
import com.facebook.presto.jdbc.internal.guava.collect.ImmutableSet;
-
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.Metrics;
import lombok.experimental.UtilityClass;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Supplier;
+
@UtilityClass
public class TaskMetrics {
private final Map<String, Counter> taskInstanceCounters = new HashMap<>();
private final Set<String> taskInstanceStates = ImmutableSet.of(
- "submit", "timeout", "finish", "failover", "retry", "dispatch", "success", "fail", "stop");
+ "submit", "timeout", "finish", "failover", "retry", "dispatch", "success", "kill", "fail", "stop");
static {
for (final String state : taskInstanceStates) {
@@ -44,8 +43,7 @@ public class TaskMetrics {
Counter.builder("ds.task.instance.count")
.tags("state", state)
.description(String.format("Process instance %s total count", state))
- .register(Metrics.globalRegistry)
- );
+ .register(Metrics.globalRegistry));
}
}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
index 09a19f6d63..c952832c99 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
@@ -17,19 +17,11 @@
package org.apache.dolphinscheduler.server.master.runner;
-import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE;
-import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST;
-import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
-import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVERY_START_NODE_STRING;
-import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING;
-import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_NODES;
-import static org.apache.dolphinscheduler.common.Constants.COMMA;
-import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP;
-import static org.apache.dolphinscheduler.common.Constants.YYYY_MM_DD_HH_MM_SS;
-import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_BLOCKING;
-import static org.apache.dolphinscheduler.plugin.task.api.enums.DataType.VARCHAR;
-import static org.apache.dolphinscheduler.plugin.task.api.enums.Direct.IN;
-
+import com.google.common.collect.Lists;
+import lombok.NonNull;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.math.NumberUtils;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.FailureStrategy;
@@ -68,7 +60,13 @@ import org.apache.dolphinscheduler.remote.command.HostUpdateCommand;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager;
-import org.apache.dolphinscheduler.server.master.event.*;
+import org.apache.dolphinscheduler.server.master.event.StateEvent;
+import org.apache.dolphinscheduler.server.master.event.StateEventHandleError;
+import org.apache.dolphinscheduler.server.master.event.StateEventHandleException;
+import org.apache.dolphinscheduler.server.master.event.StateEventHandler;
+import org.apache.dolphinscheduler.server.master.event.StateEventHandlerManager;
+import org.apache.dolphinscheduler.server.master.event.TaskStateEvent;
+import org.apache.dolphinscheduler.server.master.event.WorkflowStateEvent;
import org.apache.dolphinscheduler.server.master.metrics.TaskMetrics;
import org.apache.dolphinscheduler.server.master.runner.task.ITaskProcessor;
import org.apache.dolphinscheduler.server.master.runner.task.TaskAction;
@@ -79,10 +77,9 @@ import org.apache.dolphinscheduler.service.exceptions.CronParseException;
import org.apache.dolphinscheduler.service.expand.CuringParamsService;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.queue.PeerTaskInstancePriorityQueue;
-
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.lang3.math.NumberUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.BeanUtils;
import java.util.ArrayList;
import java.util.Arrays;
@@ -103,13 +100,18 @@ import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.BeanUtils;
-
-import com.google.common.collect.Lists;
-
-import lombok.NonNull;
+import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE;
+import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST;
+import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
+import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVERY_START_NODE_STRING;
+import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING;
+import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_NODES;
+import static org.apache.dolphinscheduler.common.Constants.COMMA;
+import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP;
+import static org.apache.dolphinscheduler.common.Constants.YYYY_MM_DD_HH_MM_SS;
+import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_BLOCKING;
+import static org.apache.dolphinscheduler.plugin.task.api.enums.DataType.VARCHAR;
+import static org.apache.dolphinscheduler.plugin.task.api.enums.Direct.IN;
/**
* Workflow execute task, used to execute a workflow instance.
@@ -1509,7 +1511,9 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
return WorkflowExecutionStatus.FAILURE;
}
- if (processInstance.isBlocked() || !isComplementEnd() || readyToSubmitTaskQueue.size() > 0) {
+ List<TaskInstance> pauseList = getCompleteTaskByState(TaskExecutionStatus.PAUSE);
+ if (CollectionUtils.isNotEmpty(pauseList) || processInstance.isBlocked() || !isComplementEnd()
+ || readyToSubmitTaskQueue.size() > 0) {
return WorkflowExecutionStatus.PAUSE;
} else {
return WorkflowExecutionStatus.SUCCESS;
@@ -1534,7 +1538,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
}
if (readyToSubmitTaskQueue.size() > 0) {
for (Iterator<TaskInstance> iter = readyToSubmitTaskQueue.iterator(); iter.hasNext();) {
- iter.next().setState(TaskExecutionStatus.KILL);
+ iter.next().setState(TaskExecutionStatus.PAUSE);
}
}
return WorkflowExecutionStatus.BLOCK;
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BlockingTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BlockingTaskProcessor.java
index 5148a9733f..c6a1a99d88 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BlockingTaskProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BlockingTaskProcessor.java
@@ -85,7 +85,7 @@ public class BlockingTaskProcessor extends BaseTaskProcessor {
@Override
protected boolean pauseTask() {
// todo: task cannot be pause
- taskInstance.setState(TaskExecutionStatus.KILL);
+ taskInstance.setState(TaskExecutionStatus.PAUSE);
taskInstance.setEndTime(new Date());
processService.saveTaskInstance(taskInstance);
logger.info("blocking task has been paused");
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java
index 91d414c590..2e4f7bf1ac 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java
@@ -95,7 +95,7 @@ public class ConditionTaskProcessor extends BaseTaskProcessor {
@Override
protected boolean pauseTask() {
- this.taskInstance.setState(TaskExecutionStatus.KILL);
+ this.taskInstance.setState(TaskExecutionStatus.PAUSE);
this.taskInstance.setEndTime(new Date());
processService.saveTaskInstance(taskInstance);
return true;
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java
index e3f109163f..1dfa4812cf 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java
@@ -140,7 +140,7 @@ public class DependentTaskProcessor extends BaseTaskProcessor {
@Override
protected boolean pauseTask() {
- this.taskInstance.setState(TaskExecutionStatus.KILL);
+ this.taskInstance.setState(TaskExecutionStatus.PAUSE);
this.taskInstance.setEndTime(new Date());
processService.saveTaskInstance(taskInstance);
return true;
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java
index 8c7c46cd6e..82802d8e61 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java
@@ -104,7 +104,7 @@ public class SwitchTaskProcessor extends BaseTaskProcessor {
@Override
protected boolean pauseTask() {
- this.taskInstance.setState(TaskExecutionStatus.KILL);
+ this.taskInstance.setState(TaskExecutionStatus.PAUSE);
this.taskInstance.setEndTime(new Date());
processService.saveTaskInstance(taskInstance);
return true;
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/BlockingTaskTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/BlockingTaskTest.java
index eeef4b2c2e..9d0d663278 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/BlockingTaskTest.java
+++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/BlockingTaskTest.java
@@ -239,7 +239,7 @@ public class BlockingTaskTest {
blockingTaskProcessor.action(TaskAction.SUBMIT);
blockingTaskProcessor.action(TaskAction.PAUSE);
TaskExecutionStatus status = taskInstance.getState();
- Assert.assertEquals(TaskExecutionStatus.KILL, status);
+ Assert.assertEquals(TaskExecutionStatus.PAUSE, status);
}
@Test
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/WorkflowStateEventChangeCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/WorkflowStateEventChangeCommand.java
index 0b45c48215..5db49f8500 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/WorkflowStateEventChangeCommand.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/WorkflowStateEventChangeCommand.java
@@ -18,6 +18,7 @@
package org.apache.dolphinscheduler.remote.command;
import lombok.Data;
+import lombok.NoArgsConstructor;
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
@@ -27,6 +28,7 @@ import java.io.Serializable;
* db task final result response command
*/
@Data
+@NoArgsConstructor
public class WorkflowStateEventChangeCommand implements Serializable {
private String key;
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
index f655d9b744..a48d783f9b 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
@@ -1570,7 +1570,7 @@ public class ProcessServiceImpl implements ProcessService {
return null;
}
if (processInstanceState == WorkflowExecutionStatus.READY_PAUSE) {
- taskInstance.setState(TaskExecutionStatus.KILL);
+ taskInstance.setState(TaskExecutionStatus.PAUSE);
}
taskInstance.setExecutorId(processInstance.getExecutorId());
taskInstance.setState(getSubmitTaskState(taskInstance, processInstance));
@@ -1614,7 +1614,7 @@ public class ProcessServiceImpl implements ProcessService {
// return pasue /stop if process instance state is ready pause / stop
// or return submit success
if (processInstance.getState() == WorkflowExecutionStatus.READY_PAUSE) {
- state = TaskExecutionStatus.KILL;
+ state = TaskExecutionStatus.PAUSE;
} else if (processInstance.getState() == WorkflowExecutionStatus.READY_STOP
|| !checkProcessStrategy(taskInstance, processInstance)) {
state = TaskExecutionStatus.KILL;
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/enums/TaskExecutionStatus.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/enums/TaskExecutionStatus.java
index 5ca3f893d2..a8d297cb0d 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/enums/TaskExecutionStatus.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/enums/TaskExecutionStatus.java
@@ -26,6 +26,7 @@ public enum TaskExecutionStatus {
SUBMITTED_SUCCESS(0, "submit success"),
RUNNING_EXECUTION(1, "running"),
+ PAUSE(3, "pause"),
FAILURE(6, "failure"),
SUCCESS(7, "success"),
NEED_FAULT_TOLERANCE(8, "need fault tolerance"),
@@ -82,8 +83,12 @@ public enum TaskExecutionStatus {
return this == TaskExecutionStatus.FAILURE;
}
+ public boolean isPause() {
+ return this == TaskExecutionStatus.PAUSE;
+ }
+
public boolean isFinished() {
- return isSuccess() || isKill() || isFailure();
+ return isSuccess() || isKill() || isFailure() || isPause();
}
public boolean isNeedFaultTolerance() {