You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2022/04/11 06:03:44 UTC

[incubator-doris] 03/09: [fix](routine load) Routine load task doesn't reallocate when previous BE is down. (#8824)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch dev-1.0.1
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git

commit a44f304826ef1d0a1e134bb8734446133405016c
Author: Henry2SS <45...@users.noreply.github.com>
AuthorDate: Sat Apr 9 19:02:55 2022 +0800

    [fix](routine load) Routine load task doesn't reallocate when previous BE is down. (#8824)
    
    If the previous BE is not alive, another available BE should be assigned instead.
---
 .../doris/load/routineload/RoutineLoadManager.java | 36 ++++++++++++----------
 1 file changed, 20 insertions(+), 16 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
index ed6e2eaf81..2d6c13adab 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
@@ -41,6 +41,7 @@ import org.apache.doris.mysql.privilege.PrivPredicate;
 import org.apache.doris.persist.AlterRoutineLoadJobOperationLog;
 import org.apache.doris.persist.RoutineLoadOperation;
 import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.system.Backend;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
@@ -410,36 +411,39 @@ public class RoutineLoadManager implements Writable {
     // check if the specified BE is available for running task
     // return true if it is available. return false if otherwise.
     // throw exception if unrecoverable errors happen.
-    public long getAvailableBeForTask(long previoudBeId, String clusterName) throws LoadException {
+    public long getAvailableBeForTask(long previousBeId, String clusterName) throws LoadException {
         List<Long> beIdsInCluster = Catalog.getCurrentSystemInfo().getClusterBackendIds(clusterName, true);
         if (beIdsInCluster == null) {
             throw new LoadException("The " + clusterName + " has been deleted");
         }
 
-        if (previoudBeId != -1L && !beIdsInCluster.contains(previoudBeId)) {
-            return -1L;
-        }
-
         // check if be has idle slot
         readLock();
         try {
             Map<Long, Integer> beIdToConcurrentTasks = getBeCurrentTasksNumMap();
+
             // 1. Find if the given BE id has available slots
-            if (previoudBeId != -1L) {
-                int idleTaskNum = 0;
-                if (!beIdToMaxConcurrentTasks.containsKey(previoudBeId)) {
-                    idleTaskNum = 0;
-                } else if (beIdToConcurrentTasks.containsKey(previoudBeId)) {
-                    idleTaskNum = beIdToMaxConcurrentTasks.get(previoudBeId) - beIdToConcurrentTasks.get(previoudBeId);
-                } else {
-                    idleTaskNum = Config.max_routine_load_task_num_per_be;
-                }
-                if (idleTaskNum > 0) {
-                    return previoudBeId;
+            if (previousBeId != -1L && beIdsInCluster.contains(previousBeId)) {
+                // get the previousBackend info
+                Backend previousBackend = Catalog.getCurrentSystemInfo().getBackend(previousBeId);
+                // check previousBackend is not null && load available
+                if (previousBackend != null && previousBackend.isLoadAvailable()) {
+                    int idleTaskNum = 0;
+                    if (!beIdToMaxConcurrentTasks.containsKey(previousBeId)) {
+                        idleTaskNum = 0;
+                    } else if (beIdToConcurrentTasks.containsKey(previousBeId)) {
+                        idleTaskNum = beIdToMaxConcurrentTasks.get(previousBeId) - beIdToConcurrentTasks.get(previousBeId);
+                    } else {
+                        idleTaskNum = beIdToMaxConcurrentTasks.get(previousBeId);
+                    }
+                    if (idleTaskNum > 0) {
+                        return previousBeId;
+                    }
                 }
             }
 
             // 2. The given BE id does not have available slots, find a BE with min tasks
+            // 3. The previos BE is not in cluster && is not load available, find a new BE with min tasks
             int idleTaskNum = 0;
             long resultBeId = -1L;
             int maxIdleSlotNum = 0;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org