You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2022/04/11 06:03:44 UTC
[incubator-doris] 03/09: [fix](routine load) Routine load task doesn't reallocate when previous BE is down. (#8824)
This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch dev-1.0.1
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
commit a44f304826ef1d0a1e134bb8734446133405016c
Author: Henry2SS <45...@users.noreply.github.com>
AuthorDate: Sat Apr 9 19:02:55 2022 +0800
[fix](routine load) Routine load task doesn't reallocate when previous BE is down. (#8824)
If the previous BE is not alive, the task should be assigned to another available BE instead.
---
.../doris/load/routineload/RoutineLoadManager.java | 36 ++++++++++++----------
1 file changed, 20 insertions(+), 16 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
index ed6e2eaf81..2d6c13adab 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
@@ -41,6 +41,7 @@ import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.persist.AlterRoutineLoadJobOperationLog;
import org.apache.doris.persist.RoutineLoadOperation;
import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.system.Backend;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
@@ -410,36 +411,39 @@ public class RoutineLoadManager implements Writable {
// check if the specified BE is available for running task
// return true if it is available. return false if otherwise.
// throw exception if unrecoverable errors happen.
- public long getAvailableBeForTask(long previoudBeId, String clusterName) throws LoadException {
+ public long getAvailableBeForTask(long previousBeId, String clusterName) throws LoadException {
List<Long> beIdsInCluster = Catalog.getCurrentSystemInfo().getClusterBackendIds(clusterName, true);
if (beIdsInCluster == null) {
throw new LoadException("The " + clusterName + " has been deleted");
}
- if (previoudBeId != -1L && !beIdsInCluster.contains(previoudBeId)) {
- return -1L;
- }
-
// check if be has idle slot
readLock();
try {
Map<Long, Integer> beIdToConcurrentTasks = getBeCurrentTasksNumMap();
+
// 1. Find if the given BE id has available slots
- if (previoudBeId != -1L) {
- int idleTaskNum = 0;
- if (!beIdToMaxConcurrentTasks.containsKey(previoudBeId)) {
- idleTaskNum = 0;
- } else if (beIdToConcurrentTasks.containsKey(previoudBeId)) {
- idleTaskNum = beIdToMaxConcurrentTasks.get(previoudBeId) - beIdToConcurrentTasks.get(previoudBeId);
- } else {
- idleTaskNum = Config.max_routine_load_task_num_per_be;
- }
- if (idleTaskNum > 0) {
- return previoudBeId;
+ if (previousBeId != -1L && beIdsInCluster.contains(previousBeId)) {
+ // get the previousBackend info
+ Backend previousBackend = Catalog.getCurrentSystemInfo().getBackend(previousBeId);
+ // check previousBackend is not null && load available
+ if (previousBackend != null && previousBackend.isLoadAvailable()) {
+ int idleTaskNum = 0;
+ if (!beIdToMaxConcurrentTasks.containsKey(previousBeId)) {
+ idleTaskNum = 0;
+ } else if (beIdToConcurrentTasks.containsKey(previousBeId)) {
+ idleTaskNum = beIdToMaxConcurrentTasks.get(previousBeId) - beIdToConcurrentTasks.get(previousBeId);
+ } else {
+ idleTaskNum = beIdToMaxConcurrentTasks.get(previousBeId);
+ }
+ if (idleTaskNum > 0) {
+ return previousBeId;
+ }
}
}
// 2. The given BE id does not have available slots, find a BE with min tasks
+ // 3. The previous BE is not in the cluster or is not load available, find a new BE with min tasks
int idleTaskNum = 0;
long resultBeId = -1L;
int maxIdleSlotNum = 0;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org