You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ca...@apache.org on 2022/03/28 01:52:18 UTC
[dolphinscheduler] branch dev updated: [Improvement-9227][master]implement use the slot to scan the database (#9228)
This is an automated email from the ASF dual-hosted git repository.
caishunfeng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new d3251c9 [Improvement-9227][master]implement use the slot to scan the database (#9228)
d3251c9 is described below
commit d3251c9bcc5744905723df7fab11a676029e7d6e
Author: worry <70...@qq.com>
AuthorDate: Mon Mar 28 09:52:12 2022 +0800
[Improvement-9227][master]implement use the slot to scan the database (#9228)
When the master assigns tasks by slot, use the slot to scan the database.
This closes #9227
---
.../dolphinscheduler/dao/mapper/CommandMapper.java | 6 ++++
.../dolphinscheduler/dao/mapper/CommandMapper.xml | 8 ++++++
.../dao/mapper/CommandMapperTest.java | 33 ++++++++++++++++++++++
.../master/runner/MasterSchedulerService.java | 25 ++++------------
.../service/process/ProcessService.java | 10 +++++++
.../service/process/ProcessServiceTest.java | 10 +++++++
6 files changed, 72 insertions(+), 20 deletions(-)
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/CommandMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/CommandMapper.java
index c1e30fd..8305ae2 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/CommandMapper.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/CommandMapper.java
@@ -52,4 +52,10 @@ public interface CommandMapper extends BaseMapper<Command> {
*/
List<Command> queryCommandPage(@Param("limit") int limit, @Param("offset") int offset);
+
+ /**
+ * query one page of commands whose id modulo masterCount equals thisMasterSlot (at most limit rows, skipping offset rows)
+ * @return command list, ordered by process_instance_priority and then id
+ */
+ List<Command> queryCommandPageBySlot(@Param("limit") int limit, @Param("offset") int offset, @Param("masterCount") int masterCount, @Param("thisMasterSlot") int thisMasterSlot);
}
diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/CommandMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/CommandMapper.xml
index b0ea477..aa2bf13 100644
--- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/CommandMapper.xml
+++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/CommandMapper.xml
@@ -39,4 +39,12 @@
order by process_instance_priority, id asc
limit #{limit} offset #{offset}
</select>
+
+ <select id="queryCommandPageBySlot" resultType="org.apache.dolphinscheduler.dao.entity.Command">
+ select *
+ from t_ds_command
+ where id % #{masterCount} = #{thisMasterSlot}
+ order by process_instance_priority, id asc
+ limit #{limit} offset #{offset}
+ </select>
</mapper>
diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/CommandMapperTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/CommandMapperTest.java
index b056d91..c937eab 100644
--- a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/CommandMapperTest.java
+++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/CommandMapperTest.java
@@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.dao.mapper;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
@@ -167,6 +168,38 @@ public class CommandMapperTest extends BaseDaoTest {
}
/**
+ * test query command page by slot: runs the slot query check four times with masterCount 4 and slot 2
+ */
+ @Test
+ public void testQueryCommandPageBySlot() {
+ int masterCount = 4;
+ int thisMasterSlot = 2;
+ // each call creates a new command; presumably repeated so that the new ids cover both a hit and a miss of the slot
+ toTestQueryCommandPageBySlot(masterCount,thisMasterSlot);
+ toTestQueryCommandPageBySlot(masterCount,thisMasterSlot);
+ toTestQueryCommandPageBySlot(masterCount,thisMasterSlot);
+ toTestQueryCommandPageBySlot(masterCount,thisMasterSlot);
+ }
+
+ private boolean toTestQueryCommandPageBySlot(int masterCount, int thisMasterSlot) { // creates one command, queries one row for the slot, returns whether the new id falls in the slot
+ Command command = createCommand();
+ int id = command.getId();
+ boolean hit = id % masterCount == thisMasterSlot; // hit: the new command's id belongs to the slot under test
+ List<Command> commandList = commandMapper.queryCommandPageBySlot(1, 0, masterCount, thisMasterSlot); // limit 1, offset 0
+ if (hit) {
+ assertEquals(id,commandList.get(0).getId()); // NOTE(review): assumes no other command of this slot sorts ahead of the new one - confirm
+ } else {
+ commandList.forEach(o -> { // on a miss, any returned row must be a different command that still belongs to the slot
+ assertNotEquals(id, o.getId());
+ assertEquals(thisMasterSlot, o.getId() % masterCount);
+ });
+ }
+ return hit;
+ }
+
+
+
+ /**
* create command map
* @param count map count
 * @param commandType command type
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
index 56b6aac..f448f0f 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
@@ -226,27 +226,12 @@ public class MasterSchedulerService extends Thread {
int pageNumber = 0;
int pageSize = masterConfig.getFetchCommandNum();
List<Command> result = new ArrayList<>();
- while (Stopper.isRunning()) {
- // todo: Can we use the slot to scan database?
- List<Command> commandList = processService.findCommandPage(pageSize, pageNumber);
- if (commandList.size() == 0) {
- return result;
- }
- for (Command command : commandList) {
- SlotCheckState slotCheckState = slotCheck(command);
- if (slotCheckState.equals(SlotCheckState.CHANGE)) {
- // return and wait next scan, don't reset param, waste resources of cpu
- return new ArrayList<>();
- }
- if (slotCheckState.equals(SlotCheckState.PASS)) {
- result.add(command);
- }
- }
- if (CollectionUtils.isNotEmpty(result)) {
- logger.info("find {} commands, slot:{}", result.size(), ServerNodeManager.getSlot());
- break;
+ if (Stopper.isRunning()) {
+ int thisMasterSlot = ServerNodeManager.getSlot();
+ int masterCount = ServerNodeManager.getMasterSize();
+ if (masterCount > 0) {
+ result = processService.findCommandPageBySlot(pageSize, pageNumber, masterCount, thisMasterSlot);
}
- pageNumber += 1;
}
return result;
}
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index deaf568..cf12b05 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -408,6 +408,16 @@ public class ProcessService {
}
/**
+ * get one page of commands for the given master slot; returns an empty list when masterCount is not positive
+ */
+ public List<Command> findCommandPageBySlot(int pageSize, int pageNumber, int masterCount, int thisMasterSlot) {
+ if (masterCount <= 0) { // presumably guards the mapper query, which takes id modulo masterCount
+ return Lists.newArrayList();
+ }
+ return commandMapper.queryCommandPageBySlot(pageSize, pageNumber * pageSize, masterCount, thisMasterSlot); // offset = pageNumber * pageSize, so pageNumber is zero-based
+ }
+
+ /**
* check the input command exists in queue list
*
* @param command command
diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
index 37c12f3..c57e6ba 100644
--- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
+++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
@@ -844,6 +844,16 @@ public class ProcessServiceTest {
Assert.assertEquals(instance.getId(), taskInstanceByIdList.get(0).getId());
}
+ @Test
+ public void testFindCommandPageBySlot() {
+ int pageSize = 1;
+ int pageNumber = 0;
+ int masterCount = 0; // zero masters: findCommandPageBySlot returns an empty list when masterCount <= 0
+ int thisMasterSlot = 2;
+ List<Command> commandList = processService.findCommandPageBySlot(pageSize,pageNumber,masterCount,thisMasterSlot);
+ Assert.assertEquals(0,commandList.size());
+ }
+
private TaskGroupQueue getTaskGroupQueue() {
TaskGroupQueue taskGroupQueue = new TaskGroupQueue();
taskGroupQueue.setTaskName("task name");