You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by wa...@apache.org on 2022/07/12 03:27:11 UTC

[iotdb] branch master updated: [IOTDB-3728] Greedy Region Allocator (#6611)

This is an automated email from the ASF dual-hosted git repository.

wangchao316 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 1b81a2b41e [IOTDB-3728] Greedy Region Allocator (#6611)
1b81a2b41e is described below

commit 1b81a2b41e6b3bf15829de2d6fb7fe27fd7b248d
Author: imquanke <39...@users.noreply.github.com>
AuthorDate: Tue Jul 12 11:27:07 2022 +0800

    [IOTDB-3728] Greedy Region Allocator (#6611)
    
    [IOTDB-3728] Greedy Region Allocator (#6611)
---
 .../resources/conf/iotdb-confignode.properties     |  7 ++
 .../iotdb/confignode/conf/ConfigNodeConfig.java    | 14 ++++
 .../confignode/conf/ConfigNodeDescriptor.java      | 11 +++
 .../manager/load/balancer/RegionBalancer.java      | 25 +++++-
 .../balancer/region/CopySetRegionAllocator.java    |  2 +-
 .../balancer/region/GreedyRegionAllocator.java     | 68 +++++++++++++++++
 .../balancer/region/GreedyRegionAllocatorTest.java | 88 ++++++++++++++++++++++
 .../Reference/ConfigNode-Config-Manual.md          | 10 +++
 .../Reference/ConfigNode-Config-Manual.md          |  9 +++
 9 files changed, 231 insertions(+), 3 deletions(-)

diff --git a/confignode/src/assembly/resources/conf/iotdb-confignode.properties b/confignode/src/assembly/resources/conf/iotdb-confignode.properties
index 0c343e7a15..4ab901bf82 100644
--- a/confignode/src/assembly/resources/conf/iotdb-confignode.properties
+++ b/confignode/src/assembly/resources/conf/iotdb-confignode.properties
@@ -81,6 +81,13 @@ target_config_nodes=0.0.0.0:22277
 # Datatype: Double
 # data_region_per_processor=0.5
 
+# Region allocation strategy
+# The following allocation strategies are currently supported:
+# 1. GREEDY (default; when a region is allocated, always choose the DataNodes that have been allocated the fewest regions)
+# 2. COPY_SET (random replication according to weights calculated from the number of regions on all online DataNodes; suitable for large clusters)
+# Datatype: String
+# region_allocate_strategy=GREEDY
+
 ####################
 ### PartitionSlot configuration
 ####################
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
index e9b74c10d8..b17694f12d 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.confignode.conf;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.confignode.manager.load.balancer.RegionBalancer;
 import org.apache.iotdb.confignode.manager.load.balancer.RouteBalancer;
 import org.apache.iotdb.consensus.ConsensusFactory;
 import org.apache.iotdb.rpc.RpcUtils;
@@ -62,6 +63,10 @@ public class ConfigNodeConfig {
   /** The maximum number of SchemaRegion expected to be managed by each DataNode. */
   private double dataRegionPerProcessor = 0.5;
 
+  /** The strategy used to allocate Regions; defaults to GREEDY. */
+  private RegionBalancer.RegionAllocateStrategy regionAllocateStrategy =
+      RegionBalancer.RegionAllocateStrategy.GREEDY;
+
   /**
    * ClientManager will have so many selector threads (TAsyncClientManager) to distribute to its
    * clients.
@@ -337,6 +342,15 @@ public class ConfigNodeConfig {
     this.dataRegionPerProcessor = dataRegionPerProcessor;
   }
 
+  public RegionBalancer.RegionAllocateStrategy getRegionAllocateStrategy() {
+    return regionAllocateStrategy;
+  }
+
+  public void setRegionAllocateStrategy(
+      RegionBalancer.RegionAllocateStrategy regionAllocateStrategy) {
+    this.regionAllocateStrategy = regionAllocateStrategy;
+  }
+
   public int getThriftServerAwaitTimeForStopService() {
     return thriftServerAwaitTimeForStopService;
   }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
index 2fb8c9ef95..277defe805 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.commons.exception.BadNodeUrlException;
 import org.apache.iotdb.commons.utils.NodeUrlUtils;
+import org.apache.iotdb.confignode.manager.load.balancer.RegionBalancer;
 import org.apache.iotdb.metrics.config.MetricConfigDescriptor;
 
 import org.slf4j.Logger;
@@ -159,6 +160,16 @@ public class ConfigNodeDescriptor {
               properties.getProperty(
                   "data_region_per_processor", String.valueOf(conf.getDataRegionPerProcessor()))));
 
+      try {
+        conf.setRegionAllocateStrategy(
+            RegionBalancer.RegionAllocateStrategy.valueOf(
+                properties.getProperty(
+                    "region_allocate_strategy", conf.getRegionAllocateStrategy().name())));
+      } catch (IllegalArgumentException e) {
+        LOGGER.warn(
+            "The configured region allocate strategy does not exist, use the default: GREEDY!");
+      }
+
       conf.setRpcAdvancedCompressionEnable(
           Boolean.parseBoolean(
               properties.getProperty(
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java
index c028f61845..430c9f2722 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java
@@ -22,6 +22,8 @@ import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeInfo;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
+import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
 import org.apache.iotdb.confignode.consensus.request.write.CreateRegionGroupsPlan;
 import org.apache.iotdb.confignode.exception.NotEnoughDataNodeException;
 import org.apache.iotdb.confignode.exception.StorageGroupNotExistsException;
@@ -30,6 +32,7 @@ import org.apache.iotdb.confignode.manager.IManager;
 import org.apache.iotdb.confignode.manager.NodeManager;
 import org.apache.iotdb.confignode.manager.PartitionManager;
 import org.apache.iotdb.confignode.manager.load.balancer.region.CopySetRegionAllocator;
+import org.apache.iotdb.confignode.manager.load.balancer.region.GreedyRegionAllocator;
 import org.apache.iotdb.confignode.manager.load.balancer.region.IRegionAllocator;
 import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchema;
 
@@ -41,6 +44,9 @@ import java.util.Map;
  */
 public class RegionBalancer {
 
+  private static final ConfigNodeConfig CONFIG_NODE_CONFIG =
+      ConfigNodeDescriptor.getInstance().getConf();
+
   private final IManager configManager;
 
   public RegionBalancer(IManager configManager) {
@@ -103,8 +109,17 @@ public class RegionBalancer {
   }
 
   private IRegionAllocator genRegionAllocator() {
-    // TODO: The RegionAllocator should be configurable
-    return new CopySetRegionAllocator();
+    RegionBalancer.RegionAllocateStrategy regionAllocateStrategy =
+        CONFIG_NODE_CONFIG.getRegionAllocateStrategy();
+    if (regionAllocateStrategy == null) {
+      return new GreedyRegionAllocator();
+    }
+    switch (regionAllocateStrategy) {
+      case COPY_SET:
+        return new CopySetRegionAllocator();
+      default:
+        return new GreedyRegionAllocator();
+    }
   }
 
   private NodeManager getNodeManager() {
@@ -118,4 +133,10 @@ public class RegionBalancer {
   private PartitionManager getPartitionManager() {
     return configManager.getPartitionManager();
   }
+
+  /** The supported Region allocation strategies. */
+  public enum RegionAllocateStrategy {
+    COPY_SET,
+    GREEDY
+  }
 }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/CopySetRegionAllocator.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/CopySetRegionAllocator.java
index 384c919e83..faed7f3a55 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/CopySetRegionAllocator.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/CopySetRegionAllocator.java
@@ -82,7 +82,7 @@ public class CopySetRegionAllocator implements IRegionAllocator {
       List<TDataNodeInfo> onlineDataNodes, List<TRegionReplicaSet> allocatedRegions) {
 
     // TODO: The remaining disk capacity of DataNode can also be calculated into the weightList
-
+    this.weightList.clear();
     int maximumRegionNum = 0;
     Map<TDataNodeLocation, Integer> countMap = new HashMap<>();
     for (TDataNodeInfo dataNodeInfo : onlineDataNodes) {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyRegionAllocator.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyRegionAllocator.java
new file mode 100644
index 0000000000..7dd47003a4
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyRegionAllocator.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.manager.load.balancer.region;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeInfo;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static java.util.Map.Entry.comparingByValue;
+
+/** Allocate Region Greedily */
+public class GreedyRegionAllocator implements IRegionAllocator {
+
+  public GreedyRegionAllocator() {}
+
+  @Override
+  public TRegionReplicaSet allocateRegion(
+      List<TDataNodeInfo> onlineDataNodes,
+      List<TRegionReplicaSet> allocatedRegions,
+      int replicationFactor,
+      TConsensusGroupId consensusGroupId) {
+    // Build weightList order by number of regions allocated asc
+    List<TDataNodeLocation> weightList = buildWeightList(onlineDataNodes, allocatedRegions);
+    return new TRegionReplicaSet(
+        consensusGroupId,
+        weightList.stream().limit(replicationFactor).collect(Collectors.toList()));
+  }
+
+  private List<TDataNodeLocation> buildWeightList(
+      List<TDataNodeInfo> onlineDataNodes, List<TRegionReplicaSet> allocatedRegions) {
+    Map<TDataNodeLocation, Integer> countMap = new HashMap<>();
+    for (TDataNodeInfo dataNodeInfo : onlineDataNodes) {
+      countMap.put(dataNodeInfo.getLocation(), 0);
+    }
+
+    for (TRegionReplicaSet regionReplicaSet : allocatedRegions) {
+      for (TDataNodeLocation dataNodeLocation : regionReplicaSet.getDataNodeLocations()) {
+        countMap.computeIfPresent(dataNodeLocation, (dataNode, count) -> (count + 1));
+      }
+    }
+    return countMap.entrySet().stream()
+        .sorted(comparingByValue())
+        .map(e -> e.getKey().deepCopy())
+        .collect(Collectors.toList());
+  }
+}
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyRegionAllocatorTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyRegionAllocatorTest.java
new file mode 100644
index 0000000000..9752c01bd2
--- /dev/null
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyRegionAllocatorTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.manager.load.balancer.region;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeInfo;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+
+import com.google.common.collect.Lists;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class GreedyRegionAllocatorTest {
+
+  @Test
+  public void testAllocateRegion() {
+    GreedyRegionAllocator greedyRegionAllocator = new GreedyRegionAllocator();
+    List<TDataNodeInfo> registeredDataNodes =
+        Lists.newArrayList(
+            new TDataNodeInfo(new TDataNodeLocation(1, null, null, null, null, null), 0, 0),
+            new TDataNodeInfo(new TDataNodeLocation(2, null, null, null, null, null), 0, 0),
+            new TDataNodeInfo(new TDataNodeLocation(3, null, null, null, null, null), 0, 0));
+    List<TRegionReplicaSet> allocatedRegions = new ArrayList<>();
+    List<TConsensusGroupId> tConsensusGroupIds =
+        Lists.newArrayList(
+            new TConsensusGroupId(TConsensusGroupType.SchemaRegion, 0),
+            new TConsensusGroupId(TConsensusGroupType.SchemaRegion, 1),
+            new TConsensusGroupId(TConsensusGroupType.DataRegion, 2),
+            new TConsensusGroupId(TConsensusGroupType.DataRegion, 3),
+            new TConsensusGroupId(TConsensusGroupType.DataRegion, 4),
+            new TConsensusGroupId(TConsensusGroupType.DataRegion, 5));
+    for (TConsensusGroupId tConsensusGroupId : tConsensusGroupIds) {
+      TRegionReplicaSet newRegion =
+          greedyRegionAllocator.allocateRegion(
+              registeredDataNodes, allocatedRegions, 1, tConsensusGroupId);
+      allocatedRegions.add(newRegion);
+    }
+
+    Map<TDataNodeLocation, Integer> countMap = new HashMap<>();
+    for (TDataNodeInfo dataNodeInfo : registeredDataNodes) {
+      countMap.put(dataNodeInfo.getLocation(), 0);
+    }
+
+    for (TRegionReplicaSet regionReplicaSet : allocatedRegions) {
+      for (TDataNodeLocation dataNodeLocation : regionReplicaSet.getDataNodeLocations()) {
+        countMap.computeIfPresent(dataNodeLocation, (dataNode, count) -> (count + 1));
+      }
+    }
+
+    Assert.assertTrue(countMap.values().stream().mapToInt(e -> e).max().getAsInt() <= 2);
+    Assert.assertTrue(
+        Collections.disjoint(
+            allocatedRegions.get(0).getDataNodeLocations(),
+            allocatedRegions.get(1).getDataNodeLocations()));
+    Assert.assertTrue(
+        Collections.disjoint(
+            allocatedRegions.get(2).getDataNodeLocations(),
+            allocatedRegions.get(3).getDataNodeLocations()));
+    Assert.assertTrue(
+        Collections.disjoint(
+            allocatedRegions.get(4).getDataNodeLocations(),
+            allocatedRegions.get(5).getDataNodeLocations()));
+  }
+}
diff --git a/docs/UserGuide/Reference/ConfigNode-Config-Manual.md b/docs/UserGuide/Reference/ConfigNode-Config-Manual.md
index 1e474458a6..b02fe6c7ca 100644
--- a/docs/UserGuide/Reference/ConfigNode-Config-Manual.md
+++ b/docs/UserGuide/Reference/ConfigNode-Config-Manual.md
@@ -190,6 +190,16 @@ The global configuration of cluster is in ConfigNode.
 |Default| org.apache.iotdb.consensus.standalone.StandAloneConsensus |
 |Effective|Only allowed to be modified in first start up|
 
+
+* region\_allocate\_strategy
+
+|Name| region\_allocate\_strategy |
+|:---:|:---|
+|Description| Region allocation strategy. COPY_SET is suitable for large clusters; GREEDY is suitable for small clusters. |
+|Type| String |
+|Default| GREEDY |
+|Effective|After restarting system |
+
 ### HeartBeat 
 
 * heartbeat\_interval
diff --git a/docs/zh/UserGuide/Reference/ConfigNode-Config-Manual.md b/docs/zh/UserGuide/Reference/ConfigNode-Config-Manual.md
index 9ba96f9e55..5c6aa96745 100644
--- a/docs/zh/UserGuide/Reference/ConfigNode-Config-Manual.md
+++ b/docs/zh/UserGuide/Reference/ConfigNode-Config-Manual.md
@@ -186,6 +186,15 @@ IoTDB 集群的全局配置通过 ConfigNode 配置。
 |默认值| org.apache.iotdb.consensus.standalone.StandAloneConsensus |
 |改后生效方式|仅允许在第一次启动服务前修改|
 
+* region\_allocate\_strategy
+
+|名字| region\_allocate\_strategy |
+|:---:|:---|
+|描述| 元数据和数据的节点分配策略,COPY_SET适用于大集群;当数据节点数量较少时,GREEDY表现更佳|
+|类型| String |
+|默认值| GREEDY |
+|改后生效方式|重启服务生效|
+
 ### 心跳配置
 
 * heartbeat\_interval