You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by yo...@apache.org on 2024/03/08 03:44:50 UTC
(iotdb) branch region-multi-database updated: random leader balancer
This is an automated email from the ASF dual-hosted git repository.
yongzao pushed a commit to branch region-multi-database
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/region-multi-database by this push:
new 5f12a0b3b45 random leader balancer
5f12a0b3b45 is described below
commit 5f12a0b3b452769aefef24af0c098f5244f765b7
Author: YongzaoDan <53...@qq.com>
AuthorDate: Fri Mar 8 11:44:34 2024 +0800
random leader balancer
---
.../confignode/conf/ConfigNodeDescriptor.java | 1 +
.../confignode/conf/ConfigNodeStartupCheck.java | 1 +
.../manager/load/balancer/RegionBalancer.java | 1 +
.../manager/load/balancer/RouteBalancer.java | 4 +
.../router/leader/GreedyLeaderBalancer.java | 11 ++-
.../balancer/router/leader/ILeaderBalancer.java | 1 +
.../router/leader/RandomLeaderBalancer.java | 91 ++++++++++++++++++++++
...orAndLeaderBalancerCombinatorialManualTest.java | 8 +-
.../resources/conf/iotdb-common.properties | 19 ++---
9 files changed, 120 insertions(+), 17 deletions(-)
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
index 5a6f7ca7697..8d090f1b23a 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
@@ -346,6 +346,7 @@ public class ConfigNodeDescriptor {
.getProperty("leader_distribution_policy", conf.getLeaderDistributionPolicy())
.trim();
if (ILeaderBalancer.GREEDY_POLICY.equals(leaderDistributionPolicy)
+ || ILeaderBalancer.RANDOM_POLICY.equals(leaderDistributionPolicy)
|| ILeaderBalancer.MIN_COST_FLOW_POLICY.equals(leaderDistributionPolicy)) {
conf.setLeaderDistributionPolicy(leaderDistributionPolicy);
} else {
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeStartupCheck.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeStartupCheck.java
index 000e08d348e..445d4d44df9 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeStartupCheck.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeStartupCheck.java
@@ -145,6 +145,7 @@ public class ConfigNodeStartupCheck extends StartupChecks {
// The leader distribution policy is limited
if (!ILeaderBalancer.GREEDY_POLICY.equals(CONF.getLeaderDistributionPolicy())
+ && !ILeaderBalancer.RANDOM_POLICY.equals(CONF.getLeaderDistributionPolicy())
&& !ILeaderBalancer.MIN_COST_FLOW_POLICY.equals(CONF.getLeaderDistributionPolicy())) {
throw new ConfigurationException(
"leader_distribution_policy",
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java
index edb46050ea2..f31209c8503 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java
@@ -153,6 +153,7 @@ public class RegionBalancer {
public enum RegionGroupAllocatePolicy {
GREEDY,
+ RANDOM,
GREEDY_COPY_SET
}
}
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
index 8acdc192a36..d887c164b9f 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
@@ -35,6 +35,7 @@ import org.apache.iotdb.confignode.manager.load.LoadManager;
import org.apache.iotdb.confignode.manager.load.balancer.router.leader.GreedyLeaderBalancer;
import org.apache.iotdb.confignode.manager.load.balancer.router.leader.ILeaderBalancer;
import org.apache.iotdb.confignode.manager.load.balancer.router.leader.MinCostFlowLeaderBalancer;
+import org.apache.iotdb.confignode.manager.load.balancer.router.leader.RandomLeaderBalancer;
import org.apache.iotdb.confignode.manager.load.balancer.router.priority.GreedyPriorityBalancer;
import org.apache.iotdb.confignode.manager.load.balancer.router.priority.IPriorityBalancer;
import org.apache.iotdb.confignode.manager.load.balancer.router.priority.LeaderPriorityBalancer;
@@ -96,6 +97,9 @@ public class RouteBalancer {
case ILeaderBalancer.GREEDY_POLICY:
this.leaderBalancer = new GreedyLeaderBalancer();
break;
+ case ILeaderBalancer.RANDOM_POLICY:
+ this.leaderBalancer = new RandomLeaderBalancer();
+ break;
case ILeaderBalancer.MIN_COST_FLOW_POLICY:
default:
this.leaderBalancer = new MinCostFlowLeaderBalancer();
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/GreedyLeaderBalancer.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/GreedyLeaderBalancer.java
index 5ad15c5a422..3f3a3701fe1 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/GreedyLeaderBalancer.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/GreedyLeaderBalancer.java
@@ -75,13 +75,17 @@ public class GreedyLeaderBalancer implements ILeaderBalancer {
}
private Map<TConsensusGroupId, Integer> constructGreedyDistribution() {
- regionLeaderMap.clear();
Map<Integer, Integer> leaderCounter = new TreeMap<>();
regionReplicaSetMap.forEach(
(regionGroupId, regionGroup) -> {
- int minCount = Integer.MAX_VALUE, leaderId = -1;
+ int minCount = Integer.MAX_VALUE,
+ leaderId = regionLeaderMap.getOrDefault(regionGroupId, -1);
for (TDataNodeLocation dataNodeLocation : regionGroup.getDataNodeLocations()) {
int dataNodeId = dataNodeLocation.getDataNodeId();
+ if (disabledDataNodeSet.contains(dataNodeId)) {
+ continue;
+ }
+ // Select the DataNode with the minimal leader count as the new leader
int count = leaderCounter.getOrDefault(dataNodeId, 0);
if (count < minCount) {
minCount = count;
@@ -91,6 +95,7 @@ public class GreedyLeaderBalancer implements ILeaderBalancer {
regionLeaderMap.put(regionGroupId, leaderId);
leaderCounter.merge(leaderId, 1, Integer::sum);
});
+ return new ConcurrentHashMap<>(regionLeaderMap);
// /* Count the number of leaders that each DataNode have */
// // Map<DataNodeId, leader count>
@@ -172,8 +177,6 @@ public class GreedyLeaderBalancer implements ILeaderBalancer {
// regionLeaderMap.replace(regionGroupId, newLeaderId);
// }
// }
-
- return new ConcurrentHashMap<>(regionLeaderMap);
}
private static class WeightEntry {
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/ILeaderBalancer.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/ILeaderBalancer.java
index 8cd7fae7020..5b67854610b 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/ILeaderBalancer.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/ILeaderBalancer.java
@@ -28,6 +28,7 @@ import java.util.Set;
public interface ILeaderBalancer {
String GREEDY_POLICY = "GREEDY";
+ String RANDOM_POLICY = "RANDOM";
String MIN_COST_FLOW_POLICY = "MIN_COST_FLOW";
/**
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/RandomLeaderBalancer.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/RandomLeaderBalancer.java
new file mode 100644
index 00000000000..c1c0b8573e9
--- /dev/null
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/RandomLeaderBalancer.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.manager.load.balancer.router.leader;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Leader balancer that selects the leader of every RegionGroup at random among the replicas that
+ * are not located on a disabled DataNode.
+ *
+ * <p>The working state of a call is held in instance fields (one of them a plain {@link HashMap}),
+ * so a single instance should not be invoked concurrently.
+ */
+public class RandomLeaderBalancer implements ILeaderBalancer {
+
+  // Working copies of the arguments of the current call: filled by initialize(), emptied by clear()
+  private final Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap;
+  private final Map<TConsensusGroupId, Integer> regionLeaderMap;
+  private final Set<Integer> disabledDataNodeSet;
+  // Reused across calls instead of allocating a new generator per invocation
+  private final Random random;
+
+  public RandomLeaderBalancer() {
+    this.regionReplicaSetMap = new HashMap<>();
+    this.regionLeaderMap = new ConcurrentHashMap<>();
+    this.disabledDataNodeSet = new HashSet<>();
+    this.random = new Random();
+  }
+
+  /**
+   * Generate a random leader distribution.
+   *
+   * @param databaseRegionGroupMap not used by this policy
+   * @param regionReplicaSetMap the replica set of each RegionGroup
+   * @param regionLeaderMap the current leader of each RegionGroup; an entry is kept as is when its
+   *     RegionGroup has no replica on an enabled DataNode
+   * @param disabledDataNodeSet the ids of the DataNodes that must not be selected as leader
+   * @return a new map from RegionGroupId to the DataNodeId of its leader
+   */
+  @Override
+  public Map<TConsensusGroupId, Integer> generateOptimalLeaderDistribution(
+      Map<String, List<TConsensusGroupId>> databaseRegionGroupMap,
+      Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap,
+      Map<TConsensusGroupId, Integer> regionLeaderMap,
+      Set<Integer> disabledDataNodeSet) {
+    try {
+      initialize(regionReplicaSetMap, regionLeaderMap, disabledDataNodeSet);
+      return constructRandomDistribution();
+    } finally {
+      // Always drop the per-call state, otherwise a failed call would leak it into the next one
+      clear();
+    }
+  }
+
+  /** Copy the arguments of the current call into the working fields. */
+  private void initialize(
+      Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap,
+      Map<TConsensusGroupId, Integer> regionLeaderMap,
+      Set<Integer> disabledDataNodeSet) {
+    this.regionReplicaSetMap.putAll(regionReplicaSetMap);
+    this.regionLeaderMap.putAll(regionLeaderMap);
+    this.disabledDataNodeSet.addAll(disabledDataNodeSet);
+  }
+
+  /** Empty the working fields so that the instance can be reused. */
+  private void clear() {
+    this.regionReplicaSetMap.clear();
+    this.regionLeaderMap.clear();
+    this.disabledDataNodeSet.clear();
+  }
+
+  /**
+   * For every RegionGroup, draw the leader uniformly among its replicas whose DataNode is not
+   * disabled. A RegionGroup without any such replica keeps its current entry (if any).
+   *
+   * @return a copy of the resulting leader map
+   */
+  private Map<TConsensusGroupId, Integer> constructRandomDistribution() {
+    regionReplicaSetMap.forEach(
+        (regionGroupId, regionGroup) -> {
+          int replicaNum = regionGroup.getDataNodeLocations().size();
+          // Collect the enabled replicas first, so that the draw can never land on a disabled one
+          int[] candidates = new int[replicaNum];
+          int candidateNum = 0;
+          for (int i = 0; i < replicaNum; i++) {
+            int dataNodeId = regionGroup.getDataNodeLocations().get(i).getDataNodeId();
+            if (!disabledDataNodeSet.contains(dataNodeId)) {
+              candidates[candidateNum++] = dataNodeId;
+            }
+          }
+          // Random#nextInt rejects a non-positive bound, hence the guard for "no candidate"
+          if (candidateNum > 0) {
+            regionLeaderMap.put(regionGroupId, candidates[random.nextInt(candidateNum)]);
+          }
+        });
+    return new ConcurrentHashMap<>(regionLeaderMap);
+  }
+}
diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/RegionAllocatorAndLeaderBalancerCombinatorialManualTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/RegionAllocatorAndLeaderBalancerCombinatorialManualTest.java
index 07fab2d10d6..593e204853e 100644
--- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/RegionAllocatorAndLeaderBalancerCombinatorialManualTest.java
+++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/RegionAllocatorAndLeaderBalancerCombinatorialManualTest.java
@@ -7,8 +7,8 @@ import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.confignode.manager.load.balancer.region.GreedyCopySetRegionGroupAllocator;
import org.apache.iotdb.confignode.manager.load.balancer.region.IRegionGroupAllocator;
+import org.apache.iotdb.confignode.manager.load.balancer.router.leader.GreedyLeaderBalancer;
import org.apache.iotdb.confignode.manager.load.balancer.router.leader.ILeaderBalancer;
-import org.apache.iotdb.confignode.manager.load.balancer.router.leader.MinCostFlowLeaderBalancer;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -30,8 +30,8 @@ public class RegionAllocatorAndLeaderBalancerCombinatorialManualTest {
private static final Logger LOGGER =
LoggerFactory.getLogger(RegionAllocatorAndLeaderBalancerCombinatorialManualTest.class);
- private static final int TEST_LOOP = 10;
- private static final int TEST_DATA_NODE_NUM = 12;
+ private static final int TEST_LOOP = 100;
+ private static final int TEST_DATA_NODE_NUM = 100;
private static final int DATA_REGION_PER_DATA_NODE = 4;
private static final int DATA_REPLICATION_FACTOR = 3;
private static final String DATABASE = "root.db";
@@ -43,7 +43,7 @@ public class RegionAllocatorAndLeaderBalancerCombinatorialManualTest {
private static final IRegionGroupAllocator ALLOCATOR = new GreedyCopySetRegionGroupAllocator();
// new TieredReplicationAllocator(
// TEST_DATA_NODE_NUM, DATA_REPLICATION_FACTOR, DATA_REGION_PER_DATA_NODE);
- private static final ILeaderBalancer BALANCER = new MinCostFlowLeaderBalancer();
+ private static final ILeaderBalancer BALANCER = new GreedyLeaderBalancer();
@BeforeClass
public static void setUp() {
diff --git a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
index eb959726c5c..71c0496cb41 100644
--- a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
+++ b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
@@ -39,7 +39,7 @@ cluster_name=defaultCluster
# Default number of schema replicas
# Can not be changed after the first start
# Datatype: int
-schema_replication_factor=1
+schema_replication_factor=3
# SchemaRegion consensus protocol type.
# This parameter is unmodifiable after ConfigNode starts for the first time.
@@ -52,7 +52,7 @@ schema_replication_factor=1
# Default number of data replicas
# Can not be changed after the first start
# Datatype: int
-data_replication_factor=1
+data_replication_factor=3
# DataRegion consensus protocol type.
# This parameter is unmodifiable after ConfigNode starts for the first time.
@@ -61,7 +61,7 @@ data_replication_factor=1
# 2. org.apache.iotdb.consensus.iot.IoTConsensus
# 3. org.apache.iotdb.consensus.ratis.RatisConsensus
# Datatype: string
-# data_region_consensus_protocol_class=org.apache.iotdb.consensus.iot.IoTConsensus
+data_region_consensus_protocol_class=org.apache.iotdb.consensus.iot.IoTConsensus
####################
### Load balancing configuration
@@ -71,7 +71,7 @@ data_replication_factor=1
# And these parameters should be consistent within the ConfigNodeGroup.
# Number of SeriesPartitionSlots per Database
# Datatype: Integer
-# series_slot_num=1000
+series_slot_num=1000
# SeriesPartitionSlot executor class
# These hashing algorithms are currently supported:
@@ -110,7 +110,7 @@ data_replication_factor=1
# 1. CUSTOM(Each Database will allocate data_region_group_per_database DataRegionGroups as soon as created)
# 2. AUTO(Each Database will automatically extend DataRegionGroups based on the data it has)
# Datatype: String
-# data_region_group_extension_policy=AUTO
+data_region_group_extension_policy=AUTO
# When set data_region_group_extension_policy=CUSTOM,
# this parameter is the default number of DataRegionGroups for each Database.
@@ -124,16 +124,17 @@ data_replication_factor=1
# Notice: Since each Database requires at least two DataRegionGroups to manage its data,
# this parameter doesn't limit the upper bound of cluster DataRegions when there are too many Databases.
# Datatype: Double
-# data_region_per_data_node=5.0
+data_region_per_data_node=4.0
# The policy of cluster RegionGroups' leader distribution.
# E.g. we should balance cluster RegionGroups' leader distribution when some DataNodes are shut down or reconnected.
# These policies are currently supported:
# 1. GREEDY(Distribute leaders through a simple greedy algorithm, might cause imbalance)
-# 2. MIN_COST_FLOW(Default, distribute leader through min cost flow algorithm)
+# 2. RANDOM(Distribute leaders randomly, might cause imbalance)
+# 3. MIN_COST_FLOW(Default, distribute leader through min cost flow algorithm)
# Datatype: String
-# leader_distribution_policy=MIN_COST_FLOW
+leader_distribution_policy=MIN_COST_FLOW
# Whether to enable auto leader balance for Ratis consensus protocol.
# The ConfigNode-leader will balance the leader of Ratis-RegionGroups by leader_distribution_policy if set true.
@@ -145,7 +146,7 @@ data_replication_factor=1
# The ConfigNode-leader will balance the leader of IoTConsensus-RegionGroups by leader_distribution_policy if set true.
# Notice: Default is true because the IoTConsensus depends on this function to distribute leader.
# Datatype: Boolean
-# enable_auto_leader_balance_for_iot_consensus=true
+enable_auto_leader_balance_for_iot_consensus=true
####################
### Cluster management