You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ca...@apache.org on 2022/11/08 06:04:32 UTC

[iotdb] branch master updated: [IOTDB-4768] Balancing cluster RegionGroup leader distribution by MinimumCostFlow algorithm (#7774)

This is an automated email from the ASF dual-hosted git repository.

caogaofei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 1f8085c65d [IOTDB-4768] Balancing cluster RegionGroup leader distribution by MinimumCostFlow algorithm (#7774)
1f8085c65d is described below

commit 1f8085c65d66e7bb7dfc91df0f6119a0a2e698ca
Author: YongzaoDan <33...@users.noreply.github.com>
AuthorDate: Tue Nov 8 14:04:25 2022 +0800

    [IOTDB-4768] Balancing cluster RegionGroup leader distribution by MinimumCostFlow algorithm (#7774)
---
 .../confignode/client/DataNodeRequestType.java     |   1 +
 .../client/async/AsyncDataNodeClientPool.java      |   7 +
 .../client/async/handlers/AsyncClientHandler.java  |   1 +
 .../iotdb/confignode/conf/ConfigNodeConfig.java    |  11 +
 .../confignode/conf/ConfigNodeDescriptor.java      |   5 +
 .../manager/load/balancer/RouteBalancer.java       | 132 +++++++--
 .../manager/load/balancer/router/mcf/MCFEdge.java  |  34 +++
 .../balancer/router/mcf/MCFLeaderBalancer.java     | 305 +++++++++++++++++++++
 .../iotdb/confignode/manager/node/NodeManager.java |  13 +
 .../manager/partition/PartitionManager.java        |  12 +
 .../confignode/persistence/node/NodeInfo.java      |  12 +-
 .../balancer/router/mcf/MCFLeaderBalancerTest.java | 216 +++++++++++++++
 .../java/org/apache/iotdb/it/env/MppConfig.java    |   7 +
 .../org/apache/iotdb/itbase/env/BaseConfig.java    |   8 +
 .../it/IoTDBClusterRegionLeaderBalancingIT.java    | 151 ++++++++++
 .../resources/conf/iotdb-common.properties         |   4 +
 16 files changed, 899 insertions(+), 20 deletions(-)

diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java
index 78c3524801..ff236ce339 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java
@@ -43,6 +43,7 @@ public enum DataNodeRequestType {
   DELETE_OLD_REGION_PEER,
 
   UPDATE_REGION_ROUTE_MAP,
+  CHANGE_REGION_LEADER,
 
   /** PartitionCache */
   INVALIDATE_PARTITION_CACHE,
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeClientPool.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeClientPool.java
index e6f1c6375c..bfd686a890 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeClientPool.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeClientPool.java
@@ -49,6 +49,7 @@ import org.apache.iotdb.mpp.rpc.thrift.TFetchSchemaBlackListReq;
 import org.apache.iotdb.mpp.rpc.thrift.TInactiveTriggerInstanceReq;
 import org.apache.iotdb.mpp.rpc.thrift.TInvalidateMatchedSchemaCacheReq;
 import org.apache.iotdb.mpp.rpc.thrift.TOperatePipeOnDataNodeReq;
+import org.apache.iotdb.mpp.rpc.thrift.TRegionLeaderChangeReq;
 import org.apache.iotdb.mpp.rpc.thrift.TRegionRouteReq;
 import org.apache.iotdb.mpp.rpc.thrift.TRollbackSchemaBlackListReq;
 import org.apache.iotdb.mpp.rpc.thrift.TRollbackSchemaBlackListWithTemplateReq;
@@ -219,6 +220,12 @@ public class AsyncDataNodeClientPool {
               (AsyncTSStatusRPCHandler)
                   clientHandler.createAsyncRPCHandler(requestId, targetDataNode));
           break;
+        case CHANGE_REGION_LEADER:
+          client.changeRegionLeader(
+              (TRegionLeaderChangeReq) clientHandler.getRequest(requestId),
+              (AsyncTSStatusRPCHandler)
+                  clientHandler.createAsyncRPCHandler(requestId, targetDataNode));
+          break;
         case BROADCAST_LATEST_CONFIG_NODE_GROUP:
           client.updateConfigNodeGroup(
               (TUpdateConfigNodeGroupReq) clientHandler.getRequest(requestId),
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/AsyncClientHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/AsyncClientHandler.java
index 612bea4f05..43b8ad82e4 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/AsyncClientHandler.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/AsyncClientHandler.java
@@ -214,6 +214,7 @@ public class AsyncClientHandler<Q, R> {
       case BROADCAST_LATEST_CONFIG_NODE_GROUP:
       case INVALIDATE_MATCHED_SCHEMA_CACHE:
       case UPDATE_TEMPLATE:
+      case CHANGE_REGION_LEADER:
       default:
         return new AsyncTSStatusRPCHandler(
             requestType,
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
index 3990b02f48..1cf93b4aa8 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
@@ -141,6 +141,9 @@ public class ConfigNodeConfig {
   /** The routing policy of read/write requests */
   private String routingPolicy = RouteBalancer.LEADER_POLICY;
 
+  /** The ConfigNode-leader will automatically balance leader distribution if set to true */
+  private boolean enableLeaderBalancing = false;
+
   private String readConsistencyLevel = "strong";
 
   /** RatisConsensus protocol, Max size for a single log append request from leader */
@@ -529,6 +532,14 @@ public class ConfigNodeConfig {
     this.routingPolicy = routingPolicy;
   }
 
+  public boolean isEnableLeaderBalancing() {
+    return enableLeaderBalancing;
+  }
+
+  public void setEnableLeaderBalancing(boolean enableLeaderBalancing) {
+    this.enableLeaderBalancing = enableLeaderBalancing;
+  }
+
   public String getReadConsistencyLevel() {
     return readConsistencyLevel;
   }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
index 4eed5f5137..d84872ebbc 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
@@ -304,6 +304,11 @@ public class ConfigNodeDescriptor {
               "Unknown routing_policy: %s, please set to \"leader\" or \"greedy\"", routingPolicy));
     }
 
+    conf.setEnableLeaderBalancing(
+        Boolean.parseBoolean(
+            properties.getProperty(
+                "enable_leader_balancing", String.valueOf(conf.isEnableLeaderBalancing()))));
+
     String readConsistencyLevel =
         properties.getProperty("read_consistency_level", conf.getReadConsistencyLevel()).trim();
     if (readConsistencyLevel.equals("strong") || readConsistencyLevel.equals("weak")) {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
index ee8a8bdd9a..e9d21891a8 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
@@ -20,19 +20,28 @@ package org.apache.iotdb.confignode.manager.load.balancer;
 
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.cluster.NodeStatus;
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
+import org.apache.iotdb.confignode.client.DataNodeRequestType;
+import org.apache.iotdb.confignode.client.async.AsyncDataNodeClientPool;
+import org.apache.iotdb.confignode.client.async.handlers.AsyncClientHandler;
+import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
 import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
 import org.apache.iotdb.confignode.manager.IManager;
 import org.apache.iotdb.confignode.manager.load.balancer.router.IRouter;
 import org.apache.iotdb.confignode.manager.load.balancer.router.LeaderRouter;
 import org.apache.iotdb.confignode.manager.load.balancer.router.LoadScoreGreedyRouter;
 import org.apache.iotdb.confignode.manager.load.balancer.router.RegionRouteMap;
+import org.apache.iotdb.confignode.manager.load.balancer.router.mcf.MCFLeaderBalancer;
 import org.apache.iotdb.confignode.manager.node.NodeManager;
 import org.apache.iotdb.confignode.manager.partition.PartitionManager;
 import org.apache.iotdb.consensus.ConsensusFactory;
+import org.apache.iotdb.mpp.rpc.thrift.TRegionLeaderChangeReq;
 import org.apache.iotdb.tsfile.utils.Pair;
 
 import org.slf4j.Logger;
@@ -60,9 +69,16 @@ public class RouteBalancer {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(RouteBalancer.class);
 
+  private static final ConfigNodeConfig CONF = ConfigNodeDescriptor.getInstance().getConf();
+  private static final boolean ENABLE_LEADER_BALANCING = CONF.isEnableLeaderBalancing();
+  private static final String SCHEMA_REGION_CONSENSUS_PROTOCOL_CLASS =
+      CONF.getSchemaRegionConsensusProtocolClass();
+  private static final String DATA_REGION_CONSENSUS_PROTOCOL_CLASS =
+      CONF.getDataRegionConsensusProtocolClass();
   private static final boolean isMultiLeader =
       ConsensusFactory.MULTI_LEADER_CONSENSUS.equals(
           ConfigNodeDescriptor.getInstance().getConf().getDataRegionConsensusProtocolClass());
+
   public static final String LEADER_POLICY = "leader";
   public static final String GREEDY_POLICY = "greedy";
 
@@ -212,36 +228,114 @@ public class RouteBalancer {
 
   /** Start the route balancing service */
   public void startRouteBalancingService() {
-    synchronized (scheduleMonitor) {
-      if (currentLeaderBalancingFuture == null) {
-        currentLeaderBalancingFuture =
-            ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
-                leaderBalancingExecutor,
-                this::balancingRegionLeader,
-                0,
-                // Execute route balancing service in every 10 loops of heartbeat service
-                NodeManager.HEARTBEAT_INTERVAL * 10,
-                TimeUnit.MILLISECONDS);
-        LOGGER.info("Route-Balancing service is started successfully.");
+    if (ENABLE_LEADER_BALANCING) {
+      synchronized (scheduleMonitor) {
+        if (currentLeaderBalancingFuture == null) {
+          currentLeaderBalancingFuture =
+              ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
+                  leaderBalancingExecutor,
+                  this::balancingRegionLeader,
+                  0,
+                  // Execute route balancing service in every 5 loops of heartbeat service
+                  NodeManager.HEARTBEAT_INTERVAL * 5,
+                  TimeUnit.MILLISECONDS);
+          LOGGER.info("Route-Balancing service is started successfully.");
+        }
       }
     }
   }
 
   /** Stop the route balancing service */
   public void stopRouteBalancingService() {
-    synchronized (scheduleMonitor) {
-      if (currentLeaderBalancingFuture != null) {
-        currentLeaderBalancingFuture.cancel(false);
-        currentLeaderBalancingFuture = null;
-        leaderCache.clear();
-        regionRouteMap.clear();
-        LOGGER.info("Route-Balancing service is stopped successfully.");
+    if (ENABLE_LEADER_BALANCING) {
+      synchronized (scheduleMonitor) {
+        if (currentLeaderBalancingFuture != null) {
+          currentLeaderBalancingFuture.cancel(false);
+          currentLeaderBalancingFuture = null;
+          leaderCache.clear();
+          regionRouteMap.clear();
+          LOGGER.info("Route-Balancing service is stopped successfully.");
+        }
       }
     }
   }
 
   private void balancingRegionLeader() {
-    // TODO: IOTDB-4768
+    balancingRegionLeader(TConsensusGroupType.SchemaRegion);
+    balancingRegionLeader(TConsensusGroupType.DataRegion);
+  }
+
+  private void balancingRegionLeader(TConsensusGroupType regionGroupType) {
+    // Collect latest data to generate leaderBalancer
+    MCFLeaderBalancer leaderBalancer =
+        new MCFLeaderBalancer(
+            getPartitionManager().getAllReplicaSetsMap(regionGroupType),
+            regionRouteMap.getRegionLeaderMap(),
+            getNodeManager()
+                .filterDataNodeThroughStatus(
+                    NodeStatus.Unknown, NodeStatus.ReadOnly, NodeStatus.Removing)
+                .stream()
+                .map(TDataNodeConfiguration::getLocation)
+                .map(TDataNodeLocation::getDataNodeId)
+                .collect(Collectors.toSet()));
+
+    // Calculate the optimal leader distribution
+    Map<TConsensusGroupId, Integer> leaderDistribution =
+        leaderBalancer.generateOptimalLeaderDistribution();
+
+    // Transfer leader to the optimal distribution
+    AtomicInteger requestId = new AtomicInteger(0);
+    AsyncClientHandler<TRegionLeaderChangeReq, TSStatus> clientHandler =
+        new AsyncClientHandler<>(DataNodeRequestType.CHANGE_REGION_LEADER);
+    leaderDistribution.forEach(
+        (regionGroupId, newLeaderId) -> {
+          if (newLeaderId != regionRouteMap.getLeader(regionGroupId)) {
+            String consensusProtocolClass;
+            switch (regionGroupId.getType()) {
+              case SchemaRegion:
+                consensusProtocolClass = SCHEMA_REGION_CONSENSUS_PROTOCOL_CLASS;
+                break;
+              case DataRegion:
+              default:
+                consensusProtocolClass = DATA_REGION_CONSENSUS_PROTOCOL_CLASS;
+                break;
+            }
+            LOGGER.info(
+                "[LeaderBalancer] Try to change the leader of Region: {} to DataNode: {} ",
+                regionGroupId,
+                newLeaderId);
+            changeRegionLeader(
+                consensusProtocolClass, requestId, clientHandler, regionGroupId, newLeaderId);
+          }
+        });
+    if (requestId.get() > 0) {
+      AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
+    }
+  }
+
+  private void changeRegionLeader(
+      String consensusProtocolClass,
+      AtomicInteger requestId,
+      AsyncClientHandler<TRegionLeaderChangeReq, TSStatus> clientHandler,
+      TConsensusGroupId regionGroupId,
+      int newLeaderId) {
+    switch (consensusProtocolClass) {
+      case ConsensusFactory.MULTI_LEADER_CONSENSUS:
+        // For the multi-leader protocol, changing the RegionRouteMap is enough.
+        // The result will be broadcast by Cluster-LoadStatistics-Service soon.
+        regionRouteMap.setLeader(regionGroupId, newLeaderId);
+        break;
+      case ConsensusFactory.RATIS_CONSENSUS:
+      default:
+        // For the ratis protocol, the ConfigNode-leader will send a changeLeaderRequest to the
+        // new leader.
+        // The RegionRouteMap will be updated by Cluster-Heartbeat-Service later if the leader
+        // change succeeds.
+        TRegionLeaderChangeReq regionLeaderChangeReq =
+            new TRegionLeaderChangeReq(
+                regionGroupId, getNodeManager().getRegisteredDataNode(newLeaderId).getLocation());
+        clientHandler.putRequest(requestId.getAndIncrement(), regionLeaderChangeReq);
+    }
   }
 
   /** Initialize the regionRouteMap when the ConfigNode-Leader is switched */
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/mcf/MCFEdge.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/mcf/MCFEdge.java
new file mode 100644
index 0000000000..d849c195e3
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/mcf/MCFEdge.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.manager.load.balancer.router.mcf;
+
+public class MCFEdge {
+
+  public int destNode;
+  public int capacity;
+  public int cost;
+  public int nextEdge;
+
+  public MCFEdge(int destNode, int capacity, int cost, int nextEdge) {
+    this.destNode = destNode;
+    this.capacity = capacity;
+    this.cost = cost;
+    this.nextEdge = nextEdge;
+  }
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/mcf/MCFLeaderBalancer.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/mcf/MCFLeaderBalancer.java
new file mode 100644
index 0000000000..5dbd292964
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/mcf/MCFLeaderBalancer.java
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.manager.load.balancer.router.mcf;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.commons.utils.TestOnly;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/** Leader distribution balancer that uses minimum cost flow algorithm */
+public class MCFLeaderBalancer {
+
+  private static final int INFINITY = Integer.MAX_VALUE;
+
+  /** Input parameters */
+  private final Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap;
+
+  private final Map<TConsensusGroupId, Integer> regionLeaderMap;
+  private final Set<Integer> disabledDataNodeSet;
+
+  /** Graph nodes */
+  // Super source node
+  private static final int sNode = 0;
+  // Super terminal node
+  private static final int tNode = 1;
+  // Maximum index of graph nodes
+  private int maxNode = tNode + 1;
+  // Map<RegionGroupId, rNode>
+  private final Map<TConsensusGroupId, Integer> rNodeMap;
+  // Map<DataNodeId, dNode>
+  private final Map<Integer, Integer> dNodeMap;
+  // Map<dNode, DataNodeId>
+  private final Map<Integer, Integer> dNodeReflect;
+
+  /** Graph edges */
+  // Maximum index of graph edges
+  private int maxEdge = 0;
+
+  private final List<MCFEdge> mcfEdges;
+  private int[] nodeHeadEdge;
+  private int[] nodeCurrentEdge;
+
+  private boolean[] isNodeVisited;
+  private int[] nodeMinimumCost;
+
+  private int maximumFlow = 0;
+  private int minimumCost = 0;
+
+  public MCFLeaderBalancer(
+      Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap,
+      Map<TConsensusGroupId, Integer> regionLeaderMap,
+      Set<Integer> disabledDataNodeSet) {
+    this.regionReplicaSetMap = regionReplicaSetMap;
+    this.regionLeaderMap = regionLeaderMap;
+    this.disabledDataNodeSet = disabledDataNodeSet;
+
+    this.rNodeMap = new HashMap<>();
+    this.dNodeMap = new HashMap<>();
+    this.dNodeReflect = new HashMap<>();
+
+    this.mcfEdges = new ArrayList<>();
+  }
+
+  public Map<TConsensusGroupId, Integer> generateOptimalLeaderDistribution() {
+    constructMCFGraph();
+    dinicAlgorithm();
+    return collectLeaderDistribution();
+  }
+
+  private void constructMCFGraph() {
+    /* Indicate nodes in mcf */
+    for (TRegionReplicaSet regionReplicaSet : regionReplicaSetMap.values()) {
+      rNodeMap.put(regionReplicaSet.getRegionId(), maxNode++);
+      for (TDataNodeLocation dataNodeLocation : regionReplicaSet.getDataNodeLocations()) {
+        if (!dNodeMap.containsKey(dataNodeLocation.getDataNodeId())) {
+          dNodeMap.put(dataNodeLocation.getDataNodeId(), maxNode);
+          dNodeReflect.put(maxNode, dataNodeLocation.getDataNodeId());
+          maxNode += 1;
+        }
+      }
+    }
+
+    /* Prepare arrays */
+    isNodeVisited = new boolean[maxNode];
+    nodeMinimumCost = new int[maxNode];
+    nodeCurrentEdge = new int[maxNode];
+    nodeHeadEdge = new int[maxNode];
+    Arrays.fill(nodeHeadEdge, -1);
+
+    /* Construct edges: sNode -> rNodes */
+    for (int rNode : rNodeMap.values()) {
+      // Cost: 0
+      addAdjacentEdges(sNode, rNode, 1, 0);
+    }
+
+    /* Construct edges: rNodes -> dNodes */
+    for (TRegionReplicaSet regionReplicaSet : regionReplicaSetMap.values()) {
+      int rNode = rNodeMap.get(regionReplicaSet.getRegionId());
+      for (TDataNodeLocation dataNodeLocation : regionReplicaSet.getDataNodeLocations()) {
+        int dNode = dNodeMap.get(dataNodeLocation.getDataNodeId());
+        // Cost: 0 if the dNode corresponds to the current leader of the rNode,
+        //       1 otherwise.
+        // Therefore, the RegionGroup will keep its current leader whenever possible.
+        int cost =
+            regionLeaderMap.getOrDefault(regionReplicaSet.getRegionId(), -1)
+                    == dataNodeLocation.getDataNodeId()
+                ? 0
+                : 1;
+        addAdjacentEdges(rNode, dNode, 1, cost);
+      }
+    }
+
+    /* Construct edges: dNodes -> tNode */
+    // Count the maximum possible number of leaders on each DataNode
+    Map<Integer, AtomicInteger> maxLeaderCounter = new ConcurrentHashMap<>();
+    regionReplicaSetMap
+        .values()
+        .forEach(
+            regionReplicaSet ->
+                regionReplicaSet
+                    .getDataNodeLocations()
+                    .forEach(
+                        dataNodeLocation ->
+                            maxLeaderCounter
+                                .computeIfAbsent(
+                                    dataNodeLocation.getDataNodeId(), empty -> new AtomicInteger(0))
+                                .getAndIncrement()));
+
+    for (Map.Entry<Integer, Integer> dNodeEntry : dNodeMap.entrySet()) {
+      int dataNodeId = dNodeEntry.getKey();
+      int dNode = dNodeEntry.getValue();
+
+      if (disabledDataNodeSet.contains(dataNodeId)) {
+        // Skip disabled DataNode
+        continue;
+      }
+
+      int maxLeaderCount = maxLeaderCounter.get(dataNodeId).get();
+      for (int extraEdge = 1; extraEdge <= maxLeaderCount; extraEdge++) {
+        // Cost: x^2 for the x-th edge at the current dNode.
+        // Thus, the leader distribution will be as balanced as possible.
+        addAdjacentEdges(dNode, tNode, 1, extraEdge * extraEdge);
+      }
+    }
+  }
+
+  private void addAdjacentEdges(int fromNode, int destNode, int capacity, int cost) {
+    addEdge(fromNode, destNode, capacity, cost);
+    addEdge(destNode, fromNode, 0, -cost);
+  }
+
+  private void addEdge(int fromNode, int destNode, int capacity, int cost) {
+    MCFEdge edge = new MCFEdge(destNode, capacity, cost, nodeHeadEdge[fromNode]);
+    mcfEdges.add(edge);
+    nodeHeadEdge[fromNode] = maxEdge++;
+  }
+
+  /**
+   * Check whether there is an augmenting path in the MCF graph by Bellman-Ford algorithm.
+   *
+   * <p>Notice: Never replace this with Dijkstra's algorithm, since the residual graph contains
+   * negative-cost edges.
+   *
+   * @return True if there exists an augmenting path, false otherwise.
+   */
+  private boolean bellmanFordCheck() {
+    Arrays.fill(isNodeVisited, false);
+    Arrays.fill(nodeMinimumCost, INFINITY);
+
+    Queue<Integer> queue = new LinkedList<>();
+    nodeMinimumCost[sNode] = 0;
+    isNodeVisited[sNode] = true;
+    queue.offer(sNode);
+    while (!queue.isEmpty()) {
+      int currentNode = queue.poll();
+      isNodeVisited[currentNode] = false;
+      for (int currentEdge = nodeHeadEdge[currentNode];
+          currentEdge >= 0;
+          currentEdge = mcfEdges.get(currentEdge).nextEdge) {
+        MCFEdge edge = mcfEdges.get(currentEdge);
+        if (edge.capacity > 0
+            && nodeMinimumCost[currentNode] + edge.cost < nodeMinimumCost[edge.destNode]) {
+          nodeMinimumCost[edge.destNode] = nodeMinimumCost[currentNode] + edge.cost;
+          if (!isNodeVisited[edge.destNode]) {
+            isNodeVisited[edge.destNode] = true;
+            queue.offer(edge.destNode);
+          }
+        }
+      }
+    }
+
+    return nodeMinimumCost[tNode] < INFINITY;
+  }
+
+  /** Do augmentation by dfs algorithm */
+  private int dfsAugmentation(int currentNode, int inputFlow) {
+    if (currentNode == tNode || inputFlow == 0) {
+      return inputFlow;
+    }
+
+    int currentEdge;
+    int outputFlow = 0;
+    isNodeVisited[currentNode] = true;
+    for (currentEdge = nodeCurrentEdge[currentNode];
+        currentEdge >= 0;
+        currentEdge = mcfEdges.get(currentEdge).nextEdge) {
+      MCFEdge edge = mcfEdges.get(currentEdge);
+      if (nodeMinimumCost[currentNode] + edge.cost == nodeMinimumCost[edge.destNode]
+          && edge.capacity > 0
+          && !isNodeVisited[edge.destNode]) {
+
+        int subOutputFlow = dfsAugmentation(edge.destNode, Math.min(inputFlow, edge.capacity));
+
+        minimumCost += subOutputFlow * edge.cost;
+
+        edge.capacity -= subOutputFlow;
+        mcfEdges.get(currentEdge ^ 1).capacity += subOutputFlow;
+
+        inputFlow -= subOutputFlow;
+        outputFlow += subOutputFlow;
+
+        if (inputFlow == 0) {
+          break;
+        }
+      }
+    }
+    nodeCurrentEdge[currentNode] = currentEdge;
+
+    if (outputFlow > 0) {
+      isNodeVisited[currentNode] = false;
+    }
+    return outputFlow;
+  }
+
+  private void dinicAlgorithm() {
+    while (bellmanFordCheck()) {
+      int currentFlow;
+      System.arraycopy(nodeHeadEdge, 0, nodeCurrentEdge, 0, maxNode);
+      while ((currentFlow = dfsAugmentation(sNode, INFINITY)) > 0) {
+        maximumFlow += currentFlow;
+      }
+    }
+  }
+
+  /** @return Map<RegionGroupId, DataNodeId where the new leader is located> */
+  private Map<TConsensusGroupId, Integer> collectLeaderDistribution() {
+    Map<TConsensusGroupId, Integer> result = new ConcurrentHashMap<>();
+
+    rNodeMap.forEach(
+        (regionGroupId, rNode) -> {
+          boolean matchLeader = false;
+          for (int currentEdge = nodeHeadEdge[rNode];
+              currentEdge >= 0;
+              currentEdge = mcfEdges.get(currentEdge).nextEdge) {
+            MCFEdge edge = mcfEdges.get(currentEdge);
+            if (edge.destNode != sNode && edge.capacity == 0) {
+              matchLeader = true;
+              result.put(regionGroupId, dNodeReflect.get(edge.destNode));
+            }
+          }
+          if (!matchLeader) {
+            result.put(regionGroupId, regionLeaderMap.get(regionGroupId));
+          }
+        });
+
+    return result;
+  }
+
+  @TestOnly
+  public int getMaximumFlow() {
+    return maximumFlow;
+  }
+
+  @TestOnly
+  public int getMinimumCost() {
+    return minimumCost;
+  }
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
index 505dde2076..3db90ba4fc 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
@@ -415,6 +415,19 @@ public class NodeManager {
     return nodeInfo.getRegisteredDataNodes();
   }
 
+  /**
+   * Only leader use this interface
+   *
+   * <p>Notice: The result will be an empty TDataNodeConfiguration if the specified DataNode is not
+   * registered
+   *
+   * @param dataNodeId The specified DataNode's index
+   * @return The specified registered DataNode
+   */
+  public TDataNodeConfiguration getRegisteredDataNode(int dataNodeId) {
+    return nodeInfo.getRegisteredDataNode(dataNodeId);
+  }
+
   public Map<Integer, TDataNodeLocation> getRegisteredDataNodeLocations() {
     Map<Integer, TDataNodeLocation> dataNodeLocations = new ConcurrentHashMap<>();
     nodeInfo
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
index 7722a668b5..690a5b122c 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
@@ -440,6 +440,18 @@ public class PartitionManager {
     return partitionInfo.getStorageGroupRelatedDataNodes(storageGroup, type);
   }
 
+  /**
+   * Only leader use this interface
+   *
+   * @param type The specified TConsensusGroupType
+   * @return Deep copy of all Regions' RegionReplicaSet and organized to Map
+   */
+  public Map<TConsensusGroupId, TRegionReplicaSet> getAllReplicaSetsMap(TConsensusGroupType type) {
+    return partitionInfo.getAllReplicaSets(type).stream()
+        .collect(
+            Collectors.toMap(TRegionReplicaSet::getRegionId, regionReplicaSet -> regionReplicaSet));
+  }
+
   /**
    * Only leader use this interface
    *
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/node/NodeInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/node/NodeInfo.java
index 2526abbad6..ad2f4d23b3 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/node/NodeInfo.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/node/NodeInfo.java
@@ -274,7 +274,7 @@ public class NodeInfo implements SnapshotProcessor {
     return result;
   }
 
-  /** Return All registered DataNodes */
+  /** @return All registered DataNodes */
   public List<TDataNodeConfiguration> getRegisteredDataNodes() {
     List<TDataNodeConfiguration> result;
     dataNodeInfoReadWriteLock.readLock().lock();
@@ -286,6 +286,16 @@ public class NodeInfo implements SnapshotProcessor {
     return result;
   }
 
+  /** @return Deep copy of the specified DataNode's configuration, or of an empty one if absent */
+  public TDataNodeConfiguration getRegisteredDataNode(int dataNodeId) {
+    dataNodeInfoReadWriteLock.readLock().lock();
+    try {
+      return registeredDataNodes.getOrDefault(dataNodeId, new TDataNodeConfiguration()).deepCopy();
+    } finally {
+      dataNodeInfoReadWriteLock.readLock().unlock();
+    }
+  }
+
   /**
    * Update ConfigNodeList both in memory and confignode-system.properties file
    *
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/mcf/MCFLeaderBalancerTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/mcf/MCFLeaderBalancerTest.java
new file mode 100644
index 0000000000..4ce2e1843a
--- /dev/null
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/mcf/MCFLeaderBalancerTest.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.manager.load.balancer.router.mcf;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class MCFLeaderBalancerTest {
+
+  /** This test shows a simple case in which a greedy algorithm might fail */
+  @Test
+  public void optimalLeaderDistributionTest() {
+    // Prepare Data
+    List<TConsensusGroupId> regionGroupIds = new ArrayList<>();
+    for (int i = 0; i < 3; i++) {
+      regionGroupIds.add(new TConsensusGroupId(TConsensusGroupType.DataRegion, i));
+    }
+
+    List<TDataNodeLocation> dataNodeLocations = new ArrayList<>();
+    for (int i = 0; i < 4; i++) {
+      dataNodeLocations.add(new TDataNodeLocation().setDataNodeId(i));
+    }
+
+    List<TRegionReplicaSet> regionReplicaSets = new ArrayList<>();
+    regionReplicaSets.add(
+        new TRegionReplicaSet(
+            regionGroupIds.get(0),
+            Arrays.asList(
+                dataNodeLocations.get(0), dataNodeLocations.get(1), dataNodeLocations.get(2))));
+    regionReplicaSets.add(
+        new TRegionReplicaSet(
+            regionGroupIds.get(1),
+            Arrays.asList(
+                dataNodeLocations.get(0), dataNodeLocations.get(1), dataNodeLocations.get(3))));
+    regionReplicaSets.add(
+        new TRegionReplicaSet(
+            regionGroupIds.get(2),
+            Arrays.asList(
+                dataNodeLocations.get(0), dataNodeLocations.get(2), dataNodeLocations.get(3))));
+
+    // Prepare input parameters: DataNode 0 leads every RegionGroup but is disabled
+    Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap = new HashMap<>();
+    regionReplicaSets.forEach(
+        regionReplicaSet ->
+            regionReplicaSetMap.put(regionReplicaSet.getRegionId(), regionReplicaSet));
+    Map<TConsensusGroupId, Integer> regionLeaderMap = new HashMap<>();
+    regionReplicaSets.forEach(
+        regionReplicaSet -> regionLeaderMap.put(regionReplicaSet.getRegionId(), 0));
+    Set<Integer> disabledDataNodeSet = new HashSet<>();
+    disabledDataNodeSet.add(0);
+
+    // Do balancing
+    MCFLeaderBalancer mcfLeaderBalancer =
+        new MCFLeaderBalancer(regionReplicaSetMap, regionLeaderMap, disabledDataNodeSet);
+    Map<TConsensusGroupId, Integer> leaderDistribution =
+        mcfLeaderBalancer.generateOptimalLeaderDistribution();
+    // All RegionGroup got a leader
+    Assert.assertEquals(3, leaderDistribution.size());
+    // The 3 leaders are distinct, i.e. each leader DataNode occurs exactly once
+    Assert.assertEquals(3, new HashSet<>(leaderDistribution.values()).size());
+    // MaxFlow is 3
+    Assert.assertEquals(3, mcfLeaderBalancer.getMaximumFlow());
+    // MinimumCost is 3(switch leader cost) + 3(load cost, 1 for each DataNode)
+    Assert.assertEquals(3 + 3, mcfLeaderBalancer.getMinimumCost());
+  }
+
+  /** The leader will remain the same if all DataNodes are disabled */
+  @Test
+  public void disableTest() {
+    TRegionReplicaSet regionReplicaSet =
+        new TRegionReplicaSet(
+            new TConsensusGroupId(TConsensusGroupType.DataRegion, 0),
+            Arrays.asList(
+                new TDataNodeLocation().setDataNodeId(0),
+                new TDataNodeLocation().setDataNodeId(1),
+                new TDataNodeLocation().setDataNodeId(2)));
+
+    // Prepare input parameters: DataNode 1 is the leader and all three replicas are disabled
+    Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap = new HashMap<>();
+    regionReplicaSetMap.put(regionReplicaSet.getRegionId(), regionReplicaSet);
+    Map<TConsensusGroupId, Integer> regionLeaderMap = new HashMap<>();
+    regionLeaderMap.put(regionReplicaSet.getRegionId(), 1);
+    Set<Integer> disabledDataNodeSet = new HashSet<>();
+    disabledDataNodeSet.add(0);
+    disabledDataNodeSet.add(1);
+    disabledDataNodeSet.add(2);
+
+    // Do balancing
+    MCFLeaderBalancer mcfLeaderBalancer =
+        new MCFLeaderBalancer(regionReplicaSetMap, regionLeaderMap, disabledDataNodeSet);
+    Map<TConsensusGroupId, Integer> leaderDistribution =
+        mcfLeaderBalancer.generateOptimalLeaderDistribution();
+    Assert.assertEquals(1, leaderDistribution.size());
+    Assert.assertEquals(1, new HashSet<>(leaderDistribution.values()).size());
+    // Leader remains the same
+    Assert.assertEquals(
+        regionLeaderMap.get(regionReplicaSet.getRegionId()),
+        leaderDistribution.get(regionReplicaSet.getRegionId()));
+    // MaxFlow is 0
+    Assert.assertEquals(0, mcfLeaderBalancer.getMaximumFlow());
+    // MinimumCost is 0
+    Assert.assertEquals(0, mcfLeaderBalancer.getMinimumCost());
+  }
+
+  /**
+   * This case shows the balancing ability for a big cluster.
+   *
+   * <p>i.e. Simulate 1500 RegionGroups and 300 DataNodes
+   */
+  @Test
+  public void bigClusterTest() {
+    final int regionGroupNum = 1500;
+    final int dataNodeNum = 300;
+    final int replicationFactor = 3;
+
+    // The loadCost for each DataNode is the same
+    int x = regionGroupNum / dataNodeNum;
+    // i.e. formula of 1^2 + 2^2 + 3^2 + ... + x^2
+    int loadCost = x * (x + 1) * (2 * x + 1) / 6;
+
+    int dataNodeId = 0;
+    Random random = new Random();
+    Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap = new HashMap<>();
+    Map<TConsensusGroupId, Integer> regionLeaderMap = new HashMap<>();
+    for (int i = 0; i < regionGroupNum; i++) {
+      TConsensusGroupId regionGroupId = new TConsensusGroupId(TConsensusGroupType.DataRegion, i);
+      int leaderId = (dataNodeId + random.nextInt(replicationFactor)) % dataNodeNum;
+
+      TRegionReplicaSet regionReplicaSet = new TRegionReplicaSet();
+      regionReplicaSet.setRegionId(regionGroupId);
+      for (int j = 0; j < replicationFactor; j++) {
+        regionReplicaSet.addToDataNodeLocations(new TDataNodeLocation().setDataNodeId(dataNodeId));
+        dataNodeId = (dataNodeId + 1) % dataNodeNum;
+      }
+
+      regionReplicaSetMap.put(regionGroupId, regionReplicaSet);
+      regionLeaderMap.put(regionGroupId, leaderId);
+    }
+
+    // Do balancing
+    MCFLeaderBalancer mcfLeaderBalancer =
+        new MCFLeaderBalancer(regionReplicaSetMap, regionLeaderMap, new HashSet<>());
+    Map<TConsensusGroupId, Integer> leaderDistribution =
+        mcfLeaderBalancer.generateOptimalLeaderDistribution();
+    // All RegionGroup got a leader
+    Assert.assertEquals(regionGroupNum, leaderDistribution.size());
+
+    Map<Integer, AtomicInteger> leaderCounter = new ConcurrentHashMap<>();
+    leaderDistribution
+        .values()
+        .forEach(
+            leaderId ->
+                leaderCounter
+                    .computeIfAbsent(leaderId, empty -> new AtomicInteger(0))
+                    .getAndIncrement());
+    // Every DataNode has leader
+    Assert.assertEquals(dataNodeNum, leaderCounter.size());
+    // Every DataNode has exactly regionGroupNum / dataNodeNum leaders
+    leaderCounter
+        .values()
+        .forEach(leaderNum -> Assert.assertEquals(regionGroupNum / dataNodeNum, leaderNum.get()));
+
+    // MaxFlow is regionGroupNum
+    Assert.assertEquals(regionGroupNum, mcfLeaderBalancer.getMaximumFlow());
+
+    int minimumCost = mcfLeaderBalancer.getMinimumCost();
+    Assert.assertTrue(minimumCost >= loadCost * dataNodeNum);
+    // The number of RegionGroups that have switched leader
+    int switchCost = minimumCost - loadCost * dataNodeNum;
+    AtomicInteger switchCount = new AtomicInteger(0);
+    regionLeaderMap.forEach(
+        (regionGroupId, originLeader) -> {
+          if (!Objects.equals(originLeader, leaderDistribution.get(regionGroupId))) {
+            switchCount.getAndIncrement();
+          }
+        });
+    Assert.assertEquals(switchCost, switchCount.get());
+
+    System.out.printf(
+        "MCF algorithm switch leader for %s times to construct a balanced leader distribution of %s DataNodes and %s RegionGroups cluster.%n",
+        switchCost, dataNodeNum, regionGroupNum);
+  }
+}
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/MppConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/MppConfig.java
index 70195ada74..7f419b0f41 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/MppConfig.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/MppConfig.java
@@ -326,4 +326,11 @@ public class MppConfig implements BaseConfig {
         String.valueOf(selectIntoInsertTabletPlanRowLimit));
     return this;
   }
+
+  @Override
+  public BaseConfig setEnableLeaderBalancing(boolean enableLeaderBalancing) {
+    confignodeProperties.setProperty(
+        "enable_leader_balancing", String.valueOf(enableLeaderBalancing));
+    return this; // return this config so that setter calls can be chained
+  }
 }
diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseConfig.java b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseConfig.java
index 20c6464436..d9d9023b57 100644
--- a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseConfig.java
+++ b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseConfig.java
@@ -342,4 +342,12 @@ public interface BaseConfig {
   default int getSelectIntoInsertTabletPlanRowLimit() {
     return 10000;
   }
+
+  default BaseConfig setEnableLeaderBalancing(boolean enableLeaderBalancing) {
+    return this; // no-op default: the flag is ignored unless an implementation overrides this
+  }
+
+  default boolean isEnableLeaderBalancing() {
+    return false; // value reported when an implementation does not override this getter
+  }
 }
diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBClusterRegionLeaderBalancingIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBClusterRegionLeaderBalancingIT.java
index e043ff1209..a94727984f 100644
--- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBClusterRegionLeaderBalancingIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBClusterRegionLeaderBalancingIT.java
@@ -22,10 +22,13 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
 import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
 import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
+import org.apache.iotdb.commons.cluster.NodeStatus;
 import org.apache.iotdb.commons.cluster.RegionRoleType;
+import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionTableResp;
 import org.apache.iotdb.confignode.rpc.thrift.TSetStorageGroupReq;
+import org.apache.iotdb.confignode.rpc.thrift.TShowDataNodesResp;
 import org.apache.iotdb.confignode.rpc.thrift.TShowRegionReq;
 import org.apache.iotdb.confignode.rpc.thrift.TShowRegionResp;
 import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchema;
@@ -50,12 +53,15 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 @RunWith(IoTDBTestRunner.class)
 @Category({ClusterIT.class})
 public class IoTDBClusterRegionLeaderBalancingIT {
 
+  protected static boolean originalEnableLeaderBalancing;
+
   protected static String originalSchemaRegionConsensusProtocolClass;
   private static final String testSchemaRegionConsensusProtocolClass =
       ConsensusFactory.RATIS_CONSENSUS;
@@ -71,6 +77,9 @@ public class IoTDBClusterRegionLeaderBalancingIT {
 
   @BeforeClass
   public static void setUp() {
+    originalEnableLeaderBalancing = ConfigFactory.getConfig().isEnableLeaderBalancing();
+    ConfigFactory.getConfig().setEnableLeaderBalancing(true);
+
     originalSchemaRegionConsensusProtocolClass =
         ConfigFactory.getConfig().getSchemaRegionConsensusProtocolClass();
     ConfigFactory.getConfig()
@@ -89,6 +98,8 @@ public class IoTDBClusterRegionLeaderBalancingIT {
 
   @AfterClass
   public static void tearDown() {
+    ConfigFactory.getConfig().setEnableLeaderBalancing(originalEnableLeaderBalancing);
+
     ConfigFactory.getConfig()
         .setSchemaRegionConsensusProtocolClass(originalSchemaRegionConsensusProtocolClass);
     ConfigFactory.getConfig()
@@ -149,4 +160,144 @@ public class IoTDBClusterRegionLeaderBalancingIT {
       leaderCounter.values().forEach(leaderCount -> Assert.assertEquals(1, leaderCount.get()));
     }
   }
+
+  @Test
+  public void testMCFLeaderDistribution()
+      throws IOException, InterruptedException, TException, IllegalPathException {
+    final int testConfigNodeNum = 1;
+    final int testDataNodeNum = 3;
+    final int retryNum = 40;
+    EnvFactory.getEnv().initClusterEnvironment(testConfigNodeNum, testDataNodeNum);
+
+    TSStatus status;
+    final int storageGroupNum = 6;
+    try (SyncConfigNodeIServiceClient client =
+        (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
+
+      for (int i = 0; i < storageGroupNum; i++) {
+        // Set StorageGroups
+        TSetStorageGroupReq setReq = new TSetStorageGroupReq(new TStorageGroupSchema(sg + i));
+        status = client.setStorageGroup(setReq);
+        Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
+
+        // TODO: Create a SchemaRegionGroup for each StorageGroup
+        // TODO: (The Ratis protocol class is now hard to change leader)
+        //        TSchemaPartitionTableResp schemaPartitionTableResp =
+        //            client.getOrCreateSchemaPartitionTable(
+        //                new TSchemaPartitionReq(
+        //                    ConfigNodeTestUtils.generatePatternTreeBuffer(
+        //                        new String[] {sg + i + "." + "d"})));
+        //        Assert.assertEquals(
+        //            TSStatusCode.SUCCESS_STATUS.getStatusCode(),
+        //            schemaPartitionTableResp.getStatus().getCode());
+
+        // Create a DataRegionGroup for each StorageGroup
+        Map<TSeriesPartitionSlot, List<TTimePartitionSlot>> seriesSlotMap = new HashMap<>();
+        seriesSlotMap.put(
+            new TSeriesPartitionSlot(1), Collections.singletonList(new TTimePartitionSlot(100)));
+        Map<String, Map<TSeriesPartitionSlot, List<TTimePartitionSlot>>> sgSlotsMap =
+            new HashMap<>();
+        sgSlotsMap.put(sg + i, seriesSlotMap);
+        TDataPartitionTableResp dataPartitionTableResp =
+            client.getOrCreateDataPartitionTable(new TDataPartitionReq(sgSlotsMap));
+        Assert.assertEquals(
+            TSStatusCode.SUCCESS_STATUS.getStatusCode(),
+            dataPartitionTableResp.getStatus().getCode());
+      }
+
+      // Check leader distribution
+      Map<Integer, AtomicInteger> leaderCounter = new ConcurrentHashMap<>();
+      TShowRegionResp showRegionResp;
+      boolean isDistributionBalanced = false;
+      for (int retry = 0; retry < retryNum; retry++) {
+        leaderCounter.clear();
+        showRegionResp = client.showRegion(new TShowRegionReq());
+        showRegionResp
+            .getRegionInfoList()
+            .forEach(
+                regionInfo -> {
+                  if (RegionRoleType.Leader.getRoleType().equals(regionInfo.getRoleType())) {
+                    leaderCounter
+                        .computeIfAbsent(regionInfo.getDataNodeId(), empty -> new AtomicInteger(0))
+                        .getAndIncrement();
+                  }
+                });
+
+        // All DataNodes have Region-leader
+        isDistributionBalanced = leaderCounter.size() == testDataNodeNum;
+        // Each DataNode has exactly 2 Region-leaders (6 DataRegionGroups / 3 DataNodes)
+        for (AtomicInteger leaderCount : leaderCounter.values()) {
+          if (leaderCount.get() != 2) {
+            isDistributionBalanced = false;
+          }
+        }
+
+        if (isDistributionBalanced) {
+          break;
+        } else {
+          TimeUnit.SECONDS.sleep(1);
+        }
+      }
+      Assert.assertTrue(isDistributionBalanced);
+
+      // Shutdown a DataNode
+      boolean isDataNodeShutdown = false;
+      EnvFactory.getEnv().shutdownDataNode(0);
+      for (int retry = 0; retry < retryNum; retry++) {
+        AtomicInteger runningCnt = new AtomicInteger(0);
+        AtomicInteger unknownCnt = new AtomicInteger(0);
+        TShowDataNodesResp showDataNodesResp = client.showDataNodes();
+        showDataNodesResp
+            .getDataNodesInfoList()
+            .forEach(
+                dataNodeInfo -> {
+                  if (NodeStatus.Running.getStatus().equals(dataNodeInfo.getStatus())) {
+                    runningCnt.getAndIncrement();
+                  } else if (NodeStatus.Unknown.getStatus().equals(dataNodeInfo.getStatus())) {
+                    unknownCnt.getAndIncrement();
+                  }
+                });
+        if (runningCnt.get() == testDataNodeNum - 1 && unknownCnt.get() == 1) {
+          isDataNodeShutdown = true;
+          break;
+        }
+
+        TimeUnit.SECONDS.sleep(1);
+      }
+      Assert.assertTrue(isDataNodeShutdown);
+
+      // Check leader distribution again after the shutdown
+      isDistributionBalanced = false;
+      for (int retry = 0; retry < retryNum; retry++) {
+        leaderCounter.clear();
+        showRegionResp = client.showRegion(new TShowRegionReq());
+        showRegionResp
+            .getRegionInfoList()
+            .forEach(
+                regionInfo -> {
+                  if (RegionRoleType.Leader.getRoleType().equals(regionInfo.getRoleType())) {
+                    leaderCounter
+                        .computeIfAbsent(regionInfo.getDataNodeId(), empty -> new AtomicInteger(0))
+                        .getAndIncrement();
+                  }
+                });
+
+        // Only Running DataNodes have Region-leader
+        isDistributionBalanced = leaderCounter.size() == testDataNodeNum - 1;
+        // Each Running DataNode has exactly 3 Region-leaders (6 DataRegionGroups / 2 DataNodes)
+        for (AtomicInteger leaderCount : leaderCounter.values()) {
+          if (leaderCount.get() != 3) {
+            isDistributionBalanced = false;
+          }
+        }
+
+        if (isDistributionBalanced) {
+          break;
+        } else {
+          TimeUnit.SECONDS.sleep(1);
+        }
+      }
+      Assert.assertTrue(isDistributionBalanced);
+    }
+  }
 }
diff --git a/node-commons/src/assembly/resources/conf/iotdb-common.properties b/node-commons/src/assembly/resources/conf/iotdb-common.properties
index 107bf6d807..6a894ffc8f 100644
--- a/node-commons/src/assembly/resources/conf/iotdb-common.properties
+++ b/node-commons/src/assembly/resources/conf/iotdb-common.properties
@@ -101,6 +101,10 @@
 # Datatype: string
 # routing_policy=leader
 
+# Whether to enable the ConfigNode-leader to balance RegionGroups' leader distribution automatically.
+# Datatype: boolean
+# enable_leader_balancing=false
+
 ####################
 ### Cluster management
 ####################