Posted to commits@iotdb.apache.org by ji...@apache.org on 2020/02/05 04:05:00 UTC

[incubator-iotdb] branch cluster_node_deletion created (now 2a842f0)

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a change to branch cluster_node_deletion
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git.


      at 2a842f0  first implementation

This branch includes the following new commits:

     new 2a842f0  first implementation

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[incubator-iotdb] 01/01: first implementation

Posted by ji...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch cluster_node_deletion
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 2a842f0c582946cf45289885fce208efa3bd46bd
Author: jt2594838 <jt...@163.com>
AuthorDate: Wed Feb 5 11:44:13 2020 +0800

    first implementation
---
 .../java/org/apache/iotdb/cluster/ClusterMain.java |  87 +++++--
 .../apache/iotdb/cluster/client/MetaClient.java    |   4 +-
 .../cluster/exception/MemberReadOnlyException.java |  11 +
 .../java/org/apache/iotdb/cluster/log/Log.java     |   2 +-
 .../iotdb/cluster/log/applier/MetaLogApplier.java  |   6 +-
 .../iotdb/cluster/log/logtypes/RemoveNodeLog.java  |  41 ++++
 .../manage/FilePartitionedSnapshotLogManager.java  |   2 +-
 .../log/manage/PartitionedSnapshotLogManager.java  |   1 +
 .../iotdb/cluster/partition/NodeRemovalResult.java |  41 ++++
 .../iotdb/cluster/partition/PartitionTable.java    |   6 +
 .../cluster/partition/SlotPartitionTable.java      |  66 +++++-
 .../iotdb/cluster/server/DataClusterServer.java    |  44 +++-
 .../iotdb/cluster/server/MetaClusterServer.java    |   5 +
 .../org/apache/iotdb/cluster/server/Response.java  |   1 +
 .../cluster/server/member/DataGroupMember.java     |  48 +++-
 .../cluster/server/member/MetaGroupMember.java     | 151 +++++++++---
 .../iotdb/cluster/server/member/RaftMember.java    |  15 ++
 .../apache/iotdb/cluster/utils/StatusUtils.java    |   4 +
 .../iotdb/cluster/common/TestPartitionTable.java   |   6 +
 .../server/heartbeat/MetaHeartBeatThreadTest.java  |   6 +
 .../cluster/server/member/DataGroupMemberTest.java |   2 +-
 .../java/org/apache/iotdb/rpc/TSStatusCode.java    |   3 +-
 service-rpc/src/main/thrift/cluster.thrift         |   7 +
 session/pom.xml                                    | 256 ++++++++++-----------
 24 files changed, 618 insertions(+), 197 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/ClusterMain.java b/cluster/src/main/java/org/apache/iotdb/cluster/ClusterMain.java
index e9bb374..8e76928 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/ClusterMain.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/ClusterMain.java
@@ -19,8 +19,21 @@
 package org.apache.iotdb.cluster;
 
 import java.io.IOException;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.iotdb.cluster.client.MetaClient;
+import org.apache.iotdb.cluster.config.ClusterConfig;
+import org.apache.iotdb.cluster.config.ClusterDescriptor;
+import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.iotdb.cluster.server.MetaClusterServer;
+import org.apache.iotdb.cluster.server.RaftServer;
+import org.apache.iotdb.cluster.server.Response;
+import org.apache.iotdb.cluster.server.handlers.caller.GenericHandler;
 import org.apache.iotdb.db.exception.StartupException;
+import org.apache.thrift.TException;
+import org.apache.thrift.async.TAsyncClientManager;
+import org.apache.thrift.protocol.TBinaryProtocol.Factory;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.protocol.TProtocolFactory;
 import org.apache.thrift.transport.TTransportException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -33,6 +46,8 @@ public class ClusterMain {
   private static final String MODE_START = "-s";
   // join an established cluster
   private static final String MODE_ADD = "-a";
+  // remove a node from an established cluster
+  private static final String MODE_REMOVE = "-r";
 
   public static MetaClusterServer metaServer;
 
@@ -43,24 +58,70 @@ public class ClusterMain {
     }
     String mode = args[0];
 
+    logger.info("Running mode {}", mode);
     try {
-      metaServer = new MetaClusterServer();
-      metaServer.start();
-    } catch (TTransportException | IOException | StartupException e) {
-      logger.error("Cannot set up service", e);
+      if (MODE_START.equals(mode)) {
+        metaServer = new MetaClusterServer();
+        metaServer.start();
+        metaServer.buildCluster();
+      } else if (MODE_ADD.equals(mode)) {
+        metaServer = new MetaClusterServer();
+        metaServer.start();
+        if (!metaServer.joinCluster()) {
+          metaServer.stop();
+        }
+      } else if (MODE_REMOVE.equals(mode)) {
+        doRemoveNode(args);
+      } else {
+        logger.error("Unrecognized mode {}", mode);
+      }
+    } catch (IOException | TTransportException | StartupException e) {
+      logger.error("Fail to start meta server", e);
+    }
+  }
+
+  private static void doRemoveNode(String[] args) throws IOException {
+    if (args.length != 3) {
+      logger.error("Usage: -r <ip> <metaPort>");
       return;
     }
+    String ip = args[1];
+    int metaPort = Integer.parseInt(args[2]);
+    ClusterConfig config = ClusterDescriptor.getINSTANCE().getConfig();
+    TProtocolFactory factory = config
+        .isRpcThriftCompressionEnabled() ? new TCompactProtocol.Factory() : new Factory();
+    Node nodeToRemove = new Node();
+    nodeToRemove.setIp(ip).setMetaPort(metaPort);
+    // try sending the request to each seed node
+    for (String url : config.getSeedNodeUrls()) {
+      String[] splits = url.split(":");
+      Node node = new Node();
+      node.setIp(splits[0]).setMetaPort(Integer.parseInt(splits[1]));
+      MetaClient client = new MetaClient(factory, new TAsyncClientManager(), node, null);
 
-    logger.info("Running mode {}", mode);
-    if (MODE_START.equals(mode)) {
-      metaServer.buildCluster();
-    } else if (MODE_ADD.equals(mode)) {
-      if (!metaServer.joinCluster()) {
-        metaServer.stop();
+      AtomicReference<Long> responseRef = new AtomicReference<>();
+      GenericHandler handler = new GenericHandler(node, responseRef);
+      try {
+        synchronized (responseRef) {
+          client.removeNode(nodeToRemove, handler);
+          responseRef.wait(RaftServer.connectionTimeoutInMS);
+        }
+        Long response = responseRef.get();
+        if (response != null) {
+          if (response == Response.RESPONSE_AGREE) {
+            logger.info("Node {} is successfully removed", nodeToRemove);
+            return;
+          } else if (response == Response.RESPONSE_CLUSTER_TOO_SMALL) {
+            logger.error("Cluster size is too small, cannot remove any node");
+            return;
+          } else if (response == Response.RESPONSE_REJECT) {
+            logger.error("Node {} is not found in the cluster, please check", node);
+            return;
+          }
+        }
+      } catch (TException | InterruptedException e) {
+        logger.warn("Cannot send remove node request through {}, try next node", node);
       }
-    } else {
-      logger.error("Unrecognized mode {}", mode);
     }
-
   }
 }
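
For readers unfamiliar with the asynchronous Thrift pattern that doRemoveNode() relies on, the sketch below isolates the wait/notify mechanics: the caller parks on an AtomicReference while a completion callback (GenericHandler in the change above) sets the value and wakes it up. Only that synchronization pattern mirrors the code; the background thread, the 200 ms delay and the hard-coded -1 response are illustrative assumptions.

    import java.util.concurrent.atomic.AtomicReference;

    public class AsyncResponseWaitSketch {
      public static void main(String[] args) throws InterruptedException {
        AtomicReference<Long> responseRef = new AtomicReference<>();

        // stands in for client.removeNode(nodeToRemove, handler); the body mimics
        // what the completion callback would do
        new Thread(() -> {
          try {
            Thread.sleep(200); // simulated network latency
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
          }
          synchronized (responseRef) {
            responseRef.set(-1L); // -1 corresponds to Response.RESPONSE_AGREE
            responseRef.notifyAll();
          }
        }).start();

        synchronized (responseRef) {
          responseRef.wait(5000); // plays the role of RaftServer.connectionTimeoutInMS
        }
        // a null value here would mean the call timed out
        System.out.println("response = " + responseRef.get());
      }
    }
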
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/MetaClient.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/MetaClient.java
index 28c558d..625567d 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/MetaClient.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/MetaClient.java
@@ -46,7 +46,9 @@ public class MetaClient extends AsyncClient {
   public void onComplete() {
     super.onComplete();
     // return itself to the pool if the job is done
-    pool.putClient(node, this);
+    if (pool != null) {
+      pool.putClient(node, this);
+    }
   }
 
   public static class Factory implements ClientFactory {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/exception/MemberReadOnlyException.java b/cluster/src/main/java/org/apache/iotdb/cluster/exception/MemberReadOnlyException.java
new file mode 100644
index 0000000..8fc3ed4
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/exception/MemberReadOnlyException.java
@@ -0,0 +1,11 @@
+package org.apache.iotdb.cluster.exception;
+
+import org.apache.iotdb.cluster.rpc.thrift.Node;
+
+public class MemberReadOnlyException extends Exception {
+
+  public MemberReadOnlyException(Node node) {
+    super(String.format("The node %s has been set read-only for its partitions, please retry "
+        + "with another node", node));
+  }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/Log.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/Log.java
index 8bfe5b4..354d322 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/Log.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/Log.java
@@ -41,7 +41,7 @@ public abstract class Log {
 
   public enum Types {
     // TODO-Cluster#348 support more logs
-    ADD_NODE, PHYSICAL_PLAN, CLOSE_FILE
+    ADD_NODE, PHYSICAL_PLAN, CLOSE_FILE, REMOVE_NODE
   }
 
   public long getPreviousLogIndex() {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/MetaLogApplier.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/MetaLogApplier.java
index b9f2a62..ffd236f 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/MetaLogApplier.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/MetaLogApplier.java
@@ -23,12 +23,12 @@ import org.apache.iotdb.cluster.log.Log;
 import org.apache.iotdb.cluster.log.logtypes.AddNodeLog;
 import org.apache.iotdb.cluster.log.logtypes.CloseFileLog;
 import org.apache.iotdb.cluster.log.logtypes.PhysicalPlanLog;
+import org.apache.iotdb.cluster.log.logtypes.RemoveNodeLog;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.iotdb.cluster.server.member.MetaGroupMember;
 import org.apache.iotdb.db.engine.StorageEngine;
 import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
-import org.apache.iotdb.db.qp.executor.QueryProcessExecutor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -39,7 +39,6 @@ public class MetaLogApplier extends BaseApplier {
 
   private static final Logger logger = LoggerFactory.getLogger(MetaLogApplier.class);
   private MetaGroupMember member;
-  private QueryProcessExecutor queryExecutor;
 
   public MetaLogApplier(MetaGroupMember member) {
     super(member);
@@ -64,6 +63,9 @@ public class MetaLogApplier extends BaseApplier {
         logger.error("Cannot close {} file in {}", closeFileLog.isSeq() ? "seq" : "unseq",
             closeFileLog.getStorageGroupName());
       }
+    } else if (log instanceof RemoveNodeLog) {
+      RemoveNodeLog removeNodeLog = ((RemoveNodeLog) log);
+      member.applyRemoveNode(removeNodeLog.getRemovedNode());
     } else {
       // TODO-Cluster#348 support more types of logs
       logger.error("Unsupported log: {}", log);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/RemoveNodeLog.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/RemoveNodeLog.java
new file mode 100644
index 0000000..204e675
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/RemoveNodeLog.java
@@ -0,0 +1,41 @@
+package org.apache.iotdb.cluster.log.logtypes;
+
+import java.io.IOException;
+import org.apache.iotdb.cluster.log.Log;
+import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.utils.SerializeUtils;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.nio.ByteBuffer;
+
+public class RemoveNodeLog extends Log {
+    private Node removedNode;
+
+    @Override
+    public ByteBuffer serialize() {
+        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+        DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream);
+        try {
+            dataOutputStream.writeByte(Types.REMOVE_NODE.ordinal());
+        } catch (IOException e) {
+            // ignored
+        }
+        SerializeUtils.serialize(removedNode, dataOutputStream);
+        return ByteBuffer.wrap(byteArrayOutputStream.toByteArray());
+    }
+
+    @Override
+    public void deserialize(ByteBuffer buffer) {
+        removedNode = new Node();
+        SerializeUtils.deserialize(removedNode, buffer);
+    }
+
+    public Node getRemovedNode() {
+        return removedNode;
+    }
+
+    public void setRemovedNode(Node removedNode) {
+        this.removedNode = removedNode;
+    }
+}
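
A minimal round-trip sketch of the new log type: build a RemoveNodeLog, serialize it, and read it back. The setters and the serialize/deserialize calls are the ones shown above; the IP and port values are made up, and the single byte skipped before deserialization is the type byte that serialize() writes first (presumably consumed by the log parser in the cluster code).

    import java.nio.ByteBuffer;
    import org.apache.iotdb.cluster.log.logtypes.RemoveNodeLog;
    import org.apache.iotdb.cluster.rpc.thrift.Node;

    public class RemoveNodeLogRoundTrip {
      public static void main(String[] args) {
        Node removed = new Node();
        removed.setIp("192.168.0.5").setMetaPort(9003); // illustrative address

        RemoveNodeLog log = new RemoveNodeLog();
        log.setRemovedNode(removed);

        ByteBuffer buffer = log.serialize();
        buffer.get(); // skip the leading log-type byte written by serialize()

        RemoveNodeLog restored = new RemoveNodeLog();
        restored.deserialize(buffer);
        // should show the same ip and metaPort as the original node
        System.out.println(restored.getRemovedNode());
      }
    }
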
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/FilePartitionedSnapshotLogManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/FilePartitionedSnapshotLogManager.java
index 28e310e..6eed911 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/FilePartitionedSnapshotLogManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/FilePartitionedSnapshotLogManager.java
@@ -37,7 +37,7 @@ import org.slf4j.LoggerFactory;
 
 /**
  * Different from PartitionedSnapshotLogManager, FilePartitionedSnapshotLogManager does not store
- * the committed in memory, it considers the logs are contained in the TsFiles so it will record
+ * the committed logs in memory after snapshots; it considers the logs to be contained in the TsFiles, so it will record
  * every TsFiles in the slot instead.
  */
 public class FilePartitionedSnapshotLogManager extends PartitionedSnapshotLogManager<FileSnapshot> {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/PartitionedSnapshotLogManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/PartitionedSnapshotLogManager.java
index e740c86..5b0c432 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/PartitionedSnapshotLogManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/PartitionedSnapshotLogManager.java
@@ -96,6 +96,7 @@ public abstract class PartitionedSnapshotLogManager<T extends Snapshot> extends
 
   public void setSnapshot(T snapshot, int slot) {
     synchronized (slotSnapshots) {
+      // TODO-Cluster#451: persist the remote snapshot so the pulling can be recovered in restart
       slotSnapshots.put(slot, snapshot);
     }
   }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/partition/NodeRemovalResult.java b/cluster/src/main/java/org/apache/iotdb/cluster/partition/NodeRemovalResult.java
new file mode 100644
index 0000000..df5228b
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/partition/NodeRemovalResult.java
@@ -0,0 +1,41 @@
+package org.apache.iotdb.cluster.partition;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.iotdb.cluster.rpc.thrift.Node;
+
+/**
+ * NodeRemovalResult stores the removed partition group and who will take over its slots.
+ */
+public class NodeRemovalResult {
+  private PartitionGroup removedGroup;
+  // if the removed group contains the local node, the local node should join a new group to
+  // preserve the replication number
+  private PartitionGroup newGroup;
+  private Map<Node, List<Integer>> newSlotOwners;
+
+  public PartitionGroup getRemovedGroup() {
+    return removedGroup;
+  }
+
+  public void setRemovedGroup(PartitionGroup group) {
+    this.removedGroup = group;
+  }
+
+  public Map<Node, List<Integer>> getNewSlotOwners() {
+    return newSlotOwners;
+  }
+
+  public void setNewSlotOwners(
+      Map<Node, List<Integer>> newSlotOwners) {
+    this.newSlotOwners = newSlotOwners;
+  }
+
+  public PartitionGroup getNewGroup() {
+    return newGroup;
+  }
+
+  public void setNewGroup(PartitionGroup newGroup) {
+    this.newGroup = newGroup;
+  }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionTable.java b/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionTable.java
index 130f48b..5810f34 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionTable.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionTable.java
@@ -68,6 +68,12 @@ public interface PartitionTable {
   PartitionGroup addNode(Node node);
 
   /**
+   * Remove a node and update the partition table.
+   * @param node the node to remove
+   */
+  NodeRemovalResult removeNode(Node node);
+
+  /**
    *
    * @return All data groups where all VNodes of this node is the header. The first index
    * indicates the VNode and the second index indicates the data group of one VNode.
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/partition/SlotPartitionTable.java b/cluster/src/main/java/org/apache/iotdb/cluster/partition/SlotPartitionTable.java
index d2d3016..b1d7e31 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/partition/SlotPartitionTable.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/partition/SlotPartitionTable.java
@@ -91,8 +91,7 @@ public class SlotPartitionTable implements PartitionTable {
     int nodeNum = nodeRing.size();
     int slotsPerNode = slotNum / nodeNum;
     for (Node node : nodeRing) {
-      List<Integer> nodeSlots = new ArrayList<>();
-      nodeSlotMap.put(node, nodeSlots);
+      nodeSlotMap.put(node, new ArrayList<>());
     }
 
     for (int i = 0; i < slotNum; i++) {
@@ -150,10 +149,6 @@ public class SlotPartitionTable implements PartitionTable {
     return ret;
   }
 
-  private int nextIndex(int currIndex) {
-    return currIndex == nodeRing.size() - 1 ? 0 : currIndex + 1;
-  }
-
   @Override
   public PartitionGroup route(String storageGroupName, long timestamp) {
     synchronized (nodeRing) {
@@ -359,4 +354,63 @@ public class SlotPartitionTable implements PartitionTable {
   public int hashCode() {
     return 0;
   }
+
+  @Override
+  public NodeRemovalResult removeNode(Node target) {
+    synchronized (nodeRing) {
+      if (!nodeRing.contains(target)) {
+        return null;
+      }
+
+      NodeRemovalResult result = new NodeRemovalResult();
+      result.setRemovedGroup(getHeaderGroup(target));
+      nodeRing.remove(target);
+
+      // if the node belongs to a group headed by the target, this group should be removed
+      // and other groups containing target should be updated
+      int removedGroupIdx = -1;
+      for(int i = 0; i < localGroups.size(); i++) {
+        PartitionGroup oldGroup = localGroups.get(i);
+        Node header = oldGroup.getHeader();
+        if (header.equals(target)) {
+          removedGroupIdx = i;
+        } else {
+          PartitionGroup newGrp = getHeaderGroup(header);
+          localGroups.set(i, newGrp);
+          result.setNewGroup(newGrp);
+        }
+      }
+      if (removedGroupIdx != -1) {
+        localGroups.remove(removedGroupIdx);
+        // each node joins exactly replicationNum groups, so when a group is removed, the node
+        // should join a new one
+        int thisNodeIdx = nodeRing.indexOf(thisNode);
+        // this node must be the last node of the new group
+        int headerNodeIdx = thisNodeIdx - (replicationNum - 1);
+        headerNodeIdx = headerNodeIdx < 0 ? headerNodeIdx + nodeRing.size() : headerNodeIdx;
+        Node header = nodeRing.get(headerNodeIdx);
+        PartitionGroup newGrp = getHeaderGroup(header);
+        localGroups.add(newGrp);
+      }
+
+      // the slot movement is only done logically; the new holders will pull data from the
+      // removed node themselves
+      Map<Node, List<Integer>> nodeListMap = retrieveSlots(target);
+      result.setNewSlotOwners(nodeListMap);
+      return result;
+    }
+  }
+
+  private Map<Node, List<Integer>> retrieveSlots(Node target) {
+    Map<Node, List<Integer>> newHolderSlotMap = new HashMap<>();
+    List<Integer> slots = nodeSlotMap.remove(target);
+    for (int i = 0; i < slots.size(); i++) {
+      int slot = slots.get(i);
+      Node newHolder = nodeRing.get(i % nodeRing.size());
+      slotNodeMap.put(slot, newHolder);
+      nodeSlotMap.get(newHolder).add(slot);
+      newHolderSlotMap.computeIfAbsent(newHolder, n -> new ArrayList<>()).add(slot);
+    }
+    return newHolderSlotMap;
+  }
 }
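
The redistribution in retrieveSlots() is a plain round-robin over the remaining nodes of the ring. A self-contained illustration with strings standing in for Node objects (the node names and slot numbers are made up):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    public class SlotRedistributionSketch {
      public static void main(String[] args) {
        List<String> remainingNodes = Arrays.asList("nodeA", "nodeB", "nodeC");
        List<Integer> slotsOfRemovedNode = Arrays.asList(3, 7, 11, 15, 19);

        // LinkedHashMap is used only to keep the printed order stable
        Map<String, List<Integer>> newHolderSlotMap = new LinkedHashMap<>();
        for (int i = 0; i < slotsOfRemovedNode.size(); i++) {
          String newHolder = remainingNodes.get(i % remainingNodes.size());
          newHolderSlotMap.computeIfAbsent(newHolder, n -> new ArrayList<>())
              .add(slotsOfRemovedNode.get(i));
        }
        // prints {nodeA=[3, 15], nodeB=[7, 19], nodeC=[11]}
        System.out.println(newHolderSlotMap);
      }
    }
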
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
index 2ca212f..3a89b66 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
@@ -30,6 +30,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import org.apache.iotdb.cluster.exception.NoHeaderNodeException;
 import org.apache.iotdb.cluster.exception.NotInSameGroupException;
 import org.apache.iotdb.cluster.exception.PartitionTableUnavailableException;
+import org.apache.iotdb.cluster.partition.NodeRemovalResult;
 import org.apache.iotdb.cluster.partition.PartitionGroup;
 import org.apache.iotdb.cluster.partition.PartitionTable;
 import org.apache.iotdb.cluster.rpc.thrift.AppendEntriesRequest;
@@ -88,7 +89,7 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
         if (partitionTable != null) {
           try {
             member = createNewMember(header);
-          } catch (NotInSameGroupException e) {
+          } catch (NotInSameGroupException | TTransportException e) {
             ex = e;
           }
         } else {
@@ -103,7 +104,8 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
     }
   }
 
-  private DataGroupMember createNewMember(Node header) throws NotInSameGroupException {
+  private DataGroupMember createNewMember(Node header)
+      throws NotInSameGroupException, TTransportException {
     DataGroupMember member;
     synchronized (partitionTable) {
       // it may be that the header and this node are in the same group, but it is the first time
@@ -114,6 +116,7 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
         member = dataMemberFactory.create(partitionGroup, thisNode);
         headerGroupMap.put(header, member);
         logger.info("Created a member for header {}", header);
+        member.start();
       } else {
         logger.info("This node {} does not belong to the group {}", thisNode, partitionGroup);
         throw new NotInSameGroupException(partitionTable.getHeaderGroup(header),
@@ -298,6 +301,41 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
     }
   }
 
+  public void removeNode(Node node, NodeRemovalResult removalResult) {
+    Iterator<Entry<Node, DataGroupMember>> entryIterator = headerGroupMap.entrySet().iterator();
+    synchronized (headerGroupMap) {
+      while (entryIterator.hasNext()) {
+        Entry<Node, DataGroupMember> entry = entryIterator.next();
+        DataGroupMember dataGroupMember = entry.getValue();
+        if (dataGroupMember.getHeader().equals(node)) {
+          // the group is removed as the node is removed, so new writes should be rejected as
+          // they belong to the new holder, but the member is kept alive for other nodes to pull
+          // snapshots
+          dataGroupMember.setReadOnly();
+          //TODO-Cluster: when to call removeLocalData?
+        } else {
+          if (node.equals(thisNode)) {
+            // this node is removed, so it is no longer a replica of other groups
+            List<Integer> nodeSlots = partitionTable.getNodeSlots(dataGroupMember.getHeader());
+            dataGroupMember.removeLocalData(nodeSlots);
+            entryIterator.remove();
+          } else {
+            // the group should be updated and pull new slots from the removed node
+            dataGroupMember.removeNode(node, removalResult);
+          }
+        }
+      }
+      PartitionGroup newGroup = removalResult.getNewGroup();
+      if (newGroup != null) {
+        try {
+          createNewMember(newGroup.getHeader());
+        } catch (NotInSameGroupException | TTransportException e) {
+          // ignored
+        }
+      }
+    }
+  }
+
   @Override
   public void pullTimeSeriesSchema(PullSchemaRequest request,
       AsyncMethodCallback<PullSchemaResp> resultHandler) {
@@ -319,7 +357,7 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
   public void pullSnapshots() {
     List<Integer> slots = partitionTable.getNodeSlots(thisNode);
     DataGroupMember dataGroupMember = headerGroupMap.get(thisNode);
-    dataGroupMember.pullSnapshots(slots, thisNode);
+    dataGroupMember.pullNodeAdditionSnapshots(slots, thisNode);
   }
 
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java
index 9ec26cd..61b1042 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java
@@ -39,6 +39,7 @@ import org.apache.iotdb.db.exception.StartupException;
 import org.apache.iotdb.db.service.IoTDB;
 import org.apache.iotdb.db.service.RegisterManager;
 import org.apache.iotdb.service.rpc.thrift.TSStatus;
+import org.apache.thrift.TException;
 import org.apache.thrift.async.AsyncMethodCallback;
 import org.apache.thrift.transport.TNonblockingServerSocket;
 import org.apache.thrift.transport.TTransportException;
@@ -176,4 +177,8 @@ public class MetaClusterServer extends RaftServer implements TSMetaService.Async
     return member;
   }
 
+  @Override
+  public void removeNode(Node node, AsyncMethodCallback<Long> resultHandler) throws TException {
+    member.removeNode(node, resultHandler);
+  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/Response.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/Response.java
index 7064c73..e0fab54 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/Response.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/Response.java
@@ -34,6 +34,7 @@ public class Response {
   public static final long RESPONSE_NO_CONNECTION = -6;
   public static final long RESPONSE_META_LOG_STALE = -7;
   public static final long RESPONSE_LEADER_STILL_ONLINE = -8;
+  public static final long RESPONSE_CLUSTER_TOO_SMALL = -9;
 
   private Response() {
     // enum class
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
index 5bcb8de..3e901a5 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
@@ -48,6 +48,7 @@ import org.apache.iotdb.cluster.log.snapshot.FileSnapshot;
 import org.apache.iotdb.cluster.log.snapshot.PartitionedSnapshot;
 import org.apache.iotdb.cluster.log.snapshot.PullSnapshotTask;
 import org.apache.iotdb.cluster.log.snapshot.RemoteFileSnapshot;
+import org.apache.iotdb.cluster.partition.NodeRemovalResult;
 import org.apache.iotdb.cluster.partition.PartitionGroup;
 import org.apache.iotdb.cluster.query.ClusterQueryParser;
 import org.apache.iotdb.cluster.query.RemoteQueryContext;
@@ -229,7 +230,6 @@ public class DataGroupMember extends RaftMember implements TSDataService.AsyncIf
 
     long thisMetaLastLogIndex = metaGroupMember.getLogManager().getLastLogIndex();
     long thisMetaLastLogTerm = metaGroupMember.getLogManager().getLastLogTerm();
-    long thisMetaTerm = metaGroupMember.getTerm().get();
 
     // term of the electors's MetaGroupMember is not verified, so 0 and 1 are used to make sure
     // the verification does not fail
@@ -479,7 +479,12 @@ public class DataGroupMember extends RaftMember implements TSDataService.AsyncIf
     }
   }
 
-  public void pullSnapshots(List<Integer> slots, Node newNode) {
+  /**
+   * Pull snapshots from the previous holders after newNode joins the cluster.
+   * @param slots
+   * @param newNode
+   */
+  public void pullNodeAdditionSnapshots(List<Integer> slots, Node newNode) {
     synchronized (logManager) {
       logger.info("{} pulling {} slots from remote", name, slots.size());
       PartitionedSnapshot snapshot = (PartitionedSnapshot) logManager.getSnapshot();
@@ -499,17 +504,14 @@ public class DataGroupMember extends RaftMember implements TSDataService.AsyncIf
       for (Entry<Node, List<Integer>> entry : holderSlotsMap.entrySet()) {
         Node node = entry.getKey();
         List<Integer> nodeSlots = entry.getValue();
-        pullFileSnapshot(node, nodeSlots);
+        pullFileSnapshot(metaGroupMember.getPartitionTable().getHeaderGroup(node), nodeSlots);
       }
     }
   }
 
-  private void pullFileSnapshot(Node node, List<Integer> nodeSlots) {
-    PartitionGroup prevHolders =
-        new PartitionGroup(metaGroupMember.getPartitionTable().getHeaderGroup(node));
-
+  private void pullFileSnapshot(PartitionGroup prevHolders, List<Integer> nodeSlots) {
     Future<Map<Integer, FileSnapshot>> snapshotFuture =
-        pullSnapshotService.submit(new PullSnapshotTask(node, nodeSlots, this,
+        pullSnapshotService.submit(new PullSnapshotTask(prevHolders.getHeader(), nodeSlots, this,
             prevHolders, FileSnapshot::new));
     for (int slot : nodeSlots) {
       logManager.setSnapshot(new RemoteFileSnapshot(snapshotFuture), slot);
@@ -718,7 +720,6 @@ public class DataGroupMember extends RaftMember implements TSDataService.AsyncIf
     }
     try {
       if (reader.hasNextBatch()) {
-        int count = 0;
         BatchData batchData = reader.nextBatch();
 
         ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
@@ -743,5 +744,34 @@ public class DataGroupMember extends RaftMember implements TSDataService.AsyncIf
       resultHandler.onError(e);
     }
   }
+
+  /**
+   * When this node is no longer a member of a group, the corresponding local data should
+   * be removed.
+   */
+  public void removeLocalData(List<Integer> slots) {
+    for (Integer slot : slots) {
+      //TODO-Cluster: remove the data in the slot
+    }
+  }
+
+  /**
+   * When a node is removed and it is not the header of the group, the member updates the group
+   * if the removed node was in it (a new node takes its place), and pulls the slots this group
+   * takes over from the removed group.
+   */
+  public void removeNode(Node removedNode, NodeRemovalResult removalResult) {
+    synchronized (allNodes) {
+      if (allNodes.contains(removedNode)) {
+        // update the group if the deleted node was in it
+        allNodes = metaGroupMember.getPartitionTable().getHeaderGroup(getHeader());
+      }
+      List<Integer> slotsToPull = removalResult.getNewSlotOwners().get(getHeader());
+      if (slotsToPull != null) {
+        // pull the slots that should be taken over
+        pullFileSnapshot(removalResult.getRemovedGroup(), slotsToPull);
+      }
+    }
+  }
 }
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
index 19c520c..5cdf481 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
@@ -54,6 +54,7 @@ import org.apache.iotdb.cluster.config.ClusterDescriptor;
 import org.apache.iotdb.cluster.exception.AddSelfException;
 import org.apache.iotdb.cluster.exception.LeaderUnknownException;
 import org.apache.iotdb.cluster.exception.NotInSameGroupException;
+import org.apache.iotdb.cluster.exception.PartitionTableUnavailableException;
 import org.apache.iotdb.cluster.exception.RequestTimeOutException;
 import org.apache.iotdb.cluster.exception.UnsupportedPlanException;
 import org.apache.iotdb.cluster.log.Log;
@@ -62,8 +63,10 @@ import org.apache.iotdb.cluster.log.applier.DataLogApplier;
 import org.apache.iotdb.cluster.log.applier.MetaLogApplier;
 import org.apache.iotdb.cluster.log.logtypes.AddNodeLog;
 import org.apache.iotdb.cluster.log.logtypes.CloseFileLog;
+import org.apache.iotdb.cluster.log.logtypes.RemoveNodeLog;
 import org.apache.iotdb.cluster.log.manage.MetaSingleSnapshotLogManager;
 import org.apache.iotdb.cluster.log.snapshot.MetaSimpleSnapshot;
+import org.apache.iotdb.cluster.partition.NodeRemovalResult;
 import org.apache.iotdb.cluster.partition.PartitionGroup;
 import org.apache.iotdb.cluster.partition.PartitionTable;
 import org.apache.iotdb.cluster.partition.SlotPartitionTable;
@@ -72,17 +75,7 @@ import org.apache.iotdb.cluster.query.RemoteQueryContext;
 import org.apache.iotdb.cluster.query.manage.QueryCoordinator;
 import org.apache.iotdb.cluster.query.reader.RemoteSeriesReaderByTimestamp;
 import org.apache.iotdb.cluster.query.reader.RemoteSimpleSeriesReader;
-import org.apache.iotdb.cluster.rpc.thrift.AddNodeResponse;
-import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
-import org.apache.iotdb.cluster.rpc.thrift.HeartBeatRequest;
-import org.apache.iotdb.cluster.rpc.thrift.HeartBeatResponse;
-import org.apache.iotdb.cluster.rpc.thrift.Node;
-import org.apache.iotdb.cluster.rpc.thrift.PullSchemaRequest;
-import org.apache.iotdb.cluster.rpc.thrift.PullSchemaResp;
-import org.apache.iotdb.cluster.rpc.thrift.SendSnapshotRequest;
-import org.apache.iotdb.cluster.rpc.thrift.SingleSeriesQueryRequest;
-import org.apache.iotdb.cluster.rpc.thrift.TNodeStatus;
-import org.apache.iotdb.cluster.rpc.thrift.TSMetaService;
+import org.apache.iotdb.cluster.rpc.thrift.*;
 import org.apache.iotdb.cluster.rpc.thrift.TSMetaService.AsyncClient;
 import org.apache.iotdb.cluster.server.ClientServer;
 import org.apache.iotdb.cluster.server.DataClusterServer;
@@ -138,8 +131,6 @@ public class MetaGroupMember extends RaftMember implements TSMetaService.AsyncIf
   public static final int REPLICATION_NUM =
       ClusterDescriptor.getINSTANCE().getConfig().getReplicationNum();
 
-  private TProtocolFactory protocolFactory;
-
   // blind nodes are nodes that does not know the nodes in the cluster
   private Set<Node> blindNodes = new HashSet<>();
   private Set<Node> idConflictNodes = new HashSet<>();
@@ -150,7 +141,6 @@ public class MetaGroupMember extends RaftMember implements TSMetaService.AsyncIf
   private ClientServer clientServer;
 
   private LogApplier metaLogApplier = new MetaLogApplier(this);
-  private LogApplier dataLogApplier = new DataLogApplier(this);
   private DataGroupMember.Factory dataMemberFactory;
 
   private MetaSingleSnapshotLogManager logManager;
@@ -165,8 +155,8 @@ public class MetaGroupMember extends RaftMember implements TSMetaService.AsyncIf
       throws IOException {
     super("Meta", new ClientPool(new MetaClient.Factory(new TAsyncClientManager(), factory)));
     allNodes = new ArrayList<>();
-    this.protocolFactory = factory;
-    dataMemberFactory = new Factory(protocolFactory, this, dataLogApplier,
+    LogApplier dataLogApplier = new DataLogApplier(this);
+    dataMemberFactory = new Factory(factory, this, dataLogApplier,
         new TAsyncClientManager());
     dataClientPool =
         new ClientPool(new DataClient.Factory(new TAsyncClientManager(), factory));
@@ -292,7 +282,7 @@ public class MetaGroupMember extends RaftMember implements TSMetaService.AsyncIf
             DataGroupMember dataGroupMember = dataMemberFactory.create(newGroup, thisNode);
             getDataClusterServer().addDataGroupMember(dataGroupMember);
             dataGroupMember.start();
-            dataGroupMember.pullSnapshots(partitionTable.getNodeSlots(newNode), newNode);
+            dataGroupMember.pullNodeAdditionSnapshots(partitionTable.getNodeSlots(newNode), newNode);
           } catch (TTransportException e) {
             logger.error("Fail to create data newMember for new header {}", newNode, e);
           }
@@ -610,20 +600,12 @@ public class MetaGroupMember extends RaftMember implements TSMetaService.AsyncIf
         switch (result) {
           case OK:
             logger.info("Join request of {} is accepted", node);
-            // add node is instantly applied to update the partition table
-            try {
-              logManager.getApplier().apply(addNodeLog);
-            } catch (QueryProcessException e) {
-              logManager.removeLastLog();
-              resultHandler.onError(e);
-              return true;
-            }
+            logManager.commitLog(addNodeLog.getCurrLogIndex());
             synchronized (partitionTable) {
               response.setPartitionTableBytes(partitionTable.serialize());
             }
             response.setRespNum((int) Response.RESPONSE_AGREE);
             resultHandler.onComplete(response);
-            logManager.commitLog(logManager.getLastLogIndex());
             return true;
           case TIME_OUT:
             logger.info("Join request of {} timed out", node);
@@ -1220,10 +1202,6 @@ public class MetaGroupMember extends RaftMember implements TSMetaService.AsyncIf
     return nodeStatus;
   }
 
-  ClientServer getClientServer() {
-    return clientServer;
-  }
-
   @Override
   public void queryNodeStatus(AsyncMethodCallback<TNodeStatus> resultHandler) {
     resultHandler.onComplete(new TNodeStatus());
@@ -1242,4 +1220,117 @@ public class MetaGroupMember extends RaftMember implements TSMetaService.AsyncIf
       dataClusterServer.setPartitionTable(partitionTable);
     }
   }
+
+  @Override
+  public void removeNode(Node node, AsyncMethodCallback<Long> resultHandler) {
+    if (partitionTable == null) {
+      logger.info("Cannot add node now because the partition table is not set");
+      resultHandler.onError(new PartitionTableUnavailableException(thisNode));
+      return;
+    }
+
+    // try to process the request locally, if it cannot be processed locally, forward it
+    if (processRemoveNodeLocally(node, resultHandler)) {
+      return;
+    }
+
+    if (character == NodeCharacter.FOLLOWER && leader != null) {
+      logger.info("Forward the node removal request of {} to leader {}", node, leader);
+      if (forwardRemoveNode(node, resultHandler)) {
+        return;
+      }
+    }
+    resultHandler.onError(new LeaderUnknownException(getAllNodes()));
+  }
+
+  private boolean forwardRemoveNode(Node node, AsyncMethodCallback resultHandler) {
+    TSMetaService.AsyncClient client = (TSMetaService.AsyncClient) connectNode(leader);
+    if (client != null) {
+      try {
+        client.removeNode(node, new GenericForwardHandler(resultHandler));
+        return true;
+      } catch (TException e) {
+        logger.warn("Cannot connect to node {}", node, e);
+      }
+    }
+    return false;
+  }
+
+  private boolean processRemoveNodeLocally(Node node, AsyncMethodCallback resultHandler) {
+    if (character == NodeCharacter.LEADER) {
+      if (allNodes.size() <= ClusterDescriptor.getINSTANCE().getConfig().getReplicationNum()) {
+        resultHandler.onComplete(Response.RESPONSE_CLUSTER_TOO_SMALL);
+        return true;
+      }
+
+      Node target = null;
+      synchronized (allNodes) {
+        for (Node n : allNodes) {
+          if (n.ip.equals(node.ip) && n.metaPort == node.metaPort) {
+            target = n;
+            break;
+          }
+        }
+      }
+
+      if (target == null) {
+        logger.debug("Node {} is not in the cluster", node);
+        resultHandler.onComplete(Response.RESPONSE_REJECT);
+        return true;
+      }
+
+      // node removal must be serialized
+      synchronized (logManager) {
+        RemoveNodeLog removeNodeLog = new RemoveNodeLog();
+        removeNodeLog.setCurrLogTerm(getTerm().get());
+        removeNodeLog.setPreviousLogIndex(logManager.getLastLogIndex());
+        removeNodeLog.setPreviousLogTerm(logManager.getLastLogTerm());
+        removeNodeLog.setCurrLogIndex(logManager.getLastLogIndex() + 1);
+
+        removeNodeLog.setRemovedNode(target);
+
+        logManager.appendLog(removeNodeLog);
+
+        logger.info("Send the node removal request of {} to other nodes", target);
+        AppendLogResult result = sendLogToAllGroups(removeNodeLog);
+
+        switch (result) {
+          case OK:
+            logger.info("Join request of {} is accepted", target);
+            logManager.commitLog(removeNodeLog.getCurrLogIndex());
+            resultHandler.onComplete(Response.RESPONSE_AGREE);
+            return true;
+          case TIME_OUT:
+            logger.info("Join request of {} timed out", target);
+            resultHandler.onError(new RequestTimeOutException(removeNodeLog));
+            logManager.removeLastLog();
+            return true;
+          case LEADERSHIP_STALE:
+          default:
+            logManager.removeLastLog();
+            // if the leader is found, forward to it
+        }
+      }
+    }
+    return false;
+  }
+
+  public void applyRemoveNode(Node oldNode) {
+    synchronized (allNodes) {
+      if (allNodes.contains(oldNode)) {
+        logger.debug("Removing a node {} from {}", oldNode, allNodes);
+        allNodes.remove(oldNode);
+        idNodeMap.remove(oldNode.nodeIdentifier);
+
+        // update the partition table
+        NodeRemovalResult result = partitionTable.removeNode(oldNode);
+
+        getDataClusterServer().removeNode(oldNode, result);
+        if (oldNode.equals(leader)) {
+          setCharacter(NodeCharacter.ELECTOR);
+          lastHeartBeatReceivedTime = Long.MIN_VALUE;
+        }
+      }
+    }
+  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index 6deb034..f4789bf 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -105,6 +105,11 @@ public abstract class RaftMember implements RaftService.AsyncIface {
   // know if this node is synchronized with the leader
   private Object syncLock = new Object();
 
+  // when the header of the group is removed from the cluster, the members of the group should no
+  // longer accept writes, but they can still be read candidates for weak consistency reads and
+  // provide snapshots for the new holders
+  private volatile boolean readOnly = false;
+
   public RaftMember() {
   }
 
@@ -652,6 +657,10 @@ public abstract class RaftMember implements RaftService.AsyncIface {
   TSStatus processPlanLocally(PhysicalPlan plan) {
     logger.debug("{}: Processing plan {}", name, plan);
     synchronized (logManager) {
+      if (readOnly) {
+        return StatusUtils.NODE_READ_ONLY;
+      }
+
       PhysicalPlanLog log = new PhysicalPlanLog();
       log.setCurrLogTerm(getTerm().get());
       log.setPreviousLogIndex(logManager.getLastLogIndex());
@@ -798,4 +807,10 @@ public abstract class RaftMember implements RaftService.AsyncIface {
       resultHandler.onError(e);
     }
   }
+
+  public void setReadOnly() {
+    synchronized (logManager) {
+      readOnly = true;
+    }
+  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/StatusUtils.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/StatusUtils.java
index 99fdd48..718a7e0 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/StatusUtils.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/StatusUtils.java
@@ -39,6 +39,7 @@ public class StatusUtils {
   public static final TSStatus EXECUTE_STATEMENT_ERROR =
       getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR);
   public static final TSStatus NO_STORAGE_GROUP = getStatus(TSStatusCode.STORAGE_GROUP_ERROR);
+  public static final TSStatus NODE_READ_ONLY = getStatus(TSStatusCode.NODE_READ_ONLY);
 
   private static TSStatus getStatus(TSStatusCode statusCode) {
     TSStatusType tsStatusType = new TSStatusType();
@@ -55,6 +56,9 @@ public class StatusUtils {
       case PARTITION_NOT_READY:
         tsStatusType.setMessage("Partition table is not ready");
         break;
+      case NODE_READ_ONLY:
+        tsStatusType.setMessage("Current node is read-only, please retry to find another available node");
+        break;
       default:
         tsStatusType.setMessage("");
         break;
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/common/TestPartitionTable.java b/cluster/src/test/java/org/apache/iotdb/cluster/common/TestPartitionTable.java
index ef14b1d..aeda017 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/common/TestPartitionTable.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/common/TestPartitionTable.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.cluster.common;
 import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.Map;
+import org.apache.iotdb.cluster.partition.NodeRemovalResult;
 import org.apache.iotdb.cluster.partition.PartitionGroup;
 import org.apache.iotdb.cluster.partition.PartitionTable;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
@@ -39,6 +40,11 @@ public class TestPartitionTable implements PartitionTable {
   }
 
   @Override
+  public NodeRemovalResult removeNode(Node node) {
+    return null;
+  }
+
+  @Override
   public List<PartitionGroup> getLocalGroups() {
     return null;
   }
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartBeatThreadTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartBeatThreadTest.java
index 457fffe..45d372e 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartBeatThreadTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartBeatThreadTest.java
@@ -32,6 +32,7 @@ import org.apache.iotdb.cluster.common.TestClient;
 import org.apache.iotdb.cluster.common.TestMetaGroupMember;
 import org.apache.iotdb.cluster.common.TestUtils;
 import org.apache.iotdb.cluster.log.LogManager;
+import org.apache.iotdb.cluster.partition.NodeRemovalResult;
 import org.apache.iotdb.cluster.partition.PartitionGroup;
 import org.apache.iotdb.cluster.partition.PartitionTable;
 import org.apache.iotdb.cluster.rpc.thrift.ElectionRequest;
@@ -61,6 +62,11 @@ public class MetaHeartBeatThreadTest extends HeartBeatThreadTest {
     }
 
     @Override
+    public NodeRemovalResult removeNode(Node node) {
+      return null;
+    }
+
+    @Override
     public List<PartitionGroup> getLocalGroups() {
       return null;
     }
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
index 9ea7028..3caf096 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
@@ -427,7 +427,7 @@ public class DataGroupMemberTest extends MemberTest {
       hasInitialSnapshots = false;
       partitionTable.addNode(TestUtils.getNode(10));
       List<Integer> requiredSlots = Arrays.asList(19, 39, 59, 79, 99);
-      dataGroupMember.pullSnapshots(requiredSlots, TestUtils.getNode(10));
+      dataGroupMember.pullNodeAdditionSnapshots(requiredSlots, TestUtils.getNode(10));
       assertEquals(requiredSlots.size(), receivedSnapshots.size());
       for (Integer requiredSlot : requiredSlots) {
         receivedSnapshots.get(requiredSlot).getRemoteSnapshot();
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index fd9019a..87a4171 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -65,7 +65,8 @@ public enum TSStatusCode {
   PARTITION_NOT_READY(700),
   TIME_OUT(701),
   NO_LEADER(702),
-  UNSUPPORTED_OPERATION(703);
+  UNSUPPORTED_OPERATION(703),
+  NODE_READ_ONLY(704);
 
 
   private int statusCode;
diff --git a/service-rpc/src/main/thrift/cluster.thrift b/service-rpc/src/main/thrift/cluster.thrift
index eb734cd..f3b4b3d 100644
--- a/service-rpc/src/main/thrift/cluster.thrift
+++ b/service-rpc/src/main/thrift/cluster.thrift
@@ -271,6 +271,13 @@ service TSMetaService extends RaftService {
   **/
   AddNodeResponse addNode(1: Node node)
 
+  /**
+  * Remove a node from the cluster. If the node is not in the cluster or the cluster size would
+  * become less than the replication number, the request will be rejected.
+  * return -1(RESPONSE_AGREE) or -3(RESPONSE_REJECT) or -9(RESPONSE_CLUSTER_TOO_SMALL)
+  **/
+  long removeNode(1: Node node)
+
   TNodeStatus queryNodeStatus()
 
   Node checkAlive()
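
On the caller side, the long returned by removeNode maps to the Response constants named in the comment above (-1, -3 and -9). A hedged sketch of how a client might translate the value; the helper name and the handling of any other code are assumptions:

    public class RemovalResponseSketch {
      /** Translates the long returned by removeNode into a human-readable outcome. */
      public static String describe(long response) {
        if (response == -1) {        // Response.RESPONSE_AGREE
          return "node removed";
        } else if (response == -3) { // Response.RESPONSE_REJECT
          return "node not found in the cluster";
        } else if (response == -9) { // Response.RESPONSE_CLUSTER_TOO_SMALL
          return "cluster size would drop below the replication number";
        }
        return "unexpected response: " + response;
      }

      public static void main(String[] args) {
        // prints: cluster size would drop below the replication number
        System.out.println(describe(-9));
      }
    }
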
diff --git a/session/pom.xml b/session/pom.xml
index 8df9ae6..dab30aa 100644
--- a/session/pom.xml
+++ b/session/pom.xml
@@ -19,134 +19,132 @@
     under the License.
 
 -->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-  <parent>
-    <artifactId>iotdb-parent</artifactId>
-    <groupId>org.apache.iotdb</groupId>
-    <version>0.10.0-SNAPSHOT</version>
-  </parent>
-  <modelVersion>4.0.0</modelVersion>
-  <artifactId>iotdb-session</artifactId>
-  <name>IoTDB Session</name>
-  <properties>
-    <session.test.skip>false</session.test.skip>
-    <session.it.skip>${session.test.skip}</session.it.skip>
-    <session.ut.skip>${session.test.skip}</session.ut.skip>
-  </properties>
-  <build>
-    <plugins>
-      <plugin>
-        <artifactId>maven-assembly-plugin</artifactId>
-        <version>3.1.0</version>
-        <configuration>
-          <descriptorRefs>
-            <descriptorRef>jar-with-dependencies</descriptorRef>
-          </descriptorRefs>
-        </configuration>
-        <executions>
-          <execution>
-            <id>make-assembly</id>
-            <!-- this is used for inheritance merges -->
-            <phase>package</phase>
-            <!-- bind to the packaging phase -->
-            <goals>
-              <goal>single</goal>
-            </goals>
-          </execution>
-        </executions>
-      </plugin>
-      <!--using `mvn test` to run UT, `mvn verify` to run ITs
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>iotdb-parent</artifactId>
+        <groupId>org.apache.iotdb</groupId>
+        <version>0.10.0-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+    <artifactId>iotdb-session</artifactId>
+    <name>IoTDB Session</name>
+    <properties>
+        <session.test.skip>false</session.test.skip>
+        <session.it.skip>${session.test.skip}</session.it.skip>
+        <session.ut.skip>${session.test.skip}</session.ut.skip>
+    </properties>
+    <build>
+        <plugins>
+            <plugin>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <version>3.1.0</version>
+                <configuration>
+                    <descriptorRefs>
+                        <descriptorRef>jar-with-dependencies</descriptorRef>
+                    </descriptorRefs>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>make-assembly</id>
+                        <!-- this is used for inheritance merges -->
+                        <phase>package</phase>
+                        <!-- bind to the packaging phase -->
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <!--using `mvn test` to run UT, `mvn verify` to run ITs
       Reference: https://antoniogoncalves.org/2012/12/13/lets-turn-integration-tests-with-maven-to-a-first-class-citizen/-->
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-surefire-plugin</artifactId>
-        <configuration>
-          <skipTests>${session.ut.skip}</skipTests>
-        </configuration>
-      </plugin>
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-failsafe-plugin</artifactId>
-        <executions>
-          <execution>
-            <id>run-integration-tests</id>
-            <phase>integration-test</phase>
-            <goals>
-              <goal>integration-test</goal>
-              <goal>verify</goal>
-            </goals>
-          </execution>
-        </executions>
-        <configuration>
-          <skipTests>${session.test.skip}</skipTests>
-          <skipITs>${session.it.skip}</skipITs>
-        </configuration>
-      </plugin>
-    </plugins>
-  </build>
-  <dependencies>
-    <dependency>
-      <groupId>org.apache.iotdb</groupId>
-      <artifactId>service-rpc</artifactId>
-      <version>${project.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.iotdb</groupId>
-      <artifactId>tsfile</artifactId>
-      <version>${project.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.iotdb</groupId>
-      <artifactId>iotdb-server</artifactId>
-      <version>${project.version}</version>
-      <type>test-jar</type>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.iotdb</groupId>
-      <artifactId>iotdb-server</artifactId>
-      <version>${project.version}</version>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.iotdb</groupId>
-      <artifactId>iotdb-jdbc</artifactId>
-      <version>${project.version}</version>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>commons-lang</groupId>
-      <artifactId>commons-lang</artifactId>
-    </dependency>
-  </dependencies>
-  <profiles>
-    <profile>
-      <id>skipSessionTests</id>
-      <activation>
-        <property>
-          <name>skipTests</name>
-          <value>true</value>
-        </property>
-      </activation>
-      <properties>
-        <session.test.skip>true</session.test.skip>
-        <session.ut.skip>true</session.ut.skip>
-        <session.it.skip>true</session.it.skip>
-      </properties>
-    </profile>
-    <profile>
-      <id>skipUT_SessionTests</id>
-      <activation>
-        <property>
-          <name>skipUTs</name>
-          <value>true</value>
-        </property>
-      </activation>
-      <properties>
-        <session.ut.skip>true</session.ut.skip>
-      </properties>
-    </profile>
-  </profiles>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <configuration>
+                    <skipTests>${session.ut.skip}</skipTests>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-failsafe-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>run-integration-tests</id>
+                        <phase>integration-test</phase>
+                        <goals>
+                            <goal>integration-test</goal>
+                            <goal>verify</goal>
+                        </goals>
+                    </execution>
+                </executions>
+                <configuration>
+                    <skipTests>${session.test.skip}</skipTests>
+                    <skipITs>${session.it.skip}</skipITs>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.iotdb</groupId>
+            <artifactId>service-rpc</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.iotdb</groupId>
+            <artifactId>tsfile</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.iotdb</groupId>
+            <artifactId>iotdb-server</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.iotdb</groupId>
+            <artifactId>iotdb-server</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.iotdb</groupId>
+            <artifactId>iotdb-jdbc</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>commons-lang</groupId>
+            <artifactId>commons-lang</artifactId>
+        </dependency>
+    </dependencies>
+    <profiles>
+        <profile>
+            <id>skipSessionTests</id>
+            <activation>
+                <property>
+                    <name>skipTests</name>
+                    <value>true</value>
+                </property>
+            </activation>
+            <properties>
+                <session.test.skip>true</session.test.skip>
+                <session.ut.skip>true</session.ut.skip>
+                <session.it.skip>true</session.it.skip>
+            </properties>
+        </profile>
+        <profile>
+            <id>skipUT_SessionTests</id>
+            <activation>
+                <property>
+                    <name>skipUTs</name>
+                    <value>true</value>
+                </property>
+            </activation>
+            <properties>
+                <session.ut.skip>true</session.ut.skip>
+            </properties>
+        </profile>
+    </profiles>
 </project>