You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by we...@apache.org on 2024/03/30 01:43:47 UTC
(dolphinscheduler) branch dev updated: Fix TaskGroupCoordinator might cause OOM when there is a lot of waiting TaskGroupQueue (#15773)
This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new dc4dad135c Fix TaskGroupCoordinator might cause OOM when there is a lot of waiting TaskGroupQueue (#15773)
dc4dad135c is described below
commit dc4dad135c9ccacd376dfa139f174eb0dd086b24
Author: Wenjun Ruan <we...@apache.org>
AuthorDate: Sat Mar 30 09:43:42 2024 +0800
Fix TaskGroupCoordinator might cause OOM when there is a lot of waiting TaskGroupQueue (#15773)
---
.../dao/mapper/TaskGroupQueueMapper.java | 14 ++++
.../dao/repository/TaskGroupQueueDao.java | 31 +++++++++
.../dao/repository/impl/TaskGroupQueueDaoImpl.java | 22 +++++++
.../dao/mapper/TaskGroupQueueMapper.xml | 27 ++++++++
.../repository/impl/TaskGroupQueueDaoImplTest.java | 76 ++++++++++++++++++++++
.../runner/taskgroup/TaskGroupCoordinator.java | 60 ++++++++++++++---
6 files changed, 222 insertions(+), 8 deletions(-)
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupQueueMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupQueueMapper.java
index cada1c7092..8b8241a2ad 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupQueueMapper.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupQueueMapper.java
@@ -122,4 +122,18 @@ public interface TaskGroupQueueMapper extends BaseMapper<TaskGroupQueue> {
@Param("status") int status,
@Param("inQueue") int inQueue,
@Param("forceStart") int forceStart);
+
+ /**
+ * Count the {@link TaskGroupQueue} rows of one task group whose status, in_queue and force_start
+ * columns equal the given codes.
+ *
+ * @param taskGroupId task group id, matched against the group_id column
+ * @param status TaskGroupQueueStatus code to match
+ * @param inQueue in_queue flag code to match
+ * @param forceStart force_start flag code to match
+ * @return number of matching rows
+ */
+ int countUsingTaskGroupQueueByGroupId(@Param("taskGroupId") Integer taskGroupId,
+ @Param("status") int status,
+ @Param("inQueue") int inQueue,
+ @Param("forceStart") int forceStart);
+
+ /**
+ * Query at most {@code limit} {@link TaskGroupQueue} rows whose in_queue equals {@code inQueue}
+ * and whose id is greater than {@code minTaskGroupQueueId}, ordered by id asc (keyset pagination).
+ *
+ * @param inQueue in_queue flag code to match
+ * @param minTaskGroupQueueId exclusive lower bound of the returned ids
+ * @param limit maximum number of rows to return
+ * @return matching rows ordered by id asc
+ */
+ List<TaskGroupQueue> queryInQueueTaskGroupQueue(@Param("inQueue") int inQueue,
+ @Param("minTaskGroupQueueId") int minTaskGroupQueueId,
+ @Param("limit") int limit);
+
+ /**
+ * Query at most {@code limit} {@link TaskGroupQueue} rows whose in_queue and force_start columns equal
+ * the given codes and whose id is greater than {@code minTaskGroupQueueId}, ordered by id asc.
+ *
+ * @param inQueue in_queue flag code to match
+ * @param forceStart force_start flag code to match
+ * @param minTaskGroupQueueId exclusive lower bound of the returned ids
+ * @param limit maximum number of rows to return
+ * @return matching rows ordered by id asc
+ */
+ List<TaskGroupQueue> queryWaitNotifyForceStartTaskGroupQueue(@Param("inQueue") int inQueue,
+ @Param("forceStart") int forceStart,
+ @Param("minTaskGroupQueueId") int minTaskGroupQueueId,
+ @Param("limit") int limit);
}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskGroupQueueDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskGroupQueueDao.java
index a788b29bb1..468b7b758c 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskGroupQueueDao.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskGroupQueueDao.java
@@ -38,6 +38,17 @@ public interface TaskGroupQueueDao extends IDao<TaskGroupQueue> {
*/
List<TaskGroupQueue> queryAllInQueueTaskGroupQueue();
+ /**
+ * Query one page of {@link TaskGroupQueue} which
+ * in_queue is {@link org.apache.dolphinscheduler.common.enums.Flag#YES}
+ * and id is greater than {@code minTaskGroupQueueId},
+ * ordered by id asc, returning at most {@code limit} rows.
+ * <p>
+ * To read every in-queue row, pass the id of the last row of the previous page as the next
+ * {@code minTaskGroupQueueId}. The callers in this change start with -1.
+ *
+ * @param minTaskGroupQueueId exclusive lower bound of the returned ids
+ * @param limit maximum number of rows to return
+ * @return TaskGroupQueue ordered by id asc
+ */
+ List<TaskGroupQueue> queryInQueueTaskGroupQueue(int minTaskGroupQueueId, int limit);
+
/**
* Query all {@link TaskGroupQueue} which in_queue is {@link org.apache.dolphinscheduler.common.enums.Flag#YES} and taskGroupId is taskGroupId
*
@@ -61,4 +72,24 @@ public interface TaskGroupQueueDao extends IDao<TaskGroupQueue> {
* @return TaskGroupQueue
*/
List<TaskGroupQueue> queryAcquiredTaskGroupQueueByGroupId(Integer taskGroupId);
+
+ /**
+ * Count the {@link TaskGroupQueue} of the given task group which status is TaskGroupQueueStatus.ACQUIRE_SUCCESS,
+ * in_queue is {@link org.apache.dolphinscheduler.common.enums.Flag#YES}
+ * and forceStart is {@link org.apache.dolphinscheduler.common.enums.Flag#NO}.
+ *
+ * @param taskGroupId taskGroupId
+ * @return the number of matching TaskGroupQueue rows
+ */
+ int countUsingTaskGroupQueueByGroupId(Integer taskGroupId);
+
+ /**
+ * Query one page of {@link TaskGroupQueue} which
+ * in_queue is {@link org.apache.dolphinscheduler.common.enums.Flag#YES}
+ * and forceStart is {@link org.apache.dolphinscheduler.common.enums.Flag#YES}
+ * and id is greater than {@code minTaskGroupQueueId},
+ * ordered by id asc, returning at most {@code limit} rows.
+ *
+ * @param minTaskGroupQueueId exclusive lower bound of the returned ids
+ * @param limit maximum number of rows to return
+ * @return TaskGroupQueue ordered by id asc
+ */
+ List<TaskGroupQueue> queryWaitNotifyForceStartTaskGroupQueue(int minTaskGroupQueueId, int limit);
}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskGroupQueueDaoImpl.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskGroupQueueDaoImpl.java
index a1808a9091..5fd50deaae 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskGroupQueueDaoImpl.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskGroupQueueDaoImpl.java
@@ -52,6 +52,11 @@ public class TaskGroupQueueDaoImpl extends BaseDao<TaskGroupQueue, TaskGroupQueu
return mybatisMapper.queryAllTaskGroupQueueByInQueue(Flag.YES.getCode());
}
+ @Override
+ public List<TaskGroupQueue> queryInQueueTaskGroupQueue(int minTaskGroupQueueId, int limit) {
+ // Only rows whose in_queue flag is YES; paging is delegated to the mapper (id > min, order by id, limit)
+ final int inQueue = Flag.YES.getCode();
+ return mybatisMapper.queryInQueueTaskGroupQueue(inQueue, minTaskGroupQueueId, limit);
+ }
+
@Override
public List<TaskGroupQueue> queryAllInQueueTaskGroupQueueByGroupId(Integer taskGroupId) {
return mybatisMapper.queryAllInQueueTaskGroupQueueByGroupId(taskGroupId, Flag.YES.getCode());
@@ -70,4 +75,21 @@ public class TaskGroupQueueDaoImpl extends BaseDao<TaskGroupQueue, TaskGroupQueu
Flag.YES.getCode(),
Flag.NO.getCode());
}
+
+ @Override
+ public int countUsingTaskGroupQueueByGroupId(Integer taskGroupId) {
+ // Pass the persisted flag code (getCode()) as every other query in this DAO does; Enum#ordinal()
+ // depends on declaration order, not on the value stored in the in_queue column.
+ return mybatisMapper.countUsingTaskGroupQueueByGroupId(taskGroupId,
+ TaskGroupQueueStatus.ACQUIRE_SUCCESS.getCode(),
+ Flag.YES.getCode(),
+ Flag.NO.getCode());
+ }
+
+ @Override
+ public List<TaskGroupQueue> queryWaitNotifyForceStartTaskGroupQueue(int minTaskGroupQueueId, int limit) {
+ // Rows still in the queue (in_queue = YES) that were marked as force start (force_start = YES)
+ final int inQueue = Flag.YES.getCode();
+ final int forceStart = Flag.YES.getCode();
+ return mybatisMapper.queryWaitNotifyForceStartTaskGroupQueue(inQueue, forceStart, minTaskGroupQueueId, limit);
+ }
}
diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskGroupQueueMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskGroupQueueMapper.xml
index 790ad7bfae..800e82e847 100644
--- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskGroupQueueMapper.xml
+++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskGroupQueueMapper.xml
@@ -219,6 +219,16 @@
where in_queue = #{inQueue} order by priority desc
</select>
+ <!-- One page of in-queue rows: ids strictly greater than minTaskGroupQueueId, ordered by id, at most limit rows.
+ Callers pass the last id of the previous page to fetch the next page (keyset pagination). -->
+ <select id="queryInQueueTaskGroupQueue" resultType="org.apache.dolphinscheduler.dao.entity.TaskGroupQueue">
+ select
+ <include refid="baseSql"/>
+ from t_ds_task_group_queue
+ where in_queue = #{inQueue}
+ and id > #{minTaskGroupQueueId}
+ order by id asc
+ limit #{limit}
+ </select>
+
<select id="queryByTaskInstanceId" resultType="org.apache.dolphinscheduler.dao.entity.TaskGroupQueue">
select
<include refid="baseSql" />
@@ -233,4 +243,21 @@
where group_id = #{taskGroupId} and status = #{status} and force_start = #{forceStart} and in_queue = #{inQueue}
</select>
+ <!-- Count only, instead of loading the matching rows, of one group's rows with the given status / force_start / in_queue. -->
+ <select id="countUsingTaskGroupQueueByGroupId" resultType="java.lang.Integer">
+ select count(1)
+ from t_ds_task_group_queue
+ where group_id = #{taskGroupId} and status = #{status} and force_start = #{forceStart} and in_queue = #{inQueue}
+ </select>
+
+ <!-- One page of rows with the given in_queue / force_start flags: ids strictly greater than
+ minTaskGroupQueueId, ordered by id, at most limit rows. -->
+ <select id="queryWaitNotifyForceStartTaskGroupQueue" resultType="org.apache.dolphinscheduler.dao.entity.TaskGroupQueue">
+ select
+ <include refid="baseSql"/>
+ from t_ds_task_group_queue
+ where in_queue = #{inQueue}
+ and force_start = #{forceStart}
+ and id > #{minTaskGroupQueueId}
+ order by id asc
+ limit #{limit}
+ </select>
+
</mapper>
diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/repository/impl/TaskGroupQueueDaoImplTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/repository/impl/TaskGroupQueueDaoImplTest.java
index 17c1537184..13dcf91f55 100644
--- a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/repository/impl/TaskGroupQueueDaoImplTest.java
+++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/repository/impl/TaskGroupQueueDaoImplTest.java
@@ -27,7 +27,11 @@ import org.apache.dolphinscheduler.dao.BaseDaoTest;
import org.apache.dolphinscheduler.dao.entity.TaskGroupQueue;
import org.apache.dolphinscheduler.dao.repository.TaskGroupQueueDao;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.RandomUtils;
+
import java.util.Date;
+import java.util.List;
import org.assertj.core.util.Lists;
import org.junit.jupiter.api.Test;
@@ -55,6 +59,35 @@ class TaskGroupQueueDaoImplTest extends BaseDaoTest {
assertEquals(1, taskGroupQueueDao.queryAllInQueueTaskGroupQueue().size());
}
+ /**
+ * Pages through queryInQueueTaskGroupQueue and checks that the total number of rows read equals the number inserted.
+ * NOTE(review): assumes createTaskGroupQueue builds rows with in_queue = YES and the table starts empty.
+ */
+ @Test
+ void queryInQueueTaskGroupQueue_withMinId() {
+ // Insert between 10,000 (inclusive) and 100,000 (exclusive) records so the scan needs many pages
+ int insertCount = RandomUtils.nextInt(10000, 100000);
+ List<TaskGroupQueue> insertTaskGroupQueue = Lists.newArrayList();
+ for (int i = 0; i < insertCount; i++) {
+ TaskGroupQueue taskGroupQueue = createTaskGroupQueue(Flag.NO, TaskGroupQueueStatus.ACQUIRE_SUCCESS);
+ insertTaskGroupQueue.add(taskGroupQueue);
+ }
+ taskGroupQueueDao.insertBatch(insertTaskGroupQueue);
+
+ // Start below any real id, then advance to the last id of each page
+ int minTaskGroupQueueId = -1;
+ int limit = 1000;
+ int queryCount = 0;
+ while (true) {
+ List<TaskGroupQueue> taskGroupQueues =
+ taskGroupQueueDao.queryInQueueTaskGroupQueue(minTaskGroupQueueId, limit);
+ if (CollectionUtils.isEmpty(taskGroupQueues)) {
+ break;
+ }
+ queryCount += taskGroupQueues.size();
+ // A short page means there is nothing left to read
+ if (taskGroupQueues.size() < limit) {
+ break;
+ }
+ minTaskGroupQueueId = taskGroupQueues.get(taskGroupQueues.size() - 1).getId();
+ }
+ assertEquals(insertCount, queryCount);
+ }
+
@Test
void queryAllInQueueTaskGroupQueueByGroupId() {
TaskGroupQueue taskGroupQueue = createTaskGroupQueue(Flag.NO, TaskGroupQueueStatus.ACQUIRE_SUCCESS);
@@ -91,6 +124,49 @@ class TaskGroupQueueDaoImplTest extends BaseDaoTest {
assertEquals(1, taskGroupQueueDao.queryAcquiredTaskGroupQueueByGroupId(1).size());
}
+ /**
+ * Checks that only ACQUIRE_SUCCESS rows with forceStart = NO are counted for the group.
+ * NOTE(review): assumes createTaskGroupQueue assigns group id 1 and in_queue = YES.
+ */
+ @Test
+ void countUsingTaskGroupQueueByGroupId() {
+ assertEquals(0, taskGroupQueueDao.countUsingTaskGroupQueueByGroupId(1));
+
+ TaskGroupQueue taskGroupQueue = createTaskGroupQueue(Flag.NO, TaskGroupQueueStatus.ACQUIRE_SUCCESS);
+ taskGroupQueueDao.insert(taskGroupQueue);
+ assertEquals(1, taskGroupQueueDao.countUsingTaskGroupQueueByGroupId(1));
+
+ // A force-start row in WAIT_QUEUE matches neither status nor forceStart, so the count stays 1
+ taskGroupQueue = createTaskGroupQueue(Flag.YES, TaskGroupQueueStatus.WAIT_QUEUE);
+ taskGroupQueueDao.insert(taskGroupQueue);
+ assertEquals(1, taskGroupQueueDao.countUsingTaskGroupQueueByGroupId(1));
+ }
+
+ /**
+ * Pages through queryWaitNotifyForceStartTaskGroupQueue and checks that every inserted force-start row is counted.
+ * NOTE(review): assumes createTaskGroupQueue builds rows with in_queue = YES and the table starts empty.
+ */
+ @Test
+ void queryWaitNotifyForceStartTaskGroupQueue() {
+ // Insert between 10,000 (inclusive) and 20,000 (exclusive) force-start records
+ int insertCount = RandomUtils.nextInt(10000, 20000);
+ List<TaskGroupQueue> insertTaskGroupQueue = Lists.newArrayList();
+ for (int i = 0; i < insertCount; i++) {
+ TaskGroupQueue taskGroupQueue = createTaskGroupQueue(Flag.YES, TaskGroupQueueStatus.ACQUIRE_SUCCESS);
+
+ insertTaskGroupQueue.add(taskGroupQueue);
+ }
+ taskGroupQueueDao.insertBatch(insertTaskGroupQueue);
+
+ // Start below any real id, then advance to the last id of each page
+ int beginTaskGroupQueueId = -1;
+ int limit = 1000;
+ int queryCount = 0;
+ while (true) {
+ List<TaskGroupQueue> taskGroupQueues =
+ taskGroupQueueDao.queryWaitNotifyForceStartTaskGroupQueue(beginTaskGroupQueueId, limit);
+ if (CollectionUtils.isEmpty(taskGroupQueues)) {
+ break;
+ }
+ queryCount += taskGroupQueues.size();
+ // A short page means there is nothing left to read
+ if (taskGroupQueues.size() < limit) {
+ break;
+ }
+ beginTaskGroupQueueId = taskGroupQueues.get(taskGroupQueues.size() - 1).getId();
+ }
+ assertEquals(insertCount, queryCount);
+ }
+
private TaskGroupQueue createTaskGroupQueue(Flag forceStart, TaskGroupQueueStatus taskGroupQueueStatus) {
return TaskGroupQueue.builder()
.taskId(1)
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/taskgroup/TaskGroupCoordinator.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/taskgroup/TaskGroupCoordinator.java
index fae0d2b91f..bd7af94611 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/taskgroup/TaskGroupCoordinator.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/taskgroup/TaskGroupCoordinator.java
@@ -96,6 +96,8 @@ public class TaskGroupCoordinator extends BaseDaemonThread {
@Autowired
private ProcessInstanceDao processInstanceDao;
+ private static int DEFAULT_LIMIT = 1000;
+
public TaskGroupCoordinator() {
super("TaskGroupCoordinator");
}
@@ -147,10 +149,10 @@ public class TaskGroupCoordinator extends BaseDaemonThread {
if (CollectionUtils.isEmpty(taskGroups)) {
return;
}
+ StopWatch taskGroupCoordinatorRoundTimeCost = StopWatch.createStarted();
+
for (TaskGroup taskGroup : taskGroups) {
- List<TaskGroupQueue> taskGroupQueues =
- taskGroupQueueDao.queryAcquiredTaskGroupQueueByGroupId(taskGroup.getId());
- int actualUseSize = taskGroupQueues.size();
+ int actualUseSize = taskGroupQueueDao.countUsingTaskGroupQueueByGroupId(taskGroup.getId());
if (taskGroup.getUseSize() == actualUseSize) {
continue;
}
@@ -160,13 +162,35 @@ public class TaskGroupCoordinator extends BaseDaemonThread {
taskGroup.setUseSize(actualUseSize);
taskGroupDao.updateById(taskGroup);
}
+ log.info("Success amend TaskGroup useSize cost: {}/ms", taskGroupCoordinatorRoundTimeCost.getTime());
}
/**
* Make sure the TaskGroupQueue status is {@link TaskGroupQueueStatus#RELEASE} when the related {@link TaskInstance} is not exist or status is finished.
*/
private void amendTaskGroupQueueStatus() {
- List<TaskGroupQueue> taskGroupQueues = taskGroupQueueDao.queryAllInQueueTaskGroupQueue();
+ // Read the in-queue TaskGroupQueue rows page by page (DEFAULT_LIMIT rows, keyset pagination on id)
+ // instead of loading them all at once, so memory use per round stays bounded.
+ int minTaskGroupQueueId = -1;
+ int limit = DEFAULT_LIMIT;
+ StopWatch taskGroupCoordinatorRoundTimeCost = StopWatch.createStarted();
+ while (true) {
+ List<TaskGroupQueue> taskGroupQueues =
+ taskGroupQueueDao.queryInQueueTaskGroupQueue(minTaskGroupQueueId, limit);
+ if (CollectionUtils.isEmpty(taskGroupQueues)) {
+ break;
+ }
+ amendTaskGroupQueueStatus(taskGroupQueues);
+ // A short page means there are no more rows to read
+ if (taskGroupQueues.size() < limit) {
+ break;
+ }
+ // Continue after the last id of this page. Paging is by id, not by offset, so rows amended above
+ // do not shift the next page.
+ minTaskGroupQueueId = taskGroupQueues.get(taskGroupQueues.size() - 1).getId();
+ }
+ log.info("Success amend TaskGroupQueue status cost: {}/ms", taskGroupCoordinatorRoundTimeCost.getTime());
+ }
+
+ /**
+ * Make sure the TaskGroupQueue status is {@link TaskGroupQueueStatus#RELEASE} when the related {@link TaskInstance} is not exist or status is finished.
+ */
+ private void amendTaskGroupQueueStatus(List<TaskGroupQueue> taskGroupQueues) {
List<Integer> taskInstanceIds = taskGroupQueues.stream()
.map(TaskGroupQueue::getTaskId)
.collect(Collectors.toList());
@@ -198,10 +222,30 @@ public class TaskGroupCoordinator extends BaseDaemonThread {
// Find the force start task group queue(Which is inQueue and forceStart is YES)
// Notify the related waiting task instance
// Set the taskGroupQueue status to RELEASE and remove it from queue
- List<TaskGroupQueue> taskGroupQueues = taskGroupQueueDao.queryAllInQueueTaskGroupQueue()
- .stream()
- .filter(taskGroupQueue -> Flag.YES.getCode() == taskGroupQueue.getForceStart())
- .collect(Collectors.toList());
+ // We use limit here to avoid OOM, and we will retry to notify force start queue at next time
+ int minTaskGroupQueueId = -1;
+ int limit = DEFAULT_LIMIT;
+ StopWatch taskGroupCoordinatorRoundTimeCost = StopWatch.createStarted();
+ while (true) {
+ List<TaskGroupQueue> taskGroupQueues =
+ taskGroupQueueDao.queryWaitNotifyForceStartTaskGroupQueue(minTaskGroupQueueId, limit);
+ if (CollectionUtils.isEmpty(taskGroupQueues)) {
+ break;
+ }
+ dealWithForceStartTaskGroupQueue(taskGroupQueues);
+ if (taskGroupQueues.size() < limit) {
+ break;
+ }
+ minTaskGroupQueueId = taskGroupQueues.get(taskGroupQueues.size() - 1).getId();
+ }
+ log.info("Success deal with force start TaskGroupQueue cost: {}/ms",
+ taskGroupCoordinatorRoundTimeCost.getTime());
+ }
+
+ private void dealWithForceStartTaskGroupQueue(List<TaskGroupQueue> taskGroupQueues) {
+ // Find the force start task group queue(Which is inQueue and forceStart is YES)
+ // Notify the related waiting task instance
+ // Set the taskGroupQueue status to RELEASE and remove it from queue
for (TaskGroupQueue taskGroupQueue : taskGroupQueues) {
try {
LogUtils.setTaskInstanceIdMDC(taskGroupQueue.getTaskId());