You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ca...@apache.org on 2022/11/26 07:34:51 UTC
[iotdb] branch master updated: [IOTDB-4066] Allocate new RegionGroups through double keyword sort (#8140)
This is an automated email from the ASF dual-hosted git repository.
caogaofei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 992ae7f07e [IOTDB-4066] Allocate new RegionGroups through double keyword sort (#8140)
992ae7f07e is described below
commit 992ae7f07edc3c459b296999012171aa283067b5
Author: YongzaoDan <33...@users.noreply.github.com>
AuthorDate: Sat Nov 26 15:34:45 2022 +0800
[IOTDB-4066] Allocate new RegionGroups through double keyword sort (#8140)
---
.../iotdb/confignode/conf/ConfigNodeConfig.java | 12 +-
.../confignode/conf/ConfigNodeDescriptor.java | 2 +-
.../iotdb/confignode/manager/load/LoadManager.java | 2 +-
.../manager/load/balancer/RegionBalancer.java | 85 ++++++-------
...cator.java => CopySetRegionGroupAllocator.java} | 15 +--
.../balancer/region/GreedyRegionAllocator.java | 68 ----------
.../region/GreedyRegionGroupAllocator.java | 101 +++++++++++++++
...onAllocator.java => IRegionGroupAllocator.java} | 24 ++--
.../iotdb/confignode/manager/node/NodeManager.java | 12 ++
.../node/heartbeat/DataNodeHeartbeatCache.java | 13 ++
.../node/heartbeat/NodeHeartbeatSample.java | 21 +---
.../balancer/region/GreedyRegionAllocatorTest.java | 92 --------------
.../region/GreedyRegionGroupAllocatorTest.java | 140 +++++++++++++++++++++
.../resources/conf/iotdb-common.properties | 7 --
.../impl/DataNodeInternalRPCServiceImpl.java | 1 +
thrift/src/main/thrift/datanode.thrift | 3 +
16 files changed, 346 insertions(+), 252 deletions(-)
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
index 0414f8885d..03f0e1c7ba 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
@@ -72,8 +72,8 @@ public class ConfigNodeConfig {
private int leastDataRegionGroupNum = 5;
/** region allocate strategy. */
- private RegionBalancer.RegionAllocateStrategy regionAllocateStrategy =
- RegionBalancer.RegionAllocateStrategy.GREEDY;
+ private RegionBalancer.RegionGroupAllocateStrategy regionGroupAllocateStrategy =
+ RegionBalancer.RegionGroupAllocateStrategy.GREEDY;
/**
* DataPartition within the same SeriesPartitionSlot will inherit the allocation result of the
@@ -445,13 +445,13 @@ public class ConfigNodeConfig {
this.leastDataRegionGroupNum = leastDataRegionGroupNum;
}
- public RegionBalancer.RegionAllocateStrategy getRegionAllocateStrategy() {
- return regionAllocateStrategy;
+ public RegionBalancer.RegionGroupAllocateStrategy getRegionAllocateStrategy() {
+ return regionGroupAllocateStrategy;
}
public void setRegionAllocateStrategy(
- RegionBalancer.RegionAllocateStrategy regionAllocateStrategy) {
- this.regionAllocateStrategy = regionAllocateStrategy;
+ RegionBalancer.RegionGroupAllocateStrategy regionGroupAllocateStrategy) {
+ this.regionGroupAllocateStrategy = regionGroupAllocateStrategy;
}
public boolean isEnableDataPartitionInheritPolicy() {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
index 2e7560581d..b4a91534c1 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
@@ -220,7 +220,7 @@ public class ConfigNodeDescriptor {
try {
conf.setRegionAllocateStrategy(
- RegionBalancer.RegionAllocateStrategy.valueOf(
+ RegionBalancer.RegionGroupAllocateStrategy.valueOf(
properties
.getProperty("region_allocate_strategy", conf.getRegionAllocateStrategy().name())
.trim()));
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
index b601f03a2f..f37bf20d56 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
@@ -112,7 +112,7 @@ public class LoadManager {
public CreateRegionGroupsPlan allocateRegionGroups(
Map<String, Integer> allotmentMap, TConsensusGroupType consensusGroupType)
throws NotEnoughDataNodeException, StorageGroupNotExistsException {
- return regionBalancer.genRegionsAllocationPlan(allotmentMap, consensusGroupType);
+ return regionBalancer.genRegionGroupsAllocationPlan(allotmentMap, consensusGroupType);
}
/**
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java
index e16541d6bc..da366ab436 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java
@@ -23,58 +23,65 @@ import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.commons.cluster.NodeStatus;
-import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.consensus.request.write.region.CreateRegionGroupsPlan;
import org.apache.iotdb.confignode.exception.NotEnoughDataNodeException;
import org.apache.iotdb.confignode.exception.StorageGroupNotExistsException;
import org.apache.iotdb.confignode.manager.ClusterSchemaManager;
import org.apache.iotdb.confignode.manager.IManager;
-import org.apache.iotdb.confignode.manager.load.balancer.region.CopySetRegionAllocator;
-import org.apache.iotdb.confignode.manager.load.balancer.region.GreedyRegionAllocator;
-import org.apache.iotdb.confignode.manager.load.balancer.region.IRegionAllocator;
+import org.apache.iotdb.confignode.manager.load.balancer.region.CopySetRegionGroupAllocator;
+import org.apache.iotdb.confignode.manager.load.balancer.region.GreedyRegionGroupAllocator;
+import org.apache.iotdb.confignode.manager.load.balancer.region.IRegionGroupAllocator;
import org.apache.iotdb.confignode.manager.node.NodeManager;
import org.apache.iotdb.confignode.manager.partition.PartitionManager;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
/**
* The RegionBalancer provides interfaces to generate optimal Region allocation and migration plans
*/
public class RegionBalancer {
- private static final ConfigNodeConfig CONFIG_NODE_CONFIG =
- ConfigNodeDescriptor.getInstance().getConf();
-
private final IManager configManager;
+ private final IRegionGroupAllocator regionGroupAllocator;
public RegionBalancer(IManager configManager) {
this.configManager = configManager;
+
+ switch (ConfigNodeDescriptor.getInstance().getConf().getRegionAllocateStrategy()) {
+ case COPY_SET:
+ this.regionGroupAllocator = new CopySetRegionGroupAllocator();
+ break;
+ case GREEDY:
+ default:
+ this.regionGroupAllocator = new GreedyRegionGroupAllocator();
+ }
}
/**
- * Generate a Regions' allocation plan(CreateRegionsPlan)
+ * Generate a RegionGroups' allocation plan(CreateRegionGroupsPlan)
*
- * @param allotmentMap Map<StorageGroupName, Region allotment>
- * @param consensusGroupType TConsensusGroupType of the new Regions
- * @return CreateRegionsPlan
+ * @param allotmentMap Map<StorageGroupName, RegionGroup allotment>
+ * @param consensusGroupType TConsensusGroupType of the new RegionGroups
+ * @return CreateRegionGroupsPlan
* @throws NotEnoughDataNodeException When the number of DataNodes is not enough for allocation
* @throws StorageGroupNotExistsException When some StorageGroups don't exist
*/
- public CreateRegionGroupsPlan genRegionsAllocationPlan(
+ public CreateRegionGroupsPlan genRegionGroupsAllocationPlan(
Map<String, Integer> allotmentMap, TConsensusGroupType consensusGroupType)
throws NotEnoughDataNodeException, StorageGroupNotExistsException {
- // The new Regions will occupy online DataNodes firstly
+ // The new RegionGroups will occupy online DataNodes firstly
List<TDataNodeConfiguration> onlineDataNodes =
getNodeManager().filterDataNodeThroughStatus(NodeStatus.Running);
- // Some new Regions will have to occupy unknown DataNodes
+ // Some new RegionGroups will have to occupy unknown DataNodes
// if the number of online DataNodes is insufficient
List<TDataNodeConfiguration> availableDataNodes =
getNodeManager().filterDataNodeThroughStatus(NodeStatus.Running, NodeStatus.Unknown);
- // Make sure the number of available DataNodes is enough for allocating new Regions
+ // Make sure the number of available DataNodes is enough for allocating new RegionGroups
for (String storageGroup : allotmentMap.keySet()) {
int replicationFactor =
getClusterSchemaManager().getReplicationFactor(storageGroup, consensusGroupType);
@@ -84,11 +91,9 @@ public class RegionBalancer {
}
CreateRegionGroupsPlan createRegionGroupsPlan = new CreateRegionGroupsPlan();
- IRegionAllocator regionAllocator = genRegionAllocator();
// Only considering the specified ConsensusGroupType when doing allocation
- List<TRegionReplicaSet> allocatedRegions = getPartitionManager().getAllReplicaSets();
- allocatedRegions.removeIf(
- allocateRegion -> allocateRegion.getRegionId().getType() != consensusGroupType);
+ List<TRegionReplicaSet> allocatedRegionGroups =
+ getPartitionManager().getAllReplicaSets(consensusGroupType);
for (Map.Entry<String, Integer> entry : allotmentMap.entrySet()) {
String storageGroup = entry.getKey();
@@ -99,38 +104,35 @@ public class RegionBalancer {
onlineDataNodes.size() >= replicationFactor ? onlineDataNodes : availableDataNodes;
for (int i = 0; i < allotment; i++) {
+ // Prepare input data
+ Map<Integer, TDataNodeConfiguration> availableDataNodeMap = new ConcurrentHashMap<>();
+ Map<Integer, Long> freeDiskSpaceMap = new ConcurrentHashMap<>();
+ targetDataNodes.forEach(
+ dataNodeConfiguration -> {
+ int dataNodeId = dataNodeConfiguration.getLocation().getDataNodeId();
+ availableDataNodeMap.put(dataNodeId, dataNodeConfiguration);
+ freeDiskSpaceMap.put(dataNodeId, getNodeManager().getFreeDiskSpace(dataNodeId));
+ });
+
// Generate allocation plan
- TRegionReplicaSet newRegion =
- regionAllocator.allocateRegion(
- targetDataNodes,
- allocatedRegions,
+ TRegionReplicaSet newRegionGroup =
+ regionGroupAllocator.generateOptimalRegionReplicasDistribution(
+ availableDataNodeMap,
+ freeDiskSpaceMap,
+ allocatedRegionGroups,
replicationFactor,
new TConsensusGroupId(
consensusGroupType, getPartitionManager().generateNextRegionGroupId()));
- createRegionGroupsPlan.addRegionGroup(storageGroup, newRegion);
+ createRegionGroupsPlan.addRegionGroup(storageGroup, newRegionGroup);
- allocatedRegions.add(newRegion);
+ // Mark the new RegionGroup as allocated
+ allocatedRegionGroups.add(newRegionGroup);
}
}
return createRegionGroupsPlan;
}
- private IRegionAllocator genRegionAllocator() {
- RegionBalancer.RegionAllocateStrategy regionAllocateStrategy =
- CONFIG_NODE_CONFIG.getRegionAllocateStrategy();
- if (regionAllocateStrategy == null) {
- return new GreedyRegionAllocator();
- }
- switch (regionAllocateStrategy) {
- case COPY_SET:
- return new CopySetRegionAllocator();
- case GREEDY:
- default:
- return new GreedyRegionAllocator();
- }
- }
-
private NodeManager getNodeManager() {
return configManager.getNodeManager();
}
@@ -143,8 +145,7 @@ public class RegionBalancer {
return configManager.getPartitionManager();
}
- /** region allocate strategy */
- public enum RegionAllocateStrategy {
+ public enum RegionGroupAllocateStrategy {
COPY_SET,
GREEDY
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/CopySetRegionAllocator.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/CopySetRegionGroupAllocator.java
similarity index 91%
rename from confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/CopySetRegionAllocator.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/CopySetRegionGroupAllocator.java
index 2259717dff..13c21fcaf2 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/CopySetRegionAllocator.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/CopySetRegionGroupAllocator.java
@@ -36,7 +36,7 @@ import java.util.Set;
* Allocate Region by CopySet algorithm. Reference: <a
* href="https://www.usenix.org/conference/atc13/technical-sessions/presentation/cidon">...</a>
*/
-public class CopySetRegionAllocator implements IRegionAllocator {
+public class CopySetRegionGroupAllocator implements IRegionGroupAllocator {
private static final int maximumRandomNum = 10;
@@ -44,26 +44,27 @@ public class CopySetRegionAllocator implements IRegionAllocator {
private int intersectionSize = 0;
private final List<TDataNodeLocation> weightList;
- public CopySetRegionAllocator() {
+ public CopySetRegionGroupAllocator() {
this.weightList = new ArrayList<>();
}
@Override
- public TRegionReplicaSet allocateRegion(
- List<TDataNodeConfiguration> targetDataNodes,
- List<TRegionReplicaSet> allocatedRegions,
+ public TRegionReplicaSet generateOptimalRegionReplicasDistribution(
+ Map<Integer, TDataNodeConfiguration> availableDataNodeMap,
+ Map<Integer, Long> freeDiskSpaceMap,
+ List<TRegionReplicaSet> allocatedRegionGroups,
int replicationFactor,
TConsensusGroupId consensusGroupId) {
TRegionReplicaSet result = null;
// Build weightList for weighted random
- buildWeightList(targetDataNodes, allocatedRegions);
+ buildWeightList(new ArrayList<>(availableDataNodeMap.values()), allocatedRegionGroups);
boolean accepted = false;
while (true) {
for (int retry = 0; retry < maximumRandomNum; retry++) {
result = genWeightedRandomRegion(replicationFactor);
- if (intersectionCheck(allocatedRegions, result)) {
+ if (intersectionCheck(allocatedRegionGroups, result)) {
accepted = true;
break;
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyRegionAllocator.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyRegionAllocator.java
deleted file mode 100644
index 97ac37fbae..0000000000
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyRegionAllocator.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.confignode.manager.load.balancer.region;
-
-import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
-import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
-import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
-import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-
-import static java.util.Map.Entry.comparingByValue;
-
-/** Allocate Region Greedily */
-public class GreedyRegionAllocator implements IRegionAllocator {
-
- public GreedyRegionAllocator() {}
-
- @Override
- public TRegionReplicaSet allocateRegion(
- List<TDataNodeConfiguration> targetDataNodes,
- List<TRegionReplicaSet> allocatedRegions,
- int replicationFactor,
- TConsensusGroupId consensusGroupId) {
- // Build weightList order by number of regions allocated asc
- List<TDataNodeLocation> weightList = buildWeightList(targetDataNodes, allocatedRegions);
- return new TRegionReplicaSet(
- consensusGroupId,
- weightList.stream().limit(replicationFactor).collect(Collectors.toList()));
- }
-
- private List<TDataNodeLocation> buildWeightList(
- List<TDataNodeConfiguration> onlineDataNodes, List<TRegionReplicaSet> allocatedRegions) {
- Map<TDataNodeLocation, Integer> countMap = new HashMap<>();
- for (TDataNodeConfiguration dataNodeInfo : onlineDataNodes) {
- countMap.put(dataNodeInfo.getLocation(), 0);
- }
-
- for (TRegionReplicaSet regionReplicaSet : allocatedRegions) {
- for (TDataNodeLocation dataNodeLocation : regionReplicaSet.getDataNodeLocations()) {
- countMap.computeIfPresent(dataNodeLocation, (dataNode, count) -> (count + 1));
- }
- }
- return countMap.entrySet().stream()
- .sorted(comparingByValue())
- .map(e -> e.getKey().deepCopy())
- .collect(Collectors.toList());
- }
-}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyRegionGroupAllocator.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyRegionGroupAllocator.java
new file mode 100644
index 0000000000..5c0fdc1724
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyRegionGroupAllocator.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.manager.load.balancer.region;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.tsfile.utils.Pair;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+import static java.util.Map.Entry.comparingByValue;
+
+/** Allocate Region Greedily */
+public class GreedyRegionGroupAllocator implements IRegionGroupAllocator {
+
+ private static final AtomicInteger ZERO = new AtomicInteger(0);
+
+ public GreedyRegionGroupAllocator() {
+ // Empty constructor
+ }
+
+ @Override
+ public TRegionReplicaSet generateOptimalRegionReplicasDistribution(
+ Map<Integer, TDataNodeConfiguration> availableDataNodeMap,
+ Map<Integer, Long> freeDiskSpaceMap,
+ List<TRegionReplicaSet> allocatedRegionGroups,
+ int replicationFactor,
+ TConsensusGroupId consensusGroupId) {
+ // Build weightList ordered by allocated Region count (asc), then by free disk space (desc)
+ List<TDataNodeLocation> weightList =
+ buildWeightList(availableDataNodeMap, freeDiskSpaceMap, allocatedRegionGroups);
+ return new TRegionReplicaSet(
+ consensusGroupId,
+ weightList.stream().limit(replicationFactor).collect(Collectors.toList()));
+ }
+
+ /**
+ * Sort the available DataNodes by allocation priority.
+ *
+ * <p>DataNodes that hold fewer Regions come first; ties are broken in favor of the DataNode
+ * with more free disk space.
+ *
+ * @param availableDataNodeMap Map<DataNodeId, TDataNodeConfiguration> of the candidate DataNodes
+ * @param freeDiskSpaceMap Map<DataNodeId, free disk space>, a missing entry is treated as 0
+ * @param allocatedRegionGroups The RegionGroups that are already allocated
+ * @return Deep copies of the candidates' TDataNodeLocations, highest priority first
+ */
+ private List<TDataNodeLocation> buildWeightList(
+ Map<Integer, TDataNodeConfiguration> availableDataNodeMap,
+ Map<Integer, Long> freeDiskSpaceMap,
+ List<TRegionReplicaSet> allocatedRegionGroups) {
+ // Map<DataNodeId, Region count>
+ Map<Integer, AtomicInteger> regionCounter = new ConcurrentHashMap<>();
+ allocatedRegionGroups.forEach(
+ regionReplicaSet ->
+ regionReplicaSet
+ .getDataNodeLocations()
+ .forEach(
+ dataNodeLocation ->
+ regionCounter
+ .computeIfAbsent(
+ dataNodeLocation.getDataNodeId(), empty -> new AtomicInteger(0))
+ .getAndIncrement()));
+
+ /* Construct priority map */
+ Map<TDataNodeLocation, Pair<Integer, Long>> priorityMap = new ConcurrentHashMap<>();
+ availableDataNodeMap
+ .keySet()
+ .forEach(
+ dataNodeId ->
+ priorityMap.put(
+ availableDataNodeMap.get(dataNodeId).getLocation(),
+ new Pair<>(
+ regionCounter.getOrDefault(dataNodeId, ZERO).get(),
+ freeDiskSpaceMap.getOrDefault(dataNodeId, 0L))));
+
+ return priorityMap.entrySet().stream()
+ .sorted(
+ comparingByValue(
+ (o1, o2) ->
+ !Objects.equals(o1.getLeft(), o2.getLeft())
+ // Compare the first key(The number of Regions) by ascending order
+ ? Integer.compare(o1.getLeft(), o2.getLeft())
+ // Compare the second key(The free disk space) by descending order.
+ // Long.compare is used because casting the difference of two long
+ // values to int can truncate it and yield a wrong or zero sign.
+ : Long.compare(o2.getRight(), o1.getRight())))
+ .map(entry -> entry.getKey().deepCopy())
+ .collect(Collectors.toList());
+ }
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/IRegionAllocator.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/IRegionGroupAllocator.java
similarity index 70%
rename from confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/IRegionAllocator.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/IRegionGroupAllocator.java
index 315ef74d33..c33f2ead2d 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/IRegionAllocator.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/IRegionGroupAllocator.java
@@ -23,26 +23,24 @@ import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import java.util.List;
+import java.util.Map;
-/**
- * The IRegionAllocator is a functional interface, which means a new functional class who implements
- * the IRegionAllocator must be created for each Region allocation.
- */
-public interface IRegionAllocator {
+public interface IRegionGroupAllocator {
/**
- * Calculating the next optimal TRegionReplicaSet based on the current online DataNodes and
- * allocated Regions
+ * Generate an optimal RegionReplicas' distribution for a new RegionGroup
*
- * @param targetDataNodes DataNodes that can be used for allocation
- * @param allocatedRegions Allocated Regions
+ * @param availableDataNodeMap DataNodes that can be used for allocation
+ * @param freeDiskSpaceMap The free disk space of the DataNodes
+ * @param allocatedRegionGroups Allocated RegionGroups
* @param replicationFactor Replication factor of TRegionReplicaSet
* @param consensusGroupId TConsensusGroupId of result TRegionReplicaSet
- * @return The optimal TRegionReplicaSet derived by the specific algorithm
+ * @return The optimal TRegionReplicaSet derived by the specified algorithm
*/
- TRegionReplicaSet allocateRegion(
- List<TDataNodeConfiguration> targetDataNodes,
- List<TRegionReplicaSet> allocatedRegions,
+ TRegionReplicaSet generateOptimalRegionReplicasDistribution(
+ Map<Integer, TDataNodeConfiguration> availableDataNodeMap,
+ Map<Integer, Long> freeDiskSpaceMap,
+ List<TRegionReplicaSet> allocatedRegionGroups,
int replicationFactor,
TConsensusGroupId consensusGroupId);
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
index 19f07bb2b1..c6335cc30d 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
@@ -925,6 +925,18 @@ public class NodeManager {
return result;
}
+ /**
+ * Get the free disk space of the specified DataNode
+ *
+ * @param dataNodeId The index of the specified DataNode
+ * @return The free disk space sampled through heartbeat, 0 if no heartbeat received
+ */
+ public long getFreeDiskSpace(int dataNodeId) {
+ DataNodeHeartbeatCache dataNodeHeartbeatCache =
+ (DataNodeHeartbeatCache) nodeCacheMap.get(dataNodeId);
+ return dataNodeHeartbeatCache == null ? 0 : dataNodeHeartbeatCache.getFreeDiskSpace();
+ }
+
/**
* Get the DataNodeLocation of the DataNode which has the lowest loadScore
*
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/heartbeat/DataNodeHeartbeatCache.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/heartbeat/DataNodeHeartbeatCache.java
index f9d5df7008..d35dce9d4e 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/heartbeat/DataNodeHeartbeatCache.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/heartbeat/DataNodeHeartbeatCache.java
@@ -19,13 +19,17 @@
package org.apache.iotdb.confignode.manager.node.heartbeat;
import org.apache.iotdb.commons.cluster.NodeStatus;
+import org.apache.iotdb.mpp.rpc.thrift.TLoadSample;
/** DataNodeHeartbeatCache caches and maintains all the heartbeat data */
public class DataNodeHeartbeatCache extends BaseNodeCache {
+ private volatile TLoadSample latestLoadSample;
+
/** Constructor for create DataNodeHeartbeatCache with default NodeStatistics */
public DataNodeHeartbeatCache() {
super();
+ this.latestLoadSample = new TLoadSample((short) 0, 0, 0, 0);
}
@Override
@@ -38,6 +42,11 @@ public class DataNodeHeartbeatCache extends BaseNodeCache {
}
long lastSendTime = lastSample == null ? 0 : lastSample.getSendTimestamp();
+ /* Update load sample */
+ if (lastSample != null && lastSample.isSetLoadSample()) {
+ latestLoadSample = lastSample.getLoadSample();
+ }
+
/* Update Node status */
NodeStatus status = null;
String statusReason = null;
@@ -60,4 +69,8 @@ public class DataNodeHeartbeatCache extends BaseNodeCache {
currentStatistics = newStatistics;
}
}
+
+ public long getFreeDiskSpace() {
+ return latestLoadSample.getFreeDiskSpace();
+ }
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/heartbeat/NodeHeartbeatSample.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/heartbeat/NodeHeartbeatSample.java
index 531a88dcc3..7510c50348 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/heartbeat/NodeHeartbeatSample.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/heartbeat/NodeHeartbeatSample.java
@@ -31,9 +31,7 @@ public class NodeHeartbeatSample {
private NodeStatus status;
private String statusReason;
- private short cpuUsageRate;
- private double memoryUsageRate;
- private double diskUsageRate;
+ private TLoadSample loadSample = null;
/** Constructor for ConfigNode sample */
public NodeHeartbeatSample(long sendTimestamp, long receiveTimestamp) {
@@ -50,10 +48,7 @@ public class NodeHeartbeatSample {
this.statusReason = heartbeatResp.isSetStatusReason() ? heartbeatResp.getStatusReason() : null;
if (heartbeatResp.isSetLoadSample()) {
- TLoadSample loadSample = heartbeatResp.getLoadSample();
- this.cpuUsageRate = loadSample.getCpuUsageRate();
- this.memoryUsageRate = loadSample.getMemoryUsageRate();
- this.diskUsageRate = loadSample.getDiskUsageRate();
+ this.loadSample = heartbeatResp.getLoadSample();
}
}
@@ -73,15 +68,11 @@ public class NodeHeartbeatSample {
return statusReason;
}
- public short getCpuUsageRate() {
- return cpuUsageRate;
+ public boolean isSetLoadSample() {
+ return loadSample != null;
}
- public double getMemoryUsageRate() {
- return memoryUsageRate;
- }
-
- public double getDiskUsageRate() {
- return diskUsageRate;
+ public TLoadSample getLoadSample() {
+ return loadSample;
}
}
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyRegionAllocatorTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyRegionAllocatorTest.java
deleted file mode 100644
index bfab2fb281..0000000000
--- a/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyRegionAllocatorTest.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.confignode.manager.load.balancer.region;
-
-import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
-import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
-import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
-import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
-import org.apache.iotdb.common.rpc.thrift.TNodeResource;
-import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
-
-import com.google.common.collect.Lists;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public class GreedyRegionAllocatorTest {
-
- @Test
- public void testAllocateRegion() {
- GreedyRegionAllocator greedyRegionAllocator = new GreedyRegionAllocator();
- List<TDataNodeConfiguration> registeredDataNodes =
- Lists.newArrayList(
- new TDataNodeConfiguration(
- new TDataNodeLocation(1, null, null, null, null, null), new TNodeResource()),
- new TDataNodeConfiguration(
- new TDataNodeLocation(2, null, null, null, null, null), new TNodeResource()),
- new TDataNodeConfiguration(
- new TDataNodeLocation(3, null, null, null, null, null), new TNodeResource()));
- List<TRegionReplicaSet> allocatedRegions = new ArrayList<>();
- List<TConsensusGroupId> tConsensusGroupIds =
- Lists.newArrayList(
- new TConsensusGroupId(TConsensusGroupType.SchemaRegion, 0),
- new TConsensusGroupId(TConsensusGroupType.SchemaRegion, 1),
- new TConsensusGroupId(TConsensusGroupType.DataRegion, 2),
- new TConsensusGroupId(TConsensusGroupType.DataRegion, 3),
- new TConsensusGroupId(TConsensusGroupType.DataRegion, 4),
- new TConsensusGroupId(TConsensusGroupType.DataRegion, 5));
- for (TConsensusGroupId tConsensusGroupId : tConsensusGroupIds) {
- TRegionReplicaSet newRegion =
- greedyRegionAllocator.allocateRegion(
- registeredDataNodes, allocatedRegions, 1, tConsensusGroupId);
- allocatedRegions.add(newRegion);
- }
-
- Map<TDataNodeLocation, Integer> countMap = new HashMap<>();
- for (TDataNodeConfiguration dataNodeInfo : registeredDataNodes) {
- countMap.put(dataNodeInfo.getLocation(), 0);
- }
-
- for (TRegionReplicaSet regionReplicaSet : allocatedRegions) {
- for (TDataNodeLocation dataNodeLocation : regionReplicaSet.getDataNodeLocations()) {
- countMap.computeIfPresent(dataNodeLocation, (dataNode, count) -> (count + 1));
- }
- }
-
- Assert.assertTrue(countMap.values().stream().mapToInt(e -> e).max().getAsInt() <= 2);
- Assert.assertTrue(
- Collections.disjoint(
- allocatedRegions.get(0).getDataNodeLocations(),
- allocatedRegions.get(1).getDataNodeLocations()));
- Assert.assertTrue(
- Collections.disjoint(
- allocatedRegions.get(2).getDataNodeLocations(),
- allocatedRegions.get(3).getDataNodeLocations()));
- Assert.assertTrue(
- Collections.disjoint(
- allocatedRegions.get(4).getDataNodeLocations(),
- allocatedRegions.get(5).getDataNodeLocations()));
- }
-}
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyRegionGroupAllocatorTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyRegionGroupAllocatorTest.java
new file mode 100644
index 0000000000..fa9c2378ef
--- /dev/null
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyRegionGroupAllocatorTest.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.manager.load.balancer.region;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class GreedyRegionGroupAllocatorTest {
+
+ private static final GreedyRegionGroupAllocator ALLOCATOR = new GreedyRegionGroupAllocator();
+ private static final int TEST_REPLICATION_FACTOR = 3;
+
+ @Test
+ public void testEvenDistribution() {
+ /* Construct input data: 6 DataNodes, each with a random free-space value */
+ Map<Integer, TDataNodeConfiguration> availableDataNodeMap = new ConcurrentHashMap<>();
+ Map<Integer, Long> freeSpaceMap = new ConcurrentHashMap<>();
+ Random random = new Random();
+ // Register 6 DataNodes with ids 0..5
+ for (int i = 0; i < 6; i++) {
+ availableDataNodeMap.put(
+ i, new TDataNodeConfiguration().setLocation(new TDataNodeLocation().setDataNodeId(i)));
+ freeSpaceMap.put(i, random.nextLong());
+ }
+
+ /* Allocate 6 DataRegion RegionGroups one by one, feeding each result into the next call */
+ List<TRegionReplicaSet> allocatedRegionGroups = new ArrayList<>();
+ for (int index = 0; index < 6; index++) {
+ TRegionReplicaSet newRegionGroup =
+ ALLOCATOR.generateOptimalRegionReplicasDistribution(
+ availableDataNodeMap,
+ freeSpaceMap,
+ allocatedRegionGroups,
+ TEST_REPLICATION_FACTOR,
+ new TConsensusGroupId(TConsensusGroupType.DataRegion, index));
+ allocatedRegionGroups.add(newRegionGroup);
+ }
+
+ /* Check result: count how many Regions landed on each DataNode */
+ Map<Integer, AtomicInteger> regionCounter = new ConcurrentHashMap<>();
+ allocatedRegionGroups.forEach(
+ regionReplicaSet ->
+ regionReplicaSet
+ .getDataNodeLocations()
+ .forEach(
+ dataNodeLocation ->
+ regionCounter
+ .computeIfAbsent(
+ dataNodeLocation.getDataNodeId(), empty -> new AtomicInteger(0))
+ .getAndIncrement()));
+ // Each DataNode should hold exactly 3 Regions, since all 18 Regions (6 RegionGroups x 3
+ // replicas) are expected to be spread evenly over the 6 DataNodes
+ Assert.assertEquals(6, regionCounter.size());
+ regionCounter.forEach((dataNodeId, regionCount) -> Assert.assertEquals(3, regionCount.get()));
+ }
+
+ @Test
+ public void testUnevenDistribution() {
+ /* Construct input data: 4 DataNodes with fixed, clearly unequal free-space values */
+ Map<Integer, TDataNodeConfiguration> availableDataNodeMap = new ConcurrentHashMap<>();
+ // Register 4 DataNodes with ids 0..3
+ for (int i = 0; i < 4; i++) {
+ availableDataNodeMap.put(
+ i, new TDataNodeConfiguration().setLocation(new TDataNodeLocation().setDataNodeId(i)));
+ }
+ Map<Integer, Long> freeSpaceMap = new ConcurrentHashMap<>();
+ freeSpaceMap.put(0, 20000331L);
+ freeSpaceMap.put(1, 20000522L);
+ freeSpaceMap.put(2, 666L);
+ freeSpaceMap.put(3, 999L);
+
+ /* Allocate the first RegionGroup while no RegionGroup has been recorded yet */
+ List<TRegionReplicaSet> allocatedRegionGroups = new ArrayList<>();
+ TRegionReplicaSet newRegionGroup =
+ ALLOCATOR.generateOptimalRegionReplicasDistribution(
+ availableDataNodeMap,
+ freeSpaceMap,
+ allocatedRegionGroups,
+ TEST_REPLICATION_FACTOR,
+ new TConsensusGroupId(TConsensusGroupType.SchemaRegion, 0));
+ allocatedRegionGroups.add(newRegionGroup);
+ Set<Integer> dataNodeIdSet = new HashSet<>();
+ newRegionGroup
+ .getDataNodeLocations()
+ .forEach(dataNodeLocation -> dataNodeIdSet.add(dataNodeLocation.getDataNodeId()));
+ // The result should be the 3 DataNodes with the most free disk space: 0, 1 and 3
+ Assert.assertTrue(dataNodeIdSet.contains(0));
+ Assert.assertTrue(dataNodeIdSet.contains(1));
+ Assert.assertTrue(dataNodeIdSet.contains(3));
+ dataNodeIdSet.clear();
+
+ /* Allocate the second RegionGroup with the first one already recorded */
+ newRegionGroup =
+ ALLOCATOR.generateOptimalRegionReplicasDistribution(
+ availableDataNodeMap,
+ freeSpaceMap,
+ allocatedRegionGroups,
+ TEST_REPLICATION_FACTOR,
+ new TConsensusGroupId(TConsensusGroupType.SchemaRegion, 1));
+ newRegionGroup
+ .getDataNodeLocations()
+ .forEach(dataNodeLocation -> dataNodeIdSet.add(dataNodeLocation.getDataNodeId()));
+ // The result should contain DataNode-2 (the only one without a Region so far) and
+ // the 2 other DataNodes with the most free disk space: 0 and 1
+ Assert.assertTrue(dataNodeIdSet.contains(0));
+ Assert.assertTrue(dataNodeIdSet.contains(1));
+ Assert.assertTrue(dataNodeIdSet.contains(2));
+ }
+}
diff --git a/node-commons/src/assembly/resources/conf/iotdb-common.properties b/node-commons/src/assembly/resources/conf/iotdb-common.properties
index 15539ac8dd..7dbf13fc31 100644
--- a/node-commons/src/assembly/resources/conf/iotdb-common.properties
+++ b/node-commons/src/assembly/resources/conf/iotdb-common.properties
@@ -95,13 +95,6 @@
# Datatype: int
# least_data_region_group_num=5
-# Region allocate strategy
-# These allocate strategies are currently supported:
-# 1. GREEDY(Default, when region is allocated, always choose the dataNode that has been allocated the minimum regions)
-# 2. COPY_SET(Random replication according to weight calculated from number of regions on all online dataNodes, suitable for large clusters)
-# Datatype: String
-# region_allocate_strategy=GREEDY
-
# Whether to enable the DataPartition inherit policy.
# DataPartition within the same SeriesPartitionSlot will inherit
# the allocation result of the previous TimePartitionSlot if set true
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
index 3d76821ff8..591b5bde43 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -1048,6 +1048,7 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
if (freeDisk != 0 && totalDisk != 0) {
double freeDiskRatio = (double) freeDisk / totalDisk;
+ loadSample.setFreeDiskSpace(freeDisk);
loadSample.setDiskUsageRate(1.0 - freeDiskRatio);
// Reset NodeStatus if necessary
if (freeDiskRatio < commonConfig.getDiskSpaceWarningThreshold()) {
diff --git a/thrift/src/main/thrift/datanode.thrift b/thrift/src/main/thrift/datanode.thrift
index eb6abc0364..b1efcb63af 100644
--- a/thrift/src/main/thrift/datanode.thrift
+++ b/thrift/src/main/thrift/datanode.thrift
@@ -231,6 +231,9 @@ struct TLoadSample {
2: required double memoryUsageRate
// Percentage of occupied disk space in DataNode
3: required double diskUsageRate
+ // The size of free disk space
+ // Unit: Byte
+ 4: required i64 freeDiskSpace
}
struct TRegionRouteReq {