You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ca...@apache.org on 2022/11/24 13:10:38 UTC

[iotdb] branch beyyes/perfect_region_migrate_process created (now 5e12b17290)

This is an automated email from the ASF dual-hosted git repository.

caogaofei pushed a change to branch beyyes/perfect_region_migrate_process
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at 5e12b17290 make code of region migrate process tidy

This branch includes the following new commits:

     new 5e12b17290 make code of region migrate process tidy

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/01: make code of region migrate process tidy

Posted by ca...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

caogaofei pushed a commit to branch beyyes/perfect_region_migrate_process
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 5e12b172904128370736b5fc79a3b3c0db703bbb
Author: Beyyes <cg...@foxmail.com>
AuthorDate: Thu Nov 24 21:09:34 2022 +0800

    make code of region migrate process tidy
---
 .../procedure/env/DataNodeRemoveHandler.java       |  15 +-
 .../iotdb/db/service/RegionMigrateService.java     | 352 +++++++++------------
 .../impl/DataNodeInternalRPCServiceImpl.java       |  22 +-
 3 files changed, 185 insertions(+), 204 deletions(-)

diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
index 877a627ee2..d97859d37a 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
@@ -178,8 +178,17 @@ public class DataNodeRemoveHandler {
       return status;
     }
 
-    List<TDataNodeLocation> currentPeerNodes = new ArrayList<>(regionReplicaNodes);
-    currentPeerNodes.add(destDataNode);
+    List<TDataNodeLocation> currentPeerNodes;
+    if (TConsensusGroupType.DataRegion.equals(regionId.getType())
+        && MULTI_LEADER_CONSENSUS.equals(CONF.getDataRegionConsensusProtocolClass())) {
+      // parameter of createPeer for MultiLeader should be all peers
+      currentPeerNodes = new ArrayList<>(regionReplicaNodes);
+      currentPeerNodes.add(destDataNode);
+    } else {
+      // parameter of createPeer for Ratis can be empty
+      currentPeerNodes = Collections.emptyList();
+    }
+
     String storageGroup = configManager.getPartitionManager().getRegionStorageGroup(regionId);
     TCreatePeerReq req = new TCreatePeerReq(regionId, currentPeerNodes, storageGroup);
     // TODO replace with real ttl
@@ -278,7 +287,7 @@ public class DataNodeRemoveHandler {
     // Here we pick the DataNode who contains one of the RegionReplica of the specified
     // ConsensusGroup except the origin one
     // in order to notify the new ConsensusGroup that the origin peer should secede now
-    // if the selectedDataNode equals null, we choose the destDataNode to execute the method
+    // If the selectedDataNode equals null, we choose the destDataNode to execute the method
     Optional<TDataNodeLocation> selectedDataNode =
         filterDataNodeWithOtherRegionReplica(regionId, originalDataNode);
     rpcClientDataNode = selectedDataNode.orElse(destDataNode);
diff --git a/server/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java b/server/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java
index fc68469a3d..bc6c25f131 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java
@@ -52,7 +52,7 @@ import java.util.Map;
 public class RegionMigrateService implements IService {
   private static final Logger LOGGER = LoggerFactory.getLogger(RegionMigrateService.class);
 
-  public static final String REMOVE_DATANODE_PROCESS = "[REMOVE_DATANODE_PROCESS]";
+  public static final String REGION_MIGRATE_PROCESS = "[REGION_MIGRATE_PROCESS]";
 
   private static final int MAX_RETRY_NUM = 5;
 
@@ -67,7 +67,7 @@ public class RegionMigrateService implements IService {
   }
 
   /**
-   * submit AddRegionPeerTask
+   * Submit AddRegionPeerTask
    *
    * @param req TMaintainPeerReq
    * @return if the submit task succeed
@@ -79,9 +79,9 @@ public class RegionMigrateService implements IService {
       regionMigratePool.submit(new AddRegionPeerTask(req.getRegionId(), req.getDestNode()));
     } catch (Exception e) {
       LOGGER.error(
-          "Submit addRegionPeer task error for Region: {} on DataNode: {}.",
+          "{}, Submit AddRegionPeerTask error for Region: {}",
+          REGION_MIGRATE_PROCESS,
           req.getRegionId(),
-          req.getDestNode().getInternalEndPoint().getIp(),
           e);
       submitSucceed = false;
     }
@@ -89,7 +89,7 @@ public class RegionMigrateService implements IService {
   }
 
   /**
-   * submit RemoveRegionPeerTask
+   * Submit RemoveRegionPeerTask
    *
    * @param req TMaintainPeerReq
    * @return if the submit task succeed
@@ -101,9 +101,9 @@ public class RegionMigrateService implements IService {
       regionMigratePool.submit(new RemoveRegionPeerTask(req.getRegionId(), req.getDestNode()));
     } catch (Exception e) {
       LOGGER.error(
-          "Submit removeRegionPeer task error for Region: {} on DataNode: {}.",
+          "{}, Submit RemoveRegionPeer task error for Region: {}",
+          REGION_MIGRATE_PROCESS,
           req.getRegionId(),
-          req.getDestNode().getInternalEndPoint().getIp(),
           e);
       submitSucceed = false;
     }
@@ -111,10 +111,10 @@ public class RegionMigrateService implements IService {
   }
 
   /**
-   * remove a region peer
+   * Submit DeleteOldRegionPeerTask
    *
    * @param req TMigrateRegionReq
-   * @return submit task succeed?
+   * @return if the submit task succeed
    */
   public synchronized boolean submitDeleteOldRegionPeerTask(TMaintainPeerReq req) {
 
@@ -123,9 +123,9 @@ public class RegionMigrateService implements IService {
       regionMigratePool.submit(new DeleteOldRegionPeerTask(req.getRegionId(), req.getDestNode()));
     } catch (Exception e) {
       LOGGER.error(
-          "Submit deleteOldRegionPeerTask error for Region: {} on DataNode: {}.",
+          "{}, Submit DeleteOldRegionPeerTask error for Region: {}",
+          REGION_MIGRATE_PROCESS,
           req.getRegionId(),
-          req.getDestNode().getInternalEndPoint().getIp(),
           e);
       submitSucceed = false;
     }
@@ -152,17 +152,10 @@ public class RegionMigrateService implements IService {
     return ServiceType.DATA_NODE_REGION_MIGRATE_SERVICE;
   }
 
-  private static class Holder {
-    private static final RegionMigrateService INSTANCE = new RegionMigrateService();
-
-    private Holder() {}
-  }
-
   private static class RegionMigratePool extends AbstractPoolManager {
     private final Logger poolLogger = LoggerFactory.getLogger(RegionMigratePool.class);
 
     private RegionMigratePool() {
-      // migrate region one by one
       this.pool = IoTDBThreadPoolFactory.newSingleThreadExecutor("Region-Migrate-Pool");
     }
 
@@ -184,78 +177,25 @@ public class RegionMigrateService implements IService {
     }
   }
 
-  private static void reportSucceed(TConsensusGroupId tRegionId, String migrateState) {
-    TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
-    status.setMessage(
-        String.format("Region:%s, state: %s, executed succeed", tRegionId, migrateState));
-    TRegionMigrateResultReportReq req = new TRegionMigrateResultReportReq(tRegionId, status);
-    try {
-      reportRegionMigrateResultToConfigNode(req);
-    } catch (Throwable e) {
-      LOGGER.error(
-          "Report region {} migrate successful result error, result: {}", tRegionId, req, e);
-    }
-  }
-
-  private static void reportFailed(
-      TConsensusGroupId tRegionId,
-      TDataNodeLocation failedNode,
-      TRegionMigrateFailedType failedType,
-      TSStatus status) {
-    TRegionMigrateResultReportReq req =
-        createFailedRequest(tRegionId, failedNode, failedType, status);
-    try {
-      reportRegionMigrateResultToConfigNode(req);
-    } catch (Throwable e) {
-      LOGGER.error("Report region {} migrate failed result error, result:{}", tRegionId, req, e);
-    }
-  }
-
-  private static TRegionMigrateResultReportReq createFailedRequest(
-      TConsensusGroupId tRegionId,
-      TDataNodeLocation failedNode,
-      TRegionMigrateFailedType failedType,
-      TSStatus status) {
-    Map<TDataNodeLocation, TRegionMigrateFailedType> failedNodeAndReason = new HashMap<>();
-    failedNodeAndReason.put(failedNode, failedType);
-    TRegionMigrateResultReportReq req = new TRegionMigrateResultReportReq(tRegionId, status);
-    req.setFailedNodeAndReason(failedNodeAndReason);
-    return req;
-  }
-
-  private static void reportRegionMigrateResultToConfigNode(TRegionMigrateResultReportReq req)
-      throws TException {
-    TSStatus status;
-    try (ConfigNodeClient client = new ConfigNodeClient()) {
-      status = client.reportRegionMigrateResult(req);
-      LOGGER.info(
-          "Report region {} migrate result {} to Config node succeed, result: {}",
-          req.getRegionId(),
-          req,
-          status);
-    }
-  }
-
   private static class AddRegionPeerTask implements Runnable {
     private static final Logger taskLogger = LoggerFactory.getLogger(AddRegionPeerTask.class);
 
     // The RegionGroup that shall perform the add peer process
     private final TConsensusGroupId tRegionId;
 
-    // The DataNode that selected to perform the add peer process
-    private final TDataNodeLocation selectedDataNode;
+    // The new DataNode to be added in the RegionGroup
+    private final TDataNodeLocation destDataNode;
 
-    public AddRegionPeerTask(TConsensusGroupId tRegionId, TDataNodeLocation selectedDataNode) {
+    public AddRegionPeerTask(TConsensusGroupId tRegionId, TDataNodeLocation destDataNode) {
       this.tRegionId = tRegionId;
-      this.selectedDataNode = selectedDataNode;
+      this.destDataNode = destDataNode;
     }
 
     @Override
     public void run() {
       TSStatus runResult = addPeer();
       if (isFailed(runResult)) {
-        reportFailed(
-            tRegionId, selectedDataNode, TRegionMigrateFailedType.AddPeerFailed, runResult);
+        reportFailed(tRegionId, destDataNode, TRegionMigrateFailedType.AddPeerFailed, runResult);
         return;
       }
 
@@ -263,11 +203,12 @@ public class RegionMigrateService implements IService {
     }
 
     private TSStatus addPeer() {
+      taskLogger.info(
+          "{}, Start to addPeer {} for region {}", REGION_MIGRATE_PROCESS, destDataNode, tRegionId);
       ConsensusGroupId regionId = ConsensusGroupId.Factory.createFromTConsensusGroupId(tRegionId);
       TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
       ConsensusGenericResponse resp = null;
-      TEndPoint newPeerNode = getConsensusEndPoint(selectedDataNode, regionId);
-      taskLogger.info("Start to addPeer {} for region {}", newPeerNode, tRegionId);
+      TEndPoint destEndpoint = getConsensusEndPoint(destDataNode, regionId);
       boolean addPeerSucceed = true;
       for (int i = 0; i < MAX_RETRY_NUM; i++) {
         try {
@@ -276,49 +217,40 @@ public class RegionMigrateService implements IService {
           }
           resp =
               addRegionPeer(
-                  regionId, new Peer(regionId, selectedDataNode.getDataNodeId(), newPeerNode));
-          addPeerSucceed = true;
+                  regionId, new Peer(regionId, destDataNode.getDataNodeId(), destEndpoint));
         } catch (Throwable e) {
           addPeerSucceed = false;
           taskLogger.error(
               "{}, executed addPeer {} for region {} error, retry times: {}",
-              REMOVE_DATANODE_PROCESS,
-              newPeerNode,
+              REGION_MIGRATE_PROCESS,
+              destEndpoint,
               regionId,
               i,
               e);
-          status.setCode(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
-          status.setMessage(
-              String.format(
-                  "Add peer for region error, peerId: %s, regionId: %s, errorMessage: %s",
-                  newPeerNode, regionId, e.getMessage()));
         }
         if (addPeerSucceed && resp != null && resp.isSuccess()) {
           break;
         }
       }
+
       if (!addPeerSucceed || resp == null || !resp.isSuccess()) {
-        taskLogger.error(
-            "{}, Add new peer {} for region {} failed, resp: {}",
-            REMOVE_DATANODE_PROCESS,
-            newPeerNode,
-            regionId,
-            resp);
-        status.setCode(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
-        status.setMessage(
+        String errorMsg =
             String.format(
-                "Add peer for region error, peerId: %s, regionId: %s, resp: %s",
-                newPeerNode, regionId, resp));
+                "%s, AddPeer for region error after max retry times, peerId: %s, regionId: %s, resp: %s",
+                REGION_MIGRATE_PROCESS, destEndpoint, regionId, resp);
+        taskLogger.error(errorMsg);
+        status.setCode(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
+        status.setMessage(errorMsg);
         return status;
       }
 
       taskLogger.info(
           "{}, Succeed to addPeer {} for region {}",
-          REMOVE_DATANODE_PROCESS,
-          newPeerNode,
+          REGION_MIGRATE_PROCESS,
+          destEndpoint,
           regionId);
       status.setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode());
-      status.setMessage("add peer " + newPeerNode + " for region " + regionId + " succeed");
+      status.setMessage("addPeer " + destEndpoint + " for region " + regionId + " succeed");
       return status;
     }
 
@@ -331,22 +263,6 @@ public class RegionMigrateService implements IService {
       }
       return resp;
     }
-
-    private TEndPoint getConsensusEndPoint(
-        TDataNodeLocation nodeLocation, ConsensusGroupId regionId) {
-      if (regionId instanceof DataRegionId) {
-        return nodeLocation.getDataRegionConsensusEndPoint();
-      }
-      return nodeLocation.getSchemaRegionConsensusEndPoint();
-    }
-
-    private boolean isSucceed(TSStatus status) {
-      return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode();
-    }
-
-    private boolean isFailed(TSStatus status) {
-      return !isSucceed(status);
-    }
   }
 
   private static class RemoveRegionPeerTask implements Runnable {
@@ -354,11 +270,11 @@ public class RegionMigrateService implements IService {
 
     private final TConsensusGroupId tRegionId;
 
-    private final TDataNodeLocation selectedDataNode;
+    private final TDataNodeLocation destDataNode;
 
-    public RemoveRegionPeerTask(TConsensusGroupId tRegionId, TDataNodeLocation selectedDataNode) {
+    public RemoveRegionPeerTask(TConsensusGroupId tRegionId, TDataNodeLocation destDataNode) {
       this.tRegionId = tRegionId;
-      this.selectedDataNode = selectedDataNode;
+      this.destDataNode = destDataNode;
     }
 
     @Override
@@ -367,29 +283,18 @@ public class RegionMigrateService implements IService {
       if (isSucceed(runResult)) {
         reportSucceed(tRegionId, "RemovePeer");
       } else {
-        reportFailed(
-            tRegionId, selectedDataNode, TRegionMigrateFailedType.RemovePeerFailed, runResult);
-      }
-    }
-
-    private ConsensusGenericResponse removeRegionPeer(ConsensusGroupId regionId, Peer oldPeer) {
-      ConsensusGenericResponse resp;
-      if (regionId instanceof DataRegionId) {
-        resp = DataRegionConsensusImpl.getInstance().removePeer(regionId, oldPeer);
-      } else {
-        resp = SchemaRegionConsensusImpl.getInstance().removePeer(regionId, oldPeer);
+        reportFailed(tRegionId, destDataNode, TRegionMigrateFailedType.RemovePeerFailed, runResult);
       }
-      return resp;
     }
 
     private TSStatus removePeer() {
       ConsensusGroupId regionId = ConsensusGroupId.Factory.createFromTConsensusGroupId(tRegionId);
       TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
-      TEndPoint oldPeerNode = getConsensusEndPoint(selectedDataNode, regionId);
+      TEndPoint destEndPoint = getConsensusEndPoint(destDataNode, regionId);
       taskLogger.info(
-          "{}, Start to remove peer {} for region {}",
-          REMOVE_DATANODE_PROCESS,
-          oldPeerNode,
+          "{}, Start to removePeer {} for region {}",
+          REGION_MIGRATE_PROCESS,
+          destEndPoint,
           regionId);
       ConsensusGenericResponse resp = null;
       boolean removePeerSucceed = true;
@@ -400,20 +305,16 @@ public class RegionMigrateService implements IService {
           }
           resp =
               removeRegionPeer(
-                  regionId, new Peer(regionId, selectedDataNode.getDataNodeId(), oldPeerNode));
-          removePeerSucceed = true;
+                  regionId, new Peer(regionId, destDataNode.getDataNodeId(), destEndPoint));
         } catch (Throwable e) {
           removePeerSucceed = false;
           taskLogger.error(
-              "Remove peer {} for region {} error, retry times: {}", oldPeerNode, regionId, i, e);
-          status.setCode(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
-          status.setMessage(
-              "remove peer: "
-                  + oldPeerNode
-                  + " for region: "
-                  + regionId
-                  + " error. exception: "
-                  + e.getMessage());
+              "{}, executed removePeer {} for region {} error, retry times: {}",
+              REGION_MIGRATE_PROCESS,
+              destEndPoint,
+              regionId,
+              i,
+              e);
         }
         if (removePeerSucceed && resp != null && resp.isSuccess()) {
           break;
@@ -421,41 +322,34 @@ public class RegionMigrateService implements IService {
       }
 
       if (!removePeerSucceed || resp == null || !resp.isSuccess()) {
-        taskLogger.error(
-            "{}, Remove old peer {} for region {} failed, resp: {}",
-            REMOVE_DATANODE_PROCESS,
-            oldPeerNode,
-            regionId,
-            resp);
+        String errorMsg =
+            String.format(
+                "%s, RemovePeer for region error after max retry times, peerId: %s, regionId: %s, resp: %s",
+                REGION_MIGRATE_PROCESS, destEndPoint, regionId, resp);
+        taskLogger.error(errorMsg);
         status.setCode(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
-        status.setMessage("remove old peer " + oldPeerNode + " for region " + regionId + " failed");
+        status.setMessage(errorMsg);
         return status;
       }
 
       taskLogger.info(
-          "{}, Succeed to remove peer {} for region {}",
-          REMOVE_DATANODE_PROCESS,
-          oldPeerNode,
+          "{}, Succeed to removePeer {} for region {}",
+          REGION_MIGRATE_PROCESS,
+          destEndPoint,
           regionId);
       status.setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode());
-      status.setMessage("Remove peer " + oldPeerNode + " for region " + regionId + " succeed");
+      status.setMessage("removePeer " + destEndPoint + " for region " + regionId + " succeed");
       return status;
     }
 
-    private TEndPoint getConsensusEndPoint(
-        TDataNodeLocation nodeLocation, ConsensusGroupId regionId) {
+    private ConsensusGenericResponse removeRegionPeer(ConsensusGroupId regionId, Peer oldPeer) {
+      ConsensusGenericResponse resp;
       if (regionId instanceof DataRegionId) {
-        return nodeLocation.getDataRegionConsensusEndPoint();
+        resp = DataRegionConsensusImpl.getInstance().removePeer(regionId, oldPeer);
+      } else {
+        resp = SchemaRegionConsensusImpl.getInstance().removePeer(regionId, oldPeer);
       }
-      return nodeLocation.getSchemaRegionConsensusEndPoint();
-    }
-
-    private boolean isSucceed(TSStatus status) {
-      return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode();
-    }
-
-    private boolean isFailed(TSStatus status) {
-      return !isSucceed(status);
+      return resp;
     }
   }
 
@@ -464,32 +358,43 @@ public class RegionMigrateService implements IService {
 
     private final TConsensusGroupId tRegionId;
 
-    private final TDataNodeLocation fromNode;
+    private final TDataNodeLocation originalDataNode;
 
-    public DeleteOldRegionPeerTask(TConsensusGroupId tRegionId, TDataNodeLocation fromNode) {
+    public DeleteOldRegionPeerTask(
+        TConsensusGroupId tRegionId, TDataNodeLocation originalDataNode) {
       this.tRegionId = tRegionId;
-      this.fromNode = fromNode;
+      this.originalDataNode = originalDataNode;
     }
 
     @Override
     public void run() {
-      TSStatus runResult = deleteOldRegionPeer();
+      // deletePeer: remove the peer from the consensus group
+      TSStatus runResult = deletePeer();
       if (isFailed(runResult)) {
         reportFailed(
-            tRegionId, fromNode, TRegionMigrateFailedType.RemoveConsensusGroupFailed, runResult);
+            tRegionId,
+            originalDataNode,
+            TRegionMigrateFailedType.RemoveConsensusGroupFailed,
+            runResult);
       }
 
+      // deleteRegion: delete region data
       runResult = deleteRegion();
       if (isFailed(runResult)) {
-        reportFailed(tRegionId, fromNode, TRegionMigrateFailedType.DeleteRegionFailed, runResult);
+        reportFailed(
+            tRegionId, originalDataNode, TRegionMigrateFailedType.DeleteRegionFailed, runResult);
       }
 
       reportSucceed(tRegionId, "DeletePeer");
     }
 
-    private TSStatus deleteOldRegionPeer() {
+    private TSStatus deletePeer() {
+      taskLogger.info(
+          "{}, Start to deletePeer {} for region {}",
+          REGION_MIGRATE_PROCESS,
+          originalDataNode,
+          tRegionId);
       ConsensusGroupId regionId = ConsensusGroupId.Factory.createFromTConsensusGroupId(tRegionId);
-      taskLogger.info("Start to deleteOldRegionPeer: {}", regionId);
       TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
       ConsensusGenericResponse resp;
       try {
@@ -499,30 +404,31 @@ public class RegionMigrateService implements IService {
           resp = SchemaRegionConsensusImpl.getInstance().deletePeer(regionId);
         }
       } catch (Throwable e) {
-        taskLogger.error("DeleteOldRegionPeer error, regionId: {}", regionId, e);
+        taskLogger.error("{}, deletePeer error, regionId: {}", regionId, e);
         status.setCode(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
         status.setMessage(
-            "deleteOldRegionPeer for region: " + regionId + " error. exception: " + e.getMessage());
+            "deletePeer for region: " + regionId + " error. exception: " + e.getMessage());
         return status;
       }
       if (!resp.isSuccess()) {
-        taskLogger.error("DeleteOldRegionPeer error, regionId: {}", regionId, resp.getException());
-        status.setCode(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
-        status.setMessage(
+        String errorMsg =
             String.format(
-                "deleteOldRegionPeer error, regionId: %s, errorMessage: %s",
-                regionId, resp.getException().getMessage()));
+                "deletePeer error, regionId: %s, errorMessage: %s",
+                regionId, resp.getException().getMessage());
+        taskLogger.error(errorMsg);
+        status.setCode(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
+        status.setMessage(errorMsg);
         return status;
       }
-      taskLogger.info("Succeed to remove region {} consensus group", regionId);
-      status.setMessage("remove region consensus group " + regionId + "succeed");
+      taskLogger.info("Succeed to deletePeer {} from consensus group", regionId);
+      status.setMessage("deletePeer from consensus group " + regionId + "succeed");
       return status;
     }
 
     private TSStatus deleteRegion() {
       TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
       ConsensusGroupId regionId = ConsensusGroupId.Factory.createFromTConsensusGroupId(tRegionId);
-      taskLogger.info("Start to delete region {}", regionId);
+      taskLogger.info("{}, Start to deleteRegion {}", REGION_MIGRATE_PROCESS, regionId);
       try {
         if (regionId instanceof DataRegionId) {
           StorageEngineV2.getInstance().deleteDataRegion((DataRegionId) regionId);
@@ -530,22 +436,78 @@ public class RegionMigrateService implements IService {
           SchemaEngine.getInstance().deleteSchemaRegion((SchemaRegionId) regionId);
         }
       } catch (Throwable e) {
-        taskLogger.error("Delete the region {} failed", regionId, e);
+        taskLogger.error("deleteRegion {} failed", regionId, e);
         status.setCode(TSStatusCode.DELETE_REGION_ERROR.getStatusCode());
-        status.setMessage("Delete region " + regionId + "failed, " + e.getMessage());
+        status.setMessage("deleteRegion " + regionId + "failed, " + e.getMessage());
         return status;
       }
-      status.setMessage("delete region " + regionId + " succeed");
-      taskLogger.info("Finished to delete region {}", regionId);
+      status.setMessage("deleteRegion " + regionId + " succeed");
+      taskLogger.info("Succeed to deleteRegion {}", regionId);
       return status;
     }
+  }
+
+  private static class Holder {
+    private static final RegionMigrateService INSTANCE = new RegionMigrateService();
+
+    private Holder() {}
+  }
+
+  private static void reportSucceed(TConsensusGroupId tRegionId, String migrateState) {
+    TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+    status.setMessage(
+        String.format("Region: %s, state: %s, executed succeed", tRegionId, migrateState));
+    TRegionMigrateResultReportReq req = new TRegionMigrateResultReportReq(tRegionId, status);
+    try {
+      reportRegionMigrateResultToConfigNode(req);
+    } catch (Throwable e) {
+      LOGGER.error(
+          "Report region {} migrate result error in reportSucceed, result: {}", tRegionId, req, e);
+    }
+  }
+
+  private static void reportFailed(
+      TConsensusGroupId tRegionId,
+      TDataNodeLocation failedNode,
+      TRegionMigrateFailedType failedType,
+      TSStatus status) {
+    Map<TDataNodeLocation, TRegionMigrateFailedType> failedNodeAndReason = new HashMap<>();
+    failedNodeAndReason.put(failedNode, failedType);
+    TRegionMigrateResultReportReq req = new TRegionMigrateResultReportReq(tRegionId, status);
+    req.setFailedNodeAndReason(failedNodeAndReason);
+    try {
+      reportRegionMigrateResultToConfigNode(req);
+    } catch (Throwable e) {
+      LOGGER.error("Report region {} migrate error in reportFailed, result:{}", tRegionId, req, e);
+    }
+  }
 
-    private boolean isSucceed(TSStatus status) {
-      return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode();
+  private static void reportRegionMigrateResultToConfigNode(TRegionMigrateResultReportReq req)
+      throws TException {
+    TSStatus status;
+    try (ConfigNodeClient client = new ConfigNodeClient()) {
+      status = client.reportRegionMigrateResult(req);
+      LOGGER.info(
+          "Report region {} migrate result {} to Config node succeed, result: {}",
+          req.getRegionId(),
+          req,
+          status);
     }
+  }
+
+  private static boolean isSucceed(TSStatus status) {
+    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode();
+  }
+
+  private static boolean isFailed(TSStatus status) {
+    return !isSucceed(status);
+  }
 
-    private boolean isFailed(TSStatus status) {
-      return !isSucceed(status);
+  private static TEndPoint getConsensusEndPoint(
+      TDataNodeLocation nodeLocation, ConsensusGroupId regionId) {
+    if (regionId instanceof DataRegionId) {
+      return nodeLocation.getDataRegionConsensusEndPoint();
     }
+    return nodeLocation.getSchemaRegionConsensusEndPoint();
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
index 3d76821ff8..0be55c3a0a 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -201,6 +201,7 @@ import java.util.concurrent.locks.ReadWriteLock;
 import java.util.stream.Collectors;
 
 import static org.apache.iotdb.commons.conf.IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD;
+import static org.apache.iotdb.db.service.RegionMigrateService.REGION_MIGRATE_PROCESS;
 import static org.apache.iotdb.db.service.basic.ServiceProvider.QUERY_FREQUENCY_RECORDER;
 import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onQueryException;
 
@@ -1261,7 +1262,7 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
     TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
     if (submitSucceed) {
       LOGGER.info(
-          "Successfully submit addRegionPeer task for region: {} on DataNode: {}",
+          "Successfully submit addRegionPeer task for region: {}, target DataNode: {}",
           regionId,
           selectedDataNodeIP);
       return status;
@@ -1279,7 +1280,7 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
     TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
     if (submitSucceed) {
       LOGGER.info(
-          "Successfully submit removeRegionPeer task for region: {} on DataNode: {}",
+          "Successfully submit removeRegionPeer task for region: {}, DataNode to be removed: {}",
           regionId,
           selectedDataNodeIP);
       return status;
@@ -1297,7 +1298,7 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
     TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
     if (submitSucceed) {
       LOGGER.info(
-          "Successfully submit deleteOldRegionPeer task for region: {} on DataNode: {}",
+          "Successfully submit deleteOldRegionPeer task for region: {}, DataNode to be removed: {}",
           regionId,
           selectedDataNodeIP);
       return status;
@@ -1455,7 +1456,11 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
   }
 
   private TSStatus createNewRegionPeer(ConsensusGroupId regionId, List<Peer> peers) {
-    LOGGER.info("Start to createNewRegionPeer {} to region {}", peers, regionId);
+    LOGGER.info(
+        "{}, Start to createNewRegionPeer {} to region {}",
+        REGION_MIGRATE_PROCESS,
+        peers,
+        regionId);
     TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
     ConsensusGenericResponse resp;
     if (regionId instanceof DataRegionId) {
@@ -1465,7 +1470,8 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
     }
     if (!resp.isSuccess()) {
       LOGGER.error(
-          "CreateNewRegionPeer error, peers: {}, regionId: {}, errorMessage",
+          "{}, CreateNewRegionPeer error, peers: {}, regionId: {}, errorMessage",
+          REGION_MIGRATE_PROCESS,
           peers,
           regionId,
           resp.getException());
@@ -1473,7 +1479,11 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
       status.setMessage(resp.getException().getMessage());
       return status;
     }
-    LOGGER.info("Succeed to createNewRegionPeer {} for region {}", peers, regionId);
+    LOGGER.info(
+        "{}, Succeed to createNewRegionPeer {} for region {}",
+        REGION_MIGRATE_PROCESS,
+        peers,
+        regionId);
     status.setMessage("createNewRegionPeer succeed, regionId: " + regionId);
     return status;
   }