Posted to commits@dolphinscheduler.apache.org by "zhuangchong (via GitHub)" <gi...@apache.org> on 2023/07/05 02:25:21 UTC

[GitHub] [dolphinscheduler] zhuangchong commented on a diff in pull request #14450: [Feature-14321][API] Support to complement data in descending or ascending order of date

zhuangchong commented on code in PR #14450:
URL: https://github.com/apache/dolphinscheduler/pull/14450#discussion_r1252471774


##########
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java:
##########
@@ -822,19 +836,62 @@ private int createCommand(Long triggerCode, CommandType commandType, long proces
         }
     }
 
+    private int createComplementCommand(Long triggerCode, Command command, Map<String, String> cmdParam,
+                                        List<ZonedDateTime> dateTimeList, List<Schedule> schedules,
+                                        ComplementDependentMode complementDependentMode, boolean allLevelDependent) {
+        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
+
+        int createCount = 0;
+        String dateTimeListStr = dateTimeList.stream()
+                .map(item -> item.toLocalDateTime().format(formatter))

Review Comment:
   Could you use the DateUtils utility class here instead?
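
   A minimal sketch of the idea behind this suggestion: keep the pattern in one shared helper rather than building a `DateTimeFormatter` inside the method. `DateFormatSketch` and its methods are hypothetical stand-ins, not the project's `DateUtils`; the diff elsewhere calls `DateUtils.dateToString(...)` on `ZonedDateTime` values, which is probably the method meant here. The hunk is cut off after the `.map(...)` call, so joining with a comma is an assumption.

```java
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical stand-in for a shared date utility; NOT the project's DateUtils.
final class DateFormatSketch {

    // One shared formatter (DateTimeFormatter is immutable and thread-safe),
    // so callers do not build a new one on every invocation.
    private static final DateTimeFormatter FORMATTER =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    private DateFormatSketch() {
    }

    // Formats the local date-time part, matching the expression in the diff.
    static String dateToString(ZonedDateTime dateTime) {
        return dateTime.toLocalDateTime().format(FORMATTER);
    }

    // Assumption: the truncated stream in the diff ends by joining with a comma.
    static String joinDates(List<ZonedDateTime> dateTimeList) {
        return dateTimeList.stream()
                .map(DateFormatSketch::dateToString)
                .collect(Collectors.joining(","));
    }

    public static void main(String[] args) {
        ZoneId utc = ZoneId.of("UTC");
        List<ZonedDateTime> dates = Arrays.asList(
                ZonedDateTime.of(2023, 7, 1, 0, 0, 0, 0, utc),
                ZonedDateTime.of(2023, 7, 2, 0, 0, 0, 0, utc));
        System.out.println(joinDates(dates));
    }
}
```

   With a helper like this, the lambda in `createComplementCommand` reduces to a method reference and the local `formatter` variable is no longer needed.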
   



##########
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java:
##########
@@ -843,153 +900,73 @@ protected int createComplementCommandList(Long triggerCode, String scheduleTimeP
         runMode = (runMode == null) ? RunMode.RUN_MODE_SERIAL : runMode;
         Map<String, String> cmdParam = JSONUtils.toMap(command.getCommandParam());
         Map<String, String> scheduleParam = JSONUtils.toMap(scheduleTimeParam);
-        if (scheduleParam.containsKey(CMD_PARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST)) {
-            dateList = scheduleParam.get(CMD_PARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST);
-            dateList = removeDuplicates(dateList);
+
+        if (Objects.isNull(executionOrder)) {
+            executionOrder = ExecutionOrder.DESC_ORDER;
         }
+
+        List<Schedule> schedules = processService.queryReleaseSchedulerListByProcessDefinitionCode(
+                command.getProcessDefinitionCode());
+
+        List<ZonedDateTime> listDate = new ArrayList<>();
         if (scheduleParam.containsKey(CMD_PARAM_COMPLEMENT_DATA_START_DATE) && scheduleParam.containsKey(
                 CMD_PARAM_COMPLEMENT_DATA_END_DATE)) {
             startDate = scheduleParam.get(CMD_PARAM_COMPLEMENT_DATA_START_DATE);
             endDate = scheduleParam.get(CMD_PARAM_COMPLEMENT_DATA_END_DATE);
+            if (startDate != null && endDate != null) {
+                listDate = CronUtils.getSelfFireDateList(
+                        DateUtils.stringToZoneDateTime(startDate),
+                        DateUtils.stringToZoneDateTime(endDate),
+                        schedules);
+            }
+        }
+
+        if (scheduleParam.containsKey(CMD_PARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST)) {
+            dateList = scheduleParam.get(CMD_PARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST);
+            dateList = removeDuplicates(dateList);
+
+            listDate = Splitter.on(COMMA).splitToStream(dateList).map(item -> DateUtils.stringToZoneDateTime(item))
+                    .collect(
+                            Collectors.toList());

Review Comment:
   These two lines each loop over the date list: one removes duplicates and the other converts the values. Would it be better to handle both in a single loop?
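
   One way the single pass could look, as a sketch only: split the string once, drop duplicates with `distinct()`, and convert in the same stream. `SinglePassDateListSketch` and `parseDistinct` are hypothetical names. The behavior of the project's `removeDuplicates` is not shown in this diff, so plain string equality after trimming is an assumption. The sketch parses with `java.time` directly instead of `DateUtils.stringToZoneDateTime` so that it runs on its own.

```java
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper; names and the dedup rule are assumptions, not project code.
final class SinglePassDateListSketch {

    private static final DateTimeFormatter FORMATTER =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    private SinglePassDateListSketch() {
    }

    // Splits a comma-separated date string, removes duplicate entries and
    // converts each remaining entry, all in one stream pipeline.
    static List<ZonedDateTime> parseDistinct(String dateList, ZoneId zone) {
        return Arrays.stream(dateList.split(","))
                .map(String::trim)
                .filter(item -> !item.isEmpty())
                .distinct() // keeps the first occurrence, preserving input order
                .map(item -> LocalDateTime.parse(item, FORMATTER).atZone(zone))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        String input = "2023-07-01 00:00:00,2023-07-02 00:00:00,2023-07-01 00:00:00";
        System.out.println(parseDistinct(input, ZoneId.of("UTC")));
    }
}
```

   In the PR itself the same shape would presumably be `Splitter.on(COMMA).splitToStream(dateList).distinct().map(DateUtils::stringToZoneDateTime)`, provided `removeDuplicates` does nothing beyond dropping repeated entries.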



##########
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java:
##########
@@ -843,153 +900,73 @@ protected int createComplementCommandList(Long triggerCode, String scheduleTimeP
         runMode = (runMode == null) ? RunMode.RUN_MODE_SERIAL : runMode;
         Map<String, String> cmdParam = JSONUtils.toMap(command.getCommandParam());
         Map<String, String> scheduleParam = JSONUtils.toMap(scheduleTimeParam);
-        if (scheduleParam.containsKey(CMD_PARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST)) {
-            dateList = scheduleParam.get(CMD_PARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST);
-            dateList = removeDuplicates(dateList);
+
+        if (Objects.isNull(executionOrder)) {
+            executionOrder = ExecutionOrder.DESC_ORDER;
         }
+
+        List<Schedule> schedules = processService.queryReleaseSchedulerListByProcessDefinitionCode(
+                command.getProcessDefinitionCode());
+
+        List<ZonedDateTime> listDate = new ArrayList<>();
         if (scheduleParam.containsKey(CMD_PARAM_COMPLEMENT_DATA_START_DATE) && scheduleParam.containsKey(
                 CMD_PARAM_COMPLEMENT_DATA_END_DATE)) {
             startDate = scheduleParam.get(CMD_PARAM_COMPLEMENT_DATA_START_DATE);
             endDate = scheduleParam.get(CMD_PARAM_COMPLEMENT_DATA_END_DATE);
+            if (startDate != null && endDate != null) {
+                listDate = CronUtils.getSelfFireDateList(
+                        DateUtils.stringToZoneDateTime(startDate),
+                        DateUtils.stringToZoneDateTime(endDate),
+                        schedules);
+            }
+        }
+
+        if (scheduleParam.containsKey(CMD_PARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST)) {
+            dateList = scheduleParam.get(CMD_PARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST);
+            dateList = removeDuplicates(dateList);
+
+            listDate = Splitter.on(COMMA).splitToStream(dateList).map(item -> DateUtils.stringToZoneDateTime(item))
+                    .collect(
+                            Collectors.toList());
         }
+
+        if (executionOrder.equals(ExecutionOrder.DESC_ORDER)) {
+            Collections.sort(listDate, Collections.reverseOrder());
+        } else {
+            Collections.sort(listDate);
+        }
+
         switch (runMode) {
             case RUN_MODE_SERIAL: {
                 log.info("RunMode of {} command is serial run, processDefinitionCode:{}.",
                         command.getCommandType().getDescp(), command.getProcessDefinitionCode());
-                if (StringUtils.isNotEmpty(dateList)) {
-                    cmdParam.put(CMD_PARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST, dateList);
-                    command.setCommandParam(JSONUtils.toJsonString(cmdParam));
-                    log.info("Creating command, commandInfo:{}.", command);
-                    createCount = commandService.createCommand(command);
-                    if (createCount > 0) {
-                        log.info("Create {} command complete, processDefinitionCode:{}",
-                                command.getCommandType().getDescp(), command.getProcessDefinitionCode());
-                    } else {
-                        log.error("Create {} command error, processDefinitionCode:{}",
-                                command.getCommandType().getDescp(), command.getProcessDefinitionCode());
-                    }
-                }
-                if (startDate != null && endDate != null) {
-                    cmdParam.put(CMD_PARAM_COMPLEMENT_DATA_START_DATE, startDate);
-                    cmdParam.put(CMD_PARAM_COMPLEMENT_DATA_END_DATE, endDate);
-                    command.setCommandParam(JSONUtils.toJsonString(cmdParam));
-                    log.info("Creating command, commandInfo:{}.", command);
-                    createCount = commandService.createCommand(command);
-                    if (createCount > 0) {
-                        log.info("Create {} command complete, processDefinitionCode:{}",
-                                command.getCommandType().getDescp(), command.getProcessDefinitionCode());
-                    } else {
-                        log.error("Create {} command error, processDefinitionCode:{}",
-                                command.getCommandType().getDescp(), command.getProcessDefinitionCode());
-                    }
-                    // dependent process definition
-                    List<Schedule> schedules = processService.queryReleaseSchedulerListByProcessDefinitionCode(
-                            command.getProcessDefinitionCode());
-
-                    if (schedules.isEmpty() || complementDependentMode == ComplementDependentMode.OFF_MODE) {
-                        log.info(
-                                "Complement dependent mode is off mode or Scheduler is empty, so skip create complement dependent command, processDefinitionCode:{}.",
-                                command.getProcessDefinitionCode());
-                    } else {
-                        log.info(
-                                "Complement dependent mode is all dependent and Scheduler is not empty, need create complement dependent command, processDefinitionCode:{}.",
-                                command.getProcessDefinitionCode());
-                        dependentProcessDefinitionCreateCount +=
-                                createComplementDependentCommand(schedules, command, allLevelDependent);
-                    }
-                }
-                if (createCount > 0) {
-                    triggerRelationService.saveTriggerToDb(ApiTriggerType.COMMAND, triggerCode, command.getId());
-                }
+                createCount = createComplementCommand(triggerCode, command, cmdParam, listDate, schedules,
+                        complementDependentMode, allLevelDependent);
                 break;
             }
             case RUN_MODE_PARALLEL: {
                 log.info("RunMode of {} command is parallel run, processDefinitionCode:{}.",
                         command.getCommandType().getDescp(), command.getProcessDefinitionCode());
-                if (startDate != null && endDate != null) {
-                    List<Schedule> schedules = processService.queryReleaseSchedulerListByProcessDefinitionCode(
-                            command.getProcessDefinitionCode());
-                    List<ZonedDateTime> listDate = CronUtils.getSelfFireDateList(
-                            DateUtils.stringToZoneDateTime(startDate),
-                            DateUtils.stringToZoneDateTime(endDate),
-                            schedules);
-                    int listDateSize = listDate.size();
-                    createCount = listDate.size();
-                    if (!CollectionUtils.isEmpty(listDate)) {
-                        if (expectedParallelismNumber != null && expectedParallelismNumber != 0) {
-                            createCount = Math.min(createCount, expectedParallelismNumber);
-                        }
-                        log.info("Complement command run in parallel mode, current expectedParallelismNumber:{}.",
-                                createCount);
-
-                        // Distribute the number of tasks equally to each command.
-                        // The last command with insufficient quantity will be assigned to the remaining tasks.
-                        int itemsPerCommand = (listDateSize / createCount);
-                        int remainingItems = (listDateSize % createCount);
-                        int startDateIndex = 0;
-                        int endDateIndex = 0;
-
-                        for (int i = 1; i <= createCount; i++) {
-                            int extra = (i <= remainingItems) ? 1 : 0;
-                            int singleCommandItems = (itemsPerCommand + extra);
-
-                            if (i == 1) {
-                                endDateIndex += singleCommandItems - 1;
-                            } else {
-                                startDateIndex = endDateIndex + 1;
-                                endDateIndex += singleCommandItems;
-                            }
-
-                            cmdParam.put(CMD_PARAM_COMPLEMENT_DATA_START_DATE,
-                                    DateUtils.dateToString(listDate.get(startDateIndex)));
-                            cmdParam.put(CMD_PARAM_COMPLEMENT_DATA_END_DATE,
-                                    DateUtils.dateToString(listDate.get(endDateIndex)));
-                            command.setCommandParam(JSONUtils.toJsonString(cmdParam));
-                            log.info("Creating command, commandInfo:{}.", command);
-                            if (commandService.createCommand(command) > 0) {
-                                log.info("Create {} command complete, processDefinitionCode:{}",
-                                        command.getCommandType().getDescp(), command.getProcessDefinitionCode());
-                                triggerRelationService.saveTriggerToDb(ApiTriggerType.COMMAND, triggerCode,
-                                        command.getId());
-                            } else {
-                                log.error("Create {} command error, processDefinitionCode:{}",
-                                        command.getCommandType().getDescp(), command.getProcessDefinitionCode());
-                            }
-                            if (schedules.isEmpty() || complementDependentMode == ComplementDependentMode.OFF_MODE) {
-                                log.info(
-                                        "Complement dependent mode is off mode or Scheduler is empty, so skip create complement dependent command, processDefinitionCode:{}.",
-                                        command.getProcessDefinitionCode());
-                            } else {
-                                log.info(
-                                        "Complement dependent mode is all dependent and Scheduler is not empty, need create complement dependent command, processDefinitionCode:{}.",
-                                        command.getProcessDefinitionCode());
-                                dependentProcessDefinitionCreateCount +=
-                                        createComplementDependentCommand(schedules, command, allLevelDependent);
-                            }
-                        }
+
+                int queueNum = 0;
+                if (CollectionUtils.isNotEmpty(listDate)) {

Review Comment:
   Should this be placed at line 932?


