You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text copy.
Posted to commits@dolphinscheduler.apache.org by ca...@apache.org on 2022/09/22 01:52:39 UTC

[dolphinscheduler] branch dev updated: Fix insert command error due to the id is not null (#12092)

This is an automated email from the ASF dual-hosted git repository.

caishunfeng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new fba5a8eaa0 Fix insert command error due to the id is not null (#12092)
fba5a8eaa0 is described below

commit fba5a8eaa0945f399733275341dfdcec11ab6b3d
Author: Wenjun Ruan <we...@apache.org>
AuthorDate: Thu Sep 22 09:52:32 2022 +0800

    Fix insert command error due to the id is not null (#12092)
---
 .../api/service/impl/ExecutorServiceImpl.java      | 141 ++++++++++++++-------
 .../service/impl/ProcessDefinitionServiceImpl.java | 135 ++++++++++++++------
 .../service/process/ProcessServiceImpl.java        |   1 +
 3 files changed, 193 insertions(+), 84 deletions(-)

diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
index 7524a0c15f..32de874b1d 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
@@ -177,7 +177,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
                                                    Priority processInstancePriority, String workerGroup,
                                                    Long environmentCode, Integer timeout,
                                                    Map<String, String> startParams, Integer expectedParallelismNumber,
-                                                   int dryRun, int testFlag, ComplementDependentMode complementDependentMode) {
+                                                   int dryRun, int testFlag,
+                                                   ComplementDependentMode complementDependentMode) {
         Project project = projectMapper.queryByCode(projectCode);
         // check user access for project
         Map<String, Object> result =
@@ -201,7 +202,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
         }
 
         if (!checkTenantSuitable(processDefinition)) {
-            logger.error("There is not any valid tenant for the process definition, processDefinitionCode:{}, processDefinitionName:{}.",
+            logger.error(
+                    "There is not any valid tenant for the process definition, processDefinitionCode:{}, processDefinitionName:{}.",
                     processDefinition.getCode(), processDefinition.getName());
             putMsg(result, Status.TENANT_NOT_SUITABLE);
             return result;
@@ -224,15 +226,18 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
                         startNodeList,
                         cronTime, warningType, loginUser.getId(), warningGroupId, runMode, processInstancePriority,
                         workerGroup,
-                        environmentCode, startParams, expectedParallelismNumber, dryRun, testFlag, complementDependentMode);
+                        environmentCode, startParams, expectedParallelismNumber, dryRun, testFlag,
+                        complementDependentMode);
 
         if (create > 0) {
             processDefinition.setWarningGroupId(warningGroupId);
             processDefinitionMapper.updateById(processDefinition);
-            logger.info("Create command complete, processDefinitionCode:{}, commandCount:{}.", processDefinition.getCode(), create);
+            logger.info("Create command complete, processDefinitionCode:{}, commandCount:{}.",
+                    processDefinition.getCode(), create);
             putMsg(result, Status.SUCCESS);
         } else {
-            logger.error("Start process instance failed because create command error, processDefinitionCode:{}.", processDefinition.getCode());
+            logger.error("Start process instance failed because create command error, processDefinitionCode:{}.",
+                    processDefinition.getCode());
             putMsg(result, Status.START_PROCESS_INSTANCE_ERROR);
         }
         return result;
@@ -295,15 +300,18 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
         Map<String, Object> result = new HashMap<>();
         if (processDefinition == null || projectCode != processDefinition.getProjectCode()) {
             // check process definition exists
-            logger.error("Process definition does not exist, projectCode:{}, processDefinitionCode:{}.", projectCode, processDefineCode);
+            logger.error("Process definition does not exist, projectCode:{}, processDefinitionCode:{}.", projectCode,
+                    processDefineCode);
             putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, String.valueOf(processDefineCode));
         } else if (processDefinition.getReleaseState() != ReleaseState.ONLINE) {
             // check process definition online
-            logger.warn("Process definition is not {}, processDefinitionCode:{}, version:{}.", ReleaseState.ONLINE.getDescp(), processDefineCode, version);
+            logger.warn("Process definition is not {}, processDefinitionCode:{}, version:{}.",
+                    ReleaseState.ONLINE.getDescp(), processDefineCode, version);
             putMsg(result, Status.PROCESS_DEFINE_NOT_RELEASE, String.valueOf(processDefineCode), version);
         } else if (!checkSubProcessDefinitionValid(processDefinition)) {
             // check sub process definition online
-            logger.warn("Subprocess definition of process definition is not {}, processDefinitionCode:{}.", ReleaseState.ONLINE.getDescp(), processDefineCode);
+            logger.warn("Subprocess definition of process definition is not {}, processDefinitionCode:{}.",
+                    ReleaseState.ONLINE.getDescp(), processDefineCode);
             putMsg(result, Status.SUB_PROCESS_DEFINE_NOT_RELEASE);
         } else {
             result.put(Constants.STATUS, Status.SUCCESS);
@@ -398,7 +406,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
             return result;
         }
         if (!checkTenantSuitable(processDefinition)) {
-            logger.error("There is not any valid tenant for the process definition, processDefinitionId:{}, processDefinitionCode:{}, ",
+            logger.error(
+                    "There is not any valid tenant for the process definition, processDefinitionId:{}, processDefinitionCode:{}, ",
                     processDefinition.getId(), processDefinition.getName());
             putMsg(result, Status.TENANT_NOT_SUITABLE);
         }
@@ -418,19 +427,23 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
         switch (executeType) {
             case REPEAT_RUNNING:
                 result = insertCommand(loginUser, processInstanceId, processDefinition.getCode(),
-                        processDefinition.getVersion(), CommandType.REPEAT_RUNNING, startParams, processInstance.getTestFlag());
+                        processDefinition.getVersion(), CommandType.REPEAT_RUNNING, startParams,
+                        processInstance.getTestFlag());
                 break;
             case RECOVER_SUSPENDED_PROCESS:
                 result = insertCommand(loginUser, processInstanceId, processDefinition.getCode(),
-                        processDefinition.getVersion(), CommandType.RECOVER_SUSPENDED_PROCESS, startParams, processInstance.getTestFlag());
+                        processDefinition.getVersion(), CommandType.RECOVER_SUSPENDED_PROCESS, startParams,
+                        processInstance.getTestFlag());
                 break;
             case START_FAILURE_TASK_PROCESS:
                 result = insertCommand(loginUser, processInstanceId, processDefinition.getCode(),
-                        processDefinition.getVersion(), CommandType.START_FAILURE_TASK_PROCESS, startParams, processInstance.getTestFlag());
+                        processDefinition.getVersion(), CommandType.START_FAILURE_TASK_PROCESS, startParams,
+                        processInstance.getTestFlag());
                 break;
             case STOP:
                 if (processInstance.getState() == WorkflowExecutionStatus.READY_STOP) {
-                    logger.warn("Process instance status is already {}, processInstanceName:{}.", WorkflowExecutionStatus.READY_STOP.getDesc(), processInstance.getName());
+                    logger.warn("Process instance status is already {}, processInstanceName:{}.",
+                            WorkflowExecutionStatus.READY_STOP.getDesc(), processInstance.getName());
                     putMsg(result, Status.PROCESS_INSTANCE_ALREADY_CHANGED, processInstance.getName(),
                             processInstance.getState());
                 } else {
@@ -441,7 +454,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
                 break;
             case PAUSE:
                 if (processInstance.getState() == WorkflowExecutionStatus.READY_PAUSE) {
-                    logger.warn("Process instance status is already {}, processInstanceName:{}.", WorkflowExecutionStatus.READY_STOP.getDesc(), processInstance.getName());
+                    logger.warn("Process instance status is already {}, processInstanceName:{}.",
+                            WorkflowExecutionStatus.READY_STOP.getDesc(), processInstance.getName());
                     putMsg(result, Status.PROCESS_INSTANCE_ALREADY_CHANGED, processInstance.getName(),
                             processInstance.getState());
                 } else {
@@ -450,7 +464,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
                 }
                 break;
             default:
-                logger.warn("Unknown execute type for process instance, processInstanceId:{}.", processInstance.getId());
+                logger.warn("Unknown execute type for process instance, processInstanceId:{}.",
+                        processInstance.getId());
                 putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, "unknown execute type");
 
                 break;
@@ -465,7 +480,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
         // check process instance exist
         ProcessInstance processInstance = processInstanceMapper.selectById(taskGroupQueue.getProcessId());
         if (processInstance == null) {
-            logger.error("Process instance does not exist, projectCode:{}, processInstanceId:{}.", taskGroupQueue.getProjectCode(), taskGroupQueue.getProcessId());
+            logger.error("Process instance does not exist, projectCode:{}, processInstanceId:{}.",
+                    taskGroupQueue.getProjectCode(), taskGroupQueue.getProcessId());
             putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, taskGroupQueue.getProcessId());
             return result;
         }
@@ -558,7 +574,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
 
         // determine whether the process is normal
         if (update > 0) {
-            logger.info("Process instance state is updated to {} in database, processInstanceName:{}.", executionStatus.getDesc(), processInstance.getName());
+            logger.info("Process instance state is updated to {} in database, processInstanceName:{}.",
+                    executionStatus.getDesc(), processInstance.getName());
             // directly send the process instance state change event to target master, not guarantee the event send
             // success
             WorkflowStateEventChangeCommand workflowStateEventChangeCommand = new WorkflowStateEventChangeCommand(
@@ -607,7 +624,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
      * @return insert result code
      */
     private Map<String, Object> insertCommand(User loginUser, Integer instanceId, long processDefinitionCode,
-                                              int processVersion, CommandType commandType, String startParams, int testFlag) {
+                                              int processVersion, CommandType commandType, String startParams,
+                                              int testFlag) {
         Map<String, Object> result = new HashMap<>();
 
         // To add startParams only when repeat running is needed
@@ -626,7 +644,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
         command.setProcessInstanceId(instanceId);
         command.setTestFlag(testFlag);
         if (!processService.verifyIsNeedCreateCommand(command)) {
-            logger.warn("Process instance is executing the command, processDefinitionCode:{}, processDefinitionVersion:{}, processInstanceId:{}.",
+            logger.warn(
+                    "Process instance is executing the command, processDefinitionCode:{}, processDefinitionVersion:{}, processInstanceId:{}.",
                     processDefinitionCode, processVersion, instanceId);
             putMsg(result, Status.PROCESS_INSTANCE_EXECUTING_COMMAND, String.valueOf(processDefinitionCode));
             return result;
@@ -640,8 +659,10 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
                     command.getCommandType().getDescp(), command.getProcessDefinitionCode(), processVersion);
             putMsg(result, Status.SUCCESS);
         } else {
-            logger.error("Execute process instance failed because create {} command error, processDefinitionCode:{}, processDefinitionVersion:{}, processInstanceId:{}.",
-                    command.getCommandType().getDescp(), command.getProcessDefinitionCode(), processVersion, instanceId);
+            logger.error(
+                    "Execute process instance failed because create {} command error, processDefinitionCode:{}, processDefinitionVersion:{}, processInstanceId:{}.",
+                    command.getCommandType().getDescp(), command.getProcessDefinitionCode(), processVersion,
+                    instanceId);
             putMsg(result, Status.EXECUTE_PROCESS_INSTANCE_ERROR);
         }
 
@@ -676,7 +697,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
                      * if there is no online process, exit directly
                      */
                     if (processDefinitionTmp.getReleaseState() != ReleaseState.ONLINE) {
-                        logger.warn("Subprocess definition {} of process definition {} is not {}.", processDefinitionTmp.getName(),
+                        logger.warn("Subprocess definition {} of process definition {} is not {}.",
+                                processDefinitionTmp.getName(),
                                 processDefinition.getName(), ReleaseState.ONLINE.getDescp());
                         putMsg(result, Status.PROCESS_DEFINE_NOT_RELEASE, processDefinitionTmp.getName());
                         return result;
@@ -759,14 +781,16 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
         // determine whether to complement
         if (commandType == CommandType.COMPLEMENT_DATA) {
             if (schedule == null || StringUtils.isEmpty(schedule)) {
-                logger.error("Create {} type command error because parameter schedule is invalid.", command.getCommandType().getDescp());
+                logger.error("Create {} type command error because parameter schedule is invalid.",
+                        command.getCommandType().getDescp());
                 return 0;
             }
             if (!isValidateScheduleTime(schedule)) {
                 return 0;
             }
             try {
-                logger.info("Start to create {} command, processDefinitionCode:{}.", command.getCommandType().getDescp(), processDefineCode);
+                logger.info("Start to create {} command, processDefinitionCode:{}.",
+                        command.getCommandType().getDescp(), processDefineCode);
                 return createComplementCommandList(schedule, runMode, command, expectedParallelismNumber,
                         complementDependentMode);
             } catch (CronParseException cronParseException) {
@@ -811,16 +835,19 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
         }
         switch (runMode) {
             case RUN_MODE_SERIAL: {
-                logger.info("RunMode of {} command is serial run, processDefinitionCode:{}.", command.getCommandType().getDescp(), command.getProcessDefinitionCode());
+                logger.info("RunMode of {} command is serial run, processDefinitionCode:{}.",
+                        command.getCommandType().getDescp(), command.getProcessDefinitionCode());
                 if (StringUtils.isNotEmpty(dateList)) {
                     cmdParam.put(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST, dateList);
                     command.setCommandParam(JSONUtils.toJsonString(cmdParam));
                     logger.info("Creating command, commandInfo:{}.", command);
                     createCount = processService.createCommand(command);
                     if (createCount > 0)
-                        logger.info("Create {} command complete, processDefinitionCode:{}", command.getCommandType().getDescp(), command.getProcessDefinitionCode());
+                        logger.info("Create {} command complete, processDefinitionCode:{}",
+                                command.getCommandType().getDescp(), command.getProcessDefinitionCode());
                     else
-                        logger.error("Create {} command error, processDefinitionCode:{}", command.getCommandType().getDescp(), command.getProcessDefinitionCode());
+                        logger.error("Create {} command error, processDefinitionCode:{}",
+                                command.getCommandType().getDescp(), command.getProcessDefinitionCode());
                 }
                 if (startDate != null && endDate != null) {
                     cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, startDate);
@@ -829,37 +856,46 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
                     logger.info("Creating command, commandInfo:{}.", command);
                     createCount = processService.createCommand(command);
                     if (createCount > 0)
-                        logger.info("Create {} command complete, processDefinitionCode:{}", command.getCommandType().getDescp(), command.getProcessDefinitionCode());
+                        logger.info("Create {} command complete, processDefinitionCode:{}",
+                                command.getCommandType().getDescp(), command.getProcessDefinitionCode());
                     else
-                        logger.error("Create {} command error, processDefinitionCode:{}", command.getCommandType().getDescp(), command.getProcessDefinitionCode());
+                        logger.error("Create {} command error, processDefinitionCode:{}",
+                                command.getCommandType().getDescp(), command.getProcessDefinitionCode());
                     // dependent process definition
                     List<Schedule> schedules = processService.queryReleaseSchedulerListByProcessDefinitionCode(
                             command.getProcessDefinitionCode());
 
                     if (schedules.isEmpty() || complementDependentMode == ComplementDependentMode.OFF_MODE) {
-                        logger.info("Complement dependent mode is off mode or Scheduler is empty, so skip create complement dependent command, processDefinitionCode:{}.", command.getProcessDefinitionCode());
+                        logger.info(
+                                "Complement dependent mode is off mode or Scheduler is empty, so skip create complement dependent command, processDefinitionCode:{}.",
+                                command.getProcessDefinitionCode());
                     } else {
-                        logger.info("Complement dependent mode is all dependent and Scheduler is not empty, need create complement dependent command, processDefinitionCode:{}.", command.getProcessDefinitionCode());
+                        logger.info(
+                                "Complement dependent mode is all dependent and Scheduler is not empty, need create complement dependent command, processDefinitionCode:{}.",
+                                command.getProcessDefinitionCode());
                         dependentProcessDefinitionCreateCount += createComplementDependentCommand(schedules, command);
                     }
                 }
                 break;
             }
             case RUN_MODE_PARALLEL: {
-                logger.info("RunMode of {} command is parallel run, processDefinitionCode:{}.", command.getCommandType().getDescp(), command.getProcessDefinitionCode());
+                logger.info("RunMode of {} command is parallel run, processDefinitionCode:{}.",
+                        command.getCommandType().getDescp(), command.getProcessDefinitionCode());
                 if (startDate != null && endDate != null) {
                     List<Schedule> schedules = processService.queryReleaseSchedulerListByProcessDefinitionCode(
                             command.getProcessDefinitionCode());
-                    List<ZonedDateTime> listDate = new ArrayList<>(
-                            CronUtils.getSelfFireDateList(DateUtils.stringToZoneDateTime(startDate),
-                                    DateUtils.stringToZoneDateTime(endDate), schedules));
+                    List<ZonedDateTime> listDate = CronUtils.getSelfFireDateList(
+                            DateUtils.stringToZoneDateTime(startDate),
+                            DateUtils.stringToZoneDateTime(endDate),
+                            schedules);
                     int listDateSize = listDate.size();
                     createCount = listDate.size();
                     if (!CollectionUtils.isEmpty(listDate)) {
                         if (expectedParallelismNumber != null && expectedParallelismNumber != 0) {
                             createCount = Math.min(createCount, expectedParallelismNumber);
                         }
-                        logger.info("Complement command run in parallel mode, current expectedParallelismNumber:{}.", createCount);
+                        logger.info("Complement command run in parallel mode, current expectedParallelismNumber:{}.",
+                                createCount);
 
                         // Distribute the number of tasks equally to each command.
                         // The last command with insufficient quantity will be assigned to the remaining tasks.
@@ -886,13 +922,19 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
                             command.setCommandParam(JSONUtils.toJsonString(cmdParam));
                             logger.info("Creating command, commandInfo:{}.", command);
                             if (processService.createCommand(command) > 0)
-                                logger.info("Create {} command complete, processDefinitionCode:{}", command.getCommandType().getDescp(), command.getProcessDefinitionCode());
+                                logger.info("Create {} command complete, processDefinitionCode:{}",
+                                        command.getCommandType().getDescp(), command.getProcessDefinitionCode());
                             else
-                                logger.error("Create {} command error, processDefinitionCode:{}", command.getCommandType().getDescp(), command.getProcessDefinitionCode());
+                                logger.error("Create {} command error, processDefinitionCode:{}",
+                                        command.getCommandType().getDescp(), command.getProcessDefinitionCode());
                             if (schedules.isEmpty() || complementDependentMode == ComplementDependentMode.OFF_MODE) {
-                                logger.info("Complement dependent mode is off mode or Scheduler is empty, so skip create complement dependent command, processDefinitionCode:{}.", command.getProcessDefinitionCode());
+                                logger.info(
+                                        "Complement dependent mode is off mode or Scheduler is empty, so skip create complement dependent command, processDefinitionCode:{}.",
+                                        command.getProcessDefinitionCode());
                             } else {
-                                logger.info("Complement dependent mode is all dependent and Scheduler is not empty, need create complement dependent command, processDefinitionCode:{}.", command.getProcessDefinitionCode());
+                                logger.info(
+                                        "Complement dependent mode is all dependent and Scheduler is not empty, need create complement dependent command, processDefinitionCode:{}.",
+                                        command.getProcessDefinitionCode());
                                 dependentProcessDefinitionCreateCount +=
                                         createComplementDependentCommand(schedules, command);
                             }
@@ -906,15 +948,18 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
                         if (expectedParallelismNumber != null && expectedParallelismNumber != 0) {
                             createCount = Math.min(createCount, expectedParallelismNumber);
                         }
-                        logger.info("Complement command run in parallel mode, current expectedParallelismNumber:{}.", createCount);
+                        logger.info("Complement command run in parallel mode, current expectedParallelismNumber:{}.",
+                                createCount);
                         for (List<String> stringDate : Lists.partition(listDate, createCount)) {
                             cmdParam.put(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST, String.join(COMMA, stringDate));
                             command.setCommandParam(JSONUtils.toJsonString(cmdParam));
                             logger.info("Creating command, commandInfo:{}.", command);
                             if (processService.createCommand(command) > 0)
-                                logger.info("Create {} command complete, processDefinitionCode:{}", command.getCommandType().getDescp(), command.getProcessDefinitionCode());
+                                logger.info("Create {} command complete, processDefinitionCode:{}",
+                                        command.getCommandType().getDescp(), command.getProcessDefinitionCode());
                             else
-                                logger.error("Create {} command error, processDefinitionCode:{}", command.getCommandType().getDescp(), command.getProcessDefinitionCode());
+                                logger.error("Create {} command error, processDefinitionCode:{}",
+                                        command.getCommandType().getDescp(), command.getProcessDefinitionCode());
                         }
                     }
                 }
@@ -1036,7 +1081,9 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
                     return false;
                 }
                 if (start.isAfter(end)) {
-                    logger.error("Complement data parameter error, start time should be before end time, startDate:{}, endDate:{}.", start, end);
+                    logger.error(
+                            "Complement data parameter error, start time should be before end time, startDate:{}, endDate:{}.",
+                            start, end);
                     return false;
                 }
             } catch (Exception ex) {
@@ -1078,7 +1125,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
         org.apache.dolphinscheduler.remote.command.Command command =
                 stateEventCallbackService.sendSync(host, requestCommand.convert2Command());
         if (command == null) {
-            logger.error("Query executing process instance from master error, processInstanceId:{}.", processInstanceId);
+            logger.error("Query executing process instance from master error, processInstanceId:{}.",
+                    processInstanceId);
             return null;
         }
         WorkflowExecutingDataResponseCommand responseCommand =
@@ -1126,7 +1174,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
             logger.info("Send task execute start command complete, response is {}.", response);
             putMsg(result, Status.SUCCESS);
         } else {
-            logger.error("Start to execute stream task instance error, projectCode:{}, taskDefinitionCode:{}, taskVersion:{}.",
+            logger.error(
+                    "Start to execute stream task instance error, projectCode:{}, taskDefinitionCode:{}, taskVersion:{}.",
                     projectCode, taskDefinitionCode, taskDefinitionVersion);
             putMsg(result, Status.START_TASK_INSTANCE_ERROR);
         }
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
index 6a172d4936..c3569a5e00 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
@@ -117,7 +117,19 @@ import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStreamReader;
 import java.nio.charset.StandardCharsets;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.Function;
 import java.util.stream.Collectors;
@@ -299,7 +311,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
             logger.error("Save process definition error, processCode:{}.", processDefinition.getCode());
             throw new ServiceException(Status.CREATE_PROCESS_DEFINITION_ERROR);
         } else
-            logger.info("Save process definition complete, processCode:{}, processVersion:{}.", processDefinition.getCode(), insertVersion);
+            logger.info("Save process definition complete, processCode:{}, processVersion:{}.",
+                    processDefinition.getCode(), insertVersion);
         int insertResult = processService.saveTaskRelation(loginUser, processDefinition.getProjectCode(),
                 processDefinition.getCode(),
                 insertVersion, taskRelationList, taskDefinitionLogs, Boolean.TRUE);
@@ -692,7 +705,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
             logger.info("The task has not changed, so skip");
         }
         if (saveTaskResult == Constants.DEFINITION_FAILURE) {
-            logger.error("Update task definitions error, projectCode:{}, processCode:{}.", processDefinition.getProjectCode(), processDefinition.getCode());
+            logger.error("Update task definitions error, projectCode:{}, processCode:{}.",
+                    processDefinition.getProjectCode(), processDefinition.getCode());
             putMsg(result, Status.UPDATE_TASK_DEFINITION_ERROR);
             throw new ServiceException(Status.UPDATE_TASK_DEFINITION_ERROR);
         }
@@ -728,13 +742,15 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
                 putMsg(result, Status.UPDATE_PROCESS_DEFINITION_ERROR);
                 throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR);
             } else
-                logger.info("Update process definition complete, processCode:{}, processVersion:{}.", processDefinition.getCode(), insertVersion);
+                logger.info("Update process definition complete, processCode:{}, processVersion:{}.",
+                        processDefinition.getCode(), insertVersion);
 
             taskUsedInOtherTaskValid(processDefinition, taskRelationList);
             int insertResult = processService.saveTaskRelation(loginUser, processDefinition.getProjectCode(),
                     processDefinition.getCode(), insertVersion, taskRelationList, taskDefinitionLogs, Boolean.TRUE);
             if (insertResult == Constants.EXIT_CODE_SUCCESS) {
-                logger.info("Update process task relations complete, projectCode:{}, processCode:{}, processVersion:{}.",
+                logger.info(
+                        "Update process task relations complete, projectCode:{}, processCode:{}, processVersion:{}.",
                         processDefinition.getProjectCode(), processDefinition.getCode(), insertVersion);
                 putMsg(result, Status.SUCCESS);
                 result.put(Constants.DATA_LIST, processDefinition);
@@ -746,7 +762,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
             }
             saveOtherRelation(loginUser, processDefinition, result, otherParamsJson);
         } else {
-            logger.info("Process definition does not need to be updated because there is no change, projectCode:{}, processCode:{}, processVersion:{}.",
+            logger.info(
+                    "Process definition does not need to be updated because there is no change, projectCode:{}, processCode:{}, processVersion:{}.",
                     processDefinition.getProjectCode(), processDefinition.getCode(), processDefinition.getVersion());
             putMsg(result, Status.SUCCESS);
             result.put(Constants.DATA_LIST, processDefinition);
@@ -763,7 +780,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
      * @return true if process definition name not exists, otherwise false
      */
     @Override
-    public Map<String, Object> verifyProcessDefinitionName(User loginUser, long projectCode, String name, long processDefinitionCode) {
+    public Map<String, Object> verifyProcessDefinitionName(User loginUser, long projectCode, String name,
+                                                           long processDefinitionCode) {
         Project project = projectMapper.queryByCode(projectCode);
         // check user access for project
         Map<String, Object> result =
@@ -806,7 +824,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
         List<ProcessInstance> processInstances = processInstanceService
                 .queryByProcessDefineCodeAndStatus(processDefinition.getCode(), Constants.NOT_TERMINATED_STATES);
         if (CollectionUtils.isNotEmpty(processInstances)) {
-            logger.warn("Process definition can not be deleted because there are {} executing process instances, processDefinitionCode:{}",
+            logger.warn(
+                    "Process definition can not be deleted because there are {} executing process instances, processDefinitionCode:{}",
                     processInstances.size(), processDefinition.getCode());
             throw new ServiceException(Status.DELETE_PROCESS_DEFINITION_EXECUTING_FAIL, processInstances.size());
         }
@@ -819,7 +838,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
                     .map(task -> String.format(Constants.FORMAT_S_S_COLON, task.getProcessDefinitionName(),
                             task.getTaskName()))
                     .collect(Collectors.joining(Constants.COMMA));
-            logger.warn("Process definition can not be deleted due to being referenced by other tasks:{}, processDefinitionCode:{}",
+            logger.warn(
+                    "Process definition can not be deleted due to being referenced by other tasks:{}, processDefinitionCode:{}",
                     taskDepDetail, processDefinition.getCode());
             throw new ServiceException(Status.DELETE_PROCESS_DEFINITION_USE_BY_OTHER_FAIL, taskDepDetail);
         }
@@ -852,7 +872,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
 
         // Determine if the login user is the owner of the process definition
         if (loginUser.getId() != processDefinition.getUserId() && loginUser.getUserType() != UserType.ADMIN_USER) {
-            logger.warn("User does not have permission for process definition, userId:{}, processDefinitionCode:{}.", loginUser.getId(), code);
+            logger.warn("User does not have permission for process definition, userId:{}, processDefinitionCode:{}.",
+                    loginUser.getId(), code);
             putMsg(result, Status.USER_NO_OPERATION_PERM);
             return result;
         }
@@ -865,13 +886,16 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
             if (scheduleObj.getReleaseState() == ReleaseState.OFFLINE) {
                 int delete = scheduleMapper.deleteById(scheduleObj.getId());
                 if (delete == 0) {
-                    logger.error("Delete schedule of process definition error, processDefinitionCode:{}, scheduleId:{}.", code, scheduleObj.getId());
+                    logger.error(
+                            "Delete schedule of process definition error, processDefinitionCode:{}, scheduleId:{}.",
+                            code, scheduleObj.getId());
                     putMsg(result, Status.DELETE_SCHEDULE_CRON_BY_ID_ERROR);
                     throw new ServiceException(Status.DELETE_SCHEDULE_CRON_BY_ID_ERROR);
                 }
             }
             if (scheduleObj.getReleaseState() == ReleaseState.ONLINE) {
-                logger.warn("Process definition can not be deleted due to schedule {}, processDefinitionCode:{}, scheduleId:{}.",
+                logger.warn(
+                        "Process definition can not be deleted due to schedule {}, processDefinitionCode:{}, scheduleId:{}.",
                         ReleaseState.ONLINE.getDescp(), processDefinition.getCode(), scheduleObj.getId());
                 putMsg(result, Status.SCHEDULE_CRON_STATE_ONLINE, scheduleObj.getId());
                 return result;
@@ -885,7 +909,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
         }
         int deleteRelation = processTaskRelationMapper.deleteByCode(project.getCode(), processDefinition.getCode());
         if (deleteRelation == 0) {
-            logger.warn("The process definition has not relation, it will be delete successfully, processDefinitionCode:{}.", code);
+            logger.warn(
+                    "The process definition has not relation, it will be delete successfully, processDefinitionCode:{}.",
+                    code);
         }
         deleteOtherRelation(project, result, processDefinition);
         putMsg(result, Status.SUCCESS);
@@ -936,24 +962,29 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
                 }
                 processDefinition.setReleaseState(releaseState);
                 processDefinitionMapper.updateById(processDefinition);
-                logger.info("Set process definition online, projectCode:{}, processDefinitionCode:{}.", projectCode, code);
+                logger.info("Set process definition online, projectCode:{}, processDefinitionCode:{}.", projectCode,
+                        code);
                 break;
             case OFFLINE:
                 processDefinition.setReleaseState(releaseState);
                 int updateProcess = processDefinitionMapper.updateById(processDefinition);
                 Schedule schedule = scheduleMapper.queryByProcessDefinitionCode(code);
                 if (updateProcess > 0) {
-                    logger.info("Set process definition offline, projectCode:{}, processDefinitionCode:{}.", projectCode, code);
+                    logger.info("Set process definition offline, projectCode:{}, processDefinitionCode:{}.",
+                            projectCode, code);
                     if (schedule != null) {
                         // set status
                         schedule.setReleaseState(releaseState);
                         int updateSchedule = scheduleMapper.updateById(schedule);
                         if (updateSchedule == 0) {
-                            logger.error("Set schedule offline error, projectCode:{}, processDefinitionCode:{}, scheduleId:{}", projectCode, code, schedule.getId());
+                            logger.error(
+                                    "Set schedule offline error, projectCode:{}, processDefinitionCode:{}, scheduleId:{}",
+                                    projectCode, code, schedule.getId());
                             putMsg(result, Status.OFFLINE_SCHEDULE_ERROR);
                             throw new ServiceException(Status.OFFLINE_SCHEDULE_ERROR);
                         } else
-                            logger.info("Set schedule offline, projectCode:{}, processDefinitionCode:{}, scheduleId:{}", projectCode, code, schedule.getId());
+                            logger.info("Set schedule offline, projectCode:{}, processDefinitionCode:{}, scheduleId:{}",
+                                    projectCode, code, schedule.getId());
                         schedulerService.deleteSchedule(project.getId(), schedule.getId());
                     }
                 }
@@ -1321,7 +1352,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
         try {
             processDefinition.setCode(CodeGenerateUtils.getInstance().genCode());
         } catch (CodeGenerateException e) {
-            logger.error("Save process definition error because generate process definition code error, projectCode:{}.", projectCode, e);
+            logger.error(
+                    "Save process definition error because generate process definition code error, projectCode:{}.",
+                    projectCode, e);
             putMsg(result, Status.CREATE_PROCESS_DEFINITION_ERROR);
             return false;
         }
@@ -1344,7 +1377,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
                 taskCodeMap.put(taskDefinitionLog.getCode(), code);
                 taskDefinitionLog.setCode(code);
             } catch (CodeGenerateException e) {
-                logger.error("Generate task definition code error, projectCode:{}, processDefinitionCode:{}", projectCode, processDefinition.getCode(), e);
+                logger.error("Generate task definition code error, projectCode:{}, processDefinitionCode:{}",
+                        projectCode, processDefinition.getCode(), e);
                 putMsg(result, Status.INTERNAL_SERVER_ERROR_ARGS, "Error generating task definition code");
                 return false;
             }
@@ -1353,7 +1387,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
         int insert = taskDefinitionMapper.batchInsert(taskDefinitionLogList);
         int logInsert = taskDefinitionLogMapper.batchInsert(taskDefinitionLogList);
         if ((logInsert & insert) == 0) {
-            logger.error("Save task definition error, projectCode:{}, processDefinitionCode:{}", projectCode, processDefinition.getCode());
+            logger.error("Save task definition error, projectCode:{}, processDefinitionCode:{}", projectCode,
+                    processDefinition.getCode());
             putMsg(result, Status.CREATE_TASK_DEFINITION_ERROR);
             throw new ServiceException(Status.CREATE_TASK_DEFINITION_ERROR);
         }
@@ -1396,7 +1431,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
             putMsg(createDagResult, Status.SUCCESS);
         } else {
             result.putAll(createDagResult);
-            logger.error("Import process definition error, projectCode:{}, processDefinitionCode:{}.", projectCode, processDefinition.getCode());
+            logger.error("Import process definition error, projectCode:{}, processDefinitionCode:{}.", projectCode,
+                    processDefinition.getCode());
             throw new ServiceException(Status.IMPORT_PROCESS_DEFINE_ERROR);
         }
 
@@ -1409,13 +1445,16 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
             schedule.setUpdateTime(now);
             int scheduleInsert = scheduleMapper.insert(schedule);
             if (0 == scheduleInsert) {
-                logger.error("Import process definition error due to save schedule fail, projectCode:{}, processDefinitionCode:{}.", projectCode, processDefinition.getCode());
+                logger.error(
+                        "Import process definition error due to save schedule fail, projectCode:{}, processDefinitionCode:{}.",
+                        projectCode, processDefinition.getCode());
                 putMsg(result, Status.IMPORT_PROCESS_DEFINE_ERROR);
                 throw new ServiceException(Status.IMPORT_PROCESS_DEFINE_ERROR);
             }
         }
 
-        logger.info("Import process definition complete, projectCode:{}, processDefinitionCode:{}.", projectCode, processDefinition.getCode());
+        logger.info("Import process definition complete, projectCode:{}, processDefinitionCode:{}.", projectCode,
+                processDefinition.getCode());
         return true;
     }
 
@@ -1992,7 +2031,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
                     putMsg(result, Status.INTERNAL_SERVER_ERROR_ARGS);
                     throw new ServiceException(Status.INTERNAL_SERVER_ERROR_ARGS);
                 }
-                processDefinition.setId(0);
+                processDefinition.setId(null);
                 processDefinition.setUserId(loginUser.getId());
                 processDefinition.setName(getNewName(processDefinition.getName(), COPY_SUFFIX));
                 final Date date = new Date();
@@ -2026,7 +2065,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
                     result.putAll(createDagDefine(loginUser, taskRelationList, processDefinition, taskDefinitionLogs,
                             otherParamsJson));
                 } catch (Exception e) {
-                    logger.error("Copy process definition error, processDefinitionCode from {} to {}.", oldProcessDefinitionCode, processDefinition.getCode(), e);
+                    logger.error("Copy process definition error, processDefinitionCode from {} to {}.",
+                            oldProcessDefinitionCode, processDefinition.getCode(), e);
                     putMsg(result, Status.COPY_PROCESS_DEFINITION_ERROR);
                     throw new ServiceException(Status.COPY_PROCESS_DEFINITION_ERROR);
                 }
@@ -2036,7 +2076,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
                     result.putAll(updateDagDefine(loginUser, taskRelationList, processDefinition, null,
                             Lists.newArrayList(), otherParamsJson));
                 } catch (Exception e) {
-                    logger.error("Move process definition error, processDefinitionCode:{}.", processDefinition.getCode(), e);
+                    logger.error("Move process definition error, processDefinitionCode:{}.",
+                            processDefinition.getCode(), e);
                     putMsg(result, Status.MOVE_PROCESS_DEFINITION_ERROR);
                     throw new ServiceException(Status.MOVE_PROCESS_DEFINITION_ERROR);
                 }
@@ -2092,7 +2133,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
 
         ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(code);
         if (Objects.isNull(processDefinition) || projectCode != processDefinition.getProjectCode()) {
-            logger.error("Switch process definition error because it does not exist, projectCode:{}, processDefinitionCode:{}.", projectCode, code);
+            logger.error(
+                    "Switch process definition error because it does not exist, projectCode:{}, processDefinitionCode:{}.",
+                    projectCode, code);
             putMsg(result, Status.SWITCH_PROCESS_DEFINITION_VERSION_NOT_EXIST_PROCESS_DEFINITION_ERROR, code);
             return result;
         }
@@ -2100,18 +2143,23 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
         ProcessDefinitionLog processDefinitionLog =
                 processDefinitionLogMapper.queryByDefinitionCodeAndVersion(code, version);
         if (Objects.isNull(processDefinitionLog)) {
-            logger.error("Switch process definition error because version does not exist, projectCode:{}, processDefinitionCode:{}, version:{}.", projectCode, code, version);
+            logger.error(
+                    "Switch process definition error because version does not exist, projectCode:{}, processDefinitionCode:{}, version:{}.",
+                    projectCode, code, version);
             putMsg(result, Status.SWITCH_PROCESS_DEFINITION_VERSION_NOT_EXIST_PROCESS_DEFINITION_VERSION_ERROR,
                     processDefinition.getCode(), version);
             return result;
         }
         int switchVersion = processService.switchVersion(processDefinition, processDefinitionLog);
         if (switchVersion <= 0) {
-            logger.error("Switch process definition version error, projectCode:{}, processDefinitionCode:{}, version:{}.", projectCode, code, version);
+            logger.error(
+                    "Switch process definition version error, projectCode:{}, processDefinitionCode:{}, version:{}.",
+                    projectCode, code, version);
             putMsg(result, Status.SWITCH_PROCESS_DEFINITION_VERSION_ERROR);
             throw new ServiceException(Status.SWITCH_PROCESS_DEFINITION_VERSION_ERROR);
         }
-        logger.info("Switch process definition version complete, projectCode:{}, processDefinitionCode:{}, version:{}.", projectCode, code, version);
+        logger.info("Switch process definition version complete, projectCode:{}, processDefinitionCode:{}, version:{}.",
+                projectCode, code, version);
         putMsg(result, Status.SUCCESS);
         return result;
     }
@@ -2130,16 +2178,19 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
         if (!failedProcessList.isEmpty()) {
             String failedProcess = String.join(",", failedProcessList);
             if (isCopy) {
-                logger.error("Copy process definition error, srcProjectCode:{}, targetProjectCode:{}, failedProcessList:{}.",
+                logger.error(
+                        "Copy process definition error, srcProjectCode:{}, targetProjectCode:{}, failedProcessList:{}.",
                         srcProjectCode, targetProjectCode, failedProcess);
                 putMsg(result, Status.COPY_PROCESS_DEFINITION_ERROR, srcProjectCode, targetProjectCode, failedProcess);
             } else {
-                logger.error("Move process definition error, srcProjectCode:{}, targetProjectCode:{}, failedProcessList:{}.",
+                logger.error(
+                        "Move process definition error, srcProjectCode:{}, targetProjectCode:{}, failedProcessList:{}.",
                         srcProjectCode, targetProjectCode, failedProcess);
                 putMsg(result, Status.MOVE_PROCESS_DEFINITION_ERROR, srcProjectCode, targetProjectCode, failedProcess);
             }
         } else {
-            logger.info("Batch {} process definition complete, srcProjectCode:{}, targetProjectCode:{}.", isCopy?"copy":"move", srcProjectCode, targetProjectCode);
+            logger.info("Batch {} process definition complete, srcProjectCode:{}, targetProjectCode:{}.",
+                    isCopy ? "copy" : "move", srcProjectCode, targetProjectCode);
             putMsg(result, Status.SUCCESS);
         }
     }
@@ -2207,7 +2258,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
             putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, String.valueOf(code));
         } else {
             if (processDefinition.getVersion() == version) {
-                logger.warn("Process definition can not be deleted due to version is being used, projectCode:{}, processDefinitionCode:{}, version:{}.",
+                logger.warn(
+                        "Process definition can not be deleted due to version is being used, projectCode:{}, processDefinitionCode:{}, version:{}.",
                         projectCode, code, version);
                 putMsg(result, Status.MAIN_TABLE_USING_VERSION);
                 return result;
@@ -2215,12 +2267,16 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
             int deleteLog = processDefinitionLogMapper.deleteByProcessDefinitionCodeAndVersion(code, version);
             int deleteRelationLog = processTaskRelationLogMapper.deleteByCode(code, version);
             if (deleteLog == 0 || deleteRelationLog == 0) {
-                logger.error("Delete process definition version error, projectCode:{}, processDefinitionCode:{}, version:{}.", projectCode, code, version);
+                logger.error(
+                        "Delete process definition version error, projectCode:{}, processDefinitionCode:{}, version:{}.",
+                        projectCode, code, version);
                 putMsg(result, Status.DELETE_PROCESS_DEFINE_BY_CODE_ERROR);
                 throw new ServiceException(Status.DELETE_PROCESS_DEFINE_BY_CODE_ERROR);
             }
             deleteOtherRelation(project, result, processDefinition);
-            logger.info("Delete process definition version complete, projectCode:{}, processDefinitionCode:{}, version:{}.", projectCode, code, version);
+            logger.info(
+                    "Delete process definition version complete, projectCode:{}, processDefinitionCode:{}, version:{}.",
+                    projectCode, code, version);
             putMsg(result, Status.SUCCESS);
         }
         return result;
@@ -2337,7 +2393,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
         Date now = new Date();
         scheduleObj.setProcessDefinitionCode(processDefinition.getCode());
         if (DateUtils.differSec(scheduleObj.getStartTime(), scheduleObj.getEndTime()) == 0) {
-            logger.warn("The schedule start time must not be the same as the end, processDefinitionCode:{}.", processDefinition.getCode());
+            logger.warn("The schedule start time must not be the same as the end, processDefinitionCode:{}.",
+                    processDefinition.getCode());
             putMsg(result, Status.SCHEDULE_START_TIME_END_TIME_SAME);
             return result;
         }
@@ -2569,7 +2626,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
                     scheduleObj.setReleaseState(ReleaseState.OFFLINE);
                     int updateSchedule = scheduleMapper.updateById(scheduleObj);
                     if (updateSchedule == 0) {
-                        logger.error("Set schedule offline error, projectCode:{}, processDefinitionCode:{}, scheduleId:{}", projectCode, code, scheduleObj.getId());
+                        logger.error(
+                                "Set schedule offline error, projectCode:{}, processDefinitionCode:{}, scheduleId:{}",
+                                projectCode, code, scheduleObj.getId());
                         putMsg(result, Status.OFFLINE_SCHEDULE_ERROR);
                         throw new ServiceException(Status.OFFLINE_SCHEDULE_ERROR);
                     }
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
index 09ccbd7d10..6bb38004c1 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
@@ -416,6 +416,7 @@ public class ProcessServiceImpl implements ProcessService {
                 commandParams.put(Constants.SCHEDULE_TIMEZONE, schedule.getTimezoneId());
                 command.setCommandParam(JSONUtils.toJsonString(commandParams));
             }
+            command.setId(null);
             result = commandMapper.insert(command);
         }
         return result;