You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by zh...@apache.org on 2022/01/07 04:26:14 UTC

[dolphinscheduler] branch dev updated: [Feature-7769][API] Add batch start process api (#7771)

This is an automated email from the ASF dual-hosted git repository.

zhongjiajie pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new f254124  [Feature-7769][API] Add batch start process api (#7771)
f254124 is described below

commit f2541248eb85f7977f75a3be3c7419688d20d713
Author: xiangzihao <46...@qq.com>
AuthorDate: Fri Jan 7 12:26:06 2022 +0800

    [Feature-7769][API] Add batch start process api (#7771)
    
    * add notes
    Co-authored-by: SbloodyS <sb...@qq.com>
---
 .../api/controller/ExecutorController.java         | 104 ++++++++++++++++++++-
 .../apache/dolphinscheduler/api/enums/Status.java  |   1 +
 .../src/main/resources/i18n/messages.properties    |   5 +-
 .../main/resources/i18n/messages_en_US.properties  |   2 +
 .../main/resources/i18n/messages_zh_CN.properties  |   1 +
 5 files changed, 109 insertions(+), 4 deletions(-)

diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java
index 3cff198..4f651c0 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java
@@ -23,6 +23,7 @@ import static org.apache.dolphinscheduler.api.enums.Status.START_PROCESS_INSTANC
 
 import org.apache.dolphinscheduler.api.aspect.AccessLogAnnotation;
 import org.apache.dolphinscheduler.api.enums.ExecuteType;
+import org.apache.dolphinscheduler.api.enums.Status;
 import org.apache.dolphinscheduler.api.exceptions.ApiException;
 import org.apache.dolphinscheduler.api.service.ExecutorService;
 import org.apache.dolphinscheduler.api.utils.Result;
@@ -36,8 +37,6 @@ import org.apache.dolphinscheduler.common.enums.WarningType;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.dao.entity.User;
 
-import java.util.Map;
-
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.http.HttpStatus;
 import org.springframework.web.bind.annotation.PathVariable;
@@ -55,6 +54,13 @@ import io.swagger.annotations.ApiOperation;
 import io.swagger.annotations.ApiParam;
 import springfox.documentation.annotations.ApiIgnore;
 
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
 /**
  * executor controller
  */
@@ -139,6 +145,100 @@ public class ExecutorController extends BaseController {
     }
 
     /**
+     * batch execute process instance
+     * If any processDefinitionCode fails to start, the failed codes are returned and the status is set to
+     * failed. The process definitions that started successfully keep running and are not stopped.
+     * A malformed (non-numeric) code rejects the whole request before any process instance is started.
+     *
+     * @param loginUser login user
+     * @param projectCode project code
+     * @param processDefinitionCodes comma separated process definition codes, duplicated codes are started once
+     * @param scheduleTime schedule time
+     * @param failureStrategy failure strategy
+     * @param startNodeList start nodes list
+     * @param taskDependType task depend type
+     * @param execType execute type
+     * @param warningType warning type
+     * @param warningGroupId warning group id, defaults to 0 when omitted
+     * @param runMode run mode
+     * @param processInstancePriority process instance priority
+     * @param workerGroup worker group, defaults to "default" when omitted
+     * @param environmentCode environment code, defaults to -1 when omitted
+     * @param timeout timeout, Constants.MAX_TASK_TIMEOUT is used when omitted
+     * @param startParams start parameters, parsed with JSONUtils.toMap when present
+     * @param expectedParallelismNumber the expected parallelism number when execute complement in parallel mode
+     * @param dryRun dry run flag, defaults to 0 when omitted
+     * @return start process result code
+     */
+    @ApiOperation(value = "batchStartProcessInstance", notes = "BATCH_RUN_PROCESS_INSTANCE_NOTES")
+    @ApiImplicitParams({
+            @ApiImplicitParam(name = "processDefinitionCodes", value = "PROCESS_DEFINITION_CODES", required = true, dataType = "String", example = "1,2,3"),
+            @ApiImplicitParam(name = "scheduleTime", value = "SCHEDULE_TIME", required = true, dataType = "String"),
+            @ApiImplicitParam(name = "failureStrategy", value = "FAILURE_STRATEGY", required = true, dataType = "FailureStrategy"),
+            @ApiImplicitParam(name = "startNodeList", value = "START_NODE_LIST", dataType = "String"),
+            @ApiImplicitParam(name = "taskDependType", value = "TASK_DEPEND_TYPE", dataType = "TaskDependType"),
+            @ApiImplicitParam(name = "execType", value = "COMMAND_TYPE", dataType = "CommandType"),
+            @ApiImplicitParam(name = "warningType", value = "WARNING_TYPE", required = true, dataType = "WarningType"),
+            @ApiImplicitParam(name = "warningGroupId", value = "WARNING_GROUP_ID", required = true, dataType = "Int", example = "100"),
+            @ApiImplicitParam(name = "runMode", value = "RUN_MODE", dataType = "RunMode"),
+            @ApiImplicitParam(name = "processInstancePriority", value = "PROCESS_INSTANCE_PRIORITY", required = true, dataType = "Priority"),
+            @ApiImplicitParam(name = "workerGroup", value = "WORKER_GROUP", dataType = "String", example = "default"),
+            @ApiImplicitParam(name = "environmentCode", value = "ENVIRONMENT_CODE", dataType = "Long", example = "-1"),
+            @ApiImplicitParam(name = "timeout", value = "TIMEOUT", dataType = "Int", example = "100"),
+            @ApiImplicitParam(name = "expectedParallelismNumber", value = "EXPECTED_PARALLELISM_NUMBER", dataType = "Int", example = "8")
+    })
+    @PostMapping(value = "batch-start-process-instance")
+    @ResponseStatus(HttpStatus.OK)
+    @ApiException(START_PROCESS_INSTANCE_ERROR)
+    @AccessLogAnnotation(ignoreRequestArgs = "loginUser")
+    public Result batchStartProcessInstance(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
+                                       @ApiParam(name = "projectCode", value = "PROJECT_CODE", required = true) @PathVariable long projectCode,
+                                       @RequestParam(value = "processDefinitionCodes") String processDefinitionCodes,
+                                       @RequestParam(value = "scheduleTime", required = false) String scheduleTime,
+                                       @RequestParam(value = "failureStrategy", required = true) FailureStrategy failureStrategy,
+                                       @RequestParam(value = "startNodeList", required = false) String startNodeList,
+                                       @RequestParam(value = "taskDependType", required = false) TaskDependType taskDependType,
+                                       @RequestParam(value = "execType", required = false) CommandType execType,
+                                       @RequestParam(value = "warningType", required = true) WarningType warningType,
+                                       @RequestParam(value = "warningGroupId", required = false, defaultValue = "0") int warningGroupId,
+                                       @RequestParam(value = "runMode", required = false) RunMode runMode,
+                                       @RequestParam(value = "processInstancePriority", required = false) Priority processInstancePriority,
+                                       @RequestParam(value = "workerGroup", required = false, defaultValue = "default") String workerGroup,
+                                       @RequestParam(value = "environmentCode", required = false, defaultValue = "-1") Long environmentCode,
+                                       @RequestParam(value = "timeout", required = false) Integer timeout,
+                                       @RequestParam(value = "startParams", required = false) String startParams,
+                                       @RequestParam(value = "expectedParallelismNumber", required = false) Integer expectedParallelismNumber,
+                                       @RequestParam(value = "dryRun", defaultValue = "0", required = false) int dryRun) {
+
+        if (timeout == null) {
+            timeout = Constants.MAX_TASK_TIMEOUT;
+        }
+
+        Map<String, String> startParamMap = startParams == null ? null : JSONUtils.toMap(startParams);
+
+        // Parse (and trim) every code before starting anything, so a malformed code rejects the whole
+        // request instead of failing after some process instances have already been started.
+        List<Long> processDefinitionCodeList = Arrays.stream(processDefinitionCodes.split(Constants.COMMA))
+                .map(String::trim).map(Long::valueOf).distinct().collect(Collectors.toList());
+
+        Map<String, Object> result = new HashMap<>();
+        List<String> startFailedProcessDefinitionCodeList = new ArrayList<>();
+        for (long processDefinitionCode : processDefinitionCodeList) {
+            result = execService.execProcessInstance(loginUser, projectCode, processDefinitionCode, scheduleTime, execType, failureStrategy,
+                    startNodeList, taskDependType, warningType, warningGroupId, runMode, processInstancePriority, workerGroup, environmentCode, timeout, startParamMap, expectedParallelismNumber, dryRun);
+            if (!Status.SUCCESS.equals(result.get(Constants.STATUS))) {
+                startFailedProcessDefinitionCodeList.add(String.valueOf(processDefinitionCode));
+            }
+        }
+
+        if (!startFailedProcessDefinitionCodeList.isEmpty()) {
+            putMsg(result, Status.BATCH_START_PROCESS_INSTANCE_ERROR, String.join(Constants.COMMA, startFailedProcessDefinitionCodeList));
+        }
+
+        return returnDataList(result);
+    }
+
+    /**
      * do action to process instance:pause, stop, repeat, recover from pause, recover from stop
      *
      * @param loginUser login user
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
index ec4931b..b5202c5 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
@@ -251,6 +251,7 @@ public enum Status {
     COUNT_PROCESS_INSTANCE_STATE_ERROR(50012, "count process instance state error", "查询各状态流程实例数错误"),
     COUNT_PROCESS_DEFINITION_USER_ERROR(50013, "count process definition user error", "查询各用户流程定义数错误"),
     START_PROCESS_INSTANCE_ERROR(50014, "start process instance error", "运行工作流实例错误"),
+    BATCH_START_PROCESS_INSTANCE_ERROR(50014, "batch start process instance error: {0}", "批量运行工作流实例错误: {0}"),
     EXECUTE_PROCESS_INSTANCE_ERROR(50015, "execute process instance error", "操作工作流实例错误"),
     CHECK_PROCESS_DEFINITION_ERROR(50016, "check process definition error", "工作流定义错误"),
     QUERY_RECIPIENTS_AND_COPYERS_BY_PROCESS_DEFINITION_ERROR(50017, "query recipients and copyers by process definition error", "查询收件人和抄送人错误"),
diff --git a/dolphinscheduler-api/src/main/resources/i18n/messages.properties b/dolphinscheduler-api/src/main/resources/i18n/messages.properties
index 7a4282a..928e608 100644
--- a/dolphinscheduler-api/src/main/resources/i18n/messages.properties
+++ b/dolphinscheduler-api/src/main/resources/i18n/messages.properties
@@ -18,13 +18,14 @@
 QUERY_SCHEDULE_LIST_NOTES=query schedule list
 EXECUTE_PROCESS_TAG=execute process related operation
 PROCESS_INSTANCE_EXECUTOR_TAG=process instance executor related operation
-RUN_PROCESS_INSTANCE_NOTES=run process instance 
+RUN_PROCESS_INSTANCE_NOTES=run process instance
+BATCH_RUN_PROCESS_INSTANCE_NOTES=batch run process instance
 START_NODE_LIST=start node list(node name)
 TASK_DEPEND_TYPE=task depend type
 COMMAND_TYPE=command type
 RUN_MODE=run mode
 TIMEOUT=timeout
-EXECUTE_ACTION_TO_PROCESS_INSTANCE_NOTES=execute action to process instance 
+EXECUTE_ACTION_TO_PROCESS_INSTANCE_NOTES=execute action to process instance
 EXECUTE_TYPE=execute type
 START_CHECK_PROCESS_DEFINITION_NOTES=start check process definition 
 GET_RECEIVER_CC_NOTES=query receiver cc 
diff --git a/dolphinscheduler-api/src/main/resources/i18n/messages_en_US.properties b/dolphinscheduler-api/src/main/resources/i18n/messages_en_US.properties
index 56800f2..a6ef33a 100644
--- a/dolphinscheduler-api/src/main/resources/i18n/messages_en_US.properties
+++ b/dolphinscheduler-api/src/main/resources/i18n/messages_en_US.properties
@@ -18,6 +18,8 @@ QUERY_SCHEDULE_LIST_NOTES=query schedule list
 EXECUTE_PROCESS_TAG=execute process related operation
 PROCESS_INSTANCE_EXECUTOR_TAG=process instance executor related operation
 RUN_PROCESS_INSTANCE_NOTES=run process instance
+BATCH_RUN_PROCESS_INSTANCE_NOTES=batch run process instance(If any processDefinitionCode cannot be found, the failure\
+  \ information is returned and the status is set to failed. The successful task will run normally and will not stop)
 START_NODE_LIST=start node list(node name)
 TASK_DEPEND_TYPE=task depend type
 COMMAND_TYPE=command type
diff --git a/dolphinscheduler-api/src/main/resources/i18n/messages_zh_CN.properties b/dolphinscheduler-api/src/main/resources/i18n/messages_zh_CN.properties
index 6343d89..e5e7476 100644
--- a/dolphinscheduler-api/src/main/resources/i18n/messages_zh_CN.properties
+++ b/dolphinscheduler-api/src/main/resources/i18n/messages_zh_CN.properties
@@ -19,6 +19,7 @@ PROCESS_INSTANCE_EXECUTOR_TAG=流程实例执行相关操作
 UI_PLUGINS_TAG=UI插件相关操作
 WORK_FLOW_LINEAGE_TAG=工作流血缘相关操作
 RUN_PROCESS_INSTANCE_NOTES=运行流程实例
+BATCH_RUN_PROCESS_INSTANCE_NOTES=批量运行流程实例(其中有任意一个processDefinitionCode找不到,则返回失败信息并且状态置为失败,成功的任务会正常运行,不会停止)
 START_NODE_LIST=开始节点列表(节点name)
 TASK_DEPEND_TYPE=任务依赖类型
 COMMAND_TYPE=指令类型