You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ca...@apache.org on 2022/11/08 06:04:32 UTC
[iotdb] branch master updated: [IOTDB-4768] Balancing cluster RegionGroup leader distribution by MinimumCostFlow algorithm (#7774)
This is an automated email from the ASF dual-hosted git repository.
caogaofei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 1f8085c65d [IOTDB-4768] Balancing cluster RegionGroup leader distribution by MinimumCostFlow algorithm (#7774)
1f8085c65d is described below
commit 1f8085c65d66e7bb7dfc91df0f6119a0a2e698ca
Author: YongzaoDan <33...@users.noreply.github.com>
AuthorDate: Tue Nov 8 14:04:25 2022 +0800
[IOTDB-4768] Balancing cluster RegionGroup leader distribution by MinimumCostFlow algorithm (#7774)
---
.../confignode/client/DataNodeRequestType.java | 1 +
.../client/async/AsyncDataNodeClientPool.java | 7 +
.../client/async/handlers/AsyncClientHandler.java | 1 +
.../iotdb/confignode/conf/ConfigNodeConfig.java | 11 +
.../confignode/conf/ConfigNodeDescriptor.java | 5 +
.../manager/load/balancer/RouteBalancer.java | 132 +++++++--
.../manager/load/balancer/router/mcf/MCFEdge.java | 34 +++
.../balancer/router/mcf/MCFLeaderBalancer.java | 305 +++++++++++++++++++++
.../iotdb/confignode/manager/node/NodeManager.java | 13 +
.../manager/partition/PartitionManager.java | 12 +
.../confignode/persistence/node/NodeInfo.java | 12 +-
.../balancer/router/mcf/MCFLeaderBalancerTest.java | 216 +++++++++++++++
.../java/org/apache/iotdb/it/env/MppConfig.java | 7 +
.../org/apache/iotdb/itbase/env/BaseConfig.java | 8 +
.../it/IoTDBClusterRegionLeaderBalancingIT.java | 151 ++++++++++
.../resources/conf/iotdb-common.properties | 4 +
16 files changed, 899 insertions(+), 20 deletions(-)
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java
index 78c3524801..ff236ce339 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java
@@ -43,6 +43,7 @@ public enum DataNodeRequestType {
DELETE_OLD_REGION_PEER,
UPDATE_REGION_ROUTE_MAP,
+ CHANGE_REGION_LEADER,
/** PartitionCache */
INVALIDATE_PARTITION_CACHE,
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeClientPool.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeClientPool.java
index e6f1c6375c..bfd686a890 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeClientPool.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeClientPool.java
@@ -49,6 +49,7 @@ import org.apache.iotdb.mpp.rpc.thrift.TFetchSchemaBlackListReq;
import org.apache.iotdb.mpp.rpc.thrift.TInactiveTriggerInstanceReq;
import org.apache.iotdb.mpp.rpc.thrift.TInvalidateMatchedSchemaCacheReq;
import org.apache.iotdb.mpp.rpc.thrift.TOperatePipeOnDataNodeReq;
+import org.apache.iotdb.mpp.rpc.thrift.TRegionLeaderChangeReq;
import org.apache.iotdb.mpp.rpc.thrift.TRegionRouteReq;
import org.apache.iotdb.mpp.rpc.thrift.TRollbackSchemaBlackListReq;
import org.apache.iotdb.mpp.rpc.thrift.TRollbackSchemaBlackListWithTemplateReq;
@@ -219,6 +220,12 @@ public class AsyncDataNodeClientPool {
(AsyncTSStatusRPCHandler)
clientHandler.createAsyncRPCHandler(requestId, targetDataNode));
break;
+ case CHANGE_REGION_LEADER:
+ client.changeRegionLeader(
+ (TRegionLeaderChangeReq) clientHandler.getRequest(requestId),
+ (AsyncTSStatusRPCHandler)
+ clientHandler.createAsyncRPCHandler(requestId, targetDataNode));
+ break;
case BROADCAST_LATEST_CONFIG_NODE_GROUP:
client.updateConfigNodeGroup(
(TUpdateConfigNodeGroupReq) clientHandler.getRequest(requestId),
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/AsyncClientHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/AsyncClientHandler.java
index 612bea4f05..43b8ad82e4 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/AsyncClientHandler.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/AsyncClientHandler.java
@@ -214,6 +214,7 @@ public class AsyncClientHandler<Q, R> {
case BROADCAST_LATEST_CONFIG_NODE_GROUP:
case INVALIDATE_MATCHED_SCHEMA_CACHE:
case UPDATE_TEMPLATE:
+ case CHANGE_REGION_LEADER:
default:
return new AsyncTSStatusRPCHandler(
requestType,
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
index 3990b02f48..1cf93b4aa8 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
@@ -141,6 +141,9 @@ public class ConfigNodeConfig {
/** The routing policy of read/write requests */
private String routingPolicy = RouteBalancer.LEADER_POLICY;
+ /** The ConfigNode-leader will automatically balance leader distribution if set true */
+ private boolean enableLeaderBalancing = false;
+
private String readConsistencyLevel = "strong";
/** RatisConsensus protocol, Max size for a single log append request from leader */
@@ -529,6 +532,14 @@ public class ConfigNodeConfig {
this.routingPolicy = routingPolicy;
}
+ /** @return true if the ConfigNode-leader should automatically balance leader distribution */
+ public boolean isEnableLeaderBalancing() {
+ return enableLeaderBalancing;
+ }
+
+ /** @param enableLeaderBalancing whether the ConfigNode-leader balances leader distribution */
+ public void setEnableLeaderBalancing(boolean enableLeaderBalancing) {
+ this.enableLeaderBalancing = enableLeaderBalancing;
+ }
+
public String getReadConsistencyLevel() {
return readConsistencyLevel;
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
index 4eed5f5137..d84872ebbc 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
@@ -304,6 +304,11 @@ public class ConfigNodeDescriptor {
"Unknown routing_policy: %s, please set to \"leader\" or \"greedy\"", routingPolicy));
}
+ conf.setEnableLeaderBalancing(
+ Boolean.parseBoolean(
+ properties.getProperty(
+ "enable_leader_balancing", String.valueOf(conf.isEnableLeaderBalancing()))));
+
String readConsistencyLevel =
properties.getProperty("read_consistency_level", conf.getReadConsistencyLevel()).trim();
if (readConsistencyLevel.equals("strong") || readConsistencyLevel.equals("weak")) {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
index ee8a8bdd9a..e9d21891a8 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
@@ -20,19 +20,28 @@ package org.apache.iotdb.confignode.manager.load.balancer;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.cluster.NodeStatus;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
+import org.apache.iotdb.confignode.client.DataNodeRequestType;
+import org.apache.iotdb.confignode.client.async.AsyncDataNodeClientPool;
+import org.apache.iotdb.confignode.client.async.handlers.AsyncClientHandler;
+import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.manager.IManager;
import org.apache.iotdb.confignode.manager.load.balancer.router.IRouter;
import org.apache.iotdb.confignode.manager.load.balancer.router.LeaderRouter;
import org.apache.iotdb.confignode.manager.load.balancer.router.LoadScoreGreedyRouter;
import org.apache.iotdb.confignode.manager.load.balancer.router.RegionRouteMap;
+import org.apache.iotdb.confignode.manager.load.balancer.router.mcf.MCFLeaderBalancer;
import org.apache.iotdb.confignode.manager.node.NodeManager;
import org.apache.iotdb.confignode.manager.partition.PartitionManager;
import org.apache.iotdb.consensus.ConsensusFactory;
+import org.apache.iotdb.mpp.rpc.thrift.TRegionLeaderChangeReq;
import org.apache.iotdb.tsfile.utils.Pair;
import org.slf4j.Logger;
@@ -60,9 +69,16 @@ public class RouteBalancer {
private static final Logger LOGGER = LoggerFactory.getLogger(RouteBalancer.class);
+ private static final ConfigNodeConfig CONF = ConfigNodeDescriptor.getInstance().getConf();
+ private static final boolean ENABLE_LEADER_BALANCING = CONF.isEnableLeaderBalancing();
+ private static final String SCHEMA_REGION_CONSENSUS_PROTOCOL_CLASS =
+ CONF.getSchemaRegionConsensusProtocolClass();
+ private static final String DATA_REGION_CONSENSUS_PROTOCOL_CLASS =
+ CONF.getDataRegionConsensusProtocolClass();
private static final boolean isMultiLeader =
ConsensusFactory.MULTI_LEADER_CONSENSUS.equals(
ConfigNodeDescriptor.getInstance().getConf().getDataRegionConsensusProtocolClass());
+
public static final String LEADER_POLICY = "leader";
public static final String GREEDY_POLICY = "greedy";
@@ -212,36 +228,114 @@ public class RouteBalancer {
/** Start the route balancing service */
public void startRouteBalancingService() {
- synchronized (scheduleMonitor) {
- if (currentLeaderBalancingFuture == null) {
- currentLeaderBalancingFuture =
- ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
- leaderBalancingExecutor,
- this::balancingRegionLeader,
- 0,
- // Execute route balancing service in every 10 loops of heartbeat service
- NodeManager.HEARTBEAT_INTERVAL * 10,
- TimeUnit.MILLISECONDS);
- LOGGER.info("Route-Balancing service is started successfully.");
+ if (ENABLE_LEADER_BALANCING) {
+ synchronized (scheduleMonitor) {
+ if (currentLeaderBalancingFuture == null) {
+ currentLeaderBalancingFuture =
+ ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
+ leaderBalancingExecutor,
+ this::balancingRegionLeader,
+ 0,
+ // Execute route balancing service in every 5 loops of heartbeat service
+ NodeManager.HEARTBEAT_INTERVAL * 5,
+ TimeUnit.MILLISECONDS);
+ LOGGER.info("Route-Balancing service is started successfully.");
+ }
}
}
}
/** Stop the route balancing service */
public void stopRouteBalancingService() {
- synchronized (scheduleMonitor) {
- if (currentLeaderBalancingFuture != null) {
- currentLeaderBalancingFuture.cancel(false);
- currentLeaderBalancingFuture = null;
- leaderCache.clear();
- regionRouteMap.clear();
- LOGGER.info("Route-Balancing service is stopped successfully.");
+ if (ENABLE_LEADER_BALANCING) {
+ synchronized (scheduleMonitor) {
+ if (currentLeaderBalancingFuture != null) {
+ currentLeaderBalancingFuture.cancel(false);
+ currentLeaderBalancingFuture = null;
+ leaderCache.clear();
+ regionRouteMap.clear();
+ LOGGER.info("Route-Balancing service is stopped successfully.");
+ }
}
}
}
private void balancingRegionLeader() {
- // TODO: IOTDB-4768
+ balancingRegionLeader(TConsensusGroupType.SchemaRegion);
+ balancingRegionLeader(TConsensusGroupType.DataRegion);
+ }
+
+ /**
+ * Rebalance the leaders of all RegionGroups of the given type.
+ *
+ * <p>Builds an MCFLeaderBalancer from the current RegionReplicaSets, the cached leaders in
+ * regionRouteMap, and the DataNodes whose status is Unknown, ReadOnly or Removing (passed as
+ * the disabled set, so they are not chosen as new leaders). changeRegionLeader is invoked for
+ * every RegionGroup whose computed leader differs from the cached one, and any queued change
+ * requests are sent to the DataNodes in one batch at the end.
+ *
+ * @param regionGroupType SchemaRegion or DataRegion
+ */
+ private void balancingRegionLeader(TConsensusGroupType regionGroupType) {
+ // Collect latest data to generate leaderBalancer
+ MCFLeaderBalancer leaderBalancer =
+ new MCFLeaderBalancer(
+ getPartitionManager().getAllReplicaSetsMap(regionGroupType),
+ regionRouteMap.getRegionLeaderMap(),
+ getNodeManager()
+ .filterDataNodeThroughStatus(
+ NodeStatus.Unknown, NodeStatus.ReadOnly, NodeStatus.Removing)
+ .stream()
+ .map(TDataNodeConfiguration::getLocation)
+ .map(TDataNodeLocation::getDataNodeId)
+ .collect(Collectors.toSet()));
+
+ // Calculate the optimal leader distribution
+ Map<TConsensusGroupId, Integer> leaderDistribution =
+ leaderBalancer.generateOptimalLeaderDistribution();
+
+ // Transfer leader to the optimal distribution
+ // requestId is advanced by changeRegionLeader only when it queues a TRegionLeaderChangeReq
+ AtomicInteger requestId = new AtomicInteger(0);
+ AsyncClientHandler<TRegionLeaderChangeReq, TSStatus> clientHandler =
+ new AsyncClientHandler<>(DataNodeRequestType.CHANGE_REGION_LEADER);
+ leaderDistribution.forEach(
+ (regionGroupId, newLeaderId) -> {
+ // NOTE(review): newLeaderId is a boxed Integer. If RegionRouteMap#getLeader also
+ // returns a boxed Integer, this `!=` compares references rather than values
+ // (unequal for ids outside the Integer cache) -- confirm getLeader returns int.
+ if (newLeaderId != regionRouteMap.getLeader(regionGroupId)) {
+ // Pick the consensus protocol configured for this RegionGroup's type
+ String consensusProtocolClass;
+ switch (regionGroupId.getType()) {
+ case SchemaRegion:
+ consensusProtocolClass = SCHEMA_REGION_CONSENSUS_PROTOCOL_CLASS;
+ break;
+ case DataRegion:
+ default:
+ consensusProtocolClass = DATA_REGION_CONSENSUS_PROTOCOL_CLASS;
+ break;
+ }
+ LOGGER.info(
+ "[LeaderBalancer] Try to change the leader of Region: {} to DataNode: {} ",
+ regionGroupId,
+ newLeaderId);
+ changeRegionLeader(
+ consensusProtocolClass, requestId, clientHandler, regionGroupId, newLeaderId);
+ }
+ });
+ // Only send RPCs when at least one change request was queued
+ if (requestId.get() > 0) {
+ AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
+ }
+ }
+
+ /**
+ * Move the leader of one RegionGroup to the DataNode newLeaderId.
+ *
+ * <p>For the multi-leader protocol the cached route in regionRouteMap is updated directly. For
+ * Ratis (and, via the default branch, any other protocol) a TRegionLeaderChangeReq is only
+ * queued on clientHandler under the next requestId; sending it is left to the caller.
+ *
+ * @param consensusProtocolClass consensus protocol class of the RegionGroup's type
+ * @param requestId request counter shared by all changes in one balancing round
+ * @param clientHandler handler collecting the change requests of one balancing round
+ * @param regionGroupId the RegionGroup whose leader should move
+ * @param newLeaderId DataNodeId of the new leader
+ */
+ private void changeRegionLeader(
+ String consensusProtocolClass,
+ AtomicInteger requestId,
+ AsyncClientHandler<TRegionLeaderChangeReq, TSStatus> clientHandler,
+ TConsensusGroupId regionGroupId,
+ int newLeaderId) {
+ switch (consensusProtocolClass) {
+ case ConsensusFactory.MULTI_LEADER_CONSENSUS:
+ // For multi-leader protocol, change RegionRouteMap is enough.
+ // And the result will be broadcast by Cluster-LoadStatistics-Service soon.
+ regionRouteMap.setLeader(regionGroupId, newLeaderId);
+ break;
+ case ConsensusFactory.RATIS_CONSENSUS:
+ default:
+ // For ratis protocol, the ConfigNode-leader will send a changeLeaderRequest to the new
+ // leader. And the RegionRouteMap will be updated by Cluster-Heartbeat-Service later if
+ // the leader change succeeds.
+ // NOTE(review): getRegisteredDataNode returns an empty TDataNodeConfiguration for an
+ // unregistered id, so getLocation() may then be null -- confirm callers only pass
+ // registered DataNodeIds.
+ TRegionLeaderChangeReq regionLeaderChangeReq =
+ new TRegionLeaderChangeReq(
+ regionGroupId, getNodeManager().getRegisteredDataNode(newLeaderId).getLocation());
+ clientHandler.putRequest(requestId.getAndIncrement(), regionLeaderChangeReq);
+ }
}
/** Initialize the regionRouteMap when the ConfigNode-Leader is switched */
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/mcf/MCFEdge.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/mcf/MCFEdge.java
new file mode 100644
index 0000000000..d849c195e3
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/mcf/MCFEdge.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.manager.load.balancer.router.mcf;
+
+/**
+ * One directed edge of the minimum-cost-flow graph, stored in an array-backed adjacency list
+ * (each edge links to the next edge leaving the same node through {@link #nextEdge}).
+ */
+public class MCFEdge {
+
+  /** Index of the graph node this edge points to. */
+  public final int destNode;
+
+  /** Remaining (residual) capacity; the only field that changes while flow is augmented. */
+  public int capacity;
+
+  /** Cost per unit of flow on this edge (the paired reverse edge carries the negated cost). */
+  public final int cost;
+
+  /** Index of the next edge leaving the same source node, or -1 if this is the last one. */
+  public final int nextEdge;
+
+  /**
+   * @param destNode index of the destination node
+   * @param capacity initial residual capacity
+   * @param cost cost per unit of flow
+   * @param nextEdge index of the next edge in the source node's adjacency list, -1 if none
+   */
+  public MCFEdge(int destNode, int capacity, int cost, int nextEdge) {
+    this.destNode = destNode;
+    this.capacity = capacity;
+    this.cost = cost;
+    this.nextEdge = nextEdge;
+  }
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/mcf/MCFLeaderBalancer.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/mcf/MCFLeaderBalancer.java
new file mode 100644
index 0000000000..5dbd292964
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/mcf/MCFLeaderBalancer.java
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.manager.load.balancer.router.mcf;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.commons.utils.TestOnly;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/** Leader distribution balancer that uses minimum cost flow algorithm */
+public class MCFLeaderBalancer {
+
+  private static final int INFINITY = Integer.MAX_VALUE;
+
+  /** Input parameters */
+  // Map<RegionGroupId, RegionReplicaSet> of every RegionGroup to be balanced
+  private final Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap;
+
+  // Map<RegionGroupId, DataNodeId of the current leader>
+  private final Map<TConsensusGroupId, Integer> regionLeaderMap;
+  // DataNodeIds that must not be chosen as a new leader
+  private final Set<Integer> disabledDataNodeSet;
+
+  /** Graph nodes */
+  // Super source node
+  private static final int sNode = 0;
+  // Super terminal node
+  private static final int tNode = 1;
+  // Maximum index of graph nodes
+  private int maxNode = tNode + 1;
+  // Map<RegionGroupId, rNode>
+  private final Map<TConsensusGroupId, Integer> rNodeMap;
+  // Map<DataNodeId, dNode>
+  private final Map<Integer, Integer> dNodeMap;
+  // Map<dNode, DataNodeId>
+  private final Map<Integer, Integer> dNodeReflect;
+
+  /** Graph edges */
+  // Maximum index of graph edges
+  private int maxEdge = 0;
+
+  // All edges. They are added in pairs, so edge (i ^ 1) is the residual partner of edge i
+  private final List<MCFEdge> mcfEdges;
+  // Index of the first edge in each node's adjacency list, or -1 if the node has no edge
+  private int[] nodeHeadEdge;
+  // Per-node edge from which dfsAugmentation resumes within one augmentation phase
+  private int[] nodeCurrentEdge;
+
+  // In bellmanFordCheck: whether a node is currently queued. In dfsAugmentation: whether a node
+  // is on the current path or was found to be a dead end during this phase
+  private boolean[] isNodeVisited;
+  // Shortest distance (sum of edge costs) from sNode, computed by bellmanFordCheck
+  private int[] nodeMinimumCost;
+
+  private int maximumFlow = 0;
+  private int minimumCost = 0;
+
+  /**
+   * @param regionReplicaSetMap {@code Map<RegionGroupId, RegionReplicaSet>} of the RegionGroups
+   *     to balance
+   * @param regionLeaderMap {@code Map<RegionGroupId, DataNodeId of the current leader>}; entries
+   *     may be missing for RegionGroups without a known leader
+   * @param disabledDataNodeSet DataNodeIds that must not be chosen as a new leader
+   */
+  public MCFLeaderBalancer(
+      Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap,
+      Map<TConsensusGroupId, Integer> regionLeaderMap,
+      Set<Integer> disabledDataNodeSet) {
+    this.regionReplicaSetMap = regionReplicaSetMap;
+    this.regionLeaderMap = regionLeaderMap;
+    this.disabledDataNodeSet = disabledDataNodeSet;
+
+    this.rNodeMap = new HashMap<>();
+    this.dNodeMap = new HashMap<>();
+    this.dNodeReflect = new HashMap<>();
+
+    this.mcfEdges = new ArrayList<>();
+  }
+
+  /**
+   * Build the flow graph, run the minimum cost flow and read back the leader distribution.
+   *
+   * <p>Intended to be called once per instance: the graph fields are not reset between calls.
+   *
+   * @return {@code Map<RegionGroupId, DataNodeId of the new leader>}. RegionGroups that receive
+   *     no flow keep their current leader; RegionGroups with neither are omitted.
+   */
+  public Map<TConsensusGroupId, Integer> generateOptimalLeaderDistribution() {
+    constructMCFGraph();
+    dinicAlgorithm();
+    return collectLeaderDistribution();
+  }
+
+  /**
+   * Build the graph sNode -> rNodes -> dNodes -> tNode (one rNode per RegionGroup, one dNode per
+   * DataNode that holds a replica).
+   */
+  private void constructMCFGraph() {
+    /* Indicate nodes in mcf */
+    for (TRegionReplicaSet regionReplicaSet : regionReplicaSetMap.values()) {
+      rNodeMap.put(regionReplicaSet.getRegionId(), maxNode++);
+      for (TDataNodeLocation dataNodeLocation : regionReplicaSet.getDataNodeLocations()) {
+        if (!dNodeMap.containsKey(dataNodeLocation.getDataNodeId())) {
+          dNodeMap.put(dataNodeLocation.getDataNodeId(), maxNode);
+          dNodeReflect.put(maxNode, dataNodeLocation.getDataNodeId());
+          maxNode += 1;
+        }
+      }
+    }
+
+    /* Prepare arrays */
+    isNodeVisited = new boolean[maxNode];
+    nodeMinimumCost = new int[maxNode];
+    nodeCurrentEdge = new int[maxNode];
+    nodeHeadEdge = new int[maxNode];
+    Arrays.fill(nodeHeadEdge, -1);
+
+    /* Construct edges: sNode -> rNodes */
+    for (int rNode : rNodeMap.values()) {
+      // Capacity 1: each RegionGroup elects exactly one leader. Cost: 0
+      addAdjacentEdges(sNode, rNode, 1, 0);
+    }
+
+    /* Construct edges: rNodes -> dNodes */
+    for (TRegionReplicaSet regionReplicaSet : regionReplicaSetMap.values()) {
+      int rNode = rNodeMap.get(regionReplicaSet.getRegionId());
+      for (TDataNodeLocation dataNodeLocation : regionReplicaSet.getDataNodeLocations()) {
+        int dNode = dNodeMap.get(dataNodeLocation.getDataNodeId());
+        // Cost: 0 if the dNode corresponds to the current leader of the rNode, 1 otherwise.
+        // Therefore, the RegionGroup keeps its current leader whenever possible.
+        int cost =
+            regionLeaderMap.getOrDefault(regionReplicaSet.getRegionId(), -1)
+                    == dataNodeLocation.getDataNodeId()
+                ? 0
+                : 1;
+        addAdjacentEdges(rNode, dNode, 1, cost);
+      }
+    }
+
+    /* Construct edges: dNodes -> tNode */
+    // Count the possible maximum number of leader in each DataNode
+    Map<Integer, AtomicInteger> maxLeaderCounter = new ConcurrentHashMap<>();
+    regionReplicaSetMap
+        .values()
+        .forEach(
+            regionReplicaSet ->
+                regionReplicaSet
+                    .getDataNodeLocations()
+                    .forEach(
+                        dataNodeLocation ->
+                            maxLeaderCounter
+                                .computeIfAbsent(
+                                    dataNodeLocation.getDataNodeId(), empty -> new AtomicInteger(0))
+                                .getAndIncrement()));
+
+    for (Map.Entry<Integer, Integer> dNodeEntry : dNodeMap.entrySet()) {
+      int dataNodeId = dNodeEntry.getKey();
+      int dNode = dNodeEntry.getValue();
+
+      if (disabledDataNodeSet.contains(dataNodeId)) {
+        // Skip disabled DataNode: without an edge to tNode no leader can be routed to it
+        continue;
+      }
+
+      int maxLeaderCount = maxLeaderCounter.get(dataNodeId).get();
+      for (int extraEdge = 1; extraEdge <= maxLeaderCount; extraEdge++) {
+        // Cost: x^2 for the x-th edge at the current dNode.
+        // The increasing marginal cost makes the leader distribution as balanced as possible.
+        addAdjacentEdges(dNode, tNode, 1, extraEdge * extraEdge);
+      }
+    }
+  }
+
+  /** Add a forward edge and its zero-capacity, negated-cost residual edge. */
+  private void addAdjacentEdges(int fromNode, int destNode, int capacity, int cost) {
+    addEdge(fromNode, destNode, capacity, cost);
+    addEdge(destNode, fromNode, 0, -cost);
+  }
+
+  /** Append one edge and make it the new head of fromNode's adjacency list. */
+  private void addEdge(int fromNode, int destNode, int capacity, int cost) {
+    MCFEdge edge = new MCFEdge(destNode, capacity, cost, nodeHeadEdge[fromNode]);
+    mcfEdges.add(edge);
+    nodeHeadEdge[fromNode] = maxEdge++;
+  }
+
+  /**
+   * Check whether there is an augmented path in the MCF graph by the queue-based Bellman-Ford
+   * (SPFA) algorithm, filling {@link #nodeMinimumCost} with shortest distances from sNode.
+   *
+   * <p>Notice: Never use Dijkstra algorithm to replace this since the residual (reverse) edges
+   * carry negative costs.
+   *
+   * @return True if there exist augmented paths, false otherwise.
+   */
+  private boolean bellmanFordCheck() {
+    Arrays.fill(isNodeVisited, false);
+    Arrays.fill(nodeMinimumCost, INFINITY);
+
+    Queue<Integer> queue = new LinkedList<>();
+    nodeMinimumCost[sNode] = 0;
+    isNodeVisited[sNode] = true;
+    queue.offer(sNode);
+    while (!queue.isEmpty()) {
+      int currentNode = queue.poll();
+      isNodeVisited[currentNode] = false;
+      for (int currentEdge = nodeHeadEdge[currentNode];
+          currentEdge >= 0;
+          currentEdge = mcfEdges.get(currentEdge).nextEdge) {
+        MCFEdge edge = mcfEdges.get(currentEdge);
+        if (edge.capacity > 0
+            && nodeMinimumCost[currentNode] + edge.cost < nodeMinimumCost[edge.destNode]) {
+          nodeMinimumCost[edge.destNode] = nodeMinimumCost[currentNode] + edge.cost;
+          if (!isNodeVisited[edge.destNode]) {
+            isNodeVisited[edge.destNode] = true;
+            queue.offer(edge.destNode);
+          }
+        }
+      }
+    }
+
+    return nodeMinimumCost[tNode] < INFINITY;
+  }
+
+  /**
+   * Push flow from currentNode towards tNode by dfs, following only edges that lie on a shortest
+   * path computed by {@link #bellmanFordCheck()}.
+   *
+   * @return the amount of flow actually pushed (at most inputFlow)
+   */
+  private int dfsAugmentation(int currentNode, int inputFlow) {
+    if (currentNode == tNode || inputFlow == 0) {
+      return inputFlow;
+    }
+
+    int currentEdge;
+    int outputFlow = 0;
+    isNodeVisited[currentNode] = true;
+    for (currentEdge = nodeCurrentEdge[currentNode];
+        currentEdge >= 0;
+        currentEdge = mcfEdges.get(currentEdge).nextEdge) {
+      MCFEdge edge = mcfEdges.get(currentEdge);
+      if (nodeMinimumCost[currentNode] + edge.cost == nodeMinimumCost[edge.destNode]
+          && edge.capacity > 0
+          && !isNodeVisited[edge.destNode]) {
+
+        int subOutputFlow = dfsAugmentation(edge.destNode, Math.min(inputFlow, edge.capacity));
+
+        minimumCost += subOutputFlow * edge.cost;
+
+        edge.capacity -= subOutputFlow;
+        mcfEdges.get(currentEdge ^ 1).capacity += subOutputFlow;
+
+        inputFlow -= subOutputFlow;
+        outputFlow += subOutputFlow;
+
+        if (inputFlow == 0) {
+          break;
+        }
+      }
+    }
+    // Remember where to resume so exhausted edges are not rescanned in this phase
+    nodeCurrentEdge[currentNode] = currentEdge;
+
+    if (outputFlow > 0) {
+      // A node that pushed no flow stays marked, pruning it for the rest of the phase
+      isNodeVisited[currentNode] = false;
+    }
+    return outputFlow;
+  }
+
+  /** Repeat shortest-path phases and augment along them until tNode becomes unreachable. */
+  private void dinicAlgorithm() {
+    while (bellmanFordCheck()) {
+      int currentFlow;
+      System.arraycopy(nodeHeadEdge, 0, nodeCurrentEdge, 0, maxNode);
+      while ((currentFlow = dfsAugmentation(sNode, INFINITY)) > 0) {
+        maximumFlow += currentFlow;
+      }
+    }
+  }
+
+  /**
+   * Read the leader of every RegionGroup from the saturated rNode -> dNode edges.
+   *
+   * @return {@code Map<RegionGroupId, DataNodeId where the new leader locates>}. RegionGroups
+   *     that received no flow keep their current leader; those without a current leader are
+   *     omitted.
+   */
+  private Map<TConsensusGroupId, Integer> collectLeaderDistribution() {
+    Map<TConsensusGroupId, Integer> result = new ConcurrentHashMap<>();
+
+    rNodeMap.forEach(
+        (regionGroupId, rNode) -> {
+          Integer newLeaderId = null;
+          for (int currentEdge = nodeHeadEdge[rNode];
+              currentEdge >= 0;
+              currentEdge = mcfEdges.get(currentEdge).nextEdge) {
+            MCFEdge edge = mcfEdges.get(currentEdge);
+            // A used forward edge (capacity 1 -> 0) carries this rNode's single unit of flow
+            if (edge.destNode != sNode && edge.capacity == 0) {
+              newLeaderId = dNodeReflect.get(edge.destNode);
+              break;
+            }
+          }
+          if (newLeaderId == null) {
+            // No enabled replica received the flow, fall back to the current leader (if any)
+            newLeaderId = regionLeaderMap.get(regionGroupId);
+          }
+          // ConcurrentHashMap rejects null values: skip RegionGroups that have neither a new nor
+          // a current leader instead of throwing NullPointerException
+          if (newLeaderId != null) {
+            result.put(regionGroupId, newLeaderId);
+          }
+        });
+
+    return result;
+  }
+
+  @TestOnly
+  public int getMaximumFlow() {
+    return maximumFlow;
+  }
+
+  @TestOnly
+  public int getMinimumCost() {
+    return minimumCost;
+  }
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
index 505dde2076..3db90ba4fc 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
@@ -415,6 +415,19 @@ public class NodeManager {
return nodeInfo.getRegisteredDataNodes();
}
+ /**
+ * Get the configuration of one registered DataNode. Only the leader should use this interface.
+ *
+ * <p>Notice: The result will be an empty TDataNodeConfiguration if the specified DataNode is
+ * not registered.
+ *
+ * @param dataNodeId The specified DataNode's index
+ * @return The specified registered DataNode
+ */
+ public TDataNodeConfiguration getRegisteredDataNode(int dataNodeId) {
+ return nodeInfo.getRegisteredDataNode(dataNodeId);
+ }
+
public Map<Integer, TDataNodeLocation> getRegisteredDataNodeLocations() {
Map<Integer, TDataNodeLocation> dataNodeLocations = new ConcurrentHashMap<>();
nodeInfo
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
index 7722a668b5..690a5b122c 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
@@ -440,6 +440,18 @@ public class PartitionManager {
return partitionInfo.getStorageGroupRelatedDataNodes(storageGroup, type);
}
+ /**
+ * Collect the RegionReplicaSets of every RegionGroup of the given type, keyed by RegionGroupId.
+ * Only the leader should use this interface.
+ *
+ * <p>Collectors.toMap throws IllegalStateException if two RegionReplicaSets share a RegionId.
+ *
+ * @param type The specified TConsensusGroupType
+ * @return {@code Map<RegionGroupId, RegionReplicaSet>}; the values are presumably deep copies
+ *     made by PartitionInfo#getAllReplicaSets -- TODO confirm
+ */
+ public Map<TConsensusGroupId, TRegionReplicaSet> getAllReplicaSetsMap(TConsensusGroupType type) {
+ return partitionInfo.getAllReplicaSets(type).stream()
+ .collect(
+ Collectors.toMap(TRegionReplicaSet::getRegionId, regionReplicaSet -> regionReplicaSet));
+ }
+
/**
* Only leader use this interface
*
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/node/NodeInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/node/NodeInfo.java
index 2526abbad6..ad2f4d23b3 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/node/NodeInfo.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/node/NodeInfo.java
@@ -274,7 +274,7 @@ public class NodeInfo implements SnapshotProcessor {
return result;
}
- /** Return All registered DataNodes */
+ /** @return All registered DataNodes */
public List<TDataNodeConfiguration> getRegisteredDataNodes() {
List<TDataNodeConfiguration> result;
dataNodeInfoReadWriteLock.readLock().lock();
@@ -286,6 +286,16 @@ public class NodeInfo implements SnapshotProcessor {
return result;
}
+  /**
+   * Look up one registered DataNode under the DataNode read lock.
+   *
+   * @param dataNodeId index of the DataNode
+   * @return a deep copy of the DataNode's configuration, or an empty TDataNodeConfiguration if no
+   *     DataNode with that id is registered
+   */
+  public TDataNodeConfiguration getRegisteredDataNode(int dataNodeId) {
+    dataNodeInfoReadWriteLock.readLock().lock();
+    try {
+      TDataNodeConfiguration registered = registeredDataNodes.get(dataNodeId);
+      // Only allocate the empty default on a miss instead of on every lookup
+      return registered == null ? new TDataNodeConfiguration() : registered.deepCopy();
+    } finally {
+      dataNodeInfoReadWriteLock.readLock().unlock();
+    }
+  }
+
/**
* Update ConfigNodeList both in memory and confignode-system.properties file
*
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/mcf/MCFLeaderBalancerTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/mcf/MCFLeaderBalancerTest.java
new file mode 100644
index 0000000000..4ce2e1843a
--- /dev/null
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/mcf/MCFLeaderBalancerTest.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.manager.load.balancer.router.mcf;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class MCFLeaderBalancerTest {
+
+  /**
+   * A small case in which a greedy algorithm might fail: every RegionGroup is initially led by the
+   * disabled DataNode 0, and the leaders must be redistributed so that DataNodes 1, 2 and 3 each
+   * lead exactly one RegionGroup.
+   */
+  @Test
+  public void optimalLeaderDistributionTest() {
+    // Prepare Data
+    List<TConsensusGroupId> regionGroupIds = new ArrayList<>();
+    for (int i = 0; i < 3; i++) {
+      regionGroupIds.add(new TConsensusGroupId(TConsensusGroupType.DataRegion, i));
+    }
+
+    List<TDataNodeLocation> dataNodeLocations = new ArrayList<>();
+    for (int i = 0; i < 4; i++) {
+      dataNodeLocations.add(new TDataNodeLocation().setDataNodeId(i));
+    }
+
+    List<TRegionReplicaSet> regionReplicaSets = new ArrayList<>();
+    regionReplicaSets.add(
+        new TRegionReplicaSet(
+            regionGroupIds.get(0),
+            Arrays.asList(
+                dataNodeLocations.get(0), dataNodeLocations.get(1), dataNodeLocations.get(2))));
+    regionReplicaSets.add(
+        new TRegionReplicaSet(
+            regionGroupIds.get(1),
+            Arrays.asList(
+                dataNodeLocations.get(0), dataNodeLocations.get(1), dataNodeLocations.get(3))));
+    regionReplicaSets.add(
+        new TRegionReplicaSet(
+            regionGroupIds.get(2),
+            Arrays.asList(
+                dataNodeLocations.get(0), dataNodeLocations.get(2), dataNodeLocations.get(3))));
+
+    // Prepare input parameters
+    Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap = new HashMap<>();
+    regionReplicaSets.forEach(
+        regionReplicaSet ->
+            regionReplicaSetMap.put(regionReplicaSet.getRegionId(), regionReplicaSet));
+    Map<TConsensusGroupId, Integer> regionLeaderMap = new HashMap<>();
+    regionReplicaSets.forEach(
+        regionReplicaSet -> regionLeaderMap.put(regionReplicaSet.getRegionId(), 0));
+    Set<Integer> disabledDataNodeSet = new HashSet<>();
+    disabledDataNodeSet.add(0);
+
+    // Do balancing
+    MCFLeaderBalancer mcfLeaderBalancer =
+        new MCFLeaderBalancer(regionReplicaSetMap, regionLeaderMap, disabledDataNodeSet);
+    Map<TConsensusGroupId, Integer> leaderDistribution =
+        mcfLeaderBalancer.generateOptimalLeaderDistribution();
+    // All RegionGroup got a leader
+    Assert.assertEquals(3, leaderDistribution.size());
+    // Each DataNode occurs exactly once
+    Assert.assertEquals(3, new HashSet<>(leaderDistribution.values()).size());
+    // MaxFlow is 3
+    Assert.assertEquals(3, mcfLeaderBalancer.getMaximumFlow());
+    // MinimumCost is 3(switch leader cost) + 3(load cost, 1 for each DataNode)
+    Assert.assertEquals(3 + 3, mcfLeaderBalancer.getMinimumCost());
+  }
+
+  /** The leader will remain the same if all DataNodes are disabled */
+  @Test
+  public void disableTest() {
+    TRegionReplicaSet regionReplicaSet =
+        new TRegionReplicaSet(
+            new TConsensusGroupId(TConsensusGroupType.DataRegion, 0),
+            Arrays.asList(
+                new TDataNodeLocation().setDataNodeId(0),
+                new TDataNodeLocation().setDataNodeId(1),
+                new TDataNodeLocation().setDataNodeId(2)));
+
+    // Prepare input parameters
+    Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap = new HashMap<>();
+    regionReplicaSetMap.put(regionReplicaSet.getRegionId(), regionReplicaSet);
+    Map<TConsensusGroupId, Integer> regionLeaderMap = new HashMap<>();
+    regionLeaderMap.put(regionReplicaSet.getRegionId(), 1);
+    Set<Integer> disabledDataNodeSet = new HashSet<>();
+    disabledDataNodeSet.add(0);
+    disabledDataNodeSet.add(1);
+    disabledDataNodeSet.add(2);
+
+    // Do balancing
+    MCFLeaderBalancer mcfLeaderBalancer =
+        new MCFLeaderBalancer(regionReplicaSetMap, regionLeaderMap, disabledDataNodeSet);
+    Map<TConsensusGroupId, Integer> leaderDistribution =
+        mcfLeaderBalancer.generateOptimalLeaderDistribution();
+    Assert.assertEquals(1, leaderDistribution.size());
+    Assert.assertEquals(1, new HashSet<>(leaderDistribution.values()).size());
+    // Leader remains the same
+    Assert.assertEquals(
+        regionLeaderMap.get(regionReplicaSet.getRegionId()),
+        leaderDistribution.get(regionReplicaSet.getRegionId()));
+    // MaxFlow is 0
+    Assert.assertEquals(0, mcfLeaderBalancer.getMaximumFlow());
+    // MinimumCost is 0
+    Assert.assertEquals(0, mcfLeaderBalancer.getMinimumCost());
+  }
+
+  /**
+   * Shows the balancing ability for a big cluster.
+   *
+   * <p>i.e. Simulate 1500 RegionGroups and 300 DataNodes
+   */
+  @Test
+  public void bigClusterTest() {
+    final int regionGroupNum = 1500;
+    final int dataNodeNum = 300;
+    final int replicationFactor = 3;
+
+    // The loadCost for each DataNode is the same, since each ends up leading x RegionGroups
+    int x = regionGroupNum / dataNodeNum;
+    // Sum of squares: 1^2 + 2^2 + ... + x^2 = x(x+1)(2x+1)/6
+    int loadCost = x * (x + 1) * (2 * x + 1) / 6;
+
+    int dataNodeId = 0;
+    Random random = new Random();
+    Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap = new HashMap<>();
+    Map<TConsensusGroupId, Integer> regionLeaderMap = new HashMap<>();
+    for (int i = 0; i < regionGroupNum; i++) {
+      TConsensusGroupId regionGroupId = new TConsensusGroupId(TConsensusGroupType.DataRegion, i);
+      // The initial leader is a random member of this RegionGroup's replicas
+      int leaderId = (dataNodeId + random.nextInt(replicationFactor)) % dataNodeNum;
+
+      TRegionReplicaSet regionReplicaSet = new TRegionReplicaSet();
+      regionReplicaSet.setRegionId(regionGroupId);
+      // Place replicationFactor replicas on consecutive DataNodes (round-robin)
+      for (int j = 0; j < replicationFactor; j++) {
+        regionReplicaSet.addToDataNodeLocations(new TDataNodeLocation().setDataNodeId(dataNodeId));
+        dataNodeId = (dataNodeId + 1) % dataNodeNum;
+      }
+
+      regionReplicaSetMap.put(regionGroupId, regionReplicaSet);
+      regionLeaderMap.put(regionGroupId, leaderId);
+    }
+
+    // Do balancing
+    MCFLeaderBalancer mcfLeaderBalancer =
+        new MCFLeaderBalancer(regionReplicaSetMap, regionLeaderMap, new HashSet<>());
+    Map<TConsensusGroupId, Integer> leaderDistribution =
+        mcfLeaderBalancer.generateOptimalLeaderDistribution();
+    // All RegionGroup got a leader
+    Assert.assertEquals(regionGroupNum, leaderDistribution.size());
+
+    Map<Integer, AtomicInteger> leaderCounter = new ConcurrentHashMap<>();
+    leaderDistribution
+        .values()
+        .forEach(
+            leaderId ->
+                leaderCounter
+                    .computeIfAbsent(leaderId, empty -> new AtomicInteger(0))
+                    .getAndIncrement());
+    // Every DataNode has leader
+    Assert.assertEquals(dataNodeNum, leaderCounter.size());
+    // Every DataNode has exactly regionGroupNum / dataNodeNum leaders
+    leaderCounter
+        .values()
+        .forEach(leaderNum -> Assert.assertEquals(regionGroupNum / dataNodeNum, leaderNum.get()));
+
+    // MaxFlow is regionGroupNum
+    Assert.assertEquals(regionGroupNum, mcfLeaderBalancer.getMaximumFlow());
+
+    int minimumCost = mcfLeaderBalancer.getMinimumCost();
+    Assert.assertTrue(minimumCost >= loadCost * dataNodeNum);
+    // The number of RegionGroups who have switched leader
+    int switchCost = minimumCost - loadCost * dataNodeNum;
+    AtomicInteger switchCount = new AtomicInteger(0);
+    regionLeaderMap.forEach(
+        (regionGroupId, originLeader) -> {
+          if (!Objects.equals(originLeader, leaderDistribution.get(regionGroupId))) {
+            switchCount.getAndIncrement();
+          }
+        });
+    Assert.assertEquals(switchCost, switchCount.get());
+
+    System.out.printf(
+        "MCF algorithm switch leader for %s times to construct a balanced leader distribution"
+            + " of %d DataNodes and %d RegionGroups cluster.%n",
+        switchCost, dataNodeNum, regionGroupNum);
+  }
+}
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/MppConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/MppConfig.java
index 70195ada74..7f419b0f41 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/MppConfig.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/MppConfig.java
@@ -326,4 +326,11 @@ public class MppConfig implements BaseConfig {
String.valueOf(selectIntoInsertTabletPlanRowLimit));
return this;
}
+
+  /**
+   * Record whether the ConfigNode-leader should balance RegionGroup leaders automatically.
+   *
+   * @param enableLeaderBalancing value written to the "enable_leader_balancing" ConfigNode property
+   * @return this config, for chaining
+   */
+  @Override
+  public BaseConfig setEnableLeaderBalancing(boolean enableLeaderBalancing) {
+    final String propertyValue = String.valueOf(enableLeaderBalancing);
+    confignodeProperties.setProperty("enable_leader_balancing", propertyValue);
+    return this;
+  }
}
diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseConfig.java b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseConfig.java
index 20c6464436..d9d9023b57 100644
--- a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseConfig.java
+++ b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseConfig.java
@@ -342,4 +342,12 @@ public interface BaseConfig {
default int getSelectIntoInsertTabletPlanRowLimit() {
return 10000;
}
+
+  /**
+   * Set whether the ConfigNode-leader balances RegionGroup leaders automatically.
+   *
+   * <p>This default implementation ignores the argument; environments that support the option
+   * override it.
+   *
+   * @return this config, for chaining
+   */
+  default BaseConfig setEnableLeaderBalancing(boolean enableLeaderBalancing) {
+    // Intentionally a no-op here
+    return this;
+  }
+
+  /** @return whether leader balancing is enabled; this default implementation reports false */
+  default boolean isEnableLeaderBalancing() {
+    return false;
+  }
}
diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBClusterRegionLeaderBalancingIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBClusterRegionLeaderBalancingIT.java
index e043ff1209..a94727984f 100644
--- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBClusterRegionLeaderBalancingIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBClusterRegionLeaderBalancingIT.java
@@ -22,10 +22,13 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
+import org.apache.iotdb.commons.cluster.NodeStatus;
import org.apache.iotdb.commons.cluster.RegionRoleType;
+import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionReq;
import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionTableResp;
import org.apache.iotdb.confignode.rpc.thrift.TSetStorageGroupReq;
+import org.apache.iotdb.confignode.rpc.thrift.TShowDataNodesResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowRegionReq;
import org.apache.iotdb.confignode.rpc.thrift.TShowRegionResp;
import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchema;
@@ -50,12 +53,15 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@RunWith(IoTDBTestRunner.class)
@Category({ClusterIT.class})
public class IoTDBClusterRegionLeaderBalancingIT {
+ protected static boolean originalEnableLeaderBalancing;
+
protected static String originalSchemaRegionConsensusProtocolClass;
private static final String testSchemaRegionConsensusProtocolClass =
ConsensusFactory.RATIS_CONSENSUS;
@@ -71,6 +77,9 @@ public class IoTDBClusterRegionLeaderBalancingIT {
@BeforeClass
public static void setUp() {
+ originalEnableLeaderBalancing = ConfigFactory.getConfig().isEnableLeaderBalancing();
+ ConfigFactory.getConfig().setEnableLeaderBalancing(true);
+
originalSchemaRegionConsensusProtocolClass =
ConfigFactory.getConfig().getSchemaRegionConsensusProtocolClass();
ConfigFactory.getConfig()
@@ -89,6 +98,8 @@ public class IoTDBClusterRegionLeaderBalancingIT {
@AfterClass
public static void tearDown() {
+ ConfigFactory.getConfig().setEnableLeaderBalancing(originalEnableLeaderBalancing);
+
ConfigFactory.getConfig()
.setSchemaRegionConsensusProtocolClass(originalSchemaRegionConsensusProtocolClass);
ConfigFactory.getConfig()
@@ -149,4 +160,144 @@ public class IoTDBClusterRegionLeaderBalancingIT {
leaderCounter.values().forEach(leaderCount -> Assert.assertEquals(1, leaderCount.get()));
}
}
+
+ @Test
+ public void testMCFLeaderDistribution()
+ throws IOException, InterruptedException, TException, IllegalPathException {
+ final int testConfigNodeNum = 1;
+ final int testDataNodeNum = 3;
+ final int retryNum = 40;
+ EnvFactory.getEnv().initClusterEnvironment(testConfigNodeNum, testDataNodeNum);
+
+ TSStatus status;
+ final int storageGroupNum = 6;
+ try (SyncConfigNodeIServiceClient client =
+ (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
+
+ for (int i = 0; i < storageGroupNum; i++) {
+ // Set StorageGroups
+ TSetStorageGroupReq setReq = new TSetStorageGroupReq(new TStorageGroupSchema(sg + i));
+ status = client.setStorageGroup(setReq);
+ Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
+
+ // TODO: Create a SchemaRegionGroup for each StorageGroup
+ // TODO: (The Ratis protocol class is now hard to change leader)
+ // TSchemaPartitionTableResp schemaPartitionTableResp =
+ // client.getOrCreateSchemaPartitionTable(
+ // new TSchemaPartitionReq(
+ // ConfigNodeTestUtils.generatePatternTreeBuffer(
+ // new String[] {sg + i + "." + "d"})));
+ // Assert.assertEquals(
+ // TSStatusCode.SUCCESS_STATUS.getStatusCode(),
+ // schemaPartitionTableResp.getStatus().getCode());
+
+ // Create a DataRegionGroup for each StorageGroup
+ Map<TSeriesPartitionSlot, List<TTimePartitionSlot>> seriesSlotMap = new HashMap<>();
+ seriesSlotMap.put(
+ new TSeriesPartitionSlot(1), Collections.singletonList(new TTimePartitionSlot(100)));
+ Map<String, Map<TSeriesPartitionSlot, List<TTimePartitionSlot>>> sgSlotsMap =
+ new HashMap<>();
+ sgSlotsMap.put(sg + i, seriesSlotMap);
+ TDataPartitionTableResp dataPartitionTableResp =
+ client.getOrCreateDataPartitionTable(new TDataPartitionReq(sgSlotsMap));
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(),
+ dataPartitionTableResp.getStatus().getCode());
+ }
+
+ // Check leader distribution
+ Map<Integer, AtomicInteger> leaderCounter = new ConcurrentHashMap<>();
+ TShowRegionResp showRegionResp;
+ boolean isDistributionBalanced = false;
+ for (int retry = 0; retry < retryNum; retry++) {
+ leaderCounter.clear();
+ showRegionResp = client.showRegion(new TShowRegionReq());
+ showRegionResp
+ .getRegionInfoList()
+ .forEach(
+ regionInfo -> {
+ if (RegionRoleType.Leader.getRoleType().equals(regionInfo.getRoleType())) {
+ leaderCounter
+ .computeIfAbsent(regionInfo.getDataNodeId(), empty -> new AtomicInteger(0))
+ .getAndIncrement();
+ }
+ });
+
+ // All DataNodes have Region-leader
+ isDistributionBalanced = leaderCounter.size() == testDataNodeNum;
+ // Each DataNode has exactly 4 Region-leader
+ for (AtomicInteger leaderCount : leaderCounter.values()) {
+ if (leaderCount.get() != 2) {
+ isDistributionBalanced = false;
+ }
+ }
+
+ if (isDistributionBalanced) {
+ break;
+ } else {
+ TimeUnit.SECONDS.sleep(1);
+ }
+ }
+ Assert.assertTrue(isDistributionBalanced);
+
+ // Shutdown a DataNode
+ boolean isDataNodeShutdown = false;
+ EnvFactory.getEnv().shutdownDataNode(0);
+ for (int retry = 0; retry < retryNum; retry++) {
+ AtomicInteger runningCnt = new AtomicInteger(0);
+ AtomicInteger unknownCnt = new AtomicInteger(0);
+ TShowDataNodesResp showDataNodesResp = client.showDataNodes();
+ showDataNodesResp
+ .getDataNodesInfoList()
+ .forEach(
+ dataNodeInfo -> {
+ if (NodeStatus.Running.getStatus().equals(dataNodeInfo.getStatus())) {
+ runningCnt.getAndIncrement();
+ } else if (NodeStatus.Unknown.getStatus().equals(dataNodeInfo.getStatus())) {
+ unknownCnt.getAndIncrement();
+ }
+ });
+ if (runningCnt.get() == testDataNodeNum - 1 && unknownCnt.get() == 1) {
+ isDataNodeShutdown = true;
+ break;
+ }
+
+ TimeUnit.SECONDS.sleep(1);
+ }
+ Assert.assertTrue(isDataNodeShutdown);
+
+ // Check leader distribution
+ isDistributionBalanced = false;
+ for (int retry = 0; retry < retryNum; retry++) {
+ leaderCounter.clear();
+ showRegionResp = client.showRegion(new TShowRegionReq());
+ showRegionResp
+ .getRegionInfoList()
+ .forEach(
+ regionInfo -> {
+ if (RegionRoleType.Leader.getRoleType().equals(regionInfo.getRoleType())) {
+ leaderCounter
+ .computeIfAbsent(regionInfo.getDataNodeId(), empty -> new AtomicInteger(0))
+ .getAndIncrement();
+ }
+ });
+
+ // Only Running DataNodes have Region-leader
+ isDistributionBalanced = leaderCounter.size() == testDataNodeNum - 1;
+ // Each Running DataNode has exactly 6 Region-leader
+ for (AtomicInteger leaderCount : leaderCounter.values()) {
+ if (leaderCount.get() != 3) {
+ isDistributionBalanced = false;
+ }
+ }
+
+ if (isDistributionBalanced) {
+ break;
+ } else {
+ TimeUnit.SECONDS.sleep(1);
+ }
+ }
+ Assert.assertTrue(isDistributionBalanced);
+ }
+ }
}
diff --git a/node-commons/src/assembly/resources/conf/iotdb-common.properties b/node-commons/src/assembly/resources/conf/iotdb-common.properties
index 107bf6d807..6a894ffc8f 100644
--- a/node-commons/src/assembly/resources/conf/iotdb-common.properties
+++ b/node-commons/src/assembly/resources/conf/iotdb-common.properties
@@ -101,6 +101,10 @@
# Datatype: string
# routing_policy=leader
+# Whether to let the ConfigNode-leader automatically balance the leader distribution of RegionGroups.
+# Datatype: boolean
+# enable_leader_balancing=false
+
####################
### Cluster management
####################