You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ca...@apache.org on 2022/11/16 13:46:54 UTC

[iotdb] branch beyyes/fix_ratis_migration created (now 864ce1eb68)

This is an automated email from the ASF dual-hosted git repository.

caogaofei pushed a change to branch beyyes/fix_ratis_migration
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at 864ce1eb68 fix ratis migration problem

This branch includes the following new commits:

     new 864ce1eb68 fix ratis migration problem

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/01: fix ratis migration problem

Posted by ca...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

caogaofei pushed a commit to branch beyyes/fix_ratis_migration
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 864ce1eb68f51529a7456b780a77af1d668d2857
Author: Beyyes <cg...@foxmail.com>
AuthorDate: Wed Nov 16 21:46:33 2022 +0800

    fix ratis migration problem
---
 .../client/sync/SyncDataNodeClientPool.java        |   3 +-
 .../procedure/env/DataNodeRemoveHandler.java       |  14 +-
 .../impl/statemachine/RegionMigrateProcedure.java  |   3 +
 .../procedure/state/RegionTransitionState.java     |   1 +
 .../apache/iotdb/db/mpp/plan/TestRPCClient.java    |   3 +-
 .../iotdb/db/service/RegionMigrateService.java     | 159 +++++++++++----------
 .../impl/DataNodeInternalRPCServiceImpl.java       |  37 +++--
 thrift/src/main/thrift/datanode.thrift             |  11 +-
 8 files changed, 137 insertions(+), 94 deletions(-)

diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncDataNodeClientPool.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncDataNodeClientPool.java
index 61625cc603..921481240b 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncDataNodeClientPool.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncDataNodeClientPool.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.commons.client.ClientPoolFactory;
 import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
 import org.apache.iotdb.confignode.client.DataNodeRequestType;
+import org.apache.iotdb.mpp.rpc.thrift.TAddPeerReq;
 import org.apache.iotdb.mpp.rpc.thrift.TCreateDataRegionReq;
 import org.apache.iotdb.mpp.rpc.thrift.TCreatePeerReq;
 import org.apache.iotdb.mpp.rpc.thrift.TCreateSchemaRegionReq;
@@ -136,7 +137,7 @@ public class SyncDataNodeClientPool {
       case CREATE_NEW_REGION_PEER:
         return client.createNewRegionPeer((TCreatePeerReq) req);
       case ADD_REGION_PEER:
-        return client.addRegionPeer((TMaintainPeerReq) req);
+        return client.addRegionPeer((TAddPeerReq) req);
       case REMOVE_REGION_PEER:
         return client.removeRegionPeer((TMaintainPeerReq) req);
       case DELETE_OLD_REGION_PEER:
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
index b802fdcc50..e9b46e0069 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
@@ -36,6 +36,7 @@ import org.apache.iotdb.confignode.manager.ConfigManager;
 import org.apache.iotdb.confignode.manager.node.heartbeat.BaseNodeCache;
 import org.apache.iotdb.confignode.persistence.node.NodeInfo;
 import org.apache.iotdb.confignode.procedure.scheduler.LockQueue;
+import org.apache.iotdb.mpp.rpc.thrift.TAddPeerReq;
 import org.apache.iotdb.mpp.rpc.thrift.TCreatePeerReq;
 import org.apache.iotdb.mpp.rpc.thrift.TDisableDataNodeReq;
 import org.apache.iotdb.mpp.rpc.thrift.TMaintainPeerReq;
@@ -52,6 +53,7 @@ import java.util.stream.Collectors;
 
 import static org.apache.iotdb.confignode.conf.ConfigNodeConstant.REMOVE_DATANODE_PROCESS;
 import static org.apache.iotdb.consensus.ConsensusFactory.MULTI_LEADER_CONSENSUS;
+import static org.apache.iotdb.consensus.ConsensusFactory.RATIS_CONSENSUS;
 import static org.apache.iotdb.consensus.ConsensusFactory.SIMPLE_CONSENSUS;
 
 public class DataNodeRemoveHandler {
@@ -241,13 +243,21 @@ public class DataNodeRemoveHandler {
 
     // Send addRegionPeer request to the selected DataNode,
     // destDataNode is where the new RegionReplica is created
-    TMaintainPeerReq maintainPeerReq = new TMaintainPeerReq(regionId, destDataNode);
+    TAddPeerReq addPeerReq = new TAddPeerReq(regionId, destDataNode);
+    if ((regionId.getType() == TConsensusGroupType.SchemaRegion
+            && CONF.getSchemaRegionConsensusProtocolClass().equals(RATIS_CONSENSUS))
+        || (regionId.getType() == TConsensusGroupType.DataRegion
+            && CONF.getDataRegionConsensusProtocolClass().equals(RATIS_CONSENSUS))) {
+      // For Ratis region group, we need the originalRegionLocations field
+      addPeerReq.setRegionLocations(findRegionLocations(regionId));
+    }
     status =
         SyncDataNodeClientPool.getInstance()
             .sendSyncRequestToDataNodeWithRetry(
                 selectedDataNode.get().getInternalEndPoint(),
-                maintainPeerReq,
+                addPeerReq,
                 DataNodeRequestType.ADD_REGION_PEER);
+
     LOGGER.info(
         "{}, Send action addRegionPeer finished, regionId: {}, rpcDataNode: {},  destDataNode: {}",
         REMOVE_DATANODE_PROCESS,
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/RegionMigrateProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/RegionMigrateProcedure.java
index 07a498c7a1..27571fc120 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/RegionMigrateProcedure.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/RegionMigrateProcedure.java
@@ -24,6 +24,8 @@ import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.exception.runtime.ThriftSerDeException;
 import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils;
+import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
+import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
 import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
 import org.apache.iotdb.confignode.procedure.env.DataNodeRemoveHandler;
 import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
@@ -48,6 +50,7 @@ import static org.apache.iotdb.rpc.TSStatusCode.SUCCESS_STATUS;
 public class RegionMigrateProcedure
     extends StateMachineProcedure<ConfigNodeProcedureEnv, RegionTransitionState> {
   private static final Logger LOG = LoggerFactory.getLogger(RegionMigrateProcedure.class);
+  private static final ConfigNodeConfig CONF = ConfigNodeDescriptor.getInstance().getConf();
   private static final int RETRY_THRESHOLD = 5;
 
   /** Wait region migrate finished */
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/RegionTransitionState.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/RegionTransitionState.java
index a21a6af941..fa2e6f047d 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/RegionTransitionState.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/RegionTransitionState.java
@@ -23,6 +23,7 @@ public enum RegionTransitionState {
   REGION_MIGRATE_PREPARE,
   CREATE_NEW_REGION_PEER,
   ADD_REGION_PEER,
+  ADD_NEW_PEER_TO_EXISTED_GROUP,
   CHANGE_REGION_LEADER,
   REMOVE_REGION_PEER,
   DELETE_OLD_REGION_PEER,
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/TestRPCClient.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/TestRPCClient.java
index 4a5b4169fe..130f5665f2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/TestRPCClient.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/TestRPCClient.java
@@ -34,6 +34,7 @@ import org.apache.iotdb.consensus.multileader.thrift.TInactivatePeerRes;
 import org.apache.iotdb.consensus.multileader.thrift.TTriggerSnapshotLoadReq;
 import org.apache.iotdb.consensus.multileader.thrift.TTriggerSnapshotLoadRes;
 import org.apache.iotdb.db.client.DataNodeClientPoolFactory;
+import org.apache.iotdb.mpp.rpc.thrift.TAddPeerReq;
 import org.apache.iotdb.mpp.rpc.thrift.TCreateDataRegionReq;
 import org.apache.iotdb.mpp.rpc.thrift.TMaintainPeerReq;
 
@@ -106,7 +107,7 @@ public class TestRPCClient {
     try (SyncDataNodeInternalServiceClient client =
         INTERNAL_SERVICE_CLIENT_MANAGER.borrowClient(new TEndPoint("127.0.0.1", 9003))) {
       client.addRegionPeer(
-          new TMaintainPeerReq(new DataRegionId(1).convertToTConsensusGroupId(), getLocation2(3)));
+          new TAddPeerReq(new DataRegionId(1).convertToTConsensusGroupId(), getLocation2(3)));
     } catch (IOException | TException e) {
       throw new RuntimeException(e);
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java b/server/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java
index 8c37145a4b..acfae19187 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java
@@ -34,11 +34,14 @@ import org.apache.iotdb.confignode.rpc.thrift.TRegionMigrateResultReportReq;
 import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
 import org.apache.iotdb.db.client.ConfigNodeClient;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
 import org.apache.iotdb.db.consensus.SchemaRegionConsensusImpl;
 import org.apache.iotdb.db.engine.StorageEngineV2;
 import org.apache.iotdb.db.metadata.schemaregion.SchemaEngine;
 import org.apache.iotdb.db.rescon.AbstractPoolManager;
+import org.apache.iotdb.mpp.rpc.thrift.TAddPeerReq;
 import org.apache.iotdb.mpp.rpc.thrift.TMaintainPeerReq;
 import org.apache.iotdb.rpc.TSStatusCode;
 
@@ -46,14 +49,22 @@ import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
+import static org.apache.iotdb.consensus.ConsensusFactory.MULTI_LEADER_CONSENSUS;
+
 public class RegionMigrateService implements IService {
   private static final Logger LOGGER = LoggerFactory.getLogger(RegionMigrateService.class);
 
   public static final String REMOVE_DATANODE_PROCESS = "[REMOVE_DATANODE_PROCESS]";
 
+  private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+
+  private static final TDataNodeLocation currentNode = new TDataNodeLocation();
+
   private static final int MAX_RETRY_NUM = 5;
 
   private static final int SLEEP_MILLIS = 5000;
@@ -67,57 +78,47 @@ public class RegionMigrateService implements IService {
   }
 
   /**
-   * submit AddRegionPeerTask
+   * Submit AddRegionPeerTask
    *
    * @param req TMaintainPeerReq
    * @return if the submit task succeed
    */
-  public synchronized boolean submitAddRegionPeerTask(TMaintainPeerReq req) {
-
+  public synchronized boolean submitAddRegionPeerTask(TAddPeerReq req) {
     boolean submitSucceed = true;
     try {
-      regionMigratePool.submit(new AddRegionPeerTask(req.getRegionId(), req.getDestNode()));
+      regionMigratePool.submit(
+          new AddRegionPeerTask(req.getRegionId(), req.getDestNode(), req.getRegionLocations()));
     } catch (Exception e) {
-      LOGGER.error(
-          "Submit addRegionPeer task error for Region: {} on DataNode: {}.",
-          req.getRegionId(),
-          req.getDestNode().getInternalEndPoint().getIp(),
-          e);
+      LOGGER.error("Submit AddRegionPeerTask error for Region: {}", req.getRegionId(), e);
       submitSucceed = false;
     }
     return submitSucceed;
   }
 
   /**
-   * submit RemoveRegionPeerTask
+   * Submit RemoveRegionPeerTask
    *
    * @param req TMaintainPeerReq
    * @return if the submit task succeed
    */
   public synchronized boolean submitRemoveRegionPeerTask(TMaintainPeerReq req) {
-
     boolean submitSucceed = true;
     try {
       regionMigratePool.submit(new RemoveRegionPeerTask(req.getRegionId(), req.getDestNode()));
     } catch (Exception e) {
-      LOGGER.error(
-          "Submit removeRegionPeer task error for Region: {} on DataNode: {}.",
-          req.getRegionId(),
-          req.getDestNode().getInternalEndPoint().getIp(),
-          e);
+      LOGGER.error("Submit RemoveRegionPeer task error for Region: {}", req.getRegionId(), e);
       submitSucceed = false;
     }
     return submitSucceed;
   }
 
   /**
-   * remove a region peer
+   * Submit DeleteOldRegionPeerTask
    *
    * @param req TMigrateRegionReq
    * @return submit task succeed?
    */
   public synchronized boolean submitDeleteOldRegionPeerTask(TMaintainPeerReq req) {
-
     boolean submitSucceed = true;
     try {
       regionMigratePool.submit(new DeleteOldRegionPeerTask(req.getRegionId(), req.getDestNode()));
@@ -125,7 +126,6 @@ public class RegionMigrateService implements IService {
       LOGGER.error(
           "Submit deleteOldRegionPeerTask error for Region: {} on DataNode: {}.",
           req.getRegionId(),
-          req.getDestNode().getInternalEndPoint().getIp(),
           e);
       submitSucceed = false;
     }
@@ -193,7 +193,7 @@ public class RegionMigrateService implements IService {
       reportRegionMigrateResultToConfigNode(req);
     } catch (Throwable e) {
       LOGGER.error(
-          "Report region {} migrate successful result error, result: {}", tRegionId, req, e);
+          "Report region {} migrate result error in reportSucceed, result: {}", tRegionId, req, e);
     }
   }
 
@@ -207,7 +207,8 @@ public class RegionMigrateService implements IService {
     try {
       reportRegionMigrateResultToConfigNode(req);
     } catch (Throwable e) {
-      LOGGER.error("Report region {} migrate failed result error, result:{}", tRegionId, req, e);
+      LOGGER.error(
+          "Report region {} migrate result error in reportFailed, result:{}", tRegionId, req, e);
     }
   }
 
@@ -239,15 +240,19 @@ public class RegionMigrateService implements IService {
   private static class AddRegionPeerTask implements Runnable {
     private static final Logger taskLogger = LoggerFactory.getLogger(AddRegionPeerTask.class);
 
-    // The RegionGroup that shall perform the add peer process
-    private final TConsensusGroupId tRegionId;
+    private final TConsensusGroupId migrateRegion;
 
-    // The DataNode that selected to perform the add peer process
-    private final TDataNodeLocation selectedDataNode;
+    private final TDataNodeLocation destDataNode;
 
-    public AddRegionPeerTask(TConsensusGroupId tRegionId, TDataNodeLocation selectedDataNode) {
-      this.tRegionId = tRegionId;
-      this.selectedDataNode = selectedDataNode;
+    private final List<TDataNodeLocation> originalDataNodeLocations;
+
+    public AddRegionPeerTask(
+        TConsensusGroupId migrateRegion,
+        TDataNodeLocation destDataNode,
+        List<TDataNodeLocation> originalDataNodeLocations) {
+      this.migrateRegion = migrateRegion;
+      this.destDataNode = destDataNode;
+      this.originalDataNodeLocations = originalDataNodeLocations;
     }
 
     @Override
@@ -255,28 +260,37 @@ public class RegionMigrateService implements IService {
       TSStatus runResult = addPeer();
       if (isFailed(runResult)) {
         reportFailed(
-            tRegionId, selectedDataNode, TRegionMigrateFailedType.AddPeerFailed, runResult);
+            migrateRegion, destDataNode, TRegionMigrateFailedType.AddPeerFailed, runResult);
         return;
       }
 
-      reportSucceed(tRegionId, "AddPeer");
+      reportSucceed(migrateRegion, "AddPeer");
     }
 
     private TSStatus addPeer() {
-      ConsensusGroupId regionId = ConsensusGroupId.Factory.createFromTConsensusGroupId(tRegionId);
+      ConsensusGroupId regionId =
+          ConsensusGroupId.Factory.createFromTConsensusGroupId(migrateRegion);
+      TEndPoint newPeerNode = getConsensusEndPoint(destDataNode, regionId);
+      Peer newPeer = new Peer(regionId, destDataNode.getDataNodeId(), newPeerNode);
+      List<Peer> originalPeers = new ArrayList<>();
+      if (originalDataNodeLocations != null && !originalDataNodeLocations.isEmpty()) {
+        originalDataNodeLocations.forEach(
+            node ->
+                originalPeers.add(
+                    new Peer(
+                        regionId, node.getDataNodeId(), getConsensusEndPoint(node, regionId))));
+      }
+
+      taskLogger.info("Start to addPeer {} for region {}", newPeerNode, migrateRegion);
+      boolean addPeerSucceed = true;
       TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
       ConsensusGenericResponse resp = null;
-      TEndPoint newPeerNode = getConsensusEndPoint(selectedDataNode, regionId);
-      taskLogger.info("Start to addPeer {} for region {}", newPeerNode, tRegionId);
-      boolean addPeerSucceed = true;
       for (int i = 0; i < MAX_RETRY_NUM; i++) {
         try {
           if (!addPeerSucceed) {
             Thread.sleep(SLEEP_MILLIS);
           }
-          resp =
-              addRegionPeer(
-                  regionId, new Peer(regionId, selectedDataNode.getDataNodeId(), newPeerNode));
+          resp = addRegionPeer(regionId, newPeer, originalPeers);
           addPeerSucceed = true;
         } catch (Throwable e) {
           addPeerSucceed = false;
@@ -322,31 +336,30 @@ public class RegionMigrateService implements IService {
       return status;
     }
 
-    private ConsensusGenericResponse addRegionPeer(ConsensusGroupId regionId, Peer newPeer) {
+    private ConsensusGenericResponse addRegionPeer(
+        ConsensusGroupId regionId, Peer newPeer, List<Peer> originalPeers) {
+      LOGGER.info(
+          "{}, execute addRegionPeer, regionId: {}, newPeer: {}, originalPeers: {}",
+          REMOVE_DATANODE_PROCESS,
+          regionId,
+          newPeer,
+          originalPeers);
       ConsensusGenericResponse resp;
       if (regionId instanceof DataRegionId) {
-        resp = DataRegionConsensusImpl.getInstance().addPeer(regionId, newPeer);
+        if (MULTI_LEADER_CONSENSUS.equals(config.getDataRegionConsensusProtocolClass())) {
+          resp = DataRegionConsensusImpl.getInstance().addPeer(regionId, newPeer);
+        } else {
+          resp =
+              DataRegionConsensusImpl.getInstance()
+                  .addNewNodeToExistedGroup(regionId, newPeer, originalPeers);
+        }
       } else {
-        resp = SchemaRegionConsensusImpl.getInstance().addPeer(regionId, newPeer);
+        resp =
+            SchemaRegionConsensusImpl.getInstance()
+                .addNewNodeToExistedGroup(regionId, newPeer, originalPeers);
       }
       return resp;
     }
-
-    private TEndPoint getConsensusEndPoint(
-        TDataNodeLocation nodeLocation, ConsensusGroupId regionId) {
-      if (regionId instanceof DataRegionId) {
-        return nodeLocation.getDataRegionConsensusEndPoint();
-      }
-      return nodeLocation.getSchemaRegionConsensusEndPoint();
-    }
-
-    private boolean isSucceed(TSStatus status) {
-      return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode();
-    }
-
-    private boolean isFailed(TSStatus status) {
-      return !isSucceed(status);
-    }
   }
 
   private static class RemoveRegionPeerTask implements Runnable {
@@ -441,22 +454,6 @@ public class RegionMigrateService implements IService {
       status.setMessage("Remove peer " + oldPeerNode + " for region " + regionId + " succeed");
       return status;
     }
-
-    private TEndPoint getConsensusEndPoint(
-        TDataNodeLocation nodeLocation, ConsensusGroupId regionId) {
-      if (regionId instanceof DataRegionId) {
-        return nodeLocation.getDataRegionConsensusEndPoint();
-      }
-      return nodeLocation.getSchemaRegionConsensusEndPoint();
-    }
-
-    private boolean isSucceed(TSStatus status) {
-      return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode();
-    }
-
-    private boolean isFailed(TSStatus status) {
-      return !isSucceed(status);
-    }
   }
 
   private static class DeleteOldRegionPeerTask implements Runnable {
@@ -539,13 +536,21 @@ public class RegionMigrateService implements IService {
       taskLogger.info("Finished to delete region {}", regionId);
       return status;
     }
+  }
 
-    private boolean isSucceed(TSStatus status) {
-      return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode();
-    }
+  private static boolean isSucceed(TSStatus status) {
+    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode();
+  }
+
+  private static boolean isFailed(TSStatus status) {
+    return !isSucceed(status);
+  }
 
-    private boolean isFailed(TSStatus status) {
-      return !isSucceed(status);
+  private static TEndPoint getConsensusEndPoint(
+      TDataNodeLocation nodeLocation, ConsensusGroupId regionId) {
+    if (regionId instanceof DataRegionId) {
+      return nodeLocation.getDataRegionConsensusEndPoint();
     }
+    return nodeLocation.getSchemaRegionConsensusEndPoint();
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
index f691488eb6..1820c877a7 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.service.thrift.impl;
 
 import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TFlushReq;
@@ -121,6 +122,7 @@ import org.apache.iotdb.metrics.type.Gauge;
 import org.apache.iotdb.metrics.utils.MetricLevel;
 import org.apache.iotdb.mpp.rpc.thrift.IDataNodeRPCService;
 import org.apache.iotdb.mpp.rpc.thrift.TActiveTriggerInstanceReq;
+import org.apache.iotdb.mpp.rpc.thrift.TAddPeerReq;
 import org.apache.iotdb.mpp.rpc.thrift.TCancelFragmentInstanceReq;
 import org.apache.iotdb.mpp.rpc.thrift.TCancelPlanFragmentReq;
 import org.apache.iotdb.mpp.rpc.thrift.TCancelQueryReq;
@@ -202,6 +204,8 @@ import java.util.concurrent.locks.ReadWriteLock;
 import java.util.stream.Collectors;
 
 import static org.apache.iotdb.commons.conf.IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD;
+import static org.apache.iotdb.consensus.ConsensusFactory.MULTI_LEADER_CONSENSUS;
+import static org.apache.iotdb.db.service.RegionMigrateService.REMOVE_DATANODE_PROCESS;
 import static org.apache.iotdb.db.service.basic.ServiceProvider.QUERY_FREQUENCY_RECORDER;
 import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onQueryException;
 
@@ -1217,24 +1221,35 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
   public TSStatus createNewRegionPeer(TCreatePeerReq req) throws TException {
     ConsensusGroupId regionId =
         ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getRegionId());
-    List<Peer> peers =
-        req.getRegionLocations().stream()
-            .map(
-                location ->
-                    new Peer(
-                        regionId,
-                        location.getDataNodeId(),
-                        getConsensusEndPoint(location, regionId)))
-            .collect(Collectors.toList());
     TSStatus status = createNewRegion(regionId, req.getStorageGroup(), req.getTtl());
     if (!isSucceed(status)) {
       return status;
     }
-    return createNewRegionPeer(regionId, peers);
+
+    // Only execute createNewRegionPeer method when DataRegion adopts MULTI_LEADER_CONSENSUS
+    if (TConsensusGroupType.DataRegion.equals(req.getRegionId().getType())
+        && MULTI_LEADER_CONSENSUS.equals(config.getDataRegionConsensusProtocolClass())) {
+      LOGGER.info(
+          "{}, execute createNewRegionPeer for MULTI_LEADER_CONSENSUS DataRegion: {}",
+          REMOVE_DATANODE_PROCESS,
+          regionId);
+      List<Peer> peers =
+          req.getRegionLocations().stream()
+              .map(
+                  location ->
+                      new Peer(
+                          regionId,
+                          location.getDataNodeId(),
+                          getConsensusEndPoint(location, regionId)))
+              .collect(Collectors.toList());
+      return createNewRegionPeer(regionId, peers);
+    }
+
+    return status;
   }
 
   @Override
-  public TSStatus addRegionPeer(TMaintainPeerReq req) throws TException {
+  public TSStatus addRegionPeer(TAddPeerReq req) throws TException {
     TConsensusGroupId regionId = req.getRegionId();
     String selectedDataNodeIP = req.getDestNode().getInternalEndPoint().getIp();
     boolean submitSucceed = RegionMigrateService.getInstance().submitAddRegionPeerTask(req);
diff --git a/thrift/src/main/thrift/datanode.thrift b/thrift/src/main/thrift/datanode.thrift
index 174847cb84..74ff088777 100644
--- a/thrift/src/main/thrift/datanode.thrift
+++ b/thrift/src/main/thrift/datanode.thrift
@@ -47,6 +47,12 @@ struct TCreatePeerReq {
   4: optional i64 ttl
 }
 
+struct TAddPeerReq {
+  1: required common.TConsensusGroupId regionId
+  2: required common.TDataNodeLocation destNode
+  3: optional list<common.TDataNodeLocation> regionLocations
+}
+
 struct TMaintainPeerReq {
   1: required common.TConsensusGroupId regionId
   2: required common.TDataNodeLocation destNode
@@ -430,10 +436,11 @@ service IDataNodeRPCService {
 
   /**
    * Add a Region peer to the specified RegionGroup
+   * Note: Ratis consensus needs the regionLocations field; MultiLeader consensus does not.
    *
-   * @param TMaintainPeerReq which contains RegionId and the DataNodeLocation that selected to perform the add peer process
+   * @param TAddPeerReq which contains RegionId and the DataNodeLocation that is selected to perform the add peer process
    */
-  common.TSStatus addRegionPeer(TMaintainPeerReq req);
+  common.TSStatus addRegionPeer(TAddPeerReq req);
 
   /**
    * Remove a Region peer from the specified RegionGroup