You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by le...@apache.org on 2021/11/30 09:21:04 UTC

[dolphinscheduler] branch 2.0.1-prepare updated: [cherry-pick][DS-6849] add delete command check (#7074)

This is an automated email from the ASF dual-hosted git repository.

leonbao pushed a commit to branch 2.0.1-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/2.0.1-prepare by this push:
     new 40e0058  [cherry-pick][DS-6849] add delete command check (#7074)
40e0058 is described below

commit 40e00585aa0fb0496b600788062e44a4d29a01dc
Author: wind <ca...@users.noreply.github.com>
AuthorDate: Tue Nov 30 17:20:57 2021 +0800

    [cherry-pick][DS-6849] add delete command check (#7074)
    
    * delete command with check
    
    * add test
    
    Co-authored-by: caishunfeng <53...@qq.com>
---
 .../service/process/ProcessService.java            | 173 ++++++++++++++-------
 .../service/process/ProcessServiceTest.java        | 157 ++++++++++---------
 2 files changed, 207 insertions(+), 123 deletions(-)

diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index 198af0b..a1ebe3a 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -17,13 +17,29 @@
 
 package org.apache.dolphinscheduler.service.process;
 
-import com.facebook.presto.jdbc.internal.guava.collect.Lists;
-import com.fasterxml.jackson.core.type.TypeReference;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.lang.StringUtils;
+import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE;
+import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
+import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_EMPTY_SUB_PROCESS;
+import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_FATHER_PARAMS;
+import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING;
+import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS;
+import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE;
+import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_PARENT_INSTANCE_ID;
+import static org.apache.dolphinscheduler.common.Constants.LOCAL_PARAMS;
+
+import static java.util.stream.Collectors.toSet;
+
 import org.apache.dolphinscheduler.common.Constants;
-import org.apache.dolphinscheduler.common.enums.*;
+import org.apache.dolphinscheduler.common.enums.AuthorizationType;
+import org.apache.dolphinscheduler.common.enums.CommandType;
+import org.apache.dolphinscheduler.common.enums.Direct;
+import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
+import org.apache.dolphinscheduler.common.enums.FailureStrategy;
+import org.apache.dolphinscheduler.common.enums.Flag;
+import org.apache.dolphinscheduler.common.enums.ReleaseState;
+import org.apache.dolphinscheduler.common.enums.TaskDependType;
+import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
+import org.apache.dolphinscheduler.common.enums.WarningType;
 import org.apache.dolphinscheduler.common.graph.DAG;
 import org.apache.dolphinscheduler.common.model.DateInterval;
 import org.apache.dolphinscheduler.common.model.TaskNode;
@@ -34,27 +50,85 @@ import org.apache.dolphinscheduler.common.process.ResourceInfo;
 import org.apache.dolphinscheduler.common.task.AbstractParameters;
 import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter;
 import org.apache.dolphinscheduler.common.task.subprocess.SubProcessParameters;
-import org.apache.dolphinscheduler.common.utils.*;
+import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils;
 import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils.CodeGenerateException;
-import org.apache.dolphinscheduler.dao.entity.*;
-import org.apache.dolphinscheduler.dao.mapper.*;
+import org.apache.dolphinscheduler.common.utils.DateUtils;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.common.utils.ParameterUtils;
+import org.apache.dolphinscheduler.common.utils.TaskParametersUtils;
+import org.apache.dolphinscheduler.dao.entity.Command;
+import org.apache.dolphinscheduler.dao.entity.DagData;
+import org.apache.dolphinscheduler.dao.entity.DataSource;
+import org.apache.dolphinscheduler.dao.entity.Environment;
+import org.apache.dolphinscheduler.dao.entity.ErrorCommand;
+import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
+import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstanceMap;
+import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
+import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog;
+import org.apache.dolphinscheduler.dao.entity.Project;
+import org.apache.dolphinscheduler.dao.entity.ProjectUser;
+import org.apache.dolphinscheduler.dao.entity.Resource;
+import org.apache.dolphinscheduler.dao.entity.Schedule;
+import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
+import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.dao.entity.Tenant;
+import org.apache.dolphinscheduler.dao.entity.UdfFunc;
+import org.apache.dolphinscheduler.dao.entity.User;
+import org.apache.dolphinscheduler.dao.mapper.CommandMapper;
+import org.apache.dolphinscheduler.dao.mapper.DataSourceMapper;
+import org.apache.dolphinscheduler.dao.mapper.EnvironmentMapper;
+import org.apache.dolphinscheduler.dao.mapper.ErrorCommandMapper;
+import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionLogMapper;
+import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
+import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapMapper;
+import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
+import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationLogMapper;
+import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
+import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
+import org.apache.dolphinscheduler.dao.mapper.ResourceMapper;
+import org.apache.dolphinscheduler.dao.mapper.ResourceUserMapper;
+import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
+import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper;
+import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
+import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
+import org.apache.dolphinscheduler.dao.mapper.TenantMapper;
+import org.apache.dolphinscheduler.dao.mapper.UdfFuncMapper;
+import org.apache.dolphinscheduler.dao.mapper.UserMapper;
 import org.apache.dolphinscheduler.dao.utils.DagHelper;
 import org.apache.dolphinscheduler.remote.utils.Host;
+import org.apache.dolphinscheduler.service.exceptions.ServiceException;
 import org.apache.dolphinscheduler.service.log.LogClientService;
 import org.apache.dolphinscheduler.service.quartz.cron.CronUtils;
 import org.apache.dolphinscheduler.spi.enums.ResourceType;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang.StringUtils;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.EnumMap;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 import org.springframework.transaction.annotation.Transactional;
 
-import java.util.*;
-import java.util.Map.Entry;
-import java.util.stream.Collectors;
-
-import static java.util.stream.Collectors.toSet;
-import static org.apache.dolphinscheduler.common.Constants.*;
+import com.facebook.presto.jdbc.internal.guava.collect.Lists;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.node.ObjectNode;
 
 /**
  * process relative dao that some mappers in this.
@@ -65,10 +139,10 @@ public class ProcessService {
     private final Logger logger = LoggerFactory.getLogger(getClass());
 
     private final int[] stateArray = new int[]{ExecutionStatus.SUBMITTED_SUCCESS.ordinal(),
-        ExecutionStatus.RUNNING_EXECUTION.ordinal(),
-        ExecutionStatus.DELAY_EXECUTION.ordinal(),
-        ExecutionStatus.READY_PAUSE.ordinal(),
-        ExecutionStatus.READY_STOP.ordinal()};
+            ExecutionStatus.RUNNING_EXECUTION.ordinal(),
+            ExecutionStatus.DELAY_EXECUTION.ordinal(),
+            ExecutionStatus.READY_PAUSE.ordinal(),
+            ExecutionStatus.READY_STOP.ordinal()};
 
     @Autowired
     private UserMapper userMapper;
@@ -136,7 +210,6 @@ public class ProcessService {
      * @param logger logger
      * @param host host
      * @param command found command
-     * @param processDefinitionCacheMaps
      * @return process instance
      */
     @Transactional
@@ -152,7 +225,7 @@ public class ProcessService {
         processInstance.addHistoryCmd(command.getCommandType());
         saveProcessInstance(processInstance);
         this.setSubProcessParam(processInstance);
-        this.commandMapper.deleteById(command.getId());
+        this.deleteCommandWithCheck(command.getId());
         return processInstance;
     }
 
@@ -202,10 +275,6 @@ public class ProcessService {
 
     /**
      * get command page
-     *
-     * @param pageSize
-     * @param pageNumber
-     * @return
      */
     public List<Command> findCommandPage(int pageSize, int pageNumber) {
         return commandMapper.queryCommandPage(pageSize, pageNumber * pageSize);
@@ -426,21 +495,21 @@ public class ProcessService {
         // process instance quit by "waiting thread" state
         if (originCommand == null) {
             Command command = new Command(
-                CommandType.RECOVER_WAITING_THREAD,
-                processInstance.getTaskDependType(),
-                processInstance.getFailureStrategy(),
-                processInstance.getExecutorId(),
-                processInstance.getProcessDefinition().getCode(),
-                JSONUtils.toJsonString(cmdParam),
-                processInstance.getWarningType(),
-                processInstance.getWarningGroupId(),
-                processInstance.getScheduleTime(),
-                processInstance.getWorkerGroup(),
-                processInstance.getEnvironmentCode(),
-                processInstance.getProcessInstancePriority(),
-                processInstance.getDryRun(),
-                processInstance.getId(),
-                processInstance.getProcessDefinitionVersion()
+                    CommandType.RECOVER_WAITING_THREAD,
+                    processInstance.getTaskDependType(),
+                    processInstance.getFailureStrategy(),
+                    processInstance.getExecutorId(),
+                    processInstance.getProcessDefinition().getCode(),
+                    JSONUtils.toJsonString(cmdParam),
+                    processInstance.getWarningType(),
+                    processInstance.getWarningGroupId(),
+                    processInstance.getScheduleTime(),
+                    processInstance.getWorkerGroup(),
+                    processInstance.getEnvironmentCode(),
+                    processInstance.getProcessInstancePriority(),
+                    processInstance.getDryRun(),
+                    processInstance.getId(),
+                    processInstance.getProcessDefinitionVersion()
             );
             saveCommand(command);
             return;
@@ -532,10 +601,10 @@ public class ProcessService {
 
         // curing global params
         processInstance.setGlobalParams(ParameterUtils.curingGlobalParams(
-            processDefinition.getGlobalParamMap(),
-            processDefinition.getGlobalParamList(),
-            getCommandTypeIfComplement(processInstance, command),
-            processInstance.getScheduleTime()));
+                processDefinition.getGlobalParamMap(),
+                processDefinition.getGlobalParamList(),
+                getCommandTypeIfComplement(processInstance, command),
+                processInstance.getScheduleTime()));
 
         // set process instance priority
         processInstance.setProcessInstancePriority(command.getProcessInstancePriority());
@@ -947,11 +1016,6 @@ public class ProcessService {
 
     /**
      * retry submit task to db
-     *
-     * @param taskInstance
-     * @param commitRetryTimes
-     * @param commitInterval
-     * @return
      */
     public TaskInstance submitTask(TaskInstance taskInstance, int commitRetryTimes, int commitInterval) {
 
@@ -1672,8 +1736,6 @@ public class ProcessService {
 
     /**
      * for show in page of taskInstance
-     *
-     * @param taskInstance
      */
     public void changeOutParam(TaskInstance taskInstance) {
         if (StringUtils.isEmpty(taskInstance.getVarPool())) {
@@ -1786,7 +1848,7 @@ public class ProcessService {
      */
     public List<TaskInstance> queryNeedFailoverTaskInstances(String host) {
         return taskInstanceMapper.queryByHostAndStatus(host,
-            stateArray);
+                stateArray);
     }
 
     /**
@@ -2401,4 +2463,11 @@ public class ProcessService {
         }
         return processTaskMap;
     }
+
+    private void deleteCommandWithCheck(int commandId) {
+        int delete = this.commandMapper.deleteById(commandId);
+        if (delete != 1) {
+            throw new ServiceException("delete command fail, id:" + commandId);
+        }
+    }
 }
diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
index 45b1952..da71460 100644
--- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
+++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
@@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.service.process;
 import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING;
 import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_PARAMS;
 import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE;
+
 import static org.mockito.ArgumentMatchers.any;
 
 import org.apache.dolphinscheduler.common.Constants;
@@ -245,7 +246,7 @@ public class ProcessServiceTest {
         command.setProcessDefinitionCode(222);
         command.setCommandType(CommandType.REPEAT_RUNNING);
         command.setCommandParam("{\"" + CMD_PARAM_RECOVER_PROCESS_ID_STRING + "\":\"111\",\""
-            + CMD_PARAM_SUB_PROCESS_DEFINE_CODE + "\":\"222\"}");
+                + CMD_PARAM_SUB_PROCESS_DEFINE_CODE + "\":\"222\"}");
         Assert.assertNull(processService.handleCommand(logger, host, command, processDefinitionCacheMaps));
 
         int definitionVersion = 1;
@@ -253,10 +254,13 @@ public class ProcessServiceTest {
         int processInstanceId = 222;
         //there is not enough thread for this command
         Command command1 = new Command();
+        command1.setId(1);
         command1.setProcessDefinitionCode(definitionCode);
         command1.setProcessDefinitionVersion(definitionVersion);
         command1.setCommandParam("{\"ProcessInstanceId\":222}");
         command1.setCommandType(CommandType.START_PROCESS);
+        Mockito.when(commandMapper.deleteById(1)).thenReturn(1);
+
         ProcessDefinition processDefinition = new ProcessDefinition();
         processDefinition.setId(123);
         processDefinition.setName("test");
@@ -268,36 +272,45 @@ public class ProcessServiceTest {
         processInstance.setProcessDefinitionCode(definitionCode);
         processInstance.setProcessDefinitionVersion(definitionVersion);
         Mockito.when(processDefineLogMapper.queryByDefinitionCodeAndVersion(processInstance.getProcessDefinitionCode(),
-            processInstance.getProcessDefinitionVersion())).thenReturn(new ProcessDefinitionLog(processDefinition));
+                processInstance.getProcessDefinitionVersion())).thenReturn(new ProcessDefinitionLog(processDefinition));
         Mockito.when(processInstanceMapper.queryDetailById(222)).thenReturn(processInstance);
         Assert.assertNotNull(processService.handleCommand(logger, host, command1, processDefinitionCacheMaps));
 
         Command command2 = new Command();
+        command2.setId(2);
         command2.setCommandParam("{\"ProcessInstanceId\":222,\"StartNodeIdList\":\"n1,n2\"}");
         command2.setProcessDefinitionCode(definitionCode);
         command2.setProcessDefinitionVersion(definitionVersion);
         command2.setCommandType(CommandType.RECOVER_SUSPENDED_PROCESS);
         command2.setProcessInstanceId(processInstanceId);
+        Mockito.when(commandMapper.deleteById(2)).thenReturn(1);
 
         Assert.assertNotNull(processService.handleCommand(logger, host, command2, processDefinitionCacheMaps));
 
         Command command3 = new Command();
+        command3.setId(3);
         command3.setProcessDefinitionCode(definitionCode);
         command3.setProcessDefinitionVersion(definitionVersion);
         command3.setProcessInstanceId(processInstanceId);
         command3.setCommandParam("{\"WaitingThreadInstanceId\":222}");
         command3.setCommandType(CommandType.START_FAILURE_TASK_PROCESS);
+        Mockito.when(commandMapper.deleteById(3)).thenReturn(1);
+
         Assert.assertNotNull(processService.handleCommand(logger, host, command3, processDefinitionCacheMaps));
 
         Command command4 = new Command();
+        command4.setId(4);
         command4.setProcessDefinitionCode(definitionCode);
         command4.setProcessDefinitionVersion(definitionVersion);
         command4.setCommandParam("{\"WaitingThreadInstanceId\":222,\"StartNodeIdList\":\"n1,n2\"}");
         command4.setCommandType(CommandType.REPEAT_RUNNING);
         command4.setProcessInstanceId(processInstanceId);
+        Mockito.when(commandMapper.deleteById(4)).thenReturn(1);
+
         Assert.assertNotNull(processService.handleCommand(logger, host, command4, processDefinitionCacheMaps));
 
         Command command5 = new Command();
+        command5.setId(5);
         command5.setProcessDefinitionCode(definitionCode);
         command5.setProcessDefinitionVersion(definitionVersion);
         HashMap<String, String> startParams = new HashMap<>();
@@ -307,6 +320,8 @@ public class ProcessServiceTest {
         command5.setCommandParam(JSONUtils.toJsonString(commandParams));
         command5.setCommandType(CommandType.START_PROCESS);
         command5.setDryRun(Constants.DRY_RUN_FLAG_NO);
+        Mockito.when(commandMapper.deleteById(5)).thenReturn(1);
+
         ProcessInstance processInstance1 = processService.handleCommand(logger, host, command5, processDefinitionCacheMaps);
         Assert.assertTrue(processInstance1.getGlobalParams().contains("\"testStartParam1\""));
     }
@@ -389,14 +404,14 @@ public class ProcessServiceTest {
         operator.setUserType(UserType.GENERAL_USER);
         long projectCode = 751485690568704L;
         String taskJson = "[{\"code\":751500437479424,\"name\":\"aa\",\"version\":1,\"description\":\"\",\"delayTime\":0,"
-            + "\"taskType\":\"SHELL\",\"taskParams\":{\"resourceList\":[],\"localParams\":[],\"rawScript\":\"sleep 1s\\necho 11\","
-            + "\"dependence\":{},\"conditionResult\":{\"successNode\":[\"\"],\"failedNode\":[\"\"]},\"waitStartTimeout\":{}},"
-            + "\"flag\":\"YES\",\"taskPriority\":\"MEDIUM\",\"workerGroup\":\"yarn\",\"failRetryTimes\":0,\"failRetryInterval\":1,"
-            + "\"timeoutFlag\":\"OPEN\",\"timeoutNotifyStrategy\":\"FAILED\",\"timeout\":1,\"environmentCode\":751496815697920},"
-            + "{\"code\":751516889636864,\"name\":\"bb\",\"description\":\"\",\"taskType\":\"SHELL\",\"taskParams\":{\"resourceList\":[],"
-            + "\"localParams\":[],\"rawScript\":\"echo 22\",\"dependence\":{},\"conditionResult\":{\"successNode\":[\"\"],\"failedNode\":[\"\"]},"
-            + "\"waitStartTimeout\":{}},\"flag\":\"YES\",\"taskPriority\":\"MEDIUM\",\"workerGroup\":\"default\",\"failRetryTimes\":\"0\","
-            + "\"failRetryInterval\":\"1\",\"timeoutFlag\":\"CLOSE\",\"timeoutNotifyStrategy\":\"\",\"timeout\":0,\"delayTime\":\"0\",\"environmentCode\":-1}]";
+                + "\"taskType\":\"SHELL\",\"taskParams\":{\"resourceList\":[],\"localParams\":[],\"rawScript\":\"sleep 1s\\necho 11\","
+                + "\"dependence\":{},\"conditionResult\":{\"successNode\":[\"\"],\"failedNode\":[\"\"]},\"waitStartTimeout\":{}},"
+                + "\"flag\":\"YES\",\"taskPriority\":\"MEDIUM\",\"workerGroup\":\"yarn\",\"failRetryTimes\":0,\"failRetryInterval\":1,"
+                + "\"timeoutFlag\":\"OPEN\",\"timeoutNotifyStrategy\":\"FAILED\",\"timeout\":1,\"environmentCode\":751496815697920},"
+                + "{\"code\":751516889636864,\"name\":\"bb\",\"description\":\"\",\"taskType\":\"SHELL\",\"taskParams\":{\"resourceList\":[],"
+                + "\"localParams\":[],\"rawScript\":\"echo 22\",\"dependence\":{},\"conditionResult\":{\"successNode\":[\"\"],\"failedNode\":[\"\"]},"
+                + "\"waitStartTimeout\":{}},\"flag\":\"YES\",\"taskPriority\":\"MEDIUM\",\"workerGroup\":\"default\",\"failRetryTimes\":\"0\","
+                + "\"failRetryInterval\":\"1\",\"timeoutFlag\":\"CLOSE\",\"timeoutNotifyStrategy\":\"\",\"timeout\":0,\"delayTime\":\"0\",\"environmentCode\":-1}]";
         List<TaskDefinitionLog> taskDefinitionLogs = JSONUtils.toList(taskJson, TaskDefinitionLog.class);
         TaskDefinitionLog taskDefinition = new TaskDefinitionLog();
         taskDefinition.setCode(751500437479424L);
@@ -487,10 +502,10 @@ public class ProcessServiceTest {
         processInstance.setId(62);
         taskInstance.setVarPool("[{\"direct\":\"OUT\",\"prop\":\"test1\",\"type\":\"VARCHAR\",\"value\":\"\"}]");
         taskInstance.setTaskParams("{\"type\":\"MYSQL\",\"datasource\":1,\"sql\":\"select id from tb_test limit 1\","
-            + "\"udfs\":\"\",\"sqlType\":\"0\",\"sendEmail\":false,\"displayRows\":10,\"title\":\"\","
-            + "\"groupId\":null,\"localParams\":[{\"prop\":\"test1\",\"direct\":\"OUT\",\"type\":\"VARCHAR\",\"value\":\"12\"}],"
-            + "\"connParams\":\"\",\"preStatements\":[],\"postStatements\":[],\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],"
-            + "\\\"failedNode\\\":[\\\"\\\"]}\",\"dependence\":\"{}\"}");
+                + "\"udfs\":\"\",\"sqlType\":\"0\",\"sendEmail\":false,\"displayRows\":10,\"title\":\"\","
+                + "\"groupId\":null,\"localParams\":[{\"prop\":\"test1\",\"direct\":\"OUT\",\"type\":\"VARCHAR\",\"value\":\"12\"}],"
+                + "\"connParams\":\"\",\"preStatements\":[],\"postStatements\":[],\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],"
+                + "\\\"failedNode\\\":[\\\"\\\"]}\",\"dependence\":\"{}\"}");
         processService.changeOutParam(taskInstance);
     }
 
@@ -498,65 +513,65 @@ public class ProcessServiceTest {
     public void testUpdateTaskDefinitionResources() throws Exception {
         TaskDefinition taskDefinition = new TaskDefinition();
         String taskParameters = "{\n"
-            + "    \"mainClass\": \"org.apache.dolphinscheduler.SparkTest\",\n"
-            + "    \"mainJar\": {\n"
-            + "        \"id\": 1\n"
-            + "    },\n"
-            + "    \"deployMode\": \"cluster\",\n"
-            + "    \"resourceList\": [\n"
-            + "        {\n"
-            + "            \"id\": 3\n"
-            + "        },\n"
-            + "        {\n"
-            + "            \"id\": 4\n"
-            + "        }\n"
-            + "    ],\n"
-            + "    \"localParams\": [],\n"
-            + "    \"driverCores\": 1,\n"
-            + "    \"driverMemory\": \"512M\",\n"
-            + "    \"numExecutors\": 2,\n"
-            + "    \"executorMemory\": \"2G\",\n"
-            + "    \"executorCores\": 2,\n"
-            + "    \"appName\": \"\",\n"
-            + "    \"mainArgs\": \"\",\n"
-            + "    \"others\": \"\",\n"
-            + "    \"programType\": \"JAVA\",\n"
-            + "    \"sparkVersion\": \"SPARK2\",\n"
-            + "    \"dependence\": {},\n"
-            + "    \"conditionResult\": {\n"
-            + "        \"successNode\": [\n"
-            + "            \"\"\n"
-            + "        ],\n"
-            + "        \"failedNode\": [\n"
-            + "            \"\"\n"
-            + "        ]\n"
-            + "    },\n"
-            + "    \"waitStartTimeout\": {}\n"
-            + "}";
+                + "    \"mainClass\": \"org.apache.dolphinscheduler.SparkTest\",\n"
+                + "    \"mainJar\": {\n"
+                + "        \"id\": 1\n"
+                + "    },\n"
+                + "    \"deployMode\": \"cluster\",\n"
+                + "    \"resourceList\": [\n"
+                + "        {\n"
+                + "            \"id\": 3\n"
+                + "        },\n"
+                + "        {\n"
+                + "            \"id\": 4\n"
+                + "        }\n"
+                + "    ],\n"
+                + "    \"localParams\": [],\n"
+                + "    \"driverCores\": 1,\n"
+                + "    \"driverMemory\": \"512M\",\n"
+                + "    \"numExecutors\": 2,\n"
+                + "    \"executorMemory\": \"2G\",\n"
+                + "    \"executorCores\": 2,\n"
+                + "    \"appName\": \"\",\n"
+                + "    \"mainArgs\": \"\",\n"
+                + "    \"others\": \"\",\n"
+                + "    \"programType\": \"JAVA\",\n"
+                + "    \"sparkVersion\": \"SPARK2\",\n"
+                + "    \"dependence\": {},\n"
+                + "    \"conditionResult\": {\n"
+                + "        \"successNode\": [\n"
+                + "            \"\"\n"
+                + "        ],\n"
+                + "        \"failedNode\": [\n"
+                + "            \"\"\n"
+                + "        ]\n"
+                + "    },\n"
+                + "    \"waitStartTimeout\": {}\n"
+                + "}";
         taskDefinition.setTaskParams(taskParameters);
 
         Map<Integer, Resource> resourceMap =
-            Stream.of(1, 3, 4)
-                .map(i -> {
-                    Resource resource = new Resource();
-                    resource.setId(i);
-                    resource.setFileName("file" + i);
-                    resource.setFullName("/file" + i);
-                    return resource;
-                })
-                .collect(
-                    Collectors.toMap(
-                        Resource::getId,
-                        resource -> resource)
-                );
+                Stream.of(1, 3, 4)
+                        .map(i -> {
+                            Resource resource = new Resource();
+                            resource.setId(i);
+                            resource.setFileName("file" + i);
+                            resource.setFullName("/file" + i);
+                            return resource;
+                        })
+                        .collect(
+                                Collectors.toMap(
+                                        Resource::getId,
+                                        resource -> resource)
+                        );
         for (Integer integer : Arrays.asList(1, 3, 4)) {
             Mockito.when(resourceMapper.selectById(integer))
-                .thenReturn(resourceMap.get(integer));
+                    .thenReturn(resourceMap.get(integer));
         }
 
         Whitebox.invokeMethod(processService,
-            "updateTaskDefinitionResources",
-            taskDefinition);
+                "updateTaskDefinitionResources",
+                taskDefinition);
 
         String taskParams = taskDefinition.getTaskParams();
         SparkParameters sparkParameters = JSONUtils.parseObject(taskParams, SparkParameters.class);
@@ -582,15 +597,15 @@ public class ProcessServiceTest {
         // test if input is null
         ResourceInfo resourceInfoNull = null;
         ResourceInfo updatedResourceInfo1 = Whitebox.invokeMethod(processService,
-            "updateResourceInfo",
-            resourceInfoNull);
+                "updateResourceInfo",
+                resourceInfoNull);
         Assert.assertNull(updatedResourceInfo1);
 
         // test if resource id less than 1
         ResourceInfo resourceInfoVoid = new ResourceInfo();
         ResourceInfo updatedResourceInfo2 = Whitebox.invokeMethod(processService,
-            "updateResourceInfo",
-            resourceInfoVoid);
+                "updateResourceInfo",
+                resourceInfoVoid);
         Assert.assertNull(updatedResourceInfo2);
 
         // test normal situation
@@ -602,8 +617,8 @@ public class ProcessServiceTest {
         resource.setFullName("/test.txt");
         Mockito.when(resourceMapper.selectById(1)).thenReturn(resource);
         ResourceInfo updatedResourceInfo3 = Whitebox.invokeMethod(processService,
-            "updateResourceInfo",
-            resourceInfoNormal);
+                "updateResourceInfo",
+                resourceInfoNormal);
 
         Assert.assertEquals(1, updatedResourceInfo3.getId());
         Assert.assertEquals("test.txt", updatedResourceInfo3.getRes());