You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/11/27 06:55:55 UTC

[iotdb] branch master updated: [IOTDB-5058] Add custom RegionGroup extension policy (#8199)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 1966a070f6 [IOTDB-5058] Add custom RegionGroup extension policy (#8199)
1966a070f6 is described below

commit 1966a070f66144dc0df831cc5dd034ef873367be
Author: YongzaoDan <33...@users.noreply.github.com>
AuthorDate: Sun Nov 27 14:55:50 2022 +0800

    [IOTDB-5058] Add custom RegionGroup extension policy (#8199)
---
 .../iotdb/confignode/conf/ConfigNodeConfig.java    |  88 ++++++----
 .../confignode/conf/ConfigNodeDescriptor.java      |  60 ++++---
 .../confignode/manager/ClusterSchemaManager.java   |  14 ++
 .../manager/load/balancer/RegionBalancer.java      |   4 +-
 .../manager/load/balancer/RouteBalancer.java       |   4 +-
 .../partition/DataRegionGroupExtensionPolicy.java  |  47 +++++
 .../manager/partition/PartitionManager.java        | 190 +++++++++++++--------
 .../java/org/apache/iotdb/it/env/MppConfig.java    |   7 +
 .../org/apache/iotdb/itbase/env/BaseConfig.java    |   8 +
 .../it/partition/IoTDBRegionGroupExtensionIT.java  | 136 +++++++++++++++
 .../resources/conf/iotdb-common.properties         |  49 ++++--
 11 files changed, 463 insertions(+), 144 deletions(-)

diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
index 98d37ba2e4..3a235f02dd 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.confignode.manager.load.balancer.RegionBalancer;
 import org.apache.iotdb.confignode.manager.load.balancer.router.leader.ILeaderBalancer;
 import org.apache.iotdb.confignode.manager.load.balancer.router.priority.IPriorityBalancer;
+import org.apache.iotdb.confignode.manager.partition.DataRegionGroupExtensionPolicy;
 import org.apache.iotdb.consensus.ConsensusFactory;
 import org.apache.iotdb.rpc.RpcUtils;
 
@@ -56,24 +57,44 @@ public class ConfigNodeConfig {
   /** Schema region consensus protocol */
   private String schemaRegionConsensusProtocolClass = ConsensusFactory.RATIS_CONSENSUS;
 
-  /** The maximum number of SchemaRegion expected to be managed by each DataNode. */
-  private double schemaRegionPerDataNode = 1.0;
+  /** Default number of SchemaRegion replicas */
+  private int schemaReplicationFactor = 1;
 
   /** Data region consensus protocol */
   private String dataRegionConsensusProtocolClass = ConsensusFactory.IOT_CONSENSUS;
 
-  /** The maximum number of DataRegion expected to be managed by each DataNode. */
-  private double dataRegionPerProcessor = 0.5;
+  /** Default number of DataRegion replicas */
+  private int dataReplicationFactor = 1;
+
+  /** Number of SeriesPartitionSlots per StorageGroup */
+  private int seriesPartitionSlotNum = 10000;
+
+  /** SeriesPartitionSlot executor class */
+  private String seriesPartitionExecutorClass =
+      "org.apache.iotdb.commons.partition.executor.hash.BKDRHashExecutor";
+
+  /** The maximum number of SchemaRegions expected to be managed by each DataNode. */
+  private double schemaRegionPerDataNode = schemaReplicationFactor;
+
+  /** The policy for extending the DataRegionGroups of each Database (AUTO by default). */
+  private DataRegionGroupExtensionPolicy dataRegionGroupExtensionPolicy =
+      DataRegionGroupExtensionPolicy.AUTO;
+
+  /** The number of DataRegionGroups allocated to each Database under the CUSTOM policy. */
+  private int dataRegionGroupPerDatabase = 10;
 
-  /** The least number of SchemaRegionGroup for each StorageGroup. */
+  /** The maximum number of DataRegions expected to be managed by each DataNode. */
+  private double dataRegionPerProcessor = 1.0;
+
+  /** The least number of SchemaRegionGroup for each Database. */
   private int leastSchemaRegionGroupNum = 1;
 
-  /** The least number of DataRegionGroup for each StorageGroup. */
+  /** The least number of DataRegionGroup for each Database. */
   private int leastDataRegionGroupNum = 5;
 
-  /** region allocate strategy. */
-  private RegionBalancer.RegionGroupAllocateStrategy regionGroupAllocateStrategy =
-      RegionBalancer.RegionGroupAllocateStrategy.GREEDY;
+  /** RegionGroup allocate policy. */
+  private RegionBalancer.RegionGroupAllocatePolicy regionGroupAllocatePolicy =
+      RegionBalancer.RegionGroupAllocatePolicy.GREEDY;
 
   /**
    * DataPartition within the same SeriesPartitionSlot will inherit the allocation result of the
@@ -81,13 +102,6 @@ public class ConfigNodeConfig {
    */
   private boolean enableDataPartitionInheritPolicy = false;
 
-  /** Number of SeriesPartitionSlots per StorageGroup */
-  private int seriesPartitionSlotNum = 10000;
-
-  /** SeriesPartitionSlot executor class */
-  private String seriesPartitionExecutorClass =
-      "org.apache.iotdb.commons.partition.executor.hash.BKDRHashExecutor";
-
   /** Max concurrent client number */
   private int rpcMaxConcurrentClientNum = 65535;
 
@@ -132,12 +146,6 @@ public class ConfigNodeConfig {
   /** Time partition interval in milliseconds */
   private long timePartitionInterval = 604_800_000;
 
-  /** Default number of SchemaRegion replicas */
-  private int schemaReplicationFactor = 1;
-
-  /** Default number of DataRegion replicas */
-  private int dataReplicationFactor = 1;
-
   /** Procedure Evict ttl */
   private int procedureCompletedEvictTTL = 800;
 
@@ -158,7 +166,7 @@ public class ConfigNodeConfig {
   private String leaderDistributionPolicy = ILeaderBalancer.MIN_COST_FLOW_POLICY;
 
   /** Whether to enable auto leader balance for Ratis consensus protocol */
-  private boolean enableAutoLeaderBalanceForRatis = false;
+  private boolean enableAutoLeaderBalanceForRatisConsensus = false;
 
   /** Whether to enable auto leader balance for IoTConsensus protocol */
   private boolean enableAutoLeaderBalanceForIoTConsensus = true;
@@ -405,6 +413,23 @@ public class ConfigNodeConfig {
     this.schemaRegionConsensusProtocolClass = schemaRegionConsensusProtocolClass;
   }
 
+  /** @return the policy used to extend the DataRegionGroups of each Database */
+  public DataRegionGroupExtensionPolicy getDataRegionGroupExtensionPolicy() {
+    return this.dataRegionGroupExtensionPolicy;
+  }
+
+  public void setDataRegionGroupExtensionPolicy(DataRegionGroupExtensionPolicy policy) {
+    this.dataRegionGroupExtensionPolicy = policy;
+  }
+
+  public int getDataRegionGroupPerDatabase() {
+    return this.dataRegionGroupPerDatabase;
+  }
+
+  public void setDataRegionGroupPerDatabase(int regionGroupNum) {
+    this.dataRegionGroupPerDatabase = regionGroupNum;
+  }
+
   public double getSchemaRegionPerDataNode() {
     return schemaRegionPerDataNode;
   }
@@ -445,13 +470,13 @@ public class ConfigNodeConfig {
     this.leastDataRegionGroupNum = leastDataRegionGroupNum;
   }
 
-  public RegionBalancer.RegionGroupAllocateStrategy getRegionAllocateStrategy() {
-    return regionGroupAllocateStrategy;
+  public RegionBalancer.RegionGroupAllocatePolicy getRegionGroupAllocatePolicy() {
+    return regionGroupAllocatePolicy;
   }
 
   public void setRegionAllocateStrategy(
-      RegionBalancer.RegionGroupAllocateStrategy regionGroupAllocateStrategy) {
-    this.regionGroupAllocateStrategy = regionGroupAllocateStrategy;
+      RegionBalancer.RegionGroupAllocatePolicy regionGroupAllocatePolicy) {
+    this.regionGroupAllocatePolicy = regionGroupAllocatePolicy;
   }
 
   public boolean isEnableDataPartitionInheritPolicy() {
@@ -584,12 +609,13 @@ public class ConfigNodeConfig {
     this.leaderDistributionPolicy = leaderDistributionPolicy;
   }
 
-  public boolean isEnableAutoLeaderBalanceForRatis() {
-    return enableAutoLeaderBalanceForRatis;
+  public boolean isEnableAutoLeaderBalanceForRatisConsensus() {
+    return enableAutoLeaderBalanceForRatisConsensus;
   }
 
-  public void setEnableAutoLeaderBalanceForRatis(boolean enableAutoLeaderBalanceForRatis) {
-    this.enableAutoLeaderBalanceForRatis = enableAutoLeaderBalanceForRatis;
+  public void setEnableAutoLeaderBalanceForRatisConsensus(
+      boolean enableAutoLeaderBalanceForRatisConsensus) {
+    this.enableAutoLeaderBalanceForRatisConsensus = enableAutoLeaderBalanceForRatisConsensus;
   }
 
   public boolean isEnableAutoLeaderBalanceForIoTConsensus() {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
index d635fa41f4..dd0041fabb 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.commons.utils.NodeUrlUtils;
 import org.apache.iotdb.confignode.manager.load.balancer.RegionBalancer;
 import org.apache.iotdb.confignode.manager.load.balancer.router.leader.ILeaderBalancer;
 import org.apache.iotdb.confignode.manager.load.balancer.router.priority.IPriorityBalancer;
+import org.apache.iotdb.confignode.manager.partition.DataRegionGroupExtensionPolicy;
 import org.apache.iotdb.metrics.config.MetricConfigDescriptor;
 
 import org.slf4j.Logger;
@@ -192,12 +193,11 @@ public class ConfigNodeDescriptor {
                 conf.getSchemaRegionConsensusProtocolClass())
             .trim());
 
-    conf.setSchemaRegionPerDataNode(
-        Double.parseDouble(
+    conf.setSchemaReplicationFactor(
+        Integer.parseInt(
             properties
                 .getProperty(
-                    "schema_region_per_data_node",
-                    String.valueOf(conf.getSchemaRegionPerDataNode()))
+                    "schema_replication_factor", String.valueOf(conf.getSchemaReplicationFactor()))
                 .trim()));
 
     conf.setDataRegionConsensusProtocolClass(
@@ -206,6 +206,33 @@ public class ConfigNodeDescriptor {
                 "data_region_consensus_protocol_class", conf.getDataRegionConsensusProtocolClass())
             .trim());
 
+    conf.setDataReplicationFactor(
+        Integer.parseInt(
+            properties
+                .getProperty(
+                    "data_replication_factor", String.valueOf(conf.getDataReplicationFactor()))
+                .trim()));
+
+    conf.setSchemaRegionPerDataNode(
+        Double.parseDouble(
+            properties
+                .getProperty(
+                    "schema_region_per_data_node",
+                    String.valueOf(conf.getSchemaReplicationFactor()))
+                .trim()));
+
+    conf.setDataRegionGroupExtensionPolicy(
+        DataRegionGroupExtensionPolicy.parse(
+            properties.getProperty(
+                "data_region_group_extension_policy",
+                conf.getDataRegionGroupExtensionPolicy().getPolicy().trim())));
+
+    conf.setDataRegionGroupPerDatabase(
+        Integer.parseInt(
+            properties.getProperty(
+                "data_region_group_per_database",
+                String.valueOf(conf.getDataRegionGroupPerDatabase()).trim())));
+
     conf.setDataRegionPerProcessor(
         Double.parseDouble(
             properties
@@ -220,9 +247,10 @@ public class ConfigNodeDescriptor {
 
     try {
       conf.setRegionAllocateStrategy(
-          RegionBalancer.RegionGroupAllocateStrategy.valueOf(
+          RegionBalancer.RegionGroupAllocatePolicy.valueOf(
               properties
-                  .getProperty("region_allocate_strategy", conf.getRegionAllocateStrategy().name())
+                  .getProperty(
+                      "region_group_allocate_policy", conf.getRegionGroupAllocatePolicy().name())
                   .trim()));
     } catch (IllegalArgumentException e) {
       LOGGER.warn(
@@ -281,20 +309,6 @@ public class ConfigNodeDescriptor {
                     "time_partition_interval", String.valueOf(conf.getTimePartitionInterval()))
                 .trim()));
 
-    conf.setSchemaReplicationFactor(
-        Integer.parseInt(
-            properties
-                .getProperty(
-                    "schema_replication_factor", String.valueOf(conf.getSchemaReplicationFactor()))
-                .trim()));
-
-    conf.setDataReplicationFactor(
-        Integer.parseInt(
-            properties
-                .getProperty(
-                    "data_replication_factor", String.valueOf(conf.getDataReplicationFactor()))
-                .trim()));
-
     conf.setHeartbeatIntervalInMs(
         Long.parseLong(
             properties
@@ -316,12 +330,12 @@ public class ConfigNodeDescriptor {
               leaderDistributionPolicy));
     }
 
-    conf.setEnableAutoLeaderBalanceForRatis(
+    conf.setEnableAutoLeaderBalanceForRatisConsensus(
         Boolean.parseBoolean(
             properties
                 .getProperty(
-                    "enable_auto_leader_balance_for_ratis",
-                    String.valueOf(conf.isEnableAutoLeaderBalanceForRatis()))
+                    "enable_auto_leader_balance_for_ratis_consensus",
+                    String.valueOf(conf.isEnableAutoLeaderBalanceForRatisConsensus()))
                 .trim()));
 
     conf.setEnableAutoLeaderBalanceForIoTConsensus(
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java
index cf5c5e4c42..55bcd1eaca 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java
@@ -319,6 +319,20 @@ public class ClusterSchemaManager {
     int totalCpuCoreNum = getNodeManager().getTotalCpuCoreCount();
     int storageGroupNum = storageGroupSchemaMap.size();
 
+    // Adjust least_data_region_group_num
+    // TODO: The least_data_region_group_num should be maintained separately by different
+    // StorageGroup
+    int leastDataRegionGroupNum =
+        (int)
+            Math.ceil(
+                (double) totalCpuCoreNum
+                    / (double) (storageGroupNum * CONF.getDataReplicationFactor()));
+    if (leastDataRegionGroupNum < CONF.getLeastDataRegionGroupNum()) {
+      // The leastDataRegionGroupNum should be the maximum integer that satisfy:
+      // 1 <= leastDataRegionGroupNum <= 5(default)
+      CONF.setLeastDataRegionGroupNum(leastDataRegionGroupNum);
+    }
+
     AdjustMaxRegionGroupNumPlan adjustMaxRegionGroupNumPlan = new AdjustMaxRegionGroupNumPlan();
     for (TStorageGroupSchema storageGroupSchema : storageGroupSchemaMap.values()) {
       try {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java
index da366ab436..7b95777536 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java
@@ -50,7 +50,7 @@ public class RegionBalancer {
   public RegionBalancer(IManager configManager) {
     this.configManager = configManager;
 
-    switch (ConfigNodeDescriptor.getInstance().getConf().getRegionAllocateStrategy()) {
+    switch (ConfigNodeDescriptor.getInstance().getConf().getRegionGroupAllocatePolicy()) {
       case COPY_SET:
         this.regionGroupAllocator = new CopySetRegionGroupAllocator();
         break;
@@ -145,7 +145,7 @@ public class RegionBalancer {
     return configManager.getPartitionManager();
   }
 
-  public enum RegionGroupAllocateStrategy {
+  public enum RegionGroupAllocatePolicy {
     COPY_SET,
     GREEDY
   }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
index ed4573d1c2..c1d84de2ce 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
@@ -78,12 +78,12 @@ public class RouteBalancer {
       CONF.getDataRegionConsensusProtocolClass();
 
   private static final boolean IS_ENABLE_AUTO_LEADER_BALANCE_FOR_DATA_REGION =
-      (CONF.isEnableAutoLeaderBalanceForRatis()
+      (CONF.isEnableAutoLeaderBalanceForRatisConsensus()
               && ConsensusFactory.RATIS_CONSENSUS.equals(DATA_REGION_CONSENSUS_PROTOCOL_CLASS))
           || (CONF.isEnableAutoLeaderBalanceForIoTConsensus()
               && ConsensusFactory.IOT_CONSENSUS.equals(DATA_REGION_CONSENSUS_PROTOCOL_CLASS));
   private static final boolean IS_ENABLE_AUTO_LEADER_BALANCE_FOR_SCHEMA_REGION =
-      (CONF.isEnableAutoLeaderBalanceForRatis()
+      (CONF.isEnableAutoLeaderBalanceForRatisConsensus()
               && ConsensusFactory.RATIS_CONSENSUS.equals(SCHEMA_REGION_CONSENSUS_PROTOCOL_CLASS))
           || (CONF.isEnableAutoLeaderBalanceForIoTConsensus()
               && ConsensusFactory.IOT_CONSENSUS.equals(SCHEMA_REGION_CONSENSUS_PROTOCOL_CLASS));
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/DataRegionGroupExtensionPolicy.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/DataRegionGroupExtensionPolicy.java
new file mode 100644
index 0000000000..f6db6fbcf2
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/DataRegionGroupExtensionPolicy.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.manager.partition;
+
+import java.io.IOException;
+
+public enum DataRegionGroupExtensionPolicy {
+  CUSTOM("CUSTOM"), // Allocate data_region_group_per_database groups to a Database with none
+
+  AUTO("AUTO"); // Extend RegionGroups automatically as SeriesPartitionSlots are assigned
+
+  private final String policy;
+
+  DataRegionGroupExtensionPolicy(String policy) {
+    this.policy = policy;
+  }
+
+  public String getPolicy() {
+    return policy;
+  }
+
+  /** Case-insensitive, whitespace-tolerant lookup; throws IOException for unknown names. */
+  public static DataRegionGroupExtensionPolicy parse(String policy) throws IOException {
+    for (DataRegionGroupExtensionPolicy extensionPolicy : values()) {
+      if (policy != null && extensionPolicy.policy.equalsIgnoreCase(policy.trim())) {
+        return extensionPolicy;
+      }
+    }
+    throw new IOException(String.format("DataRegionGroupExtensionPolicy %s doesn't exist.", policy));
+  }
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
index de89381bdc..42ca73b766 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
@@ -103,6 +103,9 @@ public class PartitionManager {
   private static final Logger LOGGER = LoggerFactory.getLogger(PartitionManager.class);
 
   private static final ConfigNodeConfig CONF = ConfigNodeDescriptor.getInstance().getConf();
+  private static final DataRegionGroupExtensionPolicy DATA_REGION_GROUP_EXTENSION_POLICY =
+      CONF.getDataRegionGroupExtensionPolicy();
+  private static final int DATA_REGION_GROUP_PER_DATABASE = CONF.getDataRegionGroupPerDatabase();
 
   private final IManager configManager;
   private final PartitionInfo partitionInfo;
@@ -328,80 +331,24 @@ public class PartitionManager {
   private TSStatus extendRegionGroupIfNecessary(
       Map<String, Integer> unassignedPartitionSlotsCountMap,
       TConsensusGroupType consensusGroupType) {
+
     TSStatus result = new TSStatus();
 
     try {
-      // Map<StorageGroup, Region allotment>
-      Map<String, Integer> allotmentMap = new ConcurrentHashMap<>();
-
-      for (Map.Entry<String, Integer> entry : unassignedPartitionSlotsCountMap.entrySet()) {
-        final String storageGroup = entry.getKey();
-        final int unassignedPartitionSlotsCount = entry.getValue();
-
-        float allocatedRegionGroupCount =
-            partitionInfo.getRegionGroupCount(storageGroup, consensusGroupType);
-        // The slotCount equals to the sum of assigned slot count and unassigned slot count
-        float slotCount =
-            (float) partitionInfo.getAssignedSeriesPartitionSlotsCount(storageGroup)
-                + unassignedPartitionSlotsCount;
-        float maxRegionGroupCount =
-            getClusterSchemaManager().getMaxRegionGroupNum(storageGroup, consensusGroupType);
-        float maxSlotCount = CONF.getSeriesPartitionSlotNum();
-
-        /* RegionGroup extension is required in the following cases */
-        // 1. The number of current RegionGroup of the StorageGroup is less than the least number
-        int leastRegionGroupNum =
-            TConsensusGroupType.SchemaRegion.equals(consensusGroupType)
-                ? CONF.getLeastSchemaRegionGroupNum()
-                : CONF.getLeastDataRegionGroupNum();
-        if (allocatedRegionGroupCount < leastRegionGroupNum) {
-          // Let the sum of unassignedPartitionSlotsCount and allocatedRegionGroupCount
-          // no less than the leastRegionGroupNum
-          int delta =
-              (int)
-                  Math.min(
-                      unassignedPartitionSlotsCount,
-                      leastRegionGroupNum - allocatedRegionGroupCount);
-          allotmentMap.put(storageGroup, delta);
-          continue;
-        }
-
-        // 2. The average number of partitions held by each Region will be greater than the
-        // expected average number after the partition allocation is completed
-        if (allocatedRegionGroupCount < maxRegionGroupCount
-            && slotCount / allocatedRegionGroupCount > maxSlotCount / maxRegionGroupCount) {
-          // The delta is equal to the smallest integer solution that satisfies the inequality:
-          // slotCount / (allocatedRegionGroupCount + delta) < maxSlotCount / maxRegionGroupCount
-          int delta =
-              Math.min(
-                  (int) (maxRegionGroupCount - allocatedRegionGroupCount),
-                  Math.max(
-                      1,
-                      (int)
-                          Math.ceil(
-                              slotCount * maxRegionGroupCount / maxSlotCount
-                                  - allocatedRegionGroupCount)));
-          allotmentMap.put(storageGroup, delta);
-          continue;
-        }
-
-        // 3. All RegionGroups in the specified StorageGroup are disabled currently
-        if (allocatedRegionGroupCount
-                == filterRegionGroupThroughStatus(storageGroup, RegionGroupStatus.Disabled).size()
-            && allocatedRegionGroupCount < maxRegionGroupCount) {
-          allotmentMap.put(storageGroup, 1);
-        }
-      }
-
-      if (!allotmentMap.isEmpty()) {
-        CreateRegionGroupsPlan createRegionGroupsPlan =
-            getLoadManager().allocateRegionGroups(allotmentMap, consensusGroupType);
-        LOGGER.info("[CreateRegionGroups] Starting to create the following RegionGroups:");
-        createRegionGroupsPlan.planLog(LOGGER);
-        result =
-            getProcedureManager().createRegionGroups(consensusGroupType, createRegionGroupsPlan);
+      if (TConsensusGroupType.SchemaRegion.equals(consensusGroupType)) {
+        // The SchemaRegionGroup always use AUTO policy currently.
+        return autoExtendRegionGroupIfNecessary(
+            unassignedPartitionSlotsCountMap, consensusGroupType);
       } else {
-        result = RpcUtils.SUCCESS_STATUS;
+        switch (DATA_REGION_GROUP_EXTENSION_POLICY) {
+          case CUSTOM:
+            return customExtendRegionGroupIfNecessary(
+                unassignedPartitionSlotsCountMap, consensusGroupType);
+          case AUTO:
+          default:
+            return autoExtendRegionGroupIfNecessary(
+                unassignedPartitionSlotsCountMap, consensusGroupType);
+        }
       }
     } catch (NotEnoughDataNodeException e) {
       String prompt = "ConfigNode failed to extend Region because there are not enough DataNodes";
@@ -418,6 +365,109 @@ public class PartitionManager {
     return result;
   }
 
+  private TSStatus customExtendRegionGroupIfNecessary(
+      Map<String, Integer> unassignedPartitionSlotsCountMap, TConsensusGroupType consensusGroupType)
+      throws StorageGroupNotExistsException, NotEnoughDataNodeException {
+
+    // CUSTOM policy: a Database that owns no RegionGroup yet receives
+    // DATA_REGION_GROUP_PER_DATABASE RegionGroups in a single batch; Databases that already own
+    // RegionGroups are never extended here. Only DataRegionGroups are routed here currently.
+    Map<String, Integer> allotmentMap = new ConcurrentHashMap<>(); // <StorageGroup, allotment>
+
+    for (String storageGroup : unassignedPartitionSlotsCountMap.keySet()) {
+      boolean ownsRegionGroups =
+          partitionInfo.getRegionGroupCount(storageGroup, consensusGroupType) != 0;
+      if (ownsRegionGroups) {
+        continue;
+      }
+      allotmentMap.put(storageGroup, DATA_REGION_GROUP_PER_DATABASE);
+    }
+
+    return generateAndAllocateRegionGroups(allotmentMap, consensusGroupType);
+  }
+
+  private TSStatus autoExtendRegionGroupIfNecessary(
+      Map<String, Integer> unassignedPartitionSlotsCountMap, TConsensusGroupType consensusGroupType)
+      throws NotEnoughDataNodeException, StorageGroupNotExistsException {
+    // AUTO policy: compute a RegionGroup allotment per StorageGroup, then create them all at once.
+    // Map<StorageGroup, Region allotment>
+    Map<String, Integer> allotmentMap = new ConcurrentHashMap<>();
+
+    for (Map.Entry<String, Integer> entry : unassignedPartitionSlotsCountMap.entrySet()) {
+      final String storageGroup = entry.getKey();
+      final int unassignedPartitionSlotsCount = entry.getValue();
+
+      float allocatedRegionGroupCount =
+          partitionInfo.getRegionGroupCount(storageGroup, consensusGroupType);
+      // slotCount = assigned SeriesPartitionSlots + slots still to be assigned in this request
+      float slotCount =
+          (float) partitionInfo.getAssignedSeriesPartitionSlotsCount(storageGroup)
+              + unassignedPartitionSlotsCount;
+      float maxRegionGroupCount =
+          getClusterSchemaManager().getMaxRegionGroupNum(storageGroup, consensusGroupType);
+      float maxSlotCount = CONF.getSeriesPartitionSlotNum();
+
+      /* A StorageGroup is extended by the first of the following cases that applies */
+      // 1. The StorageGroup owns fewer RegionGroups than the configured least number
+      int leastRegionGroupNum =
+          TConsensusGroupType.SchemaRegion.equals(consensusGroupType)
+              ? CONF.getLeastSchemaRegionGroupNum()
+              : CONF.getLeastDataRegionGroupNum();
+      if (allocatedRegionGroupCount < leastRegionGroupNum) {
+        // Top up to leastRegionGroupNum, but never create more RegionGroups than there are
+        // unassigned slots in this request (delta is 0 if this request assigns no new slot)
+        int delta =
+            (int)
+                Math.min(
+                    unassignedPartitionSlotsCount, leastRegionGroupNum - allocatedRegionGroupCount);
+        allotmentMap.put(storageGroup, delta);
+        continue;
+      }
+
+      // 2. After this allocation the average number of slots per RegionGroup would exceed the
+      // expected average maxSlotCount / maxRegionGroupCount
+      if (allocatedRegionGroupCount < maxRegionGroupCount
+          && slotCount / allocatedRegionGroupCount > maxSlotCount / maxRegionGroupCount) {
+        // delta is the smallest integer with slotCount / (allocated + delta) <= maxSlotCount /
+        // maxRegionGroupCount, clamped to [1, maxRegionGroupCount - allocatedRegionGroupCount]
+        int delta =
+            Math.min(
+                (int) (maxRegionGroupCount - allocatedRegionGroupCount),
+                Math.max(
+                    1,
+                    (int)
+                        Math.ceil(
+                            slotCount * maxRegionGroupCount / maxSlotCount
+                                - allocatedRegionGroupCount)));
+        allotmentMap.put(storageGroup, delta);
+        continue;
+      }
+
+      // 3. Every RegionGroup of the StorageGroup is Disabled: add one while below the maximum
+      if (allocatedRegionGroupCount
+              == filterRegionGroupThroughStatus(storageGroup, RegionGroupStatus.Disabled).size()
+          && allocatedRegionGroupCount < maxRegionGroupCount) {
+        allotmentMap.put(storageGroup, 1);
+      }
+    }
+
+    return generateAndAllocateRegionGroups(allotmentMap, consensusGroupType);
+  }
+
+  private TSStatus generateAndAllocateRegionGroups(
+      Map<String, Integer> allotmentMap, TConsensusGroupType consensusGroupType)
+      throws NotEnoughDataNodeException, StorageGroupNotExistsException {
+    // Nothing to create: report success without consulting the LoadManager
+    if (allotmentMap.isEmpty()) {
+      return RpcUtils.SUCCESS_STATUS;
+    }
+    CreateRegionGroupsPlan plan =
+        getLoadManager().allocateRegionGroups(allotmentMap, consensusGroupType);
+    LOGGER.info("[CreateRegionGroups] Starting to create the following RegionGroups:");
+    plan.planLog(LOGGER);
+    return getProcedureManager().createRegionGroups(consensusGroupType, plan);
+  }
+
   /**
    * Only leader use this interface. Checks whether the specified DataPartition has a predecessor
    * and returns if it does
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/MppConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/MppConfig.java
index 425f1c34ab..016d6490f4 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/MppConfig.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/MppConfig.java
@@ -221,6 +221,13 @@ public class MppConfig implements BaseConfig {
     return this;
   }
 
+  @Override
+  public BaseConfig setDataRegionGroupExtensionPolicy(String policy) {
+    final String propertyKey = "data_region_group_extension_policy";
+    confignodeProperties.setProperty(propertyKey, policy);
+    return this;
+  }
+
   @Override
   public BaseConfig setSchemaReplicationFactor(int schemaReplicationFactor) {
     confignodeProperties.setProperty(
diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseConfig.java b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseConfig.java
index f9bd94a661..5ece90edc5 100644
--- a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseConfig.java
+++ b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseConfig.java
@@ -231,6 +231,14 @@ public interface BaseConfig {
     return false;
   }
 
+  default BaseConfig setDataRegionGroupExtensionPolicy(String dataRegionGroupExtensionPolicy) {
+    return this;
+  }
+  /** Defaults to "AUTO", the ConfigNode's default DataRegionGroup extension policy. */
+  default String getDataRegionGroupExtensionPolicy() {
+    return "AUTO";
+  }
+
   default BaseConfig setSchemaReplicationFactor(int schemaReplicationFactor) {
     return this;
   }
diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBRegionGroupExtensionIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBRegionGroupExtensionIT.java
new file mode 100644
index 0000000000..f229f722e2
--- /dev/null
+++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBRegionGroupExtensionIT.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.it.partition;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
+import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
+import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
+import org.apache.iotdb.confignode.it.utils.ConfigNodeTestUtils;
+import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionTableResp;
+import org.apache.iotdb.confignode.rpc.thrift.TSetStorageGroupReq;
+import org.apache.iotdb.confignode.rpc.thrift.TShowRegionReq;
+import org.apache.iotdb.confignode.rpc.thrift.TShowRegionResp;
+import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchema;
+import org.apache.iotdb.consensus.ConsensusFactory;
+import org.apache.iotdb.it.env.ConfigFactory;
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.ClusterIT;
+import org.apache.iotdb.itbase.env.BaseConfig;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.apache.thrift.TException;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({ClusterIT.class})
+public class IoTDBRegionGroupExtensionIT {
+
+  private static final BaseConfig CONF = ConfigFactory.getConfig();
+
+  private static String originalDataRegionGroupExtensionPolicy;
+  private static final String testDataRegionGroupExtensionPolicy = "CUSTOM";
+
+  private static String originalSchemaRegionConsensusProtocolClass;
+  private static String originalDataRegionConsensusProtocolClass;
+  private static final String testConsensusProtocolClass = ConsensusFactory.RATIS_CONSENSUS;
+
+  private static int originalSchemaReplicationFactor;
+  private static int originalDataReplicationFactor;
+  private static final int testReplicationFactor = 3;
+
+  private static long originalTimePartitionInterval;
+
+  private static final String sg = "root.sg";
+
+  @Before
+  public void setUp() throws Exception {
+    originalSchemaRegionConsensusProtocolClass = CONF.getSchemaRegionConsensusProtocolClass();
+    originalDataRegionConsensusProtocolClass = CONF.getDataRegionConsensusProtocolClass();
+    CONF.setSchemaRegionConsensusProtocolClass(testConsensusProtocolClass);
+    CONF.setDataRegionConsensusProtocolClass(testConsensusProtocolClass);
+
+    originalSchemaReplicationFactor = CONF.getSchemaReplicationFactor();
+    originalDataReplicationFactor = CONF.getDataReplicationFactor();
+    CONF.setSchemaReplicationFactor(testReplicationFactor);
+    CONF.setDataReplicationFactor(testReplicationFactor);
+
+    originalTimePartitionInterval = CONF.getTimePartitionInterval();
+
+    originalDataRegionGroupExtensionPolicy = CONF.getDataRegionGroupExtensionPolicy();
+    CONF.setDataRegionGroupExtensionPolicy(testDataRegionGroupExtensionPolicy);
+
+    // Init 1C3D environment
+    EnvFactory.getEnv().initClusterEnvironment(1, 3);
+  }
+
+  @After
+  public void tearDown() {
+    EnvFactory.getEnv().cleanAfterClass();
+
+    CONF.setSchemaRegionConsensusProtocolClass(originalSchemaRegionConsensusProtocolClass);
+    CONF.setDataRegionConsensusProtocolClass(originalDataRegionConsensusProtocolClass);
+    CONF.setSchemaReplicationFactor(originalSchemaReplicationFactor);
+    CONF.setDataReplicationFactor(originalDataReplicationFactor);
+    CONF.setDataRegionGroupExtensionPolicy(originalDataRegionGroupExtensionPolicy);
+  }
+
+  @Test
+  public void testCustomDataRegionGroupExtensionPolicy()
+      throws IOException, InterruptedException, TException {
+    try (SyncConfigNodeIServiceClient client =
+        (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
+
+      /* Set StorageGroup */
+      TSetStorageGroupReq setReq = new TSetStorageGroupReq(new TStorageGroupSchema(sg));
+      TSStatus status = client.setStorageGroup(setReq);
+      Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
+
+      /* Insert a DataPartition to create DataRegionGroups */
+      Map<String, Map<TSeriesPartitionSlot, List<TTimePartitionSlot>>> partitionSlotsMap =
+          ConfigNodeTestUtils.constructPartitionSlotsMap(
+              sg, 0, 10, 0, 10, originalTimePartitionInterval);
+      TDataPartitionTableResp dataPartitionTableResp =
+          client.getOrCreateDataPartitionTable(new TDataPartitionReq(partitionSlotsMap));
+      Assert.assertEquals(
+          TSStatusCode.SUCCESS_STATUS.getStatusCode(),
+          dataPartitionTableResp.getStatus().getCode());
+
+      /* Check the number of DataRegionGroups */
+      TShowRegionResp showRegionReq = client.showRegion(new TShowRegionReq());
+      Assert.assertEquals(
+          TSStatusCode.SUCCESS_STATUS.getStatusCode(), showRegionReq.getStatus().getCode());
+      AtomicInteger regionCount = new AtomicInteger(0);
+      showRegionReq.getRegionInfoList().forEach(regionInfo -> regionCount.getAndIncrement());
+      Assert.assertEquals(10 * testReplicationFactor, regionCount.get());
+    }
+  }
+}
diff --git a/node-commons/src/assembly/resources/conf/iotdb-common.properties b/node-commons/src/assembly/resources/conf/iotdb-common.properties
index 4bcb805431..f1ed03d725 100644
--- a/node-commons/src/assembly/resources/conf/iotdb-common.properties
+++ b/node-commons/src/assembly/resources/conf/iotdb-common.properties
@@ -56,13 +56,13 @@
 # data_region_consensus_protocol_class=org.apache.iotdb.consensus.iot.IoTConsensus
 
 ####################
-### Partition (Load balancing) configuration
+### Load balancing configuration
 ####################
 
 # All parameters in Partition configuration is unmodifiable after ConfigNode starts for the first time.
 # And these parameters should be consistent within the ConfigNodeGroup.
-# Number of SeriesPartitionSlots per StorageGroup
-# Datatype: int
+# Number of SeriesPartitionSlots per Database
+# Datatype: Integer
 # series_partition_slot_num=10000
 
 # SeriesPartitionSlot executor class
@@ -73,20 +73,35 @@
 # 4. SDBMHashExecutor
 # Also, if you want to implement your own SeriesPartition executor, you can inherit the SeriesPartitionExecutor class and
 # modify this parameter to correspond to your Java class
-# Datatype: string
+# Datatype: String
 # series_partition_executor_class=org.apache.iotdb.commons.partition.executor.hash.BKDRHashExecutor
 
-# The maximum number of SchemaRegion expected to be managed by each DataNode.
-# Notice: Since each StorageGroup requires at least one SchemaRegion to manage its schema,
-# this parameter doesn't limit the number of SchemaRegions when there are too many StorageGroups.
-# Datatype: double
-# schema_region_per_data_node=1.0
 
-# The maximum number of DataRegion expected to be managed by each processor.
-# Notice: Since each StorageGroup requires at least two DataRegions to manage its data,
-# this parameter doesn't limit the number of DataRegions when there are too many StorageGroups.
-# Datatype: double
-# data_region_per_processor=0.5
+# The maximum number of SchemaRegions expected to be managed by each DataNode.
+# Notice: Since each Database requires at least one SchemaRegionGroup to manage its schema,
+# this parameter doesn't limit the number of SchemaRegions when there are too many Databases.
+# Default is equal to the schema_replication_factor.
+# Datatype: Double
+# schema_region_per_data_node=
+
+# The policy for extending the DataRegionGroups of each Database.
+# These policies are currently supported:
+# 1. CUSTOM(Each Database allocates data_region_group_per_database DataRegionGroups as soon as it is created)
+# 2. AUTO(Default, each Database automatically extends its DataRegionGroups based on the data it holds)
+# Datatype: String
+# data_region_group_extension_policy=AUTO
+
+# The number of DataRegionGroups for each Database when using CUSTOM data_region_group_extension_policy.
+# Notice: Each Database allocates data_region_group_per_database DataRegionGroups as soon as it is created.
+# Datatype: Integer
+# data_region_group_per_database=10
+
+# The maximum number of DataRegions expected to be managed by each processor
+# when using AUTO data_region_group_extension_policy.
+# Notice: Since each Database requires at least two DataRegionGroups to manage its data,
+# this parameter doesn't limit the number of DataRegions when there are too many Databases.
+# Datatype: Double
+# data_region_per_processor=1.0
 
 # The least number of DataRegionGroup for each StorageGroup.
 # The ConfigNode-leader will create a DataRegionGroup for each newborn SeriesPartitionSlot
@@ -95,25 +110,27 @@
 # Datatype: int
 # least_data_region_group_num=5
 
+
 # Whether to enable the DataPartition inherit policy.
 # DataPartition within the same SeriesPartitionSlot will inherit
 # the allocation result of the previous TimePartitionSlot if set true
 # Datatype: Boolean
 # enable_data_partition_inherit_policy=false
 
+
 # The policy of cluster RegionGroups' leader distribution.
 # E.g. we should balance cluster RegionGroups' leader distribution when some DataNodes are shutdown or re-connected.
 # These policies are currently supported:
 # 1. GREEDY(Distribute leader through a simple greedy algorithm, might cause unbalance)
 # 2. MIN_COST_FLOW(Default, distribute leader through min cost flow algorithm)
-# Datatype: string
+# Datatype: String
 # leader_distribution_policy=MIN_COST_FLOW
 
 # Whether to enable auto leader balance for Ratis consensus protocol.
 # The ConfigNode-leader will balance the leader of Ratis-RegionGroups by leader_distribution_policy if set true.
 # Notice: Default is false because the Ratis is unstable for this function.
 # Datatype: Boolean
-# enable_auto_leader_balance_for_ratis=false
+# enable_auto_leader_balance_for_ratis_consensus=false
 
 # Whether to enable auto leader balance for IoTConsensus protocol.
 # The ConfigNode-leader will balance the leader of IoTConsensus-RegionGroups by leader_distribution_policy if set true.