You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by zh...@apache.org on 2022/06/12 04:35:14 UTC
[dolphinscheduler] branch dev updated: [Improve] Enhance complement function transformation (#10376)
This is an automated email from the ASF dual-hosted git repository.
zhongjiajie pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new f6fea06f10 [Improve] Enhance complement function transformation (#10376)
f6fea06f10 is described below
commit f6fea06f1051f5962106e6299eea9696bddf12d4
Author: hstdream <33...@users.noreply.github.com>
AuthorDate: Sun Jun 12 12:35:08 2022 +0800
[Improve] Enhance complement function transformation (#10376)
---
docs/docs/en/about/glossary.md | 2 +-
docs/docs/zh/about/glossary.md | 2 +-
.../api/controller/ExecutorController.java | 2 +-
.../apache/dolphinscheduler/api/enums/Status.java | 1 +
.../api/service/impl/ExecutorServiceImpl.java | 306 ++++++++++++++-------
.../api/service/ExecutorServiceTest.java | 14 +-
.../apache/dolphinscheduler/common/Constants.java | 11 +
.../master/runner/WorkflowExecuteRunnable.java | 152 +++++-----
.../dolphinscheduler/service/corn/CronUtils.java | 53 ++--
9 files changed, 344 insertions(+), 199 deletions(-)
diff --git a/docs/docs/en/about/glossary.md b/docs/docs/en/about/glossary.md
index 492fdee963..f8ad9355bc 100644
--- a/docs/docs/en/about/glossary.md
+++ b/docs/docs/en/about/glossary.md
@@ -45,7 +45,7 @@ provided. **Continue** refers to regardless of the status of the task running in
failure. **End** means that once a failed task is found, Kill will also run the parallel task at the same time, and the
process fails and ends
-**Complement**: Supplement historical data,Supports **interval parallel and serial** two complement methods
+**Complement**: Supplements historical data. It supports two complement methods, **interval parallel and serial**, and two types of date selection, **date range** and **date enumeration**.
### 2.Module introduction
diff --git a/docs/docs/zh/about/glossary.md b/docs/docs/zh/about/glossary.md
index f3ce5f3bd1..dd3043cc29 100644
--- a/docs/docs/zh/about/glossary.md
+++ b/docs/docs/zh/about/glossary.md
@@ -30,7 +30,7 @@
**失败策略**:对于并行运行的任务,如果有任务失败,提供两种失败策略处理方式,**继续**是指不管并行运行任务的状态,直到流程失败结束。**结束**是指一旦发现失败任务,则同时Kill掉正在运行的并行任务,流程失败结束
-**补数**:补历史数据,支持**区间并行和串行**两种补数方式
+**补数**:补历史数据,支持**区间并行和串行**两种补数方式,其日期选择方式包括**日期范围**和**日期枚举**两种
### 2.模块介绍
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java
index ba230d57a4..90c4ecab51 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java
@@ -87,7 +87,7 @@ public class ExecutorController extends BaseController {
* @param loginUser login user
* @param projectCode project code
* @param processDefinitionCode process definition code
- * @param scheduleTime schedule time
+ * @param scheduleTime schedule time. When CommandType is COMPLEMENT_DATA there are two ways to pass it: 1. date range, for example: {"complementStartDate":"2022-01-01 12:12:12","complementEndDate":"2022-01-06 12:12:12"}; 2. manual input, for example: {"complementScheduleDateList":"2022-01-01 00:00:00,2022-01-02 12:12:12,2022-01-03 12:12:12"}
* @param failureStrategy failure strategy
* @param startNodeList start nodes list
* @param taskDependType task depend type
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
index 1b1bbbe463..8f5b30168d 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
@@ -407,6 +407,7 @@ public enum Status {
NO_CURRENT_OPERATING_PERMISSION(1400001, "The current user does not have this permission.", "当前用户无此权限"),
FUNCTION_DISABLED(1400002, "The current feature is disabled.", "当前功能已被禁用"),
+ SCHEDULE_TIME_NUMBER(1400003, "The number of complement dates exceed 100.", "补数日期个数超过100"),
;
private final int code;
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
index 9fafacb4f8..0d7376681d 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
@@ -17,14 +17,12 @@
package org.apache.dolphinscheduler.api.service.impl;
-import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_START;
-import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE;
-import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
-import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING;
-import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_NODES;
-import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_PARAMS;
-import static org.apache.dolphinscheduler.common.Constants.MAX_TASK_TIMEOUT;
-
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.google.common.collect.Lists;
+import org.apache.commons.beanutils.BeanUtils;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections.MapUtils;
+import org.apache.commons.lang3.StringUtils;
import org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant;
import org.apache.dolphinscheduler.api.enums.ExecuteType;
import org.apache.dolphinscheduler.api.enums.Status;
@@ -69,13 +67,13 @@ import org.apache.dolphinscheduler.remote.command.StateEventChangeCommand;
import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService;
import org.apache.dolphinscheduler.service.corn.CronUtils;
import org.apache.dolphinscheduler.service.process.ProcessService;
-
-import org.apache.commons.beanutils.BeanUtils;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.collections.MapUtils;
-import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
@@ -84,12 +82,16 @@ import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Service;
-
-import com.fasterxml.jackson.core.type.TypeReference;
+import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_START;
+import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE;
+import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST;
+import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
+import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING;
+import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_NODES;
+import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_PARAMS;
+import static org.apache.dolphinscheduler.common.Constants.COMMA;
+import static org.apache.dolphinscheduler.common.Constants.MAX_TASK_TIMEOUT;
+import static org.apache.dolphinscheduler.common.Constants.SCHEDULE_TIME_MAX_LENGTH;
/**
* executor service impl
@@ -187,11 +189,15 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
return result;
}
+ if(!checkScheduleTimeNum(commandType,cronTime)){
+ putMsg(result, Status.SCHEDULE_TIME_NUMBER);
+ return result;
+ }
+
// check master exists
if (!checkMasterExists(result)) {
return result;
}
-
/**
* create command
*/
@@ -228,6 +234,29 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
return true;
}
+ /**
+ * Check that a complement-data request does not enumerate more schedule dates than SCHEDULE_TIME_MAX_LENGTH.
+ * @param complementData command type of the request; every type other than COMPLEMENT_DATA passes the check
+ * @param cronTime schedule time JSON string; a null value passes the check
+ * @return false only when the complementScheduleDateList value splits on COMMA into more than SCHEDULE_TIME_MAX_LENGTH items, otherwise true
+ */
+ private boolean checkScheduleTimeNum(CommandType complementData,String cronTime) {
+ if (!CommandType.COMPLEMENT_DATA.equals(complementData)) {
+ return true;
+ }
+ if(cronTime == null){
+ return true;
+ }
+ Map<String,String> cronMap = JSONUtils.toMap(cronTime); // NOTE(review): result is not null-checked here, unlike checkScheduleTime - confirm toMap cannot return null
+ if (cronMap.containsKey(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST)) {
+ String[] stringDates = cronMap.get(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST).split(COMMA);
+ if (stringDates.length > SCHEDULE_TIME_MAX_LENGTH) {
+ return false;
+ }
+ }
+ return true;
+ }
+
/**
* check whether the process definition can be executed
*
@@ -655,27 +684,16 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
}
command.setProcessInstanceId(0);
- Date start = null;
- Date end = null;
- if (!StringUtils.isEmpty(schedule)) {
- String[] interval = schedule.split(",");
- if (interval.length == 2) {
- start = DateUtils.getScheduleDate(interval[0]);
- end = DateUtils.getScheduleDate(interval[1]);
- if (start.after(end)) {
- logger.info("complement data error, wrong date start:{} and end date:{} ",
- start, end
- );
- return 0;
- }
- }
- }
// determine whether to complement
if (commandType == CommandType.COMPLEMENT_DATA) {
- if (start == null || end == null) {
+ if (schedule == null || StringUtils.isEmpty(schedule)) {
return 0;
}
- return createComplementCommandList(start, end, runMode, command, expectedParallelismNumber, complementDependentMode);
+ int check = checkScheduleTime(schedule);
+ if(check == 0){
+ return 0;
+ }
+ return createComplementCommandList(schedule, runMode, command, expectedParallelismNumber, complementDependentMode);
} else {
command.setCommandParam(JSONUtils.toJsonString(cmdParam));
return processService.createCommand(command);
@@ -686,89 +704,117 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
* create complement command
* close left and close right
*
- * @param start
- * @param end
+ * @param scheduleTimeParam
* @param runMode
* @return
*/
- protected int createComplementCommandList(Date start, Date end, RunMode runMode, Command command,
+ protected int createComplementCommandList(String scheduleTimeParam, RunMode runMode, Command command,
Integer expectedParallelismNumber, ComplementDependentMode complementDependentMode) {
int createCount = 0;
+ String startDate = null;
+ String endDate = null;
+ String dateList = null;
int dependentProcessDefinitionCreateCount = 0;
-
runMode = (runMode == null) ? RunMode.RUN_MODE_SERIAL : runMode;
Map<String, String> cmdParam = JSONUtils.toMap(command.getCommandParam());
+ Map<String, String> scheduleParam = JSONUtils.toMap(scheduleTimeParam);
+ if(scheduleParam.containsKey(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST)){
+ dateList = scheduleParam.get(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST);
+ dateList = removeDuplicates(dateList);
+ }
+ if(scheduleParam.containsKey(CMDPARAM_COMPLEMENT_DATA_START_DATE) && scheduleParam.containsKey(CMDPARAM_COMPLEMENT_DATA_END_DATE)){
+ startDate = scheduleParam.get(CMDPARAM_COMPLEMENT_DATA_START_DATE);
+ endDate = scheduleParam.get(CMDPARAM_COMPLEMENT_DATA_END_DATE);
+ }
switch (runMode) {
case RUN_MODE_SERIAL: {
- if (start.after(end)) {
- logger.warn("The startDate {} is later than the endDate {}", start, end);
- break;
+ if(StringUtils.isNotEmpty(dateList)){
+ cmdParam.put(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST, dateList);
+ command.setCommandParam(JSONUtils.toJsonString(cmdParam));
+ createCount = processService.createCommand(command);
}
- cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, DateUtils.dateToString(start));
- cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, DateUtils.dateToString(end));
- command.setCommandParam(JSONUtils.toJsonString(cmdParam));
- createCount = processService.createCommand(command);
-
- // dependent process definition
- List<Schedule> schedules = processService.queryReleaseSchedulerListByProcessDefinitionCode(command.getProcessDefinitionCode());
-
- if (schedules.isEmpty() || complementDependentMode == ComplementDependentMode.OFF_MODE) {
- logger.info("process code: {} complement dependent in off mode or schedule's size is 0, skip "
- + "dependent complement data", command.getProcessDefinitionCode());
- } else {
- dependentProcessDefinitionCreateCount += createComplementDependentCommand(schedules, command);
+ if(startDate != null && endDate != null){
+ cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, startDate);
+ cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, endDate);
+ command.setCommandParam(JSONUtils.toJsonString(cmdParam));
+ createCount = processService.createCommand(command);
+
+ // dependent process definition
+ List<Schedule> schedules = processService.queryReleaseSchedulerListByProcessDefinitionCode(command.getProcessDefinitionCode());
+
+ if (schedules.isEmpty() || complementDependentMode == ComplementDependentMode.OFF_MODE) {
+ logger.info("process code: {} complement dependent in off mode or schedule's size is 0, skip "
+ + "dependent complement data", command.getProcessDefinitionCode());
+ } else {
+ dependentProcessDefinitionCreateCount += createComplementDependentCommand(schedules, command);
+ }
}
-
break;
}
case RUN_MODE_PARALLEL: {
- if (start.after(end)) {
- logger.warn("The startDate {} is later than the endDate {}", start, end);
- break;
- }
-
- List<Date> listDate = new ArrayList<>();
- List<Schedule> schedules = processService.queryReleaseSchedulerListByProcessDefinitionCode(command.getProcessDefinitionCode());
- listDate.addAll(CronUtils.getSelfFireDateList(start, end, schedules));
- int listDateSize = listDate.size();
- createCount = listDate.size();
- if (!CollectionUtils.isEmpty(listDate)) {
- if (expectedParallelismNumber != null && expectedParallelismNumber != 0) {
- createCount = Math.min(listDate.size(), expectedParallelismNumber);
- if (listDateSize < createCount) {
- createCount = listDateSize;
+ if(startDate != null && endDate != null){
+ List<Date> listDate = new ArrayList<>();
+ List<Schedule> schedules = processService.queryReleaseSchedulerListByProcessDefinitionCode(command.getProcessDefinitionCode());
+ listDate.addAll(CronUtils.getSelfFireDateList(DateUtils.getScheduleDate(startDate), DateUtils.getScheduleDate(endDate), schedules));
+ int listDateSize = listDate.size();
+ createCount = listDate.size();
+ if (!CollectionUtils.isEmpty(listDate)) {
+ if (expectedParallelismNumber != null && expectedParallelismNumber != 0) {
+ createCount = Math.min(listDate.size(), expectedParallelismNumber);
+ if (listDateSize < createCount) {
+ createCount = listDateSize;
+ }
+ }
+ logger.info("In parallel mode, current expectedParallelismNumber:{}", createCount);
+
+ // Distribute the number of tasks equally to each command.
+ // The last command with insufficient quantity will be assigned to the remaining tasks.
+ int itemsPerCommand = (listDateSize / createCount);
+ int remainingItems = (listDateSize % createCount);
+ int startDateIndex = 0;
+ int endDateIndex = 0;
+
+ for (int i = 1; i <= createCount; i++) {
+ int extra = (i <= remainingItems) ? 1 : 0;
+ int singleCommandItems = (itemsPerCommand + extra);
+
+ if (i == 1) {
+ endDateIndex += singleCommandItems - 1;
+ } else {
+ startDateIndex = endDateIndex + 1;
+ endDateIndex += singleCommandItems;
+ }
+
+ cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, DateUtils.dateToString(listDate.get(startDateIndex)));
+ cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, DateUtils.dateToString(listDate.get(endDateIndex)));
+ command.setCommandParam(JSONUtils.toJsonString(cmdParam));
+ processService.createCommand(command);
+
+ if (schedules.isEmpty() || complementDependentMode == ComplementDependentMode.OFF_MODE) {
+ logger.info("process code: {} complement dependent in off mode or schedule's size is 0, skip "
+ + "dependent complement data", command.getProcessDefinitionCode());
+ } else {
+ dependentProcessDefinitionCreateCount += createComplementDependentCommand(schedules, command);
+ }
}
}
- logger.info("In parallel mode, current expectedParallelismNumber:{}", createCount);
-
- // Distribute the number of tasks equally to each command.
- // The last command with insufficient quantity will be assigned to the remaining tasks.
- int itemsPerCommand = (listDateSize / createCount);
- int remainingItems = (listDateSize % createCount);
- int startDateIndex = 0;
- int endDateIndex = 0;
-
- for (int i = 1; i <= createCount; i++) {
- int extra = (i <= remainingItems) ? 1 : 0;
- int singleCommandItems = (itemsPerCommand + extra);
-
- if (i == 1) {
- endDateIndex += singleCommandItems - 1;
- } else {
- startDateIndex = endDateIndex + 1;
- endDateIndex += singleCommandItems;
+ }
+ if(StringUtils.isNotEmpty(dateList)){
+ List<String> listDate = Arrays.asList(dateList.split(COMMA));
+ int listDateSize = listDate.size();
+ createCount = listDate.size();
+ if (!CollectionUtils.isEmpty(listDate)) {
+ if (expectedParallelismNumber != null && expectedParallelismNumber != 0) {
+ createCount = Math.min(listDate.size(), expectedParallelismNumber);
+ if (listDateSize < createCount) {
+ createCount = listDateSize;
+ }
}
-
- cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, DateUtils.dateToString(listDate.get(startDateIndex)));
- cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, DateUtils.dateToString(listDate.get(endDateIndex)));
- command.setCommandParam(JSONUtils.toJsonString(cmdParam));
- processService.createCommand(command);
-
- if (schedules.isEmpty() || complementDependentMode == ComplementDependentMode.OFF_MODE) {
- logger.info("process code: {} complement dependent in off mode or schedule's size is 0, skip "
- + "dependent complement data", command.getProcessDefinitionCode());
- } else {
- dependentProcessDefinitionCreateCount += createComplementDependentCommand(schedules, command);
+ logger.info("In parallel mode, current expectedParallelismNumber:{}", createCount);
+ for (List<String> stringDate : Lists.partition(listDate, createCount)) {
+ cmdParam.put(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST, String.join(COMMA, stringDate));
+ command.setCommandParam(JSONUtils.toJsonString(cmdParam));
+ processService.createCommand(command);
}
}
}
@@ -854,4 +900,60 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
return validDependentProcessDefinitionList;
}
+
+ /**
+ * Validate the complement schedule parameter: the date-list value must be non-null and a start/end pair must be present, parsable and ordered.
+ * @param schedule schedule time JSON string, read through the date-list key and/or the start-date and end-date keys
+ * @return 0 if JSONUtils.toMap yields null, the date-list value is null, a start/end value is null, DateUtils.getScheduleDate yields null, or start is after end; otherwise 1
+ */
+ private int checkScheduleTime(String schedule){
+ Date start = null;
+ Date end = null;
+ Map<String,String> scheduleResult = JSONUtils.toMap(schedule);
+ if(scheduleResult == null){
+ return 0;
+ }
+ if(scheduleResult.containsKey(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST)){
+ if(scheduleResult.get(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST) == null){
+ return 0;
+ }
+ }
+ if(scheduleResult.containsKey(CMDPARAM_COMPLEMENT_DATA_START_DATE)){ // the end date is only examined when the start-date key is present
+ String startDate = scheduleResult.get(CMDPARAM_COMPLEMENT_DATA_START_DATE);
+ String endDate = scheduleResult.get(CMDPARAM_COMPLEMENT_DATA_END_DATE);
+ if (startDate == null || endDate == null) {
+ return 0;
+ }
+ start = DateUtils.getScheduleDate(startDate);
+ end = DateUtils.getScheduleDate(endDate);
+ if(start == null || end == null){
+ return 0;
+ }
+ if (start.after(end)) {
+ logger.error("complement data error, wrong date start:{} and end date:{} ",
+ start, end
+ );
+ return 0;
+ }
+ }
+ return 1; // also reached when neither the date-list key nor the start-date key is present
+ }
+
+ /**
+ * Remove duplicate entries from a COMMA-separated list of schedule dates.
+ * @param scheduleTimeList COMMA-separated date strings; entries are compared as raw strings
+ * @return the distinct entries joined with COMMA, or null when StringUtils.isNotEmpty(scheduleTimeList) is false
+ */
+ private String removeDuplicates(String scheduleTimeList){
+ HashSet<String> removeDate = new HashSet<String>(); // NOTE(review): HashSet does not guarantee iteration order, so the input order of the dates may not be kept
+ List<String> resultList = new ArrayList<String>();
+ if(StringUtils.isNotEmpty(scheduleTimeList)){
+ String[] dateArrays = scheduleTimeList.split(COMMA);
+ List<String> dateList = Arrays.asList(dateArrays);
+ removeDate.addAll(dateList);
+ resultList.addAll(removeDate);
+ return String.join(COMMA, resultList);
+ }
+ return null;
+ }
}
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java
index 5d170c499d..44e149285f 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java
@@ -199,7 +199,7 @@ public class ExecutorServiceTest {
Mockito.when(processService.queryReleaseSchedulerListByProcessDefinitionCode(processDefinitionCode)).thenReturn(zeroSchedulerList());
Map<String, Object> result = executorService.execProcessInstance(loginUser, projectCode,
- processDefinitionCode, cronTime, CommandType.START_PROCESS,
+ processDefinitionCode, "{\"complementStartDate\":\"2020-01-01 00:00:00\",\"complementEndDate\":\"2020-01-31 23:00:00\"}", CommandType.START_PROCESS,
null, null,
null, null, 0,
RunMode.RUN_MODE_SERIAL,
@@ -218,7 +218,7 @@ public class ExecutorServiceTest {
Mockito.when(processService.queryReleaseSchedulerListByProcessDefinitionCode(processDefinitionCode)).thenReturn(zeroSchedulerList());
Map<String, Object> result = executorService.execProcessInstance(loginUser, projectCode,
- processDefinitionCode, cronTime, CommandType.START_PROCESS,
+ processDefinitionCode, "{\"complementStartDate\":\"2020-01-01 00:00:00\",\"complementEndDate\":\"2020-01-31 23:00:00\"}", CommandType.START_PROCESS,
null, "n1,n2",
null, null, 0,
RunMode.RUN_MODE_SERIAL,
@@ -237,7 +237,7 @@ public class ExecutorServiceTest {
Mockito.when(processService.queryReleaseSchedulerListByProcessDefinitionCode(processDefinitionCode)).thenReturn(zeroSchedulerList());
Map<String, Object> result = executorService.execProcessInstance(loginUser, projectCode,
- processDefinitionCode, "2020-01-31 23:00:00,2020-01-01 00:00:00", CommandType.COMPLEMENT_DATA,
+ processDefinitionCode, "{\"complementStartDate\":\"2022-01-07 12:12:12\",\"complementEndDate\":\"2022-01-06 12:12:12\"}", CommandType.COMPLEMENT_DATA,
null, null,
null, null, 0,
RunMode.RUN_MODE_SERIAL,
@@ -255,7 +255,7 @@ public class ExecutorServiceTest {
Mockito.when(processService.queryReleaseSchedulerListByProcessDefinitionCode(processDefinitionCode)).thenReturn(zeroSchedulerList());
Map<String, Object> result = executorService.execProcessInstance(loginUser, projectCode,
- processDefinitionCode, cronTime, CommandType.COMPLEMENT_DATA,
+ processDefinitionCode, "{\"complementStartDate\":\"2020-01-01 00:00:00\",\"complementEndDate\":\"2020-01-31 23:00:00\"}", CommandType.COMPLEMENT_DATA,
null, null,
null, null, 0,
RunMode.RUN_MODE_SERIAL,
@@ -273,7 +273,7 @@ public class ExecutorServiceTest {
Mockito.when(processService.queryReleaseSchedulerListByProcessDefinitionCode(processDefinitionCode)).thenReturn(zeroSchedulerList());
Map<String, Object> result = executorService.execProcessInstance(loginUser, projectCode,
- processDefinitionCode, cronTime, CommandType.COMPLEMENT_DATA,
+ processDefinitionCode, "{\"complementStartDate\":\"2020-01-01 00:00:00\",\"complementEndDate\":\"2020-01-31 23:00:00\"}", CommandType.COMPLEMENT_DATA,
null, null,
null, null, 0,
RunMode.RUN_MODE_PARALLEL,
@@ -292,7 +292,7 @@ public class ExecutorServiceTest {
Mockito.when(processService.queryReleaseSchedulerListByProcessDefinitionCode(processDefinitionCode)).thenReturn(oneSchedulerList());
Map<String, Object> result = executorService.execProcessInstance(loginUser, projectCode,
- processDefinitionCode, cronTime, CommandType.COMPLEMENT_DATA,
+ processDefinitionCode, "{\"complementStartDate\":\"2020-01-01 00:00:00\",\"complementEndDate\":\"2020-01-31 23:00:00\"}", CommandType.COMPLEMENT_DATA,
null, null,
null, null, 0,
RunMode.RUN_MODE_PARALLEL,
@@ -308,7 +308,7 @@ public class ExecutorServiceTest {
Mockito.when(monitorService.getServerListFromRegistry(true)).thenReturn(new ArrayList<>());
Map<String, Object> result = executorService.execProcessInstance(loginUser, projectCode,
- processDefinitionCode, cronTime, CommandType.COMPLEMENT_DATA,
+ processDefinitionCode, "{\"complementStartDate\":\"2020-01-01 00:00:00\",\"complementEndDate\":\"2020-01-31 23:00:00\"}", CommandType.COMPLEMENT_DATA,
null, null,
null, null, 0,
RunMode.RUN_MODE_PARALLEL,
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
index b908cd233d..90820f6d40 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
@@ -361,6 +361,11 @@ public final class Constants {
*/
public static final String CMDPARAM_COMPLEMENT_DATA_END_DATE = "complementEndDate";
+ /**
+ * complement data schedule date list (comma-separated dates)
+ */
+ public static final String CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST = "complementScheduleDateList";
+
/**
* complement date default cron string
*/
@@ -809,4 +814,10 @@ public final class Constants {
* tenant
*/
public static final int TENANT_FULL_NAME_MAX_LENGTH = 30;
+
+ /**
+ * maximum number of schedule dates per complement request; too many dates would affect memory, so the limit is set to 100
+ */
+ public static final int SCHEDULE_TIME_MAX_LENGTH = 100;
+
}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
index f9caf000d8..3365e67c35 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
@@ -17,16 +17,10 @@
package org.apache.dolphinscheduler.server.master.runner;
-import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE;
-import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
-import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVERY_START_NODE_STRING;
-import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING;
-import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_NODES;
-import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP;
-import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_BLOCKING;
-import static org.apache.dolphinscheduler.plugin.task.api.enums.DataType.VARCHAR;
-import static org.apache.dolphinscheduler.plugin.task.api.enums.Direct.IN;
-
+import com.google.common.collect.Lists;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.math.NumberUtils;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.FailureStrategy;
@@ -75,10 +69,8 @@ import org.apache.dolphinscheduler.service.alert.ProcessAlertManager;
import org.apache.dolphinscheduler.service.corn.CronUtils;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.queue.PeerTaskInstancePriorityQueue;
-
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.lang3.math.NumberUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Arrays;
@@ -96,10 +88,18 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Lists;
+import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE;
+import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST;
+import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
+import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVERY_START_NODE_STRING;
+import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING;
+import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_NODES;
+import static org.apache.dolphinscheduler.common.Constants.COMMA;
+import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP;
+import static org.apache.dolphinscheduler.common.Constants.YYYY_MM_DD_HH_MM_SS;
+import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_BLOCKING;
+import static org.apache.dolphinscheduler.plugin.task.api.enums.DataType.VARCHAR;
+import static org.apache.dolphinscheduler.plugin.task.api.enums.Direct.IN;
/**
* Workflow execute task, used to execute a workflow instance.
@@ -282,14 +282,14 @@ public class WorkflowExecuteRunnable implements Runnable {
public String getKey() {
if (StringUtils.isNotEmpty(key)
- || this.processDefinition == null) {
+ || this.processDefinition == null) {
return key;
}
key = String.format("%d_%d_%d",
- this.processDefinition.getCode(),
- this.processDefinition.getVersion(),
- this.processInstance.getId());
+ this.processDefinition.getCode(),
+ this.processDefinition.getVersion(),
+ this.processInstance.getId());
return key;
}
@@ -503,7 +503,7 @@ public class WorkflowExecuteRunnable implements Runnable {
} else {
ProcessInstance processInstance = this.processService.findProcessInstanceById(nextTaskInstance.getProcessInstanceId());
this.processService.sendStartTask2Master(processInstance, nextTaskInstance.getId(),
- org.apache.dolphinscheduler.remote.command.CommandType.TASK_WAKEUP_EVENT_REQUEST);
+ org.apache.dolphinscheduler.remote.command.CommandType.TASK_WAKEUP_EVENT_REQUEST);
}
}
}
@@ -526,12 +526,12 @@ public class WorkflowExecuteRunnable implements Runnable {
waitToRetryTaskInstanceMap.put(newTaskInstance.getTaskCode(), newTaskInstance);
if (!taskInstance.retryTaskIntervalOverTime()) {
logger.info("failure task will be submitted: process id: {}, task instance code: {} state:{} retry times:{} / {}, interval:{}",
- processInstance.getId(),
- newTaskInstance.getTaskCode(),
- newTaskInstance.getState(),
- newTaskInstance.getRetryTimes(),
- newTaskInstance.getMaxRetryTimes(),
- newTaskInstance.getRetryInterval());
+ processInstance.getId(),
+ newTaskInstance.getTaskCode(),
+ newTaskInstance.getState(),
+ newTaskInstance.getRetryTimes(),
+ newTaskInstance.getMaxRetryTimes(),
+ newTaskInstance.getRetryInterval());
stateWheelExecuteThread.addTask4TimeoutCheck(processInstance, newTaskInstance);
stateWheelExecuteThread.addTask4RetryCheck(processInstance, newTaskInstance);
} else {
@@ -562,7 +562,7 @@ public class WorkflowExecuteRunnable implements Runnable {
logger.info("process instance update: {}", processInstanceId);
processInstance = processService.findProcessInstanceById(processInstanceId);
processDefinition = processService.findProcessDefinition(processInstance.getProcessDefinitionCode(),
- processInstance.getProcessDefinitionVersion());
+ processInstance.getProcessDefinitionVersion());
processInstance.setProcessDefinition(processDefinition);
}
@@ -591,8 +591,8 @@ public class WorkflowExecuteRunnable implements Runnable {
public boolean checkProcessInstance(StateEvent stateEvent) {
if (this.processInstance.getId() != stateEvent.getProcessInstanceId()) {
logger.error("mismatch process instance id: {}, state event:{}",
- this.processInstance.getId(),
- stateEvent);
+ this.processInstance.getId(),
+ stateEvent);
return false;
}
return true;
@@ -774,7 +774,12 @@ public class WorkflowExecuteRunnable implements Runnable {
if (cmdParam.containsKey(Constants.CMD_PARAM_RECOVERY_START_NODE_STRING)) {
cmdParam.remove(Constants.CMD_PARAM_RECOVERY_START_NODE_STRING);
}
- cmdParam.replace(CMDPARAM_COMPLEMENT_DATA_START_DATE, DateUtils.format(scheduleDate, "yyyy-MM-dd HH:mm:ss", null));
+ if(cmdParam.containsKey(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST)){
+ cmdParam.replace(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST, cmdParam.get(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST).substring(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST).indexOf(COMMA)+1));
+ }
+ if(cmdParam.containsKey(CMDPARAM_COMPLEMENT_DATA_START_DATE)){
+ cmdParam.replace(CMDPARAM_COMPLEMENT_DATA_START_DATE, DateUtils.format(scheduleDate, YYYY_MM_DD_HH_MM_SS, null));
+ }
command.setCommandParam(JSONUtils.toJsonString(cmdParam));
command.setTaskDependType(processInstance.getTaskDependType());
command.setFailureStrategy(processInstance.getFailureStrategy());
@@ -794,7 +799,7 @@ public class WorkflowExecuteRunnable implements Runnable {
private boolean needComplementProcess() {
if (processInstance.isComplementData()
- && Flag.NO == processInstance.getIsSubProcess()) {
+ && Flag.NO == processInstance.getIsSubProcess()) {
return true;
}
return false;
@@ -874,7 +879,7 @@ public class WorkflowExecuteRunnable implements Runnable {
return;
}
processDefinition = processService.findProcessDefinition(processInstance.getProcessDefinitionCode(),
- processInstance.getProcessDefinitionVersion());
+ processInstance.getProcessDefinitionVersion());
processInstance.setProcessDefinition(processDefinition);
List<TaskInstance> recoverNodeList = getStartTaskInstanceList(processInstance.getCommandParam());
@@ -894,7 +899,7 @@ public class WorkflowExecuteRunnable implements Runnable {
List<String> recoveryNodeCodeList = getRecoveryNodeCodeList(recoverNodeList);
List<String> startNodeNameList = parseStartNodeName(processInstance.getCommandParam());
ProcessDag processDag = generateFlowDag(taskNodeList,
- startNodeNameList, recoveryNodeCodeList, processInstance.getTaskDependType());
+ startNodeNameList, recoveryNodeCodeList, processInstance.getTaskDependType());
if (processDag == null) {
logger.error("processDag is null");
return;
@@ -956,15 +961,24 @@ public class WorkflowExecuteRunnable implements Runnable {
if (processInstance.isComplementData() && complementListDate.isEmpty()) {
Map<String, String> cmdParam = JSONUtils.toMap(processInstance.getCommandParam());
- if (cmdParam != null && cmdParam.containsKey(CMDPARAM_COMPLEMENT_DATA_START_DATE)) {
+ if (cmdParam != null) {
// reset global params while there are start parameters
setGlobalParamIfCommanded(processDefinition, cmdParam);
- Date start = DateUtils.stringToDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_START_DATE));
- Date end = DateUtils.stringToDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_END_DATE));
+ Date start = null;
+ Date end = null;
+ if(cmdParam.containsKey(CMDPARAM_COMPLEMENT_DATA_START_DATE) && cmdParam.containsKey(CMDPARAM_COMPLEMENT_DATA_END_DATE)){
+ start = DateUtils.stringToDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_START_DATE));
+ end = DateUtils.stringToDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_END_DATE));
+ }
List<Schedule> schedules = processService.queryReleaseSchedulerListByProcessDefinitionCode(processInstance.getProcessDefinitionCode());
if (complementListDate.isEmpty() && needComplementProcess()) {
- complementListDate = CronUtils.getSelfFireDateList(start, end, schedules);
+ if(start != null && end != null){
+ complementListDate = CronUtils.getSelfFireDateList(start, end, schedules);
+ }
+ if(cmdParam.containsKey(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST)){
+ complementListDate = CronUtils.getSelfScheduleDateList(cmdParam);
+ }
logger.info(" process definition code:{} complement data: {}",
processInstance.getProcessDefinitionCode(), complementListDate);
@@ -996,15 +1010,15 @@ public class WorkflowExecuteRunnable implements Runnable {
taskProcessor.init(taskInstance, processInstance);
if (taskInstance.getState() == ExecutionStatus.RUNNING_EXECUTION
- && taskProcessor.getType().equalsIgnoreCase(Constants.COMMON_TASK_TYPE)) {
+ && taskProcessor.getType().equalsIgnoreCase(Constants.COMMON_TASK_TYPE)) {
notifyProcessHostUpdate(taskInstance);
}
boolean submit = taskProcessor.action(TaskAction.SUBMIT);
if (!submit) {
logger.error("process id:{} name:{} submit standby task id:{} name:{} failed!",
- processInstance.getId(), processInstance.getName(),
- taskInstance.getId(), taskInstance.getName());
+ processInstance.getId(), processInstance.getName(),
+ taskInstance.getId(), taskInstance.getName());
return null;
}
@@ -1475,10 +1489,10 @@ public class WorkflowExecuteRunnable implements Runnable {
*/
private ExecutionStatus runningState(ExecutionStatus state) {
if (state == ExecutionStatus.READY_STOP
- || state == ExecutionStatus.READY_PAUSE
- || state == ExecutionStatus.WAITING_THREAD
- || state == ExecutionStatus.READY_BLOCK
- || state == ExecutionStatus.DELAY_EXECUTION) {
+ || state == ExecutionStatus.READY_PAUSE
+ || state == ExecutionStatus.WAITING_THREAD
+ || state == ExecutionStatus.READY_BLOCK
+ || state == ExecutionStatus.DELAY_EXECUTION) {
// if the running task is not completed, the state remains unchanged
return state;
} else {
@@ -1514,8 +1528,8 @@ public class WorkflowExecuteRunnable implements Runnable {
}
if (processInstance.getFailureStrategy() == FailureStrategy.CONTINUE) {
return readyToSubmitTaskQueue.size() == 0
- && activeTaskProcessorMaps.size() == 0
- && waitToRetryTaskInstanceMap.size() == 0;
+ && activeTaskProcessorMaps.size() == 0
+ && waitToRetryTaskInstanceMap.size() == 0;
}
}
return false;
@@ -1546,9 +1560,9 @@ public class WorkflowExecuteRunnable implements Runnable {
List<TaskInstance> pauseList = getCompleteTaskByState(ExecutionStatus.PAUSE);
if (CollectionUtils.isNotEmpty(pauseList)
- || processInstance.isBlocked()
- || !isComplementEnd()
- || readyToSubmitTaskQueue.size() > 0) {
+ || processInstance.isBlocked()
+ || !isComplementEnd()
+ || readyToSubmitTaskQueue.size() > 0) {
return ExecutionStatus.PAUSE;
} else {
return ExecutionStatus.SUCCESS;
@@ -1613,9 +1627,9 @@ public class WorkflowExecuteRunnable implements Runnable {
List<TaskInstance> killList = getCompleteTaskByState(ExecutionStatus.KILL);
List<TaskInstance> failList = getCompleteTaskByState(ExecutionStatus.FAILURE);
if (CollectionUtils.isNotEmpty(stopList)
- || CollectionUtils.isNotEmpty(killList)
- || CollectionUtils.isNotEmpty(failList)
- || !isComplementEnd()) {
+ || CollectionUtils.isNotEmpty(killList)
+ || CollectionUtils.isNotEmpty(failList)
+ || !isComplementEnd()) {
return ExecutionStatus.STOP;
} else {
return ExecutionStatus.SUCCESS;
@@ -1673,10 +1687,10 @@ public class WorkflowExecuteRunnable implements Runnable {
ExecutionStatus state = getProcessInstanceState(processInstance);
if (processInstance.getState() != state) {
logger.info(
- "work flow process instance [id: {}, name:{}], state change from {} to {}, cmd type: {}",
- processInstance.getId(), processInstance.getName(),
- processInstance.getState(), state,
- processInstance.getCommandType());
+ "work flow process instance [id: {}, name:{}], state change from {} to {}, cmd type: {}",
+ processInstance.getId(), processInstance.getName(),
+ processInstance.getState(), state,
+ processInstance.getCommandType());
processInstance.setState(state);
if (state.typeIsFinished()) {
@@ -1701,10 +1715,10 @@ public class WorkflowExecuteRunnable implements Runnable {
ExecutionStatus state = stateEvent.getExecutionStatus();
if (processInstance.getState() != state) {
logger.info(
- "work flow process instance [id: {}, name:{}], state change from {} to {}, cmd type: {}",
- processInstance.getId(), processInstance.getName(),
- processInstance.getState(), state,
- processInstance.getCommandType());
+ "work flow process instance [id: {}, name:{}], state change from {} to {}, cmd type: {}",
+ processInstance.getId(), processInstance.getName(),
+ processInstance.getState(), state,
+ processInstance.getCommandType());
processInstance.setState(state);
if (state.typeIsFinished()) {
@@ -1751,14 +1765,14 @@ public class WorkflowExecuteRunnable implements Runnable {
*/
private void removeTaskFromStandbyList(TaskInstance taskInstance) {
logger.info("remove task from stand by list, id: {} name:{}",
- taskInstance.getId(),
- taskInstance.getName());
+ taskInstance.getId(),
+ taskInstance.getName());
try {
readyToSubmitTaskQueue.remove(taskInstance);
} catch (Exception e) {
logger.error("remove task instance from readyToSubmitTaskQueue error, task id:{}, Name: {}",
- taskInstance.getId(),
- taskInstance.getName(), e);
+ taskInstance.getId(),
+ taskInstance.getName(), e);
}
}
@@ -1781,7 +1795,7 @@ public class WorkflowExecuteRunnable implements Runnable {
*/
private void killAllTasks() {
logger.info("kill called on process instance id: {}, num: {}", processInstance.getId(),
- activeTaskProcessorMaps.size());
+ activeTaskProcessorMaps.size());
if (readyToSubmitTaskQueue.size() > 0) {
readyToSubmitTaskQueue.clear();
@@ -2070,4 +2084,4 @@ public class WorkflowExecuteRunnable implements Runnable {
break;
}
}
-}
+}
\ No newline at end of file
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/corn/CronUtils.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/corn/CronUtils.java
index f1ed739da8..376e533844 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/corn/CronUtils.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/corn/CronUtils.java
@@ -17,22 +17,19 @@
package org.apache.dolphinscheduler.service.corn;
-import static org.apache.dolphinscheduler.service.corn.CycleFactory.day;
-import static org.apache.dolphinscheduler.service.corn.CycleFactory.hour;
-import static org.apache.dolphinscheduler.service.corn.CycleFactory.min;
-import static org.apache.dolphinscheduler.service.corn.CycleFactory.month;
-import static org.apache.dolphinscheduler.service.corn.CycleFactory.week;
-import static org.apache.dolphinscheduler.service.corn.CycleFactory.year;
-
-import static com.cronutils.model.CronType.QUARTZ;
-
+import com.cronutils.model.Cron;
+import com.cronutils.model.definition.CronDefinitionBuilder;
+import com.cronutils.parser.CronParser;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.CycleEnum;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.dao.entity.Schedule;
-
-import org.apache.commons.collections.CollectionUtils;
+import org.apache.dolphinscheduler.spi.utils.StringUtils;
+import org.quartz.CronExpression;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.text.ParseException;
import java.util.ArrayList;
@@ -41,14 +38,17 @@ import java.util.Collections;
import java.util.Date;
import java.util.GregorianCalendar;
import java.util.List;
+import java.util.Map;
-import org.quartz.CronExpression;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.cronutils.model.Cron;
-import com.cronutils.model.definition.CronDefinitionBuilder;
-import com.cronutils.parser.CronParser;
+import static com.cronutils.model.CronType.QUARTZ;
+import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST;
+import static org.apache.dolphinscheduler.common.Constants.COMMA;
+import static org.apache.dolphinscheduler.service.corn.CycleFactory.day;
+import static org.apache.dolphinscheduler.service.corn.CycleFactory.hour;
+import static org.apache.dolphinscheduler.service.corn.CycleFactory.min;
+import static org.apache.dolphinscheduler.service.corn.CycleFactory.month;
+import static org.apache.dolphinscheduler.service.corn.CycleFactory.week;
+import static org.apache.dolphinscheduler.service.corn.CycleFactory.year;
/**
* // todo: this utils is heavy, it rely on quartz and corn-utils.
@@ -283,4 +283,21 @@ public class CronUtils {
return end.getTime();
}
+ /**
+ * Parses the comma-separated schedule date list of a complement command.
+ * @param param command parameters; the list is read from CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST
+ * @return the converted dates in listed order; empty (never null) when the entry is absent or empty
+ */
+ public static List<Date> getSelfScheduleDateList(Map<String, String> param) {
+ List<Date> result = new ArrayList<>();
+ String scheduleDates = param.get(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST);
+ if (StringUtils.isNotEmpty(scheduleDates)) {
+ for (String stringDate : scheduleDates.split(COMMA)) {
+ result.add(DateUtils.stringToDate(stringDate));
+ }
+ }
+ // hand back an empty list rather than null so callers can use isEmpty() and iterate without a null check
+ return result;
+ }
+
}