You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/11/27 06:55:55 UTC
[iotdb] branch master updated: [IOTDB-5058] Add custom RegionGroup extension policy (#8199)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 1966a070f6 [IOTDB-5058] Add custom RegionGroup extension policy (#8199)
1966a070f6 is described below
commit 1966a070f66144dc0df831cc5dd034ef873367be
Author: YongzaoDan <33...@users.noreply.github.com>
AuthorDate: Sun Nov 27 14:55:50 2022 +0800
[IOTDB-5058] Add custom RegionGroup extension policy (#8199)
---
.../iotdb/confignode/conf/ConfigNodeConfig.java | 88 ++++++----
.../confignode/conf/ConfigNodeDescriptor.java | 60 ++++---
.../confignode/manager/ClusterSchemaManager.java | 14 ++
.../manager/load/balancer/RegionBalancer.java | 4 +-
.../manager/load/balancer/RouteBalancer.java | 4 +-
.../partition/DataRegionGroupExtensionPolicy.java | 47 +++++
.../manager/partition/PartitionManager.java | 190 +++++++++++++--------
.../java/org/apache/iotdb/it/env/MppConfig.java | 7 +
.../org/apache/iotdb/itbase/env/BaseConfig.java | 8 +
.../it/partition/IoTDBRegionGroupExtensionIT.java | 136 +++++++++++++++
.../resources/conf/iotdb-common.properties | 49 ++++--
11 files changed, 463 insertions(+), 144 deletions(-)
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
index 98d37ba2e4..3a235f02dd 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.confignode.manager.load.balancer.RegionBalancer;
import org.apache.iotdb.confignode.manager.load.balancer.router.leader.ILeaderBalancer;
import org.apache.iotdb.confignode.manager.load.balancer.router.priority.IPriorityBalancer;
+import org.apache.iotdb.confignode.manager.partition.DataRegionGroupExtensionPolicy;
import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.rpc.RpcUtils;
@@ -56,24 +57,44 @@ public class ConfigNodeConfig {
/** Schema region consensus protocol */
private String schemaRegionConsensusProtocolClass = ConsensusFactory.RATIS_CONSENSUS;
- /** The maximum number of SchemaRegion expected to be managed by each DataNode. */
- private double schemaRegionPerDataNode = 1.0;
+ /** Default number of SchemaRegion replicas */
+ private int schemaReplicationFactor = 1;
/** Data region consensus protocol */
private String dataRegionConsensusProtocolClass = ConsensusFactory.IOT_CONSENSUS;
- /** The maximum number of DataRegion expected to be managed by each DataNode. */
- private double dataRegionPerProcessor = 0.5;
+ /** Default number of DataRegion replicas */
+ private int dataReplicationFactor = 1;
+
+ /** Number of SeriesPartitionSlots per StorageGroup */
+ private int seriesPartitionSlotNum = 10000;
+
+ /** SeriesPartitionSlot executor class */
+ private String seriesPartitionExecutorClass =
+ "org.apache.iotdb.commons.partition.executor.hash.BKDRHashExecutor";
+
+ /** The maximum number of SchemaRegions expected to be managed by each DataNode. */
+ private double schemaRegionPerDataNode = schemaReplicationFactor;
+
+ /** The policy of extension DataRegionGroup for each Database. */
+ private DataRegionGroupExtensionPolicy dataRegionGroupExtensionPolicy =
+ DataRegionGroupExtensionPolicy.AUTO;
+
+ /** The number of DataRegionGroups for each Database */
+ private int dataRegionGroupPerDatabase = 10;
- /** The least number of SchemaRegionGroup for each StorageGroup. */
+ /** The maximum number of DataRegions expected to be managed by each DataNode. */
+ private double dataRegionPerProcessor = 1.0;
+
+ /** The least number of SchemaRegionGroup for each Database. */
private int leastSchemaRegionGroupNum = 1;
- /** The least number of DataRegionGroup for each StorageGroup. */
+ /** The least number of DataRegionGroup for each Database. */
private int leastDataRegionGroupNum = 5;
- /** region allocate strategy. */
- private RegionBalancer.RegionGroupAllocateStrategy regionGroupAllocateStrategy =
- RegionBalancer.RegionGroupAllocateStrategy.GREEDY;
+ /** RegionGroup allocate policy. */
+ private RegionBalancer.RegionGroupAllocatePolicy regionGroupAllocatePolicy =
+ RegionBalancer.RegionGroupAllocatePolicy.GREEDY;
/**
* DataPartition within the same SeriesPartitionSlot will inherit the allocation result of the
@@ -81,13 +102,6 @@ public class ConfigNodeConfig {
*/
private boolean enableDataPartitionInheritPolicy = false;
- /** Number of SeriesPartitionSlots per StorageGroup */
- private int seriesPartitionSlotNum = 10000;
-
- /** SeriesPartitionSlot executor class */
- private String seriesPartitionExecutorClass =
- "org.apache.iotdb.commons.partition.executor.hash.BKDRHashExecutor";
-
/** Max concurrent client number */
private int rpcMaxConcurrentClientNum = 65535;
@@ -132,12 +146,6 @@ public class ConfigNodeConfig {
/** Time partition interval in milliseconds */
private long timePartitionInterval = 604_800_000;
- /** Default number of SchemaRegion replicas */
- private int schemaReplicationFactor = 1;
-
- /** Default number of DataRegion replicas */
- private int dataReplicationFactor = 1;
-
/** Procedure Evict ttl */
private int procedureCompletedEvictTTL = 800;
@@ -158,7 +166,7 @@ public class ConfigNodeConfig {
private String leaderDistributionPolicy = ILeaderBalancer.MIN_COST_FLOW_POLICY;
/** Whether to enable auto leader balance for Ratis consensus protocol */
- private boolean enableAutoLeaderBalanceForRatis = false;
+ private boolean enableAutoLeaderBalanceForRatisConsensus = false;
/** Whether to enable auto leader balance for IoTConsensus protocol */
private boolean enableAutoLeaderBalanceForIoTConsensus = true;
@@ -405,6 +413,23 @@ public class ConfigNodeConfig {
this.schemaRegionConsensusProtocolClass = schemaRegionConsensusProtocolClass;
}
+ public DataRegionGroupExtensionPolicy getDataRegionGroupExtensionPolicy() {
+ return dataRegionGroupExtensionPolicy;
+ }
+
+ public void setDataRegionGroupExtensionPolicy(
+ DataRegionGroupExtensionPolicy dataRegionGroupExtensionPolicy) {
+ this.dataRegionGroupExtensionPolicy = dataRegionGroupExtensionPolicy;
+ }
+
+ public int getDataRegionGroupPerDatabase() {
+ return dataRegionGroupPerDatabase;
+ }
+
+ public void setDataRegionGroupPerDatabase(int dataRegionGroupPerDatabase) {
+ this.dataRegionGroupPerDatabase = dataRegionGroupPerDatabase;
+ }
+
public double getSchemaRegionPerDataNode() {
return schemaRegionPerDataNode;
}
@@ -445,13 +470,13 @@ public class ConfigNodeConfig {
this.leastDataRegionGroupNum = leastDataRegionGroupNum;
}
- public RegionBalancer.RegionGroupAllocateStrategy getRegionAllocateStrategy() {
- return regionGroupAllocateStrategy;
+ public RegionBalancer.RegionGroupAllocatePolicy getRegionGroupAllocatePolicy() {
+ return regionGroupAllocatePolicy;
}
public void setRegionAllocateStrategy(
- RegionBalancer.RegionGroupAllocateStrategy regionGroupAllocateStrategy) {
- this.regionGroupAllocateStrategy = regionGroupAllocateStrategy;
+ RegionBalancer.RegionGroupAllocatePolicy regionGroupAllocatePolicy) {
+ this.regionGroupAllocatePolicy = regionGroupAllocatePolicy;
}
public boolean isEnableDataPartitionInheritPolicy() {
@@ -584,12 +609,13 @@ public class ConfigNodeConfig {
this.leaderDistributionPolicy = leaderDistributionPolicy;
}
- public boolean isEnableAutoLeaderBalanceForRatis() {
- return enableAutoLeaderBalanceForRatis;
+ public boolean isEnableAutoLeaderBalanceForRatisConsensus() {
+ return enableAutoLeaderBalanceForRatisConsensus;
}
- public void setEnableAutoLeaderBalanceForRatis(boolean enableAutoLeaderBalanceForRatis) {
- this.enableAutoLeaderBalanceForRatis = enableAutoLeaderBalanceForRatis;
+ public void setEnableAutoLeaderBalanceForRatisConsensus(
+ boolean enableAutoLeaderBalanceForRatisConsensus) {
+ this.enableAutoLeaderBalanceForRatisConsensus = enableAutoLeaderBalanceForRatisConsensus;
}
public boolean isEnableAutoLeaderBalanceForIoTConsensus() {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
index d635fa41f4..dd0041fabb 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.commons.utils.NodeUrlUtils;
import org.apache.iotdb.confignode.manager.load.balancer.RegionBalancer;
import org.apache.iotdb.confignode.manager.load.balancer.router.leader.ILeaderBalancer;
import org.apache.iotdb.confignode.manager.load.balancer.router.priority.IPriorityBalancer;
+import org.apache.iotdb.confignode.manager.partition.DataRegionGroupExtensionPolicy;
import org.apache.iotdb.metrics.config.MetricConfigDescriptor;
import org.slf4j.Logger;
@@ -192,12 +193,11 @@ public class ConfigNodeDescriptor {
conf.getSchemaRegionConsensusProtocolClass())
.trim());
- conf.setSchemaRegionPerDataNode(
- Double.parseDouble(
+ conf.setSchemaReplicationFactor(
+ Integer.parseInt(
properties
.getProperty(
- "schema_region_per_data_node",
- String.valueOf(conf.getSchemaRegionPerDataNode()))
+ "schema_replication_factor", String.valueOf(conf.getSchemaReplicationFactor()))
.trim()));
conf.setDataRegionConsensusProtocolClass(
@@ -206,6 +206,33 @@ public class ConfigNodeDescriptor {
"data_region_consensus_protocol_class", conf.getDataRegionConsensusProtocolClass())
.trim());
+ conf.setDataReplicationFactor(
+ Integer.parseInt(
+ properties
+ .getProperty(
+ "data_replication_factor", String.valueOf(conf.getDataReplicationFactor()))
+ .trim()));
+
+ conf.setSchemaRegionPerDataNode(
+ Double.parseDouble(
+ properties
+ .getProperty(
+ "schema_region_per_data_node",
+ String.valueOf(conf.getSchemaReplicationFactor()))
+ .trim()));
+
+ conf.setDataRegionGroupExtensionPolicy(
+ DataRegionGroupExtensionPolicy.parse(
+ properties.getProperty(
+ "data_region_group_extension_policy",
+ conf.getDataRegionGroupExtensionPolicy().getPolicy().trim())));
+
+ conf.setDataRegionGroupPerDatabase(
+ Integer.parseInt(
+ properties.getProperty(
+ "data_region_group_per_database",
+ String.valueOf(conf.getDataRegionGroupPerDatabase()).trim())));
+
conf.setDataRegionPerProcessor(
Double.parseDouble(
properties
@@ -220,9 +247,10 @@ public class ConfigNodeDescriptor {
try {
conf.setRegionAllocateStrategy(
- RegionBalancer.RegionGroupAllocateStrategy.valueOf(
+ RegionBalancer.RegionGroupAllocatePolicy.valueOf(
properties
- .getProperty("region_allocate_strategy", conf.getRegionAllocateStrategy().name())
+ .getProperty(
+ "region_group_allocate_policy", conf.getRegionGroupAllocatePolicy().name())
.trim()));
} catch (IllegalArgumentException e) {
LOGGER.warn(
@@ -281,20 +309,6 @@ public class ConfigNodeDescriptor {
"time_partition_interval", String.valueOf(conf.getTimePartitionInterval()))
.trim()));
- conf.setSchemaReplicationFactor(
- Integer.parseInt(
- properties
- .getProperty(
- "schema_replication_factor", String.valueOf(conf.getSchemaReplicationFactor()))
- .trim()));
-
- conf.setDataReplicationFactor(
- Integer.parseInt(
- properties
- .getProperty(
- "data_replication_factor", String.valueOf(conf.getDataReplicationFactor()))
- .trim()));
-
conf.setHeartbeatIntervalInMs(
Long.parseLong(
properties
@@ -316,12 +330,12 @@ public class ConfigNodeDescriptor {
leaderDistributionPolicy));
}
- conf.setEnableAutoLeaderBalanceForRatis(
+ conf.setEnableAutoLeaderBalanceForRatisConsensus(
Boolean.parseBoolean(
properties
.getProperty(
- "enable_auto_leader_balance_for_ratis",
- String.valueOf(conf.isEnableAutoLeaderBalanceForRatis()))
+ "enable_auto_leader_balance_for_ratis_consensus",
+ String.valueOf(conf.isEnableAutoLeaderBalanceForRatisConsensus()))
.trim()));
conf.setEnableAutoLeaderBalanceForIoTConsensus(
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java
index cf5c5e4c42..55bcd1eaca 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java
@@ -319,6 +319,20 @@ public class ClusterSchemaManager {
int totalCpuCoreNum = getNodeManager().getTotalCpuCoreCount();
int storageGroupNum = storageGroupSchemaMap.size();
+ // Adjust least_data_region_group_num
+ // TODO: The least_data_region_group_num should be maintained separately by different
+ // StorageGroup
+ int leastDataRegionGroupNum =
+ (int)
+ Math.ceil(
+ (double) totalCpuCoreNum
+ / (double) (storageGroupNum * CONF.getDataReplicationFactor()));
+ if (leastDataRegionGroupNum < CONF.getLeastDataRegionGroupNum()) {
+ // The leastDataRegionGroupNum should be the maximum integer that satisfy:
+ // 1 <= leastDataRegionGroupNum <= 5(default)
+ CONF.setLeastDataRegionGroupNum(leastDataRegionGroupNum);
+ }
+
AdjustMaxRegionGroupNumPlan adjustMaxRegionGroupNumPlan = new AdjustMaxRegionGroupNumPlan();
for (TStorageGroupSchema storageGroupSchema : storageGroupSchemaMap.values()) {
try {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java
index da366ab436..7b95777536 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java
@@ -50,7 +50,7 @@ public class RegionBalancer {
public RegionBalancer(IManager configManager) {
this.configManager = configManager;
- switch (ConfigNodeDescriptor.getInstance().getConf().getRegionAllocateStrategy()) {
+ switch (ConfigNodeDescriptor.getInstance().getConf().getRegionGroupAllocatePolicy()) {
case COPY_SET:
this.regionGroupAllocator = new CopySetRegionGroupAllocator();
break;
@@ -145,7 +145,7 @@ public class RegionBalancer {
return configManager.getPartitionManager();
}
- public enum RegionGroupAllocateStrategy {
+ public enum RegionGroupAllocatePolicy {
COPY_SET,
GREEDY
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
index ed4573d1c2..c1d84de2ce 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
@@ -78,12 +78,12 @@ public class RouteBalancer {
CONF.getDataRegionConsensusProtocolClass();
private static final boolean IS_ENABLE_AUTO_LEADER_BALANCE_FOR_DATA_REGION =
- (CONF.isEnableAutoLeaderBalanceForRatis()
+ (CONF.isEnableAutoLeaderBalanceForRatisConsensus()
&& ConsensusFactory.RATIS_CONSENSUS.equals(DATA_REGION_CONSENSUS_PROTOCOL_CLASS))
|| (CONF.isEnableAutoLeaderBalanceForIoTConsensus()
&& ConsensusFactory.IOT_CONSENSUS.equals(DATA_REGION_CONSENSUS_PROTOCOL_CLASS));
private static final boolean IS_ENABLE_AUTO_LEADER_BALANCE_FOR_SCHEMA_REGION =
- (CONF.isEnableAutoLeaderBalanceForRatis()
+ (CONF.isEnableAutoLeaderBalanceForRatisConsensus()
&& ConsensusFactory.RATIS_CONSENSUS.equals(SCHEMA_REGION_CONSENSUS_PROTOCOL_CLASS))
|| (CONF.isEnableAutoLeaderBalanceForIoTConsensus()
&& ConsensusFactory.IOT_CONSENSUS.equals(SCHEMA_REGION_CONSENSUS_PROTOCOL_CLASS));
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/DataRegionGroupExtensionPolicy.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/DataRegionGroupExtensionPolicy.java
new file mode 100644
index 0000000000..f6db6fbcf2
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/DataRegionGroupExtensionPolicy.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.manager.partition;
+
+import java.io.IOException;
+
+public enum DataRegionGroupExtensionPolicy {
+ CUSTOM("CUSTOM"),
+
+ AUTO("AUTO");
+
+ private final String policy;
+
+ DataRegionGroupExtensionPolicy(String policy) {
+ this.policy = policy;
+ }
+
+ public String getPolicy() {
+ return policy;
+ }
+
+ public static DataRegionGroupExtensionPolicy parse(String policy) throws IOException {
+ for (DataRegionGroupExtensionPolicy extensionPolicy : DataRegionGroupExtensionPolicy.values()) {
+ if (extensionPolicy.policy.equals(policy)) {
+ return extensionPolicy;
+ }
+ }
+ throw new IOException(
+ String.format("DataRegionGroupExtensionPolicy %s doesn't exist.", policy));
+ }
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
index de89381bdc..42ca73b766 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
@@ -103,6 +103,9 @@ public class PartitionManager {
private static final Logger LOGGER = LoggerFactory.getLogger(PartitionManager.class);
private static final ConfigNodeConfig CONF = ConfigNodeDescriptor.getInstance().getConf();
+ private static final DataRegionGroupExtensionPolicy DATA_REGION_GROUP_EXTENSION_POLICY =
+ CONF.getDataRegionGroupExtensionPolicy();
+ private static final int DATA_REGION_GROUP_PER_DATABASE = CONF.getDataRegionGroupPerDatabase();
private final IManager configManager;
private final PartitionInfo partitionInfo;
@@ -328,80 +331,24 @@ public class PartitionManager {
private TSStatus extendRegionGroupIfNecessary(
Map<String, Integer> unassignedPartitionSlotsCountMap,
TConsensusGroupType consensusGroupType) {
+
TSStatus result = new TSStatus();
try {
- // Map<StorageGroup, Region allotment>
- Map<String, Integer> allotmentMap = new ConcurrentHashMap<>();
-
- for (Map.Entry<String, Integer> entry : unassignedPartitionSlotsCountMap.entrySet()) {
- final String storageGroup = entry.getKey();
- final int unassignedPartitionSlotsCount = entry.getValue();
-
- float allocatedRegionGroupCount =
- partitionInfo.getRegionGroupCount(storageGroup, consensusGroupType);
- // The slotCount equals to the sum of assigned slot count and unassigned slot count
- float slotCount =
- (float) partitionInfo.getAssignedSeriesPartitionSlotsCount(storageGroup)
- + unassignedPartitionSlotsCount;
- float maxRegionGroupCount =
- getClusterSchemaManager().getMaxRegionGroupNum(storageGroup, consensusGroupType);
- float maxSlotCount = CONF.getSeriesPartitionSlotNum();
-
- /* RegionGroup extension is required in the following cases */
- // 1. The number of current RegionGroup of the StorageGroup is less than the least number
- int leastRegionGroupNum =
- TConsensusGroupType.SchemaRegion.equals(consensusGroupType)
- ? CONF.getLeastSchemaRegionGroupNum()
- : CONF.getLeastDataRegionGroupNum();
- if (allocatedRegionGroupCount < leastRegionGroupNum) {
- // Let the sum of unassignedPartitionSlotsCount and allocatedRegionGroupCount
- // no less than the leastRegionGroupNum
- int delta =
- (int)
- Math.min(
- unassignedPartitionSlotsCount,
- leastRegionGroupNum - allocatedRegionGroupCount);
- allotmentMap.put(storageGroup, delta);
- continue;
- }
-
- // 2. The average number of partitions held by each Region will be greater than the
- // expected average number after the partition allocation is completed
- if (allocatedRegionGroupCount < maxRegionGroupCount
- && slotCount / allocatedRegionGroupCount > maxSlotCount / maxRegionGroupCount) {
- // The delta is equal to the smallest integer solution that satisfies the inequality:
- // slotCount / (allocatedRegionGroupCount + delta) < maxSlotCount / maxRegionGroupCount
- int delta =
- Math.min(
- (int) (maxRegionGroupCount - allocatedRegionGroupCount),
- Math.max(
- 1,
- (int)
- Math.ceil(
- slotCount * maxRegionGroupCount / maxSlotCount
- - allocatedRegionGroupCount)));
- allotmentMap.put(storageGroup, delta);
- continue;
- }
-
- // 3. All RegionGroups in the specified StorageGroup are disabled currently
- if (allocatedRegionGroupCount
- == filterRegionGroupThroughStatus(storageGroup, RegionGroupStatus.Disabled).size()
- && allocatedRegionGroupCount < maxRegionGroupCount) {
- allotmentMap.put(storageGroup, 1);
- }
- }
-
- if (!allotmentMap.isEmpty()) {
- CreateRegionGroupsPlan createRegionGroupsPlan =
- getLoadManager().allocateRegionGroups(allotmentMap, consensusGroupType);
- LOGGER.info("[CreateRegionGroups] Starting to create the following RegionGroups:");
- createRegionGroupsPlan.planLog(LOGGER);
- result =
- getProcedureManager().createRegionGroups(consensusGroupType, createRegionGroupsPlan);
+ if (TConsensusGroupType.SchemaRegion.equals(consensusGroupType)) {
+ // The SchemaRegionGroup always use AUTO policy currently.
+ return autoExtendRegionGroupIfNecessary(
+ unassignedPartitionSlotsCountMap, consensusGroupType);
} else {
- result = RpcUtils.SUCCESS_STATUS;
+ switch (DATA_REGION_GROUP_EXTENSION_POLICY) {
+ case CUSTOM:
+ return customExtendRegionGroupIfNecessary(
+ unassignedPartitionSlotsCountMap, consensusGroupType);
+ case AUTO:
+ default:
+ return autoExtendRegionGroupIfNecessary(
+ unassignedPartitionSlotsCountMap, consensusGroupType);
+ }
}
} catch (NotEnoughDataNodeException e) {
String prompt = "ConfigNode failed to extend Region because there are not enough DataNodes";
@@ -418,6 +365,109 @@ public class PartitionManager {
return result;
}
+ private TSStatus customExtendRegionGroupIfNecessary(
+ Map<String, Integer> unassignedPartitionSlotsCountMap, TConsensusGroupType consensusGroupType)
+ throws StorageGroupNotExistsException, NotEnoughDataNodeException {
+
+ // Map<StorageGroup, Region allotment>
+ Map<String, Integer> allotmentMap = new ConcurrentHashMap<>();
+
+ for (Map.Entry<String, Integer> entry : unassignedPartitionSlotsCountMap.entrySet()) {
+ final String storageGroup = entry.getKey();
+ float allocatedRegionGroupCount =
+ partitionInfo.getRegionGroupCount(storageGroup, consensusGroupType);
+
+ if (allocatedRegionGroupCount == 0) {
+ // Only for DataRegionGroup currently
+ allotmentMap.put(storageGroup, DATA_REGION_GROUP_PER_DATABASE);
+ }
+ }
+
+ return generateAndAllocateRegionGroups(allotmentMap, consensusGroupType);
+ }
+
+ private TSStatus autoExtendRegionGroupIfNecessary(
+ Map<String, Integer> unassignedPartitionSlotsCountMap, TConsensusGroupType consensusGroupType)
+ throws NotEnoughDataNodeException, StorageGroupNotExistsException {
+
+ // Map<StorageGroup, Region allotment>
+ Map<String, Integer> allotmentMap = new ConcurrentHashMap<>();
+
+ for (Map.Entry<String, Integer> entry : unassignedPartitionSlotsCountMap.entrySet()) {
+ final String storageGroup = entry.getKey();
+ final int unassignedPartitionSlotsCount = entry.getValue();
+
+ float allocatedRegionGroupCount =
+ partitionInfo.getRegionGroupCount(storageGroup, consensusGroupType);
+ // The slotCount equals to the sum of assigned slot count and unassigned slot count
+ float slotCount =
+ (float) partitionInfo.getAssignedSeriesPartitionSlotsCount(storageGroup)
+ + unassignedPartitionSlotsCount;
+ float maxRegionGroupCount =
+ getClusterSchemaManager().getMaxRegionGroupNum(storageGroup, consensusGroupType);
+ float maxSlotCount = CONF.getSeriesPartitionSlotNum();
+
+ /* RegionGroup extension is required in the following cases */
+ // 1. The number of current RegionGroup of the StorageGroup is less than the least number
+ int leastRegionGroupNum =
+ TConsensusGroupType.SchemaRegion.equals(consensusGroupType)
+ ? CONF.getLeastSchemaRegionGroupNum()
+ : CONF.getLeastDataRegionGroupNum();
+ if (allocatedRegionGroupCount < leastRegionGroupNum) {
+ // Let the sum of unassignedPartitionSlotsCount and allocatedRegionGroupCount
+ // no less than the leastRegionGroupNum
+ int delta =
+ (int)
+ Math.min(
+ unassignedPartitionSlotsCount, leastRegionGroupNum - allocatedRegionGroupCount);
+ allotmentMap.put(storageGroup, delta);
+ continue;
+ }
+
+ // 2. The average number of partitions held by each Region will be greater than the
+ // expected average number after the partition allocation is completed
+ if (allocatedRegionGroupCount < maxRegionGroupCount
+ && slotCount / allocatedRegionGroupCount > maxSlotCount / maxRegionGroupCount) {
+ // The delta is equal to the smallest integer solution that satisfies the inequality:
+ // slotCount / (allocatedRegionGroupCount + delta) < maxSlotCount / maxRegionGroupCount
+ int delta =
+ Math.min(
+ (int) (maxRegionGroupCount - allocatedRegionGroupCount),
+ Math.max(
+ 1,
+ (int)
+ Math.ceil(
+ slotCount * maxRegionGroupCount / maxSlotCount
+ - allocatedRegionGroupCount)));
+ allotmentMap.put(storageGroup, delta);
+ continue;
+ }
+
+ // 3. All RegionGroups in the specified StorageGroup are disabled currently
+ if (allocatedRegionGroupCount
+ == filterRegionGroupThroughStatus(storageGroup, RegionGroupStatus.Disabled).size()
+ && allocatedRegionGroupCount < maxRegionGroupCount) {
+ allotmentMap.put(storageGroup, 1);
+ }
+ }
+
+ return generateAndAllocateRegionGroups(allotmentMap, consensusGroupType);
+ }
+
+ private TSStatus generateAndAllocateRegionGroups(
+ Map<String, Integer> allotmentMap, TConsensusGroupType consensusGroupType)
+ throws NotEnoughDataNodeException, StorageGroupNotExistsException {
+ if (!allotmentMap.isEmpty()) {
+ CreateRegionGroupsPlan createRegionGroupsPlan =
+ getLoadManager().allocateRegionGroups(allotmentMap, consensusGroupType);
+ LOGGER.info("[CreateRegionGroups] Starting to create the following RegionGroups:");
+ createRegionGroupsPlan.planLog(LOGGER);
+ return getProcedureManager().createRegionGroups(consensusGroupType, createRegionGroupsPlan);
+ } else {
+ return RpcUtils.SUCCESS_STATUS;
+ }
+ }
+
/**
* Only leader use this interface. Checks whether the specified DataPartition has a predecessor
* and returns if it does
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/MppConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/MppConfig.java
index 425f1c34ab..016d6490f4 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/MppConfig.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/MppConfig.java
@@ -221,6 +221,13 @@ public class MppConfig implements BaseConfig {
return this;
}
+ @Override
+ public BaseConfig setDataRegionGroupExtensionPolicy(String dataRegionGroupExtensionPolicy) {
+ confignodeProperties.setProperty(
+ "data_region_group_extension_policy", dataRegionGroupExtensionPolicy);
+ return this;
+ }
+
@Override
public BaseConfig setSchemaReplicationFactor(int schemaReplicationFactor) {
confignodeProperties.setProperty(
diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseConfig.java b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseConfig.java
index f9bd94a661..5ece90edc5 100644
--- a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseConfig.java
+++ b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseConfig.java
@@ -231,6 +231,14 @@ public interface BaseConfig {
return false;
}
+ default BaseConfig setDataRegionGroupExtensionPolicy(String dataRegionGroupExtensionPolicy) {
+ return this;
+ }
+
+ default String getDataRegionGroupExtensionPolicy() {
+ return "AUTO";
+ }
+
default BaseConfig setSchemaReplicationFactor(int schemaReplicationFactor) {
return this;
}
diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBRegionGroupExtensionIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBRegionGroupExtensionIT.java
new file mode 100644
index 0000000000..f229f722e2
--- /dev/null
+++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBRegionGroupExtensionIT.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.it.partition;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
+import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
+import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
+import org.apache.iotdb.confignode.it.utils.ConfigNodeTestUtils;
+import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionTableResp;
+import org.apache.iotdb.confignode.rpc.thrift.TSetStorageGroupReq;
+import org.apache.iotdb.confignode.rpc.thrift.TShowRegionReq;
+import org.apache.iotdb.confignode.rpc.thrift.TShowRegionResp;
+import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchema;
+import org.apache.iotdb.consensus.ConsensusFactory;
+import org.apache.iotdb.it.env.ConfigFactory;
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.ClusterIT;
+import org.apache.iotdb.itbase.env.BaseConfig;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.apache.thrift.TException;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({ClusterIT.class})
+public class IoTDBRegionGroupExtensionIT {
+
+ private static final BaseConfig CONF = ConfigFactory.getConfig();
+
+ private static String originalDataRegionGroupExtensionPolicy;
+ private static final String testDataRegionGroupExtensionPolicy = "CUSTOM";
+
+ private static String originalSchemaRegionConsensusProtocolClass;
+ private static String originalDataRegionConsensusProtocolClass;
+ private static final String testConsensusProtocolClass = ConsensusFactory.RATIS_CONSENSUS;
+
+ private static int originalSchemaReplicationFactor;
+ private static int originalDataReplicationFactor;
+ private static final int testReplicationFactor = 3;
+
+ private static long originalTimePartitionInterval;
+
+ private static final String sg = "root.sg";
+
+ @Before
+ public void setUp() throws Exception {
+ originalSchemaRegionConsensusProtocolClass = CONF.getSchemaRegionConsensusProtocolClass();
+ originalDataRegionConsensusProtocolClass = CONF.getDataRegionConsensusProtocolClass();
+ CONF.setSchemaRegionConsensusProtocolClass(testConsensusProtocolClass);
+ CONF.setDataRegionConsensusProtocolClass(testConsensusProtocolClass);
+
+ originalSchemaReplicationFactor = CONF.getSchemaReplicationFactor();
+ originalDataReplicationFactor = CONF.getDataReplicationFactor();
+ CONF.setSchemaReplicationFactor(testReplicationFactor);
+ CONF.setDataReplicationFactor(testReplicationFactor);
+
+ originalTimePartitionInterval = CONF.getTimePartitionInterval();
+
+ originalDataRegionGroupExtensionPolicy = CONF.getDataRegionGroupExtensionPolicy();
+ CONF.setDataRegionGroupExtensionPolicy(testDataRegionGroupExtensionPolicy);
+
+ // Init 1C3D environment
+ EnvFactory.getEnv().initClusterEnvironment(1, 3);
+ }
+
+ @After
+ public void tearDown() {
+ EnvFactory.getEnv().cleanAfterClass();
+
+ CONF.setSchemaRegionConsensusProtocolClass(originalSchemaRegionConsensusProtocolClass);
+ CONF.setDataRegionConsensusProtocolClass(originalDataRegionConsensusProtocolClass);
+ CONF.setSchemaReplicationFactor(originalSchemaReplicationFactor);
+ CONF.setDataReplicationFactor(originalDataReplicationFactor);
+ CONF.setDataRegionGroupExtensionPolicy(originalDataRegionGroupExtensionPolicy);
+ }
+
+ @Test
+ public void testCustomDataRegionGroupExtensionPolicy()
+ throws IOException, InterruptedException, TException {
+ try (SyncConfigNodeIServiceClient client =
+ (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
+
+ /* Set StorageGroup */
+ TSetStorageGroupReq setReq = new TSetStorageGroupReq(new TStorageGroupSchema(sg));
+ TSStatus status = client.setStorageGroup(setReq);
+ Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
+
+ /* Insert a DataPartition to create DataRegionGroups */
+ Map<String, Map<TSeriesPartitionSlot, List<TTimePartitionSlot>>> partitionSlotsMap =
+ ConfigNodeTestUtils.constructPartitionSlotsMap(
+ sg, 0, 10, 0, 10, originalTimePartitionInterval);
+ TDataPartitionTableResp dataPartitionTableResp =
+ client.getOrCreateDataPartitionTable(new TDataPartitionReq(partitionSlotsMap));
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(),
+ dataPartitionTableResp.getStatus().getCode());
+
+ /* Check the number of DataRegionGroups */
+ TShowRegionResp showRegionReq = client.showRegion(new TShowRegionReq());
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(), showRegionReq.getStatus().getCode());
+ AtomicInteger regionCount = new AtomicInteger(0);
+ showRegionReq.getRegionInfoList().forEach(regionInfo -> regionCount.getAndIncrement());
+ Assert.assertEquals(10 * testReplicationFactor, regionCount.get());
+ }
+ }
+}
diff --git a/node-commons/src/assembly/resources/conf/iotdb-common.properties b/node-commons/src/assembly/resources/conf/iotdb-common.properties
index 4bcb805431..f1ed03d725 100644
--- a/node-commons/src/assembly/resources/conf/iotdb-common.properties
+++ b/node-commons/src/assembly/resources/conf/iotdb-common.properties
@@ -56,13 +56,13 @@
# data_region_consensus_protocol_class=org.apache.iotdb.consensus.iot.IoTConsensus
####################
-### Partition (Load balancing) configuration
+### Load balancing configuration
####################
# All parameters in Partition configuration is unmodifiable after ConfigNode starts for the first time.
# And these parameters should be consistent within the ConfigNodeGroup.
-# Number of SeriesPartitionSlots per StorageGroup
-# Datatype: int
+# Number of SeriesPartitionSlots per Database
+# Datatype: Integer
# series_partition_slot_num=10000
# SeriesPartitionSlot executor class
@@ -73,20 +73,35 @@
# 4. SDBMHashExecutor
# Also, if you want to implement your own SeriesPartition executor, you can inherit the SeriesPartitionExecutor class and
# modify this parameter to correspond to your Java class
-# Datatype: string
+# Datatype: String
# series_partition_executor_class=org.apache.iotdb.commons.partition.executor.hash.BKDRHashExecutor
-# The maximum number of SchemaRegion expected to be managed by each DataNode.
-# Notice: Since each StorageGroup requires at least one SchemaRegion to manage its schema,
-# this parameter doesn't limit the number of SchemaRegions when there are too many StorageGroups.
-# Datatype: double
-# schema_region_per_data_node=1.0
-# The maximum number of DataRegion expected to be managed by each processor.
-# Notice: Since each StorageGroup requires at least two DataRegions to manage its data,
-# this parameter doesn't limit the number of DataRegions when there are too many StorageGroups.
-# Datatype: double
-# data_region_per_processor=0.5
+# The maximum number of SchemaRegions expected to be managed by each DataNode.
+# Notice: Since each Database requires at least one SchemaRegionGroup to manage its schema,
+# this parameter doesn't limit the number of SchemaRegions when there are too many Databases.
+# Default is equal to the schema_replication_factor.
+# Datatype: Double
+# schema_region_per_data_node=
+
+# The policy for extending DataRegionGroups for each Database.
+# These policies are currently supported:
+# 1. CUSTOM(Each Database will allocate data_region_group_per_database DataRegionGroups as soon as it is created)
+# 2. AUTO(Each Database will automatically extend DataRegionGroups based on the data it has)
+# Datatype: String
+# data_region_group_extension_policy=AUTO
+
+# The number of DataRegionGroups for each Database when using CUSTOM data_region_group_extension_policy.
+# Notice: Each Database will allocate data_region_group_per_database DataRegionGroups as soon as it is created.
+# Datatype: Integer
+# data_region_group_per_database=10
+
+# The maximum number of DataRegions expected to be managed by each processor
+# when using AUTO data_region_group_extension_policy.
+# Notice: Since each Database requires at least two DataRegionGroups to manage its data,
+# this parameter doesn't limit the number of DataRegions when there are too many Databases.
+# Datatype: Double
+# data_region_per_processor=1.0
# The least number of DataRegionGroup for each StorageGroup.
# The ConfigNode-leader will create a DataRegionGroup for each newborn SeriesPartitionSlot
@@ -95,25 +110,27 @@
# Datatype: int
# least_data_region_group_num=5
+
# Whether to enable the DataPartition inherit policy.
# DataPartition within the same SeriesPartitionSlot will inherit
# the allocation result of the previous TimePartitionSlot if set true
# Datatype: Boolean
# enable_data_partition_inherit_policy=false
+
# The policy of cluster RegionGroups' leader distribution.
# E.g. we should balance cluster RegionGroups' leader distribution when some DataNodes are shutdown or re-connected.
# These policies are currently supported:
# 1. GREEDY(Distribute leader through a simple greedy algorithm, might cause unbalance)
# 2. MIN_COST_FLOW(Default, distribute leader through min cost flow algorithm)
-# Datatype: string
+# Datatype: String
# leader_distribution_policy=MIN_COST_FLOW
# Whether to enable auto leader balance for Ratis consensus protocol.
# The ConfigNode-leader will balance the leader of Ratis-RegionGroups by leader_distribution_policy if set true.
# Notice: Default is false because the Ratis is unstable for this function.
# Datatype: Boolean
-# enable_auto_leader_balance_for_ratis=false
+# enable_auto_leader_balance_for_ratis_consensus=false
# Whether to enable auto leader balance for IoTConsensus protocol.
# The ConfigNode-leader will balance the leader of IoTConsensus-RegionGroups by leader_distribution_policy if set true.