You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2021/03/01 02:30:36 UTC

[iotdb] branch cluster_scalability updated: This commit fixes the following issues: 1. Removing the meta leader left the new leader unable to commit the remove log until the next meta log; fixed by using an empty-content log. 2. Reorganized the logic of getting a member. 3. The meta leader should notify the target node to exile itself when removing it.

This is an automated email from the ASF dual-hosted git repository.

lta pushed a commit to branch cluster_scalability
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/cluster_scalability by this push:
     new 8c7f80b  This commit fixes the following issues: 1. Removing the meta leader left the new leader unable to commit the remove log until the next meta log; fixed by using an empty-content log. 2. Reorganized the logic of getting a member. 3. The meta leader should notify the target node to exile itself when removing it.
8c7f80b is described below

commit 8c7f80b4aff04044f8a6076de09a25663abdabae
Author: lta <li...@163.com>
AuthorDate: Mon Mar 1 10:27:08 2021 +0800

    This commit fixes the following issues:
    1. Removing the meta leader left the new leader unable to commit the remove log until the next meta log; fixed by using an empty-content log.
    2. Reorganized the logic of getting a member.
    3. The meta leader should notify the target node to exile itself when removing it.
---
 .../cluster/client/sync/SyncClientAdaptor.java     |   2 +-
 .../iotdb/cluster/log/applier/MetaLogApplier.java  |  29 +---
 .../log/manage/MetaSingleSnapshotLogManager.java   |   2 +-
 .../iotdb/cluster/partition/NodeRemovalResult.java |  31 ----
 .../iotdb/cluster/partition/PartitionTable.java    |   2 +
 .../partition/slot/SlotNodeRemovalResult.java      |   2 +-
 .../cluster/partition/slot/SlotPartitionTable.java |  10 +-
 .../iotdb/cluster/server/DataClusterServer.java    | 161 +++++++++++++--------
 .../cluster/server/heartbeat/HeartbeatThread.java  |   2 +-
 .../server/heartbeat/MetaHeartbeatThread.java      |   8 +
 .../cluster/server/member/DataGroupMember.java     |  25 ++--
 .../cluster/server/member/MetaGroupMember.java     |  75 +++++++++-
 .../iotdb/cluster/server/member/RaftMember.java    |   7 +-
 .../cluster/server/service/MetaAsyncService.java   |  10 +-
 .../cluster/server/service/MetaSyncService.java    |   6 +-
 .../cluster/partition/SlotPartitionTableTest.java  |   4 -
 .../server/heartbeat/MetaHeartbeatThreadTest.java  |   5 +
 .../cluster/server/member/MetaGroupMemberTest.java |   3 +-
 18 files changed, 235 insertions(+), 149 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptor.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptor.java
index 019a08b..b028766 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptor.java
@@ -94,7 +94,7 @@ public class SyncClientAdaptor {
     asyncMetaClient.removeNode(nodeToRemove, handler);
     synchronized (responseRef) {
       if (responseRef.get() == null) {
-        responseRef.wait(RaftServer.getConnectionTimeoutInMS());
+        responseRef.wait(60 * 1000L);
       }
     }
     return responseRef.get();
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/MetaLogApplier.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/MetaLogApplier.java
index 705000f..9b98a73 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/MetaLogApplier.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/MetaLogApplier.java
@@ -19,19 +19,15 @@
 
 package org.apache.iotdb.cluster.log.applier;
 
-import org.apache.iotdb.cluster.exception.ChangeMembershipException;
-import org.apache.iotdb.cluster.exception.UnsupportedPlanException;
 import org.apache.iotdb.cluster.log.Log;
 import org.apache.iotdb.cluster.log.logtypes.AddNodeLog;
+import org.apache.iotdb.cluster.log.logtypes.EmptyContentLog;
 import org.apache.iotdb.cluster.log.logtypes.PhysicalPlanLog;
 import org.apache.iotdb.cluster.log.logtypes.RemoveNodeLog;
 import org.apache.iotdb.cluster.server.member.MetaGroupMember;
 import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
-import org.apache.iotdb.db.qp.physical.sys.LogPlan;
-import org.apache.iotdb.rpc.TSStatusCode;
-import org.apache.iotdb.service.rpc.thrift.TSStatus;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -50,41 +46,24 @@ public class MetaLogApplier extends BaseApplier {
 
   @Override
   public void apply(Log log) {
-    apply(log, false);
-  }
-
-  public void apply(Log log, boolean isLeader) {
     try {
       logger.debug("MetaMember [{}] starts applying Log {}", metaGroupMember.getName(), log);
       if (log instanceof AddNodeLog) {
-        if (isLeader) {
-          sendLogToAllDataGroups(log);
-        }
         member.applyAddNode((AddNodeLog) log);
       } else if (log instanceof PhysicalPlanLog) {
         applyPhysicalPlan(((PhysicalPlanLog) log).getPlan(), null);
       } else if (log instanceof RemoveNodeLog) {
-        if (isLeader) {
-          sendLogToAllDataGroups(log);
-        }
         member.applyRemoveNode(((RemoveNodeLog) log));
+      } else if (log instanceof EmptyContentLog) {
+        // Do nothing
       } else {
         logger.error("Unsupported log: {} {}", log.getClass().getName(), log);
       }
-    } catch (StorageEngineException | StorageGroupNotSetException | QueryProcessException | ChangeMembershipException | UnsupportedPlanException e) {
+    } catch (StorageEngineException | StorageGroupNotSetException | QueryProcessException e) {
       logger.debug("Exception occurred when executing {}", log, e);
       log.setException(e);
     } finally {
       log.setApplied(true);
     }
   }
-
-  private void sendLogToAllDataGroups(Log log)
-      throws ChangeMembershipException, UnsupportedPlanException {
-    LogPlan plan = new LogPlan(log.serialize());
-    TSStatus status = member.processPartitionedPlan(plan);
-    if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-      throw new ChangeMembershipException(String.format("apply %s failed with status {%s}", log, status));
-    }
-  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/MetaSingleSnapshotLogManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/MetaSingleSnapshotLogManager.java
index c92a482..c3136b6 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/MetaSingleSnapshotLogManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/MetaSingleSnapshotLogManager.java
@@ -94,7 +94,7 @@ public class MetaSingleSnapshotLogManager extends RaftLogManager {
         continue;
       }
       try {
-        ((MetaLogApplier)logApplier).apply(entry, metaGroupMember.getCharacter() == NodeCharacter.LEADER);
+        logApplier.apply(entry);
       } catch (Exception e) {
         logger.error("Can not apply log {}", entry, e);
         entry.setException(e);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/partition/NodeRemovalResult.java b/cluster/src/main/java/org/apache/iotdb/cluster/partition/NodeRemovalResult.java
index 16e25e2..d075e3c 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/partition/NodeRemovalResult.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/partition/NodeRemovalResult.java
@@ -33,9 +33,6 @@ import org.apache.iotdb.cluster.rpc.thrift.Node;
 public class NodeRemovalResult {
 
   private List<PartitionGroup> removedGroupList = new ArrayList<>();
-  // if the removed group contains the local node, the local node should join a new group to
-  // preserve the replication number
-  private List<PartitionGroup> newGroupList = new ArrayList<>();
 
   public PartitionGroup getRemovedGroup(int raftId) {
     for (PartitionGroup group : removedGroupList) {
@@ -50,32 +47,11 @@ public class NodeRemovalResult {
     this.removedGroupList.add(group);
   }
 
-  public List<PartitionGroup> getNewGroupList() {
-    return newGroupList;
-  }
-
-  public void addNewGroup(PartitionGroup newGroup) {
-    this.newGroupList.add(newGroup);
-  }
-
-  public PartitionGroup getNewGroup(int raftId) {
-    for (PartitionGroup group : newGroupList) {
-      if (group.getId() == raftId) {
-        return group;
-      }
-    }
-    return null;
-  }
-
   public void serialize(DataOutputStream dataOutputStream) throws IOException {
     dataOutputStream.writeInt(removedGroupList.size());
     for (PartitionGroup group: removedGroupList) {
       group.serialize(dataOutputStream);
     }
-    dataOutputStream.writeInt(newGroupList.size());
-    for (PartitionGroup group: newGroupList) {
-      group.serialize(dataOutputStream);
-    }
   }
 
   public void deserialize(ByteBuffer buffer) {
@@ -85,13 +61,6 @@ public class NodeRemovalResult {
       group.deserialize(buffer);
       removedGroupList.add(group);
     }
-
-    int newGroupListSize = buffer.getInt();
-    for (int i = 0 ; i < newGroupListSize; i++) {
-      PartitionGroup group = new PartitionGroup();
-      group.deserialize(buffer);
-      newGroupList.add(group);
-    }
   }
 
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionTable.java b/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionTable.java
index c1c7c21..97d88b7 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionTable.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionTable.java
@@ -98,6 +98,8 @@ public interface PartitionTable {
    */
   boolean deserialize(ByteBuffer buffer);
 
+  boolean checkChangeMembershipValidity(long targetLogIndex);
+
   List<Node> getAllNodes();
 
   List<PartitionGroup> getGlobalGroups();
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotNodeRemovalResult.java b/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotNodeRemovalResult.java
index 9a17ea3..d8fbb5b 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotNodeRemovalResult.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotNodeRemovalResult.java
@@ -52,7 +52,7 @@ public class SlotNodeRemovalResult extends NodeRemovalResult {
     dataOutputStream.writeInt(newSlotOwners.size());
     for (Map.Entry<RaftNode, List<Integer>> entry: newSlotOwners.entrySet()) {
       RaftNode raftNode = entry.getKey();
-      dataOutputStream.writeInt(raftNode.getNode().nodeIdentifier);
+      SerializeUtils.serialize(raftNode.getNode(), dataOutputStream);
       dataOutputStream.writeInt(raftNode.getRaftId());
       dataOutputStream.writeInt(entry.getValue().size());
       for (Integer slot: entry.getValue()) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java b/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java
index 8817b9f..bd764b2 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java
@@ -82,7 +82,7 @@ public class SlotPartitionTable implements PartitionTable {
   // find the data source
   private Map<RaftNode, Map<Integer, PartitionGroup>> previousNodeMap = new ConcurrentHashMap<>();
 
-  private NodeRemovalResult nodeRemovalResult = new NodeRemovalResult();
+  private NodeRemovalResult nodeRemovalResult = new SlotNodeRemovalResult();
 
   //the filed is used for determining which nodes need to be a group.
   // the data groups which this node belongs to.
@@ -414,7 +414,7 @@ public class SlotPartitionTable implements PartitionTable {
       previousNodeMap.put(raftNode, prevHolders);
     }
 
-    nodeRemovalResult = new NodeRemovalResult();
+    nodeRemovalResult = new SlotNodeRemovalResult();
     nodeRemovalResult.deserialize(buffer);
 
     nodeRing.clear();
@@ -431,6 +431,11 @@ public class SlotPartitionTable implements PartitionTable {
   }
 
   @Override
+  public boolean checkChangeMembershipValidity(long targetLogIndex) {
+    return lastLogIndex == targetLogIndex;
+  }
+
+  @Override
   public List<Node> getAllNodes() {
     return nodeRing;
   }
@@ -526,7 +531,6 @@ public class SlotPartitionTable implements PartitionTable {
         Node header = nodeRing.get(headerNodeIdx);
         PartitionGroup newGrp = getHeaderGroup(new RaftNode(header, raftId));
         localGroups.add(newGrp);
-        result.addNewGroup(newGrp);
       }
 
       globalGroups = calculateGlobalGroups(nodeRing);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
index 3b70208..ee026a4 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
@@ -62,9 +62,9 @@ import org.apache.iotdb.cluster.rpc.thrift.SingleSeriesQueryRequest;
 import org.apache.iotdb.cluster.rpc.thrift.TSDataService;
 import org.apache.iotdb.cluster.rpc.thrift.TSDataService.AsyncProcessor;
 import org.apache.iotdb.cluster.rpc.thrift.TSDataService.Processor;
-import org.apache.iotdb.cluster.server.monitor.NodeReport.DataMemberReport;
 import org.apache.iotdb.cluster.server.member.DataGroupMember;
 import org.apache.iotdb.cluster.server.member.MetaGroupMember;
+import org.apache.iotdb.cluster.server.monitor.NodeReport.DataMemberReport;
 import org.apache.iotdb.cluster.server.service.DataAsyncService;
 import org.apache.iotdb.cluster.server.service.DataSyncService;
 import org.apache.iotdb.service.rpc.thrift.TSStatus;
@@ -186,27 +186,25 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
 
     // avoid creating two members for a header
     Exception ex = null;
-    synchronized (headerGroupMap) {
-      member = headerGroupMap.get(raftNode);
-      if (member != null) {
-        return member;
-      }
-      logger.info("Received a request \"{}\" from unregistered header {}", request, raftNode);
-      if (partitionTable != null) {
-        try {
-          member = createNewMember(raftNode);
-        } catch (NotInSameGroupException | CheckConsistencyException e) {
-          ex = e;
-        }
-      } else {
-        logger.info("Partition is not ready, cannot create member");
-        ex = new PartitionTableUnavailableException(thisNode);
-      }
-      if (ex != null && resultHandler != null) {
-        resultHandler.onError(ex);
-      }
+    member = headerGroupMap.get(raftNode);
+    if (member != null) {
       return member;
     }
+    logger.info("Received a request \"{}\" from unregistered header {}", request, raftNode);
+    if (partitionTable != null) {
+      try {
+        member = createNewMember(raftNode);
+      } catch (NotInSameGroupException | CheckConsistencyException e) {
+        ex = e;
+      }
+    } else {
+      logger.info("Partition is not ready, cannot create member");
+      ex = new PartitionTableUnavailableException(thisNode);
+    }
+    if (ex != null && resultHandler != null) {
+      resultHandler.onError(ex);
+    }
+    return member;
   }
 
   /**
@@ -216,7 +214,6 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
    */
   private DataGroupMember createNewMember(RaftNode raftNode)
       throws NotInSameGroupException, CheckConsistencyException {
-    DataGroupMember member;
     PartitionGroup partitionGroup;
     partitionGroup = partitionTable.getHeaderGroup(raftNode);
     if (partitionGroup == null || !partitionGroup.contains(thisNode)) {
@@ -224,24 +221,28 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
       metaGroupMember.syncLeaderWithConsistencyCheck(true);
       partitionGroup = partitionTable.getHeaderGroup(raftNode);
     }
-    if (partitionGroup != null && partitionGroup.contains(thisNode)) {
-      // the two nodes are in the same group, create a new data member
-      member = dataMemberFactory.create(partitionGroup, thisNode);
-      DataGroupMember prevMember = headerGroupMap.put(raftNode, member);
-      if (prevMember != null) {
-        prevMember.stop();
-      }
-      logger.info("Created a member for header {}", raftNode);
-      member.start();
-    } else {
-      // the member may have been stopped after syncLeader
-      member = stoppedMemberManager.get(raftNode);
+    DataGroupMember member;
+    synchronized (headerGroupMap) {
+      member = headerGroupMap.get(raftNode);
       if (member != null) {
         return member;
       }
-      logger.info("This node {} does not belong to the group {}, header {}", thisNode,
-          partitionGroup, raftNode);
-      throw new NotInSameGroupException(partitionGroup, thisNode);
+      if (partitionGroup != null && partitionGroup.contains(thisNode)) {
+        // the two nodes are in the same group, create a new data member
+        member = dataMemberFactory.create(partitionGroup, thisNode);
+        headerGroupMap.put(raftNode, member);
+        logger.info("Created a member for header {}, group is {}", raftNode, partitionGroup);
+        member.start();
+      } else {
+        // the member may have been stopped after syncLeader
+        member = stoppedMemberManager.get(raftNode);
+        if (member != null) {
+          return member;
+        }
+        logger.info("This node {} does not belong to the group {}, header {}", thisNode,
+            partitionGroup, raftNode);
+        throw new NotInSameGroupException(partitionGroup, thisNode);
+      }
     }
     return member;
   }
@@ -399,23 +400,31 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
   @Override
   public void pullMeasurementSchema(PullSchemaRequest request,
       AsyncMethodCallback<PullSchemaResp> resultHandler) {
-    DataAsyncService service = getDataAsyncService(request.getHeader(), request.getRaftId(), resultHandler,
+    DataAsyncService service = getDataAsyncService(request.getHeader(), request.getRaftId(),
+        resultHandler,
         "Pull measurement schema");
-    service.pullMeasurementSchema(request, resultHandler);
+    if (service != null) {
+      service.pullMeasurementSchema(request, resultHandler);
+    }
   }
 
   @Override
   public void getAllDevices(Node header, int raftId, List<String> paths,
       AsyncMethodCallback<Set<String>> resultHandler) {
-    DataAsyncService service = getDataAsyncService(header, raftId, resultHandler, "Get all devices");
-    service.getAllDevices(header, raftId, paths, resultHandler);
+    DataAsyncService service = getDataAsyncService(header, raftId, resultHandler,
+        "Get all devices");
+    if (service != null) {
+      service.getAllDevices(header, raftId, paths, resultHandler);
+    }
   }
 
   @Override
   public void getNodeList(Node header, int raftId, String path, int nodeLevel,
       AsyncMethodCallback<List<String>> resultHandler) {
     DataAsyncService service = getDataAsyncService(header, raftId, resultHandler, "Get node list");
-    service.getNodeList(header, raftId, path, nodeLevel, resultHandler);
+    if (service != null) {
+      service.getNodeList(header, raftId, path, nodeLevel, resultHandler);
+    }
   }
 
   @Override
@@ -423,7 +432,9 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
       AsyncMethodCallback<Set<String>> resultHandler) {
     DataAsyncService service = getDataAsyncService(header, raftId, resultHandler,
         "Get child node path in next level");
-    service.getChildNodePathInNextLevel(header, raftId, path, resultHandler);
+    if (service != null) {
+      service.getChildNodePathInNextLevel(header, raftId, path, resultHandler);
+    }
   }
 
   @Override
@@ -431,14 +442,18 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
       AsyncMethodCallback<ByteBuffer> resultHandler) {
     DataAsyncService service = getDataAsyncService(header, raftId, resultHandler,
         "Get all measurement schema");
-    service.getAllMeasurementSchema(header, raftId, planBytes, resultHandler);
+    if (service != null) {
+      service.getAllMeasurementSchema(header, raftId, planBytes, resultHandler);
+    }
   }
 
   @Override
   public void getAggrResult(GetAggrResultRequest request,
       AsyncMethodCallback<List<ByteBuffer>> resultHandler) {
     DataAsyncService service = getDataAsyncService(request.getHeader(), request.getRaftId(), resultHandler, request);
-    service.getAggrResult(request, resultHandler);
+    if (service != null) {
+      service.getAggrResult(request, resultHandler);
+    }
   }
 
   @Override
@@ -446,20 +461,26 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
       AsyncMethodCallback<List<String>> resultHandler) {
     DataAsyncService service = getDataAsyncService(header, raftId, resultHandler,
         "Check if measurements are registered");
-    service.getUnregisteredTimeseries(header, raftId, timeseriesList, resultHandler);
+    if (service != null) {
+      service.getUnregisteredTimeseries(header, raftId, timeseriesList, resultHandler);
+    }
   }
 
   @Override
   public void getGroupByExecutor(GroupByRequest request, AsyncMethodCallback<Long> resultHandler) {
     DataAsyncService service = getDataAsyncService(request.getHeader(), request.getRaftId(), resultHandler, request);
-    service.getGroupByExecutor(request, resultHandler);
+    if (service != null) {
+      service.getGroupByExecutor(request, resultHandler);
+    }
   }
 
   @Override
   public void getGroupByResult(Node header, int raftId, long executorId, long startTime, long endTime,
       AsyncMethodCallback<List<ByteBuffer>> resultHandler) {
     DataAsyncService service = getDataAsyncService(header, raftId, resultHandler, "Fetch group by");
-    service.getGroupByResult(header, raftId, executorId, startTime, endTime, resultHandler);
+    if (service != null) {
+      service.getGroupByResult(header, raftId, executorId, startTime, endTime, resultHandler);
+    }
   }
 
   @Override
@@ -493,6 +514,7 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
   }
 
   public void preAddNodeForDataGroup(AddNodeLog log, DataGroupMember targetDataGroupMember) {
+
     // Make sure the previous add/remove node log has applied
     metaGroupMember.waitUtil(log.getMetaLogIndex() - 1);
 
@@ -501,6 +523,7 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
       return;
     }
 
+    logger.debug("Pre add a new node {} to cluster", log.getNewNode());
     targetDataGroupMember.preAddNode(log.getNewNode());
   }
 
@@ -555,6 +578,7 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
     dataGroupMember.waitFollowersToSync();
     dataGroupMember.stop();
     stoppedMemberManager.put(header, dataGroupMember);
+    logger.info("Data group member has removed, header is {}.", header);
   }
 
   /**
@@ -600,6 +624,7 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
   }
 
   public void preRemoveNodeForDataGroup(RemoveNodeLog log, DataGroupMember targetDataGroupMember) {
+
     // Make sure the previous add/remove node log has applied
     metaGroupMember.waitUtil(log.getMetaLogIndex() - 1);
 
@@ -608,6 +633,7 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
       return;
     }
 
+    logger.debug("Removing a node {} from {}", log.getRemovedNode(), targetDataGroupMember.getAllNodes());
     targetDataGroupMember.preRemoveNode(log.getRemovedNode());
   }
 
@@ -634,11 +660,14 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
           dataGroupMember.removeNode(node, removalResult);
         }
       }
-      for (PartitionGroup newGroup : removalResult.getNewGroupList()) {
-        if (newGroup != null) {
-          logger.info("{} should join a new group {}", thisNode, newGroup);
+
+      // if the removed group contains the local node, the local node should join a new group to
+      // preserve the replication number
+      for (PartitionGroup group : partitionTable.getLocalGroups()) {
+        if (!headerGroupMap.containsKey(new RaftNode(group.getHeader(), group.getId()))) {
+          logger.info("{} should join a new group {}", thisNode, group);
           try {
-            createNewMember(new RaftNode(newGroup.getHeader(), newGroup.getId()));
+            createNewMember(new RaftNode(group.getHeader(), group.getId()));
           } catch (NotInSameGroupException e) {
             // ignored
           } catch (CheckConsistencyException ce) {
@@ -683,7 +712,9 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
   public void previousFill(PreviousFillRequest request,
       AsyncMethodCallback<ByteBuffer> resultHandler) {
     DataAsyncService service = getDataAsyncService(request.getHeader(), request.getRaftId(), resultHandler, request);
-    service.previousFill(request, resultHandler);
+    if (service != null) {
+      service.previousFill(request, resultHandler);
+    }
   }
 
   public void closeLogManagers() {
@@ -696,27 +727,36 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
   public void matchTerm(long index, long term, Node header, int raftId,
       AsyncMethodCallback<Boolean> resultHandler) {
     DataAsyncService service = getDataAsyncService(header, raftId, resultHandler, "Match term");
-    service.matchTerm(index, term, header, raftId, resultHandler);
+    if (service != null) {
+      service.matchTerm(index, term, header, raftId, resultHandler);
+    }
   }
 
   @Override
   public void last(LastQueryRequest request, AsyncMethodCallback<ByteBuffer> resultHandler) {
     DataAsyncService service = getDataAsyncService(request.getHeader(), request.getRaftId(), resultHandler, "last");
-    service.last(request, resultHandler);
+    if (service != null) {
+      service.last(request, resultHandler);
+    }
   }
 
   @Override
   public void getPathCount(Node header, int raftId, List<String> pathsToQuery, int level,
       AsyncMethodCallback<Integer> resultHandler) {
     DataAsyncService service = getDataAsyncService(header, raftId, resultHandler, "count path");
-    service.getPathCount(header, raftId, pathsToQuery, level, resultHandler);
+    if (service != null) {
+      service.getPathCount(header, raftId, pathsToQuery, level, resultHandler);
+    }
   }
 
   @Override
   public void onSnapshotApplied(Node header, int raftId, List<Integer> slots,
       AsyncMethodCallback<Boolean> resultHandler) {
-    DataAsyncService service = getDataAsyncService(header, raftId, resultHandler, "Snapshot applied");
-    service.onSnapshotApplied(header, raftId, slots, resultHandler);
+    DataAsyncService service = getDataAsyncService(header, raftId, resultHandler,
+        "Snapshot applied");
+    if (service != null) {
+      service.onSnapshotApplied(header, raftId, slots, resultHandler);
+    }
   }
 
   @Override
@@ -904,7 +944,10 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
   @Override
   public void removeHardLink(String hardLinkPath, int raftId,
       AsyncMethodCallback<Void> resultHandler) {
-    getDataAsyncService(new RaftNode(thisNode, raftId), resultHandler, hardLinkPath).removeHardLink(hardLinkPath, raftId,
-        resultHandler);
+    DataAsyncService service = getDataAsyncService(new RaftNode(thisNode, raftId), resultHandler,
+        hardLinkPath);
+    if (service != null) {
+      service.removeHardLink(hardLinkPath, raftId, resultHandler);
+    }
   }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/HeartbeatThread.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/HeartbeatThread.java
index 6a2b041..99bf3f5 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/HeartbeatThread.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/HeartbeatThread.java
@@ -130,7 +130,7 @@ public class HeartbeatThread implements Runnable {
   /**
    * Send each node (except the local node) in the group of the member a heartbeat.
    */
-  private void sendHeartbeats() {
+  protected void sendHeartbeats() {
     synchronized (localMember.getTerm()) {
       request.setTerm(localMember.getTerm().get());
       request.setLeader(localMember.getThisNode());
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatThread.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatThread.java
index d51e151..89aa9d5 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatThread.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatThread.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.cluster.server.heartbeat;
 
 import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.server.RaftServer;
 import org.apache.iotdb.cluster.server.member.MetaGroupMember;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -69,4 +70,11 @@ public class MetaHeartbeatThread extends HeartbeatThread {
     // erase the sent partition table so it will not be sent in the next heartbeat
     request.unsetPartitionTableBytes();
   }
+
+
+  @Override
+  void startElection() {
+    super.startElection();
+    localMetaMember.getAppendLogThreadPool().submit(() -> localMetaMember.processEmptyContentLog());
+  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
index e0a382e..2bd460d 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
@@ -749,7 +749,7 @@ public class DataGroupMember extends RaftMember {
 
   public void preRemoveNode(Node removedNode) {
     synchronized (allNodes) {
-      if (allNodes.contains(removedNode)) {
+      if (allNodes.contains(removedNode) && allNodes.size() == config.getReplicationNum()) {
         // update the group if the deleted node was in it
         PartitionGroup newGroup = metaGroupMember.getPartitionTable().getHeaderGroup(new RaftNode(getHeader(), getRaftGroupId()));
         if (newGroup == null) {
@@ -772,7 +772,7 @@ public class DataGroupMember extends RaftMember {
     syncLeader();
 
     synchronized (allNodes) {
-      if (allNodes.contains(removedNode)) {
+      if (allNodes.contains(removedNode) && allNodes.size() > config.getReplicationNum()) {
         // update the group if the deleted node was in it
         allNodes.remove(removedNode);
         peerMap.remove(removedNode);
@@ -783,15 +783,15 @@ public class DataGroupMember extends RaftMember {
             setLastHeartbeatReceivedTime(Long.MIN_VALUE);
           }
         }
-      }
-      List<Integer> slotsToPull = ((SlotNodeRemovalResult) removalResult).getNewSlotOwners()
-          .get(new RaftNode(getHeader(), getRaftGroupId()));
-      if (slotsToPull != null) {
-        // pull the slots that should be taken over
-        PullSnapshotTaskDescriptor taskDescriptor = new PullSnapshotTaskDescriptor(
-            removalResult.getRemovedGroup(getRaftGroupId()),
-            slotsToPull, true);
-        pullFileSnapshot(taskDescriptor, null);
+        List<Integer> slotsToPull = ((SlotNodeRemovalResult) removalResult).getNewSlotOwners()
+            .get(new RaftNode(getHeader(), getRaftGroupId()));
+        if (slotsToPull != null) {
+          // pull the slots that should be taken over
+          PullSnapshotTaskDescriptor taskDescriptor = new PullSnapshotTaskDescriptor(
+              removalResult.getRemovedGroup(getRaftGroupId()),
+              slotsToPull, true);
+          pullFileSnapshot(taskDescriptor, null);
+        }
       }
     }
   }
@@ -802,6 +802,9 @@ public class DataGroupMember extends RaftMember {
     }
     for (Map.Entry<Node, Peer> entry: peerMap.entrySet()) {
       Node node = entry.getKey();
+      if (node.equals(thisNode)) {
+        continue;
+      }
       Peer peer = entry.getValue();
       while (peer.getMatchIndex() < logManager.getCommitLogIndex()) {
         try {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
index ec4f0de..5dddac7 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
@@ -65,6 +65,7 @@ import org.apache.iotdb.cluster.config.ClusterConstant;
 import org.apache.iotdb.cluster.config.ClusterDescriptor;
 import org.apache.iotdb.cluster.coordinator.Coordinator;
 import org.apache.iotdb.cluster.exception.AddSelfException;
+import org.apache.iotdb.cluster.exception.ChangeMembershipException;
 import org.apache.iotdb.cluster.exception.CheckConsistencyException;
 import org.apache.iotdb.cluster.exception.ConfigInconsistentException;
 import org.apache.iotdb.cluster.exception.EmptyIntervalException;
@@ -74,9 +75,11 @@ import org.apache.iotdb.cluster.exception.SnapshotInstallationException;
 import org.apache.iotdb.cluster.exception.StartUpCheckFailureException;
 import org.apache.iotdb.cluster.exception.UnknownLogTypeException;
 import org.apache.iotdb.cluster.exception.UnsupportedPlanException;
+import org.apache.iotdb.cluster.log.Log;
 import org.apache.iotdb.cluster.log.LogApplier;
 import org.apache.iotdb.cluster.log.applier.MetaLogApplier;
 import org.apache.iotdb.cluster.log.logtypes.AddNodeLog;
+import org.apache.iotdb.cluster.log.logtypes.EmptyContentLog;
 import org.apache.iotdb.cluster.log.logtypes.RemoveNodeLog;
 import org.apache.iotdb.cluster.log.manage.MetaSingleSnapshotLogManager;
 import org.apache.iotdb.cluster.log.snapshot.MetaSimpleSnapshot;
@@ -137,6 +140,7 @@ import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
 import org.apache.iotdb.db.qp.physical.sys.CreateMultiTimeSeriesPlan;
 import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
 import org.apache.iotdb.db.qp.physical.sys.DeleteTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.LogPlan;
 import org.apache.iotdb.db.service.IoTDB;
 import org.apache.iotdb.db.utils.TestOnly;
 import org.apache.iotdb.rpc.RpcUtils;
@@ -437,7 +441,7 @@ public class MetaGroupMember extends RaftMember {
 
     Node newNode = addNodeLog.getNewNode();
     synchronized (allNodes) {
-      if (partitionTable.deserialize(addNodeLog.getPartitionTable())) {
+      if (partitionTable.checkChangeMembershipValidity(addNodeLog.getPartitionTable().getLong())) {
         logger.debug("Adding a new node {} into {}", newNode, allNodes);
 
         if (!allNodes.contains(newNode)) {
@@ -843,7 +847,7 @@ public class MetaGroupMember extends RaftMember {
    * @param node cannot be the local node
    */
   public AddNodeResponse addNode(Node node, StartUpStatus startUpStatus)
-      throws AddSelfException, LogExecutionException {
+      throws AddSelfException, LogExecutionException, ChangeMembershipException, InterruptedException, UnsupportedPlanException {
     AddNodeResponse response = new AddNodeResponse();
     if (partitionTable == null) {
       logger.info("Cannot add node now because the partition table is not set");
@@ -876,7 +880,8 @@ public class MetaGroupMember extends RaftMember {
    * @return true if the process is over, false if the request should be forwarded
    */
   private boolean processAddNodeLocally(Node newNode, StartUpStatus startUpStatus,
-      AddNodeResponse response) throws LogExecutionException {
+      AddNodeResponse response)
+      throws LogExecutionException, ChangeMembershipException, InterruptedException, UnsupportedPlanException {
     if (character != NodeCharacter.LEADER) {
       return false;
     }
@@ -938,6 +943,7 @@ public class MetaGroupMember extends RaftMember {
         AppendLogResult result = sendLogToFollowers(addNodeLog);
         switch (result) {
           case OK:
+            sendLogToAllDataGroups(addNodeLog);
             commitLog(addNodeLog);
             logger.info("Join request of {} is accepted", newNode);
 
@@ -959,6 +965,42 @@ public class MetaGroupMember extends RaftMember {
     }
   }
 
+  /**
+   * As leader, append an empty content log and replicate it so that all previous logs can be committed; a non-leader returns at once.
+   */
+  public void processEmptyContentLog() {
+    if (character != NodeCharacter.LEADER) {
+      return;
+    }
+
+    Log log = new EmptyContentLog();
+    synchronized (logManager) {
+      // take term and index under the log lock: reading the last index outside it lets a concurrent append claim the same slot
+      log.setCurrLogTerm(getTerm().get());
+      log.setCurrLogIndex(logManager.getLastLogIndex() + 1);
+      logManager.append(log);
+
+      // retry while the send to followers times out; any other result ends the loop
+      while (true) {
+        AppendLogResult result = sendLogToFollowers(log);
+        switch (result) {
+          case OK:
+            try {
+              commitLog(log);
+            } catch (LogExecutionException e) {
+              logger.error("Fail to execute empty content log", e);
+            }
+            return;
+          case TIME_OUT:
+            continue;
+          case LEADERSHIP_STALE:
+          default:
+            return;
+        }
+      }
+    }
+  }
+
   private boolean checkNodeConfig(StartUpStatus remoteStartUpStatus, AddNodeResponse response) {
     long remotePartitionInterval = remoteStartUpStatus.getPartitionInterval();
     int remoteHashSalt = remoteStartUpStatus.getHashSalt();
@@ -1880,7 +1922,7 @@ public class MetaGroupMember extends RaftMember {
    * @param node the node to be removed.
    */
   public long removeNode(Node node)
-      throws PartitionTableUnavailableException, LogExecutionException {
+      throws PartitionTableUnavailableException, LogExecutionException, ChangeMembershipException, InterruptedException, UnsupportedPlanException {
     if (partitionTable == null) {
       logger.info("Cannot add node now because the partition table is not set");
       throw new PartitionTableUnavailableException(thisNode);
@@ -1901,7 +1943,7 @@ public class MetaGroupMember extends RaftMember {
    * @return Long.MIN_VALUE if further forwarding is required, or the execution result
    */
   private long processRemoveNodeLocally(Node node)
-      throws LogExecutionException {
+      throws LogExecutionException, ChangeMembershipException, InterruptedException, UnsupportedPlanException {
     if (character != NodeCharacter.LEADER) {
       return Response.RESPONSE_NULL;
     }
@@ -1954,6 +1996,7 @@ public class MetaGroupMember extends RaftMember {
         AppendLogResult result = sendLogToFollowers(removeNodeLog);
         switch (result) {
           case OK:
+            sendLogToAllDataGroups(removeNodeLog);
             commitLog(removeNodeLog);
             logger.info("Removal request of {} is accepted", target);
             return Response.RESPONSE_AGREE;
@@ -1969,6 +2012,25 @@ public class MetaGroupMember extends RaftMember {
     }
   }
 
+  private void sendLogToAllDataGroups(Log log)
+      throws ChangeMembershipException, UnsupportedPlanException, InterruptedException {
+    int retryTime = 0;
+    TSStatus status = null;
+    while(retryTime++ <= 5) {
+      logger.debug("Send log {} to all data groups, retry time: {}", log, retryTime);
+      LogPlan plan = new LogPlan(log.serialize());
+      status = processPartitionedPlan(plan);
+      if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        break;
+      }
+      Thread.sleep(100);
+    }
+    if (status == null || status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      throw new ChangeMembershipException(String.format("apply %s failed with status {%s}", log, status));
+    }
+    logger.debug("Send log {} to all data groups: end", log);
+  }
+
   /**
    * Remove a node from the node list, partition table and update DataGroupMembers. If the removed
    * node is the local node, also stop heartbeat and catch-up service of metadata, but the heartbeat
@@ -1980,7 +2042,7 @@ public class MetaGroupMember extends RaftMember {
 
     Node oldNode = removeNodeLog.getRemovedNode();
     synchronized (allNodes) {
-      if (partitionTable.deserialize(removeNodeLog.getPartitionTable())) {
+      if (partitionTable.checkChangeMembershipValidity(removeNodeLog.getPartitionTable().getLong())) {
         logger.debug("Removing a node {} from {}", oldNode, allNodes);
 
         if (allNodes.contains(oldNode)) {
@@ -2019,6 +2081,7 @@ public class MetaGroupMember extends RaftMember {
   }
 
   private void exileNode(RemoveNodeLog removeNodeLog) {
+    logger.debug("Exile node {}: start.", removeNodeLog.getRemovedNode());
     Node node = removeNodeLog.getRemovedNode();
     if (config.isUseAsyncServer()) {
       AsyncMetaClient asyncMetaClient = (AsyncMetaClient) getAsyncClient(node);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index 2cf5f77..4fe3274 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -879,7 +879,7 @@ public abstract class RaftMember {
     }
 
     logger.debug("{}: Processing plan {}", name, plan);
-    if (readOnly) {
+    if (readOnly && !(plan instanceof LogPlan)) {
       return StatusUtils.NODE_READ_ONLY;
     }
     long startTime = Timer.Statistic.RAFT_SENDER_APPEND_LOG.getOperationStartTime();
@@ -1085,6 +1085,10 @@ public abstract class RaftMember {
     return serialToParallelPool;
   }
 
+  public ExecutorService getAppendLogThreadPool() {
+    return appendLogThreadPool; // hands out the member's own pool instance, not a copy or wrapper
+  }
+
   public Object getSyncLock() {
     return syncLock;
   }
@@ -1457,6 +1461,7 @@ public abstract class RaftMember {
         logger.info("{} has update it's term to {}", getName(), newTerm);
         term.set(newTerm);
         setVoteFor(null);
+        setCharacter(NodeCharacter.ELECTOR);
         setLeader(ClusterConstant.EMPTY_NODE);
         updateHardState(newTerm, getVoteFor());
       }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaAsyncService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaAsyncService.java
index a555e17..a7a96a5 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaAsyncService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaAsyncService.java
@@ -21,9 +21,11 @@ package org.apache.iotdb.cluster.server.service;
 
 import java.nio.ByteBuffer;
 import org.apache.iotdb.cluster.exception.AddSelfException;
+import org.apache.iotdb.cluster.exception.ChangeMembershipException;
 import org.apache.iotdb.cluster.exception.LeaderUnknownException;
 import org.apache.iotdb.cluster.exception.LogExecutionException;
 import org.apache.iotdb.cluster.exception.PartitionTableUnavailableException;
+import org.apache.iotdb.cluster.exception.UnsupportedPlanException;
 import org.apache.iotdb.cluster.log.logtypes.RemoveNodeLog;
 import org.apache.iotdb.cluster.rpc.thrift.AddNodeResponse;
 import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
@@ -71,7 +73,7 @@ public class MetaAsyncService extends BaseAsyncService implements TSMetaService.
     AddNodeResponse addNodeResponse = null;
     try {
       addNodeResponse = metaGroupMember.addNode(node, startUpStatus);
-    } catch (AddSelfException | LogExecutionException e) {
+    } catch (AddSelfException | LogExecutionException | ChangeMembershipException | InterruptedException | UnsupportedPlanException e) {
       resultHandler.onError(e);
     }
     if (addNodeResponse != null) {
@@ -148,8 +150,10 @@ public class MetaAsyncService extends BaseAsyncService implements TSMetaService.
     long result = Response.RESPONSE_NULL;
     try {
       result = metaGroupMember.removeNode(node);
-    } catch (PartitionTableUnavailableException | LogExecutionException e) {
+    } catch (PartitionTableUnavailableException | LogExecutionException | ChangeMembershipException | InterruptedException | UnsupportedPlanException e) {
+      logger.error("Can not remove node {}", node, e);
       resultHandler.onError(e);
+      return;
     }
 
     if (result != Response.RESPONSE_NULL) {
@@ -198,6 +202,8 @@ public class MetaAsyncService extends BaseAsyncService implements TSMetaService.
    */
   @Override
   public void exile(ByteBuffer removeNodeLogBuffer, AsyncMethodCallback<Void> resultHandler) {
+    logger.info("Start to exile.");
+    removeNodeLogBuffer.get();
     RemoveNodeLog removeNodeLog = new RemoveNodeLog();
     removeNodeLog.deserialize(removeNodeLogBuffer);
     metaGroupMember.applyRemoveNode(removeNodeLog);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaSyncService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaSyncService.java
index d88f4f5..c3804e8 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaSyncService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaSyncService.java
@@ -22,9 +22,11 @@ package org.apache.iotdb.cluster.server.service;
 import java.nio.ByteBuffer;
 import org.apache.iotdb.cluster.client.sync.SyncMetaClient;
 import org.apache.iotdb.cluster.exception.AddSelfException;
+import org.apache.iotdb.cluster.exception.ChangeMembershipException;
 import org.apache.iotdb.cluster.exception.LeaderUnknownException;
 import org.apache.iotdb.cluster.exception.LogExecutionException;
 import org.apache.iotdb.cluster.exception.PartitionTableUnavailableException;
+import org.apache.iotdb.cluster.exception.UnsupportedPlanException;
 import org.apache.iotdb.cluster.log.logtypes.RemoveNodeLog;
 import org.apache.iotdb.cluster.rpc.thrift.AddNodeResponse;
 import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
@@ -69,7 +71,7 @@ public class MetaSyncService extends BaseSyncService implements TSMetaService.If
     AddNodeResponse addNodeResponse;
     try {
       addNodeResponse = metaGroupMember.addNode(node, startUpStatus);
-    } catch (AddSelfException | LogExecutionException e) {
+    } catch (AddSelfException | LogExecutionException | ChangeMembershipException | InterruptedException | UnsupportedPlanException e) {
       throw new TException(e);
     }
     if (addNodeResponse != null) {
@@ -142,7 +144,7 @@ public class MetaSyncService extends BaseSyncService implements TSMetaService.If
     long result;
     try {
       result = metaGroupMember.removeNode(node);
-    } catch (PartitionTableUnavailableException | LogExecutionException e) {
+    } catch (PartitionTableUnavailableException | LogExecutionException | ChangeMembershipException | InterruptedException | UnsupportedPlanException e) {
       throw new TException(e);
     }
 
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/partition/SlotPartitionTableTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/partition/SlotPartitionTableTest.java
index 032e535..06c7fa9 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/partition/SlotPartitionTableTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/partition/SlotPartitionTableTest.java
@@ -506,10 +506,6 @@ public class SlotPartitionTableTest {
     for (int i = 0; i < 5; i++) {
       assertTrue(removedGroup.contains(getNode(i)));
     }
-    PartitionGroup newGroup = nodeRemovalResult.getNewGroup(0);
-    for (int i : new int[]{18, 19, 1, 2, 3}) {
-      assertTrue(newGroup.contains(getNode(i)));
-    }
     // the slots owned by the removed one should be redistributed to other nodes
     Map<RaftNode, List<Integer>> newSlotOwners = ((SlotNodeRemovalResult) nodeRemovalResult)
         .getNewSlotOwners();
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatThreadTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatThreadTest.java
index 5802a9d..2a6542f 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatThreadTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatThreadTest.java
@@ -108,6 +108,11 @@ public class MetaHeartbeatThreadTest extends HeartbeatThreadTest {
     }
 
     @Override
+    public boolean checkChangeMembershipValidity(long targetLogIndex) {
+      return true; // stub: every membership change is reported as valid, targetLogIndex is ignored
+    }
+
+    @Override
     public List<Node> getAllNodes() {
       return null;
     }
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
index 4c1c677..1b9b437 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
@@ -58,6 +58,7 @@ import org.apache.iotdb.cluster.common.TestSnapshot;
 import org.apache.iotdb.cluster.common.TestUtils;
 import org.apache.iotdb.cluster.config.ClusterDescriptor;
 import org.apache.iotdb.cluster.coordinator.Coordinator;
+import org.apache.iotdb.cluster.exception.ChangeMembershipException;
 import org.apache.iotdb.cluster.exception.ConfigInconsistentException;
 import org.apache.iotdb.cluster.exception.EmptyIntervalException;
 import org.apache.iotdb.cluster.exception.LogExecutionException;
@@ -1180,7 +1181,7 @@ public class MetaGroupMemberTest extends MemberTest {
     try {
       testMetaMember.removeNode(TestUtils.getNode(0));
       fail("Expect PartitionTableUnavailableException");
-    } catch (PartitionTableUnavailableException e) {
+    } catch (PartitionTableUnavailableException | ChangeMembershipException | InterruptedException | UnsupportedPlanException e) {
       // ignore
     }
   }