Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/08/11 03:44:50 UTC

[iotdb] branch master updated: [IOTDB-4044] Remove a DataNode from the cluster, when this node stopped. (#6938)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 68dedba8d2 [IOTDB-4044] Remove a DataNode from the cluster, when this node stopped. (#6938)
68dedba8d2 is described below

commit 68dedba8d2cb3c42a25d3fe83be2dcfc81b79575
Author: wangchao316 <66...@users.noreply.github.com>
AuthorDate: Thu Aug 11 11:44:44 2022 +0800

    [IOTDB-4044] Remove a DataNode from the cluster, when this node stopped. (#6938)
---
 .../confignode/client/DataNodeRequestType.java     |   5 +-
 .../sync/datanode/SyncDataNodeClientPool.java      |  38 +-
 .../procedure/env/DataNodeRemoveHandler.java       | 134 +++++--
 .../procedure/impl/RegionMigrateProcedure.java     |  48 ++-
 .../procedure/impl/RemoveDataNodeProcedure.java    |  26 +-
 .../procedure/state/RegionTransitionState.java     |   5 +-
 .../iotdb/db/service/RegionMigrateService.java     | 387 ++++++++++++---------
 .../impl/DataNodeInternalRPCServiceImpl.java       |  72 ++--
 thrift/src/main/thrift/datanode.thrift             |  19 +-
 9 files changed, 469 insertions(+), 265 deletions(-)
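
Taken together, the change splits the old single-shot MIGRATE_REGION action into explicit steps that the ConfigNode drives one at a time, so a region can still be moved off a DataNode that has already stopped. A rough sketch of the resulting per-region sequence, using the handler methods introduced below (here "handler" stands for env.getDataNodeRemoveHandler(); the waits between steps and the error handling are elided):

    // Sketch only, not a verbatim excerpt of the patch.
    handler.addNewNodeToRegionConsensusGroup(regionId, destDataNode);             // create the group on the new node
    handler.addRegionPeer(originalDataNode, destDataNode, regionId);              // ADD_REGION_PEER, sent to a surviving replica
    handler.changeRegionLeader(regionId, originalDataNode);                       // move leadership off the leaving node
    handler.removeRegionPeer(originalDataNode, destDataNode, regionId);           // REMOVE_REGION_PEER
    handler.removeRegionConsensusGroup(originalDataNode, destDataNode, regionId); // may fail if the node is already down; the procedure continues anyway

Each step only advances after the DataNode reports the previous step finished (see the RegionMigrateProcedure hunk further down).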

diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java
index 8eccbfe2d0..7fc041f37e 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java
@@ -24,7 +24,10 @@ public enum DataNodeRequestType {
   INVALIDATE_PARTITION_CACHE,
   INVALIDATE_PERMISSION_CACHE,
   INVALIDATE_SCHEMA_CACHE,
-  MIGRATE_REGION,
+  ADD_REGION_CONSENSUS_GROUP,
+  ADD_REGION_PEER,
+  REMOVE_REGION_PEER,
+  REMOVE_REGION_CONSENSUS_GROUP,
   DISABLE_DATA_NODE,
   STOP_DATA_NODE,
 
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/datanode/SyncDataNodeClientPool.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/datanode/SyncDataNodeClientPool.java
index cf95dd7bf0..792ffd4640 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/datanode/SyncDataNodeClientPool.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/datanode/SyncDataNodeClientPool.java
@@ -79,14 +79,20 @@ public class SyncDataNodeClientPool {
             return client.deleteRegion((TConsensusGroupId) req);
           case INVALIDATE_PERMISSION_CACHE:
             return client.invalidatePermissionCache((TInvalidatePermissionCacheReq) req);
-          case MIGRATE_REGION:
-            return client.migrateRegion((TMigrateRegionReq) req);
           case DISABLE_DATA_NODE:
             return client.disableDataNode((TDisableDataNodeReq) req);
           case STOP_DATA_NODE:
             return client.stopDataNode();
           case UPDATE_TEMPLATE:
             return client.updateTemplate((TUpdateTemplateReq) req);
+          case ADD_REGION_CONSENSUS_GROUP:
+            return client.addToRegionConsensusGroup((TAddConsensusGroup) req);
+          case ADD_REGION_PEER:
+            return client.addRegionPeer((TMigrateRegionReq) req);
+          case REMOVE_REGION_PEER:
+            return client.removeRegionPeer((TMigrateRegionReq) req);
+          case REMOVE_REGION_CONSENSUS_GROUP:
+            return client.removeToRegionConsensusGroup((TMigrateRegionReq) req);
           default:
             return RpcUtils.getStatus(
                 TSStatusCode.EXECUTE_STATEMENT_ERROR, "Unknown request type: " + requestType);
@@ -175,34 +181,6 @@ public class SyncDataNodeClientPool {
     return status;
   }
 
-  public TSStatus addToRegionConsensusGroup(
-      List<TDataNodeLocation> regionOriginalPeerNodes,
-      TConsensusGroupId regionId,
-      TDataNodeLocation newPeerNode,
-      String storageGroup,
-      long ttl) {
-    TSStatus status;
-    // do addConsensusGroup in new node locally
-    try (SyncDataNodeInternalServiceClient client =
-        clientManager.borrowClient(newPeerNode.getInternalEndPoint())) {
-      List<TDataNodeLocation> currentPeerNodes = new ArrayList<>(regionOriginalPeerNodes);
-      currentPeerNodes.add(newPeerNode);
-      TAddConsensusGroup req = new TAddConsensusGroup(regionId, currentPeerNodes, storageGroup);
-      req.setTtl(ttl);
-      status = client.addToRegionConsensusGroup(req);
-    } catch (IOException e) {
-      LOGGER.error("Can't connect to Data Node {}.", newPeerNode.getInternalEndPoint(), e);
-      status = new TSStatus(TSStatusCode.NO_CONNECTION.getStatusCode());
-      status.setMessage(e.getMessage());
-    } catch (TException e) {
-      LOGGER.error(
-          "Add region consensus {} group failed to Date node: {}", regionId, newPeerNode, e);
-      status = new TSStatus(TSStatusCode.REGION_MIGRATE_FAILED.getStatusCode());
-      status.setMessage(e.getMessage());
-    }
-    return status;
-  }
-
   // TODO: Is the ClientPool must be a singleton?
   private static class ClientPoolHolder {
 
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
index 580d1f7519..ca4d63156b 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
@@ -35,6 +35,7 @@ import org.apache.iotdb.confignode.manager.ConfigManager;
 import org.apache.iotdb.confignode.persistence.NodeInfo;
 import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
 import org.apache.iotdb.confignode.procedure.scheduler.LockQueue;
+import org.apache.iotdb.mpp.rpc.thrift.TAddConsensusGroup;
 import org.apache.iotdb.mpp.rpc.thrift.TDisableDataNodeReq;
 import org.apache.iotdb.mpp.rpc.thrift.TMigrateRegionReq;
 import org.apache.iotdb.rpc.TSStatusCode;
@@ -126,7 +127,6 @@ public class DataNodeRemoveHandler {
       return null;
     }
 
-    // will migrate the region to the new node, which should not be same raft
     Optional<TDataNodeLocation> newNode = pickNewReplicaNodeForRegion(regionReplicaNodes);
     if (!newNode.isPresent()) {
       LOGGER.warn("No enough Data node to migrate region: {}", regionId);
@@ -135,30 +135,60 @@ public class DataNodeRemoveHandler {
   }
 
   /**
-   * Send to DataNode, migrate region from originalDataNode to destDataNode
+   * Send to DataNode, add region peer
    *
    * @param originalDataNode old location data node
    * @param destDataNode dest data node
    * @param regionId region id
    * @return migrate status
    */
-  public TSStatus migrateRegion(
+  public TSStatus addRegionPeer(
       TDataNodeLocation originalDataNode,
       TDataNodeLocation destDataNode,
       TConsensusGroupId regionId) {
     TSStatus status;
-    List<TDataNodeLocation> regionReplicaNodes = findRegionReplicaNodes(regionId);
-    if (regionReplicaNodes.isEmpty()) {
-      LOGGER.warn("Not find region replica nodes, region: {}", regionId);
+    Optional<TDataNodeLocation> otherNode = findNodeOfAnotherReplica(regionId, originalDataNode);
+    if (!otherNode.isPresent()) {
+      LOGGER.warn(
+          "No other Node to change region leader, check by show regions, region: {}", regionId);
       status = new TSStatus(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
-      status.setMessage("not find region replica nodes, region: " + regionId);
+      status.setMessage("No other Node to change region leader, check by show regions");
       return status;
     }
 
-    // TODO if region replica is 1, the new leader is null, it also need to migrate
-    Optional<TDataNodeLocation> newLeaderNode =
-        regionReplicaNodes.stream().filter(e -> !e.equals(originalDataNode)).findAny();
-    if (!newLeaderNode.isPresent()) {
+    TMigrateRegionReq migrateRegionReq =
+        new TMigrateRegionReq(regionId, originalDataNode, destDataNode);
+    migrateRegionReq.setNewLeaderNode(otherNode.get());
+
+    // send to otherNode node
+    status =
+        SyncDataNodeClientPool.getInstance()
+            .sendSyncRequestToDataNodeWithRetry(
+                otherNode.get().getInternalEndPoint(),
+                migrateRegionReq,
+                DataNodeRequestType.ADD_REGION_PEER);
+    LOGGER.info(
+        "Send region {} add peer action to {}, wait it finished",
+        regionId,
+        otherNode.get().getInternalEndPoint());
+    return status;
+  }
+
+  /**
+   * Send to DataNode, remove region
+   *
+   * @param originalDataNode old location data node
+   * @param destDataNode dest data node
+   * @param regionId region id
+   * @return migrate status
+   */
+  public TSStatus removeRegionPeer(
+      TDataNodeLocation originalDataNode,
+      TDataNodeLocation destDataNode,
+      TConsensusGroupId regionId) {
+    TSStatus status;
+    Optional<TDataNodeLocation> otherNode = findNodeOfAnotherReplica(regionId, originalDataNode);
+    if (!otherNode.isPresent()) {
       LOGGER.warn(
           "No other Node to change region leader, check by show regions, region: {}", regionId);
       status = new TSStatus(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
@@ -168,15 +198,46 @@ public class DataNodeRemoveHandler {
 
     TMigrateRegionReq migrateRegionReq =
         new TMigrateRegionReq(regionId, originalDataNode, destDataNode);
-    migrateRegionReq.setNewLeaderNode(newLeaderNode.get());
+    migrateRegionReq.setNewLeaderNode(otherNode.get());
+
+    // send to other node
+    status =
+        SyncDataNodeClientPool.getInstance()
+            .sendSyncRequestToDataNodeWithRetry(
+                otherNode.get().getInternalEndPoint(),
+                migrateRegionReq,
+                DataNodeRequestType.REMOVE_REGION_PEER);
+    LOGGER.info(
+        "Send region {} remove peer to {}, wait it finished",
+        regionId,
+        otherNode.get().getInternalEndPoint());
+    return status;
+  }
+
+  /**
+   * Send to DataNode, remove region consensus group from originalDataNode node
+   *
+   * @param originalDataNode old location data node
+   * @param destDataNode dest data node
+   * @param regionId region id
+   * @return migrate status
+   */
+  public TSStatus removeRegionConsensusGroup(
+      TDataNodeLocation originalDataNode,
+      TDataNodeLocation destDataNode,
+      TConsensusGroupId regionId) {
+    TSStatus status;
+    TMigrateRegionReq migrateRegionReq =
+        new TMigrateRegionReq(regionId, originalDataNode, destDataNode);
+    migrateRegionReq.setNewLeaderNode(originalDataNode);
     status =
         SyncDataNodeClientPool.getInstance()
             .sendSyncRequestToDataNodeWithRetry(
                 originalDataNode.getInternalEndPoint(),
                 migrateRegionReq,
-                DataNodeRequestType.MIGRATE_REGION);
+                DataNodeRequestType.REMOVE_REGION_CONSENSUS_GROUP);
     LOGGER.info(
-        "send region {} migrate action to {}, wait it finished",
+        "Send region {} remove consensus group action to {}, wait it finished",
         regionId,
         originalDataNode.getInternalEndPoint());
     return status;
@@ -256,12 +317,20 @@ public class DataNodeRemoveHandler {
       return status;
     }
 
+    List<TDataNodeLocation> currentPeerNodes = new ArrayList<>(regionReplicaNodes);
+    currentPeerNodes.add(destDataNode);
     String storageGroup = configManager.getPartitionManager().getRegionStorageGroup(regionId);
+    TAddConsensusGroup req = new TAddConsensusGroup(regionId, currentPeerNodes, storageGroup);
+    // TODO replace with real ttl
+    req.setTtl(Long.MAX_VALUE);
+
     status =
         SyncDataNodeClientPool.getInstance()
-            .addToRegionConsensusGroup(
-                // TODO replace with real ttl
-                regionReplicaNodes, regionId, destDataNode, storageGroup, Long.MAX_VALUE);
+            .sendSyncRequestToDataNodeWithRetry(
+                destDataNode.getInternalEndPoint(),
+                req,
+                DataNodeRequestType.ADD_REGION_CONSENSUS_GROUP);
+
     LOGGER.info("send add region {} consensus group to {}", regionId, destDataNode);
     if (isFailed(status)) {
       LOGGER.error(
@@ -296,9 +365,6 @@ public class DataNodeRemoveHandler {
             .sendSyncRequestToDataNodeWithRetry(
                 dataNode.getInternalEndPoint(), dataNode, DataNodeRequestType.STOP_DATA_NODE);
     LOGGER.info("stop Data Node {} result: {}", dataNode, status);
-    if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-      throw new ProcedureException("Failed to stop data node");
-    }
     return status;
   }
 
@@ -383,4 +449,32 @@ public class DataNodeRemoveHandler {
     removeDataNodes.add(tDataNodeLocation);
     configManager.getConsensusManager().write(new RemoveDataNodePlan(removeDataNodes));
   }
+
+  public void changeRegionLeader(TConsensusGroupId regionId, TDataNodeLocation tDataNodeLocation) {
+    Optional<TDataNodeLocation> newLeaderNode =
+        findNodeOfAnotherReplica(regionId, tDataNodeLocation);
+    if (newLeaderNode.isPresent()) {
+      SyncDataNodeClientPool.getInstance()
+          .changeRegionLeader(
+              regionId, tDataNodeLocation.getInternalEndPoint(), newLeaderNode.get());
+      LOGGER.info(
+          "Change region leader finished, region is {}, newLeaderNode is {}",
+          regionId,
+          newLeaderNode);
+    }
+  }
+
+  private Optional<TDataNodeLocation> findNodeOfAnotherReplica(
+      TConsensusGroupId regionId, TDataNodeLocation tDataNodeLocation) {
+    List<TDataNodeLocation> regionReplicaNodes = findRegionReplicaNodes(regionId);
+    if (regionReplicaNodes.isEmpty()) {
+      LOGGER.warn("Not find region replica nodes, region: {}", regionId);
+      return Optional.empty();
+    }
+
+    // TODO replace findAny() by select the low load node.
+    Optional<TDataNodeLocation> newLeaderNode =
+        regionReplicaNodes.stream().filter(e -> !e.equals(tDataNodeLocation)).findAny();
+    return newLeaderNode;
+  }
 }
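
The two peer-level steps above share one convention worth calling out: the request is sent to a surviving replica of the region (picked by findNodeOfAnotherReplica), never to the DataNode that is being removed, since that node may already be down. A minimal sketch of that shared pattern, assuming a hypothetical helper named sendToAnotherReplica inside DataNodeRemoveHandler:

    // Hedged sketch only; the patch keeps addRegionPeer and removeRegionPeer as separate methods.
    private TSStatus sendToAnotherReplica(
        TConsensusGroupId regionId,
        TDataNodeLocation originalDataNode,
        TDataNodeLocation destDataNode,
        DataNodeRequestType requestType) {
      Optional<TDataNodeLocation> otherNode = findNodeOfAnotherReplica(regionId, originalDataNode);
      if (!otherNode.isPresent()) {
        // no surviving replica is available to coordinate the step
        TSStatus status = new TSStatus(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
        status.setMessage("No other replica found for region " + regionId);
        return status;
      }
      TMigrateRegionReq req = new TMigrateRegionReq(regionId, originalDataNode, destDataNode);
      req.setNewLeaderNode(otherNode.get());
      return SyncDataNodeClientPool.getInstance()
          .sendSyncRequestToDataNodeWithRetry(
              otherNode.get().getInternalEndPoint(), req, requestType);
    }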
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/RegionMigrateProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/RegionMigrateProcedure.java
index 8958e9c9f8..dd44b0ac33 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/RegionMigrateProcedure.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/RegionMigrateProcedure.java
@@ -74,6 +74,7 @@ public class RegionMigrateProcedure
     if (consensusGroupId == null) {
       return Flow.NO_MORE_STATE;
     }
+    TSStatus tsStatus = null;
     try {
       switch (state) {
         case REGION_MIGRATE_PREPARE:
@@ -82,13 +83,46 @@ public class RegionMigrateProcedure
         case ADD_NEW_NODE_TO_REGION_CONSENSUS_GROUP:
           env.getDataNodeRemoveHandler()
               .addNewNodeToRegionConsensusGroup(consensusGroupId, destDataNode);
-          setNextState(RegionTransitionState.MIGRATE_REGION);
+          setNextState(RegionTransitionState.ADD_REGION_PEER);
           break;
-        case MIGRATE_REGION:
-          env.getDataNodeRemoveHandler()
-              .migrateRegion(originalDataNode, destDataNode, consensusGroupId);
-          waitForTheRegionMigrateFinished(consensusGroupId);
-          LOG.info("Wait for region {}  migrate finished", consensusGroupId);
+        case ADD_REGION_PEER:
+          tsStatus =
+              env.getDataNodeRemoveHandler()
+                  .addRegionPeer(originalDataNode, destDataNode, consensusGroupId);
+          if (tsStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+            waitForOneMigrationStepFinished(consensusGroupId);
+            LOG.info("Wait for region {}  add peer finished", consensusGroupId);
+          } else {
+            throw new ProcedureException("Failed to add region peer");
+          }
+          setNextState(RegionTransitionState.CHANGE_REGION_LEADER);
+          break;
+        case CHANGE_REGION_LEADER:
+          env.getDataNodeRemoveHandler().changeRegionLeader(consensusGroupId, originalDataNode);
+          setNextState(RegionTransitionState.REMOVE_REGION_PEER);
+          break;
+        case REMOVE_REGION_PEER:
+          tsStatus =
+              env.getDataNodeRemoveHandler()
+                  .removeRegionPeer(originalDataNode, destDataNode, consensusGroupId);
+          if (tsStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+            waitForOneMigrationStepFinished(consensusGroupId);
+            LOG.info("Wait for region {} remove peer finished", consensusGroupId);
+          } else {
+            throw new ProcedureException("Failed to remove region peer");
+          }
+          setNextState(RegionTransitionState.REMOVE_REGION_CONSENSUS_GROUP);
+          break;
+        case REMOVE_REGION_CONSENSUS_GROUP:
+          tsStatus =
+              env.getDataNodeRemoveHandler()
+                  .removeRegionConsensusGroup(originalDataNode, destDataNode, consensusGroupId);
+          if (tsStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+            waitForOneMigrationStepFinished(consensusGroupId);
+            LOG.info("Wait for region {}  remove consensus group finished", consensusGroupId);
+          }
+          // remove consensus group after a node stop, which will be failed, but we will continue
+          // execute.
           setNextState(RegionTransitionState.UPDATE_REGION_LOCATION_CACHE);
           break;
         case UPDATE_REGION_LOCATION_CACHE:
@@ -200,7 +234,7 @@ public class RegionMigrateProcedure
     return false;
   }
 
-  public TSStatus waitForTheRegionMigrateFinished(TConsensusGroupId consensusGroupId) {
+  public TSStatus waitForOneMigrationStepFinished(TConsensusGroupId consensusGroupId) {
     TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
     synchronized (regionMigrateLock) {
       try {
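
Each of the new procedure states follows the same submit-then-wait shape: issue the step on a DataNode, and only move on once that DataNode has reported the step result back to the ConfigNode. A hedged sketch of the per-state pattern (executeStep and nextState are placeholders for the concrete handler call and successor state):

    // Pattern used for ADD_REGION_PEER and REMOVE_REGION_PEER; REMOVE_REGION_CONSENSUS_GROUP skips
    // the throw, because removing the group on an already-stopped node is expected to fail.
    TSStatus tsStatus = executeStep(originalDataNode, destDataNode, consensusGroupId);
    if (tsStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
      // blocks on regionMigrateLock until the DataNode's result report arrives
      waitForOneMigrationStepFinished(consensusGroupId);
    } else {
      throw new ProcedureException("step failed, abort the region migration");
    }
    setNextState(nextState); // e.g. ADD_REGION_PEER -> CHANGE_REGION_LEADER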
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/RemoveDataNodeProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/RemoveDataNodeProcedure.java
index 14348a3cc1..cf327ec8f8 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/RemoveDataNodeProcedure.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/RemoveDataNodeProcedure.java
@@ -42,7 +42,7 @@ public class RemoveDataNodeProcedure extends AbstractNodeProcedure<RemoveDataNod
   private static final Logger LOG = LoggerFactory.getLogger(RemoveDataNodeProcedure.class);
   private static final int retryThreshold = 5;
 
-  private TDataNodeLocation tDataNodeLocation;
+  private TDataNodeLocation disableDataNodeLocation;
 
   private List<TConsensusGroupId> execDataNodeRegionIds = new ArrayList<>();
 
@@ -50,26 +50,26 @@ public class RemoveDataNodeProcedure extends AbstractNodeProcedure<RemoveDataNod
     super();
   }
 
-  public RemoveDataNodeProcedure(TDataNodeLocation tDataNodeLocation) {
+  public RemoveDataNodeProcedure(TDataNodeLocation disableDataNodeLocation) {
     super();
-    this.tDataNodeLocation = tDataNodeLocation;
+    this.disableDataNodeLocation = disableDataNodeLocation;
   }
 
   @Override
   protected Flow executeFromState(ConfigNodeProcedureEnv env, RemoveDataNodeState state) {
-    if (tDataNodeLocation == null) {
+    if (disableDataNodeLocation == null) {
       return Flow.NO_MORE_STATE;
     }
     try {
       switch (state) {
         case REMOVE_DATA_NODE_PREPARE:
           execDataNodeRegionIds =
-              env.getDataNodeRemoveHandler().getDataNodeRegionIds(tDataNodeLocation);
+              env.getDataNodeRemoveHandler().getDataNodeRegionIds(disableDataNodeLocation);
           LOG.info("DataNode region id is {}", execDataNodeRegionIds);
           setNextState(RemoveDataNodeState.BROADCAST_DISABLE_DATA_NODE);
           break;
         case BROADCAST_DISABLE_DATA_NODE:
-          env.getDataNodeRemoveHandler().broadcastDisableDataNode(tDataNodeLocation);
+          env.getDataNodeRemoveHandler().broadcastDisableDataNode(disableDataNodeLocation);
           setNextState(RemoveDataNodeState.SUBMIT_REGION_MIGRATE);
           break;
         case SUBMIT_REGION_MIGRATE:
@@ -77,8 +77,8 @@ public class RemoveDataNodeProcedure extends AbstractNodeProcedure<RemoveDataNod
           setNextState(RemoveDataNodeState.STOP_DATA_NODE);
           break;
         case STOP_DATA_NODE:
-          env.getDataNodeRemoveHandler().stopDataNode(tDataNodeLocation);
-          env.getDataNodeRemoveHandler().removeDataNodePersistence(tDataNodeLocation);
+          env.getDataNodeRemoveHandler().removeDataNodePersistence(disableDataNodeLocation);
+          env.getDataNodeRemoveHandler().stopDataNode(disableDataNodeLocation);
           return Flow.NO_MORE_STATE;
       }
     } catch (Exception e) {
@@ -87,7 +87,7 @@ public class RemoveDataNodeProcedure extends AbstractNodeProcedure<RemoveDataNod
       } else {
         LOG.error(
             "Retrievable error trying to remove data node {}, state {}",
-            tDataNodeLocation,
+            disableDataNodeLocation,
             state,
             e);
         if (getCycles() > retryThreshold) {
@@ -136,7 +136,7 @@ public class RemoveDataNodeProcedure extends AbstractNodeProcedure<RemoveDataNod
   public void serialize(DataOutputStream stream) throws IOException {
     stream.writeInt(ProcedureFactory.ProcedureType.REMOVE_DATA_NODE_PROCEDURE.ordinal());
     super.serialize(stream);
-    ThriftCommonsSerDeUtils.serializeTDataNodeLocation(tDataNodeLocation, stream);
+    ThriftCommonsSerDeUtils.serializeTDataNodeLocation(disableDataNodeLocation, stream);
     stream.writeInt(execDataNodeRegionIds.size());
     execDataNodeRegionIds.forEach(
         tid -> ThriftCommonsSerDeUtils.serializeTConsensusGroupId(tid, stream));
@@ -146,7 +146,7 @@ public class RemoveDataNodeProcedure extends AbstractNodeProcedure<RemoveDataNod
   public void deserialize(ByteBuffer byteBuffer) {
     super.deserialize(byteBuffer);
     try {
-      tDataNodeLocation = ThriftCommonsSerDeUtils.deserializeTDataNodeLocation(byteBuffer);
+      disableDataNodeLocation = ThriftCommonsSerDeUtils.deserializeTDataNodeLocation(byteBuffer);
       int regionSize = byteBuffer.getInt();
       execDataNodeRegionIds = new ArrayList<>(regionSize);
       for (int i = 0; i < regionSize; i++) {
@@ -163,7 +163,7 @@ public class RemoveDataNodeProcedure extends AbstractNodeProcedure<RemoveDataNod
       RemoveDataNodeProcedure thatProc = (RemoveDataNodeProcedure) that;
       return thatProc.getProcId() == this.getProcId()
           && thatProc.getState() == this.getState()
-          && thatProc.tDataNodeLocation.equals(this.tDataNodeLocation);
+          && thatProc.disableDataNodeLocation.equals(this.disableDataNodeLocation);
     }
     return false;
   }
@@ -175,7 +175,7 @@ public class RemoveDataNodeProcedure extends AbstractNodeProcedure<RemoveDataNod
               env.getDataNodeRemoveHandler().findDestDataNode(regionId);
           if (destDataNode != null) {
             RegionMigrateProcedure regionMigrateProcedure =
-                new RegionMigrateProcedure(regionId, tDataNodeLocation, destDataNode);
+                new RegionMigrateProcedure(regionId, disableDataNodeLocation, destDataNode);
             addChildProcedure(regionMigrateProcedure);
             LOG.info("Submit child procedure, {}", regionMigrateProcedure);
           }
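
Two details in this file carry the "node already stopped" intent: stopDataNode no longer throws when the stop request fails (see the DataNodeRemoveHandler hunk above), and the persistence update now runs before the stop attempt, so an unreachable DataNode can still be dropped from the cluster metadata. The reordered final state, with the inferred reasoning spelled out in a comment:

    case STOP_DATA_NODE:
      // persist the removal first: even if the node is already down and the stop RPC fails,
      // the cluster no longer lists it (inferred intent; not stated in the commit message)
      env.getDataNodeRemoveHandler().removeDataNodePersistence(disableDataNodeLocation);
      env.getDataNodeRemoveHandler().stopDataNode(disableDataNodeLocation);
      return Flow.NO_MORE_STATE;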
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/RegionTransitionState.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/RegionTransitionState.java
index 236a334dd9..fb0ebe4312 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/RegionTransitionState.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/RegionTransitionState.java
@@ -22,6 +22,9 @@ package org.apache.iotdb.confignode.procedure.state;
 public enum RegionTransitionState {
   REGION_MIGRATE_PREPARE,
   ADD_NEW_NODE_TO_REGION_CONSENSUS_GROUP,
-  MIGRATE_REGION,
+  ADD_REGION_PEER,
+  CHANGE_REGION_LEADER,
+  REMOVE_REGION_PEER,
+  REMOVE_REGION_CONSENSUS_GROUP,
   UPDATE_REGION_LOCATION_CACHE
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java b/server/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java
index 0aaae47d64..914ec0b0bd 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java
@@ -64,28 +64,21 @@ public class RegionMigrateService implements IService {
   }
 
   /**
-   * migrate a region from fromNode to toNode
+   * add a region peer
    *
-   * @param fromNode from which node
-   * @param regionId which region
-   * @param toNode to which node
-   * @param newLeaderNode transfer region leader to which node
+   * @param req TMigrateRegionReq
    * @return submit task succeed?
    */
-  private boolean submitRegionMigrateTask(
-      TDataNodeLocation fromNode,
-      TConsensusGroupId regionId,
-      TDataNodeLocation toNode,
-      TDataNodeLocation newLeaderNode) {
+  public synchronized boolean submitAddRegionPeerTask(TMigrateRegionReq req) {
+
     boolean submitSucceed = true;
     try {
-      regionMigratePool.submit(new RegionMigrateTask(regionId, fromNode, toNode, newLeaderNode));
+      regionMigratePool.submit(new AddRegionPeerTask(req.getRegionId(), req.getToNode()));
     } catch (Exception e) {
       LOGGER.error(
-          "submit region migrate task error. region: {}, from: {} --> to: {}.",
-          regionId,
-          fromNode.getInternalEndPoint().getIp(),
-          toNode.getInternalEndPoint().getIp(),
+          "submit add region peer task error. region: {}, to: {}.",
+          req.getRegionId(),
+          req.getToNode().getInternalEndPoint().getIp(),
           e);
       submitSucceed = false;
     }
@@ -93,14 +86,49 @@ public class RegionMigrateService implements IService {
   }
 
   /**
-   * migrate a region
+   * remove a region peer
    *
    * @param req TMigrateRegionReq
    * @return submit task succeed?
    */
-  public synchronized boolean submitRegionMigrateTask(TMigrateRegionReq req) {
-    return submitRegionMigrateTask(
-        req.getFromNode(), req.getRegionId(), req.getToNode(), req.getNewLeaderNode());
+  public synchronized boolean submitRemoveRegionPeerTask(TMigrateRegionReq req) {
+
+    boolean submitSucceed = true;
+    try {
+      regionMigratePool.submit(
+          new RemoveRegionPeerTask(req.getRegionId(), req.getFromNode(), req.getNewLeaderNode()));
+    } catch (Exception e) {
+      LOGGER.error(
+          "submit remove region peer task error. region: {}, from: {}.",
+          req.getRegionId(),
+          req.getFromNode().getInternalEndPoint().getIp(),
+          e);
+      submitSucceed = false;
+    }
+    return submitSucceed;
+  }
+
+  /**
+   * remove a region peer
+   *
+   * @param req TMigrateRegionReq
+   * @return submit task succeed?
+   */
+  public synchronized boolean submitRemoveRegionConsensusGroupTask(TMigrateRegionReq req) {
+
+    boolean submitSucceed = true;
+    try {
+      regionMigratePool.submit(
+          new RemoveRegionConsensusGroupTask(req.getRegionId(), req.getFromNode()));
+    } catch (Exception e) {
+      LOGGER.error(
+          "submit remove region consensus group task error. region: {}, from: {}.",
+          req.getRegionId(),
+          req.getFromNode().getInternalEndPoint().getIp(),
+          e);
+      submitSucceed = false;
+    }
+    return submitSucceed;
   }
 
   @Override
@@ -155,160 +183,80 @@ public class RegionMigrateService implements IService {
     }
   }
 
-  private static class RegionMigrateTask implements Runnable {
-    private static final Logger taskLogger = LoggerFactory.getLogger(RegionMigrateTask.class);
+  private static void reportSucceed(TConsensusGroupId tRegionId) {
+    TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+    status.setMessage("Region: " + tRegionId + " migrated succeed");
+    TRegionMigrateResultReportReq req = new TRegionMigrateResultReportReq(tRegionId, status);
+    try {
+      reportRegionMigrateResultToConfigNode(req);
+    } catch (Throwable e) {
+      LOGGER.error(
+          "Report region {} migrate successful result error, result:{}", tRegionId, req, e);
+    }
+  }
+
+  private static void reportFailed(
+      TConsensusGroupId tRegionId,
+      TDataNodeLocation failedNode,
+      TRegionMigrateFailedType failedType,
+      TSStatus status) {
+    TRegionMigrateResultReportReq req =
+        createFailedRequest(tRegionId, failedNode, failedType, status);
+    try {
+      reportRegionMigrateResultToConfigNode(req);
+    } catch (Throwable e) {
+      LOGGER.error("Report region {} migrate failed result error, result:{}", tRegionId, req, e);
+    }
+  }
+
+  private static TRegionMigrateResultReportReq createFailedRequest(
+      TConsensusGroupId tRegionId,
+      TDataNodeLocation failedNode,
+      TRegionMigrateFailedType failedType,
+      TSStatus status) {
+    Map<TDataNodeLocation, TRegionMigrateFailedType> failedNodeAndReason = new HashMap<>();
+    failedNodeAndReason.put(failedNode, failedType);
+    TRegionMigrateResultReportReq req = new TRegionMigrateResultReportReq(tRegionId, status);
+    req.setFailedNodeAndReason(failedNodeAndReason);
+    return req;
+  }
+
+  private static void reportRegionMigrateResultToConfigNode(TRegionMigrateResultReportReq req)
+      throws TException {
+    TSStatus status;
+    try (ConfigNodeClient client = new ConfigNodeClient()) {
+      status = client.reportRegionMigrateResult(req);
+      LOGGER.info(
+          "Report region {} migrate result {} to Config node succeed, result: {}",
+          req.getRegionId(),
+          req,
+          status);
+    }
+  }
+
+  private static class AddRegionPeerTask implements Runnable {
+    private static final Logger taskLogger = LoggerFactory.getLogger(AddRegionPeerTask.class);
 
     // migrate which region
     private final TConsensusGroupId tRegionId;
 
-    // migrate from which node
-    private final TDataNodeLocation fromNode;
-
     // migrate to which node
     private final TDataNodeLocation toNode;
 
-    // transfer leader to which node
-    private final TDataNodeLocation newLeaderNode;
-
-    public RegionMigrateTask(
-        TConsensusGroupId tRegionId,
-        TDataNodeLocation fromNode,
-        TDataNodeLocation toNode,
-        TDataNodeLocation newLeaderNode) {
+    public AddRegionPeerTask(TConsensusGroupId tRegionId, TDataNodeLocation toNode) {
       this.tRegionId = tRegionId;
-      this.fromNode = fromNode;
       this.toNode = toNode;
-      this.newLeaderNode = newLeaderNode;
     }
 
     @Override
     public void run() {
       TSStatus runResult = addPeer();
       if (isFailed(runResult)) {
-        reportFailed(toNode, TRegionMigrateFailedType.AddPeerFailed, runResult);
+        reportFailed(tRegionId, toNode, TRegionMigrateFailedType.AddPeerFailed, runResult);
         return;
       }
 
-      changeLeader();
-
-      runResult = removePeer();
-      if (isFailed(runResult)) {
-        reportFailed(fromNode, TRegionMigrateFailedType.RemovePeerFailed, runResult);
-      }
-      runResult = removeConsensusGroup();
-      if (isFailed(runResult)) {
-        reportFailed(fromNode, TRegionMigrateFailedType.RemoveConsensusGroupFailed, runResult);
-      }
-
-      runResult = deleteRegion();
-      if (isFailed(runResult)) {
-        reportFailed(fromNode, TRegionMigrateFailedType.DeleteRegionFailed, runResult);
-      }
-
-      reportSucceed();
-    }
-
-    private void changeLeader() {
-      ConsensusGroupId regionId = ConsensusGroupId.Factory.createFromTConsensusGroupId(tRegionId);
-      taskLogger.debug("start to transfer region {} leader", regionId);
-      try {
-        if (!isLeader(regionId)) {
-          taskLogger.debug("region {} is not leader, no need to transfer", regionId);
-          return;
-        }
-        transferLeader(regionId);
-      } catch (Throwable e) {
-        taskLogger.error(
-            "transfer region {} leader to node {} error",
-            regionId,
-            newLeaderNode.getInternalEndPoint(),
-            e);
-      }
-      taskLogger.debug("finished to change region {} leader", regionId);
-    }
-
-    private void transferLeader(ConsensusGroupId regionId) {
-      taskLogger.debug("transfer region {} leader to {} ", regionId, newLeaderNode);
-      if (regionId instanceof DataRegionId) {
-        Peer newLeaderPeer = new Peer(regionId, newLeaderNode.getDataRegionConsensusEndPoint());
-        DataRegionConsensusImpl.getInstance().transferLeader(regionId, newLeaderPeer);
-      } else {
-        Peer newLeaderPeer = new Peer(regionId, newLeaderNode.getSchemaRegionConsensusEndPoint());
-        SchemaRegionConsensusImpl.getInstance().transferLeader(regionId, newLeaderPeer);
-      }
-    }
-
-    private boolean isLeader(ConsensusGroupId regionId) {
-      if (regionId instanceof DataRegionId) {
-        return DataRegionConsensusImpl.getInstance().isLeader(regionId);
-      }
-      return SchemaRegionConsensusImpl.getInstance().isLeader(regionId);
-    }
-
-    private TSStatus deleteRegion() {
-      TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
-      ConsensusGroupId regionId = ConsensusGroupId.Factory.createFromTConsensusGroupId(tRegionId);
-      taskLogger.debug("start to delete region {}", regionId);
-      try {
-        if (regionId instanceof DataRegionId) {
-          StorageEngineV2.getInstance().deleteDataRegion((DataRegionId) regionId);
-        } else {
-          SchemaEngine.getInstance().deleteSchemaRegion((SchemaRegionId) regionId);
-        }
-      } catch (Throwable e) {
-        taskLogger.error("delete the region {} failed", regionId, e);
-        status.setCode(TSStatusCode.DELETE_REGION_ERROR.getStatusCode());
-        status.setMessage("delete region " + regionId + "failed, " + e.getMessage());
-        return status;
-      }
-      status.setMessage("delete region " + regionId + " succeed");
-      taskLogger.debug("finished to delete region {}", regionId);
-      return status;
-    }
-
-    private void reportSucceed() {
-      TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
-      status.setMessage("Region: " + tRegionId + " migrated succeed");
-      TRegionMigrateResultReportReq req = new TRegionMigrateResultReportReq(tRegionId, status);
-      try {
-        reportRegionMigrateResultToConfigNode(req);
-      } catch (Throwable e) {
-        taskLogger.error(
-            "report region {} migrate successful result error, result:{}", tRegionId, req, e);
-      }
-    }
-
-    private void reportFailed(
-        TDataNodeLocation failedNode, TRegionMigrateFailedType failedType, TSStatus status) {
-      TRegionMigrateResultReportReq req = createFailedRequest(failedNode, failedType, status);
-      try {
-        reportRegionMigrateResultToConfigNode(req);
-      } catch (Throwable e) {
-        taskLogger.error(
-            "report region {} migrate failed result error, result:{}", tRegionId, req, e);
-      }
-    }
-
-    private TRegionMigrateResultReportReq createFailedRequest(
-        TDataNodeLocation failedNode, TRegionMigrateFailedType failedType, TSStatus status) {
-      Map<TDataNodeLocation, TRegionMigrateFailedType> failedNodeAndReason = new HashMap<>();
-      failedNodeAndReason.put(failedNode, failedType);
-      TRegionMigrateResultReportReq req = new TRegionMigrateResultReportReq(tRegionId, status);
-      req.setFailedNodeAndReason(failedNodeAndReason);
-      return req;
-    }
-
-    private void reportRegionMigrateResultToConfigNode(TRegionMigrateResultReportReq req)
-        throws TException {
-      TSStatus status;
-      try (ConfigNodeClient client = new ConfigNodeClient()) {
-        status = client.reportRegionMigrateResult(req);
-        taskLogger.info(
-            "report region {} migrate result {} to Config node succeed, result: {}",
-            tRegionId,
-            req,
-            status);
-      }
+      reportSucceed(tRegionId);
     }
 
     private TSStatus addPeer() {
@@ -316,7 +264,7 @@ public class RegionMigrateService implements IService {
       TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
       ConsensusGenericResponse resp = null;
       TEndPoint newPeerNode = getConsensusEndPoint(toNode, regionId);
-      taskLogger.info("start to add peer {} for region {}", newPeerNode, tRegionId);
+      taskLogger.info("Start to add peer {} for region {}", newPeerNode, tRegionId);
       boolean addPeerSucceed = true;
       for (int i = 0; i < RETRY; i++) {
         try {
@@ -350,7 +298,7 @@ public class RegionMigrateService implements IService {
         return status;
       }
 
-      taskLogger.info("succeed to add peer {} for region {}", newPeerNode, regionId);
+      taskLogger.info("Succeed to add peer {} for region {}", newPeerNode, regionId);
       status.setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode());
       status.setMessage("add peer " + newPeerNode + " for region " + regionId + " succeed");
       return status;
@@ -366,6 +314,48 @@ public class RegionMigrateService implements IService {
       return resp;
     }
 
+    private TEndPoint getConsensusEndPoint(
+        TDataNodeLocation nodeLocation, ConsensusGroupId regionId) {
+      if (regionId instanceof DataRegionId) {
+        return nodeLocation.getDataRegionConsensusEndPoint();
+      }
+      return nodeLocation.getSchemaRegionConsensusEndPoint();
+    }
+
+    private boolean isSucceed(TSStatus status) {
+      return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode();
+    }
+
+    private boolean isFailed(TSStatus status) {
+      return !isSucceed(status);
+    }
+  }
+
+  private static class RemoveRegionPeerTask implements Runnable {
+    private static final Logger taskLogger = LoggerFactory.getLogger(RemoveRegionPeerTask.class);
+
+    // migrate which region
+    private final TConsensusGroupId tRegionId;
+
+    // migrate from which node
+    private final TDataNodeLocation fromNode;
+
+    public RemoveRegionPeerTask(
+        TConsensusGroupId tRegionId, TDataNodeLocation fromNode, TDataNodeLocation newLeaderNode) {
+      this.tRegionId = tRegionId;
+      this.fromNode = fromNode;
+    }
+
+    @Override
+    public void run() {
+      TSStatus runResult = removePeer();
+      if (isFailed(runResult)) {
+        reportFailed(tRegionId, fromNode, TRegionMigrateFailedType.RemovePeerFailed, runResult);
+      }
+
+      reportSucceed(tRegionId);
+    }
+
     private ConsensusGenericResponse removeRegionPeer(ConsensusGroupId regionId, Peer oldPeer) {
       ConsensusGenericResponse resp;
       if (regionId instanceof DataRegionId) {
@@ -410,15 +400,84 @@ public class RegionMigrateService implements IService {
 
       if (!removePeerSucceed || resp == null || !resp.isSuccess()) {
         taskLogger.error(
-            "remove old peer {} for region {} failed, resp: {}", oldPeerNode, regionId, resp);
+            "Remove old peer {} for region {} failed, resp: {}", oldPeerNode, regionId, resp);
         status.setCode(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
         status.setMessage("remove old peer " + oldPeerNode + " for region " + regionId + " failed");
         return status;
       }
 
-      taskLogger.info("succeed to remove peer {} for region {}", oldPeerNode, regionId);
+      taskLogger.info("Succeed to remove peer {} for region {}", oldPeerNode, regionId);
       status.setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode());
-      status.setMessage("remove peer " + oldPeerNode + " for region " + regionId + " succeed");
+      status.setMessage("Remove peer " + oldPeerNode + " for region " + regionId + " succeed");
+      return status;
+    }
+
+    private TEndPoint getConsensusEndPoint(
+        TDataNodeLocation nodeLocation, ConsensusGroupId regionId) {
+      if (regionId instanceof DataRegionId) {
+        return nodeLocation.getDataRegionConsensusEndPoint();
+      }
+      return nodeLocation.getSchemaRegionConsensusEndPoint();
+    }
+
+    private boolean isSucceed(TSStatus status) {
+      return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode();
+    }
+
+    private boolean isFailed(TSStatus status) {
+      return !isSucceed(status);
+    }
+  }
+
+  private static class RemoveRegionConsensusGroupTask implements Runnable {
+    private static final Logger taskLogger =
+        LoggerFactory.getLogger(RemoveRegionConsensusGroupTask.class);
+
+    // migrate which region
+    private final TConsensusGroupId tRegionId;
+
+    // migrate from which node
+    private final TDataNodeLocation fromNode;
+
+    public RemoveRegionConsensusGroupTask(TConsensusGroupId tRegionId, TDataNodeLocation fromNode) {
+      this.tRegionId = tRegionId;
+      this.fromNode = fromNode;
+    }
+
+    @Override
+    public void run() {
+      TSStatus runResult = removeConsensusGroup();
+      if (isFailed(runResult)) {
+        reportFailed(
+            tRegionId, fromNode, TRegionMigrateFailedType.RemoveConsensusGroupFailed, runResult);
+      }
+
+      runResult = deleteRegion();
+      if (isFailed(runResult)) {
+        reportFailed(tRegionId, fromNode, TRegionMigrateFailedType.DeleteRegionFailed, runResult);
+      }
+
+      reportSucceed(tRegionId);
+    }
+
+    private TSStatus deleteRegion() {
+      TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+      ConsensusGroupId regionId = ConsensusGroupId.Factory.createFromTConsensusGroupId(tRegionId);
+      taskLogger.debug("start to delete region {}", regionId);
+      try {
+        if (regionId instanceof DataRegionId) {
+          StorageEngineV2.getInstance().deleteDataRegion((DataRegionId) regionId);
+        } else {
+          SchemaEngine.getInstance().deleteSchemaRegion((SchemaRegionId) regionId);
+        }
+      } catch (Throwable e) {
+        taskLogger.error("delete the region {} failed", regionId, e);
+        status.setCode(TSStatusCode.DELETE_REGION_ERROR.getStatusCode());
+        status.setMessage("delete region " + regionId + "failed, " + e.getMessage());
+        return status;
+      }
+      status.setMessage("delete region " + regionId + " succeed");
+      taskLogger.info("Finished to delete region {}", regionId);
       return status;
     }
 
@@ -458,14 +517,6 @@ public class RegionMigrateService implements IService {
       return status;
     }
 
-    private TEndPoint getConsensusEndPoint(
-        TDataNodeLocation nodeLocation, ConsensusGroupId regionId) {
-      if (regionId instanceof DataRegionId) {
-        return nodeLocation.getDataRegionConsensusEndPoint();
-      }
-      return nodeLocation.getSchemaRegionConsensusEndPoint();
-    }
-
     private boolean isSucceed(TSStatus status) {
       return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode();
     }
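
On the DataNode side, each ConfigNode-driven step now runs as its own Runnable in regionMigratePool, and every task ends by reporting its outcome through the shared static helpers reportSucceed / reportFailed. A minimal sketch of that task shape (ExampleStepTask and doStep are illustrative only; the real tasks are AddRegionPeerTask, RemoveRegionPeerTask and RemoveRegionConsensusGroupTask):

    private static class ExampleStepTask implements Runnable {
      private final TConsensusGroupId tRegionId;
      private final TDataNodeLocation targetNode;

      ExampleStepTask(TConsensusGroupId tRegionId, TDataNodeLocation targetNode) {
        this.tRegionId = tRegionId;
        this.targetNode = targetNode;
      }

      @Override
      public void run() {
        TSStatus result = doStep(); // stand-in for addPeer() / removePeer() / removeConsensusGroup()
        if (result.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
          reportFailed(tRegionId, targetNode, TRegionMigrateFailedType.AddPeerFailed, result);
          return;
        }
        reportSucceed(tRegionId);
      }

      private TSStatus doStep() {
        // placeholder body; a real task drives the consensus layer here
        return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
      }
    }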
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
index a7c9a06d23..074a505f59 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -576,10 +576,10 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
     TEndPoint newNode = getConsensusEndPoint(req.getNewLeaderNode(), regionId);
     Peer newLeaderPeer = new Peer(regionId, newNode);
     if (!isLeader(regionId)) {
-      LOGGER.debug("region {} is not leader, no need to change leader", regionId);
+      LOGGER.info("region {} is not leader, no need to change leader", regionId);
       return status;
     }
-    LOGGER.debug("region {} is leader, will change leader", regionId);
+    LOGGER.info("region {} is leader, will change leader", regionId);
     return transferLeader(regionId, newLeaderPeer);
   }
 
@@ -632,6 +632,48 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
     return addConsensusGroup(regionId, peers);
   }
 
+  @Override
+  public TSStatus removeRegionPeer(TMigrateRegionReq req) throws TException {
+    TConsensusGroupId regionId = req.getRegionId();
+    String fromNodeIp = req.getFromNode().getInternalEndPoint().getIp();
+    boolean submitSucceed = RegionMigrateService.getInstance().submitRemoveRegionPeerTask(req);
+    TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+    if (submitSucceed) {
+      LOGGER.info(
+          "succeed to submit a remove region peer task. region: {}, from {}",
+          regionId,
+          req.getFromNode().getInternalEndPoint());
+      return status;
+    }
+    status.setCode(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
+    status.setMessage(
+        "submit region remove region peer task failed, region: "
+            + regionId
+            + ", from "
+            + req.getFromNode().getInternalEndPoint());
+    return status;
+  }
+
+  @Override
+  public TSStatus removeToRegionConsensusGroup(TMigrateRegionReq req) throws TException {
+    TConsensusGroupId regionId = req.getRegionId();
+    String fromNodeIp = req.getFromNode().getInternalEndPoint().getIp();
+    boolean submitSucceed =
+        RegionMigrateService.getInstance().submitRemoveRegionConsensusGroupTask(req);
+    TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+    if (submitSucceed) {
+      LOGGER.info(
+          "succeed to submit a remove region consensus group task. region: {}, from {}",
+          regionId,
+          fromNodeIp);
+      return status;
+    }
+    status.setCode(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
+    status.setMessage(
+        "submit region remove region consensus group task failed, region: " + regionId);
+    return status;
+  }
+
   private TSStatus createNewRegion(ConsensusGroupId regionId, String storageGroup, long ttl) {
     TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
     LOGGER.info("start to create new region {}", regionId);
@@ -681,32 +723,18 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
   }
 
   @Override
-  public TSStatus migrateRegion(TMigrateRegionReq req) throws TException {
+  public TSStatus addRegionPeer(TMigrateRegionReq req) throws TException {
     TConsensusGroupId regionId = req.getRegionId();
-    String fromNodeIp = req.getFromNode().getInternalEndPoint().getIp();
     String toNodeIp = req.getToNode().getInternalEndPoint().getIp();
-    LOGGER.debug(
-        "start to submit a region migrate task. region: {}, from {} to {}",
-        regionId,
-        fromNodeIp,
-        toNodeIp);
-    boolean submitSucceed = RegionMigrateService.getInstance().submitRegionMigrateTask(req);
+    boolean submitSucceed = RegionMigrateService.getInstance().submitAddRegionPeerTask(req);
     TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
     if (submitSucceed) {
-      LOGGER.debug(
-          "succeed to submit a region migrate task. region: {}, from {} to {}",
-          regionId,
-          fromNodeIp,
-          toNodeIp);
+      LOGGER.info(
+          "succeed to submit a add region peer task. region: {}, to {}", regionId, toNodeIp);
       return status;
     }
-    LOGGER.error(
-        "failed to submit a region migrate task. region: {}, from {} to {}",
-        regionId,
-        fromNodeIp,
-        toNodeIp);
     status.setCode(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
-    status.setMessage("submit region migrate task failed, region: " + regionId);
+    status.setMessage("submit add region peer task failed, region: " + regionId);
     return status;
   }
 
@@ -723,7 +751,7 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
   }
 
   private TSStatus addConsensusGroup(ConsensusGroupId regionId, List<Peer> peers) {
-    LOGGER.info("start to add peers {} to region {} consensus group", peers, regionId);
+    LOGGER.info("Start to add consensus group {} to region {}", peers, regionId);
     TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
     ConsensusGenericResponse resp;
     if (regionId instanceof DataRegionId) {
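
The three new RPC handlers above only acknowledge that a background task was accepted; the actual step result reaches the ConfigNode later via reportRegionMigrateResult. They also all build their response the same way, so a small shared helper would capture the pattern; a hedged sketch (toSubmitStatus is illustrative, not part of the patch):

    private TSStatus toSubmitStatus(boolean submitSucceed, String failMessage) {
      TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
      if (!submitSucceed) {
        status.setCode(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
        status.setMessage(failMessage);
      }
      return status;
    }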
diff --git a/thrift/src/main/thrift/datanode.thrift b/thrift/src/main/thrift/datanode.thrift
index 5f436c12e3..a4544fd50a 100644
--- a/thrift/src/main/thrift/datanode.thrift
+++ b/thrift/src/main/thrift/datanode.thrift
@@ -279,10 +279,23 @@ service IDataNodeRPCService {
   common.TSStatus addToRegionConsensusGroup(TAddConsensusGroup req);
 
   /**
-   * Config node will migrate a region from this node to newNode
-   * @param migrate which region from one node to other node
+   * Config node will add a region peer to a region group
+   * @param add region req which region from one node to other node
    */
-  common.TSStatus migrateRegion(TMigrateRegionReq req);
+  common.TSStatus addRegionPeer(TMigrateRegionReq req);
+
+  /**
+   * Config node will remove a region peer to a region group
+   * @param remove region peer region from one node to other node
+   */
+  common.TSStatus removeRegionPeer(TMigrateRegionReq req);
+
+  /**
+   * Config node will remove a region group from this node to newNode. Usually a region group has
+   * multiple replicas, thus relates to multiple nodes.
+   * @param remove consensus group req which region from one node to other node
+  */
+  common.TSStatus removeToRegionConsensusGroup(TMigrateRegionReq req);
 
   /**
   * Config node will disable the Data node, the Data node will not accept read/write request when disabled
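
For completeness, a hedged sketch of calling one of the new Thrift methods directly with a borrowed client. Variable names are illustrative; in the patch these calls actually go through SyncDataNodeClientPool.sendSyncRequestToDataNodeWithRetry with the new DataNodeRequestType values, and the error handling below mirrors the helper that was removed from that pool:

    TSStatus status;
    try (SyncDataNodeInternalServiceClient client =
        clientManager.borrowClient(otherReplicaNode.getInternalEndPoint())) {
      TMigrateRegionReq req = new TMigrateRegionReq(regionId, originalDataNode, destDataNode);
      req.setNewLeaderNode(otherReplicaNode);
      status = client.addRegionPeer(req);
    } catch (IOException e) {
      status = new TSStatus(TSStatusCode.NO_CONNECTION.getStatusCode());
      status.setMessage(e.getMessage());
    } catch (TException e) {
      status = new TSStatus(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
      status.setMessage(e.getMessage());
    }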