You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2021/03/04 15:09:39 UTC

[iotdb] branch cluster_scalability updated: This commit fix following issues: 1. When applying add/remove log in data group member, it ignores to update commit index which will lead to error in some cases. 2. Fix a bug of only leader can flush file when do snapshots. 3. Meta group apply add/remove log, it may has no data members which apply add/remove log, which lead to partition table is old version. 4. Send hint into a wrong data group duo to iterator.remove which will notify wrong data group and lead to slot status is wrong. 5. I [...]

This is an automated email from the ASF dual-hosted git repository.

lta pushed a commit to branch cluster_scalability
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/cluster_scalability by this push:
     new 5da92c5  This commit fix following issues: 1. When applying add/remove log in data group member, it ignores to update commit index which will lead to error in some cases. 2. Fix a bug of only leader can flush file when do snapshots. 3. Meta group apply add/remove log, it may has no data members which apply add/remove log, which lead to partition table is old version. 4. Send hint into a wrong data group duo to iterator.remove which will notify wrong data group and lead to slot st [...]
5da92c5 is described below

commit 5da92c5a58f8ca79a48e5017b9266e043bb51db0
Author: lta <li...@163.com>
AuthorDate: Thu Mar 4 23:00:51 2021 +0800

    This commit fixes the following issues:
    1. When a data group member applied an add/remove node log, it did not update the commit index, which led to errors in some cases.
    2. Fix a bug where only the leader could flush files when taking snapshots.
    3. When the meta group applies an add/remove node log, there may be no data group member that applies the same log, which leaves the partition table at an old version.
    4. Hints were sent to the wrong data group because of iterator.remove(), which notified the wrong data group and left the slot status incorrect.
    5. Fix deserialization of the removeNode log during sync exile.
    6. Handling a heartbeat request locked logManager, which could hold the lock for a long time because applying logs also locks logManager.
    7. Waiting for a slot's status could take more than 30s, probably because the SlotStatus field was not declared volatile.
---
 .../RedirectMetaLeaderException.java}              |  36 +++--
 .../manage/FilePartitionedSnapshotLogManager.java  |  21 +--
 .../log/manage/PartitionedSnapshotLogManager.java  |   4 +-
 .../cluster/log/snapshot/PullSnapshotTask.java     |   2 +-
 .../iotdb/cluster/partition/PartitionGroup.java    |   5 -
 .../iotdb/cluster/partition/PartitionTable.java    |   2 -
 .../iotdb/cluster/partition/slot/SlotManager.java  |   7 +-
 .../cluster/partition/slot/SlotPartitionTable.java |   7 +-
 .../iotdb/cluster/server/DataClusterServer.java    |  17 ++-
 .../cluster/server/PullSnapshotHintService.java    |   9 +-
 .../server/handlers/caller/HeartbeatHandler.java   |  12 +-
 .../cluster/server/member/DataGroupMember.java     |  80 ++++++++---
 .../cluster/server/member/MetaGroupMember.java     | 160 ++++++++++++---------
 .../iotdb/cluster/server/member/RaftMember.java    |  30 ++--
 .../cluster/server/service/MetaAsyncService.java   |   2 +-
 .../cluster/server/service/MetaSyncService.java    |   2 +
 .../cluster/utils/nodetool/ClusterMonitor.java     |  78 +++++++---
 .../utils/nodetool/ClusterMonitorMBean.java        |  29 ++--
 .../iotdb/cluster/utils/nodetool/NodeTool.java     |  10 +-
 .../nodetool/function/{Ring.java => Header.java}   |  30 ++--
 .../cluster/utils/nodetool/function/Migration.java |  60 ++++++++
 .../utils/nodetool/function/NodeToolCmd.java       |  15 +-
 .../cluster/utils/nodetool/function/Ring.java      |  13 +-
 .../nodetool/function/{Host.java => Slot.java}     |  16 +--
 .../cluster/log/applier/MetaLogApplierTest.java    |   1 -
 .../server/heartbeat/MetaHeartbeatThreadTest.java  |   5 -
 26 files changed, 423 insertions(+), 230 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/function/Ring.java b/cluster/src/main/java/org/apache/iotdb/cluster/exception/RedirectMetaLeaderException.java
similarity index 52%
copy from cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/function/Ring.java
copy to cluster/src/main/java/org/apache/iotdb/cluster/exception/RedirectMetaLeaderException.java
index 9b70e3c..6097d96 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/function/Ring.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/exception/RedirectMetaLeaderException.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -16,27 +16,23 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.cluster.utils.nodetool.function;
-
-import static org.apache.iotdb.cluster.utils.nodetool.Printer.msgPrintln;
+package org.apache.iotdb.cluster.exception;
 
-import io.airlift.airline.Command;
-import java.util.List;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
-import org.apache.iotdb.cluster.utils.nodetool.ClusterMonitorMBean;
 
-@Command(name = "ring", description = "Print information about the hash ring of nodes")
-public class Ring extends NodeToolCmd {
+/**
+ * Redirect to meta leader.
+ */
+public class RedirectMetaLeaderException extends Exception {
+
+  private Node metaLeader;
+
+  public RedirectMetaLeaderException(Node leader) {
+    super(String.format("Redirect to meta leader %s", leader));
+    this.metaLeader = leader;
+  }
 
-  @Override
-  public void execute(ClusterMonitorMBean proxy) {
-    List<Node> allNodes = proxy.getRing();
-    if (allNodes == null) {
-      msgPrintln(BUILDING_CLUSTER_INFO);
-    } else {
-      msgPrintln(String.format("%-20s  %30s", "Node Identifier", "Node"));
-      allNodes.forEach(
-          node -> msgPrintln(String.format("%-20d->%30s", node.nodeIdentifier, nodeToString(node))));
-    }
+  public Node getMetaLeader() {
+    return metaLeader;
   }
-}
\ No newline at end of file
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/FilePartitionedSnapshotLogManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/FilePartitionedSnapshotLogManager.java
index 4c3da5e..230ce82 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/FilePartitionedSnapshotLogManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/FilePartitionedSnapshotLogManager.java
@@ -65,7 +65,7 @@ public class FilePartitionedSnapshotLogManager extends PartitionedSnapshotLogMan
   /**
    * send FlushPlan to all nodes in one dataGroup
    */
-  private void syncFlushAllProcessor(List<Integer> requiredSlots) {
+  private void syncFlushAllProcessor(List<Integer> requiredSlots, boolean needLeader) {
     logger.info("{}: Start flush all storage group processor in one data group", getName());
     Map<String, List<Pair<Long, Boolean>>> storageGroupPartitions = StorageEngine.getInstance()
         .getWorkingStorageGroupPartitions();
@@ -73,18 +73,20 @@ public class FilePartitionedSnapshotLogManager extends PartitionedSnapshotLogMan
       logger.info("{}: no need to flush processor", getName());
       return;
     }
-    dataGroupMember.flushFileWhenDoSnapshot(storageGroupPartitions, requiredSlots);
+    dataGroupMember.flushFileWhenDoSnapshot(storageGroupPartitions, requiredSlots, needLeader);
   }
 
   @Override
   @SuppressWarnings("java:S1135") // ignore todos
   public void takeSnapshot() throws IOException {
     takeSnapshotForSpecificSlots(((SlotPartitionTable) partitionTable)
-        .getNodeSlots(new RaftNode(dataGroupMember.getHeader(), dataGroupMember.getRaftGroupId())));
+            .getNodeSlots(new RaftNode(dataGroupMember.getHeader(), dataGroupMember.getRaftGroupId())),
+        true);
   }
 
   @Override
-  public void takeSnapshotForSpecificSlots(List<Integer> requiredSlots) throws IOException {
+  public void takeSnapshotForSpecificSlots(List<Integer> requiredSlots, boolean needLeader)
+      throws IOException {
     try {
       logger.info("{}: Taking snapshots, flushing IoTDB", getName());
       // record current commit index and prevent further logs from being applied, so the
@@ -93,7 +95,7 @@ public class FilePartitionedSnapshotLogManager extends PartitionedSnapshotLogMan
       // wait until all logs before BlockAppliedCommitIndex are applied
       super.takeSnapshot();
       // flush data to disk so that the disk files will represent a complete state
-      syncFlushAllProcessor(requiredSlots);
+      syncFlushAllProcessor(requiredSlots, needLeader);
       logger.info("{}: Taking snapshots, IoTDB is flushed", getName());
       // TODO-cluster https://issues.apache.org/jira/browse/IOTDB-820
       synchronized (this) {
@@ -215,13 +217,4 @@ public class FilePartitionedSnapshotLogManager extends PartitionedSnapshotLogMan
     }
     return true;
   }
-
-  @Override
-  public long append(Log entry) {
-    long lastLogIndex = super.append(entry);
-    if (lastLogIndex != -1 && (entry instanceof AddNodeLog || entry instanceof RemoveNodeLog)) {
-      logApplier.apply(entry);
-    }
-    return lastLogIndex;
-  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/PartitionedSnapshotLogManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/PartitionedSnapshotLogManager.java
index 0cf6c77..804e8f8 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/PartitionedSnapshotLogManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/PartitionedSnapshotLogManager.java
@@ -70,7 +70,9 @@ public abstract class PartitionedSnapshotLogManager<T extends Snapshot> extends
     this.dataGroupMember = dataGroupMember;
   }
 
-  public void takeSnapshotForSpecificSlots(List<Integer> requiredSlots) throws IOException {}
+  public void takeSnapshotForSpecificSlots(List<Integer> requiredSlots, boolean needLeader)
+      throws IOException {
+  }
 
   @Override
   public Snapshot getSnapshot(long minIndex) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTask.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTask.java
index fc9b968..16e5bd8 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTask.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTask.java
@@ -112,7 +112,7 @@ public class PullSnapshotTask<T extends Snapshot> implements Callable<Void> {
       }
 
       if (logger.isInfoEnabled()) {
-        logger.info("Received a snapshot {} from {}", result,
+        logger.info("{}: Received a snapshot {} from {}", newMember.getName(), result,
             descriptor.getPreviousHolders().get(nodeIndex));
       }
       try {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionGroup.java b/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionGroup.java
index f3106ef..810837e 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionGroup.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionGroup.java
@@ -99,9 +99,4 @@ public class PartitionGroup extends ArrayList<Node> {
   public int getId() {
     return id;
   }
-
-//  @Override
-//  public String toString() {
-//    return String.format("PartitionGroup{id=%d, header=%s}", id, get(0));
-//  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionTable.java b/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionTable.java
index 67fb4f0..36727d6 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionTable.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionTable.java
@@ -98,8 +98,6 @@ public interface PartitionTable {
    */
   boolean deserialize(ByteBuffer buffer);
 
-  boolean checkChangeMembershipValidity(long targetLogIndex);
-
   List<Node> getAllNodes();
 
   List<PartitionGroup> getGlobalGroups();
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotManager.java
index 1104dc8..9b26e7a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotManager.java
@@ -76,6 +76,7 @@ public class SlotManager {
    */
   public void waitSlot(int slotId) {
     SlotDescriptor slotDescriptor = idSlotMap.get(slotId);
+    long startTime = System.currentTimeMillis();
     while (true) {
       synchronized (slotDescriptor) {
         if (slotDescriptor.slotStatus == SlotStatus.PULLING
@@ -87,6 +88,10 @@ public class SlotManager {
             logger.error("Unexpected interruption when waiting for slot {}", slotId, e);
           }
         } else {
+          long cost = System.currentTimeMillis() - startTime;
+          if (cost > 1000) {
+            logger.info("Wait slot {} cost {}ms", slotId, cost);
+          }
           return;
         }
       }
@@ -324,7 +329,7 @@ public class SlotManager {
   }
 
   private static class SlotDescriptor {
-    private SlotStatus slotStatus;
+    private volatile SlotStatus slotStatus;
     private Node source;
     // in LOSING status, how many members in the new owner have pulled data
     private volatile int snapshotReceivedCount;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java b/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java
index 3dea23e..b305b32 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java
@@ -381,7 +381,7 @@ public class SlotPartitionTable implements PartitionTable {
 
     // judge whether the partition table of byte buffer is out of date
     if (lastMetaLogIndex != -1 && lastMetaLogIndex >= newLastLogIndex) {
-      return lastMetaLogIndex <= newLastLogIndex;
+      return lastMetaLogIndex == newLastLogIndex;
     }
     lastMetaLogIndex = newLastLogIndex;
     logger.info("Initializing the partition table from buffer");
@@ -435,11 +435,6 @@ public class SlotPartitionTable implements PartitionTable {
   }
 
   @Override
-  public boolean checkChangeMembershipValidity(long targetLogIndex) {
-    return lastMetaLogIndex == targetLogIndex;
-  }
-
-  @Override
   public List<Node> getAllNodes() {
     return nodeRing;
   }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
index e95df88..be56c8d 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
@@ -528,7 +528,6 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
       return;
     }
 
-    logger.debug("Pre add a new node {} to cluster", log.getNewNode());
     targetDataGroupMember.preAddNode(log.getNewNode());
   }
 
@@ -559,6 +558,9 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
         }
       }
 
+      if (logger.isDebugEnabled()) {
+        logger.debug("Data cluster server: start to handle new groups when adding new node {}", node);
+      }
       for (PartitionGroup newGroup: result.getNewGroupList()) {
         if (newGroup.contains(thisNode)) {
           logger.info("Adding this node into a new group {}", newGroup);
@@ -642,7 +644,7 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
       return;
     }
 
-    logger.debug("Removing a node {} from {}", log.getRemovedNode(), targetDataGroupMember.getAllNodes());
+    logger.debug("Pre removing a node {} from {}", log.getRemovedNode(), targetDataGroupMember.getAllNodes());
     targetDataGroupMember.preRemoveNode(log.getRemovedNode());
   }
 
@@ -670,14 +672,19 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
         }
       }
 
+      if (logger.isDebugEnabled()) {
+        logger.debug("Data cluster server: start to handle new groups when removing node {}", node);
+      }
       // if the removed group contains the local node, the local node should join a new group to
       // preserve the replication number
       for (PartitionGroup group : partitionTable.getLocalGroups()) {
-        if (!headerGroupMap.containsKey(new RaftNode(group.getHeader(), group.getId()))) {
+        RaftNode header = new RaftNode(group.getHeader(), group.getId());
+        if (!headerGroupMap.containsKey(header)) {
           logger.info("{} should join a new group {}", thisNode, group);
           DataGroupMember dataGroupMember = dataMemberFactory.create(group, thisNode);
           addDataGroupMember(dataGroupMember);
         }
+        headerGroupMap.get(header).pullSlots(removalResult);
       }
     }
   }
@@ -727,6 +734,10 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
     }
   }
 
+  public Map<RaftNode, DataGroupMember> getHeaderGroupMap() {
+    return headerGroupMap;
+  }
+
   @Override
   public void matchTerm(long index, long term, Node header, int raftId,
       AsyncMethodCallback<Boolean> resultHandler) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/PullSnapshotHintService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/PullSnapshotHintService.java
index d3d611f..63e2571 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/PullSnapshotHintService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/PullSnapshotHintService.java
@@ -72,7 +72,8 @@ public class PullSnapshotHintService {
 
   public void registerHint(PullSnapshotTaskDescriptor descriptor) {
     PullSnapshotHint hint = new PullSnapshotHint();
-    hint.receivers = new PartitionGroup(descriptor.getPreviousHolders());
+    hint.partitionGroup = descriptor.getPreviousHolders();
+    hint.receivers = new PartitionGroup(hint.partitionGroup);
     hint.slots = descriptor.getSlots();
     hints.add(hint);
   }
@@ -86,7 +87,7 @@ public class PullSnapshotHintService {
           if (logger.isDebugEnabled()) {
             logger.debug(
                 "{}: start to send hint to target group {}, receiver {}, slot is {} and other {}",
-                member.getName(), hint.receivers, receiver, hint.slots.get(0),
+                member.getName(), hint.partitionGroup, receiver, hint.slots.get(0),
                 hint.slots.size() - 1);
           }
           boolean result = sendHint(receiver, hint);
@@ -139,10 +140,12 @@ public class PullSnapshotHintService {
      */
     private PartitionGroup receivers;
 
+    private PartitionGroup partitionGroup;
+
     private List<Integer> slots;
 
     public Node getHeader() {
-      return receivers.getHeader();
+      return partitionGroup.getHeader();
     }
 
     public int getRaftId() {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/HeartbeatHandler.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/HeartbeatHandler.java
index 7a7bab0..508052d 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/HeartbeatHandler.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/HeartbeatHandler.java
@@ -78,9 +78,11 @@ public class HeartbeatHandler implements AsyncMethodCallback<HeartBeatResponse>
     long lastLogTerm = resp.getLastLogTerm();
     long localLastLogIdx = localMember.getLogManager().getLastLogIndex();
     long localLastLogTerm = localMember.getLogManager().getLastLogTerm();
-    logger.trace("{}: Node {} is still alive, log index: {}/{}, log term: {}/{}",
-        memberName, follower, lastLogIdx
-        , localLastLogIdx, lastLogTerm, localLastLogTerm);
+    if (logger.isDebugEnabled()) {
+      logger.debug("{}: Node {} is still alive, log index: {}/{}, log term: {}/{}",
+          memberName, follower, lastLogIdx
+          , localLastLogIdx, lastLogTerm, localLastLogTerm);
+    }
 
     Peer peer = localMember.getPeerMap()
         .computeIfAbsent(follower, k -> new Peer(localMember.getLogManager().getLastLogIndex()));
@@ -94,12 +96,12 @@ public class HeartbeatHandler implements AsyncMethodCallback<HeartBeatResponse>
         peer.setMatchIndex(-1);
       }
 
-      // only start a catch up when the follower's lastLogIndex remains stall and unchanged for 5
+      // only start a catch up when the follower's lastLogIndex remains stall and unchanged for 3
       // heartbeats
       if (lastLogIdx == peer.getLastHeartBeatIndex()) {
         // the follower's lastLogIndex is unchanged, increase inconsistent counter
         int inconsistentNum = peer.incInconsistentHeartbeatNum();
-        if (inconsistentNum >= 5) {
+        if (inconsistentNum >= 3) {
           logger.info("{}: catching up node {}, index-term: {}-{}/{}-{}, peer match index {}",
               memberName, follower,
               lastLogIdx, lastLogTerm,
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
index fbf2dd4..5691d14 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
@@ -49,11 +49,14 @@ import org.apache.iotdb.cluster.config.ClusterConstant;
 import org.apache.iotdb.cluster.config.ClusterDescriptor;
 import org.apache.iotdb.cluster.exception.LogExecutionException;
 import org.apache.iotdb.cluster.exception.SnapshotInstallationException;
+import org.apache.iotdb.cluster.log.Log;
 import org.apache.iotdb.cluster.log.LogApplier;
 import org.apache.iotdb.cluster.log.Snapshot;
 import org.apache.iotdb.cluster.log.applier.AsyncDataLogApplier;
 import org.apache.iotdb.cluster.log.applier.DataLogApplier;
+import org.apache.iotdb.cluster.log.logtypes.AddNodeLog;
 import org.apache.iotdb.cluster.log.logtypes.CloseFileLog;
+import org.apache.iotdb.cluster.log.logtypes.RemoveNodeLog;
 import org.apache.iotdb.cluster.log.manage.FilePartitionedSnapshotLogManager;
 import org.apache.iotdb.cluster.log.manage.PartitionedSnapshotLogManager;
 import org.apache.iotdb.cluster.log.snapshot.FileSnapshot;
@@ -263,6 +266,9 @@ public class DataGroupMember extends RaftMember {
   }
 
   public boolean preAddNode(Node node) {
+    if (logger.isDebugEnabled()) {
+      logger.debug("{}: start to pre adding node {}", name, node);
+    }
     if (allNodes.contains(node)) {
       return false;
     }
@@ -301,6 +307,9 @@ public class DataGroupMember extends RaftMember {
    * otherwise
    */
   public boolean addNode(Node node, NodeAdditionResult result) {
+    if (logger.isDebugEnabled()) {
+      logger.debug("{}: start to add node {}", name, node);
+    }
     syncLeader();
 
     // mark slots that do not belong to this group any more
@@ -316,6 +325,7 @@ public class DataGroupMember extends RaftMember {
         // remove the last node because the group size is fixed to replication number
         Node removedNode = allNodes.remove(allNodes.size() - 1);
         peerMap.remove(removedNode);
+
         if (removedNode.equals(leader.get()) && !removedNode.equals(thisNode)) {
           // if the leader is removed, also start an election immediately
           synchronized (term) {
@@ -462,7 +472,7 @@ public class DataGroupMember extends RaftMember {
     synchronized (logManager) {
       PullSnapshotResp resp = new PullSnapshotResp();
       Map<Integer, ByteBuffer> resultMap = new HashMap<>();
-      ((PartitionedSnapshotLogManager)logManager).takeSnapshotForSpecificSlots(requiredSlots);
+      ((PartitionedSnapshotLogManager)logManager).takeSnapshotForSpecificSlots(requiredSlots, false);
 
       PartitionedSnapshot<Snapshot> allSnapshot = (PartitionedSnapshot) logManager.getSnapshot();
       for (int requiredSlot : requiredSlots) {
@@ -485,14 +495,15 @@ public class DataGroupMember extends RaftMember {
    * @param newNode
    */
   public void pullNodeAdditionSnapshots(List<Integer> slots, Node newNode) {
+    // group the slots by their owners
+    Map<PartitionGroup, List<Integer>> holderSlotsMap = new HashMap<>();
     synchronized (logManager) {
       logger.info("{} pulling {} slots from remote", name, slots.size());
       PartitionedSnapshot<Snapshot> snapshot = (PartitionedSnapshot) logManager.getSnapshot();
-      Map<Integer, PartitionGroup> prevHolders = ((SlotPartitionTable) metaGroupMember.getPartitionTable())
+      Map<Integer, PartitionGroup> prevHolders = ((SlotPartitionTable) metaGroupMember
+          .getPartitionTable())
           .getPreviousNodeMap(new RaftNode(newNode, getRaftGroupId()));
 
-      // group the slots by their owners
-      Map<PartitionGroup, List<Integer>> holderSlotsMap = new HashMap<>();
       for (int slot : slots) {
         // skip the slot if the corresponding data is already replicated locally
         if (snapshot.getSnapshot(slot) == null) {
@@ -502,14 +513,14 @@ public class DataGroupMember extends RaftMember {
           }
         }
       }
+    }
 
-      // pull snapshots from each owner's data group
-      for (Entry<PartitionGroup, List<Integer>> entry : holderSlotsMap.entrySet()) {
-        List<Integer> nodeSlots = entry.getValue();
-        PullSnapshotTaskDescriptor taskDescriptor =
-            new PullSnapshotTaskDescriptor(entry.getKey(), nodeSlots, false);
-        pullFileSnapshot(taskDescriptor, null);
-      }
+    // pull snapshots from each owner's data group
+    for (Entry<PartitionGroup, List<Integer>> entry : holderSlotsMap.entrySet()) {
+      List<Integer> nodeSlots = entry.getValue();
+      PullSnapshotTaskDescriptor taskDescriptor =
+          new PullSnapshotTaskDescriptor(entry.getKey(), nodeSlots, false);
+      pullFileSnapshot(taskDescriptor, null);
     }
   }
 
@@ -639,8 +650,9 @@ public class DataGroupMember extends RaftMember {
   }
 
   public boolean flushFileWhenDoSnapshot(
-      Map<String, List<Pair<Long, Boolean>>> storageGroupPartitions, List<Integer> requiredSlots) {
-    if (character != NodeCharacter.LEADER) {
+      Map<String, List<Pair<Long, Boolean>>> storageGroupPartitions, List<Integer> requiredSlots,
+      boolean needLeader) {
+    if (needLeader && character != NodeCharacter.LEADER) {
       return false;
     }
 
@@ -764,6 +776,9 @@ public class DataGroupMember extends RaftMember {
   }
 
   public void preRemoveNode(Node removedNode) {
+    if (logger.isDebugEnabled()) {
+      logger.debug("{}: start to pre remove node {}", name, removedNode);
+    }
     synchronized (allNodes) {
       if (allNodes.contains(removedNode) && allNodes.size() == config.getReplicationNum()) {
         // update the group if the deleted node was in it
@@ -785,6 +800,9 @@ public class DataGroupMember extends RaftMember {
    */
   @SuppressWarnings("java:S2445") // the reference of allNodes is unchanged
   public void removeNode(Node removedNode, NodeRemovalResult removalResult) {
+    if (logger.isDebugEnabled()) {
+      logger.debug("{}: start to remove node {}", name, removedNode);
+    }
     syncLeader();
 
     synchronized (allNodes) {
@@ -800,17 +818,37 @@ public class DataGroupMember extends RaftMember {
           }
         }
       }
+    }
+
+    pullSlots(removalResult);
+  }
+
+  public void pullSlots(NodeRemovalResult removalResult) {
+    List<Integer> slotsToPull = ((SlotNodeRemovalResult) removalResult).getNewSlotOwners()
+        .get(new RaftNode(getHeader(), getRaftGroupId()));
+    if (slotsToPull != null) {
+      // pull the slots that should be taken over
+      PullSnapshotTaskDescriptor taskDescriptor = new PullSnapshotTaskDescriptor(
+          removalResult.getRemovedGroup(getRaftGroupId()),
+          slotsToPull, true);
+      pullFileSnapshot(taskDescriptor, null);
+    }
+  }
 
-      List<Integer> slotsToPull = ((SlotNodeRemovalResult) removalResult).getNewSlotOwners()
-          .get(new RaftNode(getHeader(), getRaftGroupId()));
-      if (slotsToPull != null) {
-        // pull the slots that should be taken over
-        PullSnapshotTaskDescriptor taskDescriptor = new PullSnapshotTaskDescriptor(
-            removalResult.getRemovedGroup(getRaftGroupId()),
-            slotsToPull, true);
-        pullFileSnapshot(taskDescriptor, null);
+  /**
+   * For data group, it's necessary to apply remove/add log immediately after append.
+   */
+  @Override
+  protected long appendEntry(long prevLogIndex, long prevLogTerm, long leaderCommit, Log log) {
+    long resp = super.appendEntry(prevLogIndex, prevLogTerm, leaderCommit, log);
+    if (resp == Response.RESPONSE_AGREE && (log instanceof AddNodeLog || log instanceof RemoveNodeLog)) {
+      try {
+        commitLog(log);
+      } catch (LogExecutionException e) {
+        logger.error("{}: execute add/remove log error.", name, e);
       }
     }
+    return resp;
   }
 
   /**
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
index 391381c..c004b8a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
@@ -439,10 +439,13 @@ public class MetaGroupMember extends RaftMember {
    */
   public void applyAddNode(AddNodeLog addNodeLog) {
 
+    long startTime = System.currentTimeMillis();
     Node newNode = addNodeLog.getNewNode();
     synchronized (allNodes) {
-      if (partitionTable.checkChangeMembershipValidity(addNodeLog.getPartitionTable().getLong())) {
-        logger.debug("Adding a new node {} into {}", newNode, allNodes);
+      if (partitionTable.deserialize(addNodeLog.getPartitionTable())) {
+        if (logger.isDebugEnabled()) {
+          logger.debug("{}: adding a new node {} into {}", name, newNode, allNodes);
+        }
 
         if (!allNodes.contains(newNode)) {
           registerNodeIdentifier(newNode, newNode.getNodeIdentifier());
@@ -455,8 +458,13 @@ public class MetaGroupMember extends RaftMember {
         // update local data members
         NodeAdditionResult result = partitionTable.getNodeAdditionResult(newNode);
         getDataClusterServer().addNode(newNode, result);
+        if (logger.isDebugEnabled()) {
+          logger.debug("{}: success to add a new node {} into {}", name, newNode, allNodes);
+        }
       }
     }
+    logger.info("{}: execute adding node {} cost {} ms", name, newNode,
+        (System.currentTimeMillis()) - startTime);
   }
 
   /**
@@ -920,13 +928,13 @@ public class MetaGroupMember extends RaftMember {
       return true;
     }
 
+    AddNodeLog addNodeLog = new AddNodeLog();
     // node adding is serialized to reduce potential concurrency problem
     synchronized (logManager) {
       // update partition table
       partitionTable.addNode(newNode);
       ((SlotPartitionTable) partitionTable).setLastMetaLogIndex(logManager.getLastLogIndex() + 1);
 
-      AddNodeLog addNodeLog = new AddNodeLog();
       addNodeLog.setPartitionTable(partitionTable.serialize());
       addNodeLog.setCurrLogTerm(getTerm().get());
       addNodeLog.setCurrLogIndex(logManager.getLastLogIndex() + 1);
@@ -935,32 +943,32 @@ public class MetaGroupMember extends RaftMember {
       addNodeLog.setNewNode(newNode);
 
       logManager.append(addNodeLog);
+    }
 
-      int retryTime = 1;
-      while (true) {
-        logger
-            .info("Send the join request of {} to other nodes, retry time: {}", newNode, retryTime);
-        AppendLogResult result = sendLogToFollowers(addNodeLog);
-        switch (result) {
-          case OK:
-            sendLogToAllDataGroups(addNodeLog);
-            commitLog(addNodeLog);
-            logger.info("Join request of {} is accepted", newNode);
-
-            synchronized (partitionTable) {
-              response.setPartitionTableBytes(partitionTable.serialize());
-            }
-            response.setRespNum((int) Response.RESPONSE_AGREE);
-            logger.info("Sending join response of {}", newNode);
-            return true;
-          case TIME_OUT:
-            logger.info("Join request of {} timed out", newNode);
-            retryTime++;
-            continue;
-          case LEADERSHIP_STALE:
-          default:
-            return false;
-        }
+    int retryTime = 0;
+    while (true) {
+      logger
+          .info("{}: Send the join request of {} to other nodes, retry time: {}", name, newNode, retryTime);
+      AppendLogResult result = sendLogToFollowers(addNodeLog);
+      switch (result) {
+        case OK:
+          sendLogToAllDataGroups(addNodeLog);
+          commitLog(addNodeLog);
+          logger.info("{}: Join request of {} is accepted", name, newNode);
+
+          synchronized (partitionTable) {
+            response.setPartitionTableBytes(partitionTable.serialize());
+          }
+          response.setRespNum((int) Response.RESPONSE_AGREE);
+          logger.info("{}: Sending join response of {}", name, newNode);
+          return true;
+        case TIME_OUT:
+          logger.info("{}: Join request of {} timed out", name, newNode);
+          retryTime++;
+          break; // leaves the switch only; the while loop retries
+        case LEADERSHIP_STALE:
+        default:
+          return false;
       }
     }
   }
@@ -978,25 +986,28 @@ public class MetaGroupMember extends RaftMember {
-    log.setCurrLogIndex(logManager.getLastLogIndex() + 1);
 
     synchronized (logManager) {
-
+      log.setCurrLogIndex(logManager.getLastLogIndex() + 1); // allocate the index under the append lock
       logManager.append(log);
+    }
 
-      while (true) {
-        AppendLogResult result = sendLogToFollowers(log);
-        switch (result) {
-          case OK:
-            try {
-              commitLog(log);
-            } catch (LogExecutionException e) {
-              logger.error("Fail to execute empty content log", e);
-            }
-            return;
-          case TIME_OUT:
-            continue;
-          case LEADERSHIP_STALE:
-          default:
-            return;
-        }
+    int retryTime = 0;
+    while (true) {
+      logger.info("{}: Send empty content log to other nodes, retry time: {}", name, retryTime);
+      AppendLogResult result = sendLogToFollowers(log);
+      switch (result) {
+        case OK:
+          try {
+            commitLog(log);
+          } catch (LogExecutionException e) {
+            logger.error("{}: Fail to execute empty content log", name, e);
+          }
+          return;
+        case TIME_OUT:
+          logger.info("{}: add empty content log timed out", name);
+          retryTime++;
+          break; // leaves the switch only; the while loop retries
+        case LEADERSHIP_STALE:
+        default:
+          return;
       }
     }
   }
@@ -1969,17 +1980,18 @@ public class MetaGroupMember extends RaftMember {
       return Response.RESPONSE_REJECT;
     }
 
-    if (partitionTable.getAllNodes().contains(target) && partitionTable.getAllNodes().size() != allNodes.size()) {
+    if (partitionTable.getAllNodes().contains(target)
+        && partitionTable.getAllNodes().size() != allNodes.size()) {
       return Response.RESPONSE_CHANGE_MEMBERSHIP_CONFLICT;
     }
 
+    RemoveNodeLog removeNodeLog = new RemoveNodeLog(); // declared outside the lock so the retry loop after it can still use it
     // node removal must be serialized to reduce potential concurrency problem
     synchronized (logManager) {
       // update partition table
       partitionTable.removeNode(target);
       ((SlotPartitionTable) partitionTable).setLastMetaLogIndex(logManager.getLastLogIndex() + 1);
 
-      RemoveNodeLog removeNodeLog = new RemoveNodeLog();
       removeNodeLog.setPartitionTable(partitionTable.serialize());
       removeNodeLog.setCurrLogTerm(getTerm().get());
       removeNodeLog.setCurrLogIndex(logManager.getLastLogIndex() + 1);
@@ -1988,26 +2000,27 @@ public class MetaGroupMember extends RaftMember {
       removeNodeLog.setRemovedNode(target);
 
       logManager.append(removeNodeLog);
+    }
 
-      int retryTime = 1;
-      while (true) {
-        logger.info("Send the node removal request of {} to other nodes, retry time: {}", target,
-            retryTime);
-        AppendLogResult result = sendLogToFollowers(removeNodeLog);
-        switch (result) {
-          case OK:
-            sendLogToAllDataGroups(removeNodeLog);
-            commitLog(removeNodeLog);
-            logger.info("Removal request of {} is accepted", target);
-            return Response.RESPONSE_AGREE;
-          case TIME_OUT:
-            logger.info("Removal request of {} timed out", target);
-            break;
-          // retry
-          case LEADERSHIP_STALE:
-          default:
-            return Response.RESPONSE_NULL;
-        }
+    int retryTime = 0;
+    while (true) {
+      logger.info("{}: Send the node removal request of {} to other nodes, retry time: {}", name, target,
+          retryTime);
+      AppendLogResult result = sendLogToFollowers(removeNodeLog);
+      switch (result) {
+        case OK:
+          sendLogToAllDataGroups(removeNodeLog);
+          commitLog(removeNodeLog);
+          logger.info("{}: Removal request of {} is accepted", name, target);
+          return Response.RESPONSE_AGREE;
+        case TIME_OUT:
+          logger.info("{}: Removal request of {} timed out", name, target);
+          retryTime++;
+          break; // leaves the switch only; the while loop retries
+        // any other result: give up
+        case LEADERSHIP_STALE:
+        default:
+          return Response.RESPONSE_NULL;
       }
     }
   }
@@ -2040,10 +2053,13 @@ public class MetaGroupMember extends RaftMember {
    */
   public void applyRemoveNode(RemoveNodeLog removeNodeLog) {
 
+    long startTime = System.currentTimeMillis();
     Node oldNode = removeNodeLog.getRemovedNode();
     synchronized (allNodes) {
-      if (partitionTable.checkChangeMembershipValidity(removeNodeLog.getPartitionTable().getLong())) {
-        logger.debug("Removing a node {} from {}", oldNode, allNodes);
+      if (partitionTable.deserialize(removeNodeLog.getPartitionTable())) {
+        if (logger.isDebugEnabled()) {
+          logger.debug("{}: Removing a node {} from {}", name, oldNode, allNodes);
+        }
 
         if (allNodes.contains(oldNode)) {
           allNodes.remove(oldNode);
@@ -2071,12 +2087,20 @@ public class MetaGroupMember extends RaftMember {
           if (clientServer != null) {
             clientServer.stop();
           }
+          logger.info("{} has been removed from the cluster", name);
         } else if (thisNode.equals(leader.get())) {
           // as the old node is removed, it cannot know this by heartbeat or log, so it should be
           // directly kicked out of the cluster
           exileNode(removeNodeLog);
         }
+
+        if (logger.isDebugEnabled()) {
+          logger.debug("{}: Success to remove a node {} from {}", name, oldNode, allNodes);
+        }
       }
     }
+    // logged after releasing the allNodes lock so logging does not extend the critical section
+    logger.info("{}: execute removing node {} cost {} ms", name, oldNode,
+        System.currentTimeMillis() - startTime);
   }
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index eb2e6cf..700945e 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -61,7 +61,9 @@ import org.apache.iotdb.cluster.log.LogDispatcher;
 import org.apache.iotdb.cluster.log.LogDispatcher.SendLogRequest;
 import org.apache.iotdb.cluster.log.LogParser;
 import org.apache.iotdb.cluster.log.catchup.CatchUpTask;
+import org.apache.iotdb.cluster.log.logtypes.AddNodeLog;
 import org.apache.iotdb.cluster.log.logtypes.PhysicalPlanLog;
+import org.apache.iotdb.cluster.log.logtypes.RemoveNodeLog;
 import org.apache.iotdb.cluster.log.manage.RaftLogManager;
 import org.apache.iotdb.cluster.partition.PartitionGroup;
 import org.apache.iotdb.cluster.rpc.thrift.AppendEntriesRequest;
@@ -74,13 +76,13 @@ import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.iotdb.cluster.rpc.thrift.RaftService.AsyncClient;
 import org.apache.iotdb.cluster.rpc.thrift.RaftService.Client;
 import org.apache.iotdb.cluster.server.NodeCharacter;
-import org.apache.iotdb.cluster.server.monitor.Peer;
 import org.apache.iotdb.cluster.server.RaftServer;
 import org.apache.iotdb.cluster.server.Response;
-import org.apache.iotdb.cluster.server.monitor.Timer;
-import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
 import org.apache.iotdb.cluster.server.handlers.caller.AppendNodeEntryHandler;
 import org.apache.iotdb.cluster.server.handlers.caller.GenericHandler;
+import org.apache.iotdb.cluster.server.monitor.Peer;
+import org.apache.iotdb.cluster.server.monitor.Timer;
+import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
 import org.apache.iotdb.cluster.utils.ClientUtils;
 import org.apache.iotdb.cluster.utils.IOUtils;
 import org.apache.iotdb.cluster.utils.PlanSerializer;
@@ -388,8 +390,8 @@ public abstract class RaftMember {
         // a leader with a term lower than this node is invalid, send it the local term to inform
         // it to resign
         response.setTerm(thisTerm);
-        if (logger.isTraceEnabled()) {
-          logger.trace("{} received a heartbeat from a stale leader {}", name, request.getLeader());
+        if (logger.isDebugEnabled()) {
+          logger.debug("{} received a heartbeat from a stale leader {}", name, request.getLeader());
         }
       } else {
         // try updating local term
@@ -406,11 +408,13 @@ public abstract class RaftMember {
         response.setTerm(Response.RESPONSE_AGREE);
         // tell the leader who I am in case of catch-up
         response.setFollower(thisNode);
+        // tell the leader the local log progress so it may decide whether to perform a catch up
+        response.setLastLogIndex(logManager.getLastLogIndex());
+        response.setLastLogTerm(logManager.getLastLogTerm()); // NOTE(review): no longer wrapped in synchronized (logManager); confirm index/term cannot be torn
 
-        synchronized (logManager) {
-          // tell the leader the local log progress so it may decide whether to perform a catch up
-          response.setLastLogIndex(logManager.getLastLogIndex());
-          response.setLastLogTerm(logManager.getLastLogTerm());
+        if (logger.isDebugEnabled()) {
+          logger.debug("{}: log commit log index = {}, max have applied commit index = {}", name,
+              logManager.getCommitLogIndex(), logManager.getMaxHaveAppliedCommitIndex());
         }
 
         if (logManager.getCommitLogIndex() < request.getCommitLogIndex()) {
@@ -433,8 +437,8 @@ public abstract class RaftMember {
                   logManager.getLastLogIndex(), logManager.getLastLogTerm());
         }
 
-        if (logger.isTraceEnabled()) {
-          logger.trace("{} received heartbeat from a valid leader {}", name, request.getLeader());
+        if (logger.isDebugEnabled()) {
+          logger.debug("{} received heartbeat from a valid leader {}", name, request.getLeader());
         }
       }
       return response;
@@ -1537,7 +1541,7 @@ public abstract class RaftMember {
       long startTime = Timer.Statistic.RAFT_SENDER_SEND_LOG_TO_FOLLOWERS.getOperationStartTime();
       logger.debug("{}: Send log {} to other nodes, retry times: {}", name, log, retryTime);
       if (character != NodeCharacter.LEADER) {
-        logger.debug("Has lose leadership, so need not to send log");
+        logger.debug("{}: Has lose leadership, so need not to send log", name);
         return false;
       }
       AppendLogResult result = sendLogToFollowers(log);
@@ -1750,7 +1754,7 @@ public abstract class RaftMember {
    * @return Response.RESPONSE_AGREE when the log is successfully appended or Response
    * .RESPONSE_LOG_MISMATCH if the previous log of "log" is not found.
    */
-  private long appendEntry(long prevLogIndex, long prevLogTerm, long leaderCommit, Log log) {
+  protected long appendEntry(long prevLogIndex, long prevLogTerm, long leaderCommit, Log log) { // NOTE(review): widened from private; confirm which subclass needs it
     long resp = checkPrevLogIndex(prevLogIndex);
     if (resp != Response.RESPONSE_AGREE) {
       return resp;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaAsyncService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaAsyncService.java
index a7a96a5..e1c899a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaAsyncService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaAsyncService.java
@@ -202,7 +202,7 @@ public class MetaAsyncService extends BaseAsyncService implements TSMetaService.
    */
   @Override
   public void exile(ByteBuffer removeNodeLogBuffer, AsyncMethodCallback<Void> resultHandler) {
-    logger.info("Start to exile.");
+    logger.info("{}: start to exile.", name);
     removeNodeLogBuffer.get();
     RemoveNodeLog removeNodeLog = new RemoveNodeLog();
     removeNodeLog.deserialize(removeNodeLogBuffer);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaSyncService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaSyncService.java
index c3804e8..dfb2aee 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaSyncService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaSyncService.java
@@ -193,6 +193,8 @@ public class MetaSyncService extends BaseSyncService implements TSMetaService.If
    */
   @Override
   public void exile(ByteBuffer removeNodeLogBuffer) {
+    logger.info("{}: start to exile.", name);
+    removeNodeLogBuffer.get(); // NOTE(review): presumably skips a leading log-type byte before deserialize, as the async service does — confirm
     RemoveNodeLog removeNodeLog = new RemoveNodeLog();
     removeNodeLog.deserialize(removeNodeLogBuffer);
     metaGroupMember.applyRemoveNode(removeNodeLog);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/ClusterMonitor.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/ClusterMonitor.java
index f1f46d2..09464b4 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/ClusterMonitor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/ClusterMonitor.java
@@ -18,18 +18,24 @@
  */
 package org.apache.iotdb.cluster.utils.nodetool;
 
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import org.apache.commons.collections4.map.MultiKeyMap;
 import org.apache.iotdb.cluster.ClusterMain;
 import org.apache.iotdb.cluster.config.ClusterDescriptor;
+import org.apache.iotdb.cluster.exception.LeaderUnknownException;
+import org.apache.iotdb.cluster.exception.RedirectMetaLeaderException;
 import org.apache.iotdb.cluster.partition.PartitionGroup;
 import org.apache.iotdb.cluster.partition.PartitionTable;
 import org.apache.iotdb.cluster.partition.slot.SlotPartitionTable;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.iotdb.cluster.rpc.thrift.RaftNode;
 import org.apache.iotdb.cluster.server.MetaClusterServer;
+import org.apache.iotdb.cluster.server.NodeCharacter;
+import org.apache.iotdb.cluster.server.member.DataGroupMember;
 import org.apache.iotdb.cluster.server.monitor.Timer;
 import org.apache.iotdb.cluster.server.member.MetaGroupMember;
 import org.apache.iotdb.db.conf.IoTDBConstant;
@@ -39,6 +45,7 @@ import org.apache.iotdb.db.metadata.PartialPath;
 import org.apache.iotdb.db.service.IService;
 import org.apache.iotdb.db.service.JMXService;
 import org.apache.iotdb.db.service.ServiceType;
+import org.apache.iotdb.tsfile.utils.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -65,9 +72,57 @@ public class ClusterMonitor implements ClusterMonitorMBean, IService {
   }
 
   @Override
-  public List<Node> getRing() {
-    PartitionTable partitionTable = getPartitionTable();
-    return partitionTable != null ? partitionTable.getAllNodes() : null;
+  public List<Pair<Node, NodeCharacter>> getMetaGroup() {
+    MetaGroupMember metaMember = getMetaGroupMember();
+    if (metaMember == null || metaMember.getPartitionTable() == null) {
+      return null;
+    }
+    // only the node this member believes to be the leader is LEADER; if unknown, all are FOLLOWER
+    Node leader = metaMember.getLeader();
+    List<Pair<Node, NodeCharacter>> res = new ArrayList<>();
+    for (Node node : metaMember.getPartitionTable().getAllNodes()) {
+      NodeCharacter character = node.equals(leader) ? NodeCharacter.LEADER : NodeCharacter.FOLLOWER;
+      res.add(new Pair<>(node, character));
+    }
+    return res;
+  }
+
+  @Override
+  public List<Pair<Node, NodeCharacter>> getDataGroup(int raftId) throws Exception {
+    MetaGroupMember metaMember = getMetaGroupMember();
+    if (metaMember == null || metaMember.getPartitionTable() == null) {
+      return null;
+    }
+    RaftNode raftNode = new RaftNode(metaMember.getThisNode(), raftId);
+    DataGroupMember dataMember =
+        metaMember.getDataClusterServer().getHeaderGroupMap().get(raftNode);
+    if (dataMember == null) {
+      throw new Exception(String.format("Partition whose header is %s doesn't exist.", raftNode));
+    }
+    // use the data group's own view of its leader, not the meta group's
+    Node leader = dataMember.getLeader();
+    List<Pair<Node, NodeCharacter>> res = new ArrayList<>();
+    for (Node node : dataMember.getAllNodes()) {
+      NodeCharacter character = node.equals(leader) ? NodeCharacter.LEADER : NodeCharacter.FOLLOWER;
+      res.add(new Pair<>(node, character));
+    }
+    return res;
+  }
+
+  @Override
+  public Map<PartitionGroup, Integer> getSlotNumInDataMigration()
+      throws RedirectMetaLeaderException, LeaderUnknownException {
+    MetaGroupMember member = getMetaGroupMember();
+    if (member.getCharacter() != NodeCharacter.LEADER) {
+      // a null leader cannot be redirected to, so report the leader as unknown instead
+      Node leader = member.getLeader();
+      if (member.getCharacter() == null || leader == null) {
+        throw new LeaderUnknownException(member.getAllNodes());
+      }
+      throw new RedirectMetaLeaderException(leader);
+    }
+    // NOTE(review): migration status is not collected yet, so the leader always reports none
+    return Collections.emptyMap();
   }
 
   @Override
@@ -80,7 +135,6 @@ public class ClusterMonitor implements ClusterMonitorMBean, IService {
     try {
       return partitionTable.partitionByPathRangeTime(new PartialPath(path), startTime, endTime);
     } catch (MetadataException e) {
-      LOGGER.error("The storage group of path {} doesn't exist.", path, e);
       return new MultiKeyMap<>();
     }
   }
@@ -94,27 +148,11 @@ public class ClusterMonitor implements ClusterMonitorMBean, IService {
     try {
       return partitionTable.partitionByPathTime(new PartialPath(path), 0);
     } catch (MetadataException e) {
-      LOGGER.error("The storage group of path {} doesn't exist.", path, e);
       return new PartitionGroup();
     }
   }
 
   @Override
-  public Map<PartitionGroup, Integer> getSlotNumOfCurNode() {
-    PartitionTable partitionTable = getPartitionTable();
-    if (partitionTable == null || partitionTable.getLocalGroups() == null) {
-      return null;
-    }
-    List<PartitionGroup> localGroups = partitionTable.getLocalGroups();
-    Map<RaftNode, List<Integer>> nodeSlotMap = ((SlotPartitionTable) partitionTable).getAllNodeSlots();
-    Map<PartitionGroup, Integer> raftGroupMapSlotNum = new HashMap<>();
-    for (PartitionGroup group : localGroups) {
-      raftGroupMapSlotNum.put(group, nodeSlotMap.get(new RaftNode(group.getHeader(), group.getId())).size());
-    }
-    return raftGroupMapSlotNum;
-  }
-
-  @Override
   public Map<PartitionGroup, Integer> getSlotNumOfAllNode() {
     PartitionTable partitionTable = getPartitionTable();
     if (partitionTable == null) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/ClusterMonitorMBean.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/ClusterMonitorMBean.java
index cc3e7b7..3c4db07 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/ClusterMonitorMBean.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/ClusterMonitorMBean.java
@@ -21,17 +21,31 @@ package org.apache.iotdb.cluster.utils.nodetool;
 import java.util.List;
 import java.util.Map;
 import org.apache.commons.collections4.map.MultiKeyMap;
+import org.apache.iotdb.cluster.exception.LeaderUnknownException;
+import org.apache.iotdb.cluster.exception.RedirectMetaLeaderException;
 import org.apache.iotdb.cluster.partition.PartitionGroup;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.server.NodeCharacter;
+import org.apache.iotdb.tsfile.utils.Pair;
 
 public interface ClusterMonitorMBean {
 
   /**
-   * Get physical hash ring
-   *
-   * @return Node list
+   * Show every node of the meta raft group together with its character.
+   */
+  List<Pair<Node, NodeCharacter>> getMetaGroup();
+
+  /**
+   * Show every node, with its character, of the data raft group with the given raftId whose header is this node.
+   */
+  List<Pair<Node, NodeCharacter>> getDataGroup(int raftId) throws Exception;
+
+  /**
+   * Query how many slots are still PULLING or PULLING_WRITABLE, which tells whether users can add/remove a node.
+   * @return key: data group, value: number of slots still in the process of data migration
    */
-  List<Node> getRing();
+  Map<PartitionGroup, Integer> getSlotNumInDataMigration()
+      throws RedirectMetaLeaderException, LeaderUnknownException;
 
   /**
    * Get data partition information of input path and time range.
@@ -50,13 +64,6 @@ public interface ClusterMonitorMBean {
   PartitionGroup getMetaPartition(String path);
 
   /**
-   * Get data partition groups that input node belongs to and the slot number in each partition group.
-   *
-   * @return key: the partition group, value: the slot number
-   */
-  Map<PartitionGroup, Integer> getSlotNumOfCurNode();
-
-  /**
    * Get all data partition groups and the slot number in each partition group.
    *
    * @return key: the partition group, value: the slot number
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/NodeTool.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/NodeTool.java
index 1b05a79..743c5a8 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/NodeTool.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/NodeTool.java
@@ -22,10 +22,12 @@ package org.apache.iotdb.cluster.utils.nodetool;
 import com.google.common.collect.Lists;
 import io.airlift.airline.Help;
 import java.util.List;
-import org.apache.iotdb.cluster.utils.nodetool.function.Host;
+import org.apache.iotdb.cluster.utils.nodetool.function.Header;
+import org.apache.iotdb.cluster.utils.nodetool.function.Slot;
 import org.apache.iotdb.cluster.utils.nodetool.function.LogView;
 import org.apache.iotdb.cluster.utils.nodetool.function.Partition;
 import org.apache.iotdb.cluster.utils.nodetool.function.Ring;
+import org.apache.iotdb.cluster.utils.nodetool.function.Migration;
 import org.apache.iotdb.cluster.utils.nodetool.function.Status;
 import org.apache.iotdb.db.utils.CommonUtils;
 
@@ -36,9 +38,11 @@ public class NodeTool {
         Help.class,
         Ring.class,
         Partition.class,
-        Host.class,
+        Slot.class,
         Status.class,
-        LogView.class
+        LogView.class,
+        Migration.class,
+        Header.class
     );
 
     int status = CommonUtils.runCli(commands, args, "nodetool", "Manage your IoTDB cluster");
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/function/Ring.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/function/Header.java
similarity index 57%
copy from cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/function/Ring.java
copy to cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/function/Header.java
index 9b70e3c..01a279a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/function/Ring.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/function/Header.java
@@ -21,22 +21,36 @@ package org.apache.iotdb.cluster.utils.nodetool.function;
 import static org.apache.iotdb.cluster.utils.nodetool.Printer.msgPrintln;
 
 import io.airlift.airline.Command;
+import io.airlift.airline.Option;
 import java.util.List;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.server.NodeCharacter;
 import org.apache.iotdb.cluster.utils.nodetool.ClusterMonitorMBean;
+import org.apache.iotdb.tsfile.utils.Pair;
 
-@Command(name = "ring", description = "Print information about the hash ring of nodes")
-public class Ring extends NodeToolCmd {
+@Command(name = "header", description = "Print information about data group whose header is this node")
+public class Header extends NodeToolCmd {
+
+  @Option(title = "raft id", name = {"-id",
+      "--raftid"}, description = "Specify the raft id of data group whose header is this node, default value is 0")
+  private int id = 0;
 
   @Override
   public void execute(ClusterMonitorMBean proxy) {
-    List<Node> allNodes = proxy.getRing();
-    if (allNodes == null) {
-      msgPrintln(BUILDING_CLUSTER_INFO);
-    } else {
+    try {
+      List<Pair<Node, NodeCharacter>> allNodes = proxy.getDataGroup(id);
+      if (allNodes == null) {
+        msgPrintln(BUILDING_CLUSTER_INFO);
+        return;
+      }
       msgPrintln(String.format("%-20s  %30s", "Node Identifier", "Node"));
-      allNodes.forEach(
-          node -> msgPrintln(String.format("%-20d->%30s", node.nodeIdentifier, nodeToString(node))));
+      for (Pair<Node, NodeCharacter> pair : allNodes) {
+        Node node = pair.left;
+        msgPrintln(String
+            .format("%-20d->%30s", node.nodeIdentifier, nodeCharacterToString(node, pair.right)));
+      }
+    } catch (Exception e) {
+      msgPrintln(e.getMessage());
     }
   }
 }
\ No newline at end of file
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/function/Migration.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/function/Migration.java
new file mode 100644
index 0000000..c3c7bad
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/function/Migration.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.cluster.utils.nodetool.function;
+
+import static org.apache.iotdb.cluster.utils.nodetool.Printer.msgPrintln;
+
+import io.airlift.airline.Command;
+import java.util.Map;
+import java.util.Map.Entry;
+import org.apache.iotdb.cluster.exception.LeaderUnknownException;
+import org.apache.iotdb.cluster.exception.RedirectMetaLeaderException;
+import org.apache.iotdb.cluster.partition.PartitionGroup;
+import org.apache.iotdb.cluster.utils.nodetool.ClusterMonitorMBean;
+
+@Command(name = "migration", description = "Print information about how many slots are in the state of data migration for each data group. ")
+public class Migration extends NodeToolCmd {
+
+  @Override
+  public void execute(ClusterMonitorMBean proxy) {
+    try {
+      Map<PartitionGroup, Integer> groupSlotsMap = proxy.getSlotNumInDataMigration();
+      if (groupSlotsMap == null) {
+        msgPrintln(BUILDING_CLUSTER_INFO);
+        return;
+      }
+      if (groupSlotsMap.isEmpty()) {
+        msgPrintln("No slots are in the state of data migration, users can change membership.");
+      } else {
+        msgPrintln(
+            "Some slots are in the state of data migration, users can not change membership until the end of data migration:");
+        msgPrintln(String.format("%-20s  %30s", "Slot num", "Data Group"));
+        for (Entry<PartitionGroup, Integer> entry : groupSlotsMap.entrySet()) {
+          PartitionGroup group = entry.getKey(); // printed with the shared nodetool group formatter
+          msgPrintln(String
+              .format("%-20d->%30s", entry.getValue(), partitionGroupToString(group)));
+        }
+      }
+    } catch (RedirectMetaLeaderException e) {
+      msgPrintln(e.getMetaLeader() == null ? META_LEADER_UNKNOWN_INFO : redirectToQueryMetaLeader(e.getMetaLeader()));
+    } catch (LeaderUnknownException e) {
+      msgPrintln(META_LEADER_UNKNOWN_INFO);
+    }
+  }
+}
\ No newline at end of file
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/function/NodeToolCmd.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/function/NodeToolCmd.java
index ff9fd3f..0043c15 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/function/NodeToolCmd.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/function/NodeToolCmd.java
@@ -36,6 +36,7 @@ import javax.management.remote.JMXConnectorFactory;
 import javax.management.remote.JMXServiceURL;
 import org.apache.iotdb.cluster.partition.PartitionGroup;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.server.NodeCharacter;
 import org.apache.iotdb.cluster.utils.nodetool.ClusterMonitor;
 import org.apache.iotdb.cluster.utils.nodetool.ClusterMonitorMBean;
 
@@ -62,6 +63,8 @@ public abstract class NodeToolCmd implements Runnable {
 
   static final String BUILDING_CLUSTER_INFO = "The cluster is being created.";
 
+  static final String META_LEADER_UNKNOWN_INFO = "Meta group leader is unknown, please try again later.";
+
   @Override
   public void run() {
     try {
@@ -97,12 +100,20 @@ public abstract class NodeToolCmd implements Runnable {
     return mbsc;
   }
 
-  String nodeToString(Node node) {
+  public String nodeCharacterToString(Node node, NodeCharacter character) {
+    return String.format("%s (%s)", nodeToString(node), character);
+  }
+
+  public String nodeToString(Node node) {
     return String.format("%s:%d:%d:%d", node.getIp(), node.getMetaPort(), node.getDataPort(),
         node.getClientPort());
   }
 
-  String partitionGroupToString(PartitionGroup group) {
+  public String redirectToQueryMetaLeader(Node node) {
+    return String.format("Please redirect to query meta group leader %s", nodeToString(node));
+  }
+
+  public String partitionGroupToString(PartitionGroup group) {
     StringBuilder stringBuilder = new StringBuilder("[");
     if (!group.isEmpty()) {
       stringBuilder.append(nodeToString(group.get(0)));
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/function/Ring.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/function/Ring.java
index 9b70e3c..7016461 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/function/Ring.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/function/Ring.java
@@ -23,20 +23,33 @@ import static org.apache.iotdb.cluster.utils.nodetool.Printer.msgPrintln;
 import io.airlift.airline.Command;
 import java.util.List;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.server.NodeCharacter;
 import org.apache.iotdb.cluster.utils.nodetool.ClusterMonitorMBean;
+import org.apache.iotdb.tsfile.utils.Pair;
 
-@Command(name = "ring", description = "Print information about the hash ring of nodes")
+/**
+ * Node-tool command {@code ring}: prints every member of the meta raft group together with its
+ * node identifier, address and {@link NodeCharacter}.
+ */
+@Command(name = "ring", description = "Print information about the meta raft group")
 public class Ring extends NodeToolCmd {
 
+  /**
+   * Prints a header row followed by one row per meta group member, or
+   * {@code BUILDING_CLUSTER_INFO} when {@code getMetaGroup()} returns {@code null}.
+   *
+   * @param proxy the cluster monitor to query
+   */
   @Override
   public void execute(ClusterMonitorMBean proxy) {
-    List<Node> allNodes = proxy.getRing();
+    List<Pair<Node, NodeCharacter>> allNodes = proxy.getMetaGroup();
     if (allNodes == null) {
       msgPrintln(BUILDING_CLUSTER_INFO);
     } else {
       msgPrintln(String.format("%-20s  %30s", "Node Identifier", "Node"));
-      allNodes.forEach(
-          node -> msgPrintln(String.format("%-20d->%30s", node.nodeIdentifier, nodeToString(node))));
+      // Each row: left-aligned identifier, "->", then the right-aligned "address (character)".
+      allNodes.forEach(member -> msgPrintln(String.format("%-20d->%30s",
+          member.left.nodeIdentifier, nodeCharacterToString(member.left, member.right))));
     }
   }
 }
\ No newline at end of file
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/function/Host.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/function/Slot.java
similarity index 80%
rename from cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/function/Host.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/function/Slot.java
index d32b94d..ca8d8ff 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/function/Host.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/function/Slot.java
@@ -27,22 +27,18 @@ import java.util.Map.Entry;
 import org.apache.iotdb.cluster.partition.PartitionGroup;
 import org.apache.iotdb.cluster.utils.nodetool.ClusterMonitorMBean;
 
-@Command(name = "host", description = "Print partitions information which a specific host belongs to")
-public class Host extends NodeToolCmd {
-
-  @Option(title = "all nodes", name = {"-a", "--all"}, description = "Show all nodes partition info")
-  private boolean showAll = false;
+/**
+ * Node-tool command {@code slot}: prints slot information for every data group, based on the
+ * per-group slot counts returned by {@link ClusterMonitorMBean#getSlotNumOfAllNode()}.
+ */
+@Command(name = "slot", description = "Print slot information of all data groups")
+public class Slot extends NodeToolCmd {
 
   @Override
   public void execute(ClusterMonitorMBean proxy) {
-    Map<PartitionGroup, Integer> raftGroupMapSlotNum;
-    if (showAll) {
-      raftGroupMapSlotNum = proxy.getSlotNumOfAllNode();
-    } else {
-      raftGroupMapSlotNum = proxy.getSlotNumOfCurNode();
-    }
+    Map<PartitionGroup, Integer> raftGroupMapSlotNum = proxy.getSlotNumOfAllNode();
     if (raftGroupMapSlotNum == null) {
-      msgPrintln("The cluster is being created.");
+      msgPrintln(BUILDING_CLUSTER_INFO);
       return;
     }
     showInfo(raftGroupMapSlotNum);
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/MetaLogApplierTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/MetaLogApplierTest.java
index fd1a87b..e6892a2 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/MetaLogApplierTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/MetaLogApplierTest.java
@@ -24,7 +24,6 @@ import static junit.framework.TestCase.assertFalse;
 import static junit.framework.TestCase.assertTrue;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Set;
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatThreadTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatThreadTest.java
index 6d79710..c48fc71 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatThreadTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatThreadTest.java
@@ -108,11 +108,6 @@ public class MetaHeartbeatThreadTest extends HeartbeatThreadTest {
     }
 
     @Override
-    public boolean checkChangeMembershipValidity(long targetLogIndex) {
-      return true;
-    }
-
-    @Override
     public List<Node> getAllNodes() {
       return null;
     }