You are viewing a plain-text version of this content. The canonical link for it was a hyperlink on the word "here" and is not preserved in this copy.
Posted to commits@iotdb.apache.org by yo...@apache.org on 2024/03/08 03:44:50 UTC

(iotdb) branch region-multi-database updated: random leader balancer

This is an automated email from the ASF dual-hosted git repository.

yongzao pushed a commit to branch region-multi-database
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/region-multi-database by this push:
     new 5f12a0b3b45 random leader balancer
5f12a0b3b45 is described below

commit 5f12a0b3b452769aefef24af0c098f5244f765b7
Author: YongzaoDan <53...@qq.com>
AuthorDate: Fri Mar 8 11:44:34 2024 +0800

    random leader balancer
---
 .../confignode/conf/ConfigNodeDescriptor.java      |  1 +
 .../confignode/conf/ConfigNodeStartupCheck.java    |  1 +
 .../manager/load/balancer/RegionBalancer.java      |  1 +
 .../manager/load/balancer/RouteBalancer.java       |  4 +
 .../router/leader/GreedyLeaderBalancer.java        | 11 ++-
 .../balancer/router/leader/ILeaderBalancer.java    |  1 +
 .../router/leader/RandomLeaderBalancer.java        | 91 ++++++++++++++++++++++
 ...orAndLeaderBalancerCombinatorialManualTest.java |  8 +-
 .../resources/conf/iotdb-common.properties         | 19 ++---
 9 files changed, 120 insertions(+), 17 deletions(-)

diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
index 5a6f7ca7697..8d090f1b23a 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
@@ -346,6 +346,7 @@ public class ConfigNodeDescriptor {
             .getProperty("leader_distribution_policy", conf.getLeaderDistributionPolicy())
             .trim();
     if (ILeaderBalancer.GREEDY_POLICY.equals(leaderDistributionPolicy)
+        || ILeaderBalancer.RANDOM_POLICY.equals(leaderDistributionPolicy)
         || ILeaderBalancer.MIN_COST_FLOW_POLICY.equals(leaderDistributionPolicy)) {
       conf.setLeaderDistributionPolicy(leaderDistributionPolicy);
     } else {
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeStartupCheck.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeStartupCheck.java
index 000e08d348e..445d4d44df9 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeStartupCheck.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeStartupCheck.java
@@ -145,6 +145,7 @@ public class ConfigNodeStartupCheck extends StartupChecks {
 
     // The leader distribution policy is limited
     if (!ILeaderBalancer.GREEDY_POLICY.equals(CONF.getLeaderDistributionPolicy())
+        && !ILeaderBalancer.RANDOM_POLICY.equals(CONF.getLeaderDistributionPolicy())
         && !ILeaderBalancer.MIN_COST_FLOW_POLICY.equals(CONF.getLeaderDistributionPolicy())) {
       throw new ConfigurationException(
           "leader_distribution_policy",
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java
index edb46050ea2..f31209c8503 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java
@@ -153,6 +153,7 @@ public class RegionBalancer {
 
   public enum RegionGroupAllocatePolicy {
     GREEDY,
+    RANDOM,
     GREEDY_COPY_SET
   }
 }
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
index 8acdc192a36..d887c164b9f 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
@@ -35,6 +35,7 @@ import org.apache.iotdb.confignode.manager.load.LoadManager;
 import org.apache.iotdb.confignode.manager.load.balancer.router.leader.GreedyLeaderBalancer;
 import org.apache.iotdb.confignode.manager.load.balancer.router.leader.ILeaderBalancer;
 import org.apache.iotdb.confignode.manager.load.balancer.router.leader.MinCostFlowLeaderBalancer;
+import org.apache.iotdb.confignode.manager.load.balancer.router.leader.RandomLeaderBalancer;
 import org.apache.iotdb.confignode.manager.load.balancer.router.priority.GreedyPriorityBalancer;
 import org.apache.iotdb.confignode.manager.load.balancer.router.priority.IPriorityBalancer;
 import org.apache.iotdb.confignode.manager.load.balancer.router.priority.LeaderPriorityBalancer;
@@ -96,6 +97,9 @@ public class RouteBalancer {
       case ILeaderBalancer.GREEDY_POLICY:
         this.leaderBalancer = new GreedyLeaderBalancer();
         break;
+      case ILeaderBalancer.RANDOM_POLICY:
+        this.leaderBalancer = new RandomLeaderBalancer();
+        break;
       case ILeaderBalancer.MIN_COST_FLOW_POLICY:
       default:
         this.leaderBalancer = new MinCostFlowLeaderBalancer();
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/GreedyLeaderBalancer.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/GreedyLeaderBalancer.java
index 5ad15c5a422..3f3a3701fe1 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/GreedyLeaderBalancer.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/GreedyLeaderBalancer.java
@@ -75,13 +75,17 @@ public class GreedyLeaderBalancer implements ILeaderBalancer {
   }
 
   private Map<TConsensusGroupId, Integer> constructGreedyDistribution() {
-    regionLeaderMap.clear();
     Map<Integer, Integer> leaderCounter = new TreeMap<>();
     regionReplicaSetMap.forEach(
         (regionGroupId, regionGroup) -> {
-          int minCount = Integer.MAX_VALUE, leaderId = -1;
+          int minCount = Integer.MAX_VALUE,
+              leaderId = regionLeaderMap.getOrDefault(regionGroupId, -1);
           for (TDataNodeLocation dataNodeLocation : regionGroup.getDataNodeLocations()) {
             int dataNodeId = dataNodeLocation.getDataNodeId();
+            if (disabledDataNodeSet.contains(dataNodeId)) {
+              continue;
+            }
+            // Select the DataNode with the minimal leader count as the new leader
             int count = leaderCounter.getOrDefault(dataNodeId, 0);
             if (count < minCount) {
               minCount = count;
@@ -91,6 +95,7 @@ public class GreedyLeaderBalancer implements ILeaderBalancer {
           regionLeaderMap.put(regionGroupId, leaderId);
           leaderCounter.merge(leaderId, 1, Integer::sum);
         });
+    return new ConcurrentHashMap<>(regionLeaderMap);
 
     //    /* Count the number of leaders that each DataNode have */
     //    // Map<DataNodeId, leader count>
@@ -172,8 +177,6 @@ public class GreedyLeaderBalancer implements ILeaderBalancer {
     //        regionLeaderMap.replace(regionGroupId, newLeaderId);
     //      }
     //    }
-
-    return new ConcurrentHashMap<>(regionLeaderMap);
   }
 
   private static class WeightEntry {
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/ILeaderBalancer.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/ILeaderBalancer.java
index 8cd7fae7020..5b67854610b 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/ILeaderBalancer.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/ILeaderBalancer.java
@@ -28,6 +28,7 @@ import java.util.Set;
 public interface ILeaderBalancer {
 
   String GREEDY_POLICY = "GREEDY";
+  String RANDOM_POLICY = "RANDOM";
   String MIN_COST_FLOW_POLICY = "MIN_COST_FLOW";
 
   /**
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/RandomLeaderBalancer.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/RandomLeaderBalancer.java
new file mode 100644
index 00000000000..c1c0b8573e9
--- /dev/null
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/RandomLeaderBalancer.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.manager.load.balancer.router.leader;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/** Assigns each RegionGroup's leader to a random replica located on an enabled DataNode. */
+public class RandomLeaderBalancer implements ILeaderBalancer {
+
+  private final Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap;
+  private final Map<TConsensusGroupId, Integer> regionLeaderMap;
+  private final Set<Integer> disabledDataNodeSet;
+  private final Random random = new Random();
+
+  public RandomLeaderBalancer() {
+    this.regionReplicaSetMap = new HashMap<>();
+    this.regionLeaderMap = new ConcurrentHashMap<>();
+    this.disabledDataNodeSet = new HashSet<>();
+  }
+
+  @Override
+  public Map<TConsensusGroupId, Integer> generateOptimalLeaderDistribution(
+      Map<String, List<TConsensusGroupId>> databaseRegionGroupMap,
+      Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap,
+      Map<TConsensusGroupId, Integer> regionLeaderMap,
+      Set<Integer> disabledDataNodeSet) {
+    initialize(regionReplicaSetMap, regionLeaderMap, disabledDataNodeSet);
+    Map<TConsensusGroupId, Integer> result = constructRandomDistribution();
+    clear();
+    return result;
+  }
+
+  private void initialize(
+      Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap,
+      Map<TConsensusGroupId, Integer> regionLeaderMap,
+      Set<Integer> disabledDataNodeSet) {
+    this.regionReplicaSetMap.putAll(regionReplicaSetMap);
+    this.regionLeaderMap.putAll(regionLeaderMap);
+    this.disabledDataNodeSet.addAll(disabledDataNodeSet);
+  }
+
+  private void clear() {
+    this.regionReplicaSetMap.clear();
+    this.regionLeaderMap.clear();
+    this.disabledDataNodeSet.clear();
+  }
+
+  /** Randomly picks an enabled replica as leader; groups with none keep their current leader. */
+  private Map<TConsensusGroupId, Integer> constructRandomDistribution() {
+    regionReplicaSetMap.forEach(
+        (regionGroupId, regionGroup) -> {
+          int leaderId = -1, enabledCount = 0;
+          for (int i = 0; i < regionGroup.getDataNodeLocations().size(); i++) {
+            int dataNodeId = regionGroup.getDataNodeLocations().get(i).getDataNodeId();
+            if (!disabledDataNodeSet.contains(dataNodeId) && random.nextInt(++enabledCount) == 0) {
+              leaderId = dataNodeId; // Reservoir sampling: uniform over enabled replicas
+            }
+          }
+          if (enabledCount > 0) {
+            regionLeaderMap.put(regionGroupId, leaderId);
+          }
+        });
+    return new ConcurrentHashMap<>(regionLeaderMap);
+  }
+}
diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/RegionAllocatorAndLeaderBalancerCombinatorialManualTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/RegionAllocatorAndLeaderBalancerCombinatorialManualTest.java
index 07fab2d10d6..593e204853e 100644
--- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/RegionAllocatorAndLeaderBalancerCombinatorialManualTest.java
+++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/RegionAllocatorAndLeaderBalancerCombinatorialManualTest.java
@@ -7,8 +7,8 @@ import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.confignode.manager.load.balancer.region.GreedyCopySetRegionGroupAllocator;
 import org.apache.iotdb.confignode.manager.load.balancer.region.IRegionGroupAllocator;
+import org.apache.iotdb.confignode.manager.load.balancer.router.leader.GreedyLeaderBalancer;
 import org.apache.iotdb.confignode.manager.load.balancer.router.leader.ILeaderBalancer;
-import org.apache.iotdb.confignode.manager.load.balancer.router.leader.MinCostFlowLeaderBalancer;
 
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -30,8 +30,8 @@ public class RegionAllocatorAndLeaderBalancerCombinatorialManualTest {
   private static final Logger LOGGER =
       LoggerFactory.getLogger(RegionAllocatorAndLeaderBalancerCombinatorialManualTest.class);
 
-  private static final int TEST_LOOP = 10;
-  private static final int TEST_DATA_NODE_NUM = 12;
+  private static final int TEST_LOOP = 100;
+  private static final int TEST_DATA_NODE_NUM = 100;
   private static final int DATA_REGION_PER_DATA_NODE = 4;
   private static final int DATA_REPLICATION_FACTOR = 3;
   private static final String DATABASE = "root.db";
@@ -43,7 +43,7 @@ public class RegionAllocatorAndLeaderBalancerCombinatorialManualTest {
   private static final IRegionGroupAllocator ALLOCATOR = new GreedyCopySetRegionGroupAllocator();
   //      new TieredReplicationAllocator(
   //          TEST_DATA_NODE_NUM, DATA_REPLICATION_FACTOR, DATA_REGION_PER_DATA_NODE);
-  private static final ILeaderBalancer BALANCER = new MinCostFlowLeaderBalancer();
+  private static final ILeaderBalancer BALANCER = new GreedyLeaderBalancer();
 
   @BeforeClass
   public static void setUp() {
diff --git a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
index eb959726c5c..71c0496cb41 100644
--- a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
+++ b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
@@ -39,7 +39,7 @@ cluster_name=defaultCluster
 # Default number of schema replicas
 # Can not be changed after the first start
 # Datatype: int
-schema_replication_factor=1
+schema_replication_factor=3
 
 # SchemaRegion consensus protocol type.
 # This parameter is unmodifiable after ConfigNode starts for the first time.
@@ -52,7 +52,7 @@ schema_replication_factor=1
 # Default number of data replicas
 # Can not be changed after the first start
 # Datatype: int
-data_replication_factor=1
+data_replication_factor=3
 
 # DataRegion consensus protocol type.
 # This parameter is unmodifiable after ConfigNode starts for the first time.
@@ -61,7 +61,7 @@ data_replication_factor=1
 # 2. org.apache.iotdb.consensus.iot.IoTConsensus
 # 3. org.apache.iotdb.consensus.ratis.RatisConsensus
 # Datatype: string
-# data_region_consensus_protocol_class=org.apache.iotdb.consensus.iot.IoTConsensus
+data_region_consensus_protocol_class=org.apache.iotdb.consensus.iot.IoTConsensus
 
 ####################
 ### Load balancing configuration
@@ -71,7 +71,7 @@ data_replication_factor=1
 # And these parameters should be consistent within the ConfigNodeGroup.
 # Number of SeriesPartitionSlots per Database
 # Datatype: Integer
-# series_slot_num=1000
+series_slot_num=1000
 
 # SeriesPartitionSlot executor class
 # These hashing algorithms are currently supported:
@@ -110,7 +110,7 @@ data_replication_factor=1
 # 1. CUSTOM(Each Database will allocate data_region_group_per_database DataRegionGroups as soon as created)
 # 2. AUTO(Each Database will automatically extend DataRegionGroups based on the data it has)
 # Datatype: String
-# data_region_group_extension_policy=AUTO
+data_region_group_extension_policy=AUTO
 
 # When set data_region_group_extension_policy=CUSTOM,
 # this parameter is the default number of DataRegionGroups for each Database.
@@ -124,16 +124,17 @@ data_replication_factor=1
 # Notice: Since each Database requires at least two DataRegionGroups to manage its data,
 # this parameter doesn't limit the upper bound of cluster DataRegions when there are too many Databases.
 # Datatype: Double
-# data_region_per_data_node=5.0
+data_region_per_data_node=4.0
 
 
 # The policy of cluster RegionGroups' leader distribution.
 # E.g. we should balance cluster RegionGroups' leader distribution when some DataNodes are shutdown or re-connected.
 # These policies are currently supported:
 # 1. GREEDY(Distribute leader through a simple greedy algorithm, might cause unbalance)
-# 2. MIN_COST_FLOW(Default, distribute leader through min cost flow algorithm)
+# 2. RANDOM(Distribute leaders randomly, might cause imbalance)
+# 3. MIN_COST_FLOW(Default, distribute leader through min cost flow algorithm)
 # Datatype: String
-# leader_distribution_policy=MIN_COST_FLOW
+leader_distribution_policy=MIN_COST_FLOW
 
 # Whether to enable auto leader balance for Ratis consensus protocol.
 # The ConfigNode-leader will balance the leader of Ratis-RegionGroups by leader_distribution_policy if set true.
@@ -145,7 +146,7 @@ data_replication_factor=1
 # The ConfigNode-leader will balance the leader of IoTConsensus-RegionGroups by leader_distribution_policy if set true.
 # Notice: Default is true because the IoTConsensus depends on this function to distribute leader.
 # Datatype: Boolean
-# enable_auto_leader_balance_for_iot_consensus=true
+enable_auto_leader_balance_for_iot_consensus=true
 
 ####################
 ### Cluster management