You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ca...@apache.org on 2022/08/18 12:29:03 UTC

[iotdb] branch master updated: [IOTDB-4092] Protecting Region creation process by adding CreateRegionGroupsProcedure (#7006)

This is an automated email from the ASF dual-hosted git repository.

caogaofei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 544c60ff87 [IOTDB-4092] Protecting Region creation process by adding CreateRegionGroupsProcedure (#7006)
544c60ff87 is described below

commit 544c60ff877ecc0c7e7b2924814572adfacdc228
Author: YongzaoDan <33...@users.noreply.github.com>
AuthorDate: Thu Aug 18 20:28:58 2022 +0800

    [IOTDB-4092] Protecting Region creation process by adding CreateRegionGroupsProcedure (#7006)
---
 .../async/datanode/AsyncDataNodeClientPool.java    |  68 +++++++----
 .../sync/datanode/SyncDataNodeClientPool.java      |  13 +-
 .../consensus/request/ConfigPhysicalPlan.java      |   6 +-
 .../consensus/request/ConfigPhysicalPlanType.java  |   2 +-
 .../request/write/CreateRegionGroupsPlan.java      |  11 +-
 ...egionsPlan.java => DeleteRegionGroupsPlan.java} |  48 ++++----
 .../iotdb/confignode/manager/ConfigManager.java    |  17 +--
 .../iotdb/confignode/manager/ConsensusManager.java |  26 ++++
 .../iotdb/confignode/manager/PartitionManager.java |  67 ++++++----
 .../iotdb/confignode/manager/ProcedureManager.java |  57 +++++++--
 .../iotdb/confignode/manager/load/LoadManager.java |  28 ++---
 .../persistence/executor/ConfigPlanExecutor.java   |  63 +++++-----
 .../persistence/partition/PartitionInfo.java       |  47 +++++--
 .../partition/StorageGroupPartitionTable.java      |   9 ++
 .../procedure/env/ConfigNodeProcedureEnv.java      |  50 ++++++++
 .../impl/CreateRegionGroupsProcedure.java          | 135 +++++++++++++++++++++
 .../procedure/state/CreateRegionGroupsState.java   |  27 +++++
 .../procedure/store/ProcedureFactory.java          |   6 +-
 .../request/ConfigPhysicalPlanSerDeTest.java       |  41 +++++--
 19 files changed, 538 insertions(+), 183 deletions(-)

diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/datanode/AsyncDataNodeClientPool.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/datanode/AsyncDataNodeClientPool.java
index 08bca91b6f..cfc0df60fc 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/datanode/AsyncDataNodeClientPool.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/datanode/AsyncDataNodeClientPool.java
@@ -19,6 +19,7 @@
 package org.apache.iotdb.confignode.client.async.datanode;
 
 import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TFlushReq;
@@ -50,6 +51,7 @@ import org.apache.iotdb.mpp.rpc.thrift.TUpdateConfigNodeGroupReq;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -231,18 +233,18 @@ public class AsyncDataNodeClientPool {
   }
 
   /**
-   * Execute CreateRegionsReq asynchronously
+   * Execute CreateRegionGroupsPlan asynchronously
    *
-   * @param createRegionGroupsPlan CreateRegionsReq
    * @param ttlMap Map<StorageGroupName, TTL>
+   * @return Those RegionGroups that failed to create
    */
-  public void createRegions(
+  public Map<TConsensusGroupId, TRegionReplicaSet> createRegionGroups(
       CreateRegionGroupsPlan createRegionGroupsPlan, Map<String, Long> ttlMap) {
     // Because different requests will be sent to the same node when createRegions,
     // so for CreateRegions use Map<index, TDataNodeLocation>
     Map<Integer, TDataNodeLocation> dataNodeLocationMap = new ConcurrentHashMap<>();
     int index = 0;
-    // Count the datanodes to be sent
+    // Count the DataNodes to be sent
     for (List<TRegionReplicaSet> regionReplicaSets :
         createRegionGroupsPlan.getRegionGroupMap().values()) {
       for (TRegionReplicaSet regionReplicaSet : regionReplicaSets) {
@@ -252,7 +254,7 @@ public class AsyncDataNodeClientPool {
       }
     }
     if (dataNodeLocationMap.isEmpty()) {
-      return;
+      return new HashMap<>();
     }
     for (int retry = 0; retry < retryNum; retry++) {
       index = 0;
@@ -298,7 +300,7 @@ public class AsyncDataNodeClientPool {
                       retry);
                   break;
                 default:
-                  return;
+                  break;
               }
             } else {
               index++;
@@ -317,6 +319,43 @@ public class AsyncDataNodeClientPool {
         break;
       }
     }
+
+    // Filter RegionGroups that weren't created successfully
+    index = 0;
+    Map<TConsensusGroupId, TRegionReplicaSet> failedRegions = new HashMap<>();
+    for (List<TRegionReplicaSet> regionReplicaSets :
+        createRegionGroupsPlan.getRegionGroupMap().values()) {
+      for (TRegionReplicaSet regionReplicaSet : regionReplicaSets) {
+        for (TDataNodeLocation dataNodeLocation : regionReplicaSet.getDataNodeLocations()) {
+          if (dataNodeLocationMap.containsKey(index)) {
+            failedRegions
+                .computeIfAbsent(
+                    regionReplicaSet.getRegionId(),
+                    empty -> new TRegionReplicaSet().setRegionId(regionReplicaSet.getRegionId()))
+                .addToDataNodeLocations(dataNodeLocation);
+          }
+          index += 1;
+        }
+      }
+    }
+    return failedRegions;
+  }
+
+  private TCreateSchemaRegionReq genCreateSchemaRegionReq(
+      String storageGroup, TRegionReplicaSet regionReplicaSet) {
+    TCreateSchemaRegionReq req = new TCreateSchemaRegionReq();
+    req.setStorageGroup(storageGroup);
+    req.setRegionReplicaSet(regionReplicaSet);
+    return req;
+  }
+
+  private TCreateDataRegionReq genCreateDataRegionReq(
+      String storageGroup, TRegionReplicaSet regionReplicaSet, long TTL) {
+    TCreateDataRegionReq req = new TCreateDataRegionReq();
+    req.setStorageGroup(storageGroup);
+    req.setRegionReplicaSet(regionReplicaSet);
+    req.setTtl(TTL);
+    return req;
   }
 
   /**
@@ -341,23 +380,6 @@ public class AsyncDataNodeClientPool {
     }
   }
 
-  private TCreateSchemaRegionReq genCreateSchemaRegionReq(
-      String storageGroup, TRegionReplicaSet regionReplicaSet) {
-    TCreateSchemaRegionReq req = new TCreateSchemaRegionReq();
-    req.setStorageGroup(storageGroup);
-    req.setRegionReplicaSet(regionReplicaSet);
-    return req;
-  }
-
-  private TCreateDataRegionReq genCreateDataRegionReq(
-      String storageGroup, TRegionReplicaSet regionReplicaSet, long TTL) {
-    TCreateDataRegionReq req = new TCreateDataRegionReq();
-    req.setStorageGroup(storageGroup);
-    req.setRegionReplicaSet(regionReplicaSet);
-    req.setTtl(TTL);
-    return req;
-  }
-
   /**
    * Always call this interface when a DataNode is restarted or removed
    *
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/datanode/SyncDataNodeClientPool.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/datanode/SyncDataNodeClientPool.java
index 792ffd4640..85c21f56e3 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/datanode/SyncDataNodeClientPool.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/datanode/SyncDataNodeClientPool.java
@@ -134,14 +134,21 @@ public class SyncDataNodeClientPool {
       List<TConsensusGroupId> regionIds,
       Set<TRegionReplicaSet> deletedRegionSet) {
     for (TConsensusGroupId regionId : regionIds) {
-      LOGGER.debug("Delete region {} ", regionId);
+      LOGGER.info("Try to delete RegionReplica: {} on DataNode: {}", regionId, endPoint);
       final TSStatus status =
           sendSyncRequestToDataNodeWithRetry(
               endPoint, regionId, DataNodeRequestType.DELETE_REGIONS);
+
       if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-        LOGGER.info("DELETE Region {} successfully", regionId);
-        deletedRegionSet.removeIf(k -> k.getRegionId().equals(regionId));
+        LOGGER.info("Delete RegionReplica: {} on DataNode: {} successfully", regionId, endPoint);
+      } else {
+        LOGGER.warn(
+            "Failed to delete RegionReplica: {} on DataNode: {}. You might need to delete it manually",
+            regionId,
+            endPoint);
       }
+
+      deletedRegionSet.removeIf(k -> k.getRegionId().equals(regionId));
     }
   }
 
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java
index 92b03b8f1f..701fcdf67f 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java
@@ -40,7 +40,7 @@ import org.apache.iotdb.confignode.consensus.request.write.CreateFunctionPlan;
 import org.apache.iotdb.confignode.consensus.request.write.CreateRegionGroupsPlan;
 import org.apache.iotdb.confignode.consensus.request.write.CreateSchemaPartitionPlan;
 import org.apache.iotdb.confignode.consensus.request.write.DeleteProcedurePlan;
-import org.apache.iotdb.confignode.consensus.request.write.DeleteRegionsPlan;
+import org.apache.iotdb.confignode.consensus.request.write.DeleteRegionGroupsPlan;
 import org.apache.iotdb.confignode.consensus.request.write.DeleteStorageGroupPlan;
 import org.apache.iotdb.confignode.consensus.request.write.DropFunctionPlan;
 import org.apache.iotdb.confignode.consensus.request.write.PreDeleteStorageGroupPlan;
@@ -143,8 +143,8 @@ public abstract class ConfigPhysicalPlan implements IConsensusRequest {
         case CreateRegionGroups:
           req = new CreateRegionGroupsPlan();
           break;
-        case DeleteRegions:
-          req = new DeleteRegionsPlan();
+        case DeleteRegionGroups:
+          req = new DeleteRegionGroupsPlan();
           break;
         case GetSchemaPartition:
           req = new GetSchemaPartitionPlan();
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java
index aa08063bd5..c579f1d3f3 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java
@@ -32,7 +32,7 @@ public enum ConfigPhysicalPlanType {
   GetStorageGroup,
   CountStorageGroup,
   CreateRegionGroups,
-  DeleteRegions,
+  DeleteRegionGroups,
   GetSchemaPartition,
   CreateSchemaPartition,
   GetOrCreateSchemaPartition,
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/CreateRegionGroupsPlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/CreateRegionGroupsPlan.java
index fa0d288d72..ca7e3fde39 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/CreateRegionGroupsPlan.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/CreateRegionGroupsPlan.java
@@ -28,21 +28,26 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Objects;
-import java.util.TreeMap;
 
 /** Create regions for specific StorageGroups */
 public class CreateRegionGroupsPlan extends ConfigPhysicalPlan {
 
   // Map<StorageGroupName, List<TRegionReplicaSet>>
-  private final Map<String, List<TRegionReplicaSet>> regionGroupMap;
+  protected final Map<String, List<TRegionReplicaSet>> regionGroupMap;
 
   public CreateRegionGroupsPlan() {
     super(ConfigPhysicalPlanType.CreateRegionGroups);
-    this.regionGroupMap = new TreeMap<>();
+    this.regionGroupMap = new HashMap<>();
+  }
+
+  public CreateRegionGroupsPlan(ConfigPhysicalPlanType type) {
+    super(type);
+    this.regionGroupMap = new HashMap<>();
   }
 
   public Map<String, List<TRegionReplicaSet>> getRegionGroupMap() {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/DeleteRegionsPlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/DeleteRegionGroupsPlan.java
similarity index 55%
rename from confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/DeleteRegionsPlan.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/DeleteRegionGroupsPlan.java
index 83a00bc58c..f1a4da0332 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/DeleteRegionsPlan.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/DeleteRegionGroupsPlan.java
@@ -18,62 +18,60 @@
  */
 package org.apache.iotdb.confignode.consensus.request.write;
 
-import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.commons.utils.BasicStructureSerDeUtil;
 import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils;
-import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan;
 import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType;
 
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 
-public class DeleteRegionsPlan extends ConfigPhysicalPlan {
+public class DeleteRegionGroupsPlan extends CreateRegionGroupsPlan {
 
-  private final Map<String, List<TConsensusGroupId>> deleteRegionMap;
+  boolean needsDeleteInPartitionTable = true;
 
-  public DeleteRegionsPlan() {
-    super(ConfigPhysicalPlanType.DeleteRegions);
-    this.deleteRegionMap = new HashMap<>();
+  public DeleteRegionGroupsPlan() {
+    super(ConfigPhysicalPlanType.DeleteRegionGroups);
   }
 
-  public void addDeleteRegion(String name, TConsensusGroupId consensusGroupId) {
-    deleteRegionMap.computeIfAbsent(name, empty -> new ArrayList<>()).add(consensusGroupId);
+  public boolean isNeedsDeleteInPartitionTable() {
+    return needsDeleteInPartitionTable;
   }
 
-  public Map<String, List<TConsensusGroupId>> getDeleteRegionMap() {
-    return deleteRegionMap;
+  public void setNeedsDeleteInPartitionTable(boolean needsDeleteInPartitionTable) {
+    this.needsDeleteInPartitionTable = needsDeleteInPartitionTable;
   }
 
   @Override
   protected void serializeImpl(DataOutputStream stream) throws IOException {
-    stream.writeInt(ConfigPhysicalPlanType.DeleteRegions.ordinal());
+    stream.writeInt(ConfigPhysicalPlanType.DeleteRegionGroups.ordinal());
 
-    stream.writeInt(deleteRegionMap.size());
-    for (Map.Entry<String, List<TConsensusGroupId>> consensusGroupIdsEntry :
-        deleteRegionMap.entrySet()) {
-      BasicStructureSerDeUtil.write(consensusGroupIdsEntry.getKey(), stream);
-      stream.writeInt(consensusGroupIdsEntry.getValue().size());
-      for (TConsensusGroupId consensusGroupId : consensusGroupIdsEntry.getValue()) {
-        ThriftCommonsSerDeUtils.serializeTConsensusGroupId(consensusGroupId, stream);
+    stream.writeByte(needsDeleteInPartitionTable ? 1 : 0);
+    stream.writeInt(regionGroupMap.size());
+    for (Map.Entry<String, List<TRegionReplicaSet>> entry : regionGroupMap.entrySet()) {
+      BasicStructureSerDeUtil.write(entry.getKey(), stream);
+      stream.writeInt(entry.getValue().size());
+      for (TRegionReplicaSet regionReplicaSet : entry.getValue()) {
+        ThriftCommonsSerDeUtils.serializeTRegionReplicaSet(regionReplicaSet, stream);
       }
     }
   }
 
   @Override
   protected void deserializeImpl(ByteBuffer buffer) throws IOException {
+    needsDeleteInPartitionTable = buffer.get() > 0;
     int length = buffer.getInt();
     for (int i = 0; i < length; i++) {
       String name = BasicStructureSerDeUtil.readString(buffer);
-      deleteRegionMap.put(name, new ArrayList<>());
+      regionGroupMap.put(name, new ArrayList<>());
       int regionNum = buffer.getInt();
       for (int j = 0; j < regionNum; j++) {
-        deleteRegionMap.get(name).add(ThriftCommonsSerDeUtils.deserializeTConsensusGroupId(buffer));
+        regionGroupMap.get(name).add(ThriftCommonsSerDeUtils.deserializeTRegionReplicaSet(buffer));
       }
     }
   }
@@ -82,12 +80,12 @@ public class DeleteRegionsPlan extends ConfigPhysicalPlan {
   public boolean equals(Object o) {
     if (this == o) return true;
     if (o == null || getClass() != o.getClass()) return false;
-    DeleteRegionsPlan that = (DeleteRegionsPlan) o;
-    return deleteRegionMap.equals(that.deleteRegionMap);
+    DeleteRegionGroupsPlan that = (DeleteRegionGroupsPlan) o;
+    return regionGroupMap.equals(that.regionGroupMap);
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(deleteRegionMap);
+    return Objects.hash(regionGroupMap);
   }
 }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index 8a825ca760..295cad7127 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -616,22 +616,7 @@ public class ConfigManager implements IManager {
   }
 
   private TSStatus confirmLeader() {
-    TSStatus result = new TSStatus();
-
-    if (getConsensusManager().isLeader()) {
-      return result.setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode());
-    } else {
-      result.setCode(TSStatusCode.NEED_REDIRECTION.getStatusCode());
-      result.setMessage(
-          "The current ConfigNode is not leader, please redirect to a new ConfigNode.");
-
-      TConfigNodeLocation leaderLocation = consensusManager.getLeader();
-      if (leaderLocation != null) {
-        result.setRedirectNode(leaderLocation.getInternalEndPoint());
-      }
-
-      return result;
-    }
+    return getConsensusManager().confirmLeader();
   }
 
   @Override
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java
index f3d57f31ff..3ba78e601d 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.confignode.manager;
 
 import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
 import org.apache.iotdb.commons.consensus.PartitionRegionId;
 import org.apache.iotdb.commons.exception.BadNodeUrlException;
@@ -36,6 +37,7 @@ import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
 import org.apache.iotdb.consensus.config.ConsensusConfig;
+import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -202,6 +204,30 @@ public class ConsensusManager {
     return null;
   }
 
+  /**
+   * Confirm the current ConfigNode's leadership
+   *
+   * @return SUCCESS_STATUS if the current ConfigNode is leader, NEED_REDIRECTION otherwise
+   */
+  public TSStatus confirmLeader() {
+    TSStatus result = new TSStatus();
+
+    if (isLeader()) {
+      return result.setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+    } else {
+      result.setCode(TSStatusCode.NEED_REDIRECTION.getStatusCode());
+      result.setMessage(
+          "The current ConfigNode is not leader, please redirect to a new ConfigNode.");
+
+      TConfigNodeLocation leaderLocation = getLeader();
+      if (leaderLocation != null) {
+        result.setRedirectNode(leaderLocation.getInternalEndPoint());
+      }
+
+      return result;
+    }
+  }
+
   public ConsensusGroupId getConsensusGroupId() {
     return consensusGroupId;
   }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java
index c66e9a72ae..6eba977594 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java
@@ -40,6 +40,7 @@ import org.apache.iotdb.confignode.consensus.request.read.GetOrCreateSchemaParti
 import org.apache.iotdb.confignode.consensus.request.read.GetRegionInfoListPlan;
 import org.apache.iotdb.confignode.consensus.request.read.GetSchemaPartitionPlan;
 import org.apache.iotdb.confignode.consensus.request.write.CreateDataPartitionPlan;
+import org.apache.iotdb.confignode.consensus.request.write.CreateRegionGroupsPlan;
 import org.apache.iotdb.confignode.consensus.request.write.CreateSchemaPartitionPlan;
 import org.apache.iotdb.confignode.consensus.request.write.PreDeleteStorageGroupPlan;
 import org.apache.iotdb.confignode.consensus.request.write.UpdateRegionLocationPlan;
@@ -54,6 +55,7 @@ import org.apache.iotdb.confignode.persistence.partition.PartitionInfo;
 import org.apache.iotdb.consensus.ConsensusFactory;
 import org.apache.iotdb.consensus.common.DataSet;
 import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
+import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.tsfile.utils.Pair;
 
@@ -177,13 +179,22 @@ public class PartitionManager {
         return resp;
       }
 
-      // Allocate SchemaPartitions
-      Map<String, SchemaPartitionTable> assignedSchemaPartition =
-          getLoadManager().allocateSchemaPartition(unassignedSchemaPartitionSlotsMap);
-      // Cache allocating result
-      CreateSchemaPartitionPlan createPlan = new CreateSchemaPartitionPlan();
-      createPlan.setAssignedSchemaPartition(assignedSchemaPartition);
-      getConsensusManager().write(createPlan);
+      status = getConsensusManager().confirmLeader();
+      if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        // Here we check the leadership second time
+        // since the RegionGroup creating process might take some time
+        resp.setStatus(status);
+        return resp;
+      } else {
+        // Allocate SchemaPartitions only if
+        // the current ConfigNode still holds its leadership
+        Map<String, SchemaPartitionTable> assignedSchemaPartition =
+            getLoadManager().allocateSchemaPartition(unassignedSchemaPartitionSlotsMap);
+        // Cache allocating result
+        CreateSchemaPartitionPlan createPlan = new CreateSchemaPartitionPlan();
+        createPlan.setAssignedSchemaPartition(assignedSchemaPartition);
+        getConsensusManager().write(createPlan);
+      }
     }
 
     return getSchemaPartition(req);
@@ -242,13 +253,22 @@ public class PartitionManager {
         return resp;
       }
 
-      // Allocate DataPartitions
-      Map<String, DataPartitionTable> assignedDataPartition =
-          getLoadManager().allocateDataPartition(unassignedDataPartitionSlotsMap);
-      // Cache allocating result
-      CreateDataPartitionPlan createPlan = new CreateDataPartitionPlan();
-      createPlan.setAssignedDataPartition(assignedDataPartition);
-      getConsensusManager().write(createPlan);
+      status = getConsensusManager().confirmLeader();
+      if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        // Here we check the leadership second time
+        // since the RegionGroup creating process might take some time
+        resp.setStatus(status);
+        return resp;
+      } else {
+        // Allocate DataPartitions only if
+        // the current ConfigNode still holds its leadership
+        Map<String, DataPartitionTable> assignedDataPartition =
+            getLoadManager().allocateDataPartition(unassignedDataPartitionSlotsMap);
+        // Cache allocating result
+        CreateDataPartitionPlan createPlan = new CreateDataPartitionPlan();
+        createPlan.setAssignedDataPartition(assignedDataPartition);
+        getConsensusManager().write(createPlan);
+      }
     }
 
     return getDataPartition(req);
@@ -316,13 +336,13 @@ public class PartitionManager {
         }
       }
 
-      // TODO: Use procedure to protect the following process
       if (!allotmentMap.isEmpty()) {
-        // Do Region allocation and creation for StorageGroups based on the allotment
-        getLoadManager().doRegionCreation(allotmentMap, consensusGroupType);
+        CreateRegionGroupsPlan createRegionGroupsPlan =
+            getLoadManager().allocateRegionGroups(allotmentMap, consensusGroupType);
+        result = getProcedureManager().createRegionGroups(createRegionGroupsPlan);
+      } else {
+        result = RpcUtils.SUCCESS_STATUS;
       }
-
-      result.setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode());
     } catch (NotEnoughDataNodeException e) {
       LOGGER.error("ConfigNode failed to extend Region because there are not enough DataNodes");
       result.setCode(TSStatusCode.NOT_ENOUGH_DATA_NODE.getStatusCode());
@@ -467,10 +487,7 @@ public class PartitionManager {
     return partitionInfo.getRegionStorageGroup(regionId);
   }
 
-  /**
-   * Called by {@link PartitionManager#regionCleaner} Delete regions of logical deleted storage
-   * groups periodically.
-   */
+  /** Called by {@link PartitionManager#regionCleaner} Delete RegionGroups periodically. */
   public void clearDeletedRegions() {
     if (getConsensusManager().isLeader()) {
       final Set<TRegionReplicaSet> deletedRegionSet = partitionInfo.getDeletedRegionSet();
@@ -565,4 +582,8 @@ public class PartitionManager {
   private LoadManager getLoadManager() {
     return configManager.getLoadManager();
   }
+
+  private ProcedureManager getProcedureManager() {
+    return configManager.getProcedureManager();
+  }
 }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
index addbeb65bc..61027c1343 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.utils.StatusUtils;
 import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
 import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
+import org.apache.iotdb.confignode.consensus.request.write.CreateRegionGroupsPlan;
 import org.apache.iotdb.confignode.consensus.request.write.RemoveConfigNodePlan;
 import org.apache.iotdb.confignode.consensus.request.write.RemoveDataNodePlan;
 import org.apache.iotdb.confignode.persistence.ProcedureInfo;
@@ -30,6 +31,7 @@ import org.apache.iotdb.confignode.procedure.Procedure;
 import org.apache.iotdb.confignode.procedure.ProcedureExecutor;
 import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
 import org.apache.iotdb.confignode.procedure.impl.AddConfigNodeProcedure;
+import org.apache.iotdb.confignode.procedure.impl.CreateRegionGroupsProcedure;
 import org.apache.iotdb.confignode.procedure.impl.DeleteStorageGroupProcedure;
 import org.apache.iotdb.confignode.procedure.impl.RegionMigrateProcedure;
 import org.apache.iotdb.confignode.procedure.impl.RemoveConfigNodeProcedure;
@@ -43,11 +45,13 @@ import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
 import org.apache.iotdb.confignode.rpc.thrift.TRegionMigrateResultReportReq;
 import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchema;
 import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
@@ -98,15 +102,15 @@ public class ProcedureManager {
   }
 
   public TSStatus deleteStorageGroups(ArrayList<TStorageGroupSchema> deleteSgSchemaList) {
-    List<Long> procIdList = new ArrayList<>();
+    List<Long> procedureIds = new ArrayList<>();
     for (TStorageGroupSchema storageGroupSchema : deleteSgSchemaList) {
       DeleteStorageGroupProcedure deleteStorageGroupProcedure =
           new DeleteStorageGroupProcedure(storageGroupSchema);
-      long procId = this.executor.submitProcedure(deleteStorageGroupProcedure);
-      procIdList.add(procId);
+      long procedureId = this.executor.submitProcedure(deleteStorageGroupProcedure);
+      procedureIds.add(procedureId);
     }
     List<TSStatus> procedureStatus = new ArrayList<>();
-    boolean isSucceed = getProcedureStatus(this.executor, procIdList, procedureStatus);
+    boolean isSucceed = waitingProcedureFinished(procedureIds, procedureStatus);
     // clear the previously deleted regions
     final PartitionManager partitionManager = getConfigManager().getPartitionManager();
     partitionManager.getRegionCleaner().submit(partitionManager::clearDeletedRegions);
@@ -146,24 +150,51 @@ public class ProcedureManager {
     return true;
   }
 
-  private static boolean getProcedureStatus(
-      ProcedureExecutor executor, List<Long> procIds, List<TSStatus> statusList) {
+  /**
+   * Generate CreateRegionGroupsProcedure and wait for it finished
+   *
+   * @return SUCCESS_STATUS if all RegionGroups created successfully, CREATE_REGION_ERROR otherwise
+   */
+  public TSStatus createRegionGroups(CreateRegionGroupsPlan createRegionGroupsPlan) {
+    long procedureId =
+        executor.submitProcedure(new CreateRegionGroupsProcedure(createRegionGroupsPlan));
+    List<TSStatus> statusList = new ArrayList<>();
+    boolean isSucceed =
+        waitingProcedureFinished(Collections.singletonList(procedureId), statusList);
+    if (isSucceed) {
+      return RpcUtils.SUCCESS_STATUS;
+    } else {
+      return new TSStatus(TSStatusCode.CREATE_REGION_ERROR.getStatusCode())
+          .setMessage(statusList.get(0).getMessage());
+    }
+  }
+
+  /**
+   * Waiting until the specific procedures finished
+   *
+   * @param procedureIds The specific procedures' index
+   * @param statusList The corresponding running results of these procedures
+   * @return True if all Procedures finished successfully, false otherwise
+   */
+  private boolean waitingProcedureFinished(List<Long> procedureIds, List<TSStatus> statusList) {
     boolean isSucceed = true;
-    for (long procId : procIds) {
-      long startTimeForProcId = System.currentTimeMillis();
+    for (long procedureId : procedureIds) {
+      long startTimeForCurrentProcedure = System.currentTimeMillis();
       while (executor.isRunning()
-          && !executor.isFinished(procId)
-          && TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - startTimeForProcId)
+          && !executor.isFinished(procedureId)
+          && TimeUnit.MILLISECONDS.toSeconds(
+                  System.currentTimeMillis() - startTimeForCurrentProcedure)
               < procedureWaitTimeOut) {
         sleepWithoutInterrupt(procedureWaitRetryTimeout);
       }
-      Procedure finishedProc = executor.getResultOrProcedure(procId);
-      if (finishedProc.isSuccess()) {
+      Procedure<ConfigNodeProcedureEnv> finishedProcedure =
+          executor.getResultOrProcedure(procedureId);
+      if (finishedProcedure.isSuccess()) {
         statusList.add(StatusUtils.OK);
       } else {
         statusList.add(
             StatusUtils.EXECUTE_STATEMENT_ERROR.setMessage(
-                finishedProc.getException().getMessage()));
+                finishedProcedure.getException().getMessage()));
         isSucceed = false;
       }
     }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
index abebafa3c8..27b0a14ef5 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
@@ -51,7 +51,6 @@ import org.apache.iotdb.mpp.rpc.thrift.TRegionRouteReq;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -98,31 +97,18 @@ public class LoadManager {
   }
 
   /**
-   * Allocate and create Regions for each StorageGroup.
+   * Generate an optimal CreateRegionGroupsPlan
    *
    * @param allotmentMap Map<StorageGroupName, Region allotment>
-   * @param consensusGroupType TConsensusGroupType of Region to be allocated
+   * @param consensusGroupType TConsensusGroupType of RegionGroup to be allocated
+   * @return CreateRegionGroupsPlan
+   * @throws NotEnoughDataNodeException If there are not enough DataNodes
+   * @throws StorageGroupNotExistsException If some specific StorageGroups don't exist
    */
-  public void doRegionCreation(
+  public CreateRegionGroupsPlan allocateRegionGroups(
       Map<String, Integer> allotmentMap, TConsensusGroupType consensusGroupType)
       throws NotEnoughDataNodeException, StorageGroupNotExistsException {
-    CreateRegionGroupsPlan createRegionGroupsPlan =
-        regionBalancer.genRegionsAllocationPlan(allotmentMap, consensusGroupType);
-
-    // TODO: Use procedure to protect the following process
-    // Create Regions on DataNodes
-    Map<String, Long> ttlMap = new HashMap<>();
-    for (String storageGroup : createRegionGroupsPlan.getRegionGroupMap().keySet()) {
-      ttlMap.put(
-          storageGroup,
-          getClusterSchemaManager().getStorageGroupSchemaByName(storageGroup).getTTL());
-    }
-    AsyncDataNodeClientPool.getInstance().createRegions(createRegionGroupsPlan, ttlMap);
-
-    // Persist the allocation result
-    getConsensusManager().write(createRegionGroupsPlan);
-    // Broadcast the latest RegionRouteMap
-    broadcastLatestRegionRouteMap();
+    return regionBalancer.genRegionsAllocationPlan(allotmentMap, consensusGroupType);
   }
 
   /**
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
index b37baa8f66..83bd5c38f4 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
@@ -42,6 +42,7 @@ import org.apache.iotdb.confignode.consensus.request.write.CreateFunctionPlan;
 import org.apache.iotdb.confignode.consensus.request.write.CreateRegionGroupsPlan;
 import org.apache.iotdb.confignode.consensus.request.write.CreateSchemaPartitionPlan;
 import org.apache.iotdb.confignode.consensus.request.write.DeleteProcedurePlan;
+import org.apache.iotdb.confignode.consensus.request.write.DeleteRegionGroupsPlan;
 import org.apache.iotdb.confignode.consensus.request.write.DeleteStorageGroupPlan;
 import org.apache.iotdb.confignode.consensus.request.write.DropFunctionPlan;
 import org.apache.iotdb.confignode.consensus.request.write.PreDeleteStorageGroupPlan;
@@ -157,44 +158,50 @@ public class ConfigPlanExecutor {
     }
   }
 
-  public TSStatus executeNonQueryPlan(ConfigPhysicalPlan req)
+  public TSStatus executeNonQueryPlan(ConfigPhysicalPlan physicalPlan)
       throws UnknownPhysicalPlanTypeException, AuthException {
-    switch (req.getType()) {
+    switch (physicalPlan.getType()) {
       case RegisterDataNode:
-        return nodeInfo.registerDataNode((RegisterDataNodePlan) req);
+        return nodeInfo.registerDataNode((RegisterDataNodePlan) physicalPlan);
       case RemoveDataNode:
-        return nodeInfo.removeDataNode((RemoveDataNodePlan) req);
+        return nodeInfo.removeDataNode((RemoveDataNodePlan) physicalPlan);
       case SetStorageGroup:
-        TSStatus status = clusterSchemaInfo.setStorageGroup((SetStorageGroupPlan) req);
+        TSStatus status = clusterSchemaInfo.setStorageGroup((SetStorageGroupPlan) physicalPlan);
         if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
           return status;
         }
-        return partitionInfo.setStorageGroup((SetStorageGroupPlan) req);
+        return partitionInfo.setStorageGroup((SetStorageGroupPlan) physicalPlan);
       case AdjustMaxRegionGroupCount:
-        return clusterSchemaInfo.adjustMaxRegionGroupCount((AdjustMaxRegionGroupCountPlan) req);
+        return clusterSchemaInfo.adjustMaxRegionGroupCount(
+            (AdjustMaxRegionGroupCountPlan) physicalPlan);
       case DeleteStorageGroup:
-        partitionInfo.deleteStorageGroup((DeleteStorageGroupPlan) req);
-        return clusterSchemaInfo.deleteStorageGroup((DeleteStorageGroupPlan) req);
+        partitionInfo.deleteStorageGroup((DeleteStorageGroupPlan) physicalPlan);
+        return clusterSchemaInfo.deleteStorageGroup((DeleteStorageGroupPlan) physicalPlan);
       case PreDeleteStorageGroup:
-        return partitionInfo.preDeleteStorageGroup((PreDeleteStorageGroupPlan) req);
+        return partitionInfo.preDeleteStorageGroup((PreDeleteStorageGroupPlan) physicalPlan);
       case SetTTL:
-        return clusterSchemaInfo.setTTL((SetTTLPlan) req);
+        return clusterSchemaInfo.setTTL((SetTTLPlan) physicalPlan);
       case SetSchemaReplicationFactor:
-        return clusterSchemaInfo.setSchemaReplicationFactor((SetSchemaReplicationFactorPlan) req);
+        return clusterSchemaInfo.setSchemaReplicationFactor(
+            (SetSchemaReplicationFactorPlan) physicalPlan);
       case SetDataReplicationFactor:
-        return clusterSchemaInfo.setDataReplicationFactor((SetDataReplicationFactorPlan) req);
+        return clusterSchemaInfo.setDataReplicationFactor(
+            (SetDataReplicationFactorPlan) physicalPlan);
       case SetTimePartitionInterval:
-        return clusterSchemaInfo.setTimePartitionInterval((SetTimePartitionIntervalPlan) req);
+        return clusterSchemaInfo.setTimePartitionInterval(
+            (SetTimePartitionIntervalPlan) physicalPlan);
       case CreateRegionGroups:
-        return partitionInfo.createRegionGroups((CreateRegionGroupsPlan) req);
+        return partitionInfo.createRegionGroups((CreateRegionGroupsPlan) physicalPlan);
+      case DeleteRegionGroups:
+        return partitionInfo.deleteRegionGroups((DeleteRegionGroupsPlan) physicalPlan);
       case CreateSchemaPartition:
-        return partitionInfo.createSchemaPartition((CreateSchemaPartitionPlan) req);
+        return partitionInfo.createSchemaPartition((CreateSchemaPartitionPlan) physicalPlan);
       case CreateDataPartition:
-        return partitionInfo.createDataPartition((CreateDataPartitionPlan) req);
+        return partitionInfo.createDataPartition((CreateDataPartitionPlan) physicalPlan);
       case UpdateProcedure:
-        return procedureInfo.updateProcedure((UpdateProcedurePlan) req);
+        return procedureInfo.updateProcedure((UpdateProcedurePlan) physicalPlan);
       case DeleteProcedure:
-        return procedureInfo.deleteProcedure((DeleteProcedurePlan) req);
+        return procedureInfo.deleteProcedure((DeleteProcedurePlan) physicalPlan);
       case CreateUser:
       case CreateRole:
       case DropUser:
@@ -206,23 +213,23 @@ public class ConfigPlanExecutor {
       case RevokeRole:
       case RevokeRoleFromUser:
       case UpdateUser:
-        return authorInfo.authorNonQuery((AuthorPlan) req);
+        return authorInfo.authorNonQuery((AuthorPlan) physicalPlan);
       case ApplyConfigNode:
-        return nodeInfo.applyConfigNode((ApplyConfigNodePlan) req);
+        return nodeInfo.applyConfigNode((ApplyConfigNodePlan) physicalPlan);
       case RemoveConfigNode:
-        return nodeInfo.removeConfigNode((RemoveConfigNodePlan) req);
+        return nodeInfo.removeConfigNode((RemoveConfigNodePlan) physicalPlan);
       case CreateFunction:
-        return udfInfo.createFunction((CreateFunctionPlan) req);
+        return udfInfo.createFunction((CreateFunctionPlan) physicalPlan);
       case DropFunction:
-        return udfInfo.dropFunction((DropFunctionPlan) req);
+        return udfInfo.dropFunction((DropFunctionPlan) physicalPlan);
       case CreateSchemaTemplate:
-        return clusterSchemaInfo.createSchemaTemplate((CreateSchemaTemplatePlan) req);
+        return clusterSchemaInfo.createSchemaTemplate((CreateSchemaTemplatePlan) physicalPlan);
       case UpdateRegionLocation:
-        return partitionInfo.updateRegionLocation((UpdateRegionLocationPlan) req);
+        return partitionInfo.updateRegionLocation((UpdateRegionLocationPlan) physicalPlan);
       case SetSchemaTemplate:
-        return clusterSchemaInfo.setSchemaTemplate((SetSchemaTemplatePlan) req);
+        return clusterSchemaInfo.setSchemaTemplate((SetSchemaTemplatePlan) physicalPlan);
       default:
-        throw new UnknownPhysicalPlanTypeException(req.getType());
+        throw new UnknownPhysicalPlanTypeException(physicalPlan.getType());
     }
   }
 
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
index 7aa02188e7..2f9b23a882 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
@@ -35,6 +35,7 @@ import org.apache.iotdb.confignode.consensus.request.read.GetSchemaPartitionPlan
 import org.apache.iotdb.confignode.consensus.request.write.CreateDataPartitionPlan;
 import org.apache.iotdb.confignode.consensus.request.write.CreateRegionGroupsPlan;
 import org.apache.iotdb.confignode.consensus.request.write.CreateSchemaPartitionPlan;
+import org.apache.iotdb.confignode.consensus.request.write.DeleteRegionGroupsPlan;
 import org.apache.iotdb.confignode.consensus.request.write.DeleteStorageGroupPlan;
 import org.apache.iotdb.confignode.consensus.request.write.PreDeleteStorageGroupPlan;
 import org.apache.iotdb.confignode.consensus.request.write.SetStorageGroupPlan;
@@ -103,9 +104,6 @@ public class PartitionInfo implements SnapshotProcessor {
   public PartitionInfo() {
     this.storageGroupPartitionTables = new ConcurrentHashMap<>();
     this.nextRegionGroupId = new AtomicInteger(-1);
-
-    // Ensure that the PartitionTables of the StorageGroups who've been logically deleted
-    // are unreadable and un-writable
     // For RegionCleaner
     this.deletedRegionSet = Collections.synchronizedSet(new HashSet<>());
   }
@@ -195,6 +193,42 @@ public class PartitionInfo implements SnapshotProcessor {
     return result;
   }
 
+  /**
+   * Synchronously delete RegionGroups in PartitionTable and asynchronously delete RegionReplica on
+   * remote DataNodes
+   *
+   * @return SUCCESS_STATUS
+   */
+  public TSStatus deleteRegionGroups(DeleteRegionGroupsPlan deleteRegionGroupsPlan) {
+    // Delete RegionGroups' in PartitionTable if necessary
+    if (deleteRegionGroupsPlan.isNeedsDeleteInPartitionTable()) {
+      deleteRegionGroupsPlan
+          .getRegionGroupMap()
+          .forEach(
+              (storageGroup, deleteRegionGroups) -> {
+                if (isStorageGroupExisted(storageGroup)) {
+                  storageGroupPartitionTables
+                      .get(storageGroup)
+                      .deleteRegionGroups(deleteRegionGroups);
+                }
+              });
+    }
+
+    // Delete RegionReplicaSets on remote DataNodes asynchronously
+    synchronized (deletedRegionSet) {
+      deleteRegionGroupsPlan.getRegionGroupMap().values().forEach(deletedRegionSet::addAll);
+    }
+
+    return RpcUtils.SUCCESS_STATUS;
+  }
+
+  /** @return The Regions that should be deleted among the DataNodes */
+  public Set<TRegionReplicaSet> getDeletedRegionSet() {
+    synchronized (deletedRegionSet) {
+      return deletedRegionSet;
+    }
+  }
+
   /**
    * Thread-safely pre-delete the specific StorageGroup
    *
@@ -244,13 +278,6 @@ public class PartitionInfo implements SnapshotProcessor {
     }
   }
 
-  /** @return The Regions that should be deleted among the DataNodes */
-  public Set<TRegionReplicaSet> getDeletedRegionSet() {
-    synchronized (deletedRegionSet) {
-      return deletedRegionSet;
-    }
-  }
-
   /**
    * Thread-safely get SchemaPartition
    *
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/StorageGroupPartitionTable.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/StorageGroupPartitionTable.java
index 37690984b0..1cffbb9b72 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/StorageGroupPartitionTable.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/StorageGroupPartitionTable.java
@@ -148,6 +148,15 @@ public class StorageGroupPartitionTable {
         replicaSet -> regionGroupMap.put(replicaSet.getRegionId(), new RegionGroup(replicaSet)));
   }
 
+  /**
+   * Delete RegionGroups' cache
+   *
+   * @param replicaSets List<TRegionReplicaSet>
+   */
+  public void deleteRegionGroups(List<TRegionReplicaSet> replicaSets) {
+    replicaSets.forEach(replicaSet -> regionGroupMap.remove(replicaSet.getRegionId()));
+  }
+
   /** @return All Regions' RegionReplicaSet within one StorageGroup */
   public List<TRegionReplicaSet> getAllReplicaSets() {
     List<TRegionReplicaSet> result = new ArrayList<>();
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
index 9daa2d4ea7..b54f9132e6 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
@@ -20,20 +20,27 @@
 package org.apache.iotdb.confignode.procedure.env;
 
 import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.confignode.client.ConfigNodeRequestType;
 import org.apache.iotdb.confignode.client.DataNodeRequestType;
 import org.apache.iotdb.confignode.client.async.datanode.AsyncDataNodeClientPool;
 import org.apache.iotdb.confignode.client.sync.confignode.SyncConfigNodeClientPool;
 import org.apache.iotdb.confignode.client.sync.datanode.SyncDataNodeClientPool;
+import org.apache.iotdb.confignode.consensus.request.write.CreateRegionGroupsPlan;
+import org.apache.iotdb.confignode.consensus.request.write.DeleteRegionGroupsPlan;
 import org.apache.iotdb.confignode.consensus.request.write.DeleteStorageGroupPlan;
 import org.apache.iotdb.confignode.consensus.request.write.PreDeleteStorageGroupPlan;
 import org.apache.iotdb.confignode.consensus.request.write.RemoveConfigNodePlan;
 import org.apache.iotdb.confignode.exception.AddConsensusGroupException;
 import org.apache.iotdb.confignode.exception.AddPeerException;
+import org.apache.iotdb.confignode.exception.StorageGroupNotExistsException;
+import org.apache.iotdb.confignode.manager.ClusterSchemaManager;
 import org.apache.iotdb.confignode.manager.ConfigManager;
 import org.apache.iotdb.confignode.manager.ConsensusManager;
+import org.apache.iotdb.confignode.manager.load.LoadManager;
 import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
 import org.apache.iotdb.confignode.procedure.scheduler.LockQueue;
 import org.apache.iotdb.confignode.procedure.scheduler.ProcedureScheduler;
@@ -48,7 +55,9 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.locks.ReentrantLock;
 
 public class ConfigNodeProcedureEnv {
@@ -288,6 +297,39 @@ public class ConfigNodeProcedureEnv {
             configManager.getNodeManager().getRegisteredConfigNodes());
   }
 
+  /**
+   * Broadcast the CreateRegionGroupsPlan
+   *
+   * @return Those RegionGroups that failed to create
+   */
+  public Map<TConsensusGroupId, TRegionReplicaSet> doRegionCreation(
+      CreateRegionGroupsPlan createRegionGroupsPlan) {
+    Map<String, Long> ttlMap = new HashMap<>();
+    for (String storageGroup : createRegionGroupsPlan.getRegionGroupMap().keySet()) {
+      try {
+        ttlMap.put(
+            storageGroup,
+            getClusterSchemaManager().getStorageGroupSchemaByName(storageGroup).getTTL());
+      } catch (StorageGroupNotExistsException e) {
+        // Notice: This line will never
+        LOG.error("StorageGroup doesn't exist", e);
+      }
+    }
+    return AsyncDataNodeClientPool.getInstance().createRegionGroups(createRegionGroupsPlan, ttlMap);
+  }
+
+  public void persistAndBroadcastRegionGroup(CreateRegionGroupsPlan createRegionGroupsPlan) {
+    // Persist the allocation result
+    getConsensusManager().write(createRegionGroupsPlan);
+    // Broadcast the latest RegionRouteMap
+    getLoadManager().broadcastLatestRegionRouteMap();
+  }
+
+  /** Submit the RegionReplicas to the RegionCleaner when there are creation failures */
+  public void submitFailedRegionReplicas(DeleteRegionGroupsPlan deleteRegionGroupsPlan) {
+    getConsensusManager().write(deleteRegionGroupsPlan);
+  }
+
   public LockQueue getNodeLock() {
     return nodeLock;
   }
@@ -311,4 +353,12 @@ public class ConfigNodeProcedureEnv {
   private ConsensusManager getConsensusManager() {
     return configManager.getConsensusManager();
   }
+
+  private ClusterSchemaManager getClusterSchemaManager() {
+    return configManager.getClusterSchemaManager();
+  }
+
+  private LoadManager getLoadManager() {
+    return configManager.getLoadManager();
+  }
 }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/CreateRegionGroupsProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/CreateRegionGroupsProcedure.java
new file mode 100644
index 0000000000..32877571bb
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/CreateRegionGroupsProcedure.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.procedure.impl;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.confignode.consensus.request.write.CreateRegionGroupsPlan;
+import org.apache.iotdb.confignode.consensus.request.write.DeleteRegionGroupsPlan;
+import org.apache.iotdb.confignode.procedure.StateMachineProcedure;
+import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
+import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
+import org.apache.iotdb.confignode.procedure.state.CreateRegionGroupsState;
+
+import java.util.Map;
+
+public class CreateRegionGroupsProcedure
+    extends StateMachineProcedure<ConfigNodeProcedureEnv, CreateRegionGroupsState> {
+
+  private final CreateRegionGroupsPlan createRegionGroupsPlan;
+  // Map<TConsensusGroupId, Failed RegionReplicas>
+  private Map<TConsensusGroupId, TRegionReplicaSet> failedRegions;
+
+  public CreateRegionGroupsProcedure(CreateRegionGroupsPlan createRegionGroupsPlan) {
+    this.createRegionGroupsPlan = createRegionGroupsPlan;
+  }
+
+  @Override
+  protected Flow executeFromState(ConfigNodeProcedureEnv env, CreateRegionGroupsState state) {
+    switch (state) {
+      case CREATE_REGION_GROUPS_PREPARE:
+        setNextState(CreateRegionGroupsState.CREATE_REGION_GROUPS);
+        break;
+      case CREATE_REGION_GROUPS:
+        failedRegions = env.doRegionCreation(createRegionGroupsPlan);
+        setNextState(CreateRegionGroupsState.PERSIST_AND_BROADCAST);
+        break;
+      case PERSIST_AND_BROADCAST:
+        // Filter those RegionGroups that created successfully
+        CreateRegionGroupsPlan persistPlan = new CreateRegionGroupsPlan();
+        createRegionGroupsPlan
+            .getRegionGroupMap()
+            .forEach(
+                (storageGroup, regionReplicaSets) ->
+                    regionReplicaSets.forEach(
+                        regionReplicaSet -> {
+                          if (!failedRegions.containsKey(regionReplicaSet.getRegionId())) {
+                            persistPlan.addRegionGroup(storageGroup, regionReplicaSet);
+                          }
+                        }));
+        env.persistAndBroadcastRegionGroup(persistPlan);
+        setNextState(
+            failedRegions.size() > 0
+                ? CreateRegionGroupsState.DELETE_FAILED_REGION_GROUPS
+                : CreateRegionGroupsState.CREATE_REGION_GROUPS_FINISH);
+        break;
+      case DELETE_FAILED_REGION_GROUPS:
+        DeleteRegionGroupsPlan deletePlan = new DeleteRegionGroupsPlan();
+        // We don't need to wipe the PartitionTable here
+        // since the failed RegionGroups are not recorded
+        deletePlan.setNeedsDeleteInPartitionTable(false);
+        createRegionGroupsPlan
+            .getRegionGroupMap()
+            .forEach(
+                (storageGroup, regionReplicaSets) ->
+                    regionReplicaSets.forEach(
+                        regionReplicaSet -> {
+                          if (failedRegions.containsKey(regionReplicaSet.getRegionId())) {
+                            TRegionReplicaSet failedReplicaSet =
+                                failedRegions.get(regionReplicaSet.getRegionId());
+                            TRegionReplicaSet redundantReplicaSet =
+                                new TRegionReplicaSet().setRegionId(regionReplicaSet.getRegionId());
+                            regionReplicaSet
+                                .getDataNodeLocations()
+                                .forEach(
+                                    dataNodeLocation -> {
+                                      if (!failedReplicaSet
+                                          .getDataNodeLocations()
+                                          .contains(dataNodeLocation)) {
+                                        redundantReplicaSet.addToDataNodeLocations(
+                                            dataNodeLocation);
+                                      }
+                                    });
+                            deletePlan.addRegionGroup(storageGroup, redundantReplicaSet);
+                          }
+                        }));
+        env.submitFailedRegionReplicas(deletePlan);
+        setFailure(
+            new ProcedureException(
+                "There are some RegionGroups failed to create, please check former logs in ConfigNode-leader."));
+        return Flow.NO_MORE_STATE;
+      case CREATE_REGION_GROUPS_FINISH:
+        return Flow.NO_MORE_STATE;
+    }
+
+    return Flow.HAS_MORE_STATE;
+  }
+
+  @Override
+  protected void rollbackState(
+      ConfigNodeProcedureEnv configNodeProcedureEnv,
+      CreateRegionGroupsState createRegionGroupsState) {
+    // Do nothing
+  }
+
+  @Override
+  protected CreateRegionGroupsState getState(int stateId) {
+    return CreateRegionGroupsState.values()[stateId];
+  }
+
+  @Override
+  protected int getStateId(CreateRegionGroupsState createRegionGroupsState) {
+    return createRegionGroupsState.ordinal();
+  }
+
+  @Override
+  protected CreateRegionGroupsState getInitialState() {
+    return CreateRegionGroupsState.CREATE_REGION_GROUPS;
+  }
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/CreateRegionGroupsState.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/CreateRegionGroupsState.java
new file mode 100644
index 0000000000..35855420a0
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/CreateRegionGroupsState.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.procedure.state;
+
+public enum CreateRegionGroupsState {
+  CREATE_REGION_GROUPS_PREPARE,
+  CREATE_REGION_GROUPS,
+  PERSIST_AND_BROADCAST,
+  DELETE_FAILED_REGION_GROUPS,
+  CREATE_REGION_GROUPS_FINISH
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java
index dd28aebfdb..59f0eedaa9 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.confignode.procedure.store;
 
 import org.apache.iotdb.confignode.procedure.Procedure;
 import org.apache.iotdb.confignode.procedure.impl.AddConfigNodeProcedure;
+import org.apache.iotdb.confignode.procedure.impl.CreateRegionGroupsProcedure;
 import org.apache.iotdb.confignode.procedure.impl.DeleteStorageGroupProcedure;
 import org.apache.iotdb.confignode.procedure.impl.RegionMigrateProcedure;
 import org.apache.iotdb.confignode.procedure.impl.RemoveConfigNodeProcedure;
@@ -73,6 +74,8 @@ public class ProcedureFactory implements IProcedureFactory {
       return ProcedureType.REMOVE_DATA_NODE_PROCEDURE;
     } else if (procedure instanceof RegionMigrateProcedure) {
       return ProcedureType.REGION_MIGRATE_PROCEDURE;
+    } else if (procedure instanceof CreateRegionGroupsProcedure) {
+      return ProcedureType.CREATE_REGION_GROUPS;
     }
     return null;
   }
@@ -82,7 +85,8 @@ public class ProcedureFactory implements IProcedureFactory {
     ADD_CONFIG_NODE_PROCEDURE,
     REMOVE_CONFIG_NODE_PROCEDURE,
     REMOVE_DATA_NODE_PROCEDURE,
-    REGION_MIGRATE_PROCEDURE
+    REGION_MIGRATE_PROCEDURE,
+    CREATE_REGION_GROUPS
   }
 
   private static class ProcedureFactoryHolder {
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
index 64a2ec3472..32429cec1f 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
@@ -53,7 +53,7 @@ import org.apache.iotdb.confignode.consensus.request.write.CreateDataPartitionPl
 import org.apache.iotdb.confignode.consensus.request.write.CreateRegionGroupsPlan;
 import org.apache.iotdb.confignode.consensus.request.write.CreateSchemaPartitionPlan;
 import org.apache.iotdb.confignode.consensus.request.write.DeleteProcedurePlan;
-import org.apache.iotdb.confignode.consensus.request.write.DeleteRegionsPlan;
+import org.apache.iotdb.confignode.consensus.request.write.DeleteRegionGroupsPlan;
 import org.apache.iotdb.confignode.consensus.request.write.DeleteStorageGroupPlan;
 import org.apache.iotdb.confignode.consensus.request.write.RegisterDataNodePlan;
 import org.apache.iotdb.confignode.consensus.request.write.RemoveConfigNodePlan;
@@ -235,12 +235,23 @@ public class ConfigPhysicalPlanSerDeTest {
 
   @Test
   public void DeleteRegionsPlanTest() throws IOException {
-    DeleteRegionsPlan req0 = new DeleteRegionsPlan();
-    req0.addDeleteRegion("sg", new TConsensusGroupId(TConsensusGroupType.SchemaRegion, 0));
-    req0.addDeleteRegion("sg", new TConsensusGroupId(TConsensusGroupType.DataRegion, 1));
+    TDataNodeLocation dataNodeLocation = new TDataNodeLocation();
+    dataNodeLocation.setDataNodeId(0);
+    dataNodeLocation.setClientRpcEndPoint(new TEndPoint("0.0.0.0", 6667));
+    dataNodeLocation.setInternalEndPoint(new TEndPoint("0.0.0.0", 9003));
+    dataNodeLocation.setMPPDataExchangeEndPoint(new TEndPoint("0.0.0.0", 8777));
+    dataNodeLocation.setDataRegionConsensusEndPoint(new TEndPoint("0.0.0.0", 40010));
+    dataNodeLocation.setSchemaRegionConsensusEndPoint(new TEndPoint("0.0.0.0", 50010));
 
-    DeleteRegionsPlan req1 =
-        (DeleteRegionsPlan) ConfigPhysicalPlan.Factory.create(req0.serializeToByteBuffer());
+    DeleteRegionGroupsPlan req0 = new DeleteRegionGroupsPlan();
+    req0.setNeedsDeleteInPartitionTable(false);
+    TRegionReplicaSet dataRegionSet = new TRegionReplicaSet();
+    dataRegionSet.setRegionId(new TConsensusGroupId(TConsensusGroupType.DataRegion, 0));
+    dataRegionSet.setDataNodeLocations(Collections.singletonList(dataNodeLocation));
+    req0.addRegionGroup("root.sg0", dataRegionSet);
+
+    DeleteRegionGroupsPlan req1 =
+        (DeleteRegionGroupsPlan) ConfigPhysicalPlan.Factory.create(req0.serializeToByteBuffer());
     Assert.assertEquals(req0, req1);
   }
 
@@ -691,16 +702,20 @@ public class ConfigPhysicalPlanSerDeTest {
   private CreateSchemaTemplateStatement newCreateSchemaTemplateStatement(String name) {
     List<List<String>> measurements =
         Arrays.asList(
-            Arrays.asList(name + "_" + "temperature"), Arrays.asList(name + "_" + "status"));
+            Collections.singletonList(name + "_" + "temperature"),
+            Collections.singletonList(name + "_" + "status"));
     List<List<TSDataType>> dataTypes =
-        Arrays.asList(Arrays.asList(TSDataType.FLOAT), Arrays.asList(TSDataType.BOOLEAN));
+        Arrays.asList(
+            Collections.singletonList(TSDataType.FLOAT),
+            Collections.singletonList(TSDataType.BOOLEAN));
     List<List<TSEncoding>> encodings =
-        Arrays.asList(Arrays.asList(TSEncoding.RLE), Arrays.asList(TSEncoding.PLAIN));
+        Arrays.asList(
+            Collections.singletonList(TSEncoding.RLE), Collections.singletonList(TSEncoding.PLAIN));
     List<List<CompressionType>> compressors =
-        Arrays.asList(Arrays.asList(CompressionType.SNAPPY), Arrays.asList(CompressionType.SNAPPY));
-    CreateSchemaTemplateStatement createSchemaTemplateStatement =
-        new CreateSchemaTemplateStatement(name, measurements, dataTypes, encodings, compressors);
-    return createSchemaTemplateStatement;
+        Arrays.asList(
+            Collections.singletonList(CompressionType.SNAPPY),
+            Collections.singletonList(CompressionType.SNAPPY));
+    return new CreateSchemaTemplateStatement(name, measurements, dataTypes, encodings, compressors);
   }
 
   @Test