You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ca...@apache.org on 2022/09/22 01:52:39 UTC
[dolphinscheduler] branch dev updated: Fix insert command error due to the id is not null (#12092)
This is an automated email from the ASF dual-hosted git repository.
caishunfeng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new fba5a8eaa0 Fix insert command error due to the id is not null (#12092)
fba5a8eaa0 is described below
commit fba5a8eaa0945f399733275341dfdcec11ab6b3d
Author: Wenjun Ruan <we...@apache.org>
AuthorDate: Thu Sep 22 09:52:32 2022 +0800
Fix insert command error due to the id is not null (#12092)
---
.../api/service/impl/ExecutorServiceImpl.java | 141 ++++++++++++++-------
.../service/impl/ProcessDefinitionServiceImpl.java | 135 ++++++++++++++------
.../service/process/ProcessServiceImpl.java | 1 +
3 files changed, 193 insertions(+), 84 deletions(-)
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
index 7524a0c15f..32de874b1d 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
@@ -177,7 +177,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
Priority processInstancePriority, String workerGroup,
Long environmentCode, Integer timeout,
Map<String, String> startParams, Integer expectedParallelismNumber,
- int dryRun, int testFlag, ComplementDependentMode complementDependentMode) {
+ int dryRun, int testFlag,
+ ComplementDependentMode complementDependentMode) {
Project project = projectMapper.queryByCode(projectCode);
// check user access for project
Map<String, Object> result =
@@ -201,7 +202,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
}
if (!checkTenantSuitable(processDefinition)) {
- logger.error("There is not any valid tenant for the process definition, processDefinitionCode:{}, processDefinitionName:{}.",
+ logger.error(
+ "There is not any valid tenant for the process definition, processDefinitionCode:{}, processDefinitionName:{}.",
processDefinition.getCode(), processDefinition.getName());
putMsg(result, Status.TENANT_NOT_SUITABLE);
return result;
@@ -224,15 +226,18 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
startNodeList,
cronTime, warningType, loginUser.getId(), warningGroupId, runMode, processInstancePriority,
workerGroup,
- environmentCode, startParams, expectedParallelismNumber, dryRun, testFlag, complementDependentMode);
+ environmentCode, startParams, expectedParallelismNumber, dryRun, testFlag,
+ complementDependentMode);
if (create > 0) {
processDefinition.setWarningGroupId(warningGroupId);
processDefinitionMapper.updateById(processDefinition);
- logger.info("Create command complete, processDefinitionCode:{}, commandCount:{}.", processDefinition.getCode(), create);
+ logger.info("Create command complete, processDefinitionCode:{}, commandCount:{}.",
+ processDefinition.getCode(), create);
putMsg(result, Status.SUCCESS);
} else {
- logger.error("Start process instance failed because create command error, processDefinitionCode:{}.", processDefinition.getCode());
+ logger.error("Start process instance failed because create command error, processDefinitionCode:{}.",
+ processDefinition.getCode());
putMsg(result, Status.START_PROCESS_INSTANCE_ERROR);
}
return result;
@@ -295,15 +300,18 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
Map<String, Object> result = new HashMap<>();
if (processDefinition == null || projectCode != processDefinition.getProjectCode()) {
// check process definition exists
- logger.error("Process definition does not exist, projectCode:{}, processDefinitionCode:{}.", projectCode, processDefineCode);
+ logger.error("Process definition does not exist, projectCode:{}, processDefinitionCode:{}.", projectCode,
+ processDefineCode);
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, String.valueOf(processDefineCode));
} else if (processDefinition.getReleaseState() != ReleaseState.ONLINE) {
// check process definition online
- logger.warn("Process definition is not {}, processDefinitionCode:{}, version:{}.", ReleaseState.ONLINE.getDescp(), processDefineCode, version);
+ logger.warn("Process definition is not {}, processDefinitionCode:{}, version:{}.",
+ ReleaseState.ONLINE.getDescp(), processDefineCode, version);
putMsg(result, Status.PROCESS_DEFINE_NOT_RELEASE, String.valueOf(processDefineCode), version);
} else if (!checkSubProcessDefinitionValid(processDefinition)) {
// check sub process definition online
- logger.warn("Subprocess definition of process definition is not {}, processDefinitionCode:{}.", ReleaseState.ONLINE.getDescp(), processDefineCode);
+ logger.warn("Subprocess definition of process definition is not {}, processDefinitionCode:{}.",
+ ReleaseState.ONLINE.getDescp(), processDefineCode);
putMsg(result, Status.SUB_PROCESS_DEFINE_NOT_RELEASE);
} else {
result.put(Constants.STATUS, Status.SUCCESS);
@@ -398,7 +406,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
return result;
}
if (!checkTenantSuitable(processDefinition)) {
- logger.error("There is not any valid tenant for the process definition, processDefinitionId:{}, processDefinitionCode:{}, ",
+ logger.error(
+ "There is not any valid tenant for the process definition, processDefinitionId:{}, processDefinitionCode:{}, ",
processDefinition.getId(), processDefinition.getName());
putMsg(result, Status.TENANT_NOT_SUITABLE);
}
@@ -418,19 +427,23 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
switch (executeType) {
case REPEAT_RUNNING:
result = insertCommand(loginUser, processInstanceId, processDefinition.getCode(),
- processDefinition.getVersion(), CommandType.REPEAT_RUNNING, startParams, processInstance.getTestFlag());
+ processDefinition.getVersion(), CommandType.REPEAT_RUNNING, startParams,
+ processInstance.getTestFlag());
break;
case RECOVER_SUSPENDED_PROCESS:
result = insertCommand(loginUser, processInstanceId, processDefinition.getCode(),
- processDefinition.getVersion(), CommandType.RECOVER_SUSPENDED_PROCESS, startParams, processInstance.getTestFlag());
+ processDefinition.getVersion(), CommandType.RECOVER_SUSPENDED_PROCESS, startParams,
+ processInstance.getTestFlag());
break;
case START_FAILURE_TASK_PROCESS:
result = insertCommand(loginUser, processInstanceId, processDefinition.getCode(),
- processDefinition.getVersion(), CommandType.START_FAILURE_TASK_PROCESS, startParams, processInstance.getTestFlag());
+ processDefinition.getVersion(), CommandType.START_FAILURE_TASK_PROCESS, startParams,
+ processInstance.getTestFlag());
break;
case STOP:
if (processInstance.getState() == WorkflowExecutionStatus.READY_STOP) {
- logger.warn("Process instance status is already {}, processInstanceName:{}.", WorkflowExecutionStatus.READY_STOP.getDesc(), processInstance.getName());
+ logger.warn("Process instance status is already {}, processInstanceName:{}.",
+ WorkflowExecutionStatus.READY_STOP.getDesc(), processInstance.getName());
putMsg(result, Status.PROCESS_INSTANCE_ALREADY_CHANGED, processInstance.getName(),
processInstance.getState());
} else {
@@ -441,7 +454,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
break;
case PAUSE:
if (processInstance.getState() == WorkflowExecutionStatus.READY_PAUSE) {
- logger.warn("Process instance status is already {}, processInstanceName:{}.", WorkflowExecutionStatus.READY_STOP.getDesc(), processInstance.getName());
+ logger.warn("Process instance status is already {}, processInstanceName:{}.",
+ WorkflowExecutionStatus.READY_STOP.getDesc(), processInstance.getName());
putMsg(result, Status.PROCESS_INSTANCE_ALREADY_CHANGED, processInstance.getName(),
processInstance.getState());
} else {
@@ -450,7 +464,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
}
break;
default:
- logger.warn("Unknown execute type for process instance, processInstanceId:{}.", processInstance.getId());
+ logger.warn("Unknown execute type for process instance, processInstanceId:{}.",
+ processInstance.getId());
putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, "unknown execute type");
break;
@@ -465,7 +480,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
// check process instance exist
ProcessInstance processInstance = processInstanceMapper.selectById(taskGroupQueue.getProcessId());
if (processInstance == null) {
- logger.error("Process instance does not exist, projectCode:{}, processInstanceId:{}.", taskGroupQueue.getProjectCode(), taskGroupQueue.getProcessId());
+ logger.error("Process instance does not exist, projectCode:{}, processInstanceId:{}.",
+ taskGroupQueue.getProjectCode(), taskGroupQueue.getProcessId());
putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, taskGroupQueue.getProcessId());
return result;
}
@@ -558,7 +574,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
// determine whether the process is normal
if (update > 0) {
- logger.info("Process instance state is updated to {} in database, processInstanceName:{}.", executionStatus.getDesc(), processInstance.getName());
+ logger.info("Process instance state is updated to {} in database, processInstanceName:{}.",
+ executionStatus.getDesc(), processInstance.getName());
// directly send the process instance state change event to target master, not guarantee the event send
// success
WorkflowStateEventChangeCommand workflowStateEventChangeCommand = new WorkflowStateEventChangeCommand(
@@ -607,7 +624,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
* @return insert result code
*/
private Map<String, Object> insertCommand(User loginUser, Integer instanceId, long processDefinitionCode,
- int processVersion, CommandType commandType, String startParams, int testFlag) {
+ int processVersion, CommandType commandType, String startParams,
+ int testFlag) {
Map<String, Object> result = new HashMap<>();
// To add startParams only when repeat running is needed
@@ -626,7 +644,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
command.setProcessInstanceId(instanceId);
command.setTestFlag(testFlag);
if (!processService.verifyIsNeedCreateCommand(command)) {
- logger.warn("Process instance is executing the command, processDefinitionCode:{}, processDefinitionVersion:{}, processInstanceId:{}.",
+ logger.warn(
+ "Process instance is executing the command, processDefinitionCode:{}, processDefinitionVersion:{}, processInstanceId:{}.",
processDefinitionCode, processVersion, instanceId);
putMsg(result, Status.PROCESS_INSTANCE_EXECUTING_COMMAND, String.valueOf(processDefinitionCode));
return result;
@@ -640,8 +659,10 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
command.getCommandType().getDescp(), command.getProcessDefinitionCode(), processVersion);
putMsg(result, Status.SUCCESS);
} else {
- logger.error("Execute process instance failed because create {} command error, processDefinitionCode:{}, processDefinitionVersion:{}, processInstanceId:{}.",
- command.getCommandType().getDescp(), command.getProcessDefinitionCode(), processVersion, instanceId);
+ logger.error(
+ "Execute process instance failed because create {} command error, processDefinitionCode:{}, processDefinitionVersion:{}, processInstanceId:{}.",
+ command.getCommandType().getDescp(), command.getProcessDefinitionCode(), processVersion,
+ instanceId);
putMsg(result, Status.EXECUTE_PROCESS_INSTANCE_ERROR);
}
@@ -676,7 +697,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
* if there is no online process, exit directly
*/
if (processDefinitionTmp.getReleaseState() != ReleaseState.ONLINE) {
- logger.warn("Subprocess definition {} of process definition {} is not {}.", processDefinitionTmp.getName(),
+ logger.warn("Subprocess definition {} of process definition {} is not {}.",
+ processDefinitionTmp.getName(),
processDefinition.getName(), ReleaseState.ONLINE.getDescp());
putMsg(result, Status.PROCESS_DEFINE_NOT_RELEASE, processDefinitionTmp.getName());
return result;
@@ -759,14 +781,16 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
// determine whether to complement
if (commandType == CommandType.COMPLEMENT_DATA) {
if (schedule == null || StringUtils.isEmpty(schedule)) {
- logger.error("Create {} type command error because parameter schedule is invalid.", command.getCommandType().getDescp());
+ logger.error("Create {} type command error because parameter schedule is invalid.",
+ command.getCommandType().getDescp());
return 0;
}
if (!isValidateScheduleTime(schedule)) {
return 0;
}
try {
- logger.info("Start to create {} command, processDefinitionCode:{}.", command.getCommandType().getDescp(), processDefineCode);
+ logger.info("Start to create {} command, processDefinitionCode:{}.",
+ command.getCommandType().getDescp(), processDefineCode);
return createComplementCommandList(schedule, runMode, command, expectedParallelismNumber,
complementDependentMode);
} catch (CronParseException cronParseException) {
@@ -811,16 +835,19 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
}
switch (runMode) {
case RUN_MODE_SERIAL: {
- logger.info("RunMode of {} command is serial run, processDefinitionCode:{}.", command.getCommandType().getDescp(), command.getProcessDefinitionCode());
+ logger.info("RunMode of {} command is serial run, processDefinitionCode:{}.",
+ command.getCommandType().getDescp(), command.getProcessDefinitionCode());
if (StringUtils.isNotEmpty(dateList)) {
cmdParam.put(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST, dateList);
command.setCommandParam(JSONUtils.toJsonString(cmdParam));
logger.info("Creating command, commandInfo:{}.", command);
createCount = processService.createCommand(command);
if (createCount > 0)
- logger.info("Create {} command complete, processDefinitionCode:{}", command.getCommandType().getDescp(), command.getProcessDefinitionCode());
+ logger.info("Create {} command complete, processDefinitionCode:{}",
+ command.getCommandType().getDescp(), command.getProcessDefinitionCode());
else
- logger.error("Create {} command error, processDefinitionCode:{}", command.getCommandType().getDescp(), command.getProcessDefinitionCode());
+ logger.error("Create {} command error, processDefinitionCode:{}",
+ command.getCommandType().getDescp(), command.getProcessDefinitionCode());
}
if (startDate != null && endDate != null) {
cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, startDate);
@@ -829,37 +856,46 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
logger.info("Creating command, commandInfo:{}.", command);
createCount = processService.createCommand(command);
if (createCount > 0)
- logger.info("Create {} command complete, processDefinitionCode:{}", command.getCommandType().getDescp(), command.getProcessDefinitionCode());
+ logger.info("Create {} command complete, processDefinitionCode:{}",
+ command.getCommandType().getDescp(), command.getProcessDefinitionCode());
else
- logger.error("Create {} command error, processDefinitionCode:{}", command.getCommandType().getDescp(), command.getProcessDefinitionCode());
+ logger.error("Create {} command error, processDefinitionCode:{}",
+ command.getCommandType().getDescp(), command.getProcessDefinitionCode());
// dependent process definition
List<Schedule> schedules = processService.queryReleaseSchedulerListByProcessDefinitionCode(
command.getProcessDefinitionCode());
if (schedules.isEmpty() || complementDependentMode == ComplementDependentMode.OFF_MODE) {
- logger.info("Complement dependent mode is off mode or Scheduler is empty, so skip create complement dependent command, processDefinitionCode:{}.", command.getProcessDefinitionCode());
+ logger.info(
+ "Complement dependent mode is off mode or Scheduler is empty, so skip create complement dependent command, processDefinitionCode:{}.",
+ command.getProcessDefinitionCode());
} else {
- logger.info("Complement dependent mode is all dependent and Scheduler is not empty, need create complement dependent command, processDefinitionCode:{}.", command.getProcessDefinitionCode());
+ logger.info(
+ "Complement dependent mode is all dependent and Scheduler is not empty, need create complement dependent command, processDefinitionCode:{}.",
+ command.getProcessDefinitionCode());
dependentProcessDefinitionCreateCount += createComplementDependentCommand(schedules, command);
}
}
break;
}
case RUN_MODE_PARALLEL: {
- logger.info("RunMode of {} command is parallel run, processDefinitionCode:{}.", command.getCommandType().getDescp(), command.getProcessDefinitionCode());
+ logger.info("RunMode of {} command is parallel run, processDefinitionCode:{}.",
+ command.getCommandType().getDescp(), command.getProcessDefinitionCode());
if (startDate != null && endDate != null) {
List<Schedule> schedules = processService.queryReleaseSchedulerListByProcessDefinitionCode(
command.getProcessDefinitionCode());
- List<ZonedDateTime> listDate = new ArrayList<>(
- CronUtils.getSelfFireDateList(DateUtils.stringToZoneDateTime(startDate),
- DateUtils.stringToZoneDateTime(endDate), schedules));
+ List<ZonedDateTime> listDate = CronUtils.getSelfFireDateList(
+ DateUtils.stringToZoneDateTime(startDate),
+ DateUtils.stringToZoneDateTime(endDate),
+ schedules);
int listDateSize = listDate.size();
createCount = listDate.size();
if (!CollectionUtils.isEmpty(listDate)) {
if (expectedParallelismNumber != null && expectedParallelismNumber != 0) {
createCount = Math.min(createCount, expectedParallelismNumber);
}
- logger.info("Complement command run in parallel mode, current expectedParallelismNumber:{}.", createCount);
+ logger.info("Complement command run in parallel mode, current expectedParallelismNumber:{}.",
+ createCount);
// Distribute the number of tasks equally to each command.
// The last command with insufficient quantity will be assigned to the remaining tasks.
@@ -886,13 +922,19 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
command.setCommandParam(JSONUtils.toJsonString(cmdParam));
logger.info("Creating command, commandInfo:{}.", command);
if (processService.createCommand(command) > 0)
- logger.info("Create {} command complete, processDefinitionCode:{}", command.getCommandType().getDescp(), command.getProcessDefinitionCode());
+ logger.info("Create {} command complete, processDefinitionCode:{}",
+ command.getCommandType().getDescp(), command.getProcessDefinitionCode());
else
- logger.error("Create {} command error, processDefinitionCode:{}", command.getCommandType().getDescp(), command.getProcessDefinitionCode());
+ logger.error("Create {} command error, processDefinitionCode:{}",
+ command.getCommandType().getDescp(), command.getProcessDefinitionCode());
if (schedules.isEmpty() || complementDependentMode == ComplementDependentMode.OFF_MODE) {
- logger.info("Complement dependent mode is off mode or Scheduler is empty, so skip create complement dependent command, processDefinitionCode:{}.", command.getProcessDefinitionCode());
+ logger.info(
+ "Complement dependent mode is off mode or Scheduler is empty, so skip create complement dependent command, processDefinitionCode:{}.",
+ command.getProcessDefinitionCode());
} else {
- logger.info("Complement dependent mode is all dependent and Scheduler is not empty, need create complement dependent command, processDefinitionCode:{}.", command.getProcessDefinitionCode());
+ logger.info(
+ "Complement dependent mode is all dependent and Scheduler is not empty, need create complement dependent command, processDefinitionCode:{}.",
+ command.getProcessDefinitionCode());
dependentProcessDefinitionCreateCount +=
createComplementDependentCommand(schedules, command);
}
@@ -906,15 +948,18 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
if (expectedParallelismNumber != null && expectedParallelismNumber != 0) {
createCount = Math.min(createCount, expectedParallelismNumber);
}
- logger.info("Complement command run in parallel mode, current expectedParallelismNumber:{}.", createCount);
+ logger.info("Complement command run in parallel mode, current expectedParallelismNumber:{}.",
+ createCount);
for (List<String> stringDate : Lists.partition(listDate, createCount)) {
cmdParam.put(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST, String.join(COMMA, stringDate));
command.setCommandParam(JSONUtils.toJsonString(cmdParam));
logger.info("Creating command, commandInfo:{}.", command);
if (processService.createCommand(command) > 0)
- logger.info("Create {} command complete, processDefinitionCode:{}", command.getCommandType().getDescp(), command.getProcessDefinitionCode());
+ logger.info("Create {} command complete, processDefinitionCode:{}",
+ command.getCommandType().getDescp(), command.getProcessDefinitionCode());
else
- logger.error("Create {} command error, processDefinitionCode:{}", command.getCommandType().getDescp(), command.getProcessDefinitionCode());
+ logger.error("Create {} command error, processDefinitionCode:{}",
+ command.getCommandType().getDescp(), command.getProcessDefinitionCode());
}
}
}
@@ -1036,7 +1081,9 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
return false;
}
if (start.isAfter(end)) {
- logger.error("Complement data parameter error, start time should be before end time, startDate:{}, endDate:{}.", start, end);
+ logger.error(
+ "Complement data parameter error, start time should be before end time, startDate:{}, endDate:{}.",
+ start, end);
return false;
}
} catch (Exception ex) {
@@ -1078,7 +1125,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
org.apache.dolphinscheduler.remote.command.Command command =
stateEventCallbackService.sendSync(host, requestCommand.convert2Command());
if (command == null) {
- logger.error("Query executing process instance from master error, processInstanceId:{}.", processInstanceId);
+ logger.error("Query executing process instance from master error, processInstanceId:{}.",
+ processInstanceId);
return null;
}
WorkflowExecutingDataResponseCommand responseCommand =
@@ -1126,7 +1174,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
logger.info("Send task execute start command complete, response is {}.", response);
putMsg(result, Status.SUCCESS);
} else {
- logger.error("Start to execute stream task instance error, projectCode:{}, taskDefinitionCode:{}, taskVersion:{}.",
+ logger.error(
+ "Start to execute stream task instance error, projectCode:{}, taskDefinitionCode:{}, taskVersion:{}.",
projectCode, taskDefinitionCode, taskDefinitionVersion);
putMsg(result, Status.START_TASK_INSTANCE_ERROR);
}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
index 6a172d4936..c3569a5e00 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
@@ -117,7 +117,19 @@ import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -299,7 +311,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
logger.error("Save process definition error, processCode:{}.", processDefinition.getCode());
throw new ServiceException(Status.CREATE_PROCESS_DEFINITION_ERROR);
} else
- logger.info("Save process definition complete, processCode:{}, processVersion:{}.", processDefinition.getCode(), insertVersion);
+ logger.info("Save process definition complete, processCode:{}, processVersion:{}.",
+ processDefinition.getCode(), insertVersion);
int insertResult = processService.saveTaskRelation(loginUser, processDefinition.getProjectCode(),
processDefinition.getCode(),
insertVersion, taskRelationList, taskDefinitionLogs, Boolean.TRUE);
@@ -692,7 +705,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
logger.info("The task has not changed, so skip");
}
if (saveTaskResult == Constants.DEFINITION_FAILURE) {
- logger.error("Update task definitions error, projectCode:{}, processCode:{}.", processDefinition.getProjectCode(), processDefinition.getCode());
+ logger.error("Update task definitions error, projectCode:{}, processCode:{}.",
+ processDefinition.getProjectCode(), processDefinition.getCode());
putMsg(result, Status.UPDATE_TASK_DEFINITION_ERROR);
throw new ServiceException(Status.UPDATE_TASK_DEFINITION_ERROR);
}
@@ -728,13 +742,15 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
putMsg(result, Status.UPDATE_PROCESS_DEFINITION_ERROR);
throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR);
} else
- logger.info("Update process definition complete, processCode:{}, processVersion:{}.", processDefinition.getCode(), insertVersion);
+ logger.info("Update process definition complete, processCode:{}, processVersion:{}.",
+ processDefinition.getCode(), insertVersion);
taskUsedInOtherTaskValid(processDefinition, taskRelationList);
int insertResult = processService.saveTaskRelation(loginUser, processDefinition.getProjectCode(),
processDefinition.getCode(), insertVersion, taskRelationList, taskDefinitionLogs, Boolean.TRUE);
if (insertResult == Constants.EXIT_CODE_SUCCESS) {
- logger.info("Update process task relations complete, projectCode:{}, processCode:{}, processVersion:{}.",
+ logger.info(
+ "Update process task relations complete, projectCode:{}, processCode:{}, processVersion:{}.",
processDefinition.getProjectCode(), processDefinition.getCode(), insertVersion);
putMsg(result, Status.SUCCESS);
result.put(Constants.DATA_LIST, processDefinition);
@@ -746,7 +762,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
}
saveOtherRelation(loginUser, processDefinition, result, otherParamsJson);
} else {
- logger.info("Process definition does not need to be updated because there is no change, projectCode:{}, processCode:{}, processVersion:{}.",
+ logger.info(
+ "Process definition does not need to be updated because there is no change, projectCode:{}, processCode:{}, processVersion:{}.",
processDefinition.getProjectCode(), processDefinition.getCode(), processDefinition.getVersion());
putMsg(result, Status.SUCCESS);
result.put(Constants.DATA_LIST, processDefinition);
@@ -763,7 +780,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
* @return true if process definition name not exists, otherwise false
*/
@Override
- public Map<String, Object> verifyProcessDefinitionName(User loginUser, long projectCode, String name, long processDefinitionCode) {
+ public Map<String, Object> verifyProcessDefinitionName(User loginUser, long projectCode, String name,
+ long processDefinitionCode) {
Project project = projectMapper.queryByCode(projectCode);
// check user access for project
Map<String, Object> result =
@@ -806,7 +824,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
List<ProcessInstance> processInstances = processInstanceService
.queryByProcessDefineCodeAndStatus(processDefinition.getCode(), Constants.NOT_TERMINATED_STATES);
if (CollectionUtils.isNotEmpty(processInstances)) {
- logger.warn("Process definition can not be deleted because there are {} executing process instances, processDefinitionCode:{}",
+ logger.warn(
+ "Process definition can not be deleted because there are {} executing process instances, processDefinitionCode:{}",
processInstances.size(), processDefinition.getCode());
throw new ServiceException(Status.DELETE_PROCESS_DEFINITION_EXECUTING_FAIL, processInstances.size());
}
@@ -819,7 +838,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
.map(task -> String.format(Constants.FORMAT_S_S_COLON, task.getProcessDefinitionName(),
task.getTaskName()))
.collect(Collectors.joining(Constants.COMMA));
- logger.warn("Process definition can not be deleted due to being referenced by other tasks:{}, processDefinitionCode:{}",
+ logger.warn(
+ "Process definition can not be deleted due to being referenced by other tasks:{}, processDefinitionCode:{}",
taskDepDetail, processDefinition.getCode());
throw new ServiceException(Status.DELETE_PROCESS_DEFINITION_USE_BY_OTHER_FAIL, taskDepDetail);
}
@@ -852,7 +872,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
// Determine if the login user is the owner of the process definition
if (loginUser.getId() != processDefinition.getUserId() && loginUser.getUserType() != UserType.ADMIN_USER) {
- logger.warn("User does not have permission for process definition, userId:{}, processDefinitionCode:{}.", loginUser.getId(), code);
+ logger.warn("User does not have permission for process definition, userId:{}, processDefinitionCode:{}.",
+ loginUser.getId(), code);
putMsg(result, Status.USER_NO_OPERATION_PERM);
return result;
}
@@ -865,13 +886,16 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
if (scheduleObj.getReleaseState() == ReleaseState.OFFLINE) {
int delete = scheduleMapper.deleteById(scheduleObj.getId());
if (delete == 0) {
- logger.error("Delete schedule of process definition error, processDefinitionCode:{}, scheduleId:{}.", code, scheduleObj.getId());
+ logger.error(
+ "Delete schedule of process definition error, processDefinitionCode:{}, scheduleId:{}.",
+ code, scheduleObj.getId());
putMsg(result, Status.DELETE_SCHEDULE_CRON_BY_ID_ERROR);
throw new ServiceException(Status.DELETE_SCHEDULE_CRON_BY_ID_ERROR);
}
}
if (scheduleObj.getReleaseState() == ReleaseState.ONLINE) {
- logger.warn("Process definition can not be deleted due to schedule {}, processDefinitionCode:{}, scheduleId:{}.",
+ logger.warn(
+ "Process definition can not be deleted due to schedule {}, processDefinitionCode:{}, scheduleId:{}.",
ReleaseState.ONLINE.getDescp(), processDefinition.getCode(), scheduleObj.getId());
putMsg(result, Status.SCHEDULE_CRON_STATE_ONLINE, scheduleObj.getId());
return result;
@@ -885,7 +909,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
}
int deleteRelation = processTaskRelationMapper.deleteByCode(project.getCode(), processDefinition.getCode());
if (deleteRelation == 0) {
- logger.warn("The process definition has not relation, it will be delete successfully, processDefinitionCode:{}.", code);
+ logger.warn(
+ "The process definition has not relation, it will be delete successfully, processDefinitionCode:{}.",
+ code);
}
deleteOtherRelation(project, result, processDefinition);
putMsg(result, Status.SUCCESS);
@@ -936,24 +962,29 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
}
processDefinition.setReleaseState(releaseState);
processDefinitionMapper.updateById(processDefinition);
- logger.info("Set process definition online, projectCode:{}, processDefinitionCode:{}.", projectCode, code);
+ logger.info("Set process definition online, projectCode:{}, processDefinitionCode:{}.", projectCode,
+ code);
break;
case OFFLINE:
processDefinition.setReleaseState(releaseState);
int updateProcess = processDefinitionMapper.updateById(processDefinition);
Schedule schedule = scheduleMapper.queryByProcessDefinitionCode(code);
if (updateProcess > 0) {
- logger.info("Set process definition offline, projectCode:{}, processDefinitionCode:{}.", projectCode, code);
+ logger.info("Set process definition offline, projectCode:{}, processDefinitionCode:{}.",
+ projectCode, code);
if (schedule != null) {
// set status
schedule.setReleaseState(releaseState);
int updateSchedule = scheduleMapper.updateById(schedule);
if (updateSchedule == 0) {
- logger.error("Set schedule offline error, projectCode:{}, processDefinitionCode:{}, scheduleId:{}", projectCode, code, schedule.getId());
+ logger.error(
+ "Set schedule offline error, projectCode:{}, processDefinitionCode:{}, scheduleId:{}",
+ projectCode, code, schedule.getId());
putMsg(result, Status.OFFLINE_SCHEDULE_ERROR);
throw new ServiceException(Status.OFFLINE_SCHEDULE_ERROR);
} else
- logger.info("Set schedule offline, projectCode:{}, processDefinitionCode:{}, scheduleId:{}", projectCode, code, schedule.getId());
+ logger.info("Set schedule offline, projectCode:{}, processDefinitionCode:{}, scheduleId:{}",
+ projectCode, code, schedule.getId());
schedulerService.deleteSchedule(project.getId(), schedule.getId());
}
}
@@ -1321,7 +1352,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
try {
processDefinition.setCode(CodeGenerateUtils.getInstance().genCode());
} catch (CodeGenerateException e) {
- logger.error("Save process definition error because generate process definition code error, projectCode:{}.", projectCode, e);
+ logger.error(
+ "Save process definition error because generate process definition code error, projectCode:{}.",
+ projectCode, e);
putMsg(result, Status.CREATE_PROCESS_DEFINITION_ERROR);
return false;
}
@@ -1344,7 +1377,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
taskCodeMap.put(taskDefinitionLog.getCode(), code);
taskDefinitionLog.setCode(code);
} catch (CodeGenerateException e) {
- logger.error("Generate task definition code error, projectCode:{}, processDefinitionCode:{}", projectCode, processDefinition.getCode(), e);
+ logger.error("Generate task definition code error, projectCode:{}, processDefinitionCode:{}",
+ projectCode, processDefinition.getCode(), e);
putMsg(result, Status.INTERNAL_SERVER_ERROR_ARGS, "Error generating task definition code");
return false;
}
@@ -1353,7 +1387,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
int insert = taskDefinitionMapper.batchInsert(taskDefinitionLogList);
int logInsert = taskDefinitionLogMapper.batchInsert(taskDefinitionLogList);
if ((logInsert & insert) == 0) {
- logger.error("Save task definition error, projectCode:{}, processDefinitionCode:{}", projectCode, processDefinition.getCode());
+ logger.error("Save task definition error, projectCode:{}, processDefinitionCode:{}", projectCode,
+ processDefinition.getCode());
putMsg(result, Status.CREATE_TASK_DEFINITION_ERROR);
throw new ServiceException(Status.CREATE_TASK_DEFINITION_ERROR);
}
@@ -1396,7 +1431,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
putMsg(createDagResult, Status.SUCCESS);
} else {
result.putAll(createDagResult);
- logger.error("Import process definition error, projectCode:{}, processDefinitionCode:{}.", projectCode, processDefinition.getCode());
+ logger.error("Import process definition error, projectCode:{}, processDefinitionCode:{}.", projectCode,
+ processDefinition.getCode());
throw new ServiceException(Status.IMPORT_PROCESS_DEFINE_ERROR);
}
@@ -1409,13 +1445,16 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
schedule.setUpdateTime(now);
int scheduleInsert = scheduleMapper.insert(schedule);
if (0 == scheduleInsert) {
- logger.error("Import process definition error due to save schedule fail, projectCode:{}, processDefinitionCode:{}.", projectCode, processDefinition.getCode());
+ logger.error(
+ "Import process definition error due to save schedule fail, projectCode:{}, processDefinitionCode:{}.",
+ projectCode, processDefinition.getCode());
putMsg(result, Status.IMPORT_PROCESS_DEFINE_ERROR);
throw new ServiceException(Status.IMPORT_PROCESS_DEFINE_ERROR);
}
}
- logger.info("Import process definition complete, projectCode:{}, processDefinitionCode:{}.", projectCode, processDefinition.getCode());
+ logger.info("Import process definition complete, projectCode:{}, processDefinitionCode:{}.", projectCode,
+ processDefinition.getCode());
return true;
}
@@ -1992,7 +2031,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
putMsg(result, Status.INTERNAL_SERVER_ERROR_ARGS);
throw new ServiceException(Status.INTERNAL_SERVER_ERROR_ARGS);
}
- processDefinition.setId(0);
+ processDefinition.setId(null);
processDefinition.setUserId(loginUser.getId());
processDefinition.setName(getNewName(processDefinition.getName(), COPY_SUFFIX));
final Date date = new Date();
@@ -2026,7 +2065,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
result.putAll(createDagDefine(loginUser, taskRelationList, processDefinition, taskDefinitionLogs,
otherParamsJson));
} catch (Exception e) {
- logger.error("Copy process definition error, processDefinitionCode from {} to {}.", oldProcessDefinitionCode, processDefinition.getCode(), e);
+ logger.error("Copy process definition error, processDefinitionCode from {} to {}.",
+ oldProcessDefinitionCode, processDefinition.getCode(), e);
putMsg(result, Status.COPY_PROCESS_DEFINITION_ERROR);
throw new ServiceException(Status.COPY_PROCESS_DEFINITION_ERROR);
}
@@ -2036,7 +2076,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
result.putAll(updateDagDefine(loginUser, taskRelationList, processDefinition, null,
Lists.newArrayList(), otherParamsJson));
} catch (Exception e) {
- logger.error("Move process definition error, processDefinitionCode:{}.", processDefinition.getCode(), e);
+ logger.error("Move process definition error, processDefinitionCode:{}.",
+ processDefinition.getCode(), e);
putMsg(result, Status.MOVE_PROCESS_DEFINITION_ERROR);
throw new ServiceException(Status.MOVE_PROCESS_DEFINITION_ERROR);
}
@@ -2092,7 +2133,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(code);
if (Objects.isNull(processDefinition) || projectCode != processDefinition.getProjectCode()) {
- logger.error("Switch process definition error because it does not exist, projectCode:{}, processDefinitionCode:{}.", projectCode, code);
+ logger.error(
+ "Switch process definition error because it does not exist, projectCode:{}, processDefinitionCode:{}.",
+ projectCode, code);
putMsg(result, Status.SWITCH_PROCESS_DEFINITION_VERSION_NOT_EXIST_PROCESS_DEFINITION_ERROR, code);
return result;
}
@@ -2100,18 +2143,23 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
ProcessDefinitionLog processDefinitionLog =
processDefinitionLogMapper.queryByDefinitionCodeAndVersion(code, version);
if (Objects.isNull(processDefinitionLog)) {
- logger.error("Switch process definition error because version does not exist, projectCode:{}, processDefinitionCode:{}, version:{}.", projectCode, code, version);
+ logger.error(
+ "Switch process definition error because version does not exist, projectCode:{}, processDefinitionCode:{}, version:{}.",
+ projectCode, code, version);
putMsg(result, Status.SWITCH_PROCESS_DEFINITION_VERSION_NOT_EXIST_PROCESS_DEFINITION_VERSION_ERROR,
processDefinition.getCode(), version);
return result;
}
int switchVersion = processService.switchVersion(processDefinition, processDefinitionLog);
if (switchVersion <= 0) {
- logger.error("Switch process definition version error, projectCode:{}, processDefinitionCode:{}, version:{}.", projectCode, code, version);
+ logger.error(
+ "Switch process definition version error, projectCode:{}, processDefinitionCode:{}, version:{}.",
+ projectCode, code, version);
putMsg(result, Status.SWITCH_PROCESS_DEFINITION_VERSION_ERROR);
throw new ServiceException(Status.SWITCH_PROCESS_DEFINITION_VERSION_ERROR);
}
- logger.info("Switch process definition version complete, projectCode:{}, processDefinitionCode:{}, version:{}.", projectCode, code, version);
+ logger.info("Switch process definition version complete, projectCode:{}, processDefinitionCode:{}, version:{}.",
+ projectCode, code, version);
putMsg(result, Status.SUCCESS);
return result;
}
@@ -2130,16 +2178,19 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
if (!failedProcessList.isEmpty()) {
String failedProcess = String.join(",", failedProcessList);
if (isCopy) {
- logger.error("Copy process definition error, srcProjectCode:{}, targetProjectCode:{}, failedProcessList:{}.",
+ logger.error(
+ "Copy process definition error, srcProjectCode:{}, targetProjectCode:{}, failedProcessList:{}.",
srcProjectCode, targetProjectCode, failedProcess);
putMsg(result, Status.COPY_PROCESS_DEFINITION_ERROR, srcProjectCode, targetProjectCode, failedProcess);
} else {
- logger.error("Move process definition error, srcProjectCode:{}, targetProjectCode:{}, failedProcessList:{}.",
+ logger.error(
+ "Move process definition error, srcProjectCode:{}, targetProjectCode:{}, failedProcessList:{}.",
srcProjectCode, targetProjectCode, failedProcess);
putMsg(result, Status.MOVE_PROCESS_DEFINITION_ERROR, srcProjectCode, targetProjectCode, failedProcess);
}
} else {
- logger.info("Batch {} process definition complete, srcProjectCode:{}, targetProjectCode:{}.", isCopy?"copy":"move", srcProjectCode, targetProjectCode);
+ logger.info("Batch {} process definition complete, srcProjectCode:{}, targetProjectCode:{}.",
+ isCopy ? "copy" : "move", srcProjectCode, targetProjectCode);
putMsg(result, Status.SUCCESS);
}
}
@@ -2207,7 +2258,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, String.valueOf(code));
} else {
if (processDefinition.getVersion() == version) {
- logger.warn("Process definition can not be deleted due to version is being used, projectCode:{}, processDefinitionCode:{}, version:{}.",
+ logger.warn(
+ "Process definition can not be deleted due to version is being used, projectCode:{}, processDefinitionCode:{}, version:{}.",
projectCode, code, version);
putMsg(result, Status.MAIN_TABLE_USING_VERSION);
return result;
@@ -2215,12 +2267,16 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
int deleteLog = processDefinitionLogMapper.deleteByProcessDefinitionCodeAndVersion(code, version);
int deleteRelationLog = processTaskRelationLogMapper.deleteByCode(code, version);
if (deleteLog == 0 || deleteRelationLog == 0) {
- logger.error("Delete process definition version error, projectCode:{}, processDefinitionCode:{}, version:{}.", projectCode, code, version);
+ logger.error(
+ "Delete process definition version error, projectCode:{}, processDefinitionCode:{}, version:{}.",
+ projectCode, code, version);
putMsg(result, Status.DELETE_PROCESS_DEFINE_BY_CODE_ERROR);
throw new ServiceException(Status.DELETE_PROCESS_DEFINE_BY_CODE_ERROR);
}
deleteOtherRelation(project, result, processDefinition);
- logger.info("Delete process definition version complete, projectCode:{}, processDefinitionCode:{}, version:{}.", projectCode, code, version);
+ logger.info(
+ "Delete process definition version complete, projectCode:{}, processDefinitionCode:{}, version:{}.",
+ projectCode, code, version);
putMsg(result, Status.SUCCESS);
}
return result;
@@ -2337,7 +2393,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
Date now = new Date();
scheduleObj.setProcessDefinitionCode(processDefinition.getCode());
if (DateUtils.differSec(scheduleObj.getStartTime(), scheduleObj.getEndTime()) == 0) {
- logger.warn("The schedule start time must not be the same as the end, processDefinitionCode:{}.", processDefinition.getCode());
+ logger.warn("The schedule start time must not be the same as the end, processDefinitionCode:{}.",
+ processDefinition.getCode());
putMsg(result, Status.SCHEDULE_START_TIME_END_TIME_SAME);
return result;
}
@@ -2569,7 +2626,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
scheduleObj.setReleaseState(ReleaseState.OFFLINE);
int updateSchedule = scheduleMapper.updateById(scheduleObj);
if (updateSchedule == 0) {
- logger.error("Set schedule offline error, projectCode:{}, processDefinitionCode:{}, scheduleId:{}", projectCode, code, scheduleObj.getId());
+ logger.error(
+ "Set schedule offline error, projectCode:{}, processDefinitionCode:{}, scheduleId:{}",
+ projectCode, code, scheduleObj.getId());
putMsg(result, Status.OFFLINE_SCHEDULE_ERROR);
throw new ServiceException(Status.OFFLINE_SCHEDULE_ERROR);
}
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
index 09ccbd7d10..6bb38004c1 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
@@ -416,6 +416,7 @@ public class ProcessServiceImpl implements ProcessService {
commandParams.put(Constants.SCHEDULE_TIMEZONE, schedule.getTimezoneId());
command.setCommandParam(JSONUtils.toJsonString(commandParams));
}
+ command.setId(null);
result = commandMapper.insert(command);
}
return result;