You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2022/06/01 15:29:17 UTC
[incubator-doris] 22/22: [fix](routine-load) fix bug that routine load task can not find backend (#9902)
This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch dev-1.0.1
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
commit 07a119abfbac0ce29981f048cd8d9df326f1c4e7
Author: Mingyu Chen <mo...@gmail.com>
AuthorDate: Wed Jun 1 17:55:30 2022 +0800
[fix](routine-load) fix bug that routine load task can not find backend (#9902)
This bug was introduced by #9492.
---
.../doris/load/routineload/RoutineLoadManager.java | 5 +++--
.../org/apache/doris/system/SystemInfoService.java | 21 ++++++++++++++++-----
.../apache/doris/system/SystemInfoServiceTest.java | 13 +++++++++----
3 files changed, 28 insertions(+), 11 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
index ed220f88b8..9dac66b996 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
@@ -496,13 +496,14 @@ public class RoutineLoadManager implements Writable {
} else {
tags = Catalog.getCurrentCatalog().getAuth().getResourceTags(job.getUserIdentity().getQualifiedUser());
if (tags == UserProperty.INVALID_RESOURCE_TAGS) {
- // user may be dropped. Here we fall back to use replica tag
+ // user may be dropped, or may not set resource tag property.
+ // Here we fall back to use replica tag
tags = getTagsFromReplicaAllocation(job.getDbId(), job.getTableId());
}
}
BeSelectionPolicy policy = new BeSelectionPolicy.Builder().needLoadAvailable().setCluster(cluster)
.addTags(tags).build();
- return Catalog.getCurrentSystemInfo().selectBackendIdsByPolicy(policy, 20000);
+ return Catalog.getCurrentSystemInfo().selectBackendIdsByPolicy(policy, -1 /* as many as possible */);
}
private Set<Tag> getTagsFromReplicaAllocation(long dbId, long tblId) throws LoadException {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java b/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java
index 82571cd3d8..ef31b53e19 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java
@@ -776,14 +776,16 @@ public class SystemInfoService {
* Select a set of backends by the given policy.
*
* @param policy
- * @param number number of backends which need to be selected.
+ * @param number number of backends which need to be selected. -1 means return as many as possible.
* @return return #number of backend ids,
* or empty set if no backends match the policy, or the number of matched backends is less than "number";
*/
public List<Long> selectBackendIdsByPolicy(BeSelectionPolicy policy, int number) {
+ Preconditions.checkArgument(number >= -1);
List<Backend> candidates =
idToBackendRef.values().stream().filter(policy::isMatch).collect(Collectors.toList());
- if (candidates.size() < number) {
+ if ((number != -1 && candidates.size() < number) || candidates.isEmpty()) {
+ LOG.debug("Not match policy: {}. candidates num: {}, expected: {}", policy, candidates.size(), number);
return Lists.newArrayList();
}
// If only need one Backend, just return a random one.
@@ -794,7 +796,11 @@ public class SystemInfoService {
if (policy.allowOnSameHost) {
Collections.shuffle(candidates);
- return candidates.subList(0, number).stream().map(b -> b.getId()).collect(Collectors.toList());
+ if (number == -1) {
+ return candidates.stream().map(b -> b.getId()).collect(Collectors.toList());
+ } else {
+ return candidates.subList(0, number).stream().map(b -> b.getId()).collect(Collectors.toList());
+ }
}
// for each host, random select one backend.
@@ -813,11 +819,16 @@ public class SystemInfoService {
Collections.shuffle(list);
candidates.add(list.get(0));
}
- if (candidates.size() < number) {
+ if (number != -1 && candidates.size() < number) {
+ LOG.debug("Not match policy: {}. candidates num: {}, expected: {}", policy, candidates.size(), number);
return Lists.newArrayList();
}
Collections.shuffle(candidates);
- return candidates.subList(0, number).stream().map(b -> b.getId()).collect(Collectors.toList());
+ if (number != -1) {
+ return candidates.subList(0, number).stream().map(b -> b.getId()).collect(Collectors.toList());
+ } else {
+ return candidates.stream().map(b -> b.getId()).collect(Collectors.toList());
+ }
}
public ImmutableMap<Long, Backend> getIdToBackend() {
diff --git a/fe/fe-core/src/test/java/org/apache/doris/system/SystemInfoServiceTest.java b/fe/fe-core/src/test/java/org/apache/doris/system/SystemInfoServiceTest.java
index b2570095a0..9134c66c44 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/system/SystemInfoServiceTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/system/SystemInfoServiceTest.java
@@ -185,15 +185,20 @@ public class SystemInfoServiceTest {
BeSelectionPolicy policy10 = new BeSelectionPolicy.Builder().addTags(Sets.newHashSet(taga, tagb))
.setStorageMedium(TStorageMedium.SSD).build();
Assert.assertEquals(4, infoService.selectBackendIdsByPolicy(policy10, 4).size());
+ Assert.assertEquals(3, infoService.selectBackendIdsByPolicy(policy10, 3).size());
+ // check return as many as possible
+ Assert.assertEquals(4, infoService.selectBackendIdsByPolicy(policy10, -1).size());
Assert.assertEquals(0, infoService.selectBackendIdsByPolicy(policy10, 5).size());
- BeSelectionPolicy policy11 = new BeSelectionPolicy.Builder().addTags(Sets.newHashSet(tagb))
- .setStorageMedium(TStorageMedium.HDD).build();
+ BeSelectionPolicy policy11 =
+ new BeSelectionPolicy.Builder().addTags(Sets.newHashSet(tagb)).setStorageMedium(TStorageMedium.HDD)
+ .build();
Assert.assertEquals(0, infoService.selectBackendIdsByPolicy(policy11, 1).size());
// 7. check disk usage
- BeSelectionPolicy policy12 = new BeSelectionPolicy.Builder().addTags(Sets.newHashSet(taga))
- .setStorageMedium(TStorageMedium.HDD).build();
+ BeSelectionPolicy policy12 =
+ new BeSelectionPolicy.Builder().addTags(Sets.newHashSet(taga)).setStorageMedium(TStorageMedium.HDD)
+ .build();
Assert.assertEquals(1, infoService.selectBackendIdsByPolicy(policy12, 1).size());
BeSelectionPolicy policy13 = new BeSelectionPolicy.Builder().addTags(Sets.newHashSet(taga))
.setStorageMedium(TStorageMedium.HDD).needCheckDiskUsage().build();
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org