You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2021/01/14 08:09:42 UTC

[iotdb] 02/04: fix bugs of wrong previous groups, pulling snapshots from self, and wrongly removing local data

This is an automated email from the ASF dual-hosted git repository.

lta pushed a commit to branch cluster_scalability
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 325e4f556cb3d520b6a365a89317e997291979c2
Author: lta <li...@163.com>
AuthorDate: Mon Jan 11 11:26:00 2021 +0800

    fix bugs of wrong previous groups, pulling snapshots from self, and wrongly removing local data
---
 .../iotdb/cluster/log/snapshot/FileSnapshot.java   |  6 +--
 .../cluster/log/snapshot/PullSnapshotTask.java     | 56 ++++++++++---------
 .../iotdb/cluster/partition/NodeRemovalResult.java |  5 +-
 .../iotdb/cluster/partition/PartitionTable.java    |  6 +++
 .../cluster/partition/slot/SlotPartitionTable.java | 59 ++++++++++++++------
 .../iotdb/cluster/server/DataClusterServer.java    | 30 ++++-------
 .../cluster/server/member/DataGroupMember.java     | 62 +++++++++++++++++-----
 .../cluster/server/member/MetaGroupMember.java     |  1 -
 .../iotdb/cluster/server/member/RaftMember.java    | 35 ++++++++++++
 .../server/heartbeat/MetaHeartbeatThreadTest.java  |  5 ++
 thrift/src/main/thrift/cluster.thrift              |  1 -
 11 files changed, 185 insertions(+), 81 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/FileSnapshot.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/FileSnapshot.java
index 9f1b562..b559879 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/FileSnapshot.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/FileSnapshot.java
@@ -227,12 +227,10 @@ public class FileSnapshot extends Snapshot implements TimeseriesSchemaSnapshot {
         throw new SnapshotInstallationException(e);
       }
 
-      for (FileSnapshot value : snapshotMap.values()) {
-        installFileSnapshotSchema(value);
-      }
-
       for (Entry<Integer, FileSnapshot> integerSnapshotEntry : snapshotMap.entrySet()) {
         Integer slot = integerSnapshotEntry.getKey();
+        FileSnapshot snapshot = integerSnapshotEntry.getValue();
+        installFileSnapshotSchema(snapshot);
         SlotStatus status = slotManager.getStatus(slot);
         if (status == SlotStatus.PULLING) {
           // as schemas are set, writes can proceed
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTask.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTask.java
index 9dc6231..4a79485 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTask.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTask.java
@@ -39,6 +39,7 @@ import org.apache.iotdb.cluster.client.sync.SyncDataClient;
 import org.apache.iotdb.cluster.config.ClusterDescriptor;
 import org.apache.iotdb.cluster.exception.SnapshotInstallationException;
 import org.apache.iotdb.cluster.log.Snapshot;
+import org.apache.iotdb.cluster.partition.PartitionGroup;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.iotdb.cluster.rpc.thrift.PullSnapshotRequest;
 import org.apache.iotdb.cluster.rpc.thrift.PullSnapshotResp;
@@ -163,30 +164,37 @@ public class PullSnapshotTask<T extends Snapshot> implements Callable<Void> {
 
   @Override
   public Void call() {
-    request = new PullSnapshotRequest();
-    request.setHeader(descriptor.getPreviousHolders().getHeader());
-    request.setRaftId(descriptor.getPreviousHolders().getId());
-    request.setRequiredSlots(descriptor.getSlots());
-    request.setRequireReadOnly(descriptor.isRequireReadOnly());
-
-    boolean finished = false;
-    int nodeIndex = -1;
-    while (!finished) {
-      try {
-        // sequentially pick up a node that may have this slot
-        nodeIndex = (nodeIndex + 1) % descriptor.getPreviousHolders().size();
-        finished = pullSnapshot(nodeIndex);
-        if (!finished) {
-          Thread
-              .sleep(ClusterDescriptor.getInstance().getConfig().getPullSnapshotRetryIntervalMs());
-        }
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        finished = true;
-      } catch (TException e) {
-        if (logger.isDebugEnabled()) {
-          logger.debug("Cannot pull slot {} from {}, retry", descriptor.getSlots(),
-              descriptor.getPreviousHolders().get(nodeIndex), e);
+    // If this node is a member of the previous holder group, it's unnecessary to pull the data again
+    if (descriptor.getPreviousHolders().contains(newMember.getThisNode())) {
+      // inform the previous holders that one member has successfully pulled snapshot directly
+      newMember.registerPullSnapshotHint(descriptor);
+    } else {
+      request = new PullSnapshotRequest();
+      request.setHeader(descriptor.getPreviousHolders().getHeader());
+      request.setRaftId(descriptor.getPreviousHolders().getId());
+      request.setRequiredSlots(descriptor.getSlots());
+      request.setRequireReadOnly(descriptor.isRequireReadOnly());
+
+      boolean finished = false;
+      int nodeIndex = ((PartitionGroup) newMember.getAllNodes()).indexOf(newMember.getThisNode());
+      while (!finished) {
+        try {
+          // sequentially pick up a node that may have this slot
+          nodeIndex = (nodeIndex + 1) % descriptor.getPreviousHolders().size();
+          finished = pullSnapshot(nodeIndex);
+          if (!finished) {
+            Thread
+                .sleep(
+                    ClusterDescriptor.getInstance().getConfig().getPullSnapshotRetryIntervalMs());
+          }
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          finished = true;
+        } catch (TException e) {
+          if (logger.isDebugEnabled()) {
+            logger.debug("Cannot pull slot {} from {}, retry", descriptor.getSlots(),
+                descriptor.getPreviousHolders().get(nodeIndex), e);
+          }
         }
       }
     }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/partition/NodeRemovalResult.java b/cluster/src/main/java/org/apache/iotdb/cluster/partition/NodeRemovalResult.java
index 457af85..5493980 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/partition/NodeRemovalResult.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/partition/NodeRemovalResult.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.cluster.partition;
 
+import java.util.ArrayList;
 import java.util.List;
 
 /**
@@ -26,10 +27,10 @@ import java.util.List;
  */
 public class NodeRemovalResult {
 
-  private List<PartitionGroup> removedGroupList;
+  private List<PartitionGroup> removedGroupList = new ArrayList<>();
   // if the removed group contains the local node, the local node should join a new group to
   // preserve the replication number
-  private List<PartitionGroup> newGroupList;
+  private List<PartitionGroup> newGroupList = new ArrayList<>();
 
   public PartitionGroup getRemovedGroup(int raftId) {
     for (PartitionGroup group : removedGroupList) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionTable.java b/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionTable.java
index bd8e518..079aad1 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionTable.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionTable.java
@@ -95,6 +95,12 @@ public interface PartitionTable {
   List<PartitionGroup> getGlobalGroups();
 
   /**
+   * Judge whether the data of the given slot is held by the given node.
+   * @param node the target node to check
+   */
+  boolean judgeHoldSlot(Node node, int slot);
+
+  /**
    * @param path      can be an incomplete path (but should contain a storage group name) e.g., if
    *                  "root.sg" is a storage group, then path can not be "root".
    * @param timestamp
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java b/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java
index f8f89b9..2a5ae3c 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java
@@ -62,7 +62,7 @@ public class SlotPartitionTable implements PartitionTable {
   private RaftNode[] slotNodes = new RaftNode[ClusterConstant.SLOT_NUM];
   // the nodes that each slot belongs to before a new node is added, used for the new node to
   // find the data source
-  private Map<RaftNode, Map<Integer, RaftNode>> previousNodeMap = new ConcurrentHashMap<>();
+  private Map<RaftNode, Map<Integer, PartitionGroup>> previousNodeMap = new ConcurrentHashMap<>();
 
   //the filed is used for determining which nodes need to be a group.
   // the data groups which this node belongs to.
@@ -164,8 +164,7 @@ public class SlotPartitionTable implements PartitionTable {
     return ret;
   }
 
-  @Override
-  public PartitionGroup getHeaderGroup(RaftNode raftNode) {
+  private PartitionGroup getHeaderGroup(RaftNode raftNode, List<Node> nodeRing) {
     PartitionGroup ret = new PartitionGroup(raftNode.getRaftId());
 
     // assuming the nodes are [1,2,3,4,5]
@@ -187,6 +186,11 @@ public class SlotPartitionTable implements PartitionTable {
   }
 
   @Override
+  public PartitionGroup getHeaderGroup(RaftNode raftNode) {
+    return getHeaderGroup(raftNode, this.nodeRing);
+  }
+
+  @Override
   public PartitionGroup getHeaderGroup(Node node) {
     return getHeaderGroup(new RaftNode(node, 0));
   }
@@ -228,11 +232,13 @@ public class SlotPartitionTable implements PartitionTable {
 
   @Override
   public NodeAdditionResult addNode(Node node) {
+    List<Node> oldRing;
     synchronized (nodeRing) {
       if (nodeRing.contains(node)) {
         return null;
       }
 
+      oldRing = new ArrayList<>(nodeRing);
       nodeRing.add(node);
       nodeRing.sort(Comparator.comparingInt(Node::getNodeIdentifier));
 
@@ -277,7 +283,7 @@ public class SlotPartitionTable implements PartitionTable {
 
     // the slots movement is only done logically, the new node itself will pull data from the
     // old node
-    result.setLostSlots(moveSlotsToNew(node));
+    result.setLostSlots(moveSlotsToNew(node, oldRing));
 
     return result;
   }
@@ -290,7 +296,7 @@ public class SlotPartitionTable implements PartitionTable {
    * @param newNode
    * @return a map recording what slots each group lost.
    */
-  private Map<RaftNode, Set<Integer>> moveSlotsToNew(Node newNode) {
+  private Map<RaftNode, Set<Integer>> moveSlotsToNew(Node newNode, List<Node> oldRing) {
     Map<RaftNode, Set<Integer>> result = new HashMap<>();
     // as a node is added, the average slots for each node decrease
     // move the slots to the new node if any previous node have more slots than the new average
@@ -315,7 +321,7 @@ public class SlotPartitionTable implements PartitionTable {
         nodeSlotMap.get(curNode).addAll(slotsToMove);
         for (Integer slot : slotsToMove) {
           // record what node previously hold the integer
-          previousNodeMap.get(curNode).put(slot, entry.getKey());
+          previousNodeMap.get(curNode).put(slot, getHeaderGroup(entry.getKey(), oldRing));
           slotNodes[slot] = curNode;
         }
         result.computeIfAbsent(entry.getKey(), n -> new HashSet<>()).addAll(slotsToMove);
@@ -326,7 +332,7 @@ public class SlotPartitionTable implements PartitionTable {
           nodeSlotMap.get(curNode).addAll(slotsToMove);
           for (Integer slot : slotsToMove) {
             // record what node previously hold the integer
-            previousNodeMap.get(curNode).put(slot, entry.getKey());
+            previousNodeMap.get(curNode).put(slot, getHeaderGroup(entry.getKey(), oldRing));
             slotNodes[slot] = curNode;
           }
           result.get(entry.getKey()).addAll(slotsToMove);
@@ -357,15 +363,19 @@ public class SlotPartitionTable implements PartitionTable {
       }
 
       dataOutputStream.writeInt(previousNodeMap.size());
-      for (Entry<RaftNode, Map<Integer, RaftNode>> nodeMapEntry : previousNodeMap.entrySet()) {
+      for (Entry<RaftNode, Map<Integer, PartitionGroup>> nodeMapEntry : previousNodeMap.entrySet()) {
         dataOutputStream.writeInt(nodeMapEntry.getKey().getNode().getNodeIdentifier());
         dataOutputStream.writeInt(nodeMapEntry.getKey().getRaftId());
-        Map<Integer, RaftNode> prevHolders = nodeMapEntry.getValue();
+        Map<Integer, PartitionGroup> prevHolders = nodeMapEntry.getValue();
         dataOutputStream.writeInt(prevHolders.size());
-        for (Entry<Integer, RaftNode> integerNodeEntry : prevHolders.entrySet()) {
+        for (Entry<Integer, PartitionGroup> integerNodeEntry : prevHolders.entrySet()) {
           dataOutputStream.writeInt(integerNodeEntry.getKey());
-          dataOutputStream.writeInt(integerNodeEntry.getValue().getNode().getNodeIdentifier());
-          dataOutputStream.writeInt(integerNodeEntry.getValue().getRaftId());
+          PartitionGroup group = integerNodeEntry.getValue();
+          dataOutputStream.writeInt(group.getId());
+          dataOutputStream.writeInt(group.size());
+          for (Node node : group) {
+            dataOutputStream.writeInt(node.getNodeIdentifier());
+          }
         }
       }
 
@@ -402,12 +412,16 @@ public class SlotPartitionTable implements PartitionTable {
       int nodeId = buffer.getInt();
       RaftNode node = new RaftNode(idNodeMap.get(nodeId), buffer.getInt());
 
-      Map<Integer, RaftNode> prevHolders = new HashMap<>();
+      Map<Integer, PartitionGroup> prevHolders = new HashMap<>();
       int holderNum = buffer.getInt();
       for (int i1 = 0; i1 < holderNum; i1++) {
         int slot = buffer.getInt();
-        RaftNode holder = new RaftNode(idNodeMap.get(buffer.getInt()), buffer.getInt());
-        prevHolders.put(slot, holder);
+        PartitionGroup group = new PartitionGroup(buffer.getInt());
+        int nodeNum = buffer.getInt();
+        for (int i2 = 0 ; i2 < nodeNum; i2++) {
+          group.add(idNodeMap.get(buffer.getInt()));
+        }
+        prevHolders.put(slot, group);
       }
       previousNodeMap.put(node, prevHolders);
     }
@@ -429,7 +443,7 @@ public class SlotPartitionTable implements PartitionTable {
     return nodeRing;
   }
 
-  public Map<Integer, RaftNode> getPreviousNodeMap(RaftNode raftNode) {
+  public Map<Integer, PartitionGroup> getPreviousNodeMap(RaftNode raftNode) {
     return previousNodeMap.get(raftNode);
   }
 
@@ -503,6 +517,12 @@ public class SlotPartitionTable implements PartitionTable {
         // each node exactly joins replicationNum groups, so when a group is removed, the node
         // should join a new one
         int thisNodeIdx = nodeRing.indexOf(thisNode);
+
+        // check if this node is to be removed
+        if (thisNodeIdx == -1) {
+          continue;
+        }
+
         // this node must be the last node of the new group
         int headerNodeIdx = thisNodeIdx - (replicationNum - 1);
         headerNodeIdx = headerNodeIdx < 0 ? headerNodeIdx + nodeRing.size() : headerNodeIdx;
@@ -531,7 +551,7 @@ public class SlotPartitionTable implements PartitionTable {
         int slot = slots.get(i);
         RaftNode newHolder = new RaftNode(nodeRing.get(i % nodeRing.size()), raftId);
         slotNodes[slot] = newHolder;
-        nodeSlotMap.get(newHolder).add(slot);
+        nodeSlotMap.computeIfAbsent(newHolder, n -> new ArrayList<>()).add(slot);
         newHolderSlotMap.computeIfAbsent(newHolder, n -> new ArrayList<>()).add(slot);
       }
     }
@@ -549,6 +569,11 @@ public class SlotPartitionTable implements PartitionTable {
     }
   }
 
+  @Override
+  public boolean judgeHoldSlot(Node node, int slot) {
+    return getHeaderGroup(slotNodes[slot]).contains(node);
+  }
+
   private void calculateGlobalGroups() {
     globalGroups = new ArrayList<>();
     for (Node node : getAllNodes()) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
index 81ae373..b023c36 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
@@ -526,9 +526,15 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
     }
   }
 
+  /**
+   * Make sure the group will not receive new raft logs.
+   * @param header the header of the raft group being removed
+   * @param dataGroupMember the member of the group being removed
+   */
   private void removeMember(RaftNode header, DataGroupMember dataGroupMember) {
-    dataGroupMember.syncLeader();
+    dataGroupMember.getStopStatus().setSyncSuccess(dataGroupMember.syncLeader());
     dataGroupMember.setReadOnly();
+    dataGroupMember.waitFollowersToSync();
     dataGroupMember.stop();
     stoppedMemberManager.put(header, dataGroupMember);
   }
@@ -578,8 +584,7 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
   /**
    * Try removing a node from the groups of each DataGroupMember. If the node is the header of some
    * group, set the member to read only so that it can still provide data for other nodes that has
-   * not yet pulled its data. If the node is the local node, remove all members whose group is not
-   * headed by this node. Otherwise, just change the node list of the member and pull new data. And
+   * not yet pulled its data. Otherwise, just change the node list of the member and pull new data. And
    * create a new DataGroupMember if this node should join a new group because of this removal.
    *
    * @param node
@@ -591,25 +596,12 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
       while (entryIterator.hasNext()) {
         Entry<RaftNode, DataGroupMember> entry = entryIterator.next();
         DataGroupMember dataGroupMember = entry.getValue();
-        if (dataGroupMember.getHeader().equals(node)) {
-          // the group is removed as the node is removed, so new writes should be rejected as
-          // they belong to the new holder, but the member is kept alive for other nodes to pull
-          // snapshots
+        if (dataGroupMember.getHeader().equals(node) || node.equals(thisNode)) {
           entryIterator.remove();
           removeMember(entry.getKey(), entry.getValue());
         } else {
-          if (node.equals(thisNode)) {
-            // this node is removed, it is no more replica of other groups
-            List<Integer> nodeSlots =
-                ((SlotPartitionTable) partitionTable)
-                    .getNodeSlots(dataGroupMember.getHeader(), dataGroupMember.getRaftGroupId());
-            dataGroupMember.removeLocalData(nodeSlots);
-            entryIterator.remove();
-            dataGroupMember.stop();
-          } else {
-            // the group should be updated and pull new slots from the removed node
-            dataGroupMember.removeNode(node, removalResult);
-          }
+          // the group should be updated and pull new slots from the removed node
+          dataGroupMember.removeNode(node, removalResult);
         }
       }
       for (PartitionGroup newGroup : removalResult.getNewGroupList()) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
index 2cd675f..4737520 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
@@ -402,16 +402,27 @@ public class DataGroupMember extends RaftMember {
    * @param request
    */
   public PullSnapshotResp getSnapshot(PullSnapshotRequest request) throws IOException {
-    waitLeader();
-    if (character != NodeCharacter.LEADER && !readOnly) {
-      return null;
-    }
     // if the requester pulls the snapshots because the header of the group is removed, then the
     // member should no longer receive new data
     if (request.isRequireReadOnly()) {
       setReadOnly();
     }
 
+    boolean canGetSnapshot;
+    /**
+     * A snapshot can be provided in either of two cases:
+     * 1. The raft member is stopped and its sync status is successful, meaning it synced with the leader before stopping.
+     * 2. The raft member is not stopped and syncing with the leader succeeds.
+     */
+    if (stopStatus.stop) {
+      canGetSnapshot = stopStatus.syncSuccess;
+    } else {
+      canGetSnapshot = syncLeader();
+    }
+    if (!canGetSnapshot) {
+      return null;
+    }
+
     List<Integer> requiredSlots = request.getRequiredSlots();
     for (Integer requiredSlot : requiredSlots) {
       // wait if the data of the slot is in another node
@@ -467,28 +478,26 @@ public class DataGroupMember extends RaftMember {
     synchronized (logManager) {
       logger.info("{} pulling {} slots from remote", name, slots.size());
       PartitionedSnapshot<Snapshot> snapshot = (PartitionedSnapshot) logManager.getSnapshot();
-      Map<Integer, RaftNode> prevHolders = ((SlotPartitionTable) metaGroupMember.getPartitionTable())
+      Map<Integer, PartitionGroup> prevHolders = ((SlotPartitionTable) metaGroupMember.getPartitionTable())
           .getPreviousNodeMap(new RaftNode(newNode, getRaftGroupId()));
 
       // group the slots by their owners
-      Map<RaftNode, List<Integer>> holderSlotsMap = new HashMap<>();
+      Map<PartitionGroup, List<Integer>> holderSlotsMap = new HashMap<>();
       for (int slot : slots) {
         // skip the slot if the corresponding data is already replicated locally
         if (snapshot.getSnapshot(slot) == null) {
-          RaftNode raftNode = prevHolders.get(slot);
-          if (raftNode != null) {
-            holderSlotsMap.computeIfAbsent(raftNode, n -> new ArrayList<>()).add(slot);
+          PartitionGroup group = prevHolders.get(slot);
+          if (group != null) {
+            holderSlotsMap.computeIfAbsent(group, n -> new ArrayList<>()).add(slot);
           }
         }
       }
 
       // pull snapshots from each owner's data group
-      for (Entry<RaftNode, List<Integer>> entry : holderSlotsMap.entrySet()) {
-        RaftNode raftNode = entry.getKey();
+      for (Entry<PartitionGroup, List<Integer>> entry : holderSlotsMap.entrySet()) {
         List<Integer> nodeSlots = entry.getValue();
         PullSnapshotTaskDescriptor taskDescriptor =
-            new PullSnapshotTaskDescriptor(metaGroupMember.getPartitionTable()
-                .getHeaderGroup(raftNode), nodeSlots, false);
+            new PullSnapshotTaskDescriptor(entry.getKey(), nodeSlots, false);
         pullFileSnapshot(taskDescriptor, null);
       }
     }
@@ -760,6 +769,27 @@ public class DataGroupMember extends RaftMember {
     }
   }
 
+  public void waitFollowersToSync() {
+    if (character != NodeCharacter.LEADER) {
+      return;
+    }
+    for (Map.Entry<Node, Peer> entry: peerMap.entrySet()) {
+      Node node = entry.getKey();
+      Peer peer = entry.getValue();
+      while (peer.getMatchIndex() < logManager.getCommitLogIndex()) {
+        try {
+          Thread.sleep(10);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          logger.warn("{}: Unexpected interruption when waiting follower {} to sync, raft id is {}",
+              name, node, getRaftGroupId());
+        }
+      }
+      logger.info("{}: Follower {} has synced with leader, raft id is {}", name, node,
+          getRaftGroupId());
+    }
+  }
+
   /**
    * Generate a report containing the character, leader, term, last log term, last log index, header
    * and readOnly or not of this member.
@@ -800,6 +830,12 @@ public class DataGroupMember extends RaftMember {
   public boolean onSnapshotInstalled(List<Integer> slots) {
     List<Integer> removableSlots = new ArrayList<>();
     for (Integer slot : slots) {
+      /**
+       * If this slot is still held by a different raft group on the same node, the data of the slot should be kept.
+       */
+      if (metaGroupMember.getPartitionTable().judgeHoldSlot(thisNode, slot)) {
+        continue;
+      }
       int sentReplicaNum = slotManager.sentOneReplication(slot);
       if (sentReplicaNum >= ClusterDescriptor.getInstance().getConfig().getReplicationNum()) {
         removableSlots.add(slot);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
index d93efc0..7e73f61 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
@@ -382,7 +382,6 @@ public class MetaGroupMember extends RaftMember {
         logger.error("Unexpected interruption when waiting for hardlinkCleaner to end", e);
       }
     }
-
     logger.info("{}: stopped", name);
   }
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index 3a5b51b..0526285 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -249,6 +249,8 @@ public abstract class RaftMember {
    */
   private LogDispatcher logDispatcher;
 
+  protected StopStatus stopStatus;
+
   protected RaftMember() {
   }
 
@@ -260,6 +262,7 @@ public abstract class RaftMember {
     this.asyncHeartbeatClientPool = asyncHeartbeatPool;
     this.syncHeartbeatClientPool = syncHeartbeatPool;
     this.asyncSendLogClientPool = asyncClientPool;
+    this.stopStatus = new StopStatus();
   }
 
   protected RaftMember(String name, AsyncClientPool asyncPool, SyncClientPool syncPool,
@@ -365,9 +368,11 @@ public abstract class RaftMember {
         logger.error("Unexpected interruption when waiting for commitLogPool to end", e);
       }
     }
+    leader.set(ClusterConstant.EMPTY_NODE);
     catchUpService = null;
     heartBeatService = null;
     appendLogThreadPool = null;
+    stopStatus.setStop(true);
     logger.info("Member {} stopped", name);
   }
 
@@ -801,6 +806,9 @@ public abstract class RaftMember {
    * Wait until the leader of this node becomes known or time out.
    */
   public void waitLeader() {
+    if (stopStatus.isStop()) {
+      return;
+    }
     long startTime = System.currentTimeMillis();
     while (leader.get() == null || ClusterConstant.EMPTY_NODE.equals(leader.get())) {
       synchronized (waitLeaderCondition) {
@@ -1876,4 +1884,31 @@ public abstract class RaftMember {
     OK, TIME_OUT, LEADERSHIP_STALE
   }
 
+  public class StopStatus {
+
+    boolean stop;
+
+    boolean syncSuccess;
+
+    public boolean isStop() {
+      return stop;
+    }
+
+    public void setStop(boolean stop) {
+      this.stop = stop;
+    }
+
+    public boolean isSyncSuccess() {
+      return syncSuccess;
+    }
+
+    public void setSyncSuccess(boolean syncSuccess) {
+      this.syncSuccess = syncSuccess;
+    }
+  }
+
+  public StopStatus getStopStatus() {
+    return stopStatus;
+  }
+
 }
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatThreadTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatThreadTest.java
index e112d31..f6bb254 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatThreadTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatThreadTest.java
@@ -106,6 +106,11 @@ public class MetaHeartbeatThreadTest extends HeartbeatThreadTest {
     public List<PartitionGroup> getGlobalGroups() {
       return null;
     }
+
+    @Override
+    public boolean judgeHoldSlot(Node node, int slot) {
+      return true;
+    }
   };
 
   @Override
diff --git a/thrift/src/main/thrift/cluster.thrift b/thrift/src/main/thrift/cluster.thrift
index 9019680..2a24106 100644
--- a/thrift/src/main/thrift/cluster.thrift
+++ b/thrift/src/main/thrift/cluster.thrift
@@ -315,7 +315,6 @@ service RaftService {
   **/
   long requestCommitIndex(1:Node header, 2:int raftId)
 
-
   /**
   * Read a chunk of a file from the client. If the remaining of the file does not have enough
   * bytes, only the remaining will be returned.