You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ca...@apache.org on 2021/12/25 04:22:29 UTC
[dolphinscheduler] branch dev updated: [Feature-7576][Master] Optimize complement task's date (#7585)
This is an automated email from the ASF dual-hosted git repository.
caishunfeng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 3d9d91c [Feature-7576][Master] Optimize complement task's date (#7585)
3d9d91c is described below
commit 3d9d91ccc37f9dc2f6d6081892f8e18acff82fe5
Author: xiangzihao <46...@qq.com>
AuthorDate: Sat Dec 25 12:22:22 2021 +0800
[Feature-7576][Master] Optimize complement task's date (#7585)
* feature_7576
* feature_7576
* feature 7576
* feature 7576
---
.../api/service/impl/ExecutorServiceImpl.java | 43 +++++++++++++++++-----
.../api/service/ExecutorServiceTest.java | 37 ++++++++++++++-----
.../service/quartz/cron/CronUtils.java | 4 +-
3 files changed, 62 insertions(+), 22 deletions(-)
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
index f3f7ed2..5991e79 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
@@ -621,6 +621,10 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
Map<String, String> cmdParam = JSONUtils.toMap(command.getCommandParam());
switch (runMode) {
case RUN_MODE_SERIAL: {
+ if (start.after(end)) {
+ logger.warn("The startDate {} is later than the endDate {}", start, end);
+ break;
+ }
cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, DateUtils.dateToString(start));
cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, DateUtils.dateToString(end));
command.setCommandParam(JSONUtils.toJsonString(cmdParam));
@@ -628,26 +632,45 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
break;
}
case RUN_MODE_PARALLEL: {
+ if (start.after(end)) {
+ logger.warn("The startDate {} is later than the endDate {}", start, end);
+ break;
+ }
+
LinkedList<Date> listDate = new LinkedList<>();
List<Schedule> schedules = processService.queryReleaseSchedulerListByProcessDefinitionCode(command.getProcessDefinitionCode());
listDate.addAll(CronUtils.getSelfFireDateList(start, end, schedules));
+ int listDateSize = listDate.size();
createCount = listDate.size();
if (!CollectionUtils.isEmpty(listDate)) {
if (expectedParallelismNumber != null && expectedParallelismNumber != 0) {
createCount = Math.min(listDate.size(), expectedParallelismNumber);
+ if (listDateSize < createCount) {
+ createCount = listDateSize;
+ }
}
logger.info("In parallel mode, current expectedParallelismNumber:{}", createCount);
- listDate.addLast(end);
- int chunkSize = listDate.size() / createCount;
-
- for (int i = 0; i < createCount; i++) {
- int rangeStart = i == 0 ? i : (i * chunkSize);
- int rangeEnd = i == createCount - 1 ? listDate.size() - 1
- : rangeStart + chunkSize;
-
- cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, DateUtils.dateToString(listDate.get(rangeStart)));
- cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, DateUtils.dateToString(listDate.get(rangeEnd)));
+ // Distribute the complement dates as evenly as possible across the commands.
+ // When the dates do not divide evenly, the first (listDateSize % createCount) commands each take one extra date.
+ int itemsPerCommand = (listDateSize / createCount);
+ int remainingItems = (listDateSize % createCount);
+ int startDateIndex = 0;
+ int endDateIndex = 0;
+
+ for (int i = 1; i <= createCount; i++) {
+ int extra = (i <= remainingItems) ? 1 : 0;
+ int singleCommandItems = (itemsPerCommand + extra);
+
+ if (i == 1) {
+ endDateIndex += singleCommandItems - 1;
+ } else {
+ startDateIndex = endDateIndex + 1;
+ endDateIndex += singleCommandItems;
+ }
+
+ cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, DateUtils.dateToString(listDate.get(startDateIndex)));
+ cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, DateUtils.dateToString(listDate.get(endDateIndex)));
command.setCommandParam(JSONUtils.toJsonString(cmdParam));
processService.createCommand(command);
}
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java
index da942fb..90fb173 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java
@@ -17,6 +17,8 @@
package org.apache.dolphinscheduler.api.service;
+import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE;
+import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@@ -32,6 +34,8 @@ import org.apache.dolphinscheduler.common.enums.Priority;
import org.apache.dolphinscheduler.common.enums.ReleaseState;
import org.apache.dolphinscheduler.common.enums.RunMode;
import org.apache.dolphinscheduler.common.model.Server;
+import org.apache.dolphinscheduler.common.utils.DateUtils;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
@@ -340,22 +344,35 @@ public class ExecutorServiceTest {
listDate.add(1);
listDate.add(2);
listDate.add(3);
+ listDate.add(4);
+ int listDateSize = listDate.size();
int createCount = Math.min(listDate.size(), expectedParallelismNumber);
logger.info("In parallel mode, current expectedParallelismNumber:{}", createCount);
- listDate.addLast(4);
- int chunkSize = listDate.size() / createCount;
- for (int i = 0; i < createCount; i++) {
- int rangeStart = i == 0 ? i : (i * chunkSize);
- int rangeEnd = i == createCount - 1 ? listDate.size() - 1 : rangeStart + chunkSize;
- logger.info("rangeStart:{}, rangeEnd:{}",rangeStart, rangeEnd);
- result.add(listDate.get(rangeStart) + "," + listDate.get(rangeEnd));
+ int itemsPerCommand = (listDateSize / createCount);
+ int remainingItems = (listDateSize % createCount);
+ int startDateIndex = 0;
+ int endDateIndex = 0;
+
+ for (int i = 1; i <= createCount; i++) {
+ int extra = (i <= remainingItems) ? 1 : 0;
+ int singleCommandItems = (itemsPerCommand + extra);
+
+ if (i == 1) {
+ endDateIndex += singleCommandItems - 1;
+ } else {
+ startDateIndex = endDateIndex + 1;
+ endDateIndex += singleCommandItems;
+ }
+
+ logger.info("startDate:{}, endDate:{}", listDate.get(startDateIndex), listDate.get(endDateIndex));
+ result.add(listDate.get(startDateIndex) + "," + listDate.get(endDateIndex));
}
Assert.assertEquals("0,1", result.get(0));
- Assert.assertEquals("1,2", result.get(1));
- Assert.assertEquals("2,4", result.get(2));
-
+ Assert.assertEquals("2,3", result.get(1));
+ Assert.assertEquals("4,4", result.get(2));
}
+
}
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/cron/CronUtils.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/cron/CronUtils.java
index 3e7007a..d784722 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/cron/CronUtils.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/cron/CronUtils.java
@@ -191,9 +191,9 @@ public class CronUtils {
return result;
}
- // support left closed and right open time interval (startDate <= N < endDate)
+ // support left closed and right closed time interval (startDate <= N <= endDate)
Date from = new Date(startTime.getTime() - Constants.SECOND_TIME_MILLIS);
- Date to = new Date(endTime.getTime() - Constants.SECOND_TIME_MILLIS);
+ Date to = new Date(endTime.getTime() + Constants.SECOND_TIME_MILLIS);
List<Schedule> listSchedule = new ArrayList<>();
listSchedule.addAll(schedules);