You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by zh...@apache.org on 2022/06/12 04:35:14 UTC

[dolphinscheduler] branch dev updated: [Improve] Enhance complement function transformation (#10376)

This is an automated email from the ASF dual-hosted git repository.

zhongjiajie pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new f6fea06f10 [Improve] Enhance complement function transformation (#10376)
f6fea06f10 is described below

commit f6fea06f1051f5962106e6299eea9696bddf12d4
Author: hstdream <33...@users.noreply.github.com>
AuthorDate: Sun Jun 12 12:35:08 2022 +0800

    [Improve] Enhance complement function transformation (#10376)
---
 docs/docs/en/about/glossary.md                     |   2 +-
 docs/docs/zh/about/glossary.md                     |   2 +-
 .../api/controller/ExecutorController.java         |   2 +-
 .../apache/dolphinscheduler/api/enums/Status.java  |   1 +
 .../api/service/impl/ExecutorServiceImpl.java      | 306 ++++++++++++++-------
 .../api/service/ExecutorServiceTest.java           |  14 +-
 .../apache/dolphinscheduler/common/Constants.java  |  11 +
 .../master/runner/WorkflowExecuteRunnable.java     | 152 +++++-----
 .../dolphinscheduler/service/corn/CronUtils.java   |  53 ++--
 9 files changed, 344 insertions(+), 199 deletions(-)

diff --git a/docs/docs/en/about/glossary.md b/docs/docs/en/about/glossary.md
index 492fdee963..f8ad9355bc 100644
--- a/docs/docs/en/about/glossary.md
+++ b/docs/docs/en/about/glossary.md
@@ -45,7 +45,7 @@ provided. **Continue** refers to regardless of the status of the task running in
 failure. **End** means that once a failed task is found, Kill will also run the parallel task at the same time, and the
 process fails and ends
 
-**Complement**: Supplement historical data,Supports **interval parallel and serial** two complement methods
+**Complement**: Supplement historical data,supports **interval parallel and serial** two complement methods, and two types of date selection which include **date range** and **date enumeration**.
 
 ### 2.Module introduction
 
diff --git a/docs/docs/zh/about/glossary.md b/docs/docs/zh/about/glossary.md
index f3ce5f3bd1..dd3043cc29 100644
--- a/docs/docs/zh/about/glossary.md
+++ b/docs/docs/zh/about/glossary.md
@@ -30,7 +30,7 @@
 
 **失败策略**:对于并行运行的任务,如果有任务失败,提供两种失败策略处理方式,**继续**是指不管并行运行任务的状态,直到流程失败结束。**结束**是指一旦发现失败任务,则同时Kill掉正在运行的并行任务,流程失败结束
 
-**补数**:补历史数据,支持**区间并行和串行**两种补数方式
+**补数**:补历史数据,支持**区间并行和串行**两种补数方式,其日期选择方式包括**日期范围**和**日期枚举**两种
 
 ### 2.模块介绍
 
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java
index ba230d57a4..90c4ecab51 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java
@@ -87,7 +87,7 @@ public class ExecutorController extends BaseController {
      * @param loginUser login user
      * @param projectCode project code
      * @param processDefinitionCode process definition code
-     * @param scheduleTime schedule time
+     * @param scheduleTime schedule time when CommandType is COMPLEMENT_DATA  there are two ways to transfer parameters 1.date range, for example:{"complementStartDate":"2022-01-01 12:12:12","complementEndDate":"2022-01-6 12:12:12"} 2.manual input,  for example:{"complementScheduleDateList":"2022-01-01 00:00:00,2022-01-02 12:12:12,2022-01-03 12:12:12"}
      * @param failureStrategy failure strategy
      * @param startNodeList start nodes list
      * @param taskDependType task depend type
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
index 1b1bbbe463..8f5b30168d 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
@@ -407,6 +407,7 @@ public enum Status {
 
     NO_CURRENT_OPERATING_PERMISSION(1400001, "The current user does not have this permission.", "当前用户无此权限"),
     FUNCTION_DISABLED(1400002, "The current feature is disabled.", "当前功能已被禁用"),
+    SCHEDULE_TIME_NUMBER(1400003, "The number of complement dates exceed 100.", "补数日期个数超过100"),
     ;
 
     private final int code;
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
index 9fafacb4f8..0d7376681d 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
@@ -17,14 +17,12 @@
 
 package org.apache.dolphinscheduler.api.service.impl;
 
-import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_START;
-import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE;
-import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
-import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING;
-import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_NODES;
-import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_PARAMS;
-import static org.apache.dolphinscheduler.common.Constants.MAX_TASK_TIMEOUT;
-
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.google.common.collect.Lists;
+import org.apache.commons.beanutils.BeanUtils;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections.MapUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant;
 import org.apache.dolphinscheduler.api.enums.ExecuteType;
 import org.apache.dolphinscheduler.api.enums.Status;
@@ -69,13 +67,13 @@ import org.apache.dolphinscheduler.remote.command.StateEventChangeCommand;
 import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService;
 import org.apache.dolphinscheduler.service.corn.CronUtils;
 import org.apache.dolphinscheduler.service.process.ProcessService;
-
-import org.apache.commons.beanutils.BeanUtils;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.collections.MapUtils;
-import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -84,12 +82,16 @@ import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Service;
-
-import com.fasterxml.jackson.core.type.TypeReference;
+import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_START;
+import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE;
+import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST;
+import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
+import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING;
+import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_NODES;
+import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_PARAMS;
+import static org.apache.dolphinscheduler.common.Constants.COMMA;
+import static org.apache.dolphinscheduler.common.Constants.MAX_TASK_TIMEOUT;
+import static org.apache.dolphinscheduler.common.Constants.SCHEDULE_TIME_MAX_LENGTH;
 
 /**
  * executor service impl
@@ -187,11 +189,15 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
             return result;
         }
 
+        if(!checkScheduleTimeNum(commandType,cronTime)){
+            putMsg(result, Status.SCHEDULE_TIME_NUMBER);
+            return result;
+        }
+
         // check master exists
         if (!checkMasterExists(result)) {
             return result;
         }
-
         /**
          * create command
          */
@@ -228,6 +234,29 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
         return true;
     }
 
+    /**
+     *
+     * @param complementData
+     * @param cronTime
+     * @return CommandType is COMPLEMENT_DATA and cronTime's number is not greater than 100 return true , otherwise return false
+     */
+    private boolean checkScheduleTimeNum(CommandType complementData,String cronTime) {
+        if (!CommandType.COMPLEMENT_DATA.equals(complementData)) {
+            return true;
+        }
+        if(cronTime == null){
+            return true;
+        }
+        Map<String,String> cronMap =  JSONUtils.toMap(cronTime);
+        if (cronMap.containsKey(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST)) {
+            String[] stringDates = cronMap.get(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST).split(COMMA);
+            if (stringDates.length > SCHEDULE_TIME_MAX_LENGTH) {
+                return false;
+            }
+        }
+        return true;
+    }
+
     /**
      * check whether the process definition can be executed
      *
@@ -655,27 +684,16 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
         }
         command.setProcessInstanceId(0);
 
-        Date start = null;
-        Date end = null;
-        if (!StringUtils.isEmpty(schedule)) {
-            String[] interval = schedule.split(",");
-            if (interval.length == 2) {
-                start = DateUtils.getScheduleDate(interval[0]);
-                end = DateUtils.getScheduleDate(interval[1]);
-                if (start.after(end)) {
-                    logger.info("complement data error, wrong date start:{} and end date:{} ",
-                            start, end
-                    );
-                    return 0;
-                }
-            }
-        }
         // determine whether to complement
         if (commandType == CommandType.COMPLEMENT_DATA) {
-            if (start == null || end == null) {
+            if (schedule == null || StringUtils.isEmpty(schedule)) {
                 return 0;
             }
-            return createComplementCommandList(start, end, runMode, command, expectedParallelismNumber, complementDependentMode);
+            int check = checkScheduleTime(schedule);
+            if(check == 0){
+                return 0;
+            }
+            return createComplementCommandList(schedule, runMode, command, expectedParallelismNumber, complementDependentMode);
         } else {
             command.setCommandParam(JSONUtils.toJsonString(cmdParam));
             return processService.createCommand(command);
@@ -686,89 +704,117 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
      * create complement command
      * close left and close right
      *
-     * @param start
-     * @param end
+     * @param scheduleTimeParam
      * @param runMode
      * @return
      */
-    protected int createComplementCommandList(Date start, Date end, RunMode runMode, Command command,
+    protected int createComplementCommandList(String scheduleTimeParam, RunMode runMode, Command command,
                                             Integer expectedParallelismNumber, ComplementDependentMode complementDependentMode) {
         int createCount = 0;
+        String startDate = null;
+        String endDate = null;
+        String dateList = null;
         int dependentProcessDefinitionCreateCount = 0;
-
         runMode = (runMode == null) ? RunMode.RUN_MODE_SERIAL : runMode;
         Map<String, String> cmdParam = JSONUtils.toMap(command.getCommandParam());
+        Map<String, String> scheduleParam = JSONUtils.toMap(scheduleTimeParam);
+        if(scheduleParam.containsKey(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST)){
+            dateList = scheduleParam.get(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST);
+            dateList = removeDuplicates(dateList);
+        }
+        if(scheduleParam.containsKey(CMDPARAM_COMPLEMENT_DATA_START_DATE) && scheduleParam.containsKey(CMDPARAM_COMPLEMENT_DATA_END_DATE)){
+            startDate = scheduleParam.get(CMDPARAM_COMPLEMENT_DATA_START_DATE);
+            endDate = scheduleParam.get(CMDPARAM_COMPLEMENT_DATA_END_DATE);
+        }
         switch (runMode) {
             case RUN_MODE_SERIAL: {
-                if (start.after(end)) {
-                    logger.warn("The startDate {} is later than the endDate {}", start, end);
-                    break;
+                if(StringUtils.isNotEmpty(dateList)){
+                    cmdParam.put(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST, dateList);
+                    command.setCommandParam(JSONUtils.toJsonString(cmdParam));
+                    createCount = processService.createCommand(command);
                 }
-                cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, DateUtils.dateToString(start));
-                cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, DateUtils.dateToString(end));
-                command.setCommandParam(JSONUtils.toJsonString(cmdParam));
-                createCount = processService.createCommand(command);
-
-                // dependent process definition
-                List<Schedule> schedules = processService.queryReleaseSchedulerListByProcessDefinitionCode(command.getProcessDefinitionCode());
-
-                if (schedules.isEmpty() || complementDependentMode == ComplementDependentMode.OFF_MODE) {
-                    logger.info("process code: {} complement dependent in off mode or schedule's size is 0, skip "
-                            + "dependent complement data", command.getProcessDefinitionCode());
-                } else {
-                    dependentProcessDefinitionCreateCount += createComplementDependentCommand(schedules, command);
+                if(startDate != null && endDate != null){
+                    cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, startDate);
+                    cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, endDate);
+                    command.setCommandParam(JSONUtils.toJsonString(cmdParam));
+                    createCount = processService.createCommand(command);
+
+                    // dependent process definition
+                    List<Schedule> schedules = processService.queryReleaseSchedulerListByProcessDefinitionCode(command.getProcessDefinitionCode());
+
+                    if (schedules.isEmpty() || complementDependentMode == ComplementDependentMode.OFF_MODE) {
+                        logger.info("process code: {} complement dependent in off mode or schedule's size is 0, skip "
+                                + "dependent complement data", command.getProcessDefinitionCode());
+                    } else {
+                        dependentProcessDefinitionCreateCount += createComplementDependentCommand(schedules, command);
+                    }
                 }
-
                 break;
             }
             case RUN_MODE_PARALLEL: {
-                if (start.after(end)) {
-                    logger.warn("The startDate {} is later than the endDate {}", start, end);
-                    break;
-                }
-
-                List<Date> listDate = new ArrayList<>();
-                List<Schedule> schedules = processService.queryReleaseSchedulerListByProcessDefinitionCode(command.getProcessDefinitionCode());
-                listDate.addAll(CronUtils.getSelfFireDateList(start, end, schedules));
-                int listDateSize = listDate.size();
-                createCount = listDate.size();
-                if (!CollectionUtils.isEmpty(listDate)) {
-                    if (expectedParallelismNumber != null && expectedParallelismNumber != 0) {
-                        createCount = Math.min(listDate.size(), expectedParallelismNumber);
-                        if (listDateSize < createCount) {
-                            createCount = listDateSize;
+                if(startDate != null && endDate != null){
+                    List<Date> listDate = new ArrayList<>();
+                    List<Schedule> schedules = processService.queryReleaseSchedulerListByProcessDefinitionCode(command.getProcessDefinitionCode());
+                    listDate.addAll(CronUtils.getSelfFireDateList(DateUtils.getScheduleDate(startDate), DateUtils.getScheduleDate(endDate), schedules));
+                    int listDateSize = listDate.size();
+                    createCount = listDate.size();
+                    if (!CollectionUtils.isEmpty(listDate)) {
+                        if (expectedParallelismNumber != null && expectedParallelismNumber != 0) {
+                            createCount = Math.min(listDate.size(), expectedParallelismNumber);
+                            if (listDateSize < createCount) {
+                                createCount = listDateSize;
+                            }
+                        }
+                        logger.info("In parallel mode, current expectedParallelismNumber:{}", createCount);
+
+                        // Distribute the number of tasks equally to each command.
+                        // The last command with insufficient quantity will be assigned to the remaining tasks.
+                        int itemsPerCommand = (listDateSize / createCount);
+                        int remainingItems = (listDateSize % createCount);
+                        int startDateIndex = 0;
+                        int endDateIndex = 0;
+
+                        for (int i = 1; i <= createCount; i++) {
+                            int extra = (i <= remainingItems) ? 1 : 0;
+                            int singleCommandItems = (itemsPerCommand + extra);
+
+                            if (i == 1) {
+                                endDateIndex += singleCommandItems - 1;
+                            } else {
+                                startDateIndex = endDateIndex + 1;
+                                endDateIndex += singleCommandItems;
+                            }
+
+                            cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, DateUtils.dateToString(listDate.get(startDateIndex)));
+                            cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, DateUtils.dateToString(listDate.get(endDateIndex)));
+                            command.setCommandParam(JSONUtils.toJsonString(cmdParam));
+                            processService.createCommand(command);
+
+                            if (schedules.isEmpty() || complementDependentMode == ComplementDependentMode.OFF_MODE) {
+                                logger.info("process code: {} complement dependent in off mode or schedule's size is 0, skip "
+                                        + "dependent complement data", command.getProcessDefinitionCode());
+                            } else {
+                                dependentProcessDefinitionCreateCount += createComplementDependentCommand(schedules, command);
+                            }
                         }
                     }
-                    logger.info("In parallel mode, current expectedParallelismNumber:{}", createCount);
-
-                    // Distribute the number of tasks equally to each command.
-                    // The last command with insufficient quantity will be assigned to the remaining tasks.
-                    int itemsPerCommand = (listDateSize / createCount);
-                    int remainingItems = (listDateSize % createCount);
-                    int startDateIndex = 0;
-                    int endDateIndex = 0;
-
-                    for (int i = 1; i <= createCount; i++) {
-                        int extra = (i <= remainingItems) ? 1 : 0;
-                        int singleCommandItems = (itemsPerCommand + extra);
-
-                        if (i == 1) {
-                            endDateIndex += singleCommandItems - 1;
-                        } else {
-                            startDateIndex = endDateIndex + 1;
-                            endDateIndex += singleCommandItems;
+                }
+                if(StringUtils.isNotEmpty(dateList)){
+                    List<String> listDate = Arrays.asList(dateList.split(COMMA));
+                    int listDateSize = listDate.size();
+                    createCount = listDate.size();
+                    if (!CollectionUtils.isEmpty(listDate)) {
+                        if (expectedParallelismNumber != null && expectedParallelismNumber != 0) {
+                            createCount = Math.min(listDate.size(), expectedParallelismNumber);
+                            if (listDateSize < createCount) {
+                                createCount = listDateSize;
+                            }
                         }
-
-                        cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, DateUtils.dateToString(listDate.get(startDateIndex)));
-                        cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, DateUtils.dateToString(listDate.get(endDateIndex)));
-                        command.setCommandParam(JSONUtils.toJsonString(cmdParam));
-                        processService.createCommand(command);
-
-                        if (schedules.isEmpty() || complementDependentMode == ComplementDependentMode.OFF_MODE) {
-                            logger.info("process code: {} complement dependent in off mode or schedule's size is 0, skip "
-                                    + "dependent complement data", command.getProcessDefinitionCode());
-                        } else {
-                            dependentProcessDefinitionCreateCount += createComplementDependentCommand(schedules, command);
+                        logger.info("In parallel mode, current expectedParallelismNumber:{}", createCount);
+                        for (List<String> stringDate : Lists.partition(listDate, createCount)) {
+                            cmdParam.put(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST, String.join(COMMA, stringDate));
+                            command.setCommandParam(JSONUtils.toJsonString(cmdParam));
+                            processService.createCommand(command);
                         }
                     }
                 }
@@ -854,4 +900,60 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
 
         return validDependentProcessDefinitionList;
     }
+
+    /**
+     *
+     * @param schedule
+     * @return check error return 0 otherwish 1
+     */
+    private int checkScheduleTime(String schedule){
+        Date start = null;
+        Date end = null;
+        Map<String,String> scheduleResult = JSONUtils.toMap(schedule);
+        if(scheduleResult == null){
+            return 0;
+        }
+        if(scheduleResult.containsKey(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST)){
+            if(scheduleResult.get(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST) == null){
+                return 0;
+            }
+        }
+        if(scheduleResult.containsKey(CMDPARAM_COMPLEMENT_DATA_START_DATE)){
+            String startDate = scheduleResult.get(CMDPARAM_COMPLEMENT_DATA_START_DATE);
+            String endDate = scheduleResult.get(CMDPARAM_COMPLEMENT_DATA_END_DATE);
+            if (startDate == null || endDate == null) {
+              return 0;
+            }
+            start = DateUtils.getScheduleDate(startDate);
+            end = DateUtils.getScheduleDate(endDate);
+            if(start == null || end == null){
+                return 0;
+            }
+            if (start.after(end)) {
+                logger.error("complement data error, wrong date start:{} and end date:{} ",
+                        start, end
+                );
+                return 0;
+            }
+        }
+        return 1;
+    }
+
+    /**
+     *
+     * @param scheduleTimeList
+     * @return remove duplicate date list
+     */
+    private String removeDuplicates(String scheduleTimeList){
+        HashSet<String> removeDate = new HashSet<String>();
+        List<String> resultList = new ArrayList<String>();
+        if(StringUtils.isNotEmpty(scheduleTimeList)){
+            String[] dateArrays = scheduleTimeList.split(COMMA);
+            List<String> dateList = Arrays.asList(dateArrays);
+            removeDate.addAll(dateList);
+            resultList.addAll(removeDate);
+            return String.join(COMMA, resultList);
+        }
+        return null;
+    }
 }
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java
index 5d170c499d..44e149285f 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java
@@ -199,7 +199,7 @@ public class ExecutorServiceTest {
 
         Mockito.when(processService.queryReleaseSchedulerListByProcessDefinitionCode(processDefinitionCode)).thenReturn(zeroSchedulerList());
         Map<String, Object> result = executorService.execProcessInstance(loginUser, projectCode,
-                processDefinitionCode, cronTime, CommandType.START_PROCESS,
+                processDefinitionCode, "{\"complementStartDate\":\"2020-01-01 00:00:00\",\"complementEndDate\":\"2020-01-31 23:00:00\"}", CommandType.START_PROCESS,
                 null, null,
                 null, null, 0,
                 RunMode.RUN_MODE_SERIAL,
@@ -218,7 +218,7 @@ public class ExecutorServiceTest {
 
         Mockito.when(processService.queryReleaseSchedulerListByProcessDefinitionCode(processDefinitionCode)).thenReturn(zeroSchedulerList());
         Map<String, Object> result = executorService.execProcessInstance(loginUser, projectCode,
-                processDefinitionCode, cronTime, CommandType.START_PROCESS,
+                processDefinitionCode, "{\"complementStartDate\":\"2020-01-01 00:00:00\",\"complementEndDate\":\"2020-01-31 23:00:00\"}", CommandType.START_PROCESS,
                 null, "n1,n2",
                 null, null, 0,
                 RunMode.RUN_MODE_SERIAL,
@@ -237,7 +237,7 @@ public class ExecutorServiceTest {
 
         Mockito.when(processService.queryReleaseSchedulerListByProcessDefinitionCode(processDefinitionCode)).thenReturn(zeroSchedulerList());
         Map<String, Object> result = executorService.execProcessInstance(loginUser, projectCode,
-                processDefinitionCode, "2020-01-31 23:00:00,2020-01-01 00:00:00", CommandType.COMPLEMENT_DATA,
+                processDefinitionCode, "{\"complementStartDate\":\"2022-01-07 12:12:12\",\"complementEndDate\":\"2022-01-06 12:12:12\"}", CommandType.COMPLEMENT_DATA,
                 null, null,
                 null, null, 0,
                 RunMode.RUN_MODE_SERIAL,
@@ -255,7 +255,7 @@ public class ExecutorServiceTest {
 
         Mockito.when(processService.queryReleaseSchedulerListByProcessDefinitionCode(processDefinitionCode)).thenReturn(zeroSchedulerList());
         Map<String, Object> result = executorService.execProcessInstance(loginUser, projectCode,
-                processDefinitionCode, cronTime, CommandType.COMPLEMENT_DATA,
+                processDefinitionCode, "{\"complementStartDate\":\"2020-01-01 00:00:00\",\"complementEndDate\":\"2020-01-31 23:00:00\"}", CommandType.COMPLEMENT_DATA,
                 null, null,
                 null, null, 0,
                 RunMode.RUN_MODE_SERIAL,
@@ -273,7 +273,7 @@ public class ExecutorServiceTest {
 
         Mockito.when(processService.queryReleaseSchedulerListByProcessDefinitionCode(processDefinitionCode)).thenReturn(zeroSchedulerList());
         Map<String, Object> result = executorService.execProcessInstance(loginUser, projectCode,
-                processDefinitionCode, cronTime, CommandType.COMPLEMENT_DATA,
+                processDefinitionCode, "{\"complementStartDate\":\"2020-01-01 00:00:00\",\"complementEndDate\":\"2020-01-31 23:00:00\"}", CommandType.COMPLEMENT_DATA,
                 null, null,
                 null, null, 0,
                 RunMode.RUN_MODE_PARALLEL,
@@ -292,7 +292,7 @@ public class ExecutorServiceTest {
 
         Mockito.when(processService.queryReleaseSchedulerListByProcessDefinitionCode(processDefinitionCode)).thenReturn(oneSchedulerList());
         Map<String, Object> result = executorService.execProcessInstance(loginUser, projectCode,
-                processDefinitionCode, cronTime, CommandType.COMPLEMENT_DATA,
+                processDefinitionCode, "{\"complementStartDate\":\"2020-01-01 00:00:00\",\"complementEndDate\":\"2020-01-31 23:00:00\"}", CommandType.COMPLEMENT_DATA,
                 null, null,
                 null, null, 0,
                 RunMode.RUN_MODE_PARALLEL,
@@ -308,7 +308,7 @@ public class ExecutorServiceTest {
         Mockito.when(monitorService.getServerListFromRegistry(true)).thenReturn(new ArrayList<>());
 
         Map<String, Object> result = executorService.execProcessInstance(loginUser, projectCode,
-                processDefinitionCode, cronTime, CommandType.COMPLEMENT_DATA,
+                processDefinitionCode, "{\"complementStartDate\":\"2020-01-01 00:00:00\",\"complementEndDate\":\"2020-01-31 23:00:00\"}", CommandType.COMPLEMENT_DATA,
                 null, null,
                 null, null, 0,
                 RunMode.RUN_MODE_PARALLEL,
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
index b908cd233d..90820f6d40 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
@@ -361,6 +361,11 @@ public final class Constants {
      */
     public static final String CMDPARAM_COMPLEMENT_DATA_END_DATE = "complementEndDate";
 
+    /**
+     * complement data Schedule date
+     */
+    public static final String CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST = "complementScheduleDateList";
+
     /**
      * complement date default cron string
      */
@@ -809,4 +814,10 @@ public final class Constants {
      * tenant
      */
     public static final int TENANT_FULL_NAME_MAX_LENGTH = 30;
+
+    /**
+     * schedule time  the amount of date data is too large, affecting the memory, so set 100
+     */
+    public static final int SCHEDULE_TIME_MAX_LENGTH = 100;
+
 }
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
index f9caf000d8..3365e67c35 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
@@ -17,16 +17,10 @@
 
 package org.apache.dolphinscheduler.server.master.runner;
 
-import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE;
-import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
-import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVERY_START_NODE_STRING;
-import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING;
-import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_NODES;
-import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP;
-import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_BLOCKING;
-import static org.apache.dolphinscheduler.plugin.task.api.enums.DataType.VARCHAR;
-import static org.apache.dolphinscheduler.plugin.task.api.enums.Direct.IN;
-
+import com.google.common.collect.Lists;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.math.NumberUtils;
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.CommandType;
 import org.apache.dolphinscheduler.common.enums.FailureStrategy;
@@ -75,10 +69,8 @@ import org.apache.dolphinscheduler.service.alert.ProcessAlertManager;
 import org.apache.dolphinscheduler.service.corn.CronUtils;
 import org.apache.dolphinscheduler.service.process.ProcessService;
 import org.apache.dolphinscheduler.service.queue.PeerTaskInstancePriorityQueue;
-
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.lang3.math.NumberUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -96,10 +88,18 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Lists;
+import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE;
+import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST;
+import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
+import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVERY_START_NODE_STRING;
+import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING;
+import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_NODES;
+import static org.apache.dolphinscheduler.common.Constants.COMMA;
+import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP;
+import static org.apache.dolphinscheduler.common.Constants.YYYY_MM_DD_HH_MM_SS;
+import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_BLOCKING;
+import static org.apache.dolphinscheduler.plugin.task.api.enums.DataType.VARCHAR;
+import static org.apache.dolphinscheduler.plugin.task.api.enums.Direct.IN;
 
 /**
  * Workflow execute task, used to execute a workflow instance.
@@ -282,14 +282,14 @@ public class WorkflowExecuteRunnable implements Runnable {
 
     public String getKey() {
         if (StringUtils.isNotEmpty(key)
-            || this.processDefinition == null) {
+                || this.processDefinition == null) {
             return key;
         }
 
         key = String.format("%d_%d_%d",
-            this.processDefinition.getCode(),
-            this.processDefinition.getVersion(),
-            this.processInstance.getId());
+                this.processDefinition.getCode(),
+                this.processDefinition.getVersion(),
+                this.processInstance.getId());
         return key;
     }
 
@@ -503,7 +503,7 @@ public class WorkflowExecuteRunnable implements Runnable {
                 } else {
                     ProcessInstance processInstance = this.processService.findProcessInstanceById(nextTaskInstance.getProcessInstanceId());
                     this.processService.sendStartTask2Master(processInstance, nextTaskInstance.getId(),
-                        org.apache.dolphinscheduler.remote.command.CommandType.TASK_WAKEUP_EVENT_REQUEST);
+                            org.apache.dolphinscheduler.remote.command.CommandType.TASK_WAKEUP_EVENT_REQUEST);
                 }
             }
         }
@@ -526,12 +526,12 @@ public class WorkflowExecuteRunnable implements Runnable {
         waitToRetryTaskInstanceMap.put(newTaskInstance.getTaskCode(), newTaskInstance);
         if (!taskInstance.retryTaskIntervalOverTime()) {
             logger.info("failure task will be submitted: process id: {}, task instance code: {} state:{} retry times:{} / {}, interval:{}",
-                processInstance.getId(),
-                newTaskInstance.getTaskCode(),
-                newTaskInstance.getState(),
-                newTaskInstance.getRetryTimes(),
-                newTaskInstance.getMaxRetryTimes(),
-                newTaskInstance.getRetryInterval());
+                    processInstance.getId(),
+                    newTaskInstance.getTaskCode(),
+                    newTaskInstance.getState(),
+                    newTaskInstance.getRetryTimes(),
+                    newTaskInstance.getMaxRetryTimes(),
+                    newTaskInstance.getRetryInterval());
             stateWheelExecuteThread.addTask4TimeoutCheck(processInstance, newTaskInstance);
             stateWheelExecuteThread.addTask4RetryCheck(processInstance, newTaskInstance);
         } else {
@@ -562,7 +562,7 @@ public class WorkflowExecuteRunnable implements Runnable {
         logger.info("process instance update: {}", processInstanceId);
         processInstance = processService.findProcessInstanceById(processInstanceId);
         processDefinition = processService.findProcessDefinition(processInstance.getProcessDefinitionCode(),
-            processInstance.getProcessDefinitionVersion());
+                processInstance.getProcessDefinitionVersion());
         processInstance.setProcessDefinition(processDefinition);
     }
 
@@ -591,8 +591,8 @@ public class WorkflowExecuteRunnable implements Runnable {
     public boolean checkProcessInstance(StateEvent stateEvent) {
         if (this.processInstance.getId() != stateEvent.getProcessInstanceId()) {
             logger.error("mismatch process instance id: {}, state event:{}",
-                this.processInstance.getId(),
-                stateEvent);
+                    this.processInstance.getId(),
+                    stateEvent);
             return false;
         }
         return true;
@@ -774,7 +774,12 @@ public class WorkflowExecuteRunnable implements Runnable {
         if (cmdParam.containsKey(Constants.CMD_PARAM_RECOVERY_START_NODE_STRING)) {
             cmdParam.remove(Constants.CMD_PARAM_RECOVERY_START_NODE_STRING);
         }
-        cmdParam.replace(CMDPARAM_COMPLEMENT_DATA_START_DATE, DateUtils.format(scheduleDate, "yyyy-MM-dd HH:mm:ss", null));
+        if(cmdParam.containsKey(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST)){
+            cmdParam.replace(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST, cmdParam.get(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST).substring(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST).indexOf(COMMA)+1));
+        }
+        if(cmdParam.containsKey(CMDPARAM_COMPLEMENT_DATA_START_DATE)){
+            cmdParam.replace(CMDPARAM_COMPLEMENT_DATA_START_DATE, DateUtils.format(scheduleDate, YYYY_MM_DD_HH_MM_SS, null));
+        }
         command.setCommandParam(JSONUtils.toJsonString(cmdParam));
         command.setTaskDependType(processInstance.getTaskDependType());
         command.setFailureStrategy(processInstance.getFailureStrategy());
@@ -794,7 +799,7 @@ public class WorkflowExecuteRunnable implements Runnable {
 
     private boolean needComplementProcess() {
         if (processInstance.isComplementData()
-            && Flag.NO == processInstance.getIsSubProcess()) {
+                && Flag.NO == processInstance.getIsSubProcess()) {
             return true;
         }
         return false;
@@ -874,7 +879,7 @@ public class WorkflowExecuteRunnable implements Runnable {
             return;
         }
         processDefinition = processService.findProcessDefinition(processInstance.getProcessDefinitionCode(),
-            processInstance.getProcessDefinitionVersion());
+                processInstance.getProcessDefinitionVersion());
         processInstance.setProcessDefinition(processDefinition);
 
         List<TaskInstance> recoverNodeList = getStartTaskInstanceList(processInstance.getCommandParam());
@@ -894,7 +899,7 @@ public class WorkflowExecuteRunnable implements Runnable {
         List<String> recoveryNodeCodeList = getRecoveryNodeCodeList(recoverNodeList);
         List<String> startNodeNameList = parseStartNodeName(processInstance.getCommandParam());
         ProcessDag processDag = generateFlowDag(taskNodeList,
-            startNodeNameList, recoveryNodeCodeList, processInstance.getTaskDependType());
+                startNodeNameList, recoveryNodeCodeList, processInstance.getTaskDependType());
         if (processDag == null) {
             logger.error("processDag is null");
             return;
@@ -956,15 +961,24 @@ public class WorkflowExecuteRunnable implements Runnable {
 
         if (processInstance.isComplementData() && complementListDate.isEmpty()) {
             Map<String, String> cmdParam = JSONUtils.toMap(processInstance.getCommandParam());
-            if (cmdParam != null && cmdParam.containsKey(CMDPARAM_COMPLEMENT_DATA_START_DATE)) {
+            if (cmdParam != null) {
                 // reset global params while there are start parameters
                 setGlobalParamIfCommanded(processDefinition, cmdParam);
 
-                Date start = DateUtils.stringToDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_START_DATE));
-                Date end = DateUtils.stringToDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_END_DATE));
+                Date start = null;
+                Date end = null;
+                if(cmdParam.containsKey(CMDPARAM_COMPLEMENT_DATA_START_DATE) && cmdParam.containsKey(CMDPARAM_COMPLEMENT_DATA_END_DATE)){
+                    start = DateUtils.stringToDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_START_DATE));
+                    end = DateUtils.stringToDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_END_DATE));
+                }
                 List<Schedule> schedules = processService.queryReleaseSchedulerListByProcessDefinitionCode(processInstance.getProcessDefinitionCode());
                 if (complementListDate.isEmpty() && needComplementProcess()) {
-                    complementListDate = CronUtils.getSelfFireDateList(start, end, schedules);
+                    if(start != null && end != null){
+                        complementListDate = CronUtils.getSelfFireDateList(start, end, schedules);
+                    }
+                    if(cmdParam.containsKey(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST)){
+                        complementListDate = CronUtils.getSelfScheduleDateList(cmdParam);
+                    }
                     logger.info(" process definition code:{} complement data: {}",
                             processInstance.getProcessDefinitionCode(), complementListDate);
 
@@ -996,15 +1010,15 @@ public class WorkflowExecuteRunnable implements Runnable {
             taskProcessor.init(taskInstance, processInstance);
 
             if (taskInstance.getState() == ExecutionStatus.RUNNING_EXECUTION
-                && taskProcessor.getType().equalsIgnoreCase(Constants.COMMON_TASK_TYPE)) {
+                    && taskProcessor.getType().equalsIgnoreCase(Constants.COMMON_TASK_TYPE)) {
                 notifyProcessHostUpdate(taskInstance);
             }
 
             boolean submit = taskProcessor.action(TaskAction.SUBMIT);
             if (!submit) {
                 logger.error("process id:{} name:{} submit standby task id:{} name:{} failed!",
-                    processInstance.getId(), processInstance.getName(),
-                    taskInstance.getId(), taskInstance.getName());
+                        processInstance.getId(), processInstance.getName(),
+                        taskInstance.getId(), taskInstance.getName());
                 return null;
             }
 
@@ -1475,10 +1489,10 @@ public class WorkflowExecuteRunnable implements Runnable {
      */
     private ExecutionStatus runningState(ExecutionStatus state) {
         if (state == ExecutionStatus.READY_STOP
-            || state == ExecutionStatus.READY_PAUSE
-            || state == ExecutionStatus.WAITING_THREAD
-            || state == ExecutionStatus.READY_BLOCK
-            || state == ExecutionStatus.DELAY_EXECUTION) {
+                || state == ExecutionStatus.READY_PAUSE
+                || state == ExecutionStatus.WAITING_THREAD
+                || state == ExecutionStatus.READY_BLOCK
+                || state == ExecutionStatus.DELAY_EXECUTION) {
             // if the running task is not completed, the state remains unchanged
             return state;
         } else {
@@ -1514,8 +1528,8 @@ public class WorkflowExecuteRunnable implements Runnable {
             }
             if (processInstance.getFailureStrategy() == FailureStrategy.CONTINUE) {
                 return readyToSubmitTaskQueue.size() == 0
-                    && activeTaskProcessorMaps.size() == 0
-                    && waitToRetryTaskInstanceMap.size() == 0;
+                        && activeTaskProcessorMaps.size() == 0
+                        && waitToRetryTaskInstanceMap.size() == 0;
             }
         }
         return false;
@@ -1546,9 +1560,9 @@ public class WorkflowExecuteRunnable implements Runnable {
 
         List<TaskInstance> pauseList = getCompleteTaskByState(ExecutionStatus.PAUSE);
         if (CollectionUtils.isNotEmpty(pauseList)
-            || processInstance.isBlocked()
-            || !isComplementEnd()
-            || readyToSubmitTaskQueue.size() > 0) {
+                || processInstance.isBlocked()
+                || !isComplementEnd()
+                || readyToSubmitTaskQueue.size() > 0) {
             return ExecutionStatus.PAUSE;
         } else {
             return ExecutionStatus.SUCCESS;
@@ -1613,9 +1627,9 @@ public class WorkflowExecuteRunnable implements Runnable {
             List<TaskInstance> killList = getCompleteTaskByState(ExecutionStatus.KILL);
             List<TaskInstance> failList = getCompleteTaskByState(ExecutionStatus.FAILURE);
             if (CollectionUtils.isNotEmpty(stopList)
-                || CollectionUtils.isNotEmpty(killList)
-                || CollectionUtils.isNotEmpty(failList)
-                || !isComplementEnd()) {
+                    || CollectionUtils.isNotEmpty(killList)
+                    || CollectionUtils.isNotEmpty(failList)
+                    || !isComplementEnd()) {
                 return ExecutionStatus.STOP;
             } else {
                 return ExecutionStatus.SUCCESS;
@@ -1673,10 +1687,10 @@ public class WorkflowExecuteRunnable implements Runnable {
         ExecutionStatus state = getProcessInstanceState(processInstance);
         if (processInstance.getState() != state) {
             logger.info(
-                "work flow process instance [id: {}, name:{}], state change from {} to {}, cmd type: {}",
-                processInstance.getId(), processInstance.getName(),
-                processInstance.getState(), state,
-                processInstance.getCommandType());
+                    "work flow process instance [id: {}, name:{}], state change from {} to {}, cmd type: {}",
+                    processInstance.getId(), processInstance.getName(),
+                    processInstance.getState(), state,
+                    processInstance.getCommandType());
 
             processInstance.setState(state);
             if (state.typeIsFinished()) {
@@ -1701,10 +1715,10 @@ public class WorkflowExecuteRunnable implements Runnable {
         ExecutionStatus state = stateEvent.getExecutionStatus();
         if (processInstance.getState() != state) {
             logger.info(
-                "work flow process instance [id: {}, name:{}], state change from {} to {}, cmd type: {}",
-                processInstance.getId(), processInstance.getName(),
-                processInstance.getState(), state,
-                processInstance.getCommandType());
+                    "work flow process instance [id: {}, name:{}], state change from {} to {}, cmd type: {}",
+                    processInstance.getId(), processInstance.getName(),
+                    processInstance.getState(), state,
+                    processInstance.getCommandType());
 
             processInstance.setState(state);
             if (state.typeIsFinished()) {
@@ -1751,14 +1765,14 @@ public class WorkflowExecuteRunnable implements Runnable {
      */
     private void removeTaskFromStandbyList(TaskInstance taskInstance) {
         logger.info("remove task from stand by list, id: {} name:{}",
-            taskInstance.getId(),
-            taskInstance.getName());
+                taskInstance.getId(),
+                taskInstance.getName());
         try {
             readyToSubmitTaskQueue.remove(taskInstance);
         } catch (Exception e) {
             logger.error("remove task instance from readyToSubmitTaskQueue error, task id:{}, Name: {}",
-                taskInstance.getId(),
-                taskInstance.getName(), e);
+                    taskInstance.getId(),
+                    taskInstance.getName(), e);
         }
     }
 
@@ -1781,7 +1795,7 @@ public class WorkflowExecuteRunnable implements Runnable {
      */
     private void killAllTasks() {
         logger.info("kill called on process instance id: {}, num: {}", processInstance.getId(),
-            activeTaskProcessorMaps.size());
+                activeTaskProcessorMaps.size());
 
         if (readyToSubmitTaskQueue.size() > 0) {
             readyToSubmitTaskQueue.clear();
@@ -2070,4 +2084,4 @@ public class WorkflowExecuteRunnable implements Runnable {
                 break;
         }
     }
-}
+}
\ No newline at end of file
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/corn/CronUtils.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/corn/CronUtils.java
index f1ed739da8..376e533844 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/corn/CronUtils.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/corn/CronUtils.java
@@ -17,22 +17,19 @@
 
 package org.apache.dolphinscheduler.service.corn;
 
-import static org.apache.dolphinscheduler.service.corn.CycleFactory.day;
-import static org.apache.dolphinscheduler.service.corn.CycleFactory.hour;
-import static org.apache.dolphinscheduler.service.corn.CycleFactory.min;
-import static org.apache.dolphinscheduler.service.corn.CycleFactory.month;
-import static org.apache.dolphinscheduler.service.corn.CycleFactory.week;
-import static org.apache.dolphinscheduler.service.corn.CycleFactory.year;
-
-import static com.cronutils.model.CronType.QUARTZ;
-
+import com.cronutils.model.Cron;
+import com.cronutils.model.definition.CronDefinitionBuilder;
+import com.cronutils.parser.CronParser;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.CycleEnum;
 import org.apache.dolphinscheduler.common.thread.Stopper;
 import org.apache.dolphinscheduler.common.utils.DateUtils;
 import org.apache.dolphinscheduler.dao.entity.Schedule;
-
-import org.apache.commons.collections.CollectionUtils;
+import org.apache.dolphinscheduler.spi.utils.StringUtils;
+import org.quartz.CronExpression;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.text.ParseException;
 import java.util.ArrayList;
@@ -41,14 +38,17 @@ import java.util.Collections;
 import java.util.Date;
 import java.util.GregorianCalendar;
 import java.util.List;
+import java.util.Map;
 
-import org.quartz.CronExpression;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.cronutils.model.Cron;
-import com.cronutils.model.definition.CronDefinitionBuilder;
-import com.cronutils.parser.CronParser;
+import static com.cronutils.model.CronType.QUARTZ;
+import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST;
+import static org.apache.dolphinscheduler.common.Constants.COMMA;
+import static org.apache.dolphinscheduler.service.corn.CycleFactory.day;
+import static org.apache.dolphinscheduler.service.corn.CycleFactory.hour;
+import static org.apache.dolphinscheduler.service.corn.CycleFactory.min;
+import static org.apache.dolphinscheduler.service.corn.CycleFactory.month;
+import static org.apache.dolphinscheduler.service.corn.CycleFactory.week;
+import static org.apache.dolphinscheduler.service.corn.CycleFactory.year;
 
 /**
  * // todo: this utils is heavy, it rely on quartz and corn-utils.
@@ -283,4 +283,21 @@ public class CronUtils {
         return end.getTime();
     }
 
+    /**
+     * get Schedule Date
+     * @param param
+     * @return  date list
+     */
+    public static List<Date> getSelfScheduleDateList(Map<String, String> param){
+        List<Date> result = new ArrayList<>();
+        String scheduleDates = param.get(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST);
+        if(StringUtils.isNotEmpty(scheduleDates)){
+            for (String stringDate : scheduleDates.split(COMMA)) {
+                result.add(DateUtils.stringToDate(stringDate));
+            }
+            return result;
+        }
+        return null;
+    }
+
 }