You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by we...@apache.org on 2022/09/16 05:54:25 UTC
[dolphinscheduler] branch dev updated: Fix cannot save processDefinition (#11931)
This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new d833a28b2e Fix cannot save processDefinition (#11931)
d833a28b2e is described below
commit d833a28b2e312c9becd3d356f71f124317bad7f9
Author: Wenjun Ruan <we...@apache.org>
AuthorDate: Fri Sep 16 13:54:18 2022 +0800
Fix cannot save processDefinition (#11931)
---
.../service/impl/ProcessDefinitionServiceImpl.java | 122 ++++------
.../api/service/ProcessDefinitionServiceTest.java | 14 +-
.../common/utils/CodeGenerateUtils.java | 11 +-
.../dolphinscheduler/dao/entity/Command.java | 132 -----------
.../dao/entity/ProcessDefinition.java | 250 +--------------------
.../dao/entity/ProcessTaskRelation.java | 30 +--
.../dao/mapper/TaskInstanceMapperTest.java | 37 ++-
.../service/process/ProcessServiceImpl.java | 129 +++++------
8 files changed, 143 insertions(+), 582 deletions(-)
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
index ef0a16a816..d2e957bd58 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
@@ -261,43 +261,24 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
return result;
}
if (checkDescriptionLength(description)) {
- putMsg(result, Status.DESCRIPTION_TOO_LONG_ERROR);
- return result;
+ throw new ServiceException(Status.DESCRIPTION_TOO_LONG_ERROR);
}
// check whether the new process define name exist
ProcessDefinition definition = processDefinitionMapper.verifyByDefineName(project.getCode(), name);
if (definition != null) {
- putMsg(result, Status.PROCESS_DEFINITION_NAME_EXIST, name);
- return result;
- }
- List<TaskDefinitionLog> taskDefinitionLogs = JSONUtils.toList(taskDefinitionJson, TaskDefinitionLog.class);
- Map<String, Object> checkTaskDefinitions = checkTaskDefinitionList(taskDefinitionLogs, taskDefinitionJson);
- if (checkTaskDefinitions.get(Constants.STATUS) != Status.SUCCESS) {
- return checkTaskDefinitions;
- }
- List<ProcessTaskRelationLog> taskRelationList =
- JSONUtils.toList(taskRelationJson, ProcessTaskRelationLog.class);
- Map<String, Object> checkRelationJson =
- checkTaskRelationList(taskRelationList, taskRelationJson, taskDefinitionLogs);
- if (checkRelationJson.get(Constants.STATUS) != Status.SUCCESS) {
- return checkRelationJson;
+ throw new ServiceException(Status.PROCESS_DEFINITION_NAME_EXIST, name);
}
+ List<TaskDefinitionLog> taskDefinitionLogs = generateTaskDefinitionList(taskDefinitionJson);
+ List<ProcessTaskRelationLog> taskRelationList = generateTaskRelationList(taskRelationJson, taskDefinitionLogs);
int tenantId = -1;
if (!Constants.DEFAULT.equals(tenantCode)) {
Tenant tenant = tenantMapper.queryByTenantCode(tenantCode);
if (tenant == null) {
- putMsg(result, Status.TENANT_NOT_EXIST);
- return result;
+ throw new ServiceException(Status.TENANT_NOT_EXIST);
}
tenantId = tenant.getId();
}
- long processDefinitionCode;
- try {
- processDefinitionCode = CodeGenerateUtils.getInstance().genCode();
- } catch (CodeGenerateException e) {
- putMsg(result, Status.INTERNAL_SERVER_ERROR_ARGS);
- return result;
- }
+ long processDefinitionCode = CodeGenerateUtils.getInstance().genCode();
ProcessDefinition processDefinition =
new ProcessDefinition(projectCode, name, processDefinitionCode, description,
globalParams, locations, timeout, loginUser.getId(), tenantId);
@@ -317,66 +298,63 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
logger.info("The task has not changed, so skip");
}
if (saveTaskResult == Constants.DEFINITION_FAILURE) {
- putMsg(result, Status.CREATE_TASK_DEFINITION_ERROR);
throw new ServiceException(Status.CREATE_TASK_DEFINITION_ERROR);
}
int insertVersion = processService.saveProcessDefine(loginUser, processDefinition, Boolean.TRUE, Boolean.TRUE);
if (insertVersion == 0) {
- putMsg(result, Status.CREATE_PROCESS_DEFINITION_ERROR);
throw new ServiceException(Status.CREATE_PROCESS_DEFINITION_ERROR);
}
int insertResult = processService.saveTaskRelation(loginUser, processDefinition.getProjectCode(),
processDefinition.getCode(),
insertVersion, taskRelationList, taskDefinitionLogs, Boolean.TRUE);
- if (insertResult == Constants.EXIT_CODE_SUCCESS) {
- putMsg(result, Status.SUCCESS);
- result.put(Constants.DATA_LIST, processDefinition);
- } else {
- putMsg(result, Status.CREATE_PROCESS_TASK_RELATION_ERROR);
+ if (insertResult != Constants.EXIT_CODE_SUCCESS) {
throw new ServiceException(Status.CREATE_PROCESS_TASK_RELATION_ERROR);
}
saveOtherRelation(loginUser, processDefinition, result, otherParamsJson);
+
+ putMsg(result, Status.SUCCESS);
+ result.put(Constants.DATA_LIST, processDefinition);
return result;
}
- private Map<String, Object> checkTaskDefinitionList(List<TaskDefinitionLog> taskDefinitionLogs,
- String taskDefinitionJson) {
- Map<String, Object> result = new HashMap<>();
+ private List<TaskDefinitionLog> generateTaskDefinitionList(String taskDefinitionJson) {
try {
- if (taskDefinitionLogs.isEmpty()) {
- logger.error("taskDefinitionJson invalid: {}", taskDefinitionJson);
- putMsg(result, Status.DATA_IS_NOT_VALID, taskDefinitionJson);
- return result;
+ List<TaskDefinitionLog> taskDefinitionLogs = JSONUtils.toList(taskDefinitionJson, TaskDefinitionLog.class);
+ if (CollectionUtils.isEmpty(taskDefinitionLogs)) {
+ logger.error("Generate task definition list failed, the given taskDefinitionJson is invalided: {}",
+ taskDefinitionJson);
+ throw new ServiceException(Status.DATA_IS_NOT_VALID, taskDefinitionJson);
}
for (TaskDefinitionLog taskDefinitionLog : taskDefinitionLogs) {
-
if (!taskPluginManager.checkTaskParameters(ParametersNode.builder()
.taskType(taskDefinitionLog.getTaskType())
.taskParams(taskDefinitionLog.getTaskParams())
.dependence(taskDefinitionLog.getDependence())
.build())) {
- logger.error("task definition {} parameter invalid", taskDefinitionLog.getName());
- putMsg(result, Status.PROCESS_NODE_S_PARAMETER_INVALID, taskDefinitionLog.getName());
- return result;
+ logger.error(
+ "Generate task definition list failed, the given task definition parameter is invalided, taskName: {}, taskDefinition: {}",
+ taskDefinitionLog.getName(), taskDefinitionLog);
+ throw new ServiceException(Status.PROCESS_NODE_S_PARAMETER_INVALID, taskDefinitionLog.getName());
}
}
- putMsg(result, Status.SUCCESS);
+ return taskDefinitionLogs;
+ } catch (ServiceException ex) {
+ throw ex;
} catch (Exception e) {
- result.put(Constants.STATUS, Status.REQUEST_PARAMS_NOT_VALID_ERROR);
- result.put(Constants.MSG, e.getMessage());
+ logger.error("Generate task definition list failed, meet an unknown exception", e);
+ throw new ServiceException(Status.REQUEST_PARAMS_NOT_VALID_ERROR);
}
- return result;
}
- private Map<String, Object> checkTaskRelationList(List<ProcessTaskRelationLog> taskRelationList,
- String taskRelationJson,
- List<TaskDefinitionLog> taskDefinitionLogs) {
- Map<String, Object> result = new HashMap<>();
+ private List<ProcessTaskRelationLog> generateTaskRelationList(String taskRelationJson,
+ List<TaskDefinitionLog> taskDefinitionLogs) {
try {
- if (taskRelationList == null || taskRelationList.isEmpty()) {
- logger.error("task relation list is null");
- putMsg(result, Status.DATA_IS_NOT_VALID, taskRelationJson);
- return result;
+ List<ProcessTaskRelationLog> taskRelationList =
+ JSONUtils.toList(taskRelationJson, ProcessTaskRelationLog.class);
+ if (CollectionUtils.isEmpty(taskRelationList)) {
+ logger.error("Generate task relation list failed the taskRelation list is empty, taskRelationJson: {}",
+ taskRelationJson);
+ throw new ServiceException(Status.DATA_IS_NOT_VALID);
}
List<ProcessTaskRelation> processTaskRelations = taskRelationList.stream()
.map(processTaskRelationLog -> JSONUtils.parseObject(JSONUtils.toJsonString(processTaskRelationLog),
@@ -390,31 +368,29 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
Collection<Long> codes = CollectionUtils.subtract(postTaskCodes, taskNodeCodes);
if (CollectionUtils.isNotEmpty(codes)) {
logger.error("the task code is not exist");
- putMsg(result, Status.TASK_DEFINE_NOT_EXIST,
- StringUtils.join(codes, Constants.COMMA));
- return result;
+ throw new ServiceException(Status.TASK_DEFINE_NOT_EXIST, StringUtils.join(codes, Constants.COMMA));
}
}
if (graphHasCycle(taskNodeList)) {
logger.error("process DAG has cycle");
- putMsg(result, Status.PROCESS_NODE_HAS_CYCLE);
- return result;
+ throw new ServiceException(Status.PROCESS_NODE_HAS_CYCLE);
}
// check whether the task relation json is normal
for (ProcessTaskRelationLog processTaskRelationLog : taskRelationList) {
if (processTaskRelationLog.getPostTaskCode() == 0) {
logger.error("the post_task_code or post_task_version can't be zero");
- putMsg(result, Status.CHECK_PROCESS_TASK_RELATION_ERROR);
- return result;
+ throw new ServiceException(Status.CHECK_PROCESS_TASK_RELATION_ERROR);
}
}
- putMsg(result, Status.SUCCESS);
+ return taskRelationList;
+ } catch (ServiceException ex) {
+ throw ex;
} catch (Exception e) {
- result.put(Constants.STATUS, Status.REQUEST_PARAMS_NOT_VALID_ERROR);
- result.put(Constants.MSG, e.getMessage());
+ logger.error("Check task relation list error, meet an unknown exception, given taskRelationJson: {}",
+ taskRelationJson, e);
+ throw new ServiceException(Status.REQUEST_PARAMS_NOT_VALID_ERROR);
}
- return result;
}
/**
@@ -620,18 +596,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
putMsg(result, Status.DESCRIPTION_TOO_LONG_ERROR);
return result;
}
- List<TaskDefinitionLog> taskDefinitionLogs = JSONUtils.toList(taskDefinitionJson, TaskDefinitionLog.class);
- Map<String, Object> checkTaskDefinitions = checkTaskDefinitionList(taskDefinitionLogs, taskDefinitionJson);
- if (checkTaskDefinitions.get(Constants.STATUS) != Status.SUCCESS) {
- return checkTaskDefinitions;
- }
- List<ProcessTaskRelationLog> taskRelationList =
- JSONUtils.toList(taskRelationJson, ProcessTaskRelationLog.class);
- Map<String, Object> checkRelationJson =
- checkTaskRelationList(taskRelationList, taskRelationJson, taskDefinitionLogs);
- if (checkRelationJson.get(Constants.STATUS) != Status.SUCCESS) {
- return checkRelationJson;
- }
+ List<TaskDefinitionLog> taskDefinitionLogs = generateTaskDefinitionList(taskDefinitionJson);
+ List<ProcessTaskRelationLog> taskRelationList = generateTaskRelationList(taskRelationJson, taskDefinitionLogs);
int tenantId = -1;
if (!Constants.DEFAULT.equals(tenantCode)) {
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
index 05c63ab537..e4401cd71d 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
@@ -96,9 +96,6 @@ import org.springframework.mock.web.MockMultipartFile;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
-/**
- * process definition service test
- */
@RunWith(MockitoJUnitRunner.class)
public class ProcessDefinitionServiceTest {
@@ -756,10 +753,13 @@ public class ProcessDefinitionServiceTest {
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_UPDATE))
.thenReturn(result);
- Map<String, Object> updateResult =
- processDefinitionService.updateProcessDefinition(loginUser, projectCode, "test", 1,
- "", "", "", 0, "root", null, "", null, ProcessExecutionTypeEnum.PARALLEL);
- Assert.assertEquals(Status.DATA_IS_NOT_VALID, updateResult.get(Constants.STATUS));
+ try {
+ processDefinitionService.updateProcessDefinition(loginUser, projectCode, "test", 1,
+ "", "", "", 0, "root", null, "", null, ProcessExecutionTypeEnum.PARALLEL);
+ Assert.fail();
+ } catch (ServiceException ex) {
+ Assert.assertEquals(Status.DATA_IS_NOT_VALID.getCode(), ex.getCode());
+ }
}
@Test
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/CodeGenerateUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/CodeGenerateUtils.java
index ffea87be51..f35523b59d 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/CodeGenerateUtils.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/CodeGenerateUtils.java
@@ -10,8 +10,9 @@ import java.util.Objects;
* Rewriting based on Twitter snowflake algorithm
*/
public class CodeGenerateUtils {
+
// start timestamp
- private static final long START_TIMESTAMP = 1609430400000L; //2021-01-01 00:00:00
+ private static final long START_TIMESTAMP = 1609430400000L; // 2021-01-01 00:00:00
// Each machine generates 32 in the same millisecond
private static final long LOW_DIGIT_BIT = 5L;
private static final long MIDDLE_BIT = 2L;
@@ -24,11 +25,12 @@ public class CodeGenerateUtils {
private long recordMillisecond = -1L;
private static final long SYSTEM_TIMESTAMP = System.currentTimeMillis();
- private static final long SYSTEM_NANOTIME = System.nanoTime();
+ private static final long SYSTEM_NANOTIME = System.nanoTime();
private CodeGenerateUtils() throws CodeGenerateException {
try {
- this.machineHash = Math.abs(Objects.hash(InetAddress.getLocalHost().getHostName())) % (2 << (MIDDLE_BIT - 1));
+ this.machineHash =
+ Math.abs(Objects.hash(InetAddress.getLocalHost().getHostName())) % (2 << (MIDDLE_BIT - 1));
} catch (UnknownHostException e) {
throw new CodeGenerateException(e.getMessage());
}
@@ -66,7 +68,8 @@ public class CodeGenerateUtils {
return SYSTEM_TIMESTAMP + (System.nanoTime() - SYSTEM_NANOTIME) / 1000000;
}
- public static class CodeGenerateException extends Exception {
+ public static class CodeGenerateException extends RuntimeException {
+
public CodeGenerateException(String message) {
super(message);
}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Command.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Command.java
index 12a094139a..ec9336d501 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Command.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Command.java
@@ -36,27 +36,15 @@ import com.baomidou.mybatisplus.annotation.TableName;
@TableName("t_ds_command")
public class Command {
- /**
- * id
- */
@TableId(value = "id", type = IdType.AUTO)
private Integer id;
- /**
- * command type
- */
@TableField("command_type")
private CommandType commandType;
- /**
- * process definition code
- */
@TableField("process_definition_code")
private long processDefinitionCode;
- /**
- * executor id
- */
@TableField("executor_id")
private int executorId;
@@ -66,69 +54,36 @@ public class Command {
@TableField("command_param")
private String commandParam;
- /**
- * task depend type
- */
@TableField("task_depend_type")
private TaskDependType taskDependType;
- /**
- * failure strategy
- */
@TableField("failure_strategy")
private FailureStrategy failureStrategy;
- /**
- * warning type
- */
@TableField("warning_type")
private WarningType warningType;
- /**
- * warning group id
- */
@TableField("warning_group_id")
private Integer warningGroupId;
- /**
- * schedule time
- */
@TableField("schedule_time")
private Date scheduleTime;
- /**
- * start time
- */
@TableField("start_time")
private Date startTime;
- /**
- * process instance priority
- */
@TableField("process_instance_priority")
private Priority processInstancePriority;
- /**
- * update time
- */
@TableField("update_time")
private Date updateTime;
- /**
- * worker group
- */
@TableField("worker_group")
private String workerGroup;
- /**
- * environment code
- */
@TableField("environment_code")
private Long environmentCode;
- /**
- * dry run flag
- */
@TableField("dry_run")
private int dryRun;
@@ -180,91 +135,4 @@ public class Command {
this.processDefinitionVersion = processDefinitionVersion;
}
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
-
- Command command = (Command) o;
-
- if (id != command.id) {
- return false;
- }
- if (processDefinitionCode != command.processDefinitionCode) {
- return false;
- }
- if (executorId != command.executorId) {
- return false;
- }
- if (workerGroup != null ? workerGroup.equals(command.workerGroup) : command.workerGroup == null) {
- return false;
- }
-
- if (environmentCode != null ? environmentCode.equals(command.environmentCode)
- : command.environmentCode == null) {
- return false;
- }
-
- if (commandType != command.commandType) {
- return false;
- }
- if (commandParam != null ? !commandParam.equals(command.commandParam) : command.commandParam != null) {
- return false;
- }
- if (taskDependType != command.taskDependType) {
- return false;
- }
- if (failureStrategy != command.failureStrategy) {
- return false;
- }
- if (warningType != command.warningType) {
- return false;
- }
- if (warningGroupId != null ? !warningGroupId.equals(command.warningGroupId) : command.warningGroupId != null) {
- return false;
- }
- if (scheduleTime != null ? !scheduleTime.equals(command.scheduleTime) : command.scheduleTime != null) {
- return false;
- }
- if (startTime != null ? !startTime.equals(command.startTime) : command.startTime != null) {
- return false;
- }
- if (processInstancePriority != command.processInstancePriority) {
- return false;
- }
- if (processInstanceId != command.processInstanceId) {
- return false;
- }
- if (processDefinitionVersion != command.getProcessDefinitionVersion()) {
- return false;
- }
- return !(updateTime != null ? !updateTime.equals(command.updateTime) : command.updateTime != null);
- }
-
- @Override
- public int hashCode() {
- int result = id;
- result = 31 * result + (commandType != null ? commandType.hashCode() : 0);
- result = 31 * result + Long.hashCode(processDefinitionCode);
- result = 31 * result + executorId;
- result = 31 * result + (commandParam != null ? commandParam.hashCode() : 0);
- result = 31 * result + (taskDependType != null ? taskDependType.hashCode() : 0);
- result = 31 * result + (failureStrategy != null ? failureStrategy.hashCode() : 0);
- result = 31 * result + (warningType != null ? warningType.hashCode() : 0);
- result = 31 * result + (warningGroupId != null ? warningGroupId.hashCode() : 0);
- result = 31 * result + (scheduleTime != null ? scheduleTime.hashCode() : 0);
- result = 31 * result + (startTime != null ? startTime.hashCode() : 0);
- result = 31 * result + (processInstancePriority != null ? processInstancePriority.hashCode() : 0);
- result = 31 * result + (updateTime != null ? updateTime.hashCode() : 0);
- result = 31 * result + (workerGroup != null ? workerGroup.hashCode() : 0);
- result = 31 * result + (environmentCode != null ? environmentCode.hashCode() : 0);
- result = 31 * result + dryRun;
- result = 31 * result + processInstanceId;
- result = 31 * result + processDefinitionVersion;
- return result;
- }
}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessDefinition.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessDefinition.java
index 089a035373..6e7414c286 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessDefinition.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessDefinition.java
@@ -27,18 +27,23 @@ import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
-import java.util.Objects;
import java.util.stream.Collectors;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import com.google.common.base.Strings;
-/**
- * process definition
- */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
@TableName("t_ds_process_definition")
public class ProcessDefinition {
@@ -171,9 +176,6 @@ public class ProcessDefinition {
*/
private ProcessExecutionTypeEnum executionType;
- public ProcessDefinition() {
- }
-
public ProcessDefinition(long projectCode,
String name,
long code,
@@ -208,90 +210,6 @@ public class ProcessDefinition {
this.flag = Flag.YES;
}
- public String getName() {
- return name;
- }
-
- public void setName(String name) {
- this.name = name;
- }
-
- public int getVersion() {
- return version;
- }
-
- public void setVersion(int version) {
- this.version = version;
- }
-
- public Integer getId() {
- return id;
- }
-
- public void setId(Integer id) {
- this.id = id;
- }
-
- public ReleaseState getReleaseState() {
- return releaseState;
- }
-
- public void setReleaseState(ReleaseState releaseState) {
- this.releaseState = releaseState;
- }
-
- public Date getCreateTime() {
- return createTime;
- }
-
- public void setCreateTime(Date createTime) {
- this.createTime = createTime;
- }
-
- public Date getUpdateTime() {
- return updateTime;
- }
-
- public void setUpdateTime(Date updateTime) {
- this.updateTime = updateTime;
- }
-
- public Flag getFlag() {
- return flag;
- }
-
- public void setFlag(Flag flag) {
- this.flag = flag;
- }
-
- public int getUserId() {
- return userId;
- }
-
- public void setUserId(int userId) {
- this.userId = userId;
- }
-
- public String getUserName() {
- return userName;
- }
-
- public void setUserName(String userName) {
- this.userName = userName;
- }
-
- public String getProjectName() {
- return projectName;
- }
-
- public void setProjectName(String projectName) {
- this.projectName = projectName;
- }
-
- public String getGlobalParams() {
- return globalParams;
- }
-
public void setGlobalParams(String globalParams) {
this.globalParamList = JSONUtils.toList(globalParams, Property.class);
if (this.globalParamList == null) {
@@ -300,14 +218,6 @@ public class ProcessDefinition {
this.globalParams = globalParams;
}
- public List<Property> getGlobalParamList() {
- return globalParamList;
- }
-
- public void setGlobalParamList(List<Property> globalParamList) {
- this.globalParamList = globalParamList;
- }
-
public Map<String, String> getGlobalParamMap() {
if (globalParamMap == null && !Strings.isNullOrEmpty(globalParams)) {
List<Property> propList = JSONUtils.toList(globalParams, Property.class);
@@ -317,146 +227,4 @@ public class ProcessDefinition {
return globalParamMap;
}
- public void setGlobalParamMap(Map<String, String> globalParamMap) {
- this.globalParamMap = globalParamMap;
- }
-
- public String getLocations() {
- return locations;
- }
-
- public void setLocations(String locations) {
- this.locations = locations;
- }
-
- public ReleaseState getScheduleReleaseState() {
- return scheduleReleaseState;
- }
-
- public void setScheduleReleaseState(ReleaseState scheduleReleaseState) {
- this.scheduleReleaseState = scheduleReleaseState;
- }
-
- public int getTimeout() {
- return timeout;
- }
-
- public void setTimeout(int timeout) {
- this.timeout = timeout;
- }
-
- public int getTenantId() {
- return tenantId;
- }
-
- public void setTenantId(int tenantId) {
- this.tenantId = tenantId;
- }
-
- public String getTenantCode() {
- return tenantCode;
- }
-
- public void setTenantCode(String tenantCode) {
- this.tenantCode = tenantCode;
- }
-
- public String getDescription() {
- return description;
- }
-
- public void setDescription(String description) {
- this.description = description;
- }
-
- public String getModifyBy() {
- return modifyBy;
- }
-
- public void setModifyBy(String modifyBy) {
- this.modifyBy = modifyBy;
- }
-
- public long getCode() {
- return code;
- }
-
- public void setCode(long code) {
- this.code = code;
- }
-
- public long getProjectCode() {
- return projectCode;
- }
-
- public void setProjectCode(long projectCode) {
- this.projectCode = projectCode;
- }
-
- public int getWarningGroupId() {
- return warningGroupId;
- }
-
- public void setWarningGroupId(int warningGroupId) {
- this.warningGroupId = warningGroupId;
- }
-
- public ProcessExecutionTypeEnum getExecutionType() {
- return executionType;
- }
-
- public void setExecutionType(ProcessExecutionTypeEnum executionType) {
- this.executionType = executionType;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- ProcessDefinition that = (ProcessDefinition) o;
- return projectCode == that.projectCode
- && userId == that.userId
- && timeout == that.timeout
- && tenantId == that.tenantId
- && Objects.equals(name, that.name)
- && releaseState == that.releaseState
- && Objects.equals(description, that.description)
- && Objects.equals(globalParams, that.globalParams)
- && flag == that.flag
- && executionType == that.executionType
- && Objects.equals(locations, that.locations);
- }
-
- @Override
- public String toString() {
- return "ProcessDefinition{"
- + "id=" + id
- + ", code=" + code
- + ", name='" + name + '\''
- + ", version=" + version
- + ", releaseState=" + releaseState
- + ", projectCode=" + projectCode
- + ", description='" + description + '\''
- + ", globalParams='" + globalParams + '\''
- + ", globalParamList=" + globalParamList
- + ", globalParamMap=" + globalParamMap
- + ", createTime=" + createTime
- + ", updateTime=" + updateTime
- + ", flag=" + flag
- + ", userId=" + userId
- + ", userName='" + userName + '\''
- + ", projectName='" + projectName + '\''
- + ", locations='" + locations + '\''
- + ", scheduleReleaseState=" + scheduleReleaseState
- + ", timeout=" + timeout
- + ", tenantId=" + tenantId
- + ", tenantCode='" + tenantCode + '\''
- + ", modifyBy='" + modifyBy + '\''
- + ", warningGroupId=" + warningGroupId
- + '}';
- }
}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessTaskRelation.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessTaskRelation.java
index 70a324f298..bef2630d8f 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessTaskRelation.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessTaskRelation.java
@@ -21,9 +21,9 @@ import org.apache.dolphinscheduler.common.enums.ConditionType;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import java.util.Date;
-import java.util.Objects;
import lombok.Data;
+import lombok.NoArgsConstructor;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
@@ -32,6 +32,7 @@ import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
@Data
+@NoArgsConstructor
@TableName("t_ds_process_task_relation")
public class ProcessTaskRelation {
@@ -103,9 +104,6 @@ public class ProcessTaskRelation {
*/
private Date updateTime;
- public ProcessTaskRelation() {
- }
-
public ProcessTaskRelation(String name,
int processDefinitionVersion,
long projectCode,
@@ -132,28 +130,4 @@ public class ProcessTaskRelation {
this.updateTime = updateTime;
}
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- ProcessTaskRelation that = (ProcessTaskRelation) o;
- return processDefinitionVersion == that.processDefinitionVersion
- && projectCode == that.projectCode
- && processDefinitionCode == that.processDefinitionCode
- && preTaskCode == that.preTaskCode
- && preTaskVersion == that.preTaskVersion
- && postTaskCode == that.postTaskCode
- && postTaskVersion == that.postTaskVersion
- && Objects.equals(name, that.name);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(name, processDefinitionVersion, projectCode, processDefinitionCode, preTaskCode,
- preTaskVersion, postTaskCode, postTaskVersion);
- }
}
diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapperTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapperTest.java
index 014a2a2c3c..0c4702517e 100644
--- a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapperTest.java
+++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapperTest.java
@@ -25,14 +25,13 @@ import org.apache.dolphinscheduler.dao.entity.ExecuteStatusCount;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import java.util.Collections;
import java.util.Date;
import java.util.List;
-import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.junit.Assert;
-import org.junit.Before;
import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired;
@@ -50,15 +49,6 @@ public class TaskInstanceMapperTest extends BaseDaoTest {
@Autowired
private ProcessInstanceMapper processInstanceMapper;
- @Before
- public void before() {
- ProcessInstance processInstance = new ProcessInstance();
- processInstance.setWarningGroupId(0);
- processInstance.setCommandParam("");
- processInstance.setProcessDefinitionCode(1L);
- processInstanceMapper.insert(processInstance);
- }
-
/**
* insert
*
@@ -191,7 +181,7 @@ public class TaskInstanceMapperTest extends BaseDaoTest {
task2.setFlag(Flag.NO);
taskInstanceMapper.updateById(task2);
List<TaskInstance> taskInstances1 = taskInstanceMapper.findValidTaskListByProcessId(task.getProcessInstanceId(),
- Flag.NO);
+ Flag.NO);
taskInstanceMapper.deleteById(task2.getId());
taskInstanceMapper.deleteById(task.getId());
@@ -377,18 +367,17 @@ public class TaskInstanceMapperTest extends BaseDaoTest {
Page<TaskInstance> page = new Page(1, 3);
IPage<TaskInstance> taskInstanceIPage = taskInstanceMapper.queryTaskInstanceListPaging(
- page,
- definition.getProjectCode(),
- task.getProcessInstanceId(),
- "",
- "",
- "",
- 0,
- new int[0],
- "",
- TaskExecuteType.BATCH,
- null, null
- );
+ page,
+ definition.getProjectCode(),
+ task.getProcessInstanceId(),
+ "",
+ "",
+ "",
+ 0,
+ new int[0],
+ "",
+ TaskExecuteType.BATCH,
+ null, null);
processInstanceMapper.deleteById(processInstance.getId());
taskInstanceMapper.deleteById(task.getId());
processDefinitionMapper.deleteById(definition.getId());
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
index c213346451..fa6b640ddc 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
@@ -17,14 +17,21 @@
package org.apache.dolphinscheduler.service.process;
-import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
-import com.fasterxml.jackson.core.type.TypeReference;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.base.Joiner;
-import com.google.common.base.Strings;
-import com.google.common.collect.Lists;
-import io.micrometer.core.annotation.Counted;
-import org.apache.commons.collections.CollectionUtils;
+import static java.util.stream.Collectors.toSet;
+import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE;
+import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST;
+import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
+import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_EMPTY_SUB_PROCESS;
+import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_FATHER_PARAMS;
+import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING;
+import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS;
+import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE;
+import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_PARENT_INSTANCE_ID;
+import static org.apache.dolphinscheduler.common.Constants.LOCAL_PARAMS;
+import static org.apache.dolphinscheduler.plugin.task.api.enums.DataType.VARCHAR;
+import static org.apache.dolphinscheduler.plugin.task.api.enums.Direct.IN;
+import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.TASK_INSTANCE_ID;
+
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.AuthorizationType;
import org.apache.dolphinscheduler.common.enums.CommandType;
@@ -130,13 +137,9 @@ import org.apache.dolphinscheduler.service.log.LogClient;
import org.apache.dolphinscheduler.service.task.TaskPluginManager;
import org.apache.dolphinscheduler.spi.enums.ResourceType;
import org.apache.dolphinscheduler.spi.utils.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
-import org.springframework.transaction.annotation.Transactional;
-import javax.annotation.Nullable;
+import org.apache.commons.collections.CollectionUtils;
+
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
@@ -149,22 +152,24 @@ import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
+import java.util.function.Function;
import java.util.stream.Collectors;
-import static java.util.stream.Collectors.toSet;
-import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE;
-import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST;
-import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
-import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_EMPTY_SUB_PROCESS;
-import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_FATHER_PARAMS;
-import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING;
-import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS;
-import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE;
-import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_PARENT_INSTANCE_ID;
-import static org.apache.dolphinscheduler.common.Constants.LOCAL_PARAMS;
-import static org.apache.dolphinscheduler.plugin.task.api.enums.DataType.VARCHAR;
-import static org.apache.dolphinscheduler.plugin.task.api.enums.Direct.IN;
-import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.TASK_INSTANCE_ID;
+import javax.annotation.Nullable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+import org.springframework.transaction.annotation.Transactional;
+
+import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.base.Joiner;
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import io.micrometer.core.annotation.Counted;
/**
* process relative dao that some mappers in this.
@@ -2492,20 +2497,15 @@ public class ProcessServiceImpl implements ProcessService {
taskDefinitionLog.setOperator(operator.getId());
taskDefinitionLog.setResourceIds(getResourceIds(taskDefinitionLog));
if (taskDefinitionLog.getCode() == 0) {
- try {
- taskDefinitionLog.setCode(CodeGenerateUtils.getInstance().genCode());
- } catch (CodeGenerateException e) {
- logger.error("Task code get error, ", e);
- return Constants.DEFINITION_FAILURE;
- }
+ taskDefinitionLog.setCode(CodeGenerateUtils.getInstance().genCode());
}
if (taskDefinitionLog.getVersion() == 0) {
// init first version
taskDefinitionLog.setVersion(Constants.VERSION_FIRST);
}
- TaskDefinitionLog definitionCodeAndVersion = taskDefinitionLogMapper
- .queryByDefinitionCodeAndVersion(taskDefinitionLog.getCode(), taskDefinitionLog.getVersion());
+ TaskDefinitionLog definitionCodeAndVersion = taskDefinitionLogMapper.queryByDefinitionCodeAndVersion(
+ taskDefinitionLog.getCode(), taskDefinitionLog.getVersion());
if (definitionCodeAndVersion == null) {
taskDefinitionLog.setUserId(operator.getId());
taskDefinitionLog.setCreateTime(now);
@@ -2522,39 +2522,33 @@ public class ProcessServiceImpl implements ProcessService {
taskDefinitionLog.setCreateTime(definitionCodeAndVersion.getCreateTime());
updateTaskDefinitionLogs.add(taskDefinitionLog);
}
- int insertResult = 0;
- int updateResult = 0;
- if (!updateTaskDefinitionLogs.isEmpty()) {
- List<TaskDefinition> taskDefinitions = taskDefinitionMapper.queryByCodeList(updateTaskDefinitionLogs
- .stream().map(TaskDefinition::getCode).distinct().collect(Collectors.toList()));
+ if (CollectionUtils.isNotEmpty(updateTaskDefinitionLogs)) {
+ List<Long> taskDefinitionCodes = updateTaskDefinitionLogs
+ .stream()
+ .map(TaskDefinition::getCode)
+ .distinct()
+ .collect(Collectors.toList());
+ Map<Long, TaskDefinition> taskDefinitionMap = taskDefinitionMapper.queryByCodeList(taskDefinitionCodes)
+ .stream()
+ .collect(Collectors.toMap(TaskDefinition::getCode, Function.identity()));
for (TaskDefinitionLog taskDefinitionToUpdate : updateTaskDefinitionLogs) {
- TaskDefinition task = null;
- for (TaskDefinition taskDefinition : taskDefinitions) {
- if (taskDefinitionToUpdate.getCode() == taskDefinition.getCode()) {
- task = taskDefinition;
- break;
- }
- }
+ TaskDefinition task = taskDefinitionMap.get(taskDefinitionToUpdate.getCode());
if (task == null) {
newTaskDefinitionLogs.add(taskDefinitionToUpdate);
- } else {
- insertResult += taskDefinitionLogMapper.insert(taskDefinitionToUpdate);
- if (Boolean.TRUE.equals(syncDefine)) {
- taskDefinitionToUpdate.setId(task.getId());
- updateResult += taskDefinitionMapper.updateById(taskDefinitionToUpdate);
- } else {
- updateResult++;
- }
}
}
}
- if (!newTaskDefinitionLogs.isEmpty()) {
- insertResult += taskDefinitionLogMapper.batchInsert(newTaskDefinitionLogs);
- if (Boolean.TRUE.equals(syncDefine)) {
- updateResult += taskDefinitionMapper.batchInsert(newTaskDefinitionLogs);
- } else {
- updateResult += newTaskDefinitionLogs.size();
- }
+
+ // for each taskDefinitionLog, we insert a new version into the db
+ // and update the original one if it exists
+ int updateResult = updateTaskDefinitionLogs.size();
+ int insertResult = newTaskDefinitionLogs.size();
+ if (CollectionUtils.isNotEmpty(taskDefinitionLogs)) {
+ insertResult = taskDefinitionLogMapper.batchInsert(taskDefinitionLogs);
+ }
+
+ if (CollectionUtils.isNotEmpty(newTaskDefinitionLogs) && Boolean.TRUE.equals(syncDefine)) {
+ updateResult += taskDefinitionMapper.batchInsert(newTaskDefinitionLogs);
}
return (insertResult & updateResult) > 0 ? 1 : Constants.EXIT_CODE_SUCCESS;
}
@@ -2575,10 +2569,8 @@ public class ProcessServiceImpl implements ProcessService {
: ReleaseState.OFFLINE);
processDefinitionLog.setOperator(operator.getId());
processDefinitionLog.setOperateTime(processDefinition.getUpdateTime());
- int insertLog = processDefineLogMapper.updateById(processDefinitionLog);
- if (insertLog == 0) {
- insertLog = processDefineLogMapper.insert(processDefinitionLog);
- }
+ processDefinitionLog.setId(null);
+ int insertLog = processDefineLogMapper.insert(processDefinitionLog);
int result = 1;
if (Boolean.TRUE.equals(syncDefine)) {
if (processDefinition.getId() == null) {
@@ -2605,7 +2597,8 @@ public class ProcessServiceImpl implements ProcessService {
}
Map<Long, TaskDefinitionLog> taskDefinitionLogMap = null;
if (CollectionUtils.isNotEmpty(taskDefinitionLogs)) {
- taskDefinitionLogMap = taskDefinitionLogs.stream()
+ taskDefinitionLogMap = taskDefinitionLogs
+ .stream()
.collect(Collectors.toMap(TaskDefinition::getCode, taskDefinitionLog -> taskDefinitionLog));
}
Date now = new Date();