You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by GitBox <gi...@apache.org> on 2020/12/29 03:40:46 UTC

[GitHub] [incubator-doris] morningman commented on a change in pull request #5010: [Rebalancer] support partition rebalancer

morningman commented on a change in pull request #5010:
URL: https://github.com/apache/incubator-doris/pull/5010#discussion_r549369324



##########
File path: fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java
##########
@@ -602,5 +604,96 @@ public void clear() {
     public Map<Long, Long> getReplicaToTabletMap() {
         return replicaToTabletMap;
     }
+
+    // Only build from available bes, exclude colocate tables
+    public Map<TStorageMedium, TreeMultimap<Long, PartitionBalanceInfo>> buildPartitionInfoBySkew(List<Long> availableBeIds) {
+        readLock();
+
+        // 1. gen <partitionId-indexId, <beId, replicaCount>>
+        // for each replica(all tablets):
+        //      find beId, then replicaCount++
+        Map<TStorageMedium, Table<Long, Long, Map<Long, Long>>> partitionReplicasInfoMaps = Maps.newHashMap();
+        for (TStorageMedium medium : TStorageMedium.values()) {
+            partitionReplicasInfoMaps.put(medium, HashBasedTable.create());
+        }
+        try {
+            // Changes to the returned set will update the underlying table
+            // tablet id -> (backend id -> replica)
+            Set<Table.Cell<Long, Long, Replica>> cells = replicaMetaTable.cellSet();
+            for (Table.Cell<Long, Long, Replica> cell : cells) {
+                Long tabletId = cell.getRowKey();
+                Long beId = cell.getColumnKey();
+                try {
+                    Preconditions.checkState(availableBeIds.contains(beId), "dead be " + beId);
+                    TabletMeta tabletMeta = tabletMetaMap.get(tabletId);
+                    Preconditions.checkNotNull(tabletMeta, "invalid tablet " + tabletId);
+                    Preconditions.checkState(!Catalog.getCurrentColocateIndex().isColocateTable(tabletMeta.getTableId()),
+                            "should not be the colocate table");
+
+                    TStorageMedium medium = tabletMeta.getStorageMedium();
+                    Table<Long, Long, Map<Long, Long>> partitionReplicasInfo = partitionReplicasInfoMaps.get(medium);
+                    Map<Long, Long> countMap = partitionReplicasInfo.get(tabletMeta.getPartitionId(), tabletMeta.getIndexId());
+                    if (countMap == null) {
+                        // If one be doesn't have any replica of one partition, it should be counted too.
+                        countMap = availableBeIds.stream().collect(Collectors.toMap(i -> i, i -> 0L));
+                    }
+
+                    Long count = countMap.get(beId);
+                    countMap.put(beId, count + 1L);
+                    partitionReplicasInfo.put(tabletMeta.getPartitionId(), tabletMeta.getIndexId(), countMap);
+                    partitionReplicasInfoMaps.put(medium, partitionReplicasInfo);
+                } catch (IllegalStateException | NullPointerException e) {
+                    // If the tablet or be has some problem, don't count in
+                    LOG.debug(e.getMessage());
+                }
+            }
+        } finally {
+            readUnlock();
+        }
+
+        // 2. Populate ClusterBalanceInfo::table_info_by_skew
+        // for each PartitionId-MaterializedIndex:
+        //      for each beId: record max_count, min_count(replicaCount)
+        //      put <max_count-min_count, TableBalanceInfo> to table_info_by_skew
+        Map<TStorageMedium, TreeMultimap<Long, PartitionBalanceInfo>> skewMaps = Maps.newHashMap();
+        for (TStorageMedium medium : TStorageMedium.values()) {
+            TreeMultimap<Long, PartitionBalanceInfo> partitionInfoBySkew = TreeMultimap.create(Ordering.natural(), Ordering.arbitrary());
+            Set<Table.Cell<Long, Long, Map<Long, Long>>> mapCells = partitionReplicasInfoMaps.getOrDefault(medium, HashBasedTable.create()).cellSet();
+            for (Table.Cell<Long, Long, Map<Long, Long>> cell : mapCells) {
+                Map<Long, Long> countMap = cell.getValue();
+                Preconditions.checkNotNull(countMap);
+                PartitionBalanceInfo pbi = new PartitionBalanceInfo(cell.getRowKey(), cell.getColumnKey());
+                for (Map.Entry<Long, Long> entry : countMap.entrySet()) {
+                    Long beID = entry.getKey();
+                    Long replicaCount = entry.getValue();
+                    pbi.beByReplicaCount.put(replicaCount, beID);
+                }
+                // beByReplicaCount values are natural ordering
+                long min_count = pbi.beByReplicaCount.keySet().first();

Review comment:
       ```suggestion
                   long minCount = pbi.beByReplicaCount.keySet().first();
   ```

##########
File path: fe/fe-core/src/main/java/org/apache/doris/clone/TwoDimensionalGreedyRebalanceAlgo.java
##########
@@ -0,0 +1,329 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.clone;
+
+import com.google.common.base.Objects;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import com.google.common.collect.TreeMultimap;
+import org.apache.doris.catalog.TabletInvertedIndex.PartitionBalanceInfo;
+import org.apache.doris.clone.PartitionRebalancer.ClusterBalanceInfo;
+import org.apache.doris.common.Pair;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.List;
+import java.util.NavigableSet;
+import java.util.Random;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/*
+ * A two-dimensional greedy rebalancing algorithm. The two dims are cluster and partition. It'll generate multiple `PartitionMove`,
+ * only decide which partition to move, fromBe, toBe. The next step is to select a tablet to move.
+ *
+ * From among moves that decrease the skew of a most skewed partition, it prefers ones that reduce the skew of the cluster.
+ * A cluster is considered balanced when the skew of every partition is <= 1 and the skew of the cluster is <= 1.
+ * The skew of the cluster is defined as the difference between the maximum total replica count over all bes and the
+ * minimum total replica count over all bes.
+ *
+ * This class is modified from kudu TwoDimensionalGreedyAlgo.
+ */
+public class TwoDimensionalGreedyRebalanceAlgo {
+    private static final Logger LOG = LogManager.getLogger(TwoDimensionalGreedyRebalanceAlgo.class);
+
+    private final EqualSkewOption equalSkewOption;
+    private static final Random rand = new Random(System.currentTimeMillis());
+
+    /*
+     * A single move decided by the algorithm: which partition-index pair to move, and the
+     * source and destination be. Two moves are equal when all four ids are equal.
+     */
+    public static class PartitionMove {
+        Long partitionId;
+        Long indexId;
+        Long fromBe;
+        Long toBe;
+
+        PartitionMove(Long p, Long i, Long f, Long t) {
+            partitionId = p;
+            indexId = i;
+            fromBe = f;
+            toBe = t;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (o == this) {
+                return true;
+            }
+            if (o == null || o.getClass() != getClass()) {
+                return false;
+            }
+            PartitionMove other = (PartitionMove) o;
+            // Null-safe field-by-field comparison, bailing out on the first mismatch.
+            if (!Objects.equal(partitionId, other.partitionId)) {
+                return false;
+            }
+            if (!Objects.equal(indexId, other.indexId)) {
+                return false;
+            }
+            if (!Objects.equal(fromBe, other.fromBe)) {
+                return false;
+            }
+            return Objects.equal(toBe, other.toBe);
+        }
+
+        @Override
+        public int hashCode() {
+            // Same four fields, in the same order, as used by equals().
+            return Objects.hashCode(partitionId, indexId, fromBe, toBe);
+        }
+
+        @Override
+        public String toString() {
+            StringBuilder text = new StringBuilder("ReplicaMove{");
+            text.append("pid=").append(partitionId).append("-").append(indexId);
+            text.append(", from=").append(fromBe);
+            text.append(", to=").append(toBe);
+            text.append('}');
+            return text.toString();
+        }
+    }
+
+    // Option stored by the algorithm at construction time; the no-arg constructor uses PICK_RANDOM.
+    // NOTE(review): presumably selects how ties between equally skewed candidates are broken;
+    // the code that reads it is not visible here - confirm.
+    public enum EqualSkewOption {
+        // generally only used in unit tests
+        PICK_FIRST,
+        PICK_RANDOM
+    }
+
+    // Distinguishes the maximum from the minimum.
+    // NOTE(review): presumably tells a helper which extremum of the replica counts to look up;
+    // the usage is not visible here - confirm.
+    public enum ExtremumType {
+        MAX,
+        MIN
+    }
+
+    // Plain mutable holder with no constructor; every field is null until assigned by the caller.
+    // NOTE(review): field meanings below are inferred from their names only - confirm against
+    // the code that fills this object.
+    public static class IntersectionResult {
+        // presumably the extremum replica count within one partition
+        Long replicaCountPartition;
+        // presumably the extremum total replica count over the cluster
+        Long replicaCountTotal;
+        // presumably the bes holding the extremum count
+        List<Long> beWithExtremumCount;
+        // presumably the bes common to the partition-level and cluster-level extremum sets
+        List<Long> intersection;
+    }
+
+    // Default construction: delegates to the one-arg constructor with EqualSkewOption.PICK_RANDOM.
+    TwoDimensionalGreedyRebalanceAlgo() {
+        this(EqualSkewOption.PICK_RANDOM);
+    }
+
+    // Stores the given option in the final field; package-private, like the no-arg constructor.
+    TwoDimensionalGreedyRebalanceAlgo(EqualSkewOption equalSkewOption) {
+        this.equalSkewOption = equalSkewOption;
+    }
+
+    // maxMovesNum: Value of '0' is a shortcut for 'the possible maximum'.
+    // May modify the ClusterBalanceInfo
+    public List<PartitionMove> getNextMoves(ClusterBalanceInfo info, int maxMovesNum) {
+        Preconditions.checkArgument(maxMovesNum >= 0);
+        if (maxMovesNum == 0) {
+            maxMovesNum = Integer.MAX_VALUE;
+        }
+
+        if (info.partitionInfoBySkew.isEmpty()) {
+            // Check for the consistency of the 'ClusterBalanceInfo' parameter: if no information is given on
+            // the partition skew, partition count for all the be should be 0.
+            // Keys are ordered by the natural ordering, so we can get the last(max) key to know if all keys are 0.
+            NavigableSet<Long> keySet = info.beByTotalReplicaCount.keySet();
+            LOG.debug(keySet);
+            Preconditions.checkState(keySet.isEmpty() || keySet.last() == 0L,
+                    "non-zero replica count on be while no partition skew information in skewMap");
+            // Nothing to balance: cluster is empty.
+            return Lists.newArrayList();
+        }
+
+        List<PartitionMove> moves = Lists.newArrayList();
+        for (int i = 0; i < maxMovesNum; ++i) {
+            PartitionMove move = getNextMove(info.beByTotalReplicaCount, info.partitionInfoBySkew);
+            if (move == null || !(applyMove(move, info.beByTotalReplicaCount, info.partitionInfoBySkew))) {
+                // 1. No replicas to move.
+                // 2. Apply to info failed, it's useless to get next move from the same info.
+                break;
+            }
+            moves.add(move);
+        }
+
+        return moves;
+    }
+
+    private PartitionMove getNextMove(TreeMultimap<Long, Long> beByTotalReplicaCount,
+                                      TreeMultimap<Long, PartitionBalanceInfo> skewMap) {
+        PartitionMove move = null;
+        if (skewMap.isEmpty() || beByTotalReplicaCount.isEmpty()) {
+            return null;
+        }
+        long maxPartitionSkew = skewMap.keySet().last();
+        long maxBeSkew = beByTotalReplicaCount.keySet().last() - beByTotalReplicaCount.keySet().first();
+
+        // 1. Every partition is balanced(maxPartitionSkew<=1) and any move will unbalance a partition, so there
+        // is no potential for the greedy algorithm to balance the cluster.
+        // 2. Every partition is balanced(maxPartitionSkew<=1) and the cluster as a whole is balanced(maxBeSkew<=1).
+        if (maxPartitionSkew == 0L || (maxPartitionSkew <= 1L && maxBeSkew <= 1L)) {

Review comment:
       Is `1` too strict here?

##########
File path: fe/fe-core/src/main/java/org/apache/doris/clone/PartitionRebalancer.java
##########
@@ -0,0 +1,337 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.clone;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Ordering;
+import com.google.common.collect.TreeMultimap;
+import org.apache.doris.catalog.Replica;
+import org.apache.doris.catalog.TabletInvertedIndex;
+import org.apache.doris.catalog.TabletMeta;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.Pair;
+import org.apache.doris.system.SystemInfoService;
+import org.apache.doris.thrift.TStorageMedium;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableSet;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+
+/*
+ * PartitionRebalancer will decrease the skew of partitions. The skew of the partition is defined as the difference
+ * between the maximum replica count of the partition over all bes and the minimum replica count over all bes.
+ * Only the replica count of each partition is considered; the replica size (disk usage) is never taken into account.
+ *
+ * We use TwoDimensionalGreedyRebalanceAlgo to get partition moves(one PartitionMove is <partition id, from be, to be>).
+ * It prefers a move that reduces the skew of the cluster when rebalancing a partition with the maximum skew.
+ *
+ * selectAlternativeTabletsForCluster() must set the tablet id, so we need to select tablet for each move in this phase
+ * (as TabletMove).
+ */
+public class PartitionRebalancer extends Rebalancer {
+    private static final Logger LOG = LogManager.getLogger(PartitionRebalancer.class);
+
+    private final TwoDimensionalGreedyRebalanceAlgo algo = new TwoDimensionalGreedyRebalanceAlgo();
+
+    private final MovesCacheMap movesCacheMap = new MovesCacheMap();
+
+    private final AtomicLong counterBalanceMoveCreated = new AtomicLong(0);
+    private final AtomicLong counterBalanceMoveSucceeded = new AtomicLong(0);
+
+    // Passes the system info service and the tablet inverted index straight to the Rebalancer base class.
+    public PartitionRebalancer(SystemInfoService infoService, TabletInvertedIndex invertedIndex) {
+        super(infoService, invertedIndex);
+    }
+
+    // Selects tablets to move for one (cluster, storage medium) pair.
+    // Steps, in order:
+    //   1. load the in-progress moves cached for this cluster/medium and check the scheduled ones for completion;
+    //   2. build a ClusterBalanceInfo from clusterStat with the in-progress moves already applied;
+    //   3. drop completed/problematic moves from the cache;
+    //   4. ask the greedy algorithm for the next partition moves;
+    //   5. for each partition move, pick one random tablet that can legally move and wrap it in a TabletSchedCtx.
+    // Returns the created TabletSchedCtx list; it is empty when buildClusterInfo() fails, when the total number of
+    // cached moves exceeds Config.max_balancing_tablets, or when no move yields a candidate tablet.
+    @Override
+    protected List<TabletSchedCtx> selectAlternativeTabletsForCluster(
+            String clusterName, ClusterLoadStatistic clusterStat, TStorageMedium medium) {
+        MovesCacheMap.MovesCache movesInProgress = movesCacheMap.getCache(clusterName, medium);
+        Preconditions.checkNotNull(movesInProgress, "clusterStat is got from statisticMap, movesCacheMap should have the same entry");
+
+        // Iterating through Cache.asMap().values() does not reset access time for the entries you retrieve.
+        List<TabletMove> movesInProgressList = movesInProgress.get().asMap().values()
+                .stream().map(p -> p.first).collect(Collectors.toList());
+        List<Long> toDeleteKeys = Lists.newArrayList();
+
+        // The problematic movements will be found in buildClusterInfo(), so here is a simply move completion check
+        // of moves which have valid ToDeleteReplica.
+        // (A cached pair whose second element is still -1L has not been scheduled yet, see the put() below.)
+        List<TabletMove> movesNeedCheck = movesInProgress.get().asMap().values()
+                .stream().filter(p -> p.second != -1L).map(p -> p.first).collect(Collectors.toList());
+        checkMovesCompleted(movesNeedCheck, toDeleteKeys);
+
+        ClusterBalanceInfo clusterBalanceInfo = new ClusterBalanceInfo();
+        // We should assume the in-progress moves have been succeeded to avoid producing the same moves.
+        // Apply in-progress moves to current cluster stats, use TwoDimensionalGreedyAlgo.ApplyMove for simplicity.
+        if (!buildClusterInfo(clusterStat, medium, movesInProgressList, clusterBalanceInfo, toDeleteKeys)) {
+            return Lists.newArrayList();
+        }
+
+        // Just delete the completed or problematic moves
+        if (!toDeleteKeys.isEmpty()) {
+            movesInProgress.get().invalidateAll(toDeleteKeys);
+            movesInProgressList = movesInProgressList.stream()
+                    .filter(m -> !toDeleteKeys.contains(m.tabletId)).collect(Collectors.toList());
+        }
+
+        // The balancing tasks of other cluster or medium might have failed. We use the upper limit value
+        // `total num of in-progress moves` to avoid useless selections.
+        if (movesCacheMap.size() > Config.max_balancing_tablets) {
+            LOG.debug("Total in-progress moves > {}", Config.max_balancing_tablets);
+            return Lists.newArrayList();
+        }
+
+        NavigableSet<Long> skews = clusterBalanceInfo.partitionInfoBySkew.keySet();
+        LOG.debug("Cluster {}-{}: peek max skew {}, assume {} in-progress moves are succeeded {}", clusterName, medium,
+                skews.isEmpty() ? 0 : skews.last(), movesInProgressList.size(), movesInProgressList);
+
+        List<TwoDimensionalGreedyRebalanceAlgo.PartitionMove> moves = algo.getNextMoves(clusterBalanceInfo, Config.partition_rebalance_max_moves_num_per_selection);
+
+        List<TabletSchedCtx> alternativeTablets = Lists.newArrayList();
+        List<Long> inProgressIds = movesInProgressList.stream().map(m -> m.tabletId).collect(Collectors.toList());
+        for (TwoDimensionalGreedyRebalanceAlgo.PartitionMove move : moves) {
+            // Find all tablets of the specified partition that would have a replica at the source be,
+            // but would not have a replica at the destination be. That is to satisfy the restriction
+            // of having no more than one replica of the same tablet per be.
+            List<Long> tabletIds = invertedIndex.getTabletIdsByBackendIdAndStorageMedium(move.fromBe, medium);
+            List<Long> invalidIds = invertedIndex.getTabletIdsByBackendIdAndStorageMedium(move.toBe, medium);
+            tabletIds.removeAll(invalidIds);
+            // In-progress tablets can't be the candidate too.
+            tabletIds.removeAll(inProgressIds);
+
+            Map<Long, TabletMeta> tabletCandidates = Maps.newHashMap();
+            for (long tabletId : tabletIds) {
+                TabletMeta tabletMeta = invertedIndex.getTabletMeta(tabletId);
+                // NOTE(review): move.partitionId / move.indexId are boxed Long. These == checks compare values
+                // only if the TabletMeta getters return primitive long (unboxing) - confirm.
+                if (tabletMeta != null && tabletMeta.getPartitionId() == move.partitionId
+                        && tabletMeta.getIndexId() == move.indexId) {
+                    tabletCandidates.put(tabletId, tabletMeta);
+                }
+            }
+            LOG.debug("Find {} candidates for move {}", tabletCandidates.size(), move);
+            if (tabletCandidates.isEmpty()) {
+                // The algorithm proposed a move, but no tablet can realize it right now; skip it.
+                continue;
+            }
+
+            // Random pick one candidate to create tabletSchedCtx
+            // NOTE(review): a new Random is created on every iteration; it could be hoisted out of the loop.
+            Random rand = new Random();
+            Object[] keys = tabletCandidates.keySet().toArray();
+            long pickedTabletId = (long) keys[rand.nextInt(keys.length)];
+            LOG.debug("Picked tablet id for move {}: {}", move, pickedTabletId);
+
+            TabletMeta tabletMeta = tabletCandidates.get(pickedTabletId);
+            TabletSchedCtx tabletCtx = new TabletSchedCtx(TabletSchedCtx.Type.BALANCE, clusterName,
+                    tabletMeta.getDbId(), tabletMeta.getTableId(), tabletMeta.getPartitionId(),
+                    tabletMeta.getIndexId(), pickedTabletId, System.currentTimeMillis());
+            // Balance task's priority is always LOW
+            tabletCtx.setOrigPriority(TabletSchedCtx.Priority.LOW);
+            alternativeTablets.add(tabletCtx);
+            // Pair<Move, ToDeleteReplicaId>, ToDeleteReplicaId should be -1L before scheduled successfully
+            movesInProgress.get().put(pickedTabletId, new Pair<>(new TabletMove(pickedTabletId, move.fromBe, move.toBe), -1L));
+            counterBalanceMoveCreated.incrementAndGet();
+            // Synchronize with movesInProgress
+            // (so that a later move in this same loop cannot pick the tablet again)
+            inProgressIds.add(pickedTabletId);
+        }
+
+        if (moves.isEmpty()) {
+            // Balanced cluster should not print too much log messages, so we log it with level debug.
+            LOG.debug("Cluster {}-{}: cluster is balanced.", clusterName, medium);
+        } else {
+            LOG.info("Cluster {}-{}: get {} moves, actually select {} alternative tablets to move. Tablets detail: {}",
+                    clusterName, medium, moves.size(), alternativeTablets.size(),
+                    alternativeTablets.stream().mapToLong(TabletSchedCtx::getTabletId).toArray());
+        }
+        return alternativeTablets;
+    }
+
+    // Fills the (initially empty) 'info' from clusterStat for the given medium, then replays the in-progress
+    // moves on top of it. Moves whose tablet no longer exists, or that cannot be applied, have their tablet id
+    // appended to toDeleteKeys. Always returns true.
+    private boolean buildClusterInfo(ClusterLoadStatistic clusterStat, TStorageMedium medium,
+                                     List<TabletMove> movesInProgress, ClusterBalanceInfo info, List<Long> toDeleteKeys) {
+        boolean infoIsEmpty = info.beByTotalReplicaCount.isEmpty() && info.partitionInfoBySkew.isEmpty();
+        Preconditions.checkState(infoIsEmpty, "");
+
+        // putAll copies the multimap entries but shares the value objects with clusterStat;
+        // deep-copy the PartitionBalanceInfo values if they ever need to be modified in place.
+        info.beByTotalReplicaCount.putAll(clusterStat.getBeByTotalReplicaMap(medium));
+        info.partitionInfoBySkew.putAll(clusterStat.getSkewMap(medium));
+
+        // Decide which moves to replay before the loop below starts appending to toDeleteKeys.
+        List<TabletMove> movesToApply = Lists.newArrayList();
+        for (TabletMove candidate : movesInProgress) {
+            if (!toDeleteKeys.contains(candidate.tabletId)) {
+                movesToApply.add(candidate);
+            }
+        }
+
+        for (TabletMove pending : movesToApply) {
+            TabletMeta tabletMeta = invertedIndex.getTabletMeta(pending.tabletId);
+            boolean applied = false;
+            if (tabletMeta != null) {
+                TwoDimensionalGreedyRebalanceAlgo.PartitionMove partitionMove =
+                        new TwoDimensionalGreedyRebalanceAlgo.PartitionMove(
+                                tabletMeta.getPartitionId(), tabletMeta.getIndexId(), pending.fromBe, pending.toBe);
+                applied = TwoDimensionalGreedyRebalanceAlgo.applyMove(
+                        partitionMove, info.beByTotalReplicaCount, info.partitionInfoBySkew);
+            }
+            if (!applied) {
+                // Either the tablet is invalid or the move can't be applied: mark it for deletion
+                // and carry on with the next move.
+                toDeleteKeys.add(pending.tabletId);
+            }
+        }
+        return true;
+    }
+
+    private void checkMovesCompleted(List<TabletMove> moves, List<Long> toDeleteKeys) {
+        boolean move_is_complete;

Review comment:
       ```suggestion
           boolean isMoveComplete;
   ```

##########
File path: fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java
##########
@@ -602,5 +604,96 @@ public void clear() {
     public Map<Long, Long> getReplicaToTabletMap() {
         return replicaToTabletMap;
     }
+
+    // Only build from available bes, exclude colocate tables
+    public Map<TStorageMedium, TreeMultimap<Long, PartitionBalanceInfo>> buildPartitionInfoBySkew(List<Long> availableBeIds) {
+        readLock();
+
+        // 1. gen <partitionId-indexId, <beId, replicaCount>>
+        // for each replica(all tablets):
+        //      find beId, then replicaCount++
+        Map<TStorageMedium, Table<Long, Long, Map<Long, Long>>> partitionReplicasInfoMaps = Maps.newHashMap();
+        for (TStorageMedium medium : TStorageMedium.values()) {
+            partitionReplicasInfoMaps.put(medium, HashBasedTable.create());
+        }
+        try {
+            // Changes to the returned set will update the underlying table
+            // tablet id -> (backend id -> replica)
+            Set<Table.Cell<Long, Long, Replica>> cells = replicaMetaTable.cellSet();
+            for (Table.Cell<Long, Long, Replica> cell : cells) {
+                Long tabletId = cell.getRowKey();
+                Long beId = cell.getColumnKey();
+                try {
+                    Preconditions.checkState(availableBeIds.contains(beId), "dead be " + beId);
+                    TabletMeta tabletMeta = tabletMetaMap.get(tabletId);
+                    Preconditions.checkNotNull(tabletMeta, "invalid tablet " + tabletId);
+                    Preconditions.checkState(!Catalog.getCurrentColocateIndex().isColocateTable(tabletMeta.getTableId()),
+                            "should not be the colocate table");
+
+                    TStorageMedium medium = tabletMeta.getStorageMedium();
+                    Table<Long, Long, Map<Long, Long>> partitionReplicasInfo = partitionReplicasInfoMaps.get(medium);
+                    Map<Long, Long> countMap = partitionReplicasInfo.get(tabletMeta.getPartitionId(), tabletMeta.getIndexId());
+                    if (countMap == null) {
+                        // If one be doesn't have any replica of one partition, it should be counted too.
+                        countMap = availableBeIds.stream().collect(Collectors.toMap(i -> i, i -> 0L));
+                    }
+
+                    Long count = countMap.get(beId);
+                    countMap.put(beId, count + 1L);
+                    partitionReplicasInfo.put(tabletMeta.getPartitionId(), tabletMeta.getIndexId(), countMap);
+                    partitionReplicasInfoMaps.put(medium, partitionReplicasInfo);
+                } catch (IllegalStateException | NullPointerException e) {
+                    // If the tablet or be has some problem, don't count in
+                    LOG.debug(e.getMessage());
+                }
+            }
+        } finally {
+            readUnlock();
+        }
+
+        // 2. Populate ClusterBalanceInfo::table_info_by_skew
+        // for each PartitionId-MaterializedIndex:
+        //      for each beId: record max_count, min_count(replicaCount)
+        //      put <max_count-min_count, TableBalanceInfo> to table_info_by_skew
+        Map<TStorageMedium, TreeMultimap<Long, PartitionBalanceInfo>> skewMaps = Maps.newHashMap();
+        for (TStorageMedium medium : TStorageMedium.values()) {
+            TreeMultimap<Long, PartitionBalanceInfo> partitionInfoBySkew = TreeMultimap.create(Ordering.natural(), Ordering.arbitrary());
+            Set<Table.Cell<Long, Long, Map<Long, Long>>> mapCells = partitionReplicasInfoMaps.getOrDefault(medium, HashBasedTable.create()).cellSet();
+            for (Table.Cell<Long, Long, Map<Long, Long>> cell : mapCells) {
+                Map<Long, Long> countMap = cell.getValue();
+                Preconditions.checkNotNull(countMap);
+                PartitionBalanceInfo pbi = new PartitionBalanceInfo(cell.getRowKey(), cell.getColumnKey());
+                for (Map.Entry<Long, Long> entry : countMap.entrySet()) {
+                    Long beID = entry.getKey();
+                    Long replicaCount = entry.getValue();
+                    pbi.beByReplicaCount.put(replicaCount, beID);
+                }
+                // beByReplicaCount values are natural ordering
+                long min_count = pbi.beByReplicaCount.keySet().first();
+                long max_count = pbi.beByReplicaCount.keySet().last();

Review comment:
       Please rename `max_count` to `maxCount` (camelCase).

##########
File path: fe/fe-core/src/main/java/org/apache/doris/clone/TwoDimensionalGreedyRebalanceAlgo.java
##########
@@ -0,0 +1,329 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.clone;
+
+import com.google.common.base.Objects;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import com.google.common.collect.TreeMultimap;
+import org.apache.doris.catalog.TabletInvertedIndex.PartitionBalanceInfo;
+import org.apache.doris.clone.PartitionRebalancer.ClusterBalanceInfo;
+import org.apache.doris.common.Pair;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.List;
+import java.util.NavigableSet;
+import java.util.Random;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/*
+ * A two-dimensional greedy rebalancing algorithm. The two dims are cluster and partition. It'll generate multiple `PartitionMove`,
+ * only decide which partition to move, fromBe, toBe. The next step is to select a tablet to move.
+ *
+ * From among moves that decrease the skew of a most skewed partition, it prefers ones that reduce the skew of the cluster.
+ * A cluster is considered balanced when the skew of every partition is <= 1 and the skew of the cluster is <= 1.
+ * The skew of the cluster is defined as the difference between the maximum total replica count over all bes and the
+ * minimum total replica count over all bes.
+ *
+ * This class is modified from kudu TwoDimensionalGreedyAlgo.
+ */
+public class TwoDimensionalGreedyRebalanceAlgo {
+    private static final Logger LOG = LogManager.getLogger(TwoDimensionalGreedyRebalanceAlgo.class);
+
+    private final EqualSkewOption equalSkewOption;
+    private static final Random rand = new Random(System.currentTimeMillis());
+
+    public static class PartitionMove {
+        Long partitionId;
+        Long indexId;
+        Long fromBe;
+        Long toBe;
+
+        PartitionMove(Long p, Long i, Long f, Long t) {
+            this.partitionId = p;
+            this.indexId = i;
+            this.fromBe = f;
+            this.toBe = t;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+            PartitionMove that = (PartitionMove) o;
+            return Objects.equal(partitionId, that.partitionId) &&
+                    Objects.equal(indexId, that.indexId) &&
+                    Objects.equal(fromBe, that.fromBe) &&
+                    Objects.equal(toBe, that.toBe);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hashCode(partitionId, indexId, fromBe, toBe);
+        }
+
+        @Override
+        public String toString() {
+            return "ReplicaMove{" +
+                    "pid=" + partitionId + "-" + indexId +
+                    ", from=" + fromBe +
+                    ", to=" + toBe +
+                    '}';
+        }
+    }
+
+    public enum EqualSkewOption {
+        // generally only be used on unit test
+        PICK_FIRST,
+        PICK_RANDOM
+    }
+
+    public enum ExtremumType {
+        MAX,
+        MIN
+    }
+
+    public static class IntersectionResult {
+        Long replicaCountPartition;
+        Long replicaCountTotal;
+        List<Long> beWithExtremumCount;
+        List<Long> intersection;
+    }
+
+    TwoDimensionalGreedyRebalanceAlgo() {
+        this(EqualSkewOption.PICK_RANDOM);
+    }
+
+    TwoDimensionalGreedyRebalanceAlgo(EqualSkewOption equalSkewOption) {
+        this.equalSkewOption = equalSkewOption;
+    }
+
+    // maxMovesNum: Value of '0' is a shortcut for 'the possible maximum'.
+    // May modify the ClusterBalanceInfo
+    public List<PartitionMove> getNextMoves(ClusterBalanceInfo info, int maxMovesNum) {
+        Preconditions.checkArgument(maxMovesNum >= 0);
+        if (maxMovesNum == 0) {
+            maxMovesNum = Integer.MAX_VALUE;
+        }
+
+        if (info.partitionInfoBySkew.isEmpty()) {
+            // Check for the consistency of the 'ClusterBalanceInfo' parameter: if no information is given on
+            // the partition skew, partition count for all the be should be 0.
+            // Keys are ordered by the natural ordering, so we can get the last(max) key to know if all keys are 0.
+            NavigableSet<Long> keySet = info.beByTotalReplicaCount.keySet();
+            LOG.debug(keySet);
+            Preconditions.checkState(keySet.isEmpty() || keySet.last() == 0L,
+                    "non-zero replica count on be while no partition skew information in skewMap");
+            // Nothing to balance: cluster is empty.
+            return Lists.newArrayList();
+        }
+
+        List<PartitionMove> moves = Lists.newArrayList();
+        for (int i = 0; i < maxMovesNum; ++i) {
+            PartitionMove move = getNextMove(info.beByTotalReplicaCount, info.partitionInfoBySkew);
+            if (move == null || !(applyMove(move, info.beByTotalReplicaCount, info.partitionInfoBySkew))) {
+                // 1. No replicas to move.
+                // 2. Apply to info failed, it's useless to get next move from the same info.
+                break;
+            }
+            moves.add(move);
+        }
+
+        return moves;
+    }
+
+    private PartitionMove getNextMove(TreeMultimap<Long, Long> beByTotalReplicaCount,
+                                      TreeMultimap<Long, PartitionBalanceInfo> skewMap) {
+        PartitionMove move = null;
+        if (skewMap.isEmpty() || beByTotalReplicaCount.isEmpty()) {
+            return null;
+        }
+        long maxPartitionSkew = skewMap.keySet().last();
+        long maxBeSkew = beByTotalReplicaCount.keySet().last() - beByTotalReplicaCount.keySet().first();
+
+        // 1. Every partition is balanced(maxPartitionSkew<=1) and any move will unbalance a partition, so there
+        // is no potential for the greedy algorithm to balance the cluster.
+        // 2. Every partition is balanced(maxPartitionSkew<=1) and the cluster as a whole is balanced(maxBeSkew<=1).
+        if (maxPartitionSkew == 0L || (maxPartitionSkew <= 1L && maxBeSkew <= 1L)) {
+            return null;
+        }
+
+        // Among the partitions with maximum skew, attempt to pick a partition where there is
+        // a move that improves the partition skew and the cluster skew, if possible. If
+        // not, attempt to pick a move that improves the partition skew. If all partitions
+        // are balanced, attempt to pick a move that preserves partition balance and
+        // improves cluster skew.
+        NavigableSet<PartitionBalanceInfo> maxSet = skewMap.get(maxPartitionSkew);
+        for (PartitionBalanceInfo pbi : maxSet) {
+            Preconditions.checkArgument(!pbi.beByReplicaCount.isEmpty(), "no information on replicas of " +
+                    "partition " + pbi.partitionId + "-" + pbi.indexId);
+
+            Long minReplicaCount = pbi.beByReplicaCount.keySet().first();
+            Long maxReplicaCount = pbi.beByReplicaCount.keySet().last();
+            LOG.info("balancing partition {}-{} with replica count skew {} (min_replica_count: {}, max_replica_count: {})",
+                    pbi.partitionId, pbi.indexId, maxPartitionSkew,
+                    minReplicaCount, maxReplicaCount);
+
+            // Compute the intersection of the bes most loaded for the table
+            // with the bes most loaded overall, and likewise for least loaded.
+            // These are our ideal candidates for moving from and to, respectively.
+            IntersectionResult maxLoaded = getIntersection(ExtremumType.MAX, pbi.beByReplicaCount, beByTotalReplicaCount);
+            IntersectionResult minLoaded = getIntersection(ExtremumType.MIN, pbi.beByReplicaCount, beByTotalReplicaCount);
+            LOG.info("partition-wise: min_count: {}, max_count: {}", minLoaded.replicaCountPartition, maxLoaded.replicaCountPartition);
+            LOG.info("cluster-wise: min_count: {}, max_count: {}", minLoaded.replicaCountTotal, maxLoaded.replicaCountTotal);
+            LOG.debug("min_loaded_intersection: {}, max_loaded_intersection: {}", minLoaded.intersection.toString(), maxLoaded.intersection.toString());
+
+            // Do not move replicas of a balanced table if the least (most) loaded
+            // servers overall do not intersect the servers hosting the least (most)
+            // replicas of the table. Moving a replica in that case might keep the
+            // cluster skew the same or make it worse while keeping the table balanced.
+            if ((maxLoaded.replicaCountPartition <= minLoaded.replicaCountPartition + 1)
+                    && (minLoaded.intersection.isEmpty() || maxLoaded.intersection.isEmpty())) {
+                continue;
+            }
+
+            Long minLoadedBe, maxLoadedBe;
+            if (equalSkewOption == EqualSkewOption.PICK_FIRST) {
+                // beWithExtremumCount lists & intersection lists are natural ordering
+                minLoadedBe = minLoaded.intersection.isEmpty() ? minLoaded.beWithExtremumCount.get(0) : minLoaded.intersection.get(0);
+                maxLoadedBe = maxLoaded.intersection.isEmpty() ? maxLoaded.beWithExtremumCount.get(0) : maxLoaded.intersection.get(0);
+            } else {
+                minLoadedBe = minLoaded.intersection.isEmpty() ? getRandomListElement(minLoaded.beWithExtremumCount)
+                        : getRandomListElement(minLoaded.intersection);
+                maxLoadedBe = maxLoaded.intersection.isEmpty() ? getRandomListElement(maxLoaded.beWithExtremumCount)
+                        : getRandomListElement(maxLoaded.intersection);
+            }
+
+            LOG.debug("min_loaded_be: {}, max_loaded_be: {}", minLoadedBe, maxLoadedBe);
+            if (minLoadedBe.equals(maxLoadedBe)) {

Review comment:
       If we use PICK_FIRST and the candidate sets are small, is there a high probability that minLoadedBe and maxLoadedBe turn out to be the same BE?

##########
File path: fe/fe-core/src/main/java/org/apache/doris/clone/TwoDimensionalGreedyRebalanceAlgo.java
##########
@@ -0,0 +1,329 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.clone;
+
+import com.google.common.base.Objects;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import com.google.common.collect.TreeMultimap;
+import org.apache.doris.catalog.TabletInvertedIndex.PartitionBalanceInfo;
+import org.apache.doris.clone.PartitionRebalancer.ClusterBalanceInfo;
+import org.apache.doris.common.Pair;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.List;
+import java.util.NavigableSet;
+import java.util.Random;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/*
+ * A two-dimensional greedy rebalancing algorithm. The two dims are cluster and partition. It'll generate multiple `PartitionMove`,
+ * only decide which partition to move, fromBe, toBe. The next step is to select a tablet to move.
+ *
+ * From among moves that decrease the skew of a most skewed partition, it prefers ones that reduce the skew of the cluster.
+ * A cluster is considered balanced when the skew of every partition is <= 1 and the skew of the cluster is <= 1.
+ * The skew of the cluster is defined as the difference between the maximum total replica count over all bes and the
+ * minimum total replica count over all bes.
+ *
+ * This class is modified from kudu TwoDimensionalGreedyAlgo.
+ */
+public class TwoDimensionalGreedyRebalanceAlgo {
+    private static final Logger LOG = LogManager.getLogger(TwoDimensionalGreedyRebalanceAlgo.class);
+
+    private final EqualSkewOption equalSkewOption;
+    private static final Random rand = new Random(System.currentTimeMillis());
+
+    public static class PartitionMove {
+        Long partitionId;
+        Long indexId;
+        Long fromBe;
+        Long toBe;
+
+        PartitionMove(Long p, Long i, Long f, Long t) {
+            this.partitionId = p;
+            this.indexId = i;
+            this.fromBe = f;
+            this.toBe = t;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+            PartitionMove that = (PartitionMove) o;
+            return Objects.equal(partitionId, that.partitionId) &&
+                    Objects.equal(indexId, that.indexId) &&
+                    Objects.equal(fromBe, that.fromBe) &&
+                    Objects.equal(toBe, that.toBe);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hashCode(partitionId, indexId, fromBe, toBe);
+        }
+
+        @Override
+        public String toString() {
+            return "ReplicaMove{" +
+                    "pid=" + partitionId + "-" + indexId +
+                    ", from=" + fromBe +
+                    ", to=" + toBe +
+                    '}';
+        }
+    }
+
+    public enum EqualSkewOption {
+        // generally only be used on unit test
+        PICK_FIRST,
+        PICK_RANDOM
+    }
+
+    public enum ExtremumType {
+        MAX,
+        MIN
+    }
+
+    public static class IntersectionResult {
+        Long replicaCountPartition;
+        Long replicaCountTotal;
+        List<Long> beWithExtremumCount;
+        List<Long> intersection;
+    }
+
+    TwoDimensionalGreedyRebalanceAlgo() {
+        this(EqualSkewOption.PICK_RANDOM);
+    }
+
+    TwoDimensionalGreedyRebalanceAlgo(EqualSkewOption equalSkewOption) {
+        this.equalSkewOption = equalSkewOption;
+    }
+
+    // maxMovesNum: Value of '0' is a shortcut for 'the possible maximum'.
+    // May modify the ClusterBalanceInfo
+    public List<PartitionMove> getNextMoves(ClusterBalanceInfo info, int maxMovesNum) {
+        Preconditions.checkArgument(maxMovesNum >= 0);
+        if (maxMovesNum == 0) {
+            maxMovesNum = Integer.MAX_VALUE;
+        }
+
+        if (info.partitionInfoBySkew.isEmpty()) {
+            // Check for the consistency of the 'ClusterBalanceInfo' parameter: if no information is given on
+            // the partition skew, partition count for all the be should be 0.
+            // Keys are ordered by the natural ordering, so we can get the last(max) key to know if all keys are 0.
+            NavigableSet<Long> keySet = info.beByTotalReplicaCount.keySet();
+            LOG.debug(keySet);
+            Preconditions.checkState(keySet.isEmpty() || keySet.last() == 0L,
+                    "non-zero replica count on be while no partition skew information in skewMap");
+            // Nothing to balance: cluster is empty.
+            return Lists.newArrayList();
+        }
+
+        List<PartitionMove> moves = Lists.newArrayList();
+        for (int i = 0; i < maxMovesNum; ++i) {
+            PartitionMove move = getNextMove(info.beByTotalReplicaCount, info.partitionInfoBySkew);
+            if (move == null || !(applyMove(move, info.beByTotalReplicaCount, info.partitionInfoBySkew))) {
+                // 1. No replicas to move.
+                // 2. Apply to info failed, it's useless to get next move from the same info.
+                break;
+            }
+            moves.add(move);
+        }
+
+        return moves;
+    }
+
+    private PartitionMove getNextMove(TreeMultimap<Long, Long> beByTotalReplicaCount,
+                                      TreeMultimap<Long, PartitionBalanceInfo> skewMap) {
+        PartitionMove move = null;
+        if (skewMap.isEmpty() || beByTotalReplicaCount.isEmpty()) {
+            return null;
+        }
+        long maxPartitionSkew = skewMap.keySet().last();
+        long maxBeSkew = beByTotalReplicaCount.keySet().last() - beByTotalReplicaCount.keySet().first();
+
+        // 1. Every partition is balanced(maxPartitionSkew<=1) and any move will unbalance a partition, so there
+        // is no potential for the greedy algorithm to balance the cluster.
+        // 2. Every partition is balanced(maxPartitionSkew<=1) and the cluster as a whole is balanced(maxBeSkew<=1).
+        if (maxPartitionSkew == 0L || (maxPartitionSkew <= 1L && maxBeSkew <= 1L)) {
+            return null;
+        }
+
+        // Among the partitions with maximum skew, attempt to pick a partition where there is
+        // a move that improves the partition skew and the cluster skew, if possible. If
+        // not, attempt to pick a move that improves the partition skew. If all partitions
+        // are balanced, attempt to pick a move that preserves partition balance and
+        // improves cluster skew.
+        NavigableSet<PartitionBalanceInfo> maxSet = skewMap.get(maxPartitionSkew);
+        for (PartitionBalanceInfo pbi : maxSet) {
+            Preconditions.checkArgument(!pbi.beByReplicaCount.isEmpty(), "no information on replicas of " +
+                    "partition " + pbi.partitionId + "-" + pbi.indexId);
+
+            Long minReplicaCount = pbi.beByReplicaCount.keySet().first();
+            Long maxReplicaCount = pbi.beByReplicaCount.keySet().last();
+            LOG.info("balancing partition {}-{} with replica count skew {} (min_replica_count: {}, max_replica_count: {})",
+                    pbi.partitionId, pbi.indexId, maxPartitionSkew,
+                    minReplicaCount, maxReplicaCount);
+
+            // Compute the intersection of the bes most loaded for the table
+            // with the bes most loaded overall, and likewise for least loaded.
+            // These are our ideal candidates for moving from and to, respectively.
+            IntersectionResult maxLoaded = getIntersection(ExtremumType.MAX, pbi.beByReplicaCount, beByTotalReplicaCount);
+            IntersectionResult minLoaded = getIntersection(ExtremumType.MIN, pbi.beByReplicaCount, beByTotalReplicaCount);
+            LOG.info("partition-wise: min_count: {}, max_count: {}", minLoaded.replicaCountPartition, maxLoaded.replicaCountPartition);

Review comment:
       Please use LOG.debug instead of LOG.info here.

##########
File path: fe/fe-core/src/main/java/org/apache/doris/clone/TwoDimensionalGreedyRebalanceAlgo.java
##########
@@ -0,0 +1,329 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.clone;
+
+import com.google.common.base.Objects;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import com.google.common.collect.TreeMultimap;
+import org.apache.doris.catalog.TabletInvertedIndex.PartitionBalanceInfo;
+import org.apache.doris.clone.PartitionRebalancer.ClusterBalanceInfo;
+import org.apache.doris.common.Pair;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.List;
+import java.util.NavigableSet;
+import java.util.Random;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/*
+ * A two-dimensional greedy rebalancing algorithm. The two dims are cluster and partition. It'll generate multiple `PartitionMove`,
+ * only decide which partition to move, fromBe, toBe. The next step is to select a tablet to move.
+ *
+ * From among moves that decrease the skew of a most skewed partition, it prefers ones that reduce the skew of the cluster.
+ * A cluster is considered balanced when the skew of every partition is <= 1 and the skew of the cluster is <= 1.
+ * The skew of the cluster is defined as the difference between the maximum total replica count over all bes and the
+ * minimum total replica count over all bes.
+ *
+ * This class is modified from kudu TwoDimensionalGreedyAlgo.
+ */
+public class TwoDimensionalGreedyRebalanceAlgo {
+    private static final Logger LOG = LogManager.getLogger(TwoDimensionalGreedyRebalanceAlgo.class);
+
+    private final EqualSkewOption equalSkewOption;
+    private static final Random rand = new Random(System.currentTimeMillis());
+
+    public static class PartitionMove {
+        Long partitionId;
+        Long indexId;
+        Long fromBe;
+        Long toBe;
+
+        PartitionMove(Long p, Long i, Long f, Long t) {
+            this.partitionId = p;
+            this.indexId = i;
+            this.fromBe = f;
+            this.toBe = t;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+            PartitionMove that = (PartitionMove) o;
+            return Objects.equal(partitionId, that.partitionId) &&
+                    Objects.equal(indexId, that.indexId) &&
+                    Objects.equal(fromBe, that.fromBe) &&
+                    Objects.equal(toBe, that.toBe);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hashCode(partitionId, indexId, fromBe, toBe);
+        }
+
+        @Override
+        public String toString() {
+            return "ReplicaMove{" +
+                    "pid=" + partitionId + "-" + indexId +
+                    ", from=" + fromBe +
+                    ", to=" + toBe +
+                    '}';
+        }
+    }
+
+    public enum EqualSkewOption {
+        // generally only be used on unit test
+        PICK_FIRST,
+        PICK_RANDOM
+    }
+
+    public enum ExtremumType {
+        MAX,
+        MIN
+    }
+
+    public static class IntersectionResult {
+        Long replicaCountPartition;
+        Long replicaCountTotal;
+        List<Long> beWithExtremumCount;
+        List<Long> intersection;
+    }
+
+    TwoDimensionalGreedyRebalanceAlgo() {
+        this(EqualSkewOption.PICK_RANDOM);
+    }
+
+    TwoDimensionalGreedyRebalanceAlgo(EqualSkewOption equalSkewOption) {
+        this.equalSkewOption = equalSkewOption;
+    }
+
+    // maxMovesNum: Value of '0' is a shortcut for 'the possible maximum'.
+    // May modify the ClusterBalanceInfo
+    public List<PartitionMove> getNextMoves(ClusterBalanceInfo info, int maxMovesNum) {
+        Preconditions.checkArgument(maxMovesNum >= 0);
+        if (maxMovesNum == 0) {
+            maxMovesNum = Integer.MAX_VALUE;
+        }
+
+        if (info.partitionInfoBySkew.isEmpty()) {
+            // Check for the consistency of the 'ClusterBalanceInfo' parameter: if no information is given on
+            // the partition skew, partition count for all the be should be 0.
+            // Keys are ordered by the natural ordering, so we can get the last(max) key to know if all keys are 0.
+            NavigableSet<Long> keySet = info.beByTotalReplicaCount.keySet();
+            LOG.debug(keySet);
+            Preconditions.checkState(keySet.isEmpty() || keySet.last() == 0L,
+                    "non-zero replica count on be while no partition skew information in skewMap");
+            // Nothing to balance: cluster is empty.
+            return Lists.newArrayList();
+        }
+
+        List<PartitionMove> moves = Lists.newArrayList();
+        for (int i = 0; i < maxMovesNum; ++i) {
+            PartitionMove move = getNextMove(info.beByTotalReplicaCount, info.partitionInfoBySkew);
+            if (move == null || !(applyMove(move, info.beByTotalReplicaCount, info.partitionInfoBySkew))) {
+                // 1. No replicas to move.
+                // 2. Apply to info failed, it's useless to get next move from the same info.
+                break;
+            }
+            moves.add(move);
+        }
+
+        return moves;
+    }
+
+    private PartitionMove getNextMove(TreeMultimap<Long, Long> beByTotalReplicaCount,
+                                      TreeMultimap<Long, PartitionBalanceInfo> skewMap) {
+        PartitionMove move = null;
+        if (skewMap.isEmpty() || beByTotalReplicaCount.isEmpty()) {
+            return null;
+        }
+        long maxPartitionSkew = skewMap.keySet().last();
+        long maxBeSkew = beByTotalReplicaCount.keySet().last() - beByTotalReplicaCount.keySet().first();
+
+        // 1. Every partition is balanced(maxPartitionSkew<=1) and any move will unbalance a partition, so there
+        // is no potential for the greedy algorithm to balance the cluster.
+        // 2. Every partition is balanced(maxPartitionSkew<=1) and the cluster as a whole is balanced(maxBeSkew<=1).
+        if (maxPartitionSkew == 0L || (maxPartitionSkew <= 1L && maxBeSkew <= 1L)) {
+            return null;
+        }
+
+        // Among the partitions with maximum skew, attempt to pick a partition where there is
+        // a move that improves the partition skew and the cluster skew, if possible. If
+        // not, attempt to pick a move that improves the partition skew. If all partitions
+        // are balanced, attempt to pick a move that preserves partition balance and
+        // improves cluster skew.
+        NavigableSet<PartitionBalanceInfo> maxSet = skewMap.get(maxPartitionSkew);
+        for (PartitionBalanceInfo pbi : maxSet) {
+            Preconditions.checkArgument(!pbi.beByReplicaCount.isEmpty(), "no information on replicas of " +
+                    "partition " + pbi.partitionId + "-" + pbi.indexId);
+
+            Long minReplicaCount = pbi.beByReplicaCount.keySet().first();
+            Long maxReplicaCount = pbi.beByReplicaCount.keySet().last();
+            LOG.info("balancing partition {}-{} with replica count skew {} (min_replica_count: {}, max_replica_count: {})",

Review comment:
       Would LOG.debug be better than LOG.info here?

##########
File path: fe/fe-core/src/main/java/org/apache/doris/clone/PartitionRebalancer.java
##########
@@ -0,0 +1,337 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.clone;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Ordering;
+import com.google.common.collect.TreeMultimap;
+import org.apache.doris.catalog.Replica;
+import org.apache.doris.catalog.TabletInvertedIndex;
+import org.apache.doris.catalog.TabletMeta;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.Pair;
+import org.apache.doris.system.SystemInfoService;
+import org.apache.doris.thrift.TStorageMedium;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableSet;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+
+/*
+ * PartitionRebalancer decreases the skew of partitions. The skew of a partition is defined as the difference
+ * between the maximum replica count of the partition over all bes and the minimum replica count over all bes.
+ * It only considers the replica count of each partition and never considers the replica size (disk usage).
+ *
+ * We use TwoDimensionalGreedyRebalanceAlgo to get partition moves (one PartitionMove is
+ * <partition id, index id, from be, to be>).
+ * When rebalancing a max-skew partition, it prefers a move that also reduces the skew of the cluster.
+ *
+ * selectAlternativeTabletsForCluster() must set the tablet id, so we need to select a tablet for each move in this
+ * phase (as TabletMove).
+ */
+public class PartitionRebalancer extends Rebalancer {
+    private static final Logger LOG = LogManager.getLogger(PartitionRebalancer.class);
+
+    // Algorithm that produces the next partition moves from a ClusterBalanceInfo.
+    private final TwoDimensionalGreedyRebalanceAlgo algo = new TwoDimensionalGreedyRebalanceAlgo();
+
+    // In-progress moves; a cache is looked up per (cluster name, storage medium).
+    private final MovesCacheMap movesCacheMap = new MovesCacheMap();
+
+    // Incremented every time selectAlternativeTabletsForCluster() records a new move.
+    private final AtomicLong counterBalanceMoveCreated = new AtomicLong(0);
+    // Incremented every time checkMovesCompleted() finds a completed move.
+    private final AtomicLong counterBalanceMoveSucceeded = new AtomicLong(0);
+
+    // Hands the system info service and the tablet inverted index over to the Rebalancer base class.
+    public PartitionRebalancer(SystemInfoService infoService, TabletInvertedIndex invertedIndex) {
+        super(infoService, invertedIndex);
+    }
+
+    /**
+     * Selects the tablets to move in this round for the given cluster and storage medium.
+     *
+     * Steps: drop the completed or problematic in-progress moves, build the cluster balance info with the
+     * remaining in-progress moves applied, ask the greedy algorithm for the next partition moves, and pick one
+     * random tablet for each move.
+     *
+     * @param clusterName name of the cluster being balanced
+     * @param clusterStat load statistic that provides the replica count maps of the cluster
+     * @param medium storage medium being balanced
+     * @return one BALANCE TabletSchedCtx with LOW priority per move for which a candidate tablet was found;
+     *         an empty list if the cluster info can't be built or too many moves are already in progress
+     */
+    @Override
+    protected List<TabletSchedCtx> selectAlternativeTabletsForCluster(
+            String clusterName, ClusterLoadStatistic clusterStat, TStorageMedium medium) {
+        MovesCacheMap.MovesCache movesInProgress = movesCacheMap.getCache(clusterName, medium);
+        Preconditions.checkNotNull(movesInProgress, "clusterStat is got from statisticMap, movesCacheMap should have the same entry");
+
+        // Iterating through Cache.asMap().values() does not reset access time for the entries you retrieve.
+        List<TabletMove> movesInProgressList = movesInProgress.get().asMap().values()
+                .stream().map(p -> p.first).collect(Collectors.toList());
+        List<Long> toDeleteKeys = Lists.newArrayList();
+
+        // The problematic movements will be found in buildClusterInfo(), so here is a simply move completion check
+        // of moves which have valid ToDeleteReplica.
+        List<TabletMove> movesNeedCheck = movesInProgress.get().asMap().values()
+                .stream().filter(p -> p.second != -1L).map(p -> p.first).collect(Collectors.toList());
+        checkMovesCompleted(movesNeedCheck, toDeleteKeys);
+
+        ClusterBalanceInfo clusterBalanceInfo = new ClusterBalanceInfo();
+        // We should assume the in-progress moves have been succeeded to avoid producing the same moves.
+        // Apply in-progress moves to current cluster stats, use TwoDimensionalGreedyAlgo.ApplyMove for simplicity.
+        if (!buildClusterInfo(clusterStat, medium, movesInProgressList, clusterBalanceInfo, toDeleteKeys)) {
+            return Lists.newArrayList();
+        }
+
+        // Just delete the completed or problematic moves
+        if (!toDeleteKeys.isEmpty()) {
+            movesInProgress.get().invalidateAll(toDeleteKeys);
+            movesInProgressList = movesInProgressList.stream()
+                    .filter(m -> !toDeleteKeys.contains(m.tabletId)).collect(Collectors.toList());
+        }
+
+        // The balancing tasks of other cluster or medium might have failed. We use the upper limit value
+        // `total num of in-progress moves` to avoid useless selections.
+        if (movesCacheMap.size() > Config.max_balancing_tablets) {
+            LOG.debug("Total in-progress moves > {}", Config.max_balancing_tablets);
+            return Lists.newArrayList();
+        }
+
+        NavigableSet<Long> skews = clusterBalanceInfo.partitionInfoBySkew.keySet();
+        LOG.debug("Cluster {}-{}: peek max skew {}, assume {} in-progress moves are succeeded {}", clusterName, medium,
+                skews.isEmpty() ? 0 : skews.last(), movesInProgressList.size(), movesInProgressList);
+
+        List<TwoDimensionalGreedyRebalanceAlgo.PartitionMove> moves = algo.getNextMoves(clusterBalanceInfo, Config.partition_rebalance_max_moves_num_per_selection);
+
+        List<TabletSchedCtx> alternativeTablets = Lists.newArrayList();
+        List<Long> inProgressIds = movesInProgressList.stream().map(m -> m.tabletId).collect(Collectors.toList());
+        // One generator is enough for the whole selection round.
+        Random rand = new Random();
+        for (TwoDimensionalGreedyRebalanceAlgo.PartitionMove move : moves) {
+            // Find all tablets of the specified partition that would have a replica at the source be,
+            // but would not have a replica at the destination be. That is to satisfy the restriction
+            // of having no more than one replica of the same tablet per be.
+            List<Long> tabletIds = invertedIndex.getTabletIdsByBackendIdAndStorageMedium(move.fromBe, medium);
+            // Use a set for the exclusion check: List.removeAll(List) is quadratic in the number of tablets
+            // per be, and it would also modify the list handed out by the inverted index.
+            Set<Long> tabletIdsOnToBe = invertedIndex.getTabletIdsByBackendIdAndStorageMedium(move.toBe, medium)
+                    .stream().collect(Collectors.toSet());
+
+            Map<Long, TabletMeta> tabletCandidates = Maps.newHashMap();
+            for (long tabletId : tabletIds) {
+                if (tabletIdsOnToBe.contains(tabletId)) {
+                    continue;
+                }
+                TabletMeta tabletMeta = invertedIndex.getTabletMeta(tabletId);
+                if (tabletMeta == null || tabletMeta.getPartitionId() != move.partitionId
+                        || tabletMeta.getIndexId() != move.indexId) {
+                    continue;
+                }
+                // In-progress tablets can't be the candidate too.
+                if (inProgressIds.contains(tabletId)) {
+                    continue;
+                }
+                tabletCandidates.put(tabletId, tabletMeta);
+            }
+            LOG.debug("Find {} candidates for move {}", tabletCandidates.size(), move);
+            if (tabletCandidates.isEmpty()) {
+                continue;
+            }
+
+            // Random pick one candidate to create tabletSchedCtx
+            List<Long> candidateIds = Lists.newArrayList(tabletCandidates.keySet());
+            long pickedTabletId = candidateIds.get(rand.nextInt(candidateIds.size()));
+            LOG.debug("Picked tablet id for move {}: {}", move, pickedTabletId);
+
+            TabletMeta tabletMeta = tabletCandidates.get(pickedTabletId);
+            TabletSchedCtx tabletCtx = new TabletSchedCtx(TabletSchedCtx.Type.BALANCE, clusterName,
+                    tabletMeta.getDbId(), tabletMeta.getTableId(), tabletMeta.getPartitionId(),
+                    tabletMeta.getIndexId(), pickedTabletId, System.currentTimeMillis());
+            // Balance task's priority is always LOW
+            tabletCtx.setOrigPriority(TabletSchedCtx.Priority.LOW);
+            alternativeTablets.add(tabletCtx);
+            // Pair<Move, ToDeleteReplicaId>, ToDeleteReplicaId should be -1L before scheduled successfully
+            movesInProgress.get().put(pickedTabletId, new Pair<>(new TabletMove(pickedTabletId, move.fromBe, move.toBe), -1L));
+            counterBalanceMoveCreated.incrementAndGet();
+            // Synchronize with movesInProgress
+            inProgressIds.add(pickedTabletId);
+        }
+
+        if (moves.isEmpty()) {
+            // Balanced cluster should not print too much log messages, so we log it with level debug.
+            LOG.debug("Cluster {}-{}: cluster is balanced.", clusterName, medium);
+        } else {
+            LOG.info("Cluster {}-{}: get {} moves, actually select {} alternative tablets to move. Tablets detail: {}",
+                    clusterName, medium, moves.size(), alternativeTablets.size(),
+                    alternativeTablets.stream().mapToLong(TabletSchedCtx::getTabletId).toArray());
+        }
+        return alternativeTablets;
+    }
+
+    /**
+     * Fills {@code info} with the replica count maps of {@code clusterStat} for {@code medium}, then applies the
+     * in-progress moves on top of them, as if those moves had already succeeded.
+     *
+     * Moves whose tablet has no meta in the inverted index, or which can't be applied, are reported through
+     * {@code toDeleteKeys}.
+     *
+     * @param clusterStat source of the per-be and per-partition replica count maps
+     * @param medium storage medium the maps are read for
+     * @param movesInProgress in-progress moves to apply
+     * @param info output; both of its maps must be empty on entry
+     * @param toDeleteKeys in/out; moves whose tablet id is already listed are skipped, failed moves are appended
+     * @return always true in this implementation
+     */
+    private boolean buildClusterInfo(ClusterLoadStatistic clusterStat, TStorageMedium medium,
+                                     List<TabletMove> movesInProgress, ClusterBalanceInfo info, List<Long> toDeleteKeys) {
+        Preconditions.checkState(info.beByTotalReplicaCount.isEmpty() && info.partitionInfoBySkew.isEmpty(), "");
+
+        // putAll() only copies references: deep-copy the PartitionBalanceInfo entries of
+        // info.partitionInfoBySkew first if they ever need to be modified in place.
+        info.beByTotalReplicaCount.putAll(clusterStat.getBeByTotalReplicaMap(medium));
+        info.partitionInfoBySkew.putAll(clusterStat.getSkewMap(medium));
+
+        // Decide which moves to apply before the loop below starts appending to toDeleteKeys.
+        List<TabletMove> pendingMoves = Lists.newArrayList();
+        for (TabletMove candidate : movesInProgress) {
+            if (!toDeleteKeys.contains(candidate.tabletId)) {
+                pendingMoves.add(candidate);
+            }
+        }
+
+        for (TabletMove pending : pendingMoves) {
+            boolean applied = false;
+            TabletMeta tabletMeta = invertedIndex.getTabletMeta(pending.tabletId);
+            // A null meta means the tablet of this move is invalid; the move is not applied.
+            if (tabletMeta != null) {
+                TwoDimensionalGreedyRebalanceAlgo.PartitionMove partitionMove =
+                        new TwoDimensionalGreedyRebalanceAlgo.PartitionMove(
+                                tabletMeta.getPartitionId(), tabletMeta.getIndexId(), pending.fromBe, pending.toBe);
+                applied = TwoDimensionalGreedyRebalanceAlgo.applyMove(
+                        partitionMove, info.beByTotalReplicaCount, info.partitionInfoBySkew);
+            }
+            if (!applied) {
+                // Invalid tablet or inapplicable move: mark it for deletion and go on with the next one.
+                toDeleteKeys.add(pending.tabletId);
+            }
+        }
+        return true;
+    }
+
+    /**
+     * Checks every given move with checkMoveCompleted(). For each completed move, its tablet id is appended to
+     * {@code toDeleteKeys} and the succeeded-move counter is incremented.
+     *
+     * @param moves moves to check
+     * @param toDeleteKeys output list receiving the tablet ids of the completed moves
+     */
+    private void checkMovesCompleted(List<TabletMove> moves, List<Long> toDeleteKeys) {
+        for (TabletMove move : moves) {
+            if (!checkMoveCompleted(move)) {
+                continue;
+            }
+            // If the move was completed, remove it
+            toDeleteKeys.add(move.tabletId);
+            // Only query the inverted index for the current replica distribution when it will be logged.
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Move {} is completed. The cur dist: {}", move,
+                        invertedIndex.getReplicasByTabletId(move.tabletId).stream().map(Replica::getBackendId).collect(Collectors.toList()));
+            }
+            counterBalanceMoveSucceeded.incrementAndGet();
+        }
+    }
+
+    // Move completed: fromBe doesn't have a replica and toBe has a replica
+    private boolean checkMoveCompleted(TabletMove move) {

Review comment:
       The new replica is added to the tablet at the beginning of the clone task, in state CLONE.
   Also, the "fromBe" replica may be dropped for other reasons, such as "data broken on BE".
   
   So are you sure `!bes.contains(move.fromBe) && bes.contains(move.toBe)` can be treated as "move completed"?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org