You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by zh...@apache.org on 2022/12/14 06:47:42 UTC

[dolphinscheduler] branch dev updated: [Feature][API] New restful API for taskInstance (#13070)

This is an automated email from the ASF dual-hosted git repository.

zhongjiajie pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 631e3a81d1  [Feature][API] New restful API for taskInstance (#13070)
631e3a81d1 is described below

commit 631e3a81d1667f36158fce15c6634d6230e0726c
Author: insist777 <84...@users.noreply.github.com>
AuthorDate: Wed Dec 14 14:47:36 2022 +0800

     [Feature][API] New restful API for taskInstance (#13070)
---
 .../api/controller/TaskInstanceV2Controller.java   | 71 ++++++++++++++++++++++
 .../apache/dolphinscheduler/api/enums/Status.java  |  5 +-
 .../api/service/TaskInstanceService.java           | 11 ++++
 .../api/service/impl/TaskInstanceServiceImpl.java  | 13 ++++
 .../controller/TaskInstanceV2ControllerTest.java   | 38 ++++++++++++
 .../dao/mapper/TaskInstanceMapper.xml              |  2 +-
 6 files changed, 136 insertions(+), 4 deletions(-)

diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskInstanceV2Controller.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskInstanceV2Controller.java
index 8d416da1bb..521ba5bc10 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskInstanceV2Controller.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskInstanceV2Controller.java
@@ -18,7 +18,10 @@
 package org.apache.dolphinscheduler.api.controller;
 
 import static org.apache.dolphinscheduler.api.enums.Status.FORCE_TASK_SUCCESS_ERROR;
+import static org.apache.dolphinscheduler.api.enums.Status.QUERY_TASK_INSTANCE_ERROR;
 import static org.apache.dolphinscheduler.api.enums.Status.QUERY_TASK_LIST_PAGING_ERROR;
+import static org.apache.dolphinscheduler.api.enums.Status.TASK_SAVEPOINT_ERROR;
+import static org.apache.dolphinscheduler.api.enums.Status.TASK_STOP_ERROR;
 
 import org.apache.dolphinscheduler.api.aspect.AccessLogAnnotation;
 import org.apache.dolphinscheduler.api.dto.taskInstance.TaskInstanceListPagingResponse;
@@ -28,6 +31,7 @@ import org.apache.dolphinscheduler.api.exceptions.ApiException;
 import org.apache.dolphinscheduler.api.service.TaskInstanceService;
 import org.apache.dolphinscheduler.api.utils.Result;
 import org.apache.dolphinscheduler.common.constants.Constants;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.dao.entity.User;
 import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
 import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils;
@@ -105,6 +109,50 @@ public class TaskInstanceV2Controller extends BaseController {
         return new TaskInstanceListPagingResponse(result);
     }
 
+    /**
+     * task savepoint, for stream task
+     *
+     * @param loginUser login user
+     * @param projectCode project code
+     * @param id task instance id
+     * @return the result code and msg
+     */
+    @Operation(summary = "savepoint", description = "TASK_SAVEPOINT")
+    @Parameters({
+            @Parameter(name = "id", description = "TASK_INSTANCE_ID", required = true, schema = @Schema(implementation = int.class, example = "12"))
+    })
+    @PostMapping(value = "/{id}/savepoint")
+    @ResponseStatus(HttpStatus.OK)
+    @ApiException(TASK_SAVEPOINT_ERROR)
+    @AccessLogAnnotation(ignoreRequestArgs = "loginUser")
+    public Result<Object> taskSavePoint(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
+                                        @Parameter(name = "projectCode", description = "PROJECT_CODE", required = true) @PathVariable long projectCode,
+                                        @PathVariable(value = "id") Integer id) {
+        return taskInstanceService.taskSavePoint(loginUser, projectCode, id);
+    }
+
+    /**
+     * task stop, for stream task
+     *
+     * @param loginUser login user
+     * @param projectCode project code
+     * @param id task instance id
+     * @return the result code and msg
+     */
+    @Operation(summary = "stop", description = "TASK_INSTANCE_STOP")
+    @Parameters({
+            @Parameter(name = "id", description = "TASK_INSTANCE_ID", required = true, schema = @Schema(implementation = int.class, example = "12"))
+    })
+    @PostMapping(value = "/{id}/stop")
+    @ResponseStatus(HttpStatus.OK)
+    @ApiException(TASK_STOP_ERROR)
+    @AccessLogAnnotation(ignoreRequestArgs = "loginUser")
+    public Result<Object> stopTask(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
+                                   @Parameter(name = "projectCode", description = "PROJECT_CODE", required = true) @PathVariable long projectCode,
+                                   @PathVariable(value = "id") Integer id) {
+        return taskInstanceService.stopTask(loginUser, projectCode, id);
+    }
+
     /**
      * change one task instance's state from FAILURE to FORCED_SUCCESS
      *
@@ -127,4 +175,27 @@ public class TaskInstanceV2Controller extends BaseController {
         Result result = taskInstanceService.forceTaskSuccess(loginUser, projectCode, id);
         return new TaskInstanceSuccessResponse(result);
     }
+
+    /**
+     * query taskInstance by taskInstanceCode
+     *
+     * @param loginUser   login user
+     * @param projectCode project code
+     * @param taskInstanceId  taskInstance Id
+     * @return the result code and msg
+     */
+    @Operation(summary = "queryOneTaskInstance", description = "QUERY_ONE_TASK_INSTANCE")
+    @Parameters({
+            @Parameter(name = "taskInstanceId", description = "TASK_INSTANCE_ID", required = true, schema = @Schema(implementation = Long.class), example = "1234567890")
+    })
+    @PostMapping(value = "/{taskInstanceId}", consumes = {"application/json"})
+    @ResponseStatus(HttpStatus.OK)
+    @ApiException(QUERY_TASK_INSTANCE_ERROR)
+    @AccessLogAnnotation(ignoreRequestArgs = "loginUser")
+    public TaskInstance queryTaskInstanceByCode(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
+                                                @Parameter(name = "projectCode", description = "PROJECT_CODE", required = true) @PathVariable long projectCode,
+                                                @PathVariable(value = "taskInstanceId") Long taskInstanceId) {
+        TaskInstance taskInstance = taskInstanceService.queryTaskInstanceById(loginUser, projectCode, taskInstanceId);
+        return taskInstance;
+    }
 }
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
index d82732d620..d0f82b3e99 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
@@ -260,10 +260,9 @@ public enum Status {
     SCHEDULE_NOT_EXISTS(10203, "schedule {0} does not exist", "调度 id {0} 不存在"),
     SCHEDULE_ALREADY_EXISTS(10204, "workflow {0} schedule {1} already exist, please update or delete it",
             "工作流 {0} 的定时 {1} 已经存在,请更新或删除"),
-
-    EXECUTE_NOT_DEFINE_TASK(10204, "please save and try again",
+    QUERY_TASK_INSTANCE_ERROR(10205, "query task instance error", "查询任务实例错误"),
+    EXECUTE_NOT_DEFINE_TASK(10206, "please save and try again",
             "请先保存后再执行"),
-
     UDF_FUNCTION_NOT_EXIST(20001, "UDF function not found", "UDF函数不存在"),
     UDF_FUNCTION_EXISTS(20002, "UDF function already exists", "UDF函数已存在"),
     RESOURCE_NOT_EXIST(20004, "resource not exist", "资源不存在"),
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskInstanceService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskInstanceService.java
index d5d05faa14..9fa259eb2b 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskInstanceService.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskInstanceService.java
@@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.api.service;
 
 import org.apache.dolphinscheduler.api.utils.Result;
 import org.apache.dolphinscheduler.common.enums.TaskExecuteType;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.dao.entity.User;
 import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
 
@@ -89,4 +90,14 @@ public interface TaskInstanceService {
      * @return
      */
     Result stopTask(User loginUser, long projectCode, Integer taskInstanceId);
+
+    /**
+     * query taskInstance by taskInstanceCode
+     *
+     * @param loginUser   login user
+     * @param projectCode project code
+     * @param taskInstanceId taskInstance id
+     * @return the result code and msg
+     */
+    TaskInstance queryTaskInstanceById(User loginUser, long projectCode, Long taskInstanceId);
 }
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java
index d830db5bf2..a212cb8299 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java
@@ -306,4 +306,17 @@ public class TaskInstanceServiceImpl extends BaseServiceImpl implements TaskInst
 
         return result;
     }
+
+    @Override
+    public TaskInstance queryTaskInstanceById(User loginUser, long projectCode, Long taskInstanceId) {
+        Project project = projectMapper.queryByCode(projectCode);
+        // check user access for project
+        projectService.checkProjectAndAuthThrowException(loginUser, project, FORCED_SUCCESS);
+        TaskInstance taskInstance = taskInstanceMapper.selectById(taskInstanceId);
+        if (taskInstance == null) {
+            logger.error("Task instance can not be found, projectCode:{}, taskInstanceId:{}.", projectCode,
+                    taskInstanceId);
+        }
+        return taskInstance;
+    }
 }
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/TaskInstanceV2ControllerTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/TaskInstanceV2ControllerTest.java
index 7624255087..bf1b5c5ac6 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/TaskInstanceV2ControllerTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/TaskInstanceV2ControllerTest.java
@@ -93,4 +93,42 @@ public class TaskInstanceV2ControllerTest extends AbstractControllerTest {
 
     }
 
+    @Test
+    public void testTaskSavePoint() {
+
+        Result mockResult = new Result();
+        putMsg(mockResult, Status.SUCCESS);
+
+        when(taskInstanceService.taskSavePoint(any(), Mockito.anyLong(), Mockito.anyInt())).thenReturn(mockResult);
+
+        Result taskResult = taskInstanceV2Controller.taskSavePoint(null, 1L, 1);
+        Assertions.assertEquals(Integer.valueOf(Status.SUCCESS.getCode()), taskResult.getCode());
+
+    }
+
+    @Test
+    public void testStopTask() {
+
+        Result mockResult = new Result();
+        putMsg(mockResult, Status.SUCCESS);
+
+        when(taskInstanceService.stopTask(any(), Mockito.anyLong(), Mockito.anyInt())).thenReturn(mockResult);
+
+        Result taskResult = taskInstanceV2Controller.stopTask(null, 1L, 1);
+        Assertions.assertEquals(Integer.valueOf(Status.SUCCESS.getCode()), taskResult.getCode());
+
+    }
+
+    @Test
+    public void testQueryTaskInstanceById() {
+        TaskInstance taskInstance = new TaskInstance();
+
+        when(taskInstanceService.queryTaskInstanceById(any(), Mockito.anyLong(), Mockito.anyLong()))
+                .thenReturn(taskInstance);
+
+        TaskInstance taskResult = taskInstanceV2Controller.queryTaskInstanceByCode(null, 1L, 1L);
+        Assertions.assertNotNull(taskResult);
+
+    }
+
 }
diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml
index d3726ebf95..fc454a6841 100644
--- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml
+++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml
@@ -336,7 +336,7 @@
         where instance.process_instance_id = #{processInstanceId}
         and que.status = #{status}
     </select>
-
+    
     <delete id="deleteByWorkflowInstanceId">
         delete
         from t_ds_task_instance