Posted to commits@doris.apache.org by mo...@apache.org on 2020/11/22 13:23:06 UTC

[incubator-doris] branch master updated: [BUG] Fix Colocate table balance bug (#4936)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 37a6731  [BUG] Fix Colocate table balance bug (#4936)
37a6731 is described below

commit 37a67312449bc7c14f4649c189c5b91c632e5032
Author: gengjun-git <54...@users.noreply.github.com>
AuthorDate: Sun Nov 22 21:22:44 2020 +0800

    [BUG] Fix Colocate table balance bug (#4936)
    
    Fix a bug where a colocation group is always in unstable status.
---
 .../apache/doris/clone/ColocateTableBalancer.java  | 501 +++++++--------------
 .../doris/clone/ColocateTableBalancerTest.java     | 256 +++++++++--
 2 files changed, 396 insertions(+), 361 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/ColocateTableBalancer.java b/fe/fe-core/src/main/java/org/apache/doris/clone/ColocateTableBalancer.java
index 4dcb82a..389a2c6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/ColocateTableBalancer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/ColocateTableBalancer.java
@@ -26,7 +26,6 @@ import org.apache.doris.catalog.MaterializedIndex;
 import org.apache.doris.catalog.MaterializedIndex.IndexExtState;
 import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.catalog.Partition;
-import org.apache.doris.catalog.Replica;
 import org.apache.doris.catalog.Tablet;
 import org.apache.doris.catalog.Tablet.TabletStatus;
 import org.apache.doris.clone.TabletSchedCtx.Priority;
@@ -44,7 +43,6 @@ import com.google.common.collect.Sets;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -74,241 +72,95 @@ public class ColocateTableBalancer extends MasterDaemon {
 
     @Override
     /*
-     * Each round, we do 3 steps:
-     * 1. Relocate group:
+     * Each round, we do 2 steps:
+     * 1. Relocate and balance group:
      *      If a backend is not available, find a new backend to replace it;
-     *      Relocate at most one bucket in one group at a time.
-     *      
+     *      once all unavailable backends have been replaced, balance the group.
+     *
      * 2. Match group:
      *      If a replica mismatches the backends in a group, that group will be marked as unstable, and that
      *      tablet is passed to TabletScheduler.
      *      Otherwise, mark the group as stable
-     * 
-     * 3. Balance group:
-     *      Try balance group, and skip groups which contains unavailable backends.
      */
     protected void runAfterCatalogReady() {
-        relocateGroup();
+        relocateAndBalanceGroup();
         matchGroup();
-        balanceGroup();
     }
 
     /*
-     * Traverse all colocate groups, for each group:
-     * Check if there are backends dead or unavailable(decommission)
-     * If yes, for each buckets in this group, if the unavailable backend belongs to this bucket, we will find
-     * a new backend to replace the unavailable one.
-     * 
-     * eg:
-     *      original bucket backends sequence is:
-     *      [[1, 2, 3], [4, 1, 2], [3, 4, 1], [2, 3, 4], [1, 2, 3]]
-     *      
-     *      and backend 3 is dead, so we will find an available backend(eg. backend 4) to replace backend 3.
-     *      [[1, 2, 4], [4, 1, 2], [3, 4, 1], [2, 3, 4], [1, 2, 3]]
-     *      
-     *      NOTICE that in this example, we only replace the #3 backend in first bucket. That is, we only replace
-     *      one bucket in one group at each round. because we need to use newly-updated cluster load statistic to
-     *      find next available backend. and cluster load statistic is updated every 20 seconds.
+     * Relocate and balance a colocate group.
+     *  Here we just let the replicas of colocate tables be evenly distributed in the cluster, without
+     *  considering the cluster load statistic.
+     *  For example:
+     *  currently there are 4 backends A B C D with the following load:
+     *
+     *                +-+
+     *                | |
+     * +-+  +-+  +-+  | |
+     * | |  | |  | |  | |
+     * +-+  +-+  +-+  +-+
+     *  A    B    C    D
+     *
+     *  The colocate group balancer will still evenly distribute the replicas to all 4 backends, not
+     *  just the 3 low-load backends.
+     *
+     *                 X
+     *                 X
+     *  X    X    X   +-+
+     *  X    X    X   | |
+     * +-+  +-+  +-+  | |
+     * | |  | |  | |  | |
+     * +-+  +-+  +-+  +-+
+     * A    B    C    D
+     *
+     *  So after colocate balancing, the cluster may still be 'unbalanced' from a global perspective.
+     *  The LoadBalancer will then balance the non-colocate tables' replicas to eventually make the
+     *  whole cluster balanced.
+     *
+     *  X    X    X    X
+     *  X    X    X    X
+     * +-+  +-+  +-+  +-+
+     * | |  | |  | |  | |
+     * | |  | |  | |  | |
+     * +-+  +-+  +-+  +-+
+     *  A    B    C    D
      */
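
To make the diagrams above concrete, here is a minimal standalone sketch (illustrative only, not Doris code; the class and all names are invented) of the "evenly distributed, ignoring load" placement idea: each replica simply goes to whichever backends currently hold the fewest replicas, with no regard for their disk usage or load score.

    import java.util.*;

    public class EvenPlacementSketch {
        public static void main(String[] args) {
            List<Long> backends = Arrays.asList(1L, 2L, 3L, 4L); // A B C D
            Map<Long, Integer> replicaCount = new HashMap<>();
            backends.forEach(b -> replicaCount.put(b, 0));

            int bucketNum = 8, replicationNum = 3;
            for (int i = 0; i < bucketNum; i++) {
                // pick the replicationNum backends holding the fewest replicas so far,
                // ignoring any disk or load statistic
                List<Long> chosen = new ArrayList<>();
                backends.stream()
                        .sorted(Comparator.comparingInt(replicaCount::get))
                        .limit(replicationNum)
                        .forEach(chosen::add);
                chosen.forEach(b -> replicaCount.merge(b, 1, Integer::sum));
            }
            // 8 buckets * 3 replicas = 24 replicas over 4 backends: 6 each
            System.out.println(replicaCount);
        }
    }

This disregard for load is exactly why the LoadBalancer is still needed afterwards to even out the non-colocate replicas.
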
-    private void relocateGroup() {
-        if (Config.disable_colocate_relocate) {
+    private void relocateAndBalanceGroup() {
+        if (Config.disable_colocate_balance) {
             return;
         }
+
         Catalog catalog = Catalog.getCurrentCatalog();
         ColocateTableIndex colocateIndex = catalog.getColocateTableIndex();
         SystemInfoService infoService = Catalog.getCurrentSystemInfo();
-        Map<String, ClusterLoadStatistic> statisticMap = Catalog.getCurrentCatalog().getTabletScheduler().getStatisticMap();
-        long currTime = System.currentTimeMillis();
-        
+
         // get all groups
         Set<GroupId> groupIds = colocateIndex.getAllGroupIds();
         for (GroupId groupId : groupIds) {
-            // get all backends in this group
-            Set<Long> backends = colocateIndex.getBackendsByGroup(groupId);
-            long unavailableBeId = -1;
-            for (Long backendId : backends) {
-                // find an unavailable backend. even if there are than one unavailable backend,
-                // we just handle the first one.
-                Backend be = infoService.getBackend(backendId);
-                if (be == null) {
-                    unavailableBeId = backendId;
-                    break;
-                } else if (!be.isAvailable()) {
-                    // 1. BE is dead for a long time
-                    // 2. BE is under decommission
-                    if ((!be.isAlive() && (currTime - be.getLastUpdateMs()) > Config.tablet_repair_delay_factor_second * 1000 * 2)
-                        || be.isDecommissioned()) {
-                        unavailableBeId = backendId;
-                        break;
-                    }
-                }
-            }
-
-            if (unavailableBeId == -1) {
-                // all backends in this group are available.
-                // But in previous version we had a bug that replicas of a tablet may be located on same host.
-                // we have to check it.
-                List<List<Long>> backendsPerBucketsSeq = colocateIndex.getBackendsPerBucketSeq(groupId);
-                OUT: for (List<Long> backendIds : backendsPerBucketsSeq) {
-                    Set<String> hosts = Sets.newHashSet();
-                    for (Long beId : backendIds) {
-                        Backend be = infoService.getBackend(beId);
-                        if (be == null) {
-                            // backend can be dropped any time, just skip this bucket
-                            break;
-                        }
-                        if (!hosts.add(be.getHost())) {
-                            // find replicas on same host. simply mark this backend as unavailable,
-                            // so that following step will find another backend
-                            unavailableBeId = beId;
-                            break OUT;
-                        }
-                    }
-                }
-
-                if (unavailableBeId == -1) {
-                    // if everything is ok, continue
-                    continue;
-                }
-            }
-
-            // find the first bucket which contains the unavailable backend
-            LOG.info("backend {} is unavailable in colocate group {}", unavailableBeId, groupId);
-            List<Set<Long>> bucketBackendsSeq = colocateIndex.getBackendsPerBucketSeqSet(groupId);
-            int tabletOrderIdx = 0;
-            for (Set<Long> set : bucketBackendsSeq) {
-                if (set.contains(unavailableBeId)) {
-                    break;
-                }
-                tabletOrderIdx++;
-            }
-
-            // select a new backend to replace the unavailable one
-            long newBackendId = selectSubstituteBackend(tabletOrderIdx, groupId, unavailableBeId, 
-                    bucketBackendsSeq.get(tabletOrderIdx), statisticMap);
-            if (newBackendId != -1) {
-                // replace backend
-                bucketBackendsSeq.get(tabletOrderIdx).remove(unavailableBeId);
-                bucketBackendsSeq.get(tabletOrderIdx).add(newBackendId);
-                colocateIndex.setBackendsSetByIdxForGroup(groupId, tabletOrderIdx, bucketBackendsSeq.get(tabletOrderIdx));
-                LOG.info("select backend {} to replace backend {} for bucket {} in group {}. now backends set is: {}",
-                        newBackendId, unavailableBeId, tabletOrderIdx, groupId, bucketBackendsSeq.get(tabletOrderIdx));
+            Database db = catalog.getDb(groupId.dbId);
+            if (db == null) {
+                continue;
             }
 
-            // only handle one backend at a time
-            break;
-        }
-    }
-
-    /*
-     * Select a substitute backend for specified bucket and colocate group.
-     * return -1 if backend not found.
-     * we need to move all replicas of this bucket to the new backend, so we have to check if the new
-     * backend can save all these replicas.
-     */
-    private long selectSubstituteBackend(int tabletOrderIdx, GroupId groupId, long unavailableBeId, 
-            Set<Long> excludeBeIds, Map<String, ClusterLoadStatistic> statisticMap) {
-        ColocateTableIndex colocateIndex = Catalog.getCurrentColocateIndex();
-        Database db = Catalog.getCurrentCatalog().getDb(groupId.dbId);
-        if (db == null) {
-            LOG.info("db {} does not exist", groupId.dbId);
-            return -1;
-        }
-        ClusterLoadStatistic statistic = statisticMap.get(db.getClusterName());
-        if (statistic == null) {
-            LOG.info("cluster {} statistic does not exist", db.getClusterName());
-            return -1;
-        }
-        
-        // calculate the total replica size of this bucket
-        List<Long> tableIds = colocateIndex.getAllTableIds(groupId);
-        long totalReplicaNum = 0;
-        long totalReplicaSize = 0;
-        db.readLock();
-        try {
-            for (Long tblId : tableIds) {
-                OlapTable tbl = (OlapTable) db.getTable(tblId);
-                if (tbl == null) {
-                    continue;
-                }
-
-                for (Partition partition : tbl.getPartitions()) {
-                    for (MaterializedIndex index : partition.getMaterializedIndices(IndexExtState.VISIBLE)) {
-                        long tabletId = index.getTabletIdsInOrder().get(tabletOrderIdx);
-                        Tablet tablet = index.getTablet(tabletId);
-                        Replica replica = tablet.getReplicaByBackendId(unavailableBeId);
-                        if (replica != null) {
-                            totalReplicaNum++;
-                            totalReplicaSize += replica.getDataSize();
-                        }
-                    }
-                }
+            Map<String, ClusterLoadStatistic> statisticMap = catalog.getTabletScheduler().getStatisticMap();
+            if (statisticMap == null) {
+                continue;
             }
-        } finally {
-            db.readUnlock();
-        }
-        LOG.debug("the number and size of replicas on backend {} of bucket {} is: {} and {}",
-                unavailableBeId, tabletOrderIdx, totalReplicaNum, totalReplicaSize);
-        
-        /*
-         * There is an unsolved problem of finding a new backend for data migration:
-         *    Different table(partition) in this group may in different storage medium(SSD or HDD). If one backend
-         *    is down, the best solution is to find a backend which has both SSD and HDD, and replicas can be
-         *    relocated in corresponding storage medium.
-         *    But in fact, backends can be heterogeneous, which may only has SSD or HDD. If we choose to strictly
-         *    find backends with expecting storage medium, this may lead to a consequence that most of replicas
-         *    are gathered in a small portion of backends.
-         *    
-         *    So for simplicity, we ignore the storage medium property, just find a low load backend which has
-         *    capacity to save these replicas.
-         */
-        List<BackendLoadStatistic> beStats = statistic.getSortedBeLoadStats(null /* mix medium */);
-        if (beStats.isEmpty()) {
-            LOG.warn("failed to relocate backend for colocate group: {}, no backends found", groupId);
-            return -1;
-        }
-
-        // the selected backend should not be on same host of other backends of this bucket.
-        // here we generate a host set for further checking.
-        SystemInfoService infoService = Catalog.getCurrentSystemInfo();
-        Set<String> excludeHosts = Sets.newHashSet();
-        for (Long excludeBeId : excludeBeIds) {
-            Backend be = infoService.getBackend(excludeBeId);
-            if (be == null) {
-                LOG.info("Backend {} has been dropped when finding backend for colocate group {}", excludeBeId, groupId);
+            ClusterLoadStatistic statistic = statisticMap.get(db.getClusterName());
+            if (statistic == null) {
                 continue;
             }
-            excludeHosts.add(be.getHost());
-        }
-        Preconditions.checkState(excludeBeIds.size() >= excludeHosts.size());
 
-        // beStats is ordered by load score, ascend. so finding the available from first to last
-        BackendLoadStatistic chosenBe = null;
-        for (BackendLoadStatistic beStat : beStats) {
-            if (beStat.isAvailable() && beStat.getBeId() != unavailableBeId && !excludeBeIds.contains(beStat.getBeId())) {
-                Backend be = infoService.getBackend(beStat.getBeId());
-                if (be == null) {
-                    continue;
-                }
-                if (excludeHosts.contains(be.getHost())) {
-                    continue;
-                }
-                chosenBe = beStat;
-                break;
+            Set<Long> unavailableBeIds = getUnavailableBeIdsInGroup(infoService, colocateIndex, groupId);
+            List<Long> availableBeIds = getAvailableBeIdsInGroup(db.getClusterName(), infoService, unavailableBeIds);
+            List<List<Long>> balancedBackendsPerBucketSeq = Lists.newArrayList();
+            if (relocateAndBalance(groupId, unavailableBeIds, availableBeIds, colocateIndex, infoService, statistic, balancedBackendsPerBucketSeq)) {
+                colocateIndex.addBackendsPerBucketSeq(groupId, balancedBackendsPerBucketSeq);
+                ColocatePersistInfo info = ColocatePersistInfo.createForBackendsPerBucketSeq(groupId, balancedBackendsPerBucketSeq);
+                catalog.getEditLog().logColocateBackendsPerBucketSeq(info);
+                LOG.info("balance group {}. now backends per bucket sequence is: {}", groupId, balancedBackendsPerBucketSeq);
             }
         }
-        if (chosenBe == null) {
-            LOG.warn("failed to find an available backend to relocate for colocate group: {}", groupId);
-            return -1;
-        }
-
-        // check if there is enough capacity to save all these replicas
-        if (!chosenBe.canFitInColocate(totalReplicaSize)) {
-            LOG.warn("no backend has enough capacity to save replicas in group {} with bucket: {}", groupId, tabletOrderIdx);
-            return -1;
-        }
-
-        return chosenBe.getBeId();
     }
 
     /*
@@ -396,93 +248,6 @@ public class ColocateTableBalancer extends MasterDaemon {
     }
 
     /*
-     * Balance colocate groups which are unstable
-     *  here we just let replicas in colocate table evenly distributed in cluster, not consider the
-     *  cluster load statistic.
-     *  for example:
-     *  currently there are 4 backends A B C D with following load:
-     *  
-     *                +-+
-     *                | |
-     * +-+  +-+  +-+  | |
-     * | |  | |  | |  | |
-     * +-+  +-+  +-+  +-+
-     *  A    B    C    D
-     *  
-     *  And colocate group balancer will still evenly distribute the replicas to all 4 backends, not 
-     *  just 3 low load backends.
-     *  
-     *                 X
-     *                 X
-     *  X    X    X   +-+
-     *  X    X    X   | |
-     * +-+  +-+  +-+  | |
-     * | |  | |  | |  | |
-     * +-+  +-+  +-+  +-+
-     * A    B    C    D
-     *  
-     *  So After colocate balance, the cluster may still 'unbalanced' from a global perspective.
-     *  And the LoadBalancer will balance the non-colocate table's replicas to make the 
-     *  cluster balance, eventually.
-     *  
-     *  X    X    X    X
-     *  X    X    X    X
-     * +-+  +-+  +-+  +-+
-     * | |  | |  | |  | |
-     * | |  | |  | |  | |
-     * +-+  +-+  +-+  +-+
-     *  A    B    C    D
-     */
-    private void balanceGroup() {
-        if (Config.disable_colocate_balance) {
-            return;
-        }
-        Catalog catalog = Catalog.getCurrentCatalog();
-        SystemInfoService infoService = Catalog.getCurrentSystemInfo();
-        ColocateTableIndex colocateIndex = catalog.getColocateTableIndex();
-
-        Set<GroupId> groupIds = colocateIndex.getAllGroupIds();
-        for (GroupId groupId : groupIds) {
-            // skip unstable groups
-            if (colocateIndex.isGroupUnstable(groupId)) {
-                continue;
-            }
-
-            // skip backend unavailable groups
-            Set<Long> backendIds = colocateIndex.getBackendsByGroup(groupId);
-            boolean isAllBackendsAvailable = true;
-            for (Long beId : backendIds) {
-                Backend be = infoService.getBackend(beId);
-                if (be == null || !be.isAvailable()) {
-                    isAllBackendsAvailable = false;
-                    break;
-                }
-            }
-            if (!isAllBackendsAvailable) {
-                continue;
-            }
-
-            // all backends are good
-            Database db = catalog.getDb(groupId.dbId);
-            if (db == null) {
-                continue;
-            }
-
-            List<Long> allBackendIds = infoService.getClusterBackendIds(db.getClusterName(), true);
-            List<Long> allBackendIdsAvailable = allBackendIds.stream()
-                    .filter(infoService::checkBackendAvailable)
-                    .collect(Collectors.toList());
-            List<List<Long>> balancedBackendsPerBucketSeq = Lists.newArrayList();
-            if (balance(groupId, allBackendIdsAvailable, colocateIndex, infoService, balancedBackendsPerBucketSeq)) {
-                colocateIndex.addBackendsPerBucketSeq(groupId, balancedBackendsPerBucketSeq);
-                ColocatePersistInfo info = ColocatePersistInfo.createForBackendsPerBucketSeq(groupId, balancedBackendsPerBucketSeq);
-                Catalog.getCurrentCatalog().getEditLog().logColocateBackendsPerBucketSeq(info);
-                LOG.info("balance group {}. now backends per bucket sequence is: {}", groupId, balancedBackendsPerBucketSeq);
-            }
-        }
-    }
-
-    /*
      * The balance logic is as follow:
      * 
      * All backends: A,B,C,D,E,F,G,H,I,J
@@ -508,7 +273,7 @@ public class ColocateTableBalancer extends MasterDaemon {
      * Algorithm:
      * 0. Generate the flat list of backends per bucket sequence:
      *      A B C A D E A F G A H I
-     * 1. Sort the backend in this order by replication num, descending:
+     * 1. Sort backends by replica num, descending; backends with the same replica num are ordered by load score, descending:
      *      A B C D E F G H I J
      * 2. Check the diff of the first backend(A)'s replica num and last backend(J)'s replica num.
      *      If diff is less or equal than 1, we consider this group as balance. Jump to step 5.
@@ -520,12 +285,16 @@ public class ColocateTableBalancer extends MasterDaemon {
      *    Partition this flat list by replication num:
      *      [J B C] [J D E] [A F G] [A H I]
      *    And this is our new balanced backends per bucket sequence.
-     *    
+     *
+     *  Relocation is similar to balancing, except that it chooses an unavailable BE as the src and moves all
+     *  buckets on that unavailable BE to low-load BEs.
+     *
      *  Return true if backends per bucket sequence change and new sequence is saved in balancedBackendsPerBucketSeq.
      *  Return false if nothing changed.
      */
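
Before the method itself, a minimal standalone sketch of the loop just described (illustrative only; it assumes plain Java collections and omits the host anti-affinity, unavailable-BE and load-score handling that the real relocateAndBalance() performs):

    import java.util.*;
    import java.util.stream.*;

    public class FlatListBalanceSketch {
        public static void main(String[] args) {
            final int replicationNum = 3;
            // flat bucket sequence: A B C A D E A F G A H I (ids 1..9; backend 10 = J is idle)
            List<Long> flat = new ArrayList<>(Arrays.asList(
                    1L, 2L, 3L, 1L, 4L, 5L, 1L, 6L, 7L, 1L, 8L, 9L));
            List<Long> allBackends = LongStream.rangeClosed(1, 10).boxed()
                    .collect(Collectors.toList());

            while (true) {
                // steps 0/1: count replicas per backend (idle backends count 0), sort descending
                Map<Long, Long> counts = new HashMap<>();
                allBackends.forEach(b -> counts.put(b, 0L));
                flat.forEach(b -> counts.merge(b, 1L, Long::sum));
                List<Map.Entry<Long, Long>> sorted = counts.entrySet().stream()
                        .sorted(Map.Entry.<Long, Long>comparingByValue().reversed())
                        .collect(Collectors.toList());
                Map.Entry<Long, Long> high = sorted.get(0);
                Map.Entry<Long, Long> low = sorted.get(sorted.size() - 1);
                if (high.getValue() - low.getValue() <= 1) {
                    break; // step 2: a diff of at most 1 means the group is balanced
                }
                // steps 3/4: move one replica from the high backend to the low backend,
                // skipping buckets that already contain the low backend
                boolean moved = false;
                for (int idx = 0; idx < flat.size() && !moved; idx++) {
                    if (!flat.get(idx).equals(high.getKey())) {
                        continue;
                    }
                    int bucket = idx / replicationNum;
                    List<Long> members = flat.subList(bucket * replicationNum,
                            (bucket + 1) * replicationNum);
                    if (!members.contains(low.getKey())) {
                        flat.set(idx, low.getKey());
                        moved = true;
                    }
                }
                if (!moved) {
                    break; // no legal move left
                }
            }
            // prints a sequence whose per-backend replica counts differ by at most 1
            System.out.println(flat);
        }
    }
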
-    private boolean balance(GroupId groupId, List<Long> allAvailBackendIds, ColocateTableIndex colocateIndex,
-            SystemInfoService infoService, List<List<Long>> balancedBackendsPerBucketSeq) {
+    private boolean relocateAndBalance(GroupId groupId, Set<Long> unavailableBeIds, List<Long> availableBeIds,
+                                       ColocateTableIndex colocateIndex, SystemInfoService infoService,
+                                       ClusterLoadStatistic statistic, List<List<Long>> balancedBackendsPerBucketSeq) {
         ColocateGroupSchema groupSchema = colocateIndex.getGroupSchema(groupId);
         int replicationNum = groupSchema.getReplicationNum();
         List<List<Long>> backendsPerBucketSeq = Lists.newArrayList(colocateIndex.getBackendsPerBucketSeq(groupId));
@@ -543,29 +312,49 @@ public class ColocateTableBalancer extends MasterDaemon {
             }
             Preconditions.checkState(backendsPerBucketSeq.size() == hostsPerBucketSeq.size());
 
-            // sort backends with replica num
+            long srcBeId = -1;
+            List<Integer> seqIndexes = null;
+            boolean hasUnavailableBe = false;
+            // first try to choose an unavailable BE as the src BE
+            for (Long beId : unavailableBeIds) {
+                seqIndexes = getBeSeqIndexes(flatBackendsPerBucketSeq, beId);
+                if (seqIndexes.size() > 0) {
+                    srcBeId = beId;
+                    hasUnavailableBe = true;
+                    break;
+                }
+            }
+            // sort backends by replica num in desc order
             List<Map.Entry<Long, Long>> backendWithReplicaNum =
-                    getSortedBackendReplicaNumPairs(allAvailBackendIds, flatBackendsPerBucketSeq);
-            // if there is only one available backend, end the outer loop
-            if (backendWithReplicaNum.size() == 1) {
-                LOG.info("there is only one available backend, end the outer loop in colocate group {}", groupId);
-                break;
+                    getSortedBackendReplicaNumPairs(availableBeIds, unavailableBeIds, statistic, flatBackendsPerBucketSeq);
+            if (seqIndexes == null || seqIndexes.size() <= 0) {
+                // if there is only one available backend and no buckets on unavailable BEs to relocate, end the outer loop
+                if (backendWithReplicaNum.size() <= 1) {
+                    break;
+                }
+
+                // otherwise choose the BE holding the most buckets as the src BE
+                srcBeId = backendWithReplicaNum.get(0).getKey();
+                seqIndexes = getBeSeqIndexes(flatBackendsPerBucketSeq, srcBeId);
             }
 
-            int i = 0;
+            int i;
+            if (hasUnavailableBe) {
+                i = -1;
+            } else {
+                i = 0;
+            }
             int j = backendWithReplicaNum.size() - 1;
             while (i < j) {
                 boolean isThisRoundChanged = false;
-                // we try to use a low backend to replace the high backend.
+                // we try to use a low backend to replace the src backend.
                 // if replace failed (eg: both backends are on the same host), select the next low backend and try again (j--)
-                Map.Entry<Long, Long> highBackend = backendWithReplicaNum.get(i);
                 Map.Entry<Long, Long> lowBackend = backendWithReplicaNum.get(j);
-                if (highBackend.getValue() - lowBackend.getValue() <= 1) {
+                if ((!hasUnavailableBe) && (seqIndexes.size() - lowBackend.getValue()) <= 1) {
                     // balanced
                     break OUT;
                 }
 
-                long srcBeId = highBackend.getKey();
                 long destBeId = lowBackend.getKey();
                 Backend destBe = infoService.getBackend(destBeId);
                 if (destBe == null) {
@@ -573,18 +362,6 @@ public class ColocateTableBalancer extends MasterDaemon {
                     return false;
                 }
 
-                /* 
-                 * get the array indexes of elements in flatBackendsPerBucketSeq which equals to srcBeId
-                 * eg:
-                 * flatBackendsPerBucketSeq:
-                 *      A B C A D E A F G A H I
-                 * and srcBeId is A.
-                 * so seqIndexes is:
-                 *      0 3 6 9
-                 */
-                List<Integer> seqIndexes = IntStream.range(0, flatBackendsPerBucketSeq.size()).boxed().filter(
-                        idx -> flatBackendsPerBucketSeq.get(idx).equals(srcBeId)).collect(Collectors.toList());
-
                 for (int seqIndex : seqIndexes) {
                     // the bucket index.
                     // eg: 3 / 3 = 1, so the bucket index of the 4th backend id in flatBackendsPerBucketSeq is 1.
@@ -614,7 +391,7 @@ public class ColocateTableBalancer extends MasterDaemon {
                                          "end outer loop in colocate group {}", groupId);
                         break OUT;
                     } else {
-                        // select another load backend and try again
+                        // select another low backend and try again
                         continue;
                     }
                 }
@@ -649,19 +426,85 @@ public class ColocateTableBalancer extends MasterDaemon {
         return hostsPerBucketSeq;
     }
 
-    private List<Map.Entry<Long, Long>> getSortedBackendReplicaNumPairs(List<Long> allAvailBackendIds,
-            List<Long> flatBackendsPerBucketSeq) {
+    private List<Map.Entry<Long, Long>> getSortedBackendReplicaNumPairs(List<Long> allAvailBackendIds, Set<Long> unavailBackendIds,
+                ClusterLoadStatistic statistic, List<Long> flatBackendsPerBucketSeq) {
         // backend id -> replica num, and sorted by replica num, descending.
         Map<Long, Long> backendToReplicaNum = flatBackendsPerBucketSeq.stream()
                 .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));
+        // remove unavailable backends
+        for (Long backendId : unavailBackendIds) {
+            backendToReplicaNum.remove(backendId);
+        }
         // add backends which are not in flatBackendsPerBucketSeq, with replication number 0
         for (Long backendId : allAvailBackendIds) {
             if (!backendToReplicaNum.containsKey(backendId)) {
                 backendToReplicaNum.put(backendId, 0L);
             }
         }
-        List<Map.Entry<Long, Long>> backendWithReplicaNum = backendToReplicaNum.entrySet().stream().sorted(
-                Collections.reverseOrder(Map.Entry.comparingByValue())).collect(Collectors.toList());
-        return backendWithReplicaNum;
+
+        return backendToReplicaNum
+                .entrySet()
+                .stream()
+                .sorted((entry1, entry2) -> {
+                    if (!entry1.getValue().equals(entry2.getValue())) {
+                        return Long.compare(entry2.getValue(), entry1.getValue()); // avoids int overflow
+                    }
+                    BackendLoadStatistic beStat1 = statistic.getBackendLoadStatistic(entry1.getKey());
+                    BackendLoadStatistic beStat2 = statistic.getBackendLoadStatistic(entry2.getKey());
+                    if (beStat1 == null || beStat2 == null) {
+                        return 0;
+                    }
+                    double loadScore1 = beStat1.getMixLoadScore();
+                    double loadScore2 = beStat2.getMixLoadScore();
+                    if (Math.abs(loadScore1 - loadScore2) < 1e-6) {
+                        return 0;
+                    } else if (loadScore2 > loadScore1) {
+                        return 1;
+                    } else {
+                        return -1;
+                    }
+                })
+                .collect(Collectors.toList());
+    }
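
A small worked example of the ordering this comparator produces, with hypothetical ids, replica counts and load scores (in the real code the score comes from BackendLoadStatistic.getMixLoadScore()):

    import java.util.*;

    public class SortOrderSketch {
        public static void main(String[] args) {
            // {beId, replicaNum, mixLoadScore x 10} -- hypothetical numbers
            long[][] stats = { {1, 2, 1}, {7, 2, 8}, {3, 1, 4}, {5, 1, 3} };
            Arrays.sort(stats, (a, b) -> {
                if (a[1] != b[1]) {
                    return Long.compare(b[1], a[1]); // more replicas first
                }
                return Long.compare(b[2], a[2]);     // then higher load score first
            });
            for (long[] s : stats) {
                System.out.print(s[0] + " "); // prints: 7 1 3 5
            }
        }
    }

The head of the resulting list is the most loaded source candidate, and the tail is the preferred destination.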
+
+    /*
+     * get the array indexes of elements in flatBackendsPerBucketSeq which equal beId
+     * eg:
+     * flatBackendsPerBucketSeq:
+     *      A B C A D E A F G A H I
+     * and beId is A.
+     * so seqIndexes is:
+     *      0 3 6 9
+     */
+    private List<Integer> getBeSeqIndexes(List<Long> flatBackendsPerBucketSeq, long beId) {
+        return IntStream.range(0, flatBackendsPerBucketSeq.size()).boxed().filter(
+                idx -> flatBackendsPerBucketSeq.get(idx).equals(beId)).collect(Collectors.toList());
+    }
+
+    private Set<Long> getUnavailableBeIdsInGroup(SystemInfoService infoService, ColocateTableIndex colocateIndex, GroupId groupId) {
+        Set<Long> backends = colocateIndex.getBackendsByGroup(groupId);
+        Set<Long> unavailableBeIds = Sets.newHashSet();
+        long currTime = System.currentTimeMillis();
+        for (Long backendId : backends) {
+            Backend be = infoService.getBackend(backendId);
+            if (be == null) {
+                unavailableBeIds.add(backendId);
+            } else if (!be.isAvailable()) {
+                // 1. BE is dead for a long time
+                // 2. BE is under decommission
+                if ((!be.isAlive() && (currTime - be.getLastUpdateMs()) > Config.tablet_repair_delay_factor_second * 1000 * 2)
+                        || be.isDecommissioned()) {
+                    unavailableBeIds.add(backendId);
+                }
+            }
+        }
+        return unavailableBeIds;
+    }
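
To illustrate the grace period in the check above, a small self-contained sketch (the factor value 60 is hypothetical; the real value comes from Config.tablet_repair_delay_factor_second):

    public class UnavailabilitySketch {
        // mirrors the condition above: dead longer than factor * 1000 * 2 ms, or decommissioned
        static boolean isUnavailable(boolean alive, boolean decommissioned,
                                     long lastUpdateMs, long nowMs, long factorSec) {
            long graceMs = factorSec * 1000 * 2;
            return (!alive && (nowMs - lastUpdateMs) > graceMs) || decommissioned;
        }

        public static void main(String[] args) {
            long now = 1_000_000L;
            // dead for 3 minutes, grace is 2 minutes -> unavailable
            System.out.println(isUnavailable(false, false, now - 180_000, now, 60)); // true
            // dead for only 1 minute -> still within the grace period
            System.out.println(isUnavailable(false, false, now - 60_000, now, 60));  // false
            // alive but decommissioned -> unavailable
            System.out.println(isUnavailable(true, true, now, now, 60));             // true
        }
    }
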
+
+    private List<Long> getAvailableBeIdsInGroup(String cluster, SystemInfoService infoService, Set<Long> unavailableBeIds) {
+        List<Long> allBackendIds = infoService.getClusterBackendIds(cluster, true);
+        return allBackendIds.stream()
+                .filter(id -> !unavailableBeIds.contains(id))
+                .collect(Collectors.toList());
     }
 }
diff --git a/fe/fe-core/src/test/java/org/apache/doris/clone/ColocateTableBalancerTest.java b/fe/fe-core/src/test/java/org/apache/doris/clone/ColocateTableBalancerTest.java
index bb76126..ee59267 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/clone/ColocateTableBalancerTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/clone/ColocateTableBalancerTest.java
@@ -17,12 +17,15 @@
 
 package org.apache.doris.clone;
 
-import org.apache.doris.catalog.Catalog;
+import com.google.common.collect.Sets;
+import mockit.Delegate;
 import org.apache.doris.catalog.ColocateGroupSchema;
 import org.apache.doris.catalog.ColocateTableIndex;
 import org.apache.doris.catalog.ColocateTableIndex.GroupId;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.PrimitiveType;
+import org.apache.doris.catalog.TabletInvertedIndex;
+import org.apache.doris.common.Config;
 import org.apache.doris.common.jmockit.Deencapsulation;
 import org.apache.doris.system.Backend;
 import org.apache.doris.system.SystemInfoService;
@@ -34,23 +37,17 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import mockit.Expectations;
 import mockit.Mocked;
 
 public class ColocateTableBalancerTest {
-    
-    @Mocked
-    private Catalog catalog;
-    @Mocked
-    private SystemInfoService infoService;
-
-    private TabletScheduler tabletScheduler;
-    
     private ColocateTableBalancer balancer = ColocateTableBalancer.getInstance();
-    
+
     private Backend backend1;
     private Backend backend2;
     private Backend backend3;
@@ -61,6 +58,8 @@ public class ColocateTableBalancerTest {
     private Backend backend8;
     private Backend backend9;
 
+    private Map<Long, Double> mixLoadScores;
+
     @Before
     public void setUp() {
         backend1 = new Backend(1L, "192.168.1.1", 9050);
@@ -74,6 +73,29 @@ public class ColocateTableBalancerTest {
         backend8 = new Backend(8L, "192.168.1.8", 9050);
         backend9 = new Backend(9L, "192.168.1.8", 9050);
 
+        mixLoadScores = Maps.newHashMap();
+        mixLoadScores.put(1L, 0.1);
+        mixLoadScores.put(2L, 0.5);
+        mixLoadScores.put(3L, 0.4);
+        mixLoadScores.put(4L, 0.2);
+        mixLoadScores.put(5L, 0.3);
+        mixLoadScores.put(6L, 0.6);
+        mixLoadScores.put(7L, 0.8);
+        mixLoadScores.put(8L, 0.7);
+        mixLoadScores.put(9L, 0.9);
+    }
+
+    private ColocateTableIndex createColocateIndex(GroupId groupId, List<Long> flatList) {
+        ColocateTableIndex colocateTableIndex = new ColocateTableIndex();
+        int replicationNum = 3;
+        List<List<Long>> backendsPerBucketSeq = Lists.partition(flatList, replicationNum);
+        colocateTableIndex.addBackendsPerBucketSeq(groupId, backendsPerBucketSeq);
+        return colocateTableIndex;
+    }
+
+    @Test
+    public void testBalance(@Mocked SystemInfoService infoService,
+                            @Mocked ClusterLoadStatistic statistic) {
         new Expectations() {
             {
                 infoService.getBackend(1L);
@@ -103,20 +125,12 @@ public class ColocateTableBalancerTest {
                 infoService.getBackend(9L);
                 result = backend9;
                 minTimes = 0;
+
+                statistic.getBackendLoadStatistic(anyLong);
+                result = null;
+                minTimes = 0;
             }
         };
-    }
-
-    private ColocateTableIndex createColocateIndex(GroupId groupId, List<Long> flatList) {
-        ColocateTableIndex colocateTableIndex = new ColocateTableIndex();
-        int replicationNum = 3;
-        List<List<Long>> backendsPerBucketSeq = Lists.partition(flatList, replicationNum);
-        colocateTableIndex.addBackendsPerBucketSeq(groupId, backendsPerBucketSeq);
-        return colocateTableIndex;
-    }
-
-    @Test
-    public void testBalance() {
         GroupId groupId = new GroupId(10000, 10001);
         List<Column> distributionCols = Lists.newArrayList();
         distributionCols.add(new Column("k1", PrimitiveType.INT));
@@ -132,9 +146,8 @@ public class ColocateTableBalancerTest {
 
         List<List<Long>> balancedBackendsPerBucketSeq = Lists.newArrayList();
         List<Long> allAvailBackendIds = Lists.newArrayList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L);
-        boolean changed = (Boolean) Deencapsulation.invoke(balancer, "balance", groupId, allAvailBackendIds,
-                colocateTableIndex, infoService, balancedBackendsPerBucketSeq);
-        System.out.println(balancedBackendsPerBucketSeq);
+        boolean changed = (Boolean) Deencapsulation.invoke(balancer, "relocateAndBalance", groupId, new HashSet<Long>(), allAvailBackendIds,
+                colocateTableIndex, infoService, statistic, balancedBackendsPerBucketSeq);
         List<List<Long>> expected = Lists.partition(
                 Lists.newArrayList(9L, 5L, 3L, 4L, 6L, 8L, 7L, 6L, 1L, 2L, 9L, 4L, 1L, 2L, 3L), 3);
         Assert.assertTrue(changed);
@@ -145,15 +158,50 @@ public class ColocateTableBalancerTest {
                 Lists.newArrayList(9L, 8L, 7L, 8L, 6L, 5L, 9L, 4L, 1L, 2L, 3L, 4L, 1L, 2L, 3L));
         Deencapsulation.setField(colocateTableIndex, "group2Schema", group2Schema);
         balancedBackendsPerBucketSeq.clear();
-        changed = (Boolean) Deencapsulation.invoke(balancer, "balance", groupId, allAvailBackendIds,
-                colocateTableIndex, infoService, balancedBackendsPerBucketSeq);
+        changed = (Boolean) Deencapsulation.invoke(balancer, "relocateAndBalance", groupId, new HashSet<Long>(), allAvailBackendIds,
+                colocateTableIndex, infoService, statistic, balancedBackendsPerBucketSeq);
         System.out.println(balancedBackendsPerBucketSeq);
         Assert.assertFalse(changed);
         Assert.assertTrue(balancedBackendsPerBucketSeq.isEmpty());
     }
 
     @Test
-    public void testFixBalanceEndlessLoop() {
+    public void testFixBalanceEndlessLoop(@Mocked SystemInfoService infoService,
+                                          @Mocked ClusterLoadStatistic statistic) {
+        new Expectations() {
+            {
+                infoService.getBackend(1L);
+                result = backend1;
+                minTimes = 0;
+                infoService.getBackend(2L);
+                result = backend2;
+                minTimes = 0;
+                infoService.getBackend(3L);
+                result = backend3;
+                minTimes = 0;
+                infoService.getBackend(4L);
+                result = backend4;
+                minTimes = 0;
+                infoService.getBackend(5L);
+                result = backend5;
+                minTimes = 0;
+                infoService.getBackend(6L);
+                result = backend6;
+                minTimes = 0;
+                infoService.getBackend(7L);
+                result = backend7;
+                minTimes = 0;
+                infoService.getBackend(8L);
+                result = backend8;
+                minTimes = 0;
+                infoService.getBackend(9L);
+                result = backend9;
+                minTimes = 0;
+                statistic.getBackendLoadStatistic(anyLong);
+                result = null;
+                minTimes = 0;
+            }
+        };
         GroupId groupId = new GroupId(10000, 10001);
         List<Column> distributionCols = Lists.newArrayList();
         distributionCols.add(new Column("k1", PrimitiveType.INT));
@@ -168,8 +216,8 @@ public class ColocateTableBalancerTest {
 
         List<List<Long>> balancedBackendsPerBucketSeq = Lists.newArrayList();
         List<Long> allAvailBackendIds = Lists.newArrayList(7L);
-        boolean changed = Deencapsulation.invoke(balancer, "balance", groupId, allAvailBackendIds,
-                                                 colocateTableIndex, infoService, balancedBackendsPerBucketSeq);
+        boolean changed = Deencapsulation.invoke(balancer, "relocateAndBalance", groupId, new HashSet<Long>(), allAvailBackendIds,
+                                                 colocateTableIndex, infoService, statistic, balancedBackendsPerBucketSeq);
         Assert.assertFalse(changed);
 
         // 2. all backends are checked but this round is not changed
@@ -180,8 +228,152 @@ public class ColocateTableBalancerTest {
 
         balancedBackendsPerBucketSeq = Lists.newArrayList();
         allAvailBackendIds = Lists.newArrayList(7L, 8L, 9L);
-        changed = Deencapsulation.invoke(balancer, "balance", groupId, allAvailBackendIds,
-                                         colocateTableIndex, infoService, balancedBackendsPerBucketSeq);
+        changed = Deencapsulation.invoke(balancer, "relocateAndBalance", groupId, new HashSet<Long>(), allAvailBackendIds,
+                                         colocateTableIndex, infoService, statistic, balancedBackendsPerBucketSeq);
         Assert.assertFalse(changed);
     }
+
+    @Test
+    public void testGetSortedBackendReplicaNumPairs(@Mocked ClusterLoadStatistic statistic) {
+        new Expectations() {
+            {
+                statistic.getBackendLoadStatistic(anyLong);
+                result = new Delegate<BackendLoadStatistic>() {
+                    BackendLoadStatistic delegate(Long beId) {
+                        return new FakeBackendLoadStatistic(beId, null, null, null);
+                    }
+                };
+                minTimes = 0;
+            }
+        };
+
+        // all replicas are on different BEs
+        List<Long> allAvailBackendIds = Lists.newArrayList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L);
+        Set<Long> unavailBackendIds = Sets.newHashSet(9L);
+        List<Long> flatBackendsPerBucketSeq = Lists.newArrayList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L);
+        List<Map.Entry<Long, Long>> backends = Deencapsulation.invoke(balancer, "getSortedBackendReplicaNumPairs",
+                allAvailBackendIds, unavailBackendIds, statistic, flatBackendsPerBucketSeq);
+        long[] backendIds = backends.stream().mapToLong(Map.Entry::getKey).toArray();
+        Assert.assertArrayEquals(new long[]{7L, 8L, 6L, 2L, 3L, 5L, 4L, 1L}, backendIds);
+
+        // flat indexes 0 and 1 are on the same BE, and indexes 6 and 7 are on the same BE
+        flatBackendsPerBucketSeq = Lists.newArrayList(1L, 1L, 3L, 4L, 5L, 6L, 7L, 7L, 9L);
+        backends = Deencapsulation.invoke(balancer, "getSortedBackendReplicaNumPairs", allAvailBackendIds, unavailBackendIds,
+                statistic, flatBackendsPerBucketSeq);
+        backendIds = backends.stream().mapToLong(Map.Entry::getKey).toArray();
+        Assert.assertArrayEquals(new long[]{7L, 1L, 6L, 3L, 5L, 4L, 8L, 2L}, backendIds);
+    }
+
+    public final class FakeBackendLoadStatistic extends BackendLoadStatistic {
+        public FakeBackendLoadStatistic(long beId, String clusterName, SystemInfoService infoService,
+                                    TabletInvertedIndex invertedIndex) {
+            super(beId, clusterName, infoService, invertedIndex);
+        }
+
+        @Override
+        public double getMixLoadScore() {
+            return mixLoadScores.get(getBeId());
+        }
+    }
+
+    @Test
+    public void testGetBeSeqIndexes() {
+        List<Long> flatBackendsPerBucketSeq = Lists.newArrayList(1L, 2L, 2L, 3L, 4L, 2L);
+        List<Integer> indexes = Deencapsulation.invoke(balancer, "getBeSeqIndexes", flatBackendsPerBucketSeq, 2L);
+        Assert.assertArrayEquals(new int[]{1, 2, 5}, indexes.stream().mapToInt(i->i).toArray());
+        System.out.println("backend1 id is " + backend1.getId());
+    }
+
+    @Test
+    public void testGetUnavailableBeIdsInGroup(@Mocked ColocateTableIndex colocateTableIndex,
+                                               @Mocked SystemInfoService infoService,
+                                               @Mocked Backend myBackend2,
+                                               @Mocked Backend myBackend3,
+                                               @Mocked Backend myBackend4,
+                                               @Mocked Backend myBackend5
+                                               ) {
+        GroupId groupId = new GroupId(10000, 10001);
+        Set<Long> allBackendsInGroup = Sets.newHashSet(1L, 2L, 3L, 4L, 5L);
+        new Expectations() {
+            {
+                infoService.getBackend(1L);
+                result = null;
+                minTimes = 0;
+
+                // backend2 is available
+                infoService.getBackend(2L);
+                result = myBackend2;
+                minTimes = 0;
+                myBackend2.isAvailable();
+                result = true;
+                minTimes = 0;
+
+                // backend3 not available, and dead for a long time
+                infoService.getBackend(3L);
+                result = myBackend3;
+                minTimes = 0;
+                myBackend3.isAvailable();
+                result = false;
+                minTimes = 0;
+                myBackend3.isAlive();
+                result = false;
+                minTimes = 0;
+                myBackend3.getLastUpdateMs();
+                result = System.currentTimeMillis() - Config.tablet_repair_delay_factor_second * 1000 * 20;
+                minTimes = 0;
+
+                // backend4 not available, and dead for a short time
+                infoService.getBackend(4L);
+                result = myBackend4;
+                minTimes = 0;
+                myBackend4.isAvailable();
+                result = false;
+                minTimes = 0;
+                myBackend4.isAlive();
+                result = false;
+                minTimes = 0;
+                myBackend4.getLastUpdateMs();
+                result = System.currentTimeMillis();
+                minTimes = 0;
+
+                // backend5 not available, and in decommission
+                infoService.getBackend(5L);
+                result = myBackend5;
+                minTimes = 0;
+                myBackend5.isAvailable();
+                result = false;
+                minTimes = 0;
+                myBackend5.isAlive();
+                result = true;
+                minTimes = 0;
+                myBackend5.isDecommissioned();
+                result = true;
+                minTimes = 0;
+
+                colocateTableIndex.getBackendsByGroup(groupId);
+                result = allBackendsInGroup;
+                minTimes = 0;
+            }
+        };
+
+        Set<Long> unavailableBeIds = Deencapsulation.invoke(balancer, "getUnavailableBeIdsInGroup", infoService, colocateTableIndex, groupId);
+        System.out.println(unavailableBeIds);
+        Assert.assertArrayEquals(new long[]{1L, 3L, 5L}, unavailableBeIds.stream().mapToLong(i->i).sorted().toArray());
+    }
+
+    @Test
+    public void testGetAvailableBeIdsInGroup(@Mocked SystemInfoService infoService) {
+        List<Long> clusterAliveBackendIds = Lists.newArrayList(1L, 2L, 3L, 4L);
+        new Expectations(){
+            {
+                infoService.getClusterBackendIds("cluster1", true);
+                result = clusterAliveBackendIds;
+                minTimes = 0;
+            }
+        };
+
+        Set<Long> unavailableBeIds = Sets.newHashSet(4L, 5L, 6L);
+        List<Long> availableBeIds = Deencapsulation.invoke(balancer, "getAvailableBeIdsInGroup", "cluster1", infoService, unavailableBeIds);
+        Assert.assertArrayEquals(new long[]{1L, 2L, 3L}, availableBeIds.stream().mapToLong(i->i).sorted().toArray());
+    }
 }

