You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by we...@apache.org on 2024/03/30 01:43:47 UTC

(dolphinscheduler) branch dev updated: Fix TaskGroupCoordinator might cause OOM when there is a lot of waiting TaskGroupQueue (#15773)

This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new dc4dad135c Fix TaskGroupCoordinator might cause OOM when there is a lot of waiting TaskGroupQueue (#15773)
dc4dad135c is described below

commit dc4dad135c9ccacd376dfa139f174eb0dd086b24
Author: Wenjun Ruan <we...@apache.org>
AuthorDate: Sat Mar 30 09:43:42 2024 +0800

    Fix TaskGroupCoordinator might cause OOM when there is a lot of waiting TaskGroupQueue (#15773)
---
 .../dao/mapper/TaskGroupQueueMapper.java           | 14 ++++
 .../dao/repository/TaskGroupQueueDao.java          | 31 +++++++++
 .../dao/repository/impl/TaskGroupQueueDaoImpl.java | 22 +++++++
 .../dao/mapper/TaskGroupQueueMapper.xml            | 27 ++++++++
 .../repository/impl/TaskGroupQueueDaoImplTest.java | 76 ++++++++++++++++++++++
 .../runner/taskgroup/TaskGroupCoordinator.java     | 60 ++++++++++++++---
 6 files changed, 222 insertions(+), 8 deletions(-)

diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupQueueMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupQueueMapper.java
index cada1c7092..8b8241a2ad 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupQueueMapper.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupQueueMapper.java
@@ -122,4 +122,18 @@ public interface TaskGroupQueueMapper extends BaseMapper<TaskGroupQueue> {
                                                            @Param("status") int status,
                                                            @Param("inQueue") int inQueue,
                                                            @Param("forceStart") int forceStart);
+
+    int countUsingTaskGroupQueueByGroupId(@Param("taskGroupId") Integer taskGroupId,
+                                          @Param("status") int status,
+                                          @Param("inQueue") int inQueue,
+                                          @Param("forceStart") int forceStart);
+
+    List<TaskGroupQueue> queryInQueueTaskGroupQueue(@Param("inQueue") int inQueue,
+                                                    @Param("minTaskGroupQueueId") int minTaskGroupQueueId,
+                                                    @Param("limit") int limit);
+
+    List<TaskGroupQueue> queryWaitNotifyForceStartTaskGroupQueue(@Param("inQueue") int inQueue,
+                                                                 @Param("forceStart") int forceStart,
+                                                                 @Param("minTaskGroupQueueId") int minTaskGroupQueueId,
+                                                                 @Param("limit") int limit);
 }
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskGroupQueueDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskGroupQueueDao.java
index a788b29bb1..468b7b758c 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskGroupQueueDao.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskGroupQueueDao.java
@@ -38,6 +38,17 @@ public interface TaskGroupQueueDao extends IDao<TaskGroupQueue> {
      */
     List<TaskGroupQueue> queryAllInQueueTaskGroupQueue();
 
+    /**
+     * Query all {@link TaskGroupQueue} which
+     * in_queue is {@link org.apache.dolphinscheduler.common.enums.Flag#YES}
+     * and id > minTaskGroupQueueId
+     * ordered by id asc
+     * limit #{limit}
+     *
+     * @return TaskGroupQueue ordered by id asc
+     */
+    List<TaskGroupQueue> queryInQueueTaskGroupQueue(int minTaskGroupQueueId, int limit);
+
     /**
      * Query all {@link TaskGroupQueue} which in_queue is {@link org.apache.dolphinscheduler.common.enums.Flag#YES} and taskGroupId is taskGroupId
      *
@@ -61,4 +72,24 @@ public interface TaskGroupQueueDao extends IDao<TaskGroupQueue> {
      * @return TaskGroupQueue
      */
     List<TaskGroupQueue> queryAcquiredTaskGroupQueueByGroupId(Integer taskGroupId);
+
+    /**
+     * Count the {@link TaskGroupQueue} of the given task group whose status is TaskGroupQueueStatus.ACQUIRE_SUCCESS, in_queue is {@link org.apache.dolphinscheduler.common.enums.Flag#YES} and forceStart is {@link org.apache.dolphinscheduler.common.enums.Flag#NO}.
+     *
+     * @param taskGroupId id of the task group whose queue entries are counted
+     * @return the number of matching TaskGroupQueue records
+     */
+    int countUsingTaskGroupQueueByGroupId(Integer taskGroupId);
+
+    /**
+     * Query at most {@code limit} {@link TaskGroupQueue} which
+     * in_queue is {@link org.apache.dolphinscheduler.common.enums.Flag#YES}
+     * and forceStart is {@link org.apache.dolphinscheduler.common.enums.Flag#YES}
+     * and id > minTaskGroupQueueId
+     * ordered by id asc
+     * limit #{limit}
+     *
+     * @return TaskGroupQueue ordered by id asc
+     */
+    List<TaskGroupQueue> queryWaitNotifyForceStartTaskGroupQueue(int minTaskGroupQueueId, int limit);
 }
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskGroupQueueDaoImpl.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskGroupQueueDaoImpl.java
index a1808a9091..5fd50deaae 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskGroupQueueDaoImpl.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskGroupQueueDaoImpl.java
@@ -52,6 +52,11 @@ public class TaskGroupQueueDaoImpl extends BaseDao<TaskGroupQueue, TaskGroupQueu
         return mybatisMapper.queryAllTaskGroupQueueByInQueue(Flag.YES.getCode());
     }
 
+    @Override
+    public List<TaskGroupQueue> queryInQueueTaskGroupQueue(int minTaskGroupQueueId, int limit) {
+        return mybatisMapper.queryInQueueTaskGroupQueue(Flag.YES.getCode(), minTaskGroupQueueId, limit);
+    }
+
     @Override
     public List<TaskGroupQueue> queryAllInQueueTaskGroupQueueByGroupId(Integer taskGroupId) {
         return mybatisMapper.queryAllInQueueTaskGroupQueueByGroupId(taskGroupId, Flag.YES.getCode());
@@ -70,4 +75,21 @@ public class TaskGroupQueueDaoImpl extends BaseDao<TaskGroupQueue, TaskGroupQueu
                 Flag.YES.getCode(),
                 Flag.NO.getCode());
     }
+
+    @Override
+    public int countUsingTaskGroupQueueByGroupId(Integer taskGroupId) {
+        return mybatisMapper.countUsingTaskGroupQueueByGroupId(taskGroupId, // group_id
+                TaskGroupQueueStatus.ACQUIRE_SUCCESS.getCode(), // status
+                Flag.YES.getCode(), // inQueue: pass the persisted code, never the enum ordinal
+                Flag.NO.getCode()); // forceStart
+    }
+
+    @Override
+    public List<TaskGroupQueue> queryWaitNotifyForceStartTaskGroupQueue(int minTaskGroupQueueId, int limit) {
+        return mybatisMapper.queryWaitNotifyForceStartTaskGroupQueue(
+                Flag.YES.getCode(),
+                Flag.YES.getCode(),
+                minTaskGroupQueueId,
+                limit);
+    }
 }
diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskGroupQueueMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskGroupQueueMapper.xml
index 790ad7bfae..800e82e847 100644
--- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskGroupQueueMapper.xml
+++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskGroupQueueMapper.xml
@@ -219,6 +219,16 @@
         where in_queue = #{inQueue} order by priority desc
     </select>
 
+    <select id="queryInQueueTaskGroupQueue" resultType="org.apache.dolphinscheduler.dao.entity.TaskGroupQueue">
+        select
+        <include refid="baseSql"/>
+        from t_ds_task_group_queue
+        where in_queue = #{inQueue}
+        and id &gt; #{minTaskGroupQueueId}
+        order by id asc
+        limit #{limit}
+    </select>
+
     <select id="queryByTaskInstanceId" resultType="org.apache.dolphinscheduler.dao.entity.TaskGroupQueue">
         select
         <include refid="baseSql" />
@@ -233,4 +243,21 @@
         where group_id = #{taskGroupId} and status = #{status} and force_start = #{forceStart} and in_queue = #{inQueue}
     </select>
 
+    <select id="countUsingTaskGroupQueueByGroupId" resultType="java.lang.Integer">
+        select count(1)
+        from t_ds_task_group_queue
+        where group_id = #{taskGroupId} and status = #{status} and force_start = #{forceStart} and in_queue = #{inQueue}
+    </select>
+
+    <select id="queryWaitNotifyForceStartTaskGroupQueue" resultType="org.apache.dolphinscheduler.dao.entity.TaskGroupQueue">
+        select
+        <include refid="baseSql"/>
+        from t_ds_task_group_queue
+        where in_queue = #{inQueue}
+        and force_start = #{forceStart}
+        and id &gt; #{minTaskGroupQueueId}
+        order by id asc
+        limit #{limit}
+    </select>
+
 </mapper>
diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/repository/impl/TaskGroupQueueDaoImplTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/repository/impl/TaskGroupQueueDaoImplTest.java
index 17c1537184..13dcf91f55 100644
--- a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/repository/impl/TaskGroupQueueDaoImplTest.java
+++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/repository/impl/TaskGroupQueueDaoImplTest.java
@@ -27,7 +27,11 @@ import org.apache.dolphinscheduler.dao.BaseDaoTest;
 import org.apache.dolphinscheduler.dao.entity.TaskGroupQueue;
 import org.apache.dolphinscheduler.dao.repository.TaskGroupQueueDao;
 
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.RandomUtils;
+
 import java.util.Date;
+import java.util.List;
 
 import org.assertj.core.util.Lists;
 import org.junit.jupiter.api.Test;
@@ -55,6 +59,35 @@ class TaskGroupQueueDaoImplTest extends BaseDaoTest {
         assertEquals(1, taskGroupQueueDao.queryAllInQueueTaskGroupQueue().size());
     }
 
+    @Test
+    void queryInQueueTaskGroupQueue_withMinId() {
+        // Insert a random number of records in [10000, 100000)
+        int insertCount = RandomUtils.nextInt(10000, 100000);
+        List<TaskGroupQueue> insertTaskGroupQueue = Lists.newArrayList();
+        for (int i = 0; i < insertCount; i++) {
+            TaskGroupQueue taskGroupQueue = createTaskGroupQueue(Flag.NO, TaskGroupQueueStatus.ACQUIRE_SUCCESS);
+            insertTaskGroupQueue.add(taskGroupQueue);
+        }
+        taskGroupQueueDao.insertBatch(insertTaskGroupQueue);
+
+        int minTaskGroupQueueId = -1;
+        int limit = 1000;
+        int queryCount = 0;
+        while (true) { // read page by page, resuming after the last id seen
+            List<TaskGroupQueue> taskGroupQueues =
+                    taskGroupQueueDao.queryInQueueTaskGroupQueue(minTaskGroupQueueId, limit);
+            if (CollectionUtils.isEmpty(taskGroupQueues)) {
+                break;
+            }
+            queryCount += taskGroupQueues.size();
+            if (taskGroupQueues.size() < limit) { // a short page means there is nothing left to read
+                break;
+            }
+            minTaskGroupQueueId = taskGroupQueues.get(taskGroupQueues.size() - 1).getId();
+        }
+        assertEquals(insertCount, queryCount);
+    }
+
     @Test
     void queryAllInQueueTaskGroupQueueByGroupId() {
         TaskGroupQueue taskGroupQueue = createTaskGroupQueue(Flag.NO, TaskGroupQueueStatus.ACQUIRE_SUCCESS);
@@ -91,6 +124,49 @@ class TaskGroupQueueDaoImplTest extends BaseDaoTest {
         assertEquals(1, taskGroupQueueDao.queryAcquiredTaskGroupQueueByGroupId(1).size());
     }
 
+    @Test
+    void countUsingTaskGroupQueueByGroupId() {
+        assertEquals(0, taskGroupQueueDao.countUsingTaskGroupQueueByGroupId(1));
+
+        TaskGroupQueue taskGroupQueue = createTaskGroupQueue(Flag.NO, TaskGroupQueueStatus.ACQUIRE_SUCCESS);
+        taskGroupQueueDao.insert(taskGroupQueue);
+        assertEquals(1, taskGroupQueueDao.countUsingTaskGroupQueueByGroupId(1));
+
+        taskGroupQueue = createTaskGroupQueue(Flag.YES, TaskGroupQueueStatus.WAIT_QUEUE);
+        taskGroupQueueDao.insert(taskGroupQueue);
+        assertEquals(1, taskGroupQueueDao.countUsingTaskGroupQueueByGroupId(1));
+    }
+
+    @Test
+    void queryWaitNotifyForceStartTaskGroupQueue() {
+        // Insert a random number of force-start records in [10000, 20000)
+        int insertCount = RandomUtils.nextInt(10000, 20000);
+        List<TaskGroupQueue> insertTaskGroupQueue = Lists.newArrayList();
+        for (int i = 0; i < insertCount; i++) {
+            TaskGroupQueue taskGroupQueue = createTaskGroupQueue(Flag.YES, TaskGroupQueueStatus.ACQUIRE_SUCCESS);
+
+            insertTaskGroupQueue.add(taskGroupQueue);
+        }
+        taskGroupQueueDao.insertBatch(insertTaskGroupQueue);
+
+        int beginTaskGroupQueueId = -1;
+        int limit = 1000;
+        int queryCount = 0;
+        while (true) { // read page by page, resuming after the last id seen
+            List<TaskGroupQueue> taskGroupQueues =
+                    taskGroupQueueDao.queryWaitNotifyForceStartTaskGroupQueue(beginTaskGroupQueueId, limit);
+            if (CollectionUtils.isEmpty(taskGroupQueues)) {
+                break;
+            }
+            queryCount += taskGroupQueues.size();
+            if (taskGroupQueues.size() < limit) { // a short page means there is nothing left to read
+                break;
+            }
+            beginTaskGroupQueueId = taskGroupQueues.get(taskGroupQueues.size() - 1).getId();
+        }
+        assertEquals(insertCount, queryCount);
+    }
+
     private TaskGroupQueue createTaskGroupQueue(Flag forceStart, TaskGroupQueueStatus taskGroupQueueStatus) {
         return TaskGroupQueue.builder()
                 .taskId(1)
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/taskgroup/TaskGroupCoordinator.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/taskgroup/TaskGroupCoordinator.java
index fae0d2b91f..bd7af94611 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/taskgroup/TaskGroupCoordinator.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/taskgroup/TaskGroupCoordinator.java
@@ -96,6 +96,8 @@ public class TaskGroupCoordinator extends BaseDaemonThread {
     @Autowired
     private ProcessInstanceDao processInstanceDao;
 
+    private static final int DEFAULT_LIMIT = 1000; // max TaskGroupQueue rows fetched per paged query
+
     public TaskGroupCoordinator() {
         super("TaskGroupCoordinator");
     }
@@ -147,10 +149,10 @@ public class TaskGroupCoordinator extends BaseDaemonThread {
         if (CollectionUtils.isEmpty(taskGroups)) {
             return;
         }
+        StopWatch taskGroupCoordinatorRoundTimeCost = StopWatch.createStarted();
+
         for (TaskGroup taskGroup : taskGroups) {
-            List<TaskGroupQueue> taskGroupQueues =
-                    taskGroupQueueDao.queryAcquiredTaskGroupQueueByGroupId(taskGroup.getId());
-            int actualUseSize = taskGroupQueues.size();
+            int actualUseSize = taskGroupQueueDao.countUsingTaskGroupQueueByGroupId(taskGroup.getId());
             if (taskGroup.getUseSize() == actualUseSize) {
                 continue;
             }
@@ -160,13 +162,35 @@ public class TaskGroupCoordinator extends BaseDaemonThread {
             taskGroup.setUseSize(actualUseSize);
             taskGroupDao.updateById(taskGroup);
         }
+        log.info("Success amend TaskGroup useSize cost: {}/ms", taskGroupCoordinatorRoundTimeCost.getTime());
     }
 
     /**
      * Make sure the TaskGroupQueue status is {@link TaskGroupQueueStatus#RELEASE} when the related {@link TaskInstance} is not exist or status is finished.
      */
     private void amendTaskGroupQueueStatus() {
-        List<TaskGroupQueue> taskGroupQueues = taskGroupQueueDao.queryAllInQueueTaskGroupQueue();
+        int minTaskGroupQueueId = -1;
+        int limit = DEFAULT_LIMIT;
+        StopWatch taskGroupCoordinatorRoundTimeCost = StopWatch.createStarted();
+        while (true) {
+            List<TaskGroupQueue> taskGroupQueues =
+                    taskGroupQueueDao.queryInQueueTaskGroupQueue(minTaskGroupQueueId, limit);
+            if (CollectionUtils.isEmpty(taskGroupQueues)) {
+                break;
+            }
+            amendTaskGroupQueueStatus(taskGroupQueues);
+            if (taskGroupQueues.size() < limit) {
+                break;
+            }
+            minTaskGroupQueueId = taskGroupQueues.get(taskGroupQueues.size() - 1).getId();
+        }
+        log.info("Success amend TaskGroupQueue status cost: {}/ms", taskGroupCoordinatorRoundTimeCost.getTime());
+    }
+
+    /**
+     * Make sure the TaskGroupQueue status is {@link TaskGroupQueueStatus#RELEASE} when the related {@link TaskInstance} is not exist or status is finished.
+     */
+    private void amendTaskGroupQueueStatus(List<TaskGroupQueue> taskGroupQueues) {
         List<Integer> taskInstanceIds = taskGroupQueues.stream()
                 .map(TaskGroupQueue::getTaskId)
                 .collect(Collectors.toList());
@@ -198,10 +222,30 @@ public class TaskGroupCoordinator extends BaseDaemonThread {
         // Find the force start task group queue(Which is inQueue and forceStart is YES)
         // Notify the related waiting task instance
         // Set the taskGroupQueue status to RELEASE and remove it from queue
-        List<TaskGroupQueue> taskGroupQueues = taskGroupQueueDao.queryAllInQueueTaskGroupQueue()
-                .stream()
-                .filter(taskGroupQueue -> Flag.YES.getCode() == taskGroupQueue.getForceStart())
-                .collect(Collectors.toList());
+        // We use limit here to avoid OOM, and we will retry to notify force start queue at next time
+        int minTaskGroupQueueId = -1;
+        int limit = DEFAULT_LIMIT;
+        StopWatch taskGroupCoordinatorRoundTimeCost = StopWatch.createStarted();
+        while (true) {
+            List<TaskGroupQueue> taskGroupQueues =
+                    taskGroupQueueDao.queryWaitNotifyForceStartTaskGroupQueue(minTaskGroupQueueId, limit);
+            if (CollectionUtils.isEmpty(taskGroupQueues)) {
+                break;
+            }
+            dealWithForceStartTaskGroupQueue(taskGroupQueues);
+            if (taskGroupQueues.size() < limit) {
+                break;
+            }
+            minTaskGroupQueueId = taskGroupQueues.get(taskGroupQueues.size() - 1).getId();
+        }
+        log.info("Success deal with force start TaskGroupQueue cost: {}/ms",
+                taskGroupCoordinatorRoundTimeCost.getTime());
+    }
+
+    private void dealWithForceStartTaskGroupQueue(List<TaskGroupQueue> taskGroupQueues) {
+        // Find the force start task group queue(Which is inQueue and forceStart is YES)
+        // Notify the related waiting task instance
+        // Set the taskGroupQueue status to RELEASE and remove it from queue
         for (TaskGroupQueue taskGroupQueue : taskGroupQueues) {
             try {
                 LogUtils.setTaskInstanceIdMDC(taskGroupQueue.getTaskId());