You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ca...@apache.org on 2022/08/11 01:26:00 UTC

[dolphinscheduler] branch dev updated: Add pause to task instance status (#11390)

This is an automated email from the ASF dual-hosted git repository.

caishunfeng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 5fa3e7b1ed Add pause to task instance status (#11390)
5fa3e7b1ed is described below

commit 5fa3e7b1ed68e203056112941a5be0f1285a6a7c
Author: Wenjun Ruan <we...@apache.org>
AuthorDate: Thu Aug 11 09:25:53 2022 +0800

    Add pause to task instance status (#11390)
---
 .../server/master/metrics/TaskMetrics.java         | 16 +++---
 .../master/runner/WorkflowExecuteRunnable.java     | 58 ++++++++++++----------
 .../master/runner/task/BlockingTaskProcessor.java  |  2 +-
 .../master/runner/task/ConditionTaskProcessor.java |  2 +-
 .../master/runner/task/DependentTaskProcessor.java |  2 +-
 .../master/runner/task/SwitchTaskProcessor.java    |  2 +-
 .../server/master/BlockingTaskTest.java            |  2 +-
 .../command/WorkflowStateEventChangeCommand.java   |  2 +
 .../service/process/ProcessServiceImpl.java        |  4 +-
 .../plugin/task/api/enums/TaskExecutionStatus.java |  7 ++-
 10 files changed, 53 insertions(+), 44 deletions(-)

diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/metrics/TaskMetrics.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/metrics/TaskMetrics.java
index 6f9634fa5d..e1b1307b80 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/metrics/TaskMetrics.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/metrics/TaskMetrics.java
@@ -17,25 +17,24 @@
 
 package org.apache.dolphinscheduler.server.master.metrics;
 
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-import java.util.function.Supplier;
-
 import com.facebook.presto.jdbc.internal.guava.collect.ImmutableSet;
-
 import io.micrometer.core.instrument.Counter;
 import io.micrometer.core.instrument.Gauge;
 import io.micrometer.core.instrument.Metrics;
 import lombok.experimental.UtilityClass;
 
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Supplier;
+
 @UtilityClass
 public class TaskMetrics {
 
     private final Map<String, Counter> taskInstanceCounters = new HashMap<>();
 
     private final Set<String> taskInstanceStates = ImmutableSet.of(
-            "submit", "timeout", "finish", "failover", "retry", "dispatch", "success", "fail", "stop");
+            "submit", "timeout", "finish", "failover", "retry", "dispatch", "success", "kill", "fail", "stop");
 
     static {
         for (final String state : taskInstanceStates) {
@@ -44,8 +43,7 @@ public class TaskMetrics {
                     Counter.builder("ds.task.instance.count")
                             .tags("state", state)
                             .description(String.format("Process instance %s total count", state))
-                            .register(Metrics.globalRegistry)
-            );
+                            .register(Metrics.globalRegistry));
         }
 
     }
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
index 09a19f6d63..c952832c99 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
@@ -17,19 +17,11 @@
 
 package org.apache.dolphinscheduler.server.master.runner;
 
-import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE;
-import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST;
-import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
-import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVERY_START_NODE_STRING;
-import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING;
-import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_NODES;
-import static org.apache.dolphinscheduler.common.Constants.COMMA;
-import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP;
-import static org.apache.dolphinscheduler.common.Constants.YYYY_MM_DD_HH_MM_SS;
-import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_BLOCKING;
-import static org.apache.dolphinscheduler.plugin.task.api.enums.DataType.VARCHAR;
-import static org.apache.dolphinscheduler.plugin.task.api.enums.Direct.IN;
-
+import com.google.common.collect.Lists;
+import lombok.NonNull;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.math.NumberUtils;
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.CommandType;
 import org.apache.dolphinscheduler.common.enums.FailureStrategy;
@@ -68,7 +60,13 @@ import org.apache.dolphinscheduler.remote.command.HostUpdateCommand;
 import org.apache.dolphinscheduler.remote.utils.Host;
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
 import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager;
-import org.apache.dolphinscheduler.server.master.event.*;
+import org.apache.dolphinscheduler.server.master.event.StateEvent;
+import org.apache.dolphinscheduler.server.master.event.StateEventHandleError;
+import org.apache.dolphinscheduler.server.master.event.StateEventHandleException;
+import org.apache.dolphinscheduler.server.master.event.StateEventHandler;
+import org.apache.dolphinscheduler.server.master.event.StateEventHandlerManager;
+import org.apache.dolphinscheduler.server.master.event.TaskStateEvent;
+import org.apache.dolphinscheduler.server.master.event.WorkflowStateEvent;
 import org.apache.dolphinscheduler.server.master.metrics.TaskMetrics;
 import org.apache.dolphinscheduler.server.master.runner.task.ITaskProcessor;
 import org.apache.dolphinscheduler.server.master.runner.task.TaskAction;
@@ -79,10 +77,9 @@ import org.apache.dolphinscheduler.service.exceptions.CronParseException;
 import org.apache.dolphinscheduler.service.expand.CuringParamsService;
 import org.apache.dolphinscheduler.service.process.ProcessService;
 import org.apache.dolphinscheduler.service.queue.PeerTaskInstancePriorityQueue;
-
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.lang3.math.NumberUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.BeanUtils;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -103,13 +100,18 @@ import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.BeanUtils;
-
-import com.google.common.collect.Lists;
-
-import lombok.NonNull;
+import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE;
+import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST;
+import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
+import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVERY_START_NODE_STRING;
+import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING;
+import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_NODES;
+import static org.apache.dolphinscheduler.common.Constants.COMMA;
+import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP;
+import static org.apache.dolphinscheduler.common.Constants.YYYY_MM_DD_HH_MM_SS;
+import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_BLOCKING;
+import static org.apache.dolphinscheduler.plugin.task.api.enums.DataType.VARCHAR;
+import static org.apache.dolphinscheduler.plugin.task.api.enums.Direct.IN;
 
 /**
  * Workflow execute task, used to execute a workflow instance.
@@ -1509,7 +1511,9 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
             return WorkflowExecutionStatus.FAILURE;
         }
 
-        if (processInstance.isBlocked() || !isComplementEnd() || readyToSubmitTaskQueue.size() > 0) {
+        List<TaskInstance> pauseList = getCompleteTaskByState(TaskExecutionStatus.PAUSE);
+        if (CollectionUtils.isNotEmpty(pauseList) || processInstance.isBlocked() || !isComplementEnd()
+                || readyToSubmitTaskQueue.size() > 0) {
             return WorkflowExecutionStatus.PAUSE;
         } else {
             return WorkflowExecutionStatus.SUCCESS;
@@ -1534,7 +1538,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
         }
         if (readyToSubmitTaskQueue.size() > 0) {
             for (Iterator<TaskInstance> iter = readyToSubmitTaskQueue.iterator(); iter.hasNext();) {
-                iter.next().setState(TaskExecutionStatus.KILL);
+                iter.next().setState(TaskExecutionStatus.PAUSE);
             }
         }
         return WorkflowExecutionStatus.BLOCK;
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BlockingTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BlockingTaskProcessor.java
index 5148a9733f..c6a1a99d88 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BlockingTaskProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BlockingTaskProcessor.java
@@ -85,7 +85,7 @@ public class BlockingTaskProcessor extends BaseTaskProcessor {
     @Override
     protected boolean pauseTask() {
         // todo: task cannot be pause
-        taskInstance.setState(TaskExecutionStatus.KILL);
+        taskInstance.setState(TaskExecutionStatus.PAUSE);
         taskInstance.setEndTime(new Date());
         processService.saveTaskInstance(taskInstance);
         logger.info("blocking task has been paused");
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java
index 91d414c590..2e4f7bf1ac 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java
@@ -95,7 +95,7 @@ public class ConditionTaskProcessor extends BaseTaskProcessor {
 
     @Override
     protected boolean pauseTask() {
-        this.taskInstance.setState(TaskExecutionStatus.KILL);
+        this.taskInstance.setState(TaskExecutionStatus.PAUSE);
         this.taskInstance.setEndTime(new Date());
         processService.saveTaskInstance(taskInstance);
         return true;
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java
index e3f109163f..1dfa4812cf 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java
@@ -140,7 +140,7 @@ public class DependentTaskProcessor extends BaseTaskProcessor {
 
     @Override
     protected boolean pauseTask() {
-        this.taskInstance.setState(TaskExecutionStatus.KILL);
+        this.taskInstance.setState(TaskExecutionStatus.PAUSE);
         this.taskInstance.setEndTime(new Date());
         processService.saveTaskInstance(taskInstance);
         return true;
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java
index 8c7c46cd6e..82802d8e61 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java
@@ -104,7 +104,7 @@ public class SwitchTaskProcessor extends BaseTaskProcessor {
 
     @Override
     protected boolean pauseTask() {
-        this.taskInstance.setState(TaskExecutionStatus.KILL);
+        this.taskInstance.setState(TaskExecutionStatus.PAUSE);
         this.taskInstance.setEndTime(new Date());
         processService.saveTaskInstance(taskInstance);
         return true;
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/BlockingTaskTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/BlockingTaskTest.java
index eeef4b2c2e..9d0d663278 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/BlockingTaskTest.java
+++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/BlockingTaskTest.java
@@ -239,7 +239,7 @@ public class BlockingTaskTest {
         blockingTaskProcessor.action(TaskAction.SUBMIT);
         blockingTaskProcessor.action(TaskAction.PAUSE);
         TaskExecutionStatus status = taskInstance.getState();
-        Assert.assertEquals(TaskExecutionStatus.KILL, status);
+        Assert.assertEquals(TaskExecutionStatus.PAUSE, status);
     }
 
     @Test
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/WorkflowStateEventChangeCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/WorkflowStateEventChangeCommand.java
index 0b45c48215..5db49f8500 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/WorkflowStateEventChangeCommand.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/WorkflowStateEventChangeCommand.java
@@ -18,6 +18,7 @@
 package org.apache.dolphinscheduler.remote.command;
 
 import lombok.Data;
+import lombok.NoArgsConstructor;
 import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
 
@@ -27,6 +28,7 @@ import java.io.Serializable;
  * db task final result response command
  */
 @Data
+@NoArgsConstructor
 public class WorkflowStateEventChangeCommand implements Serializable {
 
     private String key;
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
index f655d9b744..a48d783f9b 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
@@ -1570,7 +1570,7 @@ public class ProcessServiceImpl implements ProcessService {
             return null;
         }
         if (processInstanceState == WorkflowExecutionStatus.READY_PAUSE) {
-            taskInstance.setState(TaskExecutionStatus.KILL);
+            taskInstance.setState(TaskExecutionStatus.PAUSE);
         }
         taskInstance.setExecutorId(processInstance.getExecutorId());
         taskInstance.setState(getSubmitTaskState(taskInstance, processInstance));
@@ -1614,7 +1614,7 @@ public class ProcessServiceImpl implements ProcessService {
         // return pasue /stop if process instance state is ready pause / stop
         // or return submit success
         if (processInstance.getState() == WorkflowExecutionStatus.READY_PAUSE) {
-            state = TaskExecutionStatus.KILL;
+            state = TaskExecutionStatus.PAUSE;
         } else if (processInstance.getState() == WorkflowExecutionStatus.READY_STOP
                 || !checkProcessStrategy(taskInstance, processInstance)) {
             state = TaskExecutionStatus.KILL;
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/enums/TaskExecutionStatus.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/enums/TaskExecutionStatus.java
index 5ca3f893d2..a8d297cb0d 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/enums/TaskExecutionStatus.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/enums/TaskExecutionStatus.java
@@ -26,6 +26,7 @@ public enum TaskExecutionStatus {
 
     SUBMITTED_SUCCESS(0, "submit success"),
     RUNNING_EXECUTION(1, "running"),
+    PAUSE(3, "pause"),
     FAILURE(6, "failure"),
     SUCCESS(7, "success"),
     NEED_FAULT_TOLERANCE(8, "need fault tolerance"),
@@ -82,8 +83,12 @@ public enum TaskExecutionStatus {
         return this == TaskExecutionStatus.FAILURE;
     }
 
+    public boolean isPause() {
+        return this == TaskExecutionStatus.PAUSE;
+    }
+
     public boolean isFinished() {
-        return isSuccess() || isKill() || isFailure();
+        return isSuccess() || isKill() || isFailure() || isPause();
     }
 
     public boolean isNeedFaultTolerance() {