You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ta...@apache.org on 2020/12/19 10:37:08 UTC

[iotdb] 02/02: add multi-raft except for add/remove node

This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch cluster_multi_raft
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit d5c075d26e216160c58a9353b7e196248356cb7f
Author: lta <li...@163.com>
AuthorDate: Tue Dec 15 15:29:32 2020 +0800

    add multi-raft except for add/remove node
---
 .../resources/conf/iotdb-cluster.properties        |   3 +
 .../cluster/client/sync/SyncClientAdaptor.java     |  49 ++--
 .../apache/iotdb/cluster/config/ClusterConfig.java |  11 +
 .../iotdb/cluster/config/ClusterDescriptor.java    |   5 +-
 .../iotdb/cluster/log/catchup/CatchUpTask.java     |  13 +-
 .../iotdb/cluster/log/catchup/LogCatchUpTask.java  |   7 +-
 .../cluster/log/catchup/SnapshotCatchUpTask.java   |   4 +-
 .../cluster/log/snapshot/PartitionedSnapshot.java  |   2 +-
 .../apache/iotdb/cluster/metadata/CMManager.java   |  40 ++-
 .../apache/iotdb/cluster/metadata/MetaPuller.java  |   2 +-
 .../iotdb/cluster/partition/PartitionGroup.java    |   1 -
 .../iotdb/cluster/partition/PartitionTable.java    |   9 +-
 .../cluster/partition/slot/SlotPartitionTable.java | 102 ++++----
 .../iotdb/cluster/query/ClusterPlanExecutor.java   |  20 +-
 .../iotdb/cluster/query/LocalQueryExecutor.java    |   4 +-
 .../iotdb/cluster/query/RemoteQueryContext.java    |   9 +-
 .../cluster/query/aggregate/ClusterAggregator.java |   4 +-
 .../cluster/query/fill/ClusterPreviousFill.java    |   4 +-
 .../query/groupby/RemoteGroupByExecutor.java       |  13 +-
 .../query/last/ClusterLastQueryExecutor.java       |   6 +-
 .../cluster/query/reader/ClusterReaderFactory.java |  22 +-
 .../iotdb/cluster/query/reader/DataSourceInfo.java |   6 +-
 .../reader/RemoteSeriesReaderByTimestamp.java      |   4 +-
 .../query/reader/RemoteSimpleSeriesReader.java     |   4 +-
 .../apache/iotdb/cluster/server/ClientServer.java  |   9 +-
 .../iotdb/cluster/server/DataClusterServer.java    | 272 ++++++++++-----------
 .../iotdb/cluster/server/MetaClusterServer.java    |  16 +-
 .../cluster/server/PullSnapshotHintService.java    |  21 +-
 .../iotdb/cluster/server/StoppedMemberManager.java |  27 +-
 .../cluster/server/member/DataGroupMember.java     |  30 +--
 .../cluster/server/member/MetaGroupMember.java     |  33 ++-
 .../iotdb/cluster/server/member/RaftMember.java    |  13 +-
 .../cluster/server/service/BaseAsyncService.java   |   1 +
 .../cluster/server/service/DataSyncService.java    |   4 +-
 .../apache/iotdb/cluster/utils/PartitionUtils.java |   3 +-
 .../cluster/utils/nodetool/ClusterMonitor.java     |  15 +-
 .../cluster/client/async/AsyncDataClientTest.java  |   2 +-
 .../cluster/client/async/AsyncMetaClientTest.java  |   2 +-
 .../cluster/client/sync/SyncClientAdaptorTest.java |  46 ++--
 .../iotdb/cluster/common/TestAsyncDataClient.java  |  16 +-
 .../iotdb/cluster/common/TestDataGroupMember.java  |   5 +-
 .../iotdb/cluster/common/TestMetaGroupMember.java  |   3 +-
 .../iotdb/cluster/log/LogDispatcherTest.java       |   3 +-
 .../cluster/log/applier/DataLogApplierTest.java    |   6 +-
 .../iotdb/cluster/log/catchup/CatchUpTaskTest.java |  16 +-
 .../cluster/log/catchup/LogCatchUpTaskTest.java    |  12 +-
 .../log/catchup/SnapshotCatchUpTaskTest.java       |  10 +-
 .../log/snapshot/PartitionedSnapshotTest.java      |   3 +-
 .../cluster/log/snapshot/PullSnapshotTaskTest.java |   1 -
 .../cluster/partition/SlotPartitionTableTest.java  |  18 +-
 .../reader/RemoteSeriesReaderByTimestampTest.java  |   2 +-
 .../query/reader/RemoteSimpleSeriesReaderTest.java |   2 +-
 .../server/heartbeat/MetaHeartbeatThreadTest.java  |  10 +-
 .../cluster/server/member/DataGroupMemberTest.java |  31 +--
 .../iotdb/cluster/server/member/MemberTest.java    |   4 +-
 .../cluster/server/member/MetaGroupMemberTest.java |   4 +-
 thrift/src/main/thrift/cluster.thrift              |   5 +
 57 files changed, 528 insertions(+), 461 deletions(-)

diff --git a/cluster/src/assembly/resources/conf/iotdb-cluster.properties b/cluster/src/assembly/resources/conf/iotdb-cluster.properties
index 640cfba..7cd8c22 100644
--- a/cluster/src/assembly/resources/conf/iotdb-cluster.properties
+++ b/cluster/src/assembly/resources/conf/iotdb-cluster.properties
@@ -62,6 +62,9 @@ max_concurrent_client_num=10000
 # number of replications for one partition
 default_replica_num=2
 
+# sub raft num for multi-raft
+multi_raft_factor=2
+
 # cluster name to identify different clusters
 # all node's cluster_name in one cluster are the same
 cluster_name=default
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptor.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptor.java
index b4436f5..04d6ea1 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptor.java
@@ -101,12 +101,12 @@ public class SyncClientAdaptor {
   }
 
   public static Boolean matchTerm(AsyncClient client, Node target, long prevLogIndex,
-      long prevLogTerm, Node header) throws TException, InterruptedException {
+      long prevLogTerm, Node header, int raftId) throws TException, InterruptedException {
     try {
       AtomicReference<Boolean> resultRef = new AtomicReference<>(null);
       GenericHandler<Boolean> matchTermHandler = new GenericHandler<>(target, resultRef);
 
-      client.matchTerm(prevLogIndex, prevLogTerm, header, matchTermHandler);
+      client.matchTerm(prevLogIndex, prevLogTerm, header, raftId, matchTermHandler);
       synchronized (resultRef) {
         if (resultRef.get() == null) {
           resultRef.wait(RaftServer.getConnectionTimeoutInMS());
@@ -157,14 +157,14 @@ public class SyncClientAdaptor {
     return result.get();
   }
 
-  public static List<String> getNodeList(AsyncDataClient client, Node header,
+  public static List<String> getNodeList(AsyncDataClient client, Node header, int raftId,
       String schemaPattern, int level) throws TException, InterruptedException {
     GetNodesListHandler handler = new GetNodesListHandler();
     AtomicReference<List<String>> response = new AtomicReference<>(null);
     handler.setResponse(response);
     handler.setContact(client.getNode());
 
-    client.getNodeList(header, schemaPattern, level, handler);
+    client.getNodeList(header, raftId, schemaPattern, level, handler);
     synchronized (response) {
       if (response.get() == null) {
         response.wait(RaftServer.getReadOperationTimeoutMS());
@@ -173,14 +173,14 @@ public class SyncClientAdaptor {
     return response.get();
   }
 
-  public static Set<String> getNextChildren(AsyncDataClient client, Node header, String path)
+  public static Set<String> getNextChildren(AsyncDataClient client, Node header, int raftId, String path)
       throws TException, InterruptedException {
     GetChildNodeNextLevelPathHandler handler = new GetChildNodeNextLevelPathHandler();
     AtomicReference<Set<String>> response = new AtomicReference<>(null);
     handler.setResponse(response);
     handler.setContact(client.getNode());
 
-    client.getChildNodePathInNextLevel(header, path, handler);
+    client.getChildNodePathInNextLevel(header, raftId, path, handler);
     synchronized (response) {
       if (response.get() == null) {
         response.wait(RaftServer.getReadOperationTimeoutMS());
@@ -190,7 +190,7 @@ public class SyncClientAdaptor {
   }
 
   public static ByteBuffer getAllMeasurementSchema(AsyncDataClient client,
-      Node header, ShowTimeSeriesPlan plan)
+      Node header, int raftId, ShowTimeSeriesPlan plan)
       throws IOException, InterruptedException, TException {
     GetTimeseriesSchemaHandler handler = new GetTimeseriesSchemaHandler();
     AtomicReference<ByteBuffer> response = new AtomicReference<>(null);
@@ -200,7 +200,7 @@ public class SyncClientAdaptor {
     DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream);
     plan.serialize(dataOutputStream);
 
-    client.getAllMeasurementSchema(header, ByteBuffer.wrap(byteArrayOutputStream.toByteArray()),
+    client.getAllMeasurementSchema(header, raftId, ByteBuffer.wrap(byteArrayOutputStream.toByteArray()),
         handler);
     synchronized (response) {
       if (response.get() == null) {
@@ -309,43 +309,43 @@ public class SyncClientAdaptor {
     return resultReference.get();
   }
 
-  public static List<String> getUnregisteredMeasurements(AsyncDataClient client, Node header,
+  public static List<String> getUnregisteredMeasurements(AsyncDataClient client, Node header, int raftId,
       List<String> seriesPaths) throws TException, InterruptedException {
     AtomicReference<List<String>> remoteResult = new AtomicReference<>();
     GenericHandler<List<String>> handler = new GenericHandler<>(client.getNode(), remoteResult);
 
-    client.getUnregisteredTimeseries(header, seriesPaths, handler);
+    client.getUnregisteredTimeseries(header, raftId, seriesPaths, handler);
     return handler.getResult(RaftServer.getReadOperationTimeoutMS());
   }
 
-  public static GetAllPathsResult getAllPaths(AsyncDataClient client, Node header,
+  public static GetAllPathsResult getAllPaths(AsyncDataClient client, Node header, int raftId,
       List<String> pathsToQuery, boolean withAlias)
       throws InterruptedException, TException {
     AtomicReference<GetAllPathsResult> remoteResult = new AtomicReference<>();
     GenericHandler<GetAllPathsResult> handler = new GenericHandler<>(client.getNode(),
         remoteResult);
 
-    client.getAllPaths(header, pathsToQuery, withAlias, handler);
+    client.getAllPaths(header, raftId, pathsToQuery, withAlias, handler);
     return handler.getResult(RaftServer.getReadOperationTimeoutMS());
   }
 
-  public static Integer getPathCount(AsyncDataClient client, Node header, List<String> pathsToQuery,
+  public static Integer getPathCount(AsyncDataClient client, Node header, int raftId, List<String> pathsToQuery,
       int level)
       throws InterruptedException, TException {
     AtomicReference<Integer> remoteResult = new AtomicReference<>(null);
     GenericHandler<Integer> handler = new GenericHandler<>(client.getNode(), remoteResult);
 
-    client.getPathCount(header, pathsToQuery, level, handler);
+    client.getPathCount(header, raftId, pathsToQuery, level, handler);
     return handler.getResult(RaftServer.getReadOperationTimeoutMS());
   }
 
-  public static Set<String> getAllDevices(AsyncDataClient client, Node header,
+  public static Set<String> getAllDevices(AsyncDataClient client, Node header, int raftId,
       List<String> pathsToQuery)
       throws InterruptedException, TException {
     AtomicReference<Set<String>> remoteResult = new AtomicReference<>();
     GenericHandler<Set<String>> handler = new GenericHandler<>(client.getNode(), remoteResult);
 
-    client.getAllDevices(header, pathsToQuery, handler);
+    client.getAllDevices(header, raftId, pathsToQuery, handler);
     return handler.getResult(RaftServer.getReadOperationTimeoutMS());
   }
 
@@ -395,23 +395,23 @@ public class SyncClientAdaptor {
     return handler.getResult(RaftServer.getWriteOperationTimeoutMS());
   }
 
-  public static List<ByteBuffer> getGroupByResult(AsyncDataClient client, Node header,
+  public static List<ByteBuffer> getGroupByResult(AsyncDataClient client, Node header, int raftId,
       long executorId
       , long curStartTime, long curEndTime) throws InterruptedException, TException {
     AtomicReference<List<ByteBuffer>> fetchResult = new AtomicReference<>();
     GenericHandler<List<ByteBuffer>> handler = new GenericHandler<>(client.getNode(), fetchResult);
 
-    client.getGroupByResult(header, executorId, curStartTime, curEndTime, handler);
+    client.getGroupByResult(header, raftId, executorId, curStartTime, curEndTime, handler);
     return handler.getResult(RaftServer.getReadOperationTimeoutMS());
   }
 
-  public static ByteBuffer peekNextNotNullValue(AsyncDataClient client, Node header,
+  public static ByteBuffer peekNextNotNullValue(AsyncDataClient client, Node header, int raftId,
       long executorId
       , long curStartTime, long curEndTime) throws InterruptedException, TException {
     AtomicReference<ByteBuffer> fetchResult = new AtomicReference<>();
     GenericHandler<ByteBuffer> handler = new GenericHandler<>(client.getNode(), fetchResult);
 
-    client.peekNextNotNullValue(header, executorId, curStartTime, curEndTime, handler);
+    client.peekNextNotNullValue(header, raftId, executorId, curStartTime, curEndTime, handler);
     return handler.getResult(RaftServer.getReadOperationTimeoutMS());
   }
 
@@ -433,24 +433,23 @@ public class SyncClientAdaptor {
   public static ByteBuffer last(AsyncDataClient client, List<PartialPath> seriesPaths,
       List<Integer> dataTypeOrdinals, QueryContext context,
       Map<String, Set<String>> deviceMeasurements,
-      Node header)
+      Node header, int raftId)
       throws TException, InterruptedException {
     AtomicReference<ByteBuffer> result = new AtomicReference<>();
     GenericHandler<ByteBuffer> handler = new GenericHandler<>(client.getNode(), result);
     LastQueryRequest request = new LastQueryRequest(PartialPath.toStringList(seriesPaths),
-        dataTypeOrdinals,
-        context.getQueryId(), deviceMeasurements, header, client.getNode());
+        dataTypeOrdinals, context.getQueryId(), deviceMeasurements, header, raftId, client.getNode());
 
     client.last(request, handler);
     return handler.getResult(RaftServer.getReadOperationTimeoutMS());
   }
 
-  public static boolean onSnapshotApplied(AsyncDataClient client, Node header, List<Integer> slots)
+  public static boolean onSnapshotApplied(AsyncDataClient client, Node header, int raftId, List<Integer> slots)
       throws TException, InterruptedException {
     AtomicReference<Boolean> result = new AtomicReference<>(false);
     GenericHandler<Boolean> handler = new GenericHandler<>(client.getNode(), result);
 
-    client.onSnapshotApplied(header, slots, handler);
+    client.onSnapshotApplied(header, raftId, slots, handler);
     return handler.getResult(RaftServer.getWriteOperationTimeoutMS());
   }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
index 4d5d05b..306b081 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
@@ -46,6 +46,9 @@ public class ClusterConfig {
   private int replicationNum = 2;
 
   @ClusterConsistent
+  private int multiRaftFactor = 2;
+
+  @ClusterConsistent
   private String clusterName = "default";
 
   @ClusterConsistent
@@ -234,6 +237,14 @@ public class ClusterConfig {
     this.replicationNum = replicationNum;
   }
 
+  public int getMultiRaftFactor() {
+    return multiRaftFactor;
+  }
+
+  public void setMultiRaftFactor(int multiRaftFactor) {
+    this.multiRaftFactor = multiRaftFactor;
+  }
+
   void setClusterName(String clusterName) {
     this.clusterName = clusterName;
   }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java
index 3c9e8ec..d0c620b 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java
@@ -208,8 +208,11 @@ public class ClusterDescriptor {
     config.setMaxConcurrentClientNum(Integer.parseInt(properties.getProperty(
         "max_concurrent_client_num", String.valueOf(config.getMaxConcurrentClientNum()))));
 
+    config.setMultiRaftFactor(Integer.parseInt(properties.getProperty(
+        "multi_raft_factor", String.valueOf(config.getMultiRaftFactor()))));
+
     config.setReplicationNum(Integer.parseInt(properties.getProperty(
-        "default_replica_num", String.valueOf(config.getReplicationNum()))));
+        "", String.valueOf(config.getReplicationNum()))));
 
     config.setClusterName(properties.getProperty("cluster_name", config.getClusterName()));
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/catchup/CatchUpTask.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/catchup/CatchUpTask.java
index 23687ad..e40aeba 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/catchup/CatchUpTask.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/catchup/CatchUpTask.java
@@ -55,9 +55,12 @@ public class CatchUpTask implements Runnable {
   private long lastLogIndex;
   private boolean abort;
   private String name;
+  private int raftId;
 
-  public CatchUpTask(Node node, Peer peer, RaftMember raftMember, long lastLogIdx) {
+
+  public CatchUpTask(Node node, int raftId, Peer peer, RaftMember raftMember, long lastLogIdx) {
     this.node = node;
+    this.raftId = raftId;
     this.peer = peer;
     this.raftMember = raftMember;
     this.logs = Collections.emptyList();
@@ -242,14 +245,14 @@ public class CatchUpTask implements Runnable {
         return false;
       }
       Node header = raftMember.getHeader();
-      matched = SyncClientAdaptor.matchTerm(client, node, logIndex, logTerm, header);
+      matched = SyncClientAdaptor.matchTerm(client, node, logIndex, logTerm, header, raftId);
     } else {
       Client client = raftMember.getSyncClient(node);
       if (client == null) {
         return false;
       }
       try {
-        matched = client.matchTerm(logIndex, logTerm, raftMember.getHeader());
+        matched = client.matchTerm(logIndex, logTerm, raftMember.getHeader(), raftId);
       } catch (TException e) {
         client.getInputProtocol().getTransport().close();
         throw e;
@@ -321,11 +324,11 @@ public class CatchUpTask implements Runnable {
         doSnapshot();
         // snapshot may overlap with logs
         removeSnapshotLogs();
-        SnapshotCatchUpTask task = new SnapshotCatchUpTask(logs, snapshot, node, raftMember);
+        SnapshotCatchUpTask task = new SnapshotCatchUpTask(logs, snapshot, node, raftId, raftMember);
         catchUpSucceeded = task.call();
       } else {
         logger.info("{}: performing a log catch-up to {}", raftMember.getName(), node);
-        LogCatchUpTask task = new LogCatchUpTask(logs, node, raftMember);
+        LogCatchUpTask task = new LogCatchUpTask(logs, node, raftId, raftMember);
         catchUpSucceeded = task.call();
       }
       if (catchUpSucceeded) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/catchup/LogCatchUpTask.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/catchup/LogCatchUpTask.java
index 3520ce4..8b69884 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/catchup/LogCatchUpTask.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/catchup/LogCatchUpTask.java
@@ -61,17 +61,20 @@ public class LogCatchUpTask implements Callable<Boolean> {
   private List<Log> logs;
   private boolean useBatch = ClusterDescriptor.getInstance().getConfig().isUseBatchInLogCatchUp();
   boolean abort = false;
+  private int raftId;
 
-  LogCatchUpTask(List<Log> logs, Node node, RaftMember raftMember) {
+  LogCatchUpTask(List<Log> logs, Node node, int raftId, RaftMember raftMember) {
     this.logs = logs;
     this.node = node;
+    this.raftId = raftId;
     this.raftMember = raftMember;
   }
 
   @TestOnly
-  LogCatchUpTask(List<Log> logs, Node node, RaftMember raftMember, boolean useBatch) {
+  LogCatchUpTask(List<Log> logs, Node node, int raftId, RaftMember raftMember, boolean useBatch) {
     this.logs = logs;
     this.node = node;
+    this.raftId = raftId;
     this.raftMember = raftMember;
     this.useBatch = useBatch;
   }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/catchup/SnapshotCatchUpTask.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/catchup/SnapshotCatchUpTask.java
index 1a858b0..548a2db 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/catchup/SnapshotCatchUpTask.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/catchup/SnapshotCatchUpTask.java
@@ -51,8 +51,8 @@ public class SnapshotCatchUpTask extends LogCatchUpTask implements Callable<Bool
       .getCatchUpTimeoutMS();
   private Snapshot snapshot;
 
-  SnapshotCatchUpTask(List<Log> logs, Snapshot snapshot, Node node, RaftMember raftMember) {
-    super(logs, node, raftMember);
+  SnapshotCatchUpTask(List<Log> logs, Snapshot snapshot, Node node, int raftId, RaftMember raftMember) {
+    super(logs, node, raftId, raftMember);
     this.snapshot = snapshot;
   }
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PartitionedSnapshot.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PartitionedSnapshot.java
index a0eff88..605d086 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PartitionedSnapshot.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PartitionedSnapshot.java
@@ -166,7 +166,7 @@ public class PartitionedSnapshot<T extends Snapshot> extends Snapshot {
       synchronized (dataGroupMember.getSnapshotApplyLock()) {
         List<Integer> slots =
             ((SlotPartitionTable) dataGroupMember.getMetaGroupMember().getPartitionTable())
-                .getNodeSlots(dataGroupMember.getHeader());
+                .getNodeSlots(dataGroupMember.getHeader(), dataGroupMember.getRaftGroupId());
         for (Integer slot : slots) {
           T subSnapshot = snapshot.getSnapshot(slot);
           if (subSnapshot != null) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
index ceef221..1997779 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
@@ -110,7 +110,7 @@ public class CMManager extends MManager {
 
   private static final Logger logger = LoggerFactory.getLogger(CMManager.class);
 
-  private ReentrantReadWriteLock cacheLock = new ReentrantReadWriteLock();
+  private final ReentrantReadWriteLock cacheLock = new ReentrantReadWriteLock();
   // only cache the series who is writing, we need not to cache series who is reading
   // because the read is slow, so pull from remote is little cost comparing to the disk io
   private RemoteMetaCache mRemoteMetaCache;
@@ -578,7 +578,7 @@ public class CMManager extends MManager {
   private List<String> getUnregisteredSeriesListLocally(List<String> seriesList,
       PartitionGroup partitionGroup) throws CheckConsistencyException {
     DataGroupMember dataMember = metaGroupMember.getDataClusterServer()
-        .getDataMember(partitionGroup.getHeader(), null, null);
+        .getDataMember(partitionGroup.getHeader(), partitionGroup.getId(), null, null);
     return dataMember.getLocalQueryExecutor().getUnregisteredTimeseries(seriesList);
   }
 
@@ -591,12 +591,13 @@ public class CMManager extends MManager {
           AsyncDataClient client = metaGroupMember.getClientProvider().getAsyncDataClient(node,
               RaftServer.getReadOperationTimeoutMS());
           result = SyncClientAdaptor
-              .getUnregisteredMeasurements(client, partitionGroup.getHeader(), seriesList);
+              .getUnregisteredMeasurements(client, partitionGroup.getHeader(), partitionGroup.getId(), seriesList);
         } else {
           SyncDataClient syncDataClient =
               metaGroupMember.getClientProvider().getSyncDataClient(node,
                   RaftServer.getReadOperationTimeoutMS());
-          result = syncDataClient.getUnregisteredTimeseries(partitionGroup.getHeader(), seriesList);
+          result = syncDataClient.getUnregisteredTimeseries(partitionGroup.getHeader(),
+              partitionGroup.getId(), seriesList);
           ClientUtils.putBackSyncClient(syncDataClient);
         }
 
@@ -671,7 +672,7 @@ public class CMManager extends MManager {
     if (partitionGroup.contains(metaGroupMember.getThisNode())) {
       // the node is in the target group, synchronize with leader should be enough
       metaGroupMember.getLocalDataMember(partitionGroup.getHeader(),
-          "Pull timeseries of " + prefixPaths).syncLeader();
+          partitionGroup.getId(), "Pull timeseries of " + prefixPaths).syncLeader();
       return;
     }
 
@@ -899,7 +900,7 @@ public class CMManager extends MManager {
       if (partitionGroup.contains(metaGroupMember.getThisNode())) {
         // this node is a member of the group, perform a local query after synchronizing with the
         // leader
-        metaGroupMember.getLocalDataMember(partitionGroup.getHeader()).syncLeader();
+        metaGroupMember.getLocalDataMember(partitionGroup.getHeader(), partitionGroup.getId()).syncLeader();
         List<PartialPath> allTimeseriesName = getMatchedPathsLocally(pathUnderSG, withAlias);
         logger.debug("{}: get matched paths of {} locally, result {}", metaGroupMember.getName(),
             partitionGroup, allTimeseriesName);
@@ -937,7 +938,7 @@ public class CMManager extends MManager {
     List<Node> coordinatedNodes = QueryCoordinator.getINSTANCE().reorderNodes(partitionGroup);
     for (Node node : coordinatedNodes) {
       try {
-        List<PartialPath> paths = getMatchedPaths(node, partitionGroup.getHeader(), pathsToQuery,
+        List<PartialPath> paths = getMatchedPaths(node, partitionGroup.getHeader(), partitionGroup.getId(), pathsToQuery,
             withAlias);
         if (logger.isDebugEnabled()) {
           logger.debug("{}: get matched paths of {} and other {} paths from {} in {}, result {}",
@@ -960,19 +961,18 @@ public class CMManager extends MManager {
   }
 
   @SuppressWarnings("java:S1168") // null and empty list are different
-  private List<PartialPath> getMatchedPaths(Node node, Node header, List<String> pathsToQuery,
+  private List<PartialPath> getMatchedPaths(Node node, Node header, int raftId, List<String> pathsToQuery,
       boolean withAlias)
       throws IOException, TException, InterruptedException {
     GetAllPathsResult result;
     if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
       AsyncDataClient client = metaGroupMember.getClientProvider().getAsyncDataClient(node,
           RaftServer.getReadOperationTimeoutMS());
-      result = SyncClientAdaptor.getAllPaths(client, header,
-          pathsToQuery, withAlias);
+      result = SyncClientAdaptor.getAllPaths(client, header, raftId, pathsToQuery, withAlias);
     } else {
       SyncDataClient syncDataClient = metaGroupMember.getClientProvider().getSyncDataClient(node,
           RaftServer.getReadOperationTimeoutMS());
-      result = syncDataClient.getAllPaths(header, pathsToQuery, withAlias);
+      result = syncDataClient.getAllPaths(header, raftId, pathsToQuery, withAlias);
       ClientUtils.putBackSyncClient(syncDataClient);
     }
 
@@ -1019,7 +1019,7 @@ public class CMManager extends MManager {
       if (partitionGroup.contains(metaGroupMember.getThisNode())) {
         // this node is a member of the group, perform a local query after synchronizing with the
         // leader
-        metaGroupMember.getLocalDataMember(partitionGroup.getHeader()).syncLeader();
+        metaGroupMember.getLocalDataMember(partitionGroup.getHeader(), partitionGroup.getId()).syncLeader();
         Set<PartialPath> allDevices = getDevices(pathUnderSG);
         logger.debug("{}: get matched paths of {} locally, result {}", metaGroupMember.getName(),
             partitionGroup,
@@ -1050,7 +1050,7 @@ public class CMManager extends MManager {
     List<Node> coordinatedNodes = QueryCoordinator.getINSTANCE().reorderNodes(partitionGroup);
     for (Node node : coordinatedNodes) {
       try {
-        Set<String> paths = getMatchedDevices(node, partitionGroup.getHeader(), pathsToQuery);
+        Set<String> paths = getMatchedDevices(node, partitionGroup.getHeader(), partitionGroup.getId(), pathsToQuery);
         logger.debug("{}: get matched paths of {} from {}, result {}", metaGroupMember.getName(),
             partitionGroup,
             node, paths);
@@ -1073,18 +1073,17 @@ public class CMManager extends MManager {
     return Collections.emptySet();
   }
 
-  private Set<String> getMatchedDevices(Node node, Node header, List<String> pathsToQuery)
+  private Set<String> getMatchedDevices(Node node, Node header, int raftId, List<String> pathsToQuery)
       throws IOException, TException, InterruptedException {
     Set<String> paths;
     if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
       AsyncDataClient client = metaGroupMember.getClientProvider().getAsyncDataClient(node,
           RaftServer.getReadOperationTimeoutMS());
-      paths = SyncClientAdaptor.getAllDevices(client, header,
-          pathsToQuery);
+      paths = SyncClientAdaptor.getAllDevices(client, header, raftId, pathsToQuery);
     } else {
       SyncDataClient syncDataClient = metaGroupMember.getClientProvider().getSyncDataClient(node,
           RaftServer.getReadOperationTimeoutMS());
-      paths = syncDataClient.getAllDevices(header, pathsToQuery);
+      paths = syncDataClient.getAllDevices(header, raftId, pathsToQuery);
       ClientUtils.putBackSyncClient(syncDataClient);
     }
     return paths;
@@ -1340,8 +1339,7 @@ public class CMManager extends MManager {
   private void showLocalTimeseries(PartitionGroup group, ShowTimeSeriesPlan plan,
       Set<ShowTimeSeriesResult> resultSet, QueryContext context)
       throws CheckConsistencyException, MetadataException {
-    Node header = group.getHeader();
-    DataGroupMember localDataMember = metaGroupMember.getLocalDataMember(header);
+    DataGroupMember localDataMember = metaGroupMember.getLocalDataMember(group.getHeader(), group.getId());
     localDataMember.syncLeaderWithConsistencyCheck(false);
     try {
       List<ShowTimeSeriesResult> localResult = super.showTimeseries(plan, context);
@@ -1392,14 +1390,14 @@ public class CMManager extends MManager {
       AsyncDataClient client = metaGroupMember
           .getClientProvider().getAsyncDataClient(node, RaftServer.getReadOperationTimeoutMS());
       resultBinary = SyncClientAdaptor.getAllMeasurementSchema(client, group.getHeader(),
-          plan);
+          group.getId(), plan);
     } else {
       SyncDataClient syncDataClient = metaGroupMember
           .getClientProvider().getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS());
       ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
       DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream);
       plan.serialize(dataOutputStream);
-      resultBinary = syncDataClient.getAllMeasurementSchema(group.getHeader(),
+      resultBinary = syncDataClient.getAllMeasurementSchema(group.getHeader(), group.getId(),
           ByteBuffer.wrap(byteArrayOutputStream.toByteArray()));
       ClientUtils.putBackSyncClient(syncDataClient);
     }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/MetaPuller.java b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/MetaPuller.java
index 68306cd..e185e9d 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/MetaPuller.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/MetaPuller.java
@@ -120,7 +120,7 @@ public class MetaPuller {
       List<PartialPath> prefixPaths, List<MeasurementSchema> results) {
     if (partitionGroup.contains(metaGroupMember.getThisNode())) {
       // the node is in the target group, synchronize with leader should be enough
-      metaGroupMember.getLocalDataMember(partitionGroup.getHeader(),
+      metaGroupMember.getLocalDataMember(partitionGroup.getHeader(), partitionGroup.getId(),
           "Pull timeseries of " + prefixPaths).syncLeader();
       int preSize = results.size();
       for (PartialPath prefixPath : prefixPaths) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionGroup.java b/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionGroup.java
index dc5cbcc..5ab4275 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionGroup.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionGroup.java
@@ -69,7 +69,6 @@ public class PartitionGroup extends ArrayList<Node> {
     return Objects.hash(id, getHeader());
   }
 
-
   public Node getHeader() {
     return get(0);
   }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionTable.java b/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionTable.java
index a1a5ae6..f1c9a91 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionTable.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionTable.java
@@ -23,6 +23,7 @@ import java.nio.ByteBuffer;
 import java.util.List;
 import org.apache.commons.collections4.map.MultiKeyMap;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.rpc.thrift.RaftNode;
 import org.apache.iotdb.db.engine.StorageEngine;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
 import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
@@ -55,7 +56,7 @@ public interface PartitionTable {
    * @param timestamp
    * @return
    */
-  Pair<Node, Integer> routeToHeaderByTime(String storageGroupName, long timestamp);
+  RaftNode routeToHeaderByTime(String storageGroupName, long timestamp);
 
   /**
    * Add a new node to update the partition table.
@@ -79,10 +80,12 @@ public interface PartitionTable {
   List<PartitionGroup> getLocalGroups();
 
   /**
-   * @param pair
+   * @param raftNode
    * @return the partition group starting from the header.
    */
-  PartitionGroup getHeaderGroup(Pair<Node, Integer> pair);
+  PartitionGroup getHeaderGroup(RaftNode raftNode);
+
+  PartitionGroup getHeaderGroup(Node node);
 
   ByteBuffer serialize();
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java b/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java
index a1f98ce..355ac95 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java
@@ -28,8 +28,8 @@ import org.apache.iotdb.cluster.partition.PartitionGroup;
 import org.apache.iotdb.cluster.partition.PartitionTable;
 import org.apache.iotdb.cluster.partition.slot.SlotStrategy.DefaultStrategy;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.rpc.thrift.RaftNode;
 import org.apache.iotdb.db.utils.SerializeUtils;
-import org.apache.iotdb.tsfile.utils.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -46,7 +46,8 @@ public class SlotPartitionTable implements PartitionTable {
   private int replicationNum =
       ClusterDescriptor.getInstance().getConfig().getReplicationNum();
 
-  private int raftGroupNum = 2;
+  private int multiRaftFactor =
+      ClusterDescriptor.getInstance().getConfig().getMultiRaftFactor();
 
   //all nodes
   private List<Node> nodeRing = new ArrayList<>();
@@ -55,12 +56,12 @@ public class SlotPartitionTable implements PartitionTable {
 
   //The following fields are used for determining which node a data item belongs to.
   // the slots held by each node
-  private Map<Pair<Node, Integer>, List<Integer>> nodeSlotMap = new ConcurrentHashMap<>();
+  private Map<RaftNode, List<Integer>> nodeSlotMap = new ConcurrentHashMap<>();
   // each slot is managed by whom
-  private Pair<Node, Integer>[] slotNodes = new Pair[ClusterConstant.SLOT_NUM];
+  private RaftNode[] slotNodes = new RaftNode[ClusterConstant.SLOT_NUM];
   // the nodes that each slot belongs to before a new node is added, used for the new node to
   // find the data source
-  private Map<Pair<Node, Integer>, Map<Integer, Node>> previousNodeMap = new ConcurrentHashMap<>();
+  private Map<RaftNode, Map<Integer, RaftNode>> previousNodeMap = new ConcurrentHashMap<>();
 
   //the filed is used for determining which nodes need to be a group.
   // the data groups which this node belongs to.
@@ -112,30 +113,28 @@ public class SlotPartitionTable implements PartitionTable {
     // evenly assign the slots to each node
     int nodeNum = nodeRing.size();
     int slotsPerNode = totalSlotNumbers / nodeNum;
-    int slotsPerRaftGroup = slotsPerNode / raftGroupNum;
+    int slotsPerRaftGroup = slotsPerNode / multiRaftFactor;
     for (Node node : nodeRing) {
-      for (int i = 0; i < raftGroupNum; i++) {
-        nodeSlotMap.put(new Pair<>(node, i), new ArrayList<>());
+      for (int i = 0; i < multiRaftFactor; i++) {
+        nodeSlotMap.put(new RaftNode(node, i), new ArrayList<>());
       }
     }
 
     for (int i = 0; i < totalSlotNumbers; i++) {
       int nodeIdx = i / slotsPerNode;
+      int raftId = i % slotsPerNode / slotsPerRaftGroup;
       if (nodeIdx >= nodeNum) {
         // the last node may receive a little more if total slots cannot de divided by node number
         nodeIdx--;
       }
-      for (int j = 0; j < nodeIdx; j++) {
-        int groupIdx = j / slotsPerRaftGroup;
-        if (groupIdx >= raftGroupNum) {
-          groupIdx--;
-        }
-        nodeSlotMap.get(new Pair<>(nodeRing.get(nodeIdx), groupIdx)).add(i);
+      if (raftId >= multiRaftFactor) {
+        raftId--;
       }
+      nodeSlotMap.get(new RaftNode(nodeRing.get(nodeIdx), raftId)).add(i);
     }
 
     // build the index to find a node by slot
-    for (Entry<Pair<Node, Integer>, List<Integer>> entry : nodeSlotMap.entrySet()) {
+    for (Entry<RaftNode, List<Integer>> entry : nodeSlotMap.entrySet()) {
       for (Integer slot : entry.getValue()) {
         slotNodes[slot] = entry.getKey();
       }
@@ -155,8 +154,8 @@ public class SlotPartitionTable implements PartitionTable {
       if (startIndex < 0) {
         startIndex = startIndex + nodeRing.size();
       }
-      for (int j = 0; j < raftGroupNum; j++) {
-        ret.add(getHeaderGroup(new Pair<>(nodeRing.get(startIndex), j)));
+      for (int j = 0; j < multiRaftFactor; j++) {
+        ret.add(getHeaderGroup(new RaftNode(nodeRing.get(startIndex), j)));
       }
     }
 
@@ -165,13 +164,13 @@ public class SlotPartitionTable implements PartitionTable {
   }
 
   @Override
-  public PartitionGroup getHeaderGroup(Pair<Node, Integer> pair) {
-    PartitionGroup ret = new PartitionGroup(pair.right);
+  public PartitionGroup getHeaderGroup(RaftNode raftNode) {
+    PartitionGroup ret = new PartitionGroup(raftNode.getRaftId());
 
     // assuming the nodes are [1,2,3,4,5]
-    int nodeIndex = nodeRing.indexOf(pair.left);
+    int nodeIndex = nodeRing.indexOf(raftNode.getNode());
     if (nodeIndex == -1) {
-      logger.error("Node {} is not in the cluster", pair.left);
+      logger.error("Node {} is not in the cluster", raftNode.getNode());
       return null;
     }
     int endIndex = nodeIndex + replicationNum;
@@ -187,10 +186,15 @@ public class SlotPartitionTable implements PartitionTable {
   }
 
   @Override
+  public PartitionGroup getHeaderGroup(Node node) {
+    return getHeaderGroup(new RaftNode(node, 0));
+  }
+
+  @Override
   public PartitionGroup route(String storageGroupName, long timestamp) {
     synchronized (nodeRing) {
-      Pair<Node, Integer> pair = routeToHeaderByTime(storageGroupName, timestamp);
-      return getHeaderGroup(pair);
+      RaftNode raftNode = routeToHeaderByTime(storageGroupName, timestamp);
+      return getHeaderGroup(raftNode);
     }
   }
 
@@ -200,24 +204,24 @@ public class SlotPartitionTable implements PartitionTable {
           Thread.currentThread().getStackTrace());
       return null;
     }
-    Pair<Node, Integer> pair = slotNodes[slot];
-    logger.debug("The slot of {} is held by {}", slot, pair);
-    if (pair.left == null) {
+    RaftNode raftNode = slotNodes[slot];
+    logger.debug("The slot of {} is held by {}", slot, raftNode);
+    if (raftNode.getNode() == null) {
       logger.warn("The slot {} is incorrect", slot);
       return null;
     }
-    return getHeaderGroup(pair);
+    return getHeaderGroup(raftNode);
   }
 
   @Override
-  public Pair<Node, Integer> routeToHeaderByTime(String storageGroupName, long timestamp) {
+  public RaftNode routeToHeaderByTime(String storageGroupName, long timestamp) {
     synchronized (nodeRing) {
       int slot = getSlotStrategy()
           .calculateSlotByTime(storageGroupName, timestamp, getTotalSlotNumbers());
-      Pair<Node, Integer> pair = slotNodes[slot];
+      RaftNode raftNode = slotNodes[slot];
       logger.trace("The slot of {}@{} is {}, held by {}", storageGroupName, timestamp,
-          slot, pair);
-      return pair;
+          slot, raftNode);
+      return raftNode;
     }
   }
 
@@ -325,9 +329,9 @@ public class SlotPartitionTable implements PartitionTable {
     try {
       dataOutputStream.writeInt(totalSlotNumbers);
       dataOutputStream.writeInt(nodeSlotMap.size());
-      for (Entry<Pair<Node, Integer>, List<Integer>> entry : nodeSlotMap.entrySet()) {
-        SerializeUtils.serialize(entry.getKey().left, dataOutputStream);
-        dataOutputStream.writeInt(entry.getKey().right);
+      for (Entry<RaftNode, List<Integer>> entry : nodeSlotMap.entrySet()) {
+        SerializeUtils.serialize(entry.getKey().getNode(), dataOutputStream);
+        dataOutputStream.writeInt(entry.getKey().getRaftId());
         SerializeUtils.serialize(entry.getValue(), dataOutputStream);
       }
 
@@ -363,11 +367,11 @@ public class SlotPartitionTable implements PartitionTable {
       SerializeUtils.deserialize(node, buffer);
       int id = buffer.getInt();
       SerializeUtils.deserialize(slots, buffer);
-      Pair pair = new Pair<>(node, id);
-      nodeSlotMap.put(pair, slots);
+      RaftNode raftNode = new RaftNode(node, id);
+      nodeSlotMap.put(raftNode, slots);
       idNodeMap.put(node.getNodeIdentifier(), node);
       for (Integer slot : slots) {
-        slotNodes[slot] = pair;
+        slotNodes[slot] = raftNode;
       }
     }
 
@@ -388,9 +392,9 @@ public class SlotPartitionTable implements PartitionTable {
 //    }
     lastLogIndex = buffer.getLong();
 
-    for (Pair<Node, Integer> nodeIntegerPair : nodeSlotMap.keySet()) {
-      if (!nodeRing.contains(nodeIntegerPair.left)) {
-        nodeRing.add(nodeIntegerPair.left);
+    for (RaftNode raftNode : nodeSlotMap.keySet()) {
+      if (!nodeRing.contains(raftNode.getNode())) {
+        nodeRing.add(raftNode.getNode());
       }
     }
     nodeRing.sort(Comparator.comparingInt(Node::getNodeIdentifier));
@@ -404,15 +408,19 @@ public class SlotPartitionTable implements PartitionTable {
     return nodeRing;
   }
 
-  public Map<Integer, Node> getPreviousNodeMap(Node node) {
-    return previousNodeMap.get(node);
+  public Map<Integer, RaftNode> getPreviousNodeMap(RaftNode raftNode) {
+    return previousNodeMap.get(raftNode);
+  }
+
+  public List<Integer> getNodeSlots(Node header, int raftId) {
+    return getNodeSlots(new RaftNode(header, raftId));
   }
 
-  public List<Integer> getNodeSlots(Node header) {
+  public List<Integer> getNodeSlots(RaftNode header) {
     return nodeSlotMap.get(header);
   }
 
-  public Map<Pair<Node, Integer>, List<Integer>> getAllNodeSlots() {
+  public Map<RaftNode, List<Integer>> getAllNodeSlots() {
     return nodeSlotMap;
   }
 
@@ -516,9 +524,9 @@ public class SlotPartitionTable implements PartitionTable {
 
   private void calculateGlobalGroups() {
     globalGroups = new ArrayList<>();
-    for (Node n : getAllNodes()) {
-      for (int i = 0; i < raftGroupNum; i++) {
-        globalGroups.add(getHeaderGroup(new Pair<>(n, i)));
+    for (Node node : getAllNodes()) {
+      for (int i = 0; i < multiRaftFactor; i++) {
+        globalGroups.add(getHeaderGroup(new RaftNode(node, i)));
       }
     }
   }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanExecutor.java
index 8b7116b..0acf992 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanExecutor.java
@@ -177,7 +177,7 @@ public class ClusterPlanExecutor extends PlanExecutor {
       if (partitionGroup.contains(metaGroupMember.getThisNode())) {
         // this node is a member of the group, perform a local query after synchronizing with the
         // leader
-        metaGroupMember.getLocalDataMember(partitionGroup.getHeader())
+        metaGroupMember.getLocalDataMember(partitionGroup.getHeader(), partitionGroup.getId())
             .syncLeaderWithConsistencyCheck(false);
         int localResult = getLocalPathCount(pathUnderSG, level);
         logger.debug("{}: get path count of {} locally, result {}", metaGroupMember.getName(),
@@ -254,13 +254,13 @@ public class ClusterPlanExecutor extends PlanExecutor {
           AsyncDataClient client = metaGroupMember
               .getClientProvider().getAsyncDataClient(node, RaftServer.getReadOperationTimeoutMS());
           client.setTimeout(RaftServer.getReadOperationTimeoutMS());
-          count = SyncClientAdaptor.getPathCount(client, partitionGroup.getHeader(),
+          count = SyncClientAdaptor.getPathCount(client, partitionGroup.getHeader(), partitionGroup.getId(),
               pathsToQuery, level);
         } else {
           SyncDataClient syncDataClient = metaGroupMember
               .getClientProvider().getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS());
           syncDataClient.setTimeout(RaftServer.getReadOperationTimeoutMS());
-          count = syncDataClient.getPathCount(partitionGroup.getHeader(), pathsToQuery, level);
+          count = syncDataClient.getPathCount(partitionGroup.getHeader(), partitionGroup.getId(), pathsToQuery, level);
           ClientUtils.putBackSyncClient(syncDataClient);
         }
 
@@ -338,12 +338,12 @@ public class ClusterPlanExecutor extends PlanExecutor {
   private List<PartialPath> getLocalNodesList(PartitionGroup group, PartialPath schemaPattern,
       int level) throws CheckConsistencyException, MetadataException {
     Node header = group.getHeader();
-    DataGroupMember localDataMember = metaGroupMember.getLocalDataMember(header);
+    DataGroupMember localDataMember = metaGroupMember.getLocalDataMember(header, group.getId());
     localDataMember.syncLeaderWithConsistencyCheck(false);
     try {
       return IoTDB.metaManager.getNodesList(schemaPattern, level,
           new SlotSgFilter(
-              ((SlotPartitionTable) metaGroupMember.getPartitionTable()).getNodeSlots(header)));
+              ((SlotPartitionTable) metaGroupMember.getPartitionTable()).getNodeSlots(header, group.getId())));
     } catch (MetadataException e) {
       logger
           .error("Cannot not get node list of {}@{} from {} locally", schemaPattern, level, group);
@@ -360,11 +360,11 @@ public class ClusterPlanExecutor extends PlanExecutor {
           AsyncDataClient client = metaGroupMember
               .getClientProvider().getAsyncDataClient(node, RaftServer.getReadOperationTimeoutMS());
           paths = SyncClientAdaptor
-              .getNodeList(client, group.getHeader(), schemaPattern.getFullPath(), level);
+              .getNodeList(client, group.getHeader(), group.getId(), schemaPattern.getFullPath(), level);
         } else {
           SyncDataClient syncDataClient = metaGroupMember
               .getClientProvider().getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS());
-          paths = syncDataClient.getNodeList(group.getHeader(), schemaPattern.getFullPath(), level);
+          paths = syncDataClient.getNodeList(group.getHeader(), group.getId(), schemaPattern.getFullPath(), level);
           ClientUtils.putBackSyncClient(syncDataClient);
         }
 
@@ -446,7 +446,7 @@ public class ClusterPlanExecutor extends PlanExecutor {
   private Set<String> getLocalNextChildren(PartitionGroup group, PartialPath path)
       throws CheckConsistencyException {
     Node header = group.getHeader();
-    DataGroupMember localDataMember = metaGroupMember.getLocalDataMember(header);
+    DataGroupMember localDataMember = metaGroupMember.getLocalDataMember(header, group.getId());
     localDataMember.syncLeaderWithConsistencyCheck(false);
     try {
       return IoTDB.metaManager.getChildNodePathInNextLevel(path);
@@ -465,12 +465,12 @@ public class ClusterPlanExecutor extends PlanExecutor {
           AsyncDataClient client = metaGroupMember
               .getClientProvider().getAsyncDataClient(node, RaftServer.getReadOperationTimeoutMS());
           nextChildren = SyncClientAdaptor
-              .getNextChildren(client, group.getHeader(), path.getFullPath());
+              .getNextChildren(client, group.getHeader(), group.getId(), path.getFullPath());
         } else {
           SyncDataClient syncDataClient = metaGroupMember
               .getClientProvider().getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS());
           nextChildren = syncDataClient
-              .getChildNodePathInNextLevel(group.getHeader(), path.getFullPath());
+              .getChildNodePathInNextLevel(group.getHeader(), group.getId(), path.getFullPath());
           ClientUtils.putBackSyncClient(syncDataClient);
         }
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/LocalQueryExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/LocalQueryExecutor.java
index d4fe221..49e20af 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/LocalQueryExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/LocalQueryExecutor.java
@@ -425,7 +425,7 @@ public class LocalQueryExecutor {
     }
     List<Integer> nodeSlots =
         ((SlotPartitionTable) dataGroupMember.getMetaGroupMember().getPartitionTable()).getNodeSlots(
-            dataGroupMember.getHeader());
+            dataGroupMember.getHeader(), dataGroupMember.getRaftGroupId());
     try {
       if (ascending) {
         AggregationExecutor.aggregateOneSeries(new PartialPath(path), allSensors, context, timeFilter,
@@ -490,7 +490,7 @@ public class LocalQueryExecutor {
 
     ClusterQueryUtils.checkPathExistence(path);
     List<Integer> nodeSlots = ((SlotPartitionTable) dataGroupMember.getMetaGroupMember().getPartitionTable())
-        .getNodeSlots(dataGroupMember.getHeader());
+        .getNodeSlots(dataGroupMember.getHeader(), dataGroupMember.getRaftGroupId());
     LocalGroupByExecutor executor = new LocalGroupByExecutor(path,
         deviceMeasurements, dataType, context, timeFilter, new SlotTsFileFilter(nodeSlots), ascending);
     for (Integer aggregationType : aggregationTypes) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/RemoteQueryContext.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/RemoteQueryContext.java
index 439dbe0..cb7abd3 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/RemoteQueryContext.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/RemoteQueryContext.java
@@ -25,13 +25,14 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentSkipListSet;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.rpc.thrift.RaftNode;
 import org.apache.iotdb.db.query.context.QueryContext;
 
 public class RemoteQueryContext extends QueryContext {
   /**
    * The remote nodes that are queried in this query, grouped by the header nodes.
    */
-  private Map<Node, Set<Node>> queriedNodesMap = new HashMap<>();
+  private Map<RaftNode, Set<Node>> queriedNodesMap = new HashMap<>();
   /**
    * The readers constructed locally to respond a remote query.
    */
@@ -46,8 +47,8 @@ public class RemoteQueryContext extends QueryContext {
     super(jobId);
   }
 
-  public void registerRemoteNode(Node node, Node header) {
-    queriedNodesMap.computeIfAbsent(header, n -> new HashSet<>()).add(node);
+  public void registerRemoteNode(Node node, Node header, int raftId) {
+    queriedNodesMap.computeIfAbsent(new RaftNode(header, raftId), n -> new HashSet<>()).add(node);
   }
 
   public void registerLocalReader(long readerId) {
@@ -66,7 +67,7 @@ public class RemoteQueryContext extends QueryContext {
     return localGroupByExecutorIds;
   }
 
-  public Map<Node, Set<Node>> getQueriedNodesMap() {
+  public Map<RaftNode, Set<Node>> getQueriedNodesMap() {
     return queriedNodesMap;
   }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/aggregate/ClusterAggregator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/aggregate/ClusterAggregator.java
index 99067bc..c2b5974 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/aggregate/ClusterAggregator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/aggregate/ClusterAggregator.java
@@ -125,7 +125,7 @@ public class ClusterAggregator {
           , partitionGroup, context, ascending);
     } else {
       // perform the aggregations locally
-      DataGroupMember dataMember = metaGroupMember.getLocalDataMember(partitionGroup.getHeader());
+      DataGroupMember dataMember = metaGroupMember.getLocalDataMember(partitionGroup.getHeader(), partitionGroup.getId());
       LocalQueryExecutor localQueryExecutor = new LocalQueryExecutor(dataMember);
       try {
         logger
@@ -186,7 +186,7 @@ public class ClusterAggregator {
             results.add(result);
           }
           // register the queried node to release resources when the query ends
-          ((RemoteQueryContext) context).registerRemoteNode(node, partitionGroup.getHeader());
+          ((RemoteQueryContext) context).registerRemoteNode(node, partitionGroup.getHeader(), partitionGroup.getId());
           logger.debug("{}: queried aggregation {} of {} from {} of {} are {}",
               metaGroupMember.getName(),
               aggregations, path, node, partitionGroup.getHeader(), results);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/fill/ClusterPreviousFill.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/fill/ClusterPreviousFill.java
index 3887b34..7ebc018 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/fill/ClusterPreviousFill.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/fill/ClusterPreviousFill.java
@@ -138,7 +138,7 @@ public class ClusterPreviousFill extends PreviousFill {
   private void localPreviousFill(PreviousFillArguments arguments, QueryContext context,
       PartitionGroup group,
       PreviousFillHandler fillHandler) {
-    DataGroupMember localDataMember = metaGroupMember.getLocalDataMember(group.getHeader());
+    DataGroupMember localDataMember = metaGroupMember.getLocalDataMember(group.getHeader(), group.getId());
     try {
       fillHandler
           .onComplete(
@@ -157,7 +157,7 @@ public class ClusterPreviousFill extends PreviousFill {
     PreviousFillRequest request = new PreviousFillRequest(arguments.getPath().getFullPath(),
         arguments.getQueryTime(),
         arguments.getBeforeRange(), context.getQueryId(), metaGroupMember.getThisNode(),
-        group.getHeader(),
+        group.getHeader(), group.getId(),
         arguments.getDataType().ordinal(),
         arguments.getDeviceMeasurements());
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/groupby/RemoteGroupByExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/groupby/RemoteGroupByExecutor.java
index f333baa..2078f09 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/groupby/RemoteGroupByExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/groupby/RemoteGroupByExecutor.java
@@ -47,17 +47,18 @@ public class RemoteGroupByExecutor implements GroupByExecutor {
   private MetaGroupMember metaGroupMember;
   private Node source;
   private Node header;
+  private int raftId;
 
   private List<AggregateResult> results = new ArrayList<>();
 
 
   public RemoteGroupByExecutor(long executorId,
-      MetaGroupMember metaGroupMember, Node source, Node header) {
+      MetaGroupMember metaGroupMember, Node source, Node header, int raftId) {
     this.executorId = executorId;
     this.metaGroupMember = metaGroupMember;
     this.source = source;
     this.header = header;
-
+    this.raftId = raftId;
   }
 
   @Override
@@ -81,11 +82,11 @@ public class RemoteGroupByExecutor implements GroupByExecutor {
         AsyncDataClient client = metaGroupMember
             .getClientProvider().getAsyncDataClient(source, RaftServer.getReadOperationTimeoutMS());
         aggrBuffers = SyncClientAdaptor
-            .getGroupByResult(client, header, executorId, curStartTime, curEndTime);
+            .getGroupByResult(client, header, raftId, executorId, curStartTime, curEndTime);
       } else {
         SyncDataClient syncDataClient = metaGroupMember
             .getClientProvider().getSyncDataClient(source, RaftServer.getReadOperationTimeoutMS());
-        aggrBuffers = syncDataClient.getGroupByResult(header, executorId, curStartTime, curEndTime);
+        aggrBuffers = syncDataClient.getGroupByResult(header, raftId, executorId, curStartTime, curEndTime);
         ClientUtils.putBackSyncClient(syncDataClient);
       }
 
@@ -116,11 +117,11 @@ public class RemoteGroupByExecutor implements GroupByExecutor {
         AsyncDataClient client = metaGroupMember
             .getClientProvider().getAsyncDataClient(source, RaftServer.getReadOperationTimeoutMS());
         aggrBuffer = SyncClientAdaptor
-            .peekNextNotNullValue(client, header, executorId, nextStartTime, nextEndTime);
+            .peekNextNotNullValue(client, header, raftId,executorId, nextStartTime, nextEndTime);
       } else {
         SyncDataClient syncDataClient = metaGroupMember
             .getClientProvider().getSyncDataClient(source, RaftServer.getReadOperationTimeoutMS());
-        aggrBuffer = syncDataClient.peekNextNotNullValue(header, executorId, nextStartTime, nextEndTime);
+        aggrBuffer = syncDataClient.peekNextNotNullValue(header, raftId, executorId, nextStartTime, nextEndTime);
         ClientUtils.putBackSyncClient(syncDataClient);
       }
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/last/ClusterLastQueryExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/last/ClusterLastQueryExecutor.java
index 1dac967..21b400a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/last/ClusterLastQueryExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/last/ClusterLastQueryExecutor.java
@@ -172,7 +172,7 @@ public class ClusterLastQueryExecutor extends LastQueryExecutor {
         List<PartialPath> seriesPaths,
         QueryContext context)
         throws StorageEngineException, QueryProcessException, IOException {
-      DataGroupMember localDataMember = metaGroupMember.getLocalDataMember(group.getHeader());
+      DataGroupMember localDataMember = metaGroupMember.getLocalDataMember(group.getHeader(), group.getId());
       try {
         localDataMember.syncLeaderWithConsistencyCheck(false);
       } catch (CheckConsistencyException e) {
@@ -231,7 +231,7 @@ public class ClusterLastQueryExecutor extends LastQueryExecutor {
       }
       buffer = SyncClientAdaptor
           .last(asyncDataClient, seriesPaths, dataTypeOrdinals, context, queryPlan.getDeviceToMeasurements(),
-              group.getHeader());
+              group.getHeader(), group.getId());
       return buffer;
     }
 
@@ -240,7 +240,7 @@ public class ClusterLastQueryExecutor extends LastQueryExecutor {
           .getClientProvider().getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS());
       ByteBuffer result = syncDataClient
           .last(new LastQueryRequest(PartialPath.toStringList(seriesPaths), dataTypeOrdinals,
-              context.getQueryId(), queryPlan.getDeviceToMeasurements(), group.getHeader(),
+              context.getQueryId(), queryPlan.getDeviceToMeasurements(), group.getHeader(), group.getId(),
               syncDataClient.getNode()));
       ClientUtils.putBackSyncClient(syncDataClient);
       return result;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java
index 3baab29..d99bbbf 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java
@@ -128,7 +128,7 @@ public class ClusterReaderFactory {
     if (partitionGroup.contains(metaGroupMember.getThisNode())) {
       // the target storage group contains this node, perform a local query
       DataGroupMember dataGroupMember =
-          metaGroupMember.getLocalDataMember(partitionGroup.getHeader());
+          metaGroupMember.getLocalDataMember(partitionGroup.getHeader(), partitionGroup.getId());
       if (logger.isDebugEnabled()) {
         logger.debug("{}: creating a local reader for {}#{}", metaGroupMember.getName(),
             path.getFullPath(),
@@ -224,7 +224,7 @@ public class ClusterReaderFactory {
     if (partitionGroup.contains(metaGroupMember.getThisNode())) {
       // the target storage group contains this node, perform a local query
       DataGroupMember dataGroupMember =
-          metaGroupMember.getLocalDataMember(partitionGroup.getHeader(),
+          metaGroupMember.getLocalDataMember(partitionGroup.getHeader(), partitionGroup.getId(),
               String.format("Query: %s, time filter: %s, queryId: %d", path, timeFilter,
                   context.getQueryId()));
       IPointReader seriesPointReader = getSeriesPointReader(path, deviceMeasurements, dataType,
@@ -268,7 +268,7 @@ public class ClusterReaderFactory {
     }
     return new SeriesRawDataPointReader(
         getSeriesReader(path, allSensors, dataType, timeFilter,
-            valueFilter, context, dataGroupMember.getHeader(), ascending));
+            valueFilter, context, dataGroupMember.getHeader(), dataGroupMember.getRaftGroupId(), ascending));
 
   }
 
@@ -287,11 +287,11 @@ public class ClusterReaderFactory {
   private SeriesReader getSeriesReader(PartialPath path, Set<String> allSensors, TSDataType
       dataType,
       Filter timeFilter,
-      Filter valueFilter, QueryContext context, Node header, boolean ascending)
+      Filter valueFilter, QueryContext context, Node header, int raftId, boolean ascending)
       throws StorageEngineException, QueryProcessException {
     ClusterQueryUtils.checkPathExistence(path);
     List<Integer> nodeSlots =
-        ((SlotPartitionTable) metaGroupMember.getPartitionTable()).getNodeSlots(header);
+        ((SlotPartitionTable) metaGroupMember.getPartitionTable()).getNodeSlots(header, raftId);
     QueryDataSource queryDataSource =
         QueryResourceManager.getInstance().getQueryDataSource(path, context, timeFilter);
     return new SeriesReader(path, allSensors, dataType, context, queryDataSource,
@@ -409,7 +409,7 @@ public class ClusterReaderFactory {
     if (partitionGroup.contains(metaGroupMember.getThisNode())) {
       // the target storage group contains this node, perform a local query
       DataGroupMember dataGroupMember = metaGroupMember
-          .getLocalDataMember(partitionGroup.getHeader());
+          .getLocalDataMember(partitionGroup.getHeader(), partitionGroup.getId());
       LocalQueryExecutor localQueryExecutor = new LocalQueryExecutor(dataGroupMember);
       logger.debug("{}: creating a local group by executor for {}#{}", metaGroupMember.getName(),
           path.getFullPath(), context.getQueryId());
@@ -461,13 +461,13 @@ public class ClusterReaderFactory {
 
         if (executorId != -1) {
           // record the queried node to release resources later
-          ((RemoteQueryContext) context).registerRemoteNode(node, partitionGroup.getHeader());
+          ((RemoteQueryContext) context).registerRemoteNode(node, partitionGroup.getHeader(), partitionGroup.getId());
           logger.debug("{}: get an executorId {} for {}@{} from {}", metaGroupMember.getName(),
               executorId,
               aggregationTypes, path, node);
           // create a remote executor with the return id
           RemoteGroupByExecutor remoteGroupByExecutor = new RemoteGroupByExecutor(executorId,
-              metaGroupMember, node, partitionGroup.getHeader());
+              metaGroupMember, node, partitionGroup.getHeader(), partitionGroup.getId());
           for (Integer aggregationType : aggregationTypes) {
             remoteGroupByExecutor.addAggregateResult(AggregateResultFactory.getAggrResultByType(
                 AggregationType.values()[aggregationType], dataType, ascending));
@@ -530,7 +530,7 @@ public class ClusterReaderFactory {
     }
 
     SeriesReader seriesReader = getSeriesReader(path, allSensors, dataType, timeFilter,
-        valueFilter, context, dataGroupMember.getHeader(), ascending);
+        valueFilter, context, dataGroupMember.getHeader(), dataGroupMember.getRaftGroupId(), ascending);
     if (seriesReader.isEmpty()) {
       return null;
     }
@@ -556,8 +556,8 @@ public class ClusterReaderFactory {
       throw new StorageEngineException(e);
     }
     SeriesReader seriesReader = getSeriesReader(path, allSensors, dataType,
-        TimeFilter.gtEq(Long.MIN_VALUE),
-        null, context, dataGroupMember.getHeader(), ascending);
+        TimeFilter.gtEq(Long.MIN_VALUE), null, context, dataGroupMember.getHeader(),
+        dataGroupMember.getRaftGroupId(), ascending);
     try {
       if (seriesReader.isEmpty()) {
         return null;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/DataSourceInfo.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/DataSourceInfo.java
index 7f408de..b3adb16 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/DataSourceInfo.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/DataSourceInfo.java
@@ -92,7 +92,7 @@ public class DataSourceInfo {
           logger.debug("get a readerId {} for {} from {}", newReaderId, request.path, node);
           if (newReaderId != -1) {
             // register the node so the remote resources can be released
-            context.registerRemoteNode(node, partitionGroup.getHeader());
+            context.registerRemoteNode(node, partitionGroup.getHeader(), partitionGroup.getId());
             this.readerId = newReaderId;
             this.curSource = node;
             this.curPos = nextNodePos;
@@ -170,6 +170,10 @@ public class DataSourceInfo {
     }
   }
 
+  public int getRaftId() {
+    return partitionGroup.getId();
+  }
+
   public long getReaderId() {
     return this.readerId;
   }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/RemoteSeriesReaderByTimestamp.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/RemoteSeriesReaderByTimestamp.java
index 208ac95..3264398 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/RemoteSeriesReaderByTimestamp.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/RemoteSeriesReaderByTimestamp.java
@@ -67,7 +67,7 @@ public class RemoteSeriesReaderByTimestamp implements IReaderByTimestamp {
       fetchResult.set(null);
       try {
         sourceInfo.getCurAsyncClient(RaftServer.getReadOperationTimeoutMS())
-            .fetchSingleSeriesByTimestamp(sourceInfo.getHeader(),
+            .fetchSingleSeriesByTimestamp(sourceInfo.getHeader(), sourceInfo.getRaftId(),
                 sourceInfo.getReaderId(), timestamp, handler);
         fetchResult.wait(RaftServer.getReadOperationTimeoutMS());
       } catch (TException e) {
@@ -90,7 +90,7 @@ public class RemoteSeriesReaderByTimestamp implements IReaderByTimestamp {
       SyncDataClient curSyncClient = sourceInfo
           .getCurSyncClient(RaftServer.getReadOperationTimeoutMS());
       ByteBuffer buffer = curSyncClient
-          .fetchSingleSeriesByTimestamp(sourceInfo.getHeader(),
+          .fetchSingleSeriesByTimestamp(sourceInfo.getHeader(), sourceInfo.getRaftId(),
               sourceInfo.getReaderId(), timestamp);
       curSyncClient.putBack();
       return buffer;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/RemoteSimpleSeriesReader.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/RemoteSimpleSeriesReader.java
index 3cee99b..3c6d745 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/RemoteSimpleSeriesReader.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/RemoteSimpleSeriesReader.java
@@ -117,7 +117,7 @@ public class RemoteSimpleSeriesReader implements IPointReader {
       fetchResult.set(null);
       try {
         sourceInfo.getCurAsyncClient(RaftServer.getReadOperationTimeoutMS())
-            .fetchSingleSeries(sourceInfo.getHeader(),
+            .fetchSingleSeries(sourceInfo.getHeader(), sourceInfo.getRaftId(),
                 sourceInfo.getReaderId(), handler);
         fetchResult.wait(RaftServer.getReadOperationTimeoutMS());
       } catch (TException e) {
@@ -140,7 +140,7 @@ public class RemoteSimpleSeriesReader implements IPointReader {
       SyncDataClient curSyncClient = sourceInfo
           .getCurSyncClient(RaftServer.getReadOperationTimeoutMS());
       ByteBuffer buffer = curSyncClient
-          .fetchSingleSeries(sourceInfo.getHeader(),
+          .fetchSingleSeries(sourceInfo.getHeader(), sourceInfo.getRaftId(),
               sourceInfo.getReaderId());
       curSyncClient.putBack();
       return buffer;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/ClientServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/ClientServer.java
index f0c554e..2fdc3f9 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/ClientServer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/ClientServer.java
@@ -42,6 +42,7 @@ import org.apache.iotdb.cluster.query.ClusterPlanExecutor;
 import org.apache.iotdb.cluster.query.ClusterPlanner;
 import org.apache.iotdb.cluster.query.RemoteQueryContext;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.rpc.thrift.RaftNode;
 import org.apache.iotdb.cluster.server.handlers.caller.GenericHandler;
 import org.apache.iotdb.cluster.server.member.MetaGroupMember;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -294,8 +295,8 @@ public class ClientServer extends TSServiceImpl {
     RemoteQueryContext context = queryContextMap.remove(queryId);
     if (context != null) {
       // release the resources in every queried node
-      for (Entry<Node, Set<Node>> headerEntry : context.getQueriedNodesMap().entrySet()) {
-        Node header = headerEntry.getKey();
+      for (Entry<RaftNode, Set<Node>> headerEntry : context.getQueriedNodesMap().entrySet()) {
+        RaftNode header = headerEntry.getKey();
         Set<Node> queriedNodes = headerEntry.getValue();
 
         for (Node queriedNode : queriedNodes) {
@@ -305,12 +306,12 @@ public class ClientServer extends TSServiceImpl {
               AsyncDataClient client = metaGroupMember
                   .getClientProvider().getAsyncDataClient(queriedNode,
                       RaftServer.getReadOperationTimeoutMS());
-              client.endQuery(header, metaGroupMember.getThisNode(), queryId, handler);
+              client.endQuery(header.getNode(), header.getRaftId(), metaGroupMember.getThisNode(), queryId, handler);
             } else {
               SyncDataClient syncDataClient = metaGroupMember
                   .getClientProvider().getSyncDataClient(queriedNode,
                       RaftServer.getReadOperationTimeoutMS());
-              syncDataClient.endQuery(header, metaGroupMember.getThisNode(), queryId);
+              syncDataClient.endQuery(header.getNode(), header.getRaftId(), metaGroupMember.getThisNode(), queryId);
             }
           } catch (IOException | TException e) {
             logger.error("Cannot end query {} in {}", queryId, queriedNode);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
index 76fca3b..e5843f5 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
@@ -52,6 +52,7 @@ import org.apache.iotdb.cluster.rpc.thrift.PullSchemaRequest;
 import org.apache.iotdb.cluster.rpc.thrift.PullSchemaResp;
 import org.apache.iotdb.cluster.rpc.thrift.PullSnapshotRequest;
 import org.apache.iotdb.cluster.rpc.thrift.PullSnapshotResp;
+import org.apache.iotdb.cluster.rpc.thrift.RaftNode;
 import org.apache.iotdb.cluster.rpc.thrift.SendSnapshotRequest;
 import org.apache.iotdb.cluster.rpc.thrift.SingleSeriesQueryRequest;
 import org.apache.iotdb.cluster.rpc.thrift.TSDataService;
@@ -63,7 +64,6 @@ import org.apache.iotdb.cluster.server.member.MetaGroupMember;
 import org.apache.iotdb.cluster.server.service.DataAsyncService;
 import org.apache.iotdb.cluster.server.service.DataSyncService;
 import org.apache.iotdb.service.rpc.thrift.TSStatus;
-import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.thrift.TException;
 import org.apache.thrift.TProcessor;
 import org.apache.thrift.async.AsyncMethodCallback;
@@ -81,9 +81,9 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
 
   // key: the header of a data group, value: the member representing this node in this group and
   // it is currently at service
-  private Map<Pair<Node, Integer>, DataGroupMember> headerGroupMap = new ConcurrentHashMap<>();
-  private Map<Pair<Node, Integer>, DataAsyncService> asyncServiceMap = new ConcurrentHashMap<>();
-  private Map<Pair<Node, Integer>, DataSyncService> syncServiceMap = new ConcurrentHashMap<>();
+  private Map<RaftNode, DataGroupMember> headerGroupMap = new ConcurrentHashMap<>();
+  private Map<RaftNode, DataAsyncService> asyncServiceMap = new ConcurrentHashMap<>();
+  private Map<RaftNode, DataSyncService> syncServiceMap = new ConcurrentHashMap<>();
   // key: the header of a data group, value: the member representing this node in this group but
   // it is out of service because another node has joined the group and expelled this node, or
   // the node itself is removed, but it is still stored to provide snapshot for other nodes
@@ -116,56 +116,66 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
    * @param dataGroupMember
    */
   public void addDataGroupMember(DataGroupMember dataGroupMember) {
-    Pair<Node, Integer> pair = new Pair<>(dataGroupMember.getHeader(),
+    RaftNode raftNode = new RaftNode(dataGroupMember.getHeader(),
         dataGroupMember.getRaftGroupId());
-    DataGroupMember removedMember = headerGroupMap
-        .remove(pair);
+    DataGroupMember removedMember = headerGroupMap.remove(raftNode);
     if (removedMember != null) {
       removedMember.stop();
-      asyncServiceMap.remove(pair);
-      syncServiceMap.remove(pair);
+      asyncServiceMap.remove(raftNode);
+      syncServiceMap.remove(raftNode);
     }
-    stoppedMemberManager.remove(pair);
+    stoppedMemberManager.remove(raftNode);
 
-    headerGroupMap.put(pair, dataGroupMember);
+    headerGroupMap.put(raftNode, dataGroupMember);
   }
 
-  private <T> DataAsyncService getDataAsyncService(Node header, Integer id,
+  private <T> DataAsyncService getDataAsyncService(Node node, int raftId,
       AsyncMethodCallback<T> resultHandler, Object request) {
-    Pair<Node, Integer> pair = new Pair<>(header, id);
-    return asyncServiceMap.computeIfAbsent(pair, h -> {
-      DataGroupMember dataMember = getDataMember(pair, resultHandler, request);
+      return getDataAsyncService(new RaftNode(node, raftId), resultHandler, request);
+  }
+
+  private <T> DataAsyncService getDataAsyncService(RaftNode raftNode,
+      AsyncMethodCallback<T> resultHandler, Object request) {
+    return asyncServiceMap.computeIfAbsent(raftNode, h -> {
+      DataGroupMember dataMember = getDataMember(raftNode, resultHandler, request);
       return dataMember != null ? new DataAsyncService(dataMember) : null;
     });
   }
 
-  private DataSyncService getDataSyncService(Node header, Integer id) {
-    Pair<Node, Integer> pair = new Pair<>(header, id);
-    return syncServiceMap.computeIfAbsent(pair, h -> {
-      DataGroupMember dataMember = getDataMember(pair, null, null);
+  private DataSyncService getDataSyncService(Node header, int raftId) {
+    return getDataSyncService(new RaftNode(header, raftId));
+  }
+
+  private DataSyncService getDataSyncService(RaftNode header) {
+    return syncServiceMap.computeIfAbsent(header, h -> {
+      DataGroupMember dataMember = getDataMember(header, null, null);
       return dataMember != null ? new DataSyncService(dataMember) : null;
     });
   }
 
+  public <T> DataGroupMember getDataMember(Node node, int raftId,
+      AsyncMethodCallback<T> resultHandler, Object request) {
+    return getDataMember(new RaftNode(node, raftId), resultHandler, request);
+  }
+
   /**
-   * @param pair          the header of the group which the local node is in
+   * @param raftNode          the header of the group which the local node is in
    * @param resultHandler can be set to null if the request is an internal request
    * @param request       the toString() of this parameter should explain what the request is and it
    *                      is only used in logs for tracing
    * @return
    */
-  public <T> DataGroupMember getDataMember(Pair<Node, Integer> pair,
-      AsyncMethodCallback<T> resultHandler,
-      Object request) {
+  public <T> DataGroupMember getDataMember(RaftNode raftNode,
+      AsyncMethodCallback<T> resultHandler, Object request) {
     // if the resultHandler is not null, then the request is a external one and must be with a
     // header
-    if (pair.left == null) {
+    if (raftNode.getNode() == null) {
       if (resultHandler != null) {
         resultHandler.onError(new NoHeaderNodeException());
       }
       return null;
     }
-    DataGroupMember member = stoppedMemberManager.get(pair);
+    DataGroupMember member = stoppedMemberManager.get(raftNode);
     if (member != null) {
       return member;
     }
@@ -173,14 +183,14 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
     // avoid creating two members for a header
     Exception ex = null;
     synchronized (headerGroupMap) {
-      member = headerGroupMap.get(pair);
+      member = headerGroupMap.get(raftNode);
       if (member != null) {
         return member;
       }
-      logger.info("Received a request \"{}\" from unregistered header {}", request, pair);
+      logger.info("Received a request \"{}\" from unregistered header {}", request, raftNode);
       if (partitionTable != null) {
         try {
-          member = createNewMember(pair);
+          member = createNewMember(raftNode);
         } catch (NotInSameGroupException | CheckConsistencyException e) {
           ex = e;
         }
@@ -196,37 +206,37 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
   }
 
   /**
-   * @param pair
+   * @param raftNode
    * @return A DataGroupMember representing this node in the data group of the header.
    * @throws NotInSameGroupException If this node is not in the group of the header.
    */
-  private DataGroupMember createNewMember(Pair<Node, Integer> pair)
+  private DataGroupMember createNewMember(RaftNode raftNode)
       throws NotInSameGroupException, CheckConsistencyException {
     DataGroupMember member;
     PartitionGroup partitionGroup;
-    partitionGroup = partitionTable.getHeaderGroup(pair);
+    partitionGroup = partitionTable.getHeaderGroup(raftNode);
     if (partitionGroup == null || !partitionGroup.contains(thisNode)) {
       // if the partition table is old, this node may have not been moved to the new group
       metaGroupMember.syncLeaderWithConsistencyCheck(true);
-      partitionGroup = partitionTable.getHeaderGroup(pair);
+      partitionGroup = partitionTable.getHeaderGroup(raftNode);
     }
     if (partitionGroup != null && partitionGroup.contains(thisNode)) {
       // the two nodes are in the same group, create a new data member
       member = dataMemberFactory.create(partitionGroup, thisNode);
-      DataGroupMember prevMember = headerGroupMap.put(pair, member);
+      DataGroupMember prevMember = headerGroupMap.put(raftNode, member);
       if (prevMember != null) {
         prevMember.stop();
       }
-      logger.info("Created a member for header {}", pair);
+      logger.info("Created a member for header {}", raftNode);
       member.start();
     } else {
       // the member may have been stopped after syncLeader
-      member = stoppedMemberManager.get(pair);
+      member = stoppedMemberManager.get(raftNode);
       if (member != null) {
         return member;
       }
       logger.info("This node {} does not belong to the group {}, header {}", thisNode,
-          partitionGroup, pair);
+          partitionGroup, raftNode);
       throw new NotInSameGroupException(partitionGroup, thisNode);
     }
     return member;
@@ -256,8 +266,7 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
 
   @Override
   public void appendEntries(AppendEntriesRequest request, AsyncMethodCallback<Long> resultHandler) {
-    DataAsyncService service = getDataAsyncService(request.getHeader(), request.getRaftId(),
-        resultHandler, request);
+    DataAsyncService service = getDataAsyncService(request.getHeader(), request.getRaftId(), resultHandler, request);
     if (service != null) {
       service.appendEntries(request, resultHandler);
     }
@@ -265,8 +274,7 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
 
   @Override
   public void appendEntry(AppendEntryRequest request, AsyncMethodCallback<Long> resultHandler) {
-    DataAsyncService service = getDataAsyncService(request.getHeader(), request.getRaftId(),
-        resultHandler, request);
+    DataAsyncService service = getDataAsyncService(request.getHeader(), request.getRaftId(), resultHandler, request);
     if (service != null) {
       service.appendEntry(request, resultHandler);
     }
@@ -274,8 +282,7 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
 
   @Override
   public void sendSnapshot(SendSnapshotRequest request, AsyncMethodCallback<Void> resultHandler) {
-    DataAsyncService service = getDataAsyncService(request.getHeader(), request.getRaftId(),
-        resultHandler, request);
+    DataAsyncService service = getDataAsyncService(request.getHeader(), request.getRaftId(), resultHandler, request);
     if (service != null) {
       service.sendSnapshot(request, resultHandler);
     }
@@ -284,8 +291,7 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
   @Override
   public void pullSnapshot(PullSnapshotRequest request,
       AsyncMethodCallback<PullSnapshotResp> resultHandler) {
-    DataAsyncService service = getDataAsyncService(request.getHeader(), request.getRaftId(),
-        resultHandler, request);
+    DataAsyncService service = getDataAsyncService(request.getHeader(), request.getRaftId(), resultHandler, request);
     if (service != null) {
       service.pullSnapshot(request, resultHandler);
     }
@@ -294,26 +300,24 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
   @Override
   public void executeNonQueryPlan(ExecutNonQueryReq request,
       AsyncMethodCallback<TSStatus> resultHandler) {
-    DataAsyncService service = getDataAsyncService(request.getHeader(), request.getRaftId(),
-        resultHandler, request);
+    DataAsyncService service = getDataAsyncService(request.getHeader(), request.getRaftId(), resultHandler, request);
     if (service != null) {
       service.executeNonQueryPlan(request, resultHandler);
     }
   }
 
   @Override
-  public void requestCommitIndex(Node header, int id, AsyncMethodCallback<Long> resultHandler) {
-    DataAsyncService service = getDataAsyncService(header, id, resultHandler,
-        "Request commit index");
+  public void requestCommitIndex(Node header, int raftId, AsyncMethodCallback<Long> resultHandler) {
+    DataAsyncService service = getDataAsyncService(header, raftId, resultHandler, "Request commit index");
     if (service != null) {
-      service.requestCommitIndex(header, resultHandler);
+      service.requestCommitIndex(header, raftId, resultHandler);
     }
   }
 
   @Override
   public void readFile(String filePath, long offset, int length,
       AsyncMethodCallback<ByteBuffer> resultHandler) {
-    DataAsyncService service = getDataAsyncService(thisNode, 0, resultHandler,
+    DataAsyncService service = getDataAsyncService(new RaftNode(thisNode, 0), resultHandler,
         "Read file:" + filePath);
     if (service != null) {
       service.readFile(filePath, offset, length, resultHandler);
@@ -324,46 +328,44 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
   public void querySingleSeries(SingleSeriesQueryRequest request,
       AsyncMethodCallback<Long> resultHandler) {
     DataAsyncService service = getDataAsyncService(request.getHeader(), request.getRaftId(),
-        resultHandler,
-        "Query series:" + request.getPath());
+        resultHandler, "Query series:" + request.getPath());
     if (service != null) {
       service.querySingleSeries(request, resultHandler);
     }
   }
 
   @Override
-  public void fetchSingleSeries(Node header, int id, long readerId,
+  public void fetchSingleSeries(Node header, int raftId, long readerId,
       AsyncMethodCallback<ByteBuffer> resultHandler) {
-    DataAsyncService service = getDataAsyncService(header, id, resultHandler,
+    DataAsyncService service = getDataAsyncService(header, raftId, resultHandler,
         "Fetch reader:" + readerId);
     if (service != null) {
-      service.fetchSingleSeries(header, readerId, resultHandler);
+      service.fetchSingleSeries(header, raftId, readerId, resultHandler);
     }
   }
 
   @Override
-  public void getAllPaths(Node header, int id, List<String> paths, boolean withAlias,
+  public void getAllPaths(Node header, int raftId, List<String> paths, boolean withAlias,
       AsyncMethodCallback<GetAllPathsResult> resultHandler) {
-    DataAsyncService service = getDataAsyncService(header, id, resultHandler, "Find path:" + paths);
+    DataAsyncService service = getDataAsyncService(header, raftId, resultHandler, "Find path:" + paths);
     if (service != null) {
-      service.getAllPaths(header, paths, withAlias, resultHandler);
+      service.getAllPaths(header, raftId, paths, withAlias, resultHandler);
     }
   }
 
   @Override
-  public void endQuery(Node header, int id, Node thisNode, long queryId,
+  public void endQuery(Node header, int raftId, Node thisNode, long queryId,
       AsyncMethodCallback<Void> resultHandler) {
-    DataAsyncService service = getDataAsyncService(header, id, resultHandler, "End query");
+    DataAsyncService service = getDataAsyncService(header, raftId, resultHandler, "End query");
     if (service != null) {
-      service.endQuery(header, thisNode, queryId, resultHandler);
+      service.endQuery(header, raftId, thisNode, queryId, resultHandler);
     }
   }
 
   @Override
   public void querySingleSeriesByTimestamp(SingleSeriesQueryRequest request,
       AsyncMethodCallback<Long> resultHandler) {
-    DataAsyncService service = getDataAsyncService(request.getHeader(), request.getRaftId(),
-        resultHandler,
+    DataAsyncService service = getDataAsyncService(request.getHeader(), request.getRaftId(), resultHandler,
         "Query by timestamp:" + request.getQueryId() + "#" + request.getPath() + " of " + request
             .getRequester());
     if (service != null) {
@@ -372,20 +374,19 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
   }
 
   @Override
-  public void fetchSingleSeriesByTimestamp(Node header, int id, long readerId, long time,
+  public void fetchSingleSeriesByTimestamp(Node header, int raftId, long readerId, long time,
       AsyncMethodCallback<ByteBuffer> resultHandler) {
-    DataAsyncService service = getDataAsyncService(header, id, resultHandler,
+    DataAsyncService service = getDataAsyncService(header, raftId, resultHandler,
         "Fetch by timestamp:" + readerId);
     if (service != null) {
-      service.fetchSingleSeriesByTimestamp(header, readerId, time, resultHandler);
+      service.fetchSingleSeriesByTimestamp(header, raftId, readerId, time, resultHandler);
     }
   }
 
   @Override
   public void pullTimeSeriesSchema(PullSchemaRequest request,
       AsyncMethodCallback<PullSchemaResp> resultHandler) {
-    DataAsyncService service = getDataAsyncService(request.getHeader(), request.getRaftId(),
-        resultHandler, request);
+    DataAsyncService service = getDataAsyncService(request.getHeader(), request.getRaftId(), resultHandler, request);
     if (service != null) {
       service.pullTimeSeriesSchema(request, resultHandler);
     }
@@ -394,70 +395,67 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
   @Override
   public void pullMeasurementSchema(PullSchemaRequest request,
       AsyncMethodCallback<PullSchemaResp> resultHandler) {
-    DataAsyncService service = getDataAsyncService(request.getHeader(), request.getRaftId(),
-        resultHandler,
+    DataAsyncService service = getDataAsyncService(request.getHeader(), request.getRaftId(), resultHandler,
         "Pull measurement schema");
     service.pullMeasurementSchema(request, resultHandler);
   }
 
   @Override
-  public void getAllDevices(Node header, int id, List<String> paths,
+  public void getAllDevices(Node header, int raftId, List<String> paths,
       AsyncMethodCallback<Set<String>> resultHandler) {
-    DataAsyncService service = getDataAsyncService(header, id, resultHandler, "Get all devices");
-    service.getAllDevices(header, paths, resultHandler);
+    DataAsyncService service = getDataAsyncService(header, raftId, resultHandler, "Get all devices");
+    service.getAllDevices(header, raftId, paths, resultHandler);
   }
 
   @Override
-  public void getNodeList(Node header, int id, String path, int nodeLevel,
+  public void getNodeList(Node header, int raftId, String path, int nodeLevel,
       AsyncMethodCallback<List<String>> resultHandler) {
-    DataAsyncService service = getDataAsyncService(header, id, resultHandler, "Get node list");
-    service.getNodeList(header, path, nodeLevel, resultHandler);
+    DataAsyncService service = getDataAsyncService(header, raftId, resultHandler, "Get node list");
+    service.getNodeList(header, raftId, path, nodeLevel, resultHandler);
   }
 
   @Override
-  public void getChildNodePathInNextLevel(Node header, int id, String path,
+  public void getChildNodePathInNextLevel(Node header, int raftId, String path,
       AsyncMethodCallback<Set<String>> resultHandler) {
-    DataAsyncService service = getDataAsyncService(header, id, resultHandler,
+    DataAsyncService service = getDataAsyncService(header, raftId, resultHandler,
         "Get child node path in next level");
-    service.getChildNodePathInNextLevel(header, path, resultHandler);
+    service.getChildNodePathInNextLevel(header, raftId, path, resultHandler);
   }
 
   @Override
-  public void getAllMeasurementSchema(Node header, int id, ByteBuffer planBytes,
+  public void getAllMeasurementSchema(Node header, int raftId, ByteBuffer planBytes,
       AsyncMethodCallback<ByteBuffer> resultHandler) {
-    DataAsyncService service = getDataAsyncService(header, id, resultHandler,
+    DataAsyncService service = getDataAsyncService(header, raftId, resultHandler,
         "Get all measurement schema");
-    service.getAllMeasurementSchema(header, planBytes, resultHandler);
+    service.getAllMeasurementSchema(header, raftId, planBytes, resultHandler);
   }
 
   @Override
   public void getAggrResult(GetAggrResultRequest request,
       AsyncMethodCallback<List<ByteBuffer>> resultHandler) {
-    DataAsyncService service = getDataAsyncService(request.getHeader(), request.getRaftId(),
-        resultHandler, request);
+    DataAsyncService service = getDataAsyncService(request.getHeader(), request.getRaftId(), resultHandler, request);
     service.getAggrResult(request, resultHandler);
   }
 
   @Override
-  public void getUnregisteredTimeseries(Node header, int id, List<String> timeseriesList,
+  public void getUnregisteredTimeseries(Node header, int raftId, List<String> timeseriesList,
       AsyncMethodCallback<List<String>> resultHandler) {
-    DataAsyncService service = getDataAsyncService(header, id, resultHandler,
+    DataAsyncService service = getDataAsyncService(header, raftId, resultHandler,
         "Check if measurements are registered");
-    service.getUnregisteredTimeseries(header, timeseriesList, resultHandler);
+    service.getUnregisteredTimeseries(header, raftId, timeseriesList, resultHandler);
   }
 
   @Override
   public void getGroupByExecutor(GroupByRequest request, AsyncMethodCallback<Long> resultHandler) {
-    DataAsyncService service = getDataAsyncService(request.getHeader(), request.getRaftId(),
-        resultHandler, request);
+    DataAsyncService service = getDataAsyncService(request.getHeader(), request.getRaftId(), resultHandler, request);
     service.getGroupByExecutor(request, resultHandler);
   }
 
   @Override
-  public void getGroupByResult(Node header, int id, long executorId, long startTime, long endTime,
+  public void getGroupByResult(Node header, int raftId, long executorId, long startTime, long endTime,
       AsyncMethodCallback<List<ByteBuffer>> resultHandler) {
-    DataAsyncService service = getDataAsyncService(header, id, resultHandler, "Fetch group by");
-    service.getGroupByResult(header, executorId, startTime, endTime, resultHandler);
+    DataAsyncService service = getDataAsyncService(header, raftId, resultHandler, "Fetch group by");
+    service.getGroupByResult(header, raftId, executorId, startTime, endTime, resultHandler);
   }
 
   @Override
@@ -633,9 +631,12 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
    * which has no data. This is to make that member pull data from other nodes.
    */
   public void pullSnapshots() {
-    List<Integer> slots = ((SlotPartitionTable) partitionTable).getNodeSlots(thisNode);
-    DataGroupMember dataGroupMember = headerGroupMap.get(thisNode);
-    dataGroupMember.pullNodeAdditionSnapshots(slots, thisNode);
+    for (int raftId = 0; raftId < ClusterDescriptor.getInstance().getConfig().getMultiRaftFactor(); raftId++) {
+      RaftNode raftNode = new RaftNode(thisNode, raftId);
+      List<Integer> slots = ((SlotPartitionTable) partitionTable).getNodeSlots(raftNode);
+      DataGroupMember dataGroupMember = headerGroupMap.get(raftNode);
+      dataGroupMember.pullNodeAdditionSnapshots(slots, thisNode);
+    }
   }
 
   /**
@@ -653,8 +654,7 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
   @Override
   public void previousFill(PreviousFillRequest request,
       AsyncMethodCallback<ByteBuffer> resultHandler) {
-    DataAsyncService service = getDataAsyncService(request.getHeader(), request.getRaftId(),
-        resultHandler, request);
+    DataAsyncService service = getDataAsyncService(request.getHeader(), request.getRaftId(), resultHandler, request);
     service.previousFill(request, resultHandler);
   }
 
@@ -665,31 +665,30 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
   }
 
   @Override
-  public void matchTerm(long index, long term, Node header, int id,
+  public void matchTerm(long index, long term, Node header, int raftId,
       AsyncMethodCallback<Boolean> resultHandler) {
-    DataAsyncService service = getDataAsyncService(header, id, resultHandler, "Match term");
-    service.matchTerm(index, term, header, resultHandler);
+    DataAsyncService service = getDataAsyncService(header, raftId, resultHandler, "Match term");
+    service.matchTerm(index, term, header, raftId, resultHandler);
   }
 
   @Override
   public void last(LastQueryRequest request, AsyncMethodCallback<ByteBuffer> resultHandler) {
-    DataAsyncService service = getDataAsyncService(request.getHeader(), request.getRaftId(),
-        resultHandler, "last");
+    DataAsyncService service = getDataAsyncService(request.getHeader(), request.getRaftId(), resultHandler, "last");
     service.last(request, resultHandler);
   }
 
   @Override
-  public void getPathCount(Node header, int id, List<String> pathsToQuery, int level,
+  public void getPathCount(Node header, int raftId, List<String> pathsToQuery, int level,
       AsyncMethodCallback<Integer> resultHandler) {
-    DataAsyncService service = getDataAsyncService(header, id, resultHandler, "count path");
-    service.getPathCount(header, pathsToQuery, level, resultHandler);
+    DataAsyncService service = getDataAsyncService(header, raftId, resultHandler, "count path");
+    service.getPathCount(header, raftId, pathsToQuery, level, resultHandler);
   }
 
   @Override
-  public void onSnapshotApplied(Node header, int id, List<Integer> slots,
+  public void onSnapshotApplied(Node header, int raftId, List<Integer> slots,
       AsyncMethodCallback<Boolean> resultHandler) {
-    DataAsyncService service = getDataAsyncService(header, id, resultHandler, "Snapshot applied");
-    service.onSnapshotApplied(header, slots, resultHandler);
+    DataAsyncService service = getDataAsyncService(header, raftId, resultHandler, "Snapshot applied");
+    service.onSnapshotApplied(header, raftId, slots, resultHandler);
   }
 
   @Override
@@ -699,13 +698,12 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
 
   @Override
   public ByteBuffer fetchSingleSeries(Node header, int raftId, long readerId) throws TException {
-    return getDataSyncService(header, raftId).fetchSingleSeries(header, readerId);
+    return getDataSyncService(header, raftId).fetchSingleSeries(header, raftId, readerId);
   }
 
   @Override
   public long querySingleSeriesByTimestamp(SingleSeriesQueryRequest request) throws TException {
-    return getDataSyncService(request.getHeader(), request.getRaftId())
-        .querySingleSeriesByTimestamp(request);
+    return getDataSyncService(request.getHeader(), request.getRaftId()).querySingleSeriesByTimestamp(request);
   }
 
   @Override
@@ -713,42 +711,42 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
       long timestamp)
       throws TException {
     return getDataSyncService(header, raftId)
-        .fetchSingleSeriesByTimestamp(header, readerId, timestamp);
+        .fetchSingleSeriesByTimestamp(header, raftId, readerId, timestamp);
   }
 
   @Override
   public void endQuery(Node header, int raftId, Node thisNode, long queryId) throws TException {
-    getDataSyncService(header, raftId).endQuery(header, thisNode, queryId);
+    getDataSyncService(header, raftId).endQuery(header, raftId, thisNode, queryId);
   }
 
   @Override
   public GetAllPathsResult getAllPaths(Node header, int raftId, List<String> path,
       boolean withAlias)
       throws TException {
-    return getDataSyncService(header, raftId).getAllPaths(header, path, withAlias);
+    return getDataSyncService(header, raftId).getAllPaths(header, raftId, path, withAlias);
   }
 
   @Override
   public Set<String> getAllDevices(Node header, int raftId, List<String> path) throws TException {
-    return getDataSyncService(header, raftId).getAllDevices(header, path);
+    return getDataSyncService(header, raftId).getAllDevices(header, raftId, path);
   }
 
   @Override
   public List<String> getNodeList(Node header, int raftId, String path, int nodeLevel)
       throws TException {
-    return getDataSyncService(header, raftId).getNodeList(header, path, nodeLevel);
+    return getDataSyncService(header, raftId).getNodeList(header, raftId, path, nodeLevel);
   }
 
   @Override
   public Set<String> getChildNodePathInNextLevel(Node header, int raftId, String path)
       throws TException {
-    return getDataSyncService(header, raftId).getChildNodePathInNextLevel(header, path);
+    return getDataSyncService(header, raftId).getChildNodePathInNextLevel(header, raftId, path);
   }
 
   @Override
   public ByteBuffer getAllMeasurementSchema(Node header, int raftId, ByteBuffer planBinary)
       throws TException {
-    return getDataSyncService(header, raftId).getAllMeasurementSchema(header, planBinary);
+    return getDataSyncService(header, raftId).getAllMeasurementSchema(header, raftId, planBinary);
   }
 
   @Override
@@ -760,7 +758,7 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
   public List<String> getUnregisteredTimeseries(Node header, int raftId,
       List<String> timeseriesList)
       throws TException {
-    return getDataSyncService(header, raftId).getUnregisteredTimeseries(header, timeseriesList);
+    return getDataSyncService(header, raftId).getUnregisteredTimeseries(header, raftId, timeseriesList);
   }
 
   @Override
@@ -776,20 +774,17 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
   @Override
   public List<ByteBuffer> getGroupByResult(Node header, int raftId, long executorId, long startTime,
       long endTime) throws TException {
-    return getDataSyncService(header, raftId)
-        .getGroupByResult(header, executorId, startTime, endTime);
+    return getDataSyncService(header, raftId).getGroupByResult(header, raftId, executorId, startTime, endTime);
   }
 
   @Override
   public PullSchemaResp pullTimeSeriesSchema(PullSchemaRequest request) throws TException {
-    return getDataSyncService(request.getHeader(), request.getRaftId())
-        .pullTimeSeriesSchema(request);
+    return getDataSyncService(request.getHeader(), request.getRaftId()).pullTimeSeriesSchema(request);
   }
 
   @Override
   public PullSchemaResp pullMeasurementSchema(PullSchemaRequest request) throws TException {
-    return getDataSyncService(request.getHeader(), request.getRaftId())
-        .pullMeasurementSchema(request);
+    return getDataSyncService(request.getHeader(), request.getRaftId()).pullMeasurementSchema(request);
   }
 
   @Override
@@ -805,12 +800,12 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
   @Override
   public int getPathCount(Node header, int raftId, List<String> pathsToQuery, int level)
       throws TException {
-    return getDataSyncService(header, raftId).getPathCount(header, pathsToQuery, level);
+    return getDataSyncService(header, raftId).getPathCount(header, raftId, pathsToQuery, level);
   }
 
   @Override
   public boolean onSnapshotApplied(Node header, int raftId, List<Integer> slots) {
-    return getDataSyncService(header, raftId).onSnapshotApplied(header, slots);
+    return getDataSyncService(header, raftId).onSnapshotApplied(header, raftId, slots);
   }
 
   @Override
@@ -840,51 +835,48 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
 
   @Override
   public TSStatus executeNonQueryPlan(ExecutNonQueryReq request) throws TException {
-    return getDataSyncService(request.getHeader(), request.getRaftId())
-        .executeNonQueryPlan(request);
+    return getDataSyncService(request.getHeader(), request.getRaftId()).executeNonQueryPlan(request);
   }
 
   @Override
   public long requestCommitIndex(Node header, int raftId) throws TException {
-    return getDataSyncService(header, raftId).requestCommitIndex(header);
+    return getDataSyncService(header, raftId).requestCommitIndex(header, raftId);
   }
 
   @Override
   public ByteBuffer readFile(String filePath, long offset, int length) throws TException {
-    return getDataSyncService(thisNode, 0).readFile(filePath, offset, length);
+    return getDataSyncService(new RaftNode(thisNode, 0)).readFile(filePath, offset, length);
   }
 
   @Override
   public boolean matchTerm(long index, long term, Node header, int raftId) {
-    return getDataSyncService(header, raftId).matchTerm(index, term, header);
+    return getDataSyncService(header, raftId).matchTerm(index, term, header, raftId);
   }
 
   @Override
   public ByteBuffer peekNextNotNullValue(Node header, int raftId, long executorId, long startTime,
       long endTime)
       throws TException {
-    return getDataSyncService(header, raftId)
-        .peekNextNotNullValue(header, executorId, startTime, endTime);
+    return getDataSyncService(header, raftId).peekNextNotNullValue(header, raftId, executorId, startTime, endTime);
   }
 
   @Override
   public void peekNextNotNullValue(Node header, int raftId, long executorId, long startTime,
       long endTime,
       AsyncMethodCallback<ByteBuffer> resultHandler) throws TException {
-    resultHandler.onComplete(
-        getDataSyncService(header, raftId)
-            .peekNextNotNullValue(header, executorId, startTime, endTime));
+    resultHandler.onComplete(getDataSyncService(header, raftId)
+            .peekNextNotNullValue(header, raftId, executorId, startTime, endTime));
   }
 
   @Override
   public void removeHardLink(String hardLinkPath) throws TException {
-    getDataSyncService(thisNode, 0).removeHardLink(hardLinkPath);
+    getDataSyncService(new RaftNode(thisNode, 0)).removeHardLink(hardLinkPath);
   }
 
   @Override
   public void removeHardLink(String hardLinkPath,
       AsyncMethodCallback<Void> resultHandler) {
-    getDataAsyncService(thisNode, 0, resultHandler, hardLinkPath).removeHardLink(hardLinkPath,
+    getDataAsyncService(new RaftNode(thisNode, 0), resultHandler, hardLinkPath).removeHardLink(hardLinkPath,
         resultHandler);
   }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java
index a0fb04d..f8830c0 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java
@@ -212,8 +212,8 @@ public class MetaClusterServer extends RaftServer implements TSMetaService.Async
   }
 
   @Override
-  public void requestCommitIndex(Node header, AsyncMethodCallback<Long> resultHandler) {
-    asyncService.requestCommitIndex(header, resultHandler);
+  public void requestCommitIndex(Node header, int raftId, AsyncMethodCallback<Long> resultHandler) {
+    asyncService.requestCommitIndex(header, raftId, resultHandler);
   }
 
   @Override
@@ -253,9 +253,9 @@ public class MetaClusterServer extends RaftServer implements TSMetaService.Async
   }
 
   @Override
-  public void matchTerm(long index, long term, Node header,
+  public void matchTerm(long index, long term, Node header, int raftId,
       AsyncMethodCallback<Boolean> resultHandler) {
-    asyncService.matchTerm(index, term, header, resultHandler);
+    asyncService.matchTerm(index, term, header, raftId, resultHandler);
   }
 
   @Override
@@ -319,8 +319,8 @@ public class MetaClusterServer extends RaftServer implements TSMetaService.Async
   }
 
   @Override
-  public long requestCommitIndex(Node header) throws TException {
-    return syncService.requestCommitIndex(header);
+  public long requestCommitIndex(Node header, int raftId) throws TException {
+    return syncService.requestCommitIndex(header, raftId);
   }
 
   @Override
@@ -329,8 +329,8 @@ public class MetaClusterServer extends RaftServer implements TSMetaService.Async
   }
 
   @Override
-  public boolean matchTerm(long index, long term, Node header) {
-    return syncService.matchTerm(index, term, header);
+  public boolean matchTerm(long index, long term, Node header, int raftId) {
+    return syncService.matchTerm(index, term, header, raftId);
   }
 
   @Override
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/PullSnapshotHintService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/PullSnapshotHintService.java
index b2bebbf..d60ba42 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/PullSnapshotHintService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/PullSnapshotHintService.java
@@ -19,7 +19,6 @@
 
 package org.apache.iotdb.cluster.server;
 
-import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.ConcurrentLinkedDeque;
@@ -31,6 +30,7 @@ import org.apache.iotdb.cluster.client.sync.SyncClientAdaptor;
 import org.apache.iotdb.cluster.client.sync.SyncDataClient;
 import org.apache.iotdb.cluster.config.ClusterDescriptor;
 import org.apache.iotdb.cluster.log.snapshot.PullSnapshotTaskDescriptor;
+import org.apache.iotdb.cluster.partition.PartitionGroup;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.iotdb.cluster.server.member.DataGroupMember;
 import org.apache.thrift.TException;
@@ -72,8 +72,7 @@ public class PullSnapshotHintService {
 
   public void registerHint(PullSnapshotTaskDescriptor descriptor) {
     PullSnapshotHint hint = new PullSnapshotHint();
-    hint.receivers = new ArrayList<>(descriptor.getPreviousHolders());
-    hint.header = descriptor.getPreviousHolders().getHeader();
+    hint.receivers = new PartitionGroup(descriptor.getPreviousHolders());
     hint.slots = descriptor.getSlots();
     hints.add(hint);
   }
@@ -116,7 +115,7 @@ public class PullSnapshotHintService {
   private boolean sendHintsAsync(Node receiver, PullSnapshotHint hint)
       throws TException, InterruptedException {
     AsyncDataClient asyncDataClient = (AsyncDataClient) member.getAsyncClient(receiver);
-    return SyncClientAdaptor.onSnapshotApplied(asyncDataClient, hint.header, hint.slots);
+    return SyncClientAdaptor.onSnapshotApplied(asyncDataClient, hint.getHeader(), hint.getRaftId(), hint.slots);
   }
 
   private boolean sendHintSync(Node receiver, PullSnapshotHint hint) throws TException {
@@ -124,7 +123,7 @@ public class PullSnapshotHintService {
     if (syncDataClient == null) {
       return false;
     }
-    return syncDataClient.onSnapshotApplied(hint.header, hint.slots);
+    return syncDataClient.onSnapshotApplied(hint.getHeader(), hint.getRaftId(), hint.slots);
   }
 
   private static class PullSnapshotHint {
@@ -132,10 +131,16 @@ public class PullSnapshotHintService {
     /**
      * Nodes to send this hint;
      */
-    private List<Node> receivers;
-
-    private Node header;
+    private PartitionGroup receivers;
 
     private List<Integer> slots;
+
+    public Node getHeader() {
+      return receivers.getHeader();
+    }
+
+    public int getRaftId() {
+      return receivers.getId();
+    }
   }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/StoppedMemberManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/StoppedMemberManager.java
index b203aaf..d941f84 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/StoppedMemberManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/StoppedMemberManager.java
@@ -29,6 +29,7 @@ import java.util.HashMap;
 import java.util.Map;
 import org.apache.iotdb.cluster.partition.PartitionGroup;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.rpc.thrift.RaftNode;
 import org.apache.iotdb.cluster.server.member.DataGroupMember;
 import org.apache.iotdb.cluster.server.member.DataGroupMember.Factory;
 import org.apache.iotdb.cluster.utils.ClusterUtils;
@@ -54,7 +55,7 @@ public class StoppedMemberManager {
   private static final String REMOVED = "0";
   private static final String RESUMED = "1";
 
-  private Map<Pair<Node, Integer>, DataGroupMember> removedMemberMap = new HashMap<>();
+  private Map<RaftNode, DataGroupMember> removedMemberMap = new HashMap<>();
   private DataGroupMember.Factory memberFactory;
   private Node thisNode;
 
@@ -69,11 +70,11 @@ public class StoppedMemberManager {
    * When a DataGroupMember is removed, add it here and record this removal, so in next start-up we
    * can recover it as a data source for data transfers.
    *
-   * @param pair
+   * @param raftNode
    * @param dataGroupMember
    */
-  public synchronized void put(Pair<Node, Integer> pair, DataGroupMember dataGroupMember) {
-    removedMemberMap.put(pair, dataGroupMember);
+  public synchronized void put(RaftNode raftNode, DataGroupMember dataGroupMember) {
+    removedMemberMap.put(raftNode, dataGroupMember);
     try (BufferedWriter writer = new BufferedWriter(new FileWriter(stoppedMembersFileName, true))) {
       StringBuilder builder = new StringBuilder(REMOVED);
       for (Node node : dataGroupMember.getAllNodes()) {
@@ -82,7 +83,7 @@ public class StoppedMemberManager {
       writer.write(builder.toString());
       writer.newLine();
     } catch (IOException e) {
-      logger.error("Cannot record removed member of header {}", pair, e);
+      logger.error("Cannot record removed member of header {}", raftNode, e);
     }
   }
 
@@ -90,20 +91,20 @@ public class StoppedMemberManager {
    * When a DataGroupMember is resumed, add it here and record this removal, so in next start-up we
    * will not recover it here.
    *
-   * @param pair
+   * @param raftNode
    */
-  public synchronized void remove(Pair<Node, Integer> pair) {
-    removedMemberMap.remove(pair);
+  public synchronized void remove(RaftNode raftNode) {
+    removedMemberMap.remove(raftNode);
     try (BufferedWriter writer = new BufferedWriter(new FileWriter(stoppedMembersFileName, true))) {
-      writer.write(RESUMED + ";" + pair.toString());
+      writer.write(RESUMED + ";" + raftNode.toString());
       writer.newLine();
     } catch (IOException e) {
-      logger.error("Cannot record resumed member of header {}", pair, e);
+      logger.error("Cannot record resumed member of header {}", raftNode, e);
     }
   }
 
-  public synchronized DataGroupMember get(Pair<Node, Integer> pair) {
-    return removedMemberMap.get(pair);
+  public synchronized DataGroupMember get(RaftNode raftNode) {
+    return removedMemberMap.get(raftNode);
   }
 
   private void recover() {
@@ -147,7 +148,7 @@ public class StoppedMemberManager {
     DataGroupMember member = memberFactory.create(partitionGroup, thisNode);
     member.setReadOnly();
     //TODO CORRECT
-    removedMemberMap.put(new Pair(partitionGroup.getHeader(), 0), member);
+    removedMemberMap.put(new RaftNode(partitionGroup.getHeader(), 0), member);
   }
 
   private void parseResumed(String[] split) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
index 62f2b71..8a43818 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
@@ -75,6 +75,7 @@ import org.apache.iotdb.cluster.rpc.thrift.ElectionRequest;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.iotdb.cluster.rpc.thrift.PullSnapshotRequest;
 import org.apache.iotdb.cluster.rpc.thrift.PullSnapshotResp;
+import org.apache.iotdb.cluster.rpc.thrift.RaftNode;
 import org.apache.iotdb.cluster.rpc.thrift.SendSnapshotRequest;
 import org.apache.iotdb.cluster.server.NodeCharacter;
 import org.apache.iotdb.cluster.server.NodeReport.DataMemberReport;
@@ -237,10 +238,6 @@ public class DataGroupMember extends RaftMember {
     return allNodes.get(0);
   }
 
-  public Integer getRaftGroupId() {
-    return allNodes.getId();
-  }
-
   public ClusterQueryManager getQueryManager() {
     return queryManager;
   }
@@ -470,29 +467,28 @@ public class DataGroupMember extends RaftMember {
     synchronized (logManager) {
       logger.info("{} pulling {} slots from remote", name, slots.size());
       PartitionedSnapshot<Snapshot> snapshot = (PartitionedSnapshot) logManager.getSnapshot();
-      Map<Integer, Node> prevHolders = ((SlotPartitionTable) metaGroupMember.getPartitionTable())
-          .getPreviousNodeMap(newNode);
+      Map<Integer, RaftNode> prevHolders = ((SlotPartitionTable) metaGroupMember.getPartitionTable())
+          .getPreviousNodeMap(new RaftNode(newNode, getRaftGroupId()));
 
       // group the slots by their owners
-      Map<Node, List<Integer>> holderSlotsMap = new HashMap<>();
+      Map<RaftNode, List<Integer>> holderSlotsMap = new HashMap<>();
       for (int slot : slots) {
         // skip the slot if the corresponding data is already replicated locally
         if (snapshot.getSnapshot(slot) == null) {
-          Node node = prevHolders.get(slot);
-          if (node != null) {
-            holderSlotsMap.computeIfAbsent(node, n -> new ArrayList<>()).add(slot);
+          RaftNode raftNode = prevHolders.get(slot);
+          if (raftNode != null) {
+            holderSlotsMap.computeIfAbsent(raftNode, n -> new ArrayList<>()).add(slot);
           }
         }
       }
 
       // pull snapshots from each owner's data group
-      for (Entry<Node, List<Integer>> entry : holderSlotsMap.entrySet()) {
-        Node node = entry.getKey();
+      for (Entry<RaftNode, List<Integer>> entry : holderSlotsMap.entrySet()) {
+        RaftNode raftNode = entry.getKey();
         List<Integer> nodeSlots = entry.getValue();
         PullSnapshotTaskDescriptor taskDescriptor =
             new PullSnapshotTaskDescriptor(metaGroupMember.getPartitionTable()
-                .getHeaderGroup(new Pair<>(node, getRaftGroupId())),
-                nodeSlots, false);
+                .getHeaderGroup(raftNode), nodeSlots, false);
         pullFileSnapshot(taskDescriptor, null);
       }
     }
@@ -626,9 +622,9 @@ public class DataGroupMember extends RaftMember {
       List<Pair<Long, Boolean>> tmpPairList = entry.getValue();
       for (Pair<Long, Boolean> pair : tmpPairList) {
         long partitionId = pair.left;
-        Pair<Node, Integer> pair = metaGroupMember.getPartitionTable().routeToHeaderByTime(storageGroupName,
+        RaftNode raftNode = metaGroupMember.getPartitionTable().routeToHeaderByTime(storageGroupName,
             partitionId * StorageEngine.getTimePartitionInterval());
-        DataGroupMember localDataMember = metaGroupMember.getLocalDataMember(pair);
+        DataGroupMember localDataMember = metaGroupMember.getLocalDataMember(raftNode);
         if (localDataMember.getHeader().equals(this.getHeader())) {
           localListPair.add(new Pair<>(partitionId, pair.right));
         }
@@ -741,7 +737,7 @@ public class DataGroupMember extends RaftMember {
     synchronized (allNodes) {
       if (allNodes.contains(removedNode)) {
         // update the group if the deleted node was in it
-        allNodes = metaGroupMember.getPartitionTable().getHeaderGroup(getHeader());
+        allNodes = metaGroupMember.getPartitionTable().getHeaderGroup(new RaftNode(getHeader(), getRaftGroupId()));
         initPeerMap();
         if (removedNode.equals(leader.get())) {
           // if the leader is removed, also start an election immediately
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
index 9f2fbb6..dcc4105 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
@@ -95,6 +95,7 @@ import org.apache.iotdb.cluster.rpc.thrift.CheckStatusResponse;
 import org.apache.iotdb.cluster.rpc.thrift.HeartBeatRequest;
 import org.apache.iotdb.cluster.rpc.thrift.HeartBeatResponse;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.rpc.thrift.RaftNode;
 import org.apache.iotdb.cluster.rpc.thrift.RaftService;
 import org.apache.iotdb.cluster.rpc.thrift.RaftService.Client;
 import org.apache.iotdb.cluster.rpc.thrift.SendSnapshotRequest;
@@ -147,7 +148,6 @@ import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.service.rpc.thrift.EndPoint;
 import org.apache.iotdb.service.rpc.thrift.TSStatus;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
-import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.thrift.TException;
 import org.apache.thrift.protocol.TProtocolFactory;
 import org.apache.thrift.transport.TTransportException;
@@ -305,12 +305,11 @@ public class MetaGroupMember extends RaftMember {
    * close the partition through that member. Notice: only partitions owned by this node can be
    * closed by the method.
    *
-   * @return true if the member is a leader and the partition is closed, false otherwise
    */
   public void closePartition(String storageGroupName, long partitionId, boolean isSeq) {
-    Pair<Node, Integer> pair = partitionTable.routeToHeaderByTime(storageGroupName,
+    RaftNode raftNode = partitionTable.routeToHeaderByTime(storageGroupName,
         partitionId * StorageEngine.getTimePartitionInterval());
-    DataGroupMember localDataMember = getLocalDataMember(pair);
+    DataGroupMember localDataMember = getLocalDataMember(raftNode);
     if (localDataMember == null || localDataMember.getCharacter() != NodeCharacter.LEADER) {
       return;
     }
@@ -485,7 +484,6 @@ public class MetaGroupMember extends RaftMember {
    * This node is not a seed node and wants to join an established cluster. Pick up a node randomly
    * from the seed nodes and send a join request to it.
    *
-   * @return true if the node has successfully joined the cluster, false otherwise.
    */
   public void joinCluster() throws ConfigInconsistentException, StartUpCheckFailureException {
     if (allNodes.size() == 1) {
@@ -1662,7 +1660,7 @@ public class MetaGroupMember extends RaftMember {
           .getOperationStartTime();
       logger.debug("Execute {} in a local group of {}", entry.getKey(),
           entry.getValue().getHeader());
-      result = getLocalDataMember(entry.getValue().getHeader())
+      result = getLocalDataMember(entry.getValue().getHeader(), entry.getValue().getId())
           .executeNonQueryPlan(entry.getKey());
       Timer.Statistic.META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP
           .calOperationCostTimeFromStart(startTime);
@@ -1839,7 +1837,7 @@ public class MetaGroupMember extends RaftMember {
       try {
         PartialPath storageGroupName = IoTDB.metaManager
             .getStorageGroupPath(path);
-        Set<Node> groupHeaders = new HashSet<>();
+        Set<RaftNode> groupHeaders = new HashSet<>();
         for (int i = 0; i < intervals.getIntervalSize(); i++) {
           // compute the headers of groups involved in every interval
           PartitionUtils
@@ -1847,7 +1845,7 @@ public class MetaGroupMember extends RaftMember {
                   intervals.getUpperBound(i), partitionTable, groupHeaders);
         }
         // translate the headers to groups
-        for (Node groupHeader : groupHeaders) {
+        for (RaftNode groupHeader : groupHeaders) {
           partitionGroups.add(partitionTable.getHeaderGroup(groupHeader));
         }
       } catch (MetadataException e) {
@@ -2130,17 +2128,26 @@ public class MetaGroupMember extends RaftMember {
    * @param request the toString() of this parameter should explain what the request is and it is
    *                only used in logs for tracing
    */
-  public DataGroupMember getLocalDataMember(Node header, Object request) {
-    return dataClusterServer.getDataMember(header, null, request);
+  public DataGroupMember getLocalDataMember(Node header, int raftId, Object request) {
+    return dataClusterServer.getDataMember(new RaftNode(header, raftId), null, request);
   }
 
   /**
    * Get a local DataGroupMember that is in the group of "header" for an internal request.
    *
-   * @param pair the header of the group which the local node is in
+   * @param node the header of the group which the local node is in
    */
-  public DataGroupMember getLocalDataMember(Pair<Node, Integer> pair) {
-    return dataClusterServer.getDataMember(pair, null, "Internal call");
+  public DataGroupMember getLocalDataMember(Node node, int raftId) {
+    return dataClusterServer.getDataMember(new RaftNode(node, raftId), null, "Internal call");
+  }
+
+  /**
+   * Get a local DataGroupMember that is in the group of "header" for an internal request.
+   *
+   * @param raftNode the header of the group which the local node is in
+   */
+  public DataGroupMember getLocalDataMember(RaftNode raftNode) {
+    return dataClusterServer.getDataMember(raftNode, null, "Internal call");
   }
 
   public DataClientProvider getClientProvider() {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index f396e9c..aa9f8c8 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -143,7 +143,9 @@ public abstract class RaftMember {
    * the lock is to make sure that only one thread can apply snapshot at the same time
    */
   private final Object snapshotApplyLock = new Object();
+
   protected Node thisNode = ClusterConstant.EMPTY_NODE;
+
   /**
    * the nodes that belong to the same raft group as thisNode.
    */
@@ -702,7 +704,7 @@ public abstract class RaftMember {
     }
     logger.info("{}: Start to make {} catch up", name, follower);
     if (!catchUpService.isShutdown()) {
-      Future<?> future = catchUpService.submit(new CatchUpTask(follower, peerMap.get(follower),
+      Future<?> future = catchUpService.submit(new CatchUpTask(follower, getRaftGroupId(), peerMap.get(follower),
           this, lastLogIdx));
       catchUpService.submit(() -> {
         try {
@@ -1007,7 +1009,7 @@ public abstract class RaftMember {
       return commitIdResult.get();
     }
     synchronized (commitIdResult) {
-      client.requestCommitIndex(getHeader(), get,new GenericHandler<>(leader.get(), commitIdResult));
+      client.requestCommitIndex(getHeader(), getRaftGroupId(), new GenericHandler<>(leader.get(), commitIdResult));
       commitIdResult.wait(RaftServer.getSyncLeaderMaxWaitMs());
     }
     return commitIdResult.get();
@@ -1023,7 +1025,7 @@ public abstract class RaftMember {
     }
     long commitIndex;
     try {
-      commitIndex = client.requestCommitIndex(getHeader());
+      commitIndex = client.requestCommitIndex(getHeader(), getRaftGroupId());
     } catch (TException e) {
       client.getInputProtocol().getTransport().close();
       throw e;
@@ -1864,7 +1866,12 @@ public abstract class RaftMember {
     return Response.RESPONSE_AGREE;
   }
 
+  public int getRaftGroupId() {
+    return allNodes.getId();
+  }
+
   enum AppendLogResult {
     OK, TIME_OUT, LEADERSHIP_STALE
   }
+
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseAsyncService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseAsyncService.java
index 521050e..a059846 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseAsyncService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseAsyncService.java
@@ -32,6 +32,7 @@ import org.apache.iotdb.cluster.rpc.thrift.ExecutNonQueryReq;
 import org.apache.iotdb.cluster.rpc.thrift.HeartBeatRequest;
 import org.apache.iotdb.cluster.rpc.thrift.HeartBeatResponse;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.rpc.thrift.RaftNode;
 import org.apache.iotdb.cluster.rpc.thrift.RaftService;
 import org.apache.iotdb.cluster.rpc.thrift.RaftService.AsyncClient;
 import org.apache.iotdb.cluster.server.NodeCharacter;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataSyncService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataSyncService.java
index d974bf3..35af6e5 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataSyncService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataSyncService.java
@@ -38,6 +38,7 @@ import org.apache.iotdb.cluster.rpc.thrift.PullSchemaRequest;
 import org.apache.iotdb.cluster.rpc.thrift.PullSchemaResp;
 import org.apache.iotdb.cluster.rpc.thrift.PullSnapshotRequest;
 import org.apache.iotdb.cluster.rpc.thrift.PullSnapshotResp;
+import org.apache.iotdb.cluster.rpc.thrift.RaftNode;
 import org.apache.iotdb.cluster.rpc.thrift.SendSnapshotRequest;
 import org.apache.iotdb.cluster.rpc.thrift.SingleSeriesQueryRequest;
 import org.apache.iotdb.cluster.rpc.thrift.TSDataService;
@@ -280,8 +281,7 @@ public class DataSyncService extends BaseSyncService implements TSDataService.If
   }
 
   @Override
-  public List<String> getUnregisteredTimeseries(Node header, int raftId,
-      List<String> timeseriesList)
+  public List<String> getUnregisteredTimeseries(Node header, int raftId, List<String> timeseriesList)
       throws TException {
     try {
       return dataGroupMember.getLocalQueryExecutor().getUnregisteredTimeseries(timeseriesList);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/PartitionUtils.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/PartitionUtils.java
index 6f5836e..0abf7d0 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/PartitionUtils.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/PartitionUtils.java
@@ -26,6 +26,7 @@ import java.util.List;
 import java.util.Set;
 import org.apache.iotdb.cluster.partition.PartitionTable;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.rpc.thrift.RaftNode;
 import org.apache.iotdb.db.engine.StorageEngine;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
@@ -389,7 +390,7 @@ public class PartitionUtils {
    */
   public static void getIntervalHeaders(String storageGroupName, long timeLowerBound,
       long timeUpperBound,
-      PartitionTable partitionTable, Set<Node> result) {
+      PartitionTable partitionTable, Set<RaftNode> result) {
     long partitionInterval = StorageEngine.getTimePartitionInterval();
     long currPartitionStart = timeLowerBound / partitionInterval * partitionInterval;
     while (currPartitionStart <= timeUpperBound) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/ClusterMonitor.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/ClusterMonitor.java
index 2892b55..a0e5f56 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/ClusterMonitor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/ClusterMonitor.java
@@ -23,10 +23,12 @@ import java.util.List;
 import java.util.Map;
 import org.apache.commons.collections4.map.MultiKeyMap;
 import org.apache.iotdb.cluster.ClusterMain;
+import org.apache.iotdb.cluster.config.ClusterDescriptor;
 import org.apache.iotdb.cluster.partition.PartitionGroup;
 import org.apache.iotdb.cluster.partition.PartitionTable;
 import org.apache.iotdb.cluster.partition.slot.SlotPartitionTable;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.rpc.thrift.RaftNode;
 import org.apache.iotdb.cluster.server.MetaClusterServer;
 import org.apache.iotdb.cluster.server.Timer;
 import org.apache.iotdb.cluster.server.member.MetaGroupMember;
@@ -104,10 +106,10 @@ public class ClusterMonitor implements ClusterMonitorMBean, IService {
       return null;
     }
     List<PartitionGroup> localGroups = partitionTable.getLocalGroups();
-    Map<Node, List<Integer>> nodeSlotMap = ((SlotPartitionTable) partitionTable).getAllNodeSlots();
+    Map<RaftNode, List<Integer>> nodeSlotMap = ((SlotPartitionTable) partitionTable).getAllNodeSlots();
     Map<PartitionGroup, Integer> raftGroupMapSlotNum = new HashMap<>();
     for (PartitionGroup group : localGroups) {
-      raftGroupMapSlotNum.put(group, nodeSlotMap.get(group.getHeader()).size());
+      raftGroupMapSlotNum.put(group, nodeSlotMap.get(new RaftNode(group.getHeader(), group.getId())).size());
     }
     return raftGroupMapSlotNum;
   }
@@ -119,11 +121,14 @@ public class ClusterMonitor implements ClusterMonitorMBean, IService {
       return null;
     }
     List<Node> allNodes = partitionTable.getAllNodes();
-    Map<Node, List<Integer>> nodeSlotMap = ((SlotPartitionTable) partitionTable).getAllNodeSlots();
+    Map<RaftNode, List<Integer>> nodeSlotMap = ((SlotPartitionTable) partitionTable).getAllNodeSlots();
     Map<PartitionGroup, Integer> raftGroupMapSlotNum = new HashMap<>();
     for (Node header : allNodes) {
-      raftGroupMapSlotNum
-          .put(partitionTable.getHeaderGroup(header), nodeSlotMap.get(header).size());
+      for(int raftId = 0; raftId < ClusterDescriptor.getInstance().getConfig().getMultiRaftFactor(); raftId++) {
+        RaftNode raftNode = new RaftNode(header, raftId);
+        raftGroupMapSlotNum
+            .put(partitionTable.getHeaderGroup(raftNode), nodeSlotMap.get(raftNode).size());
+      }
     }
     return raftGroupMapSlotNum;
   }
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/client/async/AsyncDataClientTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/client/async/AsyncDataClientTest.java
index 7030fc1..a29f1ca 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/client/async/AsyncDataClientTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/client/async/AsyncDataClientTest.java
@@ -37,7 +37,7 @@ public class AsyncDataClientTest {
 
     assertEquals(TestUtils.getNode(0), client.getNode());
 
-    client.matchTerm(0, 0, TestUtils.getNode(0), new AsyncMethodCallback<Boolean>() {
+    client.matchTerm(0, 0, TestUtils.getNode(0), 0, new AsyncMethodCallback<Boolean>() {
       @Override
       public void onComplete(Boolean aBoolean) {
         // do nothing
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/client/async/AsyncMetaClientTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/client/async/AsyncMetaClientTest.java
index ffca8c5..698357a 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/client/async/AsyncMetaClientTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/client/async/AsyncMetaClientTest.java
@@ -37,7 +37,7 @@ public class AsyncMetaClientTest {
 
     assertEquals(TestUtils.getNode(0), client.getNode());
 
-    client.matchTerm(0, 0, TestUtils.getNode(0), new AsyncMethodCallback<Boolean>() {
+    client.matchTerm(0, 0, TestUtils.getNode(0), 0, new AsyncMethodCallback<Boolean>() {
       @Override
       public void onComplete(Boolean aBoolean) {
         // do nothing
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptorTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptorTest.java
index 8bd2812..36b17b1 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptorTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptorTest.java
@@ -120,7 +120,7 @@ public class SyncClientAdaptorTest {
       }
 
       @Override
-      public void matchTerm(long index, long term, Node header,
+      public void matchTerm(long index, long term, Node header, int raftId,
           AsyncMethodCallback<Boolean> resultHandler) {
         resultHandler.onComplete(true);
       }
@@ -163,19 +163,19 @@ public class SyncClientAdaptorTest {
       }
 
       @Override
-      public void getNodeList(Node header, String path, int nodeLevel,
+      public void getNodeList(Node header, int raftId, String path, int nodeLevel,
           AsyncMethodCallback<List<String>> resultHandler) {
         resultHandler.onComplete(Arrays.asList("1", "2", "3"));
       }
 
       @Override
-      public void getChildNodePathInNextLevel(Node header, String path,
+      public void getChildNodePathInNextLevel(Node header, int raftId, String path,
           AsyncMethodCallback<Set<String>> resultHandler) {
         resultHandler.onComplete(new HashSet<>(Arrays.asList("1", "2", "3")));
       }
 
       @Override
-      public void getAllMeasurementSchema(Node header, ByteBuffer planBinary,
+      public void getAllMeasurementSchema(Node header, int raftId, ByteBuffer planBinary,
           AsyncMethodCallback<ByteBuffer> resultHandler) {
         resultHandler.onComplete(getAllMeasurementSchemaResult);
       }
@@ -211,25 +211,25 @@ public class SyncClientAdaptorTest {
       }
 
       @Override
-      public void getUnregisteredTimeseries(Node header, List<String> timeseriesList,
+      public void getUnregisteredTimeseries(Node header, int raftId, List<String> timeseriesList,
           AsyncMethodCallback<List<String>> resultHandler) {
         resultHandler.onComplete(timeseriesList.subList(0, timeseriesList.size() / 2));
       }
 
       @Override
-      public void getAllPaths(Node header, List<String> path, boolean withAlias,
+      public void getAllPaths(Node header, int raftId, List<String> path, boolean withAlias,
           AsyncMethodCallback<GetAllPathsResult> resultHandler) {
         resultHandler.onComplete(new GetAllPathsResult(path));
       }
 
       @Override
-      public void getPathCount(Node header, List<String> pathsToQuery, int level,
+      public void getPathCount(Node header, int raftId, List<String> pathsToQuery, int level,
           AsyncMethodCallback<Integer> resultHandler) {
         resultHandler.onComplete(pathsToQuery.size());
       }
 
       @Override
-      public void getAllDevices(Node header, List<String> path,
+      public void getAllDevices(Node header, int raftId, List<String> path,
           AsyncMethodCallback<Set<String>> resultHandler) {
         resultHandler.onComplete(new HashSet<>(path));
       }
@@ -253,13 +253,13 @@ public class SyncClientAdaptorTest {
       }
 
       @Override
-      public void getGroupByResult(Node header, long executorId, long startTime, long endTime,
+      public void getGroupByResult(Node header, int raftId, long executorId, long startTime, long endTime,
           AsyncMethodCallback<List<ByteBuffer>> resultHandler) {
         resultHandler.onComplete(aggregateResults);
       }
 
       @Override
-      public void peekNextNotNullValue(Node header, long executorId, long startTime, long endTime,
+      public void peekNextNotNullValue(Node header, int raftId, long executorId, long startTime, long endTime,
           AsyncMethodCallback<ByteBuffer> resultHandler) {
         resultHandler.onComplete(peekNextNotNullValueResult);
       }
@@ -283,7 +283,7 @@ public class SyncClientAdaptorTest {
       }
 
       @Override
-      public void onSnapshotApplied(Node header, List<Integer> slots,
+      public void onSnapshotApplied(Node header, int raftId, List<Integer> slots,
           AsyncMethodCallback<Boolean> resultHandler) {
         resultHandler.onComplete(true);
       }
@@ -296,7 +296,7 @@ public class SyncClientAdaptorTest {
     assertEquals(Response.RESPONSE_AGREE, (long) SyncClientAdaptor.removeNode(metaClient,
         TestUtils.getNode(0)));
     assertTrue(SyncClientAdaptor.matchTerm(metaClient, TestUtils.getNode(0), 1, 1,
-        TestUtils.getNode(0)));
+        TestUtils.getNode(0), 0));
     assertEquals(nodeStatus, SyncClientAdaptor.queryNodeStatus(metaClient));
     assertEquals(checkStatusResponse,
         SyncClientAdaptor.checkStatus(metaClient, new StartUpStatus()));
@@ -315,11 +315,11 @@ public class SyncClientAdaptorTest {
     assertEquals(1L, (long) SyncClientAdaptor.querySingleSeries(dataClient,
         new SingleSeriesQueryRequest(), 0));
     assertEquals(Arrays.asList("1", "2", "3"), SyncClientAdaptor.getNodeList(dataClient,
-        TestUtils.getNode(0), "root", 0));
+        TestUtils.getNode(0), 0, "root", 0));
     assertEquals(new HashSet<>(Arrays.asList("1", "2", "3")),
-        SyncClientAdaptor.getNextChildren(dataClient, TestUtils.getNode(0), "root"));
+        SyncClientAdaptor.getNextChildren(dataClient, TestUtils.getNode(0), 0, "root"));
     assertEquals(getAllMeasurementSchemaResult,
-        SyncClientAdaptor.getAllMeasurementSchema(dataClient, TestUtils.getNode(0),
+        SyncClientAdaptor.getAllMeasurementSchema(dataClient, TestUtils.getNode(0), 0,
             new ShowTimeSeriesPlan(new PartialPath("root"))));
     assertEquals(measurementSchemas, SyncClientAdaptor.pullMeasurementSchema(dataClient,
         new PullSchemaRequest()));
@@ -328,21 +328,21 @@ public class SyncClientAdaptorTest {
     assertEquals(aggregateResults, SyncClientAdaptor.getAggrResult(dataClient
         , new GetAggrResultRequest()));
     assertEquals(paths.subList(0, paths.size() / 2),
-        SyncClientAdaptor.getUnregisteredMeasurements(dataClient, TestUtils.getNode(0), paths));
-    assertEquals(paths, SyncClientAdaptor.getAllPaths(dataClient, TestUtils.getNode(0), paths,
+        SyncClientAdaptor.getUnregisteredMeasurements(dataClient, TestUtils.getNode(0), 0, paths));
+    assertEquals(paths, SyncClientAdaptor.getAllPaths(dataClient, TestUtils.getNode(0), 0, paths,
         false).paths);
     assertEquals(paths.size(), (int) SyncClientAdaptor.getPathCount(dataClient,
-        TestUtils.getNode(0),
+        TestUtils.getNode(0), 0,
         paths, 0));
     assertEquals(new HashSet<>(paths), SyncClientAdaptor.getAllDevices(dataClient,
-        TestUtils.getNode(0), paths));
+        TestUtils.getNode(0), 0, paths));
     assertEquals(1L, (long) SyncClientAdaptor.getGroupByExecutor(dataClient, new GroupByRequest()));
     assertEquals(fillResult, SyncClientAdaptor.previousFill(dataClient, new PreviousFillRequest()));
     assertEquals(readFileResult, SyncClientAdaptor.readFile(dataClient, "a file", 0, 1000));
     assertEquals(aggregateResults, SyncClientAdaptor.getGroupByResult(dataClient,
-        TestUtils.getNode(0), 1, 1, 2));
+        TestUtils.getNode(0), 0, 1, 1, 2));
     assertEquals(peekNextNotNullValueResult, SyncClientAdaptor.peekNextNotNullValue(dataClient,
-        TestUtils.getNode(0), 1, 1, 1));
+        TestUtils.getNode(0), 0, 1, 1, 1));
     assertEquals(snapshotMap, SyncClientAdaptor.pullSnapshot(dataClient,
         new PullSnapshotRequest(), Arrays.asList(0, 1, 2),
         new SnapshotFactory<Snapshot>() {
@@ -359,8 +359,8 @@ public class SyncClientAdaptorTest {
     assertEquals(lastResult, SyncClientAdaptor.last(dataClient,
         Collections.singletonList(new PartialPath("1")),
         Collections.singletonList(TSDataType.INT64.ordinal()),
-        new QueryContext(), Collections.emptyMap(), TestUtils.getNode(0)));
-    assertTrue(SyncClientAdaptor.onSnapshotApplied(dataClient, TestUtils.getNode(0),
+        new QueryContext(), Collections.emptyMap(), TestUtils.getNode(0), 0));
+    assertTrue(SyncClientAdaptor.onSnapshotApplied(dataClient, TestUtils.getNode(0), 0,
         Arrays.asList(0, 1, 2)));
   }
 }
\ No newline at end of file
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/common/TestAsyncDataClient.java b/cluster/src/test/java/org/apache/iotdb/cluster/common/TestAsyncDataClient.java
index 118f7e5..63ce1b7 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/common/TestAsyncDataClient.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/common/TestAsyncDataClient.java
@@ -68,9 +68,9 @@ public class TestAsyncDataClient extends AsyncDataClient {
   }
 
   @Override
-  public void fetchSingleSeries(Node header, long readerId,
+  public void fetchSingleSeries(Node header, int raftId, long readerId,
       AsyncMethodCallback<ByteBuffer> resultHandler) {
-    new Thread(() -> new DataAsyncService(dataGroupMemberMap.get(header)).fetchSingleSeries(header, readerId,
+    new Thread(() -> new DataAsyncService(dataGroupMemberMap.get(header)).fetchSingleSeries(header, raftId, readerId,
         resultHandler)).start();
   }
 
@@ -89,16 +89,16 @@ public class TestAsyncDataClient extends AsyncDataClient {
   }
 
   @Override
-  public void fetchSingleSeriesByTimestamp(Node header, long readerId, long time,
+  public void fetchSingleSeriesByTimestamp(Node header, int raftId, long readerId, long time,
       AsyncMethodCallback<ByteBuffer> resultHandler) {
-    new Thread(() -> new DataAsyncService(dataGroupMemberMap.get(header)).fetchSingleSeriesByTimestamp(header,
+    new Thread(() -> new DataAsyncService(dataGroupMemberMap.get(header)).fetchSingleSeriesByTimestamp(header, raftId,
         readerId, time, resultHandler)).start();
   }
 
   @Override
-  public void getAllPaths(Node header, List<String> paths, boolean withAlias,
+  public void getAllPaths(Node header, int raftId, List<String> paths, boolean withAlias,
       AsyncMethodCallback<GetAllPathsResult> resultHandler) {
-    new Thread(() -> new DataAsyncService(dataGroupMemberMap.get(header)).getAllPaths(header,
+    new Thread(() -> new DataAsyncService(dataGroupMemberMap.get(header)).getAllPaths(header, raftId,
         paths, withAlias, resultHandler)).start();
   }
 
@@ -176,9 +176,9 @@ public class TestAsyncDataClient extends AsyncDataClient {
   }
 
   @Override
-  public void getGroupByResult(Node header, long executorId, long startTime, long endTime,
+  public void getGroupByResult(Node header, int raftId, long executorId, long startTime, long endTime,
       AsyncMethodCallback<List<ByteBuffer>> resultHandler) {
-    new Thread(() -> new DataAsyncService(dataGroupMemberMap.get(header)).getGroupByResult(header, executorId,
+    new Thread(() -> new DataAsyncService(dataGroupMemberMap.get(header)).getGroupByResult(header, raftId, executorId,
         startTime, endTime, resultHandler)).start();
   }
 
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/common/TestDataGroupMember.java b/cluster/src/test/java/org/apache/iotdb/cluster/common/TestDataGroupMember.java
index b945af2..6f182d2 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/common/TestDataGroupMember.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/common/TestDataGroupMember.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.cluster.common;
 import java.util.Collections;
 import java.util.List;
 import org.apache.iotdb.cluster.config.ClusterConstant;
+import org.apache.iotdb.cluster.partition.PartitionGroup;
 import org.apache.iotdb.cluster.partition.slot.SlotManager;
 import org.apache.iotdb.cluster.query.manage.ClusterQueryManager;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
@@ -33,10 +34,10 @@ public class TestDataGroupMember extends DataGroupMember {
     super();
     setQueryManager(new ClusterQueryManager());
     this.slotManager = new SlotManager(ClusterConstant.SLOT_NUM, null);
-    this.allNodes = Collections.singletonList(TestUtils.getNode(0));
+    this.allNodes = new PartitionGroup(Collections.singletonList(TestUtils.getNode(0)));
   }
 
-  public TestDataGroupMember(Node thisNode, List<Node> allNodes) {
+  public TestDataGroupMember(Node thisNode, PartitionGroup allNodes) {
     super();
     this.thisNode = thisNode;
     this.allNodes = allNodes;
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/common/TestMetaGroupMember.java b/cluster/src/test/java/org/apache/iotdb/cluster/common/TestMetaGroupMember.java
index ce8ebd6..2a4e67a 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/common/TestMetaGroupMember.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/common/TestMetaGroupMember.java
@@ -21,13 +21,14 @@ package org.apache.iotdb.cluster.common;
 
 import java.util.ArrayList;
 import org.apache.iotdb.cluster.log.manage.MetaSingleSnapshotLogManager;
+import org.apache.iotdb.cluster.partition.PartitionGroup;
 import org.apache.iotdb.cluster.server.member.MetaGroupMember;
 
 public class TestMetaGroupMember extends MetaGroupMember {
 
   public TestMetaGroupMember() {
     super();
-    allNodes = new ArrayList<>();
+    allNodes = new PartitionGroup();
     thisNode = TestUtils.getNode(0);
     for (int i = 0; i < 10; i++) {
       allNodes.add(TestUtils.getNode(i));
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/log/LogDispatcherTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/log/LogDispatcherTest.java
index a1c711a..2a13a79 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/log/LogDispatcherTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/log/LogDispatcherTest.java
@@ -35,6 +35,7 @@ import org.apache.iotdb.cluster.common.TestUtils;
 import org.apache.iotdb.cluster.config.ClusterDescriptor;
 import org.apache.iotdb.cluster.exception.UnknownLogTypeException;
 import org.apache.iotdb.cluster.log.LogDispatcher.SendLogRequest;
+import org.apache.iotdb.cluster.partition.PartitionGroup;
 import org.apache.iotdb.cluster.rpc.thrift.AppendEntriesRequest;
 import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
@@ -118,7 +119,7 @@ public class LogDispatcherTest {
         };
       }
     };
-    List<Node> allNodes = new ArrayList<>();
+    PartitionGroup allNodes = new PartitionGroup();
     for (int i = 0; i < 10; i++) {
       allNodes.add(TestUtils.getNode(i));
     }
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/DataLogApplierTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/DataLogApplierTest.java
index df3cabc..0ca2a3f 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/DataLogApplierTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/DataLogApplierTest.java
@@ -92,7 +92,7 @@ public class DataLogApplierTest extends IoTDBTest {
     }
 
     @Override
-    public DataGroupMember getLocalDataMember(Node header, Object request) {
+    public DataGroupMember getLocalDataMember(Node header, int raftId, Object request) {
       return testDataGroupMember;
     }
 
@@ -146,9 +146,9 @@ public class DataLogApplierTest extends IoTDBTest {
       public AsyncDataClient getAsyncDataClient(Node node, int timeout) throws IOException {
         return new AsyncDataClient(null, null, node, null) {
           @Override
-          public void getAllPaths(Node header, List<String> path, boolean withAlias,
+          public void getAllPaths(Node header, int raftId, List<String> path, boolean withAlias,
               AsyncMethodCallback<GetAllPathsResult> resultHandler) {
-            new Thread(() -> new DataAsyncService(testDataGroupMember).getAllPaths(header, path,
+            new Thread(() -> new DataAsyncService(testDataGroupMember).getAllPaths(header, raftId, path,
                 withAlias, resultHandler)).start();
           }
 
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/log/catchup/CatchUpTaskTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/log/catchup/CatchUpTaskTest.java
index c431064..51f0aa6 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/log/catchup/CatchUpTaskTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/log/catchup/CatchUpTaskTest.java
@@ -84,7 +84,7 @@ public class CatchUpTaskTest {
         }
 
         @Override
-        public boolean matchTerm(long index, long term, Node header) {
+        public boolean matchTerm(long index, long term, Node header, int raftId) {
           return dummyMatchTerm(index, term);
         }
 
@@ -111,7 +111,7 @@ public class CatchUpTaskTest {
         }
 
         @Override
-        public void matchTerm(long index, long term, Node header,
+        public void matchTerm(long index, long term, Node header, int raftId,
             AsyncMethodCallback<Boolean> resultHandler) {
           new Thread(() -> resultHandler.onComplete(dummyMatchTerm(index, term))).start();
         }
@@ -217,7 +217,7 @@ public class CatchUpTaskTest {
     sender.setCharacter(NodeCharacter.LEADER);
     Peer peer = new Peer(10);
     peer.setMatchIndex(9);
-    CatchUpTask task = new CatchUpTask(receiver, peer, sender, 9);
+    CatchUpTask task = new CatchUpTask(receiver, 0, peer, sender, 9);
     task.run();
 
     assertTrue(receivedLogs.isEmpty());
@@ -242,7 +242,7 @@ public class CatchUpTaskTest {
     sender.setCharacter(NodeCharacter.LEADER);
     Peer peer = new Peer(10);
     peer.setMatchIndex(0);
-    CatchUpTask task = new CatchUpTask(receiver, peer, sender, 5);
+    CatchUpTask task = new CatchUpTask(receiver, 0, peer, sender, 5);
     task.run();
 
     assertEquals(logList, receivedLogs.subList(1, receivedLogs.size()));
@@ -273,7 +273,7 @@ public class CatchUpTaskTest {
       sender.setCharacter(NodeCharacter.LEADER);
       Peer peer = new Peer(10);
       peer.setMatchIndex(0);
-      CatchUpTask task = new CatchUpTask(receiver, peer, sender, 5);
+      CatchUpTask task = new CatchUpTask(receiver, 0, peer, sender, 5);
       task.run();
 
       assertEquals(logList, receivedLogs.subList(1, receivedLogs.size()));
@@ -299,7 +299,7 @@ public class CatchUpTaskTest {
     sender.setCharacter(NodeCharacter.LEADER);
     Peer peer = new Peer(10);
     peer.setNextIndex(0);
-    CatchUpTask task = new CatchUpTask(receiver, peer, sender, 0);
+    CatchUpTask task = new CatchUpTask(receiver, 0, peer, sender, 0);
     ClusterDescriptor.getInstance().getConfig().setUseBatchInLogCatchUp(false);
     task.run();
 
@@ -323,7 +323,7 @@ public class CatchUpTaskTest {
     sender.setCharacter(NodeCharacter.LEADER);
     Peer peer = new Peer(10);
     peer.setNextIndex(0);
-    CatchUpTask task = new CatchUpTask(receiver, peer, sender, 0);
+    CatchUpTask task = new CatchUpTask(receiver, 0, peer, sender, 0);
     task.run();
 
     assertEquals(logList, receivedLogs.subList(1, receivedLogs.size()));
@@ -354,7 +354,7 @@ public class CatchUpTaskTest {
     peer.setMatchIndex(0);
     peer.setNextIndex(0);
 
-    CatchUpTask task = new CatchUpTask(receiver, peer, sender, 0);
+    CatchUpTask task = new CatchUpTask(receiver, 0, peer, sender, 0);
     task.setLogs(logList);
     try {
       // 1. case 1: the matched index is in the middle of the logs interval
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/log/catchup/LogCatchUpTaskTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/log/catchup/LogCatchUpTaskTest.java
index 8ed04b6..35852f4 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/log/catchup/LogCatchUpTaskTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/log/catchup/LogCatchUpTaskTest.java
@@ -179,7 +179,7 @@ public class LogCatchUpTaskTest {
     List<Log> logList = TestUtils.prepareTestLogs(logSize);
     Node receiver = new Node();
     sender.setCharacter(NodeCharacter.LEADER);
-    LogCatchUpTask task = new LogCatchUpTask(logList, receiver, sender, useBatch);
+    LogCatchUpTask task = new LogCatchUpTask(logList, receiver, 0, sender, useBatch);
     task.call();
 
     assertEquals(logList, receivedLogs);
@@ -194,7 +194,7 @@ public class LogCatchUpTaskTest {
       List<Log> logList = TestUtils.prepareTestLogs(10);
       Node receiver = new Node();
       sender.setCharacter(NodeCharacter.LEADER);
-      LogCatchUpTask task = new LogCatchUpTask(logList, receiver, sender, false);
+      LogCatchUpTask task = new LogCatchUpTask(logList, receiver, 0, sender, false);
       task.call();
 
       assertEquals(logList, receivedLogs);
@@ -210,7 +210,7 @@ public class LogCatchUpTaskTest {
     List<Log> logList = TestUtils.prepareTestLogs(10);
     Node receiver = new Node();
     sender.setCharacter(NodeCharacter.LEADER);
-    LogCatchUpTask task = new LogCatchUpTask(logList, receiver, sender, false);
+    LogCatchUpTask task = new LogCatchUpTask(logList, receiver, 0, sender, false);
     task.setUseBatch(false);
     try {
       task.call();
@@ -242,7 +242,7 @@ public class LogCatchUpTaskTest {
     List<Log> logList = TestUtils.prepareTestLogs(1030);
     Node receiver = new Node();
     sender.setCharacter(NodeCharacter.LEADER);
-    LogCatchUpTask task = new LogCatchUpTask(logList, receiver, sender, true);
+    LogCatchUpTask task = new LogCatchUpTask(logList, receiver, 0, sender, true);
     task.call();
 
     assertEquals(logList.subList(0, 1024), receivedLogs);
@@ -259,7 +259,7 @@ public class LogCatchUpTaskTest {
           + IoTDBConstant.LEFT_SIZE_IN_REQUEST);
       Node receiver = new Node();
       sender.setCharacter(NodeCharacter.LEADER);
-      LogCatchUpTask task = new LogCatchUpTask(logList, receiver, sender, true);
+      LogCatchUpTask task = new LogCatchUpTask(logList, receiver, 0, sender, true);
       task.call();
 
       assertEquals(logList, receivedLogs);
@@ -278,7 +278,7 @@ public class LogCatchUpTaskTest {
       IoTDBDescriptor.getInstance().getConfig().setThriftMaxFrameSize(0);
       Node receiver = new Node();
       sender.setCharacter(NodeCharacter.LEADER);
-      LogCatchUpTask task = new LogCatchUpTask(logList, receiver, sender, true);
+      LogCatchUpTask task = new LogCatchUpTask(logList, receiver, 0, sender, true);
       task.call();
 
       assertTrue(receivedLogs.isEmpty());
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/log/catchup/SnapshotCatchUpTaskTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/log/catchup/SnapshotCatchUpTaskTest.java
index c1a2e6e..aea8c33 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/log/catchup/SnapshotCatchUpTaskTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/log/catchup/SnapshotCatchUpTaskTest.java
@@ -152,7 +152,7 @@ public class SnapshotCatchUpTaskTest {
     Snapshot snapshot = new TestSnapshot(9989);
     Node receiver = new Node();
     sender.setCharacter(NodeCharacter.LEADER);
-    SnapshotCatchUpTask task = new SnapshotCatchUpTask(logList, snapshot, receiver, sender);
+    SnapshotCatchUpTask task = new SnapshotCatchUpTask(logList, snapshot, receiver, 0, sender);
     task.call();
 
     assertEquals(logList, receivedLogs);
@@ -172,7 +172,7 @@ public class SnapshotCatchUpTaskTest {
     Snapshot snapshot = new TestSnapshot(9989);
     Node receiver = new Node();
     sender.setCharacter(NodeCharacter.LEADER);
-    SnapshotCatchUpTask task = new SnapshotCatchUpTask(logList, snapshot, receiver, sender);
+    SnapshotCatchUpTask task = new SnapshotCatchUpTask(logList, snapshot, receiver, 0, sender);
     task.call();
 
     assertTrue(receivedLogs.isEmpty());
@@ -195,7 +195,7 @@ public class SnapshotCatchUpTaskTest {
       Snapshot snapshot = new TestSnapshot(9989);
       Node receiver = new Node();
       sender.setCharacter(NodeCharacter.LEADER);
-      SnapshotCatchUpTask task = new SnapshotCatchUpTask(logList, snapshot, receiver, sender);
+      SnapshotCatchUpTask task = new SnapshotCatchUpTask(logList, snapshot, receiver, 0, sender);
       task.call();
 
       assertEquals(logList, receivedLogs);
@@ -213,7 +213,7 @@ public class SnapshotCatchUpTaskTest {
     Snapshot snapshot = new TestSnapshot(9989);
     Node receiver = new Node();
     sender.setCharacter(NodeCharacter.LEADER);
-    LogCatchUpTask task = new SnapshotCatchUpTask(logList, snapshot, receiver, sender);
+    LogCatchUpTask task = new SnapshotCatchUpTask(logList, snapshot, receiver, 0, sender);
     try {
       task.call();
       fail("Expected LeaderUnknownException");
@@ -244,7 +244,7 @@ public class SnapshotCatchUpTaskTest {
     Snapshot snapshot = new TestSnapshot(9989);
     Node receiver = new Node();
     sender.setCharacter(NodeCharacter.ELECTOR);
-    LogCatchUpTask task = new SnapshotCatchUpTask(logList, snapshot, receiver, sender);
+    LogCatchUpTask task = new SnapshotCatchUpTask(logList, snapshot, receiver, 0, sender);
     try {
       task.call();
       fail("Expected LeaderUnknownException");
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/PartitionedSnapshotTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/PartitionedSnapshotTest.java
index 04b0959..f0fc359 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/PartitionedSnapshotTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/PartitionedSnapshotTest.java
@@ -84,8 +84,7 @@ public class PartitionedSnapshotTest extends DataSnapshotTest {
     snapshot.setLastLogIndex(10);
     snapshot.setLastLogTerm(5);
 
-    SnapshotInstaller<PartitionedSnapshot> defaultInstaller = snapshot
-        .getDefaultInstaller(dataGroupMember);
+    SnapshotInstaller<PartitionedSnapshot> defaultInstaller = snapshot.getDefaultInstaller(dataGroupMember);
     for (int i = 0; i < 10; i++) {
       dataGroupMember.getSlotManager().setToPulling(i, TestUtils.getNode(0));
     }
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTaskTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTaskTest.java
index ae49671..d9ce485 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTaskTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTaskTest.java
@@ -61,7 +61,6 @@ import org.apache.thrift.async.AsyncMethodCallback;
 import org.apache.thrift.protocol.TBinaryProtocol;
 import org.apache.thrift.protocol.TProtocol;
 import org.apache.thrift.transport.TTransport;
-import org.apache.thrift.transport.TTransportException;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/partition/SlotPartitionTableTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/partition/SlotPartitionTableTest.java
index 801d0c1..9501962 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/partition/SlotPartitionTableTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/partition/SlotPartitionTableTest.java
@@ -45,6 +45,7 @@ import org.apache.iotdb.cluster.partition.slot.SlotNodeRemovalResult;
 import org.apache.iotdb.cluster.partition.slot.SlotPartitionTable;
 import org.apache.iotdb.cluster.query.ClusterPlanRouter;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.rpc.thrift.RaftNode;
 import org.apache.iotdb.cluster.utils.PartitionUtils;
 import org.apache.iotdb.db.auth.AuthException;
 import org.apache.iotdb.db.engine.StorageEngine;
@@ -95,6 +96,7 @@ public class SlotPartitionTableTest {
   SlotPartitionTable localTable;
   Node localNode;
   int replica_size = 5;
+  int raftId = 0;
   MManager[] mManager;
 
   SlotPartitionTable[] tables;//The PartitionTable on each node.
@@ -133,11 +135,11 @@ public class SlotPartitionTableTest {
     for (int i = 0; i < 20; i++) {
       storageNames[i] = String.format("root.sg.l2.l3.%d", i);
       //determine which node the sg belongs to
-      Node node = localTable.routeToHeaderByTime(storageNames[i], 0);
-      nodeSGs[node.getMetaPort() - 30000].add(storageNames[i]);
+      RaftNode node = localTable.routeToHeaderByTime(storageNames[i], 0);
+      nodeSGs[node.getNode().getMetaPort() - 30000].add(storageNames[i]);
       storageNames[i + 20] = String.format("root.sg.l2.l3.l4.%d", i + 20);
       node = localTable.routeToHeaderByTime(storageNames[i + 20], 0);
-      nodeSGs[node.getMetaPort() - 30000].add(storageNames[i + 20]);
+      nodeSGs[node.getNode().getMetaPort() - 30000].add(storageNames[i + 20]);
     }
     for (int i = 0; i < 20; i++) {
       mManager[i] = MManagerWhiteBox.newMManager("target/schemas/mlog_" + i);
@@ -238,9 +240,9 @@ public class SlotPartitionTableTest {
 
   @Test
   public void routeToHeader() {
-    Node node1 = localTable.routeToHeaderByTime("root.sg.l2.l3.l4.28", 0);
-    Node node2 = localTable.routeToHeaderByTime("root.sg.l2.l3.l4.28", 1);
-    Node node3 = localTable
+    RaftNode node1 = localTable.routeToHeaderByTime("root.sg.l2.l3.l4.28", 0);
+    RaftNode node2 = localTable.routeToHeaderByTime("root.sg.l2.l3.l4.28", 1);
+    RaftNode node3 = localTable
         .routeToHeaderByTime("root.sg.l2.l3.l4.28", 1 + StorageEngine.getTimePartitionInterval());
     assertEquals(node1, node2);
     assertNotEquals(node2, node3);
@@ -283,7 +285,7 @@ public class SlotPartitionTableTest {
   @Test
   public void getPreviousNodeMap() {
     //before adding or deleting node, it should be null
-    assertNull(localTable.getPreviousNodeMap(localNode));
+    assertNull(localTable.getPreviousNodeMap(new RaftNode(localNode, 0)));
     //TODO after adding or deleting node, it has data
   }
 
@@ -510,7 +512,7 @@ public class SlotPartitionTableTest {
 
   @Test
   public void testRemoveNode() {
-    List<Integer> nodeSlots = localTable.getNodeSlots(getNode(0));
+    List<Integer> nodeSlots = localTable.getNodeSlots(getNode(0), raftId);
     NodeRemovalResult nodeRemovalResult = localTable.removeNode(getNode(0));
     assertFalse(localTable.getAllNodes().contains(getNode(0)));
     PartitionGroup removedGroup = nodeRemovalResult.getRemovedGroup();
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/query/reader/RemoteSeriesReaderByTimestampTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/query/reader/RemoteSeriesReaderByTimestampTest.java
index d2f7d82..085bcf8 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/query/reader/RemoteSeriesReaderByTimestampTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/query/reader/RemoteSeriesReaderByTimestampTest.java
@@ -63,7 +63,7 @@ public class RemoteSeriesReaderByTimestampTest {
       public AsyncDataClient getAsyncDataClient(Node node, int timeout) throws IOException {
         return new AsyncDataClient(null, null, node, null) {
           @Override
-          public void fetchSingleSeriesByTimestamp(Node header, long readerId, long time,
+          public void fetchSingleSeriesByTimestamp(Node header, int raftId, long readerId, long time,
               AsyncMethodCallback<ByteBuffer> resultHandler) throws TException {
             if (failedNodes.contains(node)) {
               throw new TException("Node down.");
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/query/reader/RemoteSimpleSeriesReaderTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/query/reader/RemoteSimpleSeriesReaderTest.java
index 550feee..82bceba 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/query/reader/RemoteSimpleSeriesReaderTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/query/reader/RemoteSimpleSeriesReaderTest.java
@@ -73,7 +73,7 @@ public class RemoteSimpleSeriesReaderTest {
       public AsyncDataClient getAsyncDataClient(Node node, int timeout) throws IOException {
         return new AsyncDataClient(null, null, node, null) {
           @Override
-          public void fetchSingleSeries(Node header, long readerId,
+          public void fetchSingleSeries(Node header, int raftId, long readerId,
               AsyncMethodCallback<ByteBuffer> resultHandler)
               throws TException {
             if (failedNodes.contains(node)) {
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatThreadTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatThreadTest.java
index d4bf336..e112d31 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatThreadTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatThreadTest.java
@@ -39,6 +39,7 @@ import org.apache.iotdb.cluster.rpc.thrift.ElectionRequest;
 import org.apache.iotdb.cluster.rpc.thrift.HeartBeatRequest;
 import org.apache.iotdb.cluster.rpc.thrift.HeartBeatResponse;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.rpc.thrift.RaftNode;
 import org.apache.iotdb.cluster.rpc.thrift.RaftService.AsyncClient;
 import org.apache.iotdb.cluster.server.Response;
 import org.apache.iotdb.cluster.server.member.MetaGroupMember;
@@ -57,7 +58,7 @@ public class MetaHeartbeatThreadTest extends HeartbeatThreadTest {
     }
 
     @Override
-    public Node routeToHeaderByTime(String storageGroupName, long timestamp) {
+    public RaftNode routeToHeaderByTime(String storageGroupName, long timestamp) {
       return null;
     }
 
@@ -77,7 +78,12 @@ public class MetaHeartbeatThreadTest extends HeartbeatThreadTest {
     }
 
     @Override
-    public PartitionGroup getHeaderGroup(Node header) {
+    public PartitionGroup getHeaderGroup(RaftNode header) {
+      return null;
+    }
+
+    @Override
+    public PartitionGroup getHeaderGroup(Node node) {
       return null;
     }
 
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
index 96dc257..8180d4e 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
@@ -123,6 +123,7 @@ public class DataGroupMemberTest extends MemberTest {
   private boolean hasInitialSnapshots;
   private boolean enableSyncLeader;
   private int prevReplicationNum;
+  private int raftId = 0;
 
   @Before
   public void setUp() throws Exception {
@@ -210,7 +211,7 @@ public class DataGroupMemberTest extends MemberTest {
             }
 
             @Override
-            public void requestCommitIndex(Node header, AsyncMethodCallback<Long> resultHandler) {
+            public void requestCommitIndex(Node header, int raftId, AsyncMethodCallback<Long> resultHandler) {
               new Thread(() -> {
                 if (enableSyncLeader) {
                   resultHandler.onComplete(-1L);
@@ -253,7 +254,7 @@ public class DataGroupMemberTest extends MemberTest {
   @Test
   public void testAddNode() {
     System.out.println("Start testAddNode()");
-    PartitionGroup partitionGroup = new PartitionGroup(TestUtils.getNode(0),
+    PartitionGroup partitionGroup = new PartitionGroup(raftId, TestUtils.getNode(0),
         TestUtils.getNode(50), TestUtils.getNode(90));
     DataGroupMember firstMember = getDataGroupMember(TestUtils.getNode(0),
         new PartitionGroup(partitionGroup));
@@ -632,7 +633,7 @@ public class DataGroupMemberTest extends MemberTest {
     GenericHandler<ByteBuffer> dataHandler = new GenericHandler<>(TestUtils.getNode(0),
         dataResult);
     new DataAsyncService(dataGroupMember)
-        .fetchSingleSeries(TestUtils.getNode(0), readerId, dataHandler);
+        .fetchSingleSeries(TestUtils.getNode(0), raftId, readerId, dataHandler);
     ByteBuffer dataBuffer = dataResult.get();
     BatchData batchData = SerializeUtils.deserializeBatchData(dataBuffer);
     for (int i = 5; i < 10; i++) {
@@ -643,7 +644,7 @@ public class DataGroupMemberTest extends MemberTest {
     }
     assertFalse(batchData.hasCurrent());
 
-    new DataAsyncService(dataGroupMember).endQuery(TestUtils.getNode(0), TestUtils.getNode(1), 0,
+    new DataAsyncService(dataGroupMember).endQuery(TestUtils.getNode(0), raftId, TestUtils.getNode(1), 0,
         new GenericHandler<>(TestUtils.getNode(0), null));
   }
 
@@ -690,7 +691,7 @@ public class DataGroupMemberTest extends MemberTest {
     GenericHandler<ByteBuffer> dataHandler = new GenericHandler<>(TestUtils.getNode(0),
         dataResult);
     new DataAsyncService(dataGroupMember)
-        .fetchSingleSeries(TestUtils.getNode(0), readerId, dataHandler);
+        .fetchSingleSeries(TestUtils.getNode(0), raftId, readerId, dataHandler);
     ByteBuffer dataBuffer = dataResult.get();
     BatchData batchData = SerializeUtils.deserializeBatchData(dataBuffer);
     for (int i = 5; i < 9; i++) {
@@ -701,7 +702,7 @@ public class DataGroupMemberTest extends MemberTest {
     }
     assertFalse(batchData.hasCurrent());
 
-    new DataAsyncService(dataGroupMember).endQuery(TestUtils.getNode(0), TestUtils.getNode(1), 0,
+    new DataAsyncService(dataGroupMember).endQuery(TestUtils.getNode(0), raftId, TestUtils.getNode(1), 0,
         new GenericHandler<>(TestUtils.getNode(0), null));
   }
 
@@ -751,13 +752,13 @@ public class DataGroupMemberTest extends MemberTest {
 
     for (int i = 5; i < 10; i++) {
       new DataAsyncService(dataGroupMember)
-          .fetchSingleSeriesByTimestamp(TestUtils.getNode(0), readerId, i,
+          .fetchSingleSeriesByTimestamp(TestUtils.getNode(0), raftId, readerId, i,
               dataHandler);
       Object value = SerializeUtils.deserializeObject(dataResult.get());
       assertEquals(i * 1.0, (Double) value, 0.00001);
     }
 
-    new DataAsyncService(dataGroupMember).endQuery(TestUtils.getNode(0), TestUtils.getNode(1), 0,
+    new DataAsyncService(dataGroupMember).endQuery(TestUtils.getNode(0), raftId, TestUtils.getNode(1), 0,
         new GenericHandler<>(TestUtils.getNode(0), null));
   }
 
@@ -806,13 +807,13 @@ public class DataGroupMemberTest extends MemberTest {
         dataResult);
     for (int i = 5; i < 9; i++) {
       new DataAsyncService(dataGroupMember)
-          .fetchSingleSeriesByTimestamp(TestUtils.getNode(0), readerId, i,
+          .fetchSingleSeriesByTimestamp(TestUtils.getNode(0), raftId, readerId, i,
               dataHandler);
       Object value = SerializeUtils.deserializeObject(dataResult.get());
       assertEquals(i * 1.0, (Double) value, 0.00001);
     }
 
-    new DataAsyncService(dataGroupMember).endQuery(TestUtils.getNode(0), TestUtils.getNode(1), 0,
+    new DataAsyncService(dataGroupMember).endQuery(TestUtils.getNode(0), raftId, TestUtils.getNode(1), 0,
         new GenericHandler<>(TestUtils.getNode(0), null));
   }
 
@@ -823,7 +824,7 @@ public class DataGroupMemberTest extends MemberTest {
     AtomicReference<GetAllPathsResult> pathResult = new AtomicReference<>();
     GenericHandler<GetAllPathsResult> handler = new GenericHandler<>(TestUtils.getNode(0), pathResult);
     new DataAsyncService(dataGroupMember)
-        .getAllPaths(TestUtils.getNode(0), Collections.singletonList(path), false, handler);
+        .getAllPaths(TestUtils.getNode(0), raftId, Collections.singletonList(path), false, handler);
     List<String> result = pathResult.get().paths;
     assertEquals(20, result.size());
     for (int i = 0; i < 10; i++) {
@@ -835,7 +836,7 @@ public class DataGroupMemberTest extends MemberTest {
   public void testFetchWithoutQuery() {
     System.out.println("Start testFetchWithoutQuery()");
     AtomicReference<Exception> result = new AtomicReference<>();
-    new DataAsyncService(dataGroupMember).fetchSingleSeriesByTimestamp(TestUtils.getNode(0), 0, 0,
+    new DataAsyncService(dataGroupMember).fetchSingleSeriesByTimestamp(TestUtils.getNode(0), raftId, 0, 0,
         new AsyncMethodCallback<ByteBuffer>() {
           @Override
           public void onComplete(ByteBuffer buffer) {
@@ -850,7 +851,7 @@ public class DataGroupMemberTest extends MemberTest {
     assertTrue(exception instanceof ReaderNotFoundException);
     assertEquals("The requested reader 0 is not found", exception.getMessage());
 
-    new DataAsyncService(dataGroupMember).fetchSingleSeries(TestUtils.getNode(0), 0,
+    new DataAsyncService(dataGroupMember).fetchSingleSeries(TestUtils.getNode(0), raftId, 0,
         new AsyncMethodCallback<ByteBuffer>() {
           @Override
           public void onComplete(ByteBuffer buffer) {
@@ -995,7 +996,7 @@ public class DataGroupMemberTest extends MemberTest {
       aggrResultRef = new AtomicReference<>();
       aggrResultHandler = new GenericHandler<>(TestUtils.getNode(0), aggrResultRef);
       new DataAsyncService(dataGroupMember)
-          .getGroupByResult(TestUtils.getNode(10), executorId, 0, 20, aggrResultHandler);
+          .getGroupByResult(TestUtils.getNode(10), raftId, executorId, 0, 20, aggrResultHandler);
 
       byteBuffers = aggrResultRef.get();
       assertNotNull(byteBuffers);
@@ -1024,7 +1025,7 @@ public class DataGroupMemberTest extends MemberTest {
       aggrResultRef = new AtomicReference<>();
       aggrResultHandler = new GenericHandler<>(TestUtils.getNode(0), aggrResultRef);
       new DataAsyncService(dataGroupMember)
-          .getGroupByResult(TestUtils.getNode(30), executorId, 0, 20, aggrResultHandler);
+          .getGroupByResult(TestUtils.getNode(30), raftId, executorId, 0, 20, aggrResultHandler);
 
       byteBuffers = aggrResultRef.get();
       assertNull(byteBuffers);
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MemberTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MemberTest.java
index 4f8a07b..6065370 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MemberTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MemberTest.java
@@ -230,13 +230,13 @@ public class MemberTest {
     MetaGroupMember ret = new TestMetaGroupMember() {
 
       @Override
-      public DataGroupMember getLocalDataMember(Node header,
+      public DataGroupMember getLocalDataMember(Node header, int raftId,
           Object request) {
         return getDataGroupMember(header);
       }
 
       @Override
-      public DataGroupMember getLocalDataMember(Node header) {
+      public DataGroupMember getLocalDataMember(Node header, int raftId) {
         return getDataGroupMember(header);
       }
 
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
index 32b0b30..1badcd2 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
@@ -305,13 +305,13 @@ public class MetaGroupMemberTest extends MemberTest {
       }
 
       @Override
-      public DataGroupMember getLocalDataMember(Node header,
+      public DataGroupMember getLocalDataMember(Node header, int raftId,
           Object request) {
         return getDataGroupMember(header);
       }
 
       @Override
-      public DataGroupMember getLocalDataMember(Node header) {
+      public DataGroupMember getLocalDataMember(Node header, int raftId) {
         return getDataGroupMember(header);
       }
 
diff --git a/thrift/src/main/thrift/cluster.thrift b/thrift/src/main/thrift/cluster.thrift
index 56f1dd4..549cd55 100644
--- a/thrift/src/main/thrift/cluster.thrift
+++ b/thrift/src/main/thrift/cluster.thrift
@@ -121,6 +121,11 @@ struct Node {
   5: required int clientPort
 }
 
+struct RaftNode {
+  1: required Node node
+  2: required int raftId
+}
+
 // leader -> follower
 struct StartUpStatus {
   1: required long partitionInterval