You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by we...@apache.org on 2022/10/22 04:39:30 UTC
[dolphinscheduler] branch dev updated: [Refactor] Migrate all command-related interface functions from ProcessServiceImpl (#12474)
This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new b936b882bb [Refactor] Migrate all command-related interface functions from ProcessServiceImpl (#12474)
b936b882bb is described below
commit b936b882bb05a6058a90b6f5f271793b9ac70aaa
Author: Yann Ann <xi...@gmail.com>
AuthorDate: Sat Oct 22 12:39:25 2022 +0800
[Refactor] Migrate all command-related interface functions from ProcessServiceImpl (#12474)
* migrate all command-related interface functions to CommonService
---
.../api/service/impl/ExecutorServiceImpl.java | 36 ++-
.../api/service/ExecutorServiceTest.java | 24 +-
.../master/runner/MasterSchedulerBootstrap.java | 9 +-
.../master/runner/WorkflowExecuteRunnable.java | 9 +-
.../master/runner/WorkflowExecuteRunnableTest.java | 7 +-
.../scheduler/quartz/ProcessScheduleTask.java | 6 +-
.../service/command/CommandService.java | 88 +++++++
.../service/command/CommandServiceImpl.java | 272 ++++++++++++++++++++
.../service/process/ProcessService.java | 21 --
.../service/process/ProcessServiceImpl.java | 285 +--------------------
.../dolphinscheduler/service/utils/ParamUtils.java | 90 +++++++
.../service/command/CommandServiceImplTest.java | 227 ++++++++++++++++
.../service/process/ProcessServiceTest.java | 152 -----------
.../service/utils/ParamUtilsTest.java | 37 +++
14 files changed, 788 insertions(+), 475 deletions(-)
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
index 41ac529318..2ba1ccd38f 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
@@ -77,6 +77,7 @@ import org.apache.dolphinscheduler.remote.command.WorkflowStateEventChangeComman
import org.apache.dolphinscheduler.remote.dto.WorkflowExecuteDto;
import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService;
import org.apache.dolphinscheduler.remote.utils.Host;
+import org.apache.dolphinscheduler.service.command.CommandService;
import org.apache.dolphinscheduler.service.cron.CronUtils;
import org.apache.dolphinscheduler.service.exceptions.CronParseException;
import org.apache.dolphinscheduler.service.process.ProcessService;
@@ -130,6 +131,9 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
@Autowired
private ProcessService processService;
+ @Autowired
+ private CommandService commandService;
+
@Autowired
private ProcessInstanceDao processInstanceDao;
@@ -626,7 +630,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
command.setProcessDefinitionVersion(processVersion);
command.setProcessInstanceId(instanceId);
command.setTestFlag(testFlag);
- if (!processService.verifyIsNeedCreateCommand(command)) {
+ if (!commandService.verifyIsNeedCreateCommand(command)) {
logger.warn(
"Process instance is executing the command, processDefinitionCode:{}, processDefinitionVersion:{}, processInstanceId:{}.",
processDefinitionCode, processVersion, instanceId);
@@ -635,7 +639,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
}
logger.info("Creating command, commandInfo:{}.", command);
- int create = processService.createCommand(command);
+ int create = commandService.createCommand(command);
if (create > 0) {
logger.info("Create {} command complete, processDefinitionCode:{}, processDefinitionVersion:{}.",
@@ -784,7 +788,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
} else {
command.setCommandParam(JSONUtils.toJsonString(cmdParam));
logger.info("Creating command, commandInfo:{}.", command);
- return processService.createCommand(command);
+ return commandService.createCommand(command);
}
}
@@ -824,26 +828,28 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
cmdParam.put(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST, dateList);
command.setCommandParam(JSONUtils.toJsonString(cmdParam));
logger.info("Creating command, commandInfo:{}.", command);
- createCount = processService.createCommand(command);
- if (createCount > 0)
+ createCount = commandService.createCommand(command);
+ if (createCount > 0) {
logger.info("Create {} command complete, processDefinitionCode:{}",
command.getCommandType().getDescp(), command.getProcessDefinitionCode());
- else
+ } else {
logger.error("Create {} command error, processDefinitionCode:{}",
command.getCommandType().getDescp(), command.getProcessDefinitionCode());
+ }
}
if (startDate != null && endDate != null) {
cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, startDate);
cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, endDate);
command.setCommandParam(JSONUtils.toJsonString(cmdParam));
logger.info("Creating command, commandInfo:{}.", command);
- createCount = processService.createCommand(command);
- if (createCount > 0)
+ createCount = commandService.createCommand(command);
+ if (createCount > 0) {
logger.info("Create {} command complete, processDefinitionCode:{}",
command.getCommandType().getDescp(), command.getProcessDefinitionCode());
- else
+ } else {
logger.error("Create {} command error, processDefinitionCode:{}",
command.getCommandType().getDescp(), command.getProcessDefinitionCode());
+ }
// dependent process definition
List<Schedule> schedules = processService.queryReleaseSchedulerListByProcessDefinitionCode(
command.getProcessDefinitionCode());
@@ -904,12 +910,13 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
DateUtils.dateToString(listDate.get(endDateIndex)));
command.setCommandParam(JSONUtils.toJsonString(cmdParam));
logger.info("Creating command, commandInfo:{}.", command);
- if (processService.createCommand(command) > 0)
+ if (commandService.createCommand(command) > 0) {
logger.info("Create {} command complete, processDefinitionCode:{}",
command.getCommandType().getDescp(), command.getProcessDefinitionCode());
- else
+ } else {
logger.error("Create {} command error, processDefinitionCode:{}",
command.getCommandType().getDescp(), command.getProcessDefinitionCode());
+ }
if (schedules.isEmpty() || complementDependentMode == ComplementDependentMode.OFF_MODE) {
logger.info(
"Complement dependent mode is off mode or Scheduler is empty, so skip create complement dependent command, processDefinitionCode:{}.",
@@ -937,12 +944,13 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
cmdParam.put(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST, String.join(COMMA, stringDate));
command.setCommandParam(JSONUtils.toJsonString(cmdParam));
logger.info("Creating command, commandInfo:{}.", command);
- if (processService.createCommand(command) > 0)
+ if (commandService.createCommand(command) > 0) {
logger.info("Create {} command complete, processDefinitionCode:{}",
command.getCommandType().getDescp(), command.getProcessDefinitionCode());
- else
+ } else {
logger.error("Create {} command error, processDefinitionCode:{}",
command.getCommandType().getDescp(), command.getProcessDefinitionCode());
+ }
}
}
}
@@ -985,7 +993,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
cmdParam.put(CMD_PARAM_START_NODES, String.valueOf(dependentProcessDefinition.getTaskDefinitionCode()));
dependentCommand.setCommandParam(JSONUtils.toJsonString(cmdParam));
logger.info("Creating complement dependent command, commandInfo:{}.", command);
- dependentProcessDefinitionCreateCount += processService.createCommand(dependentCommand);
+ dependentProcessDefinitionCreateCount += commandService.createCommand(dependentCommand);
}
return dependentProcessDefinitionCreateCount;
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java
index 58889636c0..3cd27f7834 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java
@@ -57,6 +57,7 @@ import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskGroupQueueMapper;
+import org.apache.dolphinscheduler.service.command.CommandService;
import org.apache.dolphinscheduler.service.process.ProcessService;
import java.util.ArrayList;
@@ -102,6 +103,9 @@ public class ExecutorServiceTest {
@Mock
private ProcessService processService;
+ @Mock
+ private CommandService commandService;
+
@Mock
private ProcessDefinitionMapper processDefinitionMapper;
@@ -194,8 +198,8 @@ public class ExecutorServiceTest {
.thenReturn(checkProjectAndAuth());
Mockito.when(processDefinitionMapper.queryByCode(processDefinitionCode)).thenReturn(processDefinition);
Mockito.when(processService.getTenantForProcess(tenantId, userId)).thenReturn(new Tenant());
- doReturn(1).when(processService).createCommand(argThat(c -> c.getId() == null));
- doReturn(0).when(processService).createCommand(argThat(c -> c.getId() != null));
+ doReturn(1).when(commandService).createCommand(argThat(c -> c.getId() == null));
+ doReturn(0).when(commandService).createCommand(argThat(c -> c.getId() != null));
Mockito.when(monitorService.getServerListFromRegistry(true)).thenReturn(getMasterServersList());
Mockito.when(processService.findProcessInstanceDetailById(processInstanceId))
.thenReturn(Optional.ofNullable(processInstance));
@@ -230,7 +234,7 @@ public class ExecutorServiceTest {
Constants.TEST_FLAG_NO,
ComplementDependentMode.OFF_MODE);
Assertions.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
- verify(processService, times(1)).createCommand(any(Command.class));
+ verify(commandService, times(1)).createCommand(any(Command.class));
}
@@ -253,7 +257,7 @@ public class ExecutorServiceTest {
Constants.TEST_FLAG_NO,
ComplementDependentMode.OFF_MODE);
Assertions.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
- verify(processService, times(1)).createCommand(any(Command.class));
+ verify(commandService, times(1)).createCommand(any(Command.class));
}
@@ -320,7 +324,7 @@ public class ExecutorServiceTest {
Constants.TEST_FLAG_NO,
ComplementDependentMode.OFF_MODE);
Assertions.assertEquals(Status.START_PROCESS_INSTANCE_ERROR, result.get(Constants.STATUS));
- verify(processService, times(0)).createCommand(any(Command.class));
+ verify(commandService, times(0)).createCommand(any(Command.class));
}
/**
@@ -342,7 +346,7 @@ public class ExecutorServiceTest {
Constants.TEST_FLAG_NO,
ComplementDependentMode.OFF_MODE);
Assertions.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
- verify(processService, times(1)).createCommand(any(Command.class));
+ verify(commandService, times(1)).createCommand(any(Command.class));
}
/**
@@ -364,7 +368,7 @@ public class ExecutorServiceTest {
Constants.TEST_FLAG_NO,
ComplementDependentMode.OFF_MODE);
Assertions.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
- verify(processService, times(31)).createCommand(any(Command.class));
+ verify(commandService, times(31)).createCommand(any(Command.class));
}
@@ -387,7 +391,7 @@ public class ExecutorServiceTest {
Constants.TEST_FLAG_NO,
ComplementDependentMode.OFF_MODE);
Assertions.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
- verify(processService, times(15)).createCommand(any(Command.class));
+ verify(commandService, times(15)).createCommand(any(Command.class));
}
@@ -411,7 +415,7 @@ public class ExecutorServiceTest {
@Test
public void testExecuteRepeatRunning() {
- Mockito.when(processService.verifyIsNeedCreateCommand(any(Command.class))).thenReturn(true);
+ Mockito.when(commandService.verifyIsNeedCreateCommand(any(Command.class))).thenReturn(true);
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode, RERUN))
.thenReturn(checkProjectAndAuth());
Map<String, Object> result =
@@ -421,7 +425,7 @@ public class ExecutorServiceTest {
@Test
public void testOfTestRun() {
- Mockito.when(processService.verifyIsNeedCreateCommand(any(Command.class))).thenReturn(true);
+ Mockito.when(commandService.verifyIsNeedCreateCommand(any(Command.class))).thenReturn(true);
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode, RERUN))
.thenReturn(checkProjectAndAuth());
Map<String, Object> result = executorService.execProcessInstance(loginUser, projectCode,
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java
index 377dff7191..80616048e2 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java
@@ -38,6 +38,7 @@ import org.apache.dolphinscheduler.server.master.metrics.MasterServerMetrics;
import org.apache.dolphinscheduler.server.master.metrics.ProcessInstanceMetrics;
import org.apache.dolphinscheduler.server.master.registry.ServerNodeManager;
import org.apache.dolphinscheduler.service.alert.ProcessAlertManager;
+import org.apache.dolphinscheduler.service.command.CommandService;
import org.apache.dolphinscheduler.service.expand.CuringParamsService;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.utils.LoggerUtils;
@@ -66,6 +67,9 @@ public class MasterSchedulerBootstrap extends BaseDaemonThread implements AutoCl
@Autowired
private ProcessService processService;
+ @Autowired
+ private CommandService commandService;
+
@Autowired
private ProcessInstanceDao processInstanceDao;
@@ -172,6 +176,7 @@ public class MasterSchedulerBootstrap extends BaseDaemonThread implements AutoCl
"The workflow instance is already been cached, this case shouldn't be happened");
}
WorkflowExecuteRunnable workflowRunnable = new WorkflowExecuteRunnable(processInstance,
+ commandService,
processService,
processInstanceDao,
nettyExecutorManager,
@@ -225,7 +230,7 @@ public class MasterSchedulerBootstrap extends BaseDaemonThread implements AutoCl
}
} catch (Exception e) {
logger.error("Master handle command {} error ", command.getId(), e);
- processService.moveToErrorCommand(command, e.toString());
+ commandService.moveToErrorCommand(command, e.toString());
} finally {
latch.countDown();
}
@@ -254,7 +259,7 @@ public class MasterSchedulerBootstrap extends BaseDaemonThread implements AutoCl
int pageNumber = 0;
int pageSize = masterConfig.getFetchCommandNum();
final List<Command> result =
- processService.findCommandPageBySlot(pageSize, pageNumber, masterCount, thisMasterSlot);
+ commandService.findCommandPageBySlot(pageSize, pageNumber, masterCount, thisMasterSlot);
if (CollectionUtils.isNotEmpty(result)) {
logger.info(
"Master schedule bootstrap loop command success, command size: {}, current slot: {}, total slot size: {}",
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
index 1a896bf9c5..6b7925a97f 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
@@ -76,6 +76,7 @@ import org.apache.dolphinscheduler.server.master.runner.task.ITaskProcessor;
import org.apache.dolphinscheduler.server.master.runner.task.TaskAction;
import org.apache.dolphinscheduler.server.master.runner.task.TaskProcessorFactory;
import org.apache.dolphinscheduler.service.alert.ProcessAlertManager;
+import org.apache.dolphinscheduler.service.command.CommandService;
import org.apache.dolphinscheduler.service.cron.CronUtils;
import org.apache.dolphinscheduler.service.exceptions.CronParseException;
import org.apache.dolphinscheduler.service.expand.CuringParamsService;
@@ -127,6 +128,8 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
private final ProcessService processService;
+ private final CommandService commandService;
+
private ProcessInstanceDao processInstanceDao;
private final ProcessAlertManager processAlertManager;
@@ -233,6 +236,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
*/
public WorkflowExecuteRunnable(
@NonNull ProcessInstance processInstance,
+ @NonNull CommandService commandService,
@NonNull ProcessService processService,
@NonNull ProcessInstanceDao processInstanceDao,
@NonNull NettyExecutorManager nettyExecutorManager,
@@ -241,6 +245,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
@NonNull StateWheelExecuteThread stateWheelExecuteThread,
@NonNull CuringParamsService curingParamsService) {
this.processService = processService;
+ this.commandService = commandService;
this.processInstanceDao = processInstanceDao;
this.processInstance = processInstance;
this.nettyExecutorManager = nettyExecutorManager;
@@ -657,7 +662,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
command.setProcessInstanceId(0);
command.setProcessDefinitionVersion(processInstance.getProcessDefinitionVersion());
command.setTestFlag(processInstance.getTestFlag());
- return processService.createCommand(command);
+ return commandService.createCommand(command);
}
private boolean needComplementProcess() {
@@ -750,7 +755,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
command.setProcessDefinitionCode(processDefinition.getCode());
command.setProcessDefinitionVersion(processDefinition.getVersion());
command.setCommandParam(JSONUtils.toJsonString(cmdParam));
- processService.createCommand(command);
+ commandService.createCommand(command);
}
/**
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java
index 755311b30f..6fb703511f 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java
+++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java
@@ -35,6 +35,7 @@ import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager;
import org.apache.dolphinscheduler.service.alert.ProcessAlertManager;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
+import org.apache.dolphinscheduler.service.command.CommandService;
import org.apache.dolphinscheduler.service.expand.CuringParamsService;
import org.apache.dolphinscheduler.service.process.ProcessService;
@@ -72,6 +73,8 @@ public class WorkflowExecuteRunnableTest {
private ProcessService processService;
+ private CommandService commandService;
+
private ProcessInstanceDao processInstanceDao;
private MasterConfig config;
@@ -90,6 +93,7 @@ public class WorkflowExecuteRunnableTest {
config = new MasterConfig();
processService = Mockito.mock(ProcessService.class);
+ commandService = Mockito.mock(CommandService.class);
processInstanceDao = Mockito.mock(ProcessInstanceDao.class);
processInstance = Mockito.mock(ProcessInstance.class);
Map<String, String> cmdParam = new HashMap<>();
@@ -105,7 +109,8 @@ public class WorkflowExecuteRunnableTest {
NettyExecutorManager nettyExecutorManager = Mockito.mock(NettyExecutorManager.class);
ProcessAlertManager processAlertManager = Mockito.mock(ProcessAlertManager.class);
workflowExecuteThread = Mockito.spy(
- new WorkflowExecuteRunnable(processInstance, processService, processInstanceDao, nettyExecutorManager,
+ new WorkflowExecuteRunnable(processInstance, commandService, processService, processInstanceDao,
+ nettyExecutorManager,
processAlertManager, config, stateWheelExecuteThread, curingGlobalParamsService));
Field dag = WorkflowExecuteRunnable.class.getDeclaredField("dag");
dag.setAccessible(true);
diff --git a/dolphinscheduler-scheduler-plugin/dolphinscheduler-scheduler-quartz/src/main/java/org/apache/dolphinscheduler/scheduler/quartz/ProcessScheduleTask.java b/dolphinscheduler-scheduler-plugin/dolphinscheduler-scheduler-quartz/src/main/java/org/apache/dolphinscheduler/scheduler/quartz/ProcessScheduleTask.java
index 31251c108e..79deabf543 100644
--- a/dolphinscheduler-scheduler-plugin/dolphinscheduler-scheduler-quartz/src/main/java/org/apache/dolphinscheduler/scheduler/quartz/ProcessScheduleTask.java
+++ b/dolphinscheduler-scheduler-plugin/dolphinscheduler-scheduler-quartz/src/main/java/org/apache/dolphinscheduler/scheduler/quartz/ProcessScheduleTask.java
@@ -24,6 +24,7 @@ import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.scheduler.quartz.utils.QuartzTaskUtils;
+import org.apache.dolphinscheduler.service.command.CommandService;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.commons.lang3.StringUtils;
@@ -49,6 +50,9 @@ public class ProcessScheduleTask extends QuartzJobBean {
@Autowired
private ProcessService processService;
+ @Autowired
+ private CommandService commandService;
+
@Counted(value = "ds.master.quartz.job.executed")
@Timed(value = "ds.master.quartz.job.execution.time", percentiles = {0.5, 0.75, 0.95, 0.99}, histogram = true)
@Override
@@ -100,7 +104,7 @@ public class ProcessScheduleTask extends QuartzJobBean {
command.setProcessInstancePriority(schedule.getProcessInstancePriority());
command.setProcessDefinitionVersion(processDefinition.getVersion());
- processService.createCommand(command);
+ commandService.createCommand(command);
}
private void deleteJob(JobExecutionContext context, int projectId, int scheduleId) {
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/command/CommandService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/command/CommandService.java
new file mode 100644
index 0000000000..48d0a1bbae
--- /dev/null
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/command/CommandService.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.service.command;
+
+import org.apache.dolphinscheduler.dao.entity.Command;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstanceMap;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+
+import java.util.List;
+
+/**
+ * Command Service
+ */
+public interface CommandService {
+
+ /**
+ * Save error command, and delete original command. If the given command has already been moved into error command,
+ * will throw {@link java.sql.SQLIntegrityConstraintViolationException ).
+ * @param command command
+ * @param message message
+ */
+ void moveToErrorCommand(Command command, String message);
+
+ /**
+ * Create new command
+ * @param command command
+ * @return result
+ */
+ int createCommand(Command command);
+
+ /**
+ * Get command page
+ * @param pageSize page size
+ * @param pageNumber page number
+ * @param masterCount master count
+ * @param thisMasterSlot master slot
+ * @return command page
+ */
+ List<Command> findCommandPageBySlot(int pageSize, int pageNumber, int masterCount, int thisMasterSlot);
+
+ /**
+ * check the input command exists in queue list
+ *
+ * @param command command
+ * @return create command result
+ */
+ boolean verifyIsNeedCreateCommand(Command command);
+
+ /**
+ * create recovery waiting thread command when thread pool is not enough for the process instance.
+ * sub work process instance need not create recovery command.
+ * create recovery waiting thread command and delete origin command at the same time.
+ * if the recovery command is exists, only update the field update_time
+ *
+ * @param originCommand originCommand
+ * @param processInstance processInstance
+ */
+ void createRecoveryWaitingThreadCommand(Command originCommand, ProcessInstance processInstance);
+
+ /**
+ * create sub work process command
+ * @param parentProcessInstance parent process instance
+ * @param childInstance child process instance
+ * @param instanceMap process instance map
+ * @param task task instance
+ * @return command
+ */
+ Command createSubProcessCommand(ProcessInstance parentProcessInstance,
+ ProcessInstance childInstance,
+ ProcessInstanceMap instanceMap,
+ TaskInstance task);
+}
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/command/CommandServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/command/CommandServiceImpl.java
new file mode 100644
index 0000000000..62ce978a84
--- /dev/null
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/command/CommandServiceImpl.java
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.service.command;
+
+import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING;
+
+import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.enums.CommandType;
+import org.apache.dolphinscheduler.common.enums.Flag;
+import org.apache.dolphinscheduler.common.enums.TaskDependType;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.dao.entity.Command;
+import org.apache.dolphinscheduler.dao.entity.ErrorCommand;
+import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstanceMap;
+import org.apache.dolphinscheduler.dao.entity.Schedule;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.dao.mapper.CommandMapper;
+import org.apache.dolphinscheduler.dao.mapper.ErrorCommandMapper;
+import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
+import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
+import org.apache.dolphinscheduler.plugin.task.api.enums.Direct;
+import org.apache.dolphinscheduler.plugin.task.api.model.Property;
+import org.apache.dolphinscheduler.service.utils.ParamUtils;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.Date;
+import java.util.EnumMap;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.jetbrains.annotations.NotNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.collect.Lists;
+import io.micrometer.core.annotation.Counted;
+
+/**
+ * Command Service implementation
+ */
+@Component
+public class CommandServiceImpl implements CommandService {
+
+ private final Logger logger = LoggerFactory.getLogger(CommandServiceImpl.class);
+
+ @Autowired
+ private ErrorCommandMapper errorCommandMapper;
+
+ @Autowired
+ private CommandMapper commandMapper;
+
+ @Autowired
+ private ScheduleMapper scheduleMapper;
+
+ @Autowired
+ private ProcessDefinitionMapper processDefineMapper;
+
+ @Override
+ public void moveToErrorCommand(Command command, String message) {
+ ErrorCommand errorCommand = new ErrorCommand(command, message);
+ this.errorCommandMapper.insert(errorCommand);
+ this.commandMapper.deleteById(command.getId());
+ }
+
+ @Override
+ @Counted("ds.workflow.create.command.count")
+ public int createCommand(Command command) {
+ int result = 0;
+ if (command == null) {
+ return result;
+ }
+ // add command timezone
+ Schedule schedule = scheduleMapper.queryByProcessDefinitionCode(command.getProcessDefinitionCode());
+ if (schedule != null) {
+ Map<String, String> commandParams =
+ StringUtils.isNotBlank(command.getCommandParam()) ? JSONUtils.toMap(command.getCommandParam())
+ : new HashMap<>();
+ commandParams.put(Constants.SCHEDULE_TIMEZONE, schedule.getTimezoneId());
+ command.setCommandParam(JSONUtils.toJsonString(commandParams));
+ }
+ command.setId(null);
+ result = commandMapper.insert(command);
+ return result;
+ }
+
+ @Override
+ public List<Command> findCommandPageBySlot(int pageSize, int pageNumber, int masterCount, int thisMasterSlot) {
+ if (masterCount <= 0) {
+ return Lists.newArrayList();
+ }
+ return commandMapper.queryCommandPageBySlot(pageSize, pageNumber * pageSize, masterCount, thisMasterSlot);
+ }
+
+ @Override
+ public boolean verifyIsNeedCreateCommand(Command command) {
+ boolean isNeedCreate = true;
+ EnumMap<CommandType, Integer> cmdTypeMap = new EnumMap<>(CommandType.class);
+ cmdTypeMap.put(CommandType.REPEAT_RUNNING, 1);
+ cmdTypeMap.put(CommandType.RECOVER_SUSPENDED_PROCESS, 1);
+ cmdTypeMap.put(CommandType.START_FAILURE_TASK_PROCESS, 1);
+ CommandType commandType = command.getCommandType();
+
+ if (!cmdTypeMap.containsKey(commandType)) {
+ return true;
+ }
+
+ ObjectNode cmdParamObj = JSONUtils.parseObject(command.getCommandParam());
+ int processInstanceId = cmdParamObj.path(CMD_PARAM_RECOVER_PROCESS_ID_STRING).asInt();
+
+ List<Command> commands = commandMapper.selectList(null);
+ // for all commands
+ for (Command tmpCommand : commands) {
+ if (!cmdTypeMap.containsKey(tmpCommand.getCommandType())) {
+ continue;
+ }
+ ObjectNode tempObj = JSONUtils.parseObject(tmpCommand.getCommandParam());
+ if (tempObj != null
+ && processInstanceId == tempObj.path(CMD_PARAM_RECOVER_PROCESS_ID_STRING).asInt()) {
+ isNeedCreate = false;
+ break;
+ }
+ }
+ return isNeedCreate;
+ }
+
+ @Override
+ public void createRecoveryWaitingThreadCommand(Command originCommand, ProcessInstance processInstance) {
+ // sub process doesn't need to create wait command
+ if (processInstance.getIsSubProcess() == Flag.YES) {
+ if (originCommand != null) {
+ commandMapper.deleteById(originCommand.getId());
+ }
+ return;
+ }
+ Map<String, String> cmdParam = new HashMap<>();
+ cmdParam.put(Constants.CMD_PARAM_RECOVERY_WAITING_THREAD, String.valueOf(processInstance.getId()));
+ // process instance quit by "waiting thread" state
+ if (originCommand == null) {
+ Command command = new Command(
+ CommandType.RECOVER_WAITING_THREAD,
+ processInstance.getTaskDependType(),
+ processInstance.getFailureStrategy(),
+ processInstance.getExecutorId(),
+ processInstance.getProcessDefinition().getCode(),
+ JSONUtils.toJsonString(cmdParam),
+ processInstance.getWarningType(),
+ processInstance.getWarningGroupId(),
+ processInstance.getScheduleTime(),
+ processInstance.getWorkerGroup(),
+ processInstance.getEnvironmentCode(),
+ processInstance.getProcessInstancePriority(),
+ processInstance.getDryRun(),
+ processInstance.getId(),
+ processInstance.getProcessDefinitionVersion(),
+ processInstance.getTestFlag());
+ upsertCommand(command);
+ return;
+ }
+
+ // update the command time if current command is recover from waiting
+ if (originCommand.getCommandType() == CommandType.RECOVER_WAITING_THREAD) {
+ originCommand.setUpdateTime(new Date());
+ upsertCommand(originCommand);
+ } else {
+ // delete old command and create new waiting thread command
+ commandMapper.deleteById(originCommand.getId());
+ originCommand.setId(0);
+ originCommand.setCommandType(CommandType.RECOVER_WAITING_THREAD);
+ originCommand.setUpdateTime(new Date());
+ originCommand.setCommandParam(JSONUtils.toJsonString(cmdParam));
+ originCommand.setProcessInstancePriority(processInstance.getProcessInstancePriority());
+ upsertCommand(originCommand);
+ }
+ }
+
+ private int upsertCommand(@NotNull Command command) {
+ if (command.getId() != null) {
+ return commandMapper.updateById(command);
+ } else {
+ return commandMapper.insert(command);
+ }
+ }
+
+ @Override
+ public Command createSubProcessCommand(ProcessInstance parentProcessInstance, ProcessInstance childInstance,
+ ProcessInstanceMap instanceMap, TaskInstance task) {
+ CommandType commandType = getSubCommandType(parentProcessInstance, childInstance);
+ Map<String, Object> subProcessParam = JSONUtils.toMap(task.getTaskParams(), String.class, Object.class);
+ long childDefineCode = 0L;
+ if (subProcessParam.containsKey(Constants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE)) {
+ try {
+ childDefineCode =
+ Long.parseLong(
+ String.valueOf(subProcessParam.get(Constants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE)));
+ } catch (NumberFormatException nfe) {
+ logger.error("processDefinitionCode is not a number", nfe);
+ return null;
+ }
+ }
+ ProcessDefinition subProcessDefinition = processDefineMapper.queryByCode(childDefineCode);
+
+ Object localParams = subProcessParam.get(Constants.LOCAL_PARAMS);
+ List<Property> allParam = JSONUtils.toList(JSONUtils.toJsonString(localParams), Property.class);
+ Map<String, String> globalMap = ParamUtils.getGlobalParamMap(task.getVarPool());
+ Map<String, String> fatherParams = new HashMap<>();
+ if (CollectionUtils.isNotEmpty(allParam)) {
+ for (Property info : allParam) {
+ if (Direct.OUT == info.getDirect()) {
+ continue;
+ }
+ fatherParams.put(info.getProp(), globalMap.get(info.getProp()));
+ }
+ }
+ String processParam = ParamUtils.getSubWorkFlowParam(instanceMap, parentProcessInstance, fatherParams);
+ int subProcessInstanceId =
+ childInstance == null ? 0 : (childInstance.getId() == null ? 0 : childInstance.getId());
+ return new Command(
+ commandType,
+ TaskDependType.TASK_POST,
+ parentProcessInstance.getFailureStrategy(),
+ parentProcessInstance.getExecutorId(),
+ subProcessDefinition.getCode(),
+ processParam,
+ parentProcessInstance.getWarningType(),
+ parentProcessInstance.getWarningGroupId(),
+ parentProcessInstance.getScheduleTime(),
+ task.getWorkerGroup(),
+ task.getEnvironmentCode(),
+ parentProcessInstance.getProcessInstancePriority(),
+ parentProcessInstance.getDryRun(),
+ subProcessInstanceId,
+ subProcessDefinition.getVersion(),
+ parentProcessInstance.getTestFlag());
+ }
+
+ /**
+ * get sub work flow command type
+ * child instance exist: child command = fatherCommand
+ * child instance not exists: child command = fatherCommand[0]
+ */
+ private CommandType getSubCommandType(ProcessInstance parentProcessInstance, ProcessInstance childInstance) {
+ CommandType commandType = parentProcessInstance.getCommandType();
+ if (childInstance == null) {
+ String fatherHistoryCommand = parentProcessInstance.getHistoryCmd();
+ commandType = CommandType.valueOf(fatherHistoryCommand.split(Constants.COMMA)[0]);
+ }
+ return commandType;
+ }
+}
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index a18d751205..6f85ed3e0b 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -68,16 +68,6 @@ public interface ProcessService {
ProcessInstance handleCommand(String host,
Command command) throws CronParseException, CodeGenerateUtils.CodeGenerateException;
- void moveToErrorCommand(Command command, String message);
-
- int createCommand(Command command);
-
- List<Command> findCommandPage(int pageSize, int pageNumber);
-
- List<Command> findCommandPageBySlot(int pageSize, int pageNumber, int masterCount, int thisMasterSlot);
-
- boolean verifyIsNeedCreateCommand(Command command);
-
Optional<ProcessInstance> findProcessInstanceDetailById(int processId);
List<TaskDefinition> getTaskNodeListByDefinition(long defineCode);
@@ -100,8 +90,6 @@ public interface ProcessService {
void recurseFindSubProcess(long parentCode, List<Long> ids);
- void createRecoveryWaitingThreadCommand(Command originCommand, ProcessInstance processInstance);
-
Tenant getTenantForProcess(int tenantId, int userId);
Environment findEnvironmentByCode(Long environmentCode);
@@ -116,19 +104,10 @@ public interface ProcessService {
void createSubWorkProcess(ProcessInstance parentProcessInstance, TaskInstance task);
- Map<String, String> getGlobalParamMap(String globalParams);
-
- Command createSubProcessCommand(ProcessInstance parentProcessInstance,
- ProcessInstance childInstance,
- ProcessInstanceMap instanceMap,
- TaskInstance task);
-
TaskInstance submitTaskInstanceToDB(TaskInstance taskInstance, ProcessInstance processInstance);
TaskExecutionStatus getSubmitTaskState(TaskInstance taskInstance, ProcessInstance processInstance);
- int saveCommand(Command command);
-
boolean saveTaskInstance(TaskInstance taskInstance);
boolean createTaskInstance(TaskInstance taskInstance);
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
index 8277dc6d71..2ef495e942 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
@@ -22,8 +22,6 @@ import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_D
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_EMPTY_SUB_PROCESS;
-import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_FATHER_PARAMS;
-import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_PARENT_INSTANCE_ID;
@@ -61,7 +59,6 @@ import org.apache.dolphinscheduler.dao.entity.DqRuleExecuteSql;
import org.apache.dolphinscheduler.dao.entity.DqRuleInputEntry;
import org.apache.dolphinscheduler.dao.entity.DqTaskStatisticsValue;
import org.apache.dolphinscheduler.dao.entity.Environment;
-import org.apache.dolphinscheduler.dao.entity.ErrorCommand;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
@@ -126,6 +123,7 @@ import org.apache.dolphinscheduler.remote.command.TaskEventChangeCommand;
import org.apache.dolphinscheduler.remote.command.WorkflowStateEventChangeCommand;
import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService;
import org.apache.dolphinscheduler.remote.utils.Host;
+import org.apache.dolphinscheduler.service.command.CommandService;
import org.apache.dolphinscheduler.service.cron.CronUtils;
import org.apache.dolphinscheduler.service.exceptions.CronParseException;
import org.apache.dolphinscheduler.service.exceptions.ServiceException;
@@ -138,12 +136,10 @@ import org.apache.dolphinscheduler.spi.enums.ResourceType;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
-import org.apache.commons.lang3.StringUtils;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
-import java.util.EnumMap;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -169,7 +165,6 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.base.Joiner;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
-import io.micrometer.core.annotation.Counted;
/**
* process relative dao that some mappers in this.
@@ -285,6 +280,9 @@ public class ProcessServiceImpl implements ProcessService {
@Autowired
private LogClient logClient;
+ @Autowired
+ private CommandService commandService;
+
/**
* handle Command (construct ProcessInstance from Command) , wrapped in transaction
*
@@ -300,7 +298,7 @@ public class ProcessServiceImpl implements ProcessService {
// cannot construct process instance, return null
if (processInstance == null) {
logger.error("scan command, command parameter is error: {}", command);
- moveToErrorCommand(command, "process instance is null");
+ commandService.moveToErrorCommand(command, "process instance is null");
return null;
}
processInstance.setCommandType(command.getCommandType());
@@ -387,101 +385,6 @@ public class ProcessServiceImpl implements ProcessService {
}
}
- /**
- * Save error command, and delete original command. If the given command has already been moved into error command,
- * will throw {@link java.sql.SQLIntegrityConstraintViolationException ).
- *
- * @param command command
- * @param message message
- */
- @Override
- public void moveToErrorCommand(Command command, String message) {
- ErrorCommand errorCommand = new ErrorCommand(command, message);
- this.errorCommandMapper.insert(errorCommand);
- this.commandMapper.deleteById(command.getId());
- }
-
- /**
- * insert one command
- *
- * @param command command
- * @return create result
- */
- @Override
- @Counted("ds.workflow.create.command.count")
- public int createCommand(Command command) {
- int result = 0;
- if (command == null) {
- return result;
- }
- // add command timezone
- Schedule schedule = scheduleMapper.queryByProcessDefinitionCode(command.getProcessDefinitionCode());
- if (schedule != null) {
- Map<String, String> commandParams =
- StringUtils.isNotBlank(command.getCommandParam()) ? JSONUtils.toMap(command.getCommandParam())
- : new HashMap<>();
- commandParams.put(Constants.SCHEDULE_TIMEZONE, schedule.getTimezoneId());
- command.setCommandParam(JSONUtils.toJsonString(commandParams));
- }
- command.setId(null);
- result = commandMapper.insert(command);
- return result;
- }
-
- /**
- * get command page
- */
- @Override
- public List<Command> findCommandPage(int pageSize, int pageNumber) {
- return commandMapper.queryCommandPage(pageSize, pageNumber * pageSize);
- }
-
- /**
- * get command page
- */
- @Override
- public List<Command> findCommandPageBySlot(int pageSize, int pageNumber, int masterCount, int thisMasterSlot) {
- if (masterCount <= 0) {
- return Lists.newArrayList();
- }
- return commandMapper.queryCommandPageBySlot(pageSize, pageNumber * pageSize, masterCount, thisMasterSlot);
- }
-
- /**
- * check the input command exists in queue list
- *
- * @param command command
- * @return create command result
- */
- @Override
- public boolean verifyIsNeedCreateCommand(Command command) {
- boolean isNeedCreate = true;
- EnumMap<CommandType, Integer> cmdTypeMap = new EnumMap<>(CommandType.class);
- cmdTypeMap.put(CommandType.REPEAT_RUNNING, 1);
- cmdTypeMap.put(CommandType.RECOVER_SUSPENDED_PROCESS, 1);
- cmdTypeMap.put(CommandType.START_FAILURE_TASK_PROCESS, 1);
- CommandType commandType = command.getCommandType();
-
- if (cmdTypeMap.containsKey(commandType)) {
- ObjectNode cmdParamObj = JSONUtils.parseObject(command.getCommandParam());
- int processInstanceId = cmdParamObj.path(CMD_PARAM_RECOVER_PROCESS_ID_STRING).asInt();
-
- List<Command> commands = commandMapper.selectList(null);
- // for all commands
- for (Command tmpCommand : commands) {
- if (cmdTypeMap.containsKey(tmpCommand.getCommandType())) {
- ObjectNode tempObj = JSONUtils.parseObject(tmpCommand.getCommandParam());
- if (tempObj != null
- && processInstanceId == tempObj.path(CMD_PARAM_RECOVER_PROCESS_ID_STRING).asInt()) {
- isNeedCreate = false;
- break;
- }
- }
- }
- }
- return isNeedCreate;
- }
-
/**
* find process instance detail by id
*
@@ -670,66 +573,6 @@ public class ProcessServiceImpl implements ProcessService {
}
}
- /**
- * create recovery waiting thread command when thread pool is not enough for the process instance.
- * sub work process instance need not to create recovery command.
- * create recovery waiting thread command and delete origin command at the same time.
- * if the recovery command is exists, only update the field update_time
- *
- * @param originCommand originCommand
- * @param processInstance processInstance
- */
- @Override
- public void createRecoveryWaitingThreadCommand(Command originCommand, ProcessInstance processInstance) {
-
- // sub process doesnot need to create wait command
- if (processInstance.getIsSubProcess() == Flag.YES) {
- if (originCommand != null) {
- commandMapper.deleteById(originCommand.getId());
- }
- return;
- }
- Map<String, String> cmdParam = new HashMap<>();
- cmdParam.put(Constants.CMD_PARAM_RECOVERY_WAITING_THREAD, String.valueOf(processInstance.getId()));
- // process instance quit by "waiting thread" state
- if (originCommand == null) {
- Command command = new Command(
- CommandType.RECOVER_WAITING_THREAD,
- processInstance.getTaskDependType(),
- processInstance.getFailureStrategy(),
- processInstance.getExecutorId(),
- processInstance.getProcessDefinition().getCode(),
- JSONUtils.toJsonString(cmdParam),
- processInstance.getWarningType(),
- processInstance.getWarningGroupId(),
- processInstance.getScheduleTime(),
- processInstance.getWorkerGroup(),
- processInstance.getEnvironmentCode(),
- processInstance.getProcessInstancePriority(),
- processInstance.getDryRun(),
- processInstance.getId(),
- processInstance.getProcessDefinitionVersion(),
- processInstance.getTestFlag());
- saveCommand(command);
- return;
- }
-
- // update the command time if current command if recover from waiting
- if (originCommand.getCommandType() == CommandType.RECOVER_WAITING_THREAD) {
- originCommand.setUpdateTime(new Date());
- saveCommand(originCommand);
- } else {
- // delete old command and create new waiting thread command
- commandMapper.deleteById(originCommand.getId());
- originCommand.setId(0);
- originCommand.setCommandType(CommandType.RECOVER_WAITING_THREAD);
- originCommand.setUpdateTime(new Date());
- originCommand.setCommandParam(JSONUtils.toJsonString(cmdParam));
- originCommand.setProcessInstancePriority(processInstance.getProcessInstancePriority());
- saveCommand(originCommand);
- }
- }
-
/**
* get schedule time from command
*
@@ -1445,105 +1288,18 @@ public class ProcessServiceImpl implements ProcessService {
logger.info("sub process instance {} status is success, so skip creating command", childInstance.getId());
return;
}
- Command subProcessCommand = createSubProcessCommand(parentProcessInstance, childInstance, instanceMap, task);
+ Command subProcessCommand =
+ commandService.createSubProcessCommand(parentProcessInstance, childInstance, instanceMap, task);
+ if (subProcessCommand == null) {
+ logger.error("create sub process command failed, so skip creating command");
+ return;
+ }
updateSubProcessDefinitionByParent(parentProcessInstance, subProcessCommand.getProcessDefinitionCode());
initSubInstanceState(childInstance);
- createCommand(subProcessCommand);
+ commandService.createCommand(subProcessCommand);
logger.info("sub process command created: {} ", subProcessCommand);
}
- /**
- * complement data needs transform parent parameter to child.
- */
- protected String getSubWorkFlowParam(ProcessInstanceMap instanceMap, ProcessInstance parentProcessInstance,
- Map<String, String> fatherParams) {
- // set sub work process command
- String processMapStr = JSONUtils.toJsonString(instanceMap);
- Map<String, String> cmdParam = JSONUtils.toMap(processMapStr);
- if (parentProcessInstance.isComplementData()) {
- Map<String, String> parentParam = JSONUtils.toMap(parentProcessInstance.getCommandParam());
- String endTime = parentParam.get(CMDPARAM_COMPLEMENT_DATA_END_DATE);
- String startTime = parentParam.get(CMDPARAM_COMPLEMENT_DATA_START_DATE);
- String scheduleTime = parentParam.get(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST);
- if (StringUtils.isNotEmpty(startTime) && StringUtils.isNotEmpty(endTime)) {
- cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, endTime);
- cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, startTime);
- }
- if (StringUtils.isNotEmpty(scheduleTime)) {
- cmdParam.put(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST, scheduleTime);
- }
- processMapStr = JSONUtils.toJsonString(cmdParam);
- }
- if (MapUtils.isNotEmpty(fatherParams)) {
- cmdParam.put(CMD_PARAM_FATHER_PARAMS, JSONUtils.toJsonString(fatherParams));
- processMapStr = JSONUtils.toJsonString(cmdParam);
- }
- return processMapStr;
- }
-
- @Override
- public Map<String, String> getGlobalParamMap(String globalParams) {
- List<Property> propList;
- Map<String, String> globalParamMap = new HashMap<>();
- if (!Strings.isNullOrEmpty(globalParams)) {
- propList = JSONUtils.toList(globalParams, Property.class);
- globalParamMap = propList.stream().collect(Collectors.toMap(Property::getProp, Property::getValue));
- }
-
- return globalParamMap;
- }
-
- /**
- * create sub work process command
- */
- @Override
- public Command createSubProcessCommand(ProcessInstance parentProcessInstance,
- ProcessInstance childInstance,
- ProcessInstanceMap instanceMap,
- TaskInstance task) {
- CommandType commandType = getSubCommandType(parentProcessInstance, childInstance);
- Map<String, Object> subProcessParam = JSONUtils.toMap(task.getTaskParams(), String.class, Object.class);
- long childDefineCode = 0L;
- if (subProcessParam.containsKey(Constants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE)) {
- childDefineCode =
- Long.parseLong(String.valueOf(subProcessParam.get(Constants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE)));
- }
- ProcessDefinition subProcessDefinition = processDefineMapper.queryByCode(childDefineCode);
-
- Object localParams = subProcessParam.get(Constants.LOCAL_PARAMS);
- List<Property> allParam = JSONUtils.toList(JSONUtils.toJsonString(localParams), Property.class);
- Map<String, String> globalMap = this.getGlobalParamMap(task.getVarPool());
- Map<String, String> fatherParams = new HashMap<>();
- if (CollectionUtils.isNotEmpty(allParam)) {
- for (Property info : allParam) {
- if (Direct.OUT == info.getDirect()) {
- continue;
- }
- fatherParams.put(info.getProp(), globalMap.get(info.getProp()));
- }
- }
- String processParam = getSubWorkFlowParam(instanceMap, parentProcessInstance, fatherParams);
- int subProcessInstanceId =
- childInstance == null ? 0 : (childInstance.getId() == null ? 0 : childInstance.getId());
- return new Command(
- commandType,
- TaskDependType.TASK_POST,
- parentProcessInstance.getFailureStrategy(),
- parentProcessInstance.getExecutorId(),
- subProcessDefinition.getCode(),
- processParam,
- parentProcessInstance.getWarningType(),
- parentProcessInstance.getWarningGroupId(),
- parentProcessInstance.getScheduleTime(),
- task.getWorkerGroup(),
- task.getEnvironmentCode(),
- parentProcessInstance.getProcessInstancePriority(),
- parentProcessInstance.getDryRun(),
- subProcessInstanceId,
- subProcessDefinition.getVersion(),
- parentProcessInstance.getTestFlag());
- }
-
/**
* initialize sub work flow state
* child instance state would be initialized when 'recovery from pause/stop/failure'
@@ -1681,21 +1437,6 @@ public class ProcessServiceImpl implements ProcessService {
return true;
}
- /**
- * insert or update command
- *
- * @param command command
- * @return save command result
- */
- @Override
- public int saveCommand(Command command) {
- if (command.getId() != null) {
- return commandMapper.updateById(command);
- } else {
- return commandMapper.insert(command);
- }
- }
-
/**
* insert or update task instance
*
@@ -2112,7 +1853,7 @@ public class ProcessServiceImpl implements ProcessService {
cmd.setCommandType(CommandType.RECOVER_TOLERANCE_FAULT_PROCESS);
cmd.setProcessInstancePriority(processInstance.getProcessInstancePriority());
cmd.setTestFlag(processInstance.getTestFlag());
- createCommand(cmd);
+ commandService.createCommand(cmd);
}
/**
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/ParamUtils.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/ParamUtils.java
new file mode 100644
index 0000000000..b2bea95d8b
--- /dev/null
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/ParamUtils.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.service.utils;
+
+import static org.apache.dolphinscheduler.common.Constants.*;
+
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstanceMap;
+import org.apache.dolphinscheduler.plugin.task.api.model.Property;
+
+import org.apache.commons.collections.MapUtils;
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import com.google.common.base.Strings;
+
+/**
+ * Param Utility class
+ */
+public class ParamUtils {
+
+ /**
+ * convert globalParams string to global parameter map
+ * @param globalParams globalParams
+ * @return parameter map
+ */
+ public static Map<String, String> getGlobalParamMap(String globalParams) {
+ List<Property> propList;
+ Map<String, String> globalParamMap = new HashMap<>();
+ if (!Strings.isNullOrEmpty(globalParams)) {
+ propList = JSONUtils.toList(globalParams, Property.class);
+ globalParamMap = propList.stream().collect(Collectors.toMap(Property::getProp, Property::getValue));
+ }
+ return globalParamMap;
+ }
+
+ /**
+ * Get sub workflow parameters
+ * @param instanceMap process instance map
+ * @param parentProcessInstance parent process instance
+ * @param fatherParams fatherParams
+ * @return sub workflow parameters
+ */
+ public static String getSubWorkFlowParam(ProcessInstanceMap instanceMap, ProcessInstance parentProcessInstance,
+ Map<String, String> fatherParams) {
+ // set sub work process command
+ String processMapStr = JSONUtils.toJsonString(instanceMap);
+ Map<String, String> cmdParam = JSONUtils.toMap(processMapStr);
+ if (parentProcessInstance.isComplementData()) {
+ Map<String, String> parentParam = JSONUtils.toMap(parentProcessInstance.getCommandParam());
+ String endTime = parentParam.get(CMDPARAM_COMPLEMENT_DATA_END_DATE);
+ String startTime = parentParam.get(CMDPARAM_COMPLEMENT_DATA_START_DATE);
+ String scheduleTime = parentParam.get(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST);
+ if (StringUtils.isNotEmpty(startTime) && StringUtils.isNotEmpty(endTime)) {
+ cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, endTime);
+ cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, startTime);
+ }
+ if (StringUtils.isNotEmpty(scheduleTime)) {
+ cmdParam.put(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST, scheduleTime);
+ }
+ processMapStr = JSONUtils.toJsonString(cmdParam);
+ }
+ if (MapUtils.isNotEmpty(fatherParams)) {
+ cmdParam.put(CMD_PARAM_FATHER_PARAMS, JSONUtils.toJsonString(fatherParams));
+ processMapStr = JSONUtils.toJsonString(cmdParam);
+ }
+ return processMapStr;
+ }
+
+}
diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/command/CommandServiceImplTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/command/CommandServiceImplTest.java
new file mode 100644
index 0000000000..739f8a099f
--- /dev/null
+++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/command/CommandServiceImplTest.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.service.command;
+
+import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING;
+import static org.mockito.ArgumentMatchers.anyString;
+
+import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.enums.CommandType;
+import org.apache.dolphinscheduler.common.enums.Flag;
+import org.apache.dolphinscheduler.common.enums.WarningType;
+import org.apache.dolphinscheduler.common.utils.DateUtils;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.dao.entity.Command;
+import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstanceMap;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.dao.mapper.CommandMapper;
+import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
+import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.mockito.junit.jupiter.MockitoSettings;
+import org.mockito.quality.Strictness;
+
+import com.fasterxml.jackson.databind.JsonNode;
+
+@ExtendWith(MockitoExtension.class)
+@MockitoSettings(strictness = Strictness.LENIENT)
+class CommandServiceImplTest {
+
+ @InjectMocks
+ private CommandServiceImpl commandService;
+
+ @Mock
+ private CommandMapper commandMapper;
+
+ @Mock
+ private ProcessDefinitionMapper processDefineMapper;
+
+ @Mock
+ private ScheduleMapper scheduleMapper;
+
+ @Test
+ public void testCreateSubCommand() {
+ ProcessInstance parentInstance = new ProcessInstance();
+ parentInstance.setWarningType(WarningType.SUCCESS);
+ parentInstance.setWarningGroupId(0);
+
+ TaskInstance task = new TaskInstance();
+ task.setTaskParams("{\"processDefinitionCode\":10}}");
+ task.setId(10);
+ task.setTaskCode(1L);
+ task.setTaskDefinitionVersion(1);
+
+ ProcessInstance childInstance = null;
+ ProcessInstanceMap instanceMap = new ProcessInstanceMap();
+ instanceMap.setParentProcessInstanceId(1);
+ instanceMap.setParentTaskInstanceId(10);
+ Command command;
+
+ // father history: start; child null == command type: start
+ parentInstance.setHistoryCmd("START_PROCESS");
+ parentInstance.setCommandType(CommandType.START_PROCESS);
+ ProcessDefinition processDefinition = new ProcessDefinition();
+ processDefinition.setCode(10L);
+ Mockito.when(processDefineMapper.queryByDefineId(100)).thenReturn(processDefinition);
+ Mockito.when(processDefineMapper.queryByCode(10L)).thenReturn(processDefinition);
+ command = commandService.createSubProcessCommand(parentInstance, childInstance, instanceMap, task);
+ Assertions.assertEquals(CommandType.START_PROCESS, command.getCommandType());
+
+ // father history: start,start failure; child null == command type: start
+ parentInstance.setCommandType(CommandType.START_FAILURE_TASK_PROCESS);
+ parentInstance.setHistoryCmd("START_PROCESS,START_FAILURE_TASK_PROCESS");
+ command = commandService.createSubProcessCommand(parentInstance, childInstance, instanceMap, task);
+ Assertions.assertEquals(CommandType.START_PROCESS, command.getCommandType());
+
+ // father history: scheduler,start failure; child null == command type: scheduler
+ parentInstance.setCommandType(CommandType.START_FAILURE_TASK_PROCESS);
+ parentInstance.setHistoryCmd("SCHEDULER,START_FAILURE_TASK_PROCESS");
+ command = commandService.createSubProcessCommand(parentInstance, childInstance, instanceMap, task);
+ Assertions.assertEquals(CommandType.SCHEDULER, command.getCommandType());
+
+ // father history: complement,start failure; child null == command type: complement
+
+ String startString = "2020-01-01 00:00:00";
+ String endString = "2020-01-10 00:00:00";
+ parentInstance.setCommandType(CommandType.START_FAILURE_TASK_PROCESS);
+ parentInstance.setHistoryCmd("COMPLEMENT_DATA,START_FAILURE_TASK_PROCESS");
+ Map<String, String> complementMap = new HashMap<>();
+ complementMap.put(Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE, startString);
+ complementMap.put(Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE, endString);
+ parentInstance.setCommandParam(JSONUtils.toJsonString(complementMap));
+ command = commandService.createSubProcessCommand(parentInstance, childInstance, instanceMap, task);
+ Assertions.assertEquals(CommandType.COMPLEMENT_DATA, command.getCommandType());
+
+ JsonNode complementDate = JSONUtils.parseObject(command.getCommandParam());
+ Date start = DateUtils.stringToDate(complementDate.get(Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE).asText());
+ Date end = DateUtils.stringToDate(complementDate.get(Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE).asText());
+ Assertions.assertEquals(startString, DateUtils.dateToString(start));
+ Assertions.assertEquals(endString, DateUtils.dateToString(end));
+
+ // father history: start,failure,start failure; child not null == command type: start failure
+ childInstance = new ProcessInstance();
+ parentInstance.setCommandType(CommandType.START_FAILURE_TASK_PROCESS);
+ parentInstance.setHistoryCmd("START_PROCESS,START_FAILURE_TASK_PROCESS");
+ command = commandService.createSubProcessCommand(parentInstance, childInstance, instanceMap, task);
+ Assertions.assertEquals(CommandType.START_FAILURE_TASK_PROCESS, command.getCommandType());
+ }
+
+ @Test
+ public void testVerifyIsNeedCreateCommand() {
+
+ List<Command> commands = new ArrayList<>();
+
+ Command command = new Command();
+ command.setCommandType(CommandType.REPEAT_RUNNING);
+ command.setCommandParam("{\"" + CMD_PARAM_RECOVER_PROCESS_ID_STRING + "\":\"111\"}");
+ commands.add(command);
+ Mockito.when(commandMapper.selectList(null)).thenReturn(commands);
+ Assertions.assertFalse(commandService.verifyIsNeedCreateCommand(command));
+
+ Command command1 = new Command();
+ command1.setCommandType(CommandType.REPEAT_RUNNING);
+ command1.setCommandParam("{\"" + CMD_PARAM_RECOVER_PROCESS_ID_STRING + "\":\"222\"}");
+ Assertions.assertTrue(commandService.verifyIsNeedCreateCommand(command1));
+
+ Command command2 = new Command();
+ command2.setCommandType(CommandType.PAUSE);
+ Assertions.assertTrue(commandService.verifyIsNeedCreateCommand(command2));
+ }
+
+ @Test
+ public void testCreateRecoveryWaitingThreadCommand() {
+ int id = 123;
+ Mockito.when(commandMapper.deleteById(id)).thenReturn(1);
+ ProcessInstance subProcessInstance = new ProcessInstance();
+ subProcessInstance.setIsSubProcess(Flag.YES);
+ Command originCommand = new Command();
+ originCommand.setId(id);
+ commandService.createRecoveryWaitingThreadCommand(originCommand, subProcessInstance);
+
+ ProcessInstance processInstance = new ProcessInstance();
+ processInstance.setId(111);
+ commandService.createRecoveryWaitingThreadCommand(null, subProcessInstance);
+
+ Command recoverCommand = new Command();
+ recoverCommand.setCommandType(CommandType.RECOVER_WAITING_THREAD);
+ commandService.createRecoveryWaitingThreadCommand(recoverCommand, subProcessInstance);
+
+ Command repeatRunningCommand = new Command();
+ recoverCommand.setCommandType(CommandType.REPEAT_RUNNING);
+ commandService.createRecoveryWaitingThreadCommand(repeatRunningCommand, subProcessInstance);
+
+ ProcessInstance subProcessInstance2 = new ProcessInstance();
+ subProcessInstance2.setId(111);
+ subProcessInstance2.setIsSubProcess(Flag.NO);
+ commandService.createRecoveryWaitingThreadCommand(repeatRunningCommand, subProcessInstance2);
+ }
+
+ @Test
+ public void giveNullOriginCommand_thenCreateRecoveryWaitingThreadCommand_expectNoDelete() {
+ ProcessInstance subProcessInstance = new ProcessInstance();
+ subProcessInstance.setIsSubProcess(Flag.NO);
+ subProcessInstance.setId(111);
+ ProcessDefinition processDefinition = new ProcessDefinition();
+ processDefinition.setId(111);
+ processDefinition.setCode(10L);
+ subProcessInstance.setProcessDefinition(processDefinition);
+ subProcessInstance.setWarningGroupId(1);
+ commandService.createRecoveryWaitingThreadCommand(null, subProcessInstance);
+ Mockito.verify(commandMapper, Mockito.times(0)).deleteById(anyString());
+ }
+
+ @Test
+ public void testCreateCommand() {
+ Command command = new Command();
+ command.setProcessDefinitionCode(123);
+ command.setCommandParam("{\"ProcessInstanceId\":222}");
+ command.setCommandType(CommandType.START_PROCESS);
+ int mockResult = 1;
+ Mockito.when(commandMapper.insert(command)).thenReturn(mockResult);
+ int exeMethodResult = commandService.createCommand(command);
+ Assertions.assertEquals(mockResult, exeMethodResult);
+ Mockito.verify(commandMapper, Mockito.times(1)).insert(command);
+ }
+
+ @Test
+ public void testFindCommandPageBySlot() {
+ int pageSize = 1;
+ int pageNumber = 0;
+ int masterCount = 0;
+ int thisMasterSlot = 2;
+ List<Command> commandList =
+ commandService.findCommandPageBySlot(pageSize, pageNumber, masterCount, thisMasterSlot);
+ Assertions.assertEquals(0, commandList.size());
+ }
+
+}
diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
index d0ef8f8e18..b394d9605a 100644
--- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
+++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
@@ -24,15 +24,12 @@ import static org.mockito.ArgumentMatchers.any;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType;
-import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.ProcessExecutionTypeEnum;
import org.apache.dolphinscheduler.common.enums.TaskGroupQueueStatus;
import org.apache.dolphinscheduler.common.enums.UserType;
-import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.common.graph.DAG;
import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils;
-import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.DqExecuteResult;
@@ -42,7 +39,6 @@ import org.apache.dolphinscheduler.dao.entity.DqRuleInputEntry;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
-import org.apache.dolphinscheduler.dao.entity.ProcessInstanceMap;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog;
import org.apache.dolphinscheduler.dao.entity.Resource;
import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
@@ -63,7 +59,6 @@ import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationLogMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
import org.apache.dolphinscheduler.dao.mapper.ResourceMapper;
-import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskGroupMapper;
@@ -91,7 +86,6 @@ import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
@@ -105,8 +99,6 @@ import org.mockito.quality.Strictness;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.fasterxml.jackson.databind.JsonNode;
-
/**
* process service test
*/
@@ -165,132 +157,12 @@ public class ProcessServiceTest {
@Mock
private DqComparisonTypeMapper dqComparisonTypeMapper;
- @Mock
- private ScheduleMapper scheduleMapper;
-
@Mock
CuringParamsService curingGlobalParamsService;
@Mock
TaskPluginManager taskPluginManager;
- @Test
- public void testCreateSubCommand() {
- ProcessInstance parentInstance = new ProcessInstance();
- parentInstance.setWarningType(WarningType.SUCCESS);
- parentInstance.setWarningGroupId(0);
-
- TaskInstance task = new TaskInstance();
- task.setTaskParams("{\"processDefinitionCode\":10}}");
- task.setId(10);
- task.setTaskCode(1L);
- task.setTaskDefinitionVersion(1);
-
- ProcessInstance childInstance = null;
- ProcessInstanceMap instanceMap = new ProcessInstanceMap();
- instanceMap.setParentProcessInstanceId(1);
- instanceMap.setParentTaskInstanceId(10);
- Command command;
-
- // father history: start; child null == command type: start
- parentInstance.setHistoryCmd("START_PROCESS");
- parentInstance.setCommandType(CommandType.START_PROCESS);
- ProcessDefinition processDefinition = new ProcessDefinition();
- processDefinition.setCode(10L);
- Mockito.when(processDefineMapper.queryByDefineId(100)).thenReturn(processDefinition);
- Mockito.when(processDefineMapper.queryByCode(10L)).thenReturn(processDefinition);
- command = processService.createSubProcessCommand(parentInstance, childInstance, instanceMap, task);
- Assertions.assertEquals(CommandType.START_PROCESS, command.getCommandType());
-
- // father history: start,start failure; child null == command type: start
- parentInstance.setCommandType(CommandType.START_FAILURE_TASK_PROCESS);
- parentInstance.setHistoryCmd("START_PROCESS,START_FAILURE_TASK_PROCESS");
- command = processService.createSubProcessCommand(parentInstance, childInstance, instanceMap, task);
- Assertions.assertEquals(CommandType.START_PROCESS, command.getCommandType());
-
- // father history: scheduler,start failure; child null == command type: scheduler
- parentInstance.setCommandType(CommandType.START_FAILURE_TASK_PROCESS);
- parentInstance.setHistoryCmd("SCHEDULER,START_FAILURE_TASK_PROCESS");
- command = processService.createSubProcessCommand(parentInstance, childInstance, instanceMap, task);
- Assertions.assertEquals(CommandType.SCHEDULER, command.getCommandType());
-
- // father history: complement,start failure; child null == command type: complement
-
- String startString = "2020-01-01 00:00:00";
- String endString = "2020-01-10 00:00:00";
- parentInstance.setCommandType(CommandType.START_FAILURE_TASK_PROCESS);
- parentInstance.setHistoryCmd("COMPLEMENT_DATA,START_FAILURE_TASK_PROCESS");
- Map<String, String> complementMap = new HashMap<>();
- complementMap.put(Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE, startString);
- complementMap.put(Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE, endString);
- parentInstance.setCommandParam(JSONUtils.toJsonString(complementMap));
- command = processService.createSubProcessCommand(parentInstance, childInstance, instanceMap, task);
- Assertions.assertEquals(CommandType.COMPLEMENT_DATA, command.getCommandType());
-
- JsonNode complementDate = JSONUtils.parseObject(command.getCommandParam());
- Date start = DateUtils.stringToDate(complementDate.get(Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE).asText());
- Date end = DateUtils.stringToDate(complementDate.get(Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE).asText());
- Assertions.assertEquals(startString, DateUtils.dateToString(start));
- Assertions.assertEquals(endString, DateUtils.dateToString(end));
-
- // father history: start,failure,start failure; child not null == command type: start failure
- childInstance = new ProcessInstance();
- parentInstance.setCommandType(CommandType.START_FAILURE_TASK_PROCESS);
- parentInstance.setHistoryCmd("START_PROCESS,START_FAILURE_TASK_PROCESS");
- command = processService.createSubProcessCommand(parentInstance, childInstance, instanceMap, task);
- Assertions.assertEquals(CommandType.START_FAILURE_TASK_PROCESS, command.getCommandType());
- }
-
- @Test
- public void testVerifyIsNeedCreateCommand() {
-
- List<Command> commands = new ArrayList<>();
-
- Command command = new Command();
- command.setCommandType(CommandType.REPEAT_RUNNING);
- command.setCommandParam("{\"" + CMD_PARAM_RECOVER_PROCESS_ID_STRING + "\":\"111\"}");
- commands.add(command);
- Mockito.when(commandMapper.selectList(null)).thenReturn(commands);
- Assertions.assertFalse(processService.verifyIsNeedCreateCommand(command));
-
- Command command1 = new Command();
- command1.setCommandType(CommandType.REPEAT_RUNNING);
- command1.setCommandParam("{\"" + CMD_PARAM_RECOVER_PROCESS_ID_STRING + "\":\"222\"}");
- Assertions.assertTrue(processService.verifyIsNeedCreateCommand(command1));
-
- Command command2 = new Command();
- command2.setCommandType(CommandType.PAUSE);
- Assertions.assertTrue(processService.verifyIsNeedCreateCommand(command2));
- }
-
- @Test
- public void testCreateRecoveryWaitingThreadCommand() {
- int id = 123;
- Mockito.when(commandMapper.deleteById(id)).thenReturn(1);
- ProcessInstance subProcessInstance = new ProcessInstance();
- subProcessInstance.setIsSubProcess(Flag.YES);
- Command originCommand = new Command();
- originCommand.setId(id);
- processService.createRecoveryWaitingThreadCommand(originCommand, subProcessInstance);
-
- ProcessInstance processInstance = new ProcessInstance();
- processInstance.setId(111);
- processService.createRecoveryWaitingThreadCommand(null, subProcessInstance);
-
- Command recoverCommand = new Command();
- recoverCommand.setCommandType(CommandType.RECOVER_WAITING_THREAD);
- processService.createRecoveryWaitingThreadCommand(recoverCommand, subProcessInstance);
-
- Command repeatRunningCommand = new Command();
- recoverCommand.setCommandType(CommandType.REPEAT_RUNNING);
- processService.createRecoveryWaitingThreadCommand(repeatRunningCommand, subProcessInstance);
-
- ProcessInstance subProcessInstance2 = new ProcessInstance();
- subProcessInstance2.setId(111);
- subProcessInstance2.setIsSubProcess(Flag.NO);
- processService.createRecoveryWaitingThreadCommand(repeatRunningCommand, subProcessInstance2);
- }
-
@Test
public void testHandleCommand() throws CronParseException, CodeGenerateUtils.CodeGenerateException {
@@ -789,19 +661,6 @@ public class ProcessServiceTest {
Assertions.assertEquals(1, stringTaskNodeTaskNodeRelationDAG.getNodesCount());
}
- @Test
- public void testCreateCommand() {
- Command command = new Command();
- command.setProcessDefinitionCode(123);
- command.setCommandParam("{\"ProcessInstanceId\":222}");
- command.setCommandType(CommandType.START_PROCESS);
- int mockResult = 1;
- Mockito.when(commandMapper.insert(command)).thenReturn(mockResult);
- int exeMethodResult = processService.createCommand(command);
- Assertions.assertEquals(mockResult, exeMethodResult);
- Mockito.verify(commandMapper, Mockito.times(1)).insert(command);
- }
-
@Test
public void testChangeOutParam() {
TaskInstance taskInstance = new TaskInstance();
@@ -887,17 +746,6 @@ public class ProcessServiceTest {
Assertions.assertEquals(instance.getId(), taskInstanceByIdList.get(0).getId());
}
- @Test
- public void testFindCommandPageBySlot() {
- int pageSize = 1;
- int pageNumber = 0;
- int masterCount = 0;
- int thisMasterSlot = 2;
- List<Command> commandList =
- processService.findCommandPageBySlot(pageSize, pageNumber, masterCount, thisMasterSlot);
- Assertions.assertEquals(0, commandList.size());
- }
-
@Test
public void testFindLastManualProcessInterval() {
long definitionCode = 1L;
diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/utils/ParamUtilsTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/utils/ParamUtilsTest.java
new file mode 100644
index 0000000000..cdd0d761a4
--- /dev/null
+++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/utils/ParamUtilsTest.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.service.utils;
+
+import java.util.Map;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class ParamUtilsTest {
+
+ @Test
+ public void testGetGlobalParamMap() {
+ String globalParam = "[{\"prop\":\"startParam1\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"\"}]";
+ Map<String, String> globalParamMap = ParamUtils.getGlobalParamMap(globalParam);
+ Assertions.assertEquals(globalParamMap.size(), 1);
+ Assertions.assertEquals(globalParamMap.get("startParam1"), "");
+
+ Map<String, String> emptyParamMap = ParamUtils.getGlobalParamMap(null);
+ Assertions.assertEquals(emptyParamMap.size(), 0);
+ }
+}