You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ca...@apache.org on 2021/12/25 04:22:29 UTC

[dolphinscheduler] branch dev updated: [Feature-7576][Master] Optimize complement task's date (#7585)

This is an automated email from the ASF dual-hosted git repository.

caishunfeng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 3d9d91c  [Feature-7576][Master] Optimize complement task's date (#7585)
3d9d91c is described below

commit 3d9d91ccc37f9dc2f6d6081892f8e18acff82fe5
Author: xiangzihao <46...@qq.com>
AuthorDate: Sat Dec 25 12:22:22 2021 +0800

    [Feature-7576][Master] Optimize complement task's date (#7585)
    
    * feature_7576
    
    * feature_7576
    
    * feature 7576
    
    * feature 7576
---
 .../api/service/impl/ExecutorServiceImpl.java      | 43 +++++++++++++++++-----
 .../api/service/ExecutorServiceTest.java           | 37 ++++++++++++++-----
 .../service/quartz/cron/CronUtils.java             |  4 +-
 3 files changed, 62 insertions(+), 22 deletions(-)

diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
index f3f7ed2..5991e79 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
@@ -621,6 +621,10 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
         Map<String, String> cmdParam = JSONUtils.toMap(command.getCommandParam());
         switch (runMode) {
             case RUN_MODE_SERIAL: {
+                if (start.after(end)) {
+                    logger.warn("The startDate {} is later than the endDate {}", start, end);
+                    break;
+                }
                 cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, DateUtils.dateToString(start));
                 cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, DateUtils.dateToString(end));
                 command.setCommandParam(JSONUtils.toJsonString(cmdParam));
@@ -628,26 +632,45 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
                 break;
             }
             case RUN_MODE_PARALLEL: {
+                if (start.after(end)) {
+                    logger.warn("The startDate {} is later than the endDate {}", start, end);
+                    break;
+                }
+
                 LinkedList<Date> listDate = new LinkedList<>();
                 List<Schedule> schedules = processService.queryReleaseSchedulerListByProcessDefinitionCode(command.getProcessDefinitionCode());
                 listDate.addAll(CronUtils.getSelfFireDateList(start, end, schedules));
+                int listDateSize = listDate.size();
                 createCount = listDate.size();
                 if (!CollectionUtils.isEmpty(listDate)) {
                     if (expectedParallelismNumber != null && expectedParallelismNumber != 0) {
                         createCount = Math.min(listDate.size(), expectedParallelismNumber);
+                        if (listDateSize < createCount) {
+                            createCount = listDateSize;
+                        }
                     }
                     logger.info("In parallel mode, current expectedParallelismNumber:{}", createCount);
 
-                    listDate.addLast(end);
-                    int chunkSize = listDate.size() / createCount;
-
-                    for (int i = 0; i < createCount; i++) {
-                        int rangeStart = i == 0 ? i : (i * chunkSize);
-                        int rangeEnd = i == createCount - 1 ? listDate.size() - 1
-                                : rangeStart + chunkSize;
-
-                        cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, DateUtils.dateToString(listDate.get(rangeStart)));
-                        cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, DateUtils.dateToString(listDate.get(rangeEnd)));
+                    // Distribute the dates as evenly as possible across the commands.
+                    // The first (listDateSize % createCount) commands each take one extra date, so later commands may cover one date fewer.
+                    int itemsPerCommand = (listDateSize / createCount);
+                    int remainingItems = (listDateSize % createCount);
+                    int startDateIndex = 0;
+                    int endDateIndex = 0;
+
+                    for (int i = 1; i <= createCount; i++) {
+                        int extra = (i <= remainingItems) ? 1 : 0;
+                        int singleCommandItems = (itemsPerCommand + extra);
+
+                        if (i == 1) {
+                            endDateIndex += singleCommandItems - 1;
+                        } else {
+                            startDateIndex = endDateIndex + 1;
+                            endDateIndex += singleCommandItems;
+                        }
+
+                        cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, DateUtils.dateToString(listDate.get(startDateIndex)));
+                        cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, DateUtils.dateToString(listDate.get(endDateIndex)));
                         command.setCommandParam(JSONUtils.toJsonString(cmdParam));
                         processService.createCommand(command);
                     }
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java
index da942fb..90fb173 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java
@@ -17,6 +17,8 @@
 
 package org.apache.dolphinscheduler.api.service;
 
+import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE;
+import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -32,6 +34,8 @@ import org.apache.dolphinscheduler.common.enums.Priority;
 import org.apache.dolphinscheduler.common.enums.ReleaseState;
 import org.apache.dolphinscheduler.common.enums.RunMode;
 import org.apache.dolphinscheduler.common.model.Server;
+import org.apache.dolphinscheduler.common.utils.DateUtils;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.dao.entity.Command;
 import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
@@ -340,22 +344,35 @@ public class ExecutorServiceTest {
         listDate.add(1);
         listDate.add(2);
         listDate.add(3);
+        listDate.add(4);
 
+        int listDateSize = listDate.size();
         int createCount = Math.min(listDate.size(), expectedParallelismNumber);
         logger.info("In parallel mode, current expectedParallelismNumber:{}", createCount);
 
-        listDate.addLast(4);
-        int chunkSize = listDate.size() / createCount;
-        for (int i = 0; i < createCount; i++) {
-            int rangeStart = i == 0 ? i : (i * chunkSize);
-            int rangeEnd = i == createCount - 1 ? listDate.size() - 1 : rangeStart + chunkSize;
-            logger.info("rangeStart:{}, rangeEnd:{}",rangeStart, rangeEnd);
-            result.add(listDate.get(rangeStart) + "," + listDate.get(rangeEnd));
+        int itemsPerCommand = (listDateSize / createCount);
+        int remainingItems = (listDateSize % createCount);
+        int startDateIndex = 0;
+        int endDateIndex = 0;
+
+        for (int i = 1; i <= createCount; i++) {
+            int extra = (i <= remainingItems) ? 1 : 0;
+            int singleCommandItems = (itemsPerCommand + extra);
+
+            if (i == 1) {
+                endDateIndex += singleCommandItems - 1;
+            } else {
+                startDateIndex = endDateIndex + 1;
+                endDateIndex += singleCommandItems;
+            }
+
+            logger.info("startDate:{}, endDate:{}", listDate.get(startDateIndex), listDate.get(endDateIndex));
+            result.add(listDate.get(startDateIndex) + "," + listDate.get(endDateIndex));
         }
 
         Assert.assertEquals("0,1", result.get(0));
-        Assert.assertEquals("1,2", result.get(1));
-        Assert.assertEquals("2,4", result.get(2));
-
+        Assert.assertEquals("2,3", result.get(1));
+        Assert.assertEquals("4,4", result.get(2));
     }
+    
 }
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/cron/CronUtils.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/cron/CronUtils.java
index 3e7007a..d784722 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/cron/CronUtils.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/cron/CronUtils.java
@@ -191,9 +191,9 @@ public class CronUtils {
             return result;
         }
 
-        // support left closed and right open time interval (startDate <= N < endDate)
+        // support left closed and right closed time interval (startDate <= N <= endDate)
         Date from = new Date(startTime.getTime() - Constants.SECOND_TIME_MILLIS);
-        Date to = new Date(endTime.getTime() - Constants.SECOND_TIME_MILLIS);
+        Date to = new Date(endTime.getTime() + Constants.SECOND_TIME_MILLIS);
 
         List<Schedule> listSchedule = new ArrayList<>();
         listSchedule.addAll(schedules);