You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ji...@apache.org on 2022/01/05 14:10:40 UTC
[dolphinscheduler] branch dev updated: [Feature][workflow list edit] add task update with upstream (#7829)
This is an automated email from the ASF dual-hosted git repository.
jinyleechina pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new acf9c84 [Feature][workflow list edit] add task update with upstream (#7829)
acf9c84 is described below
commit acf9c84c503f326e16bb328ac3896a3ead66dc58
Author: JinYong Li <42...@users.noreply.github.com>
AuthorDate: Wed Jan 5 22:08:42 2022 +0800
[Feature][workflow list edit] add task update with upstream (#7829)
* add task save and binds workflow
* add task update with upstream
---
.../api/controller/TaskDefinitionController.java | 34 ++++-
.../apache/dolphinscheduler/api/enums/Status.java | 2 +-
.../api/service/TaskDefinitionService.java | 16 ++
.../service/impl/TaskDefinitionServiceImpl.java | 163 +++++++++++++++++----
.../dao/entity/TaskDefinition.java | 35 ++---
.../dolphinscheduler/dao/entity/TaskMainInfo.java | 13 ++
.../dao/mapper/TaskDefinitionMapper.xml | 2 +-
.../2.1.0_schema/mysql/dolphinscheduler_ddl.sql | 4 +
8 files changed, 223 insertions(+), 46 deletions(-)
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskDefinitionController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskDefinitionController.java
index 754ac20..d00a052 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskDefinitionController.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskDefinitionController.java
@@ -153,6 +153,36 @@ public class TaskDefinitionController extends BaseController {
}
/**
+ * update task definition
+ *
+ * @param loginUser login user
+ * @param projectCode project code
+ * @param code task definition code
+ * @param taskDefinitionJsonObj task definition json object
+ * @param upstreamCodes upstream task codes, sep comma
+ * @return update result code
+ */
+ @ApiOperation(value = "updateWithUpstream", notes = "UPDATE_TASK_DEFINITION_NOTES")
+ @ApiImplicitParams({
+ @ApiImplicitParam(name = "projectCode", value = "PROJECT_CODE", required = true, type = "Long"),
+ @ApiImplicitParam(name = "code", value = "TASK_DEFINITION_CODE", required = true, dataType = "Long", example = "1"),
+ @ApiImplicitParam(name = "taskDefinitionJsonObj", value = "TASK_DEFINITION_JSON", required = true, type = "String"),
+ @ApiImplicitParam(name = "upstreamCodes", value = "UPSTREAM_CODES", required = false, type = "String")
+ })
+ @PutMapping(value = "/{code}/with-upstream")
+ @ResponseStatus(HttpStatus.OK)
+ @ApiException(UPDATE_TASK_DEFINITION_ERROR)
+ @AccessLogAnnotation(ignoreRequestArgs = "loginUser")
+ public Result updateTaskWithUpstream(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
+ @ApiParam(name = "projectCode", value = "PROJECT_CODE", required = true) @PathVariable long projectCode,
+ @PathVariable(value = "code") long code,
+ @RequestParam(value = "taskDefinitionJsonObj", required = true) String taskDefinitionJsonObj,
+ @RequestParam(value = "upstreamCodes", required = false) String upstreamCodes) {
+ Map<String, Object> result = taskDefinitionService.updateTaskWithUpstream(loginUser, projectCode, code, taskDefinitionJsonObj, upstreamCodes);
+ return returnDataList(result);
+ }
+
+ /**
* query task definition version paging list info
*
* @param loginUser login user info
@@ -300,7 +330,7 @@ public class TaskDefinitionController extends BaseController {
@ApiImplicitParam(name = "projectCode", value = "PROJECT_CODE", required = false, type = "Long"),
@ApiImplicitParam(name = "searchWorkflowName", value = "SEARCH_WORKFLOW_NAME", required = false, type = "String"),
@ApiImplicitParam(name = "searchTaskName", value = "SEARCH_TASK_NAME", required = false, type = "String"),
- @ApiImplicitParam(name = "taskType", value = "TASK_TYPE", required = true, dataType = "TaskType", example = "SHELL"),
+ @ApiImplicitParam(name = "taskType", value = "TASK_TYPE", required = false, dataType = "TaskType", example = "SHELL"),
@ApiImplicitParam(name = "pageNo", value = "PAGE_NO", required = true, dataType = "Int", example = "1"),
@ApiImplicitParam(name = "pageSize", value = "PAGE_SIZE", required = true, dataType = "Int", example = "10")
})
@@ -312,7 +342,7 @@ public class TaskDefinitionController extends BaseController {
@ApiParam(name = "projectCode", value = "PROJECT_CODE", required = true) @PathVariable long projectCode,
@RequestParam(value = "searchWorkflowName", required = false) String searchWorkflowName,
@RequestParam(value = "searchTaskName", required = false) String searchTaskName,
- @RequestParam(value = "taskType", required = true) TaskType taskType,
+ @RequestParam(value = "taskType", required = false) TaskType taskType,
@RequestParam("pageNo") Integer pageNo,
@RequestParam("pageSize") Integer pageSize) {
Result result = checkPageParams(pageNo, pageSize);
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
index 2e5f3fc..ec4931b 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
@@ -258,7 +258,7 @@ public enum Status {
DATA_IS_NULL(50018, "data {0} is null", "数据[{0}]不能为空"),
PROCESS_NODE_HAS_CYCLE(50019, "process node has cycle", "流程节点间存在循环依赖"),
PROCESS_NODE_S_PARAMETER_INVALID(50020, "process node {0} parameter invalid", "流程节点[{0}]参数无效"),
- PROCESS_DEFINE_STATE_ONLINE(50021, "process definition {0} is already on line", "工作流定义[{0}]已上线"),
+ PROCESS_DEFINE_STATE_ONLINE(50021, "process definition [{0}] is already on line", "工作流定义[{0}]已上线"),
DELETE_PROCESS_DEFINE_BY_CODE_ERROR(50022, "delete process definition by code error", "删除工作流定义错误"),
SCHEDULE_CRON_STATE_ONLINE(50023, "the status of schedule {0} is already on line", "调度配置[{0}]已上线"),
DELETE_SCHEDULE_CRON_BY_ID_ERROR(50024, "delete schedule by id error", "删除调度配置错误"),
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskDefinitionService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskDefinitionService.java
index 1b55d8b..962e60a 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskDefinitionService.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskDefinitionService.java
@@ -92,6 +92,22 @@ public interface TaskDefinitionService {
String taskDefinitionJsonObj);
/**
+ * update task definition and upstream
+ *
+ * @param loginUser login user
+ * @param projectCode project code
+ * @param taskCode task definition code
+ * @param taskDefinitionJsonObj task definition json object
+ * @param upstreamCodes upstream task codes, sep comma
+ * @return update result code
+ */
+ Map<String, Object> updateTaskWithUpstream(User loginUser,
+ long projectCode,
+ long taskCode,
+ String taskDefinitionJsonObj,
+ String upstreamCodes);
+
+ /**
* update task definition
*
* @param loginUser login user
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
index c1342a1..094ce9f 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
@@ -26,6 +26,7 @@ import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.AuthorizationType;
+import org.apache.dolphinscheduler.common.enums.ConditionType;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.ReleaseState;
import org.apache.dolphinscheduler.common.enums.TaskType;
@@ -173,10 +174,14 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
return result;
}
ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(processDefinitionCode);
- if (processDefinition == null) {
+ if (processDefinition == null || projectCode != processDefinition.getProjectCode()) {
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processDefinitionCode);
return result;
}
+ if (processDefinition.getReleaseState() == ReleaseState.ONLINE) {
+ putMsg(result, Status.PROCESS_DEFINE_STATE_ONLINE, processDefinitionCode);
+ return result;
+ }
TaskDefinitionLog taskDefinition = JSONUtils.parseObject(taskDefinitionJsonObj, TaskDefinitionLog.class);
if (taskDefinition == null) {
logger.error("taskDefinitionJsonObj is not valid json");
@@ -216,14 +221,17 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
processTaskRelationLog.setPreTaskVersion(upstreamTask.getVersion());
processTaskRelationLog.setPostTaskCode(taskCode);
processTaskRelationLog.setPostTaskVersion(Constants.VERSION_FIRST);
+ processTaskRelationLog.setConditionType(ConditionType.NONE);
+ processTaskRelationLog.setConditionParams("{}");
processTaskRelationLogList.add(processTaskRelationLog);
}
+ List<ProcessTaskRelation> processTaskRelationList = processTaskRelationMapper.queryByProcessCode(projectCode, processDefinitionCode);
+ if (!processTaskRelationList.isEmpty()) {
+ processTaskRelationLogList.addAll(processTaskRelationList.stream().map(ProcessTaskRelationLog::new).collect(Collectors.toList()));
+ }
int insertResult = processService.saveTaskRelation(loginUser, projectCode, processDefinition.getCode(), processDefinition.getVersion(),
processTaskRelationLogList, null);
- if (insertResult == Constants.EXIT_CODE_SUCCESS) {
- putMsg(result, Status.SUCCESS);
- result.put(Constants.DATA_LIST, processDefinition);
- } else {
+ if (insertResult != Constants.EXIT_CODE_SUCCESS) {
putMsg(result, Status.CREATE_PROCESS_TASK_RELATION_ERROR);
throw new ServiceException(Status.CREATE_PROCESS_TASK_RELATION_ERROR);
}
@@ -233,6 +241,8 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
putMsg(result, Status.CREATE_TASK_DEFINITION_ERROR);
throw new ServiceException(Status.CREATE_TASK_DEFINITION_ERROR);
}
+ putMsg(result, Status.SUCCESS);
+ result.put(Constants.DATA_LIST, taskDefinition);
return result;
}
@@ -286,7 +296,7 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
putMsg(result, Status.TASK_DEFINE_NOT_EXIST, taskCode);
return result;
}
- if (taskDefinition.getFlag() == Flag.YES) {
+ if (processService.isTaskOnline(taskCode) && taskDefinition.getFlag() == Flag.YES) {
putMsg(result, Status.TASK_DEFINE_STATE_ONLINE, taskCode);
return result;
}
@@ -332,36 +342,81 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
@Transactional(rollbackFor = RuntimeException.class)
@Override
public Map<String, Object> updateTaskDefinition(User loginUser, long projectCode, long taskCode, String taskDefinitionJsonObj) {
+ Map<String, Object> result = new HashMap<>();
+ int version = updateTask(loginUser, projectCode, taskCode, taskDefinitionJsonObj, result);
+ if (version <= 0) {
+ return result;
+ }
+ List<ProcessTaskRelation> processTaskRelationList = processTaskRelationMapper.queryByTaskCode(taskCode);
+ if (!processTaskRelationList.isEmpty()) {
+ List<ProcessTaskRelationLog> processTaskRelationLogList = new ArrayList<>();
+ int delete = 0;
+ int deleteLog = 0;
+ Date now = new Date();
+ for (ProcessTaskRelation processTaskRelation : processTaskRelationList) {
+ ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog(processTaskRelation);
+ delete += processTaskRelationMapper.deleteRelation(processTaskRelationLog);
+ deleteLog += processTaskRelationLogMapper.deleteRelation(processTaskRelationLog);
+ if (processTaskRelationLog.getPreTaskCode() == taskCode) {
+ processTaskRelationLog.setPreTaskVersion(version);
+ }
+ if (processTaskRelationLog.getPostTaskCode() == taskCode) {
+ processTaskRelationLog.setPostTaskVersion(version);
+ }
+ processTaskRelationLog.setOperator(loginUser.getId());
+ processTaskRelationLog.setOperateTime(now);
+ processTaskRelationLog.setUpdateTime(now);
+ processTaskRelationLogList.add(processTaskRelationLog);
+ }
+ if ((delete & deleteLog) == 0) {
+ throw new ServiceException(Status.DELETE_TASK_PROCESS_RELATION_ERROR);
+ } else {
+ int insertRelation = processTaskRelationMapper.batchInsert(processTaskRelationLogList);
+ int insertRelationLog = processTaskRelationLogMapper.batchInsert(processTaskRelationLogList);
+ if ((insertRelation & insertRelationLog) == 0) {
+ throw new ServiceException(Status.CREATE_PROCESS_TASK_RELATION_ERROR);
+ }
+ }
+ }
+ result.put(Constants.DATA_LIST, taskCode);
+ putMsg(result, Status.SUCCESS);
+ return result;
+ }
+
+ private int updateTask(User loginUser, long projectCode, long taskCode, String taskDefinitionJsonObj, Map<String, Object> result) {
Project project = projectMapper.queryByCode(projectCode);
//check user access for project
- Map<String, Object> result = projectService.checkProjectAndAuth(loginUser, project, projectCode);
+ result.putAll(projectService.checkProjectAndAuth(loginUser, project, projectCode));
if (result.get(Constants.STATUS) != Status.SUCCESS) {
- return result;
+ return Constants.EXIT_CODE_FAILURE;
}
TaskDefinition taskDefinition = taskDefinitionMapper.queryByCode(taskCode);
if (taskDefinition == null) {
putMsg(result, Status.TASK_DEFINE_NOT_EXIST, taskCode);
- return result;
+ return Constants.EXIT_CODE_FAILURE;
}
if (processService.isTaskOnline(taskCode) && taskDefinition.getFlag() == Flag.YES) {
putMsg(result, Status.NOT_SUPPORT_UPDATE_TASK_DEFINITION);
- return result;
+ return Constants.EXIT_CODE_FAILURE;
}
TaskDefinitionLog taskDefinitionToUpdate = JSONUtils.parseObject(taskDefinitionJsonObj, TaskDefinitionLog.class);
+ if (taskDefinition.equals(taskDefinitionToUpdate)) {
+ return taskDefinition.getVersion();
+ }
if (taskDefinitionToUpdate == null) {
logger.error("taskDefinitionJson is not valid json");
putMsg(result, Status.DATA_IS_NOT_VALID, taskDefinitionJsonObj);
- return result;
+ return Constants.EXIT_CODE_FAILURE;
}
if (!CheckUtils.checkTaskDefinitionParameters(taskDefinitionToUpdate)) {
logger.error("task definition {} parameter invalid", taskDefinitionToUpdate.getName());
putMsg(result, Status.PROCESS_NODE_S_PARAMETER_INVALID, taskDefinitionToUpdate.getName());
- return result;
+ return Constants.EXIT_CODE_FAILURE;
}
Integer version = taskDefinitionLogMapper.queryMaxVersionForDefinition(taskCode);
if (version == null || version == 0) {
putMsg(result, Status.DATA_IS_NOT_VALID, taskCode);
- return result;
+ return Constants.EXIT_CODE_FAILURE;
}
Date now = new Date();
taskDefinitionToUpdate.setCode(taskCode);
@@ -381,43 +436,101 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
putMsg(result, Status.UPDATE_TASK_DEFINITION_ERROR);
throw new ServiceException(Status.UPDATE_TASK_DEFINITION_ERROR);
}
+ return version;
+ }
+
+ /**
+ * update task definition and upstream
+ *
+ * @param loginUser login user
+ * @param projectCode project code
+ * @param taskCode task definition code
+ * @param taskDefinitionJsonObj task definition json object
+ * @param upstreamCodes upstream task codes, sep comma
+ * @return update result code
+ */
+ @Override
+ public Map<String, Object> updateTaskWithUpstream(User loginUser, long projectCode, long taskCode, String taskDefinitionJsonObj, String upstreamCodes) {
+ Map<String, Object> result = new HashMap<>();
+ int version = updateTask(loginUser, projectCode, taskCode, taskDefinitionJsonObj, result);
+ if (version <= 0) {
+ return result;
+ }
+ Map<Long, TaskDefinition> queryUpStreamTaskCodeMap;
+ if (StringUtils.isNotBlank(upstreamCodes)) {
+ Set<Long> upstreamTaskCodes = Arrays.stream(upstreamCodes.split(Constants.COMMA)).map(Long::parseLong).collect(Collectors.toSet());
+ List<TaskDefinition> upstreamTaskDefinitionList = taskDefinitionMapper.queryByCodeList(upstreamTaskCodes);
+ queryUpStreamTaskCodeMap = upstreamTaskDefinitionList.stream().collect(Collectors.toMap(TaskDefinition::getCode, taskDefinition -> taskDefinition));
+ // upstreamTaskCodes - queryUpStreamTaskCodeMap.keySet
+ upstreamTaskCodes.removeAll(queryUpStreamTaskCodeMap.keySet());
+ if (!upstreamTaskCodes.isEmpty()) {
+ putMsg(result, Status.TASK_DEFINE_NOT_EXIST, StringUtils.join(upstreamTaskCodes, Constants.COMMA));
+ return result;
+ }
+ } else {
+ queryUpStreamTaskCodeMap = new HashMap<>();
+ }
List<ProcessTaskRelation> processTaskRelationList = processTaskRelationMapper.queryByTaskCode(taskCode);
+ if (!queryUpStreamTaskCodeMap.isEmpty() && processTaskRelationList.isEmpty()) {
+ putMsg(result, Status.PROCESS_TASK_RELATION_NOT_EXIST, StringUtils.join(queryUpStreamTaskCodeMap.keySet(), Constants.COMMA));
+ throw new ServiceException(Status.PROCESS_TASK_RELATION_NOT_EXIST);
+ }
if (!processTaskRelationList.isEmpty()) {
- List<ProcessTaskRelationLog> processTaskRelationLogList = new ArrayList<>();
+ List<ProcessTaskRelationLog> relationLogs = new ArrayList<>();
+ Date now = new Date();
int delete = 0;
int deleteLog = 0;
for (ProcessTaskRelation processTaskRelation : processTaskRelationList) {
ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog(processTaskRelation);
delete += processTaskRelationMapper.deleteRelation(processTaskRelationLog);
deleteLog += processTaskRelationLogMapper.deleteRelation(processTaskRelationLog);
+ processTaskRelationLog.setOperator(loginUser.getId());
+ processTaskRelationLog.setOperateTime(now);
+ processTaskRelationLog.setUpdateTime(now);
if (processTaskRelationLog.getPreTaskCode() == taskCode) {
processTaskRelationLog.setPreTaskVersion(version);
}
if (processTaskRelationLog.getPostTaskCode() == taskCode) {
processTaskRelationLog.setPostTaskVersion(version);
+ TaskDefinition definition = queryUpStreamTaskCodeMap.remove(processTaskRelationLog.getPreTaskCode());
+ if (definition == null) {
+ processTaskRelationLog.setPreTaskCode(0L);
+ processTaskRelationLog.setPreTaskVersion(0);
+ }
}
- processTaskRelationLog.setOperator(loginUser.getId());
- processTaskRelationLog.setOperateTime(now);
- processTaskRelationLog.setUpdateTime(now);
- processTaskRelationLogList.add(processTaskRelationLog);
+ relationLogs.add(processTaskRelationLog);
}
if ((delete & deleteLog) == 0) {
throw new ServiceException(Status.DELETE_TASK_PROCESS_RELATION_ERROR);
- } else {
- int insertRelation = processTaskRelationMapper.batchInsert(processTaskRelationLogList);
- int insertRelationLog = processTaskRelationLogMapper.batchInsert(processTaskRelationLogList);
- if ((insertRelation & insertRelationLog) == 0) {
- throw new ServiceException(Status.CREATE_PROCESS_TASK_RELATION_ERROR);
+ }
+ if (!queryUpStreamTaskCodeMap.isEmpty()) {
+ ProcessTaskRelationLog taskRelationLogDeepCopy = JSONUtils.parseObject(JSONUtils.toJsonString(relationLogs.get(0)), ProcessTaskRelationLog.class);
+ assert taskRelationLogDeepCopy != null;
+ for (TaskDefinition upstreamTask : queryUpStreamTaskCodeMap.values()) {
+ taskRelationLogDeepCopy.setPreTaskCode(upstreamTask.getCode());
+ taskRelationLogDeepCopy.setPreTaskVersion(upstreamTask.getVersion());
+ relationLogs.add(taskRelationLogDeepCopy);
}
}
+ Map<Long, ProcessTaskRelationLog> taskRelationLogMap =
+ relationLogs.stream().collect(Collectors.toMap(ProcessTaskRelation::getPreTaskCode, processTaskRelationLog -> processTaskRelationLog));
+ if (taskRelationLogMap.containsKey(0L) && taskRelationLogMap.size() >= 3) {
+ taskRelationLogMap.remove(0L);
+ }
+ int insertRelation = processTaskRelationMapper.batchInsert(relationLogs);
+ int insertRelationLog = processTaskRelationLogMapper.batchInsert(relationLogs);
+ if ((insertRelation & insertRelationLog) == 0) {
+ putMsg(result, Status.CREATE_PROCESS_TASK_RELATION_ERROR);
+ throw new ServiceException(Status.CREATE_PROCESS_TASK_RELATION_ERROR);
+ }
}
result.put(Constants.DATA_LIST, taskCode);
- putMsg(result, Status.SUCCESS, update);
+ putMsg(result, Status.SUCCESS);
return result;
}
/**
- * update task definition
+ * Switch task definition
*
* @param loginUser login user
* @param projectCode project code
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java
index 699b2ec..4d31e23 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java
@@ -459,24 +459,25 @@ public class TaskDefinition {
}
TaskDefinition that = (TaskDefinition) o;
return failRetryTimes == that.failRetryTimes
- && failRetryInterval == that.failRetryInterval
- && timeout == that.timeout
- && delayTime == that.delayTime
- && Objects.equals(name, that.name)
- && Objects.equals(description, that.description)
- && Objects.equals(taskType, that.taskType)
- && Objects.equals(taskParams, that.taskParams)
- && flag == that.flag
- && taskPriority == that.taskPriority
- && Objects.equals(workerGroup, that.workerGroup)
- && timeoutFlag == that.timeoutFlag
- && timeoutNotifyStrategy == that.timeoutNotifyStrategy
- && Objects.equals(resourceIds, that.resourceIds)
- && environmentCode == that.environmentCode
- && taskGroupId == that.taskGroupId
- && taskGroupPriority == that.taskGroupPriority;
+ && failRetryInterval == that.failRetryInterval
+ && timeout == that.timeout
+ && delayTime == that.delayTime
+ && Objects.equals(name, that.name)
+ && Objects.equals(description, that.description)
+ && Objects.equals(taskType, that.taskType)
+ && Objects.equals(taskParams, that.taskParams)
+ && flag == that.flag
+ && taskPriority == that.taskPriority
+ && Objects.equals(workerGroup, that.workerGroup)
+ && timeoutFlag == that.timeoutFlag
+ && timeoutNotifyStrategy == that.timeoutNotifyStrategy
+ && (Objects.equals(resourceIds, that.resourceIds)
+ || (StringUtils.EMPTY.equals(resourceIds) && that.resourceIds == null)
+ || (StringUtils.EMPTY.equals(that.resourceIds) && resourceIds == null))
+ && environmentCode == that.environmentCode
+ && taskGroupId == that.taskGroupId
+ && taskGroupPriority == that.taskGroupPriority;
}
-
@Override
public String toString() {
return "TaskDefinition{"
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskMainInfo.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskMainInfo.java
index 580c683..e17392f 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskMainInfo.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskMainInfo.java
@@ -45,6 +45,11 @@ public class TaskMainInfo {
private int taskVersion;
/**
+ * task type
+ */
+ private String taskType;
+
+ /**
* create time
*/
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
@@ -115,6 +120,14 @@ public class TaskMainInfo {
this.taskVersion = taskVersion;
}
+ public String getTaskType() {
+ return taskType;
+ }
+
+ public void setTaskType(String taskType) {
+ this.taskType = taskType;
+ }
+
public Date getTaskCreateTime() {
return taskCreateTime;
}
diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.xml
index 8dbff07..5c889d1 100644
--- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.xml
+++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.xml
@@ -87,7 +87,7 @@
</foreach>
</insert>
<select id="queryDefineListPaging" resultType="org.apache.dolphinscheduler.dao.entity.TaskMainInfo">
- select td.name task_name,td.code task_code,td.version task_version,td.create_time task_create_time,td.update_time task_update_time,
+ select td.name task_name,td.code task_code,td.version task_version,td.task_type,td.create_time task_create_time,td.update_time task_update_time,
pd.code process_definition_code,pd.version process_definition_version,pd.name process_definition_name,pd.release_state process_release_state,
pt.pre_task_code upstream_task_code,up.name upstream_task_name
from t_ds_task_definition td
diff --git a/dolphinscheduler-dao/src/main/resources/sql/upgrade/2.1.0_schema/mysql/dolphinscheduler_ddl.sql b/dolphinscheduler-dao/src/main/resources/sql/upgrade/2.1.0_schema/mysql/dolphinscheduler_ddl.sql
index d018144..0a5c91a 100644
--- a/dolphinscheduler-dao/src/main/resources/sql/upgrade/2.1.0_schema/mysql/dolphinscheduler_ddl.sql
+++ b/dolphinscheduler-dao/src/main/resources/sql/upgrade/2.1.0_schema/mysql/dolphinscheduler_ddl.sql
@@ -20,3 +20,7 @@ ALTER TABLE `t_ds_process_task_relation` ADD INDEX `idx_project_code_process_def
ALTER TABLE `t_ds_process_task_relation_log` ADD INDEX `idx_project_code_process_definition_code` (`project_code`, `process_definition_code`) USING BTREE;
ALTER TABLE `t_ds_task_definition_log` ADD INDEX `idx_code_version` (`code`,`version`) USING BTREE;
+alter table t_ds_task_definition_log add `task_group_id` int(11) DEFAULT NULL COMMENT 'task group id' AFTER `resource_ids`;
+alter table t_ds_task_definition_log add `task_group_priority` int(11) DEFAULT NULL COMMENT 'task group id' AFTER `task_group_id`;
+alter table t_ds_task_definition add `task_group_id` int(11) DEFAULT NULL COMMENT 'task group id' AFTER `resource_ids`;
+alter table t_ds_task_definition add `task_group_priority` int(11) DEFAULT NULL COMMENT 'task group id' AFTER `task_group_id`;
\ No newline at end of file