You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by zh...@apache.org on 2022/12/14 06:47:42 UTC
[dolphinscheduler] branch dev updated: [Feature][API] New restful API for taskInstance (#13070)
This is an automated email from the ASF dual-hosted git repository.
zhongjiajie pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 631e3a81d1 [Feature][API] New restful API for taskInstance (#13070)
631e3a81d1 is described below
commit 631e3a81d1667f36158fce15c6634d6230e0726c
Author: insist777 <84...@users.noreply.github.com>
AuthorDate: Wed Dec 14 14:47:36 2022 +0800
[Feature][API] New restful API for taskInstance (#13070)
---
.../api/controller/TaskInstanceV2Controller.java | 71 ++++++++++++++++++++++
.../apache/dolphinscheduler/api/enums/Status.java | 5 +-
.../api/service/TaskInstanceService.java | 11 ++++
.../api/service/impl/TaskInstanceServiceImpl.java | 13 ++++
.../controller/TaskInstanceV2ControllerTest.java | 38 ++++++++++++
.../dao/mapper/TaskInstanceMapper.xml | 2 +-
6 files changed, 136 insertions(+), 4 deletions(-)
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskInstanceV2Controller.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskInstanceV2Controller.java
index 8d416da1bb..521ba5bc10 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskInstanceV2Controller.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskInstanceV2Controller.java
@@ -18,7 +18,10 @@
package org.apache.dolphinscheduler.api.controller;
import static org.apache.dolphinscheduler.api.enums.Status.FORCE_TASK_SUCCESS_ERROR;
+import static org.apache.dolphinscheduler.api.enums.Status.QUERY_TASK_INSTANCE_ERROR;
import static org.apache.dolphinscheduler.api.enums.Status.QUERY_TASK_LIST_PAGING_ERROR;
+import static org.apache.dolphinscheduler.api.enums.Status.TASK_SAVEPOINT_ERROR;
+import static org.apache.dolphinscheduler.api.enums.Status.TASK_STOP_ERROR;
import org.apache.dolphinscheduler.api.aspect.AccessLogAnnotation;
import org.apache.dolphinscheduler.api.dto.taskInstance.TaskInstanceListPagingResponse;
@@ -28,6 +31,7 @@ import org.apache.dolphinscheduler.api.exceptions.ApiException;
import org.apache.dolphinscheduler.api.service.TaskInstanceService;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.constants.Constants;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils;
@@ -105,6 +109,50 @@ public class TaskInstanceV2Controller extends BaseController {
return new TaskInstanceListPagingResponse(result);
}
+ /**
+ * task savepoint, for stream task
+ *
+ * @param loginUser login user
+ * @param projectCode project code
+ * @param id task instance id
+ * @return the result code and msg
+ */
+ @Operation(summary = "savepoint", description = "TASK_SAVEPOINT")
+ @Parameters({
+ @Parameter(name = "id", description = "TASK_INSTANCE_ID", required = true, schema = @Schema(implementation = int.class, example = "12"))
+ })
+ @PostMapping(value = "/{id}/savepoint")
+ @ResponseStatus(HttpStatus.OK)
+ @ApiException(TASK_SAVEPOINT_ERROR)
+ @AccessLogAnnotation(ignoreRequestArgs = "loginUser")
+ public Result<Object> taskSavePoint(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
+ @Parameter(name = "projectCode", description = "PROJECT_CODE", required = true) @PathVariable long projectCode,
+ @PathVariable(value = "id") Integer id) {
+ return taskInstanceService.taskSavePoint(loginUser, projectCode, id);
+ }
+
+ /**
+ * task stop, for stream task
+ *
+ * @param loginUser login user
+ * @param projectCode project code
+ * @param id task instance id
+ * @return the result code and msg
+ */
+ @Operation(summary = "stop", description = "TASK_INSTANCE_STOP")
+ @Parameters({
+ @Parameter(name = "id", description = "TASK_INSTANCE_ID", required = true, schema = @Schema(implementation = int.class, example = "12"))
+ })
+ @PostMapping(value = "/{id}/stop")
+ @ResponseStatus(HttpStatus.OK)
+ @ApiException(TASK_STOP_ERROR)
+ @AccessLogAnnotation(ignoreRequestArgs = "loginUser")
+ public Result<Object> stopTask(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
+ @Parameter(name = "projectCode", description = "PROJECT_CODE", required = true) @PathVariable long projectCode,
+ @PathVariable(value = "id") Integer id) {
+ return taskInstanceService.stopTask(loginUser, projectCode, id);
+ }
+
/**
* change one task instance's state from FAILURE to FORCED_SUCCESS
*
@@ -127,4 +175,27 @@ public class TaskInstanceV2Controller extends BaseController {
Result result = taskInstanceService.forceTaskSuccess(loginUser, projectCode, id);
return new TaskInstanceSuccessResponse(result);
}
+
+ /**
+ * query taskInstance by taskInstanceId
+ *
+ * @param loginUser login user
+ * @param projectCode project code
+ * @param taskInstanceId task instance id
+ * @return the matching task instance
+ */
+ @Operation(summary = "queryOneTaskInstance", description = "QUERY_ONE_TASK_INSTANCE")
+ @Parameters({
+ @Parameter(name = "taskInstanceId", description = "TASK_INSTANCE_ID", required = true, schema = @Schema(implementation = Long.class), example = "1234567890")
+ })
+ @PostMapping(value = "/{taskInstanceId}", consumes = {"application/json"})
+ @ResponseStatus(HttpStatus.OK)
+ @ApiException(QUERY_TASK_INSTANCE_ERROR)
+ @AccessLogAnnotation(ignoreRequestArgs = "loginUser")
+ public TaskInstance queryTaskInstanceByCode(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
+ @Parameter(name = "projectCode", description = "PROJECT_CODE", required = true) @PathVariable long projectCode,
+ @PathVariable(value = "taskInstanceId") Long taskInstanceId) {
+ TaskInstance taskInstance = taskInstanceService.queryTaskInstanceById(loginUser, projectCode, taskInstanceId);
+ return taskInstance;
+ }
}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
index d82732d620..d0f82b3e99 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
@@ -260,10 +260,9 @@ public enum Status {
SCHEDULE_NOT_EXISTS(10203, "schedule {0} does not exist", "调度 id {0} 不存在"),
SCHEDULE_ALREADY_EXISTS(10204, "workflow {0} schedule {1} already exist, please update or delete it",
"工作流 {0} 的定时 {1} 已经存在,请更新或删除"),
-
- EXECUTE_NOT_DEFINE_TASK(10204, "please save and try again",
+ QUERY_TASK_INSTANCE_ERROR(10205, "query task instance error", "查询任务实例错误"),
+ EXECUTE_NOT_DEFINE_TASK(10206, "please save and try again",
"请先保存后再执行"),
-
UDF_FUNCTION_NOT_EXIST(20001, "UDF function not found", "UDF函数不存在"),
UDF_FUNCTION_EXISTS(20002, "UDF function already exists", "UDF函数已存在"),
RESOURCE_NOT_EXIST(20004, "resource not exist", "资源不存在"),
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskInstanceService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskInstanceService.java
index d5d05faa14..9fa259eb2b 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskInstanceService.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskInstanceService.java
@@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.api.service;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.enums.TaskExecuteType;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
@@ -89,4 +90,14 @@ public interface TaskInstanceService {
* @return
*/
Result stopTask(User loginUser, long projectCode, Integer taskInstanceId);
+
+ /**
+ * query taskInstance by taskInstanceId
+ *
+ * @param loginUser login user
+ * @param projectCode project code
+ * @param taskInstanceId taskInstance id
+ * @return the matching task instance, or null when none is found
+ */
+ TaskInstance queryTaskInstanceById(User loginUser, long projectCode, Long taskInstanceId);
}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java
index d830db5bf2..a212cb8299 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java
@@ -306,4 +306,17 @@ public class TaskInstanceServiceImpl extends BaseServiceImpl implements TaskInst
return result;
}
+
+ @Override
+ public TaskInstance queryTaskInstanceById(User loginUser, long projectCode, Long taskInstanceId) {
+ Project project = projectMapper.queryByCode(projectCode);
+ // check user access for project
+ projectService.checkProjectAndAuthThrowException(loginUser, project, FORCED_SUCCESS);
+ TaskInstance taskInstance = taskInstanceMapper.selectById(taskInstanceId);
+ if (taskInstance == null) {
+ logger.error("Task instance can not be found, projectCode:{}, taskInstanceId:{}.", projectCode,
+ taskInstanceId);
+ }
+ return taskInstance;
+ }
}
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/TaskInstanceV2ControllerTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/TaskInstanceV2ControllerTest.java
index 7624255087..bf1b5c5ac6 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/TaskInstanceV2ControllerTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/TaskInstanceV2ControllerTest.java
@@ -93,4 +93,42 @@ public class TaskInstanceV2ControllerTest extends AbstractControllerTest {
}
+ @Test
+ public void testTaskSavePoint() {
+
+ Result mockResult = new Result();
+ putMsg(mockResult, Status.SUCCESS);
+
+ when(taskInstanceService.taskSavePoint(any(), Mockito.anyLong(), Mockito.anyInt())).thenReturn(mockResult);
+
+ Result taskResult = taskInstanceV2Controller.taskSavePoint(null, 1L, 1);
+ Assertions.assertEquals(Integer.valueOf(Status.SUCCESS.getCode()), taskResult.getCode());
+
+ }
+
+ @Test
+ public void testStopTask() {
+
+ Result mockResult = new Result();
+ putMsg(mockResult, Status.SUCCESS);
+
+ when(taskInstanceService.stopTask(any(), Mockito.anyLong(), Mockito.anyInt())).thenReturn(mockResult);
+
+ Result taskResult = taskInstanceV2Controller.stopTask(null, 1L, 1);
+ Assertions.assertEquals(Integer.valueOf(Status.SUCCESS.getCode()), taskResult.getCode());
+
+ }
+
+ @Test
+ public void testQueryTaskInstanceById() {
+ TaskInstance taskInstance = new TaskInstance();
+
+ when(taskInstanceService.queryTaskInstanceById(any(), Mockito.anyLong(), Mockito.anyLong()))
+ .thenReturn(taskInstance);
+
+ TaskInstance taskResult = taskInstanceV2Controller.queryTaskInstanceByCode(null, 1L, 1L);
+ Assertions.assertNotNull(taskResult);
+
+ }
+
}
diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml
index d3726ebf95..fc454a6841 100644
--- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml
+++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml
@@ -336,7 +336,7 @@
where instance.process_instance_id = #{processInstanceId}
and que.status = #{status}
</select>
-
+
<delete id="deleteByWorkflowInstanceId">
delete
from t_ds_task_instance