You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2021/01/14 08:09:41 UTC

[iotdb] 01/04: fix some issues of multi-raft

This is an automated email from the ASF dual-hosted git repository.

lta pushed a commit to branch cluster_scalability
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 109d432adecdee83d2881e54d07a91ddc87e1761
Author: lta <li...@163.com>
AuthorDate: Tue Jan 5 11:48:53 2021 +0800

    fix some issues of multi-raft
---
 .../resources/conf/iotdb-cluster.properties        |  2 +-
 .../cluster/client/sync/SyncClientAdaptor.java     |  4 +--
 .../cluster/log/catchup/SnapshotCatchUpTask.java   |  1 +
 .../manage/FilePartitionedSnapshotLogManager.java  |  2 +-
 .../iotdb/cluster/log/snapshot/FileSnapshot.java   | 11 ++++---
 .../cluster/log/snapshot/PullSnapshotTask.java     |  3 +-
 .../log/snapshot/PullSnapshotTaskDescriptor.java   |  3 +-
 .../apache/iotdb/cluster/metadata/CMManager.java   |  1 +
 .../apache/iotdb/cluster/metadata/MetaPuller.java  |  1 +
 .../cluster/partition/slot/SlotPartitionTable.java |  9 ++---
 .../iotdb/cluster/server/DataClusterServer.java    | 18 +++++-----
 .../iotdb/cluster/server/MetaClusterServer.java    | 16 ++++-----
 .../cluster/server/member/DataGroupMember.java     |  9 ++---
 .../cluster/server/member/MetaGroupMember.java     | 38 +++++++++++++---------
 .../cluster/server/service/BaseAsyncService.java   |  4 +--
 .../cluster/server/service/BaseSyncService.java    |  4 +--
 .../apache/iotdb/cluster/utils/ClusterUtils.java   |  8 ++++-
 .../cluster/client/sync/SyncClientAdaptorTest.java |  7 ++--
 .../iotdb/cluster/common/TestAsyncDataClient.java  |  2 +-
 .../cluster/log/snapshot/DataSnapshotTest.java     |  7 ++--
 .../cluster/log/snapshot/PullSnapshotTaskTest.java |  2 +-
 .../cluster/server/member/DataGroupMemberTest.java |  2 +-
 .../org/apache/iotdb/db/engine/StorageEngine.java  |  4 +--
 thrift/src/main/thrift/cluster.thrift              |  6 ++--
 24 files changed, 93 insertions(+), 71 deletions(-)

diff --git a/cluster/src/assembly/resources/conf/iotdb-cluster.properties b/cluster/src/assembly/resources/conf/iotdb-cluster.properties
index 7cd8c22..4b9b2f5 100644
--- a/cluster/src/assembly/resources/conf/iotdb-cluster.properties
+++ b/cluster/src/assembly/resources/conf/iotdb-cluster.properties
@@ -63,7 +63,7 @@ max_concurrent_client_num=10000
 default_replica_num=2
 
 # sub raft num for multi-raft
-multi_raft_factor=2
+multi_raft_factor=1
 
 # cluster name to identify different clusters
 # all node's cluster_name in one cluster are the same
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptor.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptor.java
index 7a31957..019a08b 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptor.java
@@ -387,12 +387,12 @@ public class SyncClientAdaptor {
   }
 
   public static ByteBuffer readFile(AsyncDataClient client, String remotePath, long offset,
-      int fetchSize)
+      int fetchSize, int raftId)
       throws InterruptedException, TException {
     AtomicReference<ByteBuffer> result = new AtomicReference<>();
     GenericHandler<ByteBuffer> handler = new GenericHandler<>(client.getNode(), result);
 
-    client.readFile(remotePath, offset, fetchSize, handler);
+    client.readFile(remotePath, offset, fetchSize, raftId, handler);
     return handler.getResult(RaftServer.getWriteOperationTimeoutMS());
   }
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/catchup/SnapshotCatchUpTask.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/catchup/SnapshotCatchUpTask.java
index 548a2db..483ba64 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/catchup/SnapshotCatchUpTask.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/catchup/SnapshotCatchUpTask.java
@@ -59,6 +59,7 @@ public class SnapshotCatchUpTask extends LogCatchUpTask implements Callable<Bool
   private void doSnapshotCatchUp()
       throws TException, InterruptedException, LeaderUnknownException {
     SendSnapshotRequest request = new SendSnapshotRequest();
+    request.setRaftId(raftMember.getRaftGroupId());
     if (raftMember.getHeader() != null) {
       request.setHeader(raftMember.getHeader());
     }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/FilePartitionedSnapshotLogManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/FilePartitionedSnapshotLogManager.java
index a3b0153..79f3cd1 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/FilePartitionedSnapshotLogManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/FilePartitionedSnapshotLogManager.java
@@ -113,7 +113,7 @@ public class FilePartitionedSnapshotLogManager extends PartitionedSnapshotLogMan
     // 1.collect tsfile
     collectTsFiles();
 
-    //2.register the measurement
+    // 2.register the measurement
     for (Map.Entry<Integer, Collection<TimeseriesSchema>> entry : slotTimeseries.entrySet()) {
       int slotNum = entry.getKey();
       FileSnapshot snapshot = slotSnapshots.computeIfAbsent(slotNum,
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/FileSnapshot.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/FileSnapshot.java
index fe3a7a0..9f1b562 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/FileSnapshot.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/FileSnapshot.java
@@ -311,7 +311,7 @@ public class FileSnapshot extends Snapshot implements TimeseriesSchemaSnapshot {
         if (client != null) {
           try {
             client.removeHardLink(resource.getTsFile().getAbsolutePath(),
-                new GenericHandler<>(sourceNode, null));
+                dataGroupMember.getRaftGroupId(), new GenericHandler<>(sourceNode, null));
           } catch (TException e) {
             logger
                 .error("Cannot remove hardlink {} from {}", resource.getTsFile().getAbsolutePath(),
@@ -326,7 +326,8 @@ public class FileSnapshot extends Snapshot implements TimeseriesSchemaSnapshot {
           return;
         }
         try {
-          client.removeHardLink(resource.getTsFile().getAbsolutePath());
+          client.removeHardLink(resource.getTsFile().getAbsolutePath(),
+              dataGroupMember.getRaftGroupId());
         } catch (TException te) {
           client.getInputProtocol().getTransport().close();
           logger
@@ -516,7 +517,8 @@ public class FileSnapshot extends Snapshot implements TimeseriesSchemaSnapshot {
         if (client == null) {
           throw new IOException("No available client for " + node.toString());
         }
-        ByteBuffer buffer = SyncClientAdaptor.readFile(client, remotePath, offset, fetchSize);
+        ByteBuffer buffer = SyncClientAdaptor
+            .readFile(client, remotePath, offset, fetchSize, dataGroupMember.getRaftGroupId());
         int len = writeBuffer(buffer, dest);
         if (len == 0) {
           break;
@@ -552,7 +554,8 @@ public class FileSnapshot extends Snapshot implements TimeseriesSchemaSnapshot {
 
       try {
         while (true) {
-          ByteBuffer buffer = client.readFile(remotePath, offset, fetchSize);
+          ByteBuffer buffer = client.readFile(remotePath, offset, fetchSize,
+              dataGroupMember.getRaftGroupId());
           int len = writeBuffer(buffer, dest);
           if (len == 0) {
             break;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTask.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTask.java
index f0aa3f0..9dc6231 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTask.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTask.java
@@ -80,6 +80,7 @@ public class PullSnapshotTask<T extends Snapshot> implements Callable<Void> {
     this.newMember = newMember;
     this.snapshotFactory = snapshotFactory;
     this.snapshotSave = snapshotSave;
+    persistTask();
   }
 
   @SuppressWarnings("java:S3740") // type cannot be known ahead
@@ -162,9 +163,9 @@ public class PullSnapshotTask<T extends Snapshot> implements Callable<Void> {
 
   @Override
   public Void call() {
-    persistTask();
     request = new PullSnapshotRequest();
     request.setHeader(descriptor.getPreviousHolders().getHeader());
+    request.setRaftId(descriptor.getPreviousHolders().getId());
     request.setRequiredSlots(descriptor.getSlots());
     request.setRequireReadOnly(descriptor.isRequireReadOnly());
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTaskDescriptor.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTaskDescriptor.java
index 9e1fb90..441e960 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTaskDescriptor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTaskDescriptor.java
@@ -75,6 +75,7 @@ public class PullSnapshotTaskDescriptor {
       dataOutputStream.writeInt(slot);
     }
 
+    dataOutputStream.writeInt(previousHolders.getId());
     dataOutputStream.writeInt(previousHolders.size());
     for (Node previousHolder : previousHolders) {
       SerializeUtils.serialize(previousHolder, dataOutputStream);
@@ -90,8 +91,8 @@ public class PullSnapshotTaskDescriptor {
       slots.add(dataInputStream.readInt());
     }
 
+    previousHolders = new PartitionGroup(dataInputStream.readInt());
     int holderSize = dataInputStream.readInt();
-    previousHolders = new PartitionGroup();
     for (int i = 0; i < holderSize; i++) {
       Node node = new Node();
       SerializeUtils.deserialize(node, dataInputStream);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
index 01817a4..82ef895 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
@@ -679,6 +679,7 @@ public class CMManager extends MManager {
     // pull schemas from a remote node
     PullSchemaRequest pullSchemaRequest = new PullSchemaRequest();
     pullSchemaRequest.setHeader(partitionGroup.getHeader());
+    pullSchemaRequest.setRaftId(partitionGroup.getId());
     pullSchemaRequest.setPrefixPaths(prefixPaths);
 
     // decide the node access order with the help of QueryCoordinator
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/MetaPuller.java b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/MetaPuller.java
index e185e9d..16a4988 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/MetaPuller.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/MetaPuller.java
@@ -137,6 +137,7 @@ public class MetaPuller {
     // pull schemas from a remote node
     PullSchemaRequest pullSchemaRequest = new PullSchemaRequest();
     pullSchemaRequest.setHeader(partitionGroup.getHeader());
+    pullSchemaRequest.setRaftId(partitionGroup.getId());
     pullSchemaRequest.setPrefixPaths(prefixPaths.stream().map(PartialPath::getFullPath).collect(
         Collectors.toList()));
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java b/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java
index 5fcfbcb..f8f89b9 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java
@@ -265,7 +265,7 @@ public class SlotPartitionTable implements PartitionTable {
     }
 
     SlotNodeAdditionResult result = new SlotNodeAdditionResult();
-    for (int raftId = 0 ;raftId < multiRaftFactor; raftId++) {
+    for (int raftId = 0; raftId < multiRaftFactor; raftId++) {
       PartitionGroup newGroup = getHeaderGroup(new RaftNode(node, raftId));
       if (newGroup.contains(thisNode)) {
         localGroups.add(newGroup);
@@ -296,7 +296,7 @@ public class SlotPartitionTable implements PartitionTable {
     // move the slots to the new node if any previous node have more slots than the new average
     int newAvg = totalSlotNumbers / nodeRing.size() / multiRaftFactor;
     int raftId = 0;
-    for(int i = 0 ; i < multiRaftFactor; i++) {
+    for (int i = 0; i < multiRaftFactor; i++) {
       RaftNode raftNode = new RaftNode(newNode, i);
       nodeSlotMap.putIfAbsent(raftNode, new ArrayList<>());
       previousNodeMap.putIfAbsent(raftNode, new HashMap<>());
@@ -307,10 +307,11 @@ public class SlotPartitionTable implements PartitionTable {
       if (transferNum > 0) {
         RaftNode curNode = new RaftNode(newNode, raftId);
         int numToMove = transferNum;
-        if(raftId != multiRaftFactor - 1) {
+        if (raftId != multiRaftFactor - 1) {
           numToMove = Math.min(numToMove, newAvg - nodeSlotMap.get(curNode).size());
         }
-        List<Integer> slotsToMove = slots.subList(slots.size() - transferNum, slots.size() - transferNum + numToMove);
+        List<Integer> slotsToMove = slots
+            .subList(slots.size() - transferNum, slots.size() - transferNum + numToMove);
         nodeSlotMap.get(curNode).addAll(slotsToMove);
         for (Integer slot : slotsToMove) {
           // record what node previously hold the integer
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
index e37aa7e..81ae373 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
@@ -317,12 +317,12 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
   }
 
   @Override
-  public void readFile(String filePath, long offset, int length,
+  public void readFile(String filePath, long offset, int length, int raftId,
       AsyncMethodCallback<ByteBuffer> resultHandler) {
-    DataAsyncService service = getDataAsyncService(new RaftNode(thisNode, 0), resultHandler,
+    DataAsyncService service = getDataAsyncService(new RaftNode(thisNode, raftId), resultHandler,
         "Read file:" + filePath);
     if (service != null) {
-      service.readFile(filePath, offset, length, resultHandler);
+      service.readFile(filePath, offset, length, raftId, resultHandler);
     }
   }
 
@@ -849,8 +849,8 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
   }
 
   @Override
-  public ByteBuffer readFile(String filePath, long offset, int length) throws TException {
-    return getDataSyncService(new RaftNode(thisNode, 0)).readFile(filePath, offset, length);
+  public ByteBuffer readFile(String filePath, long offset, int length, int raftId) throws TException {
+    return getDataSyncService(new RaftNode(thisNode, raftId)).readFile(filePath, offset, length, raftId);
   }
 
   @Override
@@ -874,14 +874,14 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
   }
 
   @Override
-  public void removeHardLink(String hardLinkPath) throws TException {
-    getDataSyncService(new RaftNode(thisNode, 0)).removeHardLink(hardLinkPath);
+  public void removeHardLink(String hardLinkPath, int raftId) throws TException {
+    getDataSyncService(new RaftNode(thisNode, raftId)).removeHardLink(hardLinkPath, raftId);
   }
 
   @Override
-  public void removeHardLink(String hardLinkPath,
+  public void removeHardLink(String hardLinkPath, int raftId,
       AsyncMethodCallback<Void> resultHandler) {
-    getDataAsyncService(new RaftNode(thisNode, 0), resultHandler, hardLinkPath).removeHardLink(hardLinkPath,
+    getDataAsyncService(new RaftNode(thisNode, raftId), resultHandler, hardLinkPath).removeHardLink(hardLinkPath, raftId,
         resultHandler);
   }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java
index f8830c0..e4a7304 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java
@@ -222,9 +222,9 @@ public class MetaClusterServer extends RaftServer implements TSMetaService.Async
   }
 
   @Override
-  public void readFile(String filePath, long offset, int length,
+  public void readFile(String filePath, long offset, int length, int raftId,
       AsyncMethodCallback<ByteBuffer> resultHandler) {
-    asyncService.readFile(filePath, offset, length, resultHandler);
+    asyncService.readFile(filePath, offset, length, raftId, resultHandler);
   }
 
   @Override
@@ -324,8 +324,8 @@ public class MetaClusterServer extends RaftServer implements TSMetaService.Async
   }
 
   @Override
-  public ByteBuffer readFile(String filePath, long offset, int length) throws TException {
-    return syncService.readFile(filePath, offset, length);
+  public ByteBuffer readFile(String filePath, long offset, int length, int raftId) throws TException {
+    return syncService.readFile(filePath, offset, length, raftId);
   }
 
   @Override
@@ -334,13 +334,13 @@ public class MetaClusterServer extends RaftServer implements TSMetaService.Async
   }
 
   @Override
-  public void removeHardLink(String hardLinkPath) throws TException {
-    syncService.removeHardLink(hardLinkPath);
+  public void removeHardLink(String hardLinkPath, int raftId) throws TException {
+    syncService.removeHardLink(hardLinkPath, raftId);
   }
 
   @Override
-  public void removeHardLink(String hardLinkPath,
+  public void removeHardLink(String hardLinkPath, int raftId,
       AsyncMethodCallback<Void> resultHandler) {
-    asyncService.removeHardLink(hardLinkPath, resultHandler);
+    asyncService.removeHardLink(hardLinkPath, raftId, resultHandler);
   }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
index 1fb8336..2cd675f 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
@@ -572,8 +572,9 @@ public class DataGroupMember extends RaftMember {
    * @return the path of the directory that is provided exclusively for the member.
    */
   private String getMemberDir() {
-    return IoTDBDescriptor.getInstance().getConfig().getSystemDir() + File.separator +
-        "raft" + File.separator + getHeader().nodeIdentifier + File.separator;
+    return IoTDBDescriptor.getInstance().getConfig().getSystemDir() + File.separator + "raft"
+        + File.separator + getHeader().nodeIdentifier + File.separator + getRaftGroupId()
+        + File.separator;
   }
 
   public MetaGroupMember getMetaGroupMember() {
@@ -625,8 +626,8 @@ public class DataGroupMember extends RaftMember {
         RaftNode raftNode = metaGroupMember.getPartitionTable().routeToHeaderByTime(storageGroupName,
             partitionId * StorageEngine.getTimePartitionInterval());
         DataGroupMember localDataMember = metaGroupMember.getLocalDataMember(raftNode);
-        if (localDataMember.getHeader().equals(this.getHeader())) {
-          localListPair.add(new Pair<>(partitionId, pair.right));
+        if (localDataMember.getHeader().equals(thisNode)) {
+          localListPair.add(pair);
         }
       }
       try {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
index c97223b..d93efc0 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
@@ -532,6 +532,7 @@ public class MetaGroupMember extends RaftMember {
     newStartUpStatus
         .setReplicationNumber(ClusterDescriptor.getInstance().getConfig().getReplicationNum());
     newStartUpStatus.setClusterName(ClusterDescriptor.getInstance().getConfig().getClusterName());
+    newStartUpStatus.setMultiRaftFactor(ClusterDescriptor.getInstance().getConfig().getMultiRaftFactor());
     List<String> seedUrls = ClusterDescriptor.getInstance().getConfig().getSeedNodeUrls();
     List<Node> seedNodeList = new ArrayList<>();
     for (String seedUrl : seedUrls) {
@@ -545,7 +546,7 @@ public class MetaGroupMember extends RaftMember {
    * Send a join cluster request to "node". If the joining is accepted, set the partition table,
    * start DataClusterServer and ClientServer and initialize DataGroupMembers.
    *
-   * @return rue if the node has successfully joined the cluster, false otherwise.
+   * @return true if the node has successfully joined the cluster, false otherwise.
    */
   private boolean joinCluster(Node node, StartUpStatus startUpStatus)
       throws TException, InterruptedException, ConfigInconsistentException {
@@ -594,18 +595,17 @@ public class MetaGroupMember extends RaftMember {
   }
 
   private void handleConfigInconsistency(AddNodeResponse resp) throws ConfigInconsistentException {
-    if (logger.isInfoEnabled()) {
-      CheckStatusResponse checkStatusResponse = resp.getCheckStatusResponse();
-      String parameters =
-          (checkStatusResponse.isPartitionalIntervalEquals() ? "" : ", partition interval")
-              + (checkStatusResponse.isHashSaltEquals() ? "" : ", hash salt")
-              + (checkStatusResponse.isReplicationNumEquals() ? "" : ", replication number")
-              + (checkStatusResponse.isSeedNodeEquals() ? "" : ", seedNodes")
-              + (checkStatusResponse.isClusterNameEquals() ? "" : ", clusterName");
-      logger.error(
-          "The start up configuration{} conflicts the cluster. Please reset the configurations. ",
-          parameters.substring(1));
-    }
+    CheckStatusResponse checkStatusResponse = resp.getCheckStatusResponse();
+    String parameters =
+        (checkStatusResponse.isPartitionalIntervalEquals() ? "" : ", partition interval")
+            + (checkStatusResponse.isHashSaltEquals() ? "" : ", hash salt")
+            + (checkStatusResponse.isReplicationNumEquals() ? "" : ", replication number")
+            + (checkStatusResponse.isSeedNodeEquals() ? "" : ", seedNodes")
+            + (checkStatusResponse.isClusterNameEquals() ? "" : ", clusterName")
+            + (checkStatusResponse.isMultiRaftFactorEquals() ? "" : ", multiRaftFactor");
+    logger.error(
+        "The start up configuration{} conflicts the cluster. Please reset the configurations. ",
+        parameters.substring(1));
     throw new ConfigInconsistentException();
   }
 
@@ -897,6 +897,7 @@ public class MetaGroupMember extends RaftMember {
     long remotePartitionInterval = remoteStartUpStatus.getPartitionInterval();
     int remoteHashSalt = remoteStartUpStatus.getHashSalt();
     int remoteReplicationNum = remoteStartUpStatus.getReplicationNumber();
+    int remoteMultiRaftFactor = remoteStartUpStatus.getMultiRaftFactor();
     String remoteClusterName = remoteStartUpStatus.getClusterName();
     List<Node> remoteSeedNodeList = remoteStartUpStatus.getSeedNodeList();
     long localPartitionInterval = IoTDBDescriptor.getInstance().getConfig()
@@ -904,7 +905,9 @@ public class MetaGroupMember extends RaftMember {
     int localHashSalt = ClusterConstant.HASH_SALT;
     int localReplicationNum = ClusterDescriptor.getInstance().getConfig().getReplicationNum();
     String localClusterName = ClusterDescriptor.getInstance().getConfig().getClusterName();
+    int localMultiRaftFactor = ClusterDescriptor.getInstance().getConfig().getMultiRaftFactor();
     boolean partitionIntervalEquals = true;
+    boolean multiRaftFactorEquals = true;
     boolean hashSaltEquals = true;
     boolean replicationNumEquals = true;
     boolean seedNodeEquals = true;
@@ -915,6 +918,11 @@ public class MetaGroupMember extends RaftMember {
       logger.info("Remote partition interval conflicts with the leader's. Leader: {}, remote: {}",
           localPartitionInterval, remotePartitionInterval);
     }
+    if (localMultiRaftFactor != remoteMultiRaftFactor) {
+      multiRaftFactorEquals = false;
+      logger.info("Remote multi-raft factor conflicts with the leader's. Leader: {}, remote: {}",
+          localMultiRaftFactor, remoteMultiRaftFactor);
+    }
     if (localHashSalt != remoteHashSalt) {
       hashSaltEquals = false;
       logger.info("Remote hash salt conflicts with the leader's. Leader: {}, remote: {}",
@@ -938,11 +946,11 @@ public class MetaGroupMember extends RaftMember {
       }
     }
     if (!(partitionIntervalEquals && hashSaltEquals && replicationNumEquals && seedNodeEquals
-        && clusterNameEquals)) {
+        && clusterNameEquals && multiRaftFactorEquals)) {
       response.setRespNum((int) Response.RESPONSE_NEW_NODE_PARAMETER_CONFLICT);
       response.setCheckStatusResponse(
           new CheckStatusResponse(partitionIntervalEquals, hashSaltEquals,
-              replicationNumEquals, seedNodeEquals, clusterNameEquals));
+              replicationNumEquals, seedNodeEquals, clusterNameEquals, multiRaftFactorEquals));
       return false;
     }
     return true;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseAsyncService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseAsyncService.java
index 521050e..425a8d9 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseAsyncService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseAsyncService.java
@@ -104,7 +104,7 @@ public abstract class BaseAsyncService implements RaftService.AsyncIface {
   }
 
   @Override
-  public void readFile(String filePath, long offset, int length,
+  public void readFile(String filePath, long offset, int length, int raftId,
       AsyncMethodCallback<ByteBuffer> resultHandler) {
     try {
       resultHandler.onComplete(IOUtils.readFile(filePath, offset, length));
@@ -114,7 +114,7 @@ public abstract class BaseAsyncService implements RaftService.AsyncIface {
   }
 
   @Override
-  public void removeHardLink(String hardLinkPath,
+  public void removeHardLink(String hardLinkPath, int raftId,
       AsyncMethodCallback<Void> resultHandler) {
     try {
       Files.deleteIfExists(new File(hardLinkPath).toPath());
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseSyncService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseSyncService.java
index f9bda1f..92f7018 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseSyncService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseSyncService.java
@@ -122,7 +122,7 @@ public abstract class BaseSyncService implements RaftService.Iface {
   }
 
   @Override
-  public ByteBuffer readFile(String filePath, long offset, int length) throws TException {
+  public ByteBuffer readFile(String filePath, long offset, int length, int raftId) throws TException {
     try {
       return IOUtils.readFile(filePath, offset, length);
     } catch (IOException e) {
@@ -131,7 +131,7 @@ public abstract class BaseSyncService implements RaftService.Iface {
   }
 
   @Override
-  public void removeHardLink(String hardLinkPath) throws TException {
+  public void removeHardLink(String hardLinkPath, int raftId) throws TException {
     try {
       Files.deleteIfExists(new File(hardLinkPath).toPath());
     } catch (IOException e) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/ClusterUtils.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/ClusterUtils.java
index a777f5a..4d1c87a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/ClusterUtils.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/ClusterUtils.java
@@ -82,12 +82,18 @@ public class ClusterUtils {
     boolean replicationNumEquals = true;
     boolean seedNodeListEquals = true;
     boolean clusterNameEqual = true;
+    boolean multiRaftFactorEqual = true;
 
     if (localStartUpStatus.getPartitionInterval() != remoteStartUpStatus.getPartitionInterval()) {
       partitionIntervalEquals = false;
       logger.error("Remote partition interval conflicts with local. local: {}, remote: {}",
           localStartUpStatus.getPartitionInterval(), remoteStartUpStatus.getPartitionInterval());
     }
+    if (localStartUpStatus.getMultiRaftFactor() != remoteStartUpStatus.getMultiRaftFactor()) {
+      multiRaftFactorEqual = false;
+      logger.error("Remote multi-raft factor conflicts with local. local: {}, remote: {}",
+          localStartUpStatus.getMultiRaftFactor(), remoteStartUpStatus.getMultiRaftFactor());
+    }
     if (localStartUpStatus.getHashSalt() != remoteStartUpStatus.getHashSalt()) {
       hashSaltEquals = false;
       logger.error("Remote hash salt conflicts with local. local: {}, remote: {}",
@@ -115,7 +121,7 @@ public class ClusterUtils {
     }
 
     return new CheckStatusResponse(partitionIntervalEquals, hashSaltEquals,
-        replicationNumEquals, seedNodeListEquals, clusterNameEqual);
+        replicationNumEquals, seedNodeListEquals, clusterNameEqual, multiRaftFactorEqual);
   }
 
   public static boolean checkSeedNodes(boolean isClusterEstablished, List<Node> localSeedNodes,
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptorTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptorTest.java
index dcc582a..88deb3d 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptorTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptorTest.java
@@ -64,7 +64,6 @@ import org.apache.iotdb.db.qp.physical.sys.ShowTimeSeriesPlan;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.service.rpc.thrift.TSStatus;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.read.common.Path;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 import org.apache.iotdb.tsfile.write.schema.TimeseriesSchema;
 import org.apache.thrift.TException;
@@ -94,7 +93,7 @@ public class SyncClientAdaptorTest {
   @Before
   public void setUp() {
     nodeStatus = new TNodeStatus();
-    checkStatusResponse = new CheckStatusResponse(true, false, true, false, true);
+    checkStatusResponse = new CheckStatusResponse(true, false, true, false, true, true);
     addNodeResponse = new AddNodeResponse((int) Response.RESPONSE_AGREE);
     aggregateResults = Arrays.asList(ByteBuffer.wrap("1".getBytes()),
         ByteBuffer.wrap("2".getBytes()), ByteBuffer.wrap("2".getBytes()));
@@ -247,7 +246,7 @@ public class SyncClientAdaptorTest {
       }
 
       @Override
-      public void readFile(String filePath, long offset, int length,
+      public void readFile(String filePath, long offset, int length, int raftId,
           AsyncMethodCallback<ByteBuffer> resultHandler) {
         resultHandler.onComplete(readFileResult);
       }
@@ -338,7 +337,7 @@ public class SyncClientAdaptorTest {
         TestUtils.getNode(0), 0, paths));
     assertEquals(1L, (long) SyncClientAdaptor.getGroupByExecutor(dataClient, new GroupByRequest()));
     assertEquals(fillResult, SyncClientAdaptor.previousFill(dataClient, new PreviousFillRequest()));
-    assertEquals(readFileResult, SyncClientAdaptor.readFile(dataClient, "a file", 0, 1000));
+    assertEquals(readFileResult, SyncClientAdaptor.readFile(dataClient, "a file", 0, 1000, 0));
     assertEquals(aggregateResults, SyncClientAdaptor.getGroupByResult(dataClient,
         TestUtils.getNode(0), 0, 1, 1, 2));
     assertEquals(peekNextNotNullValueResult, SyncClientAdaptor.peekNextNotNullValue(dataClient,
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/common/TestAsyncDataClient.java b/cluster/src/test/java/org/apache/iotdb/cluster/common/TestAsyncDataClient.java
index 63ce1b7..51f0c8a 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/common/TestAsyncDataClient.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/common/TestAsyncDataClient.java
@@ -117,7 +117,7 @@ public class TestAsyncDataClient extends AsyncDataClient {
   }
 
   @Override
-  public void readFile(String filePath, long offset, int length,
+  public void readFile(String filePath, long offset, int length, int raftId,
       AsyncMethodCallback<ByteBuffer> resultHandler) {
     new Thread(() -> {
       File file = new File(filePath);
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/DataSnapshotTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/DataSnapshotTest.java
index b89d058..38393d1 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/DataSnapshotTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/DataSnapshotTest.java
@@ -62,7 +62,7 @@ public abstract class DataSnapshotTest {
       public AsyncClient getAsyncClient(Node node) {
         return new AsyncDataClient(null, null, null) {
           @Override
-          public void readFile(String filePath, long offset, int length,
+          public void readFile(String filePath, long offset, int length, int raftId,
               AsyncMethodCallback<ByteBuffer> resultHandler) {
             new Thread(() -> {
               if (addNetFailure && (failureCnt++) % failureFrequency == 0) {
@@ -79,8 +79,7 @@ public abstract class DataSnapshotTest {
           }
 
           @Override
-          public void removeHardLink(String hardLinkPath, AsyncMethodCallback<Void> resultHandler)
-              throws TException {
+          public void removeHardLink(String hardLinkPath, int raftId, AsyncMethodCallback<Void> resultHandler) {
             new Thread(() -> {
               try {
                 Files.deleteIfExists(new File(hardLinkPath).toPath());
@@ -121,7 +120,7 @@ public abstract class DataSnapshotTest {
           }
         })) {
           @Override
-          public ByteBuffer readFile(String filePath, long offset, int length) throws TException {
+          public ByteBuffer readFile(String filePath, long offset, int length, int raftId) throws TException {
             if (addNetFailure && (failureCnt++) % failureFrequency == 0) {
               // simulate failures
               throw new TException("Faked network failure");
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTaskTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTaskTest.java
index d9ce485..539e839 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTaskTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTaskTest.java
@@ -119,7 +119,7 @@ public class PullSnapshotTaskTest extends DataSnapshotTest {
           }
 
           @Override
-          public ByteBuffer readFile(String filePath, long offset, int length) throws TException {
+          public ByteBuffer readFile(String filePath, long offset, int length, int raftId) throws TException {
             try {
               return IOUtils.readFile(filePath, offset, length);
             } catch (IOException e) {
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
index 8180d4e..c920f06 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
@@ -222,7 +222,7 @@ public class DataGroupMemberTest extends MemberTest {
             }
 
             @Override
-            public void removeHardLink(String hardLinkPath, AsyncMethodCallback<Void> resultHandler) {
+            public void removeHardLink(String hardLinkPath, int raftId, AsyncMethodCallback<Void> resultHandler) {
               new Thread(() -> {
                 try {
                   Files.deleteIfExists(new File(hardLinkPath).toPath());
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index 781836f..b03f984 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -500,9 +500,7 @@ public class StorageEngine implements IService {
    * @throws StorageGroupNotSetException
    */
   public void closeStorageGroupProcessor(PartialPath storageGroupPath, long partitionId,
-      boolean isSeq,
-      boolean isSync)
-      throws StorageGroupNotSetException {
+      boolean isSeq, boolean isSync) throws StorageGroupNotSetException {
     StorageGroupProcessor processor = processorMap.get(storageGroupPath);
     if (processor == null) {
       throw new StorageGroupNotSetException(storageGroupPath.getFullPath());
diff --git a/thrift/src/main/thrift/cluster.thrift b/thrift/src/main/thrift/cluster.thrift
index 69a4dc5..9019680 100644
--- a/thrift/src/main/thrift/cluster.thrift
+++ b/thrift/src/main/thrift/cluster.thrift
@@ -132,6 +132,7 @@ struct StartUpStatus {
   3: required int replicationNumber
   4: required list<Node> seedNodeList
   5: required string clusterName
+  6: required int multiRaftFactor
 }
 
 // follower -> leader
@@ -141,6 +142,7 @@ struct CheckStatusResponse {
   3: required bool replicationNumEquals
   4: required bool seedNodeEquals
   5: required bool clusterNameEquals
+  6: required bool multiRaftFactorEquals
 }
 
 struct SendSnapshotRequest {
@@ -319,7 +321,7 @@ service RaftService {
   * bytes, only the remaining will be returned.
   * Notice that when the last chunk of the file is read, the file will be deleted immediately.
   **/
-  binary readFile(1:string filePath, 2:long offset, 3:int length)
+  binary readFile(1:string filePath, 2:long offset, 3:int length, 4: int raftId)
 
   /**
   * Test if a log of "index" and "term" exists.
@@ -330,7 +332,7 @@ service RaftService {
   * When a follower finds that it already has a file in a snapshot locally, it calls this
   * interface to notify the leader to remove the associated hardlink.
   **/
-  void removeHardLink(1: string hardLinkPath)
+  void removeHardLink(1: string hardLinkPath, 2: int raftId)
 }