Posted to commits@doris.apache.org by mo...@apache.org on 2020/12/31 01:41:47 UTC

[incubator-doris] branch master updated: [Rebalancer] support partition rebalancer (#5010)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new d7a584a  [Rebalancer] support partition rebalancer (#5010)
d7a584a is described below

commit d7a584ac59481b9281b6bbab04d0fc27de6e9851
Author: HuangWei <hu...@apache.org>
AuthorDate: Thu Dec 31 09:41:38 2020 +0800

    [Rebalancer] support partition rebalancer (#5010)
    
    RebalancerType can be configured via Config.tablet_rebalancer_type (BeLoad, Partition).
    PartitionRebalancer is based on TwoDimensionalGreedyAlgo.
    In Doris, the two dimensions are cluster and partition. Only the replica count is considered,
    not the replica size.
    See #4845 for further details.
---
 .../operation/tablet-repair-and-balance.md         |  23 +-
 .../operation/tablet-repair-and-balance.md         |  21 +-
 .../java/org/apache/doris/catalog/Catalog.java     |   2 +-
 .../apache/doris/catalog/TabletInvertedIndex.java  | 133 ++++++--
 .../apache/doris/clone/ClusterLoadStatistic.java   |  49 ++-
 .../java/org/apache/doris/clone/MovesCacheMap.java | 102 +++++++
 .../apache/doris/clone/PartitionRebalancer.java    | 337 +++++++++++++++++++++
 .../java/org/apache/doris/clone/Rebalancer.java    |   4 +-
 .../org/apache/doris/clone/TabletScheduler.java    |  10 +-
 .../clone/TwoDimensionalGreedyRebalanceAlgo.java   | 329 ++++++++++++++++++++
 .../main/java/org/apache/doris/common/Config.java  |  12 +
 .../java/org/apache/doris/clone/RebalanceTest.java | 301 ++++++++++++++++++
 .../org/apache/doris/clone/RebalancerTestUtil.java |  89 ++++++
 .../TwoDimensionalGreedyRebalanceAlgoTest.java     | 299 ++++++++++++++++++
 14 files changed, 1668 insertions(+), 43 deletions(-)

diff --git a/docs/en/administrator-guide/operation/tablet-repair-and-balance.md b/docs/en/administrator-guide/operation/tablet-repair-and-balance.md
index 07d86d0..73d3e50 100644
--- a/docs/en/administrator-guide/operation/tablet-repair-and-balance.md
+++ b/docs/en/administrator-guide/operation/tablet-repair-and-balance.md
@@ -214,7 +214,11 @@ At the same time, in order to ensure the weight of the initial priority, we stip
 
 ## Duplicate Equilibrium
 
-Doris automatically balances replicas within the cluster. The main idea of balancing is to create a replica of some fragments on low-load nodes, and then delete the replicas of these fragments on high-load nodes. At the same time, because of the existence of different storage media, there may or may not exist one or two storage media on different BE nodes in the same cluster. We require that fragments of storage medium A be stored in storage medium A as far as possible after equalization [...]
+Doris automatically balances replicas within the cluster. It currently supports two rebalance strategies, BeLoad and Partition. BeLoad rebalancing considers both the disk usage and the replica count of each BE. Partition rebalancing only aims at balancing the replica count of each partition, which helps to avoid hot spots; if you need high read/write performance, you may need it. Note that Partition rebalancing does not consider disk usage, so pay more attention to disk usage when you are using Partition rebal [...]
+
+### BeLoad
+
+The main idea of balancing is to create a replica of some tablets on low-load nodes and then delete the replicas of these tablets on high-load nodes. At the same time, because different storage media exist, BE nodes in the same cluster may have one or two kinds of storage media. We require that tablets on storage medium A remain on storage medium A as far as possible after balancing. So we divide the BE nodes of the cluster according to th [...]
 
 Similarly, replica balancing ensures that replicas of the same tablet are not deployed on BEs of the same host.
 
@@ -228,7 +232,22 @@ Disk usage and number of copies have a weight factor, which is **capacityCoeffic
 
 The weight coefficient ensures that when disk utilization is too high, the backend load score will be higher to ensure that the BE load is reduced as soon as possible.
 
-Tablet Scheduler updates CLS every 1 minute.
+Tablet Scheduler updates CLS every 20 seconds.
+
+### Partition
+
+The main idea of `partition rebalancing` is to decrease the skew of partitions. The skew of a partition is defined as the difference between the maximum replica count of the partition on any BE and the minimum replica count of the partition on any BE.
+
+Therefore, only the replica count is considered, not the replica size (disk usage).
+To minimize the number of moves, we use TwoDimensionalGreedyAlgo, whose two dimensions are cluster and partition. When rebalancing the partition with the maximum skew, it prefers moves that also reduce the skew of the cluster.
+
+#### Skew Info
+
+The skew info is represented by `ClusterBalanceInfo`. `partitionInfoBySkew` is a multimap whose key is the partition's skew, so the partitions with the maximum skew can be retrieved easily. `beByTotalReplicaCount` is a multimap whose key is the total replica count of the backend.
+
+`ClusterBalanceInfo` is in CLS, updated every 20 seconds.
+
+When more than one partition has the maximum skew, one of them is selected at random to calculate the move.
 
 ### Equilibrium strategy
 
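The skew definition and the random choice among max-skew partitions can be illustrated with a short sketch. It assumes the per-BE replica counts of each partition are already known (with 0 for BEs that hold no replica of that partition); `SkewSketch` and its methods are hypothetical names, and a `TreeMap` of lists stands in for the Guava `TreeMultimap` used by `partitionInfoBySkew`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.TreeMap;

// Hypothetical helper illustrating partition skew; not part of Doris.
public class SkewSketch {
    // Skew of one partition = max replica count on any BE - min replica count on any BE.
    // BEs holding no replica of the partition must be present with a count of 0.
    static long skew(Map<Long, Long> replicaCountByBe) {
        if (replicaCountByBe.isEmpty()) {
            return 0L;
        }
        long max = Long.MIN_VALUE;
        long min = Long.MAX_VALUE;
        for (long count : replicaCountByBe.values()) {
            max = Math.max(max, count);
            min = Math.min(min, count);
        }
        return max - min;
    }

    // Group partition names by skew; TreeMap keeps skews in ascending order.
    static TreeMap<Long, List<String>> partitionsBySkew(Map<String, Map<Long, Long>> countsByPartition) {
        TreeMap<Long, List<String>> bySkew = new TreeMap<>();
        for (Map.Entry<String, Map<Long, Long>> e : countsByPartition.entrySet()) {
            bySkew.computeIfAbsent(skew(e.getValue()), k -> new ArrayList<>()).add(e.getKey());
        }
        return bySkew;
    }

    // Pick one of the partitions with the maximum skew at random (null if there are none).
    static String pickMaxSkewPartition(TreeMap<Long, List<String>> bySkew, Random random) {
        if (bySkew.isEmpty()) {
            return null;
        }
        List<String> candidates = bySkew.lastEntry().getValue();
        return candidates.get(random.nextInt(candidates.size()));
    }
}
```

Because the map is sorted by skew, the max-skew candidates are always the last entry, which mirrors why `partitionInfoBySkew` is keyed by skew.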
diff --git a/docs/zh-CN/administrator-guide/operation/tablet-repair-and-balance.md b/docs/zh-CN/administrator-guide/operation/tablet-repair-and-balance.md
index 82346cd..015c2d7 100644
--- a/docs/zh-CN/administrator-guide/operation/tablet-repair-and-balance.md
+++ b/docs/zh-CN/administrator-guide/operation/tablet-repair-and-balance.md
@@ -215,11 +215,15 @@ In TabletScheduler, tablets waiting to be scheduled are given different
 
 ## Replica Balancing
 
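-Doris automatically balances replicas within the cluster. The main idea of balancing is to first create a replica of some tablets on low-load nodes and then delete the replicas of these tablets on high-load nodes. Meanwhile, because different storage media exist, different BE nodes in the same cluster may have one or two kinds of storage media. We require that tablets on storage medium A still be stored on storage medium A as far as possible after balancing. So we divide the cluster's BE nodes by storage medium, and then schedule load balancing separately for each set of BE nodes with the same storage medium.
+Doris automatically balances replicas within the cluster. Two balancing strategies are currently supported: load (BeLoad) and partition (Partition). Load balancing suits scenarios that need to take both the disk usage and the replica count of each node into account; partition balancing distributes the replicas of every partition evenly across nodes to avoid hot spots, and suits scenarios with high read/write demands on partitions. However, partition balancing does not consider disk usage, so monitor disk usage when using it. The strategy can only be configured before the FE starts; switching at runtime is not supported.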
+
+### Load Balancing
+
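+The main idea of load balancing is to first create a replica of some tablets on low-load nodes and then delete the replicas of these tablets on high-load nodes. Meanwhile, because different storage media exist, different BE nodes in the same cluster may have one or two kinds of storage media. We require that tablets on storage medium A still be stored on storage medium A as far as possible after balancing. So we divide the cluster's BE nodes by storage medium, and then schedule load balancing separately for each set of BE nodes with the same storage medium.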
 
 Likewise, replica balancing ensures that replicas of the same Tablet are never deployed on BEs of the same host.
 
-### BE Node Load
+#### BE Node Load
 
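 We use ClusterLoadStatistics (CLS) to represent the load balance of each Backend in a cluster. TabletScheduler triggers cluster balancing based on this statistic. Currently, we use two metrics, **disk usage** and **replica count**, to calculate a loadScore for each BE as its load score. The higher the score, the heavier the load on that BE.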
 
@@ -229,7 +233,18 @@ Doris automatically balances replicas within the cluster. The main idea of balancing is, for
 
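 This weight coefficient ensures that when disk usage is too high, the Backend's load score is higher, so that the load on this BE is reduced as soon as possible.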
 
-TabletScheduler updates CLS every 1 minute.
+TabletScheduler updates CLS every 20 seconds.
+
+### Partition Balancing
+
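+The main idea of partition balancing is to minimize the difference in replica count of each partition across Backends (i.e. the partition skew). Therefore, only the replica count is considered, not disk usage.
+To minimize the number of migrations, partition balancing uses a two-dimensional greedy strategy: it balances the partition with the largest partition skew first, and when balancing a partition it prefers moves that also reduce the difference in replica count across the Backends of the whole cluster (i.e. the cluster skew/total skew).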
+
+#### Skew Statistics
+
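+Skew statistics are represented by `ClusterBalanceInfo`, in which `partitionInfoBySkew` is sorted with the partition skew as the key, making it easy to find the max partition skew, and `beByTotalReplicaCount` is sorted with the total replica count of each Backend as the key. `ClusterBalanceInfo` is likewise kept in CLS and is also updated every 20s.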
+
+There may be multiple partitions with the max partition skew; one of them is selected at random for the calculation.
 
 ### Balancing Strategy
 
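The two-dimensional greedy preference described in both documents (balance the max-skew partition, but choose the move that also helps the cluster skew) can be sketched as a single move selection. This is a simplified illustration under stated assumptions, not the logic of `TwoDimensionalGreedyRebalanceAlgo`: the inputs are plain maps of per-BE counts, ties are broken by BE id rather than randomly, and `GreedyMoveSketch.pickMove` is a hypothetical name.

```java
import java.util.Collections;
import java.util.Comparator;
import java.util.Map;

// Hypothetical, simplified illustration of a two-dimensional greedy move choice;
// not the actual TwoDimensionalGreedyRebalanceAlgo.
public class GreedyMoveSketch {
    // partitionCountByBe: replicas of the max-skew partition on each available BE (0 if none).
    // totalCountByBe: total replicas on each available BE (the cluster dimension).
    // Returns {srcBeId, destBeId}, or null if one move cannot reduce the partition skew.
    static long[] pickMove(Map<Long, Long> partitionCountByBe, Map<Long, Long> totalCountByBe) {
        if (partitionCountByBe.size() < 2) {
            return null;
        }
        long maxPart = Collections.max(partitionCountByBe.values());
        long minPart = Collections.min(partitionCountByBe.values());
        if (maxPart - minPart <= 1) {
            return null; // with skew <= 1, moving one replica does not reduce the skew
        }
        // Order BEs by total replica count; ties are broken by BE id so the result is deterministic.
        Comparator<Long> byTotal = Comparator.<Long>comparingLong(be -> totalCountByBe.getOrDefault(be, 0L))
                .thenComparing(Comparator.naturalOrder());
        // Partition dimension selects the candidate sets; cluster dimension selects within them:
        // source = most loaded BE among those with the most replicas of this partition,
        // destination = least loaded BE among those with the fewest.
        long src = partitionCountByBe.keySet().stream()
                .filter(be -> partitionCountByBe.get(be) == maxPart)
                .max(byTotal).get();
        long dest = partitionCountByBe.keySet().stream()
                .filter(be -> partitionCountByBe.get(be) == minPart)
                .min(byTotal).get();
        return new long[] {src, dest};
    }
}
```

For example, with partition counts {1: 3, 2: 1, 3: 3, 4: 1} and totals {1: 10, 2: 8, 3: 12, 4: 5}, the sketch moves one replica from BE 3 to BE 4: both choices fix the partition skew equally, but this pair also narrows the gap between the most and least loaded BEs.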
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
index 4d31304..7623b6f 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
@@ -536,7 +536,7 @@ public class Catalog {
         this.metaContext.setThreadLocalInfo();
         
         this.stat = new TabletSchedulerStat();
-        this.tabletScheduler = new TabletScheduler(this, systemInfo, tabletInvertedIndex, stat);
+        this.tabletScheduler = new TabletScheduler(this, systemInfo, tabletInvertedIndex, stat, Config.tablet_rebalancer_type);
         this.tabletChecker = new TabletChecker(this, systemInfo, tabletScheduler, stat);
 
         this.pendingLoadTaskScheduler = new MasterTaskExecutor("pending_load_task_scheduler", Config.async_load_task_pool_size,
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java
index 5f3f411..edad1e0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java
@@ -34,8 +34,10 @@ import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.HashBasedTable;
 import com.google.common.collect.ListMultimap;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Ordering;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Table;
+import com.google.common.collect.TreeMultimap;
 
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -66,10 +68,10 @@ public class TabletInvertedIndex {
 
     // tablet id -> tablet meta
     private Map<Long, TabletMeta> tabletMetaMap = Maps.newHashMap();
-    
+
     // replica id -> tablet id
     private Map<Long, Long> replicaToTabletMap = Maps.newHashMap();
-    
+
     /*
      *  we use this to save memory.
      *  we do not need create TabletMeta instance for each tablet,
@@ -80,7 +82,7 @@ public class TabletInvertedIndex {
      *  partition id -> (index id -> tablet meta)
      */
     private Table<Long, Long, TabletMeta> tabletMetaTable = HashBasedTable.create();
-    
+
     // tablet id -> (backend id -> replica)
     private Table<Long, Long, Replica> replicaMetaTable = HashBasedTable.create();
     // backing replica table, for visiting backend replicas faster.
@@ -112,7 +114,7 @@ public class TabletInvertedIndex {
                              ListMultimap<Long, Long> tabletDeleteFromMeta,
                              Set<Long> foundTabletsWithValidSchema,
                              Map<Long, TTabletInfo> foundTabletsWithInvalidSchema,
-                             ListMultimap<TStorageMedium, Long> tabletMigrationMap, 
+                             ListMultimap<TStorageMedium, Long> tabletMigrationMap,
                              Map<Long, ListMultimap<Long, TPartitionVersionInfo>> transactionsToPublish,
                              ListMultimap<Long, Long> transactionsToClear,
                              ListMultimap<Long, Long> tabletRecoveryMap,
@@ -149,7 +151,7 @@ public class TabletInvertedIndex {
                                     // need sync
                                     tabletSyncMap.put(tabletMeta.getDbId(), tabletId);
                                 }
-                                
+
                                 // check and set path
                                 // path info of replica is only saved in Master FE
                                 if (backendTabletInfo.isSetPathHash() &&
@@ -165,8 +167,8 @@ public class TabletInvertedIndex {
 
                                 if (needRecover(replica, tabletMeta.getOldSchemaHash(), backendTabletInfo)) {
                                     LOG.warn("replica {} of tablet {} on backend {} need recovery. "
-                                            + "replica in FE: {}, report version {}-{}, report schema hash: {},"
-                                            + " is bad: {}, is version missing: {}",
+                                                    + "replica in FE: {}, report version {}-{}, report schema hash: {},"
+                                                    + " is bad: {}, is version missing: {}",
                                             replica.getId(), tabletId, backendId, replica,
                                             backendTabletInfo.getVersion(),
                                             backendTabletInfo.getVersionHash(),
@@ -195,7 +197,7 @@ public class TabletInvertedIndex {
                                         TransactionState transactionState = transactionMgr.getTransactionState(tabletMeta.getDbId(), transactionId);
                                         if (transactionState == null || transactionState.getTransactionStatus() == TransactionStatus.ABORTED) {
                                             transactionsToClear.put(transactionId, tabletMeta.getPartitionId());
-                                            LOG.debug("transaction id [{}] is not valid any more, " 
+                                            LOG.debug("transaction id [{}] is not valid any more, "
                                                     + "clear it from backend [{}]", transactionId, backendId);
                                         } else if (transactionState.getTransactionStatus() == TransactionStatus.VISIBLE) {
                                             TableCommitInfo tableCommitInfo = transactionState.getTableCommitInfo(tabletMeta.getTableId());
@@ -207,13 +209,13 @@ public class TabletInvertedIndex {
                                                  * 2. FE received report and begin to assemble partitionCommitInfos.
                                                  * 3. At the same time, some of partitions have been dropped, so partitionCommitInfos does not contain these partitions.
                                                  * 4. So we will not able to get partitionCommitInfo here.
-                                                 * 
+                                                 *
                                                  * Just print a log to observe
                                                  */
                                                 LOG.info("failed to find partition commit info. table: {}, partition: {}, tablet: {}, txn id: {}",
                                                         tabletMeta.getTableId(), partitionId, tabletId, transactionState.getTransactionId());
                                             } else {
-                                                TPartitionVersionInfo versionInfo = new TPartitionVersionInfo(tabletMeta.getPartitionId(), 
+                                                TPartitionVersionInfo versionInfo = new TPartitionVersionInfo(tabletMeta.getPartitionId(),
                                                         partitionCommitInfo.getVersion(),
                                                         partitionCommitInfo.getVersionHash());
                                                 ListMultimap<Long, TPartitionVersionInfo> map = transactionsToPublish.get(transactionState.getDbId());
@@ -237,7 +239,7 @@ public class TabletInvertedIndex {
                                 foundTabletsWithInvalidSchema.put(tabletId, backendTabletInfo);
                             } // end for be tablet info
                         }
-                    }  else {
+                    } else {
                         // 2. (meta - be)
                         // may need delete from meta
                         LOG.debug("backend[{}] does not report tablet[{}-{}]", backendId, tabletId, tabletMeta);
@@ -251,10 +253,10 @@ public class TabletInvertedIndex {
 
         long end = System.currentTimeMillis();
         LOG.info("finished to do tablet diff with backend[{}]. sync: {}. metaDel: {}. foundValid: {}. foundInvalid: {}."
-                         + " migration: {}. found invalid transactions {}. found republish transactions {} " 
-                         + " cost: {} ms", backendId, tabletSyncMap.size(),
-                 tabletDeleteFromMeta.size(), foundTabletsWithValidSchema.size(), foundTabletsWithInvalidSchema.size(),
-                 tabletMigrationMap.size(), transactionsToClear.size(), transactionsToPublish.size(), (end - start));
+                        + " migration: {}. found invalid transactions {}. found republish transactions {} "
+                        + " cost: {} ms", backendId, tabletSyncMap.size(),
+                tabletDeleteFromMeta.size(), foundTabletsWithValidSchema.size(), foundTabletsWithInvalidSchema.size(),
+                tabletMigrationMap.size(), transactionsToClear.size(), transactionsToPublish.size(), (end - start));
     }
 
     public Long getTabletIdByReplica(long replicaId) {
@@ -302,7 +304,7 @@ public class TabletInvertedIndex {
 
         long versionInFe = replicaInFe.getVersion();
         long versionHashInFe = replicaInFe.getVersionHash();
-        
+
         if (backendTabletInfo.getVersion() > versionInFe) {
             // backend replica's version is larger or newer than replica in FE, sync it.
             return true;
@@ -311,10 +313,10 @@ public class TabletInvertedIndex {
             // backend replica's version is equal to replica in FE, but replica in FE is bad, while backend replica is good, sync it
             return true;
         }
-        
+
         return false;
     }
-    
+
     /**
      * Be will set `used' to false for bad replicas and `version_miss' to true for replicas with hole
      * in their version chain. In either case, those replicas need to be fixed by TabletScheduler.
@@ -350,7 +352,7 @@ public class TabletInvertedIndex {
              * 2. BE will report version (X+1, 0), and FE will sync with this version, change to (X+1, 0), too.
              * 3. When restore, BE will restore the replica with version (X, Y) (which is the visible version of partition)
              * 4. BE report the version (X-Y), and than we fall into here
-             * 
+             *
              * Actually, the version (X+1, 0) is a 'virtual' version, so here we ignore this kind of report
              */
             return false;
@@ -455,7 +457,7 @@ public class TabletInvertedIndex {
             writeUnlock();
         }
     }
-    
+
     public Replica getReplica(long tabletId, long backendId) {
         readLock();
         try {
@@ -602,5 +604,96 @@ public class TabletInvertedIndex {
     public Map<Long, Long> getReplicaToTabletMap() {
         return replicaToTabletMap;
     }
+
+    // Only build from available bes, exclude colocate tables
+    public Map<TStorageMedium, TreeMultimap<Long, PartitionBalanceInfo>> buildPartitionInfoBySkew(List<Long> availableBeIds) {
+        readLock();
+
+        // 1. gen <partitionId-indexId, <beId, replicaCount>>
+        // for each replica(all tablets):
+        //      find beId, then replicaCount++
+        Map<TStorageMedium, Table<Long, Long, Map<Long, Long>>> partitionReplicasInfoMaps = Maps.newHashMap();
+        for (TStorageMedium medium : TStorageMedium.values()) {
+            partitionReplicasInfoMaps.put(medium, HashBasedTable.create());
+        }
+        try {
+            // Changes to the returned set will update the underlying table
+            // tablet id -> (backend id -> replica)
+            Set<Table.Cell<Long, Long, Replica>> cells = replicaMetaTable.cellSet();
+            for (Table.Cell<Long, Long, Replica> cell : cells) {
+                Long tabletId = cell.getRowKey();
+                Long beId = cell.getColumnKey();
+                try {
+                    Preconditions.checkState(availableBeIds.contains(beId), "dead be " + beId);
+                    TabletMeta tabletMeta = tabletMetaMap.get(tabletId);
+                    Preconditions.checkNotNull(tabletMeta, "invalid tablet " + tabletId);
+                    Preconditions.checkState(!Catalog.getCurrentColocateIndex().isColocateTable(tabletMeta.getTableId()),
+                            "should not be the colocate table");
+
+                    TStorageMedium medium = tabletMeta.getStorageMedium();
+                    Table<Long, Long, Map<Long, Long>> partitionReplicasInfo = partitionReplicasInfoMaps.get(medium);
+                    Map<Long, Long> countMap = partitionReplicasInfo.get(tabletMeta.getPartitionId(), tabletMeta.getIndexId());
+                    if (countMap == null) {
+                        // If one be doesn't have any replica of one partition, it should be counted too.
+                        countMap = availableBeIds.stream().collect(Collectors.toMap(i -> i, i -> 0L));
+                    }
+
+                    Long count = countMap.get(beId);
+                    countMap.put(beId, count + 1L);
+                    partitionReplicasInfo.put(tabletMeta.getPartitionId(), tabletMeta.getIndexId(), countMap);
+                    partitionReplicasInfoMaps.put(medium, partitionReplicasInfo);
+                } catch (IllegalStateException | NullPointerException e) {
+                    // If the tablet or be has some problem, don't count in
+                    LOG.debug(e.getMessage());
+                }
+            }
+        } finally {
+            readUnlock();
+        }
+
+        // 2. Populate partitionInfoBySkew
+        // for each PartitionId-MaterializedIndex:
+        //      for each beId: record max_count, min_count(replicaCount)
+        //      put <max_count-min_count, PartitionBalanceInfo> to partitionInfoBySkew
+        Map<TStorageMedium, TreeMultimap<Long, PartitionBalanceInfo>> skewMaps = Maps.newHashMap();
+        for (TStorageMedium medium : TStorageMedium.values()) {
+            TreeMultimap<Long, PartitionBalanceInfo> partitionInfoBySkew = TreeMultimap.create(Ordering.natural(), Ordering.arbitrary());
+            Set<Table.Cell<Long, Long, Map<Long, Long>>> mapCells = partitionReplicasInfoMaps.getOrDefault(medium, HashBasedTable.create()).cellSet();
+            for (Table.Cell<Long, Long, Map<Long, Long>> cell : mapCells) {
+                Map<Long, Long> countMap = cell.getValue();
+                Preconditions.checkNotNull(countMap);
+                PartitionBalanceInfo pbi = new PartitionBalanceInfo(cell.getRowKey(), cell.getColumnKey());
+                for (Map.Entry<Long, Long> entry : countMap.entrySet()) {
+                    Long beID = entry.getKey();
+                    Long replicaCount = entry.getValue();
+                    pbi.beByReplicaCount.put(replicaCount, beID);
+                }
+                // beByReplicaCount keys (replica counts) are in natural ordering, so first()/last() give min/max
+                long minCount = pbi.beByReplicaCount.keySet().first();
+                long maxCount = pbi.beByReplicaCount.keySet().last();
+                partitionInfoBySkew.put(maxCount - minCount, pbi);
+            }
+            skewMaps.put(medium, partitionInfoBySkew);
+        }
+        return skewMaps;
+    }
+
+    public static class PartitionBalanceInfo {
+        public Long partitionId;
+        public Long indexId;
+        // Natural ordering
+        public TreeMultimap<Long, Long> beByReplicaCount = TreeMultimap.create();
+
+        public PartitionBalanceInfo(Long partitionId, Long indexId) {
+            this.partitionId = partitionId;
+            this.indexId = indexId;
+        }
+
+        public PartitionBalanceInfo(PartitionBalanceInfo info) {
+            this.partitionId = info.partitionId;
+            this.indexId = info.indexId;
+            this.beByReplicaCount = TreeMultimap.create(info.beByReplicaCount);
+        }
+    }
 }
 
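Step 1 of `buildPartitionInfoBySkew` above counts, for every `[partitionId, indexId]`, how many replicas each available BE holds, pre-filling 0 for BEs without a replica. The following is a simplified sketch of that counting step only; it assumes replicas are given as a flat list of hypothetical `ReplicaRef` objects and omits the storage-medium split and the colocate-table filtering that the real method performs.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of step 1 of buildPartitionInfoBySkew using plain java.util types.
public class PartitionCountSketch {
    // Hypothetical stand-in for a replica's location; Doris derives this from TabletMeta and replicaMetaTable.
    static final class ReplicaRef {
        final long partitionId;
        final long indexId;
        final long beId;

        ReplicaRef(long partitionId, long indexId, long beId) {
            this.partitionId = partitionId;
            this.indexId = indexId;
            this.beId = beId;
        }
    }

    // Returns [partitionId, indexId] -> (beId -> replica count).
    static Map<List<Long>, Map<Long, Long>> countReplicas(List<ReplicaRef> replicas, Set<Long> availableBeIds) {
        Map<List<Long>, Map<Long, Long>> counts = new HashMap<>();
        for (ReplicaRef r : replicas) {
            if (!availableBeIds.contains(r.beId)) {
                continue; // replicas on unavailable BEs are not counted
            }
            Map<Long, Long> countMap = counts.computeIfAbsent(List.of(r.partitionId, r.indexId), key -> {
                // Start every available BE at 0 so BEs without a replica still count toward the skew.
                Map<Long, Long> init = new HashMap<>();
                for (Long be : availableBeIds) {
                    init.put(be, 0L);
                }
                return init;
            });
            countMap.merge(r.beId, 1L, Long::sum);
        }
        return counts;
    }
}
```

Pre-filling zeros matters: without it, a BE holding none of a partition's replicas would be invisible and the computed skew would be too small.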
diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/ClusterLoadStatistic.java b/fe/fe-core/src/main/java/org/apache/doris/clone/ClusterLoadStatistic.java
index 33ca951..e2b6cdc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/ClusterLoadStatistic.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/ClusterLoadStatistic.java
@@ -17,6 +17,10 @@
 
 package org.apache.doris.clone;
 
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.TreeMultimap;
 import org.apache.doris.catalog.TabletInvertedIndex;
 import org.apache.doris.clone.BackendLoadStatistic.Classification;
 import org.apache.doris.clone.BackendLoadStatistic.LoadScore;
@@ -25,11 +29,6 @@ import org.apache.doris.common.util.DebugUtil;
 import org.apache.doris.system.Backend;
 import org.apache.doris.system.SystemInfoService;
 import org.apache.doris.thrift.TStorageMedium;
-
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -59,9 +58,11 @@ public class ClusterLoadStatistic {
     // storage medium -> number of backend which has this kind of medium
     private Map<TStorageMedium, Integer> backendNumMap = Maps.newHashMap();
     private List<BackendLoadStatistic> beLoadStatistics = Lists.newArrayList();
+    private Map<TStorageMedium, TreeMultimap<Long, Long>> beByTotalReplicaCountMaps = Maps.newHashMap();
+    private Map<TStorageMedium, TreeMultimap<Long, TabletInvertedIndex.PartitionBalanceInfo>> skewMaps = Maps.newHashMap();
 
     public ClusterLoadStatistic(String clusterName, SystemInfoService infoService,
-            TabletInvertedIndex invertedIndex) {
+                                TabletInvertedIndex invertedIndex) {
         this.clusterName = clusterName;
         this.infoService = infoService;
         this.invertedIndex = invertedIndex;
@@ -90,7 +91,7 @@ public class ClusterLoadStatistic {
 
             beLoadStatistics.add(beStatistic);
         }
-        
+
         for (TStorageMedium medium : TStorageMedium.values()) {
             avgUsedCapacityPercentMap.put(medium, totalUsedCapacityMap.getOrDefault(medium, 0L) / (double) totalCapacityMap.getOrDefault(medium, 1L));
             avgReplicaNumPercentMap.put(medium, totalReplicaNumMap.getOrDefault(medium, 0L) / (double) backendNumMap.getOrDefault(medium, 1));
@@ -107,6 +108,22 @@ public class ClusterLoadStatistic {
 
         // sort be stats by mix load score
         Collections.sort(beLoadStatistics, BackendLoadStatistic.MIX_COMPARATOR);
+
+        // <medium -> Multimap<totalReplicaCount -> beId>>
+        // Only count the available be
+        for (TStorageMedium medium : TStorageMedium.values()) {
+            TreeMultimap<Long, Long> beByTotalReplicaCount = TreeMultimap.create();
+            beLoadStatistics.stream().filter(BackendLoadStatistic::isAvailable).forEach(beStat ->
+                    beByTotalReplicaCount.put(beStat.getReplicaNum(medium), beStat.getBeId()));
+            beByTotalReplicaCountMaps.put(medium, beByTotalReplicaCount);
+        }
+
+        // Actually the partition is [partition_id, index_id], aka pid.
+        // Multimap<skew -> PartitionBalanceInfo>
+        //                  PartitionBalanceInfo: <pid -> <partitionReplicaCount, beId>>
+        // Only count available bes here, aligned with the beByTotalReplicaCountMaps.
+        skewMaps = invertedIndex.buildPartitionInfoBySkew(beLoadStatistics.stream().filter(BackendLoadStatistic::isAvailable).
+                map(BackendLoadStatistic::getBeId).collect(Collectors.toList()));
     }
 
     /*
@@ -167,10 +184,10 @@ public class ClusterLoadStatistic {
      *    as more balance.
      */
     public boolean isMoreBalanced(long srcBeId, long destBeId, long tabletId, long tabletSize,
-            TStorageMedium medium) {
+                                  TStorageMedium medium) {
         double currentSrcBeScore;
         double currentDestBeScore;
-        
+
         BackendLoadStatistic srcBeStat = null;
         Optional<BackendLoadStatistic> optSrcBeStat = beLoadStatistics.stream().filter(
                 t -> t.getBeId() == srcBeId).findFirst();
@@ -179,7 +196,7 @@ public class ClusterLoadStatistic {
         } else {
             return false;
         }
-        
+
         BackendLoadStatistic destBeStat = null;
         Optional<BackendLoadStatistic> optDestBeStat = beLoadStatistics.stream().filter(
                 t -> t.getBeId() == destBeId).findFirst();
@@ -208,8 +225,8 @@ public class ClusterLoadStatistic {
         double newDiff = Math.abs(newSrcBeScore.score - avgLoadScoreMap.get(medium)) + Math.abs(newDestBeScore.score - avgLoadScoreMap.get(medium));
 
         LOG.debug("after migrate {}(size: {}) from {} to {}, medium: {}, the load score changed."
-                + " src: {} -> {}, dest: {}->{}, average score: {}. current diff: {}, new diff: {},"
-                + " more balanced: {}",
+                        + " src: {} -> {}, dest: {}->{}, average score: {}. current diff: {}, new diff: {},"
+                        + " more balanced: {}",
                 tabletId, tabletSize, srcBeId, destBeId, medium, currentSrcBeScore, newSrcBeScore.score,
                 currentDestBeScore, newDestBeScore.score, avgLoadScoreMap.get(medium), currentDiff, newDiff,
                 (newDiff < currentDiff));
@@ -332,4 +349,12 @@ public class ClusterLoadStatistic {
         }
         return sb.toString();
     }
+
+    public TreeMultimap<Long, Long> getBeByTotalReplicaMap(TStorageMedium medium) {
+        return beByTotalReplicaCountMaps.get(medium);
+    }
+
+    public TreeMultimap<Long, TabletInvertedIndex.PartitionBalanceInfo> getSkewMap(TStorageMedium medium) {
+        return skewMaps.get(medium);
+    }
 }
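The `beByTotalReplicaCountMaps` built in `ClusterLoadStatistic` above keeps, per storage medium, the available BEs keyed by their total replica count, so the least and most loaded BEs sit under the first and last keys. A minimal sketch of the same grouping with plain `java.util` types (instead of Guava's `TreeMultimap`) follows; the class and method names are hypothetical, and `clusterSkew` only illustrates the cluster dimension used by the Partition rebalancer.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical sketch of the beByTotalReplicaCount grouping with plain java.util types.
public class BeByTotalReplicaSketch {
    // totalReplicaByBe: available BE id -> total replica count on one storage medium.
    // Result: total replica count -> BE ids with that count, both in ascending order.
    static TreeMap<Long, TreeSet<Long>> group(Map<Long, Long> totalReplicaByBe) {
        TreeMap<Long, TreeSet<Long>> byCount = new TreeMap<>();
        totalReplicaByBe.forEach((be, count) -> byCount.computeIfAbsent(count, k -> new TreeSet<>()).add(be));
        return byCount;
    }

    // Cluster skew: largest total replica count minus smallest (0 for an empty cluster).
    static long clusterSkew(TreeMap<Long, TreeSet<Long>> byCount) {
        return byCount.isEmpty() ? 0L : byCount.lastKey() - byCount.firstKey();
    }
}
```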
diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/MovesCacheMap.java b/fe/fe-core/src/main/java/org/apache/doris/clone/MovesCacheMap.java
new file mode 100644
index 0000000..ff8a2d9
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/MovesCacheMap.java
@@ -0,0 +1,102 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.clone;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.collect.Maps;
+import org.apache.doris.common.Pair;
+import org.apache.doris.thrift.TStorageMedium;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.StringJoiner;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+/*
+ * MovesCacheMap stores MovesCache for every cluster and medium.
+ * MovesCache is a simple encapsulation of Guava Cache. Use it by calling MovesCache.get().
+ * MovesCache's expireAfterAccess can be reset when updating the cache mapping. If expireAfterAccess is reset,
+ * all MovesCaches will be cleared and recreated.
+ */
+public class MovesCacheMap {
+    private static final Logger LOG = LogManager.getLogger(MovesCacheMap.class);
+
+    // cluster -> medium -> MovesCache
+    private final Map<String, Map<TStorageMedium, MovesCache>> cacheMap = Maps.newHashMap();
+    private long lastExpireConfig = -1L;
+
+    // TabletId -> Pair<Move, ToDeleteReplicaId>. 'ToDeleteReplicaId == -1' means this move hasn't been scheduled successfully.
+    public static class MovesCache {
+        Cache<Long, Pair<PartitionRebalancer.TabletMove, Long>> cache;
+
+        MovesCache(long duration, TimeUnit unit) {
+            cache = CacheBuilder.newBuilder().expireAfterAccess(duration, unit).build();
+        }
+
+        public Cache<Long, Pair<PartitionRebalancer.TabletMove, Long>> get() {
+            return cache;
+        }
+    }
+
+    // Periodically update the cache mapping: a cluster may have been deleted, in which case its caches should be deleted too.
+    public void updateMapping(Map<String, ClusterLoadStatistic> statisticMap, long expireAfterAccessSecond) {
+        if (expireAfterAccessSecond > 0 && lastExpireConfig != expireAfterAccessSecond) {
+            LOG.debug("Reset expireAfterAccess, last {}s, now {}s. Moves will be cleared.", lastExpireConfig, expireAfterAccessSecond);
+            cacheMap.clear();
+            lastExpireConfig = expireAfterAccessSecond;
+        }
+
+        cacheMap.keySet().removeIf(k -> !statisticMap.containsKey(k));
+
+        List<String> toAdd = statisticMap.keySet().stream().filter(k -> !cacheMap.containsKey(k)).collect(Collectors.toList());
+        for (String cluster : toAdd) {
+            Map<TStorageMedium, MovesCache> mediumCaches = Maps.newHashMap();
+            Arrays.stream(TStorageMedium.values()).forEach(m -> mediumCaches.put(m, new MovesCache(expireAfterAccessSecond, TimeUnit.SECONDS)));
+            cacheMap.put(cluster, mediumCaches);
+        }
+    }
+
+    public MovesCache getCache(String clusterName, TStorageMedium medium) {
+        Map<TStorageMedium, MovesCache> clusterMoves = cacheMap.get(clusterName);
+        if (clusterMoves != null) {
+            return clusterMoves.get(medium);
+        }
+        return null;
+    }
+
+    // For each MovesCache, performs any pending maintenance operations needed by the cache.
+    public void maintain() {
+        cacheMap.values().forEach(maps -> maps.values().forEach(map -> map.get().cleanUp()));
+    }
+
+    public long size() {
+        return cacheMap.values().stream().mapToLong(maps -> maps.values().stream().mapToLong(map -> map.get().size()).sum()).sum();
+    }
+
+    @Override
+    public String toString() {
+        StringJoiner sj = new StringJoiner("\n", "MovesInProgress detail:\n", "");
+        cacheMap.forEach((key, value) -> value.forEach((k, v) -> sj.add("(" + key + "-" + k + ": " + v.get().asMap() + ")")));
+        return sj.toString();
+    }
+}
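The bookkeeping in updateMapping() can be illustrated with a JDK-only sketch. It is a simplification under stated assumptions: `Medium` is a hypothetical stand-in for `TStorageMedium`, a set of live cluster names plays the role of `statisticMap.keySet()`, and a plain `HashMap` replaces Guava's `Cache`, so expire-after-access is not modeled. Stale clusters are dropped through `keySet().removeIf()`, which is safe while the map is being modified. Calling `cacheMap.remove()` from a stream over the same map's keys instead risks a `ConcurrentModificationException`.

```java
import java.util.EnumMap;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for org.apache.doris.thrift.TStorageMedium.
enum Medium { HDD, SSD }

// JDK-only sketch of MovesCacheMap.updateMapping(). A plain HashMap replaces Guava's Cache,
// so expire-after-access is NOT modeled; only the cluster/medium bookkeeping is.
class MovesCacheMapSketch {
    // cluster -> medium -> (tabletId -> toDeleteReplicaId)
    final Map<String, Map<Medium, Map<Long, Long>>> cacheMap = new HashMap<>();
    private long lastExpireConfig = -1L;

    // liveClusters plays the role of statisticMap.keySet() in the real code.
    void updateMapping(Set<String> liveClusters, long expireAfterAccessSecond) {
        // A changed TTL invalidates every cache, so start from scratch.
        if (expireAfterAccessSecond > 0 && lastExpireConfig != expireAfterAccessSecond) {
            cacheMap.clear();
            lastExpireConfig = expireAfterAccessSecond;
        }
        // Drop the caches of deleted clusters. removeIf() on the key view is safe,
        // unlike calling cacheMap.remove() from a stream over cacheMap's own keys.
        cacheMap.keySet().removeIf(cluster -> !liveClusters.contains(cluster));
        // Create one empty cache per medium for every newly seen cluster.
        for (String cluster : liveClusters) {
            cacheMap.computeIfAbsent(cluster, c -> {
                Map<Medium, Map<Long, Long>> byMedium = new EnumMap<>(Medium.class);
                for (Medium m : Medium.values()) {
                    byMedium.put(m, new HashMap<>());
                }
                return byMedium;
            });
        }
    }

    // Total number of cached moves over all clusters and media.
    long size() {
        return cacheMap.values().stream()
                .flatMap(byMedium -> byMedium.values().stream())
                .mapToLong(moves -> moves.size())
                .sum();
    }
}
```

As an example, after `updateMapping(Set.of("c1", "c2"), 600)`, both clusters have caches. A later call with only `"c1"` drops c2's caches but keeps c1's pending moves. A call with a different TTL clears everything and recreates empty caches.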
diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/PartitionRebalancer.java b/fe/fe-core/src/main/java/org/apache/doris/clone/PartitionRebalancer.java
new file mode 100644
index 0000000..d4cd812
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/PartitionRebalancer.java
@@ -0,0 +1,337 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.clone;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Ordering;
+import com.google.common.collect.TreeMultimap;
+import org.apache.doris.catalog.Replica;
+import org.apache.doris.catalog.TabletInvertedIndex;
+import org.apache.doris.catalog.TabletMeta;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.Pair;
+import org.apache.doris.system.SystemInfoService;
+import org.apache.doris.thrift.TStorageMedium;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableSet;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+
+/*
+ * PartitionRebalancer decreases the skew of partitions. The skew of a partition is defined as the difference
+ * between the maximum replica count of the partition over all bes and the minimum replica count over all bes.
+ * Only the replica count of each partition is considered, never the replica size (disk usage).
+ *
+ * We use TwoDimensionalGreedyRebalanceAlgo to get partition moves (a PartitionMove is <partition id, index id, from be, to be>).
+ * When rebalancing a partition with the maximum skew, it prefers moves that also reduce the skew of the cluster.
+ *
+ * selectAlternativeTabletsForCluster() must set the tablet id, so a tablet has to be selected for each move in this
+ * phase (as a TabletMove).
+ */
+public class PartitionRebalancer extends Rebalancer {
+    private static final Logger LOG = LogManager.getLogger(PartitionRebalancer.class);
+
+    private final TwoDimensionalGreedyRebalanceAlgo algo = new TwoDimensionalGreedyRebalanceAlgo();
+
+    private final MovesCacheMap movesCacheMap = new MovesCacheMap();
+
+    private final AtomicLong counterBalanceMoveCreated = new AtomicLong(0);
+    private final AtomicLong counterBalanceMoveSucceeded = new AtomicLong(0);
+
+    public PartitionRebalancer(SystemInfoService infoService, TabletInvertedIndex invertedIndex) {
+        super(infoService, invertedIndex);
+    }
+
+    @Override
+    protected List<TabletSchedCtx> selectAlternativeTabletsForCluster(
+            String clusterName, ClusterLoadStatistic clusterStat, TStorageMedium medium) {
+        MovesCacheMap.MovesCache movesInProgress = movesCacheMap.getCache(clusterName, medium);
+        Preconditions.checkNotNull(movesInProgress, "clusterStat comes from statisticMap, so movesCacheMap should have the same entry");
+
+        // Iterating through Cache.asMap().values() does not reset access time for the entries you retrieve.
+        List<TabletMove> movesInProgressList = movesInProgress.get().asMap().values()
+                .stream().map(p -> p.first).collect(Collectors.toList());
+        List<Long> toDeleteKeys = Lists.newArrayList();
+
+        // Problematic moves are detected in buildClusterInfo(), so here we only check for completion
+        // of the moves that already have a valid ToDeleteReplica.
+        List<TabletMove> movesNeedCheck = movesInProgress.get().asMap().values()
+                .stream().filter(p -> p.second != -1L).map(p -> p.first).collect(Collectors.toList());
+        checkMovesCompleted(movesNeedCheck, toDeleteKeys);
+
+        ClusterBalanceInfo clusterBalanceInfo = new ClusterBalanceInfo();
+        // Assume the in-progress moves have succeeded, to avoid producing the same moves again.
+        // Apply the in-progress moves to the current cluster stats, using TwoDimensionalGreedyRebalanceAlgo.applyMove() for simplicity.
+        if (!buildClusterInfo(clusterStat, medium, movesInProgressList, clusterBalanceInfo, toDeleteKeys)) {
+            return Lists.newArrayList();
+        }
+
+        // Just delete the completed or problematic moves
+        if (!toDeleteKeys.isEmpty()) {
+            movesInProgress.get().invalidateAll(toDeleteKeys);
+            movesInProgressList = movesInProgressList.stream()
+                    .filter(m -> !toDeleteKeys.contains(m.tabletId)).collect(Collectors.toList());
+        }
+
+        // The balancing tasks of other clusters or media might have failed. Use the total number of in-progress moves
+        // as an upper limit to avoid useless selections.
+        if (movesCacheMap.size() > Config.max_balancing_tablets) {
+            LOG.debug("Total in-progress moves > {}", Config.max_balancing_tablets);
+            return Lists.newArrayList();
+        }
+
+        NavigableSet<Long> skews = clusterBalanceInfo.partitionInfoBySkew.keySet();
+        LOG.debug("Cluster {}-{}: peek max skew {}, assuming {} in-progress moves have succeeded: {}", clusterName, medium,
+                skews.isEmpty() ? 0 : skews.last(), movesInProgressList.size(), movesInProgressList);
+
+        List<TwoDimensionalGreedyRebalanceAlgo.PartitionMove> moves = algo.getNextMoves(clusterBalanceInfo, Config.partition_rebalance_max_moves_num_per_selection);
+
+        List<TabletSchedCtx> alternativeTablets = Lists.newArrayList();
+        List<Long> inProgressIds = movesInProgressList.stream().map(m -> m.tabletId).collect(Collectors.toList());
+        for (TwoDimensionalGreedyRebalanceAlgo.PartitionMove move : moves) {
+            // Find all tablets of the specified partition that would have a replica at the source be,
+            // but would not have a replica at the destination be. That is to satisfy the restriction
+            // of having no more than one replica of the same tablet per be.
+            List<Long> tabletIds = invertedIndex.getTabletIdsByBackendIdAndStorageMedium(move.fromBe, medium);
+            List<Long> invalidIds = invertedIndex.getTabletIdsByBackendIdAndStorageMedium(move.toBe, medium);
+            tabletIds.removeAll(invalidIds);
+            // In-progress tablets can't be candidates either.
+            tabletIds.removeAll(inProgressIds);
+
+            Map<Long, TabletMeta> tabletCandidates = Maps.newHashMap();
+            for (long tabletId : tabletIds) {
+                TabletMeta tabletMeta = invertedIndex.getTabletMeta(tabletId);
+                if (tabletMeta != null && tabletMeta.getPartitionId() == move.partitionId
+                        && tabletMeta.getIndexId() == move.indexId) {
+                    tabletCandidates.put(tabletId, tabletMeta);
+                }
+            }
+            LOG.debug("Find {} candidates for move {}", tabletCandidates.size(), move);
+            if (tabletCandidates.isEmpty()) {
+                continue;
+            }
+
+            // Randomly pick one candidate to create a TabletSchedCtx
+            Random rand = new Random();
+            Object[] keys = tabletCandidates.keySet().toArray();
+            long pickedTabletId = (long) keys[rand.nextInt(keys.length)];
+            LOG.debug("Picked tablet id for move {}: {}", move, pickedTabletId);
+
+            TabletMeta tabletMeta = tabletCandidates.get(pickedTabletId);
+            TabletSchedCtx tabletCtx = new TabletSchedCtx(TabletSchedCtx.Type.BALANCE, clusterName,
+                    tabletMeta.getDbId(), tabletMeta.getTableId(), tabletMeta.getPartitionId(),
+                    tabletMeta.getIndexId(), pickedTabletId, System.currentTimeMillis());
+            // Balance task's priority is always LOW
+            tabletCtx.setOrigPriority(TabletSchedCtx.Priority.LOW);
+            alternativeTablets.add(tabletCtx);
+            // Pair<Move, ToDeleteReplicaId>; ToDeleteReplicaId stays -1L until the move is scheduled successfully
+            movesInProgress.get().put(pickedTabletId, new Pair<>(new TabletMove(pickedTabletId, move.fromBe, move.toBe), -1L));
+            counterBalanceMoveCreated.incrementAndGet();
+            // Synchronize with movesInProgress
+            inProgressIds.add(pickedTabletId);
+        }
+
+        if (moves.isEmpty()) {
+            // A balanced cluster should not produce many log messages, so log at debug level.
+            LOG.debug("Cluster {}-{}: cluster is balanced.", clusterName, medium);
+        } else {
+            LOG.info("Cluster {}-{}: get {} moves, actually select {} alternative tablets to move. Tablets detail: {}",
+                    clusterName, medium, moves.size(), alternativeTablets.size(),
+                    alternativeTablets.stream().mapToLong(TabletSchedCtx::getTabletId).toArray());
+        }
+        return alternativeTablets;
+    }
+
+    private boolean buildClusterInfo(ClusterLoadStatistic clusterStat, TStorageMedium medium,
+                                     List<TabletMove> movesInProgress, ClusterBalanceInfo info, List<Long> toDeleteKeys) {
+        Preconditions.checkState(info.beByTotalReplicaCount.isEmpty() && info.partitionInfoBySkew.isEmpty(), "");
+
+        // If we want to modify a PartitionBalanceInfo in info.partitionInfoBySkew, deep-copy it first
+        info.beByTotalReplicaCount.putAll(clusterStat.getBeByTotalReplicaMap(medium));
+        info.partitionInfoBySkew.putAll(clusterStat.getSkewMap(medium));
+
+        // Skip the toDeleteKeys
+        List<TabletMove> filteredMoves = movesInProgress.stream().filter(m -> !toDeleteKeys.contains(m.tabletId)).collect(Collectors.toList());
+
+        for (TabletMove move : filteredMoves) {
+            TabletMeta meta = invertedIndex.getTabletMeta(move.tabletId);
+            if (meta == null) {
+                // The move's tablet is invalid, so the move needs to be deleted
+                toDeleteKeys.add(move.tabletId);
+                continue;
+            }
+
+            TwoDimensionalGreedyRebalanceAlgo.PartitionMove partitionMove = new TwoDimensionalGreedyRebalanceAlgo.PartitionMove(meta.getPartitionId(), meta.getIndexId(), move.fromBe, move.toBe);
+            boolean st = TwoDimensionalGreedyRebalanceAlgo.applyMove(partitionMove, info.beByTotalReplicaCount, info.partitionInfoBySkew);
+            if (!st) {
+                // Can't apply this move, mark it failed, continue to apply the next.
+                toDeleteKeys.add(move.tabletId);
+            }
+        }
+        return true;
+    }
+
+    private void checkMovesCompleted(List<TabletMove> moves, List<Long> toDeleteKeys) {
+        boolean moveIsComplete;
+        for (TabletMove move : moves) {
+            moveIsComplete = checkMoveCompleted(move);
+            // If the move was completed, remove it
+            if (moveIsComplete) {
+                toDeleteKeys.add(move.tabletId);
+                LOG.debug("Move {} is completed. The cur dist: {}", move,
+                        invertedIndex.getReplicasByTabletId(move.tabletId).stream().map(Replica::getBackendId).collect(Collectors.toList()));
+                counterBalanceMoveSucceeded.incrementAndGet();
+            }
+        }
+    }
+
+    // Move completed: fromBe doesn't have a replica and toBe has a replica
+    private boolean checkMoveCompleted(TabletMove move) {
+        Long tabletId = move.tabletId;
+        List<Long> bes = invertedIndex.getReplicasByTabletId(tabletId).stream().map(Replica::getBackendId).collect(Collectors.toList());
+        return !bes.contains(move.fromBe) && bes.contains(move.toBe);
+    }
+
+    @Override
+    protected void completeSchedCtx(TabletSchedCtx tabletCtx, Map<Long, TabletScheduler.PathSlot> backendsWorkingSlots)
+            throws SchedException {
+        MovesCacheMap.MovesCache movesInProgress = movesCacheMap.getCache(tabletCtx.getCluster(), tabletCtx.getStorageMedium());
+        Preconditions.checkNotNull(movesInProgress, "clusterStat comes from statisticMap, so movesCacheMap should have the same entry");
+
+        try {
+            Pair<TabletMove, Long> pair = movesInProgress.get().getIfPresent(tabletCtx.getTabletId());
+            Preconditions.checkNotNull(pair, "No cached move for tablet: " + tabletCtx.getTabletId());
+
+            TabletMove move = pair.first;
+            checkMoveValidation(move);
+
+            // Check src replica's validation
+            Replica srcReplica = tabletCtx.getTablet().getReplicaByBackendId(move.fromBe);
+            Preconditions.checkNotNull(srcReplica);
+            TabletScheduler.PathSlot slot = backendsWorkingSlots.get(srcReplica.getBackendId());
+            Preconditions.checkNotNull(slot, "unable to get fromBe " + srcReplica.getBackendId() + " slot");
+            if (slot.takeBalanceSlot(srcReplica.getPathHash()) != -1) {
+                tabletCtx.setSrc(srcReplica);
+            } else {
+                throw new SchedException(SchedException.Status.SCHEDULE_FAILED, "no slot for src replica " + srcReplica + ", pathHash " + srcReplica.getPathHash());
+            }
+
+            // Choose a path in destination
+            ClusterLoadStatistic clusterStat = statisticMap.get(tabletCtx.getCluster());
+            Preconditions.checkNotNull(clusterStat, "cluster does not exist: " + tabletCtx.getCluster());
+            BackendLoadStatistic beStat = clusterStat.getBackendLoadStatistic(move.toBe);
+            Preconditions.checkNotNull(beStat);
+            slot = backendsWorkingSlots.get(move.toBe);
+            Preconditions.checkNotNull(slot, "unable to get slot of toBe " + move.toBe);
+
+            List<RootPathLoadStatistic> paths = beStat.getPathStatistics();
+            Set<Long> availPath = paths.stream().filter(path -> path.getStorageMedium() == tabletCtx.getStorageMedium()
+                    && path.isFit(tabletCtx.getTabletSize(), false) == BalanceStatus.OK)
+                    .map(RootPathLoadStatistic::getPathHash).collect(Collectors.toSet());
+            long pathHash = slot.takeAnAvailBalanceSlotFrom(availPath);
+            if (pathHash == -1) {
+                throw new SchedException(SchedException.Status.SCHEDULE_FAILED, "paths has no available balance slot: " + availPath);
+            } else {
+                tabletCtx.setDest(beStat.getBeId(), pathHash);
+            }
+
+            // ToDeleteReplica is the source replica
+            pair.second = srcReplica.getId();
+        } catch (IllegalStateException | NullPointerException e) {
+            // Problematic move should be invalidated immediately
+            movesInProgress.get().invalidate(tabletCtx.getTabletId());
+            throw new SchedException(SchedException.Status.UNRECOVERABLE, e.getMessage());
+        }
+    }
+
+    // The validation check cannot be accurate, because moves are produced in a particular order.
+    // If some moves failed, the cluster & partition skew differs from the skew at the time getNextMoves() ran,
+    // so we can't do a skew check.
+    // Just do some basic checks, e.g. whether the bes are available.
+    private void checkMoveValidation(TabletMove move) throws IllegalStateException {
+        boolean fromAvailable = infoService.checkBackendAvailable(move.fromBe);
+        boolean toAvailable = infoService.checkBackendAvailable(move.toBe);
+        Preconditions.checkState(fromAvailable && toAvailable, move + "'s bes are not all available: from " + fromAvailable + ", to " + toAvailable);
+        // To be improved
+    }
+
+    @Override
+    public Long getToDeleteReplicaId(TabletSchedCtx tabletCtx) {
+        MovesCacheMap.MovesCache movesInProgress = movesCacheMap.getCache(tabletCtx.getCluster(), tabletCtx.getStorageMedium());
+
+        // We don't invalidate the cached move here, because the redundant-replica repair has only just started.
+        // The move is invalidated by its TTL or by checkMovesCompleted().
+        Pair<TabletMove, Long> pair = movesInProgress.get().getIfPresent(tabletCtx.getTabletId());
+        if (pair != null) {
+            Preconditions.checkState(pair.second != -1L);
+            return pair.second;
+        } else {
+            return -1L;
+        }
+    }
+
+    @Override
+    public void updateLoadStatistic(Map<String, ClusterLoadStatistic> statisticMap) {
+        super.updateLoadStatistic(statisticMap);
+        movesCacheMap.updateMapping(statisticMap, Config.partition_rebalance_move_expire_after_access);
+        // Perform cache maintenance
+        movesCacheMap.maintain();
+        LOG.debug("Move succeeded/total :{}/{}, current {}",
+                counterBalanceMoveSucceeded.get(), counterBalanceMoveCreated.get(), movesCacheMap);
+    }
+
+    // Represents a concrete move of a tablet from one be to another.
+    // Formed logically from a PartitionMove by specifying a tablet for the move.
+    public static class TabletMove {
+        Long tabletId;
+        Long fromBe;
+        Long toBe;
+
+        TabletMove(Long id, Long from, Long to) {
+            this.tabletId = id;
+            this.fromBe = from;
+            this.toBe = to;
+        }
+
+        @Override
+        public String toString() {
+            return "TabletMove{" +
+                    "tabletId=" + tabletId +
+                    ", fromBe=" + fromBe +
+                    ", toBe=" + toBe +
+                    '}';
+        }
+    }
+
+    // Balance information for a cluster (one medium), excluding decommissioned/dead bes and the replicas on them.
+    // Natural ordering, so the last key is the max key.
+    public static class ClusterBalanceInfo {
+        TreeMultimap<Long, TabletInvertedIndex.PartitionBalanceInfo> partitionInfoBySkew = TreeMultimap.create(Ordering.natural(), Ordering.arbitrary());
+        TreeMultimap<Long, Long> beByTotalReplicaCount = TreeMultimap.create();
+    }
+
+
+}
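The partition-skew definition and the move preference described in the PartitionRebalancer header comment can be illustrated with a small sketch. It is a simplification, not the TwoDimensionalGreedyRebalanceAlgo code. It looks at a single partition and assumes every be appears in the partition's count map, with 0 if it holds no replica. It also replaces the extremum-intersection logic with a simple tie-break on total replica count. The class and method names are hypothetical.

```java
import java.util.Collections;
import java.util.Map;

// Simplified, hypothetical sketch of one greedy step for a single partition; NOT the Doris implementation.
class GreedyMoveSketch {
    // Skew = max count - min count over all bes (0 for an empty map).
    static long skew(Map<Long, Long> countByBe) {
        if (countByBe.isEmpty()) {
            return 0L;
        }
        return Collections.max(countByBe.values()) - Collections.min(countByBe.values());
    }

    // Returns {fromBe, toBe}, or null if moving one replica cannot reduce the partition skew.
    // partitionCount: be -> replicas of this partition (every be listed, 0 allowed).
    // totalCount: be -> replicas of all partitions on that be.
    static long[] pickMove(Map<Long, Long> partitionCount, Map<Long, Long> totalCount) {
        if (skew(partitionCount) <= 1) {
            return null;
        }
        long maxP = Collections.max(partitionCount.values());
        long minP = Collections.min(partitionCount.values());
        Long fromBe = null;
        Long toBe = null;
        for (Map.Entry<Long, Long> e : partitionCount.entrySet()) {
            long be = e.getKey();
            long total = totalCount.getOrDefault(be, 0L);
            // Among bes holding the most replicas of the partition, prefer the most loaded overall.
            if (e.getValue() == maxP && (fromBe == null || total > totalCount.getOrDefault(fromBe, 0L))) {
                fromBe = be;
            }
            // Among bes holding the fewest replicas of the partition, prefer the least loaded overall.
            if (e.getValue() == minP && (toBe == null || total < totalCount.getOrDefault(toBe, 0L))) {
                toBe = be;
            }
        }
        return new long[] {fromBe, toBe};
    }
}
```

Consider partition counts {be1: 2, be2: 2, be3: 0, be4: 0} and totals {be1: 5, be2: 9, be3: 4, be4: 2}. The partition skew is 2, and the sketch proposes moving a replica from be2 to be4, which also narrows the gap between the most and least loaded bes. The real algorithm then applies each move to both maps and repeats, up to Config.partition_rebalance_max_moves_num_per_selection moves per selection.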
diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/Rebalancer.java b/fe/fe-core/src/main/java/org/apache/doris/clone/Rebalancer.java
index e0bf5f7..f854ef0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/Rebalancer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/Rebalancer.java
@@ -65,7 +65,7 @@ public abstract class Rebalancer {
         return alternativeTablets;
     }
 
-    // The return TabletSchedCtx should have the tablet id at least. {srcReplica, destBe} can be complete here or
+    // The returned TabletSchedCtx should have the tablet id at least. {srcReplica, destBe} can be complete here or
     // later(when createBalanceTask called).
     protected abstract List<TabletSchedCtx> selectAlternativeTabletsForCluster(
             String clusterName, ClusterLoadStatistic clusterStat, TStorageMedium medium);
@@ -86,7 +86,7 @@ public abstract class Rebalancer {
     protected abstract void completeSchedCtx(TabletSchedCtx tabletCtx, Map<Long, PathSlot> backendsWorkingSlots)
             throws SchedException;
 
-    public Long getToDeleteReplicaId(Long tabletId) {
+    public Long getToDeleteReplicaId(TabletSchedCtx tabletCtx) {
         return -1L;
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
index 9a2cfd6..349db1b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
@@ -138,14 +138,18 @@ public class TabletScheduler extends MasterDaemon {
     }
 
     public TabletScheduler(Catalog catalog, SystemInfoService infoService, TabletInvertedIndex invertedIndex,
-            TabletSchedulerStat stat) {
+                           TabletSchedulerStat stat, String rebalancerType) {
         super("tablet scheduler", SCHEDULE_INTERVAL_MS);
         this.catalog = catalog;
         this.infoService = infoService;
         this.invertedIndex = invertedIndex;
         this.colocateTableIndex = catalog.getColocateTableIndex();
         this.stat = stat;
-        this.rebalancer = new BeLoadRebalancer(infoService, invertedIndex);
+        if (rebalancerType.equalsIgnoreCase("partition")) {
+            this.rebalancer = new PartitionRebalancer(infoService, invertedIndex);
+        } else {
+            this.rebalancer = new BeLoadRebalancer(infoService, invertedIndex);
+        }
     }
 
     public TabletSchedulerStat getStat() {
@@ -852,7 +856,7 @@ public class TabletScheduler extends MasterDaemon {
     }
 
     private boolean deleteReplicaChosenByRebalancer(TabletSchedCtx tabletCtx, boolean force) throws SchedException {
-        Long id = rebalancer.getToDeleteReplicaId(tabletCtx.getTabletId());
+        Long id = rebalancer.getToDeleteReplicaId(tabletCtx);
         if (id == -1L) {
             return false;
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/TwoDimensionalGreedyRebalanceAlgo.java b/fe/fe-core/src/main/java/org/apache/doris/clone/TwoDimensionalGreedyRebalanceAlgo.java
new file mode 100644
index 0000000..cabb5af
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TwoDimensionalGreedyRebalanceAlgo.java
@@ -0,0 +1,329 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.clone;
+
+import com.google.common.base.Objects;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import com.google.common.collect.TreeMultimap;
+import org.apache.doris.catalog.TabletInvertedIndex.PartitionBalanceInfo;
+import org.apache.doris.clone.PartitionRebalancer.ClusterBalanceInfo;
+import org.apache.doris.common.Pair;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.List;
+import java.util.NavigableSet;
+import java.util.Random;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/*
+ * A two-dimensional greedy rebalancing algorithm. The two dims are cluster and partition. It generates multiple `PartitionMove`s,
+ * each of which only decides which partition to move, from which be and to which be. The next step is to select a tablet to move.
+ *
+ * From among moves that decrease the skew of a most skewed partition, it prefers ones that reduce the skew of the cluster.
+ * A cluster is considered balanced when the skew of every partition is <= 1 and the skew of the cluster is <= 1.
+ * The skew of the cluster is defined as the difference between the maximum total replica count over all bes and the
+ * minimum total replica count over all bes.
+ *
+ * This class is adapted from Kudu's TwoDimensionalGreedyAlgo.
+ */
+public class TwoDimensionalGreedyRebalanceAlgo {
+    private static final Logger LOG = LogManager.getLogger(TwoDimensionalGreedyRebalanceAlgo.class);
+
+    private final EqualSkewOption equalSkewOption;
+    private static final Random rand = new Random(System.currentTimeMillis());
+
+    public static class PartitionMove {
+        Long partitionId;
+        Long indexId;
+        Long fromBe;
+        Long toBe;
+
+        PartitionMove(Long p, Long i, Long f, Long t) {
+            this.partitionId = p;
+            this.indexId = i;
+            this.fromBe = f;
+            this.toBe = t;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+            PartitionMove that = (PartitionMove) o;
+            return Objects.equal(partitionId, that.partitionId) &&
+                    Objects.equal(indexId, that.indexId) &&
+                    Objects.equal(fromBe, that.fromBe) &&
+                    Objects.equal(toBe, that.toBe);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hashCode(partitionId, indexId, fromBe, toBe);
+        }
+
+        @Override
+        public String toString() {
+            return "PartitionMove{" +
+                    "pid=" + partitionId + "-" + indexId +
+                    ", from=" + fromBe +
+                    ", to=" + toBe +
+                    '}';
+        }
+    }
+
+    public enum EqualSkewOption {
+        // generally only used in unit tests
+        PICK_FIRST,
+        PICK_RANDOM
+    }
+
+    public enum ExtremumType {
+        MAX,
+        MIN
+    }
+
+    public static class IntersectionResult {
+        Long replicaCountPartition;
+        Long replicaCountTotal;
+        List<Long> beWithExtremumCount;
+        List<Long> intersection;
+    }
+
+    TwoDimensionalGreedyRebalanceAlgo() {
+        this(EqualSkewOption.PICK_RANDOM);
+    }
+
+    TwoDimensionalGreedyRebalanceAlgo(EqualSkewOption equalSkewOption) {
+        this.equalSkewOption = equalSkewOption;
+    }
+
+    // maxMovesNum: a value of 0 is a shortcut for 'as many as possible'.
+    // Modifies the given ClusterBalanceInfo: every returned move has already been applied to it.
+    public List<PartitionMove> getNextMoves(ClusterBalanceInfo info, int maxMovesNum) {
+        Preconditions.checkArgument(maxMovesNum >= 0);
+        if (maxMovesNum == 0) {
+            maxMovesNum = Integer.MAX_VALUE;
+        }
+
+        if (info.partitionInfoBySkew.isEmpty()) {
+            // Check the consistency of the 'ClusterBalanceInfo' parameter: if no information is given on
+            // the partition skew, the total replica count of every be should be 0.
+            // Keys are in natural order, so checking that the last (max) key is 0 tells us whether all keys are 0.
+            NavigableSet<Long> keySet = info.beByTotalReplicaCount.keySet();
+            LOG.debug(keySet);
+            Preconditions.checkState(keySet.isEmpty() || keySet.last() == 0L,
+                    "non-zero replica count on be while no partition skew information in skewMap");
+            // Nothing to balance: cluster is empty.
+            return Lists.newArrayList();
+        }
+
+        List<PartitionMove> moves = Lists.newArrayList();
+        for (int i = 0; i < maxMovesNum; ++i) {
+            PartitionMove move = getNextMove(info.beByTotalReplicaCount, info.partitionInfoBySkew);
+            if (move == null || !(applyMove(move, info.beByTotalReplicaCount, info.partitionInfoBySkew))) {
+                // 1. No replicas to move.
+                // 2. Apply to info failed, it's useless to get next move from the same info.
+                break;
+            }
+            moves.add(move);
+        }
+
+        return moves;
+    }
+
+    private PartitionMove getNextMove(TreeMultimap<Long, Long> beByTotalReplicaCount,
+                                      TreeMultimap<Long, PartitionBalanceInfo> skewMap) {
+        PartitionMove move = null;
+        if (skewMap.isEmpty() || beByTotalReplicaCount.isEmpty()) {
+            return null;
+        }
+        long maxPartitionSkew = skewMap.keySet().last();
+        long maxBeSkew = beByTotalReplicaCount.keySet().last() - beByTotalReplicaCount.keySet().first();
+
+        // 1. Every partition is perfectly balanced (maxPartitionSkew == 0), so any move would unbalance a partition and
+        // there is no potential for the greedy algorithm to balance the cluster.
+        // 2. Every partition is balanced (maxPartitionSkew <= 1) and the cluster as a whole is balanced (maxBeSkew <= 1).
+        if (maxPartitionSkew == 0L || (maxPartitionSkew <= 1L && maxBeSkew <= 1L)) {
+            return null;
+        }
+
+        // Among the partitions with maximum skew, attempt to pick a partition where there is
+        // a move that improves the partition skew and the cluster skew, if possible. If
+        // not, attempt to pick a move that improves the partition skew. If all partitions
+        // are balanced, attempt to pick a move that preserves partition balance and
+        // improves cluster skew.
+        NavigableSet<PartitionBalanceInfo> maxSet = skewMap.get(maxPartitionSkew);
+        for (PartitionBalanceInfo pbi : maxSet) {
+            Preconditions.checkArgument(!pbi.beByReplicaCount.isEmpty(), "no information on replicas of " +
+                    "partition " + pbi.partitionId + "-" + pbi.indexId);
+
+            Long minReplicaCount = pbi.beByReplicaCount.keySet().first();
+            Long maxReplicaCount = pbi.beByReplicaCount.keySet().last();
+            LOG.debug("balancing partition {}-{} with replica count skew {} (min_replica_count: {}, max_replica_count: {})",
+                    pbi.partitionId, pbi.indexId, maxPartitionSkew,
+                    minReplicaCount, maxReplicaCount);
+
+            // Compute the intersection of the BEs most loaded for this partition
+            // with the BEs most loaded overall, and likewise for the least loaded.
+            // These are our ideal candidates for moving from and to, respectively.
+            IntersectionResult maxLoaded = getIntersection(ExtremumType.MAX, pbi.beByReplicaCount, beByTotalReplicaCount);
+            IntersectionResult minLoaded = getIntersection(ExtremumType.MIN, pbi.beByReplicaCount, beByTotalReplicaCount);
+            LOG.debug("partition-wise: min_count: {}, max_count: {}", minLoaded.replicaCountPartition, maxLoaded.replicaCountPartition);
+            LOG.debug("cluster-wise: min_count: {}, max_count: {}", minLoaded.replicaCountTotal, maxLoaded.replicaCountTotal);
+            LOG.debug("min_loaded_intersection: {}, max_loaded_intersection: {}", minLoaded.intersection.toString(), maxLoaded.intersection.toString());
+
+            // Do not move replicas of a balanced partition if the least (most) loaded
+            // BEs overall do not intersect the BEs hosting the least (most)
+            // replicas of the partition. Moving a replica in that case might keep the
+            // cluster skew the same or make it worse while keeping the partition balanced.
+            if ((maxLoaded.replicaCountPartition <= minLoaded.replicaCountPartition + 1)
+                    && (minLoaded.intersection.isEmpty() || maxLoaded.intersection.isEmpty())) {
+                continue;
+            }
+
+            Long minLoadedBe, maxLoadedBe;
+            if (equalSkewOption == EqualSkewOption.PICK_FIRST) {
+                // The beWithExtremumCount and intersection lists are both in natural (ascending BE id) order
+                minLoadedBe = minLoaded.intersection.isEmpty() ? minLoaded.beWithExtremumCount.get(0) : minLoaded.intersection.get(0);
+                maxLoadedBe = maxLoaded.intersection.isEmpty() ? maxLoaded.beWithExtremumCount.get(0) : maxLoaded.intersection.get(0);
+            } else {
+                minLoadedBe = minLoaded.intersection.isEmpty() ? getRandomListElement(minLoaded.beWithExtremumCount)
+                        : getRandomListElement(minLoaded.intersection);
+                maxLoadedBe = maxLoaded.intersection.isEmpty() ? getRandomListElement(maxLoaded.beWithExtremumCount)
+                        : getRandomListElement(maxLoaded.intersection);
+            }
+
+            LOG.debug("min_loaded_be: {}, max_loaded_be: {}", minLoadedBe, maxLoadedBe);
+            if (minLoadedBe.equals(maxLoadedBe)) {
+                // Nothing to move.
+                continue;
+            }
+            // Move a replica of the selected partition from a most loaded server to a
+            // least loaded server.
+            move = new PartitionMove(pbi.partitionId, pbi.indexId, maxLoadedBe, minLoadedBe);
+            break;
+        }
+        return move;
+    }
+
+    public static <T> T getRandomListElement(List<T> items) {
+        Preconditions.checkArgument(!items.isEmpty());
+        return items.get(rand.nextInt(items.size()));
+    }
+
+    public static IntersectionResult getIntersection(ExtremumType extremumType, TreeMultimap<Long, Long> beByReplicaCount,
+                                                     TreeMultimap<Long, Long> beByTotalReplicaCount) {
+        Pair<Long, Set<Long>> beSelectedByPartition = getMinMaxLoadedServers(beByReplicaCount, extremumType);
+        Pair<Long, Set<Long>> beSelectedByTotal = getMinMaxLoadedServers(beByTotalReplicaCount, extremumType);
+        Preconditions.checkNotNull(beSelectedByPartition);
+        Preconditions.checkNotNull(beSelectedByTotal);
+
+        IntersectionResult res = new IntersectionResult();
+        res.replicaCountPartition = beSelectedByPartition.first;
+        res.replicaCountTotal = beSelectedByTotal.first;
+        res.beWithExtremumCount = Lists.newArrayList(beSelectedByPartition.second);
+        res.intersection = Lists.newArrayList(Sets.intersection(beSelectedByPartition.second, beSelectedByTotal.second));
+        return res;
+    }
+
+    private static Pair<Long, Set<Long>> getMinMaxLoadedServers(TreeMultimap<Long, Long> multimap, ExtremumType extremumType) {
+        if (multimap.isEmpty()) {
+            return null;
+        }
+        Long count = (extremumType == ExtremumType.MIN) ? multimap.keySet().first() : multimap.keySet().last();
+        return new Pair<>(count, multimap.get(count));
+    }
+
+    // Update the balance state in 'ClusterBalanceInfo' (the two maps) with the outcome of the move 'move'.
+    // To support applying in-progress moves to the current cluster balance info, the maps should not be modified if the apply fails.
+    public static boolean applyMove(PartitionMove move, TreeMultimap<Long, Long> beByTotalReplicaCount,
+                                    TreeMultimap<Long, PartitionBalanceInfo> skewMap) {
+        // Update the total counts
+        moveOneReplica(move.fromBe, move.toBe, beByTotalReplicaCount);
+
+        try {
+            PartitionBalanceInfo partitionBalanceInfo = null;
+            Long skew = -1L;
+            for (Long key : skewMap.keySet()) {
+                NavigableSet<PartitionBalanceInfo> pbiSet = skewMap.get(key);
+                List<PartitionBalanceInfo> pbis = pbiSet.stream().filter(info ->
+                        info.partitionId.equals(move.partitionId) && info.indexId.equals(move.indexId)).collect(Collectors.toList());
+                Preconditions.checkState(pbis.size() <= 1, "skew map has dup partition info");
+                if (pbis.size() == 1) {
+                    partitionBalanceInfo = pbis.get(0);
+                    skew = key;
+                    break;
+                }
+            }
+
+            Preconditions.checkState(skew != -1L, "partition is not in skew map");
+            PartitionBalanceInfo newInfo = new PartitionBalanceInfo(partitionBalanceInfo);
+            moveOneReplica(move.fromBe, move.toBe, newInfo.beByReplicaCount);
+
+            skewMap.remove(skew, partitionBalanceInfo);
+            long min_count = newInfo.beByReplicaCount.keySet().first();
+            long max_count = newInfo.beByReplicaCount.keySet().last();
+            skewMap.put(max_count - min_count, newInfo);
+        } catch (IllegalStateException e) {
+            // On IllegalStateException the skew map has not been modified, so roll back the move applied to beByTotalReplicaCount
+            moveOneReplica(move.toBe, move.fromBe, beByTotalReplicaCount);
+            LOG.info("{} apply failed, {}", move, e.getMessage());
+            return false;
+        } catch (Exception e) {
+            // Rolling back the move of beByTotalReplicaCount is meaningless here, because the skew map may already be modified
+            LOG.warn("got unexpected exception when applying {}, the skew may be broken. {}", move, e.toString());
+            throw e;
+        }
+        return true;
+    }
+
+    // Applies to 'm' a move of a replica from the BE with id 'fromBe' to the BE with id 'toBe' by decrementing
+    // the count of 'fromBe' and incrementing the count of 'toBe'.
+    // If any check fails, the map is not modified.
+    private static void moveOneReplica(Long fromBe, Long toBe, TreeMultimap<Long, Long> m) throws IllegalStateException {
+        boolean foundSrc = false;
+        boolean foundDst = false;
+        Long countSrc = 0L;
+        Long countDst = 0L;
+        for (Long key : m.keySet()) {
+            // Find the current replica counts of fromBe and toBe
+            Set<Long> values = m.get(key);
+            if (values.contains(fromBe)) {
+                foundSrc = true;
+                countSrc = key;
+            }
+            if (values.contains(toBe)) {
+                foundDst = true;
+                countDst = key;
+            }
+        }
+
+        Preconditions.checkState(foundSrc, "fromBe " + fromBe + " is not in the map");
+        Preconditions.checkState(foundDst, "toBe " + toBe + " is not in the map");
+        Preconditions.checkState(countSrc > 0, "fromBe has no replica in the map, can't move");
+
+        m.remove(countSrc, fromBe);
+        m.remove(countDst, toBe);
+        m.put(countSrc - 1, fromBe);
+        m.put(countDst + 1, toBe);
+    }
+}
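The greedy step implemented by getNextMove, applyMove and moveOneReplica above can be illustrated with a minimal, self-contained sketch. It is not the Doris code and rests on stated assumptions: a plain TreeMap<Long, TreeSet<Long>> (replica count -> BE ids) stands in for Guava's TreeMultimap, only a single partition is considered instead of the whole skew map, every BE (including ones with zero replicas) appears in the partition map, and ties are broken by taking the first candidate in natural order, as EqualSkewOption.PICK_FIRST does. The class and method names (GreedyStepSketch, index, firstCommon, nextMove) are hypothetical.

```java
import java.util.Arrays;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical, simplified sketch of one step of the two-dimensional greedy
// algorithm for a single partition. A TreeMap<Long, TreeSet<Long>> maps a
// replica count to the BE ids holding that many replicas (stand-in for TreeMultimap).
public class GreedyStepSketch {

    // Build the count -> BEs index from a per-BE replica count.
    static TreeMap<Long, TreeSet<Long>> index(Map<Long, Long> countByBe) {
        TreeMap<Long, TreeSet<Long>> m = new TreeMap<>();
        countByBe.forEach((be, count) -> m.computeIfAbsent(count, k -> new TreeSet<>()).add(be));
        return m;
    }

    // First BE (ascending id) in 'candidates' that is also in 'overall', or -1 if none.
    static long firstCommon(TreeSet<Long> candidates, TreeSet<Long> overall) {
        for (long be : candidates) {
            if (overall.contains(be)) {
                return be;
            }
        }
        return -1;
    }

    // Returns {fromBe, toBe}, or null when no move is worth making.
    static long[] nextMove(TreeMap<Long, TreeSet<Long>> total, TreeMap<Long, TreeSet<Long>> partition) {
        long partitionSkew = partition.lastKey() - partition.firstKey();
        long beSkew = total.lastKey() - total.firstKey();
        if (partitionSkew == 0 || (partitionSkew <= 1 && beSkew <= 1)) {
            return null;
        }
        TreeSet<Long> maxInPartition = partition.lastEntry().getValue();
        TreeSet<Long> minInPartition = partition.firstEntry().getValue();
        long maxCommon = firstCommon(maxInPartition, total.lastEntry().getValue());
        long minCommon = firstCommon(minInPartition, total.firstEntry().getValue());
        // Only touch an already balanced partition if the move also helps the cluster.
        if (partitionSkew <= 1 && (maxCommon < 0 || minCommon < 0)) {
            return null;
        }
        long from = maxCommon >= 0 ? maxCommon : maxInPartition.first();
        long to = minCommon >= 0 ? minCommon : minInPartition.first();
        return from == to ? null : new long[] {from, to};
    }

    private static void removeBe(TreeMap<Long, TreeSet<Long>> m, long count, long be) {
        TreeSet<Long> bes = m.get(count);
        bes.remove(be);
        if (bes.isEmpty()) {
            m.remove(count);
        }
    }

    // Decrement fromBe's count and increment toBe's; the map is unchanged if a check fails.
    static void moveOneReplica(long fromBe, long toBe, TreeMap<Long, TreeSet<Long>> m) {
        Long countSrc = null;
        Long countDst = null;
        for (Map.Entry<Long, TreeSet<Long>> e : m.entrySet()) {
            if (e.getValue().contains(fromBe)) countSrc = e.getKey();
            if (e.getValue().contains(toBe)) countDst = e.getKey();
        }
        if (countSrc == null || countDst == null || countSrc <= 0) {
            throw new IllegalStateException("cannot move a replica from " + fromBe + " to " + toBe);
        }
        removeBe(m, countSrc, fromBe);
        removeBe(m, countDst, toBe);
        m.computeIfAbsent(countSrc - 1, k -> new TreeSet<>()).add(fromBe);
        m.computeIfAbsent(countDst + 1, k -> new TreeSet<>()).add(toBe);
    }

    public static void main(String[] args) {
        // Layout of RebalanceTest: 3 partitions, one tablet each on 10001-10003, nothing on 10004.
        TreeMap<Long, TreeSet<Long>> total = index(Map.of(10001L, 3L, 10002L, 3L, 10003L, 3L, 10004L, 0L));
        TreeMap<Long, TreeSet<Long>> p0 = index(Map.of(10001L, 1L, 10002L, 1L, 10003L, 1L, 10004L, 0L));
        long[] move = nextMove(total, p0);
        System.out.println(Arrays.toString(move)); // [10001, 10004]
        moveOneReplica(move[0], move[1], total);
        moveOneReplica(move[0], move[1], p0);
        System.out.println(total); // {1=[10004], 2=[10001], 3=[10002, 10003]}
    }
}
```

Even though p0 is already balanced (skew 1), the move is taken because the most loaded BEs of the partition intersect the most loaded BEs overall (and likewise for the least loaded), so the cluster skew drops from 3 to 2 while the partition stays balanced.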
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
index c3ba37a..c9b590b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
@@ -1011,6 +1011,18 @@ public class Config extends ConfigBase {
     @ConfField(mutable = true, masterOnly = true)
     public static int max_balancing_tablets = 100;
 
+    // Rebalancer type (case-insensitive): BeLoad, Partition. If the type fails to parse, BeLoad is used as the default.
+    @ConfField(masterOnly = true)
+    public static String tablet_rebalancer_type = "BeLoad";
+
+    // Takes effect only when PartitionRebalancer is used. If this value changes, cached moves will be cleared.
+    @ConfField(mutable = true, masterOnly = true)
+    public static long partition_rebalance_move_expire_after_access = 600; // 600s
+
+    // Takes effect only when PartitionRebalancer is used.
+    @ConfField(mutable = true, masterOnly = true)
+    public static int partition_rebalance_max_moves_num_per_selection = 10;
+
     // This threshold is to avoid piling up too many report task in FE, which may cause OOM exception.
     // In some large Doris cluster, eg: 100 Backends with ten million replicas, a tablet report may cost
     // several seconds after some modification of metadata(drop partition, etc..).
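The comment on tablet_rebalancer_type says the value is matched case-insensitively against BeLoad and Partition, with BeLoad as the fallback. A minimal sketch of that parsing rule follows; the enum RebalancerType and the class RebalancerTypeSketch are hypothetical names, and trimming surrounding whitespace is an added assumption, not something the diff states.

```java
import java.util.Locale;

// Hypothetical sketch of resolving tablet_rebalancer_type as its comment describes:
// case-insensitive, falling back to BeLoad when the value cannot be parsed.
public class RebalancerTypeSketch {
    // Assumed enum; the real FE code may name or structure this differently.
    enum RebalancerType { BELOAD, PARTITION }

    static RebalancerType parse(String configured) {
        if (configured == null) {
            return RebalancerType.BELOAD;
        }
        try {
            return RebalancerType.valueOf(configured.trim().toUpperCase(Locale.ROOT));
        } catch (IllegalArgumentException e) {
            return RebalancerType.BELOAD; // unknown value: use the default
        }
    }

    public static void main(String[] args) {
        System.out.println(parse("Partition")); // PARTITION
        System.out.println(parse("beload"));    // BELOAD
        System.out.println(parse("random"));    // BELOAD
    }
}
```

Since tablet_rebalancer_type is declared with masterOnly = true but without mutable = true, it presumably has to be set in the master FE's fe.conf (for example `tablet_rebalancer_type = Partition`) before startup, whereas the two partition_rebalance_* options are mutable.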
diff --git a/fe/fe-core/src/test/java/org/apache/doris/clone/RebalanceTest.java b/fe/fe-core/src/test/java/org/apache/doris/clone/RebalanceTest.java
new file mode 100644
index 0000000..a6ea1e4
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/clone/RebalanceTest.java
@@ -0,0 +1,301 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.clone;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import mockit.Delegate;
+import mockit.Expectations;
+import mockit.Mocked;
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.DataProperty;
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.HashDistributionInfo;
+import org.apache.doris.catalog.KeysType;
+import org.apache.doris.catalog.MaterializedIndex;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Partition;
+import org.apache.doris.catalog.RangePartitionInfo;
+import org.apache.doris.catalog.Replica;
+import org.apache.doris.catalog.Tablet;
+import org.apache.doris.catalog.TabletInvertedIndex;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.FeConstants;
+import org.apache.doris.common.Pair;
+import org.apache.doris.common.jmockit.Deencapsulation;
+import org.apache.doris.system.SystemInfoService;
+import org.apache.doris.task.AgentBatchTask;
+import org.apache.doris.task.AgentTask;
+import org.apache.doris.task.CloneTask;
+import org.apache.doris.thrift.TFinishTaskRequest;
+import org.apache.doris.thrift.TStatus;
+import org.apache.doris.thrift.TStatusCode;
+import org.apache.doris.thrift.TStorageMedium;
+import org.apache.doris.thrift.TStorageType;
+import org.apache.doris.thrift.TTabletInfo;
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.core.config.Configurator;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+import java.util.stream.LongStream;
+
+import static com.google.common.collect.MoreCollectors.onlyElement;
+
+public class RebalanceTest {
+    private static final Logger LOG = LogManager.getLogger(RebalanceTest.class);
+
+    @Mocked
+    private Catalog catalog;
+
+    private long id = 10086;
+
+    private Database db;
+    private OlapTable olapTable;
+
+    private final SystemInfoService systemInfoService = new SystemInfoService();
+    private final TabletInvertedIndex invertedIndex = new TabletInvertedIndex();
+    private Map<String, ClusterLoadStatistic> statisticMap;
+
+    @Before
+    public void setUp() throws AnalysisException {
+        db = new Database(1, "test db");
+        db.setClusterName(SystemInfoService.DEFAULT_CLUSTER);
+        new Expectations() {
+            {
+                catalog.getDbIds();
+                minTimes = 0;
+                result = db.getId();
+
+                catalog.getDb(anyLong);
+                minTimes = 0;
+                result = db;
+
+                Catalog.getCurrentCatalogJournalVersion();
+                minTimes = 0;
+                result = FeConstants.meta_version;
+
+                catalog.getNextId();
+                minTimes = 0;
+                result = new Delegate() {
+                    long a() {
+                        return id++;
+                    }
+                };
+
+                Catalog.getCurrentSystemInfo();
+                minTimes = 0;
+                result = systemInfoService;
+
+                Catalog.getCurrentInvertedIndex();
+                minTimes = 0;
+                result = invertedIndex;
+
+                Catalog.getCurrentGlobalTransactionMgr().getTransactionIDGenerator().getNextTransactionId();
+                result = 111;
+
+                Catalog.getCurrentGlobalTransactionMgr().isPreviousTransactionsFinished(anyLong, anyLong, (List<Long>) any);
+                result = true;
+            }
+        };
+        // Test mock validation
+        Assert.assertEquals(111, Catalog.getCurrentGlobalTransactionMgr().getTransactionIDGenerator().getNextTransactionId());
+        Assert.assertTrue(Catalog.getCurrentGlobalTransactionMgr().isPreviousTransactionsFinished(1, 2, Lists.newArrayList(3L)));
+
+        List<Long> beIds = Lists.newArrayList(10001L, 10002L, 10003L, 10004L);
+        beIds.forEach(id -> systemInfoService.addBackend(RebalancerTestUtil.createBackend(id, 2048, 0)));
+
+        olapTable = new OlapTable(2, "fake table", new ArrayList<>(), KeysType.DUP_KEYS,
+                new RangePartitionInfo(), new HashDistributionInfo());
+        db.createTable(olapTable);
+
+        // 1 table, 3 partitions p0,p1,p2
+        MaterializedIndex materializedIndex = new MaterializedIndex(olapTable.getId(), null);
+        createPartitionsForTable(olapTable, materializedIndex, 3L);
+        olapTable.setIndexMeta(materializedIndex.getId(), "fake index", Lists.newArrayList(new Column()),
+                0, 0, (short) 0, TStorageType.COLUMN, KeysType.DUP_KEYS);
+
+        // Tablet distribution: we add them to olapTable & build invertedIndex manually
+        RebalancerTestUtil.createTablet(invertedIndex, db, olapTable, "p0", TStorageMedium.HDD,
+                50000, Lists.newArrayList(10001L, 10002L, 10003L));
+
+        RebalancerTestUtil.createTablet(invertedIndex, db, olapTable, "p1", TStorageMedium.HDD,
+                60000, Lists.newArrayList(10001L, 10002L, 10003L));
+
+        RebalancerTestUtil.createTablet(invertedIndex, db, olapTable, "p2", TStorageMedium.HDD,
+                70000, Lists.newArrayList(10001L, 10002L, 10003L));
+
+        // be4(10004) doesn't have any replica
+
+        generateStatisticMap();
+    }
+
+    private void generateStatisticMap() {
+        ClusterLoadStatistic loadStatistic = new ClusterLoadStatistic(SystemInfoService.DEFAULT_CLUSTER,
+                systemInfoService, invertedIndex);
+        loadStatistic.init();
+        statisticMap = Maps.newConcurrentMap();
+        statisticMap.put(SystemInfoService.DEFAULT_CLUSTER, loadStatistic);
+    }
+
+    private void createPartitionsForTable(OlapTable olapTable, MaterializedIndex index, Long partitionCount) {
+        // partition ids start from 31
+        LongStream.range(0, partitionCount).forEach(idx -> {
+            long id = 31 + idx;
+            Partition partition = new Partition(id, "p" + idx, index, new HashDistributionInfo());
+            olapTable.addPartition(partition);
+            olapTable.getPartitionInfo().addPartition(id, new DataProperty(TStorageMedium.HDD), (short) 3, false);
+        });
+
+    }
+
+    @Test
+    public void testPartitionRebalancer() {
+        Configurator.setLevel("org.apache.doris.clone.PartitionRebalancer", Level.DEBUG);
+
+        // Prevent the scheduler's rebalancer from adding balance tasks; balance tasks are added manually below
+        Config.disable_balance = true;
+        // Create a new scheduler & checker for redundant tablets handling
+        // Call runAfterCatalogReady manually instead of starting daemon thread
+        TabletSchedulerStat stat = new TabletSchedulerStat();
+        PartitionRebalancer rebalancer = new PartitionRebalancer(Catalog.getCurrentSystemInfo(), Catalog.getCurrentInvertedIndex());
+        TabletScheduler tabletScheduler = new TabletScheduler(catalog, systemInfoService, invertedIndex, stat, "");
+        // Make the scheduler use this rebalancer, so that getToDeleteReplicaId is handled by it
+        Deencapsulation.setField(tabletScheduler, "rebalancer", rebalancer);
+
+        TabletChecker tabletChecker = new TabletChecker(catalog, systemInfoService, tabletScheduler, stat);
+
+        rebalancer.updateLoadStatistic(statisticMap);
+        List<TabletSchedCtx> alternativeTablets = rebalancer.selectAlternativeTablets();
+
+        // Run once to update slot info; the scheduler won't select balance tasks because balance is disabled
+        tabletScheduler.runAfterCatalogReady();
+
+        AgentBatchTask batchTask = new AgentBatchTask();
+        for (TabletSchedCtx tabletCtx : alternativeTablets) {
+            LOG.info("try to schedule tablet {}", tabletCtx.getTabletId());
+            try {
+                tabletCtx.setStorageMedium(TStorageMedium.HDD);
+                tabletCtx.setTablet(olapTable.getPartition(tabletCtx.getPartitionId()).getIndex(tabletCtx.getIndexId()).getTablet(tabletCtx.getTabletId()));
+                tabletCtx.setVersionInfo(1, 0, 1, 0);
+                tabletCtx.setSchemaHash(olapTable.getSchemaHashByIndexId(tabletCtx.getIndexId()));
+                tabletCtx.setTabletStatus(Tablet.TabletStatus.HEALTHY); // rebalance tablet should be healthy first
+
+                // Creating the balance task (via createCloneReplicaAndTask) also adds a replica to invertedIndex.
+                rebalancer.createBalanceTask(tabletCtx, tabletScheduler.getBackendsWorkingSlots(), batchTask);
+            } catch (SchedException e) {
+                LOG.warn("schedule tablet {} failed: {}", tabletCtx.getTabletId(), e.getMessage());
+            }
+        }
+
+        // Show debug details of the MoveInProgressMap
+        rebalancer.updateLoadStatistic(statisticMap);
+        rebalancer.selectAlternativeTablets();
+
+        // Get created tasks, and finish them manually
+        List<AgentTask> tasks = batchTask.getAllTasks();
+        List<Long> needCheckTablets = tasks.stream().map(AgentTask::getTabletId).collect(Collectors.toList());
+        LOG.info("created tasks for tablet: {}", needCheckTablets);
+        needCheckTablets.forEach(t -> Assert.assertEquals(4, invertedIndex.getReplicasByTabletId(t).size()));
+
+//        // If clone task execution is too slow, tabletChecker may want to delete the CLONE replica.
+//        tabletChecker.runAfterCatalogReady();
+//        Assert.assertTrue(tabletScheduler.containsTablet(50000));
+//        // tabletScheduler handle redundant
+//        tabletScheduler.runAfterCatalogReady();
+
+        for (Long tabletId : needCheckTablets) {
+            TabletSchedCtx tabletSchedCtx = alternativeTablets.stream().filter(ctx -> ctx.getTabletId() == tabletId).collect(onlyElement());
+            AgentTask task = tasks.stream().filter(t -> t.getTabletId() == tabletId).collect(onlyElement());
+
+            LOG.info("try to finish tabletCtx {}", tabletId);
+            try {
+                TFinishTaskRequest fakeReq = new TFinishTaskRequest();
+                fakeReq.task_status = new TStatus(TStatusCode.OK);
+                fakeReq.finish_tablet_infos = Lists.newArrayList(new TTabletInfo(tabletSchedCtx.getTabletId(), 5, 1, 0, 0, 0));
+                tabletSchedCtx.finishCloneTask((CloneTask) task, fakeReq);
+            } catch (SchedException e) {
+                e.printStackTrace();
+            }
+        }
+
+        // The tablets in needCheckTablets are now redundant; TabletChecker will add them to TabletScheduler
+        tabletChecker.runAfterCatalogReady();
+        needCheckTablets.forEach(t -> Assert.assertEquals(4, invertedIndex.getReplicasByTabletId(t).size()));
+        needCheckTablets.forEach(t -> Assert.assertTrue(tabletScheduler.containsTablet(t)));
+
+        // TabletScheduler handle redundant tablet
+        tabletScheduler.runAfterCatalogReady();
+
+        // One replica is set to DECOMMISSION, still 4 replicas
+        needCheckTablets.forEach(t -> {
+            List<Replica> replicas = invertedIndex.getReplicasByTabletId(t);
+            Assert.assertEquals(4, replicas.size());
+            Replica decommissionedReplica = replicas.stream().filter(r -> r.getState() == Replica.ReplicaState.DECOMMISSION).collect(onlyElement());
+            // expected watermarkTxnId is 111
+            Assert.assertEquals(111, decommissionedReplica.getWatermarkTxnId());
+        });
+
+        // Deleting the replica should update invertedIndex too
+        tabletScheduler.runAfterCatalogReady();
+        needCheckTablets.forEach(t -> Assert.assertEquals(3, invertedIndex.getReplicasByTabletId(t).size()));
+
+        // Check moves completed
+        rebalancer.selectAlternativeTablets();
+        rebalancer.updateLoadStatistic(statisticMap);
+        AtomicLong succeeded = Deencapsulation.getField(rebalancer, "counterBalanceMoveSucceeded");
+        Assert.assertEquals(needCheckTablets.size(), succeeded.get());
+    }
+
+    @Test
+    public void testMoveInProgressMap() {
+        Configurator.setLevel("org.apache.doris.clone.MovesInProgressCache", Level.DEBUG);
+        MovesCacheMap m = new MovesCacheMap();
+        m.updateMapping(statisticMap, 3);
+        m.getCache(SystemInfoService.DEFAULT_CLUSTER, TStorageMedium.HDD).get().put(1L, new Pair<>(null, -1L));
+        m.getCache(SystemInfoService.DEFAULT_CLUSTER, TStorageMedium.SSD).get().put(2L, new Pair<>(null, -1L));
+        m.getCache(SystemInfoService.DEFAULT_CLUSTER, TStorageMedium.SSD).get().put(3L, new Pair<>(null, -1L));
+        // maintain() won't remove entries that have not expired yet
+        m.maintain();
+        Assert.assertEquals(3, m.size());
+
+        // Changing expireAfterAccess clears the whole cache map.
+        m.updateMapping(statisticMap, 1);
+        Assert.assertEquals(0, m.size());
+
+        m.getCache(SystemInfoService.DEFAULT_CLUSTER, TStorageMedium.SSD).get().put(3L, new Pair<>(null, -1L));
+        try {
+            Thread.sleep(1000);
+            m.maintain();
+            Assert.assertEquals(0, m.size());
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        }
+    }
+}
+
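testMoveInProgressMap relies on expire-after-access semantics: entries survive maintain() while they are fresh, changing the expiration clears everything, and an entry untouched for the whole window is evicted. The sketch below models only those observable behaviours with plain maps and an injectable clock so it can be checked without sleeping. ExpiringMovesSketch and its methods are hypothetical and are not how MovesCacheMap is implemented.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.function.LongSupplier;

// Hypothetical sketch of an expire-after-access cache of in-progress moves
// (tablet id -> move). Not the MovesCacheMap implementation.
public class ExpiringMovesSketch<V> {
    private final LongSupplier clockSeconds; // injectable clock, handy for tests
    private long expireAfterAccessSec;
    private final Map<Long, V> moves = new HashMap<>();
    private final Map<Long, Long> lastAccessSec = new HashMap<>();

    public ExpiringMovesSketch(long expireAfterAccessSec, LongSupplier clockSeconds) {
        this.expireAfterAccessSec = expireAfterAccessSec;
        this.clockSeconds = clockSeconds;
    }

    public void put(long tabletId, V move) {
        moves.put(tabletId, move);
        lastAccessSec.put(tabletId, clockSeconds.getAsLong());
    }

    // Reading an entry refreshes its access time.
    public V get(long tabletId) {
        V move = moves.get(tabletId);
        if (move != null) {
            lastAccessSec.put(tabletId, clockSeconds.getAsLong());
        }
        return move;
    }

    // Changing the expiration drops every cached move, mirroring the comment on
    // partition_rebalance_move_expire_after_access.
    public void updateExpireAfterAccess(long newExpireSec) {
        if (newExpireSec != expireAfterAccessSec) {
            moves.clear();
            lastAccessSec.clear();
            expireAfterAccessSec = newExpireSec;
        }
    }

    // Evict entries that have not been accessed for at least expireAfterAccessSec.
    public void maintain() {
        long now = clockSeconds.getAsLong();
        Iterator<Map.Entry<Long, Long>> it = lastAccessSec.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, Long> e = it.next();
            if (now - e.getValue() >= expireAfterAccessSec) {
                moves.remove(e.getKey());
                it.remove();
            }
        }
    }

    public int size() {
        return moves.size();
    }
}
```

Injecting the clock as a LongSupplier is a design choice for the sketch: it replaces the Thread.sleep(1000) in the test with a deterministic time step.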
diff --git a/fe/fe-core/src/test/java/org/apache/doris/clone/RebalancerTestUtil.java b/fe/fe-core/src/test/java/org/apache/doris/clone/RebalancerTestUtil.java
new file mode 100644
index 0000000..6755f03
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/clone/RebalancerTestUtil.java
@@ -0,0 +1,89 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.clone;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.DiskInfo;
+import org.apache.doris.catalog.MaterializedIndex;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Partition;
+import org.apache.doris.catalog.Replica;
+import org.apache.doris.catalog.Tablet;
+import org.apache.doris.catalog.TabletInvertedIndex;
+import org.apache.doris.catalog.TabletMeta;
+import org.apache.doris.system.Backend;
+import org.apache.doris.system.SystemInfoService;
+import org.apache.doris.thrift.TStorageMedium;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.IntStream;
+
+public class RebalancerTestUtil {
+
+    // Add only one path, whose path hash is set to the backend id
+    public static Backend createBackend(long id, long totalCap, long usedCap) {
+        // ip:port won't be checked
+        Backend be = new Backend(id, "192.168.0." + id, 9051);
+        Map<String, DiskInfo> disks = Maps.newHashMap();
+        DiskInfo diskInfo = new DiskInfo("/path1");
+        diskInfo.setPathHash(id);
+        diskInfo.setTotalCapacityB(totalCap);
+        diskInfo.setDataUsedCapacityB(usedCap);
+        disks.put(diskInfo.getRootPath(), diskInfo);
+        be.setDisks(ImmutableMap.copyOf(disks));
+        be.setAlive(true);
+        be.setOwnerClusterName(SystemInfoService.DEFAULT_CLUSTER);
+        return be;
+    }
+
+    // Create one tablet (and its replicas) for one partition. The replicas will be created on the backends listed in beIds.
+    // The tablet will be added to TabletInvertedIndex & OlapTable.
+    // Only the partition's base index is used, for simplicity.
+    public static void createTablet(TabletInvertedIndex invertedIndex, Database db, OlapTable olapTable, String partitionName, TStorageMedium medium,
+                                    int tabletId, List<Long> beIds) {
+        Partition partition = olapTable.getPartition(partitionName);
+        MaterializedIndex baseIndex = partition.getBaseIndex();
+        int schemaHash = olapTable.getSchemaHashByIndexId(baseIndex.getId());
+
+        TabletMeta tabletMeta = new TabletMeta(db.getId(), olapTable.getId(), partition.getId(), baseIndex.getId(),
+                schemaHash, medium);
+        Tablet tablet = new Tablet(tabletId);
+
+        // add tablet to the base index of the given partition
+        baseIndex.addTablet(tablet, tabletMeta);
+        createReplicasAndAddToIndex(invertedIndex, tabletMeta, tablet, beIds);
+    }
+
+    // Create replicas on the backends listed in beIds.
+    // The tablet & replicas will be added to invertedIndex.
+    public static void createReplicasAndAddToIndex(TabletInvertedIndex invertedIndex, TabletMeta tabletMeta, Tablet tablet, List<Long> beIds) {
+        invertedIndex.addTablet(tablet.getId(), tabletMeta);
+
+        IntStream.range(0, beIds.size()).forEach(i -> {
+            Replica replica = new Replica(tablet.getId() + i, beIds.get(i), Replica.ReplicaState.NORMAL, 1, 0, tabletMeta.getOldSchemaHash());
+            // We've set pathHash to beId for simplicity
+            replica.setPathHash(beIds.get(i));
+            // isRestore set true, to avoid modifying Catalog.getCurrentInvertedIndex
+            tablet.addReplica(replica, true);
+            invertedIndex.addReplica(tablet.getId(), replica);
+        });
+    }
+}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/clone/TwoDimensionalGreedyRebalanceAlgoTest.java b/fe/fe-core/src/test/java/org/apache/doris/clone/TwoDimensionalGreedyRebalanceAlgoTest.java
new file mode 100644
index 0000000..f36f601
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/clone/TwoDimensionalGreedyRebalanceAlgoTest.java
@@ -0,0 +1,299 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.clone;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Ordering;
+import com.google.common.collect.Sets;
+import com.google.common.collect.TreeMultimap;
+import org.apache.doris.catalog.TabletInvertedIndex.PartitionBalanceInfo;
+import org.apache.doris.clone.TwoDimensionalGreedyRebalanceAlgo.PartitionMove;
+import org.apache.doris.clone.PartitionRebalancer.ClusterBalanceInfo;
+import org.apache.doris.common.Pair;
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.core.config.Configurator;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.IntStream;
+
+public class TwoDimensionalGreedyRebalanceAlgoTest {
+    private static final Logger LOG = LogManager.getLogger(TwoDimensionalGreedyRebalanceAlgoTest.class);
+
+    TwoDimensionalGreedyRebalanceAlgo algo = new TwoDimensionalGreedyRebalanceAlgo(TwoDimensionalGreedyRebalanceAlgo.EqualSkewOption.PICK_FIRST);
+
+    // Structure to describe rebalancing-related state of the cluster expressively
+    // enough for the tests.
+    private static class TestClusterConfig {
+        static class PartitionPerBeReplicas {
+            Long partitionId;
+            Long indexId;
+
+            // Number of replicas of this partition on each server in the cluster.
+            // By definition, the indices in this container correspond to indices
+            // in TestClusterConfig.beIds.
+            List<Long> numReplicasByServer;
+
+            PartitionPerBeReplicas(Long p, Long i, List<Long> l) {
+                this.partitionId = p;
+                this.indexId = i;
+                this.numReplicasByServer = l;
+            }
+        }
+
+        // IDs of bes; every element must be unique.
+        List<Long> beIds = Lists.newArrayList();
+
+        // Distribution of partition replicas across the bes. The following
+        // constraints should be in place:
+        //   * for each p in partitionReplicas:
+        //       p.numReplicasByServer.size() == beIds.size()
+        List<PartitionPerBeReplicas> partitionReplicas = Lists.newArrayList();
+
+        // The expected replica movements: the reference output of the algorithm
+        // to compare with.
+        List<PartitionMove> expectedMoves = Lists.newArrayList();
+
+        // TODO MovesOrderingComparison: options controlling how the reference and the actual results are compared.
+        // PartitionBalanceInfos in the skew map are kept in arbitrary order, so the moves are not deterministic
+        // when more than one partition has the max skew.
+    }
+
+    // Transform the definition of the test cluster into the ClusterBalanceInfo
+    // that is consumed by the rebalancing algorithm.
+    private ClusterBalanceInfo ClusterConfigToClusterBalanceInfo(TestClusterConfig tcc) {
+        // First verify that the configuration of the test cluster is valid.
+        Set<Pair<Long, Long>> partitionIds = Sets.newHashSet();
+        for (TestClusterConfig.PartitionPerBeReplicas p : tcc.partitionReplicas) {
+            Assert.assertEquals(tcc.beIds.size(), p.numReplicasByServer.size());
+            partitionIds.add(new Pair<>(p.partitionId, p.indexId));
+        }
+        Assert.assertEquals(partitionIds.size(), tcc.partitionReplicas.size());
+
+        // Check that the BE IDs are unique.
+        Set<Long> beIdSet = new HashSet<>(tcc.beIds);
+        Assert.assertEquals(tcc.beIds.size(), beIdSet.size());
+
+        ClusterBalanceInfo balance = new ClusterBalanceInfo();
+
+        for (int beIdx = 0; beIdx < tcc.beIds.size(); ++beIdx) {
+            // Total replica count on this BE.
+            long count = 0;
+            for (TestClusterConfig.PartitionPerBeReplicas p : tcc.partitionReplicas) {
+                count += p.numReplicasByServer.get(beIdx);
+            }
+            balance.beByTotalReplicaCount.put(count, tcc.beIds.get(beIdx));
+        }
+
+        for (int pIdx = 0; pIdx < tcc.partitionReplicas.size(); ++pIdx) {
+            // Replicas of the current partition per be.
+            TestClusterConfig.PartitionPerBeReplicas distribution = tcc.partitionReplicas.get(pIdx);
+            PartitionBalanceInfo info = new PartitionBalanceInfo(distribution.partitionId, distribution.indexId);
+            List<Long> replicaCount = distribution.numReplicasByServer;
+            IntStream.range(0, replicaCount.size()).forEach(i -> info.beByReplicaCount.put(replicaCount.get(i), tcc.beIds.get(i)));
+
+            Long maxCount = info.beByReplicaCount.keySet().last();
+            Long minCount = info.beByReplicaCount.keySet().first();
+            Assert.assertTrue(maxCount >= minCount);
+            balance.partitionInfoBySkew.put(maxCount - minCount, info);
+        }
+        return balance;
+    }
+
+    private void verifyMoves(List<TestClusterConfig> configs) {
+        for (TestClusterConfig config : configs) {
+            List<PartitionMove> moves = algo.getNextMoves(ClusterConfigToClusterBalanceInfo(config), 0);
+            // JUnit expects (expected, actual) argument order.
+            Assert.assertEquals(config.expectedMoves, moves);
+        }
+    }
+
+    @Before
+    public void setUp() {
+        Configurator.setLevel(TwoDimensionalGreedyRebalanceAlgo.class.getName(), Level.WARN);
+    }
+
+    @Test
+    public void testApplyMoveFailed() {
+        PartitionMove move = new PartitionMove(11L, 22L, 10001L, 10002L);
+        // total count is valid
+        TreeMultimap<Long, Long> beByTotalReplicaCount = TreeMultimap.create();
+        beByTotalReplicaCount.put(10L, 10001L);
+        beByTotalReplicaCount.put(10L, 10002L);
+        // no info of partition
+        TreeMultimap<Long, PartitionBalanceInfo> skewMap = TreeMultimap.create(Ordering.natural(), Ordering.arbitrary());
+        try {
+            TwoDimensionalGreedyRebalanceAlgo.applyMove(move, beByTotalReplicaCount, skewMap);
+            Assert.fail("applyMove should throw IllegalStateException");
+        } catch (Exception e) {
+            Assert.assertSame(IllegalStateException.class, e.getClass());
+            LOG.info(e.getMessage());
+        }
+        // beByTotalReplicaCount should be left unchanged by the failed move
+        Assert.assertEquals(0, beByTotalReplicaCount.keySet().stream().filter(count -> count != 10L).count());
+
+        // invalid info of partition
+        skewMap.put(6L, new PartitionBalanceInfo(11L, 22L));
+        try {
+            TwoDimensionalGreedyRebalanceAlgo.applyMove(move, beByTotalReplicaCount, skewMap);
+            Assert.fail("applyMove should throw IllegalStateException");
+        } catch (Exception e) {
+            Assert.assertSame(IllegalStateException.class, e.getClass());
+            LOG.warn(e.getMessage());
+        }
+        // beByTotalReplicaCount should be left unchanged by the failed move
+        Assert.assertEquals(0, beByTotalReplicaCount.keySet().stream().filter(count -> count != 10L).count());
+    }
+
+    @Test
+    public void testInvalidClusterBalanceInfo() {
+        Configurator.setLevel(TwoDimensionalGreedyRebalanceAlgo.class.getName(), Level.DEBUG);
+        try {
+            algo.getNextMoves(new ClusterBalanceInfo(), 0);
+        } catch (Exception e) {
+            Assert.fail();
+        }
+
+        try {
+            algo.getNextMoves(new ClusterBalanceInfo() {{
+                beByTotalReplicaCount.put(0L, 10001L);
+            }}, 0);
+        } catch (Exception e) {
+            Assert.fail();
+        }
+
+        try {
+            // Invalid balance info will cause IllegalStateException
+            algo.getNextMoves(new ClusterBalanceInfo() {
+                {
+                    beByTotalReplicaCount.put(0L, 10001L);
+                    beByTotalReplicaCount.put(1L, 10002L);
+                }
+            }, 0);
+            Assert.fail("getNextMoves should throw IllegalStateException");
+        } catch (Exception e) {
+            Assert.assertSame(IllegalStateException.class, e.getClass());
+            LOG.info(e.getMessage());
+        }
+    }
+
+    // Partition- and cluster-wise balanced configurations with a skew of at most one.
+    // The algorithm does not take tablet health into account.
+    @Test
+    public void testAlreadyBalanced() {
+        List<TestClusterConfig> configs = Lists.newArrayList(
+                // A single be with a single replica of the only partition.
+                new TestClusterConfig() {{
+                    beIds.add(10001L);
+                    partitionReplicas.add(new PartitionPerBeReplicas(22L, 33L, Lists.newArrayList(1L)));
+                    // expectedMoves is empty
+                }},
+                // A single be in the cluster that hosts all replicas.
+                new TestClusterConfig() {{
+                    beIds.add(10001L);
+                    partitionReplicas.add(new PartitionPerBeReplicas(22L, 33L, Lists.newArrayList(1L)));
+                    partitionReplicas.add(new PartitionPerBeReplicas(22L, 44L, Lists.newArrayList(10L)));
+                    partitionReplicas.add(new PartitionPerBeReplicas(22L, 55L, Lists.newArrayList(10L)));
+                }},
+                // A single partition on 2 BEs, with 100 and 99 replicas respectively.
+                new TestClusterConfig() {{
+                    beIds.add(10001L);
+                    beIds.add(10002L);
+                    partitionReplicas.add(new PartitionPerBeReplicas(22L, 33L, Lists.newArrayList(100L, 99L)));
+                }}
+        );
+        verifyMoves(configs);
+    }
+
+    // TODO: add these tests once MovesOrderingComparison is supported.
+    // Set of scenarios where the distribution of replicas is partition-wise balanced
+    // but not yet cluster-wise balanced, requiring just a few replica moves
+    // to achieve both partition- and cluster-wise balance.
+
+    // TODO: add more tests once MovesOrderingComparison is supported.
+    // Set of scenarios where the distribution of partition replicas is cluster-wise
+    // balanced but not partition-wise balanced, requiring just a few moves to make it
+    // both partition- and cluster-wise balanced.
+    @Test
+    public void testClusterWiseBalanced() {
+        List<TestClusterConfig> configs = Lists.newArrayList(
+                new TestClusterConfig() {{
+                    beIds.add(10001L);
+                    beIds.add(10002L);
+                    partitionReplicas.add(new PartitionPerBeReplicas(22L, 33L, Lists.newArrayList(2L, 0L)));
+                    partitionReplicas.add(new PartitionPerBeReplicas(22L, 44L, Lists.newArrayList(1L, 2L)));
+                    expectedMoves.add(new PartitionMove(22L, 33L, 10001L, 10002L));
+                }}
+        );
+        verifyMoves(configs);
+    }
+
+    // Configurations that are unbalanced (both partition- and cluster-wise) but simple
+    // enough to balance by moving just a few replicas.
+    @Test
+    public void testFewMoves() {
+        List<TestClusterConfig> configs = Lists.newArrayList(
+                new TestClusterConfig() {{
+                    beIds.add(10001L);
+                    beIds.add(10002L);
+                    partitionReplicas.add(new PartitionPerBeReplicas(22L, 33L, Lists.newArrayList(2L, 0L)));
+                    expectedMoves.add(new PartitionMove(22L, 33L, 10001L, 10002L));
+                }},
+                new TestClusterConfig() {{
+                    beIds.add(10001L);
+                    beIds.add(10002L);
+                    partitionReplicas.add(new PartitionPerBeReplicas(22L, 33L, Lists.newArrayList(3L, 0L)));
+                    expectedMoves.add(new PartitionMove(22L, 33L, 10001L, 10002L));
+                }},
+                new TestClusterConfig() {{
+                    beIds.add(10001L);
+                    beIds.add(10002L);
+                    partitionReplicas.add(new PartitionPerBeReplicas(22L, 33L, Lists.newArrayList(4L, 0L)));
+                    expectedMoves.add(new PartitionMove(22L, 33L, 10001L, 10002L));
+                    expectedMoves.add(new PartitionMove(22L, 33L, 10001L, 10002L));
+                }}
+        );
+        verifyMoves(configs);
+    }
+
+    // Configurations that are unbalanced (both partition- and cluster-wise) and simple,
+    // but that need many replica moves to become balanced.
+    @Test
+    public void testManyMoves() {
+        List<TestClusterConfig> configs = Lists.newArrayList(
+                new TestClusterConfig() {{
+                    beIds.add(10001L);
+                    beIds.add(10002L);
+                    beIds.add(10003L);
+                    partitionReplicas.add(new PartitionPerBeReplicas(22L, 33L, Lists.newArrayList(100L, 400L, 100L)));
+                    for (int i = 0; i < 200; i++) {
+                        if (i % 2 == 1) {
+                            expectedMoves.add(new PartitionMove(22L, 33L, 10002L, 10003L));
+                        } else {
+                            expectedMoves.add(new PartitionMove(22L, 33L, 10002L, 10001L));
+                        }
+                    }
+
+                }}
+        );
+        verifyMoves(configs);
+    }
+}
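
When only one partition is involved, the expectations in testFewMoves and testManyMoves can be reproduced by a much simpler greedy loop. The block below is a minimal sketch under that single-partition assumption. It is not the TwoDimensionalGreedyRebalanceAlgo implementation. SinglePartitionGreedySketch, Move, skew and balance are hypothetical names. Ties are broken by taking the first BE in list order, which loosely mirrors EqualSkewOption.PICK_FIRST. The loop stops as soon as the skew (max minus min replica count, as computed in ClusterConfigToClusterBalanceInfo) is at most 1.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SinglePartitionGreedySketch {
    // Hypothetical move record: one replica of the partition moved from fromBe to toBe.
    static final class Move {
        final long fromBe;
        final long toBe;

        Move(long fromBe, long toBe) {
            this.fromBe = fromBe;
            this.toBe = toBe;
        }

        @Override
        public String toString() {
            return fromBe + " -> " + toBe;
        }
    }

    // Skew of one partition: max minus min replica count across BEs.
    static long skew(long[] counts) {
        if (counts.length == 0) {
            return 0;
        }
        long max = counts[0];
        long min = counts[0];
        for (long c : counts) {
            max = Math.max(max, c);
            min = Math.min(min, c);
        }
        return max - min;
    }

    // Greedy loop (single partition only): move one replica from the BE with the
    // most replicas to the BE with the fewest until the skew is at most 1.
    // Strict comparisons mean ties pick the first BE in list order.
    static List<Move> balance(List<Long> beIds, long[] replicaCounts) {
        long[] counts = replicaCounts.clone();
        List<Move> moves = new ArrayList<>();
        while (skew(counts) > 1) {
            int maxIdx = 0;
            int minIdx = 0;
            for (int i = 1; i < counts.length; i++) {
                if (counts[i] > counts[maxIdx]) {
                    maxIdx = i;
                }
                if (counts[i] < counts[minIdx]) {
                    minIdx = i;
                }
            }
            counts[maxIdx]--;
            counts[minIdx]++;
            moves.add(new Move(beIds.get(maxIdx), beIds.get(minIdx)));
        }
        return moves;
    }

    public static void main(String[] args) {
        List<Move> moves = balance(Arrays.asList(10001L, 10002L, 10003L), new long[] {100, 400, 100});
        System.out.println(moves.size() + " moves, first two: " + moves.get(0) + ", " + moves.get(1));
    }
}
```

For (100, 400, 100) the sketch yields 200 moves. These alternate between 10002 -> 10001 and 10002 -> 10003 and end at (200, 200, 200), the same sequence that testManyMoves builds into expectedMoves. For (4, 0) it yields the two moves expected in testFewMoves, and for (100, 99) it yields none. The two-dimensional algorithm additionally weighs the per-BE total replica counts across all partitions, which this sketch ignores.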

