You are viewing a plain-text version of this content; the link to its canonical version was not preserved in this copy.
Posted to commits@doris.apache.org by mo...@apache.org on 2020/11/28 01:59:35 UTC
[incubator-doris] branch master updated: [BUG] Fix colocate balance
bug when there is decommissioned be (#4955)
This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new f1248cb [BUG] Fix colocate balance bug when there is decommissioned be (#4955)
f1248cb is described below
commit f1248cb10eca8996a99ce0c2c934fbc5ba9bb511
Author: gengjun-git <54...@users.noreply.github.com>
AuthorDate: Sat Nov 28 09:59:25 2020 +0800
[BUG] Fix colocate balance bug when there is decommissioned be (#4955)
We should ignore decommissioned BEs when selecting BEs to balance a colocate group's bucketSeq.
---
.../apache/doris/clone/ColocateTableBalancer.java | 53 ++++++++++------
.../doris/clone/ColocateTableBalancerTest.java | 71 +++++++++++++++++++---
2 files changed, 99 insertions(+), 25 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/ColocateTableBalancer.java b/fe/fe-core/src/main/java/org/apache/doris/clone/ColocateTableBalancer.java
index 389a2c6..79e57ba 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/ColocateTableBalancer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/ColocateTableBalancer.java
@@ -151,10 +151,10 @@ public class ColocateTableBalancer extends MasterDaemon {
continue;
}
- Set<Long> unavailableBeIds = getUnavailableBeIdsInGroup(infoService, colocateIndex, groupId);
- List<Long> availableBeIds = getAvailableBeIdsInGroup(db.getClusterName(), infoService, unavailableBeIds);
+ Set<Long> unavailableBeIdsInGroup = getUnavailableBeIdsInGroup(infoService, colocateIndex, groupId);
+ List<Long> availableBeIds = getAvailableBeIds(db.getClusterName(), infoService);
List<List<Long>> balancedBackendsPerBucketSeq = Lists.newArrayList();
- if (relocateAndBalance(groupId, unavailableBeIds, availableBeIds, colocateIndex, infoService, statistic, balancedBackendsPerBucketSeq)) {
+ if (relocateAndBalance(groupId, unavailableBeIdsInGroup, availableBeIds, colocateIndex, infoService, statistic, balancedBackendsPerBucketSeq)) {
colocateIndex.addBackendsPerBucketSeq(groupId, balancedBackendsPerBucketSeq);
ColocatePersistInfo info = ColocatePersistInfo.createForBackendsPerBucketSeq(groupId, balancedBackendsPerBucketSeq);
catalog.getEditLog().logColocateBackendsPerBucketSeq(info);
@@ -484,27 +484,44 @@ public class ColocateTableBalancer extends MasterDaemon {
private Set<Long> getUnavailableBeIdsInGroup(SystemInfoService infoService, ColocateTableIndex colocateIndex, GroupId groupId) {
Set<Long> backends = colocateIndex.getBackendsByGroup(groupId);
Set<Long> unavailableBeIds = Sets.newHashSet();
- long currTime = System.currentTimeMillis();
for (Long backendId : backends) {
- Backend be = infoService.getBackend(backendId);
- if (be == null) {
+ if (!checkBackendAvailable(backendId, infoService)) {
unavailableBeIds.add(backendId);
- } else if (!be.isAvailable()) {
- // 1. BE is dead for a long time
- // 2. BE is under decommission
- if ((!be.isAlive() && (currTime - be.getLastUpdateMs()) > Config.tablet_repair_delay_factor_second * 1000 * 2)
- || be.isDecommissioned()) {
- unavailableBeIds.add(backendId);
- }
}
}
return unavailableBeIds;
}
- private List<Long> getAvailableBeIdsInGroup(String cluster, SystemInfoService infoService, Set<Long> unavailableBeIds) {
- List<Long> allBackendIds = infoService.getClusterBackendIds(cluster, true);
- return allBackendIds.stream()
- .filter(id -> !unavailableBeIds.contains(id))
- .collect(Collectors.toList());
+ private List<Long> getAvailableBeIds(String cluster, SystemInfoService infoService) {
+ // get all backends to allBackendIds, and check be availability using checkBackendAvailable
+ // backend stopped for a short period of time is still considered available
+ List<Long> allBackendIds = infoService.getClusterBackendIds(cluster, false);
+ List<Long> availableBeIds = Lists.newArrayList();
+ for (Long backendId : allBackendIds) {
+ if (checkBackendAvailable(backendId, infoService)) {
+ availableBeIds.add(backendId);
+ }
+ }
+ return availableBeIds;
+ }
+
+ /**
+ * check backend available
+ * backend stopped for a short period of time is still considered available
+ */
+ private boolean checkBackendAvailable(Long backendId, SystemInfoService infoService) {
+ long currTime = System.currentTimeMillis();
+ Backend be = infoService.getBackend(backendId);
+ if (be == null) {
+ return false;
+ } else if (!be.isAvailable()) {
+ // 1. BE is dead for a long time
+ // 2. BE is under decommission
+ if ((!be.isAlive() && (currTime - be.getLastUpdateMs()) > Config.tablet_repair_delay_factor_second * 1000 * 2)
+ || be.isDecommissioned()) {
+ return false;
+ }
+ }
+ return true;
}
}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/clone/ColocateTableBalancerTest.java b/fe/fe-core/src/test/java/org/apache/doris/clone/ColocateTableBalancerTest.java
index ee59267..47c2f58 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/clone/ColocateTableBalancerTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/clone/ColocateTableBalancerTest.java
@@ -362,18 +362,75 @@ public class ColocateTableBalancerTest {
}
@Test
- public void testGetAvailableBeIdsInGroup(@Mocked SystemInfoService infoService) {
- List<Long> clusterAliveBackendIds = Lists.newArrayList(1L, 2L, 3L, 4L);
+ public void testGetAvailableBeIds(@Mocked SystemInfoService infoService,
+ @Mocked Backend myBackend2,
+ @Mocked Backend myBackend3,
+ @Mocked Backend myBackend4,
+ @Mocked Backend myBackend5) {
+ List<Long> clusterBackendIds = Lists.newArrayList(1L, 2L, 3L, 4L, 5L);
new Expectations(){
{
- infoService.getClusterBackendIds("cluster1", true);
- result = clusterAliveBackendIds;
+ infoService.getClusterBackendIds("cluster1", false);
+ result = clusterBackendIds;
+ minTimes = 0;
+
+ infoService.getBackend(1L);
+ result = null;
+ minTimes = 0;
+
+ // backend2 is available
+ infoService.getBackend(2L);
+ result = myBackend2;
+ minTimes = 0;
+ myBackend2.isAvailable();
+ result = true;
+ minTimes = 0;
+
+ // backend3 not available, and dead for a long time
+ infoService.getBackend(3L);
+ result = myBackend3;
+ minTimes = 0;
+ myBackend3.isAvailable();
+ result = false;
+ minTimes = 0;
+ myBackend3.isAlive();
+ result = false;
+ minTimes = 0;
+ myBackend3.getLastUpdateMs();
+ result = System.currentTimeMillis() - Config.tablet_repair_delay_factor_second * 1000 * 20;
+ minTimes = 0;
+
+ // backend4 available, not alive but dead for a short time
+ infoService.getBackend(4L);
+ result = myBackend4;
+ minTimes = 0;
+ myBackend4.isAvailable();
+ result = false;
+ minTimes = 0;
+ myBackend4.isAlive();
+ result = false;
+ minTimes = 0;
+ myBackend4.getLastUpdateMs();
+ result = System.currentTimeMillis();
+ minTimes = 0;
+
+ // backend5 not available, and in decommission
+ infoService.getBackend(5L);
+ result = myBackend5;
+ minTimes = 0;
+ myBackend5.isAvailable();
+ result = false;
+ minTimes = 0;
+ myBackend5.isAlive();
+ result = true;
+ minTimes = 0;
+ myBackend5.isDecommissioned();
+ result = true;
minTimes = 0;
}
};
- Set<Long> unavailableBeIds = Sets.newHashSet(4L, 5L, 6L);
- List<Long> availableBeIds = Deencapsulation.invoke(balancer, "getAvailableBeIdsInGroup","cluster1", infoService, unavailableBeIds);
- Assert.assertArrayEquals(new long[]{1L, 2L, 3L}, availableBeIds.stream().mapToLong(i->i).sorted().toArray());
+ List<Long> availableBeIds = Deencapsulation.invoke(balancer, "getAvailableBeIds","cluster1", infoService);
+ Assert.assertArrayEquals(new long[]{2L, 4L}, availableBeIds.stream().mapToLong(i->i).sorted().toArray());
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org