You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2020/02/10 09:56:09 UTC

[incubator-iotdb] branch cluster_node_deletion updated: add unit tests

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch cluster_node_deletion
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/cluster_node_deletion by this push:
     new ad5be9f  add unit tests
ad5be9f is described below

commit ad5be9ff3b8794cbcbd4c1391615bc393ee92d58
Author: jt2594838 <jt...@163.com>
AuthorDate: Mon Feb 10 17:55:54 2020 +0800

    add unit tests
---
 cluster/partitions.tmp                             | Bin 0 -> 511 bytes
 cluster/src/assembly/resources/conf/logback.xml    |   1 +
 .../java/org/apache/iotdb/cluster/ClusterMain.java |   2 +-
 .../apache/iotdb/cluster/config/ClusterConfig.java |   2 +-
 .../java/org/apache/iotdb/cluster/log/Log.java     |   1 +
 .../org/apache/iotdb/cluster/log/LogParser.java    |   6 +
 .../iotdb/cluster/log/logtypes/RemoveNodeLog.java  |   9 +
 .../cluster/log/snapshot/PullSnapshotTask.java     |   9 +-
 .../cluster/partition/SlotPartitionTable.java      |   3 +-
 .../cluster/query/manage/ClusterQueryManager.java  |  11 ++
 .../iotdb/cluster/server/DataClusterServer.java    |  16 +-
 .../iotdb/cluster/server/MetaClusterServer.java    |   7 +-
 .../apache/iotdb/cluster/server/NodeReport.java    | 111 ++++++++++++
 .../handlers/caller/AppendGroupEntryHandler.java   |   5 +-
 .../server/handlers/caller/HeartBeatHandler.java   |   4 +-
 .../cluster/server/heartbeat/HeartBeatThread.java  |   1 +
 .../server/heartbeat/MetaHeartBeatThread.java      |   1 +
 .../cluster/server/member/DataGroupMember.java     |  52 ++++--
 .../cluster/server/member/MetaGroupMember.java     |  70 +++++++-
 .../iotdb/cluster/server/member/RaftMember.java    |  28 +--
 .../apache/iotdb/cluster/log/LogParserTest.java    |  15 ++
 .../log/log/snapshot/PullSnapshotTaskTest.java     |   5 +-
 .../cluster/log/logtypes/SerializeLogTest.java     |  26 +++
 .../log/partition/SlotPartitionTableTest.java      |  32 +++-
 .../cluster/log/snapshot/PullSnapshotTaskTest.java |  97 ----------
 .../log/snapshot/SnapshotSerializeTest.java        | 195 ---------------------
 .../cluster/server/member/DataGroupMemberTest.java |  84 ++++++++-
 .../iotdb/cluster/server/member/MemberTest.java    |  12 +-
 .../cluster/server/member/MetaGroupMemberTest.java | 174 +++++++++++++++++-
 service-rpc/src/main/thrift/cluster.thrift         |  11 ++
 30 files changed, 627 insertions(+), 363 deletions(-)

diff --git a/cluster/partitions.tmp b/cluster/partitions.tmp
new file mode 100644
index 0000000..569a578
Binary files /dev/null and b/cluster/partitions.tmp differ
diff --git a/cluster/src/assembly/resources/conf/logback.xml b/cluster/src/assembly/resources/conf/logback.xml
index 4401a8b..37e8d32 100644
--- a/cluster/src/assembly/resources/conf/logback.xml
+++ b/cluster/src/assembly/resources/conf/logback.xml
@@ -159,4 +159,5 @@
     <logger level="debug" name="org.apache.iotdb.db.conf.adapter.IoTDBConfigDynamicAdapter">
         <appender-ref ref="FILE_DYNAMIC_PARAMETER"/>
     </logger>
+    <logger level="info" name="org.apache.iotdb.cluster.server.heartbeat"/>
 </configuration>
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/ClusterMain.java b/cluster/src/main/java/org/apache/iotdb/cluster/ClusterMain.java
index 8e76928..bcd6690 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/ClusterMain.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/ClusterMain.java
@@ -115,7 +115,7 @@ public class ClusterMain {
             logger.error("Cluster size is too small, cannot remove any node");
             return;
           } else if (response == Response.RESPONSE_REJECT) {
-            logger.error("Node {} is not found in the cluster, please check", node);
+            logger.error("Node {} is not found in the cluster, please check", nodeToRemove);
             return;
           }
         }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
index 576e373..e3e7537 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
@@ -40,7 +40,7 @@ public class ClusterConfig {
   private int maxConcurrentClientNum = 1024;
 
   @ClusterConsistent
-  private int replicationNum = 2;
+  private int replicationNum = 3;
 
   private int connectionTimeoutInMS = 20 * 1000;
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/Log.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/Log.java
index 354d322..d84f315 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/Log.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/Log.java
@@ -41,6 +41,7 @@ public abstract class Log {
 
   public enum Types {
     // TODO-Cluster#348 support more logs
+    // DO CHECK LogParser when you add a new type of log
     ADD_NODE, PHYSICAL_PLAN, CLOSE_FILE, REMOVE_NODE
   }
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogParser.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogParser.java
index 4dc708c..1bda735 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogParser.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogParser.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.cluster.log.Log.Types;
 import org.apache.iotdb.cluster.log.logtypes.AddNodeLog;
 import org.apache.iotdb.cluster.log.logtypes.CloseFileLog;
 import org.apache.iotdb.cluster.log.logtypes.PhysicalPlanLog;
+import org.apache.iotdb.cluster.log.logtypes.RemoveNodeLog;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -74,6 +75,11 @@ public class LogParser {
         closeFileLog.deserialize(buffer);
         log = closeFileLog;
         break;
+      case REMOVE_NODE:
+        RemoveNodeLog removeNodeLog = new RemoveNodeLog();
+        removeNodeLog.deserialize(buffer);
+        log = removeNodeLog;
+        break;
       default:
         throw new IllegalArgumentException(type.toString());
     }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/RemoveNodeLog.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/RemoveNodeLog.java
index 204e675..6a12af0 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/RemoveNodeLog.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/RemoveNodeLog.java
@@ -18,6 +18,10 @@ public class RemoveNodeLog extends Log {
         DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream);
         try {
             dataOutputStream.writeByte(Types.REMOVE_NODE.ordinal());
+            dataOutputStream.writeLong(getPreviousLogIndex());
+            dataOutputStream.writeLong(getPreviousLogTerm());
+            dataOutputStream.writeLong(getCurrLogIndex());
+            dataOutputStream.writeLong(getCurrLogTerm());
         } catch (IOException e) {
             // ignored
         }
@@ -27,6 +31,11 @@ public class RemoveNodeLog extends Log {
 
     @Override
     public void deserialize(ByteBuffer buffer) {
+        setPreviousLogIndex(buffer.getLong());
+        setPreviousLogTerm(buffer.getLong());
+        setCurrLogIndex(buffer.getLong());
+        setCurrLogTerm(buffer.getLong());
+
         removedNode = new Node();
         SerializeUtils.deserialize(removedNode, buffer);
     }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTask.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTask.java
index 3c00dd5..333c710 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTask.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTask.java
@@ -53,17 +53,23 @@ public class PullSnapshotTask<T extends Snapshot> implements Callable<Map<Intege
   private List<Node> previousHolders;
   // the header of the old members
   private Node header;
+  // set to true if the previous holder has been removed from the cluster.
+  // This will make the previous holder read-only so that different new
+  // replicas can pull the same snapshot.
+  private boolean requireReadOnly;
 
   private PullSnapshotRequest request;
   private SnapshotFactory snapshotFactory;
 
   public PullSnapshotTask(Node header, List<Integer> slots,
-      DataGroupMember newMember, List<Node> previousHolders, SnapshotFactory snapshotFactory) {
+      DataGroupMember newMember, List<Node> previousHolders, SnapshotFactory snapshotFactory,
+      boolean requireReadOnly) {
     this.header = header;
     this.slots = slots;
     this.newMember = newMember;
     this.previousHolders = previousHolders;
     this.snapshotFactory = snapshotFactory;
+    this.requireReadOnly = requireReadOnly;
   }
 
   private boolean pullSnapshot(AtomicReference<Map<Integer, T>> snapshotRef, int nodeIndex)
@@ -103,6 +109,7 @@ public class PullSnapshotTask<T extends Snapshot> implements Callable<Map<Intege
     request = new PullSnapshotRequest();
     request.setHeader(header);
     request.setRequiredSlots(slots);
+    request.setRequireReadOnly(requireReadOnly);
     AtomicReference<Map<Integer, T>> snapshotRef = new AtomicReference<>();
     boolean finished = false;
     int nodeIndex = -1;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/partition/SlotPartitionTable.java b/cluster/src/main/java/org/apache/iotdb/cluster/partition/SlotPartitionTable.java
index b1d7e31..03c9593 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/partition/SlotPartitionTable.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/partition/SlotPartitionTable.java
@@ -317,7 +317,6 @@ public class SlotPartitionTable implements PartitionTable {
     return previousNodeMap.get(node);
   }
 
-
   @Override
   public List<Integer> getNodeSlots(Node header) {
     return nodeSlotMap.get(header);
@@ -377,7 +376,6 @@ public class SlotPartitionTable implements PartitionTable {
         } else {
           PartitionGroup newGrp = getHeaderGroup(header);
           localGroups.set(i, newGrp);
-          result.setNewGroup(newGrp);
         }
       }
       if (removedGroupIdx != -1) {
@@ -391,6 +389,7 @@ public class SlotPartitionTable implements PartitionTable {
         Node header = nodeRing.get(headerNodeIdx);
         PartitionGroup newGrp = getHeaderGroup(header);
         localGroups.add(newGrp);
+        result.setNewGroup(newGrp);
       }
 
       // the slots movement is only done logically, the new node itself will pull data from the
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/manage/ClusterQueryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/manage/ClusterQueryManager.java
index 3ec5863..7f7f2fb 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/manage/ClusterQueryManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/manage/ClusterQueryManager.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.cluster.query.manage;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
@@ -94,4 +95,14 @@ public class ClusterQueryManager {
   public IReaderByTimestamp getReaderByTimestamp(long readerId) {
     return seriesReaderByTimestampMap.get(readerId);
   }
+
+  public void endAllQueries() throws StorageEngineException {
+    for (Map<Long, RemoteQueryContext> contextMap : queryContextMap.values()) {
+      for (RemoteQueryContext context : contextMap.values()) {
+        QueryResourceManager.getInstance().endQuery(context.getQueryId());
+      }
+    }
+    seriesReaderByTimestampMap.clear();
+    seriesReaderMap.clear();
+  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
index 3a89b66..27deddf 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.cluster.server;
 
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
@@ -46,6 +47,7 @@ import org.apache.iotdb.cluster.rpc.thrift.SendSnapshotRequest;
 import org.apache.iotdb.cluster.rpc.thrift.SingleSeriesQueryRequest;
 import org.apache.iotdb.cluster.rpc.thrift.TSDataService;
 import org.apache.iotdb.cluster.rpc.thrift.TSDataService.AsyncProcessor;
+import org.apache.iotdb.cluster.server.NodeReport.DataMemberReport;
 import org.apache.iotdb.cluster.server.member.DataGroupMember;
 import org.apache.iotdb.service.rpc.thrift.TSStatus;
 import org.apache.thrift.async.AsyncMethodCallback;
@@ -69,6 +71,10 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
   }
 
   public void addDataGroupMember(DataGroupMember dataGroupMember) {
+    DataGroupMember removedMember = headerGroupMap.remove(dataGroupMember.getHeader());
+    if (removedMember != null) {
+      removedMember.stop();
+    }
     headerGroupMap.put(dataGroupMember.getHeader(), dataGroupMember);
   }
 
@@ -319,6 +325,7 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
             List<Integer> nodeSlots = partitionTable.getNodeSlots(dataGroupMember.getHeader());
             dataGroupMember.removeLocalData(nodeSlots);
             entryIterator.remove();
+            dataGroupMember.stop();
           } else {
             // the group should be updated and pull new slots from the removed node
             dataGroupMember.removeNode(node, removalResult);
@@ -327,6 +334,7 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
       }
       PartitionGroup newGroup = removalResult.getNewGroup();
       if (newGroup != null) {
+        logger.info("{} should join a new group {}", thisNode, newGroup);
         try {
           createNewMember(newGroup.getHeader());
         } catch (NotInSameGroupException | TTransportException e) {
@@ -360,5 +368,11 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
     dataGroupMember.pullNodeAdditionSnapshots(slots, thisNode);
   }
 
-
+  public List<DataMemberReport> genMemberReports() {
+    List<DataMemberReport> dataMemberReports = new ArrayList<>();
+    for (DataGroupMember value : headerGroupMap.values()) {
+      dataMemberReports.add(value.genReport());
+    }
+    return dataMemberReports;
+  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java
index 61b1042..c3957b1 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java
@@ -178,7 +178,12 @@ public class MetaClusterServer extends RaftServer implements TSMetaService.Async
   }
 
   @Override
-  public void removeNode(Node node, AsyncMethodCallback<Long> resultHandler) throws TException {
+  public void removeNode(Node node, AsyncMethodCallback<Long> resultHandler) {
     member.removeNode(node, resultHandler);
   }
+
+  @Override
+  public void exile(AsyncMethodCallback<Void> resultHandler) {
+    member.exile(resultHandler);
+  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/NodeReport.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/NodeReport.java
new file mode 100644
index 0000000..7dacae6
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/NodeReport.java
@@ -0,0 +1,111 @@
+package org.apache.iotdb.cluster.server;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.iotdb.cluster.rpc.thrift.Node;
+
+/**
+ * A node report collects the current runtime information of the local node, which contains:
+ * 1. The MetaMemberReport of the meta member.
+ * 2. The DataMemberReports of each data member.
+ */
+public class NodeReport {
+
+  private Node thisNode;
+  private MetaMemberReport metaMemberReport;
+  private List<DataMemberReport> dataMemberReportList;
+
+  public NodeReport(Node thisNode) {
+    this.thisNode = thisNode;
+    dataMemberReportList = new ArrayList<>();
+  }
+
+  public void setMetaMemberReport(
+      MetaMemberReport metaMemberReport) {
+    this.metaMemberReport = metaMemberReport;
+  }
+
+  public void setDataMemberReportList(
+      List<DataMemberReport> dataMemberReportList) {
+    this.dataMemberReportList = dataMemberReportList;
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder stringBuilder = new StringBuilder();
+    stringBuilder.append("Report of ").append(thisNode).append(System.lineSeparator());
+    stringBuilder.append(metaMemberReport).append(System.lineSeparator());
+    for (DataMemberReport dataMemberReport : dataMemberReportList) {
+      stringBuilder.append(dataMemberReport).append(System.lineSeparator());
+    }
+    return stringBuilder.toString();
+  }
+
+  /**
+   * A RaftMemberReport contains the character, leader, term, last log term/index of a raft member.
+   */
+  public static class RaftMemberReport {
+    NodeCharacter character;
+    Node leader;
+    long term;
+    long lastLogTerm;
+    long lastLogIndex;
+    boolean isReadOnly;
+
+    public RaftMemberReport(NodeCharacter character, Node leader, long term, long lastLogTerm,
+        long lastLogIndex, boolean isReadOnly) {
+      this.character = character;
+      this.leader = leader;
+      this.term = term;
+      this.lastLogTerm = lastLogTerm;
+      this.lastLogIndex = lastLogIndex;
+      this.isReadOnly = isReadOnly;
+    }
+  }
+
+  public static class MetaMemberReport extends RaftMemberReport {
+
+    public MetaMemberReport(NodeCharacter character, Node leader, long term, long lastLogTerm,
+        long lastLogIndex, boolean isReadOnly) {
+      super(character, leader, term, lastLogTerm, lastLogIndex, isReadOnly);
+    }
+
+    @Override
+    public String toString() {
+      return "MetaMemberReport{" +
+          "character=" + character +
+          ", Leader=" + leader +
+          ", term=" + term +
+          ", lastLogTerm=" + lastLogTerm +
+          ", lastLogIndex=" + lastLogIndex +
+          ", readOnly=" + isReadOnly +
+          '}';
+    }
+  }
+
+  /**
+   * A DataMemberReport additionally contains the header node of the data member.
+   */
+  public static class DataMemberReport extends RaftMemberReport {
+    Node header;
+
+    public DataMemberReport(NodeCharacter character, Node leader, long term, long lastLogTerm,
+        long lastLogIndex, Node header, boolean isReadOnly) {
+      super(character, leader, term, lastLogTerm, lastLogIndex, isReadOnly);
+      this.header = header;
+    }
+
+    @Override
+    public String toString() {
+      return "DataMemberReport{" +
+          "header=" + header +
+          ", character=" + character +
+          ", Leader=" + leader +
+          ", term=" + term +
+          ", lastLogTerm=" + lastLogTerm +
+          ", lastLogIndex=" + lastLogIndex +
+          ", readOnly=" + isReadOnly +
+          '}';
+    }
+  }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendGroupEntryHandler.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendGroupEntryHandler.java
index 893bec5..07b8c76 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendGroupEntryHandler.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendGroupEntryHandler.java
@@ -119,9 +119,6 @@ public class AppendGroupEntryHandler implements AsyncMethodCallback<Long> {
 
   @Override
   public void onError(Exception exception) {
-    synchronized (groupReceivedCounter) {
-      logger.error("Cannot send the add node request to node {}", receiverNode, exception);
-      groupReceivedCounter.notifyAll();
-    }
+    logger.error("Cannot send the add node request to node {}", receiverNode, exception);
   }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/HeartBeatHandler.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/HeartBeatHandler.java
index 7834e4f..cf225db 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/HeartBeatHandler.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/HeartBeatHandler.java
@@ -49,7 +49,7 @@ public class HeartBeatHandler implements AsyncMethodCallback<HeartBeatResponse>
 
   @Override
   public void onComplete(HeartBeatResponse resp) {
-    logger.debug("{}: Received a heartbeat response", memberName);
+    logger.trace("{}: Received a heartbeat response", memberName);
     long followerTerm = resp.getTerm();
     if (followerTerm == RESPONSE_AGREE) {
       // current leadership is still valid
@@ -60,7 +60,7 @@ public class HeartBeatHandler implements AsyncMethodCallback<HeartBeatResponse>
       long lastLogTerm = resp.getLastLogTerm();
       long localLastLogIdx = localMember.getLogManager().getLastLogIndex();
       long localLastLogTerm = localMember.getLogManager().getLastLogTerm();
-      logger.debug("{}: Node {} is still alive, log index: {}/{}, log term: {}/{}",
+      logger.trace("{}: Node {} is still alive, log index: {}/{}, log term: {}/{}",
           memberName, follower, lastLogIdx
           ,localLastLogIdx, lastLogTerm, localLastLogTerm);
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/HeartBeatThread.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/HeartBeatThread.java
index f96815d..6df5c4a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/HeartBeatThread.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/HeartBeatThread.java
@@ -178,6 +178,7 @@ public class HeartBeatThread implements Runnable {
 
       requestVote(localMember.getAllNodes(), electionRequest, nextTerm, quorum,
           electionTerminated, electionValid);
+      electionRequest.unsetLastLogIndex();
 
       try {
         logger.info("{}: Wait for {}ms until election time out", memberName,
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartBeatThread.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartBeatThread.java
index 735e0db..e969bbc 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartBeatThread.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartBeatThread.java
@@ -58,5 +58,6 @@ public class MetaHeartBeatThread extends HeartBeatThread {
     }
 
     super.sendHeartbeat(node, client);
+    request.unsetPartitionTableBytes();
   }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
index 305158a..cb01ec5 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
@@ -64,6 +64,7 @@ import org.apache.iotdb.cluster.rpc.thrift.SendSnapshotRequest;
 import org.apache.iotdb.cluster.rpc.thrift.SingleSeriesQueryRequest;
 import org.apache.iotdb.cluster.rpc.thrift.TSDataService;
 import org.apache.iotdb.cluster.server.NodeCharacter;
+import org.apache.iotdb.cluster.server.NodeReport.DataMemberReport;
 import org.apache.iotdb.cluster.server.RaftServer;
 import org.apache.iotdb.cluster.server.Response;
 import org.apache.iotdb.cluster.server.handlers.caller.GenericHandler;
@@ -144,6 +145,11 @@ public class DataGroupMember extends RaftMember implements TSDataService.AsyncIf
     super.stop();
     pullSnapshotService.shutdownNow();
     pullSnapshotService = null;
+    try {
+      queryManager.endAllQueries();
+    } catch (StorageEngineException e) {
+      logger.error("Cannot release queries of {}", name, e);
+    }
   }
 
   /**
@@ -452,29 +458,26 @@ public class DataGroupMember extends RaftMember implements TSDataService.AsyncIf
       }
       return;
     }
+    if (request.isRequireReadOnly()) {
+      setReadOnly();
+    }
 
     // this synchronized should work with the one in AppendEntry when a log is going to commit,
     // which may prevent the newly arrived data from being invisible to the new header.
     synchronized (logManager) {
       List<Integer> requiredSlots = request.getRequiredSlots();
       logger.debug("{}: {} slots are requested", name, requiredSlots.size());
-      // check whether this slot is held by the node
-      List<Integer> heldSlots = metaGroupMember.getPartitionTable().getNodeSlots(getHeader());
 
       PullSnapshotResp resp = new PullSnapshotResp();
       Map<Integer, ByteBuffer> resultMap = new HashMap<>();
       logManager.takeSnapshot();
 
+      PartitionedSnapshot allSnapshot = (PartitionedSnapshot) logManager.getSnapshot();
       for (int requiredSlot : requiredSlots) {
-        if (!heldSlots.contains(requiredSlot)) {
-          // logger.debug("{}: the required slot {} is not held by the node", name,
-          // requiredSlot);
-          continue;
-        }
-
-        PartitionedSnapshot allSnapshot = (PartitionedSnapshot) logManager.getSnapshot();
         Snapshot snapshot = allSnapshot.getSnapshot(requiredSlot);
-        resultMap.put(requiredSlot, snapshot.serialize());
+        if (snapshot != null) {
+          resultMap.put(requiredSlot, snapshot.serialize());
+        }
       }
       resp.setSnapshotBytes(resultMap);
       logger.debug("{}: Sending {} snapshots to the requester", name, resultMap.size());
@@ -507,15 +510,23 @@ public class DataGroupMember extends RaftMember implements TSDataService.AsyncIf
       for (Entry<Node, List<Integer>> entry : holderSlotsMap.entrySet()) {
         Node node = entry.getKey();
         List<Integer> nodeSlots = entry.getValue();
-        pullFileSnapshot(metaGroupMember.getPartitionTable().getHeaderGroup(node),  nodeSlots);
+        pullFileSnapshot(metaGroupMember.getPartitionTable().getHeaderGroup(node),  nodeSlots, false);
       }
     }
   }
 
-  private void pullFileSnapshot(PartitionGroup prevHolders,  List<Integer> nodeSlots) {
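+  /**
+   * Submit a PullSnapshotTask for the slots and set the pending result as each slot's snapshot.
+   * @param prevHolders the previous holders of the slots; its header is used as the task header
+   * @param nodeSlots the slots whose snapshots should be pulled
+   * @param requireReadOnly set to true if the previous holder has been removed from the cluster.
+   *                        This will make the previous holder read-only so that different new
+   *                        replicas can pull the same snapshot.
+   */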
+  private void pullFileSnapshot(PartitionGroup prevHolders,  List<Integer> nodeSlots, boolean requireReadOnly) {
     Future<Map<Integer, FileSnapshot>> snapshotFuture =
         pullSnapshotService.submit(new PullSnapshotTask(prevHolders.getHeader(), nodeSlots, this,
-            prevHolders, FileSnapshot::new));
+            prevHolders, FileSnapshot::new, requireReadOnly));
     for (int slot : nodeSlots) {
       logManager.setSnapshot(new RemoteFileSnapshot(snapshotFuture), slot);
     }
@@ -759,7 +770,7 @@ public class DataGroupMember extends RaftMember implements TSDataService.AsyncIf
   }
 
   /**
-   * When a node is removed and it is not the header of the group, the member should take over
+   * When a node is removed and IT IS NOT THE HEADER of the group, the member should take over
    * some slots from the removed group, and add a new node to the group the removed node was in the
    * group.
    */
@@ -768,13 +779,24 @@ public class DataGroupMember extends RaftMember implements TSDataService.AsyncIf
       if (allNodes.contains(removedNode)) {
         // update the group if the deleted node was in it
         allNodes = metaGroupMember.getPartitionTable().getHeaderGroup(getHeader());
+        if (removedNode.equals(leader)) {
+          synchronized (term) {
+            setCharacter(NodeCharacter.ELECTOR);
+            setLastHeartBeatReceivedTime(Long.MIN_VALUE);
+          }
+        }
       }
       List<Integer> slotsToPull = removalResult.getNewSlotOwners().get(getHeader());
       if (slotsToPull != null) {
         // pull the slots that should be taken over
-        pullFileSnapshot(removalResult.getRemovedGroup(), slotsToPull);
+        pullFileSnapshot(removalResult.getRemovedGroup(), slotsToPull, true);
       }
     }
   }
+
+  public DataMemberReport genReport() {
+    return new DataMemberReport(character, leader, term.get(),
+        logManager.getLastLogTerm(), logManager.getLastLogIndex(), getHeader(), readOnly);
+  }
 }
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
index 5cdf481..9069a95 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
@@ -35,6 +35,7 @@ import java.io.FileWriter;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -42,6 +43,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
@@ -80,6 +84,8 @@ import org.apache.iotdb.cluster.rpc.thrift.TSMetaService.AsyncClient;
 import org.apache.iotdb.cluster.server.ClientServer;
 import org.apache.iotdb.cluster.server.DataClusterServer;
 import org.apache.iotdb.cluster.server.NodeCharacter;
+import org.apache.iotdb.cluster.server.NodeReport;
+import org.apache.iotdb.cluster.server.NodeReport.MetaMemberReport;
 import org.apache.iotdb.cluster.server.RaftServer;
 import org.apache.iotdb.cluster.server.Response;
 import org.apache.iotdb.cluster.server.handlers.caller.AppendGroupEntryHandler;
@@ -128,6 +134,7 @@ public class MetaGroupMember extends RaftMember implements TSMetaService.AsyncIf
 
   private static final Logger logger = LoggerFactory.getLogger(MetaGroupMember.class);
   private static final int DEFAULT_JOIN_RETRY = 10;
+  private static final int REPORT_INTERVAL_SEC = 10;
   public static final int REPLICATION_NUM =
       ClusterDescriptor.getINSTANCE().getConfig().getReplicationNum();
 
@@ -147,6 +154,8 @@ public class MetaGroupMember extends RaftMember implements TSMetaService.AsyncIf
 
   private ClientPool dataClientPool;
 
+  private ScheduledExecutorService reportThread;
+
   @TestOnly
   public MetaGroupMember() {
   }
@@ -220,6 +229,10 @@ public class MetaGroupMember extends RaftMember implements TSMetaService.AsyncIf
     queryProcessor = new ClusterQueryParser(this);
     QueryCoordinator.getINSTANCE().setMetaGroupMember(this);
     StorageEngine.getInstance().setFileFlushPolicy(new ClusterFileFlushPolicy(this));
+    reportThread = Executors.newSingleThreadScheduledExecutor(n -> new Thread(n,
+        "NodeReportThread"));
+    reportThread.scheduleAtFixedRate(()-> logger.info(genNodeReport().toString()),
+        REPORT_INTERVAL_SEC, REPORT_INTERVAL_SEC, TimeUnit.SECONDS);
   }
 
   @Override
@@ -229,6 +242,10 @@ public class MetaGroupMember extends RaftMember implements TSMetaService.AsyncIf
       getDataClusterServer().stop();
       clientServer.stop();
     }
+    // the no-arg @TestOnly constructor leaves reportThread unset, so guard against null
+    if (reportThread != null) {
+      reportThread.shutdownNow();
+    }
   }
 
   private void initSubServers() throws TTransportException, StartupException {
@@ -634,9 +648,7 @@ public class MetaGroupMember extends RaftMember implements TSMetaService.AsyncIf
     // each node will form a group
     int nodeSize = nodeRing.size();
     int[] groupRemainings = new int[nodeSize];
-    for (int i = 0; i < groupRemainings.length; i++) {
-      groupRemainings[i] = groupQuorum;
-    }
+    Arrays.fill(groupRemainings, groupQuorum);
 
     AtomicLong newLeaderTerm = new AtomicLong(term.get());
     AtomicBoolean leaderShipStale = new AtomicBoolean(false);
@@ -1296,12 +1308,12 @@ public class MetaGroupMember extends RaftMember implements TSMetaService.AsyncIf
 
         switch (result) {
           case OK:
-            logger.info("Join request of {} is accepted", target);
+            logger.info("Removal request of {} is accepted", target);
             logManager.commitLog(removeNodeLog.getCurrLogIndex());
             resultHandler.onComplete(Response.RESPONSE_AGREE);
             return true;
           case TIME_OUT:
-            logger.info("Join request of {} timed out", target);
+            logger.info("Removal request of {} timed out", target);
             resultHandler.onError(new RequestTimeOutException(removeNodeLog));
             logManager.removeLastLog();
             return true;
@@ -1326,11 +1338,81 @@ public class MetaGroupMember extends RaftMember implements TSMetaService.AsyncIf
         NodeRemovalResult result = partitionTable.removeNode(oldNode);
 
         getDataClusterServer().removeNode(oldNode, result);
-        if (oldNode == leader) {
+        if (oldNode.equals(leader)) {
           setCharacter(NodeCharacter.ELECTOR);
           lastHeartBeatReceivedTime = Long.MIN_VALUE;
         }
+
+        if (oldNode.equals(thisNode)) {
+          // use super.stop() so that the data server will not be closed
+          super.stop();
+          if (clientServer != null) {
+            clientServer.stop();
+          }
+        } else if (thisNode.equals(leader)) {
+          // as the old node is removed, it cannot know this by heartbeat, so it should be
+          // directly kicked out of the cluster
+          MetaClient metaClient = (MetaClient) connectNode(oldNode);
+          if (metaClient == null) {
+            // without this guard an NPE here would skip savePartitionTable() below
+            logger.warn("Cannot connect to {} to inform it of its removal", oldNode);
+          } else {
+            try {
+              metaClient.exile(new GenericHandler<>(oldNode, null));
+            } catch (TException e) {
+              logger.warn("Cannot inform {} its removal", oldNode, e);
+            }
+          }
+        }
+
+        savePartitionTable();
       }
     }
   }
+
+  /**
+   * Handles the notice that this node has been removed from the cluster: applies the removal of
+   * this node locally and then completes the handler.
+   *
+   * @param resultHandler completed with null once the removal has been applied
+   */
+  @Override
+  public void exile(AsyncMethodCallback<Void> resultHandler) {
+    applyRemoveNode(thisNode);
+    resultHandler.onComplete(null);
+  }
+
+  /**
+   * @return a report of this meta member built from its character, leader, term, last log term,
+   * last log index and read-only flag
+   */
+  private MetaMemberReport genMemberReport() {
+    return new MetaMemberReport(character, leader, term.get(),
+        logManager.getLastLogTerm(), logManager.getLastLogIndex(), readOnly);
+  }
+
+  /**
+   * @return a report of this node, made of the meta member report and the member reports
+   * generated by the data cluster server
+   */
+  private NodeReport genNodeReport() {
+    NodeReport report = new NodeReport(thisNode);
+    report.setMetaMemberReport(genMemberReport());
+    report.setDataMemberReportList(dataClusterServer.genMemberReports());
+    return report;
+  }
+
+  /**
+   * Replaces the node list and rebuilds the identifier-to-node map from it.
+   *
+   * @param allNodes the new node list
+   */
+  @Override
+  public void setAllNodes(List<Node> allNodes) {
+    super.setAllNodes(allNodes);
+    idNodeMap = new HashMap<>();
+    for (Node node : allNodes) {
+      idNodeMap.put(node.getNodeIdentifier(), node);
+    }
+  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index f4789bf..ea021b2 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -25,10 +25,7 @@ import java.io.DataOutputStream;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
+import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -108,7 +105,7 @@ public abstract class RaftMember implements RaftService.AsyncIface {
   // when the header of the group is removed from the cluster, the members of the group should no
   // longer accept writes, but they still can be read candidates for weak consistency reads and
   // provide snapshots for the new holders
-  private volatile boolean readOnly = false;
+  volatile boolean readOnly = false;
 
   public RaftMember() {
   }
@@ -145,11 +142,12 @@ public abstract class RaftMember implements RaftService.AsyncIface {
     catchUpService.shutdownNow();
     catchUpService = null;
     heartBeatService = null;
+    logger.info("{} stopped", name);
   }
 
   @Override
   public void sendHeartBeat(HeartBeatRequest request, AsyncMethodCallback resultHandler) {
-    logger.debug("{} received a heartbeat", name);
+    logger.trace("{} received a heartbeat", name);
     synchronized (term) {
       long thisTerm = term.get();
       long leaderTerm = request.getTerm();
@@ -158,8 +156,8 @@ public abstract class RaftMember implements RaftService.AsyncIface {
       if (leaderTerm < thisTerm) {
         // the leader is invalid
         response.setTerm(thisTerm);
-        if (logger.isDebugEnabled()) {
-          logger.debug("{} received a heartbeat from a stale leader {}", name, request.getLeader());
+        if (logger.isTraceEnabled()) {
+          logger.trace("{} received a heartbeat from a stale leader {}", name, request.getLeader());
         }
       } else {
         processValidHeartbeatReq(request, response);
@@ -184,8 +182,8 @@ public abstract class RaftMember implements RaftService.AsyncIface {
           setCharacter(NodeCharacter.FOLLOWER);
         }
         setLastHeartBeatReceivedTime(System.currentTimeMillis());
-        if (logger.isDebugEnabled()) {
-          logger.debug("{} received heartbeat from a valid leader {}", name, request.getLeader());
+        if (logger.isTraceEnabled()) {
+          logger.trace("{} received heartbeat from a valid leader {}", name, request.getLeader());
         }
       }
       resultHandler.onComplete(response);
@@ -445,8 +443,10 @@ public abstract class RaftMember implements RaftService.AsyncIface {
   }
 
   public void setLeader(Node leader) {
-    logger.info("{} has become a {}", leader, character);
-    this.leader = leader;
+    if (!Objects.equals(leader, this.leader)) {
+      logger.info("{} has become a follower of {}", getName(), leader);
+      this.leader = leader;
+    }
   }
 
   public Node getThisNode() {
@@ -813,4 +813,8 @@ public abstract class RaftMember implements RaftService.AsyncIface {
       readOnly = true;
     }
   }
+
+  public void setAllNodes(List<Node> allNodes) {
+    this.allNodes = allNodes;
+  }
 }
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/log/LogParserTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/log/LogParserTest.java
index de2622e..cba67cc 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/log/LogParserTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/log/LogParserTest.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.cluster.exception.UnknownLogTypeException;
 import org.apache.iotdb.cluster.log.logtypes.AddNodeLog;
 import org.apache.iotdb.cluster.log.logtypes.CloseFileLog;
 import org.apache.iotdb.cluster.log.logtypes.PhysicalPlanLog;
+import org.apache.iotdb.cluster.log.logtypes.RemoveNodeLog;
 import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
 import org.apache.iotdb.tsfile.read.common.Path;
 import org.junit.Test;
@@ -77,4 +78,18 @@ public class LogParserTest {
     Log serialized = logParser.parse(buffer);
     assertEquals(log, serialized);
   }
+
+  @Test
+  public void testRemoveNodeLog() throws UnknownLogTypeException {
+    RemoveNodeLog log = new RemoveNodeLog();
+    log.setRemovedNode(TestUtils.getNode(0));
+    log.setCurrLogIndex(8);
+    log.setCurrLogTerm(8);
+    log.setPreviousLogIndex(7);
+    log.setPreviousLogTerm(7);
+
+    ByteBuffer buffer = log.serialize();
+    Log serialized = logParser.parse(buffer);
+    assertEquals(log, serialized);
+  }
 }
\ No newline at end of file
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/log/log/snapshot/PullSnapshotTaskTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/log/log/snapshot/PullSnapshotTaskTest.java
index 16f1bab..55196a3 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/log/log/snapshot/PullSnapshotTaskTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/log/log/snapshot/PullSnapshotTaskTest.java
@@ -90,8 +90,9 @@ public class PullSnapshotTaskTest {
       slots.add(i);
       previousHolders.add(TestUtils.getNode(i + 1));
     }
-    PullSnapshotTask<TestSnapshot> task = new PullSnapshotTask<>(TestUtils.getNode(0), slots,
-        newMember, previousHolders, TestSnapshot::new);
+
+    PullSnapshotTask<TestSnapshot> task = new PullSnapshotTask<TestSnapshot>(TestUtils.getNode(0), slots,
+        newMember, previousHolders, TestSnapshot::new, false);
     Map<Integer, TestSnapshot> result = task.call();
     assertEquals(snapshotMap, result);
   }
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/log/logtypes/SerializeLogTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/log/logtypes/SerializeLogTest.java
index 0e4ac6f..e3f0fee 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/log/logtypes/SerializeLogTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/log/logtypes/SerializeLogTest.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 import java.nio.ByteBuffer;
+import org.apache.iotdb.cluster.common.TestUtils;
 import org.apache.iotdb.cluster.exception.UnknownLogTypeException;
 import org.apache.iotdb.cluster.log.Log;
 import org.apache.iotdb.cluster.log.LogParser;
@@ -38,6 +39,10 @@ public class SerializeLogTest {
   @Test
   public void testPhysicalPlanLog() throws UnknownLogTypeException {
     PhysicalPlanLog log = new PhysicalPlanLog();
+    log.setPreviousLogIndex(1);
+    log.setPreviousLogTerm(1);
+    log.setCurrLogIndex(2);
+    log.setCurrLogTerm(2);
     InsertPlan plan = new InsertPlan();
     plan.setDeviceId("root.d1");
     plan.setMeasurements(new String[]{"s1,s2,s3"});
@@ -59,6 +64,10 @@ public class SerializeLogTest {
   @Test
   public void testAddNodeLog() throws UnknownLogTypeException {
     AddNodeLog log = new AddNodeLog();
+    log.setPreviousLogIndex(1);
+    log.setPreviousLogTerm(1);
+    log.setCurrLogIndex(2);
+    log.setCurrLogTerm(2);
     log.setNewNode(new Node("apache.iotdb.com", 1234, 1, 4321));
     ByteBuffer byteBuffer = log.serialize();
     Log logPrime = LogParser.getINSTANCE().parse(byteBuffer);
@@ -68,6 +77,10 @@ public class SerializeLogTest {
   @Test
   public void testCloseFileLog() throws UnknownLogTypeException {
     CloseFileLog log = new CloseFileLog("root.sg1", true);
+    log.setPreviousLogIndex(1);
+    log.setPreviousLogTerm(1);
+    log.setCurrLogIndex(2);
+    log.setCurrLogTerm(2);
     ByteBuffer byteBuffer = log.serialize();
     CloseFileLog logPrime = (CloseFileLog) LogParser.getINSTANCE().parse(byteBuffer);
     assertTrue(logPrime.isSeq());
@@ -75,4 +88,17 @@ public class SerializeLogTest {
     assertEquals(log, logPrime);
   }
 
+  @Test
+  public void testRemoveNodeLog() throws UnknownLogTypeException {
+    RemoveNodeLog log = new RemoveNodeLog();
+    log.setPreviousLogIndex(1);
+    log.setPreviousLogTerm(1);
+    log.setCurrLogIndex(2);
+    log.setCurrLogTerm(2);
+    log.setRemovedNode(TestUtils.getNode(0));
+    ByteBuffer byteBuffer = log.serialize();
+    RemoveNodeLog logPrime = (RemoveNodeLog) LogParser.getINSTANCE().parse(byteBuffer);
+    assertEquals(log, logPrime);
+  }
+
 }
\ No newline at end of file
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/log/partition/SlotPartitionTableTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/log/partition/SlotPartitionTableTest.java
index 1ab526e..9bc08cf 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/log/partition/SlotPartitionTableTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/log/partition/SlotPartitionTableTest.java
@@ -19,12 +19,16 @@
 package org.apache.iotdb.cluster.log.partition;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
 import java.util.stream.IntStream;
 import org.apache.iotdb.cluster.config.ClusterDescriptor;
+import org.apache.iotdb.cluster.partition.NodeRemovalResult;
 import org.apache.iotdb.cluster.partition.PartitionGroup;
 import org.apache.iotdb.cluster.partition.SlotPartitionTable;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
@@ -40,11 +44,15 @@ public class SlotPartitionTableTest {
   @Before
   public void setUp() {
     List<Node> nodes = new ArrayList<>();
-    IntStream.range(0, 20).forEach(i -> nodes.add(new Node("localhost", 30000 + i, i, 40000 + i)));
+    IntStream.range(0, 20).forEach(i -> nodes.add(getNode(i)));
     ClusterDescriptor.getINSTANCE().getConfig().setReplicationNum(replica_size);
     table = new SlotPartitionTable(nodes, nodes.get(3));
   }
 
+  private Node getNode(int i) {
+    return new Node("localhost", 30000 + i, i, 40000 + i);
+  }
+
   @After
   public void tearDown() {
     ClusterDescriptor.getINSTANCE().getConfig().setReplicationNum(3);
@@ -105,4 +113,27 @@ public class SlotPartitionTableTest {
   @Test
   public void getAllNodeSlots() {
   }
+
+  @Test
+  public void testRemoveNode() {
+    // copy the slots so the checks below neither depend on nor modify a list owned by the table
+    List<Integer> nodeSlots = new ArrayList<>(table.getNodeSlots(getNode(0)));
+    NodeRemovalResult nodeRemovalResult = table.removeNode(getNode(0));
+    assertFalse(table.getAllNodes().contains(getNode(0)));
+    PartitionGroup removedGroup = nodeRemovalResult.getRemovedGroup();
+    for (int i = 0; i < 5; i++) {
+      assertTrue(removedGroup.contains(getNode(i)));
+    }
+    PartitionGroup newGroup = nodeRemovalResult.getNewGroup();
+    for (int i : new int[] {18, 19, 1, 2, 3}) {
+      assertTrue(newGroup.contains(getNode(i)));
+    }
+    // the slots owned by the removed one should be redistributed to other nodes
+    Map<Node, List<Integer>> newSlotOwners = nodeRemovalResult.getNewSlotOwners();
+    for (List<Integer> slots : newSlotOwners.values()) {
+      assertTrue(nodeSlots.containsAll(slots));
+      nodeSlots.removeAll(slots);
+    }
+    assertTrue(nodeSlots.isEmpty());
+  }
 }
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTaskTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTaskTest.java
deleted file mode 100644
index 73c2cee..0000000
--- a/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTaskTest.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.cluster.log.snapshot;
-
-import static org.junit.Assert.assertEquals;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import org.apache.iotdb.cluster.client.ClientPool;
-import org.apache.iotdb.cluster.client.DataClient;
-import org.apache.iotdb.cluster.common.TestSnapshot;
-import org.apache.iotdb.cluster.common.TestUtils;
-import org.apache.iotdb.cluster.log.Snapshot;
-import org.apache.iotdb.cluster.rpc.thrift.Node;
-import org.apache.iotdb.cluster.rpc.thrift.PullSnapshotRequest;
-import org.apache.iotdb.cluster.rpc.thrift.PullSnapshotResp;
-import org.apache.iotdb.cluster.rpc.thrift.RaftService.AsyncClient;
-import org.apache.iotdb.cluster.server.member.DataGroupMember;
-import org.apache.thrift.async.AsyncMethodCallback;
-import org.apache.thrift.async.TAsyncClientManager;
-import org.apache.thrift.protocol.TBinaryProtocol;
-import org.junit.Before;
-import org.junit.Test;
-
-public class PullSnapshotTaskTest {
-
-  private ClientPool clientPool = new ClientPool(null);
-  private Map<Integer, Snapshot> snapshotMap = new HashMap<>();
-  private DataGroupMember newMember = new DataGroupMember() {
-    @Override
-    public AsyncClient connectNode(Node node) {
-      try {
-        return new DataClient(new TBinaryProtocol.Factory(), new TAsyncClientManager(),
-            node, clientPool) {
-          @Override
-          public void pullSnapshot(PullSnapshotRequest request,
-              AsyncMethodCallback<PullSnapshotResp> resultHandler) {
-            new Thread(() -> {
-              PullSnapshotResp resp = new PullSnapshotResp();
-              Map<Integer, ByteBuffer> snapshotBytes = new HashMap<>();
-              for (Entry<Integer, Snapshot> entry : snapshotMap.entrySet()) {
-                snapshotBytes.put(entry.getKey(), entry.getValue().serialize());
-              }
-              resp.setSnapshotBytes(snapshotBytes);
-              resultHandler.onComplete(resp);
-            }).start();
-          }
-        };
-      } catch (IOException e) {
-        return null;
-      }
-    }
-  };
-
-  @Before
-  public void setUp() {
-    for (int i = 0; i < 10; i++) {
-      snapshotMap.put(i, new TestSnapshot(i));
-    }
-  }
-
-  @Test
-  public void test() {
-    List<Integer> slots = new ArrayList<>();
-    List<Node> previousHolders = new ArrayList<>();
-    for (int i = 0; i < 10; i++) {
-      slots.add(i);
-      previousHolders.add(TestUtils.getNode(i + 1));
-    }
-    PullSnapshotTask<TestSnapshot> task = new PullSnapshotTask<>(TestUtils.getNode(0), slots,
-        newMember, previousHolders, TestSnapshot::new);
-    Map<Integer, TestSnapshot> result = task.call();
-    assertEquals(snapshotMap, result);
-  }
-}
\ No newline at end of file
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/SnapshotSerializeTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/SnapshotSerializeTest.java
deleted file mode 100644
index c1eee48..0000000
--- a/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/SnapshotSerializeTest.java
+++ /dev/null
@@ -1,195 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.cluster.log.snapshot;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
-import java.io.File;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.iotdb.cluster.common.TestSnapshot;
-import org.apache.iotdb.cluster.common.TestUtils;
-import org.apache.iotdb.cluster.log.Log;
-import org.apache.iotdb.cluster.log.Snapshot;
-import org.apache.iotdb.cluster.log.logtypes.PhysicalPlanLog;
-import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
-import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
-import org.apache.iotdb.tsfile.read.common.Path;
-import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
-import org.junit.Test;
-
-public class SnapshotSerializeTest {
-
-  @Test
-  public void testFileSnapshot() {
-    Set<MeasurementSchema> measurementSchemaList = new HashSet<>();
-    FileSnapshot snapshot = new FileSnapshot();
-    for (int i = 0; i < 10; i++) {
-      measurementSchemaList.add(TestUtils.getTestSchema(i, i));
-      TsFileResource tsFileResource = new TsFileResource(new File("TsFile" + i));
-      tsFileResource.setHistoricalVersions(Collections.singleton((long) i));
-      snapshot.addFile(tsFileResource, TestUtils.getNode(i));
-    }
-    snapshot.setTimeseriesSchemas(measurementSchemaList);
-
-    ByteBuffer byteBuffer = snapshot.serialize();
-    FileSnapshot deserializedSnapshot = new FileSnapshot();
-    deserializedSnapshot.deserialize(byteBuffer);
-    assertEquals(snapshot, deserializedSnapshot);
-  }
-
-  @Test
-  public void testMetaSimpleSnapshot() {
-    List<Log> logs = new ArrayList<>();
-    List<String> sgs = new ArrayList<>();
-    for (int i = 0; i < 10; i++) {
-      MeasurementSchema schema = TestUtils.getTestSchema(i, 0);
-      CreateTimeSeriesPlan createTimeSeriesPlan =
-          new CreateTimeSeriesPlan(new Path(schema.getMeasurementId()), schema.getType(),
-              schema.getEncodingType(), schema.getCompressor(), schema.getProps());
-      PhysicalPlanLog log = new PhysicalPlanLog();
-      log.setPlan(createTimeSeriesPlan);
-      logs.add(log);
-      log.setPreviousLogTerm(i - 1);
-      log.setPreviousLogIndex(i - 1);
-      log.setCurrLogIndex(i);
-      log.setCurrLogTerm(i);
-      sgs.add(TestUtils.getTestSg(i));
-    }
-    MetaSimpleSnapshot snapshot = new MetaSimpleSnapshot(logs, sgs);
-    snapshot.setLastLogId(10);
-    snapshot.setLastLogTerm(10);
-
-    ByteBuffer byteBuffer = snapshot.serialize();
-    MetaSimpleSnapshot deserializedSnapshot = new MetaSimpleSnapshot();
-    deserializedSnapshot.deserialize(byteBuffer);
-    assertEquals(snapshot, deserializedSnapshot);
-  }
-
-  @Test
-  public void testPartitionedSnapshot() {
-    PartitionedSnapshot<TestSnapshot> snapshot = new PartitionedSnapshot<>(TestSnapshot::new);
-    snapshot.setLastLogTerm(10);
-    snapshot.setLastLogId(10);
-
-    Map<Integer, Snapshot> slotSnapshots = new HashMap<>();
-    List<Integer> slots = new ArrayList<>();
-    for (int i = 0; i < 10; i++) {
-      TestSnapshot s = new TestSnapshot();
-      slotSnapshots.put(i, s);
-      snapshot.putSnapshot(i, s);
-      slots.add(i);
-    }
-
-    for (Integer slot : slots) {
-      TestSnapshot s = (TestSnapshot) snapshot.getSnapshot(slot);
-      assertTrue(slotSnapshots.values().contains(s));
-    }
-
-    PartitionedSnapshot subSnapshots = snapshot.getSubSnapshots(slots);
-    for (Integer slot : slots) {
-      TestSnapshot s = (TestSnapshot) subSnapshots.getSnapshot(slot);
-      assertTrue(slotSnapshots.values().contains(s));
-    }
-
-    ByteBuffer byteBuffer = snapshot.serialize();
-    PartitionedSnapshot deserializedSnapshot = new PartitionedSnapshot(TestSnapshot::new);
-    deserializedSnapshot.deserialize(byteBuffer);
-    assertEquals(snapshot, deserializedSnapshot);
-  }
-
-  @Test
-  public void testRemoteFileSnapshot() {
-    AtomicBoolean remoteSnapshotGet = new AtomicBoolean(false);
-    RemoteFileSnapshot snapshot = new RemoteFileSnapshot(
-        new Future<Void>() {
-          @Override
-          public boolean cancel(boolean mayInterruptIfRunning) {
-            return false;
-          }
-
-          @Override
-          public boolean isCancelled() {
-            return false;
-          }
-
-          @Override
-          public boolean isDone() {
-            return false;
-          }
-
-          @Override
-          public Void get() {
-            remoteSnapshotGet.set(true);
-            return null;
-          }
-
-          @Override
-          public Void get(long timeout, TimeUnit unit) {
-            remoteSnapshotGet.set(true);
-            return null;
-          }
-        });
-
-    assertNull(snapshot.serialize());
-    snapshot.getRemoteSnapshot();
-    assertTrue(remoteSnapshotGet.get());
-  }
-
-  @Test
-  public void testSimpleSnapshot() {
-    List<Log> logs = new ArrayList<>();
-    for (int i = 0; i < 10; i++) {
-      MeasurementSchema schema = TestUtils.getTestSchema(i, 0);
-      CreateTimeSeriesPlan createTimeSeriesPlan =
-          new CreateTimeSeriesPlan(new Path(schema.getMeasurementId()), schema.getType(),
-              schema.getEncodingType(), schema.getCompressor(), schema.getProps());
-      PhysicalPlanLog log = new PhysicalPlanLog();
-      log.setPlan(createTimeSeriesPlan);
-      logs.add(log);
-      log.setPreviousLogTerm(i - 1);
-      log.setPreviousLogIndex(i - 1);
-      log.setCurrLogIndex(i);
-      log.setCurrLogTerm(i);
-    }
-    SimpleSnapshot snapshot = new SimpleSnapshot(new ArrayList<>(logs.subList(0, 5)));
-    snapshot.setLastLogId(10);
-    snapshot.setLastLogTerm(10);
-    for (Log log : logs.subList(5, logs.size())) {
-      snapshot.add(log);
-    }
-
-    ByteBuffer byteBuffer = snapshot.serialize();
-    SimpleSnapshot deserializedSnapshot = new SimpleSnapshot();
-    deserializedSnapshot.deserialize(byteBuffer);
-    assertEquals(snapshot, deserializedSnapshot);
-  }
-}
\ No newline at end of file
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
index 3caf096..52472c8 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
@@ -33,8 +33,10 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.iotdb.cluster.RemoteTsFileResource;
@@ -49,6 +51,7 @@ import org.apache.iotdb.cluster.log.manage.PartitionedSnapshotLogManager;
 import org.apache.iotdb.cluster.log.snapshot.FileSnapshot;
 import org.apache.iotdb.cluster.log.snapshot.PartitionedSnapshot;
 import org.apache.iotdb.cluster.log.snapshot.RemoteFileSnapshot;
+import org.apache.iotdb.cluster.partition.NodeRemovalResult;
 import org.apache.iotdb.cluster.partition.PartitionGroup;
 import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
 import org.apache.iotdb.cluster.rpc.thrift.ElectionRequest;
@@ -100,11 +103,12 @@ import org.junit.Test;
 public class DataGroupMemberTest extends MemberTest {
 
   private DataGroupMember dataGroupMember;
-  private DataGroupMember.Factory factory;
   private Map<Integer, FileSnapshot> snapshotMap;
   private Map<Integer, RemoteFileSnapshot> receivedSnapshots;
+  private Set<Integer> pulledSnapshots;
   private boolean hasInitialSnapshots;
   private boolean enableSyncLeader;
+  private int numSlotsToPull = 2;
 
   @Before
   public void setUp() throws Exception {
@@ -117,9 +121,10 @@ public class DataGroupMemberTest extends MemberTest {
       snapshotMap.put(i, fileSnapshot);
     }
     receivedSnapshots = new HashMap<>();
+    pulledSnapshots = new HashSet<>();
   }
 
-  private PartitionedSnapshotLogManager getLogManager() {
+  private PartitionedSnapshotLogManager getLogManager(PartitionGroup partitionGroup) {
     return new TestPartitionedLogManager(new DataLogApplier(testMetaMember),
         testMetaMember.getPartitionTable(), partitionGroup.getHeader(), FileSnapshot::new) {
       @Override
@@ -141,8 +146,8 @@ public class DataGroupMemberTest extends MemberTest {
   }
 
   private DataGroupMember getDataGroupMember(Node node) throws IOException {
-    return new DataGroupMember(new TCompactProtocol.Factory(), new PartitionGroup(partitionGroup)
-        , node, getLogManager(),
+    PartitionGroup nodes = partitionTable.getHeaderGroup(node);
+    return new DataGroupMember(new TCompactProtocol.Factory(), nodes, node, getLogManager(nodes),
         testMetaMember, new TAsyncClientManager()) {
       @Override
       public AsyncClient connectNode(Node node) {
@@ -173,7 +178,16 @@ public class DataGroupMemberTest extends MemberTest {
                 PullSnapshotResp resp = new PullSnapshotResp();
                 Map<Integer, ByteBuffer> snapshotBufferMap = new HashMap<>();
                 for (Integer requiredSlot : request.getRequiredSlots()) {
-                  snapshotBufferMap.put(requiredSlot, snapshotMap.get(requiredSlot).serialize());
+                  FileSnapshot fileSnapshot = snapshotMap.get(requiredSlot);
+                  if (fileSnapshot != null) {
+                    snapshotBufferMap.put(requiredSlot, fileSnapshot.serialize());
+                  }
+                  synchronized (dataGroupMember) {
+                    pulledSnapshots.add(requiredSlot);
+                    if (pulledSnapshots.size() == numSlotsToPull) {
+                      dataGroupMember.notifyAll();
+                    }
+                  }
                 }
                 resp.setSnapshotBytes(snapshotBufferMap);
                 resultHandler.onComplete(resp);
@@ -266,8 +280,8 @@ public class DataGroupMemberTest extends MemberTest {
     assertTrue(lastMember.addNode(newNodeInGroup));
 
     Node newNodeAfterGroup = TestUtils.getNode(101);
-    assertFalse(firstMember.addNode(newNodeInGroup));
-    assertFalse(midMember.addNode(newNodeInGroup));
+    assertFalse(firstMember.addNode(newNodeAfterGroup));
+    assertFalse(midMember.addNode(newNodeAfterGroup));
   }
 
   @Test
@@ -827,4 +841,60 @@ public class DataGroupMemberTest extends MemberTest {
     return resource;
   }
 
+  @Test // removes node 10 while it is the data group's leader; expects the member to become ELECTOR
+  public void testRemoveLeader() throws TTransportException, InterruptedException {
+    Node nodeToRemove = TestUtils.getNode(10);
+    NodeRemovalResult nodeRemovalResult = testMetaMember.getPartitionTable()
+        .removeNode(nodeToRemove);
+    dataGroupMember.setLeader(nodeToRemove);
+    dataGroupMember.start();
+    numSlotsToPull = 2; // the stub client notifies dataGroupMember once this many slots were pulled
+
+    try {
+      synchronized (dataGroupMember) {
+        dataGroupMember.removeNode(nodeToRemove, nodeRemovalResult);
+        dataGroupMember.wait(500); // wait at most 500ms for the snapshot pulls
+      }
+
+      assertEquals(NodeCharacter.ELECTOR, dataGroupMember.getCharacter());
+      assertEquals(Long.MIN_VALUE, dataGroupMember.getLastHeartBeatReceivedTime());
+      assertTrue(dataGroupMember.getAllNodes().contains(TestUtils.getNode(30))); // presumably the replacement member - TODO confirm
+      assertFalse(dataGroupMember.getAllNodes().contains(nodeToRemove));
+      List<Integer> newSlots = nodeRemovalResult.getNewSlotOwners().get(TestUtils.getNode(0)); // slots recorded for node 0
+      assertEquals(newSlots.size(), pulledSnapshots.size()); // exactly those slots must have been pulled
+      for (Integer newSlot : newSlots) {
+        assertTrue(pulledSnapshots.contains(newSlot));
+      }
+    } finally {
+      dataGroupMember.stop(); // always stop the member started above
+    }
+  }
+
+  @Test // removes node 10 while node 20 is the data group's leader
+  public void testRemoveNonLeader() throws TTransportException, InterruptedException {
+    Node nodeToRemove = TestUtils.getNode(10);
+    NodeRemovalResult nodeRemovalResult = testMetaMember.getPartitionTable()
+        .removeNode(nodeToRemove);
+    dataGroupMember.setLeader(TestUtils.getNode(20));
+    dataGroupMember.start();
+    numSlotsToPull = 2; // the stub client notifies dataGroupMember once this many slots were pulled
+
+    try {
+      synchronized (dataGroupMember) {
+        dataGroupMember.removeNode(nodeToRemove, nodeRemovalResult);
+        dataGroupMember.wait(500); // wait at most 500ms for the snapshot pulls
+      }
+
+      assertEquals(0, dataGroupMember.getLastHeartBeatReceivedTime()); // expected to stay 0, presumably its initial value - TODO confirm
+      assertTrue(dataGroupMember.getAllNodes().contains(TestUtils.getNode(30))); // presumably the replacement member - TODO confirm
+      assertFalse(dataGroupMember.getAllNodes().contains(nodeToRemove));
+      List<Integer> newSlots = nodeRemovalResult.getNewSlotOwners().get(TestUtils.getNode(0)); // slots recorded for node 0
+      assertEquals(newSlots.size(), pulledSnapshots.size()); // exactly those slots must have been pulled
+      for (Integer newSlot : newSlots) {
+        assertTrue(pulledSnapshots.contains(newSlot));
+      }
+    } finally {
+      dataGroupMember.stop(); // always stop the member started above
+    }
+  }
 }
\ No newline at end of file
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MemberTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MemberTest.java
index 9fcb55e..392c60f 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MemberTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MemberTest.java
@@ -43,11 +43,10 @@ public class MemberTest {
   MetaGroupMember testMetaMember;
   LogManager metaLogManager;
   PartitionTable partitionTable;
-  PartitionGroup partitionGroup;
   QueryProcessExecutor queryProcessExecutor;
 
   private List<String> prevUrls;
-  private List<Node> allNodes;
+  PartitionGroup allNodes;
 
   @Before
   public void setUp() throws Exception {
@@ -59,11 +58,6 @@ public class MemberTest {
     }
     ClusterDescriptor.getINSTANCE().getConfig().setSeedNodeUrls(testUrls);
 
-    partitionGroup = new PartitionGroup();
-    for (int i = 0; i < 100; i += 10) {
-      partitionGroup.add(TestUtils.getNode(i));
-    }
-
     metaLogManager = new TestLogManager();
     testMetaMember = new TestMetaGroupMember() {
       @Override
@@ -72,8 +66,8 @@ public class MemberTest {
       }
     };
 
-    allNodes = new ArrayList<>();
-    for (int i = 0; i < 10; i++) {
+    allNodes = new PartitionGroup();
+    for (int i = 0; i < 100; i += 10) {
       allNodes.add(TestUtils.getNode(i));
     }
     partitionTable = new SlotPartitionTable(allNodes, TestUtils.getNode(0), 100);
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
index 60e0f73..0e2bd42 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
@@ -37,9 +37,8 @@ import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.iotdb.cluster.client.ClientPool;
@@ -48,6 +47,7 @@ import org.apache.iotdb.cluster.common.TestMetaClient;
 import org.apache.iotdb.cluster.common.TestPartitionedLogManager;
 import org.apache.iotdb.cluster.common.TestSnapshot;
 import org.apache.iotdb.cluster.common.TestUtils;
+import org.apache.iotdb.cluster.exception.PartitionTableUnavailableException;
 import org.apache.iotdb.cluster.log.Log;
 import org.apache.iotdb.cluster.log.logtypes.CloseFileLog;
 import org.apache.iotdb.cluster.log.logtypes.PhysicalPlanLog;
@@ -103,6 +103,7 @@ import org.apache.iotdb.tsfile.read.filter.ValueFilter;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 import org.apache.iotdb.tsfile.read.filter.operator.AndFilter;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.apache.thrift.TException;
 import org.apache.thrift.async.AsyncMethodCallback;
 import org.apache.thrift.protocol.TCompactProtocol.Factory;
 import org.apache.thrift.transport.TTransportException;
@@ -117,14 +118,16 @@ public class MetaGroupMemberTest extends MemberTest {
   private DataClusterServer dataClusterServer;
   private AtomicLong dummyResponse;
   private boolean mockDataClusterServer;
+  private Node exiledNode;
 
   @Before
   public void setUp() throws Exception {
     super.setUp();
     dummyResponse = new AtomicLong(Response.RESPONSE_AGREE);
     metaGroupMember = getMetaGroupMember(TestUtils.getNode(0));
+    metaGroupMember.setAllNodes(allNodes);
     // a faked data member to respond requests
-    dataGroupMember = getDataGroupMember(partitionGroup, TestUtils.getNode(0));
+    dataGroupMember = getDataGroupMember(allNodes, TestUtils.getNode(0));
     dataGroupMember.setCharacter(LEADER);
     dataClusterServer = new DataClusterServer(TestUtils.getNode(0),
         new DataGroupMember.Factory(null, metaGroupMember, null, null) {
@@ -139,11 +142,12 @@ public class MetaGroupMemberTest extends MemberTest {
     metaGroupMember.getThisNode().setNodeIdentifier(0);
     mockDataClusterServer = false;
     QueryCoordinator.getINSTANCE().setMetaGroupMember(metaGroupMember);
+    exiledNode = null;
   }
 
   private DataGroupMember getDataGroupMember(PartitionGroup group, Node node) {
     return new DataGroupMember(null, group, node, new TestPartitionedLogManager(null,
-        partitionTable, partitionGroup.getHeader(), TestSnapshot::new),
+        partitionTable, group.getHeader(), TestSnapshot::new),
         metaGroupMember, null) {
       @Override
       public boolean syncLeader() {
@@ -250,8 +254,6 @@ public class MetaGroupMemberTest extends MemberTest {
           @Override
           public AsyncClient getClient(Node node) throws IOException {
             return new TestDataClient(node) {
-              private AtomicLong readerId = new AtomicLong();
-              private Map<Long, IReaderByTimestamp> readerByTimestampMap = new HashMap<>();
 
               @Override
               public void querySingleSeries(SingleSeriesQueryRequest request,
@@ -378,6 +380,19 @@ public class MetaGroupMemberTest extends MemberTest {
             public void queryNodeStatus(AsyncMethodCallback<TNodeStatus> resultHandler) {
               new Thread(() -> resultHandler.onComplete(new TNodeStatus())).start();
             }
+
+            @Override // records which node's client was told to leave the cluster; resultHandler is not invoked
+            public void exile(AsyncMethodCallback<Void> resultHandler) {
+              exiledNode = node; // assigned on the calling thread: a fire-and-forget thread could lose the race with the test's assertion
+            }
+
+            @Override // stub: applies the removal on metaGroupMember, then answers RESPONSE_AGREE
+            public void removeNode(Node node, AsyncMethodCallback<Long> resultHandler) {
+              new Thread(() -> { // runs on a new thread, so this call returns before the handler is invoked
+                metaGroupMember.applyRemoveNode(node);
+                resultHandler.onComplete(Response.RESPONSE_AGREE);
+              }).start();
+            }
           };
         } catch (IOException e) {
           return null;
@@ -920,4 +935,151 @@ public class MetaGroupMemberTest extends MemberTest {
     MetaGroupMember metaGroupMember = getMetaGroupMember(new Node());
     assertEquals(100, metaGroupMember.getThisNode().getNodeIdentifier());
   }
+
+  @Test // with no partition table set, removeNode() is expected to report PartitionTableUnavailableException
+  public void testRemoveNodeWithoutPartitionTable() throws InterruptedException {
+    metaGroupMember.setPartitionTable(null);
+    AtomicBoolean passed = new AtomicBoolean(false);
+    synchronized (metaGroupMember) {
+      metaGroupMember.removeNode(TestUtils.getNode(0), new AsyncMethodCallback<Long>() {
+        @Override
+        public void onComplete(Long aLong) { // unexpected path: only wakes the test, passed stays false
+          synchronized (metaGroupMember) {
+            metaGroupMember.notifyAll();
+          }
+        }
+
+        @Override
+        public void onError(Exception e) {
+          new Thread(() -> { // NOTE(review): presumably a new thread so the notify cannot precede wait() below
+            synchronized (metaGroupMember) {
+              passed.set(e instanceof PartitionTableUnavailableException);
+              metaGroupMember.notifyAll();
+            }
+          }).start();
+        }
+      });
+      metaGroupMember.wait(500); // wait at most 500ms for one of the callbacks
+    }
+
+    assertTrue(passed.get());
+  }
+
+  @Test // this node is the leader and removes itself
+  public void testRemoveThisNode() throws InterruptedException {
+    AtomicReference<Long> resultRef = new AtomicReference<>(); // receives the removal response
+    metaGroupMember.setLeader(metaGroupMember.getThisNode());
+    metaGroupMember.setCharacter(LEADER);
+    doRemoveNode(resultRef, metaGroupMember.getThisNode());
+    assertEquals(Response.RESPONSE_AGREE, (long) resultRef.get());
+    assertFalse(metaGroupMember.getAllNodes().contains(metaGroupMember.getThisNode())); // this node must be gone from the member list
+  }
+
+  @Test // as a follower, removes node 40, which is the current leader
+  public void testRemoveLeader() throws InterruptedException {
+    AtomicReference<Long> resultRef = new AtomicReference<>(); // receives the removal response
+    metaGroupMember.setLeader(TestUtils.getNode(40));
+    metaGroupMember.setCharacter(FOLLOWER);
+    doRemoveNode(resultRef, TestUtils.getNode(40));
+    assertEquals(Response.RESPONSE_AGREE, (long) resultRef.get());
+    assertFalse(metaGroupMember.getAllNodes().contains(TestUtils.getNode(40)));
+    assertEquals(ELECTOR, metaGroupMember.getCharacter()); // the member is expected to become an elector
+    assertEquals(Long.MIN_VALUE, metaGroupMember.getLastHeartBeatReceivedTime()); // and to have its heartbeat time reset
+  }
+
+  @Test // as a follower of node 40, removes node 20, which is not the leader
+  public void testRemoveNonLeader() throws InterruptedException {
+    AtomicReference<Long> resultRef = new AtomicReference<>(); // receives the removal response
+    metaGroupMember.setLeader(TestUtils.getNode(40));
+    metaGroupMember.setCharacter(FOLLOWER);
+    doRemoveNode(resultRef, TestUtils.getNode(20));
+    assertEquals(Response.RESPONSE_AGREE, (long) resultRef.get());
+    assertFalse(metaGroupMember.getAllNodes().contains(TestUtils.getNode(20)));
+    assertEquals(0, metaGroupMember.getLastHeartBeatReceivedTime()); // expected to stay 0, presumably its initial value - TODO confirm
+  }
+
+  @Test // as the leader, removes node 20 and expects that node to be exiled
+  public void testRemoveNodeAsLeader() throws InterruptedException {
+    AtomicReference<Long> resultRef = new AtomicReference<>(); // receives the removal response
+    metaGroupMember.setLeader(metaGroupMember.getThisNode());
+    metaGroupMember.setCharacter(LEADER);
+    doRemoveNode(resultRef, TestUtils.getNode(20));
+    assertEquals(Response.RESPONSE_AGREE, (long) resultRef.get());
+    assertFalse(metaGroupMember.getAllNodes().contains(TestUtils.getNode(20)));
+    assertEquals(TestUtils.getNode(20), exiledNode); // exiledNode is set by the stub client's exile()
+  }
+
+  @Test // as the leader, removing node 120 is expected to be answered with RESPONSE_REJECT
+  public void testRemoveNonExistNode() throws InterruptedException {
+    AtomicBoolean passed = new AtomicBoolean(false);
+    metaGroupMember.setCharacter(LEADER);
+    metaGroupMember.setLeader(metaGroupMember.getThisNode());
+    synchronized (metaGroupMember) {
+      metaGroupMember.removeNode(TestUtils.getNode(120), new AsyncMethodCallback<Long>() {
+        @Override
+        public void onComplete(Long aLong) {
+          synchronized (metaGroupMember) {
+            passed.set(aLong.equals(Response.RESPONSE_REJECT)); // the only path that lets the test pass
+            metaGroupMember.notifyAll();
+          }
+        }
+
+        @Override
+        public void onError(Exception e) { // unexpected path: prints the error, passed stays false
+          new Thread(() -> {
+            synchronized (metaGroupMember) {
+              e.printStackTrace();
+              metaGroupMember.notifyAll();
+            }
+          }).start();
+        }
+      });
+      metaGroupMember.wait(500); // wait at most 500ms for one of the callbacks
+    }
+
+    assertTrue(passed.get());
+  }
+
+  @Test // removes nodes 90 down to 30, then expects the removal of node 20 to be refused as too small
+  public void testRemoveTooManyNodes() throws InterruptedException {
+    for (int nodeId = 90; nodeId >= 30; nodeId -= 10) {
+      AtomicReference<Long> accepted = new AtomicReference<>();
+      metaGroupMember.setCharacter(LEADER);
+      doRemoveNode(accepted, TestUtils.getNode(nodeId));
+      assertEquals(Response.RESPONSE_AGREE, (long) accepted.get());
+      assertFalse(metaGroupMember.getAllNodes().contains(TestUtils.getNode(nodeId)));
+    }
+    AtomicReference<Long> refused = new AtomicReference<>();
+    metaGroupMember.setCharacter(LEADER);
+    doRemoveNode(refused, TestUtils.getNode(20));
+    assertEquals(Response.RESPONSE_CLUSTER_TOO_SMALL, (long) refused.get());
+    assertTrue(metaGroupMember.getAllNodes().contains(TestUtils.getNode(20)));
+  }
+
+  // Removes nodeToRemove through metaGroupMember and stores the response in resultRef, which must be
+  // empty on entry. Fails the test if no response arrives within 500ms or the call reports an error.
+  private void doRemoveNode(AtomicReference<Long> resultRef, Node nodeToRemove) throws InterruptedException {
+    AtomicBoolean failed = new AtomicBoolean(false);
+    synchronized (resultRef) {
+      metaGroupMember.removeNode(nodeToRemove, new AsyncMethodCallback<Long>() {
+        @Override
+        public void onComplete(Long o) {
+          resultRef.set(o); // notify from a new thread: this callback may run while the lock is held
+          new Thread(() -> { synchronized (resultRef) { resultRef.notifyAll(); } }).start();
+        }
+
+        @Override
+        public void onError(Exception e) {
+          e.printStackTrace();
+          failed.set(true);
+          new Thread(() -> { synchronized (resultRef) { resultRef.notifyAll(); } }).start();
+        }
+      });
+      long deadline = System.currentTimeMillis() + 500;
+      while (resultRef.get() == null && !failed.get() && System.currentTimeMillis() < deadline) {
+        resultRef.wait(Math.max(1, deadline - System.currentTimeMillis())); // looped: wake-ups may be spurious
+      }
+    }
+    assertTrue("no response for removing " + nodeToRemove, resultRef.get() != null); // instead of an NPE in the caller
+  }
 }
\ No newline at end of file
diff --git a/service-rpc/src/main/thrift/cluster.thrift b/service-rpc/src/main/thrift/cluster.thrift
index f3b4b3d..4195289 100644
--- a/service-rpc/src/main/thrift/cluster.thrift
+++ b/service-rpc/src/main/thrift/cluster.thrift
@@ -113,6 +113,10 @@ struct PullSnapshotRequest {
   1: required list<int> requiredSlots
   // for data group
   2: optional Node header
+  // set to true if the previous holder has been removed from the cluster.
+  // This will make the previous holder read-only so that different new
+  // replicas can pull the same snapshot.
+  3: required bool requireReadOnly
 }
 
 struct PullSnapshotResp {
@@ -278,6 +282,13 @@ service TSMetaService extends RaftService {
   **/
   long removeNode(1: Node node)
 
+   /**
+   * When a node that is not the meta leader is removed from the cluster, it can no longer
+   * receive the commit command through heartbeats because it has already been removed, so the
+   * leader calls this method to tell it directly that it is no longer in the cluster.
+   **/
+   void exile()
+
   TNodeStatus queryNodeStatus()
 
   Node checkAlive()