You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ca...@apache.org on 2022/09/19 09:41:20 UTC

[dolphinscheduler] branch 3.1.0-prepare updated (775ef98b64 -> 780a509f67)

This is an automated email from the ASF dual-hosted git repository.

caishunfeng pushed a change to branch 3.1.0-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


    from 775ef98b64 Add validations of possible malicious keys (#11966)
     new d7f40b19b5 [Bug](dependent) Dependent downstream trigger error when schedule cycle not day. (#11734)
     new 704043f229 fix import error
     new 1513363eed Fix cannot save processDefinition (#11931)
     new 780a509f67 fix workflow keep running when task fail (#11930)

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../api/service/impl/ExecutorServiceImpl.java      |   8 +-
 .../service/impl/ProcessDefinitionServiceImpl.java | 196 +++++++---------
 .../service/impl/ProcessInstanceServiceImpl.java   |  13 ++
 .../api/service/ProcessDefinitionServiceTest.java  |  14 +-
 .../common/utils/CodeGenerateUtils.java            |  11 +-
 .../dolphinscheduler/dao/entity/Command.java       | 132 -----------
 .../dao/entity/DependentProcessDefinition.java     |  17 +-
 .../dao/entity/ProcessDefinition.java              | 250 +--------------------
 .../dao/entity/ProcessTaskRelation.java            |  30 +--
 .../dao/mapper/WorkFlowLineageMapper.xml           |   1 +
 .../dao/mapper/TaskInstanceMapperTest.java         |  37 ++-
 .../master/runner/WorkflowExecuteRunnable.java     |  15 +-
 .../service/process/ProcessServiceImpl.java        |  69 +++---
 13 files changed, 184 insertions(+), 609 deletions(-)


[dolphinscheduler] 02/04: fix import error

Posted by ca...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

caishunfeng pushed a commit to branch 3.1.0-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git

commit 704043f229a1507b54ff222c9290114c9d637b45
Author: caishunfeng <ca...@gmail.com>
AuthorDate: Mon Sep 19 17:07:02 2022 +0800

    fix import error
---
 .../api/service/impl/ProcessInstanceServiceImpl.java        | 13 +++++++++++++
 1 file changed, 13 insertions(+)

diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
index 75830e8df2..22b91317bd 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
@@ -34,6 +34,12 @@ import org.apache.dolphinscheduler.api.dto.gantt.GanttDto;
 import org.apache.dolphinscheduler.api.dto.gantt.Task;
 import org.apache.dolphinscheduler.api.enums.Status;
 import org.apache.dolphinscheduler.api.exceptions.ServiceException;
+import org.apache.dolphinscheduler.api.service.ExecutorService;
+import org.apache.dolphinscheduler.api.service.LoggerService;
+import org.apache.dolphinscheduler.api.service.ProcessDefinitionService;
+import org.apache.dolphinscheduler.api.service.ProcessInstanceService;
+import org.apache.dolphinscheduler.api.service.ProjectService;
+import org.apache.dolphinscheduler.api.service.UsersService;
 import org.apache.dolphinscheduler.api.utils.PageInfo;
 import org.apache.dolphinscheduler.api.utils.Result;
 import org.apache.dolphinscheduler.common.Constants;
@@ -81,6 +87,13 @@ import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStreamReader;
 import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 


[dolphinscheduler] 03/04: Fix cannot save processDefinition (#11931)

Posted by ca...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

caishunfeng pushed a commit to branch 3.1.0-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git

commit 1513363eedfeca5c64f3478917ba084c66b61dfb
Author: Wenjun Ruan <we...@apache.org>
AuthorDate: Fri Sep 16 13:54:18 2022 +0800

    Fix cannot save processDefinition (#11931)
---
 .../service/impl/ProcessDefinitionServiceImpl.java | 196 +++++++---------
 .../api/service/ProcessDefinitionServiceTest.java  |  14 +-
 .../common/utils/CodeGenerateUtils.java            |  11 +-
 .../dolphinscheduler/dao/entity/Command.java       | 132 -----------
 .../dao/entity/ProcessDefinition.java              | 250 +--------------------
 .../dao/entity/ProcessTaskRelation.java            |  30 +--
 .../dao/mapper/TaskInstanceMapperTest.java         |  37 ++-
 .../service/process/ProcessServiceImpl.java        |  69 +++---
 8 files changed, 142 insertions(+), 597 deletions(-)

diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
index 2effaefa5f..d2e957bd58 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
@@ -17,14 +17,28 @@
 
 package org.apache.dolphinscheduler.api.service.impl;
 
-import com.baomidou.mybatisplus.core.metadata.IPage;
-import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.node.ArrayNode;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.collect.Lists;
-import org.apache.commons.collections4.CollectionUtils;
-import org.apache.commons.lang3.StringUtils;
+import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.TASK_DEFINITION_MOVE;
+import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.VERSION_DELETE;
+import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.VERSION_LIST;
+import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_BATCH_COPY;
+import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_CREATE;
+import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_DEFINITION;
+import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_DEFINITION_DELETE;
+import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_DEFINITION_EXPORT;
+import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_EXPORT;
+import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_IMPORT;
+import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_ONLINE_OFFLINE;
+import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_SWITCH_TO_THIS_VERSION;
+import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_TREE_VIEW;
+import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_UPDATE;
+import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE;
+import static org.apache.dolphinscheduler.common.Constants.COPY_SUFFIX;
+import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP;
+import static org.apache.dolphinscheduler.common.Constants.EMPTY_STRING;
+import static org.apache.dolphinscheduler.common.Constants.IMPORT_SUFFIX;
+import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.COMPLEX_TASK_TYPES;
+import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_SQL;
+
 import org.apache.dolphinscheduler.api.dto.DagDataSchedule;
 import org.apache.dolphinscheduler.api.dto.ScheduleParam;
 import org.apache.dolphinscheduler.api.dto.treeview.Instance;
@@ -94,16 +108,10 @@ import org.apache.dolphinscheduler.plugin.task.api.parameters.ParametersNode;
 import org.apache.dolphinscheduler.plugin.task.api.parameters.SqlParameters;
 import org.apache.dolphinscheduler.service.process.ProcessService;
 import org.apache.dolphinscheduler.service.task.TaskPluginManager;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.http.MediaType;
-import org.springframework.stereotype.Service;
-import org.springframework.transaction.annotation.Transactional;
-import org.springframework.web.multipart.MultipartFile;
 
-import javax.servlet.ServletOutputStream;
-import javax.servlet.http.HttpServletResponse;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+
 import java.io.BufferedOutputStream;
 import java.io.BufferedReader;
 import java.io.IOException;
@@ -127,27 +135,6 @@ import java.util.stream.Collectors;
 import java.util.zip.ZipEntry;
 import java.util.zip.ZipInputStream;
 
-import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.TASK_DEFINITION_MOVE;
-import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.VERSION_DELETE;
-import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.VERSION_LIST;
-import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_BATCH_COPY;
-import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_CREATE;
-import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_DEFINITION;
-import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_DEFINITION_DELETE;
-import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_DEFINITION_EXPORT;
-import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_EXPORT;
-import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_IMPORT;
-import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_ONLINE_OFFLINE;
-import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_SWITCH_TO_THIS_VERSION;
-import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_TREE_VIEW;
-import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_UPDATE;
-import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE;
-import static org.apache.dolphinscheduler.common.Constants.COPY_SUFFIX;
-import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP;
-import static org.apache.dolphinscheduler.common.Constants.EMPTY_STRING;
-import static org.apache.dolphinscheduler.common.Constants.IMPORT_SUFFIX;
-import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.COMPLEX_TASK_TYPES;
-import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_SQL;
 import javax.servlet.ServletOutputStream;
 import javax.servlet.http.HttpServletResponse;
 
@@ -273,44 +260,25 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
         if (result.get(Constants.STATUS) != Status.SUCCESS) {
             return result;
         }
-        if(checkDescriptionLength(description)){
-            putMsg(result, Status.DESCRIPTION_TOO_LONG_ERROR);
-            return result;
+        if (checkDescriptionLength(description)) {
+            throw new ServiceException(Status.DESCRIPTION_TOO_LONG_ERROR);
         }
         // check whether the new process define name exist
         ProcessDefinition definition = processDefinitionMapper.verifyByDefineName(project.getCode(), name);
         if (definition != null) {
-            putMsg(result, Status.PROCESS_DEFINITION_NAME_EXIST, name);
-            return result;
-        }
-        List<TaskDefinitionLog> taskDefinitionLogs = JSONUtils.toList(taskDefinitionJson, TaskDefinitionLog.class);
-        Map<String, Object> checkTaskDefinitions = checkTaskDefinitionList(taskDefinitionLogs, taskDefinitionJson);
-        if (checkTaskDefinitions.get(Constants.STATUS) != Status.SUCCESS) {
-            return checkTaskDefinitions;
-        }
-        List<ProcessTaskRelationLog> taskRelationList =
-                JSONUtils.toList(taskRelationJson, ProcessTaskRelationLog.class);
-        Map<String, Object> checkRelationJson =
-                checkTaskRelationList(taskRelationList, taskRelationJson, taskDefinitionLogs);
-        if (checkRelationJson.get(Constants.STATUS) != Status.SUCCESS) {
-            return checkRelationJson;
+            throw new ServiceException(Status.PROCESS_DEFINITION_NAME_EXIST, name);
         }
+        List<TaskDefinitionLog> taskDefinitionLogs = generateTaskDefinitionList(taskDefinitionJson);
+        List<ProcessTaskRelationLog> taskRelationList = generateTaskRelationList(taskRelationJson, taskDefinitionLogs);
         int tenantId = -1;
         if (!Constants.DEFAULT.equals(tenantCode)) {
             Tenant tenant = tenantMapper.queryByTenantCode(tenantCode);
             if (tenant == null) {
-                putMsg(result, Status.TENANT_NOT_EXIST);
-                return result;
+                throw new ServiceException(Status.TENANT_NOT_EXIST);
             }
             tenantId = tenant.getId();
         }
-        long processDefinitionCode;
-        try {
-            processDefinitionCode = CodeGenerateUtils.getInstance().genCode();
-        } catch (CodeGenerateException e) {
-            putMsg(result, Status.INTERNAL_SERVER_ERROR_ARGS);
-            return result;
-        }
+        long processDefinitionCode = CodeGenerateUtils.getInstance().genCode();
         ProcessDefinition processDefinition =
                 new ProcessDefinition(projectCode, name, processDefinitionCode, description,
                         globalParams, locations, timeout, loginUser.getId(), tenantId);
@@ -330,66 +298,63 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
             logger.info("The task has not changed, so skip");
         }
         if (saveTaskResult == Constants.DEFINITION_FAILURE) {
-            putMsg(result, Status.CREATE_TASK_DEFINITION_ERROR);
             throw new ServiceException(Status.CREATE_TASK_DEFINITION_ERROR);
         }
         int insertVersion = processService.saveProcessDefine(loginUser, processDefinition, Boolean.TRUE, Boolean.TRUE);
         if (insertVersion == 0) {
-            putMsg(result, Status.CREATE_PROCESS_DEFINITION_ERROR);
             throw new ServiceException(Status.CREATE_PROCESS_DEFINITION_ERROR);
         }
         int insertResult = processService.saveTaskRelation(loginUser, processDefinition.getProjectCode(),
                 processDefinition.getCode(),
                 insertVersion, taskRelationList, taskDefinitionLogs, Boolean.TRUE);
-        if (insertResult == Constants.EXIT_CODE_SUCCESS) {
-            putMsg(result, Status.SUCCESS);
-            result.put(Constants.DATA_LIST, processDefinition);
-        } else {
-            putMsg(result, Status.CREATE_PROCESS_TASK_RELATION_ERROR);
+        if (insertResult != Constants.EXIT_CODE_SUCCESS) {
             throw new ServiceException(Status.CREATE_PROCESS_TASK_RELATION_ERROR);
         }
         saveOtherRelation(loginUser, processDefinition, result, otherParamsJson);
+
+        putMsg(result, Status.SUCCESS);
+        result.put(Constants.DATA_LIST, processDefinition);
         return result;
     }
 
-    private Map<String, Object> checkTaskDefinitionList(List<TaskDefinitionLog> taskDefinitionLogs,
-                                                        String taskDefinitionJson) {
-        Map<String, Object> result = new HashMap<>();
+    private List<TaskDefinitionLog> generateTaskDefinitionList(String taskDefinitionJson) {
         try {
-            if (taskDefinitionLogs.isEmpty()) {
-                logger.error("taskDefinitionJson invalid: {}", taskDefinitionJson);
-                putMsg(result, Status.DATA_IS_NOT_VALID, taskDefinitionJson);
-                return result;
+            List<TaskDefinitionLog> taskDefinitionLogs = JSONUtils.toList(taskDefinitionJson, TaskDefinitionLog.class);
+            if (CollectionUtils.isEmpty(taskDefinitionLogs)) {
+                logger.error("Generate task definition list failed, the given taskDefinitionJson is invalided: {}",
+                        taskDefinitionJson);
+                throw new ServiceException(Status.DATA_IS_NOT_VALID, taskDefinitionJson);
             }
             for (TaskDefinitionLog taskDefinitionLog : taskDefinitionLogs) {
-
                 if (!taskPluginManager.checkTaskParameters(ParametersNode.builder()
                         .taskType(taskDefinitionLog.getTaskType())
                         .taskParams(taskDefinitionLog.getTaskParams())
                         .dependence(taskDefinitionLog.getDependence())
                         .build())) {
-                    logger.error("task definition {} parameter invalid", taskDefinitionLog.getName());
-                    putMsg(result, Status.PROCESS_NODE_S_PARAMETER_INVALID, taskDefinitionLog.getName());
-                    return result;
+                    logger.error(
+                            "Generate task definition list failed, the given task definition parameter is invalided, taskName: {}, taskDefinition: {}",
+                            taskDefinitionLog.getName(), taskDefinitionLog);
+                    throw new ServiceException(Status.PROCESS_NODE_S_PARAMETER_INVALID, taskDefinitionLog.getName());
                 }
             }
-            putMsg(result, Status.SUCCESS);
+            return taskDefinitionLogs;
+        } catch (ServiceException ex) {
+            throw ex;
         } catch (Exception e) {
-            result.put(Constants.STATUS, Status.REQUEST_PARAMS_NOT_VALID_ERROR);
-            result.put(Constants.MSG, e.getMessage());
+            logger.error("Generate task definition list failed, meet an unknown exception", e);
+            throw new ServiceException(Status.REQUEST_PARAMS_NOT_VALID_ERROR);
         }
-        return result;
     }
 
-    private Map<String, Object> checkTaskRelationList(List<ProcessTaskRelationLog> taskRelationList,
-                                                      String taskRelationJson,
-                                                      List<TaskDefinitionLog> taskDefinitionLogs) {
-        Map<String, Object> result = new HashMap<>();
+    private List<ProcessTaskRelationLog> generateTaskRelationList(String taskRelationJson,
+                                                                  List<TaskDefinitionLog> taskDefinitionLogs) {
         try {
-            if (taskRelationList == null || taskRelationList.isEmpty()) {
-                logger.error("task relation list is null");
-                putMsg(result, Status.DATA_IS_NOT_VALID, taskRelationJson);
-                return result;
+            List<ProcessTaskRelationLog> taskRelationList =
+                    JSONUtils.toList(taskRelationJson, ProcessTaskRelationLog.class);
+            if (CollectionUtils.isEmpty(taskRelationList)) {
+                logger.error("Generate task relation list failed the taskRelation list is empty, taskRelationJson: {}",
+                        taskRelationJson);
+                throw new ServiceException(Status.DATA_IS_NOT_VALID);
             }
             List<ProcessTaskRelation> processTaskRelations = taskRelationList.stream()
                     .map(processTaskRelationLog -> JSONUtils.parseObject(JSONUtils.toJsonString(processTaskRelationLog),
@@ -403,31 +368,29 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
                 Collection<Long> codes = CollectionUtils.subtract(postTaskCodes, taskNodeCodes);
                 if (CollectionUtils.isNotEmpty(codes)) {
                     logger.error("the task code is not exist");
-                    putMsg(result, Status.TASK_DEFINE_NOT_EXIST,
-                            StringUtils.join(codes, Constants.COMMA));
-                    return result;
+                    throw new ServiceException(Status.TASK_DEFINE_NOT_EXIST, StringUtils.join(codes, Constants.COMMA));
                 }
             }
             if (graphHasCycle(taskNodeList)) {
                 logger.error("process DAG has cycle");
-                putMsg(result, Status.PROCESS_NODE_HAS_CYCLE);
-                return result;
+                throw new ServiceException(Status.PROCESS_NODE_HAS_CYCLE);
             }
 
             // check whether the task relation json is normal
             for (ProcessTaskRelationLog processTaskRelationLog : taskRelationList) {
                 if (processTaskRelationLog.getPostTaskCode() == 0) {
                     logger.error("the post_task_code or post_task_version can't be zero");
-                    putMsg(result, Status.CHECK_PROCESS_TASK_RELATION_ERROR);
-                    return result;
+                    throw new ServiceException(Status.CHECK_PROCESS_TASK_RELATION_ERROR);
                 }
             }
-            putMsg(result, Status.SUCCESS);
+            return taskRelationList;
+        } catch (ServiceException ex) {
+            throw ex;
         } catch (Exception e) {
-            result.put(Constants.STATUS, Status.REQUEST_PARAMS_NOT_VALID_ERROR);
-            result.put(Constants.MSG, e.getMessage());
+            logger.error("Check task relation list error, meet an unknown exception, given taskRelationJson: {}",
+                    taskRelationJson, e);
+            throw new ServiceException(Status.REQUEST_PARAMS_NOT_VALID_ERROR);
         }
-        return result;
     }
 
     /**
@@ -629,22 +592,12 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
         if (result.get(Constants.STATUS) != Status.SUCCESS) {
             return result;
         }
-        if(checkDescriptionLength(description)){
+        if (checkDescriptionLength(description)) {
             putMsg(result, Status.DESCRIPTION_TOO_LONG_ERROR);
             return result;
         }
-        List<TaskDefinitionLog> taskDefinitionLogs = JSONUtils.toList(taskDefinitionJson, TaskDefinitionLog.class);
-        Map<String, Object> checkTaskDefinitions = checkTaskDefinitionList(taskDefinitionLogs, taskDefinitionJson);
-        if (checkTaskDefinitions.get(Constants.STATUS) != Status.SUCCESS) {
-            return checkTaskDefinitions;
-        }
-        List<ProcessTaskRelationLog> taskRelationList =
-                JSONUtils.toList(taskRelationJson, ProcessTaskRelationLog.class);
-        Map<String, Object> checkRelationJson =
-                checkTaskRelationList(taskRelationList, taskRelationJson, taskDefinitionLogs);
-        if (checkRelationJson.get(Constants.STATUS) != Status.SUCCESS) {
-            return checkRelationJson;
-        }
+        List<TaskDefinitionLog> taskDefinitionLogs = generateTaskDefinitionList(taskDefinitionJson);
+        List<ProcessTaskRelationLog> taskRelationList = generateTaskRelationList(taskRelationJson, taskDefinitionLogs);
 
         int tenantId = -1;
         if (!Constants.DEFAULT.equals(tenantCode)) {
@@ -786,7 +739,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
      * @return true if process definition name not exists, otherwise false
      */
     @Override
-    public Map<String, Object> verifyProcessDefinitionName(User loginUser, long projectCode, String name, long processDefinitionCode) {
+    public Map<String, Object> verifyProcessDefinitionName(User loginUser, long projectCode, String name,
+                                                           long processDefinitionCode) {
         Project project = projectMapper.queryByCode(projectCode);
         // check user access for project
         Map<String, Object> result =
@@ -2228,7 +2182,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
         if (result.get(Constants.STATUS) != Status.SUCCESS) {
             return result;
         }
-        if(checkDescriptionLength(description)){
+        if (checkDescriptionLength(description)) {
             putMsg(result, Status.DESCRIPTION_TOO_LONG_ERROR);
             return result;
         }
@@ -2367,7 +2321,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
         if (result.get(Constants.STATUS) != Status.SUCCESS) {
             return result;
         }
-        if(checkDescriptionLength(description)){
+        if (checkDescriptionLength(description)) {
             putMsg(result, Status.DESCRIPTION_TOO_LONG_ERROR);
             return result;
         }
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
index bfd8ecfa34..042cb78c5c 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
@@ -96,9 +96,6 @@ import org.springframework.mock.web.MockMultipartFile;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
 
-/**
- * process definition service test
- */
 @RunWith(MockitoJUnitRunner.class)
 public class ProcessDefinitionServiceTest {
 
@@ -756,10 +753,13 @@ public class ProcessDefinitionServiceTest {
         Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_UPDATE))
                 .thenReturn(result);
 
-        Map<String, Object> updateResult =
-                processDefinitionService.updateProcessDefinition(loginUser, projectCode, "test", 1,
-                        "", "", "", 0, "root", null, "", null, ProcessExecutionTypeEnum.PARALLEL);
-        Assert.assertEquals(Status.DATA_IS_NOT_VALID, updateResult.get(Constants.STATUS));
+        try {
+            processDefinitionService.updateProcessDefinition(loginUser, projectCode, "test", 1,
+                    "", "", "", 0, "root", null, "", null, ProcessExecutionTypeEnum.PARALLEL);
+            Assert.fail();
+        } catch (ServiceException ex) {
+            Assert.assertEquals(new Integer(Status.DATA_IS_NOT_VALID.getCode()), ex.getCode());
+        }
     }
 
     @Test
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/CodeGenerateUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/CodeGenerateUtils.java
index ffea87be51..f35523b59d 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/CodeGenerateUtils.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/CodeGenerateUtils.java
@@ -10,8 +10,9 @@ import java.util.Objects;
  *  Rewriting based on Twitter snowflake algorithm
  */
 public class CodeGenerateUtils {
+
     // start timestamp
-    private static final long START_TIMESTAMP = 1609430400000L; //2021-01-01 00:00:00
+    private static final long START_TIMESTAMP = 1609430400000L; // 2021-01-01 00:00:00
     // Each machine generates 32 in the same millisecond
     private static final long LOW_DIGIT_BIT = 5L;
     private static final long MIDDLE_BIT = 2L;
@@ -24,11 +25,12 @@ public class CodeGenerateUtils {
     private long recordMillisecond = -1L;
 
     private static final long SYSTEM_TIMESTAMP = System.currentTimeMillis();
-    private static final long SYSTEM_NANOTIME  = System.nanoTime();
+    private static final long SYSTEM_NANOTIME = System.nanoTime();
 
     private CodeGenerateUtils() throws CodeGenerateException {
         try {
-            this.machineHash = Math.abs(Objects.hash(InetAddress.getLocalHost().getHostName())) % (2 << (MIDDLE_BIT - 1));
+            this.machineHash =
+                    Math.abs(Objects.hash(InetAddress.getLocalHost().getHostName())) % (2 << (MIDDLE_BIT - 1));
         } catch (UnknownHostException e) {
             throw new CodeGenerateException(e.getMessage());
         }
@@ -66,7 +68,8 @@ public class CodeGenerateUtils {
         return SYSTEM_TIMESTAMP + (System.nanoTime() - SYSTEM_NANOTIME) / 1000000;
     }
 
-    public static class CodeGenerateException extends Exception {
+    public static class CodeGenerateException extends RuntimeException {
+
         public CodeGenerateException(String message) {
             super(message);
         }
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Command.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Command.java
index 12a094139a..ec9336d501 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Command.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Command.java
@@ -36,27 +36,15 @@ import com.baomidou.mybatisplus.annotation.TableName;
 @TableName("t_ds_command")
 public class Command {
 
-    /**
-     * id
-     */
     @TableId(value = "id", type = IdType.AUTO)
     private Integer id;
 
-    /**
-     * command type
-     */
     @TableField("command_type")
     private CommandType commandType;
 
-    /**
-     * process definition code
-     */
     @TableField("process_definition_code")
     private long processDefinitionCode;
 
-    /**
-     * executor id
-     */
     @TableField("executor_id")
     private int executorId;
 
@@ -66,69 +54,36 @@ public class Command {
     @TableField("command_param")
     private String commandParam;
 
-    /**
-     * task depend type
-     */
     @TableField("task_depend_type")
     private TaskDependType taskDependType;
 
-    /**
-     * failure strategy
-     */
     @TableField("failure_strategy")
     private FailureStrategy failureStrategy;
 
-    /**
-     * warning type
-     */
     @TableField("warning_type")
     private WarningType warningType;
 
-    /**
-     * warning group id
-     */
     @TableField("warning_group_id")
     private Integer warningGroupId;
 
-    /**
-     * schedule time
-     */
     @TableField("schedule_time")
     private Date scheduleTime;
 
-    /**
-     * start time
-     */
     @TableField("start_time")
     private Date startTime;
 
-    /**
-     * process instance priority
-     */
     @TableField("process_instance_priority")
     private Priority processInstancePriority;
 
-    /**
-     * update time
-     */
     @TableField("update_time")
     private Date updateTime;
 
-    /**
-     * worker group
-     */
     @TableField("worker_group")
     private String workerGroup;
 
-    /**
-     * environment code
-     */
     @TableField("environment_code")
     private Long environmentCode;
 
-    /**
-     * dry run flag
-     */
     @TableField("dry_run")
     private int dryRun;
 
@@ -180,91 +135,4 @@ public class Command {
         this.processDefinitionVersion = processDefinitionVersion;
     }
 
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) {
-            return true;
-        }
-        if (o == null || getClass() != o.getClass()) {
-            return false;
-        }
-
-        Command command = (Command) o;
-
-        if (id != command.id) {
-            return false;
-        }
-        if (processDefinitionCode != command.processDefinitionCode) {
-            return false;
-        }
-        if (executorId != command.executorId) {
-            return false;
-        }
-        if (workerGroup != null ? workerGroup.equals(command.workerGroup) : command.workerGroup == null) {
-            return false;
-        }
-
-        if (environmentCode != null ? environmentCode.equals(command.environmentCode)
-                : command.environmentCode == null) {
-            return false;
-        }
-
-        if (commandType != command.commandType) {
-            return false;
-        }
-        if (commandParam != null ? !commandParam.equals(command.commandParam) : command.commandParam != null) {
-            return false;
-        }
-        if (taskDependType != command.taskDependType) {
-            return false;
-        }
-        if (failureStrategy != command.failureStrategy) {
-            return false;
-        }
-        if (warningType != command.warningType) {
-            return false;
-        }
-        if (warningGroupId != null ? !warningGroupId.equals(command.warningGroupId) : command.warningGroupId != null) {
-            return false;
-        }
-        if (scheduleTime != null ? !scheduleTime.equals(command.scheduleTime) : command.scheduleTime != null) {
-            return false;
-        }
-        if (startTime != null ? !startTime.equals(command.startTime) : command.startTime != null) {
-            return false;
-        }
-        if (processInstancePriority != command.processInstancePriority) {
-            return false;
-        }
-        if (processInstanceId != command.processInstanceId) {
-            return false;
-        }
-        if (processDefinitionVersion != command.getProcessDefinitionVersion()) {
-            return false;
-        }
-        return !(updateTime != null ? !updateTime.equals(command.updateTime) : command.updateTime != null);
-    }
-
-    @Override
-    public int hashCode() {
-        int result = id;
-        result = 31 * result + (commandType != null ? commandType.hashCode() : 0);
-        result = 31 * result + Long.hashCode(processDefinitionCode);
-        result = 31 * result + executorId;
-        result = 31 * result + (commandParam != null ? commandParam.hashCode() : 0);
-        result = 31 * result + (taskDependType != null ? taskDependType.hashCode() : 0);
-        result = 31 * result + (failureStrategy != null ? failureStrategy.hashCode() : 0);
-        result = 31 * result + (warningType != null ? warningType.hashCode() : 0);
-        result = 31 * result + (warningGroupId != null ? warningGroupId.hashCode() : 0);
-        result = 31 * result + (scheduleTime != null ? scheduleTime.hashCode() : 0);
-        result = 31 * result + (startTime != null ? startTime.hashCode() : 0);
-        result = 31 * result + (processInstancePriority != null ? processInstancePriority.hashCode() : 0);
-        result = 31 * result + (updateTime != null ? updateTime.hashCode() : 0);
-        result = 31 * result + (workerGroup != null ? workerGroup.hashCode() : 0);
-        result = 31 * result + (environmentCode != null ? environmentCode.hashCode() : 0);
-        result = 31 * result + dryRun;
-        result = 31 * result + processInstanceId;
-        result = 31 * result + processDefinitionVersion;
-        return result;
-    }
 }
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessDefinition.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessDefinition.java
index 089a035373..6e7414c286 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessDefinition.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessDefinition.java
@@ -27,18 +27,23 @@ import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 import java.util.stream.Collectors;
 
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
 import com.baomidou.mybatisplus.annotation.IdType;
 import com.baomidou.mybatisplus.annotation.TableField;
 import com.baomidou.mybatisplus.annotation.TableId;
 import com.baomidou.mybatisplus.annotation.TableName;
 import com.google.common.base.Strings;
 
-/**
- * process definition
- */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
 @TableName("t_ds_process_definition")
 public class ProcessDefinition {
 
@@ -171,9 +176,6 @@ public class ProcessDefinition {
      */
     private ProcessExecutionTypeEnum executionType;
 
-    public ProcessDefinition() {
-    }
-
     public ProcessDefinition(long projectCode,
                              String name,
                              long code,
@@ -208,90 +210,6 @@ public class ProcessDefinition {
         this.flag = Flag.YES;
     }
 
-    public String getName() {
-        return name;
-    }
-
-    public void setName(String name) {
-        this.name = name;
-    }
-
-    public int getVersion() {
-        return version;
-    }
-
-    public void setVersion(int version) {
-        this.version = version;
-    }
-
-    public Integer getId() {
-        return id;
-    }
-
-    public void setId(Integer id) {
-        this.id = id;
-    }
-
-    public ReleaseState getReleaseState() {
-        return releaseState;
-    }
-
-    public void setReleaseState(ReleaseState releaseState) {
-        this.releaseState = releaseState;
-    }
-
-    public Date getCreateTime() {
-        return createTime;
-    }
-
-    public void setCreateTime(Date createTime) {
-        this.createTime = createTime;
-    }
-
-    public Date getUpdateTime() {
-        return updateTime;
-    }
-
-    public void setUpdateTime(Date updateTime) {
-        this.updateTime = updateTime;
-    }
-
-    public Flag getFlag() {
-        return flag;
-    }
-
-    public void setFlag(Flag flag) {
-        this.flag = flag;
-    }
-
-    public int getUserId() {
-        return userId;
-    }
-
-    public void setUserId(int userId) {
-        this.userId = userId;
-    }
-
-    public String getUserName() {
-        return userName;
-    }
-
-    public void setUserName(String userName) {
-        this.userName = userName;
-    }
-
-    public String getProjectName() {
-        return projectName;
-    }
-
-    public void setProjectName(String projectName) {
-        this.projectName = projectName;
-    }
-
-    public String getGlobalParams() {
-        return globalParams;
-    }
-
     public void setGlobalParams(String globalParams) {
         this.globalParamList = JSONUtils.toList(globalParams, Property.class);
         if (this.globalParamList == null) {
@@ -300,14 +218,6 @@ public class ProcessDefinition {
         this.globalParams = globalParams;
     }
 
-    public List<Property> getGlobalParamList() {
-        return globalParamList;
-    }
-
-    public void setGlobalParamList(List<Property> globalParamList) {
-        this.globalParamList = globalParamList;
-    }
-
     public Map<String, String> getGlobalParamMap() {
         if (globalParamMap == null && !Strings.isNullOrEmpty(globalParams)) {
             List<Property> propList = JSONUtils.toList(globalParams, Property.class);
@@ -317,146 +227,4 @@ public class ProcessDefinition {
         return globalParamMap;
     }
 
-    public void setGlobalParamMap(Map<String, String> globalParamMap) {
-        this.globalParamMap = globalParamMap;
-    }
-
-    public String getLocations() {
-        return locations;
-    }
-
-    public void setLocations(String locations) {
-        this.locations = locations;
-    }
-
-    public ReleaseState getScheduleReleaseState() {
-        return scheduleReleaseState;
-    }
-
-    public void setScheduleReleaseState(ReleaseState scheduleReleaseState) {
-        this.scheduleReleaseState = scheduleReleaseState;
-    }
-
-    public int getTimeout() {
-        return timeout;
-    }
-
-    public void setTimeout(int timeout) {
-        this.timeout = timeout;
-    }
-
-    public int getTenantId() {
-        return tenantId;
-    }
-
-    public void setTenantId(int tenantId) {
-        this.tenantId = tenantId;
-    }
-
-    public String getTenantCode() {
-        return tenantCode;
-    }
-
-    public void setTenantCode(String tenantCode) {
-        this.tenantCode = tenantCode;
-    }
-
-    public String getDescription() {
-        return description;
-    }
-
-    public void setDescription(String description) {
-        this.description = description;
-    }
-
-    public String getModifyBy() {
-        return modifyBy;
-    }
-
-    public void setModifyBy(String modifyBy) {
-        this.modifyBy = modifyBy;
-    }
-
-    public long getCode() {
-        return code;
-    }
-
-    public void setCode(long code) {
-        this.code = code;
-    }
-
-    public long getProjectCode() {
-        return projectCode;
-    }
-
-    public void setProjectCode(long projectCode) {
-        this.projectCode = projectCode;
-    }
-
-    public int getWarningGroupId() {
-        return warningGroupId;
-    }
-
-    public void setWarningGroupId(int warningGroupId) {
-        this.warningGroupId = warningGroupId;
-    }
-
-    public ProcessExecutionTypeEnum getExecutionType() {
-        return executionType;
-    }
-
-    public void setExecutionType(ProcessExecutionTypeEnum executionType) {
-        this.executionType = executionType;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) {
-            return true;
-        }
-        if (o == null || getClass() != o.getClass()) {
-            return false;
-        }
-        ProcessDefinition that = (ProcessDefinition) o;
-        return projectCode == that.projectCode
-                && userId == that.userId
-                && timeout == that.timeout
-                && tenantId == that.tenantId
-                && Objects.equals(name, that.name)
-                && releaseState == that.releaseState
-                && Objects.equals(description, that.description)
-                && Objects.equals(globalParams, that.globalParams)
-                && flag == that.flag
-                && executionType == that.executionType
-                && Objects.equals(locations, that.locations);
-    }
-
-    @Override
-    public String toString() {
-        return "ProcessDefinition{"
-                + "id=" + id
-                + ", code=" + code
-                + ", name='" + name + '\''
-                + ", version=" + version
-                + ", releaseState=" + releaseState
-                + ", projectCode=" + projectCode
-                + ", description='" + description + '\''
-                + ", globalParams='" + globalParams + '\''
-                + ", globalParamList=" + globalParamList
-                + ", globalParamMap=" + globalParamMap
-                + ", createTime=" + createTime
-                + ", updateTime=" + updateTime
-                + ", flag=" + flag
-                + ", userId=" + userId
-                + ", userName='" + userName + '\''
-                + ", projectName='" + projectName + '\''
-                + ", locations='" + locations + '\''
-                + ", scheduleReleaseState=" + scheduleReleaseState
-                + ", timeout=" + timeout
-                + ", tenantId=" + tenantId
-                + ", tenantCode='" + tenantCode + '\''
-                + ", modifyBy='" + modifyBy + '\''
-                + ", warningGroupId=" + warningGroupId
-                + '}';
-    }
 }
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessTaskRelation.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessTaskRelation.java
index 70a324f298..bef2630d8f 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessTaskRelation.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessTaskRelation.java
@@ -21,9 +21,9 @@ import org.apache.dolphinscheduler.common.enums.ConditionType;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
 
 import java.util.Date;
-import java.util.Objects;
 
 import lombok.Data;
+import lombok.NoArgsConstructor;
 
 import com.baomidou.mybatisplus.annotation.IdType;
 import com.baomidou.mybatisplus.annotation.TableId;
@@ -32,6 +32,7 @@ import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
 import com.fasterxml.jackson.databind.annotation.JsonSerialize;
 
 @Data
+@NoArgsConstructor
 @TableName("t_ds_process_task_relation")
 public class ProcessTaskRelation {
 
@@ -103,9 +104,6 @@ public class ProcessTaskRelation {
      */
     private Date updateTime;
 
-    public ProcessTaskRelation() {
-    }
-
     public ProcessTaskRelation(String name,
                                int processDefinitionVersion,
                                long projectCode,
@@ -132,28 +130,4 @@ public class ProcessTaskRelation {
         this.updateTime = updateTime;
     }
 
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) {
-            return true;
-        }
-        if (o == null || getClass() != o.getClass()) {
-            return false;
-        }
-        ProcessTaskRelation that = (ProcessTaskRelation) o;
-        return processDefinitionVersion == that.processDefinitionVersion
-                && projectCode == that.projectCode
-                && processDefinitionCode == that.processDefinitionCode
-                && preTaskCode == that.preTaskCode
-                && preTaskVersion == that.preTaskVersion
-                && postTaskCode == that.postTaskCode
-                && postTaskVersion == that.postTaskVersion
-                && Objects.equals(name, that.name);
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(name, processDefinitionVersion, projectCode, processDefinitionCode, preTaskCode,
-                preTaskVersion, postTaskCode, postTaskVersion);
-    }
 }
diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapperTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapperTest.java
index 014a2a2c3c..0c4702517e 100644
--- a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapperTest.java
+++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapperTest.java
@@ -25,14 +25,13 @@ import org.apache.dolphinscheduler.dao.entity.ExecuteStatusCount;
 import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
 
 import java.util.Collections;
 import java.util.Date;
 import java.util.List;
 
-import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
 import org.junit.Assert;
-import org.junit.Before;
 import org.junit.Test;
 import org.springframework.beans.factory.annotation.Autowired;
 
@@ -50,15 +49,6 @@ public class TaskInstanceMapperTest extends BaseDaoTest {
     @Autowired
     private ProcessInstanceMapper processInstanceMapper;
 
-    @Before
-    public void before() {
-        ProcessInstance processInstance = new ProcessInstance();
-        processInstance.setWarningGroupId(0);
-        processInstance.setCommandParam("");
-        processInstance.setProcessDefinitionCode(1L);
-        processInstanceMapper.insert(processInstance);
-    }
-
     /**
      * insert
      *
@@ -191,7 +181,7 @@ public class TaskInstanceMapperTest extends BaseDaoTest {
         task2.setFlag(Flag.NO);
         taskInstanceMapper.updateById(task2);
         List<TaskInstance> taskInstances1 = taskInstanceMapper.findValidTaskListByProcessId(task.getProcessInstanceId(),
-            Flag.NO);
+                Flag.NO);
 
         taskInstanceMapper.deleteById(task2.getId());
         taskInstanceMapper.deleteById(task.getId());
@@ -377,18 +367,17 @@ public class TaskInstanceMapperTest extends BaseDaoTest {
 
         Page<TaskInstance> page = new Page(1, 3);
         IPage<TaskInstance> taskInstanceIPage = taskInstanceMapper.queryTaskInstanceListPaging(
-            page,
-            definition.getProjectCode(),
-            task.getProcessInstanceId(),
-            "",
-            "",
-            "",
-            0,
-            new int[0],
-            "",
-            TaskExecuteType.BATCH,
-            null, null
-        );
+                page,
+                definition.getProjectCode(),
+                task.getProcessInstanceId(),
+                "",
+                "",
+                "",
+                0,
+                new int[0],
+                "",
+                TaskExecuteType.BATCH,
+                null, null);
         processInstanceMapper.deleteById(processInstance.getId());
         taskInstanceMapper.deleteById(task.getId());
         processDefinitionMapper.deleteById(definition.getId());
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
index 513f02f161..6c0bdb6d99 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
@@ -151,6 +151,7 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Objects;
 import java.util.Set;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 
 import org.slf4j.Logger;
@@ -2490,20 +2491,15 @@ public class ProcessServiceImpl implements ProcessService {
             taskDefinitionLog.setOperator(operator.getId());
             taskDefinitionLog.setResourceIds(getResourceIds(taskDefinitionLog));
             if (taskDefinitionLog.getCode() == 0) {
-                try {
-                    taskDefinitionLog.setCode(CodeGenerateUtils.getInstance().genCode());
-                } catch (CodeGenerateException e) {
-                    logger.error("Task code get error, ", e);
-                    return Constants.DEFINITION_FAILURE;
-                }
+                taskDefinitionLog.setCode(CodeGenerateUtils.getInstance().genCode());
             }
             if (taskDefinitionLog.getVersion() == 0) {
                 // init first version
                 taskDefinitionLog.setVersion(Constants.VERSION_FIRST);
             }
 
-            TaskDefinitionLog definitionCodeAndVersion = taskDefinitionLogMapper
-                    .queryByDefinitionCodeAndVersion(taskDefinitionLog.getCode(), taskDefinitionLog.getVersion());
+            TaskDefinitionLog definitionCodeAndVersion = taskDefinitionLogMapper.queryByDefinitionCodeAndVersion(
+                    taskDefinitionLog.getCode(), taskDefinitionLog.getVersion());
             if (definitionCodeAndVersion == null) {
                 taskDefinitionLog.setUserId(operator.getId());
                 taskDefinitionLog.setCreateTime(now);
@@ -2520,39 +2516,33 @@ public class ProcessServiceImpl implements ProcessService {
             taskDefinitionLog.setCreateTime(definitionCodeAndVersion.getCreateTime());
             updateTaskDefinitionLogs.add(taskDefinitionLog);
         }
-        int insertResult = 0;
-        int updateResult = 0;
-        if (!updateTaskDefinitionLogs.isEmpty()) {
-            List<TaskDefinition> taskDefinitions = taskDefinitionMapper.queryByCodeList(updateTaskDefinitionLogs
-                    .stream().map(TaskDefinition::getCode).distinct().collect(Collectors.toList()));
+        if (CollectionUtils.isNotEmpty(updateTaskDefinitionLogs)) {
+            List<Long> taskDefinitionCodes = updateTaskDefinitionLogs
+                    .stream()
+                    .map(TaskDefinition::getCode)
+                    .distinct()
+                    .collect(Collectors.toList());
+            Map<Long, TaskDefinition> taskDefinitionMap = taskDefinitionMapper.queryByCodeList(taskDefinitionCodes)
+                    .stream()
+                    .collect(Collectors.toMap(TaskDefinition::getCode, Function.identity()));
             for (TaskDefinitionLog taskDefinitionToUpdate : updateTaskDefinitionLogs) {
-                TaskDefinition task = null;
-                for (TaskDefinition taskDefinition : taskDefinitions) {
-                    if (taskDefinitionToUpdate.getCode() == taskDefinition.getCode()) {
-                        task = taskDefinition;
-                        break;
-                    }
-                }
+                TaskDefinition task = taskDefinitionMap.get(taskDefinitionToUpdate.getCode());
                 if (task == null) {
                     newTaskDefinitionLogs.add(taskDefinitionToUpdate);
-                } else {
-                    insertResult += taskDefinitionLogMapper.insert(taskDefinitionToUpdate);
-                    if (Boolean.TRUE.equals(syncDefine)) {
-                        taskDefinitionToUpdate.setId(task.getId());
-                        updateResult += taskDefinitionMapper.updateById(taskDefinitionToUpdate);
-                    } else {
-                        updateResult++;
-                    }
                 }
             }
         }
-        if (!newTaskDefinitionLogs.isEmpty()) {
-            insertResult += taskDefinitionLogMapper.batchInsert(newTaskDefinitionLogs);
-            if (Boolean.TRUE.equals(syncDefine)) {
-                updateResult += taskDefinitionMapper.batchInsert(newTaskDefinitionLogs);
-            } else {
-                updateResult += newTaskDefinitionLogs.size();
-            }
+
+        // for each taskDefinitionLog, we will insert a new version into db
+        // and update the origin one if exist
+        int updateResult = updateTaskDefinitionLogs.size();
+        int insertResult = newTaskDefinitionLogs.size();
+        if (CollectionUtils.isNotEmpty(taskDefinitionLogs)) {
+            insertResult = taskDefinitionLogMapper.batchInsert(taskDefinitionLogs);
+        }
+
+        if (CollectionUtils.isNotEmpty(newTaskDefinitionLogs) && Boolean.TRUE.equals(syncDefine)) {
+            updateResult += taskDefinitionMapper.batchInsert(newTaskDefinitionLogs);
         }
         return (insertResult & updateResult) > 0 ? 1 : Constants.EXIT_CODE_SUCCESS;
     }
@@ -2573,10 +2563,8 @@ public class ProcessServiceImpl implements ProcessService {
                         : ReleaseState.OFFLINE);
         processDefinitionLog.setOperator(operator.getId());
         processDefinitionLog.setOperateTime(processDefinition.getUpdateTime());
-        int insertLog = processDefineLogMapper.updateById(processDefinitionLog);
-        if (insertLog == 0) {
-            insertLog = processDefineLogMapper.insert(processDefinitionLog);
-        }
+        processDefinitionLog.setId(null);
+        int insertLog = processDefineLogMapper.insert(processDefinitionLog);
         int result = 1;
         if (Boolean.TRUE.equals(syncDefine)) {
             if (processDefinition.getId() == null) {
@@ -2603,7 +2591,8 @@ public class ProcessServiceImpl implements ProcessService {
         }
         Map<Long, TaskDefinitionLog> taskDefinitionLogMap = null;
         if (CollectionUtils.isNotEmpty(taskDefinitionLogs)) {
-            taskDefinitionLogMap = taskDefinitionLogs.stream()
+            taskDefinitionLogMap = taskDefinitionLogs
+                    .stream()
                     .collect(Collectors.toMap(TaskDefinition::getCode, taskDefinitionLog -> taskDefinitionLog));
         }
         Date now = new Date();


[dolphinscheduler] 04/04: fix workflow keep running when task fail (#11930)

Posted by ca...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

caishunfeng pushed a commit to branch 3.1.0-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git

commit 780a509f673359c0a5c408e4f20cfc35fea024d3
Author: caishunfeng <ca...@gmail.com>
AuthorDate: Thu Sep 15 09:06:10 2022 +0800

    fix workflow keep running when task fail (#11930)
---
 .../server/master/runner/WorkflowExecuteRunnable.java     | 15 ++++++++-------
 1 file changed, 8 insertions(+), 7 deletions(-)

diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
index 9e90b8c8a0..9d6eef3259 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
@@ -116,6 +116,7 @@ import org.slf4j.LoggerFactory;
 import org.springframework.beans.BeanUtils;
 
 import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 
 /**
  * Workflow execute task, used to execute a workflow instance.
@@ -180,9 +181,9 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
     private final Map<Long, Integer> completeTaskMap = new ConcurrentHashMap<>();
 
     /**
-     * depend failed task map, taskCode as key, taskId as value
+     * depend failed task set
      */
-    private final Map<Long, Integer> dependFailedTaskMap = new ConcurrentHashMap<>();
+    private final Set<Long> dependFailedTaskSet = Sets.newConcurrentHashSet();
 
     /**
      * forbidden task map, code as key
@@ -804,7 +805,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
 
         taskFailedSubmit = false;
         activeTaskProcessorMaps.clear();
-        dependFailedTaskMap.clear();
+        dependFailedTaskSet.clear();
         completeTaskMap.clear();
         errorTaskMap.clear();
 
@@ -904,8 +905,8 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
                 }
             }
         }
-        logger.info("Initialize task queue, dependFailedTaskMap: {}, completeTaskMap: {}, errorTaskMap: {}",
-                dependFailedTaskMap,
+        logger.info("Initialize task queue, dependFailedTaskSet: {}, completeTaskMap: {}, errorTaskMap: {}",
+                dependFailedTaskSet,
                 completeTaskMap,
                 errorTaskMap);
     }
@@ -1484,7 +1485,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
         if (this.errorTaskMap.size() > 0) {
             return true;
         }
-        return this.dependFailedTaskMap.size() > 0;
+        return this.dependFailedTaskSet.size() > 0;
     }
 
     /**
@@ -1835,7 +1836,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
                 }
             } else if (DependResult.FAILED == dependResult) {
                 // if the dependency fails, the current node is not submitted and the state changes to failure.
-                dependFailedTaskMap.put(task.getTaskCode(), task.getId());
+                dependFailedTaskSet.add(task.getTaskCode());
                 removeTaskFromStandbyList(task);
                 logger.info("Task dependent result is failed, taskInstanceId:{} depend result : {}", task.getId(),
                         dependResult);


[dolphinscheduler] 01/04: [Bug](dependent) Dependent downstream trigger error when schedule cycle not day. (#11734)

Posted by ca...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

caishunfeng pushed a commit to branch 3.1.0-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git

commit d7f40b19b59e0c79a2f5c1cff0a66b49f4824d0d
Author: Stalary <st...@163.com>
AuthorDate: Thu Sep 8 15:08:10 2022 +0800

    [Bug](dependent) Dependent downstream trigger error when schedule cycle not day. (#11734)
    
    * FIX: dependent
    
    * FIX: version
    
    * MOD: for review
---
 .../api/service/impl/ExecutorServiceImpl.java           |  8 +++++---
 .../dao/entity/DependentProcessDefinition.java          | 17 +++++++++++++++--
 .../dao/mapper/WorkFlowLineageMapper.xml                |  1 +
 3 files changed, 21 insertions(+), 5 deletions(-)

diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
index 5755362d99..174da3f10c 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
@@ -909,6 +909,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
             // and causing duplicate when clone it.
             dependentCommand.setId(null);
             dependentCommand.setProcessDefinitionCode(dependentProcessDefinition.getProcessDefinitionCode());
+            dependentCommand.setProcessDefinitionVersion(dependentProcessDefinition.getProcessDefinitionVersion());
             dependentCommand.setWorkerGroup(dependentProcessDefinition.getWorkerGroup());
             Map<String, String> cmdParam = JSONUtils.toMap(dependentCommand.getCommandParam());
             cmdParam.put(CMD_PARAM_START_NODES, String.valueOf(dependentProcessDefinition.getTaskDefinitionCode()));
@@ -929,7 +930,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
                 processService.queryDependentProcessDefinitionByProcessDefinitionCode(processDefinitionCode);
 
         return checkDependentProcessDefinitionValid(dependentProcessDefinitionList, processDefinitionCycle,
-                workerGroup);
+                workerGroup, processDefinitionCode);
     }
 
     /**
@@ -940,7 +941,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
     private List<DependentProcessDefinition> checkDependentProcessDefinitionValid(
                                                                                   List<DependentProcessDefinition> dependentProcessDefinitionList,
                                                                                   CycleEnum processDefinitionCycle,
-                                                                                  String workerGroup) {
+                                                                                  String workerGroup,
+                                                                                  long upstreamProcessDefinitionCode) {
         List<DependentProcessDefinition> validDependentProcessDefinitionList = new ArrayList<>();
 
         List<Long> processDefinitionCodeList =
@@ -951,7 +953,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
                 processService.queryWorkerGroupByProcessDefinitionCodes(processDefinitionCodeList);
 
         for (DependentProcessDefinition dependentProcessDefinition : dependentProcessDefinitionList) {
-            if (dependentProcessDefinition.getDependentCycle() == processDefinitionCycle) {
+            if (dependentProcessDefinition.getDependentCycle(upstreamProcessDefinitionCode) == processDefinitionCycle) {
                 if (processDefinitionWorkerGroupMap
                         .get(dependentProcessDefinition.getProcessDefinitionCode()) == null) {
                     dependentProcessDefinition.setWorkerGroup(workerGroup);
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/DependentProcessDefinition.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/DependentProcessDefinition.java
index 9de57dff33..87bb3d4234 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/DependentProcessDefinition.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/DependentProcessDefinition.java
@@ -41,6 +41,11 @@ public class DependentProcessDefinition {
      */
     private String processDefinitionName;
 
+    /**
+     * process definition version
+     **/
+    private int processDefinitionVersion;
+
     /**
      * task definition name
      */
@@ -60,14 +65,14 @@ public class DependentProcessDefinition {
      * get dependent cycle
      * @return CycleEnum
      */
-    public CycleEnum getDependentCycle() {
+    public CycleEnum getDependentCycle(long upstreamProcessDefinitionCode) {
         DependentParameters dependentParameters = this.getDependentParameters();
         List<DependentTaskModel> dependentTaskModelList = dependentParameters.getDependTaskList();
 
         for (DependentTaskModel dependentTaskModel : dependentTaskModelList) {
             List<DependentItem> dependentItemList = dependentTaskModel.getDependItemList();
             for (DependentItem dependentItem : dependentItemList) {
-                if (this.getProcessDefinitionCode() == dependentItem.getDefinitionCode()) {
+                if (upstreamProcessDefinitionCode == dependentItem.getDefinitionCode()) {
                     return cycle2CycleEnum(dependentItem.getCycle());
                 }
             }
@@ -122,6 +127,14 @@ public class DependentProcessDefinition {
         this.processDefinitionCode = code;
     }
 
+    public int getProcessDefinitionVersion() {
+        return processDefinitionVersion;
+    }
+
+    public void setProcessDefinitionVersion(int processDefinitionVersion) {
+        this.processDefinitionVersion = processDefinitionVersion;
+    }
+
     public long getTaskDefinitionCode() {
         return this.taskDefinitionCode;
     }
diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.xml
index b17499bb60..2689b6d50f 100644
--- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.xml
+++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.xml
@@ -149,6 +149,7 @@
         SELECT
         c.code AS process_definition_code
         ,c.name AS process_definition_name
+        ,c.version as process_definition_version
         ,a.code AS task_definition_code
         ,a.task_params
         FROM