Posted to commits@iotdb.apache.org by lt...@apache.org on 2021/01/14 08:09:43 UTC

[iotdb] 03/04: Reimplement the function of adding and removing nodes

This is an automated email from the ASF dual-hosted git repository.

lta pushed a commit to branch cluster_scalability
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 452cba315bc66688684181e47fbff0cfcd94bf3a
Author: lta <li...@163.com>
AuthorDate: Wed Jan 13 15:49:28 2021 +0800

    Reimplement the function of adding and removing nodes
---
 .../iotdb/cluster/client/DataClientProvider.java   |   3 -
 ...ception.java => ChangeMembershipException.java} |  14 +-
 .../exception/CheckConsistencyException.java       |   4 +-
 .../java/org/apache/iotdb/cluster/log/Log.java     |   1 +
 .../iotdb/cluster/log/applier/DataLogApplier.java  |   8 +-
 .../iotdb/cluster/log/applier/MetaLogApplier.java  |  32 ++-
 .../iotdb/cluster/log/logtypes/AddNodeLog.java     |  40 +++-
 .../iotdb/cluster/log/logtypes/RemoveNodeLog.java  | 130 +++++++-----
 .../manage/FilePartitionedSnapshotLogManager.java  |  12 ++
 .../log/manage/MetaSingleSnapshotLogManager.java   |  19 ++
 .../iotdb/cluster/log/manage/RaftLogManager.java   |   6 +-
 .../cluster/log/snapshot/PullSnapshotTask.java     |   5 +-
 .../iotdb/cluster/partition/NodeRemovalResult.java |  32 +++
 .../iotdb/cluster/partition/PartitionGroup.java    |  21 ++
 .../iotdb/cluster/partition/PartitionTable.java    |  17 +-
 .../partition/slot/SlotNodeRemovalResult.java      |  38 +++-
 .../cluster/partition/slot/SlotPartitionTable.java |  93 +++++----
 .../iotdb/cluster/query/ClusterPlanRouter.java     |  31 ++-
 .../iotdb/cluster/server/DataClusterServer.java    |  31 +++
 .../iotdb/cluster/server/MetaClusterServer.java    |   8 +-
 .../cluster/server/member/DataGroupMember.java     |  82 +++++---
 .../cluster/server/member/MetaGroupMember.java     | 229 +++++++++++----------
 .../iotdb/cluster/server/member/RaftMember.java    |  24 ++-
 .../cluster/server/service/MetaAsyncService.java   |   8 +-
 .../cluster/server/service/MetaSyncService.java    |   8 +-
 .../apache/iotdb/cluster/utils/StatusUtils.java    |   4 +
 .../org/apache/iotdb/cluster/common/TestUtils.java |   4 +
 .../apache/iotdb/cluster/log/LogParserTest.java    |   2 +
 .../cluster/log/applier/MetaLogApplierTest.java    |  12 +-
 .../cluster/log/logtypes/SerializeLogTest.java     |   2 +
 .../cluster/partition/SlotPartitionTableTest.java  |   3 +-
 .../server/heartbeat/MetaHeartbeatThreadTest.java  |  25 ++-
 .../cluster/server/member/DataGroupMemberTest.java |   7 +-
 .../cluster/server/member/MetaGroupMemberTest.java |   8 +-
 .../engine/storagegroup/StorageGroupProcessor.java |  40 +++-
 .../apache/iotdb/db/qp/physical/PhysicalPlan.java  |   7 +-
 .../apache/iotdb/db/qp/physical/sys/LogPlan.java   |  71 +++++++
 .../java/org/apache/iotdb/rpc/TSStatusCode.java    |   3 +-
 thrift/src/main/thrift/cluster.thrift              |   3 +-
 39 files changed, 781 insertions(+), 306 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/DataClientProvider.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/DataClientProvider.java
index 9a1c4df..4c882e7 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/DataClientProvider.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/DataClientProvider.java
@@ -29,12 +29,9 @@ import org.apache.iotdb.cluster.config.ClusterDescriptor;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.thrift.TException;
 import org.apache.thrift.protocol.TProtocolFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class DataClientProvider {
 
-  private static final Logger logger = LoggerFactory.getLogger(DataClientProvider.class);
   /**
    * dataClientPool provides reusable thrift clients to connect to the DataGroupMembers of other
    * nodes
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/exception/CheckConsistencyException.java b/cluster/src/main/java/org/apache/iotdb/cluster/exception/ChangeMembershipException.java
similarity index 64%
copy from cluster/src/main/java/org/apache/iotdb/cluster/exception/CheckConsistencyException.java
copy to cluster/src/main/java/org/apache/iotdb/cluster/exception/ChangeMembershipException.java
index 12ac407..f50e668 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/exception/CheckConsistencyException.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/exception/ChangeMembershipException.java
@@ -16,19 +16,15 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.iotdb.cluster.exception;
 
 /**
- * Raised when check consistency failed, now only happens if there is a strong-consistency and
- * syncLeader failed
+ * Raised when an add/remove membership log cannot be sent to all data groups
  */
-public class CheckConsistencyException extends Exception {
+public class ChangeMembershipException extends Exception {
 
-  public CheckConsistencyException(String errMag) {
-    super(String.format("check consistency failed, error message=%s ", errMag));
+  public ChangeMembershipException(String errMsg) {
+    super(String.format("change membership fail, error message=%s ", errMsg));
   }
-
-  public static final CheckConsistencyException CHECK_STRONG_CONSISTENCY_EXCEPTION =
-      new CheckConsistencyException(
-      "strong consistency, sync with leader failed");
 }
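
For a sense of how the new exception surfaces, here is a minimal, hypothetical usage sketch; the class and method names below are invented, and only the ChangeMembershipException constructor comes from this commit:

    import org.apache.iotdb.cluster.exception.ChangeMembershipException;

    public final class ChangeMembershipExample {

      private ChangeMembershipExample() {
      }

      /** Fail the membership change if the log could not reach every data group. */
      public static void checkPropagation(boolean sentToAllDataGroups)
          throws ChangeMembershipException {
        if (!sentToAllDataGroups) {
          throw new ChangeMembershipException("membership log rejected by a data group");
        }
      }
    }
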
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/exception/CheckConsistencyException.java b/cluster/src/main/java/org/apache/iotdb/cluster/exception/CheckConsistencyException.java
index 12ac407..7b0609a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/exception/CheckConsistencyException.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/exception/CheckConsistencyException.java
@@ -24,8 +24,8 @@ package org.apache.iotdb.cluster.exception;
  */
 public class CheckConsistencyException extends Exception {
 
-  public CheckConsistencyException(String errMag) {
-    super(String.format("check consistency failed, error message=%s ", errMag));
+  public CheckConsistencyException(String errMsg) {
+    super(String.format("check consistency failed, error message=%s ", errMsg));
   }
 
   public static final CheckConsistencyException CHECK_STRONG_CONSISTENCY_EXCEPTION =
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/Log.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/Log.java
index 0c236b2..2903fe9 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/Log.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/Log.java
@@ -127,4 +127,5 @@ public abstract class Log implements Comparable<Log> {
   public void setEnqueueTime(long enqueueTime) {
     this.enqueueTime = enqueueTime;
   }
+
 }
\ No newline at end of file
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/DataLogApplier.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/DataLogApplier.java
index 8ce84b5..ceed787 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/DataLogApplier.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/DataLogApplier.java
@@ -22,8 +22,10 @@ package org.apache.iotdb.cluster.log.applier;
 import org.apache.iotdb.cluster.config.ClusterConstant;
 import org.apache.iotdb.cluster.exception.CheckConsistencyException;
 import org.apache.iotdb.cluster.log.Log;
+import org.apache.iotdb.cluster.log.logtypes.AddNodeLog;
 import org.apache.iotdb.cluster.log.logtypes.CloseFileLog;
 import org.apache.iotdb.cluster.log.logtypes.PhysicalPlanLog;
+import org.apache.iotdb.cluster.log.logtypes.RemoveNodeLog;
 import org.apache.iotdb.cluster.partition.slot.SlotPartitionTable;
 import org.apache.iotdb.cluster.server.member.DataGroupMember;
 import org.apache.iotdb.cluster.server.member.MetaGroupMember;
@@ -60,7 +62,11 @@ public class DataLogApplier extends BaseApplier {
     logger.debug("DataMember [{}] start applying Log {}", dataGroupMember.getName(), log);
 
     try {
-      if (log instanceof PhysicalPlanLog) {
+      if (log instanceof AddNodeLog) {
+        metaGroupMember.getDataClusterServer().preAddNodeForDataGroup((AddNodeLog) log, dataGroupMember);
+      } else if (log instanceof RemoveNodeLog) {
+        metaGroupMember.getDataClusterServer().preRemoveNodeForDataGroup((RemoveNodeLog) log, dataGroupMember);
+      } else if (log instanceof PhysicalPlanLog) {
         PhysicalPlanLog physicalPlanLog = (PhysicalPlanLog) log;
         PhysicalPlan plan = physicalPlanLog.getPlan();
         if (plan instanceof InsertPlan) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/MetaLogApplier.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/MetaLogApplier.java
index d7dd5f9..94437ae 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/MetaLogApplier.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/MetaLogApplier.java
@@ -19,15 +19,18 @@
 
 package org.apache.iotdb.cluster.log.applier;
 
+import org.apache.iotdb.cluster.exception.ChangeMembershipException;
 import org.apache.iotdb.cluster.log.Log;
 import org.apache.iotdb.cluster.log.logtypes.AddNodeLog;
 import org.apache.iotdb.cluster.log.logtypes.PhysicalPlanLog;
 import org.apache.iotdb.cluster.log.logtypes.RemoveNodeLog;
-import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.iotdb.cluster.server.member.MetaGroupMember;
 import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.qp.physical.sys.LogPlan;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -46,25 +49,40 @@ public class MetaLogApplier extends BaseApplier {
 
   @Override
   public void apply(Log log) {
+    apply(log, false);
+  }
+
+  public void apply(Log log, boolean isLeader) {
     try {
       logger.debug("MetaMember [{}] starts applying Log {}", metaGroupMember.getName(), log);
       if (log instanceof AddNodeLog) {
-        AddNodeLog addNodeLog = (AddNodeLog) log;
-        Node newNode = addNodeLog.getNewNode();
-        member.applyAddNode(newNode);
+        if (isLeader) {
+          sendLogToAllDataGroups(log);
+        }
+        member.applyAddNode((AddNodeLog) log);
       } else if (log instanceof PhysicalPlanLog) {
         applyPhysicalPlan(((PhysicalPlanLog) log).getPlan(), null);
       } else if (log instanceof RemoveNodeLog) {
-        RemoveNodeLog removeNodeLog = ((RemoveNodeLog) log);
-        member.applyRemoveNode(removeNodeLog.getRemovedNode());
+        if (isLeader) {
+          sendLogToAllDataGroups(log);
+        }
+        member.applyRemoveNode(((RemoveNodeLog) log));
       } else {
         logger.error("Unsupported log: {} {}", log.getClass().getName(), log);
       }
-    } catch (StorageEngineException | StorageGroupNotSetException | QueryProcessException e) {
+    } catch (StorageEngineException | StorageGroupNotSetException | QueryProcessException | ChangeMembershipException e) {
       logger.debug("Exception occurred when executing {}", log, e);
       log.setException(e);
     } finally {
       log.setApplied(true);
     }
   }
+
+  private void sendLogToAllDataGroups(Log log) throws ChangeMembershipException {
+    LogPlan plan = new LogPlan(log.serialize());
+    TSStatus status = member.executeNonQueryPlan(plan);
+    if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      throw new ChangeMembershipException(String.format("apply %s failed", log));
+    }
+  }
 }
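
The leader-side sendLogToAllDataGroups above ships the membership log as raw bytes inside a LogPlan; the receiving side turns those bytes back into a concrete log with LogParser (see the ClusterPlanRouter change further down in this diff). Below is a minimal decode sketch under that assumption; the class MembershipLogDecoder is invented, and it presumes LogParser.parse only throws UnknownLogTypeException, as the ClusterPlanRouter change suggests:

    import java.nio.ByteBuffer;
    import org.apache.iotdb.cluster.exception.UnknownLogTypeException;
    import org.apache.iotdb.cluster.log.Log;
    import org.apache.iotdb.cluster.log.LogParser;
    import org.apache.iotdb.cluster.log.logtypes.AddNodeLog;
    import org.apache.iotdb.cluster.log.logtypes.RemoveNodeLog;

    public final class MembershipLogDecoder {

      private MembershipLogDecoder() {
      }

      /** Parse a serialized membership log and report which node it affects. */
      public static String describe(ByteBuffer serializedLog) throws UnknownLogTypeException {
        Log log = LogParser.getINSTANCE().parse(serializedLog);
        if (log instanceof AddNodeLog) {
          return "add node " + ((AddNodeLog) log).getNewNode();
        } else if (log instanceof RemoveNodeLog) {
          return "remove node " + ((RemoveNodeLog) log).getRemovedNode();
        }
        return "not a membership log: " + log;
      }
    }
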
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/AddNodeLog.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/AddNodeLog.java
index f54725d..824c3f2 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/AddNodeLog.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/AddNodeLog.java
@@ -33,16 +33,34 @@ import org.apache.iotdb.db.utils.SerializeUtils;
  */
 public class AddNodeLog extends Log {
 
+  private ByteBuffer partitionTable;
+
   private Node newNode;
 
-  public Node getNewNode() {
-    return newNode;
+  public AddNodeLog(ByteBuffer partitionTable, Node newNode) {
+    this.partitionTable = partitionTable;
+    this.newNode = newNode;
+  }
+
+  public AddNodeLog() {
+  }
+
+  public void setPartitionTable(ByteBuffer partitionTable) {
+    this.partitionTable = partitionTable;
   }
 
   public void setNewNode(Node newNode) {
     this.newNode = newNode;
   }
 
+  public Node getNewNode() {
+    return newNode;
+  }
+
+  public ByteBuffer getPartitionTable() {
+    return partitionTable;
+  }
+
   @Override
   public ByteBuffer serialize() {
     ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
@@ -52,6 +70,9 @@ public class AddNodeLog extends Log {
       dataOutputStream.writeLong(getCurrLogTerm());
 
       SerializeUtils.serialize(newNode, dataOutputStream);
+
+      dataOutputStream.writeInt(partitionTable.array().length);
+      dataOutputStream.write(partitionTable.array());
     } catch (IOException e) {
       // ignored
     }
@@ -69,6 +90,9 @@ public class AddNodeLog extends Log {
 
     newNode = new Node();
     SerializeUtils.deserialize(newNode, buffer);
+
+    int len = buffer.getInt();
+    partitionTable = ByteBuffer.wrap(buffer.array(), buffer.position(), len);
   }
 
   @Override
@@ -83,11 +107,19 @@ public class AddNodeLog extends Log {
       return false;
     }
     AddNodeLog that = (AddNodeLog) o;
-    return Objects.equals(newNode, that.newNode);
+    return Objects.equals(newNode, that.newNode) && Objects
+        .equals(partitionTable, that.partitionTable);
+  }
+
+  @Override
+  public String toString() {
+    return "AddNodeLog{" +
+        "newNode=" + newNode +
+        '}';
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(super.hashCode(), newNode);
+    return Objects.hash(super.hashCode(), newNode, partitionTable);
   }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/RemoveNodeLog.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/RemoveNodeLog.java
index 02d89d0..800b77d 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/RemoveNodeLog.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/RemoveNodeLog.java
@@ -19,69 +19,101 @@
 
 package org.apache.iotdb.cluster.log.logtypes;
 
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.Objects;
 import org.apache.iotdb.cluster.log.Log;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
-
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
-import java.nio.ByteBuffer;
 import org.apache.iotdb.db.utils.SerializeUtils;
 
 public class RemoveNodeLog extends Log {
 
-    private Node removedNode;
-
-    @Override
-    public ByteBuffer serialize() {
-        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
-        try (DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream)) {
-            dataOutputStream.writeByte(Types.REMOVE_NODE.ordinal());
-            dataOutputStream.writeLong(getCurrLogIndex());
-            dataOutputStream.writeLong(getCurrLogTerm());
-
-            SerializeUtils.serialize(removedNode, dataOutputStream);
-        } catch (IOException e) {
-            // ignored
-        }
-        return ByteBuffer.wrap(byteArrayOutputStream.toByteArray());
-    }
+  private ByteBuffer partitionTable;
 
-    @Override
-    public void deserialize(ByteBuffer buffer) {
-        setCurrLogIndex(buffer.getLong());
-        setCurrLogTerm(buffer.getLong());
+  private Node removedNode;
 
-        removedNode = new Node();
-        SerializeUtils.deserialize(removedNode, buffer);
-    }
+  public RemoveNodeLog(ByteBuffer partitionTable,
+      Node removedNode) {
+    this.partitionTable = partitionTable;
+    this.removedNode = removedNode;
+  }
 
-    public Node getRemovedNode() {
-        return removedNode;
-    }
+  public RemoveNodeLog() {
+  }
 
-    public void setRemovedNode(Node removedNode) {
-        this.removedNode = removedNode;
-    }
+  public ByteBuffer getPartitionTable() {
+    return partitionTable;
+  }
+
+  public void setPartitionTable(ByteBuffer partitionTable) {
+    this.partitionTable = partitionTable;
+  }
 
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) {
-            return true;
-        }
-        if (o == null || getClass() != o.getClass()) {
-            return false;
-        }
-        if (!super.equals(o)) {
-            return false;
-        }
-        RemoveNodeLog that = (RemoveNodeLog) o;
-        return Objects.equals(removedNode, that.removedNode);
+  @Override
+  public ByteBuffer serialize() {
+    ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+    try (DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream)) {
+      dataOutputStream.writeByte(Types.REMOVE_NODE.ordinal());
+      dataOutputStream.writeLong(getCurrLogIndex());
+      dataOutputStream.writeLong(getCurrLogTerm());
+
+      SerializeUtils.serialize(removedNode, dataOutputStream);
+
+      dataOutputStream.writeInt(partitionTable.array().length);
+      dataOutputStream.write(partitionTable.array());
+    } catch (IOException e) {
+      // ignored
     }
+    return ByteBuffer.wrap(byteArrayOutputStream.toByteArray());
+  }
+
+  @Override
+  public void deserialize(ByteBuffer buffer) {
+    setCurrLogIndex(buffer.getLong());
+    setCurrLogTerm(buffer.getLong());
+
+    removedNode = new Node();
+    SerializeUtils.deserialize(removedNode, buffer);
+
+    int len = buffer.getInt();
+    partitionTable = ByteBuffer.wrap(buffer.array(), buffer.position(), len);
+  }
+
+  public Node getRemovedNode() {
+    return removedNode;
+  }
 
-    @Override
-    public int hashCode() {
-        return Objects.hash(super.hashCode(), removedNode);
+  public void setRemovedNode(Node removedNode) {
+    this.removedNode = removedNode;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    if (!super.equals(o)) {
+      return false;
     }
+    RemoveNodeLog that = (RemoveNodeLog) o;
+    return Objects.equals(removedNode, that.removedNode) && Objects
+        .equals(partitionTable, that.partitionTable);
+  }
+
+  @Override
+  public String toString() {
+    return "RemoveNodeLog{" +
+        "removedNode=" + removedNode +
+        '}';
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(super.hashCode(), removedNode, partitionTable);
+  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/FilePartitionedSnapshotLogManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/FilePartitionedSnapshotLogManager.java
index 79f3cd1..682da96 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/FilePartitionedSnapshotLogManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/FilePartitionedSnapshotLogManager.java
@@ -28,7 +28,10 @@ import java.util.Map;
 import java.util.Map.Entry;
 
 import org.apache.iotdb.cluster.exception.EntryCompactedException;
+import org.apache.iotdb.cluster.log.Log;
 import org.apache.iotdb.cluster.log.LogApplier;
+import org.apache.iotdb.cluster.log.logtypes.AddNodeLog;
+import org.apache.iotdb.cluster.log.logtypes.RemoveNodeLog;
 import org.apache.iotdb.cluster.log.snapshot.FileSnapshot;
 import org.apache.iotdb.cluster.log.snapshot.FileSnapshot.Factory;
 import org.apache.iotdb.cluster.partition.PartitionTable;
@@ -202,4 +205,13 @@ public class FilePartitionedSnapshotLogManager extends PartitionedSnapshotLogMan
     }
     return true;
   }
+
+  @Override
+  public long append(Log entry) {
+    long lastLogIndex = super.append(entry);
+    if (lastLogIndex != -1 && (entry instanceof AddNodeLog || entry instanceof RemoveNodeLog)) {
+      logApplier.apply(entry);
+    }
+    return lastLogIndex;
+  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/MetaSingleSnapshotLogManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/MetaSingleSnapshotLogManager.java
index ff650e3..1e86e11 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/MetaSingleSnapshotLogManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/MetaSingleSnapshotLogManager.java
@@ -20,11 +20,15 @@
 package org.apache.iotdb.cluster.log.manage;
 
 import java.io.IOException;
+import java.util.List;
 import java.util.Map;
+import org.apache.iotdb.cluster.log.Log;
 import org.apache.iotdb.cluster.log.LogApplier;
 import org.apache.iotdb.cluster.log.Snapshot;
+import org.apache.iotdb.cluster.log.applier.MetaLogApplier;
 import org.apache.iotdb.cluster.log.manage.serializable.SyncLogDequeSerializer;
 import org.apache.iotdb.cluster.log.snapshot.MetaSimpleSnapshot;
+import org.apache.iotdb.cluster.server.NodeCharacter;
 import org.apache.iotdb.cluster.server.member.MetaGroupMember;
 import org.apache.iotdb.db.auth.AuthException;
 import org.apache.iotdb.db.auth.authorizer.BasicAuthorizer;
@@ -81,4 +85,19 @@ public class MetaSingleSnapshotLogManager extends RaftLogManager {
     snapshot.setLastLogTerm(term);
     return snapshot;
   }
+
+  @Override
+  void applyEntries(List<Log> entries) {
+    for (Log entry : entries) {
+      if (blockAppliedCommitIndex > 0 && entry.getCurrLogIndex() > blockAppliedCommitIndex) {
+        blockedUnappliedLogList.add(entry);
+        continue;
+      }
+      try {
+        ((MetaLogApplier)logApplier).apply(entry, metaGroupMember.getCharacter() == NodeCharacter.LEADER);
+      } catch (Exception e) {
+        entry.setException(e);
+      }
+    }
+  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
index bb8b231..bd65c26 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
@@ -82,10 +82,10 @@ public abstract class RaftLogManager {
    * The committed log whose index is larger than blockAppliedCommitIndex will be blocked. if
    * blockAppliedCommitIndex < 0(default is -1), will not block any operation.
    */
-  private volatile long blockAppliedCommitIndex;
+  protected volatile long blockAppliedCommitIndex;
 
 
-  private LogApplier logApplier;
+  protected LogApplier logApplier;
 
   /**
    * to distinguish managers of different members
@@ -116,7 +116,7 @@ public abstract class RaftLogManager {
    */
   private final Object logUpdateCondition = new Object();
 
-  private List<Log> blockedUnappliedLogList;
+  protected List<Log> blockedUnappliedLogList;
 
   protected RaftLogManager(StableEntryManager stableEntryManager, LogApplier applier, String name) {
     this.logApplier = applier;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTask.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTask.java
index 4a79485..752e3e3 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTask.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTask.java
@@ -166,6 +166,9 @@ public class PullSnapshotTask<T extends Snapshot> implements Callable<Void> {
   public Void call() {
     // If this node is the member of previous holder, it's unnecessary to pull data again
     if (descriptor.getPreviousHolders().contains(newMember.getThisNode())) {
+      for (Integer slot: descriptor.getSlots()) {
+        newMember.getSlotManager().setToNull(slot);
+      }
       // inform the previous holders that one member has successfully pulled snapshot directly
       newMember.registerPullSnapshotHint(descriptor);
     } else {
@@ -176,7 +179,7 @@ public class PullSnapshotTask<T extends Snapshot> implements Callable<Void> {
       request.setRequireReadOnly(descriptor.isRequireReadOnly());
 
       boolean finished = false;
-      int nodeIndex = ((PartitionGroup) newMember.getAllNodes()).indexOf(newMember.getThisNode());
+      int nodeIndex = ((PartitionGroup) newMember.getAllNodes()).indexOf(newMember.getThisNode()) - 1;
       while (!finished) {
         try {
           // sequentially pick up a node that may have this slot
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/partition/NodeRemovalResult.java b/cluster/src/main/java/org/apache/iotdb/cluster/partition/NodeRemovalResult.java
index 5493980..4193ffd 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/partition/NodeRemovalResult.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/partition/NodeRemovalResult.java
@@ -19,8 +19,13 @@
 
 package org.apache.iotdb.cluster.partition;
 
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
+import org.apache.iotdb.cluster.rpc.thrift.Node;
 
 /**
  * NodeRemovalResult stores the removed partition group.
@@ -61,4 +66,31 @@ public class NodeRemovalResult {
     }
     return null;
   }
+
+  public void serialize(DataOutputStream dataOutputStream) throws IOException {
+    dataOutputStream.writeInt(removedGroupList.size());
+    for (PartitionGroup group: removedGroupList) {
+      group.serialize(dataOutputStream);
+    }
+    dataOutputStream.writeInt(newGroupList.size());
+    for (PartitionGroup group: newGroupList) {
+      group.serialize(dataOutputStream);
+    }
+  }
+
+  public void deserialize(ByteBuffer buffer, Map<Integer, Node> idNodeMap) {
+    int removedGroupListSize = buffer.getInt();
+    for (int i = 0 ; i < removedGroupListSize; i++) {
+      PartitionGroup group = new PartitionGroup();
+      group.deserialize(buffer, idNodeMap);
+      removedGroupList.add(group);
+    }
+
+    int newGroupListSize = buffer.getInt();
+    for (int i = 0 ; i < newGroupListSize; i++) {
+      PartitionGroup group = new PartitionGroup();
+      group.deserialize(buffer, idNodeMap);
+      newGroupList.add(group);
+    }
+  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionGroup.java b/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionGroup.java
index 2a562ac..b35cc10 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionGroup.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionGroup.java
@@ -19,9 +19,13 @@
 
 package org.apache.iotdb.cluster.partition;
 
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Map;
 import java.util.Objects;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
 
@@ -64,6 +68,23 @@ public class PartitionGroup extends ArrayList<Node> {
         super.equals(group);
   }
 
+  public void serialize(DataOutputStream dataOutputStream)
+      throws IOException {
+    dataOutputStream.writeInt(getId());
+    dataOutputStream.writeInt(size());
+    for (Node node : this) {
+      dataOutputStream.writeInt(node.getNodeIdentifier());
+    }
+  }
+
+  public void deserialize(ByteBuffer buffer, Map<Integer, Node> idNodeMap) {
+    id = buffer.getInt();
+    int nodeNum = buffer.getInt();
+    for (int i2 = 0; i2 < nodeNum; i2++) {
+      add(idNodeMap.get(buffer.getInt()));
+    }
+  }
+
   @Override
   public int hashCode() {
     return Objects.hash(id, super.hashCode());
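
PartitionGroup now serializes only the group id and the node identifiers, so deserialization needs an identifier-to-Node map to resolve them; both NodeRemovalResult and SlotPartitionTable pass such a map elsewhere in this diff. A rough round-trip sketch under that assumption (the helper class name is invented):

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.util.Map;
    import org.apache.iotdb.cluster.partition.PartitionGroup;
    import org.apache.iotdb.cluster.rpc.thrift.Node;

    public final class PartitionGroupRoundTrip {

      private PartitionGroupRoundTrip() {
      }

      /** Serialize a group and rebuild it from the buffer using the identifier-to-node map. */
      public static PartitionGroup roundTrip(PartitionGroup group, Map<Integer, Node> idNodeMap)
          throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
          // writes the group id, the node count and one identifier per node
          group.serialize(out);
        }
        PartitionGroup copy = new PartitionGroup();
        copy.deserialize(ByteBuffer.wrap(bytes.toByteArray()), idNodeMap);
        return copy;
      }
    }
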
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionTable.java b/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionTable.java
index 079aad1..6bf6c0c 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionTable.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionTable.java
@@ -63,14 +63,18 @@ public interface PartitionTable {
    * @param node
    * @return the new group generated by the node
    */
-  NodeAdditionResult addNode(Node node);
+  void addNode(Node node);
+
+  NodeAdditionResult getNodeAdditionResult(Node node);
 
   /**
    * Remove a node and update the partition table.
    *
    * @param node
    */
-  NodeRemovalResult removeNode(Node node);
+  void removeNode(Node node);
+
+  NodeRemovalResult getNodeRemovalResult();
 
   /**
    * @return All data groups where all VNodes of this node is the header. The first index indicates
@@ -88,12 +92,19 @@ public interface PartitionTable {
 
   ByteBuffer serialize();
 
-  void deserialize(ByteBuffer buffer);
+  /**
+   * Deserialize the partition table and check whether the table in the byte buffer is valid
+   * @param buffer the serialized partition table
+   * @return true if the partition table in the buffer is not older than the local one
+   */
+  boolean deserialize(ByteBuffer buffer);
 
   List<Node> getAllNodes();
 
   List<PartitionGroup> getGlobalGroups();
 
+  List<PartitionGroup> calculateGlobalGroups(List<Node> nodeRing);
+
   /**
    * Judge whether the data of slot is held by node
    * @param node target node
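
PartitionTable now separates the mutation from the result computation: addNode/removeNode only update the ring and slot assignment, while getNodeAdditionResult/getNodeRemovalResult are queried afterwards (MetaGroupMember below calls them in exactly that order). A condensed sketch of the calling pattern, assuming a ready PartitionTable implementation such as SlotPartitionTable; error handling and synchronization are omitted:

    import org.apache.iotdb.cluster.partition.NodeAdditionResult;
    import org.apache.iotdb.cluster.partition.NodeRemovalResult;
    import org.apache.iotdb.cluster.partition.PartitionTable;
    import org.apache.iotdb.cluster.rpc.thrift.Node;

    public final class MembershipChangeSketch {

      private MembershipChangeSketch() {
      }

      /** Add a node, then ask the table what changed for the data groups. */
      public static NodeAdditionResult addAndQuery(PartitionTable table, Node newNode) {
        table.addNode(newNode); // updates the node ring and slot ownership
        return table.getNodeAdditionResult(newNode); // new groups plus, for slots, the lost slots
      }

      /** Remove a node, then fetch the removal result computed during removeNode. */
      public static NodeRemovalResult removeAndQuery(PartitionTable table, Node oldNode) {
        table.removeNode(oldNode); // updates the node ring and slot ownership
        return table.getNodeRemovalResult(); // removed groups, regenerated groups, new slot owners
      }
    }
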
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotNodeRemovalResult.java b/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotNodeRemovalResult.java
index 17a0c93..a04a289 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotNodeRemovalResult.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotNodeRemovalResult.java
@@ -19,9 +19,15 @@
 
 package org.apache.iotdb.cluster.partition.slot;
 
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import org.apache.iotdb.cluster.partition.NodeRemovalResult;
+import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.iotdb.cluster.rpc.thrift.RaftNode;
 
 /**
@@ -29,7 +35,7 @@ import org.apache.iotdb.cluster.rpc.thrift.RaftNode;
  */
 public class SlotNodeRemovalResult extends NodeRemovalResult {
 
-  private Map<RaftNode, List<Integer>> newSlotOwners;
+  private Map<RaftNode, List<Integer>> newSlotOwners = new HashMap<>();
 
   public Map<RaftNode, List<Integer>> getNewSlotOwners() {
     return newSlotOwners;
@@ -38,4 +44,34 @@ public class SlotNodeRemovalResult extends NodeRemovalResult {
   public void addNewSlotOwners(Map<RaftNode, List<Integer>> newSlotOwners) {
     this.newSlotOwners = newSlotOwners;
   }
+
+  @Override
+  public void serialize(DataOutputStream dataOutputStream) throws IOException {
+    super.serialize(dataOutputStream);
+    dataOutputStream.writeInt(newSlotOwners.size());
+    for (Map.Entry<RaftNode, List<Integer>> entry: newSlotOwners.entrySet()) {
+      RaftNode raftNode = entry.getKey();
+      dataOutputStream.writeInt(raftNode.getNode().nodeIdentifier);
+      dataOutputStream.writeInt(raftNode.getRaftId());
+      dataOutputStream.writeInt(entry.getValue().size());
+      for (Integer slot: entry.getValue()) {
+        dataOutputStream.writeInt(slot);
+      }
+    }
+  }
+
+  @Override
+  public void deserialize(ByteBuffer buffer, Map<Integer, Node> idNodeMap) {
+    super.deserialize(buffer, idNodeMap);
+    int size = buffer.getInt();
+    for (int i = 0 ; i < size; i++) {
+      RaftNode raftNode = new RaftNode(idNodeMap.get(buffer.getInt()), buffer.getInt());
+      List<Integer> slots = new ArrayList<>();
+      int slotSize = buffer.getInt();
+      for (int j = 0 ; j < slotSize; j++) {
+        slots.add(buffer.getInt());
+      }
+      newSlotOwners.put(raftNode, slots);
+    }
+  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java b/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java
index 2a5ae3c..f441e4a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java
@@ -62,8 +62,11 @@ public class SlotPartitionTable implements PartitionTable {
   private RaftNode[] slotNodes = new RaftNode[ClusterConstant.SLOT_NUM];
   // the nodes that each slot belongs to before a new node is added, used for the new node to
   // find the data source
   private Map<RaftNode, Map<Integer, PartitionGroup>> previousNodeMap = new ConcurrentHashMap<>();
 
+  private NodeRemovalResult nodeRemovalResult = new NodeRemovalResult();
+
   //the filed is used for determining which nodes need to be a group.
   // the data groups which this node belongs to.
   private List<PartitionGroup> localGroups;
@@ -231,11 +234,11 @@ public class SlotPartitionTable implements PartitionTable {
   }
 
   @Override
-  public NodeAdditionResult addNode(Node node) {
+  public void addNode(Node node) {
     List<Node> oldRing;
     synchronized (nodeRing) {
       if (nodeRing.contains(node)) {
-        return null;
+        return;
       }
 
       oldRing = new ArrayList<>(nodeRing);
@@ -270,21 +273,34 @@ public class SlotPartitionTable implements PartitionTable {
       }
     }
 
-    SlotNodeAdditionResult result = new SlotNodeAdditionResult();
     for (int raftId = 0; raftId < multiRaftFactor; raftId++) {
       PartitionGroup newGroup = getHeaderGroup(new RaftNode(node, raftId));
       if (newGroup.contains(thisNode)) {
         localGroups.add(newGroup);
       }
-      result.addNewGroup(newGroup);
     }
 
-    calculateGlobalGroups();
+    calculateGlobalGroups(nodeRing);
 
     // the slots movement is only done logically, the new node itself will pull data from the
     // old node
-    result.setLostSlots(moveSlotsToNew(node, oldRing));
+    moveSlotsToNew(node, oldRing);
 
+  }
+
+  @Override
+  public NodeAdditionResult getNodeAdditionResult(Node node) {
+    SlotNodeAdditionResult result = new SlotNodeAdditionResult();
+    Map<RaftNode, Set<Integer>> lostSlotsMap = new HashMap<>();
+    for (int raftId = 0; raftId < multiRaftFactor; raftId++) {
+      RaftNode raftNode = new RaftNode(node, raftId);
+      result.addNewGroup(getHeaderGroup(raftNode));
+      for (Entry<Integer, PartitionGroup> entry: previousNodeMap.get(raftNode).entrySet()) {
+        RaftNode header = new RaftNode(entry.getValue().getHeader(), entry.getValue().getId());
+        lostSlotsMap.computeIfAbsent(header, k -> new HashSet<>()).add(entry.getKey());
+      }
+    }
+    result.setLostSlots(lostSlotsMap);
     return result;
   }
 
@@ -294,10 +310,8 @@ public class SlotPartitionTable implements PartitionTable {
    * node.
    *
    * @param newNode
-   * @return a map recording what slots each group lost.
    */
-  private Map<RaftNode, Set<Integer>> moveSlotsToNew(Node newNode, List<Node> oldRing) {
-    Map<RaftNode, Set<Integer>> result = new HashMap<>();
+  private void moveSlotsToNew(Node newNode, List<Node> oldRing) {
     // as a node is added, the average slots for each node decrease
     // move the slots to the new node if any previous node have more slots than the new average
     int newAvg = totalSlotNumbers / nodeRing.size() / multiRaftFactor;
@@ -324,7 +338,6 @@ public class SlotPartitionTable implements PartitionTable {
           previousNodeMap.get(curNode).put(slot, getHeaderGroup(entry.getKey(), oldRing));
           slotNodes[slot] = curNode;
         }
-        result.computeIfAbsent(entry.getKey(), n -> new HashSet<>()).addAll(slotsToMove);
         transferNum -= numToMove;
         if (transferNum > 0) {
           curNode = new RaftNode(newNode, ++raftId);
@@ -335,11 +348,9 @@ public class SlotPartitionTable implements PartitionTable {
             previousNodeMap.get(curNode).put(slot, getHeaderGroup(entry.getKey(), oldRing));
             slotNodes[slot] = curNode;
           }
-          result.get(entry.getKey()).addAll(slotsToMove);
         }
       }
     }
-    return result;
   }
 
   @Override
@@ -354,6 +365,7 @@ public class SlotPartitionTable implements PartitionTable {
     DataOutputStream dataOutputStream = new DataOutputStream(outputStream);
 
     try {
+      dataOutputStream.writeLong(lastLogIndex);
       dataOutputStream.writeInt(totalSlotNumbers);
       dataOutputStream.writeInt(nodeSlotMap.size());
       for (Entry<RaftNode, List<Integer>> entry : nodeSlotMap.entrySet()) {
@@ -370,16 +382,11 @@ public class SlotPartitionTable implements PartitionTable {
         dataOutputStream.writeInt(prevHolders.size());
         for (Entry<Integer, PartitionGroup> integerNodeEntry : prevHolders.entrySet()) {
           dataOutputStream.writeInt(integerNodeEntry.getKey());
-          PartitionGroup group = integerNodeEntry.getValue();
-          dataOutputStream.writeInt(group.getId());
-          dataOutputStream.writeInt(group.size());
-          for (Node node : group) {
-            dataOutputStream.writeInt(node.getNodeIdentifier());
-          }
+          integerNodeEntry.getValue().serialize(dataOutputStream);
         }
       }
 
-      dataOutputStream.writeLong(lastLogIndex);
+      nodeRemovalResult.serialize(dataOutputStream);
     } catch (IOException ignored) {
       // not reachable
     }
@@ -387,8 +394,14 @@ public class SlotPartitionTable implements PartitionTable {
   }
 
   @Override
-  public void deserialize(ByteBuffer buffer) {
+  public synchronized boolean deserialize(ByteBuffer buffer) {
+    long newLastLogIndex = buffer.getLong();
 
+    // check whether the partition table in the byte buffer is out of date
+    if (lastLogIndex >= newLastLogIndex) {
+      return lastLogIndex <= newLastLogIndex;
+    }
+    lastLogIndex = newLastLogIndex;
     logger.info("Initializing the partition table from buffer");
     totalSlotNumbers = buffer.getInt();
     int size = buffer.getInt();
@@ -415,17 +428,15 @@ public class SlotPartitionTable implements PartitionTable {
       Map<Integer, PartitionGroup> prevHolders = new HashMap<>();
       int holderNum = buffer.getInt();
       for (int i1 = 0; i1 < holderNum; i1++) {
-        int slot = buffer.getInt();
-        PartitionGroup group = new PartitionGroup(buffer.getInt());
-        int nodeNum = buffer.getInt();
-        for (int i2 = 0 ; i2 < nodeNum; i2++) {
-          group.add(idNodeMap.get(buffer.getInt()));
-        }
-        prevHolders.put(slot, group);
+        int slot = buffer.getInt();
+        PartitionGroup group = new PartitionGroup();
+        group.deserialize(buffer, idNodeMap);
+        prevHolders.put(slot, group);
       }
       previousNodeMap.put(node, prevHolders);
     }
-    lastLogIndex = buffer.getLong();
+
+    nodeRemovalResult = new NodeRemovalResult();
+    nodeRemovalResult.deserialize(buffer, idNodeMap);
 
     for (RaftNode raftNode : nodeSlotMap.keySet()) {
       if (!nodeRing.contains(raftNode.getNode())) {
@@ -436,6 +447,7 @@ public class SlotPartitionTable implements PartitionTable {
     logger.info("All known nodes: {}", nodeRing);
 
     localGroups = getPartitionGroups(thisNode);
+    return true;
   }
 
   @Override
@@ -485,10 +497,10 @@ public class SlotPartitionTable implements PartitionTable {
   }
 
   @Override
-  public NodeRemovalResult removeNode(Node target) {
+  public void removeNode(Node target) {
     synchronized (nodeRing) {
       if (!nodeRing.contains(target)) {
-        return null;
+        return;
       }
 
       SlotNodeRemovalResult result = new SlotNodeRemovalResult();
@@ -532,16 +544,21 @@ public class SlotPartitionTable implements PartitionTable {
         result.addNewGroup(newGrp);
       }
 
-      calculateGlobalGroups();
+      calculateGlobalGroups(nodeRing);
 
       // the slots movement is only done logically, the new node itself will pull data from the
       // old node
       Map<RaftNode, List<Integer>> raftNodeListMap = retrieveSlots(target);
       result.addNewSlotOwners(raftNodeListMap);
-      return result;
+      this.nodeRemovalResult = result;
     }
   }
 
+  @Override
+  public NodeRemovalResult getNodeRemovalResult() {
+    return nodeRemovalResult;
+  }
+
   private Map<RaftNode, List<Integer>> retrieveSlots(Node target) {
     Map<RaftNode, List<Integer>> newHolderSlotMap = new HashMap<>();
     for(int raftId = 0 ; raftId < multiRaftFactor; raftId++) {
@@ -563,7 +580,7 @@ public class SlotPartitionTable implements PartitionTable {
     // preventing a thread from getting incomplete globalGroups
     synchronized (nodeRing) {
       if (globalGroups == null) {
-        calculateGlobalGroups();
+        globalGroups = calculateGlobalGroups(nodeRing);
       }
       return globalGroups;
     }
@@ -574,13 +591,15 @@ public class SlotPartitionTable implements PartitionTable {
     return getHeaderGroup(slotNodes[slot]).contains(node);
   }
 
-  private void calculateGlobalGroups() {
-    globalGroups = new ArrayList<>();
-    for (Node node : getAllNodes()) {
+  @Override
+  public List<PartitionGroup> calculateGlobalGroups(List<Node> nodeRing) {
+    List<PartitionGroup> result = new ArrayList<>();
+    for (Node node : nodeRing) {
       for (int i = 0; i < multiRaftFactor; i++) {
-        globalGroups.add(getHeaderGroup(new RaftNode(node, i)));
+        result.add(getHeaderGroup(new RaftNode(node, i)));
       }
     }
+    return result;
   }
 
   public synchronized long getLastLogIndex() {
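
The guard at the top of the new deserialize is easy to misread, because the early return packs three cases into two comparisons. A small stand-alone restatement of that logic (the class and method names are invented; the behaviour mirrors the snippet above):

    public final class PartitionTableStalenessGuard {

      private PartitionTableStalenessGuard() {
      }

      // Mirrors the lastLogIndex check in SlotPartitionTable.deserialize:
      //   incoming >  local  -> the caller rebuilds the table from the buffer, valid (true)
      //   incoming == local  -> same table, the rebuild is skipped but it is still valid (true)
      //   incoming <  local  -> stale buffer, the rebuild is skipped and it is invalid (false)
      public static boolean isAcceptable(long localLastLogIndex, long incomingLastLogIndex) {
        if (localLastLogIndex >= incomingLastLogIndex) {
          return localLastLogIndex <= incomingLastLogIndex;
        }
        return true;
      }
    }
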
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanRouter.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanRouter.java
index f052294..0c8cf25 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanRouter.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanRouter.java
@@ -21,12 +21,19 @@ package org.apache.iotdb.cluster.query;
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import org.apache.iotdb.cluster.exception.UnknownLogTypeException;
 import org.apache.iotdb.cluster.exception.UnsupportedPlanException;
+import org.apache.iotdb.cluster.log.Log;
+import org.apache.iotdb.cluster.log.LogParser;
+import org.apache.iotdb.cluster.log.logtypes.AddNodeLog;
+import org.apache.iotdb.cluster.log.logtypes.RemoveNodeLog;
 import org.apache.iotdb.cluster.partition.PartitionGroup;
 import org.apache.iotdb.cluster.partition.PartitionTable;
+import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.iotdb.cluster.utils.PartitionUtils;
 import org.apache.iotdb.db.engine.StorageEngine;
 import org.apache.iotdb.db.exception.metadata.IllegalPathException;
@@ -41,6 +48,7 @@ import org.apache.iotdb.db.qp.physical.sys.AlterTimeSeriesPlan;
 import org.apache.iotdb.db.qp.physical.sys.CountPlan;
 import org.apache.iotdb.db.qp.physical.sys.CreateMultiTimeSeriesPlan;
 import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.LogPlan;
 import org.apache.iotdb.db.qp.physical.sys.ShowChildPathsPlan;
 import org.apache.iotdb.db.qp.physical.sys.ShowPlan.ShowContentType;
 import org.apache.iotdb.db.service.IoTDB;
@@ -108,7 +116,7 @@ public class ClusterPlanRouter {
   }
 
   public Map<PhysicalPlan, PartitionGroup> splitAndRoutePlan(PhysicalPlan plan)
-      throws UnsupportedPlanException, MetadataException {
+      throws UnsupportedPlanException, MetadataException, UnknownLogTypeException {
     if (plan instanceof InsertTabletPlan) {
       return splitAndRoutePlan((InsertTabletPlan) plan);
     } else if (plan instanceof CountPlan) {
@@ -121,6 +129,8 @@ public class ClusterPlanRouter {
       return splitAndRoutePlan((AlterTimeSeriesPlan) plan);
     } else if (plan instanceof CreateMultiTimeSeriesPlan) {
       return splitAndRoutePlan((CreateMultiTimeSeriesPlan) plan);
+    } else if (plan instanceof LogPlan) {
+      return splitAndRoutePlan((LogPlan)plan);
     }
     //the if clause can be removed after the program is stable
     if (PartitionUtils.isLocalNonQueryPlan(plan)) {
@@ -134,6 +144,25 @@ public class ClusterPlanRouter {
     throw new UnsupportedPlanException(plan);
   }
 
+  private Map<PhysicalPlan, PartitionGroup> splitAndRoutePlan(LogPlan plan)
+      throws UnknownLogTypeException, UnsupportedPlanException {
+    Map<PhysicalPlan, PartitionGroup> result = new HashMap<>();
+    Log log = LogParser.getINSTANCE().parse(plan.getLog());
+    List<Node> oldRing = new ArrayList<>(partitionTable.getAllNodes());
+    if (log instanceof AddNodeLog) {
+      oldRing.remove(((AddNodeLog) log).getNewNode());
+    } else if (log instanceof RemoveNodeLog) {
+      oldRing.add(((RemoveNodeLog) log).getRemovedNode());
+      oldRing.sort(Comparator.comparingInt(Node::getNodeIdentifier));
+    } else {
+      throw new UnsupportedPlanException(plan);
+    }
+    for (PartitionGroup partitionGroup: partitionTable.calculateGlobalGroups(oldRing)) {
+      result.put(plan, partitionGroup);
+    }
+    return result;
+  }
+
   private Map<PhysicalPlan, PartitionGroup> splitAndRoutePlan(InsertRowPlan plan)
       throws MetadataException {
     PartitionGroup partitionGroup = partitionTable.partitionByPathTime(plan.getDeviceId(),
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
index b023c36..16c1da6 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
@@ -33,6 +33,8 @@ import org.apache.iotdb.cluster.exception.CheckConsistencyException;
 import org.apache.iotdb.cluster.exception.NoHeaderNodeException;
 import org.apache.iotdb.cluster.exception.NotInSameGroupException;
 import org.apache.iotdb.cluster.exception.PartitionTableUnavailableException;
+import org.apache.iotdb.cluster.log.logtypes.AddNodeLog;
+import org.apache.iotdb.cluster.log.logtypes.RemoveNodeLog;
 import org.apache.iotdb.cluster.partition.NodeAdditionResult;
 import org.apache.iotdb.cluster.partition.NodeRemovalResult;
 import org.apache.iotdb.cluster.partition.PartitionGroup;
@@ -490,6 +492,18 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
     return "DataServerThread-";
   }
 
+  public void preAddNodeForDataGroup(AddNodeLog log, DataGroupMember targetDataGroupMember) {
+    // Make sure the previous add/remove node log has been applied
+    metaGroupMember.syncLeader();
+
+    // Check the validity of the partition table
+    if (!metaGroupMember.getPartitionTable().deserialize(log.getPartitionTable())) {
+      return;
+    }
+
+    targetDataGroupMember.preAddNode(log.getNewNode());
+  }
+
   /**
    * Try adding the node into the group of each DataGroupMember, and if the DataGroupMember no
    * longer stays in that group, also remove and stop it. If the new group contains this node, also
@@ -499,6 +513,10 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
    * @param result
    */
   public void addNode(Node node, NodeAdditionResult result) {
+    // If the added node is this node itself, the new data groups already exist, so there is nothing to add.
+    if (node.equals(thisNode)) {
+      return;
+    }
     Iterator<Entry<RaftNode, DataGroupMember>> entryIterator = headerGroupMap.entrySet().iterator();
     synchronized (headerGroupMap) {
       while (entryIterator.hasNext()) {
@@ -581,6 +599,18 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
     logger.info("Data group members are ready");
   }
 
+  public void preRemoveNodeForDataGroup(RemoveNodeLog log, DataGroupMember targetDataGroupMember) {
+    // Make sure the previous add/remove node log has been applied
+    metaGroupMember.syncLeader();
+
+    // Check the validity of the partition table
+    if (!metaGroupMember.getPartitionTable().deserialize(log.getPartitionTable())) {
+      return;
+    }
+
+    targetDataGroupMember.preRemoveNode(log.getRemovedNode());
+  }
+
   /**
    * Try removing a node from the groups of each DataGroupMember. If the node is the header of some
    * group, set the member to read only so that it can still provide data for other nodes that has
@@ -625,6 +655,7 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
 
   /**
    * When the node joins a cluster, it also creates a new data group and a corresponding member
    * which has no data. This is to make that member pull data from other nodes.
    */
   public void pullSnapshots() {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java
index e4a7304..d198039 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java
@@ -248,8 +248,8 @@ public class MetaClusterServer extends RaftServer implements TSMetaService.Async
   }
 
   @Override
-  public void exile(AsyncMethodCallback<Void> resultHandler) {
-    asyncService.exile(resultHandler);
+  public void exile(ByteBuffer removeNodeLog, AsyncMethodCallback<Void> resultHandler) {
+    asyncService.exile(removeNodeLog, resultHandler);
   }
 
   @Override
@@ -274,8 +274,8 @@ public class MetaClusterServer extends RaftServer implements TSMetaService.Async
   }
 
   @Override
-  public void exile() {
-    syncService.exile();
+  public void exile(ByteBuffer removeNodeLog) {
+    syncService.exile(removeNodeLog);
   }
 
   @Override
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
index 4737520..077d61d 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
@@ -261,33 +261,10 @@ public class DataGroupMember extends RaftMember {
     }
   }
 
-  /**
-   * Try to add a Node into the group to which the member belongs.
-   *
-   * @param node
-   * @return true if this node should leave the group because of the addition of the node, false
-   * otherwise
-   */
-  public synchronized boolean addNode(Node node, NodeAdditionResult result) {
-    // when a new node is added, start an election instantly to avoid the stale leader still
-    // taking the leadership, which guarantees the valid leader will not have the stale
-    // partition table
-    synchronized (term) {
-      term.incrementAndGet();
-      setLeader(ClusterConstant.EMPTY_NODE);
-      setVoteFor(thisNode);
-      updateHardState(term.get(), getVoteFor());
-      setLastHeartbeatReceivedTime(System.currentTimeMillis());
-      setCharacter(NodeCharacter.ELECTOR);
-    }
-
-    // mark slots that do not belong to this group any more
-    Set<Integer> lostSlots = ((SlotNodeAdditionResult) result).getLostSlots()
-        .getOrDefault(new RaftNode(getHeader(), getRaftGroupId()), Collections.emptySet());
-    for (Integer lostSlot : lostSlots) {
-      slotManager.setToSending(lostSlot);
+  public void preAddNode(Node node) {
+    if (allNodes.contains(node)) {
+      return;
     }
-
     synchronized (allNodes) {
       int insertIndex = -1;
       // find the position to insert the new node, the nodes are ordered by their identifiers
@@ -307,12 +284,41 @@ public class DataGroupMember extends RaftMember {
       if (insertIndex > 0) {
         allNodes.add(insertIndex, node);
         peerMap.putIfAbsent(node, new Peer(logManager.getLastLogIndex()));
-        // remove the last node because the group size is fixed to replication number
-        Node removedNode = allNodes.remove(allNodes.size() - 1);
-        peerMap.remove(removedNode);
         // if the local node is the last node and the insertion succeeds, this node should leave
         // the group
         logger.debug("{}: Node {} is inserted into the data group {}", name, node, allNodes);
+      }
+    }
+  }
+
+  /**
+   * Try to add a Node into the group to which the member belongs.
+   *
+   * @param node
+   * @return true if this node should leave the group because of the addition of the node, false
+   * otherwise
+   */
+  public boolean addNode(Node node, NodeAdditionResult result) {
+
+    // mark slots that do not belong to this group any more
+    Set<Integer> lostSlots = ((SlotNodeAdditionResult) result).getLostSlots()
+        .getOrDefault(new RaftNode(getHeader(), getRaftGroupId()), Collections.emptySet());
+    for (Integer lostSlot : lostSlots) {
+      slotManager.setToSending(lostSlot);
+    }
+
+    synchronized (allNodes) {
+      if (allNodes.contains(node) && allNodes.size() > config.getReplicationNum()) {
+        // remove the last node because the group size is fixed to replication number
+        Node removedNode = allNodes.remove(allNodes.size() - 1);
+        peerMap.remove(removedNode);
+        if (removedNode.equals(leader.get())) {
+          // if the leader is removed, also start an election immediately
+          synchronized (term) {
+            setCharacter(NodeCharacter.ELECTOR);
+            setLastHeartbeatReceivedTime(Long.MIN_VALUE);
+          }
+        }
         return removedNode.equals(thisNode);
       }
       return false;
@@ -737,6 +743,18 @@ public class DataGroupMember extends RaftMember {
     }
   }
 
+  public void preRemoveNode(Node removedNode) {
+    synchronized (allNodes) {
+      if (allNodes.contains(removedNode)) {
+        // update the group if the deleted node was in it
+        PartitionGroup newGroup = metaGroupMember.getPartitionTable().getHeaderGroup(new RaftNode(getHeader(), getRaftGroupId()));
+        Node newNodeToGroup = newGroup.get(newGroup.size() - 1);
+        allNodes.add(newNodeToGroup);
+        peerMap.putIfAbsent(newNodeToGroup, new Peer(logManager.getLastLogIndex()));
+      }
+    }
+  }
+
   /**
    * When a node is removed and IT IS NOT THE HEADER of the group, the member should take over some
    * slots from the removed group, and add a new node to the group the removed node was in the
@@ -747,8 +765,8 @@ public class DataGroupMember extends RaftMember {
     synchronized (allNodes) {
       if (allNodes.contains(removedNode)) {
         // update the group if the deleted node was in it
-        allNodes = metaGroupMember.getPartitionTable().getHeaderGroup(new RaftNode(getHeader(), getRaftGroupId()));
-        initPeerMap();
+        allNodes.remove(removedNode);
+        peerMap.remove(removedNode);
         if (removedNode.equals(leader.get())) {
           // if the leader is removed, also start an election immediately
           synchronized (term) {
@@ -837,7 +855,7 @@ public class DataGroupMember extends RaftMember {
         continue;
       }
       int sentReplicaNum = slotManager.sentOneReplication(slot);
-      if (sentReplicaNum >= ClusterDescriptor.getInstance().getConfig().getReplicationNum()) {
+      if (sentReplicaNum >= config.getReplicationNum()) {
         removableSlots.add(slot);
       }
     }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
index 7e73f61..22520e7 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
@@ -73,6 +73,7 @@ import org.apache.iotdb.cluster.exception.LogExecutionException;
 import org.apache.iotdb.cluster.exception.PartitionTableUnavailableException;
 import org.apache.iotdb.cluster.exception.SnapshotInstallationException;
 import org.apache.iotdb.cluster.exception.StartUpCheckFailureException;
+import org.apache.iotdb.cluster.exception.UnknownLogTypeException;
 import org.apache.iotdb.cluster.exception.UnsupportedPlanException;
 import org.apache.iotdb.cluster.log.Log;
 import org.apache.iotdb.cluster.log.LogApplier;
@@ -186,11 +187,6 @@ public class MetaGroupMember extends RaftMember {
    * members in this node
    */
   private static final int REPORT_INTERVAL_SEC = 10;
-  /**
-   * how many times is a data record replicated, also the number of nodes in a data group
-   */
-  private static final int REPLICATION_NUM =
-      ClusterDescriptor.getInstance().getConfig().getReplicationNum();
 
   /**
    * during snapshot, hardlinks of data files are created to for downloading. hardlinks will be
@@ -421,19 +417,23 @@ public class MetaGroupMember extends RaftMember {
    * Apply the addition of a new node. Register its identifier, add it to the node list and
    * partition table, serialize the partition table and update the DataGroupMembers.
    */
-  public void applyAddNode(Node newNode) {
+  public void applyAddNode(AddNodeLog addNodeLog) {
+
+    Node newNode = addNodeLog.getNewNode();
     synchronized (allNodes) {
-      if (!allNodes.contains(newNode)) {
+      if (partitionTable.deserialize(addNodeLog.getPartitionTable())) {
         logger.debug("Adding a new node {} into {}", newNode, allNodes);
-        registerNodeIdentifier(newNode, newNode.getNodeIdentifier());
-        allNodes.add(newNode);
+
+        if (!allNodes.contains(newNode)) {
+          registerNodeIdentifier(newNode, newNode.getNodeIdentifier());
+          allNodes.add(newNode);
+        }
 
         // update the partition table
-        NodeAdditionResult result = partitionTable.addNode(newNode);
-        ((SlotPartitionTable) partitionTable).setLastLogIndex(logManager.getLastLogIndex());
         savePartitionTable();
 
         // update local data members
+        NodeAdditionResult result = partitionTable.getNodeAdditionResult(newNode);
         getDataClusterServer().addNode(newNode, result);
       }
     }
@@ -856,7 +856,12 @@ public class MetaGroupMember extends RaftMember {
 
     // node adding is serialized to reduce potential concurrency problem
     synchronized (logManager) {
+      // update partition table
+      partitionTable.addNode(node);
+      ((SlotPartitionTable) partitionTable).setLastLogIndex(logManager.getLastLogIndex() + 1);
+
       AddNodeLog addNodeLog = new AddNodeLog();
+      addNodeLog.setPartitionTable(partitionTable.serialize());
       addNodeLog.setCurrLogTerm(getTerm().get());
       addNodeLog.setCurrLogIndex(logManager.getLastLogIndex() + 1);
 
@@ -868,11 +873,11 @@ public class MetaGroupMember extends RaftMember {
       while (true) {
         logger
             .info("Send the join request of {} to other nodes, retry time: {}", node, retryTime);
-        AppendLogResult result = sendLogToAllGroups(addNodeLog);
+        AppendLogResult result = sendLogToFollowers(addNodeLog);
         switch (result) {
           case OK:
-            logger.info("Join request of {} is accepted", node);
             commitLog(addNodeLog);
+            logger.info("Join request of {} is accepted", node);
 
             synchronized (partitionTable) {
               response.setPartitionTableBytes(partitionTable.serialize());
@@ -902,9 +907,9 @@ public class MetaGroupMember extends RaftMember {
     long localPartitionInterval = IoTDBDescriptor.getInstance().getConfig()
         .getPartitionInterval();
     int localHashSalt = ClusterConstant.HASH_SALT;
-    int localReplicationNum = ClusterDescriptor.getInstance().getConfig().getReplicationNum();
-    String localClusterName = ClusterDescriptor.getInstance().getConfig().getClusterName();
-    int localMultiRaftFactor = ClusterDescriptor.getInstance().getConfig().getMultiRaftFactor();
+    int localReplicationNum = config.getReplicationNum();
+    String localClusterName = config.getClusterName();
+    int localMultiRaftFactor = config.getMultiRaftFactor();
     boolean partitionIntervalEquals = true;
     boolean multiRaftFactorEquals = true;
     boolean hashSaltEquals = true;
@@ -1027,7 +1032,7 @@ public class MetaGroupMember extends RaftMember {
   }
 
   private CheckStatusResponse checkStatus(Node seedNode) {
-    if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
+    if (config.isUseAsyncServer()) {
       AsyncMetaClient client = (AsyncMetaClient) getAsyncClient(seedNode);
       if (client == null) {
         return null;
@@ -1061,29 +1066,29 @@ public class MetaGroupMember extends RaftMember {
    * Send the log the all data groups and return a success only when each group's quorum has
    * accepted this log.
    */
-  private AppendLogResult sendLogToAllGroups(Log log) {
-    List<Node> nodeRing = partitionTable.getAllNodes();
-
-    AtomicLong newLeaderTerm = new AtomicLong(term.get());
-    AtomicBoolean leaderShipStale = new AtomicBoolean(false);
-    AppendEntryRequest request = buildAppendEntryRequest(log, true);
-
-    // ask for votes from each node
-    int[] groupRemainings = askGroupVotes(nodeRing, request, leaderShipStale, log, newLeaderTerm);
-
-    if (!leaderShipStale.get()) {
-      // if all quorums of all groups have received this log, it is considered succeeded.
-      for (int remaining : groupRemainings) {
-        if (remaining > 0) {
-          return AppendLogResult.TIME_OUT;
-        }
-      }
-    } else {
-      return AppendLogResult.LEADERSHIP_STALE;
-    }
-
-    return AppendLogResult.OK;
-  }
+//  private AppendLogResult sendLogToAllGroups(Log log) {
+//    List<Node> nodeRing = partitionTable.getAllNodes();
+//
+//    AtomicLong newLeaderTerm = new AtomicLong(term.get());
+//    AtomicBoolean leaderShipStale = new AtomicBoolean(false);
+//    AppendEntryRequest request = buildAppendEntryRequest(log, true);
+//
+//    // ask for votes from each node
+//    int[] groupRemainings = askGroupVotes(nodeRing, request, leaderShipStale, log, newLeaderTerm);
+//
+//    if (!leaderShipStale.get()) {
+//      // if all quorums of all groups have received this log, it is considered succeeded.
+//      for (int remaining : groupRemainings) {
+//        if (remaining > 0) {
+//          return AppendLogResult.TIME_OUT;
+//        }
+//      }
+//    } else {
+//      return AppendLogResult.LEADERSHIP_STALE;
+//    }
+//
+//    return AppendLogResult.OK;
+//  }
 
   /**
    * Send "request" to each node in "nodeRing" and when a node returns a success, decrease all
@@ -1094,54 +1099,54 @@ public class MetaGroupMember extends RaftMember {
   @SuppressWarnings({"java:S2445", "java:S2274"})
   // groupRemaining is shared with the handlers,
   // and we do not wait infinitely to enable timeouts
-  private int[] askGroupVotes(List<Node> nodeRing,
-      AppendEntryRequest request, AtomicBoolean leaderShipStale, Log log,
-      AtomicLong newLeaderTerm) {
-    // each node will be the header of a group, we use the node to represent the group
-    int nodeSize = nodeRing.size();
-    // the decreasing counters of how many nodes in a group has received the log, each time a
-    // node receive the log, the counters of all groups it is in will decrease by 1
-    int[] groupRemainings = new int[nodeSize];
-    // a group is considered successfully received the log if such members receive the log
-    int groupQuorum = REPLICATION_NUM / 2 + 1;
-    Arrays.fill(groupRemainings, groupQuorum);
-
-    synchronized (groupRemainings) {
-      // ask a vote from every node
-      for (int i = 0; i < nodeSize; i++) {
-        Node node = nodeRing.get(i);
-        if (node.equals(thisNode)) {
-          // this node automatically gives an agreement, decrease counters of all groups the local
-          // node is in
-          for (int j = 0; j < REPLICATION_NUM; j++) {
-            int groupIndex = i - j;
-            if (groupIndex < 0) {
-              groupIndex += groupRemainings.length;
-            }
-            groupRemainings[groupIndex]--;
-          }
-        } else {
-          askRemoteGroupVote(node, groupRemainings, i, leaderShipStale, log, newLeaderTerm,
-              request);
-        }
-      }
-
-      try {
-        groupRemainings.wait(RaftServer.getWriteOperationTimeoutMS());
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        logger.error("Unexpected interruption when waiting for the group votes", e);
-      }
-    }
-    return groupRemainings;
-  }
+//  private int[] askGroupVotes(List<Node> nodeRing,
+//      AppendEntryRequest request, AtomicBoolean leaderShipStale, Log log,
+//      AtomicLong newLeaderTerm) {
+//    // each node will be the header of a group, we use the node to represent the group
+//    int nodeSize = nodeRing.size();
+//    // the decreasing counters of how many nodes in a group has received the log, each time a
+//    // node receive the log, the counters of all groups it is in will decrease by 1
+//    int[] groupRemainings = new int[nodeSize];
+//    // a group is considered successfully received the log if such members receive the log
+//    int groupQuorum = REPLICATION_NUM / 2 + 1;
+//    Arrays.fill(groupRemainings, groupQuorum);
+//
+//    synchronized (groupRemainings) {
+//      // ask a vote from every node
+//      for (int i = 0; i < nodeSize; i++) {
+//        Node node = nodeRing.get(i);
+//        if (node.equals(thisNode)) {
+//          // this node automatically gives an agreement, decrease counters of all groups the local
+//          // node is in
+//          for (int j = 0; j < REPLICATION_NUM; j++) {
+//            int groupIndex = i - j;
+//            if (groupIndex < 0) {
+//              groupIndex += groupRemainings.length;
+//            }
+//            groupRemainings[groupIndex]--;
+//          }
+//        } else {
+//          askRemoteGroupVote(node, groupRemainings, i, leaderShipStale, log, newLeaderTerm,
+//              request);
+//        }
+//      }
+//
+//      try {
+//        groupRemainings.wait(RaftServer.getWriteOperationTimeoutMS());
+//      } catch (InterruptedException e) {
+//        Thread.currentThread().interrupt();
+//        logger.error("Unexpected interruption when waiting for the group votes", e);
+//      }
+//    }
+//    return groupRemainings;
+//  }
 
   private void askRemoteGroupVote(Node node, int[] groupRemainings, int nodeIndex,
       AtomicBoolean leaderShipStale, Log log,
       AtomicLong newLeaderTerm, AppendEntryRequest request) {
     AppendGroupEntryHandler handler = new AppendGroupEntryHandler(groupRemainings,
         nodeIndex, node, leaderShipStale, log, newLeaderTerm, this);
-    if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
+    if (config.isUseAsyncServer()) {
       AsyncMetaClient client = (AsyncMetaClient) getAsyncClient(node);
       try {
         if (client != null) {
@@ -1469,7 +1474,7 @@ public class MetaGroupMember extends RaftMember {
     if (planGroupMap == null || planGroupMap.isEmpty()) {
       if ((plan instanceof InsertPlan || plan instanceof CreateTimeSeriesPlan
           || plan instanceof CreateMultiTimeSeriesPlan)
-          && ClusterDescriptor.getInstance().getConfig().isEnableAutoCreateSchema()) {
+          && config.isEnableAutoCreateSchema()) {
         logger.debug("{}: No associated storage group found for {}, auto-creating", name, plan);
         try {
           ((CMManager) IoTDB.metaManager).createSchema(plan);
@@ -1499,10 +1504,10 @@ public class MetaGroupMember extends RaftMember {
       syncLeaderWithConsistencyCheck(true);
       try {
         planGroupMap = router.splitAndRoutePlan(plan);
-      } catch (MetadataException ex) {
+      } catch (MetadataException | UnknownLogTypeException ex) {
         // ignore
       }
-    } catch (MetadataException e) {
+    } catch (MetadataException | UnknownLogTypeException e) {
       logger.error("Cannot route plan {}", plan, e);
     }
     return planGroupMap;
@@ -1534,7 +1539,7 @@ public class MetaGroupMember extends RaftMember {
     }
     if (plan instanceof InsertPlan
         && status.getCode() == TSStatusCode.TIMESERIES_NOT_EXIST.getStatusCode()
-        && ClusterDescriptor.getInstance().getConfig().isEnableAutoCreateSchema()) {
+        && config.isEnableAutoCreateSchema()) {
       TSStatus tmpStatus = createTimeseriesForFailedInsertion(planGroupMap, ((InsertPlan) plan));
       if (tmpStatus != null) {
         status = tmpStatus;
@@ -1773,7 +1778,7 @@ public class MetaGroupMember extends RaftMember {
       try {
         // only data plans are partitioned, so it must be processed by its data server instead of
         // meta server
-        if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
+        if (config.isUseAsyncServer()) {
           status = forwardDataPlanAsync(plan, node, group.getHeader());
         } else {
           status = forwardDataPlanSync(plan, node, group.getHeader());
@@ -1880,7 +1885,7 @@ public class MetaGroupMember extends RaftMember {
     }
 
     try {
-      if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
+      if (config.isUseAsyncServer()) {
         getNodeStatusAsync(nodeStatus);
       } else {
         getNodeStatusSync(nodeStatus);
@@ -1974,7 +1979,7 @@ public class MetaGroupMember extends RaftMember {
     }
 
     // if we cannot have enough replica after the removal, reject it
-    if (allNodes.size() <= ClusterDescriptor.getInstance().getConfig().getReplicationNum()) {
+    if (allNodes.size() <= config.getReplicationNum()) {
       return Response.RESPONSE_CLUSTER_TOO_SMALL;
     }
 
@@ -1996,7 +2001,12 @@ public class MetaGroupMember extends RaftMember {
 
     // node removal must be serialized to reduce potential concurrency problem
     synchronized (logManager) {
+      // update partition table
+      partitionTable.removeNode(target);
+      ((SlotPartitionTable) partitionTable).setLastLogIndex(logManager.getLastLogIndex() + 1);
+
       RemoveNodeLog removeNodeLog = new RemoveNodeLog();
+      removeNodeLog.setPartitionTable(partitionTable.serialize());
       removeNodeLog.setCurrLogTerm(getTerm().get());
       removeNodeLog.setCurrLogIndex(logManager.getLastLogIndex() + 1);
 
@@ -2008,12 +2018,11 @@ public class MetaGroupMember extends RaftMember {
       while (true) {
         logger.info("Send the node removal request of {} to other nodes, retry time: {}", target,
             retryTime);
-        AppendLogResult result = sendLogToAllGroups(removeNodeLog);
-
+        AppendLogResult result = sendLogToFollowers(removeNodeLog);
         switch (result) {
           case OK:
-            logger.info("Removal request of {} is accepted", target);
             commitLog(removeNodeLog);
+            logger.info("Removal request of {} is accepted", target);
             return Response.RESPONSE_AGREE;
           case TIME_OUT:
             logger.info("Removal request of {} timed out", target);
@@ -2033,22 +2042,28 @@ public class MetaGroupMember extends RaftMember {
    * and catch-up service of data are kept alive for other nodes to pull data. If the removed node
    * is a leader, send an exile to the removed node so that it can know it is removed.
    *
-   * @param oldNode the node to be removed
    */
-  public void applyRemoveNode(Node oldNode) {
+  public void applyRemoveNode(RemoveNodeLog removeNodeLog) {
+
+    Node oldNode = removeNodeLog.getRemovedNode();
     synchronized (allNodes) {
-      if (allNodes.contains(oldNode)) {
+      if (partitionTable.deserialize(removeNodeLog.getPartitionTable())) {
         logger.debug("Removing a node {} from {}", oldNode, allNodes);
-        allNodes.remove(oldNode);
-        idNodeMap.remove(oldNode.nodeIdentifier);
 
-        // update the partition table
-        NodeRemovalResult result = partitionTable.removeNode(oldNode);
-        ((SlotPartitionTable) partitionTable).setLastLogIndex(logManager.getLastLogIndex());
+        if (allNodes.contains(oldNode)) {
+          allNodes.remove(oldNode);
+          idNodeMap.remove(oldNode.nodeIdentifier);
+
+        }
+
+        // save the updated partition table
+        savePartitionTable();
 
         // update DataGroupMembers, as the node is removed, the members of some groups are
         // changed and there will also be one less group
+        NodeRemovalResult result = partitionTable.getNodeRemovalResult();
         getDataClusterServer().removeNode(oldNode, result);
+
         // the leader is removed, start the next election ASAP
         if (oldNode.equals(leader.get())) {
           setCharacter(NodeCharacter.ELECTOR);
@@ -2065,21 +2080,19 @@ public class MetaGroupMember extends RaftMember {
         } else if (thisNode.equals(leader.get())) {
           // as the old node is removed, it cannot know this by heartbeat or log, so it should be
           // directly kicked out of the cluster
-          exileNode(oldNode);
+          exileNode(removeNodeLog);
         }
-
-        // save the updated partition table
-        savePartitionTable();
       }
     }
   }
 
-  private void exileNode(Node node) {
-    if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
+  private void exileNode(RemoveNodeLog removeNodeLog) {
+    Node node = removeNodeLog.getRemovedNode();
+    if (config.isUseAsyncServer()) {
       AsyncMetaClient asyncMetaClient = (AsyncMetaClient) getAsyncClient(node);
       try {
         if (asyncMetaClient != null) {
-          asyncMetaClient.exile(new GenericHandler<>(node, null));
+          asyncMetaClient.exile(removeNodeLog.serialize(), new GenericHandler<>(node, null));
         }
       } catch (TException e) {
         logger.warn("Cannot inform {} its removal", node, e);
@@ -2090,7 +2103,7 @@ public class MetaGroupMember extends RaftMember {
         return;
       }
       try {
-        client.exile();
+        client.exile(removeNodeLog.serialize());
       } catch (TException e) {
         client.getInputProtocol().getTransport().close();
         logger.warn("Cannot inform {} its removal", node, e);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index 0526285..57f22bc 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -93,6 +93,7 @@ import org.apache.iotdb.db.exception.metadata.PathNotExistException;
 import org.apache.iotdb.db.exception.metadata.StorageGroupAlreadySetException;
 import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.sys.LogPlan;
 import org.apache.iotdb.db.utils.TestOnly;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
@@ -899,13 +900,23 @@ public abstract class RaftMember {
       return StatusUtils.NODE_READ_ONLY;
     }
     long startTime = Timer.Statistic.RAFT_SENDER_APPEND_LOG.getOperationStartTime();
-    PhysicalPlanLog log = new PhysicalPlanLog();
+    Log log;
+    if (plan instanceof LogPlan) {
+      try {
+        log = LogParser.getINSTANCE().parse(((LogPlan) plan).getLog());
+      } catch (UnknownLogTypeException e) {
+        logger.error("Can not parse LogPlan {}", plan, e);
+        return StatusUtils.PARSE_LOG_ERROR;
+      }
+    } else {
+      log = new PhysicalPlanLog();
+      ((PhysicalPlanLog)log).setPlan(plan);
+    }
     // assign term and index to the new log and append it
     synchronized (logManager) {
       log.setCurrLogTerm(getTerm().get());
       log.setCurrLogIndex(logManager.getLastLogIndex() + 1);
 
-      log.setPlan(plan);
       plan.setIndex(log.getCurrLogIndex());
       logManager.append(log);
     }
@@ -1404,7 +1415,7 @@ public abstract class RaftMember {
   }
 
   private TSStatus handleLogExecutionException(
-      PhysicalPlanLog log, LogExecutionException e) {
+      Log log, LogExecutionException e) {
     Throwable cause = IOUtils.getRootCause(e);
     if (cause instanceof BatchProcessException) {
       return RpcUtils
@@ -1536,7 +1547,7 @@ public abstract class RaftMember {
         logger.debug("Has lose leadership, so need not to send log");
         return false;
       }
-      AppendLogResult result = sendLogToFollowers(log, allNodes.size() / 2);
+      AppendLogResult result = sendLogToFollowers(log);
       Timer.Statistic.RAFT_SENDER_SEND_LOG_TO_FOLLOWERS.calOperationCostTimeFromStart(startTime);
       switch (result) {
         case OK:
@@ -1568,10 +1579,11 @@ public abstract class RaftMember {
    *                       0, half of the cluster size will be used.
    * @return an AppendLogResult
    */
-  private AppendLogResult sendLogToFollowers(Log log, int requiredQuorum) {
+  protected AppendLogResult sendLogToFollowers(Log log) {
+    int requiredQuorum = allNodes.size() / 2;
     if (requiredQuorum <= 0) {
       // use half of the members' size as the quorum
-      return sendLogToFollowers(log, new AtomicInteger(allNodes.size() / 2));
+      return sendLogToFollowers(log, new AtomicInteger(requiredQuorum));
     } else {
       // make sure quorum does not exceed the number of members - 1
       return sendLogToFollowers(log, new AtomicInteger(Math.min(requiredQuorum,
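
The RaftMember hunk above is what lets a membership change ride the ordinary plan-processing path: the cluster log is wrapped in a LogPlan, and processPlanLocally() unwraps it with LogParser before appending it. A hedged sketch of the two ends of that wrap/unwrap; the calls are the ones visible in this commit, while where the wrapping actually happens (e.g. in ClusterPlanRouter) is outside this excerpt, and target/partitionTable are assumed fields:

    // wrap side: turn a membership log into a plan that can be routed like any other plan
    RemoveNodeLog removeNodeLog = new RemoveNodeLog();
    removeNodeLog.setPartitionTable(partitionTable.serialize());
    removeNodeLog.setRemovedNode(target);
    LogPlan logPlan = new LogPlan(removeNodeLog.serialize());

    // unwrap side, as processPlanLocally() does above
    try {
      Log log = LogParser.getINSTANCE().parse(logPlan.getLog());
      // log is the same RemoveNodeLog again, ready for logManager.append(log)
    } catch (UnknownLogTypeException e) {
      // the real method returns the new StatusUtils.PARSE_LOG_ERROR here
    }
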
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaAsyncService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaAsyncService.java
index 4ca6eb0..3b2df98 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaAsyncService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaAsyncService.java
@@ -19,10 +19,12 @@
 
 package org.apache.iotdb.cluster.server.service;
 
+import java.nio.ByteBuffer;
 import org.apache.iotdb.cluster.exception.AddSelfException;
 import org.apache.iotdb.cluster.exception.LeaderUnknownException;
 import org.apache.iotdb.cluster.exception.LogExecutionException;
 import org.apache.iotdb.cluster.exception.PartitionTableUnavailableException;
+import org.apache.iotdb.cluster.log.logtypes.RemoveNodeLog;
 import org.apache.iotdb.cluster.rpc.thrift.AddNodeResponse;
 import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
 import org.apache.iotdb.cluster.rpc.thrift.CheckStatusResponse;
@@ -195,8 +197,10 @@ public class MetaAsyncService extends BaseAsyncService implements TSMetaService.
    * @param resultHandler
    */
   @Override
-  public void exile(AsyncMethodCallback<Void> resultHandler) {
-    metaGroupMember.applyRemoveNode(metaGroupMember.getThisNode());
+  public void exile(ByteBuffer removeNodeLogBuffer, AsyncMethodCallback<Void> resultHandler) {
+    RemoveNodeLog removeNodeLog = new RemoveNodeLog();
+    removeNodeLog.deserialize(removeNodeLogBuffer);
+    metaGroupMember.applyRemoveNode(removeNodeLog);
     resultHandler.onComplete(null);
   }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaSyncService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaSyncService.java
index 3b5f445..48c0e58 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaSyncService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaSyncService.java
@@ -19,11 +19,13 @@
 
 package org.apache.iotdb.cluster.server.service;
 
+import java.nio.ByteBuffer;
 import org.apache.iotdb.cluster.client.sync.SyncMetaClient;
 import org.apache.iotdb.cluster.exception.AddSelfException;
 import org.apache.iotdb.cluster.exception.LeaderUnknownException;
 import org.apache.iotdb.cluster.exception.LogExecutionException;
 import org.apache.iotdb.cluster.exception.PartitionTableUnavailableException;
+import org.apache.iotdb.cluster.log.logtypes.RemoveNodeLog;
 import org.apache.iotdb.cluster.rpc.thrift.AddNodeResponse;
 import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
 import org.apache.iotdb.cluster.rpc.thrift.CheckStatusResponse;
@@ -188,7 +190,9 @@ public class MetaSyncService extends BaseSyncService implements TSMetaService.If
    * must tell it directly.
    */
   @Override
-  public void exile() {
-    metaGroupMember.applyRemoveNode(metaGroupMember.getThisNode());
+  public void exile(ByteBuffer removeNodeLogBuffer) {
+    RemoveNodeLog removeNodeLog = new RemoveNodeLog();
+    removeNodeLog.deserialize(removeNodeLogBuffer);
+    metaGroupMember.applyRemoveNode(removeNodeLog);
   }
 }
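
Both the async and sync services now receive the serialized RemoveNodeLog in exile(): the exiled node can no longer learn of the commit through heartbeats, so the leader hands it the exact log, partition table included, and it runs the same applyRemoveNode() as everyone else. Condensed, the contract looks like this (client, removeNodeLogBuffer and the member fields are assumed from the surrounding classes):

    // leader side (sync flavor), once the removal log has been committed
    client.exile(removeNodeLog.serialize());

    // exiled node, inside MetaSyncService.exile(ByteBuffer removeNodeLogBuffer)
    RemoveNodeLog removeNodeLog = new RemoveNodeLog();
    removeNodeLog.deserialize(removeNodeLogBuffer);
    metaGroupMember.applyRemoveNode(removeNodeLog);
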
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/StatusUtils.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/StatusUtils.java
index 4d1205f..5a3168a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/StatusUtils.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/StatusUtils.java
@@ -45,6 +45,7 @@ public class StatusUtils {
   public static final TSStatus CONSISTENCY_FAILURE = getStatus(TSStatusCode.CONSISTENCY_FAILURE);
   public static final TSStatus TIMESERIES_NOT_EXIST_ERROR = getStatus(TSStatusCode.TIMESERIES_NOT_EXIST);
   public static final TSStatus NO_CONNECTION = getStatus(TSStatusCode.NO_CONNECTION);
+  public static final TSStatus PARSE_LOG_ERROR = getStatus(TSStatusCode.PARSE_LOG_ERROR);
 
 
   private static TSStatus getStatus(TSStatusCode statusCode) {
@@ -197,6 +198,9 @@ public class StatusUtils {
       case NO_CONNECTION:
         status.setMessage("Node cannot be reached.");
         break;
+      case PARSE_LOG_ERROR:
+        status.setMessage("Parse log error.");
+        break;
       default:
         status.setMessage("");
         break;
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/common/TestUtils.java b/cluster/src/test/java/org/apache/iotdb/cluster/common/TestUtils.java
index 771112b..1f1f3ba 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/common/TestUtils.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/common/TestUtils.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.cluster.common;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -65,6 +66,8 @@ public class TestUtils {
 
   public static long TEST_TIME_OUT_MS = 200;
 
+  public static ByteBuffer serializedPartitionTable = new SlotPartitionTable(getNode(0)).serialize();
+
   private TestUtils() {
     // util class
   }
@@ -83,6 +86,7 @@ public class TestUtils {
     for (int i = 0; i < logNum; i++) {
       AddNodeLog log = new AddNodeLog();
       log.setNewNode(getNode(i));
+      log.setPartitionTable(serializedPartitionTable);
       log.setCurrLogIndex(i);
       log.setCurrLogTerm(i);
       logList.add(log);
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/log/LogParserTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/log/LogParserTest.java
index 66a9615..76efe5f 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/log/LogParserTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/log/LogParserTest.java
@@ -42,6 +42,7 @@ public class LogParserTest {
   public void testAddNodeLog() throws UnknownLogTypeException {
     AddNodeLog log = new AddNodeLog();
     log.setNewNode(TestUtils.getNode(5));
+    log.setPartitionTable(TestUtils.serializedPartitionTable);
     log.setCurrLogIndex(8);
     log.setCurrLogTerm(8);
 
@@ -78,6 +79,7 @@ public class LogParserTest {
   @Test
   public void testRemoveNodeLog() throws UnknownLogTypeException {
     RemoveNodeLog log = new RemoveNodeLog();
+    log.setPartitionTable(TestUtils.serializedPartitionTable);
     log.setRemovedNode(TestUtils.getNode(0));
     log.setCurrLogIndex(8);
     log.setCurrLogTerm(8);
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/MetaLogApplierTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/MetaLogApplierTest.java
index 95c8fe4..fd1a87b 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/MetaLogApplierTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/MetaLogApplierTest.java
@@ -24,11 +24,13 @@ import static junit.framework.TestCase.assertFalse;
 import static junit.framework.TestCase.assertTrue;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Set;
 import org.apache.iotdb.cluster.common.IoTDBTest;
 import org.apache.iotdb.cluster.common.TestMetaGroupMember;
+import org.apache.iotdb.cluster.common.TestUtils;
 import org.apache.iotdb.cluster.log.LogApplier;
 import org.apache.iotdb.cluster.log.logtypes.AddNodeLog;
 import org.apache.iotdb.cluster.log.logtypes.PhysicalPlanLog;
@@ -54,13 +56,13 @@ public class MetaLogApplierTest extends IoTDBTest {
 
   private TestMetaGroupMember testMetaGroupMember = new TestMetaGroupMember() {
     @Override
-    public void applyAddNode(Node newNode) {
-      nodes.add(newNode);
+    public void applyAddNode(AddNodeLog addNodeLog) {
+      nodes.add(addNodeLog.getNewNode());
     }
 
     @Override
-    public void applyRemoveNode(Node oldNode) {
-      nodes.remove(oldNode);
+    public void applyRemoveNode(RemoveNodeLog removeNodeLog) {
+      nodes.remove(removeNodeLog.getRemovedNode());
     }
   };
 
@@ -82,6 +84,7 @@ public class MetaLogApplierTest extends IoTDBTest {
     Node node = new Node("localhost", 1111, 0, 2222, 55560);
     AddNodeLog log = new AddNodeLog();
     log.setNewNode(node);
+    log.setPartitionTable(TestUtils.serializedPartitionTable);
     applier.apply(log);
 
     assertTrue(nodes.contains(node));
@@ -94,6 +97,7 @@ public class MetaLogApplierTest extends IoTDBTest {
 
     Node node = testMetaGroupMember.getThisNode();
     RemoveNodeLog log = new RemoveNodeLog();
+    log.setPartitionTable(TestUtils.serializedPartitionTable);
     log.setRemovedNode(node);
     applier.apply(log);
 
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/log/logtypes/SerializeLogTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/log/logtypes/SerializeLogTest.java
index 09b42e4..d6fec31 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/log/logtypes/SerializeLogTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/log/logtypes/SerializeLogTest.java
@@ -87,6 +87,7 @@ public class SerializeLogTest {
   @Test
   public void testAddNodeLog() throws UnknownLogTypeException {
     AddNodeLog log = new AddNodeLog();
+    log.setPartitionTable(TestUtils.serializedPartitionTable);
     log.setCurrLogIndex(2);
     log.setCurrLogTerm(2);
     log.setNewNode(new Node("apache.iotdb.com", 1234, 1, 4321, 55560));
@@ -110,6 +111,7 @@ public class SerializeLogTest {
   @Test
   public void testRemoveNodeLog() throws UnknownLogTypeException {
     RemoveNodeLog log = new RemoveNodeLog();
+    log.setPartitionTable(TestUtils.serializedPartitionTable);
     log.setCurrLogIndex(2);
     log.setCurrLogTerm(2);
     log.setRemovedNode(TestUtils.getNode(0));
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/partition/SlotPartitionTableTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/partition/SlotPartitionTableTest.java
index b00e755..30315dc 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/partition/SlotPartitionTableTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/partition/SlotPartitionTableTest.java
@@ -499,7 +499,8 @@ public class SlotPartitionTableTest {
   @Test
   public void testRemoveNode() {
     List<Integer> nodeSlots = localTable.getNodeSlots(getNode(0), raftId);
-    NodeRemovalResult nodeRemovalResult = localTable.removeNode(getNode(0));
+    localTable.removeNode(getNode(0));
+    NodeRemovalResult nodeRemovalResult = localTable.getNodeRemovalResult();
     assertFalse(localTable.getAllNodes().contains(getNode(0)));
     PartitionGroup removedGroup = nodeRemovalResult.getRemovedGroup(0);
     for (int i = 0; i < 5; i++) {
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatThreadTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatThreadTest.java
index f6bb254..2275a63 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatThreadTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatThreadTest.java
@@ -63,12 +63,22 @@ public class MetaHeartbeatThreadTest extends HeartbeatThreadTest {
     }
 
     @Override
-    public NodeAdditionResult addNode(Node node) {
+    public void addNode(Node node) {
+      return;
+    }
+
+    @Override
+    public NodeAdditionResult getNodeAdditionResult(Node node) {
       return null;
     }
 
     @Override
-    public NodeRemovalResult removeNode(Node node) {
+    public void removeNode(Node node) {
+      return;
+    }
+
+    @Override
+    public NodeRemovalResult getNodeRemovalResult() {
       return null;
     }
 
@@ -93,8 +103,8 @@ public class MetaHeartbeatThreadTest extends HeartbeatThreadTest {
     }
 
     @Override
-    public void deserialize(ByteBuffer buffer) {
-
+    public boolean deserialize(ByteBuffer buffer) {
+      return true;
     }
 
     @Override
@@ -108,8 +118,13 @@ public class MetaHeartbeatThreadTest extends HeartbeatThreadTest {
     }
 
     @Override
+    public List<PartitionGroup> calculateGlobalGroups(List<Node> nodeRing) {
+      return null;
+    }
+
+    @Override
     public boolean judgeHoldSlot(Node node, int slot) {
-      return true;
+      return false;
     }
   };
 
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
index c920f06..ca4fd92 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
@@ -898,8 +898,8 @@ public class DataGroupMemberTest extends MemberTest {
   public void testRemoveLeader() {
     System.out.println("Start testRemoveLeader()");
     Node nodeToRemove = TestUtils.getNode(10);
-    SlotNodeRemovalResult nodeRemovalResult = (SlotNodeRemovalResult) testMetaMember.getPartitionTable()
-        .removeNode(nodeToRemove);
+    testMetaMember.getPartitionTable().removeNode(nodeToRemove);
+    SlotNodeRemovalResult nodeRemovalResult = (SlotNodeRemovalResult) testMetaMember.getPartitionTable().getNodeRemovalResult();
     dataGroupMember.setLeader(nodeToRemove);
     dataGroupMember.start();
 
@@ -926,8 +926,9 @@ public class DataGroupMemberTest extends MemberTest {
   public void testRemoveNonLeader() {
     System.out.println("Start testRemoveNonLeader()");
     Node nodeToRemove = TestUtils.getNode(10);
-    NodeRemovalResult nodeRemovalResult = testMetaMember.getPartitionTable()
+    testMetaMember.getPartitionTable()
         .removeNode(nodeToRemove);
+    NodeRemovalResult nodeRemovalResult = testMetaMember.getPartitionTable().getNodeRemovalResult();
     dataGroupMember.setLeader(TestUtils.getNode(20));
     dataGroupMember.start();
 
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
index 1badcd2..a1e563e 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
@@ -60,7 +60,9 @@ import org.apache.iotdb.cluster.exception.EmptyIntervalException;
 import org.apache.iotdb.cluster.exception.LogExecutionException;
 import org.apache.iotdb.cluster.exception.PartitionTableUnavailableException;
 import org.apache.iotdb.cluster.exception.StartUpCheckFailureException;
+import org.apache.iotdb.cluster.log.logtypes.AddNodeLog;
 import org.apache.iotdb.cluster.log.logtypes.CloseFileLog;
+import org.apache.iotdb.cluster.log.logtypes.RemoveNodeLog;
 import org.apache.iotdb.cluster.log.snapshot.MetaSimpleSnapshot;
 import org.apache.iotdb.cluster.metadata.CMManager;
 import org.apache.iotdb.cluster.partition.PartitionGroup;
@@ -423,7 +425,7 @@ public class MetaGroupMemberTest extends MemberTest {
             }
 
             @Override
-            public void exile(AsyncMethodCallback<Void> resultHandler) {
+            public void exile(ByteBuffer removeNodeLog, AsyncMethodCallback<Void> resultHandler) {
               System.out.printf("%s was exiled%n", node);
               exiledNode = node;
             }
@@ -431,7 +433,7 @@ public class MetaGroupMemberTest extends MemberTest {
             @Override
             public void removeNode(Node node, AsyncMethodCallback<Long> resultHandler) {
               new Thread(() -> {
-                testMetaMember.applyRemoveNode(node);
+                testMetaMember.applyRemoveNode(new RemoveNodeLog(TestUtils.serializedPartitionTable, node));
                 resultHandler.onComplete(Response.RESPONSE_AGREE);
               }).start();
             }
@@ -554,7 +556,7 @@ public class MetaGroupMemberTest extends MemberTest {
     System.out.println("Start testAddNode()");
     Node newNode = TestUtils.getNode(10);
     testMetaMember.onElectionWins();
-    testMetaMember.applyAddNode(newNode);
+    testMetaMember.applyAddNode(new AddNodeLog(TestUtils.serializedPartitionTable, newNode));
     assertTrue(partitionTable.getAllNodes().contains(newNode));
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 955deb4..a2083d1 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -2178,7 +2178,7 @@ public class StorageGroupProcessor {
    * @return load the file successfully
    * @UsedBy sync module, load external tsfile module.
    */
-  private boolean loadTsFileByType(LoadTsFileType type, File syncedTsFile,
+  private boolean loadTsFileByType(LoadTsFileType type, File tsFileToLoad,
       TsFileResource tsFileResource, long filePartitionId)
       throws LoadFileException, DiskSpaceInsufficientException {
     File targetFile;
@@ -2195,7 +2195,7 @@ public class StorageGroupProcessor {
         }
         tsFileManagement.add(tsFileResource, false);
         logger.info("Load tsfile in unsequence list, move file from {} to {}",
-            syncedTsFile.getAbsolutePath(), targetFile.getAbsolutePath());
+            tsFileToLoad.getAbsolutePath(), targetFile.getAbsolutePath());
         break;
       case LOAD_SEQUENCE:
         targetFile =
@@ -2209,7 +2209,7 @@ public class StorageGroupProcessor {
         }
         tsFileManagement.add(tsFileResource, true);
         logger.info("Load tsfile in sequence list, move file from {} to {}",
-            syncedTsFile.getAbsolutePath(), targetFile.getAbsolutePath());
+            tsFileToLoad.getAbsolutePath(), targetFile.getAbsolutePath());
         break;
       default:
         throw new LoadFileException(
@@ -2221,29 +2221,47 @@ public class StorageGroupProcessor {
       targetFile.getParentFile().mkdirs();
     }
     try {
-      FileUtils.moveFile(syncedTsFile, targetFile);
+      FileUtils.moveFile(tsFileToLoad, targetFile);
     } catch (IOException e) {
       logger.error("File renaming failed when loading tsfile. Origin: {}, Target: {}",
-          syncedTsFile.getAbsolutePath(), targetFile.getAbsolutePath(), e);
+          tsFileToLoad.getAbsolutePath(), targetFile.getAbsolutePath(), e);
       throw new LoadFileException(String.format(
           "File renaming failed when loading tsfile. Origin: %s, Target: %s, because %s",
-          syncedTsFile.getAbsolutePath(), targetFile.getAbsolutePath(), e.getMessage()));
+          tsFileToLoad.getAbsolutePath(), targetFile.getAbsolutePath(), e.getMessage()));
     }
 
-    File syncedResourceFile = fsFactory.getFile(
-        syncedTsFile.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX);
+    File resourceFileToLoad = fsFactory.getFile(
+        tsFileToLoad.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX);
     File targetResourceFile = fsFactory.getFile(
         targetFile.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX);
     try {
-      FileUtils.moveFile(syncedResourceFile, targetResourceFile);
+      FileUtils.moveFile(resourceFileToLoad, targetResourceFile);
     } catch (IOException e) {
       logger.error("File renaming failed when loading .resource file. Origin: {}, Target: {}",
-          syncedResourceFile.getAbsolutePath(), targetResourceFile.getAbsolutePath(), e);
+          resourceFileToLoad.getAbsolutePath(), targetResourceFile.getAbsolutePath(), e);
       throw new LoadFileException(String.format(
           "File renaming failed when loading .resource file. Origin: %s, Target: %s, because %s",
-          syncedResourceFile.getAbsolutePath(), targetResourceFile.getAbsolutePath(),
+          resourceFileToLoad.getAbsolutePath(), targetResourceFile.getAbsolutePath(),
           e.getMessage()));
     }
+
+    File modFileToLoad = fsFactory.getFile(
+        tsFileToLoad.getAbsolutePath() + ModificationFile.FILE_SUFFIX);
+    if (modFileToLoad.exists()) {
+      File targetModFile = fsFactory.getFile(
+          targetFile.getAbsolutePath() + ModificationFile.FILE_SUFFIX);
+      try {
+        FileUtils.moveFile(modFileToLoad, targetModFile);
+      } catch (IOException e) {
+        logger.error("File renaming failed when loading .mod file. Origin: {}, Target: {}",
+            modFileToLoad.getAbsolutePath(), targetModFile.getAbsolutePath(), e);
+        throw new LoadFileException(String.format(
+            "File renaming failed when loading .mod file. Origin: %s, Target: %s, because %s",
+            modFileToLoad.getAbsolutePath(), targetModFile.getAbsolutePath(),
+            e.getMessage()));
+      }
+    }
+
     return true;
   }
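
The loadTsFileByType() change above extends the rename step to the companion files of a TsFile: the .resource file as before, and now the modification file when one exists. A small stand-alone sketch of that move-with-sidecars pattern, using the same commons-io FileUtils.moveFile call; the suffix strings are illustrative stand-ins for the real constants:

    import java.io.File;
    import java.io.IOException;
    import org.apache.commons.io.FileUtils;

    class SidecarFileMover {

      /** Move a data file plus its optional sidecar files; the data file itself must exist. */
      static void moveWithSidecars(File source, File target) throws IOException {
        FileUtils.moveFile(source, target);
        for (String suffix : new String[]{".resource", ".mods"}) {
          File sidecar = new File(source.getAbsolutePath() + suffix);
          if (sidecar.exists()) { // sidecars are optional in this sketch
            FileUtils.moveFile(sidecar, new File(target.getAbsolutePath() + suffix));
          }
        }
      }
    }

In the method above the .resource move is unconditional while the modification-file move is guarded by exists(); the sketch treats both as optional for brevity.
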
 
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
index d40f1eb..0d64b30 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
@@ -45,6 +45,7 @@ import org.apache.iotdb.db.qp.physical.sys.DeleteTimeSeriesPlan;
 import org.apache.iotdb.db.qp.physical.sys.DropIndexPlan;
 import org.apache.iotdb.db.qp.physical.sys.FlushPlan;
 import org.apache.iotdb.db.qp.physical.sys.LoadConfigurationPlan;
+import org.apache.iotdb.db.qp.physical.sys.LogPlan;
 import org.apache.iotdb.db.qp.physical.sys.MNodePlan;
 import org.apache.iotdb.db.qp.physical.sys.MeasurementMNodePlan;
 import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
@@ -348,6 +349,10 @@ public abstract class PhysicalPlan {
           plan = new StorageGroupMNodePlan();
           plan.deserialize(buffer);
           break;
+        case CLUSTER_LOG:
+          plan = new LogPlan();
+          plan.deserialize(buffer);
+          break;
         default:
           throw new IOException("unrecognized log type " + type);
       }
@@ -361,7 +366,7 @@ public abstract class PhysicalPlan {
     REVOKE_USER_PRIVILEGE, GRANT_ROLE_PRIVILEGE, GRANT_USER_PRIVILEGE, GRANT_USER_ROLE, MODIFY_PASSWORD, DELETE_USER,
     DELETE_STORAGE_GROUP, SHOW_TIMESERIES, DELETE_TIMESERIES, LOAD_CONFIGURATION, CREATE_MULTI_TIMESERIES,
     ALTER_TIMESERIES, FLUSH, CREATE_INDEX, DROP_INDEX,
-    CHANGE_TAG_OFFSET, CHANGE_ALIAS, MNODE, MEASUREMENT_MNODE, STORAGE_GROUP_MNODE
+    CHANGE_TAG_OFFSET, CHANGE_ALIAS, MNODE, MEASUREMENT_MNODE, STORAGE_GROUP_MNODE, CLUSTER_LOG
   }
 
   public long getIndex() {
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/LogPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/LogPlan.java
new file mode 100644
index 0000000..725c803
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/LogPlan.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.qp.physical.sys;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.List;
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+
+/**
+ * Used by the cluster module to wrap a raft log into a physical plan.
+ */
+public class LogPlan extends PhysicalPlan {
+
+  private ByteBuffer log;
+
+  public LogPlan() {
+    super(false);
+  }
+
+  public LogPlan(ByteBuffer log) {
+    super(false);
+    this.log = log;
+  }
+
+  public ByteBuffer getLog() {
+    return log;
+  }
+
+  public void setLog(ByteBuffer log) {
+    this.log = log;
+  }
+
+  @Override
+  public List<PartialPath> getPaths() {
+    return Collections.emptyList();
+  }
+
+  @Override
+  public void serialize(DataOutputStream stream) throws IOException {
+    stream.writeByte((byte) PhysicalPlanType.CLUSTER_LOG.ordinal());
+    stream.writeInt(log.array().length);
+    stream.write(log.array());
+  }
+
+  @Override
+  public void deserialize(ByteBuffer buffer) {
+    int len = buffer.getInt();
+    log = ByteBuffer.wrap(buffer.array(), buffer.position(), len);
+  }
+}
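
Since LogPlan is new in this commit, a short round trip may help; it assumes the ByteBuffer override above is the deserialization side (as the CLUSTER_LOG branch added to PhysicalPlan's factory expects) and uses an arbitrary payload in place of a real serialized cluster log:

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;
    import org.apache.iotdb.db.qp.physical.sys.LogPlan;

    class LogPlanRoundTrip {
      public static void main(String[] args) throws IOException {
        ByteBuffer payload =
            ByteBuffer.wrap("serialized-cluster-log".getBytes(StandardCharsets.UTF_8));
        LogPlan original = new LogPlan(payload);

        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        original.serialize(new DataOutputStream(bytes)); // type byte + length + payload

        ByteBuffer buffer = ByteBuffer.wrap(bytes.toByteArray());
        buffer.get(); // skip the PhysicalPlanType byte, as the factory does before deserialize()
        LogPlan restored = new LogPlan();
        restored.deserialize(buffer); // reads the length and wraps the payload bytes

        System.out.println(restored.getLog().remaining()); // 22, same as the original payload
      }
    }
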
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index b839c53..c41834b 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -86,7 +86,8 @@ public enum TSStatusCode {
   NODE_READ_ONLY(704),
   CONSISTENCY_FAILURE(705),
   NO_CONNECTION(706),
-  NEED_REDIRECTION(707)
+  NEED_REDIRECTION(707),
+  PARSE_LOG_ERROR(708)
 
   ;
 
diff --git a/thrift/src/main/thrift/cluster.thrift b/thrift/src/main/thrift/cluster.thrift
index 2a24106..124a154 100644
--- a/thrift/src/main/thrift/cluster.thrift
+++ b/thrift/src/main/thrift/cluster.thrift
@@ -335,7 +335,6 @@ service RaftService {
 }
 
 
-
 service TSDataService extends RaftService {
 
   /**
@@ -468,7 +467,7 @@ service TSMetaService extends RaftService {
   * the commit command by heartbeat since it has been removed, so the leader should tell it
   * directly that it is no longer in the cluster.
   **/
-  void exile()
+  void exile(binary removeNodeLog)
 
   TNodeStatus queryNodeStatus()