You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ji...@apache.org on 2022/01/05 14:10:40 UTC

[dolphinscheduler] branch dev updated: [Feature][workflow list edit] add task update with upstream (#7829)

This is an automated email from the ASF dual-hosted git repository.

jinyleechina pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new acf9c84  [Feature][workflow list edit] add task update with upstream (#7829)
acf9c84 is described below

commit acf9c84c503f326e16bb328ac3896a3ead66dc58
Author: JinYong Li <42...@users.noreply.github.com>
AuthorDate: Wed Jan 5 22:08:42 2022 +0800

    [Feature][workflow list edit] add task update with upstream (#7829)
    
    * add task save and bind workflow
    
    * add task update with upstream
---
 .../api/controller/TaskDefinitionController.java   |  34 ++++-
 .../apache/dolphinscheduler/api/enums/Status.java  |   2 +-
 .../api/service/TaskDefinitionService.java         |  16 ++
 .../service/impl/TaskDefinitionServiceImpl.java    | 163 +++++++++++++++++----
 .../dao/entity/TaskDefinition.java                 |  35 ++---
 .../dolphinscheduler/dao/entity/TaskMainInfo.java  |  13 ++
 .../dao/mapper/TaskDefinitionMapper.xml            |   2 +-
 .../2.1.0_schema/mysql/dolphinscheduler_ddl.sql    |   4 +
 8 files changed, 223 insertions(+), 46 deletions(-)

diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskDefinitionController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskDefinitionController.java
index 754ac20..d00a052 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskDefinitionController.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskDefinitionController.java
@@ -153,6 +153,36 @@ public class TaskDefinitionController extends BaseController {
     }
 
     /**
+     * update task definition and upstream
+     *
+     * @param loginUser             login user
+     * @param projectCode           project code
+     * @param code                  task definition code
+     * @param taskDefinitionJsonObj task definition json object
+     * @param upstreamCodes         upstream task codes, comma-separated
+     * @return update result code
+     */
+    @ApiOperation(value = "updateWithUpstream", notes = "UPDATE_TASK_DEFINITION_NOTES")
+    @ApiImplicitParams({
+        @ApiImplicitParam(name = "projectCode", value = "PROJECT_CODE", required = true, type = "Long"),
+        @ApiImplicitParam(name = "code", value = "TASK_DEFINITION_CODE", required = true, dataType = "Long", example = "1"),
+        @ApiImplicitParam(name = "taskDefinitionJsonObj", value = "TASK_DEFINITION_JSON", required = true, type = "String"),
+        @ApiImplicitParam(name = "upstreamCodes", value = "UPSTREAM_CODES", required = false, type = "String")
+    })
+    @PutMapping(value = "/{code}/with-upstream")
+    @ResponseStatus(HttpStatus.OK)
+    @ApiException(UPDATE_TASK_DEFINITION_ERROR)
+    @AccessLogAnnotation(ignoreRequestArgs = "loginUser")
+    public Result updateTaskWithUpstream(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
+                                         @ApiParam(name = "projectCode", value = "PROJECT_CODE", required = true) @PathVariable long projectCode,
+                                         @PathVariable(value = "code") long code,
+                                         @RequestParam(value = "taskDefinitionJsonObj", required = true) String taskDefinitionJsonObj,
+                                         @RequestParam(value = "upstreamCodes", required = false) String upstreamCodes) {
+        Map<String, Object> result = taskDefinitionService.updateTaskWithUpstream(loginUser, projectCode, code, taskDefinitionJsonObj, upstreamCodes);
+        return returnDataList(result);
+    }
+
+    /**
      * query task definition version paging list info
      *
      * @param loginUser login user info
@@ -300,7 +330,7 @@ public class TaskDefinitionController extends BaseController {
         @ApiImplicitParam(name = "projectCode", value = "PROJECT_CODE", required = false, type = "Long"),
         @ApiImplicitParam(name = "searchWorkflowName", value = "SEARCH_WORKFLOW_NAME", required = false, type = "String"),
         @ApiImplicitParam(name = "searchTaskName", value = "SEARCH_TASK_NAME", required = false, type = "String"),
-        @ApiImplicitParam(name = "taskType", value = "TASK_TYPE", required = true, dataType = "TaskType", example = "SHELL"),
+        @ApiImplicitParam(name = "taskType", value = "TASK_TYPE", required = false, dataType = "TaskType", example = "SHELL"),
         @ApiImplicitParam(name = "pageNo", value = "PAGE_NO", required = true, dataType = "Int", example = "1"),
         @ApiImplicitParam(name = "pageSize", value = "PAGE_SIZE", required = true, dataType = "Int", example = "10")
     })
@@ -312,7 +342,7 @@ public class TaskDefinitionController extends BaseController {
                                                 @ApiParam(name = "projectCode", value = "PROJECT_CODE", required = true) @PathVariable long projectCode,
                                                 @RequestParam(value = "searchWorkflowName", required = false) String searchWorkflowName,
                                                 @RequestParam(value = "searchTaskName", required = false) String searchTaskName,
-                                                @RequestParam(value = "taskType", required = true) TaskType taskType,
+                                                @RequestParam(value = "taskType", required = false) TaskType taskType,
                                                 @RequestParam("pageNo") Integer pageNo,
                                                 @RequestParam("pageSize") Integer pageSize) {
         Result result = checkPageParams(pageNo, pageSize);
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
index 2e5f3fc..ec4931b 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
@@ -258,7 +258,7 @@ public enum Status {
     DATA_IS_NULL(50018, "data {0} is null", "数据[{0}]不能为空"),
     PROCESS_NODE_HAS_CYCLE(50019, "process node has cycle", "流程节点间存在循环依赖"),
     PROCESS_NODE_S_PARAMETER_INVALID(50020, "process node {0} parameter invalid", "流程节点[{0}]参数无效"),
-    PROCESS_DEFINE_STATE_ONLINE(50021, "process definition {0} is already on line", "工作流定义[{0}]已上线"),
+    PROCESS_DEFINE_STATE_ONLINE(50021, "process definition [{0}] is already on line", "工作流定义[{0}]已上线"),
     DELETE_PROCESS_DEFINE_BY_CODE_ERROR(50022, "delete process definition by code error", "删除工作流定义错误"),
     SCHEDULE_CRON_STATE_ONLINE(50023, "the status of schedule {0} is already on line", "调度配置[{0}]已上线"),
     DELETE_SCHEDULE_CRON_BY_ID_ERROR(50024, "delete schedule by id error", "删除调度配置错误"),
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskDefinitionService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskDefinitionService.java
index 1b55d8b..962e60a 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskDefinitionService.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskDefinitionService.java
@@ -92,6 +92,22 @@ public interface TaskDefinitionService {
                                              String taskDefinitionJsonObj);
 
     /**
+     * update task definition and upstream
+     *
+     * @param loginUser             login user
+     * @param projectCode           project code
+     * @param taskCode              task definition code
+     * @param taskDefinitionJsonObj task definition json object
+     * @param upstreamCodes         upstream task codes, comma-separated
+     * @return update result code
+     */
+    Map<String, Object> updateTaskWithUpstream(User loginUser,
+                                               long projectCode,
+                                               long taskCode,
+                                               String taskDefinitionJsonObj,
+                                               String upstreamCodes);
+
+    /**
      * update task definition
      *
      * @param loginUser login user
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
index c1342a1..094ce9f 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
@@ -26,6 +26,7 @@ import org.apache.dolphinscheduler.api.utils.PageInfo;
 import org.apache.dolphinscheduler.api.utils.Result;
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.AuthorizationType;
+import org.apache.dolphinscheduler.common.enums.ConditionType;
 import org.apache.dolphinscheduler.common.enums.Flag;
 import org.apache.dolphinscheduler.common.enums.ReleaseState;
 import org.apache.dolphinscheduler.common.enums.TaskType;
@@ -173,10 +174,14 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
             return result;
         }
         ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(processDefinitionCode);
-        if (processDefinition == null) {
+        if (processDefinition == null || projectCode != processDefinition.getProjectCode()) {
             putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processDefinitionCode);
             return result;
         }
+        if (processDefinition.getReleaseState() == ReleaseState.ONLINE) {
+            putMsg(result, Status.PROCESS_DEFINE_STATE_ONLINE, processDefinitionCode);
+            return result;
+        }
         TaskDefinitionLog taskDefinition = JSONUtils.parseObject(taskDefinitionJsonObj, TaskDefinitionLog.class);
         if (taskDefinition == null) {
             logger.error("taskDefinitionJsonObj is not valid json");
@@ -216,14 +221,17 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
                 processTaskRelationLog.setPreTaskVersion(upstreamTask.getVersion());
                 processTaskRelationLog.setPostTaskCode(taskCode);
                 processTaskRelationLog.setPostTaskVersion(Constants.VERSION_FIRST);
+                processTaskRelationLog.setConditionType(ConditionType.NONE);
+                processTaskRelationLog.setConditionParams("{}");
                 processTaskRelationLogList.add(processTaskRelationLog);
             }
+            List<ProcessTaskRelation> processTaskRelationList = processTaskRelationMapper.queryByProcessCode(projectCode, processDefinitionCode);
+            if (!processTaskRelationList.isEmpty()) {
+                processTaskRelationLogList.addAll(processTaskRelationList.stream().map(ProcessTaskRelationLog::new).collect(Collectors.toList()));
+            }
             int insertResult = processService.saveTaskRelation(loginUser, projectCode, processDefinition.getCode(), processDefinition.getVersion(),
                 processTaskRelationLogList, null);
-            if (insertResult == Constants.EXIT_CODE_SUCCESS) {
-                putMsg(result, Status.SUCCESS);
-                result.put(Constants.DATA_LIST, processDefinition);
-            } else {
+            if (insertResult != Constants.EXIT_CODE_SUCCESS) {
                 putMsg(result, Status.CREATE_PROCESS_TASK_RELATION_ERROR);
                 throw new ServiceException(Status.CREATE_PROCESS_TASK_RELATION_ERROR);
             }
@@ -233,6 +241,8 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
             putMsg(result, Status.CREATE_TASK_DEFINITION_ERROR);
             throw new ServiceException(Status.CREATE_TASK_DEFINITION_ERROR);
         }
+        putMsg(result, Status.SUCCESS);
+        result.put(Constants.DATA_LIST, taskDefinition);
         return result;
     }
 
@@ -286,7 +296,7 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
             putMsg(result, Status.TASK_DEFINE_NOT_EXIST, taskCode);
             return result;
         }
-        if (taskDefinition.getFlag() == Flag.YES) {
+        if (processService.isTaskOnline(taskCode) && taskDefinition.getFlag() == Flag.YES) {
             putMsg(result, Status.TASK_DEFINE_STATE_ONLINE, taskCode);
             return result;
         }
@@ -332,36 +342,81 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
     @Transactional(rollbackFor = RuntimeException.class)
     @Override
     public Map<String, Object> updateTaskDefinition(User loginUser, long projectCode, long taskCode, String taskDefinitionJsonObj) {
+        Map<String, Object> result = new HashMap<>();
+        int version = updateTask(loginUser, projectCode, taskCode, taskDefinitionJsonObj, result);
+        if (version <= 0) {
+            return result;
+        }
+        List<ProcessTaskRelation> processTaskRelationList = processTaskRelationMapper.queryByTaskCode(taskCode);
+        if (!processTaskRelationList.isEmpty()) {
+            List<ProcessTaskRelationLog> processTaskRelationLogList = new ArrayList<>();
+            int delete = 0;
+            int deleteLog = 0;
+            Date now = new Date();
+            for (ProcessTaskRelation processTaskRelation : processTaskRelationList) {
+                ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog(processTaskRelation);
+                delete += processTaskRelationMapper.deleteRelation(processTaskRelationLog);
+                deleteLog += processTaskRelationLogMapper.deleteRelation(processTaskRelationLog);
+                if (processTaskRelationLog.getPreTaskCode() == taskCode) {
+                    processTaskRelationLog.setPreTaskVersion(version);
+                }
+                if (processTaskRelationLog.getPostTaskCode() == taskCode) {
+                    processTaskRelationLog.setPostTaskVersion(version);
+                }
+                processTaskRelationLog.setOperator(loginUser.getId());
+                processTaskRelationLog.setOperateTime(now);
+                processTaskRelationLog.setUpdateTime(now);
+                processTaskRelationLogList.add(processTaskRelationLog);
+            }
+            if ((delete & deleteLog) == 0) {
+                throw new ServiceException(Status.DELETE_TASK_PROCESS_RELATION_ERROR);
+            } else {
+                int insertRelation = processTaskRelationMapper.batchInsert(processTaskRelationLogList);
+                int insertRelationLog = processTaskRelationLogMapper.batchInsert(processTaskRelationLogList);
+                if ((insertRelation & insertRelationLog) == 0) {
+                    throw new ServiceException(Status.CREATE_PROCESS_TASK_RELATION_ERROR);
+                }
+            }
+        }
+        result.put(Constants.DATA_LIST, taskCode);
+        putMsg(result, Status.SUCCESS);
+        return result;
+    }
+
+    private int updateTask(User loginUser, long projectCode, long taskCode, String taskDefinitionJsonObj, Map<String, Object> result) {
         Project project = projectMapper.queryByCode(projectCode);
         //check user access for project
-        Map<String, Object> result = projectService.checkProjectAndAuth(loginUser, project, projectCode);
+        result.putAll(projectService.checkProjectAndAuth(loginUser, project, projectCode));
         if (result.get(Constants.STATUS) != Status.SUCCESS) {
-            return result;
+            return Constants.EXIT_CODE_FAILURE;
         }
         TaskDefinition taskDefinition = taskDefinitionMapper.queryByCode(taskCode);
         if (taskDefinition == null) {
             putMsg(result, Status.TASK_DEFINE_NOT_EXIST, taskCode);
-            return result;
+            return Constants.EXIT_CODE_FAILURE;
         }
         if (processService.isTaskOnline(taskCode) && taskDefinition.getFlag() == Flag.YES) {
             putMsg(result, Status.NOT_SUPPORT_UPDATE_TASK_DEFINITION);
-            return result;
+            return Constants.EXIT_CODE_FAILURE;
         }
         TaskDefinitionLog taskDefinitionToUpdate = JSONUtils.parseObject(taskDefinitionJsonObj, TaskDefinitionLog.class);
+        if (taskDefinition.equals(taskDefinitionToUpdate)) {
+            return taskDefinition.getVersion();
+        }
         if (taskDefinitionToUpdate == null) {
             logger.error("taskDefinitionJson is not valid json");
             putMsg(result, Status.DATA_IS_NOT_VALID, taskDefinitionJsonObj);
-            return result;
+            return Constants.EXIT_CODE_FAILURE;
         }
         if (!CheckUtils.checkTaskDefinitionParameters(taskDefinitionToUpdate)) {
             logger.error("task definition {} parameter invalid", taskDefinitionToUpdate.getName());
             putMsg(result, Status.PROCESS_NODE_S_PARAMETER_INVALID, taskDefinitionToUpdate.getName());
-            return result;
+            return Constants.EXIT_CODE_FAILURE;
         }
         Integer version = taskDefinitionLogMapper.queryMaxVersionForDefinition(taskCode);
         if (version == null || version == 0) {
             putMsg(result, Status.DATA_IS_NOT_VALID, taskCode);
-            return result;
+            return Constants.EXIT_CODE_FAILURE;
         }
         Date now = new Date();
         taskDefinitionToUpdate.setCode(taskCode);
@@ -381,43 +436,101 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
             putMsg(result, Status.UPDATE_TASK_DEFINITION_ERROR);
             throw new ServiceException(Status.UPDATE_TASK_DEFINITION_ERROR);
         }
+        return version;
+    }
+
+    /**
+     * update task definition and upstream
+     *
+     * @param loginUser             login user
+     * @param projectCode           project code
+     * @param taskCode              task definition code
+     * @param taskDefinitionJsonObj task definition json object
+     * @param upstreamCodes         upstream task codes, comma-separated
+     * @return update result code
+     */
+    @Override
+    public Map<String, Object> updateTaskWithUpstream(User loginUser, long projectCode, long taskCode, String taskDefinitionJsonObj, String upstreamCodes) {
+        Map<String, Object> result = new HashMap<>();
+        int version = updateTask(loginUser, projectCode, taskCode, taskDefinitionJsonObj, result);
+        if (version <= 0) {
+            return result;
+        }
+        Map<Long, TaskDefinition> queryUpStreamTaskCodeMap;
+        if (StringUtils.isNotBlank(upstreamCodes)) {
+            Set<Long> upstreamTaskCodes = Arrays.stream(upstreamCodes.split(Constants.COMMA)).map(Long::parseLong).collect(Collectors.toSet());
+            List<TaskDefinition> upstreamTaskDefinitionList = taskDefinitionMapper.queryByCodeList(upstreamTaskCodes);
+            queryUpStreamTaskCodeMap = upstreamTaskDefinitionList.stream().collect(Collectors.toMap(TaskDefinition::getCode, taskDefinition -> taskDefinition));
+            // set difference: requested upstream codes that the query did not return
+            upstreamTaskCodes.removeAll(queryUpStreamTaskCodeMap.keySet());
+            if (!upstreamTaskCodes.isEmpty()) {
+                putMsg(result, Status.TASK_DEFINE_NOT_EXIST, StringUtils.join(upstreamTaskCodes, Constants.COMMA));
+                return result;
+            }
+        } else {
+            queryUpStreamTaskCodeMap = new HashMap<>();
+        }
         List<ProcessTaskRelation> processTaskRelationList = processTaskRelationMapper.queryByTaskCode(taskCode);
+        if (!queryUpStreamTaskCodeMap.isEmpty() && processTaskRelationList.isEmpty()) {
+            putMsg(result, Status.PROCESS_TASK_RELATION_NOT_EXIST, StringUtils.join(queryUpStreamTaskCodeMap.keySet(), Constants.COMMA));
+            throw new ServiceException(Status.PROCESS_TASK_RELATION_NOT_EXIST);
+        }
         if (!processTaskRelationList.isEmpty()) {
-            List<ProcessTaskRelationLog> processTaskRelationLogList = new ArrayList<>();
+            List<ProcessTaskRelationLog> relationLogs = new ArrayList<>();
+            Date now = new Date();
             int delete = 0;
             int deleteLog = 0;
             for (ProcessTaskRelation processTaskRelation : processTaskRelationList) {
                 ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog(processTaskRelation);
                 delete += processTaskRelationMapper.deleteRelation(processTaskRelationLog);
                 deleteLog += processTaskRelationLogMapper.deleteRelation(processTaskRelationLog);
+                processTaskRelationLog.setOperator(loginUser.getId());
+                processTaskRelationLog.setOperateTime(now);
+                processTaskRelationLog.setUpdateTime(now);
                 if (processTaskRelationLog.getPreTaskCode() == taskCode) {
                     processTaskRelationLog.setPreTaskVersion(version);
                 }
                 if (processTaskRelationLog.getPostTaskCode() == taskCode) {
                     processTaskRelationLog.setPostTaskVersion(version);
+                    TaskDefinition definition = queryUpStreamTaskCodeMap.remove(processTaskRelationLog.getPreTaskCode());
+                    if (definition == null) {
+                        processTaskRelationLog.setPreTaskCode(0L);
+                        processTaskRelationLog.setPreTaskVersion(0);
+                    }
                 }
-                processTaskRelationLog.setOperator(loginUser.getId());
-                processTaskRelationLog.setOperateTime(now);
-                processTaskRelationLog.setUpdateTime(now);
-                processTaskRelationLogList.add(processTaskRelationLog);
+                relationLogs.add(processTaskRelationLog);
             }
             if ((delete & deleteLog) == 0) {
                 throw new ServiceException(Status.DELETE_TASK_PROCESS_RELATION_ERROR);
-            } else {
-                int insertRelation = processTaskRelationMapper.batchInsert(processTaskRelationLogList);
-                int insertRelationLog = processTaskRelationLogMapper.batchInsert(processTaskRelationLogList);
-                if ((insertRelation & insertRelationLog) == 0) {
-                    throw new ServiceException(Status.CREATE_PROCESS_TASK_RELATION_ERROR);
+            }
+            if (!queryUpStreamTaskCodeMap.isEmpty()) {
+                ProcessTaskRelationLog taskRelationLogDeepCopy = JSONUtils.parseObject(JSONUtils.toJsonString(relationLogs.get(0)), ProcessTaskRelationLog.class);
+                assert taskRelationLogDeepCopy != null;
+                for (TaskDefinition upstreamTask : queryUpStreamTaskCodeMap.values()) {
+                    taskRelationLogDeepCopy.setPreTaskCode(upstreamTask.getCode());
+                    taskRelationLogDeepCopy.setPreTaskVersion(upstreamTask.getVersion());
+                    relationLogs.add(taskRelationLogDeepCopy);
                 }
             }
+            Map<Long, ProcessTaskRelationLog> taskRelationLogMap =
+                relationLogs.stream().collect(Collectors.toMap(ProcessTaskRelation::getPreTaskCode, processTaskRelationLog -> processTaskRelationLog));
+            if (taskRelationLogMap.containsKey(0L) && taskRelationLogMap.size() >= 3) {
+                taskRelationLogMap.remove(0L);
+            }
+            int insertRelation = processTaskRelationMapper.batchInsert(relationLogs);
+            int insertRelationLog = processTaskRelationLogMapper.batchInsert(relationLogs);
+            if ((insertRelation & insertRelationLog) == 0) {
+                putMsg(result, Status.CREATE_PROCESS_TASK_RELATION_ERROR);
+                throw new ServiceException(Status.CREATE_PROCESS_TASK_RELATION_ERROR);
+            }
         }
         result.put(Constants.DATA_LIST, taskCode);
-        putMsg(result, Status.SUCCESS, update);
+        putMsg(result, Status.SUCCESS);
         return result;
     }
 
     /**
-     * update task definition
+     * Switch task definition
      *
      * @param loginUser login user
      * @param projectCode project code
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java
index 699b2ec..4d31e23 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java
@@ -459,24 +459,25 @@ public class TaskDefinition {
         }
         TaskDefinition that = (TaskDefinition) o;
         return failRetryTimes == that.failRetryTimes
-                && failRetryInterval == that.failRetryInterval
-                && timeout == that.timeout
-                && delayTime == that.delayTime
-                && Objects.equals(name, that.name)
-                && Objects.equals(description, that.description)
-                && Objects.equals(taskType, that.taskType)
-                && Objects.equals(taskParams, that.taskParams)
-                && flag == that.flag
-                && taskPriority == that.taskPriority
-                && Objects.equals(workerGroup, that.workerGroup)
-                && timeoutFlag == that.timeoutFlag
-                && timeoutNotifyStrategy == that.timeoutNotifyStrategy
-                && Objects.equals(resourceIds, that.resourceIds)
-                && environmentCode == that.environmentCode
-                && taskGroupId == that.taskGroupId
-                && taskGroupPriority == that.taskGroupPriority;
+            && failRetryInterval == that.failRetryInterval
+            && timeout == that.timeout
+            && delayTime == that.delayTime
+            && Objects.equals(name, that.name)
+            && Objects.equals(description, that.description)
+            && Objects.equals(taskType, that.taskType)
+            && Objects.equals(taskParams, that.taskParams)
+            && flag == that.flag
+            && taskPriority == that.taskPriority
+            && Objects.equals(workerGroup, that.workerGroup)
+            && timeoutFlag == that.timeoutFlag
+            && timeoutNotifyStrategy == that.timeoutNotifyStrategy
+            && (Objects.equals(resourceIds, that.resourceIds)
+            || (StringUtils.EMPTY.equals(resourceIds) && that.resourceIds == null)
+            || (StringUtils.EMPTY.equals(that.resourceIds) && resourceIds == null))
+            && environmentCode == that.environmentCode
+            && taskGroupId == that.taskGroupId
+            && taskGroupPriority == that.taskGroupPriority;
     }
-
     @Override
     public String toString() {
         return "TaskDefinition{"
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskMainInfo.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskMainInfo.java
index 580c683..e17392f 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskMainInfo.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskMainInfo.java
@@ -45,6 +45,11 @@ public class TaskMainInfo {
     private int taskVersion;
 
     /**
+     * task type
+     */
+    private String taskType;
+
+    /**
      * create time
      */
     @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
@@ -115,6 +120,14 @@ public class TaskMainInfo {
         this.taskVersion = taskVersion;
     }
 
+    public String getTaskType() {
+        return taskType;
+    }
+
+    public void setTaskType(String taskType) {
+        this.taskType = taskType;
+    }
+
     public Date getTaskCreateTime() {
         return taskCreateTime;
     }
diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.xml
index 8dbff07..5c889d1 100644
--- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.xml
+++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.xml
@@ -87,7 +87,7 @@
         </foreach>
     </insert>
     <select id="queryDefineListPaging" resultType="org.apache.dolphinscheduler.dao.entity.TaskMainInfo">
-        select td.name task_name,td.code task_code,td.version task_version,td.create_time task_create_time,td.update_time task_update_time,
+        select td.name task_name,td.code task_code,td.version task_version,td.task_type,td.create_time task_create_time,td.update_time task_update_time,
         pd.code process_definition_code,pd.version process_definition_version,pd.name process_definition_name,pd.release_state process_release_state,
         pt.pre_task_code upstream_task_code,up.name upstream_task_name
         from t_ds_task_definition td
diff --git a/dolphinscheduler-dao/src/main/resources/sql/upgrade/2.1.0_schema/mysql/dolphinscheduler_ddl.sql b/dolphinscheduler-dao/src/main/resources/sql/upgrade/2.1.0_schema/mysql/dolphinscheduler_ddl.sql
index d018144..0a5c91a 100644
--- a/dolphinscheduler-dao/src/main/resources/sql/upgrade/2.1.0_schema/mysql/dolphinscheduler_ddl.sql
+++ b/dolphinscheduler-dao/src/main/resources/sql/upgrade/2.1.0_schema/mysql/dolphinscheduler_ddl.sql
@@ -20,3 +20,7 @@ ALTER TABLE `t_ds_process_task_relation` ADD INDEX `idx_project_code_process_def
 ALTER TABLE `t_ds_process_task_relation_log` ADD INDEX `idx_project_code_process_definition_code` (`project_code`, `process_definition_code`) USING BTREE;
 
 ALTER TABLE `t_ds_task_definition_log` ADD INDEX `idx_code_version` (`code`,`version`) USING BTREE;
+alter table t_ds_task_definition_log add `task_group_id` int(11) DEFAULT NULL COMMENT 'task group id' AFTER `resource_ids`;
+alter table t_ds_task_definition_log add `task_group_priority` int(11) DEFAULT NULL COMMENT 'task group id' AFTER `task_group_id`;
+alter table t_ds_task_definition add `task_group_id` int(11) DEFAULT NULL COMMENT 'task group id' AFTER `resource_ids`;
+alter table t_ds_task_definition add `task_group_priority` int(11) DEFAULT NULL COMMENT 'task group id' AFTER `task_group_id`;
\ No newline at end of file