You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by GitBox <gi...@apache.org> on 2021/07/29 11:45:08 UTC

[GitHub] [incubator-doris] EmmyMiao87 commented on a change in pull request #6342: [Bug][RoutineLoad] Avoid TOO_MANY_TASKS error

EmmyMiao87 commented on a change in pull request #6342:
URL: https://github.com/apache/incubator-doris/pull/6342#discussion_r679074901



##########
File path: fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
##########
@@ -344,30 +345,54 @@ public long getMinTaskBeId(String clusterName) throws LoadException {
     // check if the specified BE is available for running task
     // return true if it is available. return false if otherwise.
     // throw exception if unrecoverable errors happen.
-    public boolean checkBeToTask(long beId, String clusterName) throws LoadException {
+    public long getAvailableBeForTask(long previoudBeId, String clusterName) throws LoadException {
         List<Long> beIdsInCluster = Catalog.getCurrentSystemInfo().getClusterBackendIds(clusterName, true);
         if (beIdsInCluster == null) {
             throw new LoadException("The " + clusterName + " has been deleted");
         }
 
-        if (!beIdsInCluster.contains(beId)) {
-            return false;
+        if (previoudBeId != -1L && !beIdsInCluster.contains(previoudBeId)) {
+            return -1L;
         }
 
         // check if be has idle slot
         readLock();
         try {
-            int idleTaskNum = 0;
             Map<Long, Integer> beIdToConcurrentTasks = getBeCurrentTasksNumMap();
-            if (beIdToConcurrentTasks.containsKey(beId)) {
-                idleTaskNum = beIdToMaxConcurrentTasks.get(beId) - beIdToConcurrentTasks.get(beId);
-            } else {
-                idleTaskNum = Config.max_routine_load_task_num_per_be;
+            // 1. Find if the given BE id has available slots
+            if (previoudBeId != -1L) {
+                int idleTaskNum = 0;
+                if (beIdToConcurrentTasks.containsKey(previoudBeId)) {
+                    idleTaskNum = beIdToMaxConcurrentTasks.get(previoudBeId) - beIdToConcurrentTasks.get(previoudBeId);
+                } else {
+                    idleTaskNum = Config.max_routine_load_task_num_per_be;
+                }
+                if (idleTaskNum > 0) {
+                    return previoudBeId;
+                }
             }
-            if (idleTaskNum > 0) {
-                return true;
+
+            // 2. The given BE id does not have available slots, find a BE with min tasks
+            updateBeIdToMaxConcurrentTasks();
+            int idleTaskNum = 0;
+            long resultBeId = -1L;
+            int maxIdleSlotNum = 0;
+            for (Long beId : beIdsInCluster) {

Review comment:
       This logic is the same as the logic in the manager's `getMinTaskBeId` function.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org