You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ta...@apache.org on 2020/12/19 10:37:06 UTC

[iotdb] branch cluster_multi_raft created (now d5c075d)

This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a change to branch cluster_multi_raft
in repository https://gitbox.apache.org/repos/asf/iotdb.git.


      at d5c075d  add multi-raft except for add/remove node

This branch includes the following new commits:

     new 55fc053  init
     new d5c075d  add multi-raft except for add/remove node

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/02: init

Posted by ta...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch cluster_multi_raft
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 55fc053da4fa93d8959a35404e3fda4bdf7788fa
Author: LebronAl <TX...@gmail.com>
AuthorDate: Fri Dec 11 19:49:04 2020 +0800

    init
---
 .../iotdb/cluster/partition/PartitionGroup.java    |  34 +-
 .../iotdb/cluster/partition/PartitionTable.java    |   7 +-
 .../cluster/partition/slot/SlotPartitionTable.java | 387 ++++++++++---------
 .../iotdb/cluster/server/DataClusterServer.java    | 426 +++++++++++----------
 .../iotdb/cluster/server/StoppedMemberManager.java |  48 +--
 .../cluster/server/member/DataGroupMember.java     |  23 +-
 .../cluster/server/member/MetaGroupMember.java     |  21 +-
 .../iotdb/cluster/server/member/RaftMember.java    |   7 +-
 .../cluster/server/service/BaseAsyncService.java   |   6 +-
 .../cluster/server/service/BaseSyncService.java    |   6 +-
 .../cluster/server/service/DataAsyncService.java   |  61 +--
 .../cluster/server/service/DataSyncService.java    |  34 +-
 thrift/src/main/thrift/cluster.thrift              |  80 ++--
 13 files changed, 630 insertions(+), 510 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionGroup.java b/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionGroup.java
index 8bf9f52..dc5cbcc 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionGroup.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionGroup.java
@@ -21,45 +21,61 @@ package org.apache.iotdb.cluster.partition;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
+import java.util.Objects;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
 
 /**
  * PartitionGroup contains all the nodes that will form a data group with a certain node, which are
- * the REPLICATION_NUM - 1 different physical nodes after this node.
- * The first element of the list is called header, which is also the identifier of the data group.
+ * the REPLICATION_NUM - 1 different physical nodes after this node. The first element of the list
+ * is called header, which is also the identifier of the data group.
  */
 public class PartitionGroup extends ArrayList<Node> {
 
-  private Node thisNode;
+  private int id;
 
   public PartitionGroup() {
   }
 
-  public PartitionGroup(Node... nodes) {
+  public PartitionGroup(Collection<Node> nodes) {
+    this.addAll(nodes);
+  }
+
+  public PartitionGroup(int id, Node... nodes) {
     this.addAll(Arrays.asList(nodes));
+    this.id = id;
   }
 
   public PartitionGroup(PartitionGroup other) {
     super(other);
-    this.thisNode = other.thisNode;
+    this.id = other.getId();
   }
 
   @Override
   public boolean equals(Object o) {
-    return super.equals(o);
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    PartitionGroup group = (PartitionGroup) o;
+    return Objects.equals(id, group.getId()) &&
+        super.equals(group);
   }
 
   @Override
   public int hashCode() {
-    return super.hashCode();
+    return Objects.hash(id, getHeader());
   }
 
+
   public Node getHeader() {
     return get(0);
   }
 
-  public void setThisNode(Node thisNode) {
-    this.thisNode = thisNode;
+  public int getId() {
+    return id;
   }
 
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionTable.java b/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionTable.java
index 81f0199..a1a5ae6 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionTable.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionTable.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.db.exception.metadata.MetadataException;
 import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
 import org.apache.iotdb.db.metadata.PartialPath;
 import org.apache.iotdb.db.service.IoTDB;
+import org.apache.iotdb.tsfile.utils.Pair;
 
 /**
  * PartitionTable manages the map whose key is the StorageGroupName with a time interval and the
@@ -54,7 +55,7 @@ public interface PartitionTable {
    * @param timestamp
    * @return
    */
-  Node routeToHeaderByTime(String storageGroupName, long timestamp);
+  Pair<Node, Integer> routeToHeaderByTime(String storageGroupName, long timestamp);
 
   /**
    * Add a new node to update the partition table.
@@ -78,10 +79,10 @@ public interface PartitionTable {
   List<PartitionGroup> getLocalGroups();
 
   /**
-   * @param header
+   * @param pair
    * @return the partition group starting from the header.
    */
-  PartitionGroup getHeaderGroup(Node header);
+  PartitionGroup getHeaderGroup(Pair<Node, Integer> pair);
 
   ByteBuffer serialize();
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java b/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java
index ead856c..a1f98ce 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java
@@ -14,8 +14,6 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Comparator;
 import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -31,6 +29,7 @@ import org.apache.iotdb.cluster.partition.PartitionTable;
 import org.apache.iotdb.cluster.partition.slot.SlotStrategy.DefaultStrategy;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.iotdb.db.utils.SerializeUtils;
+import org.apache.iotdb.tsfile.utils.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -47,6 +46,8 @@ public class SlotPartitionTable implements PartitionTable {
   private int replicationNum =
       ClusterDescriptor.getInstance().getConfig().getReplicationNum();
 
+  private int raftGroupNum = 2;
+
   //all nodes
   private List<Node> nodeRing = new ArrayList<>();
   //normally, it is equal to ClusterConstant.SLOT_NUM.
@@ -54,12 +55,12 @@ public class SlotPartitionTable implements PartitionTable {
 
   //The following fields are used for determining which node a data item belongs to.
   // the slots held by each node
-  private Map<Node, List<Integer>> nodeSlotMap = new ConcurrentHashMap<>();
+  private Map<Pair<Node, Integer>, List<Integer>> nodeSlotMap = new ConcurrentHashMap<>();
   // each slot is managed by whom
-  private Node[] slotNodes = new Node[ClusterConstant.SLOT_NUM];
+  private Pair<Node, Integer>[] slotNodes = new Pair[ClusterConstant.SLOT_NUM];
   // the nodes that each slot belongs to before a new node is added, used for the new node to
   // find the data source
-  private Map<Node, Map<Integer, Node>> previousNodeMap = new ConcurrentHashMap<>();
+  private Map<Pair<Node, Integer>, Map<Integer, Node>> previousNodeMap = new ConcurrentHashMap<>();
 
   //the filed is used for determining which nodes need to be a group.
   // the data groups which this node belongs to.
@@ -111,8 +112,11 @@ public class SlotPartitionTable implements PartitionTable {
     // evenly assign the slots to each node
     int nodeNum = nodeRing.size();
     int slotsPerNode = totalSlotNumbers / nodeNum;
+    int slotsPerRaftGroup = slotsPerNode / raftGroupNum;
     for (Node node : nodeRing) {
-      nodeSlotMap.put(node, new ArrayList<>());
+      for (int i = 0; i < raftGroupNum; i++) {
+        nodeSlotMap.put(new Pair<>(node, i), new ArrayList<>());
+      }
     }
 
     for (int i = 0; i < totalSlotNumbers; i++) {
@@ -121,11 +125,17 @@ public class SlotPartitionTable implements PartitionTable {
         // the last node may receive a little more if total slots cannot de divided by node number
         nodeIdx--;
       }
-      nodeSlotMap.get(nodeRing.get(nodeIdx)).add(i);
+      for (int j = 0; j < nodeIdx; j++) {
+        int groupIdx = j / slotsPerRaftGroup;
+        if (groupIdx >= raftGroupNum) {
+          groupIdx--;
+        }
+        nodeSlotMap.get(new Pair<>(nodeRing.get(nodeIdx), groupIdx)).add(i);
+      }
     }
 
     // build the index to find a node by slot
-    for (Entry<Node, List<Integer>> entry : nodeSlotMap.entrySet()) {
+    for (Entry<Pair<Node, Integer>, List<Integer>> entry : nodeSlotMap.entrySet()) {
       for (Integer slot : entry.getValue()) {
         slotNodes[slot] = entry.getKey();
       }
@@ -145,7 +155,9 @@ public class SlotPartitionTable implements PartitionTable {
       if (startIndex < 0) {
         startIndex = startIndex + nodeRing.size();
       }
-      ret.add(getHeaderGroup(nodeRing.get(startIndex)));
+      for (int j = 0; j < raftGroupNum; j++) {
+        ret.add(getHeaderGroup(new Pair<>(nodeRing.get(startIndex), j)));
+      }
     }
 
     logger.debug("The partition groups of {} are: {}", node, ret);
@@ -153,13 +165,13 @@ public class SlotPartitionTable implements PartitionTable {
   }
 
   @Override
-  public PartitionGroup getHeaderGroup(Node node) {
-    PartitionGroup ret = new PartitionGroup();
+  public PartitionGroup getHeaderGroup(Pair<Node, Integer> pair) {
+    PartitionGroup ret = new PartitionGroup(pair.right);
 
     // assuming the nodes are [1,2,3,4,5]
-    int nodeIndex = nodeRing.indexOf(node);
+    int nodeIndex = nodeRing.indexOf(pair.left);
     if (nodeIndex == -1) {
-      logger.error("Node {} is not in the cluster", node);
+      logger.error("Node {} is not in the cluster", pair.left);
       return null;
     }
     int endIndex = nodeIndex + replicationNum;
@@ -177,8 +189,8 @@ public class SlotPartitionTable implements PartitionTable {
   @Override
   public PartitionGroup route(String storageGroupName, long timestamp) {
     synchronized (nodeRing) {
-      Node node = routeToHeaderByTime(storageGroupName, timestamp);
-      return getHeaderGroup(node);
+      Pair<Node, Integer> pair = routeToHeaderByTime(storageGroupName, timestamp);
+      return getHeaderGroup(pair);
     }
   }
 
@@ -188,112 +200,115 @@ public class SlotPartitionTable implements PartitionTable {
           Thread.currentThread().getStackTrace());
       return null;
     }
-    Node node = slotNodes[slot];
-    logger.debug("The slot of {} is held by {}", slot, node);
-    if (node == null) {
+    Pair<Node, Integer> pair = slotNodes[slot];
+    logger.debug("The slot of {} is held by {}", slot, pair);
+    if (pair.left == null) {
       logger.warn("The slot {} is incorrect", slot);
       return null;
     }
-    return getHeaderGroup(node);
+    return getHeaderGroup(pair);
   }
 
   @Override
-  public Node routeToHeaderByTime(String storageGroupName, long timestamp) {
+  public Pair<Node, Integer> routeToHeaderByTime(String storageGroupName, long timestamp) {
     synchronized (nodeRing) {
       int slot = getSlotStrategy()
           .calculateSlotByTime(storageGroupName, timestamp, getTotalSlotNumbers());
-      Node node = slotNodes[slot];
+      Pair<Node, Integer> pair = slotNodes[slot];
       logger.trace("The slot of {}@{} is {}, held by {}", storageGroupName, timestamp,
-          slot, node);
-      return node;
+          slot, pair);
+      return pair;
     }
   }
 
   @Override
   public NodeAdditionResult addNode(Node node) {
-    synchronized (nodeRing) {
-      if (nodeRing.contains(node)) {
-        return null;
-      }
-
-      nodeRing.add(node);
-      nodeRing.sort(Comparator.comparingInt(Node::getNodeIdentifier));
-
-      List<PartitionGroup> retiredGroups = new ArrayList<>();
-      for (int i = 0; i < localGroups.size(); i++) {
-        PartitionGroup oldGroup = localGroups.get(i);
-        Node header = oldGroup.getHeader();
-        PartitionGroup newGrp = getHeaderGroup(header);
-        if (newGrp.contains(node) && newGrp.contains(thisNode)) {
-          // this group changes but still contains the local node
-          localGroups.set(i, newGrp);
-        } else if (newGrp.contains(node) && !newGrp.contains(thisNode)) {
-          // the local node retires from the group
-          retiredGroups.add(newGrp);
-        }
-      }
-
-      // remove retired groups
-      Iterator<PartitionGroup> groupIterator = localGroups.iterator();
-      while (groupIterator.hasNext()) {
-        PartitionGroup partitionGroup = groupIterator.next();
-        for (PartitionGroup retiredGroup : retiredGroups) {
-          if (retiredGroup.getHeader().equals(partitionGroup.getHeader())) {
-            groupIterator.remove();
-            break;
-          }
-        }
-      }
-    }
-
-    SlotNodeAdditionResult result = new SlotNodeAdditionResult();
-    PartitionGroup newGroup = getHeaderGroup(node);
-    if (newGroup.contains(thisNode)) {
-      localGroups.add(newGroup);
-    }
-    result.setNewGroup(newGroup);
-
-    calculateGlobalGroups();
-
-    // the slots movement is only done logically, the new node itself will pull data from the
-    // old node
-    result.setLostSlots(moveSlotsToNew(node));
-
-    return result;
+//    synchronized (nodeRing) {
+//      if (nodeRing.contains(node)) {
+//        return null;
+//      }
+//
+//      nodeRing.add(node);
+//      nodeRing.sort(Comparator.comparingInt(Node::getNodeIdentifier));
+//
+//      List<PartitionGroup> retiredGroups = new ArrayList<>();
+//      for (int i = 0; i < localGroups.size(); i++) {
+//        PartitionGroup oldGroup = localGroups.get(i);
+//        Node header = oldGroup.getHeader();
+//        PartitionGroup newGrp = getHeaderGroup(header);
+//        if (newGrp.contains(node) && newGrp.contains(thisNode)) {
+//          // this group changes but still contains the local node
+//          localGroups.set(i, newGrp);
+//        } else if (newGrp.contains(node) && !newGrp.contains(thisNode)) {
+//          // the local node retires from the group
+//          retiredGroups.add(newGrp);
+//        }
+//      }
+//
+//      // remove retired groups
+//      Iterator<PartitionGroup> groupIterator = localGroups.iterator();
+//      while (groupIterator.hasNext()) {
+//        PartitionGroup partitionGroup = groupIterator.next();
+//        for (PartitionGroup retiredGroup : retiredGroups) {
+//          if (retiredGroup.getHeader().equals(partitionGroup.getHeader())) {
+//            groupIterator.remove();
+//            break;
+//          }
+//        }
+//      }
+//    }
+//
+//    SlotNodeAdditionResult result = new SlotNodeAdditionResult();
+//    PartitionGroup newGroup = getHeaderGroup(node);
+//    if (newGroup.contains(thisNode)) {
+//      localGroups.add(newGroup);
+//    }
+//    result.setNewGroup(newGroup);
+//
+//    calculateGlobalGroups();
+//
+//    // the slots movement is only done logically, the new node itself will pull data from the
+//    // old node
+//    result.setLostSlots(moveSlotsToNew(node));
+//
+//    return result;
+    return null;
   }
 
 
   /**
    * Move last slots from each group whose slot number is bigger than the new average to the new
    * node.
+   *
    * @param newNode
    * @return a map recording what slots each group lost.
    */
   private Map<Node, Set<Integer>> moveSlotsToNew(Node newNode) {
-    Map<Node, Set<Integer>> result = new HashMap<>();
-    // as a node is added, the average slots for each node decrease
-    // move the slots to the new node if any previous node have more slots than the new average
-    List<Integer> newSlots = new ArrayList<>();
-    Map<Integer, Node> previousHolders = new HashMap<>();
-    int newAvg = totalSlotNumbers / nodeRing.size();
-    for (Entry<Node, List<Integer>> entry : nodeSlotMap.entrySet()) {
-      List<Integer> slots = entry.getValue();
-      int transferNum = slots.size() - newAvg;
-      if (transferNum > 0) {
-        List<Integer> slotsToMove = slots.subList(slots.size() - transferNum, slots.size());
-        newSlots.addAll(slotsToMove);
-        for (Integer slot : slotsToMove) {
-          // record what node previously hold the integer
-          previousHolders.put(slot, entry.getKey());
-          slotNodes[slot] = newNode;
-        }
-        result.computeIfAbsent(entry.getKey(), n -> new HashSet<>()).addAll(slotsToMove);
-        slotsToMove.clear();
-      }
-    }
-    nodeSlotMap.put(newNode, newSlots);
-    previousNodeMap.put(newNode, previousHolders);
-    return result;
+//    Map<Node, Set<Integer>> result = new HashMap<>();
+//    // as a node is added, the average slots for each node decrease
+//    // move the slots to the new node if any previous node have more slots than the new average
+//    List<Integer> newSlots = new ArrayList<>();
+//    Map<Integer, Node> previousHolders = new HashMap<>();
+//    int newAvg = totalSlotNumbers / nodeRing.size();
+//    for (Entry<Node, List<Integer>> entry : nodeSlotMap.entrySet()) {
+//      List<Integer> slots = entry.getValue();
+//      int transferNum = slots.size() - newAvg;
+//      if (transferNum > 0) {
+//        List<Integer> slotsToMove = slots.subList(slots.size() - transferNum, slots.size());
+//        newSlots.addAll(slotsToMove);
+//        for (Integer slot : slotsToMove) {
+//          // record what node previously hold the integer
+//          previousHolders.put(slot, entry.getKey());
+//          slotNodes[slot] = newNode;
+//        }
+//        result.computeIfAbsent(entry.getKey(), n -> new HashSet<>()).addAll(slotsToMove);
+//        slotsToMove.clear();
+//      }
+//    }
+//    nodeSlotMap.put(newNode, newSlots);
+//    previousNodeMap.put(newNode, previousHolders);
+//    return result;
+    return null;
   }
 
   @Override
@@ -310,22 +325,23 @@ public class SlotPartitionTable implements PartitionTable {
     try {
       dataOutputStream.writeInt(totalSlotNumbers);
       dataOutputStream.writeInt(nodeSlotMap.size());
-      for (Entry<Node, List<Integer>> entry : nodeSlotMap.entrySet()) {
-        SerializeUtils.serialize(entry.getKey(), dataOutputStream);
+      for (Entry<Pair<Node, Integer>, List<Integer>> entry : nodeSlotMap.entrySet()) {
+        SerializeUtils.serialize(entry.getKey().left, dataOutputStream);
+        dataOutputStream.writeInt(entry.getKey().right);
         SerializeUtils.serialize(entry.getValue(), dataOutputStream);
       }
 
-      dataOutputStream.writeInt(previousNodeMap.size());
-      for (Entry<Node, Map<Integer, Node>> nodeMapEntry : previousNodeMap.entrySet()) {
-        dataOutputStream.writeInt(nodeMapEntry.getKey().getNodeIdentifier());
-
-        Map<Integer, Node> prevHolders = nodeMapEntry.getValue();
-        dataOutputStream.writeInt(prevHolders.size());
-        for (Entry<Integer, Node> integerNodeEntry : prevHolders.entrySet()) {
-          dataOutputStream.writeInt(integerNodeEntry.getKey());
-          dataOutputStream.writeInt(integerNodeEntry.getValue().getNodeIdentifier());
-        }
-      }
+//      dataOutputStream.writeInt(previousNodeMap.size());
+//      for (Entry<Node, Map<Integer, Node>> nodeMapEntry : previousNodeMap.entrySet()) {
+//        dataOutputStream.writeInt(nodeMapEntry.getKey().getNodeIdentifier());
+//
+//        Map<Integer, Node> prevHolders = nodeMapEntry.getValue();
+//        dataOutputStream.writeInt(prevHolders.size());
+//        for (Entry<Integer, Node> integerNodeEntry : prevHolders.entrySet()) {
+//          dataOutputStream.writeInt(integerNodeEntry.getKey());
+//          dataOutputStream.writeInt(integerNodeEntry.getValue().getNodeIdentifier());
+//        }
+//      }
 
       dataOutputStream.writeLong(lastLogIndex);
     } catch (IOException ignored) {
@@ -345,32 +361,38 @@ public class SlotPartitionTable implements PartitionTable {
       Node node = new Node();
       List<Integer> slots = new ArrayList<>();
       SerializeUtils.deserialize(node, buffer);
+      int id = buffer.getInt();
       SerializeUtils.deserialize(slots, buffer);
-      nodeSlotMap.put(node, slots);
+      Pair pair = new Pair<>(node, id);
+      nodeSlotMap.put(pair, slots);
       idNodeMap.put(node.getNodeIdentifier(), node);
       for (Integer slot : slots) {
-        slotNodes[slot] = node;
+        slotNodes[slot] = pair;
       }
     }
 
-    int prevNodeMapSize = buffer.getInt();
-    previousNodeMap = new HashMap<>();
-    for (int i = 0; i < prevNodeMapSize; i++) {
-      int nodeId = buffer.getInt();
-      Node node = idNodeMap.get(nodeId);
-
-      Map<Integer, Node> prevHolders = new HashMap<>();
-      int holderNum = buffer.getInt();
-      for (int i1 = 0; i1 < holderNum; i1++) {
-        int slot = buffer.getInt();
-        Node holder = idNodeMap.get(buffer.getInt());
-        prevHolders.put(slot, holder);
-      }
-      previousNodeMap.put(node, prevHolders);
-    }
+//    int prevNodeMapSize = buffer.getInt();
+//    previousNodeMap = new HashMap<>();
+//    for (int i = 0; i < prevNodeMapSize; i++) {
+//      int nodeId = buffer.getInt();
+//      Node node = idNodeMap.get(nodeId);
+//
+//      Map<Integer, Node> prevHolders = new HashMap<>();
+//      int holderNum = buffer.getInt();
+//      for (int i1 = 0; i1 < holderNum; i1++) {
+//        int slot = buffer.getInt();
+//        Node holder = idNodeMap.get(buffer.getInt());
+//        prevHolders.put(slot, holder);
+//      }
+//      previousNodeMap.put(node, prevHolders);
+//    }
     lastLogIndex = buffer.getLong();
 
-    nodeRing.addAll(nodeSlotMap.keySet());
+    for (Pair<Node, Integer> nodeIntegerPair : nodeSlotMap.keySet()) {
+      if (!nodeRing.contains(nodeIntegerPair.left)) {
+        nodeRing.add(nodeIntegerPair.left);
+      }
+    }
     nodeRing.sort(Comparator.comparingInt(Node::getNodeIdentifier));
     logger.info("All known nodes: {}", nodeRing);
 
@@ -390,7 +412,7 @@ public class SlotPartitionTable implements PartitionTable {
     return nodeSlotMap.get(header);
   }
 
-  public Map<Node, List<Integer>> getAllNodeSlots() {
+  public Map<Pair<Node, Integer>, List<Integer>> getAllNodeSlots() {
     return nodeSlotMap;
   }
 
@@ -421,62 +443,63 @@ public class SlotPartitionTable implements PartitionTable {
 
   @Override
   public NodeRemovalResult removeNode(Node target) {
-    synchronized (nodeRing) {
-      if (!nodeRing.contains(target)) {
-        return null;
-      }
-
-      SlotNodeRemovalResult result = new SlotNodeRemovalResult();
-      result.setRemovedGroup(getHeaderGroup(target));
-      nodeRing.remove(target);
-
-      // if the node belongs to a group that headed by target, this group should be removed
-      // and other groups containing target should be updated
-      int removedGroupIdx = -1;
-      for (int i = 0; i < localGroups.size(); i++) {
-        PartitionGroup oldGroup = localGroups.get(i);
-        Node header = oldGroup.getHeader();
-        if (header.equals(target)) {
-          removedGroupIdx = i;
-        } else {
-          PartitionGroup newGrp = getHeaderGroup(header);
-          localGroups.set(i, newGrp);
-        }
-      }
-      if (removedGroupIdx != -1) {
-        localGroups.remove(removedGroupIdx);
-        // each node exactly joins replicationNum groups, so when a group is removed, the node
-        // should join a new one
-        int thisNodeIdx = nodeRing.indexOf(thisNode);
-        // this node must be the last node of the new group
-        int headerNodeIdx = thisNodeIdx - (replicationNum - 1);
-        headerNodeIdx = headerNodeIdx < 0 ? headerNodeIdx + nodeRing.size() : headerNodeIdx;
-        Node header = nodeRing.get(headerNodeIdx);
-        PartitionGroup newGrp = getHeaderGroup(header);
-        localGroups.add(newGrp);
-        result.setNewGroup(newGrp);
-      }
-
-      calculateGlobalGroups();
-
-      // the slots movement is only done logically, the new node itself will pull data from the
-      // old node
-      Map<Node, List<Integer>> nodeListMap = retrieveSlots(target);
-      result.setNewSlotOwners(nodeListMap);
-      return result;
-    }
+//    synchronized (nodeRing) {
+//      if (!nodeRing.contains(target)) {
+//        return null;
+//      }
+//
+//      SlotNodeRemovalResult result = new SlotNodeRemovalResult();
+//      result.setRemovedGroup(getHeaderGroup(target));
+//      nodeRing.remove(target);
+//
+//      // if the node belongs to a group that headed by target, this group should be removed
+//      // and other groups containing target should be updated
+//      int removedGroupIdx = -1;
+//      for (int i = 0; i < localGroups.size(); i++) {
+//        PartitionGroup oldGroup = localGroups.get(i);
+//        Node header = oldGroup.getHeader();
+//        if (header.equals(target)) {
+//          removedGroupIdx = i;
+//        } else {
+//          PartitionGroup newGrp = getHeaderGroup(header);
+//          localGroups.set(i, newGrp);
+//        }
+//      }
+//      if (removedGroupIdx != -1) {
+//        localGroups.remove(removedGroupIdx);
+//        // each node exactly joins replicationNum groups, so when a group is removed, the node
+//        // should join a new one
+//        int thisNodeIdx = nodeRing.indexOf(thisNode);
+//        // this node must be the last node of the new group
+//        int headerNodeIdx = thisNodeIdx - (replicationNum - 1);
+//        headerNodeIdx = headerNodeIdx < 0 ? headerNodeIdx + nodeRing.size() : headerNodeIdx;
+//        Node header = nodeRing.get(headerNodeIdx);
+//        PartitionGroup newGrp = getHeaderGroup(header);
+//        localGroups.add(newGrp);
+//        result.setNewGroup(newGrp);
+//      }
+//
+//      calculateGlobalGroups();
+//
+//      // the slots movement is only done logically, the new node itself will pull data from the
+//      // old node
+//      Map<Node, List<Integer>> nodeListMap = retrieveSlots(target);
+//      result.setNewSlotOwners(nodeListMap);
+//      return result;
+//    }
+    return null;
   }
 
   private Map<Node, List<Integer>> retrieveSlots(Node target) {
     Map<Node, List<Integer>> newHolderSlotMap = new HashMap<>();
-    List<Integer> slots = nodeSlotMap.remove(target);
-    for (int i = 0; i < slots.size(); i++) {
-      int slot = slots.get(i);
-      Node newHolder = nodeRing.get(i % nodeRing.size());
-      slotNodes[slot] = newHolder;
-      nodeSlotMap.get(newHolder).add(slot);
-      newHolderSlotMap.computeIfAbsent(newHolder, n -> new ArrayList<>()).add(slot);
-    }
+//    List<Integer> slots = nodeSlotMap.remove(target);
+//    for (int i = 0; i < slots.size(); i++) {
+//      int slot = slots.get(i);
+//      Node newHolder = nodeRing.get(i % nodeRing.size());
+//      slotNodes[slot] = newHolder;
+//      nodeSlotMap.get(newHolder).add(slot);
+//      newHolderSlotMap.computeIfAbsent(newHolder, n -> new ArrayList<>()).add(slot);
+//    }
     return newHolderSlotMap;
   }
 
@@ -494,7 +517,9 @@ public class SlotPartitionTable implements PartitionTable {
   private void calculateGlobalGroups() {
     globalGroups = new ArrayList<>();
     for (Node n : getAllNodes()) {
-      globalGroups.add(getHeaderGroup(n));
+      for (int i = 0; i < raftGroupNum; i++) {
+        globalGroups.add(getHeaderGroup(new Pair<>(n, i)));
+      }
     }
   }
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
index 391cf69..76fca3b 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
@@ -22,10 +22,8 @@ package org.apache.iotdb.cluster.server;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import org.apache.iotdb.cluster.config.ClusterDescriptor;
@@ -65,6 +63,7 @@ import org.apache.iotdb.cluster.server.member.MetaGroupMember;
 import org.apache.iotdb.cluster.server.service.DataAsyncService;
 import org.apache.iotdb.cluster.server.service.DataSyncService;
 import org.apache.iotdb.service.rpc.thrift.TSStatus;
+import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.thrift.TException;
 import org.apache.thrift.TProcessor;
 import org.apache.thrift.async.AsyncMethodCallback;
@@ -82,9 +81,9 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
 
   // key: the header of a data group, value: the member representing this node in this group and
   // it is currently at service
-  private Map<Node, DataGroupMember> headerGroupMap = new ConcurrentHashMap<>();
-  private Map<Node, DataAsyncService> asyncServiceMap = new ConcurrentHashMap<>();
-  private Map<Node, DataSyncService> syncServiceMap = new ConcurrentHashMap<>();
+  private Map<Pair<Node, Integer>, DataGroupMember> headerGroupMap = new ConcurrentHashMap<>();
+  private Map<Pair<Node, Integer>, DataAsyncService> asyncServiceMap = new ConcurrentHashMap<>();
+  private Map<Pair<Node, Integer>, DataSyncService> syncServiceMap = new ConcurrentHashMap<>();
   // key: the header of a data group, value: the member representing this node in this group but
   // it is out of service because another node has joined the group and expelled this node, or
   // the node itself is removed, but it is still stored to provide snapshot for other nodes
@@ -117,50 +116,56 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
    * @param dataGroupMember
    */
   public void addDataGroupMember(DataGroupMember dataGroupMember) {
-    DataGroupMember removedMember = headerGroupMap.remove(dataGroupMember.getHeader());
+    Pair<Node, Integer> pair = new Pair<>(dataGroupMember.getHeader(),
+        dataGroupMember.getRaftGroupId());
+    DataGroupMember removedMember = headerGroupMap
+        .remove(pair);
     if (removedMember != null) {
       removedMember.stop();
-      asyncServiceMap.remove(dataGroupMember.getHeader());
-      syncServiceMap.remove(dataGroupMember.getHeader());
+      asyncServiceMap.remove(pair);
+      syncServiceMap.remove(pair);
     }
-    stoppedMemberManager.remove(dataGroupMember.getHeader());
+    stoppedMemberManager.remove(pair);
 
-    headerGroupMap.put(dataGroupMember.getHeader(), dataGroupMember);
+    headerGroupMap.put(pair, dataGroupMember);
   }
 
-  private <T> DataAsyncService getDataAsyncService(Node header,
+  private <T> DataAsyncService getDataAsyncService(Node header, Integer id,
       AsyncMethodCallback<T> resultHandler, Object request) {
-    return asyncServiceMap.computeIfAbsent(header, h -> {
-      DataGroupMember dataMember = getDataMember(h, resultHandler, request);
+    Pair<Node, Integer> pair = new Pair<>(header, id);
+    return asyncServiceMap.computeIfAbsent(pair, h -> {
+      DataGroupMember dataMember = getDataMember(pair, resultHandler, request);
       return dataMember != null ? new DataAsyncService(dataMember) : null;
     });
   }
 
-  private DataSyncService getDataSyncService(Node header) {
-    return syncServiceMap.computeIfAbsent(header, h -> {
-      DataGroupMember dataMember = getDataMember(h, null, null);
+  private DataSyncService getDataSyncService(Node header, Integer id) {
+    Pair<Node, Integer> pair = new Pair<>(header, id);
+    return syncServiceMap.computeIfAbsent(pair, h -> {
+      DataGroupMember dataMember = getDataMember(pair, null, null);
       return dataMember != null ? new DataSyncService(dataMember) : null;
     });
   }
 
   /**
-   * @param header        the header of the group which the local node is in
+   * @param pair          the header of the group which the local node is in
    * @param resultHandler can be set to null if the request is an internal request
    * @param request       the toString() of this parameter should explain what the request is and it
    *                      is only used in logs for tracing
    * @return
    */
-  public <T> DataGroupMember getDataMember(Node header, AsyncMethodCallback<T> resultHandler,
+  public <T> DataGroupMember getDataMember(Pair<Node, Integer> pair,
+      AsyncMethodCallback<T> resultHandler,
       Object request) {
     // if the resultHandler is not null, then the request is a external one and must be with a
     // header
-    if (header == null) {
+    if (pair.left == null) {
       if (resultHandler != null) {
         resultHandler.onError(new NoHeaderNodeException());
       }
       return null;
     }
-    DataGroupMember member = stoppedMemberManager.get(header);
+    DataGroupMember member = stoppedMemberManager.get(pair);
     if (member != null) {
       return member;
     }
@@ -168,14 +173,14 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
     // avoid creating two members for a header
     Exception ex = null;
     synchronized (headerGroupMap) {
-      member = headerGroupMap.get(header);
+      member = headerGroupMap.get(pair);
       if (member != null) {
         return member;
       }
-      logger.info("Received a request \"{}\" from unregistered header {}", request, header);
+      logger.info("Received a request \"{}\" from unregistered header {}", request, pair);
       if (partitionTable != null) {
         try {
-          member = createNewMember(header);
+          member = createNewMember(pair);
         } catch (NotInSameGroupException | CheckConsistencyException e) {
           ex = e;
         }
@@ -191,37 +196,37 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
   }
 
   /**
-   * @param header
+   * @param pair
    * @return A DataGroupMember representing this node in the data group of the header.
    * @throws NotInSameGroupException If this node is not in the group of the header.
    */
-  private DataGroupMember createNewMember(Node header)
+  private DataGroupMember createNewMember(Pair<Node, Integer> pair)
       throws NotInSameGroupException, CheckConsistencyException {
     DataGroupMember member;
     PartitionGroup partitionGroup;
-    partitionGroup = partitionTable.getHeaderGroup(header);
+    partitionGroup = partitionTable.getHeaderGroup(pair);
     if (partitionGroup == null || !partitionGroup.contains(thisNode)) {
       // if the partition table is old, this node may have not been moved to the new group
       metaGroupMember.syncLeaderWithConsistencyCheck(true);
-      partitionGroup = partitionTable.getHeaderGroup(header);
+      partitionGroup = partitionTable.getHeaderGroup(pair);
     }
     if (partitionGroup != null && partitionGroup.contains(thisNode)) {
       // the two nodes are in the same group, create a new data member
       member = dataMemberFactory.create(partitionGroup, thisNode);
-      DataGroupMember prevMember = headerGroupMap.put(header, member);
+      DataGroupMember prevMember = headerGroupMap.put(pair, member);
       if (prevMember != null) {
         prevMember.stop();
       }
-      logger.info("Created a member for header {}", header);
+      logger.info("Created a member for header {}", pair);
       member.start();
     } else {
       // the member may have been stopped after syncLeader
-      member = stoppedMemberManager.get(header);
+      member = stoppedMemberManager.get(pair);
       if (member != null) {
         return member;
       }
       logger.info("This node {} does not belong to the group {}, header {}", thisNode,
-          partitionGroup, header);
+          partitionGroup, pair);
       throw new NotInSameGroupException(partitionGroup, thisNode);
     }
     return member;
@@ -233,8 +238,8 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
   @Override
   public void sendHeartbeat(HeartBeatRequest request,
       AsyncMethodCallback<HeartBeatResponse> resultHandler) {
-    Node header = request.getHeader();
-    DataAsyncService service = getDataAsyncService(header, resultHandler, request);
+    DataAsyncService service = getDataAsyncService(request.getHeader(), request.getRaftId(),
+        resultHandler, request);
     if (service != null) {
       service.sendHeartbeat(request, resultHandler);
     }
@@ -242,8 +247,8 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
 
   @Override
   public void startElection(ElectionRequest request, AsyncMethodCallback<Long> resultHandler) {
-    Node header = request.getHeader();
-    DataAsyncService service = getDataAsyncService(header, resultHandler, request);
+    DataAsyncService service = getDataAsyncService(request.getHeader(), request.getRaftId(),
+        resultHandler, request);
     if (service != null) {
       service.startElection(request, resultHandler);
     }
@@ -251,8 +256,8 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
 
   @Override
   public void appendEntries(AppendEntriesRequest request, AsyncMethodCallback<Long> resultHandler) {
-    Node header = request.getHeader();
-    DataAsyncService service = getDataAsyncService(header, resultHandler, request);
+    DataAsyncService service = getDataAsyncService(request.getHeader(), request.getRaftId(),
+        resultHandler, request);
     if (service != null) {
       service.appendEntries(request, resultHandler);
     }
@@ -260,8 +265,8 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
 
   @Override
   public void appendEntry(AppendEntryRequest request, AsyncMethodCallback<Long> resultHandler) {
-    Node header = request.getHeader();
-    DataAsyncService service = getDataAsyncService(header, resultHandler, request);
+    DataAsyncService service = getDataAsyncService(request.getHeader(), request.getRaftId(),
+        resultHandler, request);
     if (service != null) {
       service.appendEntry(request, resultHandler);
     }
@@ -269,8 +274,8 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
 
   @Override
   public void sendSnapshot(SendSnapshotRequest request, AsyncMethodCallback<Void> resultHandler) {
-    Node header = request.getHeader();
-    DataAsyncService service = getDataAsyncService(header, resultHandler, request);
+    DataAsyncService service = getDataAsyncService(request.getHeader(), request.getRaftId(),
+        resultHandler, request);
     if (service != null) {
       service.sendSnapshot(request, resultHandler);
     }
@@ -279,8 +284,8 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
   @Override
   public void pullSnapshot(PullSnapshotRequest request,
       AsyncMethodCallback<PullSnapshotResp> resultHandler) {
-    Node header = request.getHeader();
-    DataAsyncService service = getDataAsyncService(header, resultHandler, request);
+    DataAsyncService service = getDataAsyncService(request.getHeader(), request.getRaftId(),
+        resultHandler, request);
     if (service != null) {
       service.pullSnapshot(request, resultHandler);
     }
@@ -289,16 +294,17 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
   @Override
   public void executeNonQueryPlan(ExecutNonQueryReq request,
       AsyncMethodCallback<TSStatus> resultHandler) {
-    Node header = request.getHeader();
-    DataAsyncService service = getDataAsyncService(header, resultHandler, request);
+    DataAsyncService service = getDataAsyncService(request.getHeader(), request.getRaftId(),
+        resultHandler, request);
     if (service != null) {
       service.executeNonQueryPlan(request, resultHandler);
     }
   }
 
   @Override
-  public void requestCommitIndex(Node header, AsyncMethodCallback<Long> resultHandler) {
-    DataAsyncService service = getDataAsyncService(header, resultHandler, "Request commit index");
+  public void requestCommitIndex(Node header, int id, AsyncMethodCallback<Long> resultHandler) {
+    DataAsyncService service = getDataAsyncService(header, id, resultHandler,
+        "Request commit index");
     if (service != null) {
       service.requestCommitIndex(header, resultHandler);
     }
@@ -307,7 +313,7 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
   @Override
   public void readFile(String filePath, long offset, int length,
       AsyncMethodCallback<ByteBuffer> resultHandler) {
-    DataAsyncService service = getDataAsyncService(thisNode, resultHandler,
+    DataAsyncService service = getDataAsyncService(thisNode, 0, resultHandler,
         "Read file:" + filePath);
     if (service != null) {
       service.readFile(filePath, offset, length, resultHandler);
@@ -317,7 +323,8 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
   @Override
   public void querySingleSeries(SingleSeriesQueryRequest request,
       AsyncMethodCallback<Long> resultHandler) {
-    DataAsyncService service = getDataAsyncService(request.getHeader(), resultHandler,
+    DataAsyncService service = getDataAsyncService(request.getHeader(), request.getRaftId(),
+        resultHandler,
         "Query series:" + request.getPath());
     if (service != null) {
       service.querySingleSeries(request, resultHandler);
@@ -325,9 +332,9 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
   }
 
   @Override
-  public void fetchSingleSeries(Node header, long readerId,
+  public void fetchSingleSeries(Node header, int id, long readerId,
       AsyncMethodCallback<ByteBuffer> resultHandler) {
-    DataAsyncService service = getDataAsyncService(header, resultHandler,
+    DataAsyncService service = getDataAsyncService(header, id, resultHandler,
         "Fetch reader:" + readerId);
     if (service != null) {
       service.fetchSingleSeries(header, readerId, resultHandler);
@@ -335,18 +342,18 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
   }
 
   @Override
-  public void getAllPaths(Node header, List<String> paths, boolean withAlias,
+  public void getAllPaths(Node header, int id, List<String> paths, boolean withAlias,
       AsyncMethodCallback<GetAllPathsResult> resultHandler) {
-    DataAsyncService service = getDataAsyncService(header, resultHandler, "Find path:" + paths);
+    DataAsyncService service = getDataAsyncService(header, id, resultHandler, "Find path:" + paths);
     if (service != null) {
       service.getAllPaths(header, paths, withAlias, resultHandler);
     }
   }
 
   @Override
-  public void endQuery(Node header, Node thisNode, long queryId,
+  public void endQuery(Node header, int id, Node thisNode, long queryId,
       AsyncMethodCallback<Void> resultHandler) {
-    DataAsyncService service = getDataAsyncService(header, resultHandler, "End query");
+    DataAsyncService service = getDataAsyncService(header, id, resultHandler, "End query");
     if (service != null) {
       service.endQuery(header, thisNode, queryId, resultHandler);
     }
@@ -355,7 +362,8 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
   @Override
   public void querySingleSeriesByTimestamp(SingleSeriesQueryRequest request,
       AsyncMethodCallback<Long> resultHandler) {
-    DataAsyncService service = getDataAsyncService(request.getHeader(), resultHandler,
+    DataAsyncService service = getDataAsyncService(request.getHeader(), request.getRaftId(),
+        resultHandler,
         "Query by timestamp:" + request.getQueryId() + "#" + request.getPath() + " of " + request
             .getRequester());
     if (service != null) {
@@ -364,9 +372,9 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
   }
 
   @Override
-  public void fetchSingleSeriesByTimestamp(Node header, long readerId, long time,
+  public void fetchSingleSeriesByTimestamp(Node header, int id, long readerId, long time,
       AsyncMethodCallback<ByteBuffer> resultHandler) {
-    DataAsyncService service = getDataAsyncService(header, resultHandler,
+    DataAsyncService service = getDataAsyncService(header, id, resultHandler,
         "Fetch by timestamp:" + readerId);
     if (service != null) {
       service.fetchSingleSeriesByTimestamp(header, readerId, time, resultHandler);
@@ -376,8 +384,8 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
   @Override
   public void pullTimeSeriesSchema(PullSchemaRequest request,
       AsyncMethodCallback<PullSchemaResp> resultHandler) {
-    Node header = request.getHeader();
-    DataAsyncService service = getDataAsyncService(header, resultHandler, request);
+    DataAsyncService service = getDataAsyncService(request.getHeader(), request.getRaftId(),
+        resultHandler, request);
     if (service != null) {
       service.pullTimeSeriesSchema(request, resultHandler);
     }
@@ -386,37 +394,38 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
   @Override
   public void pullMeasurementSchema(PullSchemaRequest request,
       AsyncMethodCallback<PullSchemaResp> resultHandler) {
-    DataAsyncService service = getDataAsyncService(request.getHeader(), resultHandler,
+    DataAsyncService service = getDataAsyncService(request.getHeader(), request.getRaftId(),
+        resultHandler,
         "Pull measurement schema");
     service.pullMeasurementSchema(request, resultHandler);
   }
 
   @Override
-  public void getAllDevices(Node header, List<String> paths,
+  public void getAllDevices(Node header, int id, List<String> paths,
       AsyncMethodCallback<Set<String>> resultHandler) {
-    DataAsyncService service = getDataAsyncService(header, resultHandler, "Get all devices");
+    DataAsyncService service = getDataAsyncService(header, id, resultHandler, "Get all devices");
     service.getAllDevices(header, paths, resultHandler);
   }
 
   @Override
-  public void getNodeList(Node header, String path, int nodeLevel,
+  public void getNodeList(Node header, int id, String path, int nodeLevel,
       AsyncMethodCallback<List<String>> resultHandler) {
-    DataAsyncService service = getDataAsyncService(header, resultHandler, "Get node list");
+    DataAsyncService service = getDataAsyncService(header, id, resultHandler, "Get node list");
     service.getNodeList(header, path, nodeLevel, resultHandler);
   }
 
   @Override
-  public void getChildNodePathInNextLevel(Node header, String path,
+  public void getChildNodePathInNextLevel(Node header, int id, String path,
       AsyncMethodCallback<Set<String>> resultHandler) {
-    DataAsyncService service = getDataAsyncService(header, resultHandler,
+    DataAsyncService service = getDataAsyncService(header, id, resultHandler,
         "Get child node path in next level");
     service.getChildNodePathInNextLevel(header, path, resultHandler);
   }
 
   @Override
-  public void getAllMeasurementSchema(Node header, ByteBuffer planBytes,
+  public void getAllMeasurementSchema(Node header, int id, ByteBuffer planBytes,
       AsyncMethodCallback<ByteBuffer> resultHandler) {
-    DataAsyncService service = getDataAsyncService(header, resultHandler,
+    DataAsyncService service = getDataAsyncService(header, id, resultHandler,
         "Get all measurement schema");
     service.getAllMeasurementSchema(header, planBytes, resultHandler);
   }
@@ -424,28 +433,30 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
   @Override
   public void getAggrResult(GetAggrResultRequest request,
       AsyncMethodCallback<List<ByteBuffer>> resultHandler) {
-    DataAsyncService service = getDataAsyncService(request.getHeader(), resultHandler, request);
+    DataAsyncService service = getDataAsyncService(request.getHeader(), request.getRaftId(),
+        resultHandler, request);
     service.getAggrResult(request, resultHandler);
   }
 
   @Override
-  public void getUnregisteredTimeseries(Node header, List<String> timeseriesList,
+  public void getUnregisteredTimeseries(Node header, int id, List<String> timeseriesList,
       AsyncMethodCallback<List<String>> resultHandler) {
-    DataAsyncService service = getDataAsyncService(header, resultHandler,
+    DataAsyncService service = getDataAsyncService(header, id, resultHandler,
         "Check if measurements are registered");
     service.getUnregisteredTimeseries(header, timeseriesList, resultHandler);
   }
 
   @Override
   public void getGroupByExecutor(GroupByRequest request, AsyncMethodCallback<Long> resultHandler) {
-    DataAsyncService service = getDataAsyncService(request.getHeader(), resultHandler, request);
+    DataAsyncService service = getDataAsyncService(request.getHeader(), request.getRaftId(),
+        resultHandler, request);
     service.getGroupByExecutor(request, resultHandler);
   }
 
   @Override
-  public void getGroupByResult(Node header, long executorId, long startTime, long endTime,
+  public void getGroupByResult(Node header, int id, long executorId, long startTime, long endTime,
       AsyncMethodCallback<List<ByteBuffer>> resultHandler) {
-    DataAsyncService service = getDataAsyncService(header, resultHandler, "Fetch group by");
+    DataAsyncService service = getDataAsyncService(header, id, resultHandler, "Fetch group by");
     service.getGroupByResult(header, executorId, startTime, endTime, resultHandler);
   }
 
@@ -488,37 +499,37 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
    * @param result
    */
   public void addNode(Node node, NodeAdditionResult result) {
-    Iterator<Entry<Node, DataGroupMember>> entryIterator = headerGroupMap.entrySet().iterator();
-    synchronized (headerGroupMap) {
-      while (entryIterator.hasNext()) {
-        Entry<Node, DataGroupMember> entry = entryIterator.next();
-        DataGroupMember dataGroupMember = entry.getValue();
-        // the member may be extruded from the group, remove and stop it if so
-        boolean shouldLeave = dataGroupMember.addNode(node, result);
-        if (shouldLeave) {
-          logger.info("This node does not belong to {} any more", dataGroupMember.getAllNodes());
-          entryIterator.remove();
-          removeMember(entry.getKey(), entry.getValue());
-        }
-      }
-
-      if (result.getNewGroup().contains(thisNode)) {
-        logger.info("Adding this node into a new group {}", result.getNewGroup());
-        DataGroupMember dataGroupMember = dataMemberFactory.create(result.getNewGroup(), thisNode);
-        addDataGroupMember(dataGroupMember);
-        dataGroupMember.start();
-        dataGroupMember
-            .pullNodeAdditionSnapshots(((SlotPartitionTable) partitionTable).getNodeSlots(node),
-                node);
-      }
-    }
+//    Iterator<Entry<Node, DataGroupMember>> entryIterator = headerGroupMap.entrySet().iterator();
+//    synchronized (headerGroupMap) {
+//      while (entryIterator.hasNext()) {
+//        Entry<Node, DataGroupMember> entry = entryIterator.next();
+//        DataGroupMember dataGroupMember = entry.getValue();
+//        // the member may be extruded from the group, remove and stop it if so
+//        boolean shouldLeave = dataGroupMember.addNode(node, result);
+//        if (shouldLeave) {
+//          logger.info("This node does not belong to {} any more", dataGroupMember.getAllNodes());
+//          entryIterator.remove();
+//          removeMember(entry.getKey(), entry.getValue());
+//        }
+//      }
+//
+//      if (result.getNewGroup().contains(thisNode)) {
+//        logger.info("Adding this node into a new group {}", result.getNewGroup());
+//        DataGroupMember dataGroupMember = dataMemberFactory.create(result.getNewGroup(), thisNode);
+//        addDataGroupMember(dataGroupMember);
+//        dataGroupMember.start();
+//        dataGroupMember
+//            .pullNodeAdditionSnapshots(((SlotPartitionTable) partitionTable).getNodeSlots(node),
+//                node);
+//      }
+//    }
   }
 
   private void removeMember(Node header, DataGroupMember dataGroupMember) {
-    dataGroupMember.syncLeader();
-    dataGroupMember.setReadOnly();
-    dataGroupMember.stop();
-    stoppedMemberManager.put(header, dataGroupMember);
+//    dataGroupMember.syncLeader();
+//    dataGroupMember.setReadOnly();
+//    dataGroupMember.stop();
+//    stoppedMemberManager.put(header, dataGroupMember);
   }
 
   /**
@@ -574,43 +585,43 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
    * @param removalResult cluster changes due to the node removal
    */
   public void removeNode(Node node, NodeRemovalResult removalResult) {
-    Iterator<Entry<Node, DataGroupMember>> entryIterator = headerGroupMap.entrySet().iterator();
-    synchronized (headerGroupMap) {
-      while (entryIterator.hasNext()) {
-        Entry<Node, DataGroupMember> entry = entryIterator.next();
-        DataGroupMember dataGroupMember = entry.getValue();
-        if (dataGroupMember.getHeader().equals(node)) {
-          // the group is removed as the node is removed, so new writes should be rejected as
-          // they belong to the new holder, but the member is kept alive for other nodes to pull
-          // snapshots
-          entryIterator.remove();
-          removeMember(entry.getKey(), entry.getValue());
-        } else {
-          if (node.equals(thisNode)) {
-            // this node is removed, it is no more replica of other groups
-            List<Integer> nodeSlots =
-                ((SlotPartitionTable) partitionTable).getNodeSlots(dataGroupMember.getHeader());
-            dataGroupMember.removeLocalData(nodeSlots);
-            entryIterator.remove();
-            dataGroupMember.stop();
-          } else {
-            // the group should be updated and pull new slots from the removed node
-            dataGroupMember.removeNode(node, removalResult);
-          }
-        }
-      }
-      PartitionGroup newGroup = removalResult.getNewGroup();
-      if (newGroup != null) {
-        logger.info("{} should join a new group {}", thisNode, newGroup);
-        try {
-          createNewMember(newGroup.getHeader());
-        } catch (NotInSameGroupException e) {
-          // ignored
-        } catch (CheckConsistencyException ce) {
-          logger.error("remove node failed, error={}", ce.getMessage());
-        }
-      }
-    }
+//    Iterator<Entry<Node, DataGroupMember>> entryIterator = headerGroupMap.entrySet().iterator();
+//    synchronized (headerGroupMap) {
+//      while (entryIterator.hasNext()) {
+//        Entry<Node, DataGroupMember> entry = entryIterator.next();
+//        DataGroupMember dataGroupMember = entry.getValue();
+//        if (dataGroupMember.getHeader().equals(node)) {
+//          // the group is removed as the node is removed, so new writes should be rejected as
+//          // they belong to the new holder, but the member is kept alive for other nodes to pull
+//          // snapshots
+//          entryIterator.remove();
+//          removeMember(entry.getKey(), entry.getValue());
+//        } else {
+//          if (node.equals(thisNode)) {
+//            // this node is removed, it is no more replica of other groups
+//            List<Integer> nodeSlots =
+//                ((SlotPartitionTable) partitionTable).getNodeSlots(dataGroupMember.getHeader());
+//            dataGroupMember.removeLocalData(nodeSlots);
+//            entryIterator.remove();
+//            dataGroupMember.stop();
+//          } else {
+//            // the group should be updated and pull new slots from the removed node
+//            dataGroupMember.removeNode(node, removalResult);
+//          }
+//        }
+//      }
+//      PartitionGroup newGroup = removalResult.getNewGroup();
+//      if (newGroup != null) {
+//        logger.info("{} should join a new group {}", thisNode, newGroup);
+//        try {
+//          createNewMember(newGroup.getHeader());
+//        } catch (NotInSameGroupException e) {
+//          // ignored
+//        } catch (CheckConsistencyException ce) {
+//          logger.error("remove node failed, error={}", ce.getMessage());
+//        }
+//      }
+//    }
   }
 
   public void setPartitionTable(PartitionTable partitionTable) {
@@ -642,7 +653,8 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
   @Override
   public void previousFill(PreviousFillRequest request,
       AsyncMethodCallback<ByteBuffer> resultHandler) {
-    DataAsyncService service = getDataAsyncService(request.getHeader(), resultHandler, request);
+    DataAsyncService service = getDataAsyncService(request.getHeader(), request.getRaftId(),
+        resultHandler, request);
     service.previousFill(request, resultHandler);
   }
 
@@ -653,208 +665,226 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
   }
 
   @Override
-  public void matchTerm(long index, long term, Node header,
+  public void matchTerm(long index, long term, Node header, int id,
       AsyncMethodCallback<Boolean> resultHandler) {
-    DataAsyncService service = getDataAsyncService(header, resultHandler, "Match term");
+    DataAsyncService service = getDataAsyncService(header, id, resultHandler, "Match term");
     service.matchTerm(index, term, header, resultHandler);
   }
 
   @Override
   public void last(LastQueryRequest request, AsyncMethodCallback<ByteBuffer> resultHandler) {
-    DataAsyncService service = getDataAsyncService(request.getHeader(), resultHandler, "last");
+    DataAsyncService service = getDataAsyncService(request.getHeader(), request.getRaftId(),
+        resultHandler, "last");
     service.last(request, resultHandler);
   }
 
   @Override
-  public void getPathCount(Node header, List<String> pathsToQuery, int level,
+  public void getPathCount(Node header, int id, List<String> pathsToQuery, int level,
       AsyncMethodCallback<Integer> resultHandler) {
-    DataAsyncService service = getDataAsyncService(header, resultHandler, "count path");
+    DataAsyncService service = getDataAsyncService(header, id, resultHandler, "count path");
     service.getPathCount(header, pathsToQuery, level, resultHandler);
   }
 
   @Override
-  public void onSnapshotApplied(Node header, List<Integer> slots,
+  public void onSnapshotApplied(Node header, int id, List<Integer> slots,
       AsyncMethodCallback<Boolean> resultHandler) {
-    DataAsyncService service = getDataAsyncService(header, resultHandler, "Snapshot applied");
+    DataAsyncService service = getDataAsyncService(header, id, resultHandler, "Snapshot applied");
     service.onSnapshotApplied(header, slots, resultHandler);
   }
 
   @Override
   public long querySingleSeries(SingleSeriesQueryRequest request) throws TException {
-    return getDataSyncService(request.getHeader()).querySingleSeries(request);
+    return getDataSyncService(request.getHeader(), request.getRaftId()).querySingleSeries(request);
   }
 
   @Override
-  public ByteBuffer fetchSingleSeries(Node header, long readerId) throws TException {
-    return getDataSyncService(header).fetchSingleSeries(header, readerId);
+  public ByteBuffer fetchSingleSeries(Node header, int raftId, long readerId) throws TException {
+    return getDataSyncService(header, raftId).fetchSingleSeries(header, readerId);
   }
 
   @Override
   public long querySingleSeriesByTimestamp(SingleSeriesQueryRequest request) throws TException {
-    return getDataSyncService(request.getHeader()).querySingleSeriesByTimestamp(request);
+    return getDataSyncService(request.getHeader(), request.getRaftId())
+        .querySingleSeriesByTimestamp(request);
   }
 
   @Override
-  public ByteBuffer fetchSingleSeriesByTimestamp(Node header, long readerId, long timestamp)
+  public ByteBuffer fetchSingleSeriesByTimestamp(Node header, int raftId, long readerId,
+      long timestamp)
       throws TException {
-    return getDataSyncService(header).fetchSingleSeriesByTimestamp(header, readerId, timestamp);
+    return getDataSyncService(header, raftId)
+        .fetchSingleSeriesByTimestamp(header, readerId, timestamp);
   }
 
   @Override
-  public void endQuery(Node header, Node thisNode, long queryId) throws TException {
-    getDataSyncService(header).endQuery(header, thisNode, queryId);
+  public void endQuery(Node header, int raftId, Node thisNode, long queryId) throws TException {
+    getDataSyncService(header, raftId).endQuery(header, thisNode, queryId);
   }
 
   @Override
-  public GetAllPathsResult getAllPaths(Node header, List<String> path, boolean withAlias)
+  public GetAllPathsResult getAllPaths(Node header, int raftId, List<String> path,
+      boolean withAlias)
       throws TException {
-    return getDataSyncService(header).getAllPaths(header, path, withAlias);
+    return getDataSyncService(header, raftId).getAllPaths(header, path, withAlias);
   }
 
   @Override
-  public Set<String> getAllDevices(Node header, List<String> path) throws TException {
-    return getDataSyncService(header).getAllDevices(header, path);
+  public Set<String> getAllDevices(Node header, int raftId, List<String> path) throws TException {
+    return getDataSyncService(header, raftId).getAllDevices(header, path);
   }
 
   @Override
-  public List<String> getNodeList(Node header, String path, int nodeLevel) throws TException {
-    return getDataSyncService(header).getNodeList(header, path, nodeLevel);
+  public List<String> getNodeList(Node header, int raftId, String path, int nodeLevel)
+      throws TException {
+    return getDataSyncService(header, raftId).getNodeList(header, path, nodeLevel);
   }
 
   @Override
-  public Set<String> getChildNodePathInNextLevel(Node header, String path) throws TException {
-    return getDataSyncService(header).getChildNodePathInNextLevel(header, path);
+  public Set<String> getChildNodePathInNextLevel(Node header, int raftId, String path)
+      throws TException {
+    return getDataSyncService(header, raftId).getChildNodePathInNextLevel(header, path);
   }
 
   @Override
-  public ByteBuffer getAllMeasurementSchema(Node header, ByteBuffer planBinary) throws TException {
-    return getDataSyncService(header).getAllMeasurementSchema(header, planBinary);
+  public ByteBuffer getAllMeasurementSchema(Node header, int raftId, ByteBuffer planBinary)
+      throws TException {
+    return getDataSyncService(header, raftId).getAllMeasurementSchema(header, planBinary);
   }
 
   @Override
   public List<ByteBuffer> getAggrResult(GetAggrResultRequest request) throws TException {
-    return getDataSyncService(request.getHeader()).getAggrResult(request);
+    return getDataSyncService(request.getHeader(), request.getRaftId()).getAggrResult(request);
   }
 
   @Override
-  public List<String> getUnregisteredTimeseries(Node header, List<String> timeseriesList)
+  public List<String> getUnregisteredTimeseries(Node header, int raftId,
+      List<String> timeseriesList)
       throws TException {
-    return getDataSyncService(header).getUnregisteredTimeseries(header, timeseriesList);
+    return getDataSyncService(header, raftId).getUnregisteredTimeseries(header, timeseriesList);
   }
 
   @Override
   public PullSnapshotResp pullSnapshot(PullSnapshotRequest request) throws TException {
-    return getDataSyncService(request.getHeader()).pullSnapshot(request);
+    return getDataSyncService(request.getHeader(), request.getRaftId()).pullSnapshot(request);
   }
 
   @Override
   public long getGroupByExecutor(GroupByRequest request) throws TException {
-    return getDataSyncService(request.header).getGroupByExecutor(request);
+    return getDataSyncService(request.getHeader(), request.getRaftId()).getGroupByExecutor(request);
   }
 
   @Override
-  public List<ByteBuffer> getGroupByResult(Node header, long executorId, long startTime,
+  public List<ByteBuffer> getGroupByResult(Node header, int raftId, long executorId, long startTime,
       long endTime) throws TException {
-    return getDataSyncService(header).getGroupByResult(header, executorId, startTime, endTime);
+    return getDataSyncService(header, raftId)
+        .getGroupByResult(header, executorId, startTime, endTime);
   }
 
   @Override
   public PullSchemaResp pullTimeSeriesSchema(PullSchemaRequest request) throws TException {
-    return getDataSyncService(request.getHeader()).pullTimeSeriesSchema(request);
+    return getDataSyncService(request.getHeader(), request.getRaftId())
+        .pullTimeSeriesSchema(request);
   }
 
   @Override
   public PullSchemaResp pullMeasurementSchema(PullSchemaRequest request) throws TException {
-    return getDataSyncService(request.getHeader()).pullMeasurementSchema(request);
+    return getDataSyncService(request.getHeader(), request.getRaftId())
+        .pullMeasurementSchema(request);
   }
 
   @Override
   public ByteBuffer previousFill(PreviousFillRequest request) throws TException {
-    return getDataSyncService(request.getHeader()).previousFill(request);
+    return getDataSyncService(request.getHeader(), request.getRaftId()).previousFill(request);
   }
 
   @Override
   public ByteBuffer last(LastQueryRequest request) throws TException {
-    return getDataSyncService(request.getHeader()).last(request);
+    return getDataSyncService(request.getHeader(), request.getRaftId()).last(request);
   }
 
   @Override
-  public int getPathCount(Node header, List<String> pathsToQuery, int level) throws TException {
-    return getDataSyncService(header).getPathCount(header, pathsToQuery, level);
+  public int getPathCount(Node header, int raftId, List<String> pathsToQuery, int level)
+      throws TException {
+    return getDataSyncService(header, raftId).getPathCount(header, pathsToQuery, level);
   }
 
   @Override
-  public boolean onSnapshotApplied(Node header, List<Integer> slots) {
-    return getDataSyncService(header).onSnapshotApplied(header, slots);
+  public boolean onSnapshotApplied(Node header, int raftId, List<Integer> slots) {
+    return getDataSyncService(header, raftId).onSnapshotApplied(header, slots);
   }
 
   @Override
   public HeartBeatResponse sendHeartbeat(HeartBeatRequest request) {
-    return getDataSyncService(request.getHeader()).sendHeartbeat(request);
+    return getDataSyncService(request.getHeader(), request.getRaftId()).sendHeartbeat(request);
   }
 
   @Override
   public long startElection(ElectionRequest request) {
-    return getDataSyncService(request.getHeader()).startElection(request);
+    return getDataSyncService(request.getHeader(), request.getRaftId()).startElection(request);
   }
 
   @Override
   public long appendEntries(AppendEntriesRequest request) throws TException {
-    return getDataSyncService(request.getHeader()).appendEntries(request);
+    return getDataSyncService(request.getHeader(), request.getRaftId()).appendEntries(request);
   }
 
   @Override
   public long appendEntry(AppendEntryRequest request) throws TException {
-    return getDataSyncService(request.getHeader()).appendEntry(request);
+    return getDataSyncService(request.getHeader(), request.getRaftId()).appendEntry(request);
   }
 
   @Override
   public void sendSnapshot(SendSnapshotRequest request) throws TException {
-    getDataSyncService(request.getHeader()).sendSnapshot(request);
+    getDataSyncService(request.getHeader(), request.getRaftId()).sendSnapshot(request);
   }
 
   @Override
   public TSStatus executeNonQueryPlan(ExecutNonQueryReq request) throws TException {
-    return getDataSyncService(request.getHeader()).executeNonQueryPlan(request);
+    return getDataSyncService(request.getHeader(), request.getRaftId())
+        .executeNonQueryPlan(request);
   }
 
   @Override
-  public long requestCommitIndex(Node header) throws TException {
-    return getDataSyncService(header).requestCommitIndex(header);
+  public long requestCommitIndex(Node header, int raftId) throws TException {
+    return getDataSyncService(header, raftId).requestCommitIndex(header);
   }
 
   @Override
   public ByteBuffer readFile(String filePath, long offset, int length) throws TException {
-    return getDataSyncService(thisNode).readFile(filePath, offset, length);
+    return getDataSyncService(thisNode, 0).readFile(filePath, offset, length);
   }
 
   @Override
-  public boolean matchTerm(long index, long term, Node header) {
-    return getDataSyncService(header).matchTerm(index, term, header);
+  public boolean matchTerm(long index, long term, Node header, int raftId) {
+    return getDataSyncService(header, raftId).matchTerm(index, term, header);
   }
 
   @Override
-  public ByteBuffer peekNextNotNullValue(Node header, long executorId, long startTime, long endTime)
+  public ByteBuffer peekNextNotNullValue(Node header, int raftId, long executorId, long startTime,
+      long endTime)
       throws TException {
-    return getDataSyncService(header).peekNextNotNullValue(header, executorId, startTime, endTime);
+    return getDataSyncService(header, raftId)
+        .peekNextNotNullValue(header, executorId, startTime, endTime);
   }
 
   @Override
-  public void peekNextNotNullValue(Node header, long executorId, long startTime, long endTime,
+  public void peekNextNotNullValue(Node header, int raftId, long executorId, long startTime,
+      long endTime,
       AsyncMethodCallback<ByteBuffer> resultHandler) throws TException {
     resultHandler.onComplete(
-        getDataSyncService(header).peekNextNotNullValue(header, executorId, startTime, endTime));
+        getDataSyncService(header, raftId)
+            .peekNextNotNullValue(header, executorId, startTime, endTime));
   }
 
   @Override
   public void removeHardLink(String hardLinkPath) throws TException {
-    getDataSyncService(thisNode).removeHardLink(hardLinkPath);
+    getDataSyncService(thisNode, 0).removeHardLink(hardLinkPath);
   }
 
   @Override
   public void removeHardLink(String hardLinkPath,
       AsyncMethodCallback<Void> resultHandler) {
-    getDataAsyncService(thisNode, resultHandler, hardLinkPath).removeHardLink(hardLinkPath,
+    getDataAsyncService(thisNode, 0, resultHandler, hardLinkPath).removeHardLink(hardLinkPath,
         resultHandler);
   }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/StoppedMemberManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/StoppedMemberManager.java
index 6fd2059..b203aaf 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/StoppedMemberManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/StoppedMemberManager.java
@@ -33,16 +33,17 @@ import org.apache.iotdb.cluster.server.member.DataGroupMember;
 import org.apache.iotdb.cluster.server.member.DataGroupMember.Factory;
 import org.apache.iotdb.cluster.utils.ClusterUtils;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.tsfile.utils.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
  * When a node is added or removed, several partition groups are affected and nodes may exit some
- * groups. For example, the local node is #5 and it is in a data group of [1, 3, 5], then node #3
- * is added, so the group becomes [1, 3, 4] and the local node must leave the group. However, #5
- * may have data that #4 needs to pull, so the Member of #5 in this group is stopped but not
- * removed yet and when system recovers, we need to resume the groups so that they can keep
- * providing snapshots for data transfers.
+ * groups. For example, the local node is #5 and it is in a data group of [1, 3, 5], then node #4 is
+ * added, so the group becomes [1, 3, 4] and the local node must leave the group. However, #5 may
+ * have data that #4 needs to pull, so the Member of #5 in this group is stopped but not removed yet
+ * and when system recovers, we need to resume the groups so that they can keep providing snapshots
+ * for data transfers.
  */
 public class StoppedMemberManager {
 
@@ -53,7 +54,7 @@ public class StoppedMemberManager {
   private static final String REMOVED = "0";
   private static final String RESUMED = "1";
 
-  private Map<Node, DataGroupMember> removedMemberMap = new HashMap<>();
+  private Map<Pair<Node, Integer>, DataGroupMember> removedMemberMap = new HashMap<>();
   private DataGroupMember.Factory memberFactory;
   private Node thisNode;
 
@@ -65,13 +66,14 @@ public class StoppedMemberManager {
   }
 
   /**
-   * When a DataGroupMember is removed, add it here and record this removal, so in next start-up
-   * we can recover it as a data source for data transfers.
-   * @param header
+   * When a DataGroupMember is removed, add it here and record this removal, so in next start-up we
+   * can recover it as a data source for data transfers.
+   *
+   * @param pair
    * @param dataGroupMember
    */
-  public synchronized void put(Node header, DataGroupMember dataGroupMember) {
-    removedMemberMap.put(header, dataGroupMember);
+  public synchronized void put(Pair<Node, Integer> pair, DataGroupMember dataGroupMember) {
+    removedMemberMap.put(pair, dataGroupMember);
     try (BufferedWriter writer = new BufferedWriter(new FileWriter(stoppedMembersFileName, true))) {
       StringBuilder builder = new StringBuilder(REMOVED);
       for (Node node : dataGroupMember.getAllNodes()) {
@@ -80,27 +82,28 @@ public class StoppedMemberManager {
       writer.write(builder.toString());
       writer.newLine();
     } catch (IOException e) {
-      logger.error("Cannot record removed member of header {}", header, e);
+      logger.error("Cannot record removed member of header {}", pair, e);
     }
   }
 
   /**
-   * When a DataGroupMember is resumed, add it here and record this removal, so in next start-up
-   * we will not recover it here.
-   * @param header
+   * When a DataGroupMember is resumed, add it here and record this removal, so in next start-up we
+   * will not recover it here.
+   *
+   * @param pair
    */
-  public synchronized void remove(Node header) {
-    removedMemberMap.remove(header);
+  public synchronized void remove(Pair<Node, Integer> pair) {
+    removedMemberMap.remove(pair);
     try (BufferedWriter writer = new BufferedWriter(new FileWriter(stoppedMembersFileName, true))) {
-      writer.write(RESUMED + ";" + header.toString());
+      writer.write(RESUMED + ";" + pair.toString());
       writer.newLine();
     } catch (IOException e) {
-      logger.error("Cannot record resumed member of header {}", header, e);
+      logger.error("Cannot record resumed member of header {}", pair, e);
     }
   }
 
-  public synchronized DataGroupMember get(Node header) {
-    return removedMemberMap.get(header);
+  public synchronized DataGroupMember get(Pair<Node, Integer> pair) {
+    return removedMemberMap.get(pair);
   }
 
   private void recover() {
@@ -143,7 +146,8 @@ public class StoppedMemberManager {
     }
     DataGroupMember member = memberFactory.create(partitionGroup, thisNode);
     member.setReadOnly();
-    removedMemberMap.put(partitionGroup.getHeader(), member);
+    // TODO: recover the real raft group id from the stopped-members file instead of assuming 0
+    removedMemberMap.put(new Pair<>(partitionGroup.getHeader(), 0), member);
   }
 
   private void parseResumed(String[] split) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
index de200cf..62f2b71 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
@@ -143,8 +143,8 @@ public class DataGroupMember extends RaftMember {
   private LocalQueryExecutor localQueryExecutor;
 
   /**
-   * When a new partition table is installed, all data members will be checked if unchanged. If
-   * not, such members will be removed.
+   * When a new partition table is installed, all data members will be checked if unchanged. If not,
+   * such members will be removed.
    */
   private boolean unchanged;
 
@@ -237,6 +237,10 @@ public class DataGroupMember extends RaftMember {
     return allNodes.get(0);
   }
 
+  public Integer getRaftGroupId() {
+    return allNodes.getId();
+  }
+
   public ClusterQueryManager getQueryManager() {
     return queryManager;
   }
@@ -385,7 +389,8 @@ public class DataGroupMember extends RaftMember {
   public void receiveSnapshot(SendSnapshotRequest request) throws SnapshotInstallationException {
     logger.info("{}: received a snapshot from {} with size {}", name, request.getHeader(),
         request.getSnapshotBytes().length);
-    PartitionedSnapshot<FileSnapshot> snapshot = new PartitionedSnapshot<>(FileSnapshot.Factory.INSTANCE);
+    PartitionedSnapshot<FileSnapshot> snapshot = new PartitionedSnapshot<>(
+        FileSnapshot.Factory.INSTANCE);
 
     snapshot.deserialize(ByteBuffer.wrap(request.getSnapshotBytes()));
     if (logger.isDebugEnabled()) {
@@ -485,7 +490,8 @@ public class DataGroupMember extends RaftMember {
         Node node = entry.getKey();
         List<Integer> nodeSlots = entry.getValue();
         PullSnapshotTaskDescriptor taskDescriptor =
-            new PullSnapshotTaskDescriptor(metaGroupMember.getPartitionTable().getHeaderGroup(node),
+            new PullSnapshotTaskDescriptor(metaGroupMember.getPartitionTable()
+                .getHeaderGroup(new Pair<>(node, getRaftGroupId())),
                 nodeSlots, false);
         pullFileSnapshot(taskDescriptor, null);
       }
@@ -522,8 +528,9 @@ public class DataGroupMember extends RaftMember {
           descriptor.getSlots().get(0), descriptor.getSlots().size() - 1);
     }
 
-    pullSnapshotService.submit(new PullSnapshotTask<>(descriptor, this, FileSnapshot.Factory.INSTANCE,
-        snapshotSave));
+    pullSnapshotService
+        .submit(new PullSnapshotTask<>(descriptor, this, FileSnapshot.Factory.INSTANCE,
+            snapshotSave));
   }
 
   /**
@@ -619,9 +626,9 @@ public class DataGroupMember extends RaftMember {
       List<Pair<Long, Boolean>> tmpPairList = entry.getValue();
       for (Pair<Long, Boolean> pair : tmpPairList) {
         long partitionId = pair.left;
-        Node header = metaGroupMember.getPartitionTable().routeToHeaderByTime(storageGroupName,
+        Pair<Node, Integer> headerPair = metaGroupMember.getPartitionTable().routeToHeaderByTime(storageGroupName,
             partitionId * StorageEngine.getTimePartitionInterval());
-        DataGroupMember localDataMember = metaGroupMember.getLocalDataMember(header);
+        DataGroupMember localDataMember = metaGroupMember.getLocalDataMember(headerPair);
         if (localDataMember.getHeader().equals(this.getHeader())) {
           localListPair.add(new Pair<>(partitionId, pair.right));
         }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
index 863314f..9f2fbb6 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
@@ -147,6 +147,7 @@ import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.service.rpc.thrift.EndPoint;
 import org.apache.iotdb.service.rpc.thrift.TSStatus;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.thrift.TException;
 import org.apache.thrift.protocol.TProtocolFactory;
 import org.apache.thrift.transport.TTransportException;
@@ -273,7 +274,7 @@ public class MetaGroupMember extends RaftMember {
         new SyncClientPool(new SyncMetaClient.FactorySync(factory)),
         new AsyncClientPool(new AsyncMetaHeartbeatClient.FactoryAsync(factory), false),
         new SyncClientPool(new SyncMetaHeartbeatClient.FactorySync(factory)));
-    allNodes = new ArrayList<>();
+    allNodes = new PartitionGroup();
     initPeerMap();
 
     dataClientProvider = new DataClientProvider(factory);
@@ -307,9 +308,9 @@ public class MetaGroupMember extends RaftMember {
    * @return true if the member is a leader and the partition is closed, false otherwise
    */
   public void closePartition(String storageGroupName, long partitionId, boolean isSeq) {
-    Node header = partitionTable.routeToHeaderByTime(storageGroupName,
+    Pair<Node, Integer> pair = partitionTable.routeToHeaderByTime(storageGroupName,
         partitionId * StorageEngine.getTimePartitionInterval());
-    DataGroupMember localDataMember = getLocalDataMember(header);
+    DataGroupMember localDataMember = getLocalDataMember(pair);
     if (localDataMember == null || localDataMember.getCharacter() != NodeCharacter.LEADER) {
       return;
     }
@@ -678,7 +679,7 @@ public class MetaGroupMember extends RaftMember {
   }
 
   private void updateNodeList(Collection<Node> nodes) {
-    allNodes = new ArrayList<>(nodes);
+    allNodes = new PartitionGroup(nodes);
     initPeerMap();
     logger.info("All nodes in the partition table: {}", allNodes);
     initIdNodeMap();
@@ -1730,7 +1731,7 @@ public class MetaGroupMember extends RaftMember {
       if (partitionGroup.contains(thisNode)) {
         // the query should be handled by a group the local node is in, handle it with in the group
         logger.debug("Execute {} in a local group of {}", plan, partitionGroup.getHeader());
-        status = getLocalDataMember(partitionGroup.getHeader())
+        status = getLocalDataMember(partitionGroup.getHeader(), partitionGroup.getId())
             .executeNonQueryPlan(plan);
       } else {
         // forward the query to the group that should handle it
@@ -2113,8 +2114,8 @@ public class MetaGroupMember extends RaftMember {
   }
 
   @Override
-  public void setAllNodes(List<Node> allNodes) {
-    super.setAllNodes(allNodes);
+  public void setAllNodes(PartitionGroup allNodes) {
+    super.setAllNodes(new PartitionGroup(allNodes));
     initPeerMap();
     idNodeMap = new HashMap<>();
     for (Node node : allNodes) {
@@ -2136,10 +2137,10 @@ public class MetaGroupMember extends RaftMember {
   /**
    * Get a local DataGroupMember that is in the group of "header" for an internal request.
    *
-   * @param header the header of the group which the local node is in
+   * @param pair the header of the group which the local node is in
    */
-  public DataGroupMember getLocalDataMember(Node header) {
-    return dataClusterServer.getDataMember(header, null, "Internal call");
+  public DataGroupMember getLocalDataMember(Pair<Node, Integer> pair) {
+    return dataClusterServer.getDataMember(pair, null, "Internal call");
   }
 
   public DataClientProvider getClientProvider() {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index 25201e9..f396e9c 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -63,6 +63,7 @@ import org.apache.iotdb.cluster.log.LogParser;
 import org.apache.iotdb.cluster.log.catchup.CatchUpTask;
 import org.apache.iotdb.cluster.log.logtypes.PhysicalPlanLog;
 import org.apache.iotdb.cluster.log.manage.RaftLogManager;
+import org.apache.iotdb.cluster.partition.PartitionGroup;
 import org.apache.iotdb.cluster.rpc.thrift.AppendEntriesRequest;
 import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
 import org.apache.iotdb.cluster.rpc.thrift.ElectionRequest;
@@ -146,7 +147,7 @@ public abstract class RaftMember {
   /**
    * the nodes that belong to the same raft group as thisNode.
    */
-  protected List<Node> allNodes;
+  protected PartitionGroup allNodes;
   ClusterConfig config = ClusterDescriptor.getInstance().getConfig();
   /**
    * the name of the member, to distinguish several members in the logs.
@@ -657,7 +658,7 @@ public abstract class RaftMember {
     return allNodes;
   }
 
-  public void setAllNodes(List<Node> allNodes) {
+  public void setAllNodes(PartitionGroup allNodes) {
     this.allNodes = allNodes;
   }
 
@@ -1006,7 +1007,7 @@ public abstract class RaftMember {
       return commitIdResult.get();
     }
     synchronized (commitIdResult) {
-      client.requestCommitIndex(getHeader(), new GenericHandler<>(leader.get(), commitIdResult));
+      client.requestCommitIndex(getHeader(), allNodes.getId(), new GenericHandler<>(leader.get(), commitIdResult));
       commitIdResult.wait(RaftServer.getSyncLeaderMaxWaitMs());
     }
     return commitIdResult.get();
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseAsyncService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseAsyncService.java
index 113c862..521050e 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseAsyncService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseAsyncService.java
@@ -83,7 +83,7 @@ public abstract class BaseAsyncService implements RaftService.AsyncIface {
   }
 
   @Override
-  public void requestCommitIndex(Node header, AsyncMethodCallback<Long> resultHandler) {
+  public void requestCommitIndex(Node header, int raftId, AsyncMethodCallback<Long> resultHandler) {
     long commitIndex = member.getCommitIndex();
     if (commitIndex != Long.MIN_VALUE) {
       resultHandler.onComplete(commitIndex);
@@ -97,7 +97,7 @@ public abstract class BaseAsyncService implements RaftService.AsyncIface {
       return;
     }
     try {
-      client.requestCommitIndex(header, resultHandler);
+      client.requestCommitIndex(header, raftId, resultHandler);
     } catch (TException e) {
       resultHandler.onError(e);
     }
@@ -125,7 +125,7 @@ public abstract class BaseAsyncService implements RaftService.AsyncIface {
   }
 
   @Override
-  public void matchTerm(long index, long term, Node header,
+  public void matchTerm(long index, long term, Node header, int raftId,
       AsyncMethodCallback<Boolean> resultHandler) {
     resultHandler.onComplete(member.matchLog(index, term));
   }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseSyncService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseSyncService.java
index 5a07130..bffc345 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseSyncService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseSyncService.java
@@ -90,7 +90,7 @@ public abstract class BaseSyncService implements RaftService.Iface {
   }
 
   @Override
-  public long requestCommitIndex(Node header)
+  public long requestCommitIndex(Node header, int raftId)
       throws TException {
     long commitIndex = member.getCommitIndex();
     if (commitIndex != Long.MIN_VALUE) {
@@ -104,7 +104,7 @@ public abstract class BaseSyncService implements RaftService.Iface {
     }
     long commitIndex1 = 0;
     try {
-      commitIndex1 = client.requestCommitIndex(header);
+      commitIndex1 = client.requestCommitIndex(header, raftId);
     } catch (TException e) {
       client.getInputProtocol().getTransport().close();
       throw e;
@@ -141,7 +141,7 @@ public abstract class BaseSyncService implements RaftService.Iface {
   }
 
   @Override
-  public boolean matchTerm(long index, long term, Node header) {
+  public boolean matchTerm(long index, long term, Node header, int raftId) {
     return member.matchLog(index, term);
   }
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataAsyncService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataAsyncService.java
index 3e3accf..486274e 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataAsyncService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataAsyncService.java
@@ -88,13 +88,15 @@ public class DataAsyncService extends BaseAsyncService implements TSDataService.
     }
   }
 
-  private void forwardPullSnapshot(PullSnapshotRequest request, AsyncMethodCallback<PullSnapshotResp> resultHandler) {
+  private void forwardPullSnapshot(PullSnapshotRequest request,
+      AsyncMethodCallback<PullSnapshotResp> resultHandler) {
     // if this node has been set readOnly, then it must have been synchronized with the leader
     // otherwise forward the request to the leader
     if (dataGroupMember.getLeader() != null) {
       logger.debug("{} forwarding a pull snapshot request to the leader {}", name,
           dataGroupMember.getLeader());
-      AsyncDataClient client = (AsyncDataClient) dataGroupMember.getAsyncClient(dataGroupMember.getLeader());
+      AsyncDataClient client = (AsyncDataClient) dataGroupMember
+          .getAsyncClient(dataGroupMember.getLeader());
       try {
         client.pullSnapshot(request, resultHandler);
       } catch (TException e) {
@@ -109,7 +111,8 @@ public class DataAsyncService extends BaseAsyncService implements TSDataService.
   public void pullTimeSeriesSchema(PullSchemaRequest request,
       AsyncMethodCallback<PullSchemaResp> resultHandler) {
     try {
-      resultHandler.onComplete(dataGroupMember.getLocalQueryExecutor().queryTimeSeriesSchema(request));
+      resultHandler
+          .onComplete(dataGroupMember.getLocalQueryExecutor().queryTimeSeriesSchema(request));
     } catch (CheckConsistencyException e) {
       // if this node cannot synchronize with the leader with in a given time, forward the
       // request to the leader
@@ -137,7 +140,8 @@ public class DataAsyncService extends BaseAsyncService implements TSDataService.
   public void pullMeasurementSchema(PullSchemaRequest request,
       AsyncMethodCallback<PullSchemaResp> resultHandler) {
     try {
-      resultHandler.onComplete(dataGroupMember.getLocalQueryExecutor().queryMeasurementSchema(request));
+      resultHandler
+          .onComplete(dataGroupMember.getLocalQueryExecutor().queryMeasurementSchema(request));
     } catch (CheckConsistencyException e) {
       // if this node cannot synchronize with the leader with in a given time, forward the
       // request to the leader
@@ -170,14 +174,15 @@ public class DataAsyncService extends BaseAsyncService implements TSDataService.
   public void querySingleSeriesByTimestamp(SingleSeriesQueryRequest request,
       AsyncMethodCallback<Long> resultHandler) {
     try {
-      resultHandler.onComplete(dataGroupMember.getLocalQueryExecutor().querySingleSeriesByTimestamp(request));
+      resultHandler.onComplete(
+          dataGroupMember.getLocalQueryExecutor().querySingleSeriesByTimestamp(request));
     } catch (Exception e) {
       resultHandler.onError(e);
     }
   }
 
   @Override
-  public void endQuery(Node header, Node requester, long queryId,
+  public void endQuery(Node header, int raftId, Node requester, long queryId,
       AsyncMethodCallback<Void> resultHandler) {
     try {
       dataGroupMember.getQueryManager().endQuery(requester, queryId);
@@ -188,7 +193,7 @@ public class DataAsyncService extends BaseAsyncService implements TSDataService.
   }
 
   @Override
-  public void fetchSingleSeries(Node header, long readerId,
+  public void fetchSingleSeries(Node header, int raftId, long readerId,
       AsyncMethodCallback<ByteBuffer> resultHandler) {
     try {
       resultHandler.onComplete(dataGroupMember.getLocalQueryExecutor().fetchSingleSeries(readerId));
@@ -198,17 +203,18 @@ public class DataAsyncService extends BaseAsyncService implements TSDataService.
   }
 
   @Override
-  public void fetchSingleSeriesByTimestamp(Node header, long readerId, long timestamp,
+  public void fetchSingleSeriesByTimestamp(Node header, int raftId, long readerId, long timestamp,
       AsyncMethodCallback<ByteBuffer> resultHandler) {
     try {
-      resultHandler.onComplete(dataGroupMember.getLocalQueryExecutor().fetchSingleSeriesByTimestamp(readerId, timestamp));
+      resultHandler.onComplete(dataGroupMember.getLocalQueryExecutor()
+          .fetchSingleSeriesByTimestamp(readerId, timestamp));
     } catch (ReaderNotFoundException | IOException e) {
       resultHandler.onError(e);
     }
   }
 
   @Override
-  public void getAllPaths(Node header, List<String> paths, boolean withAlias,
+  public void getAllPaths(Node header, int raftId, List<String> paths, boolean withAlias,
       AsyncMethodCallback<GetAllPathsResult> resultHandler) {
     try {
       dataGroupMember.syncLeaderWithConsistencyCheck(false);
@@ -219,7 +225,7 @@ public class DataAsyncService extends BaseAsyncService implements TSDataService.
   }
 
   @Override
-  public void getAllDevices(Node header, List<String> path,
+  public void getAllDevices(Node header, int raftId, List<String> path,
       AsyncMethodCallback<Set<String>> resultHandler) {
     try {
       dataGroupMember.syncLeaderWithConsistencyCheck(false);
@@ -230,7 +236,7 @@ public class DataAsyncService extends BaseAsyncService implements TSDataService.
   }
 
   @Override
-  public void getNodeList(Node header, String path, int nodeLevel,
+  public void getNodeList(Node header, int raftId, String path, int nodeLevel,
       AsyncMethodCallback<List<String>> resultHandler) {
     try {
       dataGroupMember.syncLeaderWithConsistencyCheck(false);
@@ -241,7 +247,7 @@ public class DataAsyncService extends BaseAsyncService implements TSDataService.
   }
 
   @Override
-  public void getChildNodePathInNextLevel(Node header, String path,
+  public void getChildNodePathInNextLevel(Node header, int raftId, String path,
       AsyncMethodCallback<Set<String>> resultHandler) {
     try {
       dataGroupMember.syncLeaderWithConsistencyCheck(false);
@@ -252,10 +258,11 @@ public class DataAsyncService extends BaseAsyncService implements TSDataService.
   }
 
   @Override
-  public void getAllMeasurementSchema(Node header, ByteBuffer planBinary,
+  public void getAllMeasurementSchema(Node header, int raftId, ByteBuffer planBinary,
       AsyncMethodCallback<ByteBuffer> resultHandler) {
     try {
-      resultHandler.onComplete(dataGroupMember.getLocalQueryExecutor().getAllMeasurementSchema(planBinary));
+      resultHandler
+          .onComplete(dataGroupMember.getLocalQueryExecutor().getAllMeasurementSchema(planBinary));
     } catch (CheckConsistencyException | IOException | MetadataException e) {
       resultHandler.onError(e);
     }
@@ -272,10 +279,11 @@ public class DataAsyncService extends BaseAsyncService implements TSDataService.
   }
 
   @Override
-  public void getUnregisteredTimeseries(Node header, List<String> timeseriesList,
+  public void getUnregisteredTimeseries(Node header, int raftId, List<String> timeseriesList,
       AsyncMethodCallback<List<String>> resultHandler) {
     try {
-      resultHandler.onComplete(dataGroupMember.getLocalQueryExecutor().getUnregisteredTimeseries(timeseriesList));
+      resultHandler.onComplete(
+          dataGroupMember.getLocalQueryExecutor().getUnregisteredTimeseries(timeseriesList));
     } catch (CheckConsistencyException e) {
       resultHandler.onError(e);
     }
@@ -291,10 +299,12 @@ public class DataAsyncService extends BaseAsyncService implements TSDataService.
   }
 
   @Override
-  public void getGroupByResult(Node header, long executorId, long startTime, long endTime,
+  public void getGroupByResult(Node header, int raftId, long executorId, long startTime,
+      long endTime,
       AsyncMethodCallback<List<ByteBuffer>> resultHandler) {
     try {
-      resultHandler.onComplete(dataGroupMember.getLocalQueryExecutor().getGroupByResult(executorId, startTime, endTime));
+      resultHandler.onComplete(
+          dataGroupMember.getLocalQueryExecutor().getGroupByResult(executorId, startTime, endTime));
     } catch (ReaderNotFoundException | IOException | QueryProcessException e) {
       resultHandler.onError(e);
     }
@@ -320,26 +330,29 @@ public class DataAsyncService extends BaseAsyncService implements TSDataService.
   }
 
   @Override
-  public void getPathCount(Node header, List<String> pathsToQuery, int level,
+  public void getPathCount(Node header, int raftId, List<String> pathsToQuery, int level,
       AsyncMethodCallback<Integer> resultHandler) {
     try {
-      resultHandler.onComplete(dataGroupMember.getLocalQueryExecutor().getPathCount(pathsToQuery, level));
+      resultHandler
+          .onComplete(dataGroupMember.getLocalQueryExecutor().getPathCount(pathsToQuery, level));
     } catch (CheckConsistencyException | MetadataException e) {
       resultHandler.onError(e);
     }
   }
 
   @Override
-  public void onSnapshotApplied(Node header, List<Integer> slots,
+  public void onSnapshotApplied(Node header, int raftId, List<Integer> slots,
       AsyncMethodCallback<Boolean> resultHandler) {
     resultHandler.onComplete(dataGroupMember.onSnapshotInstalled(slots));
   }
 
   @Override
-  public void peekNextNotNullValue(Node header, long executorId, long startTime, long endTime,
+  public void peekNextNotNullValue(Node header, int raftId, long executorId, long startTime,
+      long endTime,
       AsyncMethodCallback<ByteBuffer> resultHandler) {
     try {
-      resultHandler.onComplete(dataGroupMember.getLocalQueryExecutor().peekNextNotNullValue(executorId, startTime, endTime));
+      resultHandler.onComplete(dataGroupMember.getLocalQueryExecutor()
+          .peekNextNotNullValue(executorId, startTime, endTime));
     } catch (ReaderNotFoundException | IOException e) {
       resultHandler.onError(e);
     }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataSyncService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataSyncService.java
index 38be676..d974bf3 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataSyncService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataSyncService.java
@@ -187,7 +187,7 @@ public class DataSyncService extends BaseSyncService implements TSDataService.If
   }
 
   @Override
-  public void endQuery(Node header, Node requester, long queryId) throws TException {
+  public void endQuery(Node header, int raftId, Node requester, long queryId) throws TException {
     try {
       dataGroupMember.getQueryManager().endQuery(requester, queryId);
     } catch (StorageEngineException e) {
@@ -196,7 +196,7 @@ public class DataSyncService extends BaseSyncService implements TSDataService.If
   }
 
   @Override
-  public ByteBuffer fetchSingleSeries(Node header, long readerId) throws TException {
+  public ByteBuffer fetchSingleSeries(Node header, int raftId, long readerId) throws TException {
     try {
       return dataGroupMember.getLocalQueryExecutor().fetchSingleSeries(readerId);
     } catch (ReaderNotFoundException | IOException e) {
@@ -205,7 +205,8 @@ public class DataSyncService extends BaseSyncService implements TSDataService.If
   }
 
   @Override
-  public ByteBuffer fetchSingleSeriesByTimestamp(Node header, long readerId, long timestamp)
+  public ByteBuffer fetchSingleSeriesByTimestamp(Node header, int raftId, long readerId,
+      long timestamp)
       throws TException {
     try {
       return dataGroupMember.getLocalQueryExecutor()
@@ -216,7 +217,8 @@ public class DataSyncService extends BaseSyncService implements TSDataService.If
   }
 
   @Override
-  public GetAllPathsResult getAllPaths(Node header, List<String> paths, boolean withAlias)
+  public GetAllPathsResult getAllPaths(Node header, int raftId, List<String> paths,
+      boolean withAlias)
       throws TException {
     try {
       dataGroupMember.syncLeaderWithConsistencyCheck(false);
@@ -227,7 +229,7 @@ public class DataSyncService extends BaseSyncService implements TSDataService.If
   }
 
   @Override
-  public Set<String> getAllDevices(Node header, List<String> path) throws TException {
+  public Set<String> getAllDevices(Node header, int raftId, List<String> path) throws TException {
     try {
       dataGroupMember.syncLeaderWithConsistencyCheck(false);
       return ((CMManager) IoTDB.metaManager).getAllDevices(path);
@@ -237,7 +239,8 @@ public class DataSyncService extends BaseSyncService implements TSDataService.If
   }
 
   @Override
-  public List<String> getNodeList(Node header, String path, int nodeLevel) throws TException {
+  public List<String> getNodeList(Node header, int raftId, String path, int nodeLevel)
+      throws TException {
     try {
       dataGroupMember.syncLeaderWithConsistencyCheck(false);
       return ((CMManager) IoTDB.metaManager).getNodeList(path, nodeLevel);
@@ -247,7 +250,8 @@ public class DataSyncService extends BaseSyncService implements TSDataService.If
   }
 
   @Override
-  public Set<String> getChildNodePathInNextLevel(Node header, String path) throws TException {
+  public Set<String> getChildNodePathInNextLevel(Node header, int raftId, String path)
+      throws TException {
     try {
       dataGroupMember.syncLeaderWithConsistencyCheck(false);
       return ((CMManager) IoTDB.metaManager).getChildNodePathInNextLevel(path);
@@ -257,7 +261,8 @@ public class DataSyncService extends BaseSyncService implements TSDataService.If
   }
 
   @Override
-  public ByteBuffer getAllMeasurementSchema(Node header, ByteBuffer planBinary) throws TException {
+  public ByteBuffer getAllMeasurementSchema(Node header, int raftId, ByteBuffer planBinary)
+      throws TException {
     try {
       return dataGroupMember.getLocalQueryExecutor().getAllMeasurementSchema(planBinary);
     } catch (CheckConsistencyException | IOException | MetadataException e) {
@@ -275,7 +280,8 @@ public class DataSyncService extends BaseSyncService implements TSDataService.If
   }
 
   @Override
-  public List<String> getUnregisteredTimeseries(Node header, List<String> timeseriesList)
+  public List<String> getUnregisteredTimeseries(Node header, int raftId,
+      List<String> timeseriesList)
       throws TException {
     try {
       return dataGroupMember.getLocalQueryExecutor().getUnregisteredTimeseries(timeseriesList);
@@ -294,7 +300,7 @@ public class DataSyncService extends BaseSyncService implements TSDataService.If
   }
 
   @Override
-  public List<ByteBuffer> getGroupByResult(Node header, long executorId, long startTime,
+  public List<ByteBuffer> getGroupByResult(Node header, int raftId, long executorId, long startTime,
       long endTime)
       throws TException {
     try {
@@ -324,7 +330,8 @@ public class DataSyncService extends BaseSyncService implements TSDataService.If
   }
 
   @Override
-  public int getPathCount(Node header, List<String> pathsToQuery, int level) throws TException {
+  public int getPathCount(Node header, int raftId, List<String> pathsToQuery, int level)
+      throws TException {
     try {
       return dataGroupMember.getLocalQueryExecutor().getPathCount(pathsToQuery, level);
     } catch (CheckConsistencyException | MetadataException e) {
@@ -333,12 +340,13 @@ public class DataSyncService extends BaseSyncService implements TSDataService.If
   }
 
   @Override
-  public boolean onSnapshotApplied(Node header, List<Integer> slots) {
+  public boolean onSnapshotApplied(Node header, int raftId, List<Integer> slots) {
     return dataGroupMember.onSnapshotInstalled(slots);
   }
 
   @Override
-  public ByteBuffer peekNextNotNullValue(Node header, long executorId, long startTime, long endTime)
+  public ByteBuffer peekNextNotNullValue(Node header, int raftId, long executorId, long startTime,
+      long endTime)
       throws TException {
     try {
       return dataGroupMember.getLocalQueryExecutor()
diff --git a/thrift/src/main/thrift/cluster.thrift b/thrift/src/main/thrift/cluster.thrift
index 6c86a1f..56f1dd4 100644
--- a/thrift/src/main/thrift/cluster.thrift
+++ b/thrift/src/main/thrift/cluster.thrift
@@ -41,6 +41,7 @@ struct HeartBeatRequest {
   // because a data server may play many data groups members, this is used to identify which
   // member should process the request or response. Only used in data group communication.
   8: optional Node header
+  9: optional int raftId
 }
 
 // follower -> leader
@@ -56,6 +57,7 @@ struct HeartBeatResponse {
   // because a data server may play many data groups members, this is used to identify which
   // member should process the request or response. Only used in data group communication.
   7: optional Node header
+  8: optional int raftId
 }
 
 // node -> node
@@ -68,8 +70,9 @@ struct ElectionRequest {
   // because a data server may play many data groups members, this is used to identify which
   // member should process the request or response. Only used in data group communication.
   5: optional Node header
-  6: optional long dataLogLastIndex
-  7: optional long dataLogLastTerm
+  6: optional int raftId
+  7: optional long dataLogLastIndex
+  8: optional long dataLogLastTerm
 }
 
 // leader -> follower
@@ -84,6 +87,7 @@ struct AppendEntryRequest {
   // because a data server may play many data groups members, this is used to identify which
   // member should process the request or response. Only used in data group communication.
   7: optional Node header
+  8: optional int raftId
 }
 
 // leader -> follower
@@ -98,6 +102,7 @@ struct AppendEntriesRequest {
   // because a data server may play many data groups members, this is used to identify which
   // member should process the request or response. Only used in data group communication.
   7: optional Node header
+  8: optional int raftId
 }
 
 struct AddNodeResponse {
@@ -138,16 +143,18 @@ struct SendSnapshotRequest {
   1: required binary snapshotBytes
   // for data group
   2: optional Node header
+  3: optional int raftId
 }
 
 struct PullSnapshotRequest {
   1: required list<int> requiredSlots
   // for data group
   2: optional Node header
+  3: optional int raftId
   // set to true if the previous holder has been removed from the cluster.
   // This will make the previous holder read-only so that different new
   // replicas can pull the same snapshot.
-  3: required bool requireReadOnly
+  4: required bool requireReadOnly
 }
 
 struct PullSnapshotResp {
@@ -157,11 +164,13 @@ struct PullSnapshotResp {
 struct ExecutNonQueryReq {
   1: required binary planBytes
   2: optional Node header
+  3: optional int raftId
 }
 
 struct PullSchemaRequest {
   1: required list<string> prefixPaths
   2: optional Node header
+  3: optional int raftId
 }
 
 struct PullSchemaResp {
@@ -175,11 +184,12 @@ struct SingleSeriesQueryRequest {
   4: required long queryId
   5: required Node requester
   6: required Node header
-  7: required int dataTypeOrdinal
-  8: required set<string> deviceMeasurements
-  9: required bool ascending
-  10: required int fetchSize
-  11: required int deduplicatedPathNum
+  7: required int raftId
+  8: required int dataTypeOrdinal
+  9: required set<string> deviceMeasurements
+  10: required bool ascending
+  11: required int fetchSize
+  12: required int deduplicatedPathNum
 }
 
 struct PreviousFillRequest {
@@ -189,8 +199,9 @@ struct PreviousFillRequest {
   4: required long queryId
   5: required Node requester
   6: required Node header
-  7: required int dataTypeOrdinal
-  8: required set<string> deviceMeasurements
+  7: required int raftId
+  8: required int dataTypeOrdinal
+  9: required set<string> deviceMeasurements
 }
 
 // the spec and load of a node, for query coordinating
@@ -204,10 +215,11 @@ struct GetAggrResultRequest {
   3: required int dataTypeOrdinal
   4: optional binary timeFilterBytes
   5: required Node header
-  6: required long queryId
-  7: required Node requestor
-  8: required set<string> deviceMeasurements
-  9: required bool ascending
+  6: required int raftId
+  7: required long queryId
+  8: required Node requestor
+  9: required set<string> deviceMeasurements
+  10: required bool ascending
 }
 
 struct GroupByRequest {
@@ -217,9 +229,10 @@ struct GroupByRequest {
   4: required long queryId
   5: required list<int> aggregationTypeOrdinals
   6: required Node header
-  7: required Node requestor
-  8: required set<string> deviceMeasurements
-  9: required bool ascending
+  7: required int raftId
+  8: required Node requestor
+  9: required set<string> deviceMeasurements
+  10: required bool ascending
 }
 
 struct LastQueryRequest {
@@ -229,7 +242,8 @@ struct LastQueryRequest {
   4: required map<string, set<string>> deviceMeasurements
   5: optional binary filterBytes
   6: required Node header
-  7: required Node requestor
+  7: required int raftId
+  8: required Node requestor
 }
 
 struct GetAllPathsResult {
@@ -293,7 +307,7 @@ service RaftService {
   * Ask the leader for its commit index, used to check whether the node has caught up with the
   * leader.
   **/
-  long requestCommitIndex(1:Node header)
+  long requestCommitIndex(1:Node header, 2:int raftId)
 
 
   /**
@@ -306,7 +320,7 @@ service RaftService {
   /**
   * Test if a log of "index" and "term" exists.
   **/
-  bool matchTerm(1:long index, 2:long term, 3:Node header)
+  bool matchTerm(1:long index, 2:long term, 3:Node header, 4:int raftId)
 
   /**
   * When a follower finds that it already has a file in a snapshot locally, it calls this
@@ -331,7 +345,7 @@ service TSDataService extends RaftService {
   * @return a ByteBuffer containing the serialized time-value pairs or an empty buffer if there
   * are not more results.
   **/
-  binary fetchSingleSeries(1:Node header, 2:long readerId)
+  binary fetchSingleSeries(1:Node header, 2:int raftId, 3:long readerId)
 
    /**
    * Query a time series and generate an IReaderByTimestamp.
@@ -345,32 +359,32 @@ service TSDataService extends RaftService {
    * @return a ByteBuffer containing the serialized value or an empty buffer if there
    * are not more results.
    **/
-   binary fetchSingleSeriesByTimestamp(1:Node header, 2:long readerId, 3:long timestamp)
+   binary fetchSingleSeriesByTimestamp(1:Node header, 2:int raftId, 3:long readerId, 4:long timestamp)
 
   /**
   * Find the local query established for the remote query and release all its resource.
   **/
-  void endQuery(1:Node header, 2:Node thisNode, 3:long queryId)
+  void endQuery(1:Node header, 2:int raftId, 3:Node thisNode, 4:long queryId)
 
   /**
   * Given path patterns (paths with wildcard), return all paths they match.
   **/
-  GetAllPathsResult getAllPaths(1:Node header, 2:list<string> path, 3:bool withAlias)
+  GetAllPathsResult getAllPaths(1:Node header, 2:int raftId, 3:list<string> path, 4:bool withAlias)
 
   /**
    * Given path patterns (paths with wildcard), return all devices they match.
    **/
-  set<string> getAllDevices(1:Node header, 2:list<string> path)
+  set<string> getAllDevices(1:Node header, 2:int raftId, 3:list<string> path)
 
-  list<string> getNodeList(1:Node header, 2:string path, 3:int nodeLevel)
+  list<string> getNodeList(1:Node header, 2:int raftId, 3:string path, 4:int nodeLevel)
 
-  set<string> getChildNodePathInNextLevel(1: Node header, 2: string path)
+  set<string> getChildNodePathInNextLevel(1: Node header, 2:int raftId, 3: string path)
 
-  binary getAllMeasurementSchema(1: Node header, 2: binary planBinary)
+  binary getAllMeasurementSchema(1: Node header, 2:int raftId, 3: binary planBinary)
 
   list<binary> getAggrResult(1:GetAggrResultRequest request)
 
-  list<string> getUnregisteredTimeseries(1: Node header, 2: list<string> timeseriesList)
+  list<string> getUnregisteredTimeseries(1: Node header, 2:int raftId, 3: list<string> timeseriesList)
 
   PullSnapshotResp pullSnapshot(1:PullSnapshotRequest request)
 
@@ -385,7 +399,7 @@ service TSDataService extends RaftService {
   * @return the serialized AggregationResults, each is the result of one of the previously
   * required aggregations, and their orders are the same.
   **/
-  list<binary> getGroupByResult(1:Node header, 2:long executorId, 3:long startTime, 4:long endTime)
+  list<binary> getGroupByResult(1:Node header, 2:int raftId, 3:long executorId, 4:long startTime, 5:long endTime)
 
 
   /**
@@ -410,15 +424,15 @@ service TSDataService extends RaftService {
   **/
   binary last(1: LastQueryRequest request)
 
-  int getPathCount(1: Node header 2: list<string> pathsToQuery 3: int level)
+  int getPathCount(1: Node header, 2:int raftId, 3: list<string> pathsToQuery, 4: int level)
 
   /**
   * During slot transfer, when a member has pulled snapshot from a group, the member will use this
   * method to inform the group that one replica of such slots has been pulled.
   **/
-  bool onSnapshotApplied(1: Node header 2: list<int> slots)
+  bool onSnapshotApplied(1: Node header, 2:int raftId, 3: list<int> slots)
 
-  binary peekNextNotNullValue(1: Node header, 2: long executorId, 3: long startTime, 4: long
+  binary peekNextNotNullValue(1: Node header, 2:int raftId, 3: long executorId, 4: long startTime, 5: long
   endTime)
 
 }


[iotdb] 02/02: add multi-raft except for add/remove node

Posted by ta...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch cluster_multi_raft
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit d5c075d26e216160c58a9353b7e196248356cb7f
Author: lta <li...@163.com>
AuthorDate: Tue Dec 15 15:29:32 2020 +0800

    add multi-raft except for add/remove node
---
 .../resources/conf/iotdb-cluster.properties        |   3 +
 .../cluster/client/sync/SyncClientAdaptor.java     |  49 ++--
 .../apache/iotdb/cluster/config/ClusterConfig.java |  11 +
 .../iotdb/cluster/config/ClusterDescriptor.java    |   5 +-
 .../iotdb/cluster/log/catchup/CatchUpTask.java     |  13 +-
 .../iotdb/cluster/log/catchup/LogCatchUpTask.java  |   7 +-
 .../cluster/log/catchup/SnapshotCatchUpTask.java   |   4 +-
 .../cluster/log/snapshot/PartitionedSnapshot.java  |   2 +-
 .../apache/iotdb/cluster/metadata/CMManager.java   |  40 ++-
 .../apache/iotdb/cluster/metadata/MetaPuller.java  |   2 +-
 .../iotdb/cluster/partition/PartitionGroup.java    |   1 -
 .../iotdb/cluster/partition/PartitionTable.java    |   9 +-
 .../cluster/partition/slot/SlotPartitionTable.java | 102 ++++----
 .../iotdb/cluster/query/ClusterPlanExecutor.java   |  20 +-
 .../iotdb/cluster/query/LocalQueryExecutor.java    |   4 +-
 .../iotdb/cluster/query/RemoteQueryContext.java    |   9 +-
 .../cluster/query/aggregate/ClusterAggregator.java |   4 +-
 .../cluster/query/fill/ClusterPreviousFill.java    |   4 +-
 .../query/groupby/RemoteGroupByExecutor.java       |  13 +-
 .../query/last/ClusterLastQueryExecutor.java       |   6 +-
 .../cluster/query/reader/ClusterReaderFactory.java |  22 +-
 .../iotdb/cluster/query/reader/DataSourceInfo.java |   6 +-
 .../reader/RemoteSeriesReaderByTimestamp.java      |   4 +-
 .../query/reader/RemoteSimpleSeriesReader.java     |   4 +-
 .../apache/iotdb/cluster/server/ClientServer.java  |   9 +-
 .../iotdb/cluster/server/DataClusterServer.java    | 272 ++++++++++-----------
 .../iotdb/cluster/server/MetaClusterServer.java    |  16 +-
 .../cluster/server/PullSnapshotHintService.java    |  21 +-
 .../iotdb/cluster/server/StoppedMemberManager.java |  27 +-
 .../cluster/server/member/DataGroupMember.java     |  30 +--
 .../cluster/server/member/MetaGroupMember.java     |  33 ++-
 .../iotdb/cluster/server/member/RaftMember.java    |  13 +-
 .../cluster/server/service/BaseAsyncService.java   |   1 +
 .../cluster/server/service/DataSyncService.java    |   4 +-
 .../apache/iotdb/cluster/utils/PartitionUtils.java |   3 +-
 .../cluster/utils/nodetool/ClusterMonitor.java     |  15 +-
 .../cluster/client/async/AsyncDataClientTest.java  |   2 +-
 .../cluster/client/async/AsyncMetaClientTest.java  |   2 +-
 .../cluster/client/sync/SyncClientAdaptorTest.java |  46 ++--
 .../iotdb/cluster/common/TestAsyncDataClient.java  |  16 +-
 .../iotdb/cluster/common/TestDataGroupMember.java  |   5 +-
 .../iotdb/cluster/common/TestMetaGroupMember.java  |   3 +-
 .../iotdb/cluster/log/LogDispatcherTest.java       |   3 +-
 .../cluster/log/applier/DataLogApplierTest.java    |   6 +-
 .../iotdb/cluster/log/catchup/CatchUpTaskTest.java |  16 +-
 .../cluster/log/catchup/LogCatchUpTaskTest.java    |  12 +-
 .../log/catchup/SnapshotCatchUpTaskTest.java       |  10 +-
 .../log/snapshot/PartitionedSnapshotTest.java      |   3 +-
 .../cluster/log/snapshot/PullSnapshotTaskTest.java |   1 -
 .../cluster/partition/SlotPartitionTableTest.java  |  18 +-
 .../reader/RemoteSeriesReaderByTimestampTest.java  |   2 +-
 .../query/reader/RemoteSimpleSeriesReaderTest.java |   2 +-
 .../server/heartbeat/MetaHeartbeatThreadTest.java  |  10 +-
 .../cluster/server/member/DataGroupMemberTest.java |  31 +--
 .../iotdb/cluster/server/member/MemberTest.java    |   4 +-
 .../cluster/server/member/MetaGroupMemberTest.java |   4 +-
 thrift/src/main/thrift/cluster.thrift              |   5 +
 57 files changed, 528 insertions(+), 461 deletions(-)

diff --git a/cluster/src/assembly/resources/conf/iotdb-cluster.properties b/cluster/src/assembly/resources/conf/iotdb-cluster.properties
index 640cfba..7cd8c22 100644
--- a/cluster/src/assembly/resources/conf/iotdb-cluster.properties
+++ b/cluster/src/assembly/resources/conf/iotdb-cluster.properties
@@ -62,6 +62,9 @@ max_concurrent_client_num=10000
 # number of replications for one partition
 default_replica_num=2
 
+# sub raft num for multi-raft
+multi_raft_factor=2
+
 # cluster name to identify different clusters
 # all node's cluster_name in one cluster are the same
 cluster_name=default
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptor.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptor.java
index b4436f5..04d6ea1 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptor.java
@@ -101,12 +101,12 @@ public class SyncClientAdaptor {
   }
 
   public static Boolean matchTerm(AsyncClient client, Node target, long prevLogIndex,
-      long prevLogTerm, Node header) throws TException, InterruptedException {
+      long prevLogTerm, Node header, int raftId) throws TException, InterruptedException {
     try {
       AtomicReference<Boolean> resultRef = new AtomicReference<>(null);
       GenericHandler<Boolean> matchTermHandler = new GenericHandler<>(target, resultRef);
 
-      client.matchTerm(prevLogIndex, prevLogTerm, header, matchTermHandler);
+      client.matchTerm(prevLogIndex, prevLogTerm, header, raftId, matchTermHandler);
       synchronized (resultRef) {
         if (resultRef.get() == null) {
           resultRef.wait(RaftServer.getConnectionTimeoutInMS());
@@ -157,14 +157,14 @@ public class SyncClientAdaptor {
     return result.get();
   }
 
-  public static List<String> getNodeList(AsyncDataClient client, Node header,
+  public static List<String> getNodeList(AsyncDataClient client, Node header, int raftId,
       String schemaPattern, int level) throws TException, InterruptedException {
     GetNodesListHandler handler = new GetNodesListHandler();
     AtomicReference<List<String>> response = new AtomicReference<>(null);
     handler.setResponse(response);
     handler.setContact(client.getNode());
 
-    client.getNodeList(header, schemaPattern, level, handler);
+    client.getNodeList(header, raftId, schemaPattern, level, handler);
     synchronized (response) {
       if (response.get() == null) {
         response.wait(RaftServer.getReadOperationTimeoutMS());
@@ -173,14 +173,14 @@ public class SyncClientAdaptor {
     return response.get();
   }
 
-  public static Set<String> getNextChildren(AsyncDataClient client, Node header, String path)
+  public static Set<String> getNextChildren(AsyncDataClient client, Node header, int raftId, String path)
       throws TException, InterruptedException {
     GetChildNodeNextLevelPathHandler handler = new GetChildNodeNextLevelPathHandler();
     AtomicReference<Set<String>> response = new AtomicReference<>(null);
     handler.setResponse(response);
     handler.setContact(client.getNode());
 
-    client.getChildNodePathInNextLevel(header, path, handler);
+    client.getChildNodePathInNextLevel(header, raftId, path, handler);
     synchronized (response) {
       if (response.get() == null) {
         response.wait(RaftServer.getReadOperationTimeoutMS());
@@ -190,7 +190,7 @@ public class SyncClientAdaptor {
   }
 
   public static ByteBuffer getAllMeasurementSchema(AsyncDataClient client,
-      Node header, ShowTimeSeriesPlan plan)
+      Node header, int raftId, ShowTimeSeriesPlan plan)
       throws IOException, InterruptedException, TException {
     GetTimeseriesSchemaHandler handler = new GetTimeseriesSchemaHandler();
     AtomicReference<ByteBuffer> response = new AtomicReference<>(null);
@@ -200,7 +200,7 @@ public class SyncClientAdaptor {
     DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream);
     plan.serialize(dataOutputStream);
 
-    client.getAllMeasurementSchema(header, ByteBuffer.wrap(byteArrayOutputStream.toByteArray()),
+    client.getAllMeasurementSchema(header, raftId, ByteBuffer.wrap(byteArrayOutputStream.toByteArray()),
         handler);
     synchronized (response) {
       if (response.get() == null) {
@@ -309,43 +309,43 @@ public class SyncClientAdaptor {
     return resultReference.get();
   }
 
-  public static List<String> getUnregisteredMeasurements(AsyncDataClient client, Node header,
+  public static List<String> getUnregisteredMeasurements(AsyncDataClient client, Node header, int raftId,
       List<String> seriesPaths) throws TException, InterruptedException {
     AtomicReference<List<String>> remoteResult = new AtomicReference<>();
     GenericHandler<List<String>> handler = new GenericHandler<>(client.getNode(), remoteResult);
 
-    client.getUnregisteredTimeseries(header, seriesPaths, handler);
+    client.getUnregisteredTimeseries(header, raftId, seriesPaths, handler);
     return handler.getResult(RaftServer.getReadOperationTimeoutMS());
   }
 
-  public static GetAllPathsResult getAllPaths(AsyncDataClient client, Node header,
+  public static GetAllPathsResult getAllPaths(AsyncDataClient client, Node header, int raftId,
       List<String> pathsToQuery, boolean withAlias)
       throws InterruptedException, TException {
     AtomicReference<GetAllPathsResult> remoteResult = new AtomicReference<>();
     GenericHandler<GetAllPathsResult> handler = new GenericHandler<>(client.getNode(),
         remoteResult);
 
-    client.getAllPaths(header, pathsToQuery, withAlias, handler);
+    client.getAllPaths(header, raftId, pathsToQuery, withAlias, handler);
     return handler.getResult(RaftServer.getReadOperationTimeoutMS());
   }
 
-  public static Integer getPathCount(AsyncDataClient client, Node header, List<String> pathsToQuery,
+  public static Integer getPathCount(AsyncDataClient client, Node header, int raftId, List<String> pathsToQuery,
       int level)
       throws InterruptedException, TException {
     AtomicReference<Integer> remoteResult = new AtomicReference<>(null);
     GenericHandler<Integer> handler = new GenericHandler<>(client.getNode(), remoteResult);
 
-    client.getPathCount(header, pathsToQuery, level, handler);
+    client.getPathCount(header, raftId, pathsToQuery, level, handler);
     return handler.getResult(RaftServer.getReadOperationTimeoutMS());
   }
 
-  public static Set<String> getAllDevices(AsyncDataClient client, Node header,
+  public static Set<String> getAllDevices(AsyncDataClient client, Node header, int raftId,
       List<String> pathsToQuery)
       throws InterruptedException, TException {
     AtomicReference<Set<String>> remoteResult = new AtomicReference<>();
     GenericHandler<Set<String>> handler = new GenericHandler<>(client.getNode(), remoteResult);
 
-    client.getAllDevices(header, pathsToQuery, handler);
+    client.getAllDevices(header, raftId, pathsToQuery, handler);
     return handler.getResult(RaftServer.getReadOperationTimeoutMS());
   }
 
@@ -395,23 +395,23 @@ public class SyncClientAdaptor {
     return handler.getResult(RaftServer.getWriteOperationTimeoutMS());
   }
 
-  public static List<ByteBuffer> getGroupByResult(AsyncDataClient client, Node header,
+  public static List<ByteBuffer> getGroupByResult(AsyncDataClient client, Node header, int raftId,
       long executorId
       , long curStartTime, long curEndTime) throws InterruptedException, TException {
     AtomicReference<List<ByteBuffer>> fetchResult = new AtomicReference<>();
     GenericHandler<List<ByteBuffer>> handler = new GenericHandler<>(client.getNode(), fetchResult);
 
-    client.getGroupByResult(header, executorId, curStartTime, curEndTime, handler);
+    client.getGroupByResult(header, raftId, executorId, curStartTime, curEndTime, handler);
     return handler.getResult(RaftServer.getReadOperationTimeoutMS());
   }
 
-  public static ByteBuffer peekNextNotNullValue(AsyncDataClient client, Node header,
+  public static ByteBuffer peekNextNotNullValue(AsyncDataClient client, Node header, int raftId,
       long executorId
       , long curStartTime, long curEndTime) throws InterruptedException, TException {
     AtomicReference<ByteBuffer> fetchResult = new AtomicReference<>();
     GenericHandler<ByteBuffer> handler = new GenericHandler<>(client.getNode(), fetchResult);
 
-    client.peekNextNotNullValue(header, executorId, curStartTime, curEndTime, handler);
+    client.peekNextNotNullValue(header, raftId, executorId, curStartTime, curEndTime, handler);
     return handler.getResult(RaftServer.getReadOperationTimeoutMS());
   }
 
@@ -433,24 +433,23 @@ public class SyncClientAdaptor {
   public static ByteBuffer last(AsyncDataClient client, List<PartialPath> seriesPaths,
       List<Integer> dataTypeOrdinals, QueryContext context,
       Map<String, Set<String>> deviceMeasurements,
-      Node header)
+      Node header, int raftId)
       throws TException, InterruptedException {
     AtomicReference<ByteBuffer> result = new AtomicReference<>();
     GenericHandler<ByteBuffer> handler = new GenericHandler<>(client.getNode(), result);
     LastQueryRequest request = new LastQueryRequest(PartialPath.toStringList(seriesPaths),
-        dataTypeOrdinals,
-        context.getQueryId(), deviceMeasurements, header, client.getNode());
+        dataTypeOrdinals, context.getQueryId(), deviceMeasurements, header, raftId, client.getNode());
 
     client.last(request, handler);
     return handler.getResult(RaftServer.getReadOperationTimeoutMS());
   }
 
-  public static boolean onSnapshotApplied(AsyncDataClient client, Node header, List<Integer> slots)
+  public static boolean onSnapshotApplied(AsyncDataClient client, Node header, int raftId, List<Integer> slots)
       throws TException, InterruptedException {
     AtomicReference<Boolean> result = new AtomicReference<>(false);
     GenericHandler<Boolean> handler = new GenericHandler<>(client.getNode(), result);
 
-    client.onSnapshotApplied(header, slots, handler);
+    client.onSnapshotApplied(header, raftId, slots, handler);
     return handler.getResult(RaftServer.getWriteOperationTimeoutMS());
   }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
index 4d5d05b..306b081 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
@@ -46,6 +46,9 @@ public class ClusterConfig {
   private int replicationNum = 2;
 
   @ClusterConsistent
+  private int multiRaftFactor = 2;
+
+  @ClusterConsistent
   private String clusterName = "default";
 
   @ClusterConsistent
@@ -234,6 +237,14 @@ public class ClusterConfig {
     this.replicationNum = replicationNum;
   }
 
+  public int getMultiRaftFactor() {
+    return multiRaftFactor;
+  }
+
+  public void setMultiRaftFactor(int multiRaftFactor) {
+    this.multiRaftFactor = multiRaftFactor;
+  }
+
   void setClusterName(String clusterName) {
     this.clusterName = clusterName;
   }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java
index 3c9e8ec..d0c620b 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java
@@ -208,8 +208,11 @@ public class ClusterDescriptor {
     config.setMaxConcurrentClientNum(Integer.parseInt(properties.getProperty(
         "max_concurrent_client_num", String.valueOf(config.getMaxConcurrentClientNum()))));
 
+    config.setMultiRaftFactor(Integer.parseInt(properties.getProperty(
+        "multi_raft_factor", String.valueOf(config.getMultiRaftFactor()))));
+
     config.setReplicationNum(Integer.parseInt(properties.getProperty(
-        "default_replica_num", String.valueOf(config.getReplicationNum()))));
+        "default_replica_num", String.valueOf(config.getReplicationNum()))));
 
     config.setClusterName(properties.getProperty("cluster_name", config.getClusterName()));
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/catchup/CatchUpTask.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/catchup/CatchUpTask.java
index 23687ad..e40aeba 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/catchup/CatchUpTask.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/catchup/CatchUpTask.java
@@ -55,9 +55,12 @@ public class CatchUpTask implements Runnable {
   private long lastLogIndex;
   private boolean abort;
   private String name;
+  private int raftId;
 
-  public CatchUpTask(Node node, Peer peer, RaftMember raftMember, long lastLogIdx) {
+
+  public CatchUpTask(Node node, int raftId, Peer peer, RaftMember raftMember, long lastLogIdx) {
     this.node = node;
+    this.raftId = raftId;
     this.peer = peer;
     this.raftMember = raftMember;
     this.logs = Collections.emptyList();
@@ -242,14 +245,14 @@ public class CatchUpTask implements Runnable {
         return false;
       }
       Node header = raftMember.getHeader();
-      matched = SyncClientAdaptor.matchTerm(client, node, logIndex, logTerm, header);
+      matched = SyncClientAdaptor.matchTerm(client, node, logIndex, logTerm, header, raftId);
     } else {
       Client client = raftMember.getSyncClient(node);
       if (client == null) {
         return false;
       }
       try {
-        matched = client.matchTerm(logIndex, logTerm, raftMember.getHeader());
+        matched = client.matchTerm(logIndex, logTerm, raftMember.getHeader(), raftId);
       } catch (TException e) {
         client.getInputProtocol().getTransport().close();
         throw e;
@@ -321,11 +324,11 @@ public class CatchUpTask implements Runnable {
         doSnapshot();
         // snapshot may overlap with logs
         removeSnapshotLogs();
-        SnapshotCatchUpTask task = new SnapshotCatchUpTask(logs, snapshot, node, raftMember);
+        SnapshotCatchUpTask task = new SnapshotCatchUpTask(logs, snapshot, node, raftId, raftMember);
         catchUpSucceeded = task.call();
       } else {
         logger.info("{}: performing a log catch-up to {}", raftMember.getName(), node);
-        LogCatchUpTask task = new LogCatchUpTask(logs, node, raftMember);
+        LogCatchUpTask task = new LogCatchUpTask(logs, node, raftId, raftMember);
         catchUpSucceeded = task.call();
       }
       if (catchUpSucceeded) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/catchup/LogCatchUpTask.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/catchup/LogCatchUpTask.java
index 3520ce4..8b69884 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/catchup/LogCatchUpTask.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/catchup/LogCatchUpTask.java
@@ -61,17 +61,20 @@ public class LogCatchUpTask implements Callable<Boolean> {
   private List<Log> logs;
   private boolean useBatch = ClusterDescriptor.getInstance().getConfig().isUseBatchInLogCatchUp();
   boolean abort = false;
+  private int raftId;
 
-  LogCatchUpTask(List<Log> logs, Node node, RaftMember raftMember) {
+  LogCatchUpTask(List<Log> logs, Node node, int raftId, RaftMember raftMember) {
     this.logs = logs;
     this.node = node;
+    this.raftId = raftId;
     this.raftMember = raftMember;
   }
 
   @TestOnly
-  LogCatchUpTask(List<Log> logs, Node node, RaftMember raftMember, boolean useBatch) {
+  LogCatchUpTask(List<Log> logs, Node node, int raftId, RaftMember raftMember, boolean useBatch) {
     this.logs = logs;
     this.node = node;
+    this.raftId = raftId;
     this.raftMember = raftMember;
     this.useBatch = useBatch;
   }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/catchup/SnapshotCatchUpTask.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/catchup/SnapshotCatchUpTask.java
index 1a858b0..548a2db 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/catchup/SnapshotCatchUpTask.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/catchup/SnapshotCatchUpTask.java
@@ -51,8 +51,8 @@ public class SnapshotCatchUpTask extends LogCatchUpTask implements Callable<Bool
       .getCatchUpTimeoutMS();
   private Snapshot snapshot;
 
-  SnapshotCatchUpTask(List<Log> logs, Snapshot snapshot, Node node, RaftMember raftMember) {
-    super(logs, node, raftMember);
+  SnapshotCatchUpTask(List<Log> logs, Snapshot snapshot, Node node, int raftId, RaftMember raftMember) {
+    super(logs, node, raftId, raftMember);
     this.snapshot = snapshot;
   }
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PartitionedSnapshot.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PartitionedSnapshot.java
index a0eff88..605d086 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PartitionedSnapshot.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PartitionedSnapshot.java
@@ -166,7 +166,7 @@ public class PartitionedSnapshot<T extends Snapshot> extends Snapshot {
       synchronized (dataGroupMember.getSnapshotApplyLock()) {
         List<Integer> slots =
             ((SlotPartitionTable) dataGroupMember.getMetaGroupMember().getPartitionTable())
-                .getNodeSlots(dataGroupMember.getHeader());
+                .getNodeSlots(dataGroupMember.getHeader(), dataGroupMember.getRaftGroupId());
         for (Integer slot : slots) {
           T subSnapshot = snapshot.getSnapshot(slot);
           if (subSnapshot != null) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
index ceef221..1997779 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
@@ -110,7 +110,7 @@ public class CMManager extends MManager {
 
   private static final Logger logger = LoggerFactory.getLogger(CMManager.class);
 
-  private ReentrantReadWriteLock cacheLock = new ReentrantReadWriteLock();
+  private final ReentrantReadWriteLock cacheLock = new ReentrantReadWriteLock();
   // only cache the series who is writing, we need not to cache series who is reading
   // because the read is slow, so pull from remote is little cost comparing to the disk io
   private RemoteMetaCache mRemoteMetaCache;
@@ -578,7 +578,7 @@ public class CMManager extends MManager {
   private List<String> getUnregisteredSeriesListLocally(List<String> seriesList,
       PartitionGroup partitionGroup) throws CheckConsistencyException {
     DataGroupMember dataMember = metaGroupMember.getDataClusterServer()
-        .getDataMember(partitionGroup.getHeader(), null, null);
+        .getDataMember(partitionGroup.getHeader(), partitionGroup.getId(), null, null);
     return dataMember.getLocalQueryExecutor().getUnregisteredTimeseries(seriesList);
   }
 
@@ -591,12 +591,13 @@ public class CMManager extends MManager {
           AsyncDataClient client = metaGroupMember.getClientProvider().getAsyncDataClient(node,
               RaftServer.getReadOperationTimeoutMS());
           result = SyncClientAdaptor
-              .getUnregisteredMeasurements(client, partitionGroup.getHeader(), seriesList);
+              .getUnregisteredMeasurements(client, partitionGroup.getHeader(), partitionGroup.getId(), seriesList);
         } else {
           SyncDataClient syncDataClient =
               metaGroupMember.getClientProvider().getSyncDataClient(node,
                   RaftServer.getReadOperationTimeoutMS());
-          result = syncDataClient.getUnregisteredTimeseries(partitionGroup.getHeader(), seriesList);
+          result = syncDataClient.getUnregisteredTimeseries(partitionGroup.getHeader(),
+              partitionGroup.getId(), seriesList);
           ClientUtils.putBackSyncClient(syncDataClient);
         }
 
@@ -671,7 +672,7 @@ public class CMManager extends MManager {
     if (partitionGroup.contains(metaGroupMember.getThisNode())) {
       // the node is in the target group, synchronize with leader should be enough
       metaGroupMember.getLocalDataMember(partitionGroup.getHeader(),
-          "Pull timeseries of " + prefixPaths).syncLeader();
+          partitionGroup.getId(), "Pull timeseries of " + prefixPaths).syncLeader();
       return;
     }
 
@@ -899,7 +900,7 @@ public class CMManager extends MManager {
       if (partitionGroup.contains(metaGroupMember.getThisNode())) {
         // this node is a member of the group, perform a local query after synchronizing with the
         // leader
-        metaGroupMember.getLocalDataMember(partitionGroup.getHeader()).syncLeader();
+        metaGroupMember.getLocalDataMember(partitionGroup.getHeader(), partitionGroup.getId()).syncLeader();
         List<PartialPath> allTimeseriesName = getMatchedPathsLocally(pathUnderSG, withAlias);
         logger.debug("{}: get matched paths of {} locally, result {}", metaGroupMember.getName(),
             partitionGroup, allTimeseriesName);
@@ -937,7 +938,7 @@ public class CMManager extends MManager {
     List<Node> coordinatedNodes = QueryCoordinator.getINSTANCE().reorderNodes(partitionGroup);
     for (Node node : coordinatedNodes) {
       try {
-        List<PartialPath> paths = getMatchedPaths(node, partitionGroup.getHeader(), pathsToQuery,
+        List<PartialPath> paths = getMatchedPaths(node, partitionGroup.getHeader(), partitionGroup.getId(), pathsToQuery,
             withAlias);
         if (logger.isDebugEnabled()) {
           logger.debug("{}: get matched paths of {} and other {} paths from {} in {}, result {}",
@@ -960,19 +961,18 @@ public class CMManager extends MManager {
   }
 
   @SuppressWarnings("java:S1168") // null and empty list are different
-  private List<PartialPath> getMatchedPaths(Node node, Node header, List<String> pathsToQuery,
+  private List<PartialPath> getMatchedPaths(Node node, Node header, int raftId, List<String> pathsToQuery,
       boolean withAlias)
       throws IOException, TException, InterruptedException {
     GetAllPathsResult result;
     if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
       AsyncDataClient client = metaGroupMember.getClientProvider().getAsyncDataClient(node,
           RaftServer.getReadOperationTimeoutMS());
-      result = SyncClientAdaptor.getAllPaths(client, header,
-          pathsToQuery, withAlias);
+      result = SyncClientAdaptor.getAllPaths(client, header, raftId, pathsToQuery, withAlias);
     } else {
       SyncDataClient syncDataClient = metaGroupMember.getClientProvider().getSyncDataClient(node,
           RaftServer.getReadOperationTimeoutMS());
-      result = syncDataClient.getAllPaths(header, pathsToQuery, withAlias);
+      result = syncDataClient.getAllPaths(header, raftId, pathsToQuery, withAlias);
       ClientUtils.putBackSyncClient(syncDataClient);
     }
 
@@ -1019,7 +1019,7 @@ public class CMManager extends MManager {
       if (partitionGroup.contains(metaGroupMember.getThisNode())) {
         // this node is a member of the group, perform a local query after synchronizing with the
         // leader
-        metaGroupMember.getLocalDataMember(partitionGroup.getHeader()).syncLeader();
+        metaGroupMember.getLocalDataMember(partitionGroup.getHeader(), partitionGroup.getId()).syncLeader();
         Set<PartialPath> allDevices = getDevices(pathUnderSG);
         logger.debug("{}: get matched paths of {} locally, result {}", metaGroupMember.getName(),
             partitionGroup,
@@ -1050,7 +1050,7 @@ public class CMManager extends MManager {
     List<Node> coordinatedNodes = QueryCoordinator.getINSTANCE().reorderNodes(partitionGroup);
     for (Node node : coordinatedNodes) {
       try {
-        Set<String> paths = getMatchedDevices(node, partitionGroup.getHeader(), pathsToQuery);
+        Set<String> paths = getMatchedDevices(node, partitionGroup.getHeader(), partitionGroup.getId(), pathsToQuery);
         logger.debug("{}: get matched paths of {} from {}, result {}", metaGroupMember.getName(),
             partitionGroup,
             node, paths);
@@ -1073,18 +1073,17 @@ public class CMManager extends MManager {
     return Collections.emptySet();
   }
 
-  private Set<String> getMatchedDevices(Node node, Node header, List<String> pathsToQuery)
+  private Set<String> getMatchedDevices(Node node, Node header, int raftId, List<String> pathsToQuery)
       throws IOException, TException, InterruptedException {
     Set<String> paths;
     if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
       AsyncDataClient client = metaGroupMember.getClientProvider().getAsyncDataClient(node,
           RaftServer.getReadOperationTimeoutMS());
-      paths = SyncClientAdaptor.getAllDevices(client, header,
-          pathsToQuery);
+      paths = SyncClientAdaptor.getAllDevices(client, header, raftId, pathsToQuery);
     } else {
       SyncDataClient syncDataClient = metaGroupMember.getClientProvider().getSyncDataClient(node,
           RaftServer.getReadOperationTimeoutMS());
-      paths = syncDataClient.getAllDevices(header, pathsToQuery);
+      paths = syncDataClient.getAllDevices(header, raftId, pathsToQuery);
       ClientUtils.putBackSyncClient(syncDataClient);
     }
     return paths;
@@ -1340,8 +1339,7 @@ public class CMManager extends MManager {
   private void showLocalTimeseries(PartitionGroup group, ShowTimeSeriesPlan plan,
       Set<ShowTimeSeriesResult> resultSet, QueryContext context)
       throws CheckConsistencyException, MetadataException {
-    Node header = group.getHeader();
-    DataGroupMember localDataMember = metaGroupMember.getLocalDataMember(header);
+    DataGroupMember localDataMember = metaGroupMember.getLocalDataMember(group.getHeader(), group.getId());
     localDataMember.syncLeaderWithConsistencyCheck(false);
     try {
       List<ShowTimeSeriesResult> localResult = super.showTimeseries(plan, context);
@@ -1392,14 +1390,14 @@ public class CMManager extends MManager {
       AsyncDataClient client = metaGroupMember
           .getClientProvider().getAsyncDataClient(node, RaftServer.getReadOperationTimeoutMS());
       resultBinary = SyncClientAdaptor.getAllMeasurementSchema(client, group.getHeader(),
-          plan);
+          group.getId(), plan);
     } else {
       SyncDataClient syncDataClient = metaGroupMember
           .getClientProvider().getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS());
       ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
       DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream);
       plan.serialize(dataOutputStream);
-      resultBinary = syncDataClient.getAllMeasurementSchema(group.getHeader(),
+      resultBinary = syncDataClient.getAllMeasurementSchema(group.getHeader(), group.getId(),
           ByteBuffer.wrap(byteArrayOutputStream.toByteArray()));
       ClientUtils.putBackSyncClient(syncDataClient);
     }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/MetaPuller.java b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/MetaPuller.java
index 68306cd..e185e9d 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/MetaPuller.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/MetaPuller.java
@@ -120,7 +120,7 @@ public class MetaPuller {
       List<PartialPath> prefixPaths, List<MeasurementSchema> results) {
     if (partitionGroup.contains(metaGroupMember.getThisNode())) {
       // the node is in the target group, synchronize with leader should be enough
-      metaGroupMember.getLocalDataMember(partitionGroup.getHeader(),
+      metaGroupMember.getLocalDataMember(partitionGroup.getHeader(), partitionGroup.getId(),
           "Pull timeseries of " + prefixPaths).syncLeader();
       int preSize = results.size();
       for (PartialPath prefixPath : prefixPaths) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionGroup.java b/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionGroup.java
index dc5cbcc..5ab4275 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionGroup.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionGroup.java
@@ -69,7 +69,6 @@ public class PartitionGroup extends ArrayList<Node> {
     return Objects.hash(id, getHeader());
   }
 
-
   public Node getHeader() {
     return get(0);
   }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionTable.java b/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionTable.java
index a1a5ae6..f1c9a91 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionTable.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionTable.java
@@ -23,6 +23,7 @@ import java.nio.ByteBuffer;
 import java.util.List;
 import org.apache.commons.collections4.map.MultiKeyMap;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.rpc.thrift.RaftNode;
 import org.apache.iotdb.db.engine.StorageEngine;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
 import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
@@ -55,7 +56,7 @@ public interface PartitionTable {
    * @param timestamp
    * @return
    */
-  Pair<Node, Integer> routeToHeaderByTime(String storageGroupName, long timestamp);
+  RaftNode routeToHeaderByTime(String storageGroupName, long timestamp);
 
   /**
    * Add a new node to update the partition table.
@@ -79,10 +80,12 @@ public interface PartitionTable {
   List<PartitionGroup> getLocalGroups();
 
   /**
-   * @param pair
+   * @param raftNode
    * @return the partition group starting from the header.
    */
-  PartitionGroup getHeaderGroup(Pair<Node, Integer> pair);
+  PartitionGroup getHeaderGroup(RaftNode raftNode);
+
+  PartitionGroup getHeaderGroup(Node node);
 
   ByteBuffer serialize();
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java b/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java
index a1f98ce..355ac95 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java
@@ -28,8 +28,8 @@ import org.apache.iotdb.cluster.partition.PartitionGroup;
 import org.apache.iotdb.cluster.partition.PartitionTable;
 import org.apache.iotdb.cluster.partition.slot.SlotStrategy.DefaultStrategy;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.rpc.thrift.RaftNode;
 import org.apache.iotdb.db.utils.SerializeUtils;
-import org.apache.iotdb.tsfile.utils.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -46,7 +46,8 @@ public class SlotPartitionTable implements PartitionTable {
   private int replicationNum =
       ClusterDescriptor.getInstance().getConfig().getReplicationNum();
 
-  private int raftGroupNum = 2;
+  private int multiRaftFactor =
+      ClusterDescriptor.getInstance().getConfig().getMultiRaftFactor();
 
   //all nodes
   private List<Node> nodeRing = new ArrayList<>();
@@ -55,12 +56,12 @@ public class SlotPartitionTable implements PartitionTable {
 
   //The following fields are used for determining which node a data item belongs to.
   // the slots held by each node
-  private Map<Pair<Node, Integer>, List<Integer>> nodeSlotMap = new ConcurrentHashMap<>();
+  private Map<RaftNode, List<Integer>> nodeSlotMap = new ConcurrentHashMap<>();
   // each slot is managed by whom
-  private Pair<Node, Integer>[] slotNodes = new Pair[ClusterConstant.SLOT_NUM];
+  private RaftNode[] slotNodes = new RaftNode[ClusterConstant.SLOT_NUM];
   // the nodes that each slot belongs to before a new node is added, used for the new node to
   // find the data source
-  private Map<Pair<Node, Integer>, Map<Integer, Node>> previousNodeMap = new ConcurrentHashMap<>();
+  private Map<RaftNode, Map<Integer, RaftNode>> previousNodeMap = new ConcurrentHashMap<>();
 
   //the filed is used for determining which nodes need to be a group.
   // the data groups which this node belongs to.
@@ -112,30 +113,28 @@ public class SlotPartitionTable implements PartitionTable {
     // evenly assign the slots to each node
     int nodeNum = nodeRing.size();
     int slotsPerNode = totalSlotNumbers / nodeNum;
-    int slotsPerRaftGroup = slotsPerNode / raftGroupNum;
+    int slotsPerRaftGroup = slotsPerNode / multiRaftFactor;
     for (Node node : nodeRing) {
-      for (int i = 0; i < raftGroupNum; i++) {
-        nodeSlotMap.put(new Pair<>(node, i), new ArrayList<>());
+      for (int i = 0; i < multiRaftFactor; i++) {
+        nodeSlotMap.put(new RaftNode(node, i), new ArrayList<>());
       }
     }
 
     for (int i = 0; i < totalSlotNumbers; i++) {
       int nodeIdx = i / slotsPerNode;
+      int raftId = i % slotsPerNode / slotsPerRaftGroup;
       if (nodeIdx >= nodeNum) {
         // the last node may receive a little more if total slots cannot de divided by node number
         nodeIdx--;
       }
-      for (int j = 0; j < nodeIdx; j++) {
-        int groupIdx = j / slotsPerRaftGroup;
-        if (groupIdx >= raftGroupNum) {
-          groupIdx--;
-        }
-        nodeSlotMap.get(new Pair<>(nodeRing.get(nodeIdx), groupIdx)).add(i);
+      if (raftId >= multiRaftFactor) {
+        raftId--;
       }
+      nodeSlotMap.get(new RaftNode(nodeRing.get(nodeIdx), raftId)).add(i);
     }
 
     // build the index to find a node by slot
-    for (Entry<Pair<Node, Integer>, List<Integer>> entry : nodeSlotMap.entrySet()) {
+    for (Entry<RaftNode, List<Integer>> entry : nodeSlotMap.entrySet()) {
       for (Integer slot : entry.getValue()) {
         slotNodes[slot] = entry.getKey();
       }
@@ -155,8 +154,8 @@ public class SlotPartitionTable implements PartitionTable {
       if (startIndex < 0) {
         startIndex = startIndex + nodeRing.size();
       }
-      for (int j = 0; j < raftGroupNum; j++) {
-        ret.add(getHeaderGroup(new Pair<>(nodeRing.get(startIndex), j)));
+      for (int j = 0; j < multiRaftFactor; j++) {
+        ret.add(getHeaderGroup(new RaftNode(nodeRing.get(startIndex), j)));
       }
     }
 
@@ -165,13 +164,13 @@ public class SlotPartitionTable implements PartitionTable {
   }
 
   @Override
-  public PartitionGroup getHeaderGroup(Pair<Node, Integer> pair) {
-    PartitionGroup ret = new PartitionGroup(pair.right);
+  public PartitionGroup getHeaderGroup(RaftNode raftNode) {
+    PartitionGroup ret = new PartitionGroup(raftNode.getRaftId());
 
     // assuming the nodes are [1,2,3,4,5]
-    int nodeIndex = nodeRing.indexOf(pair.left);
+    int nodeIndex = nodeRing.indexOf(raftNode.getNode());
     if (nodeIndex == -1) {
-      logger.error("Node {} is not in the cluster", pair.left);
+      logger.error("Node {} is not in the cluster", raftNode.getNode());
       return null;
     }
     int endIndex = nodeIndex + replicationNum;
@@ -187,10 +186,15 @@ public class SlotPartitionTable implements PartitionTable {
   }
 
   @Override
+  public PartitionGroup getHeaderGroup(Node node) {
+    return getHeaderGroup(new RaftNode(node, 0));
+  }
+
+  @Override
   public PartitionGroup route(String storageGroupName, long timestamp) {
     synchronized (nodeRing) {
-      Pair<Node, Integer> pair = routeToHeaderByTime(storageGroupName, timestamp);
-      return getHeaderGroup(pair);
+      RaftNode raftNode = routeToHeaderByTime(storageGroupName, timestamp);
+      return getHeaderGroup(raftNode);
     }
   }
 
@@ -200,24 +204,24 @@ public class SlotPartitionTable implements PartitionTable {
           Thread.currentThread().getStackTrace());
       return null;
     }
-    Pair<Node, Integer> pair = slotNodes[slot];
-    logger.debug("The slot of {} is held by {}", slot, pair);
-    if (pair.left == null) {
+    RaftNode raftNode = slotNodes[slot];
+    logger.debug("The slot of {} is held by {}", slot, raftNode);
+    if (raftNode.getNode() == null) {
       logger.warn("The slot {} is incorrect", slot);
       return null;
     }
-    return getHeaderGroup(pair);
+    return getHeaderGroup(raftNode);
   }
 
   @Override
-  public Pair<Node, Integer> routeToHeaderByTime(String storageGroupName, long timestamp) {
+  public RaftNode routeToHeaderByTime(String storageGroupName, long timestamp) {
     synchronized (nodeRing) {
       int slot = getSlotStrategy()
           .calculateSlotByTime(storageGroupName, timestamp, getTotalSlotNumbers());
-      Pair<Node, Integer> pair = slotNodes[slot];
+      RaftNode raftNode = slotNodes[slot];
       logger.trace("The slot of {}@{} is {}, held by {}", storageGroupName, timestamp,
-          slot, pair);
-      return pair;
+          slot, raftNode);
+      return raftNode;
     }
   }
 
@@ -325,9 +329,9 @@ public class SlotPartitionTable implements PartitionTable {
     try {
       dataOutputStream.writeInt(totalSlotNumbers);
       dataOutputStream.writeInt(nodeSlotMap.size());
-      for (Entry<Pair<Node, Integer>, List<Integer>> entry : nodeSlotMap.entrySet()) {
-        SerializeUtils.serialize(entry.getKey().left, dataOutputStream);
-        dataOutputStream.writeInt(entry.getKey().right);
+      for (Entry<RaftNode, List<Integer>> entry : nodeSlotMap.entrySet()) {
+        SerializeUtils.serialize(entry.getKey().getNode(), dataOutputStream);
+        dataOutputStream.writeInt(entry.getKey().getRaftId());
         SerializeUtils.serialize(entry.getValue(), dataOutputStream);
       }
 
@@ -363,11 +367,11 @@ public class SlotPartitionTable implements PartitionTable {
       SerializeUtils.deserialize(node, buffer);
       int id = buffer.getInt();
       SerializeUtils.deserialize(slots, buffer);
-      Pair pair = new Pair<>(node, id);
-      nodeSlotMap.put(pair, slots);
+      RaftNode raftNode = new RaftNode(node, id);
+      nodeSlotMap.put(raftNode, slots);
       idNodeMap.put(node.getNodeIdentifier(), node);
       for (Integer slot : slots) {
-        slotNodes[slot] = pair;
+        slotNodes[slot] = raftNode;
       }
     }
 
@@ -388,9 +392,9 @@ public class SlotPartitionTable implements PartitionTable {
 //    }
     lastLogIndex = buffer.getLong();
 
-    for (Pair<Node, Integer> nodeIntegerPair : nodeSlotMap.keySet()) {
-      if (!nodeRing.contains(nodeIntegerPair.left)) {
-        nodeRing.add(nodeIntegerPair.left);
+    for (RaftNode raftNode : nodeSlotMap.keySet()) {
+      if (!nodeRing.contains(raftNode.getNode())) {
+        nodeRing.add(raftNode.getNode());
       }
     }
     nodeRing.sort(Comparator.comparingInt(Node::getNodeIdentifier));
@@ -404,15 +408,19 @@ public class SlotPartitionTable implements PartitionTable {
     return nodeRing;
   }
 
-  public Map<Integer, Node> getPreviousNodeMap(Node node) {
-    return previousNodeMap.get(node);
+  public Map<Integer, RaftNode> getPreviousNodeMap(RaftNode raftNode) {
+    return previousNodeMap.get(raftNode);
+  }
+
+  public List<Integer> getNodeSlots(Node header, int raftId) {
+    return getNodeSlots(new RaftNode(header, raftId));
   }
 
-  public List<Integer> getNodeSlots(Node header) {
+  public List<Integer> getNodeSlots(RaftNode header) {
     return nodeSlotMap.get(header);
   }
 
-  public Map<Pair<Node, Integer>, List<Integer>> getAllNodeSlots() {
+  public Map<RaftNode, List<Integer>> getAllNodeSlots() {
     return nodeSlotMap;
   }
 
@@ -516,9 +524,9 @@ public class SlotPartitionTable implements PartitionTable {
 
   private void calculateGlobalGroups() {
     globalGroups = new ArrayList<>();
-    for (Node n : getAllNodes()) {
-      for (int i = 0; i < raftGroupNum; i++) {
-        globalGroups.add(getHeaderGroup(new Pair<>(n, i)));
+    for (Node node : getAllNodes()) {
+      for (int i = 0; i < multiRaftFactor; i++) {
+        globalGroups.add(getHeaderGroup(new RaftNode(node, i)));
       }
     }
   }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanExecutor.java
index 8b7116b..0acf992 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanExecutor.java
@@ -177,7 +177,7 @@ public class ClusterPlanExecutor extends PlanExecutor {
       if (partitionGroup.contains(metaGroupMember.getThisNode())) {
         // this node is a member of the group, perform a local query after synchronizing with the
         // leader
-        metaGroupMember.getLocalDataMember(partitionGroup.getHeader())
+        metaGroupMember.getLocalDataMember(partitionGroup.getHeader(), partitionGroup.getId())
             .syncLeaderWithConsistencyCheck(false);
         int localResult = getLocalPathCount(pathUnderSG, level);
         logger.debug("{}: get path count of {} locally, result {}", metaGroupMember.getName(),
@@ -254,13 +254,13 @@ public class ClusterPlanExecutor extends PlanExecutor {
           AsyncDataClient client = metaGroupMember
               .getClientProvider().getAsyncDataClient(node, RaftServer.getReadOperationTimeoutMS());
           client.setTimeout(RaftServer.getReadOperationTimeoutMS());
-          count = SyncClientAdaptor.getPathCount(client, partitionGroup.getHeader(),
+          count = SyncClientAdaptor.getPathCount(client, partitionGroup.getHeader(), partitionGroup.getId(),
               pathsToQuery, level);
         } else {
           SyncDataClient syncDataClient = metaGroupMember
               .getClientProvider().getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS());
           syncDataClient.setTimeout(RaftServer.getReadOperationTimeoutMS());
-          count = syncDataClient.getPathCount(partitionGroup.getHeader(), pathsToQuery, level);
+          count = syncDataClient.getPathCount(partitionGroup.getHeader(), partitionGroup.getId(), pathsToQuery, level);
           ClientUtils.putBackSyncClient(syncDataClient);
         }
 
@@ -338,12 +338,12 @@ public class ClusterPlanExecutor extends PlanExecutor {
   private List<PartialPath> getLocalNodesList(PartitionGroup group, PartialPath schemaPattern,
       int level) throws CheckConsistencyException, MetadataException {
     Node header = group.getHeader();
-    DataGroupMember localDataMember = metaGroupMember.getLocalDataMember(header);
+    DataGroupMember localDataMember = metaGroupMember.getLocalDataMember(header, group.getId());
     localDataMember.syncLeaderWithConsistencyCheck(false);
     try {
       return IoTDB.metaManager.getNodesList(schemaPattern, level,
           new SlotSgFilter(
-              ((SlotPartitionTable) metaGroupMember.getPartitionTable()).getNodeSlots(header)));
+              ((SlotPartitionTable) metaGroupMember.getPartitionTable()).getNodeSlots(header, group.getId())));
     } catch (MetadataException e) {
       logger
           .error("Cannot not get node list of {}@{} from {} locally", schemaPattern, level, group);
@@ -360,11 +360,11 @@ public class ClusterPlanExecutor extends PlanExecutor {
           AsyncDataClient client = metaGroupMember
               .getClientProvider().getAsyncDataClient(node, RaftServer.getReadOperationTimeoutMS());
           paths = SyncClientAdaptor
-              .getNodeList(client, group.getHeader(), schemaPattern.getFullPath(), level);
+              .getNodeList(client, group.getHeader(), group.getId(), schemaPattern.getFullPath(), level);
         } else {
           SyncDataClient syncDataClient = metaGroupMember
               .getClientProvider().getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS());
-          paths = syncDataClient.getNodeList(group.getHeader(), schemaPattern.getFullPath(), level);
+          paths = syncDataClient.getNodeList(group.getHeader(), group.getId(), schemaPattern.getFullPath(), level);
           ClientUtils.putBackSyncClient(syncDataClient);
         }
 
@@ -446,7 +446,7 @@ public class ClusterPlanExecutor extends PlanExecutor {
   private Set<String> getLocalNextChildren(PartitionGroup group, PartialPath path)
       throws CheckConsistencyException {
     Node header = group.getHeader();
-    DataGroupMember localDataMember = metaGroupMember.getLocalDataMember(header);
+    DataGroupMember localDataMember = metaGroupMember.getLocalDataMember(header, group.getId());
     localDataMember.syncLeaderWithConsistencyCheck(false);
     try {
       return IoTDB.metaManager.getChildNodePathInNextLevel(path);
@@ -465,12 +465,12 @@ public class ClusterPlanExecutor extends PlanExecutor {
           AsyncDataClient client = metaGroupMember
               .getClientProvider().getAsyncDataClient(node, RaftServer.getReadOperationTimeoutMS());
           nextChildren = SyncClientAdaptor
-              .getNextChildren(client, group.getHeader(), path.getFullPath());
+              .getNextChildren(client, group.getHeader(), group.getId(), path.getFullPath());
         } else {
           SyncDataClient syncDataClient = metaGroupMember
               .getClientProvider().getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS());
           nextChildren = syncDataClient
-              .getChildNodePathInNextLevel(group.getHeader(), path.getFullPath());
+              .getChildNodePathInNextLevel(group.getHeader(), group.getId(), path.getFullPath());
           ClientUtils.putBackSyncClient(syncDataClient);
         }
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/LocalQueryExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/LocalQueryExecutor.java
index d4fe221..49e20af 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/LocalQueryExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/LocalQueryExecutor.java
@@ -425,7 +425,7 @@ public class LocalQueryExecutor {
     }
     List<Integer> nodeSlots =
         ((SlotPartitionTable) dataGroupMember.getMetaGroupMember().getPartitionTable()).getNodeSlots(
-            dataGroupMember.getHeader());
+            dataGroupMember.getHeader(), dataGroupMember.getRaftGroupId());
     try {
       if (ascending) {
         AggregationExecutor.aggregateOneSeries(new PartialPath(path), allSensors, context, timeFilter,
@@ -490,7 +490,7 @@ public class LocalQueryExecutor {
 
     ClusterQueryUtils.checkPathExistence(path);
     List<Integer> nodeSlots = ((SlotPartitionTable) dataGroupMember.getMetaGroupMember().getPartitionTable())
-        .getNodeSlots(dataGroupMember.getHeader());
+        .getNodeSlots(dataGroupMember.getHeader(), dataGroupMember.getRaftGroupId());
     LocalGroupByExecutor executor = new LocalGroupByExecutor(path,
         deviceMeasurements, dataType, context, timeFilter, new SlotTsFileFilter(nodeSlots), ascending);
     for (Integer aggregationType : aggregationTypes) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/RemoteQueryContext.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/RemoteQueryContext.java
index 439dbe0..cb7abd3 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/RemoteQueryContext.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/RemoteQueryContext.java
@@ -25,13 +25,14 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentSkipListSet;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.rpc.thrift.RaftNode;
 import org.apache.iotdb.db.query.context.QueryContext;
 
 public class RemoteQueryContext extends QueryContext {
   /**
    * The remote nodes that are queried in this query, grouped by the header nodes.
    */
-  private Map<Node, Set<Node>> queriedNodesMap = new HashMap<>();
+  private Map<RaftNode, Set<Node>> queriedNodesMap = new HashMap<>();
   /**
    * The readers constructed locally to respond a remote query.
    */
@@ -46,8 +47,8 @@ public class RemoteQueryContext extends QueryContext {
     super(jobId);
   }
 
-  public void registerRemoteNode(Node node, Node header) {
-    queriedNodesMap.computeIfAbsent(header, n -> new HashSet<>()).add(node);
+  public void registerRemoteNode(Node node, Node header, int raftId) {
+    queriedNodesMap.computeIfAbsent(new RaftNode(header, raftId), n -> new HashSet<>()).add(node);
   }
 
   public void registerLocalReader(long readerId) {
@@ -66,7 +67,7 @@ public class RemoteQueryContext extends QueryContext {
     return localGroupByExecutorIds;
   }
 
-  public Map<Node, Set<Node>> getQueriedNodesMap() {
+  public Map<RaftNode, Set<Node>> getQueriedNodesMap() {
     return queriedNodesMap;
   }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/aggregate/ClusterAggregator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/aggregate/ClusterAggregator.java
index 99067bc..c2b5974 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/aggregate/ClusterAggregator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/aggregate/ClusterAggregator.java
@@ -125,7 +125,7 @@ public class ClusterAggregator {
           , partitionGroup, context, ascending);
     } else {
       // perform the aggregations locally
-      DataGroupMember dataMember = metaGroupMember.getLocalDataMember(partitionGroup.getHeader());
+      DataGroupMember dataMember = metaGroupMember.getLocalDataMember(partitionGroup.getHeader(), partitionGroup.getId());
       LocalQueryExecutor localQueryExecutor = new LocalQueryExecutor(dataMember);
       try {
         logger
@@ -186,7 +186,7 @@ public class ClusterAggregator {
             results.add(result);
           }
           // register the queried node to release resources when the query ends
-          ((RemoteQueryContext) context).registerRemoteNode(node, partitionGroup.getHeader());
+          ((RemoteQueryContext) context).registerRemoteNode(node, partitionGroup.getHeader(), partitionGroup.getId());
           logger.debug("{}: queried aggregation {} of {} from {} of {} are {}",
               metaGroupMember.getName(),
               aggregations, path, node, partitionGroup.getHeader(), results);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/fill/ClusterPreviousFill.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/fill/ClusterPreviousFill.java
index 3887b34..7ebc018 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/fill/ClusterPreviousFill.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/fill/ClusterPreviousFill.java
@@ -138,7 +138,7 @@ public class ClusterPreviousFill extends PreviousFill {
   private void localPreviousFill(PreviousFillArguments arguments, QueryContext context,
       PartitionGroup group,
       PreviousFillHandler fillHandler) {
-    DataGroupMember localDataMember = metaGroupMember.getLocalDataMember(group.getHeader());
+    DataGroupMember localDataMember = metaGroupMember.getLocalDataMember(group.getHeader(), group.getId());
     try {
       fillHandler
           .onComplete(
@@ -157,7 +157,7 @@ public class ClusterPreviousFill extends PreviousFill {
     PreviousFillRequest request = new PreviousFillRequest(arguments.getPath().getFullPath(),
         arguments.getQueryTime(),
         arguments.getBeforeRange(), context.getQueryId(), metaGroupMember.getThisNode(),
-        group.getHeader(),
+        group.getHeader(), group.getId(),
         arguments.getDataType().ordinal(),
         arguments.getDeviceMeasurements());
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/groupby/RemoteGroupByExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/groupby/RemoteGroupByExecutor.java
index f333baa..2078f09 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/groupby/RemoteGroupByExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/groupby/RemoteGroupByExecutor.java
@@ -47,17 +47,18 @@ public class RemoteGroupByExecutor implements GroupByExecutor {
   private MetaGroupMember metaGroupMember;
   private Node source;
   private Node header;
+  private int raftId;
 
   private List<AggregateResult> results = new ArrayList<>();
 
 
   public RemoteGroupByExecutor(long executorId,
-      MetaGroupMember metaGroupMember, Node source, Node header) {
+      MetaGroupMember metaGroupMember, Node source, Node header, int raftId) {
     this.executorId = executorId;
     this.metaGroupMember = metaGroupMember;
     this.source = source;
     this.header = header;
-
+    this.raftId = raftId;
   }
 
   @Override
@@ -81,11 +82,11 @@ public class RemoteGroupByExecutor implements GroupByExecutor {
         AsyncDataClient client = metaGroupMember
             .getClientProvider().getAsyncDataClient(source, RaftServer.getReadOperationTimeoutMS());
         aggrBuffers = SyncClientAdaptor
-            .getGroupByResult(client, header, executorId, curStartTime, curEndTime);
+            .getGroupByResult(client, header, raftId, executorId, curStartTime, curEndTime);
       } else {
         SyncDataClient syncDataClient = metaGroupMember
             .getClientProvider().getSyncDataClient(source, RaftServer.getReadOperationTimeoutMS());
-        aggrBuffers = syncDataClient.getGroupByResult(header, executorId, curStartTime, curEndTime);
+        aggrBuffers = syncDataClient.getGroupByResult(header, raftId, executorId, curStartTime, curEndTime);
         ClientUtils.putBackSyncClient(syncDataClient);
       }
 
@@ -116,11 +117,11 @@ public class RemoteGroupByExecutor implements GroupByExecutor {
         AsyncDataClient client = metaGroupMember
             .getClientProvider().getAsyncDataClient(source, RaftServer.getReadOperationTimeoutMS());
         aggrBuffer = SyncClientAdaptor
-            .peekNextNotNullValue(client, header, executorId, nextStartTime, nextEndTime);
+            .peekNextNotNullValue(client, header, raftId, executorId, nextStartTime, nextEndTime);
       } else {
         SyncDataClient syncDataClient = metaGroupMember
             .getClientProvider().getSyncDataClient(source, RaftServer.getReadOperationTimeoutMS());
-        aggrBuffer = syncDataClient.peekNextNotNullValue(header, executorId, nextStartTime, nextEndTime);
+        aggrBuffer = syncDataClient.peekNextNotNullValue(header, raftId, executorId, nextStartTime, nextEndTime);
         ClientUtils.putBackSyncClient(syncDataClient);
       }
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/last/ClusterLastQueryExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/last/ClusterLastQueryExecutor.java
index 1dac967..21b400a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/last/ClusterLastQueryExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/last/ClusterLastQueryExecutor.java
@@ -172,7 +172,7 @@ public class ClusterLastQueryExecutor extends LastQueryExecutor {
         List<PartialPath> seriesPaths,
         QueryContext context)
         throws StorageEngineException, QueryProcessException, IOException {
-      DataGroupMember localDataMember = metaGroupMember.getLocalDataMember(group.getHeader());
+      DataGroupMember localDataMember = metaGroupMember.getLocalDataMember(group.getHeader(), group.getId());
       try {
         localDataMember.syncLeaderWithConsistencyCheck(false);
       } catch (CheckConsistencyException e) {
@@ -231,7 +231,7 @@ public class ClusterLastQueryExecutor extends LastQueryExecutor {
       }
       buffer = SyncClientAdaptor
           .last(asyncDataClient, seriesPaths, dataTypeOrdinals, context, queryPlan.getDeviceToMeasurements(),
-              group.getHeader());
+              group.getHeader(), group.getId());
       return buffer;
     }
 
@@ -240,7 +240,7 @@ public class ClusterLastQueryExecutor extends LastQueryExecutor {
           .getClientProvider().getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS());
       ByteBuffer result = syncDataClient
           .last(new LastQueryRequest(PartialPath.toStringList(seriesPaths), dataTypeOrdinals,
-              context.getQueryId(), queryPlan.getDeviceToMeasurements(), group.getHeader(),
+              context.getQueryId(), queryPlan.getDeviceToMeasurements(), group.getHeader(), group.getId(),
               syncDataClient.getNode()));
       ClientUtils.putBackSyncClient(syncDataClient);
       return result;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java
index 3baab29..d99bbbf 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java
@@ -128,7 +128,7 @@ public class ClusterReaderFactory {
     if (partitionGroup.contains(metaGroupMember.getThisNode())) {
       // the target storage group contains this node, perform a local query
       DataGroupMember dataGroupMember =
-          metaGroupMember.getLocalDataMember(partitionGroup.getHeader());
+          metaGroupMember.getLocalDataMember(partitionGroup.getHeader(), partitionGroup.getId());
       if (logger.isDebugEnabled()) {
         logger.debug("{}: creating a local reader for {}#{}", metaGroupMember.getName(),
             path.getFullPath(),
@@ -224,7 +224,7 @@ public class ClusterReaderFactory {
     if (partitionGroup.contains(metaGroupMember.getThisNode())) {
       // the target storage group contains this node, perform a local query
       DataGroupMember dataGroupMember =
-          metaGroupMember.getLocalDataMember(partitionGroup.getHeader(),
+          metaGroupMember.getLocalDataMember(partitionGroup.getHeader(), partitionGroup.getId(),
               String.format("Query: %s, time filter: %s, queryId: %d", path, timeFilter,
                   context.getQueryId()));
       IPointReader seriesPointReader = getSeriesPointReader(path, deviceMeasurements, dataType,
@@ -268,7 +268,7 @@ public class ClusterReaderFactory {
     }
     return new SeriesRawDataPointReader(
         getSeriesReader(path, allSensors, dataType, timeFilter,
-            valueFilter, context, dataGroupMember.getHeader(), ascending));
+            valueFilter, context, dataGroupMember.getHeader(), dataGroupMember.getRaftGroupId(), ascending));
 
   }
 
@@ -287,11 +287,11 @@ public class ClusterReaderFactory {
   private SeriesReader getSeriesReader(PartialPath path, Set<String> allSensors, TSDataType
       dataType,
       Filter timeFilter,
-      Filter valueFilter, QueryContext context, Node header, boolean ascending)
+      Filter valueFilter, QueryContext context, Node header, int raftId, boolean ascending)
       throws StorageEngineException, QueryProcessException {
     ClusterQueryUtils.checkPathExistence(path);
     List<Integer> nodeSlots =
-        ((SlotPartitionTable) metaGroupMember.getPartitionTable()).getNodeSlots(header);
+        ((SlotPartitionTable) metaGroupMember.getPartitionTable()).getNodeSlots(header, raftId);
     QueryDataSource queryDataSource =
         QueryResourceManager.getInstance().getQueryDataSource(path, context, timeFilter);
     return new SeriesReader(path, allSensors, dataType, context, queryDataSource,
@@ -409,7 +409,7 @@ public class ClusterReaderFactory {
     if (partitionGroup.contains(metaGroupMember.getThisNode())) {
       // the target storage group contains this node, perform a local query
       DataGroupMember dataGroupMember = metaGroupMember
-          .getLocalDataMember(partitionGroup.getHeader());
+          .getLocalDataMember(partitionGroup.getHeader(), partitionGroup.getId());
       LocalQueryExecutor localQueryExecutor = new LocalQueryExecutor(dataGroupMember);
       logger.debug("{}: creating a local group by executor for {}#{}", metaGroupMember.getName(),
           path.getFullPath(), context.getQueryId());
@@ -461,13 +461,13 @@ public class ClusterReaderFactory {
 
         if (executorId != -1) {
           // record the queried node to release resources later
-          ((RemoteQueryContext) context).registerRemoteNode(node, partitionGroup.getHeader());
+          ((RemoteQueryContext) context).registerRemoteNode(node, partitionGroup.getHeader(), partitionGroup.getId());
           logger.debug("{}: get an executorId {} for {}@{} from {}", metaGroupMember.getName(),
               executorId,
               aggregationTypes, path, node);
           // create a remote executor with the return id
           RemoteGroupByExecutor remoteGroupByExecutor = new RemoteGroupByExecutor(executorId,
-              metaGroupMember, node, partitionGroup.getHeader());
+              metaGroupMember, node, partitionGroup.getHeader(), partitionGroup.getId());
           for (Integer aggregationType : aggregationTypes) {
             remoteGroupByExecutor.addAggregateResult(AggregateResultFactory.getAggrResultByType(
                 AggregationType.values()[aggregationType], dataType, ascending));
@@ -530,7 +530,7 @@ public class ClusterReaderFactory {
     }
 
     SeriesReader seriesReader = getSeriesReader(path, allSensors, dataType, timeFilter,
-        valueFilter, context, dataGroupMember.getHeader(), ascending);
+        valueFilter, context, dataGroupMember.getHeader(), dataGroupMember.getRaftGroupId(), ascending);
     if (seriesReader.isEmpty()) {
       return null;
     }
@@ -556,8 +556,8 @@ public class ClusterReaderFactory {
       throw new StorageEngineException(e);
     }
     SeriesReader seriesReader = getSeriesReader(path, allSensors, dataType,
-        TimeFilter.gtEq(Long.MIN_VALUE),
-        null, context, dataGroupMember.getHeader(), ascending);
+        TimeFilter.gtEq(Long.MIN_VALUE), null, context, dataGroupMember.getHeader(),
+        dataGroupMember.getRaftGroupId(), ascending);
     try {
       if (seriesReader.isEmpty()) {
         return null;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/DataSourceInfo.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/DataSourceInfo.java
index 7f408de..b3adb16 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/DataSourceInfo.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/DataSourceInfo.java
@@ -92,7 +92,7 @@ public class DataSourceInfo {
           logger.debug("get a readerId {} for {} from {}", newReaderId, request.path, node);
           if (newReaderId != -1) {
             // register the node so the remote resources can be released
-            context.registerRemoteNode(node, partitionGroup.getHeader());
+            context.registerRemoteNode(node, partitionGroup.getHeader(), partitionGroup.getId());
             this.readerId = newReaderId;
             this.curSource = node;
             this.curPos = nextNodePos;
@@ -170,6 +170,10 @@ public class DataSourceInfo {
     }
   }
 
+  public int getRaftId() {
+    return partitionGroup.getId();
+  }
+
   public long getReaderId() {
     return this.readerId;
   }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/RemoteSeriesReaderByTimestamp.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/RemoteSeriesReaderByTimestamp.java
index 208ac95..3264398 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/RemoteSeriesReaderByTimestamp.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/RemoteSeriesReaderByTimestamp.java
@@ -67,7 +67,7 @@ public class RemoteSeriesReaderByTimestamp implements IReaderByTimestamp {
       fetchResult.set(null);
       try {
         sourceInfo.getCurAsyncClient(RaftServer.getReadOperationTimeoutMS())
-            .fetchSingleSeriesByTimestamp(sourceInfo.getHeader(),
+            .fetchSingleSeriesByTimestamp(sourceInfo.getHeader(), sourceInfo.getRaftId(),
                 sourceInfo.getReaderId(), timestamp, handler);
         fetchResult.wait(RaftServer.getReadOperationTimeoutMS());
       } catch (TException e) {
@@ -90,7 +90,7 @@ public class RemoteSeriesReaderByTimestamp implements IReaderByTimestamp {
       SyncDataClient curSyncClient = sourceInfo
           .getCurSyncClient(RaftServer.getReadOperationTimeoutMS());
       ByteBuffer buffer = curSyncClient
-          .fetchSingleSeriesByTimestamp(sourceInfo.getHeader(),
+          .fetchSingleSeriesByTimestamp(sourceInfo.getHeader(), sourceInfo.getRaftId(),
               sourceInfo.getReaderId(), timestamp);
       curSyncClient.putBack();
       return buffer;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/RemoteSimpleSeriesReader.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/RemoteSimpleSeriesReader.java
index 3cee99b..3c6d745 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/RemoteSimpleSeriesReader.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/RemoteSimpleSeriesReader.java
@@ -117,7 +117,7 @@ public class RemoteSimpleSeriesReader implements IPointReader {
       fetchResult.set(null);
       try {
         sourceInfo.getCurAsyncClient(RaftServer.getReadOperationTimeoutMS())
-            .fetchSingleSeries(sourceInfo.getHeader(),
+            .fetchSingleSeries(sourceInfo.getHeader(), sourceInfo.getRaftId(),
                 sourceInfo.getReaderId(), handler);
         fetchResult.wait(RaftServer.getReadOperationTimeoutMS());
       } catch (TException e) {
@@ -140,7 +140,7 @@ public class RemoteSimpleSeriesReader implements IPointReader {
       SyncDataClient curSyncClient = sourceInfo
           .getCurSyncClient(RaftServer.getReadOperationTimeoutMS());
       ByteBuffer buffer = curSyncClient
-          .fetchSingleSeries(sourceInfo.getHeader(),
+          .fetchSingleSeries(sourceInfo.getHeader(), sourceInfo.getRaftId(),
               sourceInfo.getReaderId());
       curSyncClient.putBack();
       return buffer;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/ClientServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/ClientServer.java
index f0c554e..2fdc3f9 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/ClientServer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/ClientServer.java
@@ -42,6 +42,7 @@ import org.apache.iotdb.cluster.query.ClusterPlanExecutor;
 import org.apache.iotdb.cluster.query.ClusterPlanner;
 import org.apache.iotdb.cluster.query.RemoteQueryContext;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.rpc.thrift.RaftNode;
 import org.apache.iotdb.cluster.server.handlers.caller.GenericHandler;
 import org.apache.iotdb.cluster.server.member.MetaGroupMember;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -294,8 +295,8 @@ public class ClientServer extends TSServiceImpl {
     RemoteQueryContext context = queryContextMap.remove(queryId);
     if (context != null) {
       // release the resources in every queried node
-      for (Entry<Node, Set<Node>> headerEntry : context.getQueriedNodesMap().entrySet()) {
-        Node header = headerEntry.getKey();
+      for (Entry<RaftNode, Set<Node>> headerEntry : context.getQueriedNodesMap().entrySet()) {
+        RaftNode header = headerEntry.getKey();
         Set<Node> queriedNodes = headerEntry.getValue();
 
         for (Node queriedNode : queriedNodes) {
@@ -305,12 +306,12 @@ public class ClientServer extends TSServiceImpl {
               AsyncDataClient client = metaGroupMember
                   .getClientProvider().getAsyncDataClient(queriedNode,
                       RaftServer.getReadOperationTimeoutMS());
-              client.endQuery(header, metaGroupMember.getThisNode(), queryId, handler);
+              client.endQuery(header.getNode(), header.getRaftId(), metaGroupMember.getThisNode(), queryId, handler);
             } else {
               SyncDataClient syncDataClient = metaGroupMember
                   .getClientProvider().getSyncDataClient(queriedNode,
                       RaftServer.getReadOperationTimeoutMS());
-              syncDataClient.endQuery(header, metaGroupMember.getThisNode(), queryId);
+              syncDataClient.endQuery(header.getNode(), header.getRaftId(), metaGroupMember.getThisNode(), queryId);
             }
           } catch (IOException | TException e) {
             logger.error("Cannot end query {} in {}", queryId, queriedNode);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
index 76fca3b..e5843f5 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
@@ -52,6 +52,7 @@ import org.apache.iotdb.cluster.rpc.thrift.PullSchemaRequest;
 import org.apache.iotdb.cluster.rpc.thrift.PullSchemaResp;
 import org.apache.iotdb.cluster.rpc.thrift.PullSnapshotRequest;
 import org.apache.iotdb.cluster.rpc.thrift.PullSnapshotResp;
+import org.apache.iotdb.cluster.rpc.thrift.RaftNode;
 import org.apache.iotdb.cluster.rpc.thrift.SendSnapshotRequest;
 import org.apache.iotdb.cluster.rpc.thrift.SingleSeriesQueryRequest;
 import org.apache.iotdb.cluster.rpc.thrift.TSDataService;
@@ -63,7 +64,6 @@ import org.apache.iotdb.cluster.server.member.MetaGroupMember;
 import org.apache.iotdb.cluster.server.service.DataAsyncService;
 import org.apache.iotdb.cluster.server.service.DataSyncService;
 import org.apache.iotdb.service.rpc.thrift.TSStatus;
-import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.thrift.TException;
 import org.apache.thrift.TProcessor;
 import org.apache.thrift.async.AsyncMethodCallback;
@@ -81,9 +81,9 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
 
   // key: the header of a data group, value: the member representing this node in this group and
   // it is currently at service
-  private Map<Pair<Node, Integer>, DataGroupMember> headerGroupMap = new ConcurrentHashMap<>();
-  private Map<Pair<Node, Integer>, DataAsyncService> asyncServiceMap = new ConcurrentHashMap<>();
-  private Map<Pair<Node, Integer>, DataSyncService> syncServiceMap = new ConcurrentHashMap<>();
+  private Map<RaftNode, DataGroupMember> headerGroupMap = new ConcurrentHashMap<>();
+  private Map<RaftNode, DataAsyncService> asyncServiceMap = new ConcurrentHashMap<>();
+  private Map<RaftNode, DataSyncService> syncServiceMap = new ConcurrentHashMap<>();
   // key: the header of a data group, value: the member representing this node in this group but
   // it is out of service because another node has joined the group and expelled this node, or
   // the node itself is removed, but it is still stored to provide snapshot for other nodes
@@ -116,56 +116,66 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
    * @param dataGroupMember
    */
   public void addDataGroupMember(DataGroupMember dataGroupMember) {
-    Pair<Node, Integer> pair = new Pair<>(dataGroupMember.getHeader(),
+    RaftNode raftNode = new RaftNode(dataGroupMember.getHeader(),
         dataGroupMember.getRaftGroupId());
-    DataGroupMember removedMember = headerGroupMap
-        .remove(pair);
+    DataGroupMember removedMember = headerGroupMap.remove(raftNode);
     if (removedMember != null) {
       removedMember.stop();
-      asyncServiceMap.remove(pair);
-      syncServiceMap.remove(pair);
+      asyncServiceMap.remove(raftNode);
+      syncServiceMap.remove(raftNode);
     }
-    stoppedMemberManager.remove(pair);
+    stoppedMemberManager.remove(raftNode);
 
-    headerGroupMap.put(pair, dataGroupMember);
+    headerGroupMap.put(raftNode, dataGroupMember);
   }
 
-  private <T> DataAsyncService getDataAsyncService(Node header, Integer id,
+  private <T> DataAsyncService getDataAsyncService(Node node, int raftId,
       AsyncMethodCallback<T> resultHandler, Object request) {
-    Pair<Node, Integer> pair = new Pair<>(header, id);
-    return asyncServiceMap.computeIfAbsent(pair, h -> {
-      DataGroupMember dataMember = getDataMember(pair, resultHandler, request);
+    return getDataAsyncService(new RaftNode(node, raftId), resultHandler, request);
+  }
+
+  private <T> DataAsyncService getDataAsyncService(RaftNode raftNode,
+      AsyncMethodCallback<T> resultHandler, Object request) {
+    return asyncServiceMap.computeIfAbsent(raftNode, h -> {
+      DataGroupMember dataMember = getDataMember(raftNode, resultHandler, request);
       return dataMember != null ? new DataAsyncService(dataMember) : null;
     });
   }
 
-  private DataSyncService getDataSyncService(Node header, Integer id) {
-    Pair<Node, Integer> pair = new Pair<>(header, id);
-    return syncServiceMap.computeIfAbsent(pair, h -> {
-      DataGroupMember dataMember = getDataMember(pair, null, null);
+  private DataSyncService getDataSyncService(Node header, int raftId) {
+    return getDataSyncService(new RaftNode(header, raftId));
+  }
+
+  private DataSyncService getDataSyncService(RaftNode header) {
+    return syncServiceMap.computeIfAbsent(header, h -> {
+      DataGroupMember dataMember = getDataMember(header, null, null);
       return dataMember != null ? new DataSyncService(dataMember) : null;
     });
   }
 
+  public <T> DataGroupMember getDataMember(Node node, int raftId,
+      AsyncMethodCallback<T> resultHandler, Object request) {
+    return getDataMember(new RaftNode(node, raftId), resultHandler, request);
+  }
+
   /**
-   * @param pair          the header of the group which the local node is in
+   * @param raftNode      the header of the group which the local node is in
    * @param resultHandler can be set to null if the request is an internal request
    * @param request       the toString() of this parameter should explain what the request is and it
    *                      is only used in logs for tracing
    * @return
    */
-  public <T> DataGroupMember getDataMember(Pair<Node, Integer> pair,
-      AsyncMethodCallback<T> resultHandler,
-      Object request) {
+  public <T> DataGroupMember getDataMember(RaftNode raftNode,
+      AsyncMethodCallback<T> resultHandler, Object request) {
     // if the resultHandler is not null, then the request is a external one and must be with a
     // header
-    if (pair.left == null) {
+    if (raftNode.getNode() == null) {
       if (resultHandler != null) {
         resultHandler.onError(new NoHeaderNodeException());
       }
       return null;
     }
-    DataGroupMember member = stoppedMemberManager.get(pair);
+    DataGroupMember member = stoppedMemberManager.get(raftNode);
     if (member != null) {
       return member;
     }
@@ -173,14 +183,14 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
     // avoid creating two members for a header
     Exception ex = null;
     synchronized (headerGroupMap) {
-      member = headerGroupMap.get(pair);
+      member = headerGroupMap.get(raftNode);
       if (member != null) {
         return member;
       }
-      logger.info("Received a request \"{}\" from unregistered header {}", request, pair);
+      logger.info("Received a request \"{}\" from unregistered header {}", request, raftNode);
       if (partitionTable != null) {
         try {
-          member = createNewMember(pair);
+          member = createNewMember(raftNode);
         } catch (NotInSameGroupException | CheckConsistencyException e) {
           ex = e;
         }
@@ -196,37 +206,37 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
   }
 
   /**
-   * @param pair
+   * @param raftNode
    * @return A DataGroupMember representing this node in the data group of the header.
    * @throws NotInSameGroupException If this node is not in the group of the header.
    */
-  private DataGroupMember createNewMember(Pair<Node, Integer> pair)
+  private DataGroupMember createNewMember(RaftNode raftNode)
       throws NotInSameGroupException, CheckConsistencyException {
     DataGroupMember member;
     PartitionGroup partitionGroup;
-    partitionGroup = partitionTable.getHeaderGroup(pair);
+    partitionGroup = partitionTable.getHeaderGroup(raftNode);
     if (partitionGroup == null || !partitionGroup.contains(thisNode)) {
       // if the partition table is old, this node may have not been moved to the new group
       metaGroupMember.syncLeaderWithConsistencyCheck(true);
-      partitionGroup = partitionTable.getHeaderGroup(pair);
+      partitionGroup = partitionTable.getHeaderGroup(raftNode);
     }
     if (partitionGroup != null && partitionGroup.contains(thisNode)) {
       // the two nodes are in the same group, create a new data member
       member = dataMemberFactory.create(partitionGroup, thisNode);
-      DataGroupMember prevMember = headerGroupMap.put(pair, member);
+      DataGroupMember prevMember = headerGroupMap.put(raftNode, member);
       if (prevMember != null) {
         prevMember.stop();
       }
-      logger.info("Created a member for header {}", pair);
+      logger.info("Created a member for header {}", raftNode);
       member.start();
     } else {
       // the member may have been stopped after syncLeader
-      member = stoppedMemberManager.get(pair);
+      member = stoppedMemberManager.get(raftNode);
       if (member != null) {
         return member;
       }
       logger.info("This node {} does not belong to the group {}, header {}", thisNode,
-          partitionGroup, pair);
+          partitionGroup, raftNode);
       throw new NotInSameGroupException(partitionGroup, thisNode);
     }
     return member;
@@ -256,8 +266,7 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
 
   @Override
   public void appendEntries(AppendEntriesRequest request, AsyncMethodCallback<Long> resultHandler) {
-    DataAsyncService service = getDataAsyncService(request.getHeader(), request.getRaftId(),
-        resultHandler, request);
+    DataAsyncService service = getDataAsyncService(request.getHeader(), request.getRaftId(), resultHandler, request);
     if (service != null) {
       service.appendEntries(request, resultHandler);
     }
@@ -265,8 +274,7 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
 
   @Override
   public void appendEntry(AppendEntryRequest request, AsyncMethodCallback<Long> resultHandler) {
-    DataAsyncService service = getDataAsyncService(request.getHeader(), request.getRaftId(),
-        resultHandler, request);
+    DataAsyncService service = getDataAsyncService(request.getHeader(), request.getRaftId(), resultHandler, request);
     if (service != null) {
       service.appendEntry(request, resultHandler);
     }
@@ -274,8 +282,7 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
 
   @Override
   public void sendSnapshot(SendSnapshotRequest request, AsyncMethodCallback<Void> resultHandler) {
-    DataAsyncService service = getDataAsyncService(request.getHeader(), request.getRaftId(),
-        resultHandler, request);
+    DataAsyncService service = getDataAsyncService(request.getHeader(), request.getRaftId(), resultHandler, request);
     if (service != null) {
       service.sendSnapshot(request, resultHandler);
     }
@@ -284,8 +291,7 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
   @Override
   public void pullSnapshot(PullSnapshotRequest request,
       AsyncMethodCallback<PullSnapshotResp> resultHandler) {
-    DataAsyncService service = getDataAsyncService(request.getHeader(), request.getRaftId(),
-        resultHandler, request);
+    DataAsyncService service = getDataAsyncService(request.getHeader(), request.getRaftId(), resultHandler, request);
     if (service != null) {
       service.pullSnapshot(request, resultHandler);
     }
@@ -294,26 +300,24 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
   @Override
   public void executeNonQueryPlan(ExecutNonQueryReq request,
       AsyncMethodCallback<TSStatus> resultHandler) {
-    DataAsyncService service = getDataAsyncService(request.getHeader(), request.getRaftId(),
-        resultHandler, request);
+    DataAsyncService service = getDataAsyncService(request.getHeader(), request.getRaftId(), resultHandler, request);
     if (service != null) {
       service.executeNonQueryPlan(request, resultHandler);
     }
   }
 
   @Override
-  public void requestCommitIndex(Node header, int id, AsyncMethodCallback<Long> resultHandler) {
-    DataAsyncService service = getDataAsyncService(header, id, resultHandler,
-        "Request commit index");
+  public void requestCommitIndex(Node header, int raftId, AsyncMethodCallback<Long> resultHandler) {
+    DataAsyncService service = getDataAsyncService(header, raftId, resultHandler, "Request commit index");
     if (service != null) {
-      service.requestCommitIndex(header, resultHandler);
+      service.requestCommitIndex(header, raftId, resultHandler);
     }
   }
 
   @Override
   public void readFile(String filePath, long offset, int length,
       AsyncMethodCallback<ByteBuffer> resultHandler) {
-    DataAsyncService service = getDataAsyncService(thisNode, 0, resultHandler,
+    DataAsyncService service = getDataAsyncService(new RaftNode(thisNode, 0), resultHandler,
         "Read file:" + filePath);
     if (service != null) {
       service.readFile(filePath, offset, length, resultHandler);
@@ -324,46 +328,44 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
   public void querySingleSeries(SingleSeriesQueryRequest request,
       AsyncMethodCallback<Long> resultHandler) {
     DataAsyncService service = getDataAsyncService(request.getHeader(), request.getRaftId(),
-        resultHandler,
-        "Query series:" + request.getPath());
+        resultHandler, "Query series:" + request.getPath());
     if (service != null) {
       service.querySingleSeries(request, resultHandler);
     }
   }
 
   @Override
-  public void fetchSingleSeries(Node header, int id, long readerId,
+  public void fetchSingleSeries(Node header, int raftId, long readerId,
       AsyncMethodCallback<ByteBuffer> resultHandler) {
-    DataAsyncService service = getDataAsyncService(header, id, resultHandler,
+    DataAsyncService service = getDataAsyncService(header, raftId, resultHandler,
         "Fetch reader:" + readerId);
     if (service != null) {
-      service.fetchSingleSeries(header, readerId, resultHandler);
+      service.fetchSingleSeries(header, raftId, readerId, resultHandler);
     }
   }
 
   @Override
-  public void getAllPaths(Node header, int id, List<String> paths, boolean withAlias,
+  public void getAllPaths(Node header, int raftId, List<String> paths, boolean withAlias,
       AsyncMethodCallback<GetAllPathsResult> resultHandler) {
-    DataAsyncService service = getDataAsyncService(header, id, resultHandler, "Find path:" + paths);
+    DataAsyncService service = getDataAsyncService(header, raftId, resultHandler, "Find path:" + paths);
     if (service != null) {
-      service.getAllPaths(header, paths, withAlias, resultHandler);
+      service.getAllPaths(header, raftId, paths, withAlias, resultHandler);
     }
   }
 
   @Override
-  public void endQuery(Node header, int id, Node thisNode, long queryId,
+  public void endQuery(Node header, int raftId, Node thisNode, long queryId,
       AsyncMethodCallback<Void> resultHandler) {
-    DataAsyncService service = getDataAsyncService(header, id, resultHandler, "End query");
+    DataAsyncService service = getDataAsyncService(header, raftId, resultHandler, "End query");
     if (service != null) {
-      service.endQuery(header, thisNode, queryId, resultHandler);
+      service.endQuery(header, raftId, thisNode, queryId, resultHandler);
     }
   }
 
   @Override
   public void querySingleSeriesByTimestamp(SingleSeriesQueryRequest request,
       AsyncMethodCallback<Long> resultHandler) {
-    DataAsyncService service = getDataAsyncService(request.getHeader(), request.getRaftId(),
-        resultHandler,
+    DataAsyncService service = getDataAsyncService(request.getHeader(), request.getRaftId(), resultHandler,
         "Query by timestamp:" + request.getQueryId() + "#" + request.getPath() + " of " + request
             .getRequester());
     if (service != null) {
@@ -372,20 +374,19 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
   }
 
   @Override
-  public void fetchSingleSeriesByTimestamp(Node header, int id, long readerId, long time,
+  public void fetchSingleSeriesByTimestamp(Node header, int raftId, long readerId, long time,
       AsyncMethodCallback<ByteBuffer> resultHandler) {
-    DataAsyncService service = getDataAsyncService(header, id, resultHandler,
+    DataAsyncService service = getDataAsyncService(header, raftId, resultHandler,
         "Fetch by timestamp:" + readerId);
     if (service != null) {
-      service.fetchSingleSeriesByTimestamp(header, readerId, time, resultHandler);
+      service.fetchSingleSeriesByTimestamp(header, raftId, readerId, time, resultHandler);
     }
   }
 
   @Override
   public void pullTimeSeriesSchema(PullSchemaRequest request,
       AsyncMethodCallback<PullSchemaResp> resultHandler) {
-    DataAsyncService service = getDataAsyncService(request.getHeader(), request.getRaftId(),
-        resultHandler, request);
+    DataAsyncService service = getDataAsyncService(request.getHeader(), request.getRaftId(), resultHandler, request);
     if (service != null) {
       service.pullTimeSeriesSchema(request, resultHandler);
     }
@@ -394,70 +395,67 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
   @Override
   public void pullMeasurementSchema(PullSchemaRequest request,
       AsyncMethodCallback<PullSchemaResp> resultHandler) {
-    DataAsyncService service = getDataAsyncService(request.getHeader(), request.getRaftId(),
-        resultHandler,
+    DataAsyncService service = getDataAsyncService(request.getHeader(), request.getRaftId(), resultHandler,
         "Pull measurement schema");
     service.pullMeasurementSchema(request, resultHandler);
   }
 
   @Override
-  public void getAllDevices(Node header, int id, List<String> paths,
+  public void getAllDevices(Node header, int raftId, List<String> paths,
       AsyncMethodCallback<Set<String>> resultHandler) {
-    DataAsyncService service = getDataAsyncService(header, id, resultHandler, "Get all devices");
-    service.getAllDevices(header, paths, resultHandler);
+    DataAsyncService service = getDataAsyncService(header, raftId, resultHandler, "Get all devices");
+    service.getAllDevices(header, raftId, paths, resultHandler);
   }
 
   @Override
-  public void getNodeList(Node header, int id, String path, int nodeLevel,
+  public void getNodeList(Node header, int raftId, String path, int nodeLevel,
       AsyncMethodCallback<List<String>> resultHandler) {
-    DataAsyncService service = getDataAsyncService(header, id, resultHandler, "Get node list");
-    service.getNodeList(header, path, nodeLevel, resultHandler);
+    DataAsyncService service = getDataAsyncService(header, raftId, resultHandler, "Get node list");
+    service.getNodeList(header, raftId, path, nodeLevel, resultHandler);
   }
 
   @Override
-  public void getChildNodePathInNextLevel(Node header, int id, String path,
+  public void getChildNodePathInNextLevel(Node header, int raftId, String path,
       AsyncMethodCallback<Set<String>> resultHandler) {
-    DataAsyncService service = getDataAsyncService(header, id, resultHandler,
+    DataAsyncService service = getDataAsyncService(header, raftId, resultHandler,
         "Get child node path in next level");
-    service.getChildNodePathInNextLevel(header, path, resultHandler);
+    service.getChildNodePathInNextLevel(header, raftId, path, resultHandler);
   }
 
   @Override
-  public void getAllMeasurementSchema(Node header, int id, ByteBuffer planBytes,
+  public void getAllMeasurementSchema(Node header, int raftId, ByteBuffer planBytes,
       AsyncMethodCallback<ByteBuffer> resultHandler) {
-    DataAsyncService service = getDataAsyncService(header, id, resultHandler,
+    DataAsyncService service = getDataAsyncService(header, raftId, resultHandler,
         "Get all measurement schema");
-    service.getAllMeasurementSchema(header, planBytes, resultHandler);
+    service.getAllMeasurementSchema(header, raftId, planBytes, resultHandler);
   }
 
   @Override
   public void getAggrResult(GetAggrResultRequest request,
       AsyncMethodCallback<List<ByteBuffer>> resultHandler) {
-    DataAsyncService service = getDataAsyncService(request.getHeader(), request.getRaftId(),
-        resultHandler, request);
+    DataAsyncService service = getDataAsyncService(request.getHeader(), request.getRaftId(), resultHandler, request);
     service.getAggrResult(request, resultHandler);
   }
 
   @Override
-  public void getUnregisteredTimeseries(Node header, int id, List<String> timeseriesList,
+  public void getUnregisteredTimeseries(Node header, int raftId, List<String> timeseriesList,
       AsyncMethodCallback<List<String>> resultHandler) {
-    DataAsyncService service = getDataAsyncService(header, id, resultHandler,
+    DataAsyncService service = getDataAsyncService(header, raftId, resultHandler,
         "Check if measurements are registered");
-    service.getUnregisteredTimeseries(header, timeseriesList, resultHandler);
+    service.getUnregisteredTimeseries(header, raftId, timeseriesList, resultHandler);
   }
 
   @Override
   public void getGroupByExecutor(GroupByRequest request, AsyncMethodCallback<Long> resultHandler) {
-    DataAsyncService service = getDataAsyncService(request.getHeader(), request.getRaftId(),
-        resultHandler, request);
+    DataAsyncService service = getDataAsyncService(request.getHeader(), request.getRaftId(), resultHandler, request);
     service.getGroupByExecutor(request, resultHandler);
   }
 
   @Override
-  public void getGroupByResult(Node header, int id, long executorId, long startTime, long endTime,
+  public void getGroupByResult(Node header, int raftId, long executorId, long startTime, long endTime,
       AsyncMethodCallback<List<ByteBuffer>> resultHandler) {
-    DataAsyncService service = getDataAsyncService(header, id, resultHandler, "Fetch group by");
-    service.getGroupByResult(header, executorId, startTime, endTime, resultHandler);
+    DataAsyncService service = getDataAsyncService(header, raftId, resultHandler, "Fetch group by");
+    service.getGroupByResult(header, raftId, executorId, startTime, endTime, resultHandler);
   }
 
   @Override
@@ -633,9 +631,12 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
    * which has no data. This is to make that member pull data from other nodes.
    */
   public void pullSnapshots() {
-    List<Integer> slots = ((SlotPartitionTable) partitionTable).getNodeSlots(thisNode);
-    DataGroupMember dataGroupMember = headerGroupMap.get(thisNode);
-    dataGroupMember.pullNodeAdditionSnapshots(slots, thisNode);
+    for (int raftId = 0; raftId < ClusterDescriptor.getInstance().getConfig().getMultiRaftFactor(); raftId++) {
+      RaftNode raftNode = new RaftNode(thisNode, raftId);
+      List<Integer> slots = ((SlotPartitionTable) partitionTable).getNodeSlots(raftNode);
+      DataGroupMember dataGroupMember = headerGroupMap.get(raftNode);
+      dataGroupMember.pullNodeAdditionSnapshots(slots, thisNode);
+    }
   }
 
   /**
@@ -653,8 +654,7 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
   @Override
   public void previousFill(PreviousFillRequest request,
       AsyncMethodCallback<ByteBuffer> resultHandler) {
-    DataAsyncService service = getDataAsyncService(request.getHeader(), request.getRaftId(),
-        resultHandler, request);
+    DataAsyncService service = getDataAsyncService(request.getHeader(), request.getRaftId(), resultHandler, request);
     service.previousFill(request, resultHandler);
   }
 
@@ -665,31 +665,30 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
   }
 
   @Override
-  public void matchTerm(long index, long term, Node header, int id,
+  public void matchTerm(long index, long term, Node header, int raftId,
       AsyncMethodCallback<Boolean> resultHandler) {
-    DataAsyncService service = getDataAsyncService(header, id, resultHandler, "Match term");
-    service.matchTerm(index, term, header, resultHandler);
+    DataAsyncService service = getDataAsyncService(header, raftId, resultHandler, "Match term");
+    service.matchTerm(index, term, header, raftId, resultHandler);
   }
 
   @Override
   public void last(LastQueryRequest request, AsyncMethodCallback<ByteBuffer> resultHandler) {
-    DataAsyncService service = getDataAsyncService(request.getHeader(), request.getRaftId(),
-        resultHandler, "last");
+    DataAsyncService service = getDataAsyncService(request.getHeader(), request.getRaftId(), resultHandler, "last");
     service.last(request, resultHandler);
   }
 
   @Override
-  public void getPathCount(Node header, int id, List<String> pathsToQuery, int level,
+  public void getPathCount(Node header, int raftId, List<String> pathsToQuery, int level,
       AsyncMethodCallback<Integer> resultHandler) {
-    DataAsyncService service = getDataAsyncService(header, id, resultHandler, "count path");
-    service.getPathCount(header, pathsToQuery, level, resultHandler);
+    DataAsyncService service = getDataAsyncService(header, raftId, resultHandler, "count path");
+    service.getPathCount(header, raftId, pathsToQuery, level, resultHandler);
   }
 
   @Override
-  public void onSnapshotApplied(Node header, int id, List<Integer> slots,
+  public void onSnapshotApplied(Node header, int raftId, List<Integer> slots,
       AsyncMethodCallback<Boolean> resultHandler) {
-    DataAsyncService service = getDataAsyncService(header, id, resultHandler, "Snapshot applied");
-    service.onSnapshotApplied(header, slots, resultHandler);
+    DataAsyncService service = getDataAsyncService(header, raftId, resultHandler, "Snapshot applied");
+    service.onSnapshotApplied(header, raftId, slots, resultHandler);
   }
 
   @Override
@@ -699,13 +698,12 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
 
   @Override
   public ByteBuffer fetchSingleSeries(Node header, int raftId, long readerId) throws TException {
-    return getDataSyncService(header, raftId).fetchSingleSeries(header, readerId);
+    return getDataSyncService(header, raftId).fetchSingleSeries(header, raftId, readerId);
   }
 
   @Override
   public long querySingleSeriesByTimestamp(SingleSeriesQueryRequest request) throws TException {
-    return getDataSyncService(request.getHeader(), request.getRaftId())
-        .querySingleSeriesByTimestamp(request);
+    return getDataSyncService(request.getHeader(), request.getRaftId()).querySingleSeriesByTimestamp(request);
   }
 
   @Override
@@ -713,42 +711,42 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
       long timestamp)
       throws TException {
     return getDataSyncService(header, raftId)
-        .fetchSingleSeriesByTimestamp(header, readerId, timestamp);
+        .fetchSingleSeriesByTimestamp(header, raftId, readerId, timestamp);
   }
 
   @Override
   public void endQuery(Node header, int raftId, Node thisNode, long queryId) throws TException {
-    getDataSyncService(header, raftId).endQuery(header, thisNode, queryId);
+    getDataSyncService(header, raftId).endQuery(header, raftId, thisNode, queryId);
   }
 
   @Override
   public GetAllPathsResult getAllPaths(Node header, int raftId, List<String> path,
       boolean withAlias)
       throws TException {
-    return getDataSyncService(header, raftId).getAllPaths(header, path, withAlias);
+    return getDataSyncService(header, raftId).getAllPaths(header, raftId, path, withAlias);
   }
 
   @Override
   public Set<String> getAllDevices(Node header, int raftId, List<String> path) throws TException {
-    return getDataSyncService(header, raftId).getAllDevices(header, path);
+    return getDataSyncService(header, raftId).getAllDevices(header, raftId, path);
   }
 
   @Override
   public List<String> getNodeList(Node header, int raftId, String path, int nodeLevel)
       throws TException {
-    return getDataSyncService(header, raftId).getNodeList(header, path, nodeLevel);
+    return getDataSyncService(header, raftId).getNodeList(header, raftId, path, nodeLevel);
   }
 
   @Override
   public Set<String> getChildNodePathInNextLevel(Node header, int raftId, String path)
       throws TException {
-    return getDataSyncService(header, raftId).getChildNodePathInNextLevel(header, path);
+    return getDataSyncService(header, raftId).getChildNodePathInNextLevel(header, raftId, path);
   }
 
   @Override
   public ByteBuffer getAllMeasurementSchema(Node header, int raftId, ByteBuffer planBinary)
       throws TException {
-    return getDataSyncService(header, raftId).getAllMeasurementSchema(header, planBinary);
+    return getDataSyncService(header, raftId).getAllMeasurementSchema(header, raftId, planBinary);
   }
 
   @Override
@@ -760,7 +758,7 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
   public List<String> getUnregisteredTimeseries(Node header, int raftId,
       List<String> timeseriesList)
       throws TException {
-    return getDataSyncService(header, raftId).getUnregisteredTimeseries(header, timeseriesList);
+    return getDataSyncService(header, raftId).getUnregisteredTimeseries(header, raftId, timeseriesList);
   }
 
   @Override
@@ -776,20 +774,17 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
   @Override
   public List<ByteBuffer> getGroupByResult(Node header, int raftId, long executorId, long startTime,
       long endTime) throws TException {
-    return getDataSyncService(header, raftId)
-        .getGroupByResult(header, executorId, startTime, endTime);
+    return getDataSyncService(header, raftId).getGroupByResult(header, raftId, executorId, startTime, endTime);
   }
 
   @Override
   public PullSchemaResp pullTimeSeriesSchema(PullSchemaRequest request) throws TException {
-    return getDataSyncService(request.getHeader(), request.getRaftId())
-        .pullTimeSeriesSchema(request);
+    return getDataSyncService(request.getHeader(), request.getRaftId()).pullTimeSeriesSchema(request);
   }
 
   @Override
   public PullSchemaResp pullMeasurementSchema(PullSchemaRequest request) throws TException {
-    return getDataSyncService(request.getHeader(), request.getRaftId())
-        .pullMeasurementSchema(request);
+    return getDataSyncService(request.getHeader(), request.getRaftId()).pullMeasurementSchema(request);
   }
 
   @Override
@@ -805,12 +800,12 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
   @Override
   public int getPathCount(Node header, int raftId, List<String> pathsToQuery, int level)
       throws TException {
-    return getDataSyncService(header, raftId).getPathCount(header, pathsToQuery, level);
+    return getDataSyncService(header, raftId).getPathCount(header, raftId, pathsToQuery, level);
   }
 
   @Override
   public boolean onSnapshotApplied(Node header, int raftId, List<Integer> slots) {
-    return getDataSyncService(header, raftId).onSnapshotApplied(header, slots);
+    return getDataSyncService(header, raftId).onSnapshotApplied(header, raftId, slots);
   }
 
   @Override
@@ -840,51 +835,48 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
 
   @Override
   public TSStatus executeNonQueryPlan(ExecutNonQueryReq request) throws TException {
-    return getDataSyncService(request.getHeader(), request.getRaftId())
-        .executeNonQueryPlan(request);
+    return getDataSyncService(request.getHeader(), request.getRaftId()).executeNonQueryPlan(request);
   }
 
   @Override
   public long requestCommitIndex(Node header, int raftId) throws TException {
-    return getDataSyncService(header, raftId).requestCommitIndex(header);
+    return getDataSyncService(header, raftId).requestCommitIndex(header, raftId);
   }
 
   @Override
   public ByteBuffer readFile(String filePath, long offset, int length) throws TException {
-    return getDataSyncService(thisNode, 0).readFile(filePath, offset, length);
+    return getDataSyncService(new RaftNode(thisNode, 0)).readFile(filePath, offset, length);
   }
 
   @Override
   public boolean matchTerm(long index, long term, Node header, int raftId) {
-    return getDataSyncService(header, raftId).matchTerm(index, term, header);
+    return getDataSyncService(header, raftId).matchTerm(index, term, header, raftId);
   }
 
   @Override
   public ByteBuffer peekNextNotNullValue(Node header, int raftId, long executorId, long startTime,
       long endTime)
       throws TException {
-    return getDataSyncService(header, raftId)
-        .peekNextNotNullValue(header, executorId, startTime, endTime);
+    return getDataSyncService(header, raftId).peekNextNotNullValue(header, raftId, executorId, startTime, endTime);
   }
 
   @Override
   public void peekNextNotNullValue(Node header, int raftId, long executorId, long startTime,
       long endTime,
       AsyncMethodCallback<ByteBuffer> resultHandler) throws TException {
-    resultHandler.onComplete(
-        getDataSyncService(header, raftId)
-            .peekNextNotNullValue(header, executorId, startTime, endTime));
+    resultHandler.onComplete(getDataSyncService(header, raftId)
+            .peekNextNotNullValue(header, raftId, executorId, startTime, endTime));
   }
 
   @Override
   public void removeHardLink(String hardLinkPath) throws TException {
-    getDataSyncService(thisNode, 0).removeHardLink(hardLinkPath);
+    getDataSyncService(new RaftNode(thisNode, 0)).removeHardLink(hardLinkPath);
   }
 
   @Override
   public void removeHardLink(String hardLinkPath,
       AsyncMethodCallback<Void> resultHandler) {
-    getDataAsyncService(thisNode, 0, resultHandler, hardLinkPath).removeHardLink(hardLinkPath,
+    getDataAsyncService(new RaftNode(thisNode, 0), resultHandler, hardLinkPath).removeHardLink(hardLinkPath,
         resultHandler);
   }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java
index a0fb04d..f8830c0 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java
@@ -212,8 +212,8 @@ public class MetaClusterServer extends RaftServer implements TSMetaService.Async
   }
 
   @Override
-  public void requestCommitIndex(Node header, AsyncMethodCallback<Long> resultHandler) {
-    asyncService.requestCommitIndex(header, resultHandler);
+  public void requestCommitIndex(Node header, int raftId, AsyncMethodCallback<Long> resultHandler) {
+    asyncService.requestCommitIndex(header, raftId, resultHandler);
   }
 
   @Override
@@ -253,9 +253,9 @@ public class MetaClusterServer extends RaftServer implements TSMetaService.Async
   }
 
   @Override
-  public void matchTerm(long index, long term, Node header,
+  public void matchTerm(long index, long term, Node header, int raftId,
       AsyncMethodCallback<Boolean> resultHandler) {
-    asyncService.matchTerm(index, term, header, resultHandler);
+    asyncService.matchTerm(index, term, header, raftId, resultHandler);
   }
 
   @Override
@@ -319,8 +319,8 @@ public class MetaClusterServer extends RaftServer implements TSMetaService.Async
   }
 
   @Override
-  public long requestCommitIndex(Node header) throws TException {
-    return syncService.requestCommitIndex(header);
+  public long requestCommitIndex(Node header, int raftId) throws TException {
+    return syncService.requestCommitIndex(header, raftId);
   }
 
   @Override
@@ -329,8 +329,8 @@ public class MetaClusterServer extends RaftServer implements TSMetaService.Async
   }
 
   @Override
-  public boolean matchTerm(long index, long term, Node header) {
-    return syncService.matchTerm(index, term, header);
+  public boolean matchTerm(long index, long term, Node header, int raftId) {
+    return syncService.matchTerm(index, term, header, raftId);
   }
 
   @Override
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/PullSnapshotHintService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/PullSnapshotHintService.java
index b2bebbf..d60ba42 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/PullSnapshotHintService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/PullSnapshotHintService.java
@@ -19,7 +19,6 @@
 
 package org.apache.iotdb.cluster.server;
 
-import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.ConcurrentLinkedDeque;
@@ -31,6 +30,7 @@ import org.apache.iotdb.cluster.client.sync.SyncClientAdaptor;
 import org.apache.iotdb.cluster.client.sync.SyncDataClient;
 import org.apache.iotdb.cluster.config.ClusterDescriptor;
 import org.apache.iotdb.cluster.log.snapshot.PullSnapshotTaskDescriptor;
+import org.apache.iotdb.cluster.partition.PartitionGroup;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.iotdb.cluster.server.member.DataGroupMember;
 import org.apache.thrift.TException;
@@ -72,8 +72,7 @@ public class PullSnapshotHintService {
 
   public void registerHint(PullSnapshotTaskDescriptor descriptor) {
     PullSnapshotHint hint = new PullSnapshotHint();
-    hint.receivers = new ArrayList<>(descriptor.getPreviousHolders());
-    hint.header = descriptor.getPreviousHolders().getHeader();
+    hint.receivers = new PartitionGroup(descriptor.getPreviousHolders());
     hint.slots = descriptor.getSlots();
     hints.add(hint);
   }
@@ -116,7 +115,7 @@ public class PullSnapshotHintService {
   private boolean sendHintsAsync(Node receiver, PullSnapshotHint hint)
       throws TException, InterruptedException {
     AsyncDataClient asyncDataClient = (AsyncDataClient) member.getAsyncClient(receiver);
-    return SyncClientAdaptor.onSnapshotApplied(asyncDataClient, hint.header, hint.slots);
+    return SyncClientAdaptor.onSnapshotApplied(asyncDataClient, hint.getHeader(), hint.getRaftId(), hint.slots);
   }
 
   private boolean sendHintSync(Node receiver, PullSnapshotHint hint) throws TException {
@@ -124,7 +123,7 @@ public class PullSnapshotHintService {
     if (syncDataClient == null) {
       return false;
     }
-    return syncDataClient.onSnapshotApplied(hint.header, hint.slots);
+    return syncDataClient.onSnapshotApplied(hint.getHeader(), hint.getRaftId(), hint.slots);
   }
 
   private static class PullSnapshotHint {
@@ -132,10 +131,16 @@ public class PullSnapshotHintService {
     /**
      * Nodes to send this hint;
      */
-    private List<Node> receivers;
-
-    private Node header;
+    private PartitionGroup receivers;
 
     private List<Integer> slots;
+
+    public Node getHeader() {
+      return receivers.getHeader();
+    }
+
+    public int getRaftId() {
+      return receivers.getId();
+    }
   }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/StoppedMemberManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/StoppedMemberManager.java
index b203aaf..d941f84 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/StoppedMemberManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/StoppedMemberManager.java
@@ -29,6 +29,7 @@ import java.util.HashMap;
 import java.util.Map;
 import org.apache.iotdb.cluster.partition.PartitionGroup;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.rpc.thrift.RaftNode;
 import org.apache.iotdb.cluster.server.member.DataGroupMember;
 import org.apache.iotdb.cluster.server.member.DataGroupMember.Factory;
 import org.apache.iotdb.cluster.utils.ClusterUtils;
@@ -54,7 +55,7 @@ public class StoppedMemberManager {
   private static final String REMOVED = "0";
   private static final String RESUMED = "1";
 
-  private Map<Pair<Node, Integer>, DataGroupMember> removedMemberMap = new HashMap<>();
+  private Map<RaftNode, DataGroupMember> removedMemberMap = new HashMap<>();
   private DataGroupMember.Factory memberFactory;
   private Node thisNode;
 
@@ -69,11 +70,11 @@ public class StoppedMemberManager {
    * When a DataGroupMember is removed, add it here and record this removal, so in next start-up we
    * can recover it as a data source for data transfers.
    *
-   * @param pair
+   * @param raftNode
    * @param dataGroupMember
    */
-  public synchronized void put(Pair<Node, Integer> pair, DataGroupMember dataGroupMember) {
-    removedMemberMap.put(pair, dataGroupMember);
+  public synchronized void put(RaftNode raftNode, DataGroupMember dataGroupMember) {
+    removedMemberMap.put(raftNode, dataGroupMember);
     try (BufferedWriter writer = new BufferedWriter(new FileWriter(stoppedMembersFileName, true))) {
       StringBuilder builder = new StringBuilder(REMOVED);
       for (Node node : dataGroupMember.getAllNodes()) {
@@ -82,7 +83,7 @@ public class StoppedMemberManager {
       writer.write(builder.toString());
       writer.newLine();
     } catch (IOException e) {
-      logger.error("Cannot record removed member of header {}", pair, e);
+      logger.error("Cannot record removed member of header {}", raftNode, e);
     }
   }
 
@@ -90,20 +91,20 @@ public class StoppedMemberManager {
    * When a DataGroupMember is resumed, add it here and record this removal, so in next start-up we
    * will not recover it here.
    *
-   * @param pair
+   * @param raftNode
    */
-  public synchronized void remove(Pair<Node, Integer> pair) {
-    removedMemberMap.remove(pair);
+  public synchronized void remove(RaftNode raftNode) {
+    removedMemberMap.remove(raftNode);
     try (BufferedWriter writer = new BufferedWriter(new FileWriter(stoppedMembersFileName, true))) {
-      writer.write(RESUMED + ";" + pair.toString());
+      writer.write(RESUMED + ";" + raftNode.toString());
       writer.newLine();
     } catch (IOException e) {
-      logger.error("Cannot record resumed member of header {}", pair, e);
+      logger.error("Cannot record resumed member of header {}", raftNode, e);
     }
   }
 
-  public synchronized DataGroupMember get(Pair<Node, Integer> pair) {
-    return removedMemberMap.get(pair);
+  public synchronized DataGroupMember get(RaftNode raftNode) {
+    return removedMemberMap.get(raftNode);
   }
 
   private void recover() {
@@ -147,7 +148,7 @@ public class StoppedMemberManager {
     DataGroupMember member = memberFactory.create(partitionGroup, thisNode);
     member.setReadOnly();
     //TODO CORRECT
-    removedMemberMap.put(new Pair(partitionGroup.getHeader(), 0), member);
+    removedMemberMap.put(new RaftNode(partitionGroup.getHeader(), 0), member);
   }
 
   private void parseResumed(String[] split) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
index 62f2b71..8a43818 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
@@ -75,6 +75,7 @@ import org.apache.iotdb.cluster.rpc.thrift.ElectionRequest;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.iotdb.cluster.rpc.thrift.PullSnapshotRequest;
 import org.apache.iotdb.cluster.rpc.thrift.PullSnapshotResp;
+import org.apache.iotdb.cluster.rpc.thrift.RaftNode;
 import org.apache.iotdb.cluster.rpc.thrift.SendSnapshotRequest;
 import org.apache.iotdb.cluster.server.NodeCharacter;
 import org.apache.iotdb.cluster.server.NodeReport.DataMemberReport;
@@ -237,10 +238,6 @@ public class DataGroupMember extends RaftMember {
     return allNodes.get(0);
   }
 
-  public Integer getRaftGroupId() {
-    return allNodes.getId();
-  }
-
   public ClusterQueryManager getQueryManager() {
     return queryManager;
   }
@@ -470,29 +467,28 @@ public class DataGroupMember extends RaftMember {
     synchronized (logManager) {
       logger.info("{} pulling {} slots from remote", name, slots.size());
       PartitionedSnapshot<Snapshot> snapshot = (PartitionedSnapshot) logManager.getSnapshot();
-      Map<Integer, Node> prevHolders = ((SlotPartitionTable) metaGroupMember.getPartitionTable())
-          .getPreviousNodeMap(newNode);
+      Map<Integer, RaftNode> prevHolders = ((SlotPartitionTable) metaGroupMember.getPartitionTable())
+          .getPreviousNodeMap(new RaftNode(newNode, getRaftGroupId()));
 
       // group the slots by their owners
-      Map<Node, List<Integer>> holderSlotsMap = new HashMap<>();
+      Map<RaftNode, List<Integer>> holderSlotsMap = new HashMap<>();
       for (int slot : slots) {
         // skip the slot if the corresponding data is already replicated locally
         if (snapshot.getSnapshot(slot) == null) {
-          Node node = prevHolders.get(slot);
-          if (node != null) {
-            holderSlotsMap.computeIfAbsent(node, n -> new ArrayList<>()).add(slot);
+          RaftNode raftNode = prevHolders.get(slot);
+          if (raftNode != null) {
+            holderSlotsMap.computeIfAbsent(raftNode, n -> new ArrayList<>()).add(slot);
           }
         }
       }
 
       // pull snapshots from each owner's data group
-      for (Entry<Node, List<Integer>> entry : holderSlotsMap.entrySet()) {
-        Node node = entry.getKey();
+      for (Entry<RaftNode, List<Integer>> entry : holderSlotsMap.entrySet()) {
+        RaftNode raftNode = entry.getKey();
         List<Integer> nodeSlots = entry.getValue();
         PullSnapshotTaskDescriptor taskDescriptor =
             new PullSnapshotTaskDescriptor(metaGroupMember.getPartitionTable()
-                .getHeaderGroup(new Pair<>(node, getRaftGroupId())),
-                nodeSlots, false);
+                .getHeaderGroup(raftNode), nodeSlots, false);
         pullFileSnapshot(taskDescriptor, null);
       }
     }
@@ -626,9 +622,9 @@ public class DataGroupMember extends RaftMember {
       List<Pair<Long, Boolean>> tmpPairList = entry.getValue();
       for (Pair<Long, Boolean> pair : tmpPairList) {
         long partitionId = pair.left;
-        Pair<Node, Integer> pair = metaGroupMember.getPartitionTable().routeToHeaderByTime(storageGroupName,
+        RaftNode raftNode = metaGroupMember.getPartitionTable().routeToHeaderByTime(storageGroupName,
             partitionId * StorageEngine.getTimePartitionInterval());
-        DataGroupMember localDataMember = metaGroupMember.getLocalDataMember(pair);
+        DataGroupMember localDataMember = metaGroupMember.getLocalDataMember(raftNode);
         if (localDataMember.getHeader().equals(this.getHeader())) {
           localListPair.add(new Pair<>(partitionId, pair.right));
         }
@@ -741,7 +737,7 @@ public class DataGroupMember extends RaftMember {
     synchronized (allNodes) {
       if (allNodes.contains(removedNode)) {
         // update the group if the deleted node was in it
-        allNodes = metaGroupMember.getPartitionTable().getHeaderGroup(getHeader());
+        allNodes = metaGroupMember.getPartitionTable().getHeaderGroup(new RaftNode(getHeader(), getRaftGroupId()));
         initPeerMap();
         if (removedNode.equals(leader.get())) {
           // if the leader is removed, also start an election immediately
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
index 9f2fbb6..dcc4105 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
@@ -95,6 +95,7 @@ import org.apache.iotdb.cluster.rpc.thrift.CheckStatusResponse;
 import org.apache.iotdb.cluster.rpc.thrift.HeartBeatRequest;
 import org.apache.iotdb.cluster.rpc.thrift.HeartBeatResponse;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.rpc.thrift.RaftNode;
 import org.apache.iotdb.cluster.rpc.thrift.RaftService;
 import org.apache.iotdb.cluster.rpc.thrift.RaftService.Client;
 import org.apache.iotdb.cluster.rpc.thrift.SendSnapshotRequest;
@@ -147,7 +148,6 @@ import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.service.rpc.thrift.EndPoint;
 import org.apache.iotdb.service.rpc.thrift.TSStatus;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
-import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.thrift.TException;
 import org.apache.thrift.protocol.TProtocolFactory;
 import org.apache.thrift.transport.TTransportException;
@@ -305,12 +305,11 @@ public class MetaGroupMember extends RaftMember {
    * close the partition through that member. Notice: only partitions owned by this node can be
    * closed by the method.
    *
-   * @return true if the member is a leader and the partition is closed, false otherwise
    */
   public void closePartition(String storageGroupName, long partitionId, boolean isSeq) {
-    Pair<Node, Integer> pair = partitionTable.routeToHeaderByTime(storageGroupName,
+    RaftNode raftNode = partitionTable.routeToHeaderByTime(storageGroupName,
         partitionId * StorageEngine.getTimePartitionInterval());
-    DataGroupMember localDataMember = getLocalDataMember(pair);
+    DataGroupMember localDataMember = getLocalDataMember(raftNode);
     if (localDataMember == null || localDataMember.getCharacter() != NodeCharacter.LEADER) {
       return;
     }
@@ -485,7 +484,6 @@ public class MetaGroupMember extends RaftMember {
    * This node is not a seed node and wants to join an established cluster. Pick up a node randomly
    * from the seed nodes and send a join request to it.
    *
-   * @return true if the node has successfully joined the cluster, false otherwise.
    */
   public void joinCluster() throws ConfigInconsistentException, StartUpCheckFailureException {
     if (allNodes.size() == 1) {
@@ -1662,7 +1660,7 @@ public class MetaGroupMember extends RaftMember {
           .getOperationStartTime();
       logger.debug("Execute {} in a local group of {}", entry.getKey(),
           entry.getValue().getHeader());
-      result = getLocalDataMember(entry.getValue().getHeader())
+      result = getLocalDataMember(entry.getValue().getHeader(), entry.getValue().getId())
           .executeNonQueryPlan(entry.getKey());
       Timer.Statistic.META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP
           .calOperationCostTimeFromStart(startTime);
@@ -1839,7 +1837,7 @@ public class MetaGroupMember extends RaftMember {
       try {
         PartialPath storageGroupName = IoTDB.metaManager
             .getStorageGroupPath(path);
-        Set<Node> groupHeaders = new HashSet<>();
+        Set<RaftNode> groupHeaders = new HashSet<>();
         for (int i = 0; i < intervals.getIntervalSize(); i++) {
           // compute the headers of groups involved in every interval
           PartitionUtils
@@ -1847,7 +1845,7 @@ public class MetaGroupMember extends RaftMember {
                   intervals.getUpperBound(i), partitionTable, groupHeaders);
         }
         // translate the headers to groups
-        for (Node groupHeader : groupHeaders) {
+        for (RaftNode groupHeader : groupHeaders) {
           partitionGroups.add(partitionTable.getHeaderGroup(groupHeader));
         }
       } catch (MetadataException e) {
@@ -2130,17 +2128,26 @@ public class MetaGroupMember extends RaftMember {
    * @param request the toString() of this parameter should explain what the request is and it is
    *                only used in logs for tracing
    */
-  public DataGroupMember getLocalDataMember(Node header, Object request) {
-    return dataClusterServer.getDataMember(header, null, request);
+  public DataGroupMember getLocalDataMember(Node header, int raftId, Object request) {
+    return dataClusterServer.getDataMember(new RaftNode(header, raftId), null, request);
   }
 
   /**
    * Get a local DataGroupMember that is in the group of "header" for an internal request.
    *
-   * @param pair the header of the group which the local node is in
+   * @param node the header of the group which the local node is in
    */
-  public DataGroupMember getLocalDataMember(Pair<Node, Integer> pair) {
-    return dataClusterServer.getDataMember(pair, null, "Internal call");
+  public DataGroupMember getLocalDataMember(Node node, int raftId) {
+    return dataClusterServer.getDataMember(new RaftNode(node, raftId), null, "Internal call");
+  }
+
+  /**
+   * Get a local DataGroupMember that is in the group of "header" for an internal request.
+   *
+   * @param raftNode the raft node (group header plus raft id) identifying the group which the local node is in
+   */
+  public DataGroupMember getLocalDataMember(RaftNode raftNode) {
+    return dataClusterServer.getDataMember(raftNode, null, "Internal call");
   }
 
   public DataClientProvider getClientProvider() {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index f396e9c..aa9f8c8 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -143,7 +143,9 @@ public abstract class RaftMember {
    * the lock is to make sure that only one thread can apply snapshot at the same time
    */
   private final Object snapshotApplyLock = new Object();
+
   protected Node thisNode = ClusterConstant.EMPTY_NODE;
+
   /**
    * the nodes that belong to the same raft group as thisNode.
    */
@@ -702,7 +704,7 @@ public abstract class RaftMember {
     }
     logger.info("{}: Start to make {} catch up", name, follower);
     if (!catchUpService.isShutdown()) {
-      Future<?> future = catchUpService.submit(new CatchUpTask(follower, peerMap.get(follower),
+      Future<?> future = catchUpService.submit(new CatchUpTask(follower, getRaftGroupId(), peerMap.get(follower),
           this, lastLogIdx));
       catchUpService.submit(() -> {
         try {
@@ -1007,7 +1009,7 @@ public abstract class RaftMember {
       return commitIdResult.get();
     }
     synchronized (commitIdResult) {
-      client.requestCommitIndex(getHeader(), get,new GenericHandler<>(leader.get(), commitIdResult));
+      client.requestCommitIndex(getHeader(), getRaftGroupId(), new GenericHandler<>(leader.get(), commitIdResult));
       commitIdResult.wait(RaftServer.getSyncLeaderMaxWaitMs());
     }
     return commitIdResult.get();
@@ -1023,7 +1025,7 @@ public abstract class RaftMember {
     }
     long commitIndex;
     try {
-      commitIndex = client.requestCommitIndex(getHeader());
+      commitIndex = client.requestCommitIndex(getHeader(), getRaftGroupId());
     } catch (TException e) {
       client.getInputProtocol().getTransport().close();
       throw e;
@@ -1864,7 +1866,12 @@ public abstract class RaftMember {
     return Response.RESPONSE_AGREE;
   }
 
+  public int getRaftGroupId() {
+    return allNodes.getId();
+  }
+
   enum AppendLogResult {
     OK, TIME_OUT, LEADERSHIP_STALE
   }
+
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseAsyncService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseAsyncService.java
index 521050e..a059846 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseAsyncService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseAsyncService.java
@@ -32,6 +32,7 @@ import org.apache.iotdb.cluster.rpc.thrift.ExecutNonQueryReq;
 import org.apache.iotdb.cluster.rpc.thrift.HeartBeatRequest;
 import org.apache.iotdb.cluster.rpc.thrift.HeartBeatResponse;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.rpc.thrift.RaftNode;
 import org.apache.iotdb.cluster.rpc.thrift.RaftService;
 import org.apache.iotdb.cluster.rpc.thrift.RaftService.AsyncClient;
 import org.apache.iotdb.cluster.server.NodeCharacter;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataSyncService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataSyncService.java
index d974bf3..35af6e5 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataSyncService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataSyncService.java
@@ -38,6 +38,7 @@ import org.apache.iotdb.cluster.rpc.thrift.PullSchemaRequest;
 import org.apache.iotdb.cluster.rpc.thrift.PullSchemaResp;
 import org.apache.iotdb.cluster.rpc.thrift.PullSnapshotRequest;
 import org.apache.iotdb.cluster.rpc.thrift.PullSnapshotResp;
+import org.apache.iotdb.cluster.rpc.thrift.RaftNode;
 import org.apache.iotdb.cluster.rpc.thrift.SendSnapshotRequest;
 import org.apache.iotdb.cluster.rpc.thrift.SingleSeriesQueryRequest;
 import org.apache.iotdb.cluster.rpc.thrift.TSDataService;
@@ -280,8 +281,7 @@ public class DataSyncService extends BaseSyncService implements TSDataService.If
   }
 
   @Override
-  public List<String> getUnregisteredTimeseries(Node header, int raftId,
-      List<String> timeseriesList)
+  public List<String> getUnregisteredTimeseries(Node header, int raftId, List<String> timeseriesList)
       throws TException {
     try {
       return dataGroupMember.getLocalQueryExecutor().getUnregisteredTimeseries(timeseriesList);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/PartitionUtils.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/PartitionUtils.java
index 6f5836e..0abf7d0 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/PartitionUtils.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/PartitionUtils.java
@@ -26,6 +26,7 @@ import java.util.List;
 import java.util.Set;
 import org.apache.iotdb.cluster.partition.PartitionTable;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.rpc.thrift.RaftNode;
 import org.apache.iotdb.db.engine.StorageEngine;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
@@ -389,7 +390,7 @@ public class PartitionUtils {
    */
   public static void getIntervalHeaders(String storageGroupName, long timeLowerBound,
       long timeUpperBound,
-      PartitionTable partitionTable, Set<Node> result) {
+      PartitionTable partitionTable, Set<RaftNode> result) {
     long partitionInterval = StorageEngine.getTimePartitionInterval();
     long currPartitionStart = timeLowerBound / partitionInterval * partitionInterval;
     while (currPartitionStart <= timeUpperBound) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/ClusterMonitor.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/ClusterMonitor.java
index 2892b55..a0e5f56 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/ClusterMonitor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/ClusterMonitor.java
@@ -23,10 +23,12 @@ import java.util.List;
 import java.util.Map;
 import org.apache.commons.collections4.map.MultiKeyMap;
 import org.apache.iotdb.cluster.ClusterMain;
+import org.apache.iotdb.cluster.config.ClusterDescriptor;
 import org.apache.iotdb.cluster.partition.PartitionGroup;
 import org.apache.iotdb.cluster.partition.PartitionTable;
 import org.apache.iotdb.cluster.partition.slot.SlotPartitionTable;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.rpc.thrift.RaftNode;
 import org.apache.iotdb.cluster.server.MetaClusterServer;
 import org.apache.iotdb.cluster.server.Timer;
 import org.apache.iotdb.cluster.server.member.MetaGroupMember;
@@ -104,10 +106,10 @@ public class ClusterMonitor implements ClusterMonitorMBean, IService {
       return null;
     }
     List<PartitionGroup> localGroups = partitionTable.getLocalGroups();
-    Map<Node, List<Integer>> nodeSlotMap = ((SlotPartitionTable) partitionTable).getAllNodeSlots();
+    Map<RaftNode, List<Integer>> nodeSlotMap = ((SlotPartitionTable) partitionTable).getAllNodeSlots();
     Map<PartitionGroup, Integer> raftGroupMapSlotNum = new HashMap<>();
     for (PartitionGroup group : localGroups) {
-      raftGroupMapSlotNum.put(group, nodeSlotMap.get(group.getHeader()).size());
+      raftGroupMapSlotNum.put(group, nodeSlotMap.get(new RaftNode(group.getHeader(), group.getId())).size());
     }
     return raftGroupMapSlotNum;
   }
@@ -119,11 +121,14 @@ public class ClusterMonitor implements ClusterMonitorMBean, IService {
       return null;
     }
     List<Node> allNodes = partitionTable.getAllNodes();
-    Map<Node, List<Integer>> nodeSlotMap = ((SlotPartitionTable) partitionTable).getAllNodeSlots();
+    Map<RaftNode, List<Integer>> nodeSlotMap = ((SlotPartitionTable) partitionTable).getAllNodeSlots();
     Map<PartitionGroup, Integer> raftGroupMapSlotNum = new HashMap<>();
     for (Node header : allNodes) {
-      raftGroupMapSlotNum
-          .put(partitionTable.getHeaderGroup(header), nodeSlotMap.get(header).size());
+      for(int raftId = 0; raftId < ClusterDescriptor.getInstance().getConfig().getMultiRaftFactor(); raftId++) {
+        RaftNode raftNode = new RaftNode(header, raftId);
+        raftGroupMapSlotNum
+            .put(partitionTable.getHeaderGroup(raftNode), nodeSlotMap.get(raftNode).size());
+      }
     }
     return raftGroupMapSlotNum;
   }
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/client/async/AsyncDataClientTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/client/async/AsyncDataClientTest.java
index 7030fc1..a29f1ca 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/client/async/AsyncDataClientTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/client/async/AsyncDataClientTest.java
@@ -37,7 +37,7 @@ public class AsyncDataClientTest {
 
     assertEquals(TestUtils.getNode(0), client.getNode());
 
-    client.matchTerm(0, 0, TestUtils.getNode(0), new AsyncMethodCallback<Boolean>() {
+    client.matchTerm(0, 0, TestUtils.getNode(0), 0, new AsyncMethodCallback<Boolean>() {
       @Override
       public void onComplete(Boolean aBoolean) {
         // do nothing
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/client/async/AsyncMetaClientTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/client/async/AsyncMetaClientTest.java
index ffca8c5..698357a 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/client/async/AsyncMetaClientTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/client/async/AsyncMetaClientTest.java
@@ -37,7 +37,7 @@ public class AsyncMetaClientTest {
 
     assertEquals(TestUtils.getNode(0), client.getNode());
 
-    client.matchTerm(0, 0, TestUtils.getNode(0), new AsyncMethodCallback<Boolean>() {
+    client.matchTerm(0, 0, TestUtils.getNode(0), 0, new AsyncMethodCallback<Boolean>() {
       @Override
       public void onComplete(Boolean aBoolean) {
         // do nothing
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptorTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptorTest.java
index 8bd2812..36b17b1 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptorTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptorTest.java
@@ -120,7 +120,7 @@ public class SyncClientAdaptorTest {
       }
 
       @Override
-      public void matchTerm(long index, long term, Node header,
+      public void matchTerm(long index, long term, Node header, int raftId,
           AsyncMethodCallback<Boolean> resultHandler) {
         resultHandler.onComplete(true);
       }
@@ -163,19 +163,19 @@ public class SyncClientAdaptorTest {
       }
 
       @Override
-      public void getNodeList(Node header, String path, int nodeLevel,
+      public void getNodeList(Node header, int raftId, String path, int nodeLevel,
           AsyncMethodCallback<List<String>> resultHandler) {
         resultHandler.onComplete(Arrays.asList("1", "2", "3"));
       }
 
       @Override
-      public void getChildNodePathInNextLevel(Node header, String path,
+      public void getChildNodePathInNextLevel(Node header, int raftId, String path,
           AsyncMethodCallback<Set<String>> resultHandler) {
         resultHandler.onComplete(new HashSet<>(Arrays.asList("1", "2", "3")));
       }
 
       @Override
-      public void getAllMeasurementSchema(Node header, ByteBuffer planBinary,
+      public void getAllMeasurementSchema(Node header, int raftId, ByteBuffer planBinary,
           AsyncMethodCallback<ByteBuffer> resultHandler) {
         resultHandler.onComplete(getAllMeasurementSchemaResult);
       }
@@ -211,25 +211,25 @@ public class SyncClientAdaptorTest {
       }
 
       @Override
-      public void getUnregisteredTimeseries(Node header, List<String> timeseriesList,
+      public void getUnregisteredTimeseries(Node header, int raftId, List<String> timeseriesList,
           AsyncMethodCallback<List<String>> resultHandler) {
         resultHandler.onComplete(timeseriesList.subList(0, timeseriesList.size() / 2));
       }
 
       @Override
-      public void getAllPaths(Node header, List<String> path, boolean withAlias,
+      public void getAllPaths(Node header, int raftId, List<String> path, boolean withAlias,
           AsyncMethodCallback<GetAllPathsResult> resultHandler) {
         resultHandler.onComplete(new GetAllPathsResult(path));
       }
 
       @Override
-      public void getPathCount(Node header, List<String> pathsToQuery, int level,
+      public void getPathCount(Node header, int raftId, List<String> pathsToQuery, int level,
           AsyncMethodCallback<Integer> resultHandler) {
         resultHandler.onComplete(pathsToQuery.size());
       }
 
       @Override
-      public void getAllDevices(Node header, List<String> path,
+      public void getAllDevices(Node header, int raftId, List<String> path,
           AsyncMethodCallback<Set<String>> resultHandler) {
         resultHandler.onComplete(new HashSet<>(path));
       }
@@ -253,13 +253,13 @@ public class SyncClientAdaptorTest {
       }
 
       @Override
-      public void getGroupByResult(Node header, long executorId, long startTime, long endTime,
+      public void getGroupByResult(Node header, int raftId, long executorId, long startTime, long endTime,
           AsyncMethodCallback<List<ByteBuffer>> resultHandler) {
         resultHandler.onComplete(aggregateResults);
       }
 
       @Override
-      public void peekNextNotNullValue(Node header, long executorId, long startTime, long endTime,
+      public void peekNextNotNullValue(Node header, int raftId, long executorId, long startTime, long endTime,
           AsyncMethodCallback<ByteBuffer> resultHandler) {
         resultHandler.onComplete(peekNextNotNullValueResult);
       }
@@ -283,7 +283,7 @@ public class SyncClientAdaptorTest {
       }
 
       @Override
-      public void onSnapshotApplied(Node header, List<Integer> slots,
+      public void onSnapshotApplied(Node header, int raftId, List<Integer> slots,
           AsyncMethodCallback<Boolean> resultHandler) {
         resultHandler.onComplete(true);
       }
@@ -296,7 +296,7 @@ public class SyncClientAdaptorTest {
     assertEquals(Response.RESPONSE_AGREE, (long) SyncClientAdaptor.removeNode(metaClient,
         TestUtils.getNode(0)));
     assertTrue(SyncClientAdaptor.matchTerm(metaClient, TestUtils.getNode(0), 1, 1,
-        TestUtils.getNode(0)));
+        TestUtils.getNode(0), 0));
     assertEquals(nodeStatus, SyncClientAdaptor.queryNodeStatus(metaClient));
     assertEquals(checkStatusResponse,
         SyncClientAdaptor.checkStatus(metaClient, new StartUpStatus()));
@@ -315,11 +315,11 @@ public class SyncClientAdaptorTest {
     assertEquals(1L, (long) SyncClientAdaptor.querySingleSeries(dataClient,
         new SingleSeriesQueryRequest(), 0));
     assertEquals(Arrays.asList("1", "2", "3"), SyncClientAdaptor.getNodeList(dataClient,
-        TestUtils.getNode(0), "root", 0));
+        TestUtils.getNode(0), 0, "root", 0));
     assertEquals(new HashSet<>(Arrays.asList("1", "2", "3")),
-        SyncClientAdaptor.getNextChildren(dataClient, TestUtils.getNode(0), "root"));
+        SyncClientAdaptor.getNextChildren(dataClient, TestUtils.getNode(0), 0, "root"));
     assertEquals(getAllMeasurementSchemaResult,
-        SyncClientAdaptor.getAllMeasurementSchema(dataClient, TestUtils.getNode(0),
+        SyncClientAdaptor.getAllMeasurementSchema(dataClient, TestUtils.getNode(0), 0,
             new ShowTimeSeriesPlan(new PartialPath("root"))));
     assertEquals(measurementSchemas, SyncClientAdaptor.pullMeasurementSchema(dataClient,
         new PullSchemaRequest()));
@@ -328,21 +328,21 @@ public class SyncClientAdaptorTest {
     assertEquals(aggregateResults, SyncClientAdaptor.getAggrResult(dataClient
         , new GetAggrResultRequest()));
     assertEquals(paths.subList(0, paths.size() / 2),
-        SyncClientAdaptor.getUnregisteredMeasurements(dataClient, TestUtils.getNode(0), paths));
-    assertEquals(paths, SyncClientAdaptor.getAllPaths(dataClient, TestUtils.getNode(0), paths,
+        SyncClientAdaptor.getUnregisteredMeasurements(dataClient, TestUtils.getNode(0), 0, paths));
+    assertEquals(paths, SyncClientAdaptor.getAllPaths(dataClient, TestUtils.getNode(0), 0, paths,
         false).paths);
     assertEquals(paths.size(), (int) SyncClientAdaptor.getPathCount(dataClient,
-        TestUtils.getNode(0),
+        TestUtils.getNode(0), 0,
         paths, 0));
     assertEquals(new HashSet<>(paths), SyncClientAdaptor.getAllDevices(dataClient,
-        TestUtils.getNode(0), paths));
+        TestUtils.getNode(0), 0, paths));
     assertEquals(1L, (long) SyncClientAdaptor.getGroupByExecutor(dataClient, new GroupByRequest()));
     assertEquals(fillResult, SyncClientAdaptor.previousFill(dataClient, new PreviousFillRequest()));
     assertEquals(readFileResult, SyncClientAdaptor.readFile(dataClient, "a file", 0, 1000));
     assertEquals(aggregateResults, SyncClientAdaptor.getGroupByResult(dataClient,
-        TestUtils.getNode(0), 1, 1, 2));
+        TestUtils.getNode(0), 0, 1, 1, 2));
     assertEquals(peekNextNotNullValueResult, SyncClientAdaptor.peekNextNotNullValue(dataClient,
-        TestUtils.getNode(0), 1, 1, 1));
+        TestUtils.getNode(0), 0, 1, 1, 1));
     assertEquals(snapshotMap, SyncClientAdaptor.pullSnapshot(dataClient,
         new PullSnapshotRequest(), Arrays.asList(0, 1, 2),
         new SnapshotFactory<Snapshot>() {
@@ -359,8 +359,8 @@ public class SyncClientAdaptorTest {
     assertEquals(lastResult, SyncClientAdaptor.last(dataClient,
         Collections.singletonList(new PartialPath("1")),
         Collections.singletonList(TSDataType.INT64.ordinal()),
-        new QueryContext(), Collections.emptyMap(), TestUtils.getNode(0)));
-    assertTrue(SyncClientAdaptor.onSnapshotApplied(dataClient, TestUtils.getNode(0),
+        new QueryContext(), Collections.emptyMap(), TestUtils.getNode(0), 0));
+    assertTrue(SyncClientAdaptor.onSnapshotApplied(dataClient, TestUtils.getNode(0), 0,
         Arrays.asList(0, 1, 2)));
   }
 }
\ No newline at end of file
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/common/TestAsyncDataClient.java b/cluster/src/test/java/org/apache/iotdb/cluster/common/TestAsyncDataClient.java
index 118f7e5..63ce1b7 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/common/TestAsyncDataClient.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/common/TestAsyncDataClient.java
@@ -68,9 +68,9 @@ public class TestAsyncDataClient extends AsyncDataClient {
   }
 
   @Override
-  public void fetchSingleSeries(Node header, long readerId,
+  public void fetchSingleSeries(Node header, int raftId, long readerId,
       AsyncMethodCallback<ByteBuffer> resultHandler) {
-    new Thread(() -> new DataAsyncService(dataGroupMemberMap.get(header)).fetchSingleSeries(header, readerId,
+    new Thread(() -> new DataAsyncService(dataGroupMemberMap.get(header)).fetchSingleSeries(header, raftId, readerId,
         resultHandler)).start();
   }
 
@@ -89,16 +89,16 @@ public class TestAsyncDataClient extends AsyncDataClient {
   }
 
   @Override
-  public void fetchSingleSeriesByTimestamp(Node header, long readerId, long time,
+  public void fetchSingleSeriesByTimestamp(Node header, int raftId, long readerId, long time,
       AsyncMethodCallback<ByteBuffer> resultHandler) {
-    new Thread(() -> new DataAsyncService(dataGroupMemberMap.get(header)).fetchSingleSeriesByTimestamp(header,
+    new Thread(() -> new DataAsyncService(dataGroupMemberMap.get(header)).fetchSingleSeriesByTimestamp(header, raftId,
         readerId, time, resultHandler)).start();
   }
 
   @Override
-  public void getAllPaths(Node header, List<String> paths, boolean withAlias,
+  public void getAllPaths(Node header, int raftId, List<String> paths, boolean withAlias,
       AsyncMethodCallback<GetAllPathsResult> resultHandler) {
-    new Thread(() -> new DataAsyncService(dataGroupMemberMap.get(header)).getAllPaths(header,
+    new Thread(() -> new DataAsyncService(dataGroupMemberMap.get(header)).getAllPaths(header, raftId,
         paths, withAlias, resultHandler)).start();
   }
 
@@ -176,9 +176,9 @@ public class TestAsyncDataClient extends AsyncDataClient {
   }
 
   @Override
-  public void getGroupByResult(Node header, long executorId, long startTime, long endTime,
+  public void getGroupByResult(Node header, int raftId, long executorId, long startTime, long endTime,
       AsyncMethodCallback<List<ByteBuffer>> resultHandler) {
-    new Thread(() -> new DataAsyncService(dataGroupMemberMap.get(header)).getGroupByResult(header, executorId,
+    new Thread(() -> new DataAsyncService(dataGroupMemberMap.get(header)).getGroupByResult(header, raftId, executorId,
         startTime, endTime, resultHandler)).start();
   }
 
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/common/TestDataGroupMember.java b/cluster/src/test/java/org/apache/iotdb/cluster/common/TestDataGroupMember.java
index b945af2..6f182d2 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/common/TestDataGroupMember.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/common/TestDataGroupMember.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.cluster.common;
 import java.util.Collections;
 import java.util.List;
 import org.apache.iotdb.cluster.config.ClusterConstant;
+import org.apache.iotdb.cluster.partition.PartitionGroup;
 import org.apache.iotdb.cluster.partition.slot.SlotManager;
 import org.apache.iotdb.cluster.query.manage.ClusterQueryManager;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
@@ -33,10 +34,10 @@ public class TestDataGroupMember extends DataGroupMember {
     super();
     setQueryManager(new ClusterQueryManager());
     this.slotManager = new SlotManager(ClusterConstant.SLOT_NUM, null);
-    this.allNodes = Collections.singletonList(TestUtils.getNode(0));
+    this.allNodes = new PartitionGroup(Collections.singletonList(TestUtils.getNode(0)));
   }
 
-  public TestDataGroupMember(Node thisNode, List<Node> allNodes) {
+  public TestDataGroupMember(Node thisNode, PartitionGroup allNodes) {
     super();
     this.thisNode = thisNode;
     this.allNodes = allNodes;
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/common/TestMetaGroupMember.java b/cluster/src/test/java/org/apache/iotdb/cluster/common/TestMetaGroupMember.java
index ce8ebd6..2a4e67a 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/common/TestMetaGroupMember.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/common/TestMetaGroupMember.java
@@ -21,13 +21,14 @@ package org.apache.iotdb.cluster.common;
 
 import java.util.ArrayList;
 import org.apache.iotdb.cluster.log.manage.MetaSingleSnapshotLogManager;
+import org.apache.iotdb.cluster.partition.PartitionGroup;
 import org.apache.iotdb.cluster.server.member.MetaGroupMember;
 
 public class TestMetaGroupMember extends MetaGroupMember {
 
   public TestMetaGroupMember() {
     super();
-    allNodes = new ArrayList<>();
+    allNodes = new PartitionGroup();
     thisNode = TestUtils.getNode(0);
     for (int i = 0; i < 10; i++) {
       allNodes.add(TestUtils.getNode(i));
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/log/LogDispatcherTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/log/LogDispatcherTest.java
index a1c711a..2a13a79 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/log/LogDispatcherTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/log/LogDispatcherTest.java
@@ -35,6 +35,7 @@ import org.apache.iotdb.cluster.common.TestUtils;
 import org.apache.iotdb.cluster.config.ClusterDescriptor;
 import org.apache.iotdb.cluster.exception.UnknownLogTypeException;
 import org.apache.iotdb.cluster.log.LogDispatcher.SendLogRequest;
+import org.apache.iotdb.cluster.partition.PartitionGroup;
 import org.apache.iotdb.cluster.rpc.thrift.AppendEntriesRequest;
 import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
@@ -118,7 +119,7 @@ public class LogDispatcherTest {
         };
       }
     };
-    List<Node> allNodes = new ArrayList<>();
+    PartitionGroup allNodes = new PartitionGroup();
     for (int i = 0; i < 10; i++) {
       allNodes.add(TestUtils.getNode(i));
     }
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/DataLogApplierTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/DataLogApplierTest.java
index df3cabc..0ca2a3f 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/DataLogApplierTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/DataLogApplierTest.java
@@ -92,7 +92,7 @@ public class DataLogApplierTest extends IoTDBTest {
     }
 
     @Override
-    public DataGroupMember getLocalDataMember(Node header, Object request) {
+    public DataGroupMember getLocalDataMember(Node header, int raftId, Object request) {
       return testDataGroupMember;
     }
 
@@ -146,9 +146,9 @@ public class DataLogApplierTest extends IoTDBTest {
       public AsyncDataClient getAsyncDataClient(Node node, int timeout) throws IOException {
         return new AsyncDataClient(null, null, node, null) {
           @Override
-          public void getAllPaths(Node header, List<String> path, boolean withAlias,
+          public void getAllPaths(Node header, int raftId, List<String> path, boolean withAlias,
               AsyncMethodCallback<GetAllPathsResult> resultHandler) {
-            new Thread(() -> new DataAsyncService(testDataGroupMember).getAllPaths(header, path,
+            new Thread(() -> new DataAsyncService(testDataGroupMember).getAllPaths(header, raftId, path,
                 withAlias, resultHandler)).start();
           }
 
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/log/catchup/CatchUpTaskTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/log/catchup/CatchUpTaskTest.java
index c431064..51f0aa6 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/log/catchup/CatchUpTaskTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/log/catchup/CatchUpTaskTest.java
@@ -84,7 +84,7 @@ public class CatchUpTaskTest {
         }
 
         @Override
-        public boolean matchTerm(long index, long term, Node header) {
+        public boolean matchTerm(long index, long term, Node header, int raftId) {
           return dummyMatchTerm(index, term);
         }
 
@@ -111,7 +111,7 @@ public class CatchUpTaskTest {
         }
 
         @Override
-        public void matchTerm(long index, long term, Node header,
+        public void matchTerm(long index, long term, Node header, int raftId,
             AsyncMethodCallback<Boolean> resultHandler) {
           new Thread(() -> resultHandler.onComplete(dummyMatchTerm(index, term))).start();
         }
@@ -217,7 +217,7 @@ public class CatchUpTaskTest {
     sender.setCharacter(NodeCharacter.LEADER);
     Peer peer = new Peer(10);
     peer.setMatchIndex(9);
-    CatchUpTask task = new CatchUpTask(receiver, peer, sender, 9);
+    CatchUpTask task = new CatchUpTask(receiver, 0, peer, sender, 9);
     task.run();
 
     assertTrue(receivedLogs.isEmpty());
@@ -242,7 +242,7 @@ public class CatchUpTaskTest {
     sender.setCharacter(NodeCharacter.LEADER);
     Peer peer = new Peer(10);
     peer.setMatchIndex(0);
-    CatchUpTask task = new CatchUpTask(receiver, peer, sender, 5);
+    CatchUpTask task = new CatchUpTask(receiver, 0, peer, sender, 5);
     task.run();
 
     assertEquals(logList, receivedLogs.subList(1, receivedLogs.size()));
@@ -273,7 +273,7 @@ public class CatchUpTaskTest {
       sender.setCharacter(NodeCharacter.LEADER);
       Peer peer = new Peer(10);
       peer.setMatchIndex(0);
-      CatchUpTask task = new CatchUpTask(receiver, peer, sender, 5);
+      CatchUpTask task = new CatchUpTask(receiver, 0, peer, sender, 5);
       task.run();
 
       assertEquals(logList, receivedLogs.subList(1, receivedLogs.size()));
@@ -299,7 +299,7 @@ public class CatchUpTaskTest {
     sender.setCharacter(NodeCharacter.LEADER);
     Peer peer = new Peer(10);
     peer.setNextIndex(0);
-    CatchUpTask task = new CatchUpTask(receiver, peer, sender, 0);
+    CatchUpTask task = new CatchUpTask(receiver, 0, peer, sender, 0);
     ClusterDescriptor.getInstance().getConfig().setUseBatchInLogCatchUp(false);
     task.run();
 
@@ -323,7 +323,7 @@ public class CatchUpTaskTest {
     sender.setCharacter(NodeCharacter.LEADER);
     Peer peer = new Peer(10);
     peer.setNextIndex(0);
-    CatchUpTask task = new CatchUpTask(receiver, peer, sender, 0);
+    CatchUpTask task = new CatchUpTask(receiver, 0, peer, sender, 0);
     task.run();
 
     assertEquals(logList, receivedLogs.subList(1, receivedLogs.size()));
@@ -354,7 +354,7 @@ public class CatchUpTaskTest {
     peer.setMatchIndex(0);
     peer.setNextIndex(0);
 
-    CatchUpTask task = new CatchUpTask(receiver, peer, sender, 0);
+    CatchUpTask task = new CatchUpTask(receiver, 0, peer, sender, 0);
     task.setLogs(logList);
     try {
       // 1. case 1: the matched index is in the middle of the logs interval
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/log/catchup/LogCatchUpTaskTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/log/catchup/LogCatchUpTaskTest.java
index 8ed04b6..35852f4 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/log/catchup/LogCatchUpTaskTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/log/catchup/LogCatchUpTaskTest.java
@@ -179,7 +179,7 @@ public class LogCatchUpTaskTest {
     List<Log> logList = TestUtils.prepareTestLogs(logSize);
     Node receiver = new Node();
     sender.setCharacter(NodeCharacter.LEADER);
-    LogCatchUpTask task = new LogCatchUpTask(logList, receiver, sender, useBatch);
+    LogCatchUpTask task = new LogCatchUpTask(logList, receiver, 0, sender, useBatch);
     task.call();
 
     assertEquals(logList, receivedLogs);
@@ -194,7 +194,7 @@ public class LogCatchUpTaskTest {
       List<Log> logList = TestUtils.prepareTestLogs(10);
       Node receiver = new Node();
       sender.setCharacter(NodeCharacter.LEADER);
-      LogCatchUpTask task = new LogCatchUpTask(logList, receiver, sender, false);
+      LogCatchUpTask task = new LogCatchUpTask(logList, receiver, 0, sender, false);
       task.call();
 
       assertEquals(logList, receivedLogs);
@@ -210,7 +210,7 @@ public class LogCatchUpTaskTest {
     List<Log> logList = TestUtils.prepareTestLogs(10);
     Node receiver = new Node();
     sender.setCharacter(NodeCharacter.LEADER);
-    LogCatchUpTask task = new LogCatchUpTask(logList, receiver, sender, false);
+    LogCatchUpTask task = new LogCatchUpTask(logList, receiver, 0, sender, false);
     task.setUseBatch(false);
     try {
       task.call();
@@ -242,7 +242,7 @@ public class LogCatchUpTaskTest {
     List<Log> logList = TestUtils.prepareTestLogs(1030);
     Node receiver = new Node();
     sender.setCharacter(NodeCharacter.LEADER);
-    LogCatchUpTask task = new LogCatchUpTask(logList, receiver, sender, true);
+    LogCatchUpTask task = new LogCatchUpTask(logList, receiver, 0, sender, true);
     task.call();
 
     assertEquals(logList.subList(0, 1024), receivedLogs);
@@ -259,7 +259,7 @@ public class LogCatchUpTaskTest {
           + IoTDBConstant.LEFT_SIZE_IN_REQUEST);
       Node receiver = new Node();
       sender.setCharacter(NodeCharacter.LEADER);
-      LogCatchUpTask task = new LogCatchUpTask(logList, receiver, sender, true);
+      LogCatchUpTask task = new LogCatchUpTask(logList, receiver, 0, sender, true);
       task.call();
 
       assertEquals(logList, receivedLogs);
@@ -278,7 +278,7 @@ public class LogCatchUpTaskTest {
       IoTDBDescriptor.getInstance().getConfig().setThriftMaxFrameSize(0);
       Node receiver = new Node();
       sender.setCharacter(NodeCharacter.LEADER);
-      LogCatchUpTask task = new LogCatchUpTask(logList, receiver, sender, true);
+      LogCatchUpTask task = new LogCatchUpTask(logList, receiver, 0, sender, true);
       task.call();
 
       assertTrue(receivedLogs.isEmpty());
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/log/catchup/SnapshotCatchUpTaskTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/log/catchup/SnapshotCatchUpTaskTest.java
index c1a2e6e..aea8c33 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/log/catchup/SnapshotCatchUpTaskTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/log/catchup/SnapshotCatchUpTaskTest.java
@@ -152,7 +152,7 @@ public class SnapshotCatchUpTaskTest {
     Snapshot snapshot = new TestSnapshot(9989);
     Node receiver = new Node();
     sender.setCharacter(NodeCharacter.LEADER);
-    SnapshotCatchUpTask task = new SnapshotCatchUpTask(logList, snapshot, receiver, sender);
+    SnapshotCatchUpTask task = new SnapshotCatchUpTask(logList, snapshot, receiver, 0, sender);
     task.call();
 
     assertEquals(logList, receivedLogs);
@@ -172,7 +172,7 @@ public class SnapshotCatchUpTaskTest {
     Snapshot snapshot = new TestSnapshot(9989);
     Node receiver = new Node();
     sender.setCharacter(NodeCharacter.LEADER);
-    SnapshotCatchUpTask task = new SnapshotCatchUpTask(logList, snapshot, receiver, sender);
+    SnapshotCatchUpTask task = new SnapshotCatchUpTask(logList, snapshot, receiver, 0, sender);
     task.call();
 
     assertTrue(receivedLogs.isEmpty());
@@ -195,7 +195,7 @@ public class SnapshotCatchUpTaskTest {
       Snapshot snapshot = new TestSnapshot(9989);
       Node receiver = new Node();
       sender.setCharacter(NodeCharacter.LEADER);
-      SnapshotCatchUpTask task = new SnapshotCatchUpTask(logList, snapshot, receiver, sender);
+      SnapshotCatchUpTask task = new SnapshotCatchUpTask(logList, snapshot, receiver, 0, sender);
       task.call();
 
       assertEquals(logList, receivedLogs);
@@ -213,7 +213,7 @@ public class SnapshotCatchUpTaskTest {
     Snapshot snapshot = new TestSnapshot(9989);
     Node receiver = new Node();
     sender.setCharacter(NodeCharacter.LEADER);
-    LogCatchUpTask task = new SnapshotCatchUpTask(logList, snapshot, receiver, sender);
+    LogCatchUpTask task = new SnapshotCatchUpTask(logList, snapshot, receiver, 0, sender);
     try {
       task.call();
       fail("Expected LeaderUnknownException");
@@ -244,7 +244,7 @@ public class SnapshotCatchUpTaskTest {
     Snapshot snapshot = new TestSnapshot(9989);
     Node receiver = new Node();
     sender.setCharacter(NodeCharacter.ELECTOR);
-    LogCatchUpTask task = new SnapshotCatchUpTask(logList, snapshot, receiver, sender);
+    LogCatchUpTask task = new SnapshotCatchUpTask(logList, snapshot, receiver, 0, sender);
     try {
       task.call();
       fail("Expected LeaderUnknownException");
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/PartitionedSnapshotTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/PartitionedSnapshotTest.java
index 04b0959..f0fc359 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/PartitionedSnapshotTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/PartitionedSnapshotTest.java
@@ -84,8 +84,7 @@ public class PartitionedSnapshotTest extends DataSnapshotTest {
     snapshot.setLastLogIndex(10);
     snapshot.setLastLogTerm(5);
 
-    SnapshotInstaller<PartitionedSnapshot> defaultInstaller = snapshot
-        .getDefaultInstaller(dataGroupMember);
+    SnapshotInstaller<PartitionedSnapshot> defaultInstaller = snapshot.getDefaultInstaller(dataGroupMember);
     for (int i = 0; i < 10; i++) {
       dataGroupMember.getSlotManager().setToPulling(i, TestUtils.getNode(0));
     }
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTaskTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTaskTest.java
index ae49671..d9ce485 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTaskTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTaskTest.java
@@ -61,7 +61,6 @@ import org.apache.thrift.async.AsyncMethodCallback;
 import org.apache.thrift.protocol.TBinaryProtocol;
 import org.apache.thrift.protocol.TProtocol;
 import org.apache.thrift.transport.TTransport;
-import org.apache.thrift.transport.TTransportException;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/partition/SlotPartitionTableTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/partition/SlotPartitionTableTest.java
index 801d0c1..9501962 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/partition/SlotPartitionTableTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/partition/SlotPartitionTableTest.java
@@ -45,6 +45,7 @@ import org.apache.iotdb.cluster.partition.slot.SlotNodeRemovalResult;
 import org.apache.iotdb.cluster.partition.slot.SlotPartitionTable;
 import org.apache.iotdb.cluster.query.ClusterPlanRouter;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.rpc.thrift.RaftNode;
 import org.apache.iotdb.cluster.utils.PartitionUtils;
 import org.apache.iotdb.db.auth.AuthException;
 import org.apache.iotdb.db.engine.StorageEngine;
@@ -95,6 +96,7 @@ public class SlotPartitionTableTest {
   SlotPartitionTable localTable;
   Node localNode;
   int replica_size = 5;
+  int raftId = 0;
   MManager[] mManager;
 
   SlotPartitionTable[] tables;//The PartitionTable on each node.
@@ -133,11 +135,11 @@ public class SlotPartitionTableTest {
     for (int i = 0; i < 20; i++) {
       storageNames[i] = String.format("root.sg.l2.l3.%d", i);
       //determine which node the sg belongs to
-      Node node = localTable.routeToHeaderByTime(storageNames[i], 0);
-      nodeSGs[node.getMetaPort() - 30000].add(storageNames[i]);
+      RaftNode node = localTable.routeToHeaderByTime(storageNames[i], 0);
+      nodeSGs[node.getNode().getMetaPort() - 30000].add(storageNames[i]);
       storageNames[i + 20] = String.format("root.sg.l2.l3.l4.%d", i + 20);
       node = localTable.routeToHeaderByTime(storageNames[i + 20], 0);
-      nodeSGs[node.getMetaPort() - 30000].add(storageNames[i + 20]);
+      nodeSGs[node.getNode().getMetaPort() - 30000].add(storageNames[i + 20]);
     }
     for (int i = 0; i < 20; i++) {
       mManager[i] = MManagerWhiteBox.newMManager("target/schemas/mlog_" + i);
@@ -238,9 +240,9 @@ public class SlotPartitionTableTest {
 
   @Test
   public void routeToHeader() {
-    Node node1 = localTable.routeToHeaderByTime("root.sg.l2.l3.l4.28", 0);
-    Node node2 = localTable.routeToHeaderByTime("root.sg.l2.l3.l4.28", 1);
-    Node node3 = localTable
+    RaftNode node1 = localTable.routeToHeaderByTime("root.sg.l2.l3.l4.28", 0);
+    RaftNode node2 = localTable.routeToHeaderByTime("root.sg.l2.l3.l4.28", 1);
+    RaftNode node3 = localTable
         .routeToHeaderByTime("root.sg.l2.l3.l4.28", 1 + StorageEngine.getTimePartitionInterval());
     assertEquals(node1, node2);
     assertNotEquals(node2, node3);
@@ -283,7 +285,7 @@ public class SlotPartitionTableTest {
   @Test
   public void getPreviousNodeMap() {
     //before adding or deleting node, it should be null
-    assertNull(localTable.getPreviousNodeMap(localNode));
+    assertNull(localTable.getPreviousNodeMap(new RaftNode(localNode, 0)));
     //TODO after adding or deleting node, it has data
   }
 
@@ -510,7 +512,7 @@ public class SlotPartitionTableTest {
 
   @Test
   public void testRemoveNode() {
-    List<Integer> nodeSlots = localTable.getNodeSlots(getNode(0));
+    List<Integer> nodeSlots = localTable.getNodeSlots(getNode(0), raftId);
     NodeRemovalResult nodeRemovalResult = localTable.removeNode(getNode(0));
     assertFalse(localTable.getAllNodes().contains(getNode(0)));
     PartitionGroup removedGroup = nodeRemovalResult.getRemovedGroup();
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/query/reader/RemoteSeriesReaderByTimestampTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/query/reader/RemoteSeriesReaderByTimestampTest.java
index d2f7d82..085bcf8 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/query/reader/RemoteSeriesReaderByTimestampTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/query/reader/RemoteSeriesReaderByTimestampTest.java
@@ -63,7 +63,7 @@ public class RemoteSeriesReaderByTimestampTest {
       public AsyncDataClient getAsyncDataClient(Node node, int timeout) throws IOException {
         return new AsyncDataClient(null, null, node, null) {
           @Override
-          public void fetchSingleSeriesByTimestamp(Node header, long readerId, long time,
+          public void fetchSingleSeriesByTimestamp(Node header, int raftId, long readerId, long time,
               AsyncMethodCallback<ByteBuffer> resultHandler) throws TException {
             if (failedNodes.contains(node)) {
               throw new TException("Node down.");
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/query/reader/RemoteSimpleSeriesReaderTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/query/reader/RemoteSimpleSeriesReaderTest.java
index 550feee..82bceba 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/query/reader/RemoteSimpleSeriesReaderTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/query/reader/RemoteSimpleSeriesReaderTest.java
@@ -73,7 +73,7 @@ public class RemoteSimpleSeriesReaderTest {
       public AsyncDataClient getAsyncDataClient(Node node, int timeout) throws IOException {
         return new AsyncDataClient(null, null, node, null) {
           @Override
-          public void fetchSingleSeries(Node header, long readerId,
+          public void fetchSingleSeries(Node header, int raftId, long readerId,
               AsyncMethodCallback<ByteBuffer> resultHandler)
               throws TException {
             if (failedNodes.contains(node)) {
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatThreadTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatThreadTest.java
index d4bf336..e112d31 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatThreadTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatThreadTest.java
@@ -39,6 +39,7 @@ import org.apache.iotdb.cluster.rpc.thrift.ElectionRequest;
 import org.apache.iotdb.cluster.rpc.thrift.HeartBeatRequest;
 import org.apache.iotdb.cluster.rpc.thrift.HeartBeatResponse;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.rpc.thrift.RaftNode;
 import org.apache.iotdb.cluster.rpc.thrift.RaftService.AsyncClient;
 import org.apache.iotdb.cluster.server.Response;
 import org.apache.iotdb.cluster.server.member.MetaGroupMember;
@@ -57,7 +58,7 @@ public class MetaHeartbeatThreadTest extends HeartbeatThreadTest {
     }
 
     @Override
-    public Node routeToHeaderByTime(String storageGroupName, long timestamp) {
+    public RaftNode routeToHeaderByTime(String storageGroupName, long timestamp) {
       return null;
     }
 
@@ -77,7 +78,12 @@ public class MetaHeartbeatThreadTest extends HeartbeatThreadTest {
     }
 
     @Override
-    public PartitionGroup getHeaderGroup(Node header) {
+    public PartitionGroup getHeaderGroup(RaftNode header) {
+      return null;
+    }
+
+    @Override
+    public PartitionGroup getHeaderGroup(Node node) {
       return null;
     }
 
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
index 96dc257..8180d4e 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
@@ -123,6 +123,7 @@ public class DataGroupMemberTest extends MemberTest {
   private boolean hasInitialSnapshots;
   private boolean enableSyncLeader;
   private int prevReplicationNum;
+  private int raftId = 0;
 
   @Before
   public void setUp() throws Exception {
@@ -210,7 +211,7 @@ public class DataGroupMemberTest extends MemberTest {
             }
 
             @Override
-            public void requestCommitIndex(Node header, AsyncMethodCallback<Long> resultHandler) {
+            public void requestCommitIndex(Node header, int raftId, AsyncMethodCallback<Long> resultHandler) {
               new Thread(() -> {
                 if (enableSyncLeader) {
                   resultHandler.onComplete(-1L);
@@ -253,7 +254,7 @@ public class DataGroupMemberTest extends MemberTest {
   @Test
   public void testAddNode() {
     System.out.println("Start testAddNode()");
-    PartitionGroup partitionGroup = new PartitionGroup(TestUtils.getNode(0),
+    PartitionGroup partitionGroup = new PartitionGroup(raftId, TestUtils.getNode(0),
         TestUtils.getNode(50), TestUtils.getNode(90));
     DataGroupMember firstMember = getDataGroupMember(TestUtils.getNode(0),
         new PartitionGroup(partitionGroup));
@@ -632,7 +633,7 @@ public class DataGroupMemberTest extends MemberTest {
     GenericHandler<ByteBuffer> dataHandler = new GenericHandler<>(TestUtils.getNode(0),
         dataResult);
     new DataAsyncService(dataGroupMember)
-        .fetchSingleSeries(TestUtils.getNode(0), readerId, dataHandler);
+        .fetchSingleSeries(TestUtils.getNode(0), raftId, readerId, dataHandler);
     ByteBuffer dataBuffer = dataResult.get();
     BatchData batchData = SerializeUtils.deserializeBatchData(dataBuffer);
     for (int i = 5; i < 10; i++) {
@@ -643,7 +644,7 @@ public class DataGroupMemberTest extends MemberTest {
     }
     assertFalse(batchData.hasCurrent());
 
-    new DataAsyncService(dataGroupMember).endQuery(TestUtils.getNode(0), TestUtils.getNode(1), 0,
+    new DataAsyncService(dataGroupMember).endQuery(TestUtils.getNode(0), raftId, TestUtils.getNode(1), 0,
         new GenericHandler<>(TestUtils.getNode(0), null));
   }
 
@@ -690,7 +691,7 @@ public class DataGroupMemberTest extends MemberTest {
     GenericHandler<ByteBuffer> dataHandler = new GenericHandler<>(TestUtils.getNode(0),
         dataResult);
     new DataAsyncService(dataGroupMember)
-        .fetchSingleSeries(TestUtils.getNode(0), readerId, dataHandler);
+        .fetchSingleSeries(TestUtils.getNode(0), raftId, readerId, dataHandler);
     ByteBuffer dataBuffer = dataResult.get();
     BatchData batchData = SerializeUtils.deserializeBatchData(dataBuffer);
     for (int i = 5; i < 9; i++) {
@@ -701,7 +702,7 @@ public class DataGroupMemberTest extends MemberTest {
     }
     assertFalse(batchData.hasCurrent());
 
-    new DataAsyncService(dataGroupMember).endQuery(TestUtils.getNode(0), TestUtils.getNode(1), 0,
+    new DataAsyncService(dataGroupMember).endQuery(TestUtils.getNode(0), raftId, TestUtils.getNode(1), 0,
         new GenericHandler<>(TestUtils.getNode(0), null));
   }
 
@@ -751,13 +752,13 @@ public class DataGroupMemberTest extends MemberTest {
 
     for (int i = 5; i < 10; i++) {
       new DataAsyncService(dataGroupMember)
-          .fetchSingleSeriesByTimestamp(TestUtils.getNode(0), readerId, i,
+          .fetchSingleSeriesByTimestamp(TestUtils.getNode(0), raftId, readerId, i,
               dataHandler);
       Object value = SerializeUtils.deserializeObject(dataResult.get());
       assertEquals(i * 1.0, (Double) value, 0.00001);
     }
 
-    new DataAsyncService(dataGroupMember).endQuery(TestUtils.getNode(0), TestUtils.getNode(1), 0,
+    new DataAsyncService(dataGroupMember).endQuery(TestUtils.getNode(0), raftId, TestUtils.getNode(1), 0,
         new GenericHandler<>(TestUtils.getNode(0), null));
   }
 
@@ -806,13 +807,13 @@ public class DataGroupMemberTest extends MemberTest {
         dataResult);
     for (int i = 5; i < 9; i++) {
       new DataAsyncService(dataGroupMember)
-          .fetchSingleSeriesByTimestamp(TestUtils.getNode(0), readerId, i,
+          .fetchSingleSeriesByTimestamp(TestUtils.getNode(0), raftId, readerId, i,
               dataHandler);
       Object value = SerializeUtils.deserializeObject(dataResult.get());
       assertEquals(i * 1.0, (Double) value, 0.00001);
     }
 
-    new DataAsyncService(dataGroupMember).endQuery(TestUtils.getNode(0), TestUtils.getNode(1), 0,
+    new DataAsyncService(dataGroupMember).endQuery(TestUtils.getNode(0), raftId, TestUtils.getNode(1), 0,
         new GenericHandler<>(TestUtils.getNode(0), null));
   }
 
@@ -823,7 +824,7 @@ public class DataGroupMemberTest extends MemberTest {
     AtomicReference<GetAllPathsResult> pathResult = new AtomicReference<>();
     GenericHandler<GetAllPathsResult> handler = new GenericHandler<>(TestUtils.getNode(0), pathResult);
     new DataAsyncService(dataGroupMember)
-        .getAllPaths(TestUtils.getNode(0), Collections.singletonList(path), false, handler);
+        .getAllPaths(TestUtils.getNode(0), raftId, Collections.singletonList(path), false, handler);
     List<String> result = pathResult.get().paths;
     assertEquals(20, result.size());
     for (int i = 0; i < 10; i++) {
@@ -835,7 +836,7 @@ public class DataGroupMemberTest extends MemberTest {
   public void testFetchWithoutQuery() {
     System.out.println("Start testFetchWithoutQuery()");
     AtomicReference<Exception> result = new AtomicReference<>();
-    new DataAsyncService(dataGroupMember).fetchSingleSeriesByTimestamp(TestUtils.getNode(0), 0, 0,
+    new DataAsyncService(dataGroupMember).fetchSingleSeriesByTimestamp(TestUtils.getNode(0), raftId, 0, 0,
         new AsyncMethodCallback<ByteBuffer>() {
           @Override
           public void onComplete(ByteBuffer buffer) {
@@ -850,7 +851,7 @@ public class DataGroupMemberTest extends MemberTest {
     assertTrue(exception instanceof ReaderNotFoundException);
     assertEquals("The requested reader 0 is not found", exception.getMessage());
 
-    new DataAsyncService(dataGroupMember).fetchSingleSeries(TestUtils.getNode(0), 0,
+    new DataAsyncService(dataGroupMember).fetchSingleSeries(TestUtils.getNode(0), raftId, 0,
         new AsyncMethodCallback<ByteBuffer>() {
           @Override
           public void onComplete(ByteBuffer buffer) {
@@ -995,7 +996,7 @@ public class DataGroupMemberTest extends MemberTest {
       aggrResultRef = new AtomicReference<>();
       aggrResultHandler = new GenericHandler<>(TestUtils.getNode(0), aggrResultRef);
       new DataAsyncService(dataGroupMember)
-          .getGroupByResult(TestUtils.getNode(10), executorId, 0, 20, aggrResultHandler);
+          .getGroupByResult(TestUtils.getNode(10), raftId, executorId, 0, 20, aggrResultHandler);
 
       byteBuffers = aggrResultRef.get();
       assertNotNull(byteBuffers);
@@ -1024,7 +1025,7 @@ public class DataGroupMemberTest extends MemberTest {
       aggrResultRef = new AtomicReference<>();
       aggrResultHandler = new GenericHandler<>(TestUtils.getNode(0), aggrResultRef);
       new DataAsyncService(dataGroupMember)
-          .getGroupByResult(TestUtils.getNode(30), executorId, 0, 20, aggrResultHandler);
+          .getGroupByResult(TestUtils.getNode(30), raftId, executorId, 0, 20, aggrResultHandler);
 
       byteBuffers = aggrResultRef.get();
       assertNull(byteBuffers);
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MemberTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MemberTest.java
index 4f8a07b..6065370 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MemberTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MemberTest.java
@@ -230,13 +230,13 @@ public class MemberTest {
     MetaGroupMember ret = new TestMetaGroupMember() {
 
       @Override
-      public DataGroupMember getLocalDataMember(Node header,
+      public DataGroupMember getLocalDataMember(Node header, int raftId,
           Object request) {
         return getDataGroupMember(header);
       }
 
       @Override
-      public DataGroupMember getLocalDataMember(Node header) {
+      public DataGroupMember getLocalDataMember(Node header, int raftId) {
         return getDataGroupMember(header);
       }
 
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
index 32b0b30..1badcd2 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
@@ -305,13 +305,13 @@ public class MetaGroupMemberTest extends MemberTest {
       }
 
       @Override
-      public DataGroupMember getLocalDataMember(Node header,
+      public DataGroupMember getLocalDataMember(Node header, int raftId,
           Object request) {
         return getDataGroupMember(header);
       }
 
       @Override
-      public DataGroupMember getLocalDataMember(Node header) {
+      public DataGroupMember getLocalDataMember(Node header, int raftId) {
         return getDataGroupMember(header);
       }
 
diff --git a/thrift/src/main/thrift/cluster.thrift b/thrift/src/main/thrift/cluster.thrift
index 56f1dd4..549cd55 100644
--- a/thrift/src/main/thrift/cluster.thrift
+++ b/thrift/src/main/thrift/cluster.thrift
@@ -121,6 +121,11 @@ struct Node {
   5: required int clientPort
 }
 
+struct RaftNode {
+  1: required Node node
+  2: required int raftId
+}
+
 // leader -> follower
 struct StartUpStatus {
   1: required long partitionInterval