You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2020/02/05 04:05:01 UTC

[incubator-iotdb] 01/01: first implementation

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch cluster_node_deletion
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 2a842f0c582946cf45289885fce208efa3bd46bd
Author: jt2594838 <jt...@163.com>
AuthorDate: Wed Feb 5 11:44:13 2020 +0800

    first implementation
---
 .../java/org/apache/iotdb/cluster/ClusterMain.java |  87 +++++--
 .../apache/iotdb/cluster/client/MetaClient.java    |   4 +-
 .../cluster/exception/MemberReadOnlyException.java |  11 +
 .../java/org/apache/iotdb/cluster/log/Log.java     |   2 +-
 .../iotdb/cluster/log/applier/MetaLogApplier.java  |   6 +-
 .../iotdb/cluster/log/logtypes/RemoveNodeLog.java  |  41 ++++
 .../manage/FilePartitionedSnapshotLogManager.java  |   2 +-
 .../log/manage/PartitionedSnapshotLogManager.java  |   1 +
 .../iotdb/cluster/partition/NodeRemovalResult.java |  41 ++++
 .../iotdb/cluster/partition/PartitionTable.java    |   6 +
 .../cluster/partition/SlotPartitionTable.java      |  66 +++++-
 .../iotdb/cluster/server/DataClusterServer.java    |  44 +++-
 .../iotdb/cluster/server/MetaClusterServer.java    |   5 +
 .../org/apache/iotdb/cluster/server/Response.java  |   1 +
 .../cluster/server/member/DataGroupMember.java     |  48 +++-
 .../cluster/server/member/MetaGroupMember.java     | 151 +++++++++---
 .../iotdb/cluster/server/member/RaftMember.java    |  15 ++
 .../apache/iotdb/cluster/utils/StatusUtils.java    |   4 +
 .../iotdb/cluster/common/TestPartitionTable.java   |   6 +
 .../server/heartbeat/MetaHeartBeatThreadTest.java  |   6 +
 .../cluster/server/member/DataGroupMemberTest.java |   2 +-
 .../java/org/apache/iotdb/rpc/TSStatusCode.java    |   3 +-
 service-rpc/src/main/thrift/cluster.thrift         |   7 +
 session/pom.xml                                    | 256 ++++++++++-----------
 24 files changed, 618 insertions(+), 197 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/ClusterMain.java b/cluster/src/main/java/org/apache/iotdb/cluster/ClusterMain.java
index e9bb374..8e76928 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/ClusterMain.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/ClusterMain.java
@@ -19,8 +19,21 @@
 package org.apache.iotdb.cluster;
 
 import java.io.IOException;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.iotdb.cluster.client.MetaClient;
+import org.apache.iotdb.cluster.config.ClusterConfig;
+import org.apache.iotdb.cluster.config.ClusterDescriptor;
+import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.iotdb.cluster.server.MetaClusterServer;
+import org.apache.iotdb.cluster.server.RaftServer;
+import org.apache.iotdb.cluster.server.Response;
+import org.apache.iotdb.cluster.server.handlers.caller.GenericHandler;
 import org.apache.iotdb.db.exception.StartupException;
+import org.apache.thrift.TException;
+import org.apache.thrift.async.TAsyncClientManager;
+import org.apache.thrift.protocol.TBinaryProtocol.Factory;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.protocol.TProtocolFactory;
 import org.apache.thrift.transport.TTransportException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -33,6 +46,8 @@ public class ClusterMain {
   private static final String MODE_START = "-s";
   // join an established cluster
   private static final String MODE_ADD = "-a";
+  // remove a node from an established cluster
+  private static final String MODE_REMOVE = "-r";
 
   public static MetaClusterServer metaServer;
 
@@ -43,24 +58,70 @@ public class ClusterMain {
     }
     String mode = args[0];
 
+    logger.info("Running mode {}", mode);
     try {
-      metaServer = new MetaClusterServer();
-      metaServer.start();
-    } catch (TTransportException | IOException | StartupException e) {
-      logger.error("Cannot set up service", e);
+      if (MODE_START.equals(mode)) {
+        metaServer = new MetaClusterServer();
+        metaServer.start();
+        metaServer.buildCluster();
+      } else if (MODE_ADD.equals(mode)) {
+        metaServer = new MetaClusterServer();
+        metaServer.start();
+        if (!metaServer.joinCluster()) {
+          metaServer.stop();
+        }
+      } else if (MODE_REMOVE.equals(mode)) {
+        doRemoveNode(args);
+      } else {
+        logger.error("Unrecognized mode {}", mode);
+      }
+    } catch (IOException | TTransportException | StartupException e) {
+      logger.error("Fail to start meta server", e);
+    }
+  }
+
+  private static void doRemoveNode(String[] args) throws IOException {
+    if (args.length != 3) {
+      logger.error("Usage: -r <ip> <metaPort>");
       return;
     }
+    String ip = args[1];
+    int metaPort = Integer.parseInt(args[2]);
+    ClusterConfig config = ClusterDescriptor.getINSTANCE().getConfig();
+    TProtocolFactory factory = config
+        .isRpcThriftCompressionEnabled() ? new TCompactProtocol.Factory() : new Factory();
+    Node nodeToRemove = new Node();
+    nodeToRemove.setIp(ip).setMetaPort(metaPort);
+    // try sending the request to each seed node
+    for (String url : config.getSeedNodeUrls()) {
+      String[] splits = url.split(":");
+      Node node = new Node();
+      node.setIp(splits[0]).setMetaPort(Integer.parseInt(splits[1]));
+      MetaClient client = new MetaClient(factory, new TAsyncClientManager(), node, null);
 
-    logger.info("Running mode {}", mode);
-    if (MODE_START.equals(mode)) {
-      metaServer.buildCluster();
-    } else if (MODE_ADD.equals(mode)) {
-      if (!metaServer.joinCluster()) {
-        metaServer.stop();
+      AtomicReference<Long> responseRef = new AtomicReference<>();
+      GenericHandler handler = new GenericHandler(node, responseRef);
+      try {
+        synchronized (responseRef) {
+          client.removeNode(nodeToRemove, handler);
+          responseRef.wait(RaftServer.connectionTimeoutInMS);
+        }
+        Long response = responseRef.get();
+        if (response != null) {
+          if (response == Response.RESPONSE_AGREE) {
+            logger.info("Node {} is successfully removed", nodeToRemove);
+            return;
+          } else if (response == Response.RESPONSE_CLUSTER_TOO_SMALL) {
+            logger.error("Cluster size is too small, cannot remove any node");
+            return;
+          } else if (response == Response.RESPONSE_REJECT) {
+            logger.error("Node {} is not found in the cluster, please check", node);
+            return;
+          }
+        }
+      } catch (TException | InterruptedException e) {
+        logger.warn("Cannot send remove node request through {}, try next node", node);
       }
-    } else {
-      logger.error("Unrecognized mode {}", mode);
     }
-
   }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/MetaClient.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/MetaClient.java
index 28c558d..625567d 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/MetaClient.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/MetaClient.java
@@ -46,7 +46,9 @@ public class MetaClient extends AsyncClient {
   public void onComplete() {
     super.onComplete();
     // return itself to the pool if the job is done
-    pool.putClient(node, this);
+    if (pool != null) {
+      pool.putClient(node, this);
+    }
   }
 
   public static class Factory implements ClientFactory {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/exception/MemberReadOnlyException.java b/cluster/src/main/java/org/apache/iotdb/cluster/exception/MemberReadOnlyException.java
new file mode 100644
index 0000000..8fc3ed4
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/exception/MemberReadOnlyException.java
@@ -0,0 +1,11 @@
+package org.apache.iotdb.cluster.exception;
+
+import org.apache.iotdb.cluster.rpc.thrift.Node;
+
+public class MemberReadOnlyException extends Exception{
+
+  public MemberReadOnlyException(Node node) {
+    super(String.format("The node %s has been set readonly for the partitions, please retry to find "
+        + "a new node", node));
+  }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/Log.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/Log.java
index 8bfe5b4..354d322 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/Log.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/Log.java
@@ -41,7 +41,7 @@ public abstract class Log {
 
   public enum Types {
     // TODO-Cluster#348 support more logs
-    ADD_NODE, PHYSICAL_PLAN, CLOSE_FILE
+    ADD_NODE, PHYSICAL_PLAN, CLOSE_FILE, REMOVE_NODE
   }
 
   public long getPreviousLogIndex() {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/MetaLogApplier.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/MetaLogApplier.java
index b9f2a62..ffd236f 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/MetaLogApplier.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/MetaLogApplier.java
@@ -23,12 +23,12 @@ import org.apache.iotdb.cluster.log.Log;
 import org.apache.iotdb.cluster.log.logtypes.AddNodeLog;
 import org.apache.iotdb.cluster.log.logtypes.CloseFileLog;
 import org.apache.iotdb.cluster.log.logtypes.PhysicalPlanLog;
+import org.apache.iotdb.cluster.log.logtypes.RemoveNodeLog;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.iotdb.cluster.server.member.MetaGroupMember;
 import org.apache.iotdb.db.engine.StorageEngine;
 import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
-import org.apache.iotdb.db.qp.executor.QueryProcessExecutor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -39,7 +39,6 @@ public class MetaLogApplier extends BaseApplier {
 
   private static final Logger logger = LoggerFactory.getLogger(MetaLogApplier.class);
   private MetaGroupMember member;
-  private QueryProcessExecutor queryExecutor;
 
   public MetaLogApplier(MetaGroupMember member) {
     super(member);
@@ -64,6 +63,9 @@ public class MetaLogApplier extends BaseApplier {
         logger.error("Cannot close {} file in {}", closeFileLog.isSeq() ? "seq" : "unseq",
             closeFileLog.getStorageGroupName());
       }
+    } else if (log instanceof RemoveNodeLog) {
+      RemoveNodeLog removeNodeLog = ((RemoveNodeLog) log);
+      member.applyRemoveNode(removeNodeLog.getRemovedNode());
     } else {
       // TODO-Cluster#348 support more types of logs
       logger.error("Unsupported log: {}", log);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/RemoveNodeLog.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/RemoveNodeLog.java
new file mode 100644
index 0000000..204e675
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/RemoveNodeLog.java
@@ -0,0 +1,41 @@
+package org.apache.iotdb.cluster.log.logtypes;
+
+import java.io.IOException;
+import org.apache.iotdb.cluster.log.Log;
+import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.utils.SerializeUtils;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.nio.ByteBuffer;
+
+public class RemoveNodeLog extends Log {
+    private Node removedNode;
+
+    @Override
+    public ByteBuffer serialize() {
+        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+        DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream);
+        try {
+            dataOutputStream.writeByte(Types.REMOVE_NODE.ordinal());
+        } catch (IOException e) {
+            // ignored
+        }
+        SerializeUtils.serialize(removedNode, dataOutputStream);
+        return ByteBuffer.wrap(byteArrayOutputStream.toByteArray());
+    }
+
+    @Override
+    public void deserialize(ByteBuffer buffer) {
+        removedNode = new Node();
+        SerializeUtils.deserialize(removedNode, buffer);
+    }
+
+    public Node getRemovedNode() {
+        return removedNode;
+    }
+
+    public void setRemovedNode(Node removedNode) {
+        this.removedNode = removedNode;
+    }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/FilePartitionedSnapshotLogManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/FilePartitionedSnapshotLogManager.java
index 28e310e..6eed911 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/FilePartitionedSnapshotLogManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/FilePartitionedSnapshotLogManager.java
@@ -37,7 +37,7 @@ import org.slf4j.LoggerFactory;
 
 /**
  * Different from PartitionedSnapshotLogManager, FilePartitionedSnapshotLogManager does not store
- * the committed in memory, it considers the logs are contained in the TsFiles so it will record
+ * the committed logs in memory after snapshots, it considers the logs are contained in the TsFiles so it will record
  * every TsFiles in the slot instead.
  */
 public class FilePartitionedSnapshotLogManager extends PartitionedSnapshotLogManager<FileSnapshot> {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/PartitionedSnapshotLogManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/PartitionedSnapshotLogManager.java
index e740c86..5b0c432 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/PartitionedSnapshotLogManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/PartitionedSnapshotLogManager.java
@@ -96,6 +96,7 @@ public abstract class PartitionedSnapshotLogManager<T extends Snapshot> extends
 
   public void setSnapshot(T snapshot, int slot) {
     synchronized (slotSnapshots) {
+      // TODO-Cluster#451: persist the remote snapshot so the pulling can be recovered in restart
       slotSnapshots.put(slot, snapshot);
     }
   }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/partition/NodeRemovalResult.java b/cluster/src/main/java/org/apache/iotdb/cluster/partition/NodeRemovalResult.java
new file mode 100644
index 0000000..df5228b
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/partition/NodeRemovalResult.java
@@ -0,0 +1,41 @@
+package org.apache.iotdb.cluster.partition;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.iotdb.cluster.rpc.thrift.Node;
+
+/**
+ * NodeRemovalResult stores the removed partition group and who will take over its slots.
+ */
+public class NodeRemovalResult {
+  private PartitionGroup removedGroup;
+  // if the removed group contains the local node, the local node should join a new group to
+  // preserve the replication number
+  private PartitionGroup newGroup;
+  private Map<Node, List<Integer>> newSlotOwners;
+
+  public PartitionGroup getRemovedGroup() {
+    return removedGroup;
+  }
+
+  public void setRemovedGroup(PartitionGroup group) {
+    this.removedGroup = group;
+  }
+
+  public Map<Node, List<Integer>> getNewSlotOwners() {
+    return newSlotOwners;
+  }
+
+  public void setNewSlotOwners(
+      Map<Node, List<Integer>> newSlotOwners) {
+    this.newSlotOwners = newSlotOwners;
+  }
+
+  public PartitionGroup getNewGroup() {
+    return newGroup;
+  }
+
+  public void setNewGroup(PartitionGroup newGroup) {
+    this.newGroup = newGroup;
+  }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionTable.java b/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionTable.java
index 130f48b..5810f34 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionTable.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionTable.java
@@ -68,6 +68,12 @@ public interface PartitionTable {
   PartitionGroup addNode(Node node);
 
   /**
+   * Remove a node and update the partition table.
+   * @param node
+   */
+  NodeRemovalResult removeNode(Node node);
+
+  /**
    *
    * @return All data groups where all VNodes of this node is the header. The first index
    * indicates the VNode and the second index indicates the data group of one VNode.
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/partition/SlotPartitionTable.java b/cluster/src/main/java/org/apache/iotdb/cluster/partition/SlotPartitionTable.java
index d2d3016..b1d7e31 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/partition/SlotPartitionTable.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/partition/SlotPartitionTable.java
@@ -91,8 +91,7 @@ public class SlotPartitionTable implements PartitionTable {
     int nodeNum = nodeRing.size();
     int slotsPerNode = slotNum / nodeNum;
     for (Node node : nodeRing) {
-      List<Integer> nodeSlots = new ArrayList<>();
-      nodeSlotMap.put(node, nodeSlots);
+      nodeSlotMap.put(node, new ArrayList<>());
     }
 
     for (int i = 0; i < slotNum; i++) {
@@ -150,10 +149,6 @@ public class SlotPartitionTable implements PartitionTable {
     return ret;
   }
 
-  private int nextIndex(int currIndex) {
-    return currIndex == nodeRing.size() - 1 ? 0 : currIndex + 1;
-  }
-
   @Override
   public PartitionGroup route(String storageGroupName, long timestamp) {
     synchronized (nodeRing) {
@@ -359,4 +354,63 @@ public class SlotPartitionTable implements PartitionTable {
   public int hashCode() {
     return 0;
   }
+
+  @Override
+  public NodeRemovalResult removeNode(Node target) {
+    synchronized (nodeRing) {
+      if (!nodeRing.contains(target)) {
+        return null;
+      }
+
+      NodeRemovalResult result = new NodeRemovalResult();
+      result.setRemovedGroup(getHeaderGroup(target));
+      nodeRing.remove(target);
+
+      // if the node belongs to a group that is headed by target, this group should be removed
+      // and other groups containing target should be updated
+      int removedGroupIdx = -1;
+      for(int i = 0; i < localGroups.size(); i++) {
+        PartitionGroup oldGroup = localGroups.get(i);
+        Node header = oldGroup.getHeader();
+        if (header.equals(target)) {
+          removedGroupIdx = i;
+        } else {
+          PartitionGroup newGrp = getHeaderGroup(header);
+          localGroups.set(i, newGrp);
+          result.setNewGroup(newGrp);
+        }
+      }
+      if (removedGroupIdx != -1) {
+        localGroups.remove(removedGroupIdx);
+        // each node exactly joins replicationNum groups, so when a group is removed, the node
+        // should join a new one
+        int thisNodeIdx = nodeRing.indexOf(thisNode);
+        // this node must be the last node of the new group
+        int headerNodeIdx = thisNodeIdx - (replicationNum - 1);
+        headerNodeIdx = headerNodeIdx < 0 ? headerNodeIdx + nodeRing.size() : headerNodeIdx;
+        Node header = nodeRing.get(headerNodeIdx);
+        PartitionGroup newGrp = getHeaderGroup(header);
+        localGroups.add(newGrp);
+      }
+
+      // the slots movement is only done logically, the new node itself will pull data from the
+      // old node
+      Map<Node, List<Integer>> nodeListMap = retrieveSlots(target);
+      result.setNewSlotOwners(nodeListMap);
+      return result;
+    }
+  }
+
+  private Map<Node, List<Integer>> retrieveSlots(Node target) {
+    Map<Node, List<Integer>> newHolderSlotMap = new HashMap<>();
+    List<Integer> slots = nodeSlotMap.remove(target);
+    for (int i = 0; i < slots.size(); i++) {
+      int slot = slots.get(i);
+      Node newHolder = nodeRing.get(i % nodeRing.size());
+      slotNodeMap.put(slot, newHolder);
+      nodeSlotMap.get(newHolder).add(slot);
+      newHolderSlotMap.computeIfAbsent(newHolder, n -> new ArrayList<>()).add(slot);
+    }
+    return newHolderSlotMap;
+  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
index 2ca212f..3a89b66 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
@@ -30,6 +30,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import org.apache.iotdb.cluster.exception.NoHeaderNodeException;
 import org.apache.iotdb.cluster.exception.NotInSameGroupException;
 import org.apache.iotdb.cluster.exception.PartitionTableUnavailableException;
+import org.apache.iotdb.cluster.partition.NodeRemovalResult;
 import org.apache.iotdb.cluster.partition.PartitionGroup;
 import org.apache.iotdb.cluster.partition.PartitionTable;
 import org.apache.iotdb.cluster.rpc.thrift.AppendEntriesRequest;
@@ -88,7 +89,7 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
         if (partitionTable != null) {
           try {
             member = createNewMember(header);
-          } catch (NotInSameGroupException e) {
+          } catch (NotInSameGroupException | TTransportException e) {
             ex = e;
           }
         } else {
@@ -103,7 +104,8 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
     }
   }
 
-  private DataGroupMember createNewMember(Node header) throws NotInSameGroupException {
+  private DataGroupMember createNewMember(Node header)
+      throws NotInSameGroupException, TTransportException {
     DataGroupMember member;
     synchronized (partitionTable) {
       // it may be that the header and this node are in the same group, but it is the first time
@@ -114,6 +116,7 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
         member = dataMemberFactory.create(partitionGroup, thisNode);
         headerGroupMap.put(header, member);
         logger.info("Created a member for header {}", header);
+        member.start();
       } else {
         logger.info("This node {} does not belong to the group {}", thisNode, partitionGroup);
         throw new NotInSameGroupException(partitionTable.getHeaderGroup(header),
@@ -298,6 +301,41 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
     }
   }
 
+  public void removeNode(Node node, NodeRemovalResult removalResult) {
+    Iterator<Entry<Node, DataGroupMember>> entryIterator = headerGroupMap.entrySet().iterator();
+    synchronized (headerGroupMap) {
+      while (entryIterator.hasNext()) {
+        Entry<Node, DataGroupMember> entry = entryIterator.next();
+        DataGroupMember dataGroupMember = entry.getValue();
+        if (dataGroupMember.getHeader().equals(node)) {
+          // the group is removed as the node is removed, so new writes should be rejected as
+          // they belong to the new holder, but the member is kept alive for other nodes to pull
+          // snapshots
+          dataGroupMember.setReadOnly();
+          //TODO-Cluster: when to call removeLocalData?
+        } else {
+          if (node.equals(thisNode)) {
+            // this node is removed, it is no longer a replica of other groups
+            List<Integer> nodeSlots = partitionTable.getNodeSlots(dataGroupMember.getHeader());
+            dataGroupMember.removeLocalData(nodeSlots);
+            entryIterator.remove();
+          } else {
+            // the group should be updated and pull new slots from the removed node
+            dataGroupMember.removeNode(node, removalResult);
+          }
+        }
+      }
+      PartitionGroup newGroup = removalResult.getNewGroup();
+      if (newGroup != null) {
+        try {
+          createNewMember(newGroup.getHeader());
+        } catch (NotInSameGroupException | TTransportException e) {
+          // ignored
+        }
+      }
+    }
+  }
+
   @Override
   public void pullTimeSeriesSchema(PullSchemaRequest request,
       AsyncMethodCallback<PullSchemaResp> resultHandler) {
@@ -319,7 +357,7 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
   public void pullSnapshots() {
     List<Integer> slots = partitionTable.getNodeSlots(thisNode);
     DataGroupMember dataGroupMember = headerGroupMap.get(thisNode);
-    dataGroupMember.pullSnapshots(slots, thisNode);
+    dataGroupMember.pullNodeAdditionSnapshots(slots, thisNode);
   }
 
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java
index 9ec26cd..61b1042 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java
@@ -39,6 +39,7 @@ import org.apache.iotdb.db.exception.StartupException;
 import org.apache.iotdb.db.service.IoTDB;
 import org.apache.iotdb.db.service.RegisterManager;
 import org.apache.iotdb.service.rpc.thrift.TSStatus;
+import org.apache.thrift.TException;
 import org.apache.thrift.async.AsyncMethodCallback;
 import org.apache.thrift.transport.TNonblockingServerSocket;
 import org.apache.thrift.transport.TTransportException;
@@ -176,4 +177,8 @@ public class MetaClusterServer extends RaftServer implements TSMetaService.Async
     return member;
   }
 
+  @Override
+  public void removeNode(Node node, AsyncMethodCallback<Long> resultHandler) throws TException {
+    member.removeNode(node, resultHandler);
+  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/Response.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/Response.java
index 7064c73..e0fab54 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/Response.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/Response.java
@@ -34,6 +34,7 @@ public class Response {
   public static final long RESPONSE_NO_CONNECTION = -6;
   public static final long RESPONSE_META_LOG_STALE = -7;
   public static final long RESPONSE_LEADER_STILL_ONLINE = -8;
+  public static final long RESPONSE_CLUSTER_TOO_SMALL = -9;
 
   private Response() {
     // enum class
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
index 5bcb8de..3e901a5 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
@@ -48,6 +48,7 @@ import org.apache.iotdb.cluster.log.snapshot.FileSnapshot;
 import org.apache.iotdb.cluster.log.snapshot.PartitionedSnapshot;
 import org.apache.iotdb.cluster.log.snapshot.PullSnapshotTask;
 import org.apache.iotdb.cluster.log.snapshot.RemoteFileSnapshot;
+import org.apache.iotdb.cluster.partition.NodeRemovalResult;
 import org.apache.iotdb.cluster.partition.PartitionGroup;
 import org.apache.iotdb.cluster.query.ClusterQueryParser;
 import org.apache.iotdb.cluster.query.RemoteQueryContext;
@@ -229,7 +230,6 @@ public class DataGroupMember extends RaftMember implements TSDataService.AsyncIf
 
     long thisMetaLastLogIndex = metaGroupMember.getLogManager().getLastLogIndex();
     long thisMetaLastLogTerm = metaGroupMember.getLogManager().getLastLogTerm();
-    long thisMetaTerm = metaGroupMember.getTerm().get();
 
     // term of the electors's MetaGroupMember is not verified, so 0 and 1 are used to make sure
     // the verification does not fail
@@ -479,7 +479,12 @@ public class DataGroupMember extends RaftMember implements TSDataService.AsyncIf
     }
   }
 
-  public void pullSnapshots(List<Integer> slots, Node newNode) {
+  /**
+   * Pull snapshots from the previous holders after newNode joins the cluster.
+   * @param slots
+   * @param newNode
+   */
+  public void pullNodeAdditionSnapshots(List<Integer> slots, Node newNode) {
     synchronized (logManager) {
       logger.info("{} pulling {} slots from remote", name, slots.size());
       PartitionedSnapshot snapshot = (PartitionedSnapshot) logManager.getSnapshot();
@@ -499,17 +504,14 @@ public class DataGroupMember extends RaftMember implements TSDataService.AsyncIf
       for (Entry<Node, List<Integer>> entry : holderSlotsMap.entrySet()) {
         Node node = entry.getKey();
         List<Integer> nodeSlots = entry.getValue();
-        pullFileSnapshot(node, nodeSlots);
+        pullFileSnapshot(metaGroupMember.getPartitionTable().getHeaderGroup(node),  nodeSlots);
       }
     }
   }
 
-  private void pullFileSnapshot(Node node, List<Integer> nodeSlots) {
-    PartitionGroup prevHolders =
-        new PartitionGroup(metaGroupMember.getPartitionTable().getHeaderGroup(node));
-
+  private void pullFileSnapshot(PartitionGroup prevHolders,  List<Integer> nodeSlots) {
     Future<Map<Integer, FileSnapshot>> snapshotFuture =
-        pullSnapshotService.submit(new PullSnapshotTask(node, nodeSlots, this,
+        pullSnapshotService.submit(new PullSnapshotTask(prevHolders.getHeader(), nodeSlots, this,
             prevHolders, FileSnapshot::new));
     for (int slot : nodeSlots) {
       logManager.setSnapshot(new RemoteFileSnapshot(snapshotFuture), slot);
@@ -718,7 +720,6 @@ public class DataGroupMember extends RaftMember implements TSDataService.AsyncIf
     }
     try {
       if (reader.hasNextBatch()) {
-        int count = 0;
         BatchData batchData = reader.nextBatch();
 
         ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
@@ -743,5 +744,34 @@ public class DataGroupMember extends RaftMember implements TSDataService.AsyncIf
       resultHandler.onError(e);
     }
   }
+
+  /**
+   * When the node is no longer a member of a group, the corresponding local data should
+   * be removed.
+   */
+  public void removeLocalData(List<Integer> slots) {
+    for (Integer slot : slots) {
+      //TODO-Cluster: remove the data in the slot
+    }
+  }
+
+  /**
+   * When a node is removed and it is not the header of the group, the member should take over
+   * some slots from the removed group, and add a new node to the group if the removed node was in
+   * the group.
+   */
+  public void removeNode(Node removedNode, NodeRemovalResult removalResult) {
+    synchronized (allNodes) {
+      if (allNodes.contains(removedNode)) {
+        // update the group if the deleted node was in it
+        allNodes = metaGroupMember.getPartitionTable().getHeaderGroup(getHeader());
+      }
+      List<Integer> slotsToPull = removalResult.getNewSlotOwners().get(getHeader());
+      if (slotsToPull != null) {
+        // pull the slots that should be taken over
+        pullFileSnapshot(removalResult.getRemovedGroup(), slotsToPull);
+      }
+    }
+  }
 }
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
index 19c520c..5cdf481 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
@@ -54,6 +54,7 @@ import org.apache.iotdb.cluster.config.ClusterDescriptor;
 import org.apache.iotdb.cluster.exception.AddSelfException;
 import org.apache.iotdb.cluster.exception.LeaderUnknownException;
 import org.apache.iotdb.cluster.exception.NotInSameGroupException;
+import org.apache.iotdb.cluster.exception.PartitionTableUnavailableException;
 import org.apache.iotdb.cluster.exception.RequestTimeOutException;
 import org.apache.iotdb.cluster.exception.UnsupportedPlanException;
 import org.apache.iotdb.cluster.log.Log;
@@ -62,8 +63,10 @@ import org.apache.iotdb.cluster.log.applier.DataLogApplier;
 import org.apache.iotdb.cluster.log.applier.MetaLogApplier;
 import org.apache.iotdb.cluster.log.logtypes.AddNodeLog;
 import org.apache.iotdb.cluster.log.logtypes.CloseFileLog;
+import org.apache.iotdb.cluster.log.logtypes.RemoveNodeLog;
 import org.apache.iotdb.cluster.log.manage.MetaSingleSnapshotLogManager;
 import org.apache.iotdb.cluster.log.snapshot.MetaSimpleSnapshot;
+import org.apache.iotdb.cluster.partition.NodeRemovalResult;
 import org.apache.iotdb.cluster.partition.PartitionGroup;
 import org.apache.iotdb.cluster.partition.PartitionTable;
 import org.apache.iotdb.cluster.partition.SlotPartitionTable;
@@ -72,17 +75,7 @@ import org.apache.iotdb.cluster.query.RemoteQueryContext;
 import org.apache.iotdb.cluster.query.manage.QueryCoordinator;
 import org.apache.iotdb.cluster.query.reader.RemoteSeriesReaderByTimestamp;
 import org.apache.iotdb.cluster.query.reader.RemoteSimpleSeriesReader;
-import org.apache.iotdb.cluster.rpc.thrift.AddNodeResponse;
-import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
-import org.apache.iotdb.cluster.rpc.thrift.HeartBeatRequest;
-import org.apache.iotdb.cluster.rpc.thrift.HeartBeatResponse;
-import org.apache.iotdb.cluster.rpc.thrift.Node;
-import org.apache.iotdb.cluster.rpc.thrift.PullSchemaRequest;
-import org.apache.iotdb.cluster.rpc.thrift.PullSchemaResp;
-import org.apache.iotdb.cluster.rpc.thrift.SendSnapshotRequest;
-import org.apache.iotdb.cluster.rpc.thrift.SingleSeriesQueryRequest;
-import org.apache.iotdb.cluster.rpc.thrift.TNodeStatus;
-import org.apache.iotdb.cluster.rpc.thrift.TSMetaService;
+import org.apache.iotdb.cluster.rpc.thrift.*;
 import org.apache.iotdb.cluster.rpc.thrift.TSMetaService.AsyncClient;
 import org.apache.iotdb.cluster.server.ClientServer;
 import org.apache.iotdb.cluster.server.DataClusterServer;
@@ -138,8 +131,6 @@ public class MetaGroupMember extends RaftMember implements TSMetaService.AsyncIf
   public static final int REPLICATION_NUM =
       ClusterDescriptor.getINSTANCE().getConfig().getReplicationNum();
 
-  private TProtocolFactory protocolFactory;
-
   // blind nodes are nodes that does not know the nodes in the cluster
   private Set<Node> blindNodes = new HashSet<>();
   private Set<Node> idConflictNodes = new HashSet<>();
@@ -150,7 +141,6 @@ public class MetaGroupMember extends RaftMember implements TSMetaService.AsyncIf
   private ClientServer clientServer;
 
   private LogApplier metaLogApplier = new MetaLogApplier(this);
-  private LogApplier dataLogApplier = new DataLogApplier(this);
   private DataGroupMember.Factory dataMemberFactory;
 
   private MetaSingleSnapshotLogManager logManager;
@@ -165,8 +155,8 @@ public class MetaGroupMember extends RaftMember implements TSMetaService.AsyncIf
       throws IOException {
     super("Meta", new ClientPool(new MetaClient.Factory(new TAsyncClientManager(), factory)));
     allNodes = new ArrayList<>();
-    this.protocolFactory = factory;
-    dataMemberFactory = new Factory(protocolFactory, this, dataLogApplier,
+    LogApplier dataLogApplier = new DataLogApplier(this);
+    dataMemberFactory = new Factory(factory, this, dataLogApplier,
         new TAsyncClientManager());
     dataClientPool =
         new ClientPool(new DataClient.Factory(new TAsyncClientManager(), factory));
@@ -292,7 +282,7 @@ public class MetaGroupMember extends RaftMember implements TSMetaService.AsyncIf
             DataGroupMember dataGroupMember = dataMemberFactory.create(newGroup, thisNode);
             getDataClusterServer().addDataGroupMember(dataGroupMember);
             dataGroupMember.start();
-            dataGroupMember.pullSnapshots(partitionTable.getNodeSlots(newNode), newNode);
+            dataGroupMember.pullNodeAdditionSnapshots(partitionTable.getNodeSlots(newNode), newNode);
           } catch (TTransportException e) {
             logger.error("Fail to create data newMember for new header {}", newNode, e);
           }
@@ -610,20 +600,12 @@ public class MetaGroupMember extends RaftMember implements TSMetaService.AsyncIf
         switch (result) {
           case OK:
             logger.info("Join request of {} is accepted", node);
-            // add node is instantly applied to update the partition table
-            try {
-              logManager.getApplier().apply(addNodeLog);
-            } catch (QueryProcessException e) {
-              logManager.removeLastLog();
-              resultHandler.onError(e);
-              return true;
-            }
+            logManager.commitLog(addNodeLog.getCurrLogIndex());
             synchronized (partitionTable) {
               response.setPartitionTableBytes(partitionTable.serialize());
             }
             response.setRespNum((int) Response.RESPONSE_AGREE);
             resultHandler.onComplete(response);
-            logManager.commitLog(logManager.getLastLogIndex());
             return true;
           case TIME_OUT:
             logger.info("Join request of {} timed out", node);
@@ -1220,10 +1202,6 @@ public class MetaGroupMember extends RaftMember implements TSMetaService.AsyncIf
     return nodeStatus;
   }
 
-  ClientServer getClientServer() {
-    return clientServer;
-  }
-
   @Override
   public void queryNodeStatus(AsyncMethodCallback<TNodeStatus> resultHandler) {
     resultHandler.onComplete(new TNodeStatus());
@@ -1242,4 +1220,117 @@ public class MetaGroupMember extends RaftMember implements TSMetaService.AsyncIf
       dataClusterServer.setPartitionTable(partitionTable);
     }
   }
+
+  /**
+   * Handles a request to remove {@code node} from the cluster.
+   * <p>
+   * The request fails with a PartitionTableUnavailableException when no partition table is set.
+   * Otherwise it is first offered to processRemoveNodeLocally; if that does not answer it and
+   * this member is a follower that knows a leader, it is forwarded to that leader. When neither
+   * path handles the request, the handler receives a LeaderUnknownException.
+   *
+   * @param node the node to be removed
+   * @param resultHandler receives the response code or the error
+   */
+  @Override
+  public void removeNode(Node node, AsyncMethodCallback<Long> resultHandler) {
+    if (partitionTable == null) {
+      logger.info("Cannot remove node now because the partition table is not set");
+      resultHandler.onError(new PartitionTableUnavailableException(thisNode));
+      return;
+    }
+
+    // try to process the request locally, if it cannot be processed locally, forward it
+    if (processRemoveNodeLocally(node, resultHandler)) {
+      return;
+    }
+
+    if (character == NodeCharacter.FOLLOWER && leader != null) {
+      logger.info("Forward the node removal request of {} to leader {}", node, leader);
+      if (forwardRemoveNode(node, resultHandler)) {
+        return;
+      }
+    }
+    // neither handled locally nor forwarded
+    resultHandler.onError(new LeaderUnknownException(getAllNodes()));
+  }
+
+  /**
+   * Forwards a node removal request to the current leader.
+   *
+   * @param node the node to be removed
+   * @param resultHandler wrapped in a GenericForwardHandler and handed to the leader's client
+   * @return true if the request was handed to the leader's client, false if no client could be
+   * obtained or sending the request threw a TException
+   */
+  private boolean forwardRemoveNode(Node node, AsyncMethodCallback resultHandler) {
+    TSMetaService.AsyncClient client = (TSMetaService.AsyncClient) connectNode(leader);
+    if (client == null) {
+      return false;
+    }
+    try {
+      client.removeNode(node, new GenericForwardHandler(resultHandler));
+      return true;
+    } catch (TException e) {
+      // the node contacted here is the leader, not the node that is being removed
+      logger.warn("Cannot connect to node {}", leader, e);
+      return false;
+    }
+  }
+
+  /**
+   * Tries to process a node removal request on this member, which is only done when this member
+   * is the leader.
+   * <p>
+   * As the leader, the request is answered with RESPONSE_CLUSTER_TOO_SMALL if the cluster has no
+   * more nodes than the replication number, and with RESPONSE_REJECT if no known node matches
+   * the ip and meta port of {@code node}. Otherwise a RemoveNodeLog is appended and sent through
+   * sendLogToAllGroups; it is committed on success and removed again in every other case.
+   *
+   * @param node the node to be removed, matched against known nodes by ip and meta port
+   * @param resultHandler receives the response code or a RequestTimeOutException
+   * @return true if the handler has been answered, false if this member is not the leader or
+   * the log could not be agreed on for a reason other than a time-out
+   */
+  private boolean processRemoveNodeLocally(Node node, AsyncMethodCallback resultHandler) {
+    if (character != NodeCharacter.LEADER) {
+      return false;
+    }
+
+    if (allNodes.size() <= ClusterDescriptor.getINSTANCE().getConfig().getReplicationNum()) {
+      resultHandler.onComplete(Response.RESPONSE_CLUSTER_TOO_SMALL);
+      return true;
+    }
+
+    // find the registered node that has the same address as the requested one
+    Node target = null;
+    synchronized (allNodes) {
+      for (Node n : allNodes) {
+        if (n.ip.equals(node.ip) && n.metaPort == node.metaPort) {
+          target = n;
+          break;
+        }
+      }
+    }
+
+    if (target == null) {
+      logger.debug("Node {} is not in the cluster", node);
+      resultHandler.onComplete(Response.RESPONSE_REJECT);
+      return true;
+    }
+
+    // node removal must be serialized
+    synchronized (logManager) {
+      RemoveNodeLog removeNodeLog = new RemoveNodeLog();
+      removeNodeLog.setCurrLogTerm(getTerm().get());
+      removeNodeLog.setPreviousLogIndex(logManager.getLastLogIndex());
+      removeNodeLog.setPreviousLogTerm(logManager.getLastLogTerm());
+      removeNodeLog.setCurrLogIndex(logManager.getLastLogIndex() + 1);
+
+      removeNodeLog.setRemovedNode(target);
+
+      logManager.appendLog(removeNodeLog);
+
+      logger.info("Send the node removal request of {} to other nodes", target);
+      AppendLogResult result = sendLogToAllGroups(removeNodeLog);
+
+      switch (result) {
+        case OK:
+          logger.info("Removal request of {} is accepted", target);
+          logManager.commitLog(removeNodeLog.getCurrLogIndex());
+          resultHandler.onComplete(Response.RESPONSE_AGREE);
+          return true;
+        case TIME_OUT:
+          logger.info("Removal request of {} timed out", target);
+          resultHandler.onError(new RequestTimeOutException(removeNodeLog));
+          logManager.removeLastLog();
+          return true;
+        case LEADERSHIP_STALE:
+        default:
+          // the log was not accepted, discard it and let the caller forward the request
+          logManager.removeLastLog();
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Applies the removal of {@code oldNode} to this member: the node is dropped from the node
+   * list and the id map, the partition table is updated, and the data cluster server is notified
+   * with the removal result. Nothing happens if the node is not in the node list.
+   * <p>
+   * If the removed node is the current leader, this member becomes an elector and resets the
+   * time of the last received heartbeat.
+   *
+   * @param oldNode the node that is removed from the cluster
+   */
+  public void applyRemoveNode(Node oldNode) {
+    synchronized (allNodes) {
+      if (!allNodes.contains(oldNode)) {
+        return;
+      }
+      logger.debug("Removing a node {} from {}", oldNode, allNodes);
+      allNodes.remove(oldNode);
+      idNodeMap.remove(oldNode.nodeIdentifier);
+
+      // update the partition table
+      NodeRemovalResult result = partitionTable.removeNode(oldNode);
+
+      getDataClusterServer().removeNode(oldNode, result);
+      // compare by value, consistent with the contains() check above: the removed node may be
+      // an equal but distinct object from the one referenced by leader
+      if (oldNode.equals(leader)) {
+        setCharacter(NodeCharacter.ELECTOR);
+        lastHeartBeatReceivedTime = Long.MIN_VALUE;
+      }
+    }
+  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index 6deb034..f4789bf 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -105,6 +105,11 @@ public abstract class RaftMember implements RaftService.AsyncIface {
   // know if this node is synchronized with the leader
   private Object syncLock = new Object();
 
+  // when the header of the group is removed from the cluster, the members of the group should no
+  // longer accept writes, but they still can be read candidates for weak consistency reads and
+  // provide snapshots for the new holders
+  private volatile boolean readOnly = false;
+
   public RaftMember() {
   }
 
@@ -652,6 +657,10 @@ public abstract class RaftMember implements RaftService.AsyncIface {
   TSStatus processPlanLocally(PhysicalPlan plan) {
     logger.debug("{}: Processing plan {}", name, plan);
     synchronized (logManager) {
+      if (readOnly) {
+        return StatusUtils.NODE_READ_ONLY;
+      }
+
       PhysicalPlanLog log = new PhysicalPlanLog();
       log.setCurrLogTerm(getTerm().get());
       log.setPreviousLogIndex(logManager.getLastLogIndex());
@@ -798,4 +807,10 @@ public abstract class RaftMember implements RaftService.AsyncIface {
       resultHandler.onError(e);
     }
   }
+
+  /**
+   * Marks this member as read-only. The flag is set while holding the lock of logManager, the
+   * same lock under which processPlanLocally checks the flag before it creates a new log.
+   */
+  public void setReadOnly() {
+    synchronized (logManager) {
+      readOnly = true;
+    }
+  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/StatusUtils.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/StatusUtils.java
index 99fdd48..718a7e0 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/StatusUtils.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/StatusUtils.java
@@ -39,6 +39,7 @@ public class StatusUtils {
   public static final TSStatus EXECUTE_STATEMENT_ERROR =
       getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR);
   public static final TSStatus NO_STORAGE_GROUP = getStatus(TSStatusCode.STORAGE_GROUP_ERROR);
+  public static final TSStatus NODE_READ_ONLY = getStatus(TSStatusCode.NODE_READ_ONLY);
 
   private static TSStatus getStatus(TSStatusCode statusCode) {
     TSStatusType tsStatusType = new TSStatusType();
@@ -55,6 +56,9 @@ public class StatusUtils {
       case PARTITION_NOT_READY:
         tsStatusType.setMessage("Partition table is not ready");
         break;
+      case NODE_READ_ONLY:
+        tsStatusType.setMessage("Current node is read-only, please retry to find another available node");
+        break;
       default:
         tsStatusType.setMessage("");
         break;
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/common/TestPartitionTable.java b/cluster/src/test/java/org/apache/iotdb/cluster/common/TestPartitionTable.java
index ef14b1d..aeda017 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/common/TestPartitionTable.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/common/TestPartitionTable.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.cluster.common;
 import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.Map;
+import org.apache.iotdb.cluster.partition.NodeRemovalResult;
 import org.apache.iotdb.cluster.partition.PartitionGroup;
 import org.apache.iotdb.cluster.partition.PartitionTable;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
@@ -39,6 +40,11 @@ public class TestPartitionTable implements PartitionTable {
   }
 
   @Override
+  public NodeRemovalResult removeNode(Node node) {
+    // stub: this test table does not implement node removal and always returns null
+    return null;
+  }
+
+  @Override
   public List<PartitionGroup> getLocalGroups() {
     return null;
   }
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartBeatThreadTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartBeatThreadTest.java
index 457fffe..45d372e 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartBeatThreadTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartBeatThreadTest.java
@@ -32,6 +32,7 @@ import org.apache.iotdb.cluster.common.TestClient;
 import org.apache.iotdb.cluster.common.TestMetaGroupMember;
 import org.apache.iotdb.cluster.common.TestUtils;
 import org.apache.iotdb.cluster.log.LogManager;
+import org.apache.iotdb.cluster.partition.NodeRemovalResult;
 import org.apache.iotdb.cluster.partition.PartitionGroup;
 import org.apache.iotdb.cluster.partition.PartitionTable;
 import org.apache.iotdb.cluster.rpc.thrift.ElectionRequest;
@@ -61,6 +62,11 @@ public class MetaHeartBeatThreadTest extends HeartBeatThreadTest {
     }
 
     @Override
+    public NodeRemovalResult removeNode(Node node) {
+      // stub: this test table does not implement node removal and always returns null
+      return null;
+    }
+
+    @Override
     public List<PartitionGroup> getLocalGroups() {
       return null;
     }
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
index 9ea7028..3caf096 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
@@ -427,7 +427,7 @@ public class DataGroupMemberTest extends MemberTest {
       hasInitialSnapshots = false;
       partitionTable.addNode(TestUtils.getNode(10));
       List<Integer> requiredSlots = Arrays.asList(19, 39, 59, 79, 99);
-      dataGroupMember.pullSnapshots(requiredSlots, TestUtils.getNode(10));
+      dataGroupMember.pullNodeAdditionSnapshots(requiredSlots, TestUtils.getNode(10));
       assertEquals(requiredSlots.size(), receivedSnapshots.size());
       for (Integer requiredSlot : requiredSlots) {
         receivedSnapshots.get(requiredSlot).getRemoteSnapshot();
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index fd9019a..87a4171 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -65,7 +65,8 @@ public enum TSStatusCode {
   PARTITION_NOT_READY(700),
   TIME_OUT(701),
   NO_LEADER(702),
-  UNSUPPORTED_OPERATION(703);
+  UNSUPPORTED_OPERATION(703),
+  NODE_READ_ONLY(704);
 
 
   private int statusCode;
diff --git a/service-rpc/src/main/thrift/cluster.thrift b/service-rpc/src/main/thrift/cluster.thrift
index eb734cd..f3b4b3d 100644
--- a/service-rpc/src/main/thrift/cluster.thrift
+++ b/service-rpc/src/main/thrift/cluster.thrift
@@ -271,6 +271,13 @@ service TSMetaService extends RaftService {
   **/
   AddNodeResponse addNode(1: Node node)
 
+  /**
+  * Remove a node from the cluster. If the node is not in the cluster or the cluster size would
+  * become less than the replication number, the request will be rejected.
+  * return -1(RESPONSE_AGREE) or -3(RESPONSE_REJECT) or -9(RESPONSE_CLUSTER_TOO_SMALL)
+  **/
+  long removeNode(1: Node node)
+
   TNodeStatus queryNodeStatus()
 
   Node checkAlive()
diff --git a/session/pom.xml b/session/pom.xml
index 8df9ae6..dab30aa 100644
--- a/session/pom.xml
+++ b/session/pom.xml
@@ -19,134 +19,132 @@
     under the License.
 
 -->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-  <parent>
-    <artifactId>iotdb-parent</artifactId>
-    <groupId>org.apache.iotdb</groupId>
-    <version>0.10.0-SNAPSHOT</version>
-  </parent>
-  <modelVersion>4.0.0</modelVersion>
-  <artifactId>iotdb-session</artifactId>
-  <name>IoTDB Session</name>
-  <properties>
-    <session.test.skip>false</session.test.skip>
-    <session.it.skip>${session.test.skip}</session.it.skip>
-    <session.ut.skip>${session.test.skip}</session.ut.skip>
-  </properties>
-  <build>
-    <plugins>
-      <plugin>
-        <artifactId>maven-assembly-plugin</artifactId>
-        <version>3.1.0</version>
-        <configuration>
-          <descriptorRefs>
-            <descriptorRef>jar-with-dependencies</descriptorRef>
-          </descriptorRefs>
-        </configuration>
-        <executions>
-          <execution>
-            <id>make-assembly</id>
-            <!-- this is used for inheritance merges -->
-            <phase>package</phase>
-            <!-- bind to the packaging phase -->
-            <goals>
-              <goal>single</goal>
-            </goals>
-          </execution>
-        </executions>
-      </plugin>
-      <!--using `mvn test` to run UT, `mvn verify` to run ITs
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>iotdb-parent</artifactId>
+        <groupId>org.apache.iotdb</groupId>
+        <version>0.10.0-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+    <artifactId>iotdb-session</artifactId>
+    <name>IoTDB Session</name>
+    <properties>
+        <session.test.skip>false</session.test.skip>
+        <session.it.skip>${session.test.skip}</session.it.skip>
+        <session.ut.skip>${session.test.skip}</session.ut.skip>
+    </properties>
+    <build>
+        <plugins>
+            <plugin>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <version>3.1.0</version>
+                <configuration>
+                    <descriptorRefs>
+                        <descriptorRef>jar-with-dependencies</descriptorRef>
+                    </descriptorRefs>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>make-assembly</id>
+                        <!-- this is used for inheritance merges -->
+                        <phase>package</phase>
+                        <!-- bind to the packaging phase -->
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <!--using `mvn test` to run UT, `mvn verify` to run ITs
       Reference: https://antoniogoncalves.org/2012/12/13/lets-turn-integration-tests-with-maven-to-a-first-class-citizen/-->
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-surefire-plugin</artifactId>
-        <configuration>
-          <skipTests>${session.ut.skip}</skipTests>
-        </configuration>
-      </plugin>
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-failsafe-plugin</artifactId>
-        <executions>
-          <execution>
-            <id>run-integration-tests</id>
-            <phase>integration-test</phase>
-            <goals>
-              <goal>integration-test</goal>
-              <goal>verify</goal>
-            </goals>
-          </execution>
-        </executions>
-        <configuration>
-          <skipTests>${session.test.skip}</skipTests>
-          <skipITs>${session.it.skip}</skipITs>
-        </configuration>
-      </plugin>
-    </plugins>
-  </build>
-  <dependencies>
-    <dependency>
-      <groupId>org.apache.iotdb</groupId>
-      <artifactId>service-rpc</artifactId>
-      <version>${project.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.iotdb</groupId>
-      <artifactId>tsfile</artifactId>
-      <version>${project.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.iotdb</groupId>
-      <artifactId>iotdb-server</artifactId>
-      <version>${project.version}</version>
-      <type>test-jar</type>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.iotdb</groupId>
-      <artifactId>iotdb-server</artifactId>
-      <version>${project.version}</version>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.iotdb</groupId>
-      <artifactId>iotdb-jdbc</artifactId>
-      <version>${project.version}</version>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>commons-lang</groupId>
-      <artifactId>commons-lang</artifactId>
-    </dependency>
-  </dependencies>
-  <profiles>
-    <profile>
-      <id>skipSessionTests</id>
-      <activation>
-        <property>
-          <name>skipTests</name>
-          <value>true</value>
-        </property>
-      </activation>
-      <properties>
-        <session.test.skip>true</session.test.skip>
-        <session.ut.skip>true</session.ut.skip>
-        <session.it.skip>true</session.it.skip>
-      </properties>
-    </profile>
-    <profile>
-      <id>skipUT_SessionTests</id>
-      <activation>
-        <property>
-          <name>skipUTs</name>
-          <value>true</value>
-        </property>
-      </activation>
-      <properties>
-        <session.ut.skip>true</session.ut.skip>
-      </properties>
-    </profile>
-  </profiles>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <configuration>
+                    <skipTests>${session.ut.skip}</skipTests>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-failsafe-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>run-integration-tests</id>
+                        <phase>integration-test</phase>
+                        <goals>
+                            <goal>integration-test</goal>
+                            <goal>verify</goal>
+                        </goals>
+                    </execution>
+                </executions>
+                <configuration>
+                    <skipTests>${session.test.skip}</skipTests>
+                    <skipITs>${session.it.skip}</skipITs>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.iotdb</groupId>
+            <artifactId>service-rpc</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.iotdb</groupId>
+            <artifactId>tsfile</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.iotdb</groupId>
+            <artifactId>iotdb-server</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.iotdb</groupId>
+            <artifactId>iotdb-server</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.iotdb</groupId>
+            <artifactId>iotdb-jdbc</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>commons-lang</groupId>
+            <artifactId>commons-lang</artifactId>
+        </dependency>
+    </dependencies>
+    <profiles>
+        <profile>
+            <id>skipSessionTests</id>
+            <activation>
+                <property>
+                    <name>skipTests</name>
+                    <value>true</value>
+                </property>
+            </activation>
+            <properties>
+                <session.test.skip>true</session.test.skip>
+                <session.ut.skip>true</session.ut.skip>
+                <session.it.skip>true</session.it.skip>
+            </properties>
+        </profile>
+        <profile>
+            <id>skipUT_SessionTests</id>
+            <activation>
+                <property>
+                    <name>skipUTs</name>
+                    <value>true</value>
+                </property>
+            </activation>
+            <properties>
+                <session.ut.skip>true</session.ut.skip>
+            </properties>
+        </profile>
+    </profiles>
 </project>