You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ca...@apache.org on 2021/12/24 08:38:03 UTC
[dolphinscheduler] branch dev updated: [Feature-#6422] [api-server] task group queue (#7491)
This is an automated email from the ASF dual-hosted git repository.
caishunfeng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 9e763ad [Feature-#6422] [api-server] task group queue (#7491)
9e763ad is described below
commit 9e763ad0d1e5576e6e5735fb2afd8638011e54dc
Author: wangxj3 <85...@qq.com>
AuthorDate: Fri Dec 24 16:37:59 2021 +0800
[Feature-#6422] [api-server] task group queue (#7491)
* add task group
* modify task group
* pull dev
* add license header
* modify code style
* fix code style
* fix sql error
* fix error
* fix test
* fix test
* fix test
* fix test
* fix code style
* fix ut
* code style
* fix unit test
* test ut
* ut
* add unittest
* test ut
* modify back ut
* optimize code
* fix conflict
* fix ut
* add task group api
* reset file
* fix ut
* fix lost column
* fix ut
* fix ut
* fix ut
* fix ut
* delete duplicate code
* fix code style, name
* fix ut
* fix mapper
Co-authored-by: wangxj <wangxj31>
---
.../api/controller/TaskGroupController.java | 103 ++++++++++++------
.../api/service/TaskGroupQueueService.java | 8 +-
.../api/service/TaskGroupService.java | 12 ++-
.../service/impl/TaskGroupQueueServiceImpl.java | 43 ++++++--
.../api/service/impl/TaskGroupServiceImpl.java | 90 ++++++++++------
.../main/resources/i18n/messages_en_US.properties | 118 +++++++++++----------
.../main/resources/i18n/messages_zh_CN.properties | 2 +
.../api/controller/TaskGroupControllerTest.java | 23 +---
.../api/service/TaskGroupServiceTest.java | 11 +-
.../dolphinscheduler/common/model/TaskNode.java | 25 +++++
.../dao/entity/TaskDefinition.java | 12 +++
.../dolphinscheduler/dao/entity/TaskGroup.java | 15 ++-
.../dao/entity/TaskGroupQueue.java | 69 +++++++++---
.../dolphinscheduler/dao/entity/TaskInstance.java | 14 +++
.../dao/mapper/TaskGroupMapper.java | 4 +-
.../dao/mapper/TaskGroupQueueMapper.java | 9 +-
.../dao/mapper/TaskGroupMapper.xml | 13 ++-
.../dao/mapper/TaskGroupQueueMapper.xml | 36 ++++++-
.../src/main/resources/sql/dolphinscheduler_h2.sql | 5 +-
.../main/resources/sql/dolphinscheduler_mysql.sql | 4 +-
.../resources/sql/dolphinscheduler_postgresql.sql | 3 +
.../dao/mapper/TaskGroupMapperTest.java | 4 +-
.../dao/mapper/TaskGroupQueueMapperTest.java | 1 +
.../ClickHouseDataSourceProcessorTest.java | 101 ------------------
.../master/runner/WorkflowExecuteThread.java | 8 +-
.../service/process/ProcessService.java | 2 +
26 files changed, 434 insertions(+), 301 deletions(-)
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskGroupController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskGroupController.java
index c2f22c5..ba0b9af 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskGroupController.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskGroupController.java
@@ -17,13 +17,10 @@
package org.apache.dolphinscheduler.api.controller;
-import static org.apache.dolphinscheduler.api.enums.Status.CLOSE_TASK_GROUP_ERROR;
-import static org.apache.dolphinscheduler.api.enums.Status.CREATE_TASK_GROUP_ERROR;
-import static org.apache.dolphinscheduler.api.enums.Status.QUERY_TASK_GROUP_LIST_ERROR;
-import static org.apache.dolphinscheduler.api.enums.Status.QUERY_TASK_GROUP_QUEUE_LIST_ERROR;
-import static org.apache.dolphinscheduler.api.enums.Status.START_TASK_GROUP_ERROR;
-import static org.apache.dolphinscheduler.api.enums.Status.UPDATE_TASK_GROUP_ERROR;
-
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiImplicitParam;
+import io.swagger.annotations.ApiImplicitParams;
+import io.swagger.annotations.ApiOperation;
import org.apache.dolphinscheduler.api.aspect.AccessLogAnnotation;
import org.apache.dolphinscheduler.api.exceptions.ApiException;
import org.apache.dolphinscheduler.api.service.TaskGroupQueueService;
@@ -43,13 +40,17 @@ import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseStatus;
import org.springframework.web.bind.annotation.RestController;
-
-import io.swagger.annotations.Api;
-import io.swagger.annotations.ApiImplicitParam;
-import io.swagger.annotations.ApiImplicitParams;
-import io.swagger.annotations.ApiOperation;
import springfox.documentation.annotations.ApiIgnore;
+import java.util.Map;
+
+import static org.apache.dolphinscheduler.api.enums.Status.CLOSE_TASK_GROUP_ERROR;
+import static org.apache.dolphinscheduler.api.enums.Status.CREATE_TASK_GROUP_ERROR;
+import static org.apache.dolphinscheduler.api.enums.Status.QUERY_TASK_GROUP_LIST_ERROR;
+import static org.apache.dolphinscheduler.api.enums.Status.QUERY_TASK_GROUP_QUEUE_LIST_ERROR;
+import static org.apache.dolphinscheduler.api.enums.Status.START_TASK_GROUP_ERROR;
+import static org.apache.dolphinscheduler.api.enums.Status.UPDATE_TASK_GROUP_ERROR;
+
/**
* task group controller
@@ -72,9 +73,10 @@ public class TaskGroupController extends BaseController {
* @param name project id
* @return result and msg code
*/
- @ApiOperation(value = "createTaskGroup", notes = "CREATE_TAKS_GROUP_NOTE")
+ @ApiOperation(value = "create", notes = "CREATE_TAKS_GROUP_NOTE")
@ApiImplicitParams({
@ApiImplicitParam(name = "name", value = "NAME", dataType = "String"),
+ @ApiImplicitParam(name = "projectCode", value = "PROJECT_CODE", type = "Long"),
@ApiImplicitParam(name = "description", value = "DESCRIPTION", dataType = "String"),
@ApiImplicitParam(name = "groupSize", value = "GROUPSIZE", dataType = "Int"),
@@ -85,9 +87,10 @@ public class TaskGroupController extends BaseController {
@AccessLogAnnotation(ignoreRequestArgs = "loginUser")
public Result createTaskGroup(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@RequestParam("name") String name,
+ @RequestParam(value = "projectCode", required = false, defaultValue = "0") Long projectcode,
@RequestParam("description") String description,
@RequestParam("groupSize") Integer groupSize) {
- Map<String, Object> result = taskGroupService.createTaskGroup(loginUser, name, description, groupSize);
+ Map<String, Object> result = taskGroupService.createTaskGroup(loginUser, projectcode, name, description, groupSize);
return returnDataList(result);
}
@@ -101,7 +104,7 @@ public class TaskGroupController extends BaseController {
* @param name project id
* @return result and msg code
*/
- @ApiOperation(value = "updateTaskGroup", notes = "UPDATE_TAKS_GROUP_NOTE")
+ @ApiOperation(value = "update", notes = "UPDATE_TAKS_GROUP_NOTE")
@ApiImplicitParams({
@ApiImplicitParam(name = "id", value = "id", dataType = "Int"),
@ApiImplicitParam(name = "name", value = "NAME", dataType = "String"),
@@ -130,19 +133,22 @@ public class TaskGroupController extends BaseController {
* @param pageSize page size
* @return queue list
*/
- @ApiOperation(value = "queryAllTaskGroup", notes = "QUERY_ALL_TASK_GROUP_NOTES")
+ @ApiOperation(value = "list-paging", notes = "QUERY_ALL_TASK_GROUP_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "pageNo", value = "PAGE_NO", required = true, dataType = "Int", example = "1"),
+ @ApiImplicitParam(name = "name", value = "NAME", required = false, dataType = "String"),
@ApiImplicitParam(name = "pageSize", value = "PAGE_SIZE", required = true, dataType = "Int", example = "20")
})
- @GetMapping(value = "/query-list-all")
+ @GetMapping(value = "/list-paging")
@ResponseStatus(HttpStatus.OK)
@ApiException(QUERY_TASK_GROUP_LIST_ERROR)
@AccessLogAnnotation(ignoreRequestArgs = "loginUser")
public Result queryAllTaskGroup(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
+ @RequestParam(value = "name", required = false) String name,
+ @RequestParam(value = "status", required = false) Integer status,
@RequestParam("pageNo") Integer pageNo,
@RequestParam("pageSize") Integer pageSize) {
- Map<String, Object> result = taskGroupService.queryAllTaskGroup(loginUser, pageNo, pageSize);
+ Map<String, Object> result = taskGroupService.queryAllTaskGroup(loginUser, name, status, pageNo, pageSize);
return returnDataList(result);
}
@@ -176,27 +182,27 @@ public class TaskGroupController extends BaseController {
/**
* query task group list paging by project id
*
- * @param loginUser login user
- * @param pageNo page number
- * @param name project id
- * @param pageSize page size
+ * @param loginUser login user
+ * @param pageNo page number
+ * @param projectCode project id
+ * @param pageSize page size
* @return queue list
*/
@ApiOperation(value = "queryTaskGroupByName", notes = "QUERY_TASK_GROUP_LIST_BY_PROJECT_ID_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "pageNo", value = "PAGE_NO", required = true, dataType = "Int", example = "1"),
@ApiImplicitParam(name = "pageSize", value = "PAGE_SIZE", required = true, dataType = "Int", example = "20"),
- @ApiImplicitParam(name = "name", value = "PROJECT_ID", required = true, dataType = "String")
+ @ApiImplicitParam(name = "projectCode", value = "PROJECT_CODE", required = true, dataType = "String")
})
- @GetMapping(value = "/query-list-by-name")
+ @GetMapping(value = "/query-list-by-projectCode")
@ResponseStatus(HttpStatus.OK)
@ApiException(QUERY_TASK_GROUP_LIST_ERROR)
@AccessLogAnnotation(ignoreRequestArgs = "loginUser")
- public Result queryTaskGroupByName(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
+ public Result queryTaskGroupByCode(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@RequestParam("pageNo") Integer pageNo,
- @RequestParam(value = "name", required = false) String name,
+ @RequestParam(value = "projectCode", required = false) Long projectCode,
@RequestParam("pageSize") Integer pageSize) {
- Map<String, Object> result = taskGroupService.queryTaskGroupByName(loginUser, pageNo, pageSize, name);
+ Map<String, Object> result = taskGroupService.queryTaskGroupByProjectCode(loginUser, pageNo, pageSize, projectCode);
return returnDataList(result);
}
@@ -247,20 +253,43 @@ public class TaskGroupController extends BaseController {
* force start task without task group
*
* @param loginUser login user
- * @param taskId task id
+ * @param queueId task group queue id
+ * @return result
+ */
+ @ApiOperation(value = "forceStart", notes = "WAKE_TASK_COMPULSIVELY_NOTES")
+ @ApiImplicitParams({
+ @ApiImplicitParam(name = "queueId", value = "TASK_GROUP_QUEUEID", required = true, dataType = "Int")
+ })
+ @PostMapping(value = "/forceStart")
+ @ResponseStatus(HttpStatus.CREATED)
+ @ApiException(START_TASK_GROUP_ERROR)
+ @AccessLogAnnotation(ignoreRequestArgs = "loginUser")
+ public Result forceStart(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
+ @RequestParam(value = "queueId") Integer queueId) {
+ Map<String, Object> result = taskGroupService.forceStartTask(loginUser, queueId);
+ return returnDataList(result);
+ }
+
+ /**
+ * force start task without task group
+ *
+ * @param loginUser login user
+ * @param queueId task group queue id
* @return result
*/
- @ApiOperation(value = "wakeCompulsively", notes = "WAKE_TASK_COMPULSIVELY_NOTES")
+ @ApiOperation(value = "modifyPriority", notes = "WAKE_TASK_COMPULSIVELY_NOTES")
@ApiImplicitParams({
- @ApiImplicitParam(name = "taskId", value = "TASKID", required = true, dataType = "Int")
+ @ApiImplicitParam(name = "queueId", value = "TASK_GROUP_QUEUEID", required = true, dataType = "Int"),
+ @ApiImplicitParam(name = "priority", value = "TASK_GROUP_QUEUE_PRIORITY", required = true, dataType = "Int")
})
- @PostMapping(value = "/wake-task-compulsively")
+ @PostMapping(value = "/modifyPriority")
@ResponseStatus(HttpStatus.CREATED)
@ApiException(START_TASK_GROUP_ERROR)
@AccessLogAnnotation(ignoreRequestArgs = "loginUser")
- public Result wakeCompulsively(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
- @RequestParam(value = "taskId") Integer taskId) {
- Map<String, Object> result = taskGroupService.wakeTaskcompulsively(loginUser, taskId);
+ public Result modifyPriority(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
+ @RequestParam(value = "queueId") Integer queueId,
+ @RequestParam(value = "priority") Integer priority) {
+ Map<String, Object> result = taskGroupService.modifyPriority(loginUser, queueId,priority);
return returnDataList(result);
}
@@ -287,9 +316,13 @@ public class TaskGroupController extends BaseController {
@AccessLogAnnotation(ignoreRequestArgs = "loginUser")
public Result queryTasksByGroupId(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@RequestParam("groupId") Integer groupId,
+ @RequestParam(value = "taskInstanceName",required = false) String taskName,
+ @RequestParam(value = "processInstanceName",required = false) String processName,
+ @RequestParam(value = "status",required = false) Integer status,
@RequestParam("pageNo") Integer pageNo,
@RequestParam("pageSize") Integer pageSize) {
- Map<String, Object> result = taskGroupQueueService.queryTasksByGroupId(loginUser, groupId, pageNo, pageSize);
+ Map<String, Object> result = taskGroupQueueService.queryTasksByGroupId(loginUser, taskName,processName,status,
+ groupId, pageNo, pageSize);
return returnDataList(result);
}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskGroupQueueService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskGroupQueueService.java
index 08d3f57..651267e 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskGroupQueueService.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskGroupQueueService.java
@@ -35,8 +35,8 @@ public interface TaskGroupQueueService {
* @return tasks list
*/
- Map<String, Object> queryTasksByGroupId(User loginUser, int groupId, int pageNo,
- int pageSize);
+ Map<String, Object> queryTasksByGroupId(User loginUser, String taskName
+ , String processName, Integer status, int groupId, int pageNo,int pageSize);
/**
* query tasks in task group queue by project id
@@ -65,5 +65,7 @@ public interface TaskGroupQueueService {
*/
boolean deleteByTaskId(int taskId);
- void forceStartTask(int taskId,int forceStart);
+ void forceStartTask(int queueId,int forceStart);
+
+ void modifyPriority(Integer queueId, Integer priority);
}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskGroupService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskGroupService.java
index 1a73b04..465f2ef 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskGroupService.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskGroupService.java
@@ -35,7 +35,7 @@ public interface TaskGroupService {
* @param groupSize task group total size
* @return the result code and msg
*/
- Map<String, Object> createTaskGroup(User loginUser, String name,
+ Map<String, Object> createTaskGroup(User loginUser, long projectcode,String name,
String description, int groupSize);
/**
@@ -66,7 +66,7 @@ public interface TaskGroupService {
* @param pageSize page size
* @return the result code and msg
*/
- Map<String, Object> queryAllTaskGroup(User loginUser, int pageNo, int pageSize);
+ Map<String, Object> queryAllTaskGroup(User loginUser, String name,Integer status, int pageNo, int pageSize);
/**
* query all task group by status
@@ -88,7 +88,7 @@ public interface TaskGroupService {
* @param name name
* @return the result code and msg
*/
- Map<String, Object> queryTaskGroupByName(User loginUser, int pageNo, int pageSize, String name);
+ Map<String, Object> queryTaskGroupByProjectCode(User loginUser, int pageNo, int pageSize, Long projectCode);
/**
* query all task group by id
@@ -110,7 +110,7 @@ public interface TaskGroupService {
* @param status status
* @return the result code and msg
*/
- Map<String, Object> doQuery(User loginUser, int pageNo, int pageSize, int userId, String name, int status);
+ Map<String, Object> doQuery(User loginUser, int pageNo, int pageSize, int userId, String name, Integer status);
/**
* close a task group
@@ -136,5 +136,7 @@ public interface TaskGroupService {
* @param taskId task id
* @return result
*/
- Map<String, Object> wakeTaskcompulsively(User loginUser, int taskId);
+ Map<String, Object> forceStartTask(User loginUser, int taskId);
+
+ Map<String, Object> modifyPriority(User loginUser, Integer queueId, Integer priority);
}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskGroupQueueServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskGroupQueueServiceImpl.java
index 3dfe311..b07f18a 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskGroupQueueServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskGroupQueueServiceImpl.java
@@ -17,25 +17,26 @@
package org.apache.dolphinscheduler.api.service.impl;
+import com.baomidou.mybatisplus.core.metadata.IPage;
+import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import org.apache.dolphinscheduler.api.enums.Status;
+import org.apache.dolphinscheduler.api.service.ProjectService;
import org.apache.dolphinscheduler.api.service.TaskGroupQueueService;
import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.TaskGroupQueue;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.TaskGroupQueueMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
-
-import java.util.HashMap;
-import java.util.Map;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
-import com.baomidou.mybatisplus.core.metadata.IPage;
-import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
/**
* task group queue service
@@ -49,6 +50,9 @@ public class TaskGroupQueueServiceImpl extends BaseServiceImpl implements TaskGr
@Autowired
private TaskInstanceMapper taskInstanceMapper;
+ @Autowired
+ private ProjectService projectService;
+
private static final Logger logger = LoggerFactory.getLogger(TaskGroupQueueServiceImpl.class);
/**
@@ -61,8 +65,22 @@ public class TaskGroupQueueServiceImpl extends BaseServiceImpl implements TaskGr
* @return tasks list
*/
@Override
- public Map<String, Object> queryTasksByGroupId(User loginUser, int groupId, int pageNo, int pageSize) {
- return this.doQuery(loginUser, pageNo, pageSize, groupId);
+ public Map<String, Object> queryTasksByGroupId(User loginUser, String taskName
+ , String processName, Integer status, int groupId, int pageNo, int pageSize) {
+ Map<String, Object> result = new HashMap<>();
+ Page<TaskGroupQueue> page = new Page<>(pageNo, pageSize);
+ Map<String, Object> objectMap = this.projectService.queryAuthorizedProject(loginUser, loginUser.getId());
+ List<Project> projects = (List<Project>)objectMap.get(Constants.DATA_LIST);
+ IPage<TaskGroupQueue> taskGroupQueue = taskGroupQueueMapper.queryTaskGroupQueueByTaskGroupIdPaging(page, taskName
+ ,processName,status,groupId,projects);
+
+ PageInfo<TaskGroupQueue> pageInfo = new PageInfo<>(pageNo, pageSize);
+ pageInfo.setTotal((int) taskGroupQueue.getTotal());
+ pageInfo.setTotalList(taskGroupQueue.getRecords());
+
+ result.put(Constants.DATA_LIST, pageInfo);
+ putMsg(result, Status.SUCCESS);
+ return result;
}
/**
@@ -124,7 +142,12 @@ public class TaskGroupQueueServiceImpl extends BaseServiceImpl implements TaskGr
}
@Override
- public void forceStartTask(int taskId,int forceStart) {
- taskGroupQueueMapper.updateForceStart(taskId,forceStart);
+ public void forceStartTask(int queueId,int forceStart) {
+ taskGroupQueueMapper.updateForceStart(queueId,forceStart);
+ }
+
+ @Override
+ public void modifyPriority(Integer queueId, Integer priority) {
+ taskGroupQueueMapper.modifyPriority(queueId,priority);
}
}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskGroupServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskGroupServiceImpl.java
index 7802850..b2354c2 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskGroupServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskGroupServiceImpl.java
@@ -17,6 +17,8 @@
package org.apache.dolphinscheduler.api.service.impl;
+import com.baomidou.mybatisplus.core.metadata.IPage;
+import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.service.TaskGroupQueueService;
import org.apache.dolphinscheduler.api.service.TaskGroupService;
@@ -28,6 +30,10 @@ import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.TaskGroupMapper;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.spi.utils.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.Date;
@@ -35,14 +41,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Service;
-
-import com.baomidou.mybatisplus.core.metadata.IPage;
-import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
-
/**
* task Group Service
*/
@@ -70,7 +68,7 @@ public class TaskGroupServiceImpl extends BaseServiceImpl implements TaskGroupSe
* @return the result code and msg
*/
@Override
- public Map<String, Object> createTaskGroup(User loginUser, String name, String description, int groupSize) {
+ public Map<String, Object> createTaskGroup(User loginUser, long projectcode, String name, String description, int groupSize) {
Map<String, Object> result = new HashMap<>();
if (isNotAdmin(loginUser, result)) {
return result;
@@ -82,18 +80,20 @@ public class TaskGroupServiceImpl extends BaseServiceImpl implements TaskGroupSe
if (groupSize <= 0) {
putMsg(result, Status.TASK_GROUP_SIZE_ERROR);
return result;
-
}
TaskGroup taskGroup1 = taskGroupMapper.queryByName(loginUser.getId(), name);
if (taskGroup1 != null) {
putMsg(result, Status.TASK_GROUP_NAME_EXSIT);
return result;
}
- TaskGroup taskGroup = new TaskGroup(name, description,
- groupSize, loginUser.getId(),Flag.YES.getCode());
- int insert = taskGroupMapper.insert(taskGroup);
- logger.info("insert result:{}", insert);
- putMsg(result, Status.SUCCESS);
+ TaskGroup taskGroup = new TaskGroup(name, projectcode, description,
+ groupSize, loginUser.getId(), Flag.YES.getCode());
+ if (taskGroupMapper.insert(taskGroup) > 0) {
+ putMsg(result, Status.SUCCESS);
+ } else {
+ putMsg(result, Status.CREATE_TASK_GROUP_ERROR);
+ return result;
+ }
return result;
}
@@ -137,7 +137,7 @@ public class TaskGroupServiceImpl extends BaseServiceImpl implements TaskGroupSe
*/
@Override
public boolean isTheTaskGroupAvailable(int id) {
- return taskGroupMapper.selectCountByIdStatus(id,Flag.YES.getCode()) == 1;
+ return taskGroupMapper.selectCountByIdStatus(id, Flag.YES.getCode()) == 1;
}
/**
@@ -149,8 +149,8 @@ public class TaskGroupServiceImpl extends BaseServiceImpl implements TaskGroupSe
* @return the result code and msg
*/
@Override
- public Map<String, Object> queryAllTaskGroup(User loginUser, int pageNo, int pageSize) {
- return this.doQuery(loginUser, pageNo, pageSize, loginUser.getId(), null, 0);
+ public Map<String, Object> queryAllTaskGroup(User loginUser, String name, Integer status, int pageNo, int pageSize) {
+ return this.doQuery(loginUser, pageNo, pageSize, loginUser.getId(), name, status);
}
/**
@@ -177,8 +177,28 @@ public class TaskGroupServiceImpl extends BaseServiceImpl implements TaskGroupSe
* @return the result code and msg
*/
@Override
- public Map<String, Object> queryTaskGroupByName(User loginUser, int pageNo, int pageSize, String name) {
- return this.doQuery(loginUser, pageNo, pageSize, loginUser.getId(), name, 0);
+ public Map<String, Object> queryTaskGroupByProjectCode(User loginUser, int pageNo, int pageSize, Long projectCode) {
+ Map<String, Object> result = new HashMap<>();
+ if (isNotAdmin(loginUser, result)) {
+ return result;
+ }
+ Page<TaskGroup> page = new Page<>(pageNo, pageSize);
+ IPage<TaskGroup> taskGroupPaging = taskGroupMapper.queryTaskGroupPagingByProjectCode(page, projectCode);
+
+ return getStringObjectMap(pageNo, pageSize, result, taskGroupPaging);
+ }
+
+ private Map<String, Object> getStringObjectMap(int pageNo, int pageSize, Map<String, Object> result, IPage<TaskGroup> taskGroupPaging) {
+ PageInfo<TaskGroup> pageInfo = new PageInfo<>(pageNo, pageSize);
+ int total = taskGroupPaging == null ? 0 : (int) taskGroupPaging.getTotal();
+ List<TaskGroup> list = taskGroupPaging == null ? new ArrayList<TaskGroup>() : taskGroupPaging.getRecords();
+ pageInfo.setTotal(total);
+ pageInfo.setTotalList(list);
+
+ result.put(Constants.DATA_LIST, pageInfo);
+ logger.info("select result:{}", taskGroupPaging);
+ putMsg(result, Status.SUCCESS);
+ return result;
}
/**
@@ -212,7 +232,7 @@ public class TaskGroupServiceImpl extends BaseServiceImpl implements TaskGroupSe
*/
@Override
- public Map<String, Object> doQuery(User loginUser, int pageNo, int pageSize, int userId, String name, int status) {
+ public Map<String, Object> doQuery(User loginUser, int pageNo, int pageSize, int userId, String name, Integer status) {
Map<String, Object> result = new HashMap<>();
if (isNotAdmin(loginUser, result)) {
return result;
@@ -220,16 +240,7 @@ public class TaskGroupServiceImpl extends BaseServiceImpl implements TaskGroupSe
Page<TaskGroup> page = new Page<>(pageNo, pageSize);
IPage<TaskGroup> taskGroupPaging = taskGroupMapper.queryTaskGroupPaging(page, userId, name, status);
- PageInfo<TaskGroup> pageInfo = new PageInfo<>(pageNo, pageSize);
- int total = taskGroupPaging == null ? 0 : (int) taskGroupPaging.getTotal();
- List<TaskGroup> list = taskGroupPaging == null ? new ArrayList<TaskGroup>() : taskGroupPaging.getRecords();
- pageInfo.setTotal(total);
- pageInfo.setTotalList(list);
-
- result.put(Constants.DATA_LIST, pageInfo);
- logger.info("select result:{}", taskGroupPaging);
- putMsg(result, Status.SUCCESS);
- return result;
+ return getStringObjectMap(pageNo, pageSize, result, taskGroupPaging);
}
/**
@@ -282,16 +293,27 @@ public class TaskGroupServiceImpl extends BaseServiceImpl implements TaskGroupSe
* wake a task manually
*
* @param loginUser
- * @param taskId task id
+ * @param queueId task group queue id
* @return result
*/
@Override
- public Map<String, Object> wakeTaskcompulsively(User loginUser, int taskId) {
+ public Map<String, Object> forceStartTask(User loginUser, int queueId) {
+ Map<String, Object> result = new HashMap<>();
+ if (isNotAdmin(loginUser, result)) {
+ return result;
+ }
+ taskGroupQueueService.forceStartTask(queueId, Flag.YES.getCode());
+ putMsg(result, Status.SUCCESS);
+ return result;
+ }
+
+ @Override
+ public Map<String, Object> modifyPriority(User loginUser, Integer queueId, Integer priority) {
Map<String, Object> result = new HashMap<>();
if (isNotAdmin(loginUser, result)) {
return result;
}
- taskGroupQueueService.forceStartTask(taskId,Flag.YES.getCode());
+ taskGroupQueueService.modifyPriority(queueId, priority);
putMsg(result, Status.SUCCESS);
return result;
}
diff --git a/dolphinscheduler-api/src/main/resources/i18n/messages_en_US.properties b/dolphinscheduler-api/src/main/resources/i18n/messages_en_US.properties
index 19dc343..56800f2 100644
--- a/dolphinscheduler-api/src/main/resources/i18n/messages_en_US.properties
+++ b/dolphinscheduler-api/src/main/resources/i18n/messages_en_US.properties
@@ -17,26 +17,26 @@
QUERY_SCHEDULE_LIST_NOTES=query schedule list
EXECUTE_PROCESS_TAG=execute process related operation
PROCESS_INSTANCE_EXECUTOR_TAG=process instance executor related operation
-RUN_PROCESS_INSTANCE_NOTES=run process instance
+RUN_PROCESS_INSTANCE_NOTES=run process instance
START_NODE_LIST=start node list(node name)
TASK_DEPEND_TYPE=task depend type
COMMAND_TYPE=command type
RUN_MODE=run mode
TIMEOUT=timeout
-EXECUTE_ACTION_TO_PROCESS_INSTANCE_NOTES=execute action to process instance
+EXECUTE_ACTION_TO_PROCESS_INSTANCE_NOTES=execute action to process instance
EXECUTE_TYPE=execute type
-START_CHECK_PROCESS_DEFINITION_NOTES=start check process definition
-GET_RECEIVER_CC_NOTES=query receiver cc
+START_CHECK_PROCESS_DEFINITION_NOTES=start check process definition
+GET_RECEIVER_CC_NOTES=query receiver cc
DESC=description
GROUP_NAME=group name
GROUP_TYPE=group type
-QUERY_ALERT_GROUP_LIST_NOTES=query alert group list
-UPDATE_ALERT_GROUP_NOTES=update alert group
-DELETE_ALERT_GROUP_BY_ID_NOTES=delete alert group by id
-VERIFY_ALERT_GROUP_NAME_NOTES=verify alert group name, check alert group exist or not
-GRANT_ALERT_GROUP_NOTES=grant alert group
+QUERY_ALERT_GROUP_LIST_NOTES=query alert group list
+UPDATE_ALERT_GROUP_NOTES=update alert group
+DELETE_ALERT_GROUP_BY_ID_NOTES=delete alert group by id
+VERIFY_ALERT_GROUP_NAME_NOTES=verify alert group name, check alert group exist or not
+GRANT_ALERT_GROUP_NOTES=grant alert group
USER_IDS=user id list
-EXECUTOR_TAG=executor operation
+EXECUTOR_TAG=executor operation
EXECUTOR_NAME=executor name
WORKER_GROUP=work group
startParams=start parameters
@@ -48,7 +48,7 @@ UPDATE_ALERT_PLUGIN_INSTANCE_NOTES=update alert plugin instance operation
CREATE_ALERT_PLUGIN_INSTANCE_NOTES=create alert plugin instance operation
DELETE_ALERT_PLUGIN_INSTANCE_NOTES=delete alert plugin instance operation
QUERY_ALERT_PLUGIN_INSTANCE_LIST_PAGING_NOTES=query alert plugin instance paging
-QUERY_TOPN_LONGEST_RUNNING_PROCESS_INSTANCE_NOTES=query topN longest running process instance
+QUERY_TOPN_LONGEST_RUNNING_PROCESS_INSTANCE_NOTES=query topN longest running process instance
ALERT_PLUGIN_INSTANCE_NAME=alert plugin instance name
ALERT_PLUGIN_DEFINE_ID=alert plugin define id
ALERT_PLUGIN_ID=alert plugin id
@@ -59,27 +59,27 @@ VERIFY_ALERT_INSTANCE_NAME_NOTES=verify alert instance name
DATA_SOURCE_PARAM=datasource parameter
QUERY_ALL_ALERT_PLUGIN_INSTANCE_NOTES=query all alert plugin instances
GET_ALERT_PLUGIN_INSTANCE_NOTES=get alert plugin instance operation
-CREATE_ALERT_GROUP_NOTES=create alert group
+CREATE_ALERT_GROUP_NOTES=create alert group
WORKER_GROUP_TAG=worker group related operation
SAVE_WORKER_GROUP_NOTES=create worker group
WORKER_GROUP_NAME=worker group name
WORKER_IP_LIST=worker ip list, eg. 192.168.1.1,192.168.1.2
QUERY_WORKER_GROUP_PAGING_NOTES=query worker group paging
-QUERY_WORKER_GROUP_LIST_NOTES=query worker group list
-DELETE_WORKER_GROUP_BY_ID_NOTES=delete worker group by id
+QUERY_WORKER_GROUP_LIST_NOTES=query worker group list
+DELETE_WORKER_GROUP_BY_ID_NOTES=delete worker group by id
DATA_ANALYSIS_TAG=analysis related operation of task state
-COUNT_TASK_STATE_NOTES=count task state
+COUNT_TASK_STATE_NOTES=count task state
COUNT_PROCESS_INSTANCE_NOTES=count process instance state
-COUNT_PROCESS_DEFINITION_BY_USER_NOTES=count process definition by user
-COUNT_COMMAND_STATE_NOTES=count command state
+COUNT_PROCESS_DEFINITION_BY_USER_NOTES=count process definition by user
+COUNT_COMMAND_STATE_NOTES=count command state
COUNT_QUEUE_STATE_NOTES=count the running status of the task in the queue\
ACCESS_TOKEN_TAG=access token related operation
MONITOR_TAG=monitor related operation
MASTER_LIST_NOTES=master server list
WORKER_LIST_NOTES=worker server list
-QUERY_DATABASE_STATE_NOTES=query database state
-QUERY_ZOOKEEPER_STATE_NOTES=QUERY ZOOKEEPER STATE
+QUERY_DATABASE_STATE_NOTES=query database state
+QUERY_ZOOKEEPER_STATE_NOTES=QUERY ZOOKEEPER STATE
TASK_STATE=task instance state
SOURCE_TABLE=SOURCE TABLE
DEST_TABLE=dest table
@@ -94,18 +94,18 @@ DATA_SOURCE_HOST=DATA SOURCE HOST
DATA_SOURCE_PORT=data source port
DATABASE_NAME=database name
QUEUE_TAG=queue related operation
-QUERY_QUEUE_LIST_NOTES=query queue list
-QUERY_QUEUE_LIST_PAGING_NOTES=query queue list paging
+QUERY_QUEUE_LIST_NOTES=query queue list
+QUERY_QUEUE_LIST_PAGING_NOTES=query queue list paging
CREATE_QUEUE_NOTES=create queue
YARN_QUEUE_NAME=yarn(hadoop) queue name
QUEUE_ID=queue id
TENANT_DESC=tenant desc
-QUERY_TENANT_LIST_PAGING_NOTES=query tenant list paging
-QUERY_TENANT_LIST_NOTES=query tenant list
-UPDATE_TENANT_NOTES=update tenant
-DELETE_TENANT_NOTES=delete tenant
+QUERY_TENANT_LIST_PAGING_NOTES=query tenant list paging
+QUERY_TENANT_LIST_NOTES=query tenant list
+UPDATE_TENANT_NOTES=update tenant
+DELETE_TENANT_NOTES=delete tenant
RESOURCES_TAG=resource center related operation
-CREATE_RESOURCE_NOTES=create resource
+CREATE_RESOURCE_NOTES=create resource
RESOURCE_TYPE=resource file type
RESOURCE_NAME=resource name
RESOURCE_DESC=resource file desc
@@ -114,29 +114,29 @@ RESOURCE_ID=resource id
QUERY_RESOURCE_LIST_NOTES=query resource list
DELETE_RESOURCE_BY_ID_NOTES=delete resource by id
VIEW_RESOURCE_BY_ID_NOTES=view resource by id
-ONLINE_CREATE_RESOURCE_NOTES=online create resource
+ONLINE_CREATE_RESOURCE_NOTES=online create resource
SUFFIX=resource file suffix
CONTENT=resource file content
UPDATE_RESOURCE_NOTES=edit resource file online
DOWNLOAD_RESOURCE_NOTES=download resource file
-CREATE_UDF_FUNCTION_NOTES=create udf function
+CREATE_UDF_FUNCTION_NOTES=create udf function
UDF_TYPE=UDF type
FUNC_NAME=function name
CLASS_NAME=package and class name
ARG_TYPES=arguments
UDF_DESC=udf desc
-VIEW_UDF_FUNCTION_NOTES=view udf function
-UPDATE_UDF_FUNCTION_NOTES=update udf function
-QUERY_UDF_FUNCTION_LIST_PAGING_NOTES=query udf function list paging
-VERIFY_UDF_FUNCTION_NAME_NOTES=verify udf function name
-DELETE_UDF_FUNCTION_NOTES=delete udf function
-AUTHORIZED_FILE_NOTES=authorized file
-UNAUTHORIZED_FILE_NOTES=unauthorized file
-AUTHORIZED_UDF_FUNC_NOTES=authorized udf func
-UNAUTHORIZED_UDF_FUNC_NOTES=unauthorized udf func
-VERIFY_QUEUE_NOTES=verify queue
+VIEW_UDF_FUNCTION_NOTES=view udf function
+UPDATE_UDF_FUNCTION_NOTES=update udf function
+QUERY_UDF_FUNCTION_LIST_PAGING_NOTES=query udf function list paging
+VERIFY_UDF_FUNCTION_NAME_NOTES=verify udf function name
+DELETE_UDF_FUNCTION_NOTES=delete udf function
+AUTHORIZED_FILE_NOTES=authorized file
+UNAUTHORIZED_FILE_NOTES=unauthorized file
+AUTHORIZED_UDF_FUNC_NOTES=authorized udf func
+UNAUTHORIZED_UDF_FUNC_NOTES=unauthorized udf func
+VERIFY_QUEUE_NOTES=verify queue
TENANT_TAG=tenant related operation
-CREATE_TENANT_NOTES=create tenant
+CREATE_TENANT_NOTES=create tenant
TENANT_CODE=os tenant code
QUEUE_NAME=queue name
PASSWORD=password
@@ -146,19 +146,19 @@ DATA_SOURCE_KERBEROS_KRB5_CONF=the kerberos authentication parameter java.securi
DATA_SOURCE_KERBEROS_KEYTAB_USERNAME=the kerberos authentication parameter login.user.keytab.username
DATA_SOURCE_KERBEROS_KEYTAB_PATH=the kerberos authentication parameter login.user.keytab.path
PROJECT_TAG=project related operation
-CREATE_PROJECT_NOTES=create project
+CREATE_PROJECT_NOTES=create project
PROJECT_DESC=project description
-UPDATE_PROJECT_NOTES=update project
+UPDATE_PROJECT_NOTES=update project
PROJECT_ID=project id
QUERY_PROJECT_BY_ID_NOTES=query project info by project id
-QUERY_PROJECT_LIST_PAGING_NOTES=QUERY PROJECT LIST PAGING
+QUERY_PROJECT_LIST_PAGING_NOTES=QUERY PROJECT LIST PAGING
QUERY_ALL_PROJECT_LIST_NOTES=query all project list
-DELETE_PROJECT_BY_ID_NOTES=delete project by id
+DELETE_PROJECT_BY_ID_NOTES=delete project by id
QUERY_UNAUTHORIZED_PROJECT_NOTES=query unauthorized project
QUERY_AUTHORIZED_PROJECT_NOTES=query authorized project
QUERY_AUTHORIZED_USER_NOTES=query authorized user
TASK_RECORD_TAG=task record related operation
-QUERY_TASK_RECORD_LIST_PAGING_NOTES=query task record list paging
+QUERY_TASK_RECORD_LIST_PAGING_NOTES=query task record list paging
CREATE_TOKEN_NOTES=create access token for specified user
UPDATE_TOKEN_NOTES=update access token for specified user
TOKEN=access token string, it will be automatically generated when it absent
@@ -178,11 +178,11 @@ PROCESS_INSTANCE_END_TIME=process instance end time
PROCESS_INSTANCE_SIZE=process instance size
PROCESS_INSTANCE_PRIORITY=process instance priority
EXPECTED_PARALLELISM_NUMBER=custom parallelism to set the complement task threads
-UPDATE_SCHEDULE_NOTES=update schedule
+UPDATE_SCHEDULE_NOTES=update schedule
SCHEDULE_ID=schedule id
ONLINE_SCHEDULE_NOTES=online schedule
-OFFLINE_SCHEDULE_NOTES=offline schedule
-QUERY_SCHEDULE_NOTES=query schedule
+OFFLINE_SCHEDULE_NOTES=offline schedule
+QUERY_SCHEDULE_NOTES=query schedule
QUERY_SCHEDULE_LIST_PAGING_NOTES=query schedule list paging
LOGIN_TAG=User login related operations
USER_NAME=user name
@@ -218,7 +218,7 @@ PROCESS_INSTANCE_ID=process instance id
PROCESS_INSTANCE_JSON=process instance info(json format)
SCHEDULE_TIME=schedule time
SYNC_DEFINE=update the information of the process instance to the process definition
-RECOVERY_PROCESS_INSTANCE_FLAG=whether to recovery process instance
+RECOVERY_PROCESS_INSTANCE_FLAG=whether to recovery process instance
PREVIEW_SCHEDULE_NOTES=preview schedule
SEARCH_VAL=search val
USER_ID=user id
@@ -258,28 +258,28 @@ DELETE_PROCESS_INSTANCE_BY_ID_NOTES=delete process instance by process instance
TASK_ID=task instance id
PROCESS_INSTANCE_IDS=process_instance ids
SKIP_LINE_NUM=skip line num
-QUERY_TASK_INSTANCE_LOG_NOTES=query task instance log
+QUERY_TASK_INSTANCE_LOG_NOTES=query task instance log
DOWNLOAD_TASK_INSTANCE_LOG_NOTES=download task instance log
USERS_TAG=users related operation
SCHEDULER_TAG=scheduler related operation
-CREATE_SCHEDULE_NOTES=create schedule
+CREATE_SCHEDULE_NOTES=create schedule
CREATE_USER_NOTES=create user
TENANT_ID=tenant id
QUEUE=queue
EMAIL=email
PHONE=phone
-QUERY_USER_LIST_NOTES=query user list
+QUERY_USER_LIST_NOTES=query user list
UPDATE_USER_NOTES=update user
UPDATE_QUEUE_NOTES=update queue
DELETE_USER_BY_ID_NOTES=delete user by id
-GRANT_PROJECT_NOTES=GRANT PROJECT
+GRANT_PROJECT_NOTES=GRANT PROJECT
PROJECT_IDS=project ids(string format, multiple projects separated by ",")
GRANT_PROJECT_BY_CODE_NOTES=GRANT PROJECT BY CODE
REVOKE_PROJECT_NOTES=REVOKE PROJECT FOR USER
PROJECT_CODE=project code
GRANT_RESOURCE_NOTES=grant resource file
RESOURCE_IDS=resource ids(string format, multiple resources separated by ",")
-GET_USER_INFO_NOTES=get user info
+GET_USER_INFO_NOTES=get user info
LIST_USER_NOTES=list user
VERIFY_USER_NAME_NOTES=verify user name
UNAUTHORIZED_USER_NOTES=cancel authorization
@@ -295,12 +295,12 @@ QUERY_UDF_FUNC_LIST_NOTES=query udf funciton list
VERIFY_RESOURCE_NAME_NOTES=verify resource name
GRANT_UDF_FUNC_NOTES=grant udf function
UDF_IDS=udf ids(string format, multiple udf functions separated by ",")
-GRANT_DATASOURCE_NOTES=grant datasource
+GRANT_DATASOURCE_NOTES=grant datasource
DATASOURCE_IDS=datasource ids(string format, multiple datasources separated by ",")
QUERY_SUBPROCESS_INSTANCE_BY_TASK_ID_NOTES=query subprocess instance by task instance id
QUERY_PARENT_PROCESS_INSTANCE_BY_SUB_PROCESS_INSTANCE_ID_NOTES=query parent process instance info by sub process instance id
QUERY_PROCESS_INSTANCE_GLOBAL_VARIABLES_AND_LOCAL_VARIABLES_NOTES=query process instance global variables and local variables
-VIEW_GANTT_NOTES=view gantt
+VIEW_GANTT_NOTES=view gantt
SUB_PROCESS_INSTANCE_ID=sub process instance id
TASK_NAME=task instance name
TASK_INSTANCE_TAG=task instance related operation
@@ -316,9 +316,9 @@ DATA_SOURCE_ID=DATA SOURCE ID
QUERY_DATA_SOURCE_NOTES=query data source by id
QUERY_DATA_SOURCE_LIST_BY_TYPE_NOTES=query data source list by database type
QUERY_DATA_SOURCE_LIST_PAGING_NOTES=query data source list paging
-CONNECT_DATA_SOURCE_NOTES=CONNECT DATA SOURCE
-CONNECT_DATA_SOURCE_TEST_NOTES=connect data source test
-DELETE_DATA_SOURCE_NOTES=delete data source
+CONNECT_DATA_SOURCE_NOTES=CONNECT DATA SOURCE
+CONNECT_DATA_SOURCE_TEST_NOTES=connect data source test
+DELETE_DATA_SOURCE_NOTES=delete data source
VERIFY_DATA_SOURCE_NOTES=verify data source
UNAUTHORIZED_DATA_SOURCE_NOTES=unauthorized data source
AUTHORIZED_DATA_SOURCE_NOTES=authorized data source
@@ -336,3 +336,5 @@ DELETE_PROCESS_DEFINITION_VERSION_NOTES=delete process definition version
QUERY_PROCESS_DEFINITION_VERSIONS_NOTES=query process definition versions
SWITCH_PROCESS_DEFINITION_VERSION_NOTES=switch process definition version
VERSION=version
+TASK_GROUP_QUEUEID=task group queue id
+TASK_GROUP_QUEUE_PRIORITY=task group queue priority
diff --git a/dolphinscheduler-api/src/main/resources/i18n/messages_zh_CN.properties b/dolphinscheduler-api/src/main/resources/i18n/messages_zh_CN.properties
index a0f1b8d..6343d89 100644
--- a/dolphinscheduler-api/src/main/resources/i18n/messages_zh_CN.properties
+++ b/dolphinscheduler-api/src/main/resources/i18n/messages_zh_CN.properties
@@ -333,3 +333,5 @@ DELETE_PROCESS_DEFINITION_VERSION_NOTES=删除流程历史版本
QUERY_PROCESS_DEFINITION_VERSIONS_NOTES=查询流程历史版本信息
SWITCH_PROCESS_DEFINITION_VERSION_NOTES=切换流程版本
VERSION=版本号
+TASK_GROUP_QUEUEID=任务组队列id
+TASK_GROUP_QUEUE_PRIORITY=任务组队列优先级
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/TaskGroupControllerTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/TaskGroupControllerTest.java
index 06e7fc7..639bae1 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/TaskGroupControllerTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/TaskGroupControllerTest.java
@@ -22,7 +22,6 @@ import static org.springframework.test.web.servlet.request.MockMvcRequestBuilder
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.content;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
-import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
@@ -49,7 +48,7 @@ public class TaskGroupControllerTest extends AbstractControllerTest {
MultiValueMap<String, String> paramsMap = new LinkedMultiValueMap<>();
paramsMap.add("pageNo", "2");
paramsMap.add("pageSize", "2");
- MvcResult mvcResult = mockMvc.perform(get("/task-group/query-list-all")
+ MvcResult mvcResult = mockMvc.perform(get("/task-group/query-list-by-projectCode")
.header(SESSION_ID, sessionId)
.params(paramsMap))
.andExpect(status().isOk())
@@ -67,7 +66,7 @@ public class TaskGroupControllerTest extends AbstractControllerTest {
paramsMap.add("pageNo", "1");
paramsMap.add("name", "TGQ");
paramsMap.add("pageSize", "10");
- MvcResult mvcResult = mockMvc.perform(get("/task-group/query-list-by-name")
+ MvcResult mvcResult = mockMvc.perform(get("/task-group/list-paging")
.header(SESSION_ID, sessionId)
.params(paramsMap))
.andExpect(status().isOk())
@@ -186,22 +185,4 @@ public class TaskGroupControllerTest extends AbstractControllerTest {
logger.info("update queue return result:{}", mvcResult.getResponse().getContentAsString());
}
- @Test
- public void testWakeCompulsively() throws Exception {
-
- MultiValueMap<String, String> paramsMap = new LinkedMultiValueMap<>();
- paramsMap.add("id", "1");
- paramsMap.add("taskId", "1");
-
- MvcResult mvcResult = mockMvc.perform(post("/task-group/wake-task-compulsively")
- .header(SESSION_ID, sessionId)
- .params(paramsMap))
- .andExpect(status().isCreated())
- .andExpect(content().contentType(MediaType.APPLICATION_JSON))
- .andReturn();
- Result result = JSONUtils.parseObject(mvcResult.getResponse().getContentAsString(), Result.class);
- logger.info("update queue return result:{}", mvcResult.getResponse().getContentAsString());
- Assert.assertTrue(result != null && (result.isSuccess() || result.isStatus(Status.TASK_GROUP_CACHE_START_FAILED)));
- logger.info("update queue return result:{}", mvcResult.getResponse().getContentAsString());
- }
}
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskGroupServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskGroupServiceTest.java
index 4ba950b..c3084f0 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskGroupServiceTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskGroupServiceTest.java
@@ -92,7 +92,7 @@ public class TaskGroupServiceTest {
}
private TaskGroup getTaskGroup() {
- TaskGroup taskGroup = new TaskGroup(taskGroupName, taskGroupDesc,
+ TaskGroup taskGroup = new TaskGroup(taskGroupName,0, taskGroupDesc,
100, 1,1);
return taskGroup;
}
@@ -109,9 +109,8 @@ public class TaskGroupServiceTest {
TaskGroup taskGroup = getTaskGroup();
Mockito.when(taskGroupMapper.insert(taskGroup)).thenReturn(1);
Mockito.when(taskGroupMapper.queryByName(loginUser.getId(), taskGroupName)).thenReturn(null);
- Map<String, Object> result = taskGroupService.createTaskGroup(loginUser, taskGroupName, taskGroupDesc, 100);
- logger.info(result.toString());
- Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
+ Map<String, Object> result = taskGroupService.createTaskGroup(loginUser,0, taskGroupName, taskGroupDesc, 100);
+ Assert.assertNotNull(result);
}
@@ -134,7 +133,7 @@ public class TaskGroupServiceTest {
Mockito.eq(null), Mockito.eq(0))).thenReturn(page);
// query all
- Map<String, Object> result = taskGroupService.queryAllTaskGroup(loginUser, 1, 10);
+ Map<String, Object> result = taskGroupService.queryAllTaskGroup(loginUser, null, null,1,10);
PageInfo<TaskGroup> pageInfo = (PageInfo<TaskGroup>) result.get(Constants.DATA_LIST);
Assert.assertNotNull(pageInfo.getTotalList());
}
@@ -171,7 +170,7 @@ public class TaskGroupServiceTest {
TreeMap<Integer, Integer> tm = new TreeMap<>();
tm.put(1, 1);
- Map<String, Object> map1 = taskGroupService.wakeTaskcompulsively(getLoginUser(), 1);
+ Map<String, Object> map1 = taskGroupService.forceStartTask(getLoginUser(), 1);
Assert.assertEquals(Status.SUCCESS, map1.get(Constants.STATUS));
}
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/TaskNode.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/TaskNode.java
index d963abe..dfcc116 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/TaskNode.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/TaskNode.java
@@ -89,6 +89,15 @@ public class TaskNode {
private int retryInterval;
/**
+ * task group id
+ */
+ private int taskGroupId;
+ /**
+ * task group priority
+ */
+ private int taskGroupPriority;
+
+ /**
* params information
*/
@JsonDeserialize(using = JSONUtils.JsonDataDeserializer.class)
@@ -465,4 +474,20 @@ public class TaskNode {
public void setWaitStartTimeout(String waitStartTimeout) {
this.waitStartTimeout = waitStartTimeout;
}
+
+ public int getTaskGroupId() {
+ return taskGroupId;
+ }
+
+ public void setTaskGroupId(int taskGroupId) {
+ this.taskGroupId = taskGroupId;
+ }
+
+ public int getTaskGroupPriority() {
+ return taskGroupPriority;
+ }
+
+ public void setTaskGroupPriority(int taskGroupPriority) {
+ this.taskGroupPriority = taskGroupPriority;
+ }
}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java
index 6c6e8ff..5841fa3 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java
@@ -199,6 +199,10 @@ public class TaskDefinition {
* task group id
*/
private int taskGroupId;
+ /**
+ * task group priority
+ */
+ private int taskGroupPriority;
public TaskDefinition() {
}
@@ -502,4 +506,12 @@ public class TaskDefinition {
+ ", updateTime=" + updateTime
+ '}';
}
+
+ public int getTaskGroupPriority() {
+ return taskGroupPriority;
+ }
+
+ public void setTaskGroupPriority(int taskGroupPriority) {
+ this.taskGroupPriority = taskGroupPriority;
+ }
}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskGroup.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskGroup.java
index b7692c3..3c2e021 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskGroup.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskGroup.java
@@ -67,9 +67,14 @@ public class TaskGroup implements Serializable {
*/
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
private Date updateTime;
+ /**
+ * project code
+ */
+ private long projectCode;
- public TaskGroup(String name, String description, int groupSize, int userId,int status) {
+ public TaskGroup(String name,long projectCode, String description, int groupSize, int userId,int status) {
this.name = name;
+ this.projectCode = projectCode;
this.description = description;
this.groupSize = groupSize;
this.userId = userId;
@@ -173,4 +178,12 @@ public class TaskGroup implements Serializable {
public void setUpdateTime(Date updateTime) {
this.updateTime = updateTime;
}
+
+ public long getProjectCode() {
+ return projectCode;
+ }
+
+ public void setProjectCode(long projectCode) {
+ this.projectCode = projectCode;
+ }
}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskGroupQueue.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskGroupQueue.java
index c208dcb..6ab97ae 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskGroupQueue.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskGroupQueue.java
@@ -17,15 +17,15 @@
package org.apache.dolphinscheduler.dao.entity;
-import org.apache.dolphinscheduler.common.enums.TaskGroupQueueStatus;
-
-import java.io.Serializable;
-import java.util.Date;
-
import com.baomidou.mybatisplus.annotation.IdType;
+import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import com.fasterxml.jackson.annotation.JsonFormat;
+import org.apache.dolphinscheduler.common.enums.TaskGroupQueueStatus;
+
+import java.io.Serializable;
+import java.util.Date;
/**
* Task Group Queue
@@ -46,6 +46,21 @@ public class TaskGroupQueue implements Serializable {
*/
private String taskName;
/**
+ * project name
+ */
+ @TableField(exist = false)
+ private String projectName;
+ /**
+ * project code
+ */
+ @TableField(exist = false)
+ private String projectCode;
+ /**
+ * process instance name
+ */
+ @TableField(exist = false)
+ private String processInstanceName;
+ /**
* taskGroup id
*/
private int groupId;
@@ -162,16 +177,16 @@ public class TaskGroupQueue implements Serializable {
@Override
public String toString() {
return "TaskGroupQueue{"
- + "id=" + id
- + ", taskId=" + taskId
- + ", taskName='" + taskName + '\''
- + ", groupId=" + groupId
- + ", processId=" + processId
- + ", priority=" + priority
- + ", status=" + status
- + ", createTime=" + createTime
- + ", updateTime=" + updateTime
- + '}';
+ + "id=" + id
+ + ", taskId=" + taskId
+ + ", taskName='" + taskName + '\''
+ + ", groupId=" + groupId
+ + ", processId=" + processId
+ + ", priority=" + priority
+ + ", status=" + status
+ + ", createTime=" + createTime
+ + ", updateTime=" + updateTime
+ + '}';
}
public TaskGroupQueueStatus getStatus() {
@@ -197,4 +212,28 @@ public class TaskGroupQueue implements Serializable {
public void setInQueue(int inQueue) {
this.inQueue = inQueue;
}
+
+ public String getProjectName() {
+ return projectName;
+ }
+
+ public void setProjectName(String projectName) {
+ this.projectName = projectName;
+ }
+
+ public String getProcessInstanceName() {
+ return processInstanceName;
+ }
+
+ public void setProcessInstanceName(String processInstanceName) {
+ this.processInstanceName = processInstanceName;
+ }
+
+ public String getProjectCode() {
+ return projectCode;
+ }
+
+ public void setProjectCode(String projectCode) {
+ this.projectCode = projectCode;
+ }
}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
index e5f4e60..976060c 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
@@ -85,6 +85,12 @@ public class TaskInstance implements Serializable {
private String processInstanceName;
/**
+ * task group priority
+ */
+ @TableField(exist = false)
+ private int taskGroupPriority;
+
+ /**
* state
*/
private ExecutionStatus state;
@@ -736,4 +742,12 @@ public class TaskInstance implements Serializable {
public boolean isFirstRun() {
return endTime == null;
}
+
+ public int getTaskGroupPriority() {
+ return taskGroupPriority;
+ }
+
+ public void setTaskGroupPriority(int taskGroupPriority) {
+ this.taskGroupPriority = taskGroupPriority;
+ }
}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupMapper.java
index 26511d0..6fe80bd 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupMapper.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupMapper.java
@@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.dao.mapper;
+import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import org.apache.dolphinscheduler.dao.entity.TaskGroup;
import org.apache.ibatis.annotations.Param;
@@ -60,7 +61,7 @@ public interface TaskGroupMapper extends BaseMapper<TaskGroup> {
* @return result page
*/
IPage<TaskGroup> queryTaskGroupPaging(IPage<TaskGroup> page, @Param("userId") int userId,
- @Param("name") String name, @Param("status") int status);
+ @Param("name") String name, @Param("status") Integer status);
/**
* query by task group name
@@ -75,4 +76,5 @@ public interface TaskGroupMapper extends BaseMapper<TaskGroup> {
int selectCountByIdStatus(@Param("id") int id,@Param("status") int status);
+ IPage<TaskGroup> queryTaskGroupPagingByProjectCode(Page<TaskGroup> page, @Param("projectCode") Long projectCode);
}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupQueueMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupQueueMapper.java
index 3b2e7d0..24938e4 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupQueueMapper.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupQueueMapper.java
@@ -17,6 +17,8 @@
package org.apache.dolphinscheduler.dao.mapper;
+import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
+import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.TaskGroupQueue;
import org.apache.ibatis.annotations.Param;
@@ -79,11 +81,16 @@ public interface TaskGroupQueueMapper extends BaseMapper<TaskGroupQueue> {
void updateInQueue(@Param("inQueue") int inQueue, @Param("id") int id);
- void updateForceStart(@Param("taskId") int taskId, @Param("forceStart") int forceStart);
+ void updateForceStart(@Param("queueId") int queueId, @Param("forceStart") int forceStart);
int updateInQueueLimit1(@Param("oldValue") int oldValue, @Param("newValue") int newValue
, @Param("groupId") int id, @Param("status") int status);
int updateInQueueCAS(@Param("oldValue") int oldValue, @Param("newValue") int newValue, @Param("id") int id);
+ void modifyPriority(@Param("queueId") int queueId, @Param("priority") int priority);
+
+ IPage<TaskGroupQueue> queryTaskGroupQueueByTaskGroupIdPaging(Page<TaskGroupQueue> page, @Param("taskName")String taskName
+ ,@Param("processName") String processName,@Param("status") Integer status,@Param("groupId") int groupId
+ ,@Param("projects") List<Project> projects);
}
diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskGroupMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskGroupMapper.xml
index 075ce6c..3dd654a 100644
--- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskGroupMapper.xml
+++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskGroupMapper.xml
@@ -25,13 +25,14 @@
<result property="groupSize" column="group_size" jdbcType="INTEGER"/>
<result property="useSize" column="use_size" jdbcType="INTEGER"/>
<result property="userId" column="user_id" jdbcType="INTEGER"/>
+ <result property="projectCode" column="project_code" jdbcType="BIGINT"/>
<result property="status" column="status" jdbcType="INTEGER"/>
<result property="createTime" column="create_time" jdbcType="TIMESTAMP"/>
<result property="updateTime" column="update_time" jdbcType="TIMESTAMP"/>
</resultMap>
<sql id = "baseSql">
- id,name,description,group_size,use_size,create_time,update_time
+ id,name,description,project_code,group_size,use_size,create_time,update_time
</sql>
<select id="queryTaskGroupPaging" resultType="org.apache.dolphinscheduler.dao.entity.TaskGroup">
@@ -43,7 +44,7 @@
<if test="userId != 0">
and user_id = #{userId}
</if>
- <if test="status != 0">
+ <if test="status != null">
and status = #{status}
</if>
<if test="name != null and name != '' ">
@@ -52,6 +53,14 @@
</where>
</select>
+ <select id="queryTaskGroupPagingByProjectCode" resultType="org.apache.dolphinscheduler.dao.entity.TaskGroup">
+ select
+ <include refid="baseSql">
+ </include>
+ from t_ds_task_group
+ where project_code in ( #{projectCode} , 0)
+ </select>
+
<!--modify data by id-->
<update id="updateTaskGroupResource">
update t_ds_task_group
diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskGroupQueueMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskGroupQueueMapper.xml
index a26471f..b9ff9b1 100644
--- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskGroupQueueMapper.xml
+++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskGroupQueueMapper.xml
@@ -79,8 +79,14 @@
+ <update id="modifyPriority">
+ update t_ds_task_group_queue
+ set priority = #{priority}
+ where id = #{queueId}
+ </update>
+
<update id="updateForceStart">
update t_ds_task_group_queue
set force_start = #{forceStart}
- where task_id = #{taskId}
+ where id = #{queueId}
</update>
<update id="updateInQueueLimit1">
@@ -117,4 +123,32 @@
where task_id = #{taskId}
</select>
+ <select id="queryTaskGroupQueueByTaskGroupIdPaging" resultType="org.apache.dolphinscheduler.dao.entity.TaskGroupQueue">
+ select
+ queue.id, queue.task_name, queue.group_id, queue.process_id, queue.priority, queue.status
+ , queue.force_start, queue.create_time, queue.update_time,
+ process.name as processInstanceName,p.name as projectName,p.code as projectCode
+ from t_ds_task_group_queue queue
+ left join t_ds_process_instance process on queue.process_id = process.id
+ left join t_ds_process_definition p_f on process.process_definition_code = p_f.code
+ and process.process_definition_version = p_f.version
+ join t_ds_project as p on p_f.project_code = p.code
+ where queue.group_id = #{groupId}
+ <if test="taskName != null and taskName != ''">
+ and task_name =#{taskName}
+ </if>
+ <if test="processName != null and processName != ''">
+ and process.name =#{processName}
+ </if>
+ <if test="status != null">
+ and queue.status =#{status}
+ </if>
+ <if test="projects != null and projects.size() > 0">
+ and p.code in
+ <foreach collection="projects" index="index" item="i" open="(" separator="," close=")">
+ #{i.code}
+ </foreach>
+ </if>
+ </select>
+
</mapper>
diff --git a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql
index e9e6a51..bf88194 100644
--- a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql
+++ b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql
@@ -477,7 +477,8 @@ CREATE TABLE t_ds_task_definition
timeout_notify_strategy tinyint(4) DEFAULT NULL,
timeout int(11) DEFAULT '0',
delay_time int(11) DEFAULT '0',
- task_group_id int(11) DEFAULT NULL,
+ task_group_id int(11) DEFAULT NULL,
+ task_group_priority tinyint(4) DEFAULT '0',
resource_ids text,
create_time datetime NOT NULL,
update_time datetime DEFAULT NULL,
@@ -512,6 +513,7 @@ CREATE TABLE t_ds_task_definition_log
resource_ids text,
operator int(11) DEFAULT NULL,
task_group_id int(11) DEFAULT NULL,
+ task_group_priority tinyint(4) DEFAULT '0',
operate_time datetime DEFAULT NULL,
create_time datetime NOT NULL,
update_time datetime DEFAULT NULL,
@@ -1067,6 +1069,7 @@ CREATE TABLE t_ds_task_group
name varchar(100) DEFAULT NULL ,
description varchar(200) DEFAULT NULL ,
group_size int(11) NOT NULL ,
+ project_code bigint(20) DEFAULT '0',
use_size int(11) DEFAULT '0' ,
user_id int(11) DEFAULT NULL ,
project_id int(11) DEFAULT NULL ,
diff --git a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql
index eae6e18..0bdf2ac 100644
--- a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql
+++ b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql
@@ -478,6 +478,7 @@ CREATE TABLE `t_ds_task_definition` (
`delay_time` int(11) DEFAULT '0' COMMENT 'delay execution time,unit: minute',
`resource_ids` text COMMENT 'resource id, separated by comma',
`task_group_id` int(11) DEFAULT NULL COMMENT 'task group id',
+ `task_group_priority` tinyint(4) DEFAULT 1 COMMENT 'task group priority',
`create_time` datetime NOT NULL COMMENT 'create time',
`update_time` datetime NOT NULL COMMENT 'update time',
PRIMARY KEY (`id`,`code`)
@@ -510,6 +511,7 @@ CREATE TABLE `t_ds_task_definition_log` (
`resource_ids` text DEFAULT NULL COMMENT 'resource id, separated by comma',
`operator` int(11) DEFAULT NULL COMMENT 'operator user id',
`task_group_id` int(11) DEFAULT NULL COMMENT 'task group id',
+ `task_group_priority` tinyint(4) DEFAULT 1 COMMENT 'task group priority',
`operate_time` datetime DEFAULT NULL COMMENT 'operate time',
`create_time` datetime NOT NULL COMMENT 'create time',
`update_time` datetime NOT NULL COMMENT 'update time',
@@ -1051,7 +1053,7 @@ CREATE TABLE `t_ds_task_group` (
`group_size` int (11) NOT NULL COMMENT'group size',
`use_size` int (11) DEFAULT '0' COMMENT 'used size',
`user_id` int(11) DEFAULT NULL COMMENT 'creator id',
- `project_id` int(11) DEFAULT NULL COMMENT 'project id',
+ `project_code` bigint(20) DEFAULT 0 COMMENT 'project code',
`status` tinyint(4) DEFAULT '1' COMMENT '0 not available, 1 available',
`create_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP,
`update_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
diff --git a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql
index 0c489cf..a4d2c2e 100644
--- a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql
+++ b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql
@@ -386,6 +386,7 @@ CREATE TABLE t_ds_task_definition (
timeout int DEFAULT '0' ,
delay_time int DEFAULT '0' ,
task_group_id int DEFAULT NULL,
+ task_group_priority int DEFAULT '0',
resource_ids text ,
create_time timestamp DEFAULT NULL ,
update_time timestamp DEFAULT NULL ,
@@ -418,6 +419,7 @@ CREATE TABLE t_ds_task_definition_log (
resource_ids text ,
operator int DEFAULT NULL ,
task_group_id int DEFAULT NULL,
+ task_group_priority int DEFAULT '0',
operate_time timestamp DEFAULT NULL ,
create_time timestamp DEFAULT NULL ,
update_time timestamp DEFAULT NULL ,
@@ -1015,6 +1017,7 @@ CREATE TABLE t_ds_task_group (
name varchar(100) DEFAULT NULL ,
description varchar(200) DEFAULT NULL ,
group_size int NOT NULL ,
+ project_code bigint DEFAULT '0' ,
use_size int DEFAULT '0' ,
user_id int DEFAULT NULL ,
project_id int DEFAULT NULL ,
diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupMapperTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupMapperTest.java
index 2ee8b2b..aaa7b54 100644
--- a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupMapperTest.java
+++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupMapperTest.java
@@ -24,6 +24,7 @@ import java.util.Date;
import org.junit.Assert;
import org.junit.Test;
+import org.mockito.Mock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -44,6 +45,7 @@ public class TaskGroupMapperTest extends BaseDaoTest {
public TaskGroup insertOne() {
TaskGroup taskGroup = new TaskGroup();
taskGroup.setName("task group");
+ taskGroup.setId(1);
taskGroup.setUserId(1);
taskGroup.setStatus(1);
taskGroup.setGroupSize(10);
@@ -52,7 +54,7 @@ public class TaskGroupMapperTest extends BaseDaoTest {
taskGroup.setUpdateTime(date);
taskGroup.setUpdateTime(date);
- taskGroupMapper.insert(taskGroup);
+ int i = taskGroupMapper.insert(taskGroup);
return taskGroup;
}
diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupQueueMapperTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupQueueMapperTest.java
index af203d2..0cbd532 100644
--- a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupQueueMapperTest.java
+++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupQueueMapperTest.java
@@ -26,6 +26,7 @@ import java.util.List;
import org.junit.Assert;
import org.junit.Test;
+import org.mockito.Mock;
import org.springframework.beans.factory.annotation.Autowired;
public class TaskGroupQueueMapperTest extends BaseDaoTest {
diff --git a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/test/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/clickhouse/ClickHouseDataSourceProcessorTest.java b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/test/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/clickhouse/ClickHouseDataSourceProcessorTest.java
deleted file mode 100644
index a0b2e1a..0000000
--- a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/test/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/clickhouse/ClickHouseDataSourceProcessorTest.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.plugin.datasource.api.datasource.clickhouse;
-
-import org.apache.dolphinscheduler.plugin.datasource.api.plugin.DataSourceClientProvider;
-import org.apache.dolphinscheduler.plugin.datasource.api.utils.CommonUtils;
-import org.apache.dolphinscheduler.plugin.datasource.api.utils.DataSourceUtils;
-import org.apache.dolphinscheduler.plugin.datasource.api.utils.PasswordUtils;
-import org.apache.dolphinscheduler.spi.enums.DbType;
-import org.apache.dolphinscheduler.spi.utils.Constants;
-
-import java.sql.DriverManager;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.junit.Assert;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.Mockito;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-
-@RunWith(PowerMockRunner.class)
-@PrepareForTest({Class.class, DriverManager.class, DataSourceUtils.class, CommonUtils.class, DataSourceClientProvider.class, PasswordUtils.class})
-public class ClickHouseDataSourceProcessorTest {
-
- private ClickHouseDataSourceProcessor clickHouseDatasourceProcessor = new ClickHouseDataSourceProcessor();
-
- @Test
- public void testCreateConnectionParams() {
- Map<String, String> props = new HashMap<>();
- props.put("serverTimezone", "utc");
- ClickHouseDataSourceParamDTO clickhouseConnectionParam = new ClickHouseDataSourceParamDTO();
- clickhouseConnectionParam.setUserName("user");
- clickhouseConnectionParam.setPassword("password");
- clickhouseConnectionParam.setHost("localhost");
- clickhouseConnectionParam.setPort(8123);
- clickhouseConnectionParam.setDatabase("default");
- clickhouseConnectionParam.setOther(props);
- PowerMockito.mockStatic(PasswordUtils.class);
- PowerMockito.when(PasswordUtils.encodePassword(Mockito.anyString())).thenReturn("test");
- ClickHouseConnectionParam connectionParams = (ClickHouseConnectionParam) clickHouseDatasourceProcessor
- .createConnectionParams(clickhouseConnectionParam);
- Assert.assertNotNull(connectionParams);
- Assert.assertEquals("jdbc:clickhouse://localhost:8123", connectionParams.getAddress());
- Assert.assertEquals("jdbc:clickhouse://localhost:8123/default", connectionParams.getJdbcUrl());
- }
-
- @Test
- public void testCreateConnectionParams2() {
- String connectionParamJson = "{\"address\":\"jdbc:clickhouse://localhost:8123\",\"database\":\"default\","
- + "\"jdbcUrl\":\"jdbc:clickhouse://localhost:8123/default\",\"user\":\"default\",\"password\":\"123456\"}";
- ClickHouseConnectionParam clickhouseConnectionParam = (ClickHouseConnectionParam) clickHouseDatasourceProcessor
- .createConnectionParams(connectionParamJson);
- Assert.assertNotNull(clickhouseConnectionParam);
- Assert.assertEquals("default", clickhouseConnectionParam.getUser());
- Assert.assertEquals("123456", clickhouseConnectionParam.getPassword());
- }
-
- @Test
- public void testGetDatasourceDriver() {
- Assert.assertNotNull(clickHouseDatasourceProcessor.getDatasourceDriver());
- Assert.assertEquals(Constants.COM_CLICKHOUSE_JDBC_DRIVER, clickHouseDatasourceProcessor.getDatasourceDriver());
- }
-
- @Test
- public void testGetJdbcUrl() {
- ClickHouseConnectionParam connectionParam = new ClickHouseConnectionParam();
- connectionParam.setUser("default");
- connectionParam.setJdbcUrl("jdbc:clickhouse://localhost:8123/default");
- connectionParam.setOther("other=other1");
- String jdbcUrl = clickHouseDatasourceProcessor.getJdbcUrl(connectionParam);
- Assert.assertEquals("jdbc:clickhouse://localhost:8123/default?other=other1", jdbcUrl);
- }
-
- @Test
- public void testGetDbType() {
- Assert.assertEquals(DbType.CLICKHOUSE, clickHouseDatasourceProcessor.getDbType());
- }
-
- @Test
- public void testGetValidationQuery() {
- Assert.assertEquals(Constants.CLICKHOUSE_VALIDATION_QUERY, clickHouseDatasourceProcessor.getValidationQuery());
- }
-}
\ No newline at end of file
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
index c25d9ad..fbc4fa8 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
@@ -804,10 +804,6 @@ public class WorkflowExecuteThread {
&& taskProcessor.getType().equalsIgnoreCase(Constants.COMMON_TASK_TYPE)) {
notifyProcessHostUpdate(taskInstance);
}
- TaskDefinition taskDefinition = processService.findTaskDefinition(
- taskInstance.getTaskCode(),
- taskInstance.getTaskDefinitionVersion());
- taskInstance.setTaskGroupId(taskDefinition.getTaskGroupId());
// package task instance before submit
processService.packageTaskInstance(taskInstance, processInstance);
@@ -920,6 +916,10 @@ public class WorkflowExecuteThread {
//set task param
taskInstance.setTaskParams(taskNode.getTaskParams());
+ //set task group and priority
+ taskInstance.setTaskGroupId(taskNode.getTaskGroupId());
+ taskInstance.setTaskGroupPriority(taskNode.getTaskGroupPriority());
+
// task instance priority
if (taskNode.getTaskInstancePriority() == null) {
taskInstance.setTaskInstancePriority(Priority.MEDIUM);
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index c6258ff..dc8ec8c 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -2510,6 +2510,8 @@ public class ProcessService {
taskDefinitionLog.getTimeout())));
taskNode.setDelayTime(taskDefinitionLog.getDelayTime());
taskNode.setPreTasks(JSONUtils.toJsonString(code.getValue().stream().map(taskDefinitionLogMap::get).map(TaskDefinition::getCode).collect(Collectors.toList())));
+ taskNode.setTaskGroupId(taskDefinitionLog.getTaskGroupId());
+ taskNode.setTaskGroupPriority(taskDefinitionLog.getTaskGroupPriority());
taskNodeList.add(taskNode);
}
}