You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by yi...@apache.org on 2022/06/01 09:55:35 UTC
[incubator-doris] branch master updated: [fix](routine-load) fix bug that routine load task can not find backend (#9902)
This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new 8effdd95a7 [fix](routine-load) fix bug that routine load task can not find backend (#9902)
8effdd95a7 is described below
commit 8effdd95a7f50c4a43f58b0655c9fe7e68317667
Author: Mingyu Chen <mo...@gmail.com>
AuthorDate: Wed Jun 1 17:55:30 2022 +0800
[fix](routine-load) fix bug that routine load task can not find backend (#9902)
This bug was introduced by #9492.
---
.../doris/load/routineload/RoutineLoadManager.java | 5 +++--
.../org/apache/doris/system/SystemInfoService.java | 21 ++++++++++++++++-----
.../apache/doris/system/SystemInfoServiceTest.java | 13 +++++++++----
3 files changed, 28 insertions(+), 11 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
index 0b3f45cfba..c21691aff1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
@@ -495,13 +495,14 @@ public class RoutineLoadManager implements Writable {
} else {
tags = Catalog.getCurrentCatalog().getAuth().getResourceTags(job.getUserIdentity().getQualifiedUser());
if (tags == UserProperty.INVALID_RESOURCE_TAGS) {
- // user may be dropped. Here we fall back to use replica tag
+ // user may be dropped, or may not set resource tag property.
+ // Here we fall back to use replica tag
tags = getTagsFromReplicaAllocation(job.getDbId(), job.getTableId());
}
}
BeSelectionPolicy policy = new BeSelectionPolicy.Builder().needLoadAvailable().setCluster(cluster)
.addTags(tags).build();
- return Catalog.getCurrentSystemInfo().selectBackendIdsByPolicy(policy, 20000);
+ return Catalog.getCurrentSystemInfo().selectBackendIdsByPolicy(policy, -1 /* as many as possible */);
}
private Set<Tag> getTagsFromReplicaAllocation(long dbId, long tblId) throws LoadException {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java b/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java
index d2af2c2704..b4b7f8b140 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java
@@ -775,14 +775,16 @@ public class SystemInfoService {
* Select a set of backends by the given policy.
*
* @param policy
- * @param number number of backends which need to be selected.
+ * @param number number of backends which need to be selected. -1 means return as many as possible.
* @return return #number of backend ids,
* or empty set if no backends match the policy, or the number of matched backends is less than "number";
*/
public List<Long> selectBackendIdsByPolicy(BeSelectionPolicy policy, int number) {
+ Preconditions.checkArgument(number >= -1);
List<Backend> candidates =
idToBackendRef.values().stream().filter(policy::isMatch).collect(Collectors.toList());
- if (candidates.size() < number) {
+ if ((number != -1 && candidates.size() < number) || candidates.isEmpty()) {
+ LOG.debug("Not match policy: {}. candidates num: {}, expected: {}", policy, candidates.size(), number);
return Lists.newArrayList();
}
// If only need one Backend, just return a random one.
@@ -793,7 +795,11 @@ public class SystemInfoService {
if (policy.allowOnSameHost) {
Collections.shuffle(candidates);
- return candidates.subList(0, number).stream().map(b -> b.getId()).collect(Collectors.toList());
+ if (number == -1) {
+ return candidates.stream().map(b -> b.getId()).collect(Collectors.toList());
+ } else {
+ return candidates.subList(0, number).stream().map(b -> b.getId()).collect(Collectors.toList());
+ }
}
// for each host, random select one backend.
@@ -812,11 +818,16 @@ public class SystemInfoService {
Collections.shuffle(list);
candidates.add(list.get(0));
}
- if (candidates.size() < number) {
+ if (number != -1 && candidates.size() < number) {
+ LOG.debug("Not match policy: {}. candidates num: {}, expected: {}", policy, candidates.size(), number);
return Lists.newArrayList();
}
Collections.shuffle(candidates);
- return candidates.subList(0, number).stream().map(b -> b.getId()).collect(Collectors.toList());
+ if (number != -1) {
+ return candidates.subList(0, number).stream().map(b -> b.getId()).collect(Collectors.toList());
+ } else {
+ return candidates.stream().map(b -> b.getId()).collect(Collectors.toList());
+ }
}
public ImmutableMap<Long, Backend> getIdToBackend() {
diff --git a/fe/fe-core/src/test/java/org/apache/doris/system/SystemInfoServiceTest.java b/fe/fe-core/src/test/java/org/apache/doris/system/SystemInfoServiceTest.java
index 4d158604aa..2bd2a2ffc9 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/system/SystemInfoServiceTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/system/SystemInfoServiceTest.java
@@ -184,15 +184,20 @@ public class SystemInfoServiceTest {
BeSelectionPolicy policy10 = new BeSelectionPolicy.Builder().addTags(Sets.newHashSet(taga, tagb))
.setStorageMedium(TStorageMedium.SSD).build();
Assert.assertEquals(4, infoService.selectBackendIdsByPolicy(policy10, 4).size());
+ Assert.assertEquals(3, infoService.selectBackendIdsByPolicy(policy10, 3).size());
+ // check return as many as possible
+ Assert.assertEquals(4, infoService.selectBackendIdsByPolicy(policy10, -1).size());
Assert.assertEquals(0, infoService.selectBackendIdsByPolicy(policy10, 5).size());
- BeSelectionPolicy policy11 = new BeSelectionPolicy.Builder().addTags(Sets.newHashSet(tagb))
- .setStorageMedium(TStorageMedium.HDD).build();
+ BeSelectionPolicy policy11 =
+ new BeSelectionPolicy.Builder().addTags(Sets.newHashSet(tagb)).setStorageMedium(TStorageMedium.HDD)
+ .build();
Assert.assertEquals(0, infoService.selectBackendIdsByPolicy(policy11, 1).size());
// 7. check disk usage
- BeSelectionPolicy policy12 = new BeSelectionPolicy.Builder().addTags(Sets.newHashSet(taga))
- .setStorageMedium(TStorageMedium.HDD).build();
+ BeSelectionPolicy policy12 =
+ new BeSelectionPolicy.Builder().addTags(Sets.newHashSet(taga)).setStorageMedium(TStorageMedium.HDD)
+ .build();
Assert.assertEquals(1, infoService.selectBackendIdsByPolicy(policy12, 1).size());
BeSelectionPolicy policy13 = new BeSelectionPolicy.Builder().addTags(Sets.newHashSet(taga))
.setStorageMedium(TStorageMedium.HDD).needCheckDiskUsage().build();
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org