You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ca...@apache.org on 2022/11/23 08:02:26 UTC
[iotdb] branch master updated: [IOTDB-4983] Greedy leader balancing policy (#8057)
This is an automated email from the ASF dual-hosted git repository.
caogaofei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 29b7f48f05 [IOTDB-4983] Greedy leader balancing policy (#8057)
29b7f48f05 is described below
commit 29b7f48f05ad67187189fef0012477a32e9657eb
Author: YongzaoDan <33...@users.noreply.github.com>
AuthorDate: Wed Nov 23 16:02:20 2022 +0800
[IOTDB-4983] Greedy leader balancing policy (#8057)
---
.../iotdb/confignode/conf/ConfigNodeConfig.java | 27 +-
.../confignode/conf/ConfigNodeDescriptor.java | 32 +-
.../confignode/conf/ConfigNodeStartupCheck.java | 18 +-
.../manager/load/balancer/RegionBalancer.java | 1 +
.../manager/load/balancer/RouteBalancer.java | 117 ++++---
.../router/leader/GreedyLeaderBalancer.java | 185 +++++++++++
.../{IRouter.java => leader/ILeaderBalancer.java} | 30 +-
.../MinCostFlowLeaderBalancer.java} | 100 ++++--
.../manager/load/balancer/router/mcf/MCFEdge.java | 34 --
.../GreedyPriorityBalancer.java} | 10 +-
.../IPriorityBalancer.java} | 24 +-
.../LeaderPriorityBalancer.java} | 10 +-
.../router/leader/GreedyLeaderBalancerTest.java | 146 ++++++++
.../leader/LeaderBalancerComparisonTest.java | 369 +++++++++++++++++++++
.../MinCostFlowLeaderBalancerTest.java} | 33 +-
.../GreedyPriorityTest.java} | 8 +-
.../LeaderPriorityBalancerTest.java} | 7 +-
.../java/org/apache/iotdb/it/env/AbstractEnv.java | 9 +
.../it/IoTDBClusterRegionLeaderBalancingIT.java | 26 +-
.../iotdb/confignode/it/IoTDBClusterRestartIT.java | 8 +-
.../resources/conf/iotdb-common.properties | 15 +-
.../impl/DataNodeInternalRPCServiceImpl.java | 39 ++-
thrift/src/main/thrift/datanode.thrift | 6 +-
23 files changed, 1016 insertions(+), 238 deletions(-)
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
index dd98569d05..d76920f635 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
@@ -21,7 +21,8 @@ package org.apache.iotdb.confignode.conf;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.confignode.manager.load.balancer.RegionBalancer;
-import org.apache.iotdb.confignode.manager.load.balancer.RouteBalancer;
+import org.apache.iotdb.confignode.manager.load.balancer.router.leader.ILeaderBalancer;
+import org.apache.iotdb.confignode.manager.load.balancer.router.priority.IPriorityBalancer;
import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.rpc.RpcUtils;
@@ -147,11 +148,11 @@ public class ConfigNodeConfig {
/** The unknown DataNode detect interval in milliseconds */
private long unknownDataNodeDetectInterval = heartbeatIntervalInMs;
- /** The routing policy of read/write requests */
- private String routingPolicy = RouteBalancer.LEADER_POLICY;
+ /** The policy of cluster RegionGroups' leader distribution */
+ private String leaderDistributionPolicy = ILeaderBalancer.MIN_COST_FLOW_POLICY;
- /** The ConfigNode-leader will automatically balance leader distribution if set true */
- private boolean enableLeaderBalancing = false;
+ /** The route priority policy of cluster read/write requests */
+ private String routePriorityPolicy = IPriorityBalancer.LEADER_POLICY;
private String readConsistencyLevel = "strong";
@@ -547,20 +548,20 @@ public class ConfigNodeConfig {
this.unknownDataNodeDetectInterval = unknownDataNodeDetectInterval;
}
- public String getRoutingPolicy() {
- return routingPolicy;
+ public String getLeaderDistributionPolicy() {
+ return leaderDistributionPolicy;
}
- public void setRoutingPolicy(String routingPolicy) {
- this.routingPolicy = routingPolicy;
+ public void setLeaderDistributionPolicy(String leaderDistributionPolicy) {
+ this.leaderDistributionPolicy = leaderDistributionPolicy;
}
- public boolean isEnableLeaderBalancing() {
- return enableLeaderBalancing;
+ public String getRoutePriorityPolicy() {
+ return routePriorityPolicy;
}
- public void setEnableLeaderBalancing(boolean enableLeaderBalancing) {
- this.enableLeaderBalancing = enableLeaderBalancing;
+ public void setRoutePriorityPolicy(String routePriorityPolicy) {
+ this.routePriorityPolicy = routePriorityPolicy;
}
public String getReadConsistencyLevel() {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
index 44b58731a6..ee88d737c9 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
@@ -24,7 +24,8 @@ import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.exception.BadNodeUrlException;
import org.apache.iotdb.commons.utils.NodeUrlUtils;
import org.apache.iotdb.confignode.manager.load.balancer.RegionBalancer;
-import org.apache.iotdb.confignode.manager.load.balancer.RouteBalancer;
+import org.apache.iotdb.confignode.manager.load.balancer.router.leader.ILeaderBalancer;
+import org.apache.iotdb.confignode.manager.load.balancer.router.priority.IPriorityBalancer;
import org.apache.iotdb.metrics.config.MetricConfigDescriptor;
import org.slf4j.Logger;
@@ -296,20 +297,31 @@ public class ConfigNodeDescriptor {
"heartbeat_interval_in_ms", String.valueOf(conf.getHeartbeatIntervalInMs()))
.trim()));
- String routingPolicy = properties.getProperty("routing_policy", conf.getRoutingPolicy()).trim();
- if (routingPolicy.equals(RouteBalancer.GREEDY_POLICY)
- || routingPolicy.equals(RouteBalancer.LEADER_POLICY)) {
- conf.setRoutingPolicy(routingPolicy);
+ String leaderDistributionPolicy =
+ properties
+ .getProperty("leader_distribution_policy", conf.getLeaderDistributionPolicy())
+ .trim();
+ if (ILeaderBalancer.GREEDY_POLICY.equals(leaderDistributionPolicy)
+ || ILeaderBalancer.MIN_COST_FLOW_POLICY.equals(leaderDistributionPolicy)) {
+ conf.setLeaderDistributionPolicy(leaderDistributionPolicy);
} else {
throw new IOException(
String.format(
- "Unknown routing_policy: %s, please set to \"leader\" or \"greedy\"", routingPolicy));
+ "Unknown leader_distribution_policy: %s, please set to \"GREEDY\" or \"MIN_COST_FLOW\"",
+ leaderDistributionPolicy));
}
- conf.setEnableLeaderBalancing(
- Boolean.parseBoolean(
- properties.getProperty(
- "enable_leader_balancing", String.valueOf(conf.isEnableLeaderBalancing()))));
+ String routePriorityPolicy =
+ properties.getProperty("route_priority_policy", conf.getRoutePriorityPolicy()).trim();
+ if (IPriorityBalancer.GREEDY_POLICY.equals(routePriorityPolicy)
+ || IPriorityBalancer.LEADER_POLICY.equals(routePriorityPolicy)) {
+ conf.setRoutePriorityPolicy(routePriorityPolicy);
+ } else {
+ throw new IOException(
+ String.format(
+ "Unknown route_priority_policy: %s, please set to \"LEADER\" or \"GREEDY\"",
+ routePriorityPolicy));
+ }
String readConsistencyLevel =
properties.getProperty("read_consistency_level", conf.getReadConsistencyLevel()).trim();
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeStartupCheck.java b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeStartupCheck.java
index 9b91c9ca84..50dff4cebc 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeStartupCheck.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeStartupCheck.java
@@ -21,7 +21,8 @@ package org.apache.iotdb.confignode.conf;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.exception.ConfigurationException;
import org.apache.iotdb.commons.exception.StartupException;
-import org.apache.iotdb.confignode.manager.load.balancer.RouteBalancer;
+import org.apache.iotdb.confignode.manager.load.balancer.router.leader.ILeaderBalancer;
+import org.apache.iotdb.confignode.manager.load.balancer.router.priority.IPriorityBalancer;
import org.apache.iotdb.consensus.ConsensusFactory;
import org.slf4j.Logger;
@@ -92,11 +93,18 @@ public class ConfigNodeStartupCheck {
"%s or %s", ConsensusFactory.SIMPLE_CONSENSUS, ConsensusFactory.RATIS_CONSENSUS));
}
- // The routing policy is limited
- if (!CONF.getRoutingPolicy().equals(RouteBalancer.LEADER_POLICY)
- && !CONF.getRoutingPolicy().equals(RouteBalancer.GREEDY_POLICY)) {
+ // The leader distribution policy is limited; an invalid value is reported with its own setting
+ if (!ILeaderBalancer.GREEDY_POLICY.equals(CONF.getLeaderDistributionPolicy())
+ && !ILeaderBalancer.MIN_COST_FLOW_POLICY.equals(CONF.getLeaderDistributionPolicy())) {
throw new ConfigurationException(
- "routing_policy", CONF.getRoutingPolicy(), "leader or greedy");
+ "leader_distribution_policy", CONF.getLeaderDistributionPolicy(), "GREEDY or MIN_COST_FLOW");
+ }
+
+ // The route priority policy is limited
+ if (!CONF.getRoutePriorityPolicy().equals(IPriorityBalancer.LEADER_POLICY)
+ && !CONF.getRoutePriorityPolicy().equals(IPriorityBalancer.GREEDY_POLICY)) {
+ throw new ConfigurationException(
+ "route_priority_policy", CONF.getRoutePriorityPolicy(), "LEADER or GREEDY");
}
// The ip of target ConfigNode couldn't be 0.0.0.0
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java
index 43dcf46038..e16541d6bc 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java
@@ -125,6 +125,7 @@ public class RegionBalancer {
switch (regionAllocateStrategy) {
case COPY_SET:
return new CopySetRegionAllocator();
+ case GREEDY:
default:
return new GreedyRegionAllocator();
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
index 5ed4134658..6e0a7d95a3 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
@@ -33,11 +33,13 @@ import org.apache.iotdb.confignode.client.async.handlers.AsyncClientHandler;
import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.manager.IManager;
-import org.apache.iotdb.confignode.manager.load.balancer.router.IRouter;
-import org.apache.iotdb.confignode.manager.load.balancer.router.LeaderRouter;
-import org.apache.iotdb.confignode.manager.load.balancer.router.LoadScoreGreedyRouter;
import org.apache.iotdb.confignode.manager.load.balancer.router.RegionRouteMap;
-import org.apache.iotdb.confignode.manager.load.balancer.router.mcf.MCFLeaderBalancer;
+import org.apache.iotdb.confignode.manager.load.balancer.router.leader.GreedyLeaderBalancer;
+import org.apache.iotdb.confignode.manager.load.balancer.router.leader.ILeaderBalancer;
+import org.apache.iotdb.confignode.manager.load.balancer.router.leader.MinCostFlowLeaderBalancer;
+import org.apache.iotdb.confignode.manager.load.balancer.router.priority.GreedyPriorityBalancer;
+import org.apache.iotdb.confignode.manager.load.balancer.router.priority.IPriorityBalancer;
+import org.apache.iotdb.confignode.manager.load.balancer.router.priority.LeaderPriorityBalancer;
import org.apache.iotdb.confignode.manager.node.NodeManager;
import org.apache.iotdb.confignode.manager.partition.PartitionManager;
import org.apache.iotdb.consensus.ConsensusFactory;
@@ -70,17 +72,12 @@ public class RouteBalancer {
private static final Logger LOGGER = LoggerFactory.getLogger(RouteBalancer.class);
private static final ConfigNodeConfig CONF = ConfigNodeDescriptor.getInstance().getConf();
- private static final boolean ENABLE_LEADER_BALANCING = CONF.isEnableLeaderBalancing();
private static final String SCHEMA_REGION_CONSENSUS_PROTOCOL_CLASS =
CONF.getSchemaRegionConsensusProtocolClass();
private static final String DATA_REGION_CONSENSUS_PROTOCOL_CLASS =
CONF.getDataRegionConsensusProtocolClass();
private static final boolean isMultiLeader =
- ConsensusFactory.MULTI_LEADER_CONSENSUS.equals(
- ConfigNodeDescriptor.getInstance().getConf().getDataRegionConsensusProtocolClass());
-
- public static final String LEADER_POLICY = "leader";
- public static final String GREEDY_POLICY = "greedy";
+ ConsensusFactory.MULTI_LEADER_CONSENSUS.equals(CONF.getDataRegionConsensusProtocolClass());
private final IManager configManager;
@@ -92,8 +89,10 @@ public class RouteBalancer {
/** RegionRouteMap */
private final RegionRouteMap regionRouteMap;
- // For generating optimal RegionRouteMap
- private final IRouter router;
+ // For generating optimal RegionLeaderMap
+ private final ILeaderBalancer leaderBalancer;
+ // For generating optimal RegionPriorityMap
+ private final IPriorityBalancer priorityRouter;
/** Leader Balancing service */
private Future<?> currentLeaderBalancingFuture;
@@ -107,13 +106,24 @@ public class RouteBalancer {
this.leaderCache = new ConcurrentHashMap<>();
this.regionRouteMap = new RegionRouteMap();
- switch (ConfigNodeDescriptor.getInstance().getConf().getRoutingPolicy()) {
- case GREEDY_POLICY:
- this.router = new LoadScoreGreedyRouter();
+
+ switch (CONF.getLeaderDistributionPolicy()) {
+ case ILeaderBalancer.GREEDY_POLICY:
+ this.leaderBalancer = new GreedyLeaderBalancer();
+ break;
+ case ILeaderBalancer.MIN_COST_FLOW_POLICY:
+ default:
+ this.leaderBalancer = new MinCostFlowLeaderBalancer();
break;
- case LEADER_POLICY:
+ }
+
+ switch (CONF.getRoutePriorityPolicy()) {
+ case IPriorityBalancer.GREEDY_POLICY:
+ this.priorityRouter = new GreedyPriorityBalancer();
+ break;
+ case IPriorityBalancer.LEADER_POLICY:
default:
- this.router = new LeaderRouter();
+ this.priorityRouter = new LeaderPriorityBalancer();
break;
}
}
@@ -173,13 +183,13 @@ public class RouteBalancer {
// Balancing region priority in each SchemaRegionGroup
Map<TConsensusGroupId, TRegionReplicaSet> latestRegionPriorityMap =
- router.getLatestRegionRouteMap(
+ priorityRouter.generateOptimalRoutePriority(
getPartitionManager().getAllReplicaSets(TConsensusGroupType.SchemaRegion),
regionLeaderMap,
dataNodeLoadScoreMap);
// Balancing region priority in each DataRegionGroup
latestRegionPriorityMap.putAll(
- router.getLatestRegionRouteMap(
+ priorityRouter.generateOptimalRoutePriority(
getPartitionManager().getAllReplicaSets(TConsensusGroupType.DataRegion),
regionLeaderMap,
dataNodeLoadScoreMap));
@@ -230,34 +240,30 @@ public class RouteBalancer {
/** Start the route balancing service */
public void startRouteBalancingService() {
- if (ENABLE_LEADER_BALANCING) {
- synchronized (scheduleMonitor) {
- if (currentLeaderBalancingFuture == null) {
- currentLeaderBalancingFuture =
- ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
- leaderBalancingExecutor,
- this::balancingRegionLeader,
- 0,
- // Execute route balancing service in every 5 loops of heartbeat service
- NodeManager.HEARTBEAT_INTERVAL * 5,
- TimeUnit.MILLISECONDS);
- LOGGER.info("Route-Balancing service is started successfully.");
- }
+ synchronized (scheduleMonitor) {
+ if (currentLeaderBalancingFuture == null) {
+ currentLeaderBalancingFuture =
+ ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
+ leaderBalancingExecutor,
+ this::balancingRegionLeader,
+ 0,
+ // Execute route balancing service in every 5 loops of heartbeat service
+ NodeManager.HEARTBEAT_INTERVAL * 5,
+ TimeUnit.MILLISECONDS);
+ LOGGER.info("Route-Balancing service is started successfully.");
}
}
}
/** Stop the route balancing service */
public void stopRouteBalancingService() {
- if (ENABLE_LEADER_BALANCING) {
- synchronized (scheduleMonitor) {
- if (currentLeaderBalancingFuture != null) {
- currentLeaderBalancingFuture.cancel(false);
- currentLeaderBalancingFuture = null;
- leaderCache.clear();
- regionRouteMap.clear();
- LOGGER.info("Route-Balancing service is stopped successfully.");
- }
+ synchronized (scheduleMonitor) {
+ if (currentLeaderBalancingFuture != null) {
+ currentLeaderBalancingFuture.cancel(false);
+ currentLeaderBalancingFuture = null;
+ leaderCache.clear();
+ regionRouteMap.clear();
+ LOGGER.info("Route-Balancing service is stopped successfully.");
}
}
}
@@ -268,9 +274,9 @@ public class RouteBalancer {
}
private void balancingRegionLeader(TConsensusGroupType regionGroupType) {
- // Collect latest data to generate leaderBalancer
- MCFLeaderBalancer leaderBalancer =
- new MCFLeaderBalancer(
+ // Collect the latest data and generate the optimal leader distribution
+ Map<TConsensusGroupId, Integer> leaderDistribution =
+ leaderBalancer.generateOptimalLeaderDistribution(
getPartitionManager().getAllReplicaSetsMap(regionGroupType),
regionRouteMap.getRegionLeaderMap(),
getNodeManager()
@@ -281,10 +287,6 @@ public class RouteBalancer {
.map(TDataNodeLocation::getDataNodeId)
.collect(Collectors.toSet()));
- // Calculate the optimal leader distribution
- Map<TConsensusGroupId, Integer> leaderDistribution =
- leaderBalancer.generateOptimalLeaderDistribution();
-
// Transfer leader to the optimal distribution
AtomicInteger requestId = new AtomicInteger(0);
AsyncClientHandler<TRegionLeaderChangeReq, TSStatus> clientHandler =
@@ -307,9 +309,14 @@ public class RouteBalancer {
regionGroupId,
newLeaderId);
changeRegionLeader(
- consensusProtocolClass, requestId, clientHandler, regionGroupId, newLeaderId);
+ consensusProtocolClass,
+ requestId,
+ clientHandler,
+ regionGroupId,
+ getNodeManager().getRegisteredDataNode(newLeaderId).getLocation());
}
});
+
if (requestId.get() > 0) {
AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
}
@@ -325,12 +332,12 @@ public class RouteBalancer {
AtomicInteger requestId,
AsyncClientHandler<TRegionLeaderChangeReq, TSStatus> clientHandler,
TConsensusGroupId regionGroupId,
- int newLeaderId) {
+ TDataNodeLocation newLeader) {
switch (consensusProtocolClass) {
case ConsensusFactory.MULTI_LEADER_CONSENSUS:
// For multi-leader protocol, change RegionRouteMap is enough.
// And the result will be broadcast by Cluster-LoadStatistics-Service soon.
- regionRouteMap.setLeader(regionGroupId, newLeaderId);
+ regionRouteMap.setLeader(regionGroupId, newLeader.getDataNodeId());
break;
case ConsensusFactory.RATIS_CONSENSUS:
default:
@@ -339,9 +346,11 @@ public class RouteBalancer {
// And the RegionRouteMap will be updated by Cluster-Heartbeat-Service later if change
// leader success.
TRegionLeaderChangeReq regionLeaderChangeReq =
- new TRegionLeaderChangeReq(
- regionGroupId, getNodeManager().getRegisteredDataNode(newLeaderId).getLocation());
- clientHandler.putRequest(requestId.getAndIncrement(), regionLeaderChangeReq);
+ new TRegionLeaderChangeReq(regionGroupId, newLeader);
+ int requestIndex = requestId.getAndIncrement();
+ clientHandler.putRequest(requestIndex, regionLeaderChangeReq);
+ clientHandler.putDataNodeLocation(requestIndex, newLeader);
+ break;
}
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/GreedyLeaderBalancer.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/GreedyLeaderBalancer.java
new file mode 100644
index 0000000000..21c4356bac
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/GreedyLeaderBalancer.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.manager.load.balancer.router.leader;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class GreedyLeaderBalancer implements ILeaderBalancer {
+
+ private final Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap;
+ private final Map<TConsensusGroupId, Integer> regionLeaderMap;
+ private final Set<Integer> disabledDataNodeSet;
+
+ public GreedyLeaderBalancer() {
+ this.regionReplicaSetMap = new HashMap<>();
+ this.regionLeaderMap = new ConcurrentHashMap<>();
+ this.disabledDataNodeSet = new HashSet<>();
+ }
+
+ @Override
+ public Map<TConsensusGroupId, Integer> generateOptimalLeaderDistribution(
+ Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap,
+ Map<TConsensusGroupId, Integer> regionLeaderMap,
+ Set<Integer> disabledDataNodeSet) {
+ initialize(regionReplicaSetMap, regionLeaderMap, disabledDataNodeSet);
+
+ Map<TConsensusGroupId, Integer> result = constructGreedyDistribution();
+
+ clear();
+ return result;
+ }
+
+ private void initialize(
+ Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap,
+ Map<TConsensusGroupId, Integer> regionLeaderMap,
+ Set<Integer> disabledDataNodeSet) {
+ this.regionReplicaSetMap.putAll(regionReplicaSetMap);
+ this.regionLeaderMap.putAll(regionLeaderMap);
+ this.disabledDataNodeSet.addAll(disabledDataNodeSet);
+ }
+
+ private void clear() {
+ this.regionReplicaSetMap.clear();
+ this.regionLeaderMap.clear();
+ this.disabledDataNodeSet.clear();
+ }
+
+ private Map<TConsensusGroupId, Integer> constructGreedyDistribution() {
+ /* Count the number of leaders that each DataNode has */
+ // Map<DataNodeId, leader count>; a leader hosted outside regionReplicaSetMap gets a counter on demand
+ Map<Integer, AtomicInteger> leaderCounter = new ConcurrentHashMap<>();
+ regionReplicaSetMap.forEach(
+ (regionGroupId, regionReplicaSet) ->
+ regionReplicaSet
+ .getDataNodeLocations()
+ .forEach(
+ dataNodeLocation ->
+ leaderCounter.putIfAbsent(
+ dataNodeLocation.getDataNodeId(), new AtomicInteger(0))));
+ regionLeaderMap.forEach((regionGroupId, leaderId) ->
+ leaderCounter.computeIfAbsent(leaderId, id -> new AtomicInteger(0)).getAndIncrement());
+
+ /* Ensure all RegionGroups' leader are not inside disabled DataNodes */
+ for (TConsensusGroupId regionGroupId : regionReplicaSetMap.keySet()) {
+ int leaderId = regionLeaderMap.get(regionGroupId);
+ if (disabledDataNodeSet.contains(leaderId)) {
+ int newLeaderId = -1;
+ int newLeaderWeight = Integer.MAX_VALUE;
+ for (TDataNodeLocation candidate :
+ regionReplicaSetMap.get(regionGroupId).getDataNodeLocations()) {
+ int candidateId = candidate.getDataNodeId();
+ int candidateWeight = leaderCounter.get(candidateId).get();
+ // Select the available DataNode with the fewest leaders
+ if (!disabledDataNodeSet.contains(candidateId) && candidateWeight < newLeaderWeight) {
+ newLeaderId = candidateId;
+ newLeaderWeight = candidateWeight;
+ }
+ }
+
+ if (newLeaderId != -1) {
+ leaderCounter.get(leaderId).getAndDecrement();
+ leaderCounter.get(newLeaderId).getAndIncrement();
+ regionLeaderMap.replace(regionGroupId, newLeaderId);
+ }
+ }
+ }
+
+ /* Double keyword sort */
+ List<WeightEntry> weightList = new ArrayList<>();
+ for (TConsensusGroupId regionGroupId : regionReplicaSetMap.keySet()) {
+ int leaderId = regionLeaderMap.get(regionGroupId);
+ int leaderWeight = leaderCounter.get(regionLeaderMap.get(regionGroupId)).get();
+
+ int followerWeight = Integer.MAX_VALUE;
+ for (TDataNodeLocation follower :
+ regionReplicaSetMap.get(regionGroupId).getDataNodeLocations()) {
+ int followerId = follower.getDataNodeId();
+ if (followerId != leaderId) {
+ followerWeight = Math.min(followerWeight, leaderCounter.get(followerId).get());
+ }
+ }
+
+ weightList.add(new WeightEntry(regionGroupId, leaderWeight, followerWeight));
+ }
+ weightList.sort(WeightEntry.COMPARATOR);
+
+ /* Greedy distribution */
+ for (WeightEntry weightEntry : weightList) {
+ TConsensusGroupId regionGroupId = weightEntry.regionGroupId;
+ int leaderId = regionLeaderMap.get(regionGroupId);
+ int leaderWeight = leaderCounter.get(regionLeaderMap.get(regionGroupId)).get();
+
+ int newLeaderId = -1;
+ int newLeaderWeight = Integer.MAX_VALUE;
+ for (TDataNodeLocation candidate :
+ regionReplicaSetMap.get(regionGroupId).getDataNodeLocations()) {
+ int candidateId = candidate.getDataNodeId();
+ int candidateWeight = leaderCounter.get(candidateId).get();
+ if (!disabledDataNodeSet.contains(candidateId)
+ && candidateId != leaderId
+ && candidateWeight < newLeaderWeight) {
+ newLeaderId = candidateId;
+ newLeaderWeight = candidateWeight;
+ }
+ }
+
+ // Redistribution takes effect only when leaderWeight - newLeaderWeight > 1.
+ // i.e. Redistribution can reduce the range of the number of leaders that each DataNode owns.
+ if (leaderWeight - newLeaderWeight > 1) {
+ leaderCounter.get(leaderId).getAndDecrement();
+ leaderCounter.get(newLeaderId).getAndIncrement();
+ regionLeaderMap.replace(regionGroupId, newLeaderId);
+ }
+ }
+
+ return new ConcurrentHashMap<>(regionLeaderMap);
+ }
+
+ private static class WeightEntry {
+
+ private final TConsensusGroupId regionGroupId;
+ // The number of leaders owned by DataNode where the RegionGroup's leader resides
+ private final int firstKey;
+ // The minimum number of leaders owned by DataNode where the RegionGroup's followers reside
+ private final int secondKey;
+
+ private WeightEntry(TConsensusGroupId regionGroupId, int firstKey, int secondKey) {
+ this.regionGroupId = regionGroupId;
+ this.firstKey = firstKey;
+ this.secondKey = secondKey;
+ }
+
+ // Compare the first key by descending order and the second key by ascending order.
+ private static final Comparator<WeightEntry> COMPARATOR =
+ (o1, o2) ->
+ o1.firstKey == o2.firstKey ? o1.secondKey - o2.secondKey : o2.firstKey - o1.firstKey;
+ }
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/IRouter.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/ILeaderBalancer.java
similarity index 58%
copy from confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/IRouter.java
copy to confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/ILeaderBalancer.java
index b6e875f77c..a69ccc9491 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/IRouter.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/ILeaderBalancer.java
@@ -16,30 +16,30 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.confignode.manager.load.balancer.router;
+package org.apache.iotdb.confignode.manager.load.balancer.router.leader;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
-import java.util.List;
import java.util.Map;
+import java.util.Set;
-/**
- * The IRouter is a functional interface, which means a new functional class who implements the
- * IRouter must be created for generating the latest real-time routing policy.
- */
-public interface IRouter {
+public interface ILeaderBalancer {
+
+ String GREEDY_POLICY = "GREEDY";
+ String MIN_COST_FLOW_POLICY = "MIN_COST_FLOW";
/**
- * Generate an optimal real-time read/write requests routing policy.
+ * Generate an optimal leader distribution.
*
- * @param replicaSets All RegionReplicasEts currently owned by the cluster
- * @return Map<TConsensusGroupId, TRegionReplicaSet>, The routing policy of read/write requests
- * for each Region is based on the order in the TRegionReplicaSet. The replica with higher
- * sorting result have higher priority.
+ * @param regionReplicaSetMap All RegionGroups the cluster currently has
+ * @param regionLeaderMap The current leader of each RegionGroup
+ * @param disabledDataNodeSet The DataNodes that are currently unable to work (can't host a
+ * RegionGroup-leader)
+ * @return Map<TConsensusGroupId, Integer>, The optimal leader distribution
*/
- Map<TConsensusGroupId, TRegionReplicaSet> getLatestRegionRouteMap(
- List<TRegionReplicaSet> replicaSets,
+ Map<TConsensusGroupId, Integer> generateOptimalLeaderDistribution(
+ Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap,
Map<TConsensusGroupId, Integer> regionLeaderMap,
- Map<Integer, Long> dataNodeLoadScoreMap);
+ Set<Integer> disabledDataNodeSet);
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/mcf/MCFLeaderBalancer.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/MinCostFlowLeaderBalancer.java
similarity index 79%
rename from confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/mcf/MCFLeaderBalancer.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/MinCostFlowLeaderBalancer.java
index 5dbd292964..e07bd3e288 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/mcf/MCFLeaderBalancer.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/MinCostFlowLeaderBalancer.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.confignode.manager.load.balancer.router.mcf;
+package org.apache.iotdb.confignode.manager.load.balancer.router.leader;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
@@ -26,6 +26,7 @@ import org.apache.iotdb.commons.utils.TestOnly;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -35,7 +36,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
/** Leader distribution balancer that uses minimum cost flow algorithm */
-public class MCFLeaderBalancer {
+public class MinCostFlowLeaderBalancer implements ILeaderBalancer {
private static final int INFINITY = Integer.MAX_VALUE;
@@ -63,7 +64,7 @@ public class MCFLeaderBalancer {
// Maximum index of graph edges
private int maxEdge = 0;
- private final List<MCFEdge> mcfEdges;
+ private final List<MinCostFlowEdge> minCostFlowEdges;
private int[] nodeHeadEdge;
private int[] nodeCurrentEdge;
@@ -73,28 +74,64 @@ public class MCFLeaderBalancer {
private int maximumFlow = 0;
private int minimumCost = 0;
- public MCFLeaderBalancer(
- Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap,
- Map<TConsensusGroupId, Integer> regionLeaderMap,
- Set<Integer> disabledDataNodeSet) {
- this.regionReplicaSetMap = regionReplicaSetMap;
- this.regionLeaderMap = regionLeaderMap;
- this.disabledDataNodeSet = disabledDataNodeSet;
-
+ /**
+ * Creates a balancer whose working collections all start out empty. The cluster snapshot is
+ * no longer bound at construction time; it is handed to generateOptimalLeaderDistribution on
+ * every call instead.
+ */
+ public MinCostFlowLeaderBalancer() {
+ this.regionReplicaSetMap = new HashMap<>();
+ this.regionLeaderMap = new HashMap<>();
+ this.disabledDataNodeSet = new HashSet<>();
this.rNodeMap = new HashMap<>();
this.dNodeMap = new HashMap<>();
this.dNodeReflect = new HashMap<>();
-
- this.mcfEdges = new ArrayList<>();
+ this.minCostFlowEdges = new ArrayList<>();
}
-  public Map<TConsensusGroupId, Integer> generateOptimalLeaderDistribution() {
-    constructMCFGraph();
-    dinicAlgorithm();
-    return collectLeaderDistribution();
+  /**
+   * Computes a leader for every RegionGroup by modelling the given snapshot as a flow graph and
+   * solving it with the min-cost-flow routine.
+   *
+   * <p>The arguments are copied into this instance's working collections for the duration of the
+   * call. The working state is cleared in a finally block, so a run that fails part-way cannot
+   * leave entries behind that a later call would merge with its own input.
+   *
+   * @param regionReplicaSetMap the replica set of each RegionGroup
+   * @param regionLeaderMap the leader DataNode id currently recorded for each RegionGroup
+   * @param disabledDataNodeSet ids of the DataNodes to treat as disabled
+   * @return the leader DataNode id selected for each RegionGroup
+   */
+  @Override
+  public Map<TConsensusGroupId, Integer> generateOptimalLeaderDistribution(
+      Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap,
+      Map<TConsensusGroupId, Integer> regionLeaderMap,
+      Set<Integer> disabledDataNodeSet) {
+
+    try {
+      initialize(regionReplicaSetMap, regionLeaderMap, disabledDataNodeSet);
+      constructMCFGraph();
+      dinicAlgorithm();
+      return collectLeaderDistribution();
+    } finally {
+      // Always reset: this instance is reused, and initialize() adds to whatever is still there
+      clear();
+    }
+  }
+
+  /**
+   * Loads one balancing run's input by copying the caller's snapshot into this instance's own
+   * collections; the caller's maps and set are only read.
+   */
+  private void initialize(
+      Map<TConsensusGroupId, TRegionReplicaSet> replicaSetSnapshot,
+      Map<TConsensusGroupId, Integer> leaderSnapshot,
+      Set<Integer> disabledSnapshot) {
+    regionReplicaSetMap.putAll(replicaSetSnapshot);
+    regionLeaderMap.putAll(leaderSnapshot);
+    disabledDataNodeSet.addAll(disabledSnapshot);
+  }
+
+  /**
+   * Drops all per-run working state so that this instance can serve another balancing run.
+   * maximumFlow and minimumCost are not reset here; constructMCFGraph zeroes them when the next
+   * run starts, which keeps them readable after a run has finished.
+   */
+  private void clear() {
+    // Graph indices: node numbering restarts right after tNode, edge numbering at zero
+    maxNode = tNode + 1;
+    maxEdge = 0;
+
+    // Per-node arrays belong to the graph that was just solved
+    nodeHeadEdge = null;
+    nodeCurrentEdge = null;
+    isNodeVisited = null;
+    nodeMinimumCost = null;
+
+    // Graph content
+    minCostFlowEdges.clear();
+    rNodeMap.clear();
+    dNodeMap.clear();
+    dNodeReflect.clear();
+
+    // Copied input snapshot
+    regionReplicaSetMap.clear();
+    regionLeaderMap.clear();
+    disabledDataNodeSet.clear();
+  }
private void constructMCFGraph() {
+ this.maximumFlow = 0;
+ this.minimumCost = 0;
+
/* Indicate nodes in mcf */
for (TRegionReplicaSet regionReplicaSet : regionReplicaSetMap.values()) {
rNodeMap.put(regionReplicaSet.getRegionId(), maxNode++);
@@ -177,8 +214,8 @@ public class MCFLeaderBalancer {
}
+ /**
+ * Appends a directed edge from fromNode to destNode with the given capacity and cost. The new
+ * edge becomes the head of fromNode's edge chain: it stores the previous head as its nextEdge
+ * and its own index (the value of maxEdge before the increment) is written to nodeHeadEdge.
+ */
private void addEdge(int fromNode, int destNode, int capacity, int cost) {
- MCFEdge edge = new MCFEdge(destNode, capacity, cost, nodeHeadEdge[fromNode]);
- mcfEdges.add(edge);
+ MinCostFlowEdge edge = new MinCostFlowEdge(destNode, capacity, cost, nodeHeadEdge[fromNode]);
+ minCostFlowEdges.add(edge);
nodeHeadEdge[fromNode] = maxEdge++;
}
@@ -203,8 +240,8 @@ public class MCFLeaderBalancer {
isNodeVisited[currentNode] = false;
for (int currentEdge = nodeHeadEdge[currentNode];
currentEdge >= 0;
- currentEdge = mcfEdges.get(currentEdge).nextEdge) {
- MCFEdge edge = mcfEdges.get(currentEdge);
+ currentEdge = minCostFlowEdges.get(currentEdge).nextEdge) {
+ MinCostFlowEdge edge = minCostFlowEdges.get(currentEdge);
if (edge.capacity > 0
&& nodeMinimumCost[currentNode] + edge.cost < nodeMinimumCost[edge.destNode]) {
nodeMinimumCost[edge.destNode] = nodeMinimumCost[currentNode] + edge.cost;
@@ -230,8 +267,8 @@ public class MCFLeaderBalancer {
isNodeVisited[currentNode] = true;
for (currentEdge = nodeCurrentEdge[currentNode];
currentEdge >= 0;
- currentEdge = mcfEdges.get(currentEdge).nextEdge) {
- MCFEdge edge = mcfEdges.get(currentEdge);
+ currentEdge = minCostFlowEdges.get(currentEdge).nextEdge) {
+ MinCostFlowEdge edge = minCostFlowEdges.get(currentEdge);
if (nodeMinimumCost[currentNode] + edge.cost == nodeMinimumCost[edge.destNode]
&& edge.capacity > 0
&& !isNodeVisited[edge.destNode]) {
@@ -241,7 +278,7 @@ public class MCFLeaderBalancer {
minimumCost += subOutputFlow * edge.cost;
edge.capacity -= subOutputFlow;
- mcfEdges.get(currentEdge ^ 1).capacity += subOutputFlow;
+ minCostFlowEdges.get(currentEdge ^ 1).capacity += subOutputFlow;
inputFlow -= subOutputFlow;
outputFlow += subOutputFlow;
@@ -278,8 +315,8 @@ public class MCFLeaderBalancer {
boolean matchLeader = false;
for (int currentEdge = nodeHeadEdge[rNode];
currentEdge >= 0;
- currentEdge = mcfEdges.get(currentEdge).nextEdge) {
- MCFEdge edge = mcfEdges.get(currentEdge);
+ currentEdge = minCostFlowEdges.get(currentEdge).nextEdge) {
+ MinCostFlowEdge edge = minCostFlowEdges.get(currentEdge);
if (edge.destNode != sNode && edge.capacity == 0) {
matchLeader = true;
result.put(regionGroupId, dNodeReflect.get(edge.destNode));
@@ -302,4 +339,19 @@ public class MCFLeaderBalancer {
public int getMinimumCost() {
return minimumCost;
}
+
+  /** One directed edge of the flow graph, kept as a link in the edge chain of its source node. */
+  private static class MinCostFlowEdge {
+
+    // Index of the next edge in the same chain; the traversal loops stop on a negative index
+    private final int nextEdge;
+    // Node this edge points to
+    private final int destNode;
+    // Cost charged per unit of flow sent through this edge
+    private final int cost;
+    // Remaining capacity; the only field that changes while flow is pushed
+    private int capacity;
+
+    private MinCostFlowEdge(int destNode, int capacity, int cost, int nextEdge) {
+      this.nextEdge = nextEdge;
+      this.destNode = destNode;
+      this.cost = cost;
+      this.capacity = capacity;
+    }
+  }
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/mcf/MCFEdge.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/mcf/MCFEdge.java
deleted file mode 100644
index d849c195e3..0000000000
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/mcf/MCFEdge.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.confignode.manager.load.balancer.router.mcf;
-
-public class MCFEdge {
-
- public int destNode;
- public int capacity;
- public int cost;
- public int nextEdge;
-
- public MCFEdge(int destNode, int capacity, int cost, int nextEdge) {
- this.destNode = destNode;
- this.capacity = capacity;
- this.cost = cost;
- this.nextEdge = nextEdge;
- }
-}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/LoadScoreGreedyRouter.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/priority/GreedyPriorityBalancer.java
similarity index 91%
rename from confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/LoadScoreGreedyRouter.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/priority/GreedyPriorityBalancer.java
index 5d5fda43b5..c2dd329b64 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/LoadScoreGreedyRouter.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/priority/GreedyPriorityBalancer.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.confignode.manager.load.balancer.router;
+package org.apache.iotdb.confignode.manager.load.balancer.router.priority;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
@@ -29,15 +29,15 @@ import java.util.Map;
import java.util.Vector;
import java.util.concurrent.ConcurrentHashMap;
-/** The LoadScoreGreedyRouter always pick the Replica with the lowest loadScore */
-public class LoadScoreGreedyRouter implements IRouter {
+/** The GreedyPriorityBalancer always picks the Replica with the lowest loadScore */
+public class GreedyPriorityBalancer implements IPriorityBalancer {
- public LoadScoreGreedyRouter() {
+ public GreedyPriorityBalancer() {
// Empty constructor
}
@Override
- public Map<TConsensusGroupId, TRegionReplicaSet> getLatestRegionRouteMap(
+ public Map<TConsensusGroupId, TRegionReplicaSet> generateOptimalRoutePriority(
List<TRegionReplicaSet> replicaSets,
Map<TConsensusGroupId, Integer> regionLeaderMap,
Map<Integer, Long> dataNodeLoadScoreMap) {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/IRouter.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/priority/IPriorityBalancer.java
similarity index 64%
rename from confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/IRouter.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/priority/IPriorityBalancer.java
index b6e875f77c..47d8650964 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/IRouter.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/priority/IPriorityBalancer.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.confignode.manager.load.balancer.router;
+package org.apache.iotdb.confignode.manager.load.balancer.router.priority;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
@@ -24,21 +24,21 @@ import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import java.util.List;
import java.util.Map;
-/**
- * The IRouter is a functional interface, which means a new functional class who implements the
- * IRouter must be created for generating the latest real-time routing policy.
- */
-public interface IRouter {
+public interface IPriorityBalancer {
+
+ String LEADER_POLICY = "LEADER";
+ String GREEDY_POLICY = "GREEDY";
/**
- * Generate an optimal real-time read/write requests routing policy.
+ * Generate an optimal route priority.
*
- * @param replicaSets All RegionReplicasEts currently owned by the cluster
- * @return Map<TConsensusGroupId, TRegionReplicaSet>, The routing policy of read/write requests
- * for each Region is based on the order in the TRegionReplicaSet. The replica with higher
- * sorting result have higher priority.
+ * @param replicaSets All RegionGroups
+ * @param regionLeaderMap The current leader of each RegionGroup
+ * @param dataNodeLoadScoreMap The current load score of each DataNode
+ * @return Map<TConsensusGroupId, TRegionReplicaSet>, The optimal route priority for each
+ * RegionGroup. The replica with the higher sorting result has higher priority.
*/
- Map<TConsensusGroupId, TRegionReplicaSet> getLatestRegionRouteMap(
+ Map<TConsensusGroupId, TRegionReplicaSet> generateOptimalRoutePriority(
List<TRegionReplicaSet> replicaSets,
Map<TConsensusGroupId, Integer> regionLeaderMap,
Map<Integer, Long> dataNodeLoadScoreMap);
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/LeaderRouter.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/priority/LeaderPriorityBalancer.java
similarity index 93%
rename from confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/LeaderRouter.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/priority/LeaderPriorityBalancer.java
index 4ab5505f48..80c1d1aaf5 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/LeaderRouter.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/priority/LeaderPriorityBalancer.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.confignode.manager.load.balancer.router;
+package org.apache.iotdb.confignode.manager.load.balancer.router.priority;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
@@ -29,15 +29,15 @@ import java.util.Map;
import java.util.Vector;
import java.util.concurrent.ConcurrentHashMap;
-/** The LeaderRouter always pick the leader Replica */
-public class LeaderRouter implements IRouter {
+/** The LeaderPriorityBalancer always picks the leader Replica first */
+public class LeaderPriorityBalancer implements IPriorityBalancer {
- public LeaderRouter() {
+ public LeaderPriorityBalancer() {
// Empty constructor
}
@Override
- public Map<TConsensusGroupId, TRegionReplicaSet> getLatestRegionRouteMap(
+ public Map<TConsensusGroupId, TRegionReplicaSet> generateOptimalRoutePriority(
List<TRegionReplicaSet> replicaSets,
Map<TConsensusGroupId, Integer> regionLeaderMap,
Map<Integer, Long> dataNodeLoadScoreMap) {
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/GreedyLeaderBalancerTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/GreedyLeaderBalancerTest.java
new file mode 100644
index 0000000000..b676a62ecf
--- /dev/null
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/GreedyLeaderBalancerTest.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.manager.load.balancer.router.leader;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/** Unit tests for {@link GreedyLeaderBalancer}. */
+public class GreedyLeaderBalancerTest {
+
+  private static final GreedyLeaderBalancer BALANCER = new GreedyLeaderBalancer();
+
+  /** Randomly placed leaders must end up evenly spread: exactly 3 on each of the 6 DataNodes. */
+  @Test
+  public void optimalLeaderDistributionTest() {
+    Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap = new HashMap<>();
+    Map<TConsensusGroupId, Integer> regionLeaderMap = new HashMap<>();
+    Set<Integer> disabledDataNodeSet = new HashSet<>();
+    Random random = new Random();
+
+    // Build 9 RegionGroups in DataNodes 0~2
+    for (int i = 0; i < 9; i++) {
+      TConsensusGroupId regionGroupId = new TConsensusGroupId(TConsensusGroupType.DataRegion, i);
+      regionReplicaSetMap.put(regionGroupId, buildReplicaSet(regionGroupId, 0));
+      regionLeaderMap.put(regionGroupId, random.nextInt(3));
+    }
+
+    // Build 9 RegionGroups in DataNodes 3~5
+    for (int i = 9; i < 18; i++) {
+      TConsensusGroupId regionGroupId = new TConsensusGroupId(TConsensusGroupType.DataRegion, i);
+      regionReplicaSetMap.put(regionGroupId, buildReplicaSet(regionGroupId, 3));
+      regionLeaderMap.put(regionGroupId, 3 + random.nextInt(3));
+    }
+
+    Map<Integer, AtomicInteger> leaderCounter =
+        countLeaders(
+            BALANCER.generateOptimalLeaderDistribution(
+                regionReplicaSetMap, regionLeaderMap, disabledDataNodeSet));
+
+    // Each DataNode has exactly 3 leaders
+    for (int i = 0; i < 6; i++) {
+      Assert.assertEquals("Leader count of DataNode " + i, 3, leaderCountOf(leaderCounter, i));
+    }
+  }
+
+  /** Leaders sitting on disabled DataNodes must be spread evenly over the enabled ones. */
+  @Test
+  public void disableTest() {
+    Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap = new HashMap<>();
+    Map<TConsensusGroupId, Integer> regionLeaderMap = new HashMap<>();
+    Set<Integer> disabledDataNodeSet = new HashSet<>();
+
+    disabledDataNodeSet.add(1);
+    disabledDataNodeSet.add(4);
+
+    // Build 10 RegionGroup whose leaders are all 1(Disabled)
+    for (int i = 0; i < 10; i++) {
+      TConsensusGroupId regionGroupId = new TConsensusGroupId(TConsensusGroupType.DataRegion, i);
+      regionReplicaSetMap.put(regionGroupId, buildReplicaSet(regionGroupId, 0));
+      regionLeaderMap.put(regionGroupId, 1);
+    }
+
+    // Build 10 RegionGroup whose leaders are all 4(Disabled)
+    for (int i = 10; i < 20; i++) {
+      TConsensusGroupId regionGroupId = new TConsensusGroupId(TConsensusGroupType.DataRegion, i);
+      regionReplicaSetMap.put(regionGroupId, buildReplicaSet(regionGroupId, 3));
+      regionLeaderMap.put(regionGroupId, 4);
+    }
+
+    Map<Integer, AtomicInteger> leaderCounter =
+        countLeaders(
+            BALANCER.generateOptimalLeaderDistribution(
+                regionReplicaSetMap, regionLeaderMap, disabledDataNodeSet));
+
+    // Each enabled DataNode has exactly 5 leaders
+    for (int i = 0; i < 6; i++) {
+      if (i != 1 && i != 4) {
+        Assert.assertEquals("Leader count of DataNode " + i, 5, leaderCountOf(leaderCounter, i));
+      }
+    }
+  }
+
+  /** Builds a replica set hosted on DataNodes firstDataNodeId, firstDataNodeId + 1 and + 2. */
+  private static TRegionReplicaSet buildReplicaSet(
+      TConsensusGroupId regionGroupId, int firstDataNodeId) {
+    return new TRegionReplicaSet(
+        regionGroupId,
+        Arrays.asList(
+            new TDataNodeLocation().setDataNodeId(firstDataNodeId),
+            new TDataNodeLocation().setDataNodeId(firstDataNodeId + 1),
+            new TDataNodeLocation().setDataNodeId(firstDataNodeId + 2)));
+  }
+
+  /** Counts how many RegionGroups each DataNode leads in the given distribution. */
+  private static Map<Integer, AtomicInteger> countLeaders(
+      Map<TConsensusGroupId, Integer> leaderDistribution) {
+    Map<Integer, AtomicInteger> leaderCounter = new ConcurrentHashMap<>();
+    leaderDistribution.forEach(
+        (regionGroupId, leaderId) ->
+            leaderCounter
+                .computeIfAbsent(leaderId, empty -> new AtomicInteger(0))
+                .getAndIncrement());
+    return leaderCounter;
+  }
+
+  /**
+   * Returns the leader count of one DataNode, or 0 when it leads nothing. A DataNode without any
+   * leader has no counter entry, so looking it up directly would fail with a
+   * NullPointerException instead of a readable assertion failure.
+   */
+  private static int leaderCountOf(Map<Integer, AtomicInteger> leaderCounter, int dataNodeId) {
+    AtomicInteger count = leaderCounter.get(dataNodeId);
+    return count == null ? 0 : count.get();
+  }
+}
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/LeaderBalancerComparisonTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/LeaderBalancerComparisonTest.java
new file mode 100644
index 0000000000..9c7bda4287
--- /dev/null
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/LeaderBalancerComparisonTest.java
@@ -0,0 +1,369 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.manager.load.balancer.router.leader;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.tsfile.utils.Pair;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.PriorityQueue;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class LeaderBalancerComparisonTest {
+
+ // Set this field to true, and you can see the readable test results in command line
+ private static final boolean isCommandLineMode = false;
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(LeaderBalancerComparisonTest.class);
+ private static FileWriter WRITER;
+
+ private static final GreedyLeaderBalancer GREEDY_LEADER_BALANCER = new GreedyLeaderBalancer();
+ private static final MinCostFlowLeaderBalancer MIN_COST_FLOW_LEADER_BALANCER =
+ new MinCostFlowLeaderBalancer();
+
+ private static final Random RANDOM = new Random();
+ private static final int TEST_MAX_DATA_NODE_NUM = 100;
+ private static final int TEST_CPU_CORE_NUM = 16;
+ private static final int TEST_REPLICA_NUM = 3;
+ private static final double GREEDY_INIT_RATE = 0.9;
+ private static final double DISABLE_DATA_NODE_RATE = 0.05;
+
+  /**
+   * Selects where the test results go: nowhere in command line mode, otherwise the file
+   * ./leaderBalancerTest.txt. Invoke this before running the test if the results should be
+   * recorded.
+   */
+  public static void prepareWriter() throws IOException {
+    WRITER = isCommandLineMode ? null : new FileWriter("./leaderBalancerTest.txt");
+  }
+
+  // Add @Test here to enable this test
+  /**
+   * Compares the greedy and the min-cost-flow leader balancer on simulated clusters of 3 up to
+   * TEST_MAX_DATA_NODE_NUM DataNodes. Every cluster size goes through three phases: balancing
+   * the generated distribution, balancing again after some DataNodes were disabled, and
+   * balancing once more after they came back.
+   *
+   * <p>In file mode the result writer is opened on demand and always closed at the end, so the
+   * test neither fails on a missing writer nor leaks the file handle.
+   */
+  public void leaderBalancerComparisonTest() throws IOException {
+    if (!isCommandLineMode && WRITER == null) {
+      prepareWriter();
+    }
+    try {
+      for (int dataNodeNum = 3; dataNodeNum <= TEST_MAX_DATA_NODE_NUM; dataNodeNum++) {
+        compareOnCluster(dataNodeNum);
+      }
+    } finally {
+      if (WRITER != null) {
+        WRITER.close();
+        WRITER = null;
+      }
+    }
+  }
+
+  /** Runs the basic, disaster and recovery phases for one simulated cluster size. */
+  private void compareOnCluster(int dataNodeNum) throws IOException {
+    // Simulate each DataNode has 16 CPU cores
+    // and each RegionGroup has 3 replicas
+    int regionGroupNum = TEST_CPU_CORE_NUM * dataNodeNum / TEST_REPLICA_NUM;
+    Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap = new HashMap<>();
+    Map<TConsensusGroupId, Integer> regionLeaderMap = new HashMap<>();
+    generateTestData(dataNodeNum, regionGroupNum, regionReplicaSetMap, regionLeaderMap);
+
+    if (isCommandLineMode) {
+      LOGGER.info("============================");
+      LOGGER.info("DataNodeNum: {}, RegionGroupNum: {}", dataNodeNum, regionGroupNum);
+    }
+
+    // Basic test: both balancers start from the same generated leader distribution
+    Map<TConsensusGroupId, Integer> greedyLeaderDistribution = new ConcurrentHashMap<>();
+    Statistics greedyStatistics =
+        doBalancing(
+            dataNodeNum,
+            regionGroupNum,
+            GREEDY_LEADER_BALANCER,
+            regionReplicaSetMap,
+            regionLeaderMap,
+            new HashSet<>(),
+            greedyLeaderDistribution);
+    Map<TConsensusGroupId, Integer> mcfLeaderDistribution = new ConcurrentHashMap<>();
+    Statistics mcfStatistics =
+        doBalancing(
+            dataNodeNum,
+            regionGroupNum,
+            MIN_COST_FLOW_LEADER_BALANCER,
+            regionReplicaSetMap,
+            regionLeaderMap,
+            new HashSet<>(),
+            mcfLeaderDistribution);
+    report("[Basic test]", greedyStatistics, mcfStatistics);
+
+    // Disaster test: each balancer continues from its own stable distribution
+    Set<Integer> disabledDataNodeSet = pickDisabledDataNodes(dataNodeNum);
+    greedyStatistics =
+        doBalancing(
+            dataNodeNum,
+            regionGroupNum,
+            GREEDY_LEADER_BALANCER,
+            regionReplicaSetMap,
+            greedyLeaderDistribution,
+            disabledDataNodeSet,
+            greedyLeaderDistribution);
+    mcfStatistics =
+        doBalancing(
+            dataNodeNum,
+            regionGroupNum,
+            MIN_COST_FLOW_LEADER_BALANCER,
+            regionReplicaSetMap,
+            mcfLeaderDistribution,
+            disabledDataNodeSet,
+            mcfLeaderDistribution);
+    report("[Disaster test]", greedyStatistics, mcfStatistics);
+
+    // Recovery test: every DataNode is enabled again
+    greedyStatistics =
+        doBalancing(
+            dataNodeNum,
+            regionGroupNum,
+            GREEDY_LEADER_BALANCER,
+            regionReplicaSetMap,
+            greedyLeaderDistribution,
+            new HashSet<>(),
+            greedyLeaderDistribution);
+    mcfStatistics =
+        doBalancing(
+            dataNodeNum,
+            regionGroupNum,
+            MIN_COST_FLOW_LEADER_BALANCER,
+            regionReplicaSetMap,
+            mcfLeaderDistribution,
+            new HashSet<>(),
+            mcfLeaderDistribution);
+    report("[Recovery test]", greedyStatistics, mcfStatistics);
+  }
+
+  /** Draws ceil(dataNodeNum * DISABLE_DATA_NODE_RATE) distinct DataNode ids at random. */
+  private Set<Integer> pickDisabledDataNodes(int dataNodeNum) {
+    int disabledDataNodeNum = (int) Math.ceil(dataNodeNum * DISABLE_DATA_NODE_RATE);
+    Set<Integer> disabledDataNodeSet = new HashSet<>();
+    while (disabledDataNodeSet.size() < disabledDataNodeNum) {
+      // A duplicate draw leaves the set unchanged, so the loop simply draws again
+      disabledDataNodeSet.add(RANDOM.nextInt(dataNodeNum));
+    }
+    return disabledDataNodeSet;
+  }
+
+  /** Logs one phase's statistics in command line mode, otherwise appends them to the file. */
+  private void report(String phaseTitle, Statistics greedyStatistics, Statistics mcfStatistics)
+      throws IOException {
+    if (isCommandLineMode) {
+      LOGGER.info(phaseTitle);
+      LOGGER.info("Greedy balancer: {}", greedyStatistics);
+      LOGGER.info("MinCostFlow balancer: {}", mcfStatistics);
+    } else {
+      greedyStatistics.toFile();
+      mcfStatistics.toFile();
+    }
+  }
+
+ /**
+ * Fills the two given maps with regionGroupNum simulated DataRegion groups (ids 0 up to
+ * regionGroupNum - 1), each with TEST_REPLICA_NUM replicas on DataNode ids below dataNodeNum.
+ *
+ * <p>(int) (GREEDY_INIT_RATE * regionGroupNum) groups are placed greedily and the remainder
+ * randomly. Which kind is generated at each step is drawn at random in proportion to the
+ * quotas that are still left.
+ *
+ * @param dataNodeNum number of simulated DataNodes; the loops below need at least
+ * TEST_REPLICA_NUM of them to find enough distinct replicas
+ * @param regionGroupNum number of RegionGroups to generate
+ * @param regionReplicaSetMap output: the replica set of each generated RegionGroup
+ * @param regionLeaderMap output: the leader DataNode id of each generated RegionGroup
+ */
+ private void generateTestData(
+ int dataNodeNum,
+ int regionGroupNum,
+ Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap,
+ Map<TConsensusGroupId, Integer> regionLeaderMap) {
+
+ // Number of regions hosted and number of groups led by each DataNode so far
+ Map<Integer, AtomicInteger> regionCounter = new ConcurrentHashMap<>();
+ Map<Integer, AtomicInteger> leaderCounter = new ConcurrentHashMap<>();
+ for (int i = 0; i < dataNodeNum; i++) {
+ regionCounter.put(i, new AtomicInteger(0));
+ leaderCounter.put(i, new AtomicInteger(0));
+ }
+
+ // Remaining quota of greedily and of randomly placed groups
+ int greedyNum = (int) (GREEDY_INIT_RATE * regionGroupNum);
+ int randomNum = regionGroupNum - greedyNum;
+ for (int index = 0; index < regionGroupNum; index++) {
+ int leaderId = -1;
+ TConsensusGroupId regionGroupId =
+ new TConsensusGroupId(TConsensusGroupType.DataRegion, index);
+ TRegionReplicaSet regionReplicaSet = new TRegionReplicaSet().setRegionId(regionGroupId);
+
+ // seed is below the number of groups still to generate; values below greedyNum pick greedy
+ int seed = RANDOM.nextInt(greedyNum + randomNum);
+ if (seed < greedyNum) {
+ // Greedy pick RegionReplicas and leader
+ int leaderWeight = Integer.MAX_VALUE;
+ // Queue ordered by the number of regions hosted, fewest first
+ PriorityQueue<Pair<Integer, Integer>> dataNodePriorityQueue =
+ new PriorityQueue<>(Comparator.comparingInt(Pair::getRight));
+ regionCounter.forEach(
+ (dataNodeId, regionGroupCount) ->
+ dataNodePriorityQueue.offer(new Pair<>(dataNodeId, regionGroupCount.get())));
+ for (int i = 0; i < TEST_REPLICA_NUM; i++) {
+ int dataNodeId = Objects.requireNonNull(dataNodePriorityQueue.poll()).getLeft();
+ regionReplicaSet.addToDataNodeLocations(
+ new TDataNodeLocation().setDataNodeId(dataNodeId));
+ // The replica leading the fewest groups becomes leader; on a tie the earlier poll wins
+ if (leaderCounter.get(dataNodeId).get() < leaderWeight) {
+ leaderWeight = leaderCounter.get(dataNodeId).get();
+ leaderId = dataNodeId;
+ }
+ }
+ greedyNum -= 1;
+ } else {
+ // Random pick RegionReplicas and leader
+ Set<Integer> randomSet = new HashSet<>();
+ while (randomSet.size() < TEST_REPLICA_NUM) {
+ int dataNodeId = RANDOM.nextInt(dataNodeNum);
+ if (randomSet.contains(dataNodeId)) {
+ continue;
+ }
+
+ randomSet.add(dataNodeId);
+ regionReplicaSet.addToDataNodeLocations(
+ new TDataNodeLocation().setDataNodeId(dataNodeId));
+ }
+ leaderId = new ArrayList<>(randomSet).get(RANDOM.nextInt(TEST_REPLICA_NUM));
+ randomNum -= 1;
+ }
+
+ // Record the new group and update both counters for the following iterations
+ regionReplicaSetMap.put(regionGroupId, regionReplicaSet);
+ regionReplicaSet
+ .getDataNodeLocations()
+ .forEach(
+ dataNodeLocation ->
+ regionCounter.get(dataNodeLocation.getDataNodeId()).getAndIncrement());
+ regionLeaderMap.put(regionGroupId, leaderId);
+ leaderCounter.get(leaderId).getAndIncrement();
+ }
+ }
+
+  /**
+   * Invokes the balancer repeatedly, feeding each result back in as the current leader
+   * distribution, until the output equals its input or 1000 rounds have passed. Returns the
+   * statistics of the process and of the final distribution.
+   *
+   * <p>The leader statistics cover every DataNode id from 0 to dataNodeNum - 1, which is the id
+   * range generateTestData uses. A DataNode that leads nothing counts as 0, so the range and the
+   * variance describe the same population that avg and the divisor refer to.
+   *
+   * @param dataNodeNum number of simulated DataNodes
+   * @param regionGroupNum number of simulated RegionGroups
+   * @param leaderBalancer the balancer under test
+   * @param regionReplicaSetMap the replica set of each RegionGroup
+   * @param regionLeaderMap the leader distribution to start from; it is copied before use, so it
+   *     may be the same map as stableLeaderDistribution
+   * @param disabledDataNodeSet ids of the DataNodes handed to the balancer as disabled
+   * @param stableLeaderDistribution output: replaced with the final leader distribution
+   * @return the statistics; rounds is -1 when the output never became stable
+   */
+  private Statistics doBalancing(
+      int dataNodeNum,
+      int regionGroupNum,
+      ILeaderBalancer leaderBalancer,
+      Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap,
+      Map<TConsensusGroupId, Integer> regionLeaderMap,
+      Set<Integer> disabledDataNodeSet,
+      Map<TConsensusGroupId, Integer> stableLeaderDistribution) {
+
+    Statistics result = new Statistics();
+    result.rounds = -1;
+    Map<TConsensusGroupId, Integer> lastDistribution = new ConcurrentHashMap<>(regionLeaderMap);
+    for (int rounds = 0; rounds < 1000; rounds++) {
+      Map<TConsensusGroupId, Integer> currentDistribution =
+          leaderBalancer.generateOptimalLeaderDistribution(
+              regionReplicaSetMap, lastDistribution, disabledDataNodeSet);
+      if (currentDistribution.equals(lastDistribution)) {
+        // The leader distribution is stable
+        result.rounds = rounds;
+        break;
+      }
+
+      // Count the RegionGroups whose leader differs between the two rounds
+      for (Map.Entry<TConsensusGroupId, Integer> entry : lastDistribution.entrySet()) {
+        if (!Objects.equals(entry.getValue(), currentDistribution.get(entry.getKey()))) {
+          result.switchTimes++;
+        }
+      }
+
+      lastDistribution.clear();
+      lastDistribution.putAll(currentDistribution);
+    }
+
+    stableLeaderDistribution.clear();
+    stableLeaderDistribution.putAll(lastDistribution);
+
+    // Start every DataNode at zero so that DataNodes without any leader are not left out
+    Map<Integer, AtomicInteger> leaderCounter = new ConcurrentHashMap<>();
+    for (int dataNodeId = 0; dataNodeId < dataNodeNum; dataNodeId++) {
+      leaderCounter.put(dataNodeId, new AtomicInteger(0));
+    }
+    lastDistribution.forEach(
+        (regionGroupId, leaderId) ->
+            leaderCounter
+                .computeIfAbsent(leaderId, empty -> new AtomicInteger(0))
+                .getAndIncrement());
+
+    double avg = (double) (regionGroupNum) / (double) (dataNodeNum);
+    double sum = 0;
+    int minLeaderCount = Integer.MAX_VALUE;
+    int maxLeaderCount = Integer.MIN_VALUE;
+    for (AtomicInteger counter : leaderCounter.values()) {
+      int leaderCount = counter.get();
+      sum += Math.pow((double) leaderCount - avg, 2);
+      minLeaderCount = Math.min(minLeaderCount, leaderCount);
+      maxLeaderCount = Math.max(maxLeaderCount, leaderCount);
+    }
+    // Without any counter the sentinels above were never replaced and would overflow
+    result.range = leaderCounter.isEmpty() ? 0 : maxLeaderCount - minLeaderCount;
+    result.variance = sum / (double) (dataNodeNum);
+
+    return result;
+  }
+
+  /** Result of balancing one leader distribution until the balancer's output became stable. */
+  private static class Statistics {
+
+    // The number of execution rounds until the output of the balance algorithm was stable
+    private int rounds;
+    // The number of leader changes until the output of the balance algorithm was stable
+    private int switchTimes;
+    // The range of the number of leaders per DataNode
+    private int range;
+    // The variance of the number of leaders per DataNode
+    private double variance;
+
+    private Statistics() {
+      this.rounds = 0;
+      this.switchTimes = 0;
+      this.range = 0;
+      this.variance = 0;
+    }
+
+    /**
+     * Appends these statistics to the result file as one comma separated line and flushes it.
+     *
+     * @throws IllegalStateException if no result writer is open
+     * @throws IOException if writing to the file fails
+     */
+    private void toFile() throws IOException {
+      if (WRITER == null) {
+        throw new IllegalStateException(
+            "prepareWriter() must be invoked before statistics can be recorded to a file");
+      }
+      // Locale.ROOT keeps '.' as the decimal separator, so the value cannot split the CSV line
+      WRITER.write(
+          rounds
+              + ","
+              + switchTimes
+              + ","
+              + range
+              + ","
+              + String.format(java.util.Locale.ROOT, "%.6f", variance)
+              + "\n");
+      WRITER.flush();
+    }
+
+    @Override
+    public String toString() {
+      return "Statistics{"
+          + "rounds="
+          + rounds
+          + ", switchTimes="
+          + switchTimes
+          + ", range="
+          + range
+          + ", variance="
+          + variance
+          + '}';
+    }
+
+    /** Two statistics are equal when the integer fields match and the variances are within 0.1. */
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      Statistics that = (Statistics) o;
+      return rounds == that.rounds
+          && switchTimes == that.switchTimes
+          && range == that.range
+          && Math.abs(variance - that.variance) <= 0.1;
+    }
+
+    /**
+     * Leaves the variance out on purpose: equals accepts variances that differ by up to 0.1, so
+     * hashing the variance would give equal objects different hash codes.
+     */
+    @Override
+    public int hashCode() {
+      return Objects.hash(rounds, switchTimes, range);
+    }
+  }
+}
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/mcf/MCFLeaderBalancerTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/MinCostFlowLeaderBalancerTest.java
similarity index 88%
rename from confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/mcf/MCFLeaderBalancerTest.java
rename to confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/MinCostFlowLeaderBalancerTest.java
index 4ce2e1843a..f676956780 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/mcf/MCFLeaderBalancerTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/MinCostFlowLeaderBalancerTest.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.confignode.manager.load.balancer.router.mcf;
+package org.apache.iotdb.confignode.manager.load.balancer.router.leader;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
@@ -38,7 +38,9 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
-public class MCFLeaderBalancerTest {
+public class MinCostFlowLeaderBalancerTest {
+
+ private static final MinCostFlowLeaderBalancer BALANCER = new MinCostFlowLeaderBalancer();
/** This test shows a simple case that greedy algorithm might fail */
@Test
@@ -83,18 +85,17 @@ public class MCFLeaderBalancerTest {
disabledDataNodeSet.add(0);
// Do balancing
- MCFLeaderBalancer mcfLeaderBalancer =
- new MCFLeaderBalancer(regionReplicaSetMap, regionLeaderMap, disabledDataNodeSet);
Map<TConsensusGroupId, Integer> leaderDistribution =
- mcfLeaderBalancer.generateOptimalLeaderDistribution();
+ BALANCER.generateOptimalLeaderDistribution(
+ regionReplicaSetMap, regionLeaderMap, disabledDataNodeSet);
// All RegionGroup got a leader
Assert.assertEquals(3, leaderDistribution.size());
// Each DataNode occurs exactly once
Assert.assertEquals(3, new HashSet<>(leaderDistribution.values()).size());
// MaxFlow is 3
- Assert.assertEquals(3, mcfLeaderBalancer.getMaximumFlow());
+ Assert.assertEquals(3, BALANCER.getMaximumFlow());
// MinimumCost is 3(switch leader cost) + 3(load cost, 1 for each DataNode)
- Assert.assertEquals(3 + 3, mcfLeaderBalancer.getMinimumCost());
+ Assert.assertEquals(3 + 3, BALANCER.getMinimumCost());
}
/** The leader will remain the same if all DataNodes are disabled */
@@ -119,10 +120,9 @@ public class MCFLeaderBalancerTest {
disabledDataNodeSet.add(2);
// Do balancing
- MCFLeaderBalancer mcfLeaderBalancer =
- new MCFLeaderBalancer(regionReplicaSetMap, regionLeaderMap, disabledDataNodeSet);
Map<TConsensusGroupId, Integer> leaderDistribution =
- mcfLeaderBalancer.generateOptimalLeaderDistribution();
+ BALANCER.generateOptimalLeaderDistribution(
+ regionReplicaSetMap, regionLeaderMap, disabledDataNodeSet);
Assert.assertEquals(1, leaderDistribution.size());
Assert.assertEquals(1, new HashSet<>(leaderDistribution.values()).size());
// Leader remains the same
@@ -130,9 +130,9 @@ public class MCFLeaderBalancerTest {
regionLeaderMap.get(regionReplicaSet.getRegionId()),
leaderDistribution.get(regionReplicaSet.getRegionId()));
// MaxFlow is 0
- Assert.assertEquals(0, mcfLeaderBalancer.getMaximumFlow());
+ Assert.assertEquals(0, BALANCER.getMaximumFlow());
// MinimumCost is 0
- Assert.assertEquals(0, mcfLeaderBalancer.getMinimumCost());
+ Assert.assertEquals(0, BALANCER.getMinimumCost());
}
/**
@@ -171,10 +171,9 @@ public class MCFLeaderBalancerTest {
}
// Do balancing
- MCFLeaderBalancer mcfLeaderBalancer =
- new MCFLeaderBalancer(regionReplicaSetMap, regionLeaderMap, new HashSet<>());
Map<TConsensusGroupId, Integer> leaderDistribution =
- mcfLeaderBalancer.generateOptimalLeaderDistribution();
+ BALANCER.generateOptimalLeaderDistribution(
+ regionReplicaSetMap, regionLeaderMap, new HashSet<>());
// All RegionGroup got a leader
Assert.assertEquals(regionGroupNum, leaderDistribution.size());
@@ -194,9 +193,9 @@ public class MCFLeaderBalancerTest {
.forEach(leaderNum -> Assert.assertEquals(regionGroupNum / dataNodeNum, leaderNum.get()));
// MaxFlow is regionGroupNum
- Assert.assertEquals(regionGroupNum, mcfLeaderBalancer.getMaximumFlow());
+ Assert.assertEquals(regionGroupNum, BALANCER.getMaximumFlow());
- int minimumCost = mcfLeaderBalancer.getMinimumCost();
+ int minimumCost = BALANCER.getMinimumCost();
Assert.assertTrue(minimumCost >= loadCost * dataNodeNum);
// The number of RegionGroups who have switched leader
int switchCost = minimumCost - loadCost * dataNodeNum;
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/LoadScoreGreedyRouterTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/priority/GreedyPriorityTest.java
similarity index 97%
rename from confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/LoadScoreGreedyRouterTest.java
rename to confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/priority/GreedyPriorityTest.java
index 042a80ce23..e54fab7cd3 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/LoadScoreGreedyRouterTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/priority/GreedyPriorityTest.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.confignode.manager.load.balancer.router;
+package org.apache.iotdb.confignode.manager.load.balancer.router.priority;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
@@ -39,7 +39,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-public class LoadScoreGreedyRouterTest {
+public class GreedyPriorityTest {
@Test
public void testGenLoadScoreGreedyRoutingPolicy() {
@@ -92,8 +92,8 @@ public class LoadScoreGreedyRouterTest {
/* Check result */
Map<TConsensusGroupId, TRegionReplicaSet> result =
- new LoadScoreGreedyRouter()
- .getLatestRegionRouteMap(
+ new GreedyPriorityBalancer()
+ .generateOptimalRoutePriority(
Arrays.asList(regionReplicaSet1, regionReplicaSet2), new HashMap<>(), loadScoreMap);
Assert.assertEquals(2, result.size());
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/LeaderRouterTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/priority/LeaderPriorityBalancerTest.java
similarity index 96%
rename from confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/LeaderRouterTest.java
rename to confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/priority/LeaderPriorityBalancerTest.java
index b369fedc6b..51a475f7a8 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/LeaderRouterTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/priority/LeaderPriorityBalancerTest.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.confignode.manager.load.balancer.router;
+package org.apache.iotdb.confignode.manager.load.balancer.router.priority;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
@@ -39,7 +39,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-public class LeaderRouterTest {
+public class LeaderPriorityBalancerTest {
@Test
public void testGenRealTimeRoutingPolicy() {
@@ -100,7 +100,8 @@ public class LeaderRouterTest {
// Check result
Map<TConsensusGroupId, TRegionReplicaSet> result =
- new LeaderRouter().getLatestRegionRouteMap(regionReplicaSets, leaderMap, loadScoreMap);
+ new LeaderPriorityBalancer()
+ .generateOptimalRoutePriority(regionReplicaSets, leaderMap, loadScoreMap);
TRegionReplicaSet result1 = result.get(groupId1);
// Leader first
Assert.assertEquals(dataNodeLocations.get(1), result1.getDataNodeLocations().get(0));
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/AbstractEnv.java b/integration-test/src/main/java/org/apache/iotdb/it/env/AbstractEnv.java
index 7603c8552c..aa06f14d94 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/AbstractEnv.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/AbstractEnv.java
@@ -87,6 +87,15 @@ public abstract class AbstractEnv implements BaseEnv {
String targetConfigNode = seedConfigNodeWrapper.getIpAndPortString();
this.configNodeWrapperList.add(seedConfigNodeWrapper);
+ // Check if the Seed-ConfigNode started successfully
+ try (SyncConfigNodeIServiceClient ignored =
+ (SyncConfigNodeIServiceClient) getLeaderConfigNodeConnection()) {
+ // Nothing else to do: successfully opening the connection is the check itself
+ logger.info("The Seed-ConfigNode started successfully!");
+ } catch (Exception e) {
+ logger.error("Failed to get connection to the Seed-ConfigNode", e);
+ }
+
List<String> configNodeEndpoints = new ArrayList<>();
RequestDelegate<Void> configNodesDelegate = new SerialRequestDelegate<>(configNodeEndpoints);
for (int i = 1; i < configNodesNum; i++) {
diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBClusterRegionLeaderBalancingIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBClusterRegionLeaderBalancingIT.java
index a94727984f..ae463159f1 100644
--- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBClusterRegionLeaderBalancingIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBClusterRegionLeaderBalancingIT.java
@@ -25,8 +25,11 @@ import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
import org.apache.iotdb.commons.cluster.NodeStatus;
import org.apache.iotdb.commons.cluster.RegionRoleType;
import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.confignode.it.utils.ConfigNodeTestUtils;
import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionReq;
import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionTableResp;
+import org.apache.iotdb.confignode.rpc.thrift.TSchemaPartitionReq;
+import org.apache.iotdb.confignode.rpc.thrift.TSchemaPartitionTableResp;
import org.apache.iotdb.confignode.rpc.thrift.TSetStorageGroupReq;
import org.apache.iotdb.confignode.rpc.thrift.TShowDataNodesResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowRegionReq;
@@ -180,16 +183,15 @@ public class IoTDBClusterRegionLeaderBalancingIT {
status = client.setStorageGroup(setReq);
Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
- // TODO: Create a SchemaRegionGroup for each StorageGroup
- // TODO: (The Ratis protocol class is now hard to change leader)
- // TSchemaPartitionTableResp schemaPartitionTableResp =
- // client.getOrCreateSchemaPartitionTable(
- // new TSchemaPartitionReq(
- // ConfigNodeTestUtils.generatePatternTreeBuffer(
- // new String[] {sg + i + "." + "d"})));
- // Assert.assertEquals(
- // TSStatusCode.SUCCESS_STATUS.getStatusCode(),
- // schemaPartitionTableResp.getStatus().getCode());
+ // Create a SchemaRegionGroup for each StorageGroup
+ TSchemaPartitionTableResp schemaPartitionTableResp =
+ client.getOrCreateSchemaPartitionTable(
+ new TSchemaPartitionReq(
+ ConfigNodeTestUtils.generatePatternTreeBuffer(
+ new String[] {sg + i + "." + "d"})));
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(),
+ schemaPartitionTableResp.getStatus().getCode());
// Create a DataRegionGroup for each StorageGroup
Map<TSeriesPartitionSlot, List<TTimePartitionSlot>> seriesSlotMap = new HashMap<>();
@@ -227,7 +229,7 @@ public class IoTDBClusterRegionLeaderBalancingIT {
isDistributionBalanced = leaderCounter.size() == testDataNodeNum;
// Each DataNode has exactly 4 Region-leader
for (AtomicInteger leaderCount : leaderCounter.values()) {
- if (leaderCount.get() != 2) {
+ if (leaderCount.get() != 4) {
isDistributionBalanced = false;
}
}
@@ -286,7 +288,7 @@ public class IoTDBClusterRegionLeaderBalancingIT {
isDistributionBalanced = leaderCounter.size() == testDataNodeNum - 1;
// Each Running DataNode has exactly 6 Region-leader
for (AtomicInteger leaderCount : leaderCounter.values()) {
- if (leaderCount.get() != 3) {
+ if (leaderCount.get() != 6) {
isDistributionBalanced = false;
}
}
diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBClusterRestartIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBClusterRestartIT.java
index 2510e81e2a..e34e142815 100644
--- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBClusterRestartIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBClusterRestartIT.java
@@ -69,9 +69,9 @@ public class IoTDBClusterRestartIT {
private static final String ratisConsensusProtocolClass =
"org.apache.iotdb.consensus.ratis.RatisConsensus";
- private static final int testConfigNodeNum = 3;
- private static final int testDataNodeNum = 3;
- private static final int testReplicationFactor = 3;
+ private static final int testConfigNodeNum = 2;
+ private static final int testDataNodeNum = 2;
+ private static final int testReplicationFactor = 2;
private static final long testTimePartitionInterval = 604800000;
protected static String originalConfigNodeConsensusProtocolClass;
protected static String originalSchemaRegionConsensusProtocolClass;
@@ -100,7 +100,7 @@ public class IoTDBClusterRestartIT {
originalTimePartitionInterval = ConfigFactory.getConfig().getTimePartitionInterval();
ConfigFactory.getConfig().setTimePartitionInterval(testTimePartitionInterval);
- // Init 3C3D cluster environment
+ // Init 2C2D cluster environment
EnvFactory.getEnv().initClusterEnvironment(testConfigNodeNum, testDataNodeNum);
}
diff --git a/node-commons/src/assembly/resources/conf/iotdb-common.properties b/node-commons/src/assembly/resources/conf/iotdb-common.properties
index bb0b0a63e7..27f5958ff0 100644
--- a/node-commons/src/assembly/resources/conf/iotdb-common.properties
+++ b/node-commons/src/assembly/resources/conf/iotdb-common.properties
@@ -100,16 +100,13 @@
# Datatype: boolean
# enable_data_partition_inherit_policy=false
-# The routing policy of read/write requests
-# These routing policy are currently supported:
-# 1. leader(Default, routing to leader replica)
-# 2. greedy(Routing to replica with the lowest load, might cause read un-consistent)
+# The policy of cluster RegionGroups' leader distribution.
+# E.g. we should balance cluster RegionGroups' leader distribution when some DataNodes are shut down or reconnected.
+# These policies are currently supported:
+# 1. GREEDY(Distribute leader through a simple greedy algorithm, might cause imbalance)
+# 2. MIN_COST_FLOW(Default, distribute leader through min cost flow algorithm)
# Datatype: string
-# routing_policy=leader
-
-# Whether enable ConfigNode-leader to balance RegionGroups' leader distribution automatically.
-# Datatype: boolean
-# enable_leader_balancing=false
+# leader_distribution_policy=MIN_COST_FLOW
####################
### Cluster management
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
index 7ca7555a8f..ba39d0fcea 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -1167,17 +1167,30 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
}
}
- public TSStatus changeRegionLeader(TRegionLeaderChangeReq req) throws TException {
+ @Override
+ public TSStatus changeRegionLeader(TRegionLeaderChangeReq req) {
+ LOGGER.info("[ChangeRegionLeader] {}", req);
TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
TConsensusGroupId tgId = req.getRegionId();
ConsensusGroupId regionId = ConsensusGroupId.Factory.createFromTConsensusGroupId(tgId);
TEndPoint newNode = getConsensusEndPoint(req.getNewLeaderNode(), regionId);
Peer newLeaderPeer = new Peer(regionId, req.getNewLeaderNode().getDataNodeId(), newNode);
- if (!isLeader(regionId)) {
- LOGGER.info("region {} is not leader, no need to change leader", regionId);
- return status;
- }
- LOGGER.info("region {} is leader, will change leader", regionId);
+
+ if (isLeader(regionId)) {
+ String msg =
+ "[ChangeRegionLeader] The current DataNode: "
+ + req.getNewLeaderNode().getDataNodeId()
+ + " is already the leader of RegionGroup: "
+ + regionId
+ + ", skip leader transfer.";
+ LOGGER.info(msg);
+ return status.setMessage(msg);
+ }
+
+ LOGGER.info(
+ "[ChangeRegionLeader] Start change the leader of RegionGroup: {} to DataNode: {}",
+ regionId,
+ req.getNewLeaderNode().getDataNodeId());
return transferLeader(regionId, newLeaderPeer);
}
@@ -1190,16 +1203,24 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
resp = SchemaRegionConsensusImpl.getInstance().transferLeader(regionId, newLeaderPeer);
} else {
status.setCode(TSStatusCode.REGION_LEADER_CHANGE_ERROR.getStatusCode());
- status.setMessage("Error Region type. region: " + regionId);
+ status.setMessage("[ChangeRegionLeader] Error Region type: " + regionId);
return status;
}
+
if (!resp.isSuccess()) {
- LOGGER.error("change region {} leader failed", regionId, resp.getException());
+ LOGGER.error(
+ "[ChangeRegionLeader] Failed to change the leader of RegionGroup: {}",
+ regionId,
+ resp.getException());
status.setCode(TSStatusCode.REGION_LEADER_CHANGE_ERROR.getStatusCode());
status.setMessage(resp.getException().getMessage());
return status;
}
- status.setMessage("change region " + regionId + " leader succeed");
+ status.setMessage(
+ "[ChangeRegionLeader] Successfully change the leader of RegionGroup: "
+ + regionId
+ + " to "
+ + newLeaderPeer.getNodeId());
return status;
}
diff --git a/thrift/src/main/thrift/datanode.thrift b/thrift/src/main/thrift/datanode.thrift
index ec567aab99..eb6abc0364 100644
--- a/thrift/src/main/thrift/datanode.thrift
+++ b/thrift/src/main/thrift/datanode.thrift
@@ -415,9 +415,9 @@ service IDataNodeRPCService {
common.TSStatus deleteRegion(common.TConsensusGroupId consensusGroupId)
/**
- * Config node will change a region leader to other data node int same consensus group
- * if the region is not leader on the node, will do nothing
- * @param change a region leader to which node
+ * Change the leader of specified RegionGroup to another DataNode
+ *
+ * @param req The specified RegionGroup and the new leader DataNode
*/
common.TSStatus changeRegionLeader(TRegionLeaderChangeReq req);