You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by we...@apache.org on 2022/10/22 04:39:30 UTC

[dolphinscheduler] branch dev updated: [Refactor] Migrate all command-related interface functions from ProcessServiceImpl (#12474)

This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new b936b882bb [Refactor] Migrate all command-related interface functions from ProcessServiceImpl (#12474)
b936b882bb is described below

commit b936b882bb05a6058a90b6f5f271793b9ac70aaa
Author: Yann Ann <xi...@gmail.com>
AuthorDate: Sat Oct 22 12:39:25 2022 +0800

    [Refactor] Migrate all command-related interface functions from ProcessServiceImpl (#12474)
    
    * migrate all command-related interface functions to CommandService
---
 .../api/service/impl/ExecutorServiceImpl.java      |  36 ++-
 .../api/service/ExecutorServiceTest.java           |  24 +-
 .../master/runner/MasterSchedulerBootstrap.java    |   9 +-
 .../master/runner/WorkflowExecuteRunnable.java     |   9 +-
 .../master/runner/WorkflowExecuteRunnableTest.java |   7 +-
 .../scheduler/quartz/ProcessScheduleTask.java      |   6 +-
 .../service/command/CommandService.java            |  88 +++++++
 .../service/command/CommandServiceImpl.java        | 272 ++++++++++++++++++++
 .../service/process/ProcessService.java            |  21 --
 .../service/process/ProcessServiceImpl.java        | 285 +--------------------
 .../dolphinscheduler/service/utils/ParamUtils.java |  90 +++++++
 .../service/command/CommandServiceImplTest.java    | 227 ++++++++++++++++
 .../service/process/ProcessServiceTest.java        | 152 -----------
 .../service/utils/ParamUtilsTest.java              |  37 +++
 14 files changed, 788 insertions(+), 475 deletions(-)

diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
index 41ac529318..2ba1ccd38f 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
@@ -77,6 +77,7 @@ import org.apache.dolphinscheduler.remote.command.WorkflowStateEventChangeComman
 import org.apache.dolphinscheduler.remote.dto.WorkflowExecuteDto;
 import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService;
 import org.apache.dolphinscheduler.remote.utils.Host;
+import org.apache.dolphinscheduler.service.command.CommandService;
 import org.apache.dolphinscheduler.service.cron.CronUtils;
 import org.apache.dolphinscheduler.service.exceptions.CronParseException;
 import org.apache.dolphinscheduler.service.process.ProcessService;
@@ -130,6 +131,9 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
     @Autowired
     private ProcessService processService;
 
+    @Autowired
+    private CommandService commandService;
+
     @Autowired
     private ProcessInstanceDao processInstanceDao;
 
@@ -626,7 +630,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
         command.setProcessDefinitionVersion(processVersion);
         command.setProcessInstanceId(instanceId);
         command.setTestFlag(testFlag);
-        if (!processService.verifyIsNeedCreateCommand(command)) {
+        if (!commandService.verifyIsNeedCreateCommand(command)) {
             logger.warn(
                     "Process instance is executing the command, processDefinitionCode:{}, processDefinitionVersion:{}, processInstanceId:{}.",
                     processDefinitionCode, processVersion, instanceId);
@@ -635,7 +639,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
         }
 
         logger.info("Creating command, commandInfo:{}.", command);
-        int create = processService.createCommand(command);
+        int create = commandService.createCommand(command);
 
         if (create > 0) {
             logger.info("Create {} command complete, processDefinitionCode:{}, processDefinitionVersion:{}.",
@@ -784,7 +788,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
         } else {
             command.setCommandParam(JSONUtils.toJsonString(cmdParam));
             logger.info("Creating command, commandInfo:{}.", command);
-            return processService.createCommand(command);
+            return commandService.createCommand(command);
         }
     }
 
@@ -824,26 +828,28 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
                     cmdParam.put(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST, dateList);
                     command.setCommandParam(JSONUtils.toJsonString(cmdParam));
                     logger.info("Creating command, commandInfo:{}.", command);
-                    createCount = processService.createCommand(command);
-                    if (createCount > 0)
+                    createCount = commandService.createCommand(command);
+                    if (createCount > 0) {
                         logger.info("Create {} command complete, processDefinitionCode:{}",
                                 command.getCommandType().getDescp(), command.getProcessDefinitionCode());
-                    else
+                    } else {
                         logger.error("Create {} command error, processDefinitionCode:{}",
                                 command.getCommandType().getDescp(), command.getProcessDefinitionCode());
+                    }
                 }
                 if (startDate != null && endDate != null) {
                     cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, startDate);
                     cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, endDate);
                     command.setCommandParam(JSONUtils.toJsonString(cmdParam));
                     logger.info("Creating command, commandInfo:{}.", command);
-                    createCount = processService.createCommand(command);
-                    if (createCount > 0)
+                    createCount = commandService.createCommand(command);
+                    if (createCount > 0) {
                         logger.info("Create {} command complete, processDefinitionCode:{}",
                                 command.getCommandType().getDescp(), command.getProcessDefinitionCode());
-                    else
+                    } else {
                         logger.error("Create {} command error, processDefinitionCode:{}",
                                 command.getCommandType().getDescp(), command.getProcessDefinitionCode());
+                    }
                     // dependent process definition
                     List<Schedule> schedules = processService.queryReleaseSchedulerListByProcessDefinitionCode(
                             command.getProcessDefinitionCode());
@@ -904,12 +910,13 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
                                     DateUtils.dateToString(listDate.get(endDateIndex)));
                             command.setCommandParam(JSONUtils.toJsonString(cmdParam));
                             logger.info("Creating command, commandInfo:{}.", command);
-                            if (processService.createCommand(command) > 0)
+                            if (commandService.createCommand(command) > 0) {
                                 logger.info("Create {} command complete, processDefinitionCode:{}",
                                         command.getCommandType().getDescp(), command.getProcessDefinitionCode());
-                            else
+                            } else {
                                 logger.error("Create {} command error, processDefinitionCode:{}",
                                         command.getCommandType().getDescp(), command.getProcessDefinitionCode());
+                            }
                             if (schedules.isEmpty() || complementDependentMode == ComplementDependentMode.OFF_MODE) {
                                 logger.info(
                                         "Complement dependent mode is off mode or Scheduler is empty, so skip create complement dependent command, processDefinitionCode:{}.",
@@ -937,12 +944,13 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
                             cmdParam.put(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST, String.join(COMMA, stringDate));
                             command.setCommandParam(JSONUtils.toJsonString(cmdParam));
                             logger.info("Creating command, commandInfo:{}.", command);
-                            if (processService.createCommand(command) > 0)
+                            if (commandService.createCommand(command) > 0) {
                                 logger.info("Create {} command complete, processDefinitionCode:{}",
                                         command.getCommandType().getDescp(), command.getProcessDefinitionCode());
-                            else
+                            } else {
                                 logger.error("Create {} command error, processDefinitionCode:{}",
                                         command.getCommandType().getDescp(), command.getProcessDefinitionCode());
+                            }
                         }
                     }
                 }
@@ -985,7 +993,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
             cmdParam.put(CMD_PARAM_START_NODES, String.valueOf(dependentProcessDefinition.getTaskDefinitionCode()));
             dependentCommand.setCommandParam(JSONUtils.toJsonString(cmdParam));
             logger.info("Creating complement dependent command, commandInfo:{}.", command);
-            dependentProcessDefinitionCreateCount += processService.createCommand(dependentCommand);
+            dependentProcessDefinitionCreateCount += commandService.createCommand(dependentCommand);
         }
 
         return dependentProcessDefinitionCreateCount;
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java
index 58889636c0..3cd27f7834 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java
@@ -57,6 +57,7 @@ import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
 import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
 import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
 import org.apache.dolphinscheduler.dao.mapper.TaskGroupQueueMapper;
+import org.apache.dolphinscheduler.service.command.CommandService;
 import org.apache.dolphinscheduler.service.process.ProcessService;
 
 import java.util.ArrayList;
@@ -102,6 +103,9 @@ public class ExecutorServiceTest {
     @Mock
     private ProcessService processService;
 
+    @Mock
+    private CommandService commandService;
+
     @Mock
     private ProcessDefinitionMapper processDefinitionMapper;
 
@@ -194,8 +198,8 @@ public class ExecutorServiceTest {
                 .thenReturn(checkProjectAndAuth());
         Mockito.when(processDefinitionMapper.queryByCode(processDefinitionCode)).thenReturn(processDefinition);
         Mockito.when(processService.getTenantForProcess(tenantId, userId)).thenReturn(new Tenant());
-        doReturn(1).when(processService).createCommand(argThat(c -> c.getId() == null));
-        doReturn(0).when(processService).createCommand(argThat(c -> c.getId() != null));
+        doReturn(1).when(commandService).createCommand(argThat(c -> c.getId() == null));
+        doReturn(0).when(commandService).createCommand(argThat(c -> c.getId() != null));
         Mockito.when(monitorService.getServerListFromRegistry(true)).thenReturn(getMasterServersList());
         Mockito.when(processService.findProcessInstanceDetailById(processInstanceId))
                 .thenReturn(Optional.ofNullable(processInstance));
@@ -230,7 +234,7 @@ public class ExecutorServiceTest {
                 Constants.TEST_FLAG_NO,
                 ComplementDependentMode.OFF_MODE);
         Assertions.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
-        verify(processService, times(1)).createCommand(any(Command.class));
+        verify(commandService, times(1)).createCommand(any(Command.class));
 
     }
 
@@ -253,7 +257,7 @@ public class ExecutorServiceTest {
                 Constants.TEST_FLAG_NO,
                 ComplementDependentMode.OFF_MODE);
         Assertions.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
-        verify(processService, times(1)).createCommand(any(Command.class));
+        verify(commandService, times(1)).createCommand(any(Command.class));
 
     }
 
@@ -320,7 +324,7 @@ public class ExecutorServiceTest {
                 Constants.TEST_FLAG_NO,
                 ComplementDependentMode.OFF_MODE);
         Assertions.assertEquals(Status.START_PROCESS_INSTANCE_ERROR, result.get(Constants.STATUS));
-        verify(processService, times(0)).createCommand(any(Command.class));
+        verify(commandService, times(0)).createCommand(any(Command.class));
     }
 
     /**
@@ -342,7 +346,7 @@ public class ExecutorServiceTest {
                 Constants.TEST_FLAG_NO,
                 ComplementDependentMode.OFF_MODE);
         Assertions.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
-        verify(processService, times(1)).createCommand(any(Command.class));
+        verify(commandService, times(1)).createCommand(any(Command.class));
     }
 
     /**
@@ -364,7 +368,7 @@ public class ExecutorServiceTest {
                 Constants.TEST_FLAG_NO,
                 ComplementDependentMode.OFF_MODE);
         Assertions.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
-        verify(processService, times(31)).createCommand(any(Command.class));
+        verify(commandService, times(31)).createCommand(any(Command.class));
 
     }
 
@@ -387,7 +391,7 @@ public class ExecutorServiceTest {
                 Constants.TEST_FLAG_NO,
                 ComplementDependentMode.OFF_MODE);
         Assertions.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
-        verify(processService, times(15)).createCommand(any(Command.class));
+        verify(commandService, times(15)).createCommand(any(Command.class));
 
     }
 
@@ -411,7 +415,7 @@ public class ExecutorServiceTest {
 
     @Test
     public void testExecuteRepeatRunning() {
-        Mockito.when(processService.verifyIsNeedCreateCommand(any(Command.class))).thenReturn(true);
+        Mockito.when(commandService.verifyIsNeedCreateCommand(any(Command.class))).thenReturn(true);
         Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode, RERUN))
                 .thenReturn(checkProjectAndAuth());
         Map<String, Object> result =
@@ -421,7 +425,7 @@ public class ExecutorServiceTest {
 
     @Test
     public void testOfTestRun() {
-        Mockito.when(processService.verifyIsNeedCreateCommand(any(Command.class))).thenReturn(true);
+        Mockito.when(commandService.verifyIsNeedCreateCommand(any(Command.class))).thenReturn(true);
         Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode, RERUN))
                 .thenReturn(checkProjectAndAuth());
         Map<String, Object> result = executorService.execProcessInstance(loginUser, projectCode,
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java
index 377dff7191..80616048e2 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java
@@ -38,6 +38,7 @@ import org.apache.dolphinscheduler.server.master.metrics.MasterServerMetrics;
 import org.apache.dolphinscheduler.server.master.metrics.ProcessInstanceMetrics;
 import org.apache.dolphinscheduler.server.master.registry.ServerNodeManager;
 import org.apache.dolphinscheduler.service.alert.ProcessAlertManager;
+import org.apache.dolphinscheduler.service.command.CommandService;
 import org.apache.dolphinscheduler.service.expand.CuringParamsService;
 import org.apache.dolphinscheduler.service.process.ProcessService;
 import org.apache.dolphinscheduler.service.utils.LoggerUtils;
@@ -66,6 +67,9 @@ public class MasterSchedulerBootstrap extends BaseDaemonThread implements AutoCl
     @Autowired
     private ProcessService processService;
 
+    @Autowired
+    private CommandService commandService;
+
     @Autowired
     private ProcessInstanceDao processInstanceDao;
 
@@ -172,6 +176,7 @@ public class MasterSchedulerBootstrap extends BaseDaemonThread implements AutoCl
                                     "The workflow instance is already been cached, this case shouldn't be happened");
                         }
                         WorkflowExecuteRunnable workflowRunnable = new WorkflowExecuteRunnable(processInstance,
+                                commandService,
                                 processService,
                                 processInstanceDao,
                                 nettyExecutorManager,
@@ -225,7 +230,7 @@ public class MasterSchedulerBootstrap extends BaseDaemonThread implements AutoCl
                     }
                 } catch (Exception e) {
                     logger.error("Master handle command {} error ", command.getId(), e);
-                    processService.moveToErrorCommand(command, e.toString());
+                    commandService.moveToErrorCommand(command, e.toString());
                 } finally {
                     latch.countDown();
                 }
@@ -254,7 +259,7 @@ public class MasterSchedulerBootstrap extends BaseDaemonThread implements AutoCl
             int pageNumber = 0;
             int pageSize = masterConfig.getFetchCommandNum();
             final List<Command> result =
-                    processService.findCommandPageBySlot(pageSize, pageNumber, masterCount, thisMasterSlot);
+                    commandService.findCommandPageBySlot(pageSize, pageNumber, masterCount, thisMasterSlot);
             if (CollectionUtils.isNotEmpty(result)) {
                 logger.info(
                         "Master schedule bootstrap loop command success, command size: {}, current slot: {}, total slot size: {}",
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
index 1a896bf9c5..6b7925a97f 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
@@ -76,6 +76,7 @@ import org.apache.dolphinscheduler.server.master.runner.task.ITaskProcessor;
 import org.apache.dolphinscheduler.server.master.runner.task.TaskAction;
 import org.apache.dolphinscheduler.server.master.runner.task.TaskProcessorFactory;
 import org.apache.dolphinscheduler.service.alert.ProcessAlertManager;
+import org.apache.dolphinscheduler.service.command.CommandService;
 import org.apache.dolphinscheduler.service.cron.CronUtils;
 import org.apache.dolphinscheduler.service.exceptions.CronParseException;
 import org.apache.dolphinscheduler.service.expand.CuringParamsService;
@@ -127,6 +128,8 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
 
     private final ProcessService processService;
 
+    private final CommandService commandService;
+
     private ProcessInstanceDao processInstanceDao;
 
     private final ProcessAlertManager processAlertManager;
@@ -233,6 +236,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
      */
     public WorkflowExecuteRunnable(
                                    @NonNull ProcessInstance processInstance,
+                                   @NonNull CommandService commandService,
                                    @NonNull ProcessService processService,
                                    @NonNull ProcessInstanceDao processInstanceDao,
                                    @NonNull NettyExecutorManager nettyExecutorManager,
@@ -241,6 +245,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
                                    @NonNull StateWheelExecuteThread stateWheelExecuteThread,
                                    @NonNull CuringParamsService curingParamsService) {
         this.processService = processService;
+        this.commandService = commandService;
         this.processInstanceDao = processInstanceDao;
         this.processInstance = processInstance;
         this.nettyExecutorManager = nettyExecutorManager;
@@ -657,7 +662,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
         command.setProcessInstanceId(0);
         command.setProcessDefinitionVersion(processInstance.getProcessDefinitionVersion());
         command.setTestFlag(processInstance.getTestFlag());
-        return processService.createCommand(command);
+        return commandService.createCommand(command);
     }
 
     private boolean needComplementProcess() {
@@ -750,7 +755,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
         command.setProcessDefinitionCode(processDefinition.getCode());
         command.setProcessDefinitionVersion(processDefinition.getVersion());
         command.setCommandParam(JSONUtils.toJsonString(cmdParam));
-        processService.createCommand(command);
+        commandService.createCommand(command);
     }
 
     /**
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java
index 755311b30f..6fb703511f 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java
+++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java
@@ -35,6 +35,7 @@ import org.apache.dolphinscheduler.server.master.config.MasterConfig;
 import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager;
 import org.apache.dolphinscheduler.service.alert.ProcessAlertManager;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
+import org.apache.dolphinscheduler.service.command.CommandService;
 import org.apache.dolphinscheduler.service.expand.CuringParamsService;
 import org.apache.dolphinscheduler.service.process.ProcessService;
 
@@ -72,6 +73,8 @@ public class WorkflowExecuteRunnableTest {
 
     private ProcessService processService;
 
+    private CommandService commandService;
+
     private ProcessInstanceDao processInstanceDao;
 
     private MasterConfig config;
@@ -90,6 +93,7 @@ public class WorkflowExecuteRunnableTest {
 
         config = new MasterConfig();
         processService = Mockito.mock(ProcessService.class);
+        commandService = Mockito.mock(CommandService.class);
         processInstanceDao = Mockito.mock(ProcessInstanceDao.class);
         processInstance = Mockito.mock(ProcessInstance.class);
         Map<String, String> cmdParam = new HashMap<>();
@@ -105,7 +109,8 @@ public class WorkflowExecuteRunnableTest {
         NettyExecutorManager nettyExecutorManager = Mockito.mock(NettyExecutorManager.class);
         ProcessAlertManager processAlertManager = Mockito.mock(ProcessAlertManager.class);
         workflowExecuteThread = Mockito.spy(
-                new WorkflowExecuteRunnable(processInstance, processService, processInstanceDao, nettyExecutorManager,
+                new WorkflowExecuteRunnable(processInstance, commandService, processService, processInstanceDao,
+                        nettyExecutorManager,
                         processAlertManager, config, stateWheelExecuteThread, curingGlobalParamsService));
         Field dag = WorkflowExecuteRunnable.class.getDeclaredField("dag");
         dag.setAccessible(true);
diff --git a/dolphinscheduler-scheduler-plugin/dolphinscheduler-scheduler-quartz/src/main/java/org/apache/dolphinscheduler/scheduler/quartz/ProcessScheduleTask.java b/dolphinscheduler-scheduler-plugin/dolphinscheduler-scheduler-quartz/src/main/java/org/apache/dolphinscheduler/scheduler/quartz/ProcessScheduleTask.java
index 31251c108e..79deabf543 100644
--- a/dolphinscheduler-scheduler-plugin/dolphinscheduler-scheduler-quartz/src/main/java/org/apache/dolphinscheduler/scheduler/quartz/ProcessScheduleTask.java
+++ b/dolphinscheduler-scheduler-plugin/dolphinscheduler-scheduler-quartz/src/main/java/org/apache/dolphinscheduler/scheduler/quartz/ProcessScheduleTask.java
@@ -24,6 +24,7 @@ import org.apache.dolphinscheduler.dao.entity.Command;
 import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
 import org.apache.dolphinscheduler.dao.entity.Schedule;
 import org.apache.dolphinscheduler.scheduler.quartz.utils.QuartzTaskUtils;
+import org.apache.dolphinscheduler.service.command.CommandService;
 import org.apache.dolphinscheduler.service.process.ProcessService;
 
 import org.apache.commons.lang3.StringUtils;
@@ -49,6 +50,9 @@ public class ProcessScheduleTask extends QuartzJobBean {
     @Autowired
     private ProcessService processService;
 
+    @Autowired
+    private CommandService commandService;
+
     @Counted(value = "ds.master.quartz.job.executed")
     @Timed(value = "ds.master.quartz.job.execution.time", percentiles = {0.5, 0.75, 0.95, 0.99}, histogram = true)
     @Override
@@ -100,7 +104,7 @@ public class ProcessScheduleTask extends QuartzJobBean {
         command.setProcessInstancePriority(schedule.getProcessInstancePriority());
         command.setProcessDefinitionVersion(processDefinition.getVersion());
 
-        processService.createCommand(command);
+        commandService.createCommand(command);
     }
 
     private void deleteJob(JobExecutionContext context, int projectId, int scheduleId) {
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/command/CommandService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/command/CommandService.java
new file mode 100644
index 0000000000..48d0a1bbae
--- /dev/null
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/command/CommandService.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.service.command;
+
+import org.apache.dolphinscheduler.dao.entity.Command;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstanceMap;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+
+import java.util.List;
+
+/**
+ * Command Service
+ */
+public interface CommandService {
+
+    /**
+     * Save error command, and delete original command. If the given command has already been moved into error command,
+     * will throw {@link java.sql.SQLIntegrityConstraintViolationException}.
+     * @param command command
+     * @param message message
+     */
+    void moveToErrorCommand(Command command, String message);
+
+    /**
+     * Create new command
+     * @param command command
+     * @return result
+     */
+    int createCommand(Command command);
+
+    /**
+     * Get command page
+     * @param pageSize page size
+     * @param pageNumber page number
+     * @param masterCount master count
+     * @param thisMasterSlot master slot
+     * @return command page
+     */
+    List<Command> findCommandPageBySlot(int pageSize, int pageNumber, int masterCount, int thisMasterSlot);
+
+    /**
+     * check the input command exists in queue list
+     *
+     * @param command command
+     * @return true if the command needs to be created, false otherwise
+     */
+    boolean verifyIsNeedCreateCommand(Command command);
+
+    /**
+     * Create a recovery waiting thread command when the thread pool is not enough for the process instance.
+     * A sub work process instance does not need to create a recovery command.
+     * Create the recovery waiting thread command and delete the origin command at the same time.
+     * If the recovery command already exists, only update the field update_time.
+     *
+     * @param originCommand   originCommand
+     * @param processInstance processInstance
+     */
+    void createRecoveryWaitingThreadCommand(Command originCommand, ProcessInstance processInstance);
+
+    /**
+     * create sub work process command
+     * @param parentProcessInstance parent process instance
+     * @param childInstance child process instance
+     * @param instanceMap process instance map
+     * @param task task instance
+     * @return command
+     */
+    Command createSubProcessCommand(ProcessInstance parentProcessInstance,
+                                    ProcessInstance childInstance,
+                                    ProcessInstanceMap instanceMap,
+                                    TaskInstance task);
+}
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/command/CommandServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/command/CommandServiceImpl.java
new file mode 100644
index 0000000000..62ce978a84
--- /dev/null
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/command/CommandServiceImpl.java
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.service.command;
+
+import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING;
+
+import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.enums.CommandType;
+import org.apache.dolphinscheduler.common.enums.Flag;
+import org.apache.dolphinscheduler.common.enums.TaskDependType;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.dao.entity.Command;
+import org.apache.dolphinscheduler.dao.entity.ErrorCommand;
+import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstanceMap;
+import org.apache.dolphinscheduler.dao.entity.Schedule;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.dao.mapper.CommandMapper;
+import org.apache.dolphinscheduler.dao.mapper.ErrorCommandMapper;
+import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
+import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
+import org.apache.dolphinscheduler.plugin.task.api.enums.Direct;
+import org.apache.dolphinscheduler.plugin.task.api.model.Property;
+import org.apache.dolphinscheduler.service.utils.ParamUtils;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.Date;
+import java.util.EnumMap;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.jetbrains.annotations.NotNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.collect.Lists;
+import io.micrometer.core.annotation.Counted;
+
+/**
+ * Command Service implementation
+ */
+@Component
+public class CommandServiceImpl implements CommandService {
+
+    private final Logger logger = LoggerFactory.getLogger(CommandServiceImpl.class);
+
+    @Autowired
+    private ErrorCommandMapper errorCommandMapper;
+
+    @Autowired
+    private CommandMapper commandMapper;
+
+    @Autowired
+    private ScheduleMapper scheduleMapper;
+
+    @Autowired
+    private ProcessDefinitionMapper processDefineMapper;
+
+    /**
+     * Archives a command as an error command: inserts an {@link ErrorCommand} built from the
+     * given command and message, then deletes the original command by its id.
+     *
+     * @param command command to archive and remove
+     * @param message message passed to the error command
+     */
+    @Override
+    public void moveToErrorCommand(Command command, String message) {
+        // insert the error record first, then drop the original command row
+        errorCommandMapper.insert(new ErrorCommand(command, message));
+        commandMapper.deleteById(command.getId());
+    }
+
+    /**
+     * Inserts one command. When a schedule exists for the command's process definition, the
+     * schedule's timezone id is added to the command parameters before the insert.
+     *
+     * @param command command to insert; {@code null} is ignored
+     * @return result of the mapper insert, or 0 when {@code command} is {@code null}
+     */
+    @Override
+    @Counted("ds.workflow.create.command.count")
+    public int createCommand(Command command) {
+        if (command == null) {
+            return 0;
+        }
+        // add command timezone
+        Schedule schedule = scheduleMapper.queryByProcessDefinitionCode(command.getProcessDefinitionCode());
+        if (schedule != null) {
+            Map<String, String> params;
+            if (StringUtils.isNotBlank(command.getCommandParam())) {
+                params = JSONUtils.toMap(command.getCommandParam());
+            } else {
+                params = new HashMap<>();
+            }
+            params.put(Constants.SCHEDULE_TIMEZONE, schedule.getTimezoneId());
+            command.setCommandParam(JSONUtils.toJsonString(params));
+        }
+        // clear any id carried by the caller's object before inserting
+        command.setId(null);
+        return commandMapper.insert(command);
+    }
+
+    /**
+     * Queries one page of commands for the given master slot.
+     *
+     * @param pageSize       page size
+     * @param pageNumber     page number; the offset passed to the mapper is {@code pageNumber * pageSize}
+     * @param masterCount    master count
+     * @param thisMasterSlot master slot
+     * @return command page, or an empty list when {@code masterCount} is not positive
+     */
+    @Override
+    public List<Command> findCommandPageBySlot(int pageSize, int pageNumber, int masterCount, int thisMasterSlot) {
+        if (masterCount > 0) {
+            int offset = pageNumber * pageSize;
+            return commandMapper.queryCommandPageBySlot(pageSize, offset, masterCount, thisMasterSlot);
+        }
+        return Lists.newArrayList();
+    }
+
+    /**
+     * Checks whether the given command still needs to be created.
+     * Only REPEAT_RUNNING, RECOVER_SUSPENDED_PROCESS and START_FAILURE_TASK_PROCESS commands are
+     * de-duplicated: for these, creation is skipped when a queued command of one of these types
+     * carries the same recover process instance id in its command parameters.
+     *
+     * @param command command to check
+     * @return true if the command needs to be created, false if an equivalent command is already queued
+     */
+    @Override
+    public boolean verifyIsNeedCreateCommand(Command command) {
+        EnumMap<CommandType, Integer> dedupTypes = new EnumMap<>(CommandType.class);
+        dedupTypes.put(CommandType.REPEAT_RUNNING, 1);
+        dedupTypes.put(CommandType.RECOVER_SUSPENDED_PROCESS, 1);
+        dedupTypes.put(CommandType.START_FAILURE_TASK_PROCESS, 1);
+
+        // every other command type is always created
+        if (!dedupTypes.containsKey(command.getCommandType())) {
+            return true;
+        }
+
+        // NOTE(review): unlike the queued commands below, the parsed parameters of the incoming
+        // command are not null-checked here - confirm parseObject cannot return null for them.
+        int processInstanceId = JSONUtils.parseObject(command.getCommandParam())
+                .path(CMD_PARAM_RECOVER_PROCESS_ID_STRING)
+                .asInt();
+
+        // scan all queued commands; stop at the first one that targets the same process instance
+        return commandMapper.selectList(null).stream()
+                .filter(queued -> dedupTypes.containsKey(queued.getCommandType()))
+                .map(queued -> JSONUtils.parseObject(queued.getCommandParam()))
+                .noneMatch(queuedParam -> queuedParam != null
+                        && processInstanceId == queuedParam.path(CMD_PARAM_RECOVER_PROCESS_ID_STRING).asInt());
+    }
+
+    /**
+     * Creates (or refreshes) the RECOVER_WAITING_THREAD command for a process instance.
+     * <ul>
+     * <li>sub process instance: no waiting command is created; the origin command, if any, is deleted</li>
+     * <li>no origin command: a new waiting thread command is built from the process instance</li>
+     * <li>origin command is already a waiting thread command: only its update time is refreshed</li>
+     * <li>otherwise: the origin command is deleted and re-inserted as a waiting thread command</li>
+     * </ul>
+     *
+     * @param originCommand   command that triggered the process instance, may be {@code null}
+     * @param processInstance process instance to recover later
+     */
+    @Override
+    public void createRecoveryWaitingThreadCommand(Command originCommand, ProcessInstance processInstance) {
+        // sub process doesn't need to create wait command
+        if (processInstance.getIsSubProcess() == Flag.YES) {
+            if (originCommand != null) {
+                commandMapper.deleteById(originCommand.getId());
+            }
+            return;
+        }
+        Map<String, String> cmdParam = new HashMap<>();
+        cmdParam.put(Constants.CMD_PARAM_RECOVERY_WAITING_THREAD, String.valueOf(processInstance.getId()));
+        // process instance quit by "waiting thread" state
+        if (originCommand == null) {
+            Command command = new Command(
+                    CommandType.RECOVER_WAITING_THREAD,
+                    processInstance.getTaskDependType(),
+                    processInstance.getFailureStrategy(),
+                    processInstance.getExecutorId(),
+                    processInstance.getProcessDefinition().getCode(),
+                    JSONUtils.toJsonString(cmdParam),
+                    processInstance.getWarningType(),
+                    processInstance.getWarningGroupId(),
+                    processInstance.getScheduleTime(),
+                    processInstance.getWorkerGroup(),
+                    processInstance.getEnvironmentCode(),
+                    processInstance.getProcessInstancePriority(),
+                    processInstance.getDryRun(),
+                    processInstance.getId(),
+                    processInstance.getProcessDefinitionVersion(),
+                    processInstance.getTestFlag());
+            upsertCommand(command);
+            return;
+        }
+
+        // update the command time if current command is recover from waiting
+        if (originCommand.getCommandType() == CommandType.RECOVER_WAITING_THREAD) {
+            originCommand.setUpdateTime(new Date());
+            upsertCommand(originCommand);
+        } else {
+            // delete old command and create new waiting thread command
+            commandMapper.deleteById(originCommand.getId());
+            // the id must be null (not 0): upsertCommand only inserts when the id is null, and an
+            // update against the row deleted above would silently create nothing
+            originCommand.setId(null);
+            originCommand.setCommandType(CommandType.RECOVER_WAITING_THREAD);
+            originCommand.setUpdateTime(new Date());
+            originCommand.setCommandParam(JSONUtils.toJsonString(cmdParam));
+            originCommand.setProcessInstancePriority(processInstance.getProcessInstancePriority());
+            upsertCommand(originCommand);
+        }
+    }
+
+    /**
+     * Inserts the command when it has no id yet, otherwise updates it by id.
+     *
+     * @param command command to save
+     * @return result of the mapper insert or update
+     */
+    private int upsertCommand(@NotNull Command command) {
+        // a null id marks a command that has not been persisted yet
+        return command.getId() == null
+                ? commandMapper.insert(command)
+                : commandMapper.updateById(command);
+    }
+
+    /**
+     * Builds (without persisting) the command that starts or recovers a sub process instance.
+     * The sub process definition code and the local params are read from the task params; every
+     * local param that is not an OUT param is passed to the child with the value found in the
+     * task's var pool.
+     *
+     * @param parentProcessInstance parent process instance
+     * @param childInstance         child process instance, {@code null} if it does not exist yet
+     * @param instanceMap           process instance map
+     * @param task                  sub process task instance
+     * @return the sub process command, or {@code null} when the sub process definition code is not
+     *         a number or no process definition is found for it
+     */
+    @Override
+    public Command createSubProcessCommand(ProcessInstance parentProcessInstance, ProcessInstance childInstance,
+                                           ProcessInstanceMap instanceMap, TaskInstance task) {
+        CommandType commandType = getSubCommandType(parentProcessInstance, childInstance);
+        Map<String, Object> subProcessParam = JSONUtils.toMap(task.getTaskParams(), String.class, Object.class);
+        long childDefineCode = 0L;
+        if (subProcessParam.containsKey(Constants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE)) {
+            try {
+                childDefineCode =
+                        Long.parseLong(
+                                String.valueOf(subProcessParam.get(Constants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE)));
+            } catch (NumberFormatException nfe) {
+                logger.error("processDefinitionCode is not a number", nfe);
+                return null;
+            }
+        }
+        ProcessDefinition subProcessDefinition = processDefineMapper.queryByCode(childDefineCode);
+        // the code may be missing from the task params (0L) or point to a definition that no longer
+        // exists; report it the same way as an unparsable code instead of failing with an NPE below
+        if (subProcessDefinition == null) {
+            logger.error("sub process definition not found, processDefinitionCode: {}", childDefineCode);
+            return null;
+        }
+
+        Object localParams = subProcessParam.get(Constants.LOCAL_PARAMS);
+        List<Property> allParam = JSONUtils.toList(JSONUtils.toJsonString(localParams), Property.class);
+        Map<String, String> globalMap = ParamUtils.getGlobalParamMap(task.getVarPool());
+        Map<String, String> fatherParams = new HashMap<>();
+        if (CollectionUtils.isNotEmpty(allParam)) {
+            for (Property info : allParam) {
+                // OUT params are produced by the child, so they are not passed down
+                if (Direct.OUT == info.getDirect()) {
+                    continue;
+                }
+                fatherParams.put(info.getProp(), globalMap.get(info.getProp()));
+            }
+        }
+        String processParam = ParamUtils.getSubWorkFlowParam(instanceMap, parentProcessInstance, fatherParams);
+        // 0 is used when there is no (persisted) child instance yet
+        int subProcessInstanceId =
+                childInstance == null ? 0 : (childInstance.getId() == null ? 0 : childInstance.getId());
+        return new Command(
+                commandType,
+                TaskDependType.TASK_POST,
+                parentProcessInstance.getFailureStrategy(),
+                parentProcessInstance.getExecutorId(),
+                subProcessDefinition.getCode(),
+                processParam,
+                parentProcessInstance.getWarningType(),
+                parentProcessInstance.getWarningGroupId(),
+                parentProcessInstance.getScheduleTime(),
+                task.getWorkerGroup(),
+                task.getEnvironmentCode(),
+                parentProcessInstance.getProcessInstancePriority(),
+                parentProcessInstance.getDryRun(),
+                subProcessInstanceId,
+                subProcessDefinition.getVersion(),
+                parentProcessInstance.getTestFlag());
+    }
+
+    /**
+     * Resolves the command type for a sub work flow.
+     * child instance exists: the parent's current command type is used.
+     * child instance does not exist: the first entry of the parent's comma-separated history
+     * commands is used.
+     *
+     * @param parentProcessInstance parent process instance
+     * @param childInstance         child process instance, {@code null} if it does not exist yet
+     * @return command type for the sub process command
+     */
+    private CommandType getSubCommandType(ProcessInstance parentProcessInstance, ProcessInstance childInstance) {
+        if (childInstance != null) {
+            return parentProcessInstance.getCommandType();
+        }
+        String firstHistoryCommand = parentProcessInstance.getHistoryCmd().split(Constants.COMMA)[0];
+        return CommandType.valueOf(firstHistoryCommand);
+    }
+}
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index a18d751205..6f85ed3e0b 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -68,16 +68,6 @@ public interface ProcessService {
     ProcessInstance handleCommand(String host,
                                   Command command) throws CronParseException, CodeGenerateUtils.CodeGenerateException;
 
-    void moveToErrorCommand(Command command, String message);
-
-    int createCommand(Command command);
-
-    List<Command> findCommandPage(int pageSize, int pageNumber);
-
-    List<Command> findCommandPageBySlot(int pageSize, int pageNumber, int masterCount, int thisMasterSlot);
-
-    boolean verifyIsNeedCreateCommand(Command command);
-
     Optional<ProcessInstance> findProcessInstanceDetailById(int processId);
 
     List<TaskDefinition> getTaskNodeListByDefinition(long defineCode);
@@ -100,8 +90,6 @@ public interface ProcessService {
 
     void recurseFindSubProcess(long parentCode, List<Long> ids);
 
-    void createRecoveryWaitingThreadCommand(Command originCommand, ProcessInstance processInstance);
-
     Tenant getTenantForProcess(int tenantId, int userId);
 
     Environment findEnvironmentByCode(Long environmentCode);
@@ -116,19 +104,10 @@ public interface ProcessService {
 
     void createSubWorkProcess(ProcessInstance parentProcessInstance, TaskInstance task);
 
-    Map<String, String> getGlobalParamMap(String globalParams);
-
-    Command createSubProcessCommand(ProcessInstance parentProcessInstance,
-                                    ProcessInstance childInstance,
-                                    ProcessInstanceMap instanceMap,
-                                    TaskInstance task);
-
     TaskInstance submitTaskInstanceToDB(TaskInstance taskInstance, ProcessInstance processInstance);
 
     TaskExecutionStatus getSubmitTaskState(TaskInstance taskInstance, ProcessInstance processInstance);
 
-    int saveCommand(Command command);
-
     boolean saveTaskInstance(TaskInstance taskInstance);
 
     boolean createTaskInstance(TaskInstance taskInstance);
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
index 8277dc6d71..2ef495e942 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
@@ -22,8 +22,6 @@ import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_D
 import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST;
 import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
 import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_EMPTY_SUB_PROCESS;
-import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_FATHER_PARAMS;
-import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING;
 import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS;
 import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE;
 import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_PARENT_INSTANCE_ID;
@@ -61,7 +59,6 @@ import org.apache.dolphinscheduler.dao.entity.DqRuleExecuteSql;
 import org.apache.dolphinscheduler.dao.entity.DqRuleInputEntry;
 import org.apache.dolphinscheduler.dao.entity.DqTaskStatisticsValue;
 import org.apache.dolphinscheduler.dao.entity.Environment;
-import org.apache.dolphinscheduler.dao.entity.ErrorCommand;
 import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
 import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
@@ -126,6 +123,7 @@ import org.apache.dolphinscheduler.remote.command.TaskEventChangeCommand;
 import org.apache.dolphinscheduler.remote.command.WorkflowStateEventChangeCommand;
 import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService;
 import org.apache.dolphinscheduler.remote.utils.Host;
+import org.apache.dolphinscheduler.service.command.CommandService;
 import org.apache.dolphinscheduler.service.cron.CronUtils;
 import org.apache.dolphinscheduler.service.exceptions.CronParseException;
 import org.apache.dolphinscheduler.service.exceptions.ServiceException;
@@ -138,12 +136,10 @@ import org.apache.dolphinscheduler.spi.enums.ResourceType;
 
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.collections.MapUtils;
-import org.apache.commons.lang3.StringUtils;
 
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Date;
-import java.util.EnumMap;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -169,7 +165,6 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.common.base.Joiner;
 import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
-import io.micrometer.core.annotation.Counted;
 
 /**
  * process relative dao that some mappers in this.
@@ -285,6 +280,9 @@ public class ProcessServiceImpl implements ProcessService {
     @Autowired
     private LogClient logClient;
 
+    @Autowired
+    private CommandService commandService;
+
     /**
      * handle Command (construct ProcessInstance from Command) , wrapped in transaction
      *
@@ -300,7 +298,7 @@ public class ProcessServiceImpl implements ProcessService {
         // cannot construct process instance, return null
         if (processInstance == null) {
             logger.error("scan command, command parameter is error: {}", command);
-            moveToErrorCommand(command, "process instance is null");
+            commandService.moveToErrorCommand(command, "process instance is null");
             return null;
         }
         processInstance.setCommandType(command.getCommandType());
@@ -387,101 +385,6 @@ public class ProcessServiceImpl implements ProcessService {
         }
     }
 
-    /**
-     * Save error command, and delete original command. If the given command has already been moved into error command,
-     * will throw {@link java.sql.SQLIntegrityConstraintViolationException ).
-     *
-     * @param command command
-     * @param message message
-     */
-    @Override
-    public void moveToErrorCommand(Command command, String message) {
-        ErrorCommand errorCommand = new ErrorCommand(command, message);
-        this.errorCommandMapper.insert(errorCommand);
-        this.commandMapper.deleteById(command.getId());
-    }
-
-    /**
-     * insert one command
-     *
-     * @param command command
-     * @return create result
-     */
-    @Override
-    @Counted("ds.workflow.create.command.count")
-    public int createCommand(Command command) {
-        int result = 0;
-        if (command == null) {
-            return result;
-        }
-        // add command timezone
-        Schedule schedule = scheduleMapper.queryByProcessDefinitionCode(command.getProcessDefinitionCode());
-        if (schedule != null) {
-            Map<String, String> commandParams =
-                    StringUtils.isNotBlank(command.getCommandParam()) ? JSONUtils.toMap(command.getCommandParam())
-                            : new HashMap<>();
-            commandParams.put(Constants.SCHEDULE_TIMEZONE, schedule.getTimezoneId());
-            command.setCommandParam(JSONUtils.toJsonString(commandParams));
-        }
-        command.setId(null);
-        result = commandMapper.insert(command);
-        return result;
-    }
-
-    /**
-     * get command page
-     */
-    @Override
-    public List<Command> findCommandPage(int pageSize, int pageNumber) {
-        return commandMapper.queryCommandPage(pageSize, pageNumber * pageSize);
-    }
-
-    /**
-     * get command page
-     */
-    @Override
-    public List<Command> findCommandPageBySlot(int pageSize, int pageNumber, int masterCount, int thisMasterSlot) {
-        if (masterCount <= 0) {
-            return Lists.newArrayList();
-        }
-        return commandMapper.queryCommandPageBySlot(pageSize, pageNumber * pageSize, masterCount, thisMasterSlot);
-    }
-
-    /**
-     * check the input command exists in queue list
-     *
-     * @param command command
-     * @return create command result
-     */
-    @Override
-    public boolean verifyIsNeedCreateCommand(Command command) {
-        boolean isNeedCreate = true;
-        EnumMap<CommandType, Integer> cmdTypeMap = new EnumMap<>(CommandType.class);
-        cmdTypeMap.put(CommandType.REPEAT_RUNNING, 1);
-        cmdTypeMap.put(CommandType.RECOVER_SUSPENDED_PROCESS, 1);
-        cmdTypeMap.put(CommandType.START_FAILURE_TASK_PROCESS, 1);
-        CommandType commandType = command.getCommandType();
-
-        if (cmdTypeMap.containsKey(commandType)) {
-            ObjectNode cmdParamObj = JSONUtils.parseObject(command.getCommandParam());
-            int processInstanceId = cmdParamObj.path(CMD_PARAM_RECOVER_PROCESS_ID_STRING).asInt();
-
-            List<Command> commands = commandMapper.selectList(null);
-            // for all commands
-            for (Command tmpCommand : commands) {
-                if (cmdTypeMap.containsKey(tmpCommand.getCommandType())) {
-                    ObjectNode tempObj = JSONUtils.parseObject(tmpCommand.getCommandParam());
-                    if (tempObj != null
-                            && processInstanceId == tempObj.path(CMD_PARAM_RECOVER_PROCESS_ID_STRING).asInt()) {
-                        isNeedCreate = false;
-                        break;
-                    }
-                }
-            }
-        }
-        return isNeedCreate;
-    }
-
     /**
      * find process instance detail by id
      *
@@ -670,66 +573,6 @@ public class ProcessServiceImpl implements ProcessService {
         }
     }
 
-    /**
-     * create recovery waiting thread command when thread pool is not enough for the process instance.
-     * sub work process instance need not to create recovery command.
-     * create recovery waiting thread  command and delete origin command at the same time.
-     * if the recovery command is exists, only update the field update_time
-     *
-     * @param originCommand   originCommand
-     * @param processInstance processInstance
-     */
-    @Override
-    public void createRecoveryWaitingThreadCommand(Command originCommand, ProcessInstance processInstance) {
-
-        // sub process doesnot need to create wait command
-        if (processInstance.getIsSubProcess() == Flag.YES) {
-            if (originCommand != null) {
-                commandMapper.deleteById(originCommand.getId());
-            }
-            return;
-        }
-        Map<String, String> cmdParam = new HashMap<>();
-        cmdParam.put(Constants.CMD_PARAM_RECOVERY_WAITING_THREAD, String.valueOf(processInstance.getId()));
-        // process instance quit by "waiting thread" state
-        if (originCommand == null) {
-            Command command = new Command(
-                    CommandType.RECOVER_WAITING_THREAD,
-                    processInstance.getTaskDependType(),
-                    processInstance.getFailureStrategy(),
-                    processInstance.getExecutorId(),
-                    processInstance.getProcessDefinition().getCode(),
-                    JSONUtils.toJsonString(cmdParam),
-                    processInstance.getWarningType(),
-                    processInstance.getWarningGroupId(),
-                    processInstance.getScheduleTime(),
-                    processInstance.getWorkerGroup(),
-                    processInstance.getEnvironmentCode(),
-                    processInstance.getProcessInstancePriority(),
-                    processInstance.getDryRun(),
-                    processInstance.getId(),
-                    processInstance.getProcessDefinitionVersion(),
-                    processInstance.getTestFlag());
-            saveCommand(command);
-            return;
-        }
-
-        // update the command time if current command if recover from waiting
-        if (originCommand.getCommandType() == CommandType.RECOVER_WAITING_THREAD) {
-            originCommand.setUpdateTime(new Date());
-            saveCommand(originCommand);
-        } else {
-            // delete old command and create new waiting thread command
-            commandMapper.deleteById(originCommand.getId());
-            originCommand.setId(0);
-            originCommand.setCommandType(CommandType.RECOVER_WAITING_THREAD);
-            originCommand.setUpdateTime(new Date());
-            originCommand.setCommandParam(JSONUtils.toJsonString(cmdParam));
-            originCommand.setProcessInstancePriority(processInstance.getProcessInstancePriority());
-            saveCommand(originCommand);
-        }
-    }
-
     /**
      * get schedule time from command
      *
@@ -1445,105 +1288,18 @@ public class ProcessServiceImpl implements ProcessService {
             logger.info("sub process instance {} status is success, so skip creating command", childInstance.getId());
             return;
         }
-        Command subProcessCommand = createSubProcessCommand(parentProcessInstance, childInstance, instanceMap, task);
+        Command subProcessCommand =
+                commandService.createSubProcessCommand(parentProcessInstance, childInstance, instanceMap, task);
+        if (subProcessCommand == null) {
+            logger.error("create sub process command failed, so skip creating command");
+            return;
+        }
         updateSubProcessDefinitionByParent(parentProcessInstance, subProcessCommand.getProcessDefinitionCode());
         initSubInstanceState(childInstance);
-        createCommand(subProcessCommand);
+        commandService.createCommand(subProcessCommand);
         logger.info("sub process command created: {} ", subProcessCommand);
     }
 
-    /**
-     * complement data needs transform parent parameter to child.
-     */
-    protected String getSubWorkFlowParam(ProcessInstanceMap instanceMap, ProcessInstance parentProcessInstance,
-                                         Map<String, String> fatherParams) {
-        // set sub work process command
-        String processMapStr = JSONUtils.toJsonString(instanceMap);
-        Map<String, String> cmdParam = JSONUtils.toMap(processMapStr);
-        if (parentProcessInstance.isComplementData()) {
-            Map<String, String> parentParam = JSONUtils.toMap(parentProcessInstance.getCommandParam());
-            String endTime = parentParam.get(CMDPARAM_COMPLEMENT_DATA_END_DATE);
-            String startTime = parentParam.get(CMDPARAM_COMPLEMENT_DATA_START_DATE);
-            String scheduleTime = parentParam.get(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST);
-            if (StringUtils.isNotEmpty(startTime) && StringUtils.isNotEmpty(endTime)) {
-                cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, endTime);
-                cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, startTime);
-            }
-            if (StringUtils.isNotEmpty(scheduleTime)) {
-                cmdParam.put(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST, scheduleTime);
-            }
-            processMapStr = JSONUtils.toJsonString(cmdParam);
-        }
-        if (MapUtils.isNotEmpty(fatherParams)) {
-            cmdParam.put(CMD_PARAM_FATHER_PARAMS, JSONUtils.toJsonString(fatherParams));
-            processMapStr = JSONUtils.toJsonString(cmdParam);
-        }
-        return processMapStr;
-    }
-
-    @Override
-    public Map<String, String> getGlobalParamMap(String globalParams) {
-        List<Property> propList;
-        Map<String, String> globalParamMap = new HashMap<>();
-        if (!Strings.isNullOrEmpty(globalParams)) {
-            propList = JSONUtils.toList(globalParams, Property.class);
-            globalParamMap = propList.stream().collect(Collectors.toMap(Property::getProp, Property::getValue));
-        }
-
-        return globalParamMap;
-    }
-
-    /**
-     * create sub work process command
-     */
-    @Override
-    public Command createSubProcessCommand(ProcessInstance parentProcessInstance,
-                                           ProcessInstance childInstance,
-                                           ProcessInstanceMap instanceMap,
-                                           TaskInstance task) {
-        CommandType commandType = getSubCommandType(parentProcessInstance, childInstance);
-        Map<String, Object> subProcessParam = JSONUtils.toMap(task.getTaskParams(), String.class, Object.class);
-        long childDefineCode = 0L;
-        if (subProcessParam.containsKey(Constants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE)) {
-            childDefineCode =
-                    Long.parseLong(String.valueOf(subProcessParam.get(Constants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE)));
-        }
-        ProcessDefinition subProcessDefinition = processDefineMapper.queryByCode(childDefineCode);
-
-        Object localParams = subProcessParam.get(Constants.LOCAL_PARAMS);
-        List<Property> allParam = JSONUtils.toList(JSONUtils.toJsonString(localParams), Property.class);
-        Map<String, String> globalMap = this.getGlobalParamMap(task.getVarPool());
-        Map<String, String> fatherParams = new HashMap<>();
-        if (CollectionUtils.isNotEmpty(allParam)) {
-            for (Property info : allParam) {
-                if (Direct.OUT == info.getDirect()) {
-                    continue;
-                }
-                fatherParams.put(info.getProp(), globalMap.get(info.getProp()));
-            }
-        }
-        String processParam = getSubWorkFlowParam(instanceMap, parentProcessInstance, fatherParams);
-        int subProcessInstanceId =
-                childInstance == null ? 0 : (childInstance.getId() == null ? 0 : childInstance.getId());
-        return new Command(
-                commandType,
-                TaskDependType.TASK_POST,
-                parentProcessInstance.getFailureStrategy(),
-                parentProcessInstance.getExecutorId(),
-                subProcessDefinition.getCode(),
-                processParam,
-                parentProcessInstance.getWarningType(),
-                parentProcessInstance.getWarningGroupId(),
-                parentProcessInstance.getScheduleTime(),
-                task.getWorkerGroup(),
-                task.getEnvironmentCode(),
-                parentProcessInstance.getProcessInstancePriority(),
-                parentProcessInstance.getDryRun(),
-                subProcessInstanceId,
-                subProcessDefinition.getVersion(),
-                parentProcessInstance.getTestFlag());
-    }
-
     /**
      * initialize sub work flow state
      * child instance state would be initialized when 'recovery from pause/stop/failure'
@@ -1681,21 +1437,6 @@ public class ProcessServiceImpl implements ProcessService {
         return true;
     }
 
-    /**
-     * insert or update command
-     *
-     * @param command command
-     * @return save command result
-     */
-    @Override
-    public int saveCommand(Command command) {
-        if (command.getId() != null) {
-            return commandMapper.updateById(command);
-        } else {
-            return commandMapper.insert(command);
-        }
-    }
-
     /**
      * insert or update task instance
      *
@@ -2112,7 +1853,7 @@ public class ProcessServiceImpl implements ProcessService {
         cmd.setCommandType(CommandType.RECOVER_TOLERANCE_FAULT_PROCESS);
         cmd.setProcessInstancePriority(processInstance.getProcessInstancePriority());
         cmd.setTestFlag(processInstance.getTestFlag());
-        createCommand(cmd);
+        commandService.createCommand(cmd);
     }
 
     /**
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/ParamUtils.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/ParamUtils.java
new file mode 100644
index 0000000000..b2bea95d8b
--- /dev/null
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/ParamUtils.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.service.utils;
+
+import static org.apache.dolphinscheduler.common.Constants.*;
+
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstanceMap;
+import org.apache.dolphinscheduler.plugin.task.api.model.Property;
+
+import org.apache.commons.collections.MapUtils;
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import com.google.common.base.Strings;
+
+/**
+ * Param Utility class
+ */
+public class ParamUtils {
+
+    /**
+     * Convert a JSON array of global parameters into a mutable prop-to-value map.
+     * Null values are skipped and a repeated prop keeps its last value, so neither case throws.
+     *
+     * @param globalParams JSON array string of {@link Property} items, may be null or empty
+     * @return parameter map, empty when globalParams is null or empty
+     */
+    public static Map<String, String> getGlobalParamMap(String globalParams) {
+        if (Strings.isNullOrEmpty(globalParams)) {
+            return new HashMap<>();
+        }
+        List<Property> propList = JSONUtils.toList(globalParams, Property.class);
+        return propList.stream().filter(prop -> prop.getValue() != null)
+                .collect(Collectors.toMap(Property::getProp, Property::getValue, (prev, next) -> next, HashMap::new));
+    }
+
+    /**
+     * Build the sub workflow command parameter JSON from instanceMap, parent complement dates and fatherParams.
+     * @param instanceMap process instance map
+     * @param parentProcessInstance parent process instance
+     * @param fatherParams parameters passed down from the parent, may be null or empty
+     * @return sub workflow command parameters as a JSON string
+     */
+    public static String getSubWorkFlowParam(ProcessInstanceMap instanceMap, ProcessInstance parentProcessInstance,
+                                             Map<String, String> fatherParams) {
+        String processMapStr = JSONUtils.toJsonString(instanceMap);
+        Map<String, String> cmdParam = JSONUtils.toMap(processMapStr);
+        boolean complementData = parentProcessInstance.isComplementData();
+        if (complementData) {
+            Map<String, String> parentParam = JSONUtils.toMap(parentProcessInstance.getCommandParam());
+            String endTime = parentParam.get(CMDPARAM_COMPLEMENT_DATA_END_DATE);
+            String startTime = parentParam.get(CMDPARAM_COMPLEMENT_DATA_START_DATE);
+            String scheduleTime = parentParam.get(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST);
+            if (StringUtils.isNotEmpty(startTime) && StringUtils.isNotEmpty(endTime)) {
+                cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, endTime);
+                cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, startTime);
+            }
+            if (StringUtils.isNotEmpty(scheduleTime)) {
+                cmdParam.put(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST, scheduleTime);
+            }
+        }
+        boolean hasFatherParams = MapUtils.isNotEmpty(fatherParams);
+        if (hasFatherParams) {
+            cmdParam.put(CMD_PARAM_FATHER_PARAMS, JSONUtils.toJsonString(fatherParams));
+        }
+        // cmdParam is serialized only if a branch above ran; otherwise the instanceMap JSON is returned unchanged
+        return (complementData || hasFatherParams) ? JSONUtils.toJsonString(cmdParam) : processMapStr;
+    }
+}
diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/command/CommandServiceImplTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/command/CommandServiceImplTest.java
new file mode 100644
index 0000000000..739f8a099f
--- /dev/null
+++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/command/CommandServiceImplTest.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.service.command;
+
+import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING;
+import static org.mockito.ArgumentMatchers.anyString;
+
+import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.enums.CommandType;
+import org.apache.dolphinscheduler.common.enums.Flag;
+import org.apache.dolphinscheduler.common.enums.WarningType;
+import org.apache.dolphinscheduler.common.utils.DateUtils;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.dao.entity.Command;
+import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstanceMap;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.dao.mapper.CommandMapper;
+import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
+import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.mockito.junit.jupiter.MockitoSettings;
+import org.mockito.quality.Strictness;
+
+import com.fasterxml.jackson.databind.JsonNode;
+
+/** Unit tests for {@link CommandServiceImpl}; the mappers are Mockito mocks. */
+@ExtendWith(MockitoExtension.class)
+@MockitoSettings(strictness = Strictness.LENIENT)
+class CommandServiceImplTest {
+
+    @InjectMocks
+    private CommandServiceImpl commandService;
+
+    @Mock
+    private CommandMapper commandMapper;
+
+    @Mock
+    private ProcessDefinitionMapper processDefineMapper;
+
+    @Mock
+    private ScheduleMapper scheduleMapper;
+
+    @Test
+    public void testCreateSubCommand() {
+        // expected: the sub command type is the parent's first history command unless a child instance exists
+        ProcessInstance parentInstance = new ProcessInstance();
+        parentInstance.setWarningType(WarningType.SUCCESS);
+        parentInstance.setWarningGroupId(0);
+
+        TaskInstance task = new TaskInstance();
+        task.setTaskParams("{\"processDefinitionCode\":10}");
+        task.setId(10);
+        task.setTaskCode(1L);
+        task.setTaskDefinitionVersion(1);
+
+        ProcessInstance childInstance = null;
+        ProcessInstanceMap instanceMap = new ProcessInstanceMap();
+        instanceMap.setParentProcessInstanceId(1);
+        instanceMap.setParentTaskInstanceId(10);
+
+        // father history: start; child null == command type: start
+        parentInstance.setHistoryCmd("START_PROCESS");
+        parentInstance.setCommandType(CommandType.START_PROCESS);
+        ProcessDefinition processDefinition = new ProcessDefinition();
+        processDefinition.setCode(10L);
+        Mockito.when(processDefineMapper.queryByDefineId(100)).thenReturn(processDefinition);
+        Mockito.when(processDefineMapper.queryByCode(10L)).thenReturn(processDefinition);
+        Command command = commandService.createSubProcessCommand(parentInstance, childInstance, instanceMap, task);
+        Assertions.assertEquals(CommandType.START_PROCESS, command.getCommandType());
+
+        // father history: start,start failure; child null == command type: start
+        parentInstance.setCommandType(CommandType.START_FAILURE_TASK_PROCESS);
+        parentInstance.setHistoryCmd("START_PROCESS,START_FAILURE_TASK_PROCESS");
+        command = commandService.createSubProcessCommand(parentInstance, childInstance, instanceMap, task);
+        Assertions.assertEquals(CommandType.START_PROCESS, command.getCommandType());
+
+        // father history: scheduler,start failure; child null == command type: scheduler
+        parentInstance.setCommandType(CommandType.START_FAILURE_TASK_PROCESS);
+        parentInstance.setHistoryCmd("SCHEDULER,START_FAILURE_TASK_PROCESS");
+        command = commandService.createSubProcessCommand(parentInstance, childInstance, instanceMap, task);
+        Assertions.assertEquals(CommandType.SCHEDULER, command.getCommandType());
+
+        // father history: complement,start failure; child null == command type: complement
+        String startString = "2020-01-01 00:00:00";
+        String endString = "2020-01-10 00:00:00";
+        parentInstance.setCommandType(CommandType.START_FAILURE_TASK_PROCESS);
+        parentInstance.setHistoryCmd("COMPLEMENT_DATA,START_FAILURE_TASK_PROCESS");
+        Map<String, String> complementMap = new HashMap<>();
+        complementMap.put(Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE, startString);
+        complementMap.put(Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE, endString);
+        parentInstance.setCommandParam(JSONUtils.toJsonString(complementMap));
+        command = commandService.createSubProcessCommand(parentInstance, childInstance, instanceMap, task);
+        Assertions.assertEquals(CommandType.COMPLEMENT_DATA, command.getCommandType());
+
+        JsonNode complementDate = JSONUtils.parseObject(command.getCommandParam());
+        Date start = DateUtils.stringToDate(complementDate.get(Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE).asText());
+        Date end = DateUtils.stringToDate(complementDate.get(Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE).asText());
+        Assertions.assertEquals(startString, DateUtils.dateToString(start));
+        Assertions.assertEquals(endString, DateUtils.dateToString(end));
+
+        // father history: start,failure,start failure; child not null == command type: start failure
+        childInstance = new ProcessInstance();
+        parentInstance.setCommandType(CommandType.START_FAILURE_TASK_PROCESS);
+        parentInstance.setHistoryCmd("START_PROCESS,START_FAILURE_TASK_PROCESS");
+        command = commandService.createSubProcessCommand(parentInstance, childInstance, instanceMap, task);
+        Assertions.assertEquals(CommandType.START_FAILURE_TASK_PROCESS, command.getCommandType());
+    }
+
+    @Test
+    public void testVerifyIsNeedCreateCommand() {
+        // the mocked mapper already holds a REPEAT_RUNNING command for id 111, so creating it again is not needed
+        List<Command> commands = new ArrayList<>();
+
+        Command command = new Command();
+        command.setCommandType(CommandType.REPEAT_RUNNING);
+        command.setCommandParam("{\"" + CMD_PARAM_RECOVER_PROCESS_ID_STRING + "\":\"111\"}");
+        commands.add(command);
+        Mockito.when(commandMapper.selectList(null)).thenReturn(commands);
+        Assertions.assertFalse(commandService.verifyIsNeedCreateCommand(command));
+
+        Command command1 = new Command();
+        command1.setCommandType(CommandType.REPEAT_RUNNING);
+        command1.setCommandParam("{\"" + CMD_PARAM_RECOVER_PROCESS_ID_STRING + "\":\"222\"}");
+        Assertions.assertTrue(commandService.verifyIsNeedCreateCommand(command1));
+
+        Command command2 = new Command();
+        command2.setCommandType(CommandType.PAUSE);
+        Assertions.assertTrue(commandService.verifyIsNeedCreateCommand(command2));
+    }
+
+    @Test
+    public void testCreateRecoveryWaitingThreadCommand() {
+        // smoke test without assertions: every origin command / process instance combination must not throw
+        int id = 123;
+        Mockito.when(commandMapper.deleteById(id)).thenReturn(1);
+        ProcessInstance subProcessInstance = new ProcessInstance();
+        subProcessInstance.setIsSubProcess(Flag.YES);
+        Command originCommand = new Command();
+        originCommand.setId(id);
+        commandService.createRecoveryWaitingThreadCommand(originCommand, subProcessInstance);
+
+        commandService.createRecoveryWaitingThreadCommand(null, subProcessInstance);
+
+        Command recoverCommand = new Command();
+        recoverCommand.setCommandType(CommandType.RECOVER_WAITING_THREAD);
+        commandService.createRecoveryWaitingThreadCommand(recoverCommand, subProcessInstance);
+
+        Command repeatRunningCommand = new Command();
+        repeatRunningCommand.setCommandType(CommandType.REPEAT_RUNNING);
+        commandService.createRecoveryWaitingThreadCommand(repeatRunningCommand, subProcessInstance);
+
+        ProcessInstance subProcessInstance2 = new ProcessInstance();
+        subProcessInstance2.setId(111);
+        subProcessInstance2.setIsSubProcess(Flag.NO);
+        commandService.createRecoveryWaitingThreadCommand(repeatRunningCommand, subProcessInstance2);
+    }
+
+    @Test
+    public void giveNullOriginCommand_thenCreateRecoveryWaitingThreadCommand_expectNoDelete() {
+        ProcessInstance subProcessInstance = new ProcessInstance();
+        subProcessInstance.setIsSubProcess(Flag.NO);
+        subProcessInstance.setId(111);
+        ProcessDefinition processDefinition = new ProcessDefinition();
+        processDefinition.setId(111);
+        processDefinition.setCode(10L);
+        subProcessInstance.setProcessDefinition(processDefinition);
+        subProcessInstance.setWarningGroupId(1);
+        commandService.createRecoveryWaitingThreadCommand(null, subProcessInstance);
+        // NOTE(review): anyString() only matches String arguments, but ids in this class are ints; consider anyInt()
+        Mockito.verify(commandMapper, Mockito.times(0)).deleteById(anyString());
+    }
+
+    @Test
+    public void testCreateCommand() {
+        Command command = new Command();
+        command.setProcessDefinitionCode(123);
+        command.setCommandParam("{\"ProcessInstanceId\":222}");
+        command.setCommandType(CommandType.START_PROCESS);
+        int mockResult = 1;
+        Mockito.when(commandMapper.insert(command)).thenReturn(mockResult);
+        int exeMethodResult = commandService.createCommand(command);
+        Assertions.assertEquals(mockResult, exeMethodResult);
+        Mockito.verify(commandMapper, Mockito.times(1)).insert(command);
+    }
+
+    @Test
+    public void testFindCommandPageBySlot() {
+        int pageSize = 1;
+        int pageNumber = 0;
+        int masterCount = 0;
+        int thisMasterSlot = 2;
+        List<Command> commandList =
+                commandService.findCommandPageBySlot(pageSize, pageNumber, masterCount, thisMasterSlot);
+        Assertions.assertEquals(0, commandList.size());
+    }
+
+}
diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
index d0ef8f8e18..b394d9605a 100644
--- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
+++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
@@ -24,15 +24,12 @@ import static org.mockito.ArgumentMatchers.any;
 
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.CommandType;
-import org.apache.dolphinscheduler.common.enums.Flag;
 import org.apache.dolphinscheduler.common.enums.ProcessExecutionTypeEnum;
 import org.apache.dolphinscheduler.common.enums.TaskGroupQueueStatus;
 import org.apache.dolphinscheduler.common.enums.UserType;
-import org.apache.dolphinscheduler.common.enums.WarningType;
 import org.apache.dolphinscheduler.common.graph.DAG;
 import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
 import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils;
-import org.apache.dolphinscheduler.common.utils.DateUtils;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.dao.entity.Command;
 import org.apache.dolphinscheduler.dao.entity.DqExecuteResult;
@@ -42,7 +39,6 @@ import org.apache.dolphinscheduler.dao.entity.DqRuleInputEntry;
 import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
 import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
-import org.apache.dolphinscheduler.dao.entity.ProcessInstanceMap;
 import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog;
 import org.apache.dolphinscheduler.dao.entity.Resource;
 import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
@@ -63,7 +59,6 @@ import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
 import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationLogMapper;
 import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
 import org.apache.dolphinscheduler.dao.mapper.ResourceMapper;
-import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
 import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper;
 import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
 import org.apache.dolphinscheduler.dao.mapper.TaskGroupMapper;
@@ -91,7 +86,6 @@ import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
@@ -105,8 +99,6 @@ import org.mockito.quality.Strictness;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.fasterxml.jackson.databind.JsonNode;
-
 /**
  * process service test
  */
@@ -165,132 +157,12 @@ public class ProcessServiceTest {
     @Mock
     private DqComparisonTypeMapper dqComparisonTypeMapper;
 
-    @Mock
-    private ScheduleMapper scheduleMapper;
-
     @Mock
     CuringParamsService curingGlobalParamsService;
 
     @Mock
     TaskPluginManager taskPluginManager;
 
-    @Test
-    public void testCreateSubCommand() {
-        ProcessInstance parentInstance = new ProcessInstance();
-        parentInstance.setWarningType(WarningType.SUCCESS);
-        parentInstance.setWarningGroupId(0);
-
-        TaskInstance task = new TaskInstance();
-        task.setTaskParams("{\"processDefinitionCode\":10}}");
-        task.setId(10);
-        task.setTaskCode(1L);
-        task.setTaskDefinitionVersion(1);
-
-        ProcessInstance childInstance = null;
-        ProcessInstanceMap instanceMap = new ProcessInstanceMap();
-        instanceMap.setParentProcessInstanceId(1);
-        instanceMap.setParentTaskInstanceId(10);
-        Command command;
-
-        // father history: start; child null == command type: start
-        parentInstance.setHistoryCmd("START_PROCESS");
-        parentInstance.setCommandType(CommandType.START_PROCESS);
-        ProcessDefinition processDefinition = new ProcessDefinition();
-        processDefinition.setCode(10L);
-        Mockito.when(processDefineMapper.queryByDefineId(100)).thenReturn(processDefinition);
-        Mockito.when(processDefineMapper.queryByCode(10L)).thenReturn(processDefinition);
-        command = processService.createSubProcessCommand(parentInstance, childInstance, instanceMap, task);
-        Assertions.assertEquals(CommandType.START_PROCESS, command.getCommandType());
-
-        // father history: start,start failure; child null == command type: start
-        parentInstance.setCommandType(CommandType.START_FAILURE_TASK_PROCESS);
-        parentInstance.setHistoryCmd("START_PROCESS,START_FAILURE_TASK_PROCESS");
-        command = processService.createSubProcessCommand(parentInstance, childInstance, instanceMap, task);
-        Assertions.assertEquals(CommandType.START_PROCESS, command.getCommandType());
-
-        // father history: scheduler,start failure; child null == command type: scheduler
-        parentInstance.setCommandType(CommandType.START_FAILURE_TASK_PROCESS);
-        parentInstance.setHistoryCmd("SCHEDULER,START_FAILURE_TASK_PROCESS");
-        command = processService.createSubProcessCommand(parentInstance, childInstance, instanceMap, task);
-        Assertions.assertEquals(CommandType.SCHEDULER, command.getCommandType());
-
-        // father history: complement,start failure; child null == command type: complement
-
-        String startString = "2020-01-01 00:00:00";
-        String endString = "2020-01-10 00:00:00";
-        parentInstance.setCommandType(CommandType.START_FAILURE_TASK_PROCESS);
-        parentInstance.setHistoryCmd("COMPLEMENT_DATA,START_FAILURE_TASK_PROCESS");
-        Map<String, String> complementMap = new HashMap<>();
-        complementMap.put(Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE, startString);
-        complementMap.put(Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE, endString);
-        parentInstance.setCommandParam(JSONUtils.toJsonString(complementMap));
-        command = processService.createSubProcessCommand(parentInstance, childInstance, instanceMap, task);
-        Assertions.assertEquals(CommandType.COMPLEMENT_DATA, command.getCommandType());
-
-        JsonNode complementDate = JSONUtils.parseObject(command.getCommandParam());
-        Date start = DateUtils.stringToDate(complementDate.get(Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE).asText());
-        Date end = DateUtils.stringToDate(complementDate.get(Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE).asText());
-        Assertions.assertEquals(startString, DateUtils.dateToString(start));
-        Assertions.assertEquals(endString, DateUtils.dateToString(end));
-
-        // father history: start,failure,start failure; child not null == command type: start failure
-        childInstance = new ProcessInstance();
-        parentInstance.setCommandType(CommandType.START_FAILURE_TASK_PROCESS);
-        parentInstance.setHistoryCmd("START_PROCESS,START_FAILURE_TASK_PROCESS");
-        command = processService.createSubProcessCommand(parentInstance, childInstance, instanceMap, task);
-        Assertions.assertEquals(CommandType.START_FAILURE_TASK_PROCESS, command.getCommandType());
-    }
-
-    @Test
-    public void testVerifyIsNeedCreateCommand() {
-
-        List<Command> commands = new ArrayList<>();
-
-        Command command = new Command();
-        command.setCommandType(CommandType.REPEAT_RUNNING);
-        command.setCommandParam("{\"" + CMD_PARAM_RECOVER_PROCESS_ID_STRING + "\":\"111\"}");
-        commands.add(command);
-        Mockito.when(commandMapper.selectList(null)).thenReturn(commands);
-        Assertions.assertFalse(processService.verifyIsNeedCreateCommand(command));
-
-        Command command1 = new Command();
-        command1.setCommandType(CommandType.REPEAT_RUNNING);
-        command1.setCommandParam("{\"" + CMD_PARAM_RECOVER_PROCESS_ID_STRING + "\":\"222\"}");
-        Assertions.assertTrue(processService.verifyIsNeedCreateCommand(command1));
-
-        Command command2 = new Command();
-        command2.setCommandType(CommandType.PAUSE);
-        Assertions.assertTrue(processService.verifyIsNeedCreateCommand(command2));
-    }
-
-    @Test
-    public void testCreateRecoveryWaitingThreadCommand() {
-        int id = 123;
-        Mockito.when(commandMapper.deleteById(id)).thenReturn(1);
-        ProcessInstance subProcessInstance = new ProcessInstance();
-        subProcessInstance.setIsSubProcess(Flag.YES);
-        Command originCommand = new Command();
-        originCommand.setId(id);
-        processService.createRecoveryWaitingThreadCommand(originCommand, subProcessInstance);
-
-        ProcessInstance processInstance = new ProcessInstance();
-        processInstance.setId(111);
-        processService.createRecoveryWaitingThreadCommand(null, subProcessInstance);
-
-        Command recoverCommand = new Command();
-        recoverCommand.setCommandType(CommandType.RECOVER_WAITING_THREAD);
-        processService.createRecoveryWaitingThreadCommand(recoverCommand, subProcessInstance);
-
-        Command repeatRunningCommand = new Command();
-        recoverCommand.setCommandType(CommandType.REPEAT_RUNNING);
-        processService.createRecoveryWaitingThreadCommand(repeatRunningCommand, subProcessInstance);
-
-        ProcessInstance subProcessInstance2 = new ProcessInstance();
-        subProcessInstance2.setId(111);
-        subProcessInstance2.setIsSubProcess(Flag.NO);
-        processService.createRecoveryWaitingThreadCommand(repeatRunningCommand, subProcessInstance2);
-    }
-
     @Test
     public void testHandleCommand() throws CronParseException, CodeGenerateUtils.CodeGenerateException {
 
@@ -789,19 +661,6 @@ public class ProcessServiceTest {
         Assertions.assertEquals(1, stringTaskNodeTaskNodeRelationDAG.getNodesCount());
     }
 
-    @Test
-    public void testCreateCommand() {
-        Command command = new Command();
-        command.setProcessDefinitionCode(123);
-        command.setCommandParam("{\"ProcessInstanceId\":222}");
-        command.setCommandType(CommandType.START_PROCESS);
-        int mockResult = 1;
-        Mockito.when(commandMapper.insert(command)).thenReturn(mockResult);
-        int exeMethodResult = processService.createCommand(command);
-        Assertions.assertEquals(mockResult, exeMethodResult);
-        Mockito.verify(commandMapper, Mockito.times(1)).insert(command);
-    }
-
     @Test
     public void testChangeOutParam() {
         TaskInstance taskInstance = new TaskInstance();
@@ -887,17 +746,6 @@ public class ProcessServiceTest {
         Assertions.assertEquals(instance.getId(), taskInstanceByIdList.get(0).getId());
     }
 
-    @Test
-    public void testFindCommandPageBySlot() {
-        int pageSize = 1;
-        int pageNumber = 0;
-        int masterCount = 0;
-        int thisMasterSlot = 2;
-        List<Command> commandList =
-                processService.findCommandPageBySlot(pageSize, pageNumber, masterCount, thisMasterSlot);
-        Assertions.assertEquals(0, commandList.size());
-    }
-
     @Test
     public void testFindLastManualProcessInterval() {
         long definitionCode = 1L;
diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/utils/ParamUtilsTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/utils/ParamUtilsTest.java
new file mode 100644
index 0000000000..cdd0d761a4
--- /dev/null
+++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/utils/ParamUtilsTest.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.service.utils;
+
+import java.util.Map;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class ParamUtilsTest {
+
+    @Test
+    public void testGetGlobalParamMap() {
+        // an empty string is still a value: the prop must be present and map to ""
+        String globalParam = "[{\"prop\":\"startParam1\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"\"}]";
+        Map<String, String> globalParamMap = ParamUtils.getGlobalParamMap(globalParam);
+        Assertions.assertEquals(1, globalParamMap.size());
+        Assertions.assertEquals("", globalParamMap.get("startParam1"));
+        // a null input yields an empty map rather than an exception
+        Map<String, String> emptyParamMap = ParamUtils.getGlobalParamMap(null);
+        Assertions.assertEquals(0, emptyParamMap.size());
+    }
+}