You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by le...@apache.org on 2021/11/30 09:21:04 UTC
[dolphinscheduler] branch 2.0.1-prepare updated: [cherry-pick][DS-6849] add delete command check (#7074)
This is an automated email from the ASF dual-hosted git repository.
leonbao pushed a commit to branch 2.0.1-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/2.0.1-prepare by this push:
new 40e0058 [cherry-pick][DS-6849] add delete command check (#7074)
40e0058 is described below
commit 40e00585aa0fb0496b600788062e44a4d29a01dc
Author: wind <ca...@users.noreply.github.com>
AuthorDate: Tue Nov 30 17:20:57 2021 +0800
[cherry-pick][DS-6849] add delete command check (#7074)
* delete command with check
* add test
Co-authored-by: caishunfeng <53...@qq.com>
---
.../service/process/ProcessService.java | 173 ++++++++++++++-------
.../service/process/ProcessServiceTest.java | 157 ++++++++++---------
2 files changed, 207 insertions(+), 123 deletions(-)
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index 198af0b..a1ebe3a 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -17,13 +17,29 @@
package org.apache.dolphinscheduler.service.process;
-import com.facebook.presto.jdbc.internal.guava.collect.Lists;
-import com.fasterxml.jackson.core.type.TypeReference;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.lang.StringUtils;
+import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE;
+import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
+import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_EMPTY_SUB_PROCESS;
+import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_FATHER_PARAMS;
+import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING;
+import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS;
+import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE;
+import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_PARENT_INSTANCE_ID;
+import static org.apache.dolphinscheduler.common.Constants.LOCAL_PARAMS;
+
+import static java.util.stream.Collectors.toSet;
+
import org.apache.dolphinscheduler.common.Constants;
-import org.apache.dolphinscheduler.common.enums.*;
+import org.apache.dolphinscheduler.common.enums.AuthorizationType;
+import org.apache.dolphinscheduler.common.enums.CommandType;
+import org.apache.dolphinscheduler.common.enums.Direct;
+import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
+import org.apache.dolphinscheduler.common.enums.FailureStrategy;
+import org.apache.dolphinscheduler.common.enums.Flag;
+import org.apache.dolphinscheduler.common.enums.ReleaseState;
+import org.apache.dolphinscheduler.common.enums.TaskDependType;
+import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
+import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.common.graph.DAG;
import org.apache.dolphinscheduler.common.model.DateInterval;
import org.apache.dolphinscheduler.common.model.TaskNode;
@@ -34,27 +50,85 @@ import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter;
import org.apache.dolphinscheduler.common.task.subprocess.SubProcessParameters;
-import org.apache.dolphinscheduler.common.utils.*;
+import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils;
import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils.CodeGenerateException;
-import org.apache.dolphinscheduler.dao.entity.*;
-import org.apache.dolphinscheduler.dao.mapper.*;
+import org.apache.dolphinscheduler.common.utils.DateUtils;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.common.utils.ParameterUtils;
+import org.apache.dolphinscheduler.common.utils.TaskParametersUtils;
+import org.apache.dolphinscheduler.dao.entity.Command;
+import org.apache.dolphinscheduler.dao.entity.DagData;
+import org.apache.dolphinscheduler.dao.entity.DataSource;
+import org.apache.dolphinscheduler.dao.entity.Environment;
+import org.apache.dolphinscheduler.dao.entity.ErrorCommand;
+import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
+import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstanceMap;
+import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
+import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog;
+import org.apache.dolphinscheduler.dao.entity.Project;
+import org.apache.dolphinscheduler.dao.entity.ProjectUser;
+import org.apache.dolphinscheduler.dao.entity.Resource;
+import org.apache.dolphinscheduler.dao.entity.Schedule;
+import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
+import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.dao.entity.Tenant;
+import org.apache.dolphinscheduler.dao.entity.UdfFunc;
+import org.apache.dolphinscheduler.dao.entity.User;
+import org.apache.dolphinscheduler.dao.mapper.CommandMapper;
+import org.apache.dolphinscheduler.dao.mapper.DataSourceMapper;
+import org.apache.dolphinscheduler.dao.mapper.EnvironmentMapper;
+import org.apache.dolphinscheduler.dao.mapper.ErrorCommandMapper;
+import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionLogMapper;
+import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
+import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapMapper;
+import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
+import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationLogMapper;
+import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
+import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
+import org.apache.dolphinscheduler.dao.mapper.ResourceMapper;
+import org.apache.dolphinscheduler.dao.mapper.ResourceUserMapper;
+import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
+import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper;
+import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
+import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
+import org.apache.dolphinscheduler.dao.mapper.TenantMapper;
+import org.apache.dolphinscheduler.dao.mapper.UdfFuncMapper;
+import org.apache.dolphinscheduler.dao.mapper.UserMapper;
import org.apache.dolphinscheduler.dao.utils.DagHelper;
import org.apache.dolphinscheduler.remote.utils.Host;
+import org.apache.dolphinscheduler.service.exceptions.ServiceException;
import org.apache.dolphinscheduler.service.log.LogClientService;
import org.apache.dolphinscheduler.service.quartz.cron.CronUtils;
import org.apache.dolphinscheduler.spi.enums.ResourceType;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang.StringUtils;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.EnumMap;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
-import java.util.*;
-import java.util.Map.Entry;
-import java.util.stream.Collectors;
-
-import static java.util.stream.Collectors.toSet;
-import static org.apache.dolphinscheduler.common.Constants.*;
+import com.facebook.presto.jdbc.internal.guava.collect.Lists;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.node.ObjectNode;
/**
* process relative dao that some mappers in this.
@@ -65,10 +139,10 @@ public class ProcessService {
private final Logger logger = LoggerFactory.getLogger(getClass());
private final int[] stateArray = new int[]{ExecutionStatus.SUBMITTED_SUCCESS.ordinal(),
- ExecutionStatus.RUNNING_EXECUTION.ordinal(),
- ExecutionStatus.DELAY_EXECUTION.ordinal(),
- ExecutionStatus.READY_PAUSE.ordinal(),
- ExecutionStatus.READY_STOP.ordinal()};
+ ExecutionStatus.RUNNING_EXECUTION.ordinal(),
+ ExecutionStatus.DELAY_EXECUTION.ordinal(),
+ ExecutionStatus.READY_PAUSE.ordinal(),
+ ExecutionStatus.READY_STOP.ordinal()};
@Autowired
private UserMapper userMapper;
@@ -136,7 +210,6 @@ public class ProcessService {
* @param logger logger
* @param host host
* @param command found command
- * @param processDefinitionCacheMaps
* @return process instance
*/
@Transactional
@@ -152,7 +225,7 @@ public class ProcessService {
processInstance.addHistoryCmd(command.getCommandType());
saveProcessInstance(processInstance);
this.setSubProcessParam(processInstance);
- this.commandMapper.deleteById(command.getId());
+ this.deleteCommandWithCheck(command.getId());
return processInstance;
}
@@ -202,10 +275,6 @@ public class ProcessService {
/**
* get command page
- *
- * @param pageSize
- * @param pageNumber
- * @return
*/
public List<Command> findCommandPage(int pageSize, int pageNumber) {
return commandMapper.queryCommandPage(pageSize, pageNumber * pageSize);
@@ -426,21 +495,21 @@ public class ProcessService {
// process instance quit by "waiting thread" state
if (originCommand == null) {
Command command = new Command(
- CommandType.RECOVER_WAITING_THREAD,
- processInstance.getTaskDependType(),
- processInstance.getFailureStrategy(),
- processInstance.getExecutorId(),
- processInstance.getProcessDefinition().getCode(),
- JSONUtils.toJsonString(cmdParam),
- processInstance.getWarningType(),
- processInstance.getWarningGroupId(),
- processInstance.getScheduleTime(),
- processInstance.getWorkerGroup(),
- processInstance.getEnvironmentCode(),
- processInstance.getProcessInstancePriority(),
- processInstance.getDryRun(),
- processInstance.getId(),
- processInstance.getProcessDefinitionVersion()
+ CommandType.RECOVER_WAITING_THREAD,
+ processInstance.getTaskDependType(),
+ processInstance.getFailureStrategy(),
+ processInstance.getExecutorId(),
+ processInstance.getProcessDefinition().getCode(),
+ JSONUtils.toJsonString(cmdParam),
+ processInstance.getWarningType(),
+ processInstance.getWarningGroupId(),
+ processInstance.getScheduleTime(),
+ processInstance.getWorkerGroup(),
+ processInstance.getEnvironmentCode(),
+ processInstance.getProcessInstancePriority(),
+ processInstance.getDryRun(),
+ processInstance.getId(),
+ processInstance.getProcessDefinitionVersion()
);
saveCommand(command);
return;
@@ -532,10 +601,10 @@ public class ProcessService {
// curing global params
processInstance.setGlobalParams(ParameterUtils.curingGlobalParams(
- processDefinition.getGlobalParamMap(),
- processDefinition.getGlobalParamList(),
- getCommandTypeIfComplement(processInstance, command),
- processInstance.getScheduleTime()));
+ processDefinition.getGlobalParamMap(),
+ processDefinition.getGlobalParamList(),
+ getCommandTypeIfComplement(processInstance, command),
+ processInstance.getScheduleTime()));
// set process instance priority
processInstance.setProcessInstancePriority(command.getProcessInstancePriority());
@@ -947,11 +1016,6 @@ public class ProcessService {
/**
* retry submit task to db
- *
- * @param taskInstance
- * @param commitRetryTimes
- * @param commitInterval
- * @return
*/
public TaskInstance submitTask(TaskInstance taskInstance, int commitRetryTimes, int commitInterval) {
@@ -1672,8 +1736,6 @@ public class ProcessService {
/**
* for show in page of taskInstance
- *
- * @param taskInstance
*/
public void changeOutParam(TaskInstance taskInstance) {
if (StringUtils.isEmpty(taskInstance.getVarPool())) {
@@ -1786,7 +1848,7 @@ public class ProcessService {
*/
public List<TaskInstance> queryNeedFailoverTaskInstances(String host) {
return taskInstanceMapper.queryByHostAndStatus(host,
- stateArray);
+ stateArray);
}
/**
@@ -2401,4 +2463,11 @@ public class ProcessService {
}
return processTaskMap;
}
+
+ private void deleteCommandWithCheck(int commandId) {
+ int delete = this.commandMapper.deleteById(commandId);
+ if (delete != 1) {
+ throw new ServiceException("delete command fail, id:" + commandId);
+ }
+ }
}
diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
index 45b1952..da71460 100644
--- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
+++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
@@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.service.process;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_PARAMS;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE;
+
import static org.mockito.ArgumentMatchers.any;
import org.apache.dolphinscheduler.common.Constants;
@@ -245,7 +246,7 @@ public class ProcessServiceTest {
command.setProcessDefinitionCode(222);
command.setCommandType(CommandType.REPEAT_RUNNING);
command.setCommandParam("{\"" + CMD_PARAM_RECOVER_PROCESS_ID_STRING + "\":\"111\",\""
- + CMD_PARAM_SUB_PROCESS_DEFINE_CODE + "\":\"222\"}");
+ + CMD_PARAM_SUB_PROCESS_DEFINE_CODE + "\":\"222\"}");
Assert.assertNull(processService.handleCommand(logger, host, command, processDefinitionCacheMaps));
int definitionVersion = 1;
@@ -253,10 +254,13 @@ public class ProcessServiceTest {
int processInstanceId = 222;
//there is not enough thread for this command
Command command1 = new Command();
+ command1.setId(1);
command1.setProcessDefinitionCode(definitionCode);
command1.setProcessDefinitionVersion(definitionVersion);
command1.setCommandParam("{\"ProcessInstanceId\":222}");
command1.setCommandType(CommandType.START_PROCESS);
+ Mockito.when(commandMapper.deleteById(1)).thenReturn(1);
+
ProcessDefinition processDefinition = new ProcessDefinition();
processDefinition.setId(123);
processDefinition.setName("test");
@@ -268,36 +272,45 @@ public class ProcessServiceTest {
processInstance.setProcessDefinitionCode(definitionCode);
processInstance.setProcessDefinitionVersion(definitionVersion);
Mockito.when(processDefineLogMapper.queryByDefinitionCodeAndVersion(processInstance.getProcessDefinitionCode(),
- processInstance.getProcessDefinitionVersion())).thenReturn(new ProcessDefinitionLog(processDefinition));
+ processInstance.getProcessDefinitionVersion())).thenReturn(new ProcessDefinitionLog(processDefinition));
Mockito.when(processInstanceMapper.queryDetailById(222)).thenReturn(processInstance);
Assert.assertNotNull(processService.handleCommand(logger, host, command1, processDefinitionCacheMaps));
Command command2 = new Command();
+ command2.setId(2);
command2.setCommandParam("{\"ProcessInstanceId\":222,\"StartNodeIdList\":\"n1,n2\"}");
command2.setProcessDefinitionCode(definitionCode);
command2.setProcessDefinitionVersion(definitionVersion);
command2.setCommandType(CommandType.RECOVER_SUSPENDED_PROCESS);
command2.setProcessInstanceId(processInstanceId);
+ Mockito.when(commandMapper.deleteById(2)).thenReturn(1);
Assert.assertNotNull(processService.handleCommand(logger, host, command2, processDefinitionCacheMaps));
Command command3 = new Command();
+ command3.setId(3);
command3.setProcessDefinitionCode(definitionCode);
command3.setProcessDefinitionVersion(definitionVersion);
command3.setProcessInstanceId(processInstanceId);
command3.setCommandParam("{\"WaitingThreadInstanceId\":222}");
command3.setCommandType(CommandType.START_FAILURE_TASK_PROCESS);
+ Mockito.when(commandMapper.deleteById(3)).thenReturn(1);
+
Assert.assertNotNull(processService.handleCommand(logger, host, command3, processDefinitionCacheMaps));
Command command4 = new Command();
+ command4.setId(4);
command4.setProcessDefinitionCode(definitionCode);
command4.setProcessDefinitionVersion(definitionVersion);
command4.setCommandParam("{\"WaitingThreadInstanceId\":222,\"StartNodeIdList\":\"n1,n2\"}");
command4.setCommandType(CommandType.REPEAT_RUNNING);
command4.setProcessInstanceId(processInstanceId);
+ Mockito.when(commandMapper.deleteById(4)).thenReturn(1);
+
Assert.assertNotNull(processService.handleCommand(logger, host, command4, processDefinitionCacheMaps));
Command command5 = new Command();
+ command5.setId(5);
command5.setProcessDefinitionCode(definitionCode);
command5.setProcessDefinitionVersion(definitionVersion);
HashMap<String, String> startParams = new HashMap<>();
@@ -307,6 +320,8 @@ public class ProcessServiceTest {
command5.setCommandParam(JSONUtils.toJsonString(commandParams));
command5.setCommandType(CommandType.START_PROCESS);
command5.setDryRun(Constants.DRY_RUN_FLAG_NO);
+ Mockito.when(commandMapper.deleteById(5)).thenReturn(1);
+
ProcessInstance processInstance1 = processService.handleCommand(logger, host, command5, processDefinitionCacheMaps);
Assert.assertTrue(processInstance1.getGlobalParams().contains("\"testStartParam1\""));
}
@@ -389,14 +404,14 @@ public class ProcessServiceTest {
operator.setUserType(UserType.GENERAL_USER);
long projectCode = 751485690568704L;
String taskJson = "[{\"code\":751500437479424,\"name\":\"aa\",\"version\":1,\"description\":\"\",\"delayTime\":0,"
- + "\"taskType\":\"SHELL\",\"taskParams\":{\"resourceList\":[],\"localParams\":[],\"rawScript\":\"sleep 1s\\necho 11\","
- + "\"dependence\":{},\"conditionResult\":{\"successNode\":[\"\"],\"failedNode\":[\"\"]},\"waitStartTimeout\":{}},"
- + "\"flag\":\"YES\",\"taskPriority\":\"MEDIUM\",\"workerGroup\":\"yarn\",\"failRetryTimes\":0,\"failRetryInterval\":1,"
- + "\"timeoutFlag\":\"OPEN\",\"timeoutNotifyStrategy\":\"FAILED\",\"timeout\":1,\"environmentCode\":751496815697920},"
- + "{\"code\":751516889636864,\"name\":\"bb\",\"description\":\"\",\"taskType\":\"SHELL\",\"taskParams\":{\"resourceList\":[],"
- + "\"localParams\":[],\"rawScript\":\"echo 22\",\"dependence\":{},\"conditionResult\":{\"successNode\":[\"\"],\"failedNode\":[\"\"]},"
- + "\"waitStartTimeout\":{}},\"flag\":\"YES\",\"taskPriority\":\"MEDIUM\",\"workerGroup\":\"default\",\"failRetryTimes\":\"0\","
- + "\"failRetryInterval\":\"1\",\"timeoutFlag\":\"CLOSE\",\"timeoutNotifyStrategy\":\"\",\"timeout\":0,\"delayTime\":\"0\",\"environmentCode\":-1}]";
+ + "\"taskType\":\"SHELL\",\"taskParams\":{\"resourceList\":[],\"localParams\":[],\"rawScript\":\"sleep 1s\\necho 11\","
+ + "\"dependence\":{},\"conditionResult\":{\"successNode\":[\"\"],\"failedNode\":[\"\"]},\"waitStartTimeout\":{}},"
+ + "\"flag\":\"YES\",\"taskPriority\":\"MEDIUM\",\"workerGroup\":\"yarn\",\"failRetryTimes\":0,\"failRetryInterval\":1,"
+ + "\"timeoutFlag\":\"OPEN\",\"timeoutNotifyStrategy\":\"FAILED\",\"timeout\":1,\"environmentCode\":751496815697920},"
+ + "{\"code\":751516889636864,\"name\":\"bb\",\"description\":\"\",\"taskType\":\"SHELL\",\"taskParams\":{\"resourceList\":[],"
+ + "\"localParams\":[],\"rawScript\":\"echo 22\",\"dependence\":{},\"conditionResult\":{\"successNode\":[\"\"],\"failedNode\":[\"\"]},"
+ + "\"waitStartTimeout\":{}},\"flag\":\"YES\",\"taskPriority\":\"MEDIUM\",\"workerGroup\":\"default\",\"failRetryTimes\":\"0\","
+ + "\"failRetryInterval\":\"1\",\"timeoutFlag\":\"CLOSE\",\"timeoutNotifyStrategy\":\"\",\"timeout\":0,\"delayTime\":\"0\",\"environmentCode\":-1}]";
List<TaskDefinitionLog> taskDefinitionLogs = JSONUtils.toList(taskJson, TaskDefinitionLog.class);
TaskDefinitionLog taskDefinition = new TaskDefinitionLog();
taskDefinition.setCode(751500437479424L);
@@ -487,10 +502,10 @@ public class ProcessServiceTest {
processInstance.setId(62);
taskInstance.setVarPool("[{\"direct\":\"OUT\",\"prop\":\"test1\",\"type\":\"VARCHAR\",\"value\":\"\"}]");
taskInstance.setTaskParams("{\"type\":\"MYSQL\",\"datasource\":1,\"sql\":\"select id from tb_test limit 1\","
- + "\"udfs\":\"\",\"sqlType\":\"0\",\"sendEmail\":false,\"displayRows\":10,\"title\":\"\","
- + "\"groupId\":null,\"localParams\":[{\"prop\":\"test1\",\"direct\":\"OUT\",\"type\":\"VARCHAR\",\"value\":\"12\"}],"
- + "\"connParams\":\"\",\"preStatements\":[],\"postStatements\":[],\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],"
- + "\\\"failedNode\\\":[\\\"\\\"]}\",\"dependence\":\"{}\"}");
+ + "\"udfs\":\"\",\"sqlType\":\"0\",\"sendEmail\":false,\"displayRows\":10,\"title\":\"\","
+ + "\"groupId\":null,\"localParams\":[{\"prop\":\"test1\",\"direct\":\"OUT\",\"type\":\"VARCHAR\",\"value\":\"12\"}],"
+ + "\"connParams\":\"\",\"preStatements\":[],\"postStatements\":[],\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],"
+ + "\\\"failedNode\\\":[\\\"\\\"]}\",\"dependence\":\"{}\"}");
processService.changeOutParam(taskInstance);
}
@@ -498,65 +513,65 @@ public class ProcessServiceTest {
public void testUpdateTaskDefinitionResources() throws Exception {
TaskDefinition taskDefinition = new TaskDefinition();
String taskParameters = "{\n"
- + " \"mainClass\": \"org.apache.dolphinscheduler.SparkTest\",\n"
- + " \"mainJar\": {\n"
- + " \"id\": 1\n"
- + " },\n"
- + " \"deployMode\": \"cluster\",\n"
- + " \"resourceList\": [\n"
- + " {\n"
- + " \"id\": 3\n"
- + " },\n"
- + " {\n"
- + " \"id\": 4\n"
- + " }\n"
- + " ],\n"
- + " \"localParams\": [],\n"
- + " \"driverCores\": 1,\n"
- + " \"driverMemory\": \"512M\",\n"
- + " \"numExecutors\": 2,\n"
- + " \"executorMemory\": \"2G\",\n"
- + " \"executorCores\": 2,\n"
- + " \"appName\": \"\",\n"
- + " \"mainArgs\": \"\",\n"
- + " \"others\": \"\",\n"
- + " \"programType\": \"JAVA\",\n"
- + " \"sparkVersion\": \"SPARK2\",\n"
- + " \"dependence\": {},\n"
- + " \"conditionResult\": {\n"
- + " \"successNode\": [\n"
- + " \"\"\n"
- + " ],\n"
- + " \"failedNode\": [\n"
- + " \"\"\n"
- + " ]\n"
- + " },\n"
- + " \"waitStartTimeout\": {}\n"
- + "}";
+ + " \"mainClass\": \"org.apache.dolphinscheduler.SparkTest\",\n"
+ + " \"mainJar\": {\n"
+ + " \"id\": 1\n"
+ + " },\n"
+ + " \"deployMode\": \"cluster\",\n"
+ + " \"resourceList\": [\n"
+ + " {\n"
+ + " \"id\": 3\n"
+ + " },\n"
+ + " {\n"
+ + " \"id\": 4\n"
+ + " }\n"
+ + " ],\n"
+ + " \"localParams\": [],\n"
+ + " \"driverCores\": 1,\n"
+ + " \"driverMemory\": \"512M\",\n"
+ + " \"numExecutors\": 2,\n"
+ + " \"executorMemory\": \"2G\",\n"
+ + " \"executorCores\": 2,\n"
+ + " \"appName\": \"\",\n"
+ + " \"mainArgs\": \"\",\n"
+ + " \"others\": \"\",\n"
+ + " \"programType\": \"JAVA\",\n"
+ + " \"sparkVersion\": \"SPARK2\",\n"
+ + " \"dependence\": {},\n"
+ + " \"conditionResult\": {\n"
+ + " \"successNode\": [\n"
+ + " \"\"\n"
+ + " ],\n"
+ + " \"failedNode\": [\n"
+ + " \"\"\n"
+ + " ]\n"
+ + " },\n"
+ + " \"waitStartTimeout\": {}\n"
+ + "}";
taskDefinition.setTaskParams(taskParameters);
Map<Integer, Resource> resourceMap =
- Stream.of(1, 3, 4)
- .map(i -> {
- Resource resource = new Resource();
- resource.setId(i);
- resource.setFileName("file" + i);
- resource.setFullName("/file" + i);
- return resource;
- })
- .collect(
- Collectors.toMap(
- Resource::getId,
- resource -> resource)
- );
+ Stream.of(1, 3, 4)
+ .map(i -> {
+ Resource resource = new Resource();
+ resource.setId(i);
+ resource.setFileName("file" + i);
+ resource.setFullName("/file" + i);
+ return resource;
+ })
+ .collect(
+ Collectors.toMap(
+ Resource::getId,
+ resource -> resource)
+ );
for (Integer integer : Arrays.asList(1, 3, 4)) {
Mockito.when(resourceMapper.selectById(integer))
- .thenReturn(resourceMap.get(integer));
+ .thenReturn(resourceMap.get(integer));
}
Whitebox.invokeMethod(processService,
- "updateTaskDefinitionResources",
- taskDefinition);
+ "updateTaskDefinitionResources",
+ taskDefinition);
String taskParams = taskDefinition.getTaskParams();
SparkParameters sparkParameters = JSONUtils.parseObject(taskParams, SparkParameters.class);
@@ -582,15 +597,15 @@ public class ProcessServiceTest {
// test if input is null
ResourceInfo resourceInfoNull = null;
ResourceInfo updatedResourceInfo1 = Whitebox.invokeMethod(processService,
- "updateResourceInfo",
- resourceInfoNull);
+ "updateResourceInfo",
+ resourceInfoNull);
Assert.assertNull(updatedResourceInfo1);
// test if resource id less than 1
ResourceInfo resourceInfoVoid = new ResourceInfo();
ResourceInfo updatedResourceInfo2 = Whitebox.invokeMethod(processService,
- "updateResourceInfo",
- resourceInfoVoid);
+ "updateResourceInfo",
+ resourceInfoVoid);
Assert.assertNull(updatedResourceInfo2);
// test normal situation
@@ -602,8 +617,8 @@ public class ProcessServiceTest {
resource.setFullName("/test.txt");
Mockito.when(resourceMapper.selectById(1)).thenReturn(resource);
ResourceInfo updatedResourceInfo3 = Whitebox.invokeMethod(processService,
- "updateResourceInfo",
- resourceInfoNormal);
+ "updateResourceInfo",
+ resourceInfoNormal);
Assert.assertEquals(1, updatedResourceInfo3.getId());
Assert.assertEquals("test.txt", updatedResourceInfo3.getRes());