You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ca...@apache.org on 2022/11/23 08:02:26 UTC

[iotdb] branch master updated: [IOTDB-4983] Greedy leader balancing policy (#8057)

This is an automated email from the ASF dual-hosted git repository.

caogaofei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 29b7f48f05 [IOTDB-4983] Greedy leader balancing policy (#8057)
29b7f48f05 is described below

commit 29b7f48f05ad67187189fef0012477a32e9657eb
Author: YongzaoDan <33...@users.noreply.github.com>
AuthorDate: Wed Nov 23 16:02:20 2022 +0800

    [IOTDB-4983] Greedy leader balancing policy (#8057)
---
 .../iotdb/confignode/conf/ConfigNodeConfig.java    |  27 +-
 .../confignode/conf/ConfigNodeDescriptor.java      |  32 +-
 .../confignode/conf/ConfigNodeStartupCheck.java    |  18 +-
 .../manager/load/balancer/RegionBalancer.java      |   1 +
 .../manager/load/balancer/RouteBalancer.java       | 117 ++++---
 .../router/leader/GreedyLeaderBalancer.java        | 185 +++++++++++
 .../{IRouter.java => leader/ILeaderBalancer.java}  |  30 +-
 .../MinCostFlowLeaderBalancer.java}                | 100 ++++--
 .../manager/load/balancer/router/mcf/MCFEdge.java  |  34 --
 .../GreedyPriorityBalancer.java}                   |  10 +-
 .../IPriorityBalancer.java}                        |  24 +-
 .../LeaderPriorityBalancer.java}                   |  10 +-
 .../router/leader/GreedyLeaderBalancerTest.java    | 146 ++++++++
 .../leader/LeaderBalancerComparisonTest.java       | 369 +++++++++++++++++++++
 .../MinCostFlowLeaderBalancerTest.java}            |  33 +-
 .../GreedyPriorityTest.java}                       |   8 +-
 .../LeaderPriorityBalancerTest.java}               |   7 +-
 .../java/org/apache/iotdb/it/env/AbstractEnv.java  |   9 +
 .../it/IoTDBClusterRegionLeaderBalancingIT.java    |  26 +-
 .../iotdb/confignode/it/IoTDBClusterRestartIT.java |   8 +-
 .../resources/conf/iotdb-common.properties         |  15 +-
 .../impl/DataNodeInternalRPCServiceImpl.java       |  39 ++-
 thrift/src/main/thrift/datanode.thrift             |   6 +-
 23 files changed, 1016 insertions(+), 238 deletions(-)

diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
index dd98569d05..d76920f635 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
@@ -21,7 +21,8 @@ package org.apache.iotdb.confignode.conf;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.confignode.manager.load.balancer.RegionBalancer;
-import org.apache.iotdb.confignode.manager.load.balancer.RouteBalancer;
+import org.apache.iotdb.confignode.manager.load.balancer.router.leader.ILeaderBalancer;
+import org.apache.iotdb.confignode.manager.load.balancer.router.priority.IPriorityBalancer;
 import org.apache.iotdb.consensus.ConsensusFactory;
 import org.apache.iotdb.rpc.RpcUtils;
 
@@ -147,11 +148,11 @@ public class ConfigNodeConfig {
   /** The unknown DataNode detect interval in milliseconds */
   private long unknownDataNodeDetectInterval = heartbeatIntervalInMs;
 
-  /** The routing policy of read/write requests */
-  private String routingPolicy = RouteBalancer.LEADER_POLICY;
+  /** The policy of cluster RegionGroups' leader distribution */
+  private String leaderDistributionPolicy = ILeaderBalancer.MIN_COST_FLOW_POLICY;
 
-  /** The ConfigNode-leader will automatically balance leader distribution if set true */
-  private boolean enableLeaderBalancing = false;
+  /** The route priority policy of cluster read/write requests */
+  private String routePriorityPolicy = IPriorityBalancer.LEADER_POLICY;
 
   private String readConsistencyLevel = "strong";
 
@@ -547,20 +548,20 @@ public class ConfigNodeConfig {
     this.unknownDataNodeDetectInterval = unknownDataNodeDetectInterval;
   }
 
-  public String getRoutingPolicy() {
-    return routingPolicy;
+  public String getLeaderDistributionPolicy() {
+    return leaderDistributionPolicy;
   }
 
-  public void setRoutingPolicy(String routingPolicy) {
-    this.routingPolicy = routingPolicy;
+  public void setLeaderDistributionPolicy(String leaderDistributionPolicy) {
+    this.leaderDistributionPolicy = leaderDistributionPolicy;
   }
 
-  public boolean isEnableLeaderBalancing() {
-    return enableLeaderBalancing;
+  public String getRoutePriorityPolicy() {
+    return routePriorityPolicy;
   }
 
-  public void setEnableLeaderBalancing(boolean enableLeaderBalancing) {
-    this.enableLeaderBalancing = enableLeaderBalancing;
+  public void setRoutePriorityPolicy(String routePriorityPolicy) {
+    this.routePriorityPolicy = routePriorityPolicy;
   }
 
   public String getReadConsistencyLevel() {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
index 44b58731a6..ee88d737c9 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
@@ -24,7 +24,8 @@ import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.commons.exception.BadNodeUrlException;
 import org.apache.iotdb.commons.utils.NodeUrlUtils;
 import org.apache.iotdb.confignode.manager.load.balancer.RegionBalancer;
-import org.apache.iotdb.confignode.manager.load.balancer.RouteBalancer;
+import org.apache.iotdb.confignode.manager.load.balancer.router.leader.ILeaderBalancer;
+import org.apache.iotdb.confignode.manager.load.balancer.router.priority.IPriorityBalancer;
 import org.apache.iotdb.metrics.config.MetricConfigDescriptor;
 
 import org.slf4j.Logger;
@@ -296,20 +297,31 @@ public class ConfigNodeDescriptor {
                     "heartbeat_interval_in_ms", String.valueOf(conf.getHeartbeatIntervalInMs()))
                 .trim()));
 
-    String routingPolicy = properties.getProperty("routing_policy", conf.getRoutingPolicy()).trim();
-    if (routingPolicy.equals(RouteBalancer.GREEDY_POLICY)
-        || routingPolicy.equals(RouteBalancer.LEADER_POLICY)) {
-      conf.setRoutingPolicy(routingPolicy);
+    String leaderDistributionPolicy =
+        properties
+            .getProperty("leader_distribution_policy", conf.getLeaderDistributionPolicy())
+            .trim();
+    if (ILeaderBalancer.GREEDY_POLICY.equals(leaderDistributionPolicy)
+        || ILeaderBalancer.MIN_COST_FLOW_POLICY.equals(leaderDistributionPolicy)) {
+      conf.setLeaderDistributionPolicy(leaderDistributionPolicy);
     } else {
       throw new IOException(
           String.format(
-              "Unknown routing_policy: %s, please set to \"leader\" or \"greedy\"", routingPolicy));
+              "Unknown leader_distribution_policy: %s, please set to \"GREEDY\" or \"MIN_COST_FLOW\"",
+              leaderDistributionPolicy));
     }
 
-    conf.setEnableLeaderBalancing(
-        Boolean.parseBoolean(
-            properties.getProperty(
-                "enable_leader_balancing", String.valueOf(conf.isEnableLeaderBalancing()))));
+    String routePriorityPolicy =
+        properties.getProperty("route_priority_policy", conf.getRoutePriorityPolicy()).trim();
+    if (IPriorityBalancer.GREEDY_POLICY.equals(routePriorityPolicy)
+        || IPriorityBalancer.LEADER_POLICY.equals(routePriorityPolicy)) {
+      conf.setRoutePriorityPolicy(routePriorityPolicy);
+    } else {
+      throw new IOException(
+          String.format(
+              "Unknown route_priority_policy: %s, please set to \"LEADER\" or \"GREEDY\"",
+              routePriorityPolicy));
+    }
 
     String readConsistencyLevel =
         properties.getProperty("read_consistency_level", conf.getReadConsistencyLevel()).trim();
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeStartupCheck.java b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeStartupCheck.java
index 9b91c9ca84..50dff4cebc 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeStartupCheck.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeStartupCheck.java
@@ -21,7 +21,8 @@ package org.apache.iotdb.confignode.conf;
 import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.commons.exception.ConfigurationException;
 import org.apache.iotdb.commons.exception.StartupException;
-import org.apache.iotdb.confignode.manager.load.balancer.RouteBalancer;
+import org.apache.iotdb.confignode.manager.load.balancer.router.leader.ILeaderBalancer;
+import org.apache.iotdb.confignode.manager.load.balancer.router.priority.IPriorityBalancer;
 import org.apache.iotdb.consensus.ConsensusFactory;
 
 import org.slf4j.Logger;
@@ -92,11 +93,18 @@ public class ConfigNodeStartupCheck {
               "%s or %s", ConsensusFactory.SIMPLE_CONSENSUS, ConsensusFactory.RATIS_CONSENSUS));
     }
 
-    // The routing policy is limited
-    if (!CONF.getRoutingPolicy().equals(RouteBalancer.LEADER_POLICY)
-        && !CONF.getRoutingPolicy().equals(RouteBalancer.GREEDY_POLICY)) {
+    // The leader distribution policy is limited to the policies declared in ILeaderBalancer
+    if (!ILeaderBalancer.GREEDY_POLICY.equals(CONF.getLeaderDistributionPolicy())
+        && !ILeaderBalancer.MIN_COST_FLOW_POLICY.equals(CONF.getLeaderDistributionPolicy())) {
       throw new ConfigurationException(
-          "routing_policy", CONF.getRoutingPolicy(), "leader or greedy");
+          "leader_distribution_policy", CONF.getLeaderDistributionPolicy(), "GREEDY or MIN_COST_FLOW");
+    }
+
+    // The route priority policy is limited
+    if (!CONF.getRoutePriorityPolicy().equals(IPriorityBalancer.LEADER_POLICY)
+        && !CONF.getRoutePriorityPolicy().equals(IPriorityBalancer.GREEDY_POLICY)) {
+      throw new ConfigurationException(
+          "route_priority_policy", CONF.getRoutePriorityPolicy(), "LEADER or GREEDY");
     }
 
     // The ip of target ConfigNode couldn't be 0.0.0.0
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java
index 43dcf46038..e16541d6bc 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java
@@ -125,6 +125,7 @@ public class RegionBalancer {
     switch (regionAllocateStrategy) {
       case COPY_SET:
         return new CopySetRegionAllocator();
+      case GREEDY:
       default:
         return new GreedyRegionAllocator();
     }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
index 5ed4134658..6e0a7d95a3 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
@@ -33,11 +33,13 @@ import org.apache.iotdb.confignode.client.async.handlers.AsyncClientHandler;
 import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
 import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
 import org.apache.iotdb.confignode.manager.IManager;
-import org.apache.iotdb.confignode.manager.load.balancer.router.IRouter;
-import org.apache.iotdb.confignode.manager.load.balancer.router.LeaderRouter;
-import org.apache.iotdb.confignode.manager.load.balancer.router.LoadScoreGreedyRouter;
 import org.apache.iotdb.confignode.manager.load.balancer.router.RegionRouteMap;
-import org.apache.iotdb.confignode.manager.load.balancer.router.mcf.MCFLeaderBalancer;
+import org.apache.iotdb.confignode.manager.load.balancer.router.leader.GreedyLeaderBalancer;
+import org.apache.iotdb.confignode.manager.load.balancer.router.leader.ILeaderBalancer;
+import org.apache.iotdb.confignode.manager.load.balancer.router.leader.MinCostFlowLeaderBalancer;
+import org.apache.iotdb.confignode.manager.load.balancer.router.priority.GreedyPriorityBalancer;
+import org.apache.iotdb.confignode.manager.load.balancer.router.priority.IPriorityBalancer;
+import org.apache.iotdb.confignode.manager.load.balancer.router.priority.LeaderPriorityBalancer;
 import org.apache.iotdb.confignode.manager.node.NodeManager;
 import org.apache.iotdb.confignode.manager.partition.PartitionManager;
 import org.apache.iotdb.consensus.ConsensusFactory;
@@ -70,17 +72,12 @@ public class RouteBalancer {
   private static final Logger LOGGER = LoggerFactory.getLogger(RouteBalancer.class);
 
   private static final ConfigNodeConfig CONF = ConfigNodeDescriptor.getInstance().getConf();
-  private static final boolean ENABLE_LEADER_BALANCING = CONF.isEnableLeaderBalancing();
   private static final String SCHEMA_REGION_CONSENSUS_PROTOCOL_CLASS =
       CONF.getSchemaRegionConsensusProtocolClass();
   private static final String DATA_REGION_CONSENSUS_PROTOCOL_CLASS =
       CONF.getDataRegionConsensusProtocolClass();
   private static final boolean isMultiLeader =
-      ConsensusFactory.MULTI_LEADER_CONSENSUS.equals(
-          ConfigNodeDescriptor.getInstance().getConf().getDataRegionConsensusProtocolClass());
-
-  public static final String LEADER_POLICY = "leader";
-  public static final String GREEDY_POLICY = "greedy";
+      ConsensusFactory.MULTI_LEADER_CONSENSUS.equals(CONF.getDataRegionConsensusProtocolClass());
 
   private final IManager configManager;
 
@@ -92,8 +89,10 @@ public class RouteBalancer {
 
   /** RegionRouteMap */
   private final RegionRouteMap regionRouteMap;
-  // For generating optimal RegionRouteMap
-  private final IRouter router;
+  // For generating optimal RegionLeaderMap
+  private final ILeaderBalancer leaderBalancer;
+  // For generating optimal RegionPriorityMap
+  private final IPriorityBalancer priorityRouter;
 
   /** Leader Balancing service */
   private Future<?> currentLeaderBalancingFuture;
@@ -107,13 +106,24 @@ public class RouteBalancer {
 
     this.leaderCache = new ConcurrentHashMap<>();
     this.regionRouteMap = new RegionRouteMap();
-    switch (ConfigNodeDescriptor.getInstance().getConf().getRoutingPolicy()) {
-      case GREEDY_POLICY:
-        this.router = new LoadScoreGreedyRouter();
+
+    switch (CONF.getLeaderDistributionPolicy()) {
+      case ILeaderBalancer.GREEDY_POLICY:
+        this.leaderBalancer = new GreedyLeaderBalancer();
+        break;
+      case ILeaderBalancer.MIN_COST_FLOW_POLICY:
+      default:
+        this.leaderBalancer = new MinCostFlowLeaderBalancer();
         break;
-      case LEADER_POLICY:
+    }
+
+    switch (CONF.getRoutePriorityPolicy()) {
+      case IPriorityBalancer.GREEDY_POLICY:
+        this.priorityRouter = new GreedyPriorityBalancer();
+        break;
+      case IPriorityBalancer.LEADER_POLICY:
       default:
-        this.router = new LeaderRouter();
+        this.priorityRouter = new LeaderPriorityBalancer();
         break;
     }
   }
@@ -173,13 +183,13 @@ public class RouteBalancer {
 
     // Balancing region priority in each SchemaRegionGroup
     Map<TConsensusGroupId, TRegionReplicaSet> latestRegionPriorityMap =
-        router.getLatestRegionRouteMap(
+        priorityRouter.generateOptimalRoutePriority(
             getPartitionManager().getAllReplicaSets(TConsensusGroupType.SchemaRegion),
             regionLeaderMap,
             dataNodeLoadScoreMap);
     // Balancing region priority in each DataRegionGroup
     latestRegionPriorityMap.putAll(
-        router.getLatestRegionRouteMap(
+        priorityRouter.generateOptimalRoutePriority(
             getPartitionManager().getAllReplicaSets(TConsensusGroupType.DataRegion),
             regionLeaderMap,
             dataNodeLoadScoreMap));
@@ -230,34 +240,30 @@ public class RouteBalancer {
 
   /** Start the route balancing service */
   public void startRouteBalancingService() {
-    if (ENABLE_LEADER_BALANCING) {
-      synchronized (scheduleMonitor) {
-        if (currentLeaderBalancingFuture == null) {
-          currentLeaderBalancingFuture =
-              ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
-                  leaderBalancingExecutor,
-                  this::balancingRegionLeader,
-                  0,
-                  // Execute route balancing service in every 5 loops of heartbeat service
-                  NodeManager.HEARTBEAT_INTERVAL * 5,
-                  TimeUnit.MILLISECONDS);
-          LOGGER.info("Route-Balancing service is started successfully.");
-        }
+    synchronized (scheduleMonitor) {
+      if (currentLeaderBalancingFuture == null) {
+        currentLeaderBalancingFuture =
+            ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
+                leaderBalancingExecutor,
+                this::balancingRegionLeader,
+                0,
+                // Execute route balancing service in every 5 loops of heartbeat service
+                NodeManager.HEARTBEAT_INTERVAL * 5,
+                TimeUnit.MILLISECONDS);
+        LOGGER.info("Route-Balancing service is started successfully.");
       }
     }
   }
 
   /** Stop the route balancing service */
   public void stopRouteBalancingService() {
-    if (ENABLE_LEADER_BALANCING) {
-      synchronized (scheduleMonitor) {
-        if (currentLeaderBalancingFuture != null) {
-          currentLeaderBalancingFuture.cancel(false);
-          currentLeaderBalancingFuture = null;
-          leaderCache.clear();
-          regionRouteMap.clear();
-          LOGGER.info("Route-Balancing service is stopped successfully.");
-        }
+    synchronized (scheduleMonitor) {
+      if (currentLeaderBalancingFuture != null) {
+        currentLeaderBalancingFuture.cancel(false);
+        currentLeaderBalancingFuture = null;
+        leaderCache.clear();
+        regionRouteMap.clear();
+        LOGGER.info("Route-Balancing service is stopped successfully.");
       }
     }
   }
@@ -268,9 +274,9 @@ public class RouteBalancer {
   }
 
   private void balancingRegionLeader(TConsensusGroupType regionGroupType) {
-    // Collect latest data to generate leaderBalancer
-    MCFLeaderBalancer leaderBalancer =
-        new MCFLeaderBalancer(
+    // Collect the latest data and generate the optimal leader distribution
+    Map<TConsensusGroupId, Integer> leaderDistribution =
+        leaderBalancer.generateOptimalLeaderDistribution(
             getPartitionManager().getAllReplicaSetsMap(regionGroupType),
             regionRouteMap.getRegionLeaderMap(),
             getNodeManager()
@@ -281,10 +287,6 @@ public class RouteBalancer {
                 .map(TDataNodeLocation::getDataNodeId)
                 .collect(Collectors.toSet()));
 
-    // Calculate the optimal leader distribution
-    Map<TConsensusGroupId, Integer> leaderDistribution =
-        leaderBalancer.generateOptimalLeaderDistribution();
-
     // Transfer leader to the optimal distribution
     AtomicInteger requestId = new AtomicInteger(0);
     AsyncClientHandler<TRegionLeaderChangeReq, TSStatus> clientHandler =
@@ -307,9 +309,14 @@ public class RouteBalancer {
                 regionGroupId,
                 newLeaderId);
             changeRegionLeader(
-                consensusProtocolClass, requestId, clientHandler, regionGroupId, newLeaderId);
+                consensusProtocolClass,
+                requestId,
+                clientHandler,
+                regionGroupId,
+                getNodeManager().getRegisteredDataNode(newLeaderId).getLocation());
           }
         });
+
     if (requestId.get() > 0) {
       AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
     }
@@ -325,12 +332,12 @@ public class RouteBalancer {
       AtomicInteger requestId,
       AsyncClientHandler<TRegionLeaderChangeReq, TSStatus> clientHandler,
       TConsensusGroupId regionGroupId,
-      int newLeaderId) {
+      TDataNodeLocation newLeader) {
     switch (consensusProtocolClass) {
       case ConsensusFactory.MULTI_LEADER_CONSENSUS:
         // For multi-leader protocol, change RegionRouteMap is enough.
         // And the result will be broadcast by Cluster-LoadStatistics-Service soon.
-        regionRouteMap.setLeader(regionGroupId, newLeaderId);
+        regionRouteMap.setLeader(regionGroupId, newLeader.getDataNodeId());
         break;
       case ConsensusFactory.RATIS_CONSENSUS:
       default:
@@ -339,9 +346,11 @@ public class RouteBalancer {
         // And the RegionRouteMap will be updated by Cluster-Heartbeat-Service later if change
         // leader success.
         TRegionLeaderChangeReq regionLeaderChangeReq =
-            new TRegionLeaderChangeReq(
-                regionGroupId, getNodeManager().getRegisteredDataNode(newLeaderId).getLocation());
-        clientHandler.putRequest(requestId.getAndIncrement(), regionLeaderChangeReq);
+            new TRegionLeaderChangeReq(regionGroupId, newLeader);
+        int requestIndex = requestId.getAndIncrement();
+        clientHandler.putRequest(requestIndex, regionLeaderChangeReq);
+        clientHandler.putDataNodeLocation(requestIndex, newLeader);
+        break;
     }
   }
 
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/GreedyLeaderBalancer.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/GreedyLeaderBalancer.java
new file mode 100644
index 0000000000..21c4356bac
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/GreedyLeaderBalancer.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.manager.load.balancer.router.leader;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class GreedyLeaderBalancer implements ILeaderBalancer {
+
+  private final Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap;
+  private final Map<TConsensusGroupId, Integer> regionLeaderMap;
+  private final Set<Integer> disabledDataNodeSet;
+
+  public GreedyLeaderBalancer() {
+    this.regionReplicaSetMap = new HashMap<>();
+    this.regionLeaderMap = new ConcurrentHashMap<>();
+    this.disabledDataNodeSet = new HashSet<>();
+  }
+
+  @Override
+  public Map<TConsensusGroupId, Integer> generateOptimalLeaderDistribution(
+      Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap,
+      Map<TConsensusGroupId, Integer> regionLeaderMap,
+      Set<Integer> disabledDataNodeSet) {
+    initialize(regionReplicaSetMap, regionLeaderMap, disabledDataNodeSet);
+    try {
+      return constructGreedyDistribution();
+    } finally {
+      clear(); // always reset the per-call state so a failed run cannot leak into the next one
+    }
+  }
+
+  private void initialize(
+      Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap,
+      Map<TConsensusGroupId, Integer> regionLeaderMap,
+      Set<Integer> disabledDataNodeSet) {
+    this.regionReplicaSetMap.putAll(regionReplicaSetMap);
+    this.regionLeaderMap.putAll(regionLeaderMap);
+    this.disabledDataNodeSet.addAll(disabledDataNodeSet);
+  }
+
+  private void clear() {
+    this.regionReplicaSetMap.clear();
+    this.regionLeaderMap.clear();
+    this.disabledDataNodeSet.clear();
+  }
+
+  private Map<TConsensusGroupId, Integer> constructGreedyDistribution() {
+    /* Count the number of leaders that each DataNode currently holds */
+    // Map<DataNodeId, leader count>. NOTE(review): presumes each leaderId below is a key here
+    Map<Integer, AtomicInteger> leaderCounter = new ConcurrentHashMap<>();
+    regionReplicaSetMap.forEach(
+        (regionGroupId, regionReplicaSet) ->
+            regionReplicaSet
+                .getDataNodeLocations()
+                .forEach(
+                    dataNodeLocation ->
+                        leaderCounter.putIfAbsent(
+                            dataNodeLocation.getDataNodeId(), new AtomicInteger(0))));
+    regionLeaderMap.forEach(
+        (regionGroupId, leaderId) -> leaderCounter.get(leaderId).getAndIncrement());
+
+    /* Move every leader that resides on a disabled DataNode to an available replica */
+    for (TConsensusGroupId regionGroupId : regionReplicaSetMap.keySet()) {
+      int leaderId = regionLeaderMap.get(regionGroupId);
+      if (disabledDataNodeSet.contains(leaderId)) {
+        int newLeaderId = -1;
+        int newLeaderWeight = Integer.MAX_VALUE;
+        for (TDataNodeLocation candidate :
+            regionReplicaSetMap.get(regionGroupId).getDataNodeLocations()) {
+          int candidateId = candidate.getDataNodeId();
+          int candidateWeight = leaderCounter.get(candidateId).get();
+          // Select the available DataNode with the fewest leaders
+          if (!disabledDataNodeSet.contains(candidateId) && candidateWeight < newLeaderWeight) {
+            newLeaderId = candidateId;
+            newLeaderWeight = candidateWeight;
+          }
+        }
+
+        if (newLeaderId != -1) {
+          leaderCounter.get(leaderId).getAndDecrement();
+          leaderCounter.get(newLeaderId).getAndIncrement();
+          regionLeaderMap.replace(regionGroupId, newLeaderId);
+        }
+      }
+    }
+
+    /* Sort RegionGroups by two keys: leader weight, then minimum follower weight */
+    List<WeightEntry> weightList = new ArrayList<>();
+    for (TConsensusGroupId regionGroupId : regionReplicaSetMap.keySet()) {
+      int leaderId = regionLeaderMap.get(regionGroupId);
+      int leaderWeight = leaderCounter.get(regionLeaderMap.get(regionGroupId)).get();
+
+      int followerWeight = Integer.MAX_VALUE;
+      for (TDataNodeLocation follower :
+          regionReplicaSetMap.get(regionGroupId).getDataNodeLocations()) {
+        int followerId = follower.getDataNodeId();
+        if (followerId != leaderId) {
+          followerWeight = Math.min(followerWeight, leaderCounter.get(followerId).get());
+        }
+      }
+
+      weightList.add(new WeightEntry(regionGroupId, leaderWeight, followerWeight));
+    }
+    weightList.sort(WeightEntry.COMPARATOR);
+
+    /* Greedy distribution: process the RegionGroups in the sorted order */
+    for (WeightEntry weightEntry : weightList) {
+      TConsensusGroupId regionGroupId = weightEntry.regionGroupId;
+      int leaderId = regionLeaderMap.get(regionGroupId);
+      int leaderWeight = leaderCounter.get(regionLeaderMap.get(regionGroupId)).get();
+
+      int newLeaderId = -1;
+      int newLeaderWeight = Integer.MAX_VALUE;
+      for (TDataNodeLocation candidate :
+          regionReplicaSetMap.get(regionGroupId).getDataNodeLocations()) {
+        int candidateId = candidate.getDataNodeId();
+        int candidateWeight = leaderCounter.get(candidateId).get();
+        if (!disabledDataNodeSet.contains(candidateId)
+            && candidateId != leaderId
+            && candidateWeight < newLeaderWeight) {
+          newLeaderId = candidateId;
+          newLeaderWeight = candidateWeight;
+        }
+      }
+
+      // Redistribution takes effect only when leaderWeight - newLeaderWeight > 1,
+      // i.e. only when it narrows the spread of leader counts across the DataNodes.
+      if (leaderWeight - newLeaderWeight > 1) {
+        leaderCounter.get(leaderId).getAndDecrement();
+        leaderCounter.get(newLeaderId).getAndIncrement();
+        regionLeaderMap.replace(regionGroupId, newLeaderId);
+      }
+    }
+
+    return new ConcurrentHashMap<>(regionLeaderMap);
+  }
+
+  private static class WeightEntry {
+
+    private final TConsensusGroupId regionGroupId;
+    // The number of leaders owned by the DataNode where the RegionGroup's leader resides
+    private final int firstKey;
+    // The fewest leaders owned by any DataNode where the RegionGroup's followers reside
+    private final int secondKey;
+
+    private WeightEntry(TConsensusGroupId regionGroupId, int firstKey, int secondKey) {
+      this.regionGroupId = regionGroupId;
+      this.firstKey = firstKey;
+      this.secondKey = secondKey;
+    }
+
+    // Order by the first key descending, then by the second key ascending (no int subtraction).
+    private static final Comparator<WeightEntry> COMPARATOR =
+        Comparator.<WeightEntry>comparingInt(e -> e.firstKey).reversed()
+            .thenComparingInt(e -> e.secondKey);
+  }
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/IRouter.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/ILeaderBalancer.java
similarity index 58%
copy from confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/IRouter.java
copy to confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/ILeaderBalancer.java
index b6e875f77c..a69ccc9491 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/IRouter.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/ILeaderBalancer.java
@@ -16,30 +16,30 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.confignode.manager.load.balancer.router;
+package org.apache.iotdb.confignode.manager.load.balancer.router.leader;
 
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 
-import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
-/**
- * The IRouter is a functional interface, which means a new functional class who implements the
- * IRouter must be created for generating the latest real-time routing policy.
- */
-public interface IRouter {
+public interface ILeaderBalancer {
+
+  String GREEDY_POLICY = "GREEDY";
+  String MIN_COST_FLOW_POLICY = "MIN_COST_FLOW";
 
   /**
-   * Generate an optimal real-time read/write requests routing policy.
+   * Generate an optimal leader distribution.
    *
-   * @param replicaSets All RegionReplicasEts currently owned by the cluster
-   * @return Map<TConsensusGroupId, TRegionReplicaSet>, The routing policy of read/write requests
-   *     for each Region is based on the order in the TRegionReplicaSet. The replica with higher
-   *     sorting result have higher priority.
+   * @param regionReplicaSetMap All RegionGroups the cluster currently has
+   * @param regionLeaderMap The current leader of each RegionGroup
+   * @param disabledDataNodeSet The DataNodes that are currently unable to work (can't place
+   *     RegionGroup-leader)
+   * @return Map<TConsensusGroupId, Integer>, The optimal leader distribution
    */
-  Map<TConsensusGroupId, TRegionReplicaSet> getLatestRegionRouteMap(
-      List<TRegionReplicaSet> replicaSets,
+  Map<TConsensusGroupId, Integer> generateOptimalLeaderDistribution(
+      Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap,
       Map<TConsensusGroupId, Integer> regionLeaderMap,
-      Map<Integer, Long> dataNodeLoadScoreMap);
+      Set<Integer> disabledDataNodeSet);
 }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/mcf/MCFLeaderBalancer.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/MinCostFlowLeaderBalancer.java
similarity index 79%
rename from confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/mcf/MCFLeaderBalancer.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/MinCostFlowLeaderBalancer.java
index 5dbd292964..e07bd3e288 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/mcf/MCFLeaderBalancer.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/MinCostFlowLeaderBalancer.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.confignode.manager.load.balancer.router.mcf;
+package org.apache.iotdb.confignode.manager.load.balancer.router.leader;
 
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
@@ -26,6 +26,7 @@ import org.apache.iotdb.commons.utils.TestOnly;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -35,7 +36,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
 
 /** Leader distribution balancer that uses minimum cost flow algorithm */
-public class MCFLeaderBalancer {
+public class MinCostFlowLeaderBalancer implements ILeaderBalancer {
 
   private static final int INFINITY = Integer.MAX_VALUE;
 
@@ -63,7 +64,7 @@ public class MCFLeaderBalancer {
   // Maximum index of graph edges
   private int maxEdge = 0;
 
-  private final List<MCFEdge> mcfEdges;
+  private final List<MinCostFlowEdge> minCostFlowEdges;
   private int[] nodeHeadEdge;
   private int[] nodeCurrentEdge;
 
@@ -73,28 +74,64 @@ public class MCFLeaderBalancer {
   private int maximumFlow = 0;
   private int minimumCost = 0;
 
-  public MCFLeaderBalancer(
-      Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap,
-      Map<TConsensusGroupId, Integer> regionLeaderMap,
-      Set<Integer> disabledDataNodeSet) {
-    this.regionReplicaSetMap = regionReplicaSetMap;
-    this.regionLeaderMap = regionLeaderMap;
-    this.disabledDataNodeSet = disabledDataNodeSet;
-
+  public MinCostFlowLeaderBalancer() {
+    this.regionReplicaSetMap = new HashMap<>();
+    this.regionLeaderMap = new HashMap<>();
+    this.disabledDataNodeSet = new HashSet<>();
     this.rNodeMap = new HashMap<>();
     this.dNodeMap = new HashMap<>();
     this.dNodeReflect = new HashMap<>();
-
-    this.mcfEdges = new ArrayList<>();
+    this.minCostFlowEdges = new ArrayList<>();
   }
 
-  public Map<TConsensusGroupId, Integer> generateOptimalLeaderDistribution() {
+  @Override
+  public Map<TConsensusGroupId, Integer> generateOptimalLeaderDistribution(
+      Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap,
+      Map<TConsensusGroupId, Integer> regionLeaderMap,
+      Set<Integer> disabledDataNodeSet) {
+
+    initialize(regionReplicaSetMap, regionLeaderMap, disabledDataNodeSet);
+
+    Map<TConsensusGroupId, Integer> result;
     constructMCFGraph();
     dinicAlgorithm();
-    return collectLeaderDistribution();
+    result = collectLeaderDistribution();
+
+    clear();
+    return result;
+  }
+
+  private void initialize(
+      Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap,
+      Map<TConsensusGroupId, Integer> regionLeaderMap,
+      Set<Integer> disabledDataNodeSet) {
+    this.regionReplicaSetMap.putAll(regionReplicaSetMap);
+    this.regionLeaderMap.putAll(regionLeaderMap);
+    this.disabledDataNodeSet.addAll(disabledDataNodeSet);
+  }
+
+  private void clear() {
+    this.regionReplicaSetMap.clear();
+    this.regionLeaderMap.clear();
+    this.disabledDataNodeSet.clear();
+    this.rNodeMap.clear();
+    this.dNodeMap.clear();
+    this.dNodeReflect.clear();
+    this.minCostFlowEdges.clear();
+
+    this.nodeHeadEdge = null;
+    this.nodeCurrentEdge = null;
+    this.isNodeVisited = null;
+    this.nodeMinimumCost = null;
+
+    this.maxNode = tNode + 1;
+    this.maxEdge = 0;
   }
 
   private void constructMCFGraph() {
+    this.maximumFlow = 0;
+    this.minimumCost = 0;
+
     /* Indicate nodes in mcf */
     for (TRegionReplicaSet regionReplicaSet : regionReplicaSetMap.values()) {
       rNodeMap.put(regionReplicaSet.getRegionId(), maxNode++);
@@ -177,8 +214,8 @@ public class MCFLeaderBalancer {
   }
 
   private void addEdge(int fromNode, int destNode, int capacity, int cost) {
-    MCFEdge edge = new MCFEdge(destNode, capacity, cost, nodeHeadEdge[fromNode]);
-    mcfEdges.add(edge);
+    MinCostFlowEdge edge = new MinCostFlowEdge(destNode, capacity, cost, nodeHeadEdge[fromNode]);
+    minCostFlowEdges.add(edge);
     nodeHeadEdge[fromNode] = maxEdge++;
   }
 
@@ -203,8 +240,8 @@ public class MCFLeaderBalancer {
       isNodeVisited[currentNode] = false;
       for (int currentEdge = nodeHeadEdge[currentNode];
           currentEdge >= 0;
-          currentEdge = mcfEdges.get(currentEdge).nextEdge) {
-        MCFEdge edge = mcfEdges.get(currentEdge);
+          currentEdge = minCostFlowEdges.get(currentEdge).nextEdge) {
+        MinCostFlowEdge edge = minCostFlowEdges.get(currentEdge);
         if (edge.capacity > 0
             && nodeMinimumCost[currentNode] + edge.cost < nodeMinimumCost[edge.destNode]) {
           nodeMinimumCost[edge.destNode] = nodeMinimumCost[currentNode] + edge.cost;
@@ -230,8 +267,8 @@ public class MCFLeaderBalancer {
     isNodeVisited[currentNode] = true;
     for (currentEdge = nodeCurrentEdge[currentNode];
         currentEdge >= 0;
-        currentEdge = mcfEdges.get(currentEdge).nextEdge) {
-      MCFEdge edge = mcfEdges.get(currentEdge);
+        currentEdge = minCostFlowEdges.get(currentEdge).nextEdge) {
+      MinCostFlowEdge edge = minCostFlowEdges.get(currentEdge);
       if (nodeMinimumCost[currentNode] + edge.cost == nodeMinimumCost[edge.destNode]
           && edge.capacity > 0
           && !isNodeVisited[edge.destNode]) {
@@ -241,7 +278,7 @@ public class MCFLeaderBalancer {
         minimumCost += subOutputFlow * edge.cost;
 
         edge.capacity -= subOutputFlow;
-        mcfEdges.get(currentEdge ^ 1).capacity += subOutputFlow;
+        minCostFlowEdges.get(currentEdge ^ 1).capacity += subOutputFlow;
 
         inputFlow -= subOutputFlow;
         outputFlow += subOutputFlow;
@@ -278,8 +315,8 @@ public class MCFLeaderBalancer {
           boolean matchLeader = false;
           for (int currentEdge = nodeHeadEdge[rNode];
               currentEdge >= 0;
-              currentEdge = mcfEdges.get(currentEdge).nextEdge) {
-            MCFEdge edge = mcfEdges.get(currentEdge);
+              currentEdge = minCostFlowEdges.get(currentEdge).nextEdge) {
+            MinCostFlowEdge edge = minCostFlowEdges.get(currentEdge);
             if (edge.destNode != sNode && edge.capacity == 0) {
               matchLeader = true;
               result.put(regionGroupId, dNodeReflect.get(edge.destNode));
@@ -302,4 +339,19 @@ public class MCFLeaderBalancer {
   public int getMinimumCost() {
     return minimumCost;
   }
+
+  private static class MinCostFlowEdge {
+
+    private final int destNode;
+    private int capacity;
+    private final int cost;
+    private final int nextEdge;
+
+    private MinCostFlowEdge(int destNode, int capacity, int cost, int nextEdge) {
+      this.destNode = destNode;
+      this.capacity = capacity;
+      this.cost = cost;
+      this.nextEdge = nextEdge;
+    }
+  }
 }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/mcf/MCFEdge.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/mcf/MCFEdge.java
deleted file mode 100644
index d849c195e3..0000000000
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/mcf/MCFEdge.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.confignode.manager.load.balancer.router.mcf;
-
-public class MCFEdge {
-
-  public int destNode;
-  public int capacity;
-  public int cost;
-  public int nextEdge;
-
-  public MCFEdge(int destNode, int capacity, int cost, int nextEdge) {
-    this.destNode = destNode;
-    this.capacity = capacity;
-    this.cost = cost;
-    this.nextEdge = nextEdge;
-  }
-}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/LoadScoreGreedyRouter.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/priority/GreedyPriorityBalancer.java
similarity index 91%
rename from confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/LoadScoreGreedyRouter.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/priority/GreedyPriorityBalancer.java
index 5d5fda43b5..c2dd329b64 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/LoadScoreGreedyRouter.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/priority/GreedyPriorityBalancer.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.confignode.manager.load.balancer.router;
+package org.apache.iotdb.confignode.manager.load.balancer.router.priority;
 
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
@@ -29,15 +29,15 @@ import java.util.Map;
 import java.util.Vector;
 import java.util.concurrent.ConcurrentHashMap;
 
-/** The LoadScoreGreedyRouter always pick the Replica with the lowest loadScore */
-public class LoadScoreGreedyRouter implements IRouter {
+/** The GreedyPriorityBalancer always picks the Replica with the lowest loadScore */
+public class GreedyPriorityBalancer implements IPriorityBalancer {
 
-  public LoadScoreGreedyRouter() {
+  public GreedyPriorityBalancer() {
     // Empty constructor
   }
 
   @Override
-  public Map<TConsensusGroupId, TRegionReplicaSet> getLatestRegionRouteMap(
+  public Map<TConsensusGroupId, TRegionReplicaSet> generateOptimalRoutePriority(
       List<TRegionReplicaSet> replicaSets,
       Map<TConsensusGroupId, Integer> regionLeaderMap,
       Map<Integer, Long> dataNodeLoadScoreMap) {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/IRouter.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/priority/IPriorityBalancer.java
similarity index 64%
rename from confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/IRouter.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/priority/IPriorityBalancer.java
index b6e875f77c..47d8650964 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/IRouter.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/priority/IPriorityBalancer.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.confignode.manager.load.balancer.router;
+package org.apache.iotdb.confignode.manager.load.balancer.router.priority;
 
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
@@ -24,21 +24,21 @@ import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import java.util.List;
 import java.util.Map;
 
-/**
- * The IRouter is a functional interface, which means a new functional class who implements the
- * IRouter must be created for generating the latest real-time routing policy.
- */
-public interface IRouter {
+public interface IPriorityBalancer {
+
+  String LEADER_POLICY = "LEADER";
+  String GREEDY_POLICY = "GREEDY";
 
   /**
-   * Generate an optimal real-time read/write requests routing policy.
+   * Generate an optimal route priority.
    *
-   * @param replicaSets All RegionReplicasEts currently owned by the cluster
-   * @return Map<TConsensusGroupId, TRegionReplicaSet>, The routing policy of read/write requests
-   *     for each Region is based on the order in the TRegionReplicaSet. The replica with higher
-   *     sorting result have higher priority.
+   * @param replicaSets All RegionGroups
+   * @param regionLeaderMap The current leader of each RegionGroup
+   * @param dataNodeLoadScoreMap The current load score of each DataNode
+   * @return Map<TConsensusGroupId, TRegionReplicaSet>, The optimal route priority for each
+   *     RegionGroup. The replica with a higher sorting result has higher priority.
    */
-  Map<TConsensusGroupId, TRegionReplicaSet> getLatestRegionRouteMap(
+  Map<TConsensusGroupId, TRegionReplicaSet> generateOptimalRoutePriority(
       List<TRegionReplicaSet> replicaSets,
       Map<TConsensusGroupId, Integer> regionLeaderMap,
       Map<Integer, Long> dataNodeLoadScoreMap);
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/LeaderRouter.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/priority/LeaderPriorityBalancer.java
similarity index 93%
rename from confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/LeaderRouter.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/priority/LeaderPriorityBalancer.java
index 4ab5505f48..80c1d1aaf5 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/LeaderRouter.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/priority/LeaderPriorityBalancer.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.confignode.manager.load.balancer.router;
+package org.apache.iotdb.confignode.manager.load.balancer.router.priority;
 
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
@@ -29,15 +29,15 @@ import java.util.Map;
 import java.util.Vector;
 import java.util.concurrent.ConcurrentHashMap;
 
-/** The LeaderRouter always pick the leader Replica */
-public class LeaderRouter implements IRouter {
+/** The LeaderPriorityBalancer always picks the leader Replica first */
+public class LeaderPriorityBalancer implements IPriorityBalancer {
 
-  public LeaderRouter() {
+  public LeaderPriorityBalancer() {
     // Empty constructor
   }
 
   @Override
-  public Map<TConsensusGroupId, TRegionReplicaSet> getLatestRegionRouteMap(
+  public Map<TConsensusGroupId, TRegionReplicaSet> generateOptimalRoutePriority(
       List<TRegionReplicaSet> replicaSets,
       Map<TConsensusGroupId, Integer> regionLeaderMap,
       Map<Integer, Long> dataNodeLoadScoreMap) {
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/GreedyLeaderBalancerTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/GreedyLeaderBalancerTest.java
new file mode 100644
index 0000000000..b676a62ecf
--- /dev/null
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/GreedyLeaderBalancerTest.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.manager.load.balancer.router.leader;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class GreedyLeaderBalancerTest {
+
+  private static final GreedyLeaderBalancer BALANCER = new GreedyLeaderBalancer();
+
+  @Test
+  public void optimalLeaderDistributionTest() {
+    Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap = new HashMap<>();
+    Map<TConsensusGroupId, Integer> regionLeaderMap = new HashMap<>();
+    Set<Integer> disabledDataNodeSet = new HashSet<>();
+    Random random = new Random();
+
+    // Build 9 RegionGroups in DataNodes 0~2
+    for (int i = 0; i < 9; i++) {
+      TConsensusGroupId regionGroupId = new TConsensusGroupId(TConsensusGroupType.DataRegion, i);
+      TRegionReplicaSet regionReplicaSet =
+          new TRegionReplicaSet(
+              regionGroupId,
+              Arrays.asList(
+                  new TDataNodeLocation().setDataNodeId(0),
+                  new TDataNodeLocation().setDataNodeId(1),
+                  new TDataNodeLocation().setDataNodeId(2)));
+      regionReplicaSetMap.put(regionGroupId, regionReplicaSet);
+      regionLeaderMap.put(regionGroupId, random.nextInt(3));
+    }
+
+    // Build 9 RegionGroups in DataNodes 3~5
+    for (int i = 9; i < 18; i++) {
+      TConsensusGroupId regionGroupId = new TConsensusGroupId(TConsensusGroupType.DataRegion, i);
+      TRegionReplicaSet regionReplicaSet =
+          new TRegionReplicaSet(
+              regionGroupId,
+              Arrays.asList(
+                  new TDataNodeLocation().setDataNodeId(3),
+                  new TDataNodeLocation().setDataNodeId(4),
+                  new TDataNodeLocation().setDataNodeId(5)));
+      regionReplicaSetMap.put(regionGroupId, regionReplicaSet);
+      regionLeaderMap.put(regionGroupId, 3 + random.nextInt(3));
+    }
+
+    Map<TConsensusGroupId, Integer> leaderDistribution =
+        BALANCER.generateOptimalLeaderDistribution(
+            regionReplicaSetMap, regionLeaderMap, disabledDataNodeSet);
+    Map<Integer, AtomicInteger> leaderCounter = new ConcurrentHashMap<>();
+    leaderDistribution.forEach(
+        (regionGroupId, leaderId) ->
+            leaderCounter
+                .computeIfAbsent(leaderId, empty -> new AtomicInteger(0))
+                .getAndIncrement());
+
+    // Each DataNode has exactly 3 leaders
+    for (int i = 0; i < 6; i++) {
+      Assert.assertEquals(3, leaderCounter.get(i).get());
+    }
+  }
+
+  @Test
+  public void disableTest() {
+    Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap = new HashMap<>();
+    Map<TConsensusGroupId, Integer> regionLeaderMap = new HashMap<>();
+    Set<Integer> disabledDataNodeSet = new HashSet<>();
+
+    disabledDataNodeSet.add(1);
+    disabledDataNodeSet.add(4);
+
+    // Build 10 RegionGroups whose leaders are all DataNode 1 (disabled)
+    for (int i = 0; i < 10; i++) {
+      TConsensusGroupId regionGroupId = new TConsensusGroupId(TConsensusGroupType.DataRegion, i);
+      TRegionReplicaSet regionReplicaSet =
+          new TRegionReplicaSet(
+              regionGroupId,
+              Arrays.asList(
+                  new TDataNodeLocation().setDataNodeId(0),
+                  new TDataNodeLocation().setDataNodeId(1),
+                  new TDataNodeLocation().setDataNodeId(2)));
+      regionReplicaSetMap.put(regionGroupId, regionReplicaSet);
+      regionLeaderMap.put(regionGroupId, 1);
+    }
+
+    // Build 10 RegionGroups whose leaders are all DataNode 4 (disabled)
+    for (int i = 10; i < 20; i++) {
+      TConsensusGroupId regionGroupId = new TConsensusGroupId(TConsensusGroupType.DataRegion, i);
+      TRegionReplicaSet regionReplicaSet =
+          new TRegionReplicaSet(
+              regionGroupId,
+              Arrays.asList(
+                  new TDataNodeLocation().setDataNodeId(3),
+                  new TDataNodeLocation().setDataNodeId(4),
+                  new TDataNodeLocation().setDataNodeId(5)));
+      regionReplicaSetMap.put(regionGroupId, regionReplicaSet);
+      regionLeaderMap.put(regionGroupId, 4);
+    }
+
+    Map<TConsensusGroupId, Integer> leaderDistribution =
+        BALANCER.generateOptimalLeaderDistribution(
+            regionReplicaSetMap, regionLeaderMap, disabledDataNodeSet);
+    Map<Integer, AtomicInteger> leaderCounter = new ConcurrentHashMap<>();
+    leaderDistribution.forEach(
+        (regionGroupId, leaderId) ->
+            leaderCounter
+                .computeIfAbsent(leaderId, empty -> new AtomicInteger(0))
+                .getAndIncrement());
+
+    for (int i = 0; i < 6; i++) {
+      if (i != 1 && i != 4) {
+        Assert.assertEquals(5, leaderCounter.get(i).get());
+      }
+    }
+  }
+}
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/LeaderBalancerComparisonTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/LeaderBalancerComparisonTest.java
new file mode 100644
index 0000000000..9c7bda4287
--- /dev/null
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/LeaderBalancerComparisonTest.java
@@ -0,0 +1,369 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.manager.load.balancer.router.leader;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.tsfile.utils.Pair;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.PriorityQueue;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class LeaderBalancerComparisonTest {
+
+  // Set this field to true, and you can see the readable test results in command line
+  private static final boolean isCommandLineMode = false;
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(LeaderBalancerComparisonTest.class);
+  private static FileWriter WRITER;
+
+  private static final GreedyLeaderBalancer GREEDY_LEADER_BALANCER = new GreedyLeaderBalancer();
+  private static final MinCostFlowLeaderBalancer MIN_COST_FLOW_LEADER_BALANCER =
+      new MinCostFlowLeaderBalancer();
+
+  private static final Random RANDOM = new Random();
+  private static final int TEST_MAX_DATA_NODE_NUM = 100;
+  private static final int TEST_CPU_CORE_NUM = 16;
+  private static final int TEST_REPLICA_NUM = 3;
+  private static final double GREEDY_INIT_RATE = 0.9;
+  private static final double DISABLE_DATA_NODE_RATE = 0.05;
+
+  // Invoke this method if you want to record the test results
+  public static void prepareWriter() throws IOException {
+    if (isCommandLineMode) {
+      WRITER = null;
+    } else {
+      WRITER = new FileWriter("./leaderBalancerTest.txt");
+    }
+  }
+
+  // Add @Test here to enable this test
+  public void leaderBalancerComparisonTest() throws IOException {
+    for (int dataNodeNum = 3; dataNodeNum <= TEST_MAX_DATA_NODE_NUM; dataNodeNum++) {
+      // Simulate each DataNode has 16 CPU cores
+      // and each RegionGroup has 3 replicas
+      int regionGroupNum = TEST_CPU_CORE_NUM * dataNodeNum / TEST_REPLICA_NUM;
+      Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap = new HashMap<>();
+      Map<TConsensusGroupId, Integer> regionLeaderMap = new HashMap<>();
+      generateTestData(dataNodeNum, regionGroupNum, regionReplicaSetMap, regionLeaderMap);
+
+      if (isCommandLineMode) {
+        LOGGER.info("============================");
+        LOGGER.info("DataNodeNum: {}, RegionGroupNum: {}", dataNodeNum, regionGroupNum);
+      }
+
+      // Basic test
+      Map<TConsensusGroupId, Integer> greedyLeaderDistribution = new ConcurrentHashMap<>();
+      Statistics greedyStatistics =
+          doBalancing(
+              dataNodeNum,
+              regionGroupNum,
+              GREEDY_LEADER_BALANCER,
+              regionReplicaSetMap,
+              regionLeaderMap,
+              new HashSet<>(),
+              greedyLeaderDistribution);
+      Map<TConsensusGroupId, Integer> mcfLeaderDistribution = new ConcurrentHashMap<>();
+      Statistics mcfStatistics =
+          doBalancing(
+              dataNodeNum,
+              regionGroupNum,
+              MIN_COST_FLOW_LEADER_BALANCER,
+              regionReplicaSetMap,
+              regionLeaderMap,
+              new HashSet<>(),
+              mcfLeaderDistribution);
+      if (isCommandLineMode) {
+        LOGGER.info("[Basic test]");
+        LOGGER.info("Greedy balancer: {}", greedyStatistics);
+        LOGGER.info("MinCostFlow balancer: {}", mcfStatistics);
+      } else {
+        greedyStatistics.toFile();
+        mcfStatistics.toFile();
+      }
+
+      // Disaster test
+      int disabledDataNodeNum = (int) Math.ceil(dataNodeNum * DISABLE_DATA_NODE_RATE);
+      HashSet<Integer> disabledDataNodeSet = new HashSet<>();
+      while (disabledDataNodeSet.size() < disabledDataNodeNum) {
+        int dataNodeId = RANDOM.nextInt(dataNodeNum);
+        if (disabledDataNodeSet.contains(dataNodeId)) {
+          continue;
+        }
+        disabledDataNodeSet.add(dataNodeId);
+      }
+      greedyStatistics =
+          doBalancing(
+              dataNodeNum,
+              regionGroupNum,
+              GREEDY_LEADER_BALANCER,
+              regionReplicaSetMap,
+              greedyLeaderDistribution,
+              disabledDataNodeSet,
+              greedyLeaderDistribution);
+      mcfStatistics =
+          doBalancing(
+              dataNodeNum,
+              regionGroupNum,
+              MIN_COST_FLOW_LEADER_BALANCER,
+              regionReplicaSetMap,
+              mcfLeaderDistribution,
+              disabledDataNodeSet,
+              mcfLeaderDistribution);
+      if (isCommandLineMode) {
+        LOGGER.info("[Disaster test]");
+        LOGGER.info("Greedy balancer: {}", greedyStatistics);
+        LOGGER.info("MinCostFlow balancer: {}", mcfStatistics);
+      } else {
+        greedyStatistics.toFile();
+        mcfStatistics.toFile();
+      }
+
+      // Recovery test
+      greedyStatistics =
+          doBalancing(
+              dataNodeNum,
+              regionGroupNum,
+              GREEDY_LEADER_BALANCER,
+              regionReplicaSetMap,
+              greedyLeaderDistribution,
+              new HashSet<>(),
+              greedyLeaderDistribution);
+      mcfStatistics =
+          doBalancing(
+              dataNodeNum,
+              regionGroupNum,
+              MIN_COST_FLOW_LEADER_BALANCER,
+              regionReplicaSetMap,
+              mcfLeaderDistribution,
+              new HashSet<>(),
+              mcfLeaderDistribution);
+      if (isCommandLineMode) {
+        LOGGER.info("[Recovery test]");
+        LOGGER.info("Greedy balancer: {}", greedyStatistics);
+        LOGGER.info("MinCostFlow balancer: {}", mcfStatistics);
+      } else {
+        greedyStatistics.toFile();
+        mcfStatistics.toFile();
+      }
+    }
+  }
+
+  private void generateTestData(
+      int dataNodeNum,
+      int regionGroupNum,
+      Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap,
+      Map<TConsensusGroupId, Integer> regionLeaderMap) {
+
+    Map<Integer, AtomicInteger> regionCounter = new ConcurrentHashMap<>();
+    Map<Integer, AtomicInteger> leaderCounter = new ConcurrentHashMap<>();
+    for (int i = 0; i < dataNodeNum; i++) {
+      regionCounter.put(i, new AtomicInteger(0));
+      leaderCounter.put(i, new AtomicInteger(0));
+    }
+
+    int greedyNum = (int) (GREEDY_INIT_RATE * regionGroupNum);
+    int randomNum = regionGroupNum - greedyNum;
+    for (int index = 0; index < regionGroupNum; index++) {
+      int leaderId = -1;
+      TConsensusGroupId regionGroupId =
+          new TConsensusGroupId(TConsensusGroupType.DataRegion, index);
+      TRegionReplicaSet regionReplicaSet = new TRegionReplicaSet().setRegionId(regionGroupId);
+
+      int seed = RANDOM.nextInt(greedyNum + randomNum);
+      if (seed < greedyNum) {
+        // Greedy pick RegionReplicas and leader
+        int leaderWeight = Integer.MAX_VALUE;
+        PriorityQueue<Pair<Integer, Integer>> dataNodePriorityQueue =
+            new PriorityQueue<>(Comparator.comparingInt(Pair::getRight));
+        regionCounter.forEach(
+            (dataNodeId, regionGroupCount) ->
+                dataNodePriorityQueue.offer(new Pair<>(dataNodeId, regionGroupCount.get())));
+        for (int i = 0; i < TEST_REPLICA_NUM; i++) {
+          int dataNodeId = Objects.requireNonNull(dataNodePriorityQueue.poll()).getLeft();
+          regionReplicaSet.addToDataNodeLocations(
+              new TDataNodeLocation().setDataNodeId(dataNodeId));
+          if (leaderCounter.get(dataNodeId).get() < leaderWeight) {
+            leaderWeight = leaderCounter.get(dataNodeId).get();
+            leaderId = dataNodeId;
+          }
+        }
+        greedyNum -= 1;
+      } else {
+        // Random pick RegionReplicas and leader
+        Set<Integer> randomSet = new HashSet<>();
+        while (randomSet.size() < TEST_REPLICA_NUM) {
+          int dataNodeId = RANDOM.nextInt(dataNodeNum);
+          if (randomSet.contains(dataNodeId)) {
+            continue;
+          }
+
+          randomSet.add(dataNodeId);
+          regionReplicaSet.addToDataNodeLocations(
+              new TDataNodeLocation().setDataNodeId(dataNodeId));
+        }
+        leaderId = new ArrayList<>(randomSet).get(RANDOM.nextInt(TEST_REPLICA_NUM));
+        randomNum -= 1;
+      }
+
+      regionReplicaSetMap.put(regionGroupId, regionReplicaSet);
+      regionReplicaSet
+          .getDataNodeLocations()
+          .forEach(
+              dataNodeLocation ->
+                  regionCounter.get(dataNodeLocation.getDataNodeId()).getAndIncrement());
+      regionLeaderMap.put(regionGroupId, leaderId);
+      leaderCounter.get(leaderId).getAndIncrement();
+    }
+  }
+
+  /**
+   * Runs the given leader balancer repeatedly until its output stops changing (at most 1000
+   * rounds) and summarizes the resulting leader distribution.
+   *
+   * <p>Each round feeds the previous round's distribution back into {@code
+   * generateOptimalLeaderDistribution}. Every RegionGroup whose leader differs between two
+   * consecutive rounds adds one to {@code switchTimes}.
+   *
+   * @param dataNodeNum the number of DataNodes; divisor of the average and of the variance
+   * @param regionGroupNum the number of RegionGroups; dividend of the average
+   * @param leaderBalancer the balancing algorithm under test
+   * @param regionReplicaSetMap the replica set of each RegionGroup, handed to the balancer
+   * @param regionLeaderMap the initial leader of each RegionGroup; copied before balancing
+   * @param disabledDataNodeSet the DataNodes handed to the balancer as disabled
+   * @param stableLeaderDistribution output parameter; cleared and refilled with the distribution
+   *     of the final round
+   * @return the statistics of this run; {@code rounds} is -1 if the output did not become stable
+   *     within the round limit
+   */
+  private Statistics doBalancing(
+      int dataNodeNum,
+      int regionGroupNum,
+      ILeaderBalancer leaderBalancer,
+      Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap,
+      Map<TConsensusGroupId, Integer> regionLeaderMap,
+      Set<Integer> disabledDataNodeSet,
+      Map<TConsensusGroupId, Integer> stableLeaderDistribution) {
+
+    Statistics result = new Statistics();
+    // -1 marks "never became stable"; it is overwritten as soon as a stable round is found
+    result.rounds = -1;
+    Map<TConsensusGroupId, Integer> lastDistribution = new ConcurrentHashMap<>(regionLeaderMap);
+    for (int rounds = 0; rounds < 1000; rounds++) {
+      Map<TConsensusGroupId, Integer> currentDistribution =
+          leaderBalancer.generateOptimalLeaderDistribution(
+              regionReplicaSetMap, lastDistribution, disabledDataNodeSet);
+      if (currentDistribution.equals(lastDistribution)) {
+        // The leader distribution is stable
+        result.rounds = rounds;
+        break;
+      }
+
+      // Count the RegionGroups whose leader differs from the previous round
+      for (Map.Entry<TConsensusGroupId, Integer> entry : lastDistribution.entrySet()) {
+        if (!Objects.equals(entry.getValue(), currentDistribution.get(entry.getKey()))) {
+          result.switchTimes += 1;
+        }
+      }
+
+      lastDistribution.clear();
+      lastDistribution.putAll(currentDistribution);
+    }
+
+    stableLeaderDistribution.clear();
+    stableLeaderDistribution.putAll(lastDistribution);
+
+    // Number of RegionGroups led by each DataNode that leads at least one RegionGroup
+    Map<Integer, AtomicInteger> leaderCounter = new ConcurrentHashMap<>();
+    lastDistribution.forEach(
+        (regionGroupId, leaderId) ->
+            leaderCounter
+                .computeIfAbsent(leaderId, empty -> new AtomicInteger(0))
+                .getAndIncrement());
+    if (leaderCounter.isEmpty()) {
+      // Without any leader the min/max sentinels below would overflow into a bogus range of 1,
+      // so keep the zero range and variance set by the Statistics constructor.
+      return result;
+    }
+
+    // NOTE(review): DataNodes that lead no RegionGroup have no entry in leaderCounter, so they
+    // contribute neither to the minimum nor to the squared deviations, although the variance is
+    // divided by dataNodeNum. Confirm whether that is intended.
+    double sum = 0;
+    double avg = (double) (regionGroupNum) / (double) (dataNodeNum);
+    int minLeaderCount = Integer.MAX_VALUE;
+    int maxLeaderCount = Integer.MIN_VALUE;
+    for (AtomicInteger counter : leaderCounter.values()) {
+      int leaderCount = counter.get();
+      sum += Math.pow((double) leaderCount - avg, 2);
+      minLeaderCount = Math.min(minLeaderCount, leaderCount);
+      maxLeaderCount = Math.max(maxLeaderCount, leaderCount);
+    }
+    result.range = maxLeaderCount - minLeaderCount;
+    result.variance = sum / (double) (dataNodeNum);
+
+    return result;
+  }
+
+  /**
+   * The outcome of one balancing run: how long the algorithm took to become stable, how many
+   * leader switches it caused, and how evenly the leaders ended up being distributed.
+   */
+  private static class Statistics {
+
+    // The number of rounds executed before the output of the balance algorithm became stable
+    private int rounds;
+    // The number of leader changes made until the output of the balance algorithm became stable
+    private int switchTimes;
+    // The difference between the largest and the smallest per-DataNode leader count
+    private int range;
+    // The variance of the per-DataNode leader count
+    private double variance;
+
+    private Statistics() {
+      this.rounds = 0;
+      this.switchTimes = 0;
+      this.range = 0;
+      this.variance = 0;
+    }
+
+    /**
+     * Appends this record to WRITER as one comma-separated line and flushes it.
+     *
+     * @throws IOException if writing or flushing fails
+     */
+    private void toFile() throws IOException {
+      // Locale.ROOT keeps '.' as the decimal separator; a locale that formats decimals with ','
+      // would otherwise add a spurious column to the comma-separated record.
+      WRITER.write(
+          rounds
+              + ","
+              + switchTimes
+              + ","
+              + range
+              + ","
+              + String.format(java.util.Locale.ROOT, "%.6f", variance)
+              + "\n");
+      WRITER.flush();
+    }
+
+    @Override
+    public String toString() {
+      return "Statistics{"
+          + "rounds="
+          + rounds
+          + ", switchTimes="
+          + switchTimes
+          + ", range="
+          + range
+          + ", variance="
+          + variance
+          + '}';
+    }
+
+    /**
+     * Two records are equal when their integer fields match and their variances differ by at most
+     * 0.1. Because of that tolerance the relation is not transitive.
+     */
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      Statistics that = (Statistics) o;
+      return rounds == that.rounds
+          && switchTimes == that.switchTimes
+          && range == that.range
+          && Math.abs(variance - that.variance) <= 0.1;
+    }
+
+    @Override
+    public int hashCode() {
+      // variance is compared with a tolerance in equals(), so it must stay out of the hash:
+      // otherwise two equal records could produce different hash codes.
+      return Objects.hash(rounds, switchTimes, range);
+    }
+  }
+}
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/mcf/MCFLeaderBalancerTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/MinCostFlowLeaderBalancerTest.java
similarity index 88%
rename from confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/mcf/MCFLeaderBalancerTest.java
rename to confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/MinCostFlowLeaderBalancerTest.java
index 4ce2e1843a..f676956780 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/mcf/MCFLeaderBalancerTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/MinCostFlowLeaderBalancerTest.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.confignode.manager.load.balancer.router.mcf;
+package org.apache.iotdb.confignode.manager.load.balancer.router.leader;
 
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
@@ -38,7 +38,9 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
 
-public class MCFLeaderBalancerTest {
+public class MinCostFlowLeaderBalancerTest {
+
+  private static final MinCostFlowLeaderBalancer BALANCER = new MinCostFlowLeaderBalancer();
 
   /** This test shows a simple case that greedy algorithm might fail */
   @Test
@@ -83,18 +85,17 @@ public class MCFLeaderBalancerTest {
     disabledDataNodeSet.add(0);
 
     // Do balancing
-    MCFLeaderBalancer mcfLeaderBalancer =
-        new MCFLeaderBalancer(regionReplicaSetMap, regionLeaderMap, disabledDataNodeSet);
     Map<TConsensusGroupId, Integer> leaderDistribution =
-        mcfLeaderBalancer.generateOptimalLeaderDistribution();
+        BALANCER.generateOptimalLeaderDistribution(
+            regionReplicaSetMap, regionLeaderMap, disabledDataNodeSet);
     // All RegionGroup got a leader
     Assert.assertEquals(3, leaderDistribution.size());
     // Each DataNode occurs exactly once
     Assert.assertEquals(3, new HashSet<>(leaderDistribution.values()).size());
     // MaxFlow is 3
-    Assert.assertEquals(3, mcfLeaderBalancer.getMaximumFlow());
+    Assert.assertEquals(3, BALANCER.getMaximumFlow());
     // MinimumCost is 3(switch leader cost) + 3(load cost, 1 for each DataNode)
-    Assert.assertEquals(3 + 3, mcfLeaderBalancer.getMinimumCost());
+    Assert.assertEquals(3 + 3, BALANCER.getMinimumCost());
   }
 
   /** The leader will remain the same if all DataNodes are disabled */
@@ -119,10 +120,9 @@ public class MCFLeaderBalancerTest {
     disabledDataNodeSet.add(2);
 
     // Do balancing
-    MCFLeaderBalancer mcfLeaderBalancer =
-        new MCFLeaderBalancer(regionReplicaSetMap, regionLeaderMap, disabledDataNodeSet);
     Map<TConsensusGroupId, Integer> leaderDistribution =
-        mcfLeaderBalancer.generateOptimalLeaderDistribution();
+        BALANCER.generateOptimalLeaderDistribution(
+            regionReplicaSetMap, regionLeaderMap, disabledDataNodeSet);
     Assert.assertEquals(1, leaderDistribution.size());
     Assert.assertEquals(1, new HashSet<>(leaderDistribution.values()).size());
     // Leader remains the same
@@ -130,9 +130,9 @@ public class MCFLeaderBalancerTest {
         regionLeaderMap.get(regionReplicaSet.getRegionId()),
         leaderDistribution.get(regionReplicaSet.getRegionId()));
     // MaxFlow is 0
-    Assert.assertEquals(0, mcfLeaderBalancer.getMaximumFlow());
+    Assert.assertEquals(0, BALANCER.getMaximumFlow());
     // MinimumCost is 0
-    Assert.assertEquals(0, mcfLeaderBalancer.getMinimumCost());
+    Assert.assertEquals(0, BALANCER.getMinimumCost());
   }
 
   /**
@@ -171,10 +171,9 @@ public class MCFLeaderBalancerTest {
     }
 
     // Do balancing
-    MCFLeaderBalancer mcfLeaderBalancer =
-        new MCFLeaderBalancer(regionReplicaSetMap, regionLeaderMap, new HashSet<>());
     Map<TConsensusGroupId, Integer> leaderDistribution =
-        mcfLeaderBalancer.generateOptimalLeaderDistribution();
+        BALANCER.generateOptimalLeaderDistribution(
+            regionReplicaSetMap, regionLeaderMap, new HashSet<>());
     // All RegionGroup got a leader
     Assert.assertEquals(regionGroupNum, leaderDistribution.size());
 
@@ -194,9 +193,9 @@ public class MCFLeaderBalancerTest {
         .forEach(leaderNum -> Assert.assertEquals(regionGroupNum / dataNodeNum, leaderNum.get()));
 
     // MaxFlow is regionGroupNum
-    Assert.assertEquals(regionGroupNum, mcfLeaderBalancer.getMaximumFlow());
+    Assert.assertEquals(regionGroupNum, BALANCER.getMaximumFlow());
 
-    int minimumCost = mcfLeaderBalancer.getMinimumCost();
+    int minimumCost = BALANCER.getMinimumCost();
     Assert.assertTrue(minimumCost >= loadCost * dataNodeNum);
     // The number of RegionGroups who have switched leader
     int switchCost = minimumCost - loadCost * dataNodeNum;
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/LoadScoreGreedyRouterTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/priority/GreedyPriorityTest.java
similarity index 97%
rename from confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/LoadScoreGreedyRouterTest.java
rename to confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/priority/GreedyPriorityTest.java
index 042a80ce23..e54fab7cd3 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/LoadScoreGreedyRouterTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/priority/GreedyPriorityTest.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.confignode.manager.load.balancer.router;
+package org.apache.iotdb.confignode.manager.load.balancer.router.priority;
 
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
@@ -39,7 +39,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
-public class LoadScoreGreedyRouterTest {
+public class GreedyPriorityTest {
 
   @Test
   public void testGenLoadScoreGreedyRoutingPolicy() {
@@ -92,8 +92,8 @@ public class LoadScoreGreedyRouterTest {
 
     /* Check result */
     Map<TConsensusGroupId, TRegionReplicaSet> result =
-        new LoadScoreGreedyRouter()
-            .getLatestRegionRouteMap(
+        new GreedyPriorityBalancer()
+            .generateOptimalRoutePriority(
                 Arrays.asList(regionReplicaSet1, regionReplicaSet2), new HashMap<>(), loadScoreMap);
     Assert.assertEquals(2, result.size());
 
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/LeaderRouterTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/priority/LeaderPriorityBalancerTest.java
similarity index 96%
rename from confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/LeaderRouterTest.java
rename to confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/priority/LeaderPriorityBalancerTest.java
index b369fedc6b..51a475f7a8 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/LeaderRouterTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/priority/LeaderPriorityBalancerTest.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.confignode.manager.load.balancer.router;
+package org.apache.iotdb.confignode.manager.load.balancer.router.priority;
 
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
@@ -39,7 +39,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
-public class LeaderRouterTest {
+public class LeaderPriorityBalancerTest {
 
   @Test
   public void testGenRealTimeRoutingPolicy() {
@@ -100,7 +100,8 @@ public class LeaderRouterTest {
 
     // Check result
     Map<TConsensusGroupId, TRegionReplicaSet> result =
-        new LeaderRouter().getLatestRegionRouteMap(regionReplicaSets, leaderMap, loadScoreMap);
+        new LeaderPriorityBalancer()
+            .generateOptimalRoutePriority(regionReplicaSets, leaderMap, loadScoreMap);
     TRegionReplicaSet result1 = result.get(groupId1);
     // Leader first
     Assert.assertEquals(dataNodeLocations.get(1), result1.getDataNodeLocations().get(0));
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/AbstractEnv.java b/integration-test/src/main/java/org/apache/iotdb/it/env/AbstractEnv.java
index 7603c8552c..aa06f14d94 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/AbstractEnv.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/AbstractEnv.java
@@ -87,6 +87,15 @@ public abstract class AbstractEnv implements BaseEnv {
     String targetConfigNode = seedConfigNodeWrapper.getIpAndPortString();
     this.configNodeWrapperList.add(seedConfigNodeWrapper);
 
+    // Check if the Seed-ConfigNode started successfully
+    try (SyncConfigNodeIServiceClient ignored =
+        (SyncConfigNodeIServiceClient) getLeaderConfigNodeConnection()) {
+      // Do nothing
+      logger.info("The Seed-ConfigNode started successfully!");
+    } catch (Exception e) {
+      logger.error("Failed to get connection to the Seed-ConfigNode", e);
+    }
+
     List<String> configNodeEndpoints = new ArrayList<>();
     RequestDelegate<Void> configNodesDelegate = new SerialRequestDelegate<>(configNodeEndpoints);
     for (int i = 1; i < configNodesNum; i++) {
diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBClusterRegionLeaderBalancingIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBClusterRegionLeaderBalancingIT.java
index a94727984f..ae463159f1 100644
--- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBClusterRegionLeaderBalancingIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBClusterRegionLeaderBalancingIT.java
@@ -25,8 +25,11 @@ import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
 import org.apache.iotdb.commons.cluster.NodeStatus;
 import org.apache.iotdb.commons.cluster.RegionRoleType;
 import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.confignode.it.utils.ConfigNodeTestUtils;
 import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionTableResp;
+import org.apache.iotdb.confignode.rpc.thrift.TSchemaPartitionReq;
+import org.apache.iotdb.confignode.rpc.thrift.TSchemaPartitionTableResp;
 import org.apache.iotdb.confignode.rpc.thrift.TSetStorageGroupReq;
 import org.apache.iotdb.confignode.rpc.thrift.TShowDataNodesResp;
 import org.apache.iotdb.confignode.rpc.thrift.TShowRegionReq;
@@ -180,16 +183,15 @@ public class IoTDBClusterRegionLeaderBalancingIT {
         status = client.setStorageGroup(setReq);
         Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
 
-        // TODO: Create a SchemaRegionGroup for each StorageGroup
-        // TODO: (The Ratis protocol class is now hard to change leader)
-        //        TSchemaPartitionTableResp schemaPartitionTableResp =
-        //            client.getOrCreateSchemaPartitionTable(
-        //                new TSchemaPartitionReq(
-        //                    ConfigNodeTestUtils.generatePatternTreeBuffer(
-        //                        new String[] {sg + i + "." + "d"})));
-        //        Assert.assertEquals(
-        //            TSStatusCode.SUCCESS_STATUS.getStatusCode(),
-        //            schemaPartitionTableResp.getStatus().getCode());
+        // Create a SchemaRegionGroup for each StorageGroup
+        TSchemaPartitionTableResp schemaPartitionTableResp =
+            client.getOrCreateSchemaPartitionTable(
+                new TSchemaPartitionReq(
+                    ConfigNodeTestUtils.generatePatternTreeBuffer(
+                        new String[] {sg + i + "." + "d"})));
+        Assert.assertEquals(
+            TSStatusCode.SUCCESS_STATUS.getStatusCode(),
+            schemaPartitionTableResp.getStatus().getCode());
 
         // Create a DataRegionGroup for each StorageGroup
         Map<TSeriesPartitionSlot, List<TTimePartitionSlot>> seriesSlotMap = new HashMap<>();
@@ -227,7 +229,7 @@ public class IoTDBClusterRegionLeaderBalancingIT {
         isDistributionBalanced = leaderCounter.size() == testDataNodeNum;
         // Each DataNode has exactly 4 Region-leader
         for (AtomicInteger leaderCount : leaderCounter.values()) {
-          if (leaderCount.get() != 2) {
+          if (leaderCount.get() != 4) {
             isDistributionBalanced = false;
           }
         }
@@ -286,7 +288,7 @@ public class IoTDBClusterRegionLeaderBalancingIT {
         isDistributionBalanced = leaderCounter.size() == testDataNodeNum - 1;
         // Each Running DataNode has exactly 6 Region-leader
         for (AtomicInteger leaderCount : leaderCounter.values()) {
-          if (leaderCount.get() != 3) {
+          if (leaderCount.get() != 6) {
             isDistributionBalanced = false;
           }
         }
diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBClusterRestartIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBClusterRestartIT.java
index 2510e81e2a..e34e142815 100644
--- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBClusterRestartIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBClusterRestartIT.java
@@ -69,9 +69,9 @@ public class IoTDBClusterRestartIT {
 
   private static final String ratisConsensusProtocolClass =
       "org.apache.iotdb.consensus.ratis.RatisConsensus";
-  private static final int testConfigNodeNum = 3;
-  private static final int testDataNodeNum = 3;
-  private static final int testReplicationFactor = 3;
+  private static final int testConfigNodeNum = 2;
+  private static final int testDataNodeNum = 2;
+  private static final int testReplicationFactor = 2;
   private static final long testTimePartitionInterval = 604800000;
   protected static String originalConfigNodeConsensusProtocolClass;
   protected static String originalSchemaRegionConsensusProtocolClass;
@@ -100,7 +100,7 @@ public class IoTDBClusterRestartIT {
 
     originalTimePartitionInterval = ConfigFactory.getConfig().getTimePartitionInterval();
     ConfigFactory.getConfig().setTimePartitionInterval(testTimePartitionInterval);
-    // Init 3C3D cluster environment
+    // Init 2C2D cluster environment
     EnvFactory.getEnv().initClusterEnvironment(testConfigNodeNum, testDataNodeNum);
   }
 
diff --git a/node-commons/src/assembly/resources/conf/iotdb-common.properties b/node-commons/src/assembly/resources/conf/iotdb-common.properties
index bb0b0a63e7..27f5958ff0 100644
--- a/node-commons/src/assembly/resources/conf/iotdb-common.properties
+++ b/node-commons/src/assembly/resources/conf/iotdb-common.properties
@@ -100,16 +100,13 @@
 # Datatype: boolean
 # enable_data_partition_inherit_policy=false
 
-# The routing policy of read/write requests
-# These routing policy are currently supported:
-# 1. leader(Default, routing to leader replica)
-# 2. greedy(Routing to replica with the lowest load, might cause read un-consistent)
+# The policy of cluster RegionGroups' leader distribution.
+# E.g. we should rebalance cluster RegionGroups' leader distribution when some DataNodes are shut down or reconnected.
+# These policies are currently supported:
+# 1. GREEDY(Distribute leaders through a simple greedy algorithm, might result in an unbalanced distribution)
+# 2. MIN_COST_FLOW(Default, distribute leaders through a min cost flow algorithm)
 # Datatype: string
-# routing_policy=leader
-
-# Whether enable ConfigNode-leader to balance RegionGroups' leader distribution automatically.
-# Datatype: boolean
-# enable_leader_balancing=false
+# leader_distribution_policy=MIN_COST_FLOW
 
 ####################
 ### Cluster management
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
index 7ca7555a8f..ba39d0fcea 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -1167,17 +1167,30 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
     }
   }
 
-  public TSStatus changeRegionLeader(TRegionLeaderChangeReq req) throws TException {
+  @Override
+  public TSStatus changeRegionLeader(TRegionLeaderChangeReq req) {
+    LOGGER.info("[ChangeRegionLeader] {}", req);
     TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
     TConsensusGroupId tgId = req.getRegionId();
     ConsensusGroupId regionId = ConsensusGroupId.Factory.createFromTConsensusGroupId(tgId);
     TEndPoint newNode = getConsensusEndPoint(req.getNewLeaderNode(), regionId);
     Peer newLeaderPeer = new Peer(regionId, req.getNewLeaderNode().getDataNodeId(), newNode);
-    if (!isLeader(regionId)) {
-      LOGGER.info("region {} is not leader, no need to change leader", regionId);
-      return status;
-    }
-    LOGGER.info("region {} is leader, will change leader", regionId);
+
+    if (isLeader(regionId)) {
+      String msg =
+          "[ChangeRegionLeader] The current DataNode: "
+              + req.getNewLeaderNode().getDataNodeId()
+              + " is already the leader of RegionGroup: "
+              + regionId
+              + ", skip leader transfer.";
+      LOGGER.info(msg);
+      return status.setMessage(msg);
+    }
+
+    LOGGER.info(
+        "[ChangeRegionLeader] Start change the leader of RegionGroup: {} to DataNode: {}",
+        regionId,
+        req.getNewLeaderNode().getDataNodeId());
     return transferLeader(regionId, newLeaderPeer);
   }
 
@@ -1190,16 +1203,24 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
       resp = SchemaRegionConsensusImpl.getInstance().transferLeader(regionId, newLeaderPeer);
     } else {
       status.setCode(TSStatusCode.REGION_LEADER_CHANGE_ERROR.getStatusCode());
-      status.setMessage("Error Region type. region: " + regionId);
+      status.setMessage("[ChangeRegionLeader] Error Region type: " + regionId);
       return status;
     }
+
     if (!resp.isSuccess()) {
-      LOGGER.error("change region {} leader failed", regionId, resp.getException());
+      LOGGER.error(
+          "[ChangeRegionLeader] Failed to change the leader of RegionGroup: {}",
+          regionId,
+          resp.getException());
       status.setCode(TSStatusCode.REGION_LEADER_CHANGE_ERROR.getStatusCode());
       status.setMessage(resp.getException().getMessage());
       return status;
     }
-    status.setMessage("change region " + regionId + " leader succeed");
+    status.setMessage(
+        "[ChangeRegionLeader] Successfully change the leader of RegionGroup: "
+            + regionId
+            + " to "
+            + newLeaderPeer.getNodeId());
     return status;
   }
 
diff --git a/thrift/src/main/thrift/datanode.thrift b/thrift/src/main/thrift/datanode.thrift
index ec567aab99..eb6abc0364 100644
--- a/thrift/src/main/thrift/datanode.thrift
+++ b/thrift/src/main/thrift/datanode.thrift
@@ -415,9 +415,9 @@ service IDataNodeRPCService {
   common.TSStatus deleteRegion(common.TConsensusGroupId consensusGroupId)
 
   /**
-   * Config node will change a region leader to other data node int same consensus group
-   * if the region is not leader on the node, will do nothing
-   * @param change a region leader to which node
+   * Change the leader of specified RegionGroup to another DataNode
+   *
+   * @param req The specified RegionGroup and the new leader DataNode
    */
   common.TSStatus changeRegionLeader(TRegionLeaderChangeReq req);