You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2022/06/01 15:29:17 UTC

[incubator-doris] 22/22: [fix](routine-load) fix bug that routine load task can not find backend (#9902)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch dev-1.0.1
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git

commit 07a119abfbac0ce29981f048cd8d9df326f1c4e7
Author: Mingyu Chen <mo...@gmail.com>
AuthorDate: Wed Jun 1 17:55:30 2022 +0800

    [fix](routine-load) fix bug that routine load task can not find backend (#9902)
    
    Introduced from #9492.
---
 .../doris/load/routineload/RoutineLoadManager.java  |  5 +++--
 .../org/apache/doris/system/SystemInfoService.java  | 21 ++++++++++++++++-----
 .../apache/doris/system/SystemInfoServiceTest.java  | 13 +++++++++----
 3 files changed, 28 insertions(+), 11 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
index ed220f88b8..9dac66b996 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
@@ -496,13 +496,14 @@ public class RoutineLoadManager implements Writable {
         } else {
             tags = Catalog.getCurrentCatalog().getAuth().getResourceTags(job.getUserIdentity().getQualifiedUser());
             if (tags == UserProperty.INVALID_RESOURCE_TAGS) {
-                // user may be dropped. Here we fall back to use replica tag
+                // user may be dropped, or may not set resource tag property.
+                // Here we fall back to use replica tag
                 tags = getTagsFromReplicaAllocation(job.getDbId(), job.getTableId());
             }
         }
         BeSelectionPolicy policy = new BeSelectionPolicy.Builder().needLoadAvailable().setCluster(cluster)
                 .addTags(tags).build();
-        return Catalog.getCurrentSystemInfo().selectBackendIdsByPolicy(policy, 20000);
+        return Catalog.getCurrentSystemInfo().selectBackendIdsByPolicy(policy, -1 /* as many as possible */);
     }
 
     private Set<Tag> getTagsFromReplicaAllocation(long dbId, long tblId) throws LoadException {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java b/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java
index 82571cd3d8..ef31b53e19 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java
@@ -776,14 +776,16 @@ public class SystemInfoService {
      * Select a set of backends by the given policy.
      *
      * @param policy
-     * @param number number of backends which need to be selected.
+     * @param number number of backends which need to be selected. -1 means return as many as possible.
      * @return return #number of backend ids,
      * or empty set if no backends match the policy, or the number of matched backends is less than "number";
      */
     public List<Long> selectBackendIdsByPolicy(BeSelectionPolicy policy, int number) {
+        Preconditions.checkArgument(number >= -1);
         List<Backend> candidates =
                 idToBackendRef.values().stream().filter(policy::isMatch).collect(Collectors.toList());
-        if (candidates.size() < number) {
+        if ((number != -1 && candidates.size() < number) || candidates.isEmpty()) {
+            LOG.debug("Not match policy: {}. candidates num: {}, expected: {}", policy, candidates.size(), number);
             return Lists.newArrayList();
         }
         // If only need one Backend, just return a random one.
@@ -794,7 +796,11 @@ public class SystemInfoService {
 
         if (policy.allowOnSameHost) {
             Collections.shuffle(candidates);
-            return candidates.subList(0, number).stream().map(b -> b.getId()).collect(Collectors.toList());
+            if (number == -1) {
+                return candidates.stream().map(b -> b.getId()).collect(Collectors.toList());
+            } else {
+                return candidates.subList(0, number).stream().map(b -> b.getId()).collect(Collectors.toList());
+            }
         }
 
         // for each host, random select one backend.
@@ -813,11 +819,16 @@ public class SystemInfoService {
             Collections.shuffle(list);
             candidates.add(list.get(0));
         }
-        if (candidates.size() < number) {
+        if (number != -1 && candidates.size() < number) {
+            LOG.debug("Not match policy: {}. candidates num: {}, expected: {}", policy, candidates.size(), number);
             return Lists.newArrayList();
         }
         Collections.shuffle(candidates);
-        return candidates.subList(0, number).stream().map(b -> b.getId()).collect(Collectors.toList());
+        if (number != -1) {
+            return candidates.subList(0, number).stream().map(b -> b.getId()).collect(Collectors.toList());
+        } else {
+            return candidates.stream().map(b -> b.getId()).collect(Collectors.toList());
+        }
     }
 
     public ImmutableMap<Long, Backend> getIdToBackend() {
diff --git a/fe/fe-core/src/test/java/org/apache/doris/system/SystemInfoServiceTest.java b/fe/fe-core/src/test/java/org/apache/doris/system/SystemInfoServiceTest.java
index b2570095a0..9134c66c44 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/system/SystemInfoServiceTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/system/SystemInfoServiceTest.java
@@ -185,15 +185,20 @@ public class SystemInfoServiceTest {
         BeSelectionPolicy policy10 = new BeSelectionPolicy.Builder().addTags(Sets.newHashSet(taga, tagb))
                 .setStorageMedium(TStorageMedium.SSD).build();
         Assert.assertEquals(4, infoService.selectBackendIdsByPolicy(policy10, 4).size());
+        Assert.assertEquals(3, infoService.selectBackendIdsByPolicy(policy10, 3).size());
+        // check return as many as possible
+        Assert.assertEquals(4, infoService.selectBackendIdsByPolicy(policy10, -1).size());
         Assert.assertEquals(0, infoService.selectBackendIdsByPolicy(policy10, 5).size());
 
-        BeSelectionPolicy policy11 = new BeSelectionPolicy.Builder().addTags(Sets.newHashSet(tagb))
-                .setStorageMedium(TStorageMedium.HDD).build();
+        BeSelectionPolicy policy11 =
+                new BeSelectionPolicy.Builder().addTags(Sets.newHashSet(tagb)).setStorageMedium(TStorageMedium.HDD)
+                        .build();
         Assert.assertEquals(0, infoService.selectBackendIdsByPolicy(policy11, 1).size());
 
         // 7. check disk usage
-        BeSelectionPolicy policy12 = new BeSelectionPolicy.Builder().addTags(Sets.newHashSet(taga))
-                .setStorageMedium(TStorageMedium.HDD).build();
+        BeSelectionPolicy policy12 =
+                new BeSelectionPolicy.Builder().addTags(Sets.newHashSet(taga)).setStorageMedium(TStorageMedium.HDD)
+                        .build();
         Assert.assertEquals(1, infoService.selectBackendIdsByPolicy(policy12, 1).size());
         BeSelectionPolicy policy13 = new BeSelectionPolicy.Builder().addTags(Sets.newHashSet(taga))
                 .setStorageMedium(TStorageMedium.HDD).needCheckDiskUsage().build();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org