You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2021/01/14 08:09:40 UTC

[iotdb] branch cluster_scalability created (now 327eb7e)

This is an automated email from the ASF dual-hosted git repository.

lta pushed a change to branch cluster_scalability
in repository https://gitbox.apache.org/repos/asf/iotdb.git.


      at 327eb7e  1. fix ut tests 2. The two-stage relative order problem of double logs is solved.

This branch includes the following new commits:

     new 109d432  fix some issues of multi-raft
     new 325e4f5  fix bugs of wrong previous groups, pull snapshot from self and wrong remove local data
     new 452cba3  Reimplement the function of adding and removing nodes
     new 327eb7e  1. fix ut tests 2. The two-stage relative order problem of double logs is solved.

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 03/04: Reimplement the function of adding and removing nodes

Posted by lt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lta pushed a commit to branch cluster_scalability
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 452cba315bc66688684181e47fbff0cfcd94bf3a
Author: lta <li...@163.com>
AuthorDate: Wed Jan 13 15:49:28 2021 +0800

    Reimplement the function of adding and removing nodes
---
 .../iotdb/cluster/client/DataClientProvider.java   |   3 -
 ...ception.java => ChangeMembershipException.java} |  14 +-
 .../exception/CheckConsistencyException.java       |   4 +-
 .../java/org/apache/iotdb/cluster/log/Log.java     |   1 +
 .../iotdb/cluster/log/applier/DataLogApplier.java  |   8 +-
 .../iotdb/cluster/log/applier/MetaLogApplier.java  |  32 ++-
 .../iotdb/cluster/log/logtypes/AddNodeLog.java     |  40 +++-
 .../iotdb/cluster/log/logtypes/RemoveNodeLog.java  | 130 +++++++-----
 .../manage/FilePartitionedSnapshotLogManager.java  |  12 ++
 .../log/manage/MetaSingleSnapshotLogManager.java   |  19 ++
 .../iotdb/cluster/log/manage/RaftLogManager.java   |   6 +-
 .../cluster/log/snapshot/PullSnapshotTask.java     |   5 +-
 .../iotdb/cluster/partition/NodeRemovalResult.java |  32 +++
 .../iotdb/cluster/partition/PartitionGroup.java    |  21 ++
 .../iotdb/cluster/partition/PartitionTable.java    |  17 +-
 .../partition/slot/SlotNodeRemovalResult.java      |  38 +++-
 .../cluster/partition/slot/SlotPartitionTable.java |  93 +++++----
 .../iotdb/cluster/query/ClusterPlanRouter.java     |  31 ++-
 .../iotdb/cluster/server/DataClusterServer.java    |  31 +++
 .../iotdb/cluster/server/MetaClusterServer.java    |   8 +-
 .../cluster/server/member/DataGroupMember.java     |  82 +++++---
 .../cluster/server/member/MetaGroupMember.java     | 229 +++++++++++----------
 .../iotdb/cluster/server/member/RaftMember.java    |  24 ++-
 .../cluster/server/service/MetaAsyncService.java   |   8 +-
 .../cluster/server/service/MetaSyncService.java    |   8 +-
 .../apache/iotdb/cluster/utils/StatusUtils.java    |   4 +
 .../org/apache/iotdb/cluster/common/TestUtils.java |   4 +
 .../apache/iotdb/cluster/log/LogParserTest.java    |   2 +
 .../cluster/log/applier/MetaLogApplierTest.java    |  12 +-
 .../cluster/log/logtypes/SerializeLogTest.java     |   2 +
 .../cluster/partition/SlotPartitionTableTest.java  |   3 +-
 .../server/heartbeat/MetaHeartbeatThreadTest.java  |  25 ++-
 .../cluster/server/member/DataGroupMemberTest.java |   7 +-
 .../cluster/server/member/MetaGroupMemberTest.java |   8 +-
 .../engine/storagegroup/StorageGroupProcessor.java |  40 +++-
 .../apache/iotdb/db/qp/physical/PhysicalPlan.java  |   7 +-
 .../apache/iotdb/db/qp/physical/sys/LogPlan.java   |  71 +++++++
 .../java/org/apache/iotdb/rpc/TSStatusCode.java    |   3 +-
 thrift/src/main/thrift/cluster.thrift              |   3 +-
 39 files changed, 781 insertions(+), 306 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/DataClientProvider.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/DataClientProvider.java
index 9a1c4df..4c882e7 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/DataClientProvider.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/DataClientProvider.java
@@ -29,12 +29,9 @@ import org.apache.iotdb.cluster.config.ClusterDescriptor;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.thrift.TException;
 import org.apache.thrift.protocol.TProtocolFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class DataClientProvider {
 
-  private static final Logger logger = LoggerFactory.getLogger(DataClientProvider.class);
   /**
    * dataClientPool provides reusable thrift clients to connect to the DataGroupMembers of other
    * nodes
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/exception/CheckConsistencyException.java b/cluster/src/main/java/org/apache/iotdb/cluster/exception/ChangeMembershipException.java
similarity index 64%
copy from cluster/src/main/java/org/apache/iotdb/cluster/exception/CheckConsistencyException.java
copy to cluster/src/main/java/org/apache/iotdb/cluster/exception/ChangeMembershipException.java
index 12ac407..f50e668 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/exception/CheckConsistencyException.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/exception/ChangeMembershipException.java
@@ -16,19 +16,15 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.iotdb.cluster.exception;
 
 /**
- * Raised when check consistency failed, now only happens if there is a strong-consistency and
- * syncLeader failed
+ * Raised when add/remove membership log can not be sent to all data groups
  */
-public class CheckConsistencyException extends Exception {
+public class ChangeMembershipException extends Exception {
 
-  public CheckConsistencyException(String errMag) {
-    super(String.format("check consistency failed, error message=%s ", errMag));
+  public ChangeMembershipException(String errMsg) {
+    super(String.format("change membership fail, error message=%s ", errMsg));
   }
-
-  public static final CheckConsistencyException CHECK_STRONG_CONSISTENCY_EXCEPTION =
-      new CheckConsistencyException(
-      "strong consistency, sync with leader failed");
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/exception/CheckConsistencyException.java b/cluster/src/main/java/org/apache/iotdb/cluster/exception/CheckConsistencyException.java
index 12ac407..7b0609a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/exception/CheckConsistencyException.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/exception/CheckConsistencyException.java
@@ -24,8 +24,8 @@ package org.apache.iotdb.cluster.exception;
  */
 public class CheckConsistencyException extends Exception {
 
-  public CheckConsistencyException(String errMag) {
-    super(String.format("check consistency failed, error message=%s ", errMag));
+  public CheckConsistencyException(String errMsg) {
+    super(String.format("check consistency failed, error message=%s ", errMsg));
   }
 
   public static final CheckConsistencyException CHECK_STRONG_CONSISTENCY_EXCEPTION =
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/Log.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/Log.java
index 0c236b2..2903fe9 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/Log.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/Log.java
@@ -127,4 +127,5 @@ public abstract class Log implements Comparable<Log> {
   public void setEnqueueTime(long enqueueTime) {
     this.enqueueTime = enqueueTime;
   }
+
 }
\ No newline at end of file
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/DataLogApplier.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/DataLogApplier.java
index 8ce84b5..ceed787 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/DataLogApplier.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/DataLogApplier.java
@@ -22,8 +22,10 @@ package org.apache.iotdb.cluster.log.applier;
 import org.apache.iotdb.cluster.config.ClusterConstant;
 import org.apache.iotdb.cluster.exception.CheckConsistencyException;
 import org.apache.iotdb.cluster.log.Log;
+import org.apache.iotdb.cluster.log.logtypes.AddNodeLog;
 import org.apache.iotdb.cluster.log.logtypes.CloseFileLog;
 import org.apache.iotdb.cluster.log.logtypes.PhysicalPlanLog;
+import org.apache.iotdb.cluster.log.logtypes.RemoveNodeLog;
 import org.apache.iotdb.cluster.partition.slot.SlotPartitionTable;
 import org.apache.iotdb.cluster.server.member.DataGroupMember;
 import org.apache.iotdb.cluster.server.member.MetaGroupMember;
@@ -60,7 +62,11 @@ public class DataLogApplier extends BaseApplier {
     logger.debug("DataMember [{}] start applying Log {}", dataGroupMember.getName(), log);
 
     try {
-      if (log instanceof PhysicalPlanLog) {
+      if (log instanceof AddNodeLog) {
+        metaGroupMember.getDataClusterServer().preAddNodeForDataGroup((AddNodeLog) log, dataGroupMember);
+      } else if (log instanceof RemoveNodeLog) {
+        metaGroupMember.getDataClusterServer().preRemoveNodeForDataGroup((RemoveNodeLog) log, dataGroupMember);
+      } else if (log instanceof PhysicalPlanLog) {
         PhysicalPlanLog physicalPlanLog = (PhysicalPlanLog) log;
         PhysicalPlan plan = physicalPlanLog.getPlan();
         if (plan instanceof InsertPlan) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/MetaLogApplier.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/MetaLogApplier.java
index d7dd5f9..94437ae 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/MetaLogApplier.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/MetaLogApplier.java
@@ -19,15 +19,18 @@
 
 package org.apache.iotdb.cluster.log.applier;
 
+import org.apache.iotdb.cluster.exception.ChangeMembershipException;
 import org.apache.iotdb.cluster.log.Log;
 import org.apache.iotdb.cluster.log.logtypes.AddNodeLog;
 import org.apache.iotdb.cluster.log.logtypes.PhysicalPlanLog;
 import org.apache.iotdb.cluster.log.logtypes.RemoveNodeLog;
-import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.iotdb.cluster.server.member.MetaGroupMember;
 import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.qp.physical.sys.LogPlan;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -46,25 +49,40 @@ public class MetaLogApplier extends BaseApplier {
 
   @Override
   public void apply(Log log) {
+    apply(log, false);
+  }
+
+  public void apply(Log log, boolean isLeader) {
     try {
       logger.debug("MetaMember [{}] starts applying Log {}", metaGroupMember.getName(), log);
       if (log instanceof AddNodeLog) {
-        AddNodeLog addNodeLog = (AddNodeLog) log;
-        Node newNode = addNodeLog.getNewNode();
-        member.applyAddNode(newNode);
+        if (isLeader) {
+          sendLogToAllDataGroups(log);
+        }
+        member.applyAddNode((AddNodeLog) log);
       } else if (log instanceof PhysicalPlanLog) {
         applyPhysicalPlan(((PhysicalPlanLog) log).getPlan(), null);
       } else if (log instanceof RemoveNodeLog) {
-        RemoveNodeLog removeNodeLog = ((RemoveNodeLog) log);
-        member.applyRemoveNode(removeNodeLog.getRemovedNode());
+        if (isLeader) {
+          sendLogToAllDataGroups(log);
+        }
+        member.applyRemoveNode(((RemoveNodeLog) log));
       } else {
         logger.error("Unsupported log: {} {}", log.getClass().getName(), log);
       }
-    } catch (StorageEngineException | StorageGroupNotSetException | QueryProcessException e) {
+    } catch (StorageEngineException | StorageGroupNotSetException | QueryProcessException | ChangeMembershipException e) {
       logger.debug("Exception occurred when executing {}", log, e);
       log.setException(e);
     } finally {
       log.setApplied(true);
     }
   }
+
+  private void sendLogToAllDataGroups(Log log) throws ChangeMembershipException {
+    LogPlan plan = new LogPlan(log.serialize());
+    TSStatus status = member.executeNonQueryPlan(plan);
+    if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      throw new ChangeMembershipException(String.format("apply %s failed", log));
+    }
+  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/AddNodeLog.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/AddNodeLog.java
index f54725d..824c3f2 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/AddNodeLog.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/AddNodeLog.java
@@ -33,16 +33,34 @@ import org.apache.iotdb.db.utils.SerializeUtils;
  */
 public class AddNodeLog extends Log {
 
+  private ByteBuffer partitionTable;
+
   private Node newNode;
 
-  public Node getNewNode() {
-    return newNode;
+  public AddNodeLog(ByteBuffer partitionTable, Node newNode) {
+    this.partitionTable = partitionTable;
+    this.newNode = newNode;
+  }
+
+  public AddNodeLog() {
+  }
+
+  public void setPartitionTable(ByteBuffer partitionTable) {
+    this.partitionTable = partitionTable;
   }
 
   public void setNewNode(Node newNode) {
     this.newNode = newNode;
   }
 
+  public Node getNewNode() {
+    return newNode;
+  }
+
+  public ByteBuffer getPartitionTable() {
+    return partitionTable;
+  }
+
   @Override
   public ByteBuffer serialize() {
     ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
@@ -52,6 +70,9 @@ public class AddNodeLog extends Log {
       dataOutputStream.writeLong(getCurrLogTerm());
 
       SerializeUtils.serialize(newNode, dataOutputStream);
+
+      dataOutputStream.write(partitionTable.array().length);
+      dataOutputStream.write(partitionTable.array());
     } catch (IOException e) {
       // ignored
     }
@@ -69,6 +90,9 @@ public class AddNodeLog extends Log {
 
     newNode = new Node();
     SerializeUtils.deserialize(newNode, buffer);
+
+    int len = buffer.getInt();
+    partitionTable = ByteBuffer.wrap(buffer.array(), buffer.position(), len);
   }
 
   @Override
@@ -83,11 +107,19 @@ public class AddNodeLog extends Log {
       return false;
     }
     AddNodeLog that = (AddNodeLog) o;
-    return Objects.equals(newNode, that.newNode);
+    return Objects.equals(newNode, that.newNode) && Objects
+        .equals(partitionTable, that.partitionTable);
+  }
+
+  @Override
+  public String toString() {
+    return "AddNodeLog{" +
+        "newNode=" + newNode +
+        '}';
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(super.hashCode(), newNode);
+    return Objects.hash(super.hashCode(), newNode, partitionTable);
   }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/RemoveNodeLog.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/RemoveNodeLog.java
index 02d89d0..800b77d 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/RemoveNodeLog.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/RemoveNodeLog.java
@@ -19,69 +19,101 @@
 
 package org.apache.iotdb.cluster.log.logtypes;
 
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.Objects;
 import org.apache.iotdb.cluster.log.Log;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
-
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
-import java.nio.ByteBuffer;
 import org.apache.iotdb.db.utils.SerializeUtils;
 
 public class RemoveNodeLog extends Log {
 
-    private Node removedNode;
-
-    @Override
-    public ByteBuffer serialize() {
-        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
-        try (DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream)) {
-            dataOutputStream.writeByte(Types.REMOVE_NODE.ordinal());
-            dataOutputStream.writeLong(getCurrLogIndex());
-            dataOutputStream.writeLong(getCurrLogTerm());
-
-            SerializeUtils.serialize(removedNode, dataOutputStream);
-        } catch (IOException e) {
-            // ignored
-        }
-        return ByteBuffer.wrap(byteArrayOutputStream.toByteArray());
-    }
+  private ByteBuffer partitionTable;
 
-    @Override
-    public void deserialize(ByteBuffer buffer) {
-        setCurrLogIndex(buffer.getLong());
-        setCurrLogTerm(buffer.getLong());
+  private Node removedNode;
 
-        removedNode = new Node();
-        SerializeUtils.deserialize(removedNode, buffer);
-    }
+  public RemoveNodeLog(ByteBuffer partitionTable,
+      Node removedNode) {
+    this.partitionTable = partitionTable;
+    this.removedNode = removedNode;
+  }
 
-    public Node getRemovedNode() {
-        return removedNode;
-    }
+  public RemoveNodeLog() {
+  }
 
-    public void setRemovedNode(Node removedNode) {
-        this.removedNode = removedNode;
-    }
+  public ByteBuffer getPartitionTable() {
+    return partitionTable;
+  }
+
+  public void setPartitionTable(ByteBuffer partitionTable) {
+    this.partitionTable = partitionTable;
+  }
 
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) {
-            return true;
-        }
-        if (o == null || getClass() != o.getClass()) {
-            return false;
-        }
-        if (!super.equals(o)) {
-            return false;
-        }
-        RemoveNodeLog that = (RemoveNodeLog) o;
-        return Objects.equals(removedNode, that.removedNode);
+  @Override
+  public ByteBuffer serialize() {
+    ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+    try (DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream)) {
+      dataOutputStream.writeByte(Types.REMOVE_NODE.ordinal());
+      dataOutputStream.writeLong(getCurrLogIndex());
+      dataOutputStream.writeLong(getCurrLogTerm());
+
+      SerializeUtils.serialize(removedNode, dataOutputStream);
+
+      dataOutputStream.write(partitionTable.array().length);
+      dataOutputStream.write(partitionTable.array());
+    } catch (IOException e) {
+      // ignored
     }
+    return ByteBuffer.wrap(byteArrayOutputStream.toByteArray());
+  }
+
+  @Override
+  public void deserialize(ByteBuffer buffer) {
+    setCurrLogIndex(buffer.getLong());
+    setCurrLogTerm(buffer.getLong());
+
+    removedNode = new Node();
+    SerializeUtils.deserialize(removedNode, buffer);
+
+    int len = buffer.getInt();
+    partitionTable = ByteBuffer.wrap(buffer.array(), buffer.position(), len);
+  }
+
+  public Node getRemovedNode() {
+    return removedNode;
+  }
 
-    @Override
-    public int hashCode() {
-        return Objects.hash(super.hashCode(), removedNode);
+  public void setRemovedNode(Node removedNode) {
+    this.removedNode = removedNode;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    if (!super.equals(o)) {
+      return false;
     }
+    RemoveNodeLog that = (RemoveNodeLog) o;
+    return Objects.equals(removedNode, that.removedNode) && Objects
+        .equals(partitionTable, that.partitionTable);
+  }
+
+  @Override
+  public String toString() {
+    return "RemoveNodeLog{" +
+        "removedNode=" + removedNode +
+        '}';
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(super.hashCode(), removedNode, partitionTable);
+  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/FilePartitionedSnapshotLogManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/FilePartitionedSnapshotLogManager.java
index 79f3cd1..682da96 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/FilePartitionedSnapshotLogManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/FilePartitionedSnapshotLogManager.java
@@ -28,7 +28,10 @@ import java.util.Map;
 import java.util.Map.Entry;
 
 import org.apache.iotdb.cluster.exception.EntryCompactedException;
+import org.apache.iotdb.cluster.log.Log;
 import org.apache.iotdb.cluster.log.LogApplier;
+import org.apache.iotdb.cluster.log.logtypes.AddNodeLog;
+import org.apache.iotdb.cluster.log.logtypes.RemoveNodeLog;
 import org.apache.iotdb.cluster.log.snapshot.FileSnapshot;
 import org.apache.iotdb.cluster.log.snapshot.FileSnapshot.Factory;
 import org.apache.iotdb.cluster.partition.PartitionTable;
@@ -202,4 +205,13 @@ public class FilePartitionedSnapshotLogManager extends PartitionedSnapshotLogMan
     }
     return true;
   }
+
+  @Override
+  public long append(Log entry) {
+    long lastLogIndex = super.append(entry);
+    if (lastLogIndex != -1 && (entry instanceof AddNodeLog || entry instanceof RemoveNodeLog)) {
+      logApplier.apply(entry);
+    }
+    return lastLogIndex;
+  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/MetaSingleSnapshotLogManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/MetaSingleSnapshotLogManager.java
index ff650e3..1e86e11 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/MetaSingleSnapshotLogManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/MetaSingleSnapshotLogManager.java
@@ -20,11 +20,15 @@
 package org.apache.iotdb.cluster.log.manage;
 
 import java.io.IOException;
+import java.util.List;
 import java.util.Map;
+import org.apache.iotdb.cluster.log.Log;
 import org.apache.iotdb.cluster.log.LogApplier;
 import org.apache.iotdb.cluster.log.Snapshot;
+import org.apache.iotdb.cluster.log.applier.MetaLogApplier;
 import org.apache.iotdb.cluster.log.manage.serializable.SyncLogDequeSerializer;
 import org.apache.iotdb.cluster.log.snapshot.MetaSimpleSnapshot;
+import org.apache.iotdb.cluster.server.NodeCharacter;
 import org.apache.iotdb.cluster.server.member.MetaGroupMember;
 import org.apache.iotdb.db.auth.AuthException;
 import org.apache.iotdb.db.auth.authorizer.BasicAuthorizer;
@@ -81,4 +85,19 @@ public class MetaSingleSnapshotLogManager extends RaftLogManager {
     snapshot.setLastLogTerm(term);
     return snapshot;
   }
+
+  @Override
+  void applyEntries(List<Log> entries) {
+    for (Log entry : entries) {
+      if (blockAppliedCommitIndex > 0 && entry.getCurrLogIndex() > blockAppliedCommitIndex) {
+        blockedUnappliedLogList.add(entry);
+        continue;
+      }
+      try {
+        ((MetaLogApplier)logApplier).apply(entry, metaGroupMember.getCharacter() == NodeCharacter.LEADER);
+      } catch (Exception e) {
+        entry.setException(e);
+      }
+    }
+  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
index bb8b231..bd65c26 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
@@ -82,10 +82,10 @@ public abstract class RaftLogManager {
    * The committed log whose index is larger than blockAppliedCommitIndex will be blocked. if
    * blockAppliedCommitIndex < 0(default is -1), will not block any operation.
    */
-  private volatile long blockAppliedCommitIndex;
+  protected volatile long blockAppliedCommitIndex;
 
 
-  private LogApplier logApplier;
+  protected LogApplier logApplier;
 
   /**
    * to distinguish managers of different members
@@ -116,7 +116,7 @@ public abstract class RaftLogManager {
    */
   private final Object logUpdateCondition = new Object();
 
-  private List<Log> blockedUnappliedLogList;
+  protected List<Log> blockedUnappliedLogList;
 
   protected RaftLogManager(StableEntryManager stableEntryManager, LogApplier applier, String name) {
     this.logApplier = applier;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTask.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTask.java
index 4a79485..752e3e3 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTask.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTask.java
@@ -166,6 +166,9 @@ public class PullSnapshotTask<T extends Snapshot> implements Callable<Void> {
   public Void call() {
     // If this node is the member of previous holder, it's unnecessary to pull data again
     if (descriptor.getPreviousHolders().contains(newMember.getThisNode())) {
+      for (Integer slot: descriptor.getSlots()) {
+        newMember.getSlotManager().setToNull(slot);
+      }
       // inform the previous holders that one member has successfully pulled snapshot directly
       newMember.registerPullSnapshotHint(descriptor);
     } else {
@@ -176,7 +179,7 @@ public class PullSnapshotTask<T extends Snapshot> implements Callable<Void> {
       request.setRequireReadOnly(descriptor.isRequireReadOnly());
 
       boolean finished = false;
-      int nodeIndex = ((PartitionGroup) newMember.getAllNodes()).indexOf(newMember.getThisNode());
+      int nodeIndex = ((PartitionGroup) newMember.getAllNodes()).indexOf(newMember.getThisNode()) - 1;
       while (!finished) {
         try {
           // sequentially pick up a node that may have this slot
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/partition/NodeRemovalResult.java b/cluster/src/main/java/org/apache/iotdb/cluster/partition/NodeRemovalResult.java
index 5493980..4193ffd 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/partition/NodeRemovalResult.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/partition/NodeRemovalResult.java
@@ -19,8 +19,13 @@
 
 package org.apache.iotdb.cluster.partition;
 
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
+import org.apache.iotdb.cluster.rpc.thrift.Node;
 
 /**
  * NodeRemovalResult stores the removed partition group.
@@ -61,4 +66,31 @@ public class NodeRemovalResult {
     }
     return null;
   }
+
+  public void serialize(DataOutputStream dataOutputStream) throws IOException {
+    dataOutputStream.writeInt(removedGroupList.size());
+    for (PartitionGroup group: removedGroupList) {
+      group.serialize(dataOutputStream);
+    }
+    dataOutputStream.writeInt(newGroupList.size());
+    for (PartitionGroup group: newGroupList) {
+      group.serialize(dataOutputStream);
+    }
+  }
+
+  public void deserialize(ByteBuffer buffer, Map<Integer, Node> idNodeMap) {
+    int removedGroupListSize = buffer.getInt();
+    for (int i = 0 ; i < removedGroupListSize; i++) {
+      PartitionGroup group = new PartitionGroup();
+      group.deserialize(buffer, idNodeMap);
+      removedGroupList.add(group);
+    }
+
+    int newGroupListSize = buffer.getInt();
+    for (int i = 0 ; i < newGroupListSize; i++) {
+      PartitionGroup group = new PartitionGroup();
+      group.deserialize(buffer, idNodeMap);
+      newGroupList.add(group);
+    }
+  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionGroup.java b/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionGroup.java
index 2a562ac..b35cc10 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionGroup.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionGroup.java
@@ -19,9 +19,13 @@
 
 package org.apache.iotdb.cluster.partition;
 
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Map;
 import java.util.Objects;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
 
@@ -64,6 +68,23 @@ public class PartitionGroup extends ArrayList<Node> {
         super.equals(group);
   }
 
+  public void serialize(DataOutputStream dataOutputStream)
+      throws IOException {
+    dataOutputStream.writeInt(getId());
+    dataOutputStream.writeInt(size());
+    for (Node node : this) {
+      dataOutputStream.writeInt(node.getNodeIdentifier());
+    }
+  }
+
+  public void deserialize(ByteBuffer buffer, Map<Integer, Node> idNodeMap) {
+    id = buffer.getInt();
+    int nodeNum = buffer.getInt();
+    for (int i2 = 0; i2 < nodeNum; i2++) {
+      add(idNodeMap.get(buffer.getInt()));
+    }
+  }
+
   @Override
   public int hashCode() {
     return Objects.hash(id, super.hashCode());
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionTable.java b/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionTable.java
index 079aad1..6bf6c0c 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionTable.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionTable.java
@@ -63,14 +63,18 @@ public interface PartitionTable {
    * @param node
    * @return the new group generated by the node
    */
-  NodeAdditionResult addNode(Node node);
+  void addNode(Node node);
+
+  NodeAdditionResult getNodeAdditionResult(Node node);
 
   /**
    * Remove a node and update the partition table.
    *
    * @param node
    */
-  NodeRemovalResult removeNode(Node node);
+  void removeNode(Node node);
+
+  NodeRemovalResult getNodeRemovalResult();
 
   /**
    * @return All data groups where all VNodes of this node is the header. The first index indicates
@@ -88,12 +92,19 @@ public interface PartitionTable {
 
   ByteBuffer serialize();
 
-  void deserialize(ByteBuffer buffer);
+  /**
+   * Deserialize partition table and check whether the partition table in byte buffer is valid
+   * @param buffer
+   * @return true if the partition table is valid
+   */
+  boolean deserialize(ByteBuffer buffer);
 
   List<Node> getAllNodes();
 
   List<PartitionGroup> getGlobalGroups();
 
+  List<PartitionGroup> calculateGlobalGroups(List<Node> nodeRing);
+
   /**
    * Judge whether the data of slot is held by node
    * @param node target node
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotNodeRemovalResult.java b/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotNodeRemovalResult.java
index 17a0c93..a04a289 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotNodeRemovalResult.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotNodeRemovalResult.java
@@ -19,9 +19,15 @@
 
 package org.apache.iotdb.cluster.partition.slot;
 
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import org.apache.iotdb.cluster.partition.NodeRemovalResult;
+import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.iotdb.cluster.rpc.thrift.RaftNode;
 
 /**
@@ -29,7 +35,7 @@ import org.apache.iotdb.cluster.rpc.thrift.RaftNode;
  */
 public class SlotNodeRemovalResult extends NodeRemovalResult {
 
-  private Map<RaftNode, List<Integer>> newSlotOwners;
+  private Map<RaftNode, List<Integer>> newSlotOwners = new HashMap<>();
 
   public Map<RaftNode, List<Integer>> getNewSlotOwners() {
     return newSlotOwners;
@@ -38,4 +44,34 @@ public class SlotNodeRemovalResult extends NodeRemovalResult {
   public void addNewSlotOwners(Map<RaftNode, List<Integer>> newSlotOwners) {
     this.newSlotOwners = newSlotOwners;
   }
+
+  @Override
+  public void serialize(DataOutputStream dataOutputStream) throws IOException {
+    super.serialize(dataOutputStream);
+    dataOutputStream.writeInt(newSlotOwners.size());
+    for (Map.Entry<RaftNode, List<Integer>> entry: newSlotOwners.entrySet()) {
+      RaftNode raftNode = entry.getKey();
+      dataOutputStream.writeInt(raftNode.getNode().nodeIdentifier);
+      dataOutputStream.writeInt(raftNode.getRaftId());
+      dataOutputStream.writeInt(entry.getValue().size());
+      for (Integer slot: entry.getValue()) {
+        dataOutputStream.writeInt(slot);
+      }
+    }
+  }
+
+  @Override
+  public void deserialize(ByteBuffer buffer, Map<Integer, Node> idNodeMap) {
+    super.deserialize(buffer, idNodeMap);
+    int size = buffer.getInt();
+    for (int i = 0 ; i < size; i++) {
+      RaftNode raftNode = new RaftNode(idNodeMap.get(buffer.getInt()), buffer.getInt());
+      List<Integer> slots = new ArrayList<>();
+      int slotSize = buffer.getInt();
+      for (int j = 0 ; j < slotSize; j++) {
+        slots.add(buffer.getInt());
+      }
+      newSlotOwners.put(raftNode, slots);
+    }
+  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java b/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java
index 2a5ae3c..f441e4a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java
@@ -62,8 +62,11 @@ public class SlotPartitionTable implements PartitionTable {
   private RaftNode[] slotNodes = new RaftNode[ClusterConstant.SLOT_NUM];
   // the nodes that each slot belongs to before a new node is added, used for the new node to
   // find the data source
+  // of each slot it now owns
   private Map<RaftNode, Map<Integer, PartitionGroup>> previousNodeMap = new ConcurrentHashMap<>();
 
+  private NodeRemovalResult nodeRemovalResult = new NodeRemovalResult();
+
   //the filed is used for determining which nodes need to be a group.
   // the data groups which this node belongs to.
   private List<PartitionGroup> localGroups;
@@ -231,11 +234,11 @@ public class SlotPartitionTable implements PartitionTable {
   }
 
   @Override
-  public NodeAdditionResult addNode(Node node) {
+  public void addNode(Node node) {
     List<Node> oldRing;
     synchronized (nodeRing) {
       if (nodeRing.contains(node)) {
-        return null;
+        return;
       }
 
       oldRing = new ArrayList<>(nodeRing);
@@ -270,21 +273,34 @@ public class SlotPartitionTable implements PartitionTable {
       }
     }
 
-    SlotNodeAdditionResult result = new SlotNodeAdditionResult();
     for (int raftId = 0; raftId < multiRaftFactor; raftId++) {
       PartitionGroup newGroup = getHeaderGroup(new RaftNode(node, raftId));
       if (newGroup.contains(thisNode)) {
         localGroups.add(newGroup);
       }
-      result.addNewGroup(newGroup);
     }
 
-    calculateGlobalGroups();
+    calculateGlobalGroups(nodeRing);
 
     // the slots movement is only done logically, the new node itself will pull data from the
     // old node
-    result.setLostSlots(moveSlotsToNew(node, oldRing));
+    moveSlotsToNew(node, oldRing);
 
+  }
+
+  @Override
+  public NodeAdditionResult getNodeAdditionResult(Node node) {
+    SlotNodeAdditionResult result = new SlotNodeAdditionResult();
+    Map<RaftNode, Set<Integer>> lostSlotsMap = new HashMap<>();
+    for (int raftId = 0; raftId < multiRaftFactor; raftId++) {
+      RaftNode raftNode = new RaftNode(node, raftId);
+      result.addNewGroup(getHeaderGroup(raftNode));
+      for (Entry<Integer, PartitionGroup> entry: previousNodeMap.get(raftNode).entrySet()) { // NOTE(review): may NPE if no slots were moved to this raftNode — confirm previousNodeMap always gets an entry per raft id
+        RaftNode header = new RaftNode(entry.getValue().getHeader(), entry.getValue().getId());
+        lostSlotsMap.computeIfAbsent(header, k -> new HashSet<>()).add(entry.getKey());
+      }
+    }
+    result.setLostSlots(lostSlotsMap);
     return result;
   }
 
@@ -294,10 +310,8 @@ public class SlotPartitionTable implements PartitionTable {
    * node.
    *
    * @param newNode
-   * @return a map recording what slots each group lost.
    */
-  private Map<RaftNode, Set<Integer>> moveSlotsToNew(Node newNode, List<Node> oldRing) {
-    Map<RaftNode, Set<Integer>> result = new HashMap<>();
+  private void moveSlotsToNew(Node newNode, List<Node> oldRing) {
     // as a node is added, the average slots for each node decrease
     // move the slots to the new node if any previous node have more slots than the new average
     int newAvg = totalSlotNumbers / nodeRing.size() / multiRaftFactor;
@@ -324,7 +338,6 @@ public class SlotPartitionTable implements PartitionTable {
           previousNodeMap.get(curNode).put(slot, getHeaderGroup(entry.getKey(), oldRing));
           slotNodes[slot] = curNode;
         }
-        result.computeIfAbsent(entry.getKey(), n -> new HashSet<>()).addAll(slotsToMove);
         transferNum -= numToMove;
         if (transferNum > 0) {
           curNode = new RaftNode(newNode, ++raftId);
@@ -335,11 +348,9 @@ public class SlotPartitionTable implements PartitionTable {
             previousNodeMap.get(curNode).put(slot, getHeaderGroup(entry.getKey(), oldRing));
             slotNodes[slot] = curNode;
           }
-          result.get(entry.getKey()).addAll(slotsToMove);
         }
       }
     }
-    return result;
   }
 
   @Override
@@ -354,6 +365,7 @@ public class SlotPartitionTable implements PartitionTable {
     DataOutputStream dataOutputStream = new DataOutputStream(outputStream);
 
     try {
+      dataOutputStream.writeLong(lastLogIndex);
       dataOutputStream.writeInt(totalSlotNumbers);
       dataOutputStream.writeInt(nodeSlotMap.size());
       for (Entry<RaftNode, List<Integer>> entry : nodeSlotMap.entrySet()) {
@@ -370,16 +382,11 @@ public class SlotPartitionTable implements PartitionTable {
         dataOutputStream.writeInt(prevHolders.size());
         for (Entry<Integer, PartitionGroup> integerNodeEntry : prevHolders.entrySet()) {
           dataOutputStream.writeInt(integerNodeEntry.getKey());
-          PartitionGroup group = integerNodeEntry.getValue();
-          dataOutputStream.writeInt(group.getId());
-          dataOutputStream.writeInt(group.size());
-          for (Node node : group) {
-            dataOutputStream.writeInt(node.getNodeIdentifier());
-          }
+          integerNodeEntry.getValue().serialize(dataOutputStream);
         }
       }
 
-      dataOutputStream.writeLong(lastLogIndex);
+      nodeRemovalResult.serialize(dataOutputStream);
     } catch (IOException ignored) {
       // not reachable
     }
@@ -387,8 +394,14 @@ public class SlotPartitionTable implements PartitionTable {
   }
 
   @Override
-  public void deserialize(ByteBuffer buffer) {
+  public synchronized boolean deserialize(ByteBuffer buffer) {
+    long newLastLogIndex = buffer.getLong();
 
+    // reject a stale table: an older incoming index is invalid (false); an equal index is the same table (true)
+    if (lastLogIndex >= newLastLogIndex) {
+      return lastLogIndex <= newLastLogIndex; // true only when the indices are equal
+    }
+    lastLogIndex = newLastLogIndex;
     logger.info("Initializing the partition table from buffer");
     totalSlotNumbers = buffer.getInt();
     int size = buffer.getInt();
@@ -415,17 +428,15 @@ public class SlotPartitionTable implements PartitionTable {
       Map<Integer, PartitionGroup> prevHolders = new HashMap<>();
       int holderNum = buffer.getInt();
       for (int i1 = 0; i1 < holderNum; i1++) {
-        int slot = buffer.getInt();
-        PartitionGroup group = new PartitionGroup(buffer.getInt());
-        int nodeNum = buffer.getInt();
-        for (int i2 = 0 ; i2 < nodeNum; i2++) {
-          group.add(idNodeMap.get(buffer.getInt()));
-        }
-        prevHolders.put(slot, group);
+        int slot = buffer.getInt();  // serialize() writes the slot id before the group, so read it first
+        PartitionGroup group = new PartitionGroup();
+        group.deserialize(buffer, idNodeMap); prevHolders.put(slot, group);
       }
       previousNodeMap.put(node, prevHolders);
     }
-    lastLogIndex = buffer.getLong();
+
+    nodeRemovalResult = new NodeRemovalResult();
+    nodeRemovalResult.deserialize(buffer, idNodeMap);
 
     for (RaftNode raftNode : nodeSlotMap.keySet()) {
       if (!nodeRing.contains(raftNode.getNode())) {
@@ -436,6 +447,7 @@ public class SlotPartitionTable implements PartitionTable {
     logger.info("All known nodes: {}", nodeRing);
 
     localGroups = getPartitionGroups(thisNode);
+    return true;
   }
 
   @Override
@@ -485,10 +497,10 @@ public class SlotPartitionTable implements PartitionTable {
   }
 
   @Override
-  public NodeRemovalResult removeNode(Node target) {
+  public void removeNode(Node target) {
     synchronized (nodeRing) {
       if (!nodeRing.contains(target)) {
-        return null;
+        return;
       }
 
       SlotNodeRemovalResult result = new SlotNodeRemovalResult();
@@ -532,16 +544,21 @@ public class SlotPartitionTable implements PartitionTable {
         result.addNewGroup(newGrp);
       }
 
-      calculateGlobalGroups();
+      calculateGlobalGroups(nodeRing);
 
       // the slots movement is only done logically, the new node itself will pull data from the
       // old node
       Map<RaftNode, List<Integer>> raftNodeListMap = retrieveSlots(target);
       result.addNewSlotOwners(raftNodeListMap);
-      return result;
+      this.nodeRemovalResult = result;
     }
   }
 
+  @Override
+  public NodeRemovalResult getNodeRemovalResult() {
+    return nodeRemovalResult;
+  }
+
   private Map<RaftNode, List<Integer>> retrieveSlots(Node target) {
     Map<RaftNode, List<Integer>> newHolderSlotMap = new HashMap<>();
     for(int raftId = 0 ; raftId < multiRaftFactor; raftId++) {
@@ -563,7 +580,7 @@ public class SlotPartitionTable implements PartitionTable {
     // preventing a thread from getting incomplete globalGroups
     synchronized (nodeRing) {
       if (globalGroups == null) {
-        calculateGlobalGroups();
+        globalGroups = calculateGlobalGroups(nodeRing);
       }
       return globalGroups;
     }
@@ -574,13 +591,15 @@ public class SlotPartitionTable implements PartitionTable {
     return getHeaderGroup(slotNodes[slot]).contains(node);
   }
 
-  private void calculateGlobalGroups() {
-    globalGroups = new ArrayList<>();
-    for (Node node : getAllNodes()) {
+  @Override
+  public List<PartitionGroup> calculateGlobalGroups(List<Node> nodeRing) {
+    List<PartitionGroup> result = new ArrayList<>();
+    for (Node node : nodeRing) {
       for (int i = 0; i < multiRaftFactor; i++) {
-        globalGroups.add(getHeaderGroup(new RaftNode(node, i)));
+        result.add(getHeaderGroup(new RaftNode(node, i)));
       }
     }
+    return result;
   }
 
   public synchronized long getLastLogIndex() {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanRouter.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanRouter.java
index f052294..0c8cf25 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanRouter.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanRouter.java
@@ -21,12 +21,19 @@ package org.apache.iotdb.cluster.query;
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import org.apache.iotdb.cluster.exception.UnknownLogTypeException;
 import org.apache.iotdb.cluster.exception.UnsupportedPlanException;
+import org.apache.iotdb.cluster.log.Log;
+import org.apache.iotdb.cluster.log.LogParser;
+import org.apache.iotdb.cluster.log.logtypes.AddNodeLog;
+import org.apache.iotdb.cluster.log.logtypes.RemoveNodeLog;
 import org.apache.iotdb.cluster.partition.PartitionGroup;
 import org.apache.iotdb.cluster.partition.PartitionTable;
+import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.iotdb.cluster.utils.PartitionUtils;
 import org.apache.iotdb.db.engine.StorageEngine;
 import org.apache.iotdb.db.exception.metadata.IllegalPathException;
@@ -41,6 +48,7 @@ import org.apache.iotdb.db.qp.physical.sys.AlterTimeSeriesPlan;
 import org.apache.iotdb.db.qp.physical.sys.CountPlan;
 import org.apache.iotdb.db.qp.physical.sys.CreateMultiTimeSeriesPlan;
 import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.LogPlan;
 import org.apache.iotdb.db.qp.physical.sys.ShowChildPathsPlan;
 import org.apache.iotdb.db.qp.physical.sys.ShowPlan.ShowContentType;
 import org.apache.iotdb.db.service.IoTDB;
@@ -108,7 +116,7 @@ public class ClusterPlanRouter {
   }
 
   public Map<PhysicalPlan, PartitionGroup> splitAndRoutePlan(PhysicalPlan plan)
-      throws UnsupportedPlanException, MetadataException {
+      throws UnsupportedPlanException, MetadataException, UnknownLogTypeException {
     if (plan instanceof InsertTabletPlan) {
       return splitAndRoutePlan((InsertTabletPlan) plan);
     } else if (plan instanceof CountPlan) {
@@ -121,6 +129,8 @@ public class ClusterPlanRouter {
       return splitAndRoutePlan((AlterTimeSeriesPlan) plan);
     } else if (plan instanceof CreateMultiTimeSeriesPlan) {
       return splitAndRoutePlan((CreateMultiTimeSeriesPlan) plan);
+    } else if (plan instanceof LogPlan) {
+      return splitAndRoutePlan((LogPlan)plan);
     }
     //the if clause can be removed after the program is stable
     if (PartitionUtils.isLocalNonQueryPlan(plan)) {
@@ -134,6 +144,25 @@ public class ClusterPlanRouter {
     throw new UnsupportedPlanException(plan);
   }
 
+  private Map<PhysicalPlan, PartitionGroup> splitAndRoutePlan(LogPlan plan)
+      throws UnknownLogTypeException, UnsupportedPlanException {
+    Map<PhysicalPlan, PartitionGroup> result = new HashMap<>();
+    Log log = LogParser.getINSTANCE().parse(plan.getLog());
+    List<Node> oldRing = new ArrayList<>(partitionTable.getAllNodes());
+    if (log instanceof AddNodeLog) {
+      oldRing.remove(((AddNodeLog) log).getNewNode());
+    } else if (log instanceof RemoveNodeLog) {
+      oldRing.add(((RemoveNodeLog) log).getRemovedNode());
+      oldRing.sort(Comparator.comparingInt(Node::getNodeIdentifier));
+    } else {
+      throw new UnsupportedPlanException(plan);
+    }
+    for (PartitionGroup partitionGroup: partitionTable.calculateGlobalGroups(oldRing)) {
+      result.put(plan, partitionGroup); // FIXME: the same plan instance is the key on every iteration, so each put overwrites the last and only one group survives; a distinct plan copy per group is needed
+    }
+    return result;
+  }
+
   private Map<PhysicalPlan, PartitionGroup> splitAndRoutePlan(InsertRowPlan plan)
       throws MetadataException {
     PartitionGroup partitionGroup = partitionTable.partitionByPathTime(plan.getDeviceId(),
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
index b023c36..16c1da6 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
@@ -33,6 +33,8 @@ import org.apache.iotdb.cluster.exception.CheckConsistencyException;
 import org.apache.iotdb.cluster.exception.NoHeaderNodeException;
 import org.apache.iotdb.cluster.exception.NotInSameGroupException;
 import org.apache.iotdb.cluster.exception.PartitionTableUnavailableException;
+import org.apache.iotdb.cluster.log.logtypes.AddNodeLog;
+import org.apache.iotdb.cluster.log.logtypes.RemoveNodeLog;
 import org.apache.iotdb.cluster.partition.NodeAdditionResult;
 import org.apache.iotdb.cluster.partition.NodeRemovalResult;
 import org.apache.iotdb.cluster.partition.PartitionGroup;
@@ -490,6 +492,18 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
     return "DataServerThread-";
   }
 
+  public void preAddNodeForDataGroup(AddNodeLog log, DataGroupMember targetDataGroupMember) {
+    // Make sure the previous add/remove node log has applied
+    metaGroupMember.syncLeader();
+
+    // Check the validity of the partition table
+    if (!metaGroupMember.getPartitionTable().deserialize(log.getPartitionTable())) {
+      return;
+    }
+
+    targetDataGroupMember.preAddNode(log.getNewNode());
+  }
+
   /**
    * Try adding the node into the group of each DataGroupMember, and if the DataGroupMember no
    * longer stays in that group, also remove and stop it. If the new group contains this node, also
@@ -499,6 +513,10 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
    * @param result
    */
   public void addNode(Node node, NodeAdditionResult result) {
+    // If the added node is this node itself, its new groups were already created when it joined, so there is nothing to add.
+    if (node.equals(thisNode)) {
+      return;
+    }
     Iterator<Entry<RaftNode, DataGroupMember>> entryIterator = headerGroupMap.entrySet().iterator();
     synchronized (headerGroupMap) {
       while (entryIterator.hasNext()) {
@@ -581,6 +599,18 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
     logger.info("Data group members are ready");
   }
 
+  public void preRemoveNodeForDataGroup(RemoveNodeLog log, DataGroupMember targetDataGroupMember) {
+    // Make sure the previous add/remove node log has applied
+    metaGroupMember.syncLeader();
+
+    // Check the validity of the partition table
+    if (!metaGroupMember.getPartitionTable().deserialize(log.getPartitionTable())) {
+      return;
+    }
+
+    targetDataGroupMember.preRemoveNode(log.getRemovedNode());
+  }
+
   /**
    * Try removing a node from the groups of each DataGroupMember. If the node is the header of some
    * group, set the member to read only so that it can still provide data for other nodes that has
@@ -625,6 +655,7 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
 
   /**
    * When the node joins a cluster, it also creates a new data group and a corresponding member
+   * of that group,
    * which has no data. This is to make that member pull data from other nodes.
    */
   public void pullSnapshots() {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java
index e4a7304..d198039 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java
@@ -248,8 +248,8 @@ public class MetaClusterServer extends RaftServer implements TSMetaService.Async
   }
 
   @Override
-  public void exile(AsyncMethodCallback<Void> resultHandler) {
-    asyncService.exile(resultHandler);
+  public void exile(ByteBuffer removeNodeLog, AsyncMethodCallback<Void> resultHandler) {
+    asyncService.exile(removeNodeLog, resultHandler);
   }
 
   @Override
@@ -274,8 +274,8 @@ public class MetaClusterServer extends RaftServer implements TSMetaService.Async
   }
 
   @Override
-  public void exile() {
-    syncService.exile();
+  public void exile(ByteBuffer removeNodeLog) {
+    syncService.exile(removeNodeLog);
   }
 
   @Override
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
index 4737520..077d61d 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
@@ -261,33 +261,10 @@ public class DataGroupMember extends RaftMember {
     }
   }
 
-  /**
-   * Try to add a Node into the group to which the member belongs.
-   *
-   * @param node
-   * @return true if this node should leave the group because of the addition of the node, false
-   * otherwise
-   */
-  public synchronized boolean addNode(Node node, NodeAdditionResult result) {
-    // when a new node is added, start an election instantly to avoid the stale leader still
-    // taking the leadership, which guarantees the valid leader will not have the stale
-    // partition table
-    synchronized (term) {
-      term.incrementAndGet();
-      setLeader(ClusterConstant.EMPTY_NODE);
-      setVoteFor(thisNode);
-      updateHardState(term.get(), getVoteFor());
-      setLastHeartbeatReceivedTime(System.currentTimeMillis());
-      setCharacter(NodeCharacter.ELECTOR);
-    }
-
-    // mark slots that do not belong to this group any more
-    Set<Integer> lostSlots = ((SlotNodeAdditionResult) result).getLostSlots()
-        .getOrDefault(new RaftNode(getHeader(), getRaftGroupId()), Collections.emptySet());
-    for (Integer lostSlot : lostSlots) {
-      slotManager.setToSending(lostSlot);
+  public void preAddNode(Node node) {
+    if (allNodes.contains(node)) {
+      return;
     }
-
     synchronized (allNodes) {
       int insertIndex = -1;
       // find the position to insert the new node, the nodes are ordered by their identifiers
@@ -307,12 +284,41 @@ public class DataGroupMember extends RaftMember {
       if (insertIndex > 0) {
         allNodes.add(insertIndex, node);
         peerMap.putIfAbsent(node, new Peer(logManager.getLastLogIndex()));
-        // remove the last node because the group size is fixed to replication number
-        Node removedNode = allNodes.remove(allNodes.size() - 1);
-        peerMap.remove(removedNode);
         // if the local node is the last node and the insertion succeeds, this node should leave
         // the group
         logger.debug("{}: Node {} is inserted into the data group {}", name, node, allNodes);
+      }
+    }
+  }
+
+  /**
+   * Try to add a Node into the group to which the member belongs.
+   *
+   * @param node
+   * @return true if this node should leave the group because of the addition of the node, false
+   * otherwise
+   */
+  public boolean addNode(Node node, NodeAdditionResult result) {
+
+    // mark slots that do not belong to this group any more
+    Set<Integer> lostSlots = ((SlotNodeAdditionResult) result).getLostSlots()
+        .getOrDefault(new RaftNode(getHeader(), getRaftGroupId()), Collections.emptySet());
+    for (Integer lostSlot : lostSlots) {
+      slotManager.setToSending(lostSlot);
+    }
+
+    synchronized (allNodes) {
+      if (allNodes.contains(node) && allNodes.size() > config.getReplicationNum()) {
+        // remove the last node because the group size is fixed to replication number
+        Node removedNode = allNodes.remove(allNodes.size() - 1);
+        peerMap.remove(removedNode);
+        if (removedNode.equals(leader.get())) {
+          // if the leader is removed, also start an election immediately
+          synchronized (term) {
+            setCharacter(NodeCharacter.ELECTOR);
+            setLastHeartbeatReceivedTime(Long.MIN_VALUE);
+          }
+        }
         return removedNode.equals(thisNode);
       }
       return false;
@@ -737,6 +743,18 @@ public class DataGroupMember extends RaftMember {
     }
   }
 
+  public void preRemoveNode(Node removedNode) {
+    synchronized (allNodes) {
+      if (allNodes.contains(removedNode)) {
+        // update the group if the deleted node was in it
+        PartitionGroup newGroup = metaGroupMember.getPartitionTable().getHeaderGroup(new RaftNode(getHeader(), getRaftGroupId()));
+        Node newNodeToGroup = newGroup.get(newGroup.size() - 1);
+        allNodes.add(newNodeToGroup);
+        peerMap.putIfAbsent(newNodeToGroup, new Peer(logManager.getLastLogIndex()));
+      }
+    }
+  }
+
   /**
    * When a node is removed and IT IS NOT THE HEADER of the group, the member should take over some
    * slots from the removed group, and add a new node to the group the removed node was in the
@@ -747,8 +765,8 @@ public class DataGroupMember extends RaftMember {
     synchronized (allNodes) {
       if (allNodes.contains(removedNode)) {
         // update the group if the deleted node was in it
-        allNodes = metaGroupMember.getPartitionTable().getHeaderGroup(new RaftNode(getHeader(), getRaftGroupId()));
-        initPeerMap();
+        allNodes.remove(removedNode);
+        peerMap.remove(removedNode);
         if (removedNode.equals(leader.get())) {
           // if the leader is removed, also start an election immediately
           synchronized (term) {
@@ -837,7 +855,7 @@ public class DataGroupMember extends RaftMember {
         continue;
       }
       int sentReplicaNum = slotManager.sentOneReplication(slot);
-      if (sentReplicaNum >= ClusterDescriptor.getInstance().getConfig().getReplicationNum()) {
+      if (sentReplicaNum >= config.getReplicationNum()) {
         removableSlots.add(slot);
       }
     }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
index 7e73f61..22520e7 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
@@ -73,6 +73,7 @@ import org.apache.iotdb.cluster.exception.LogExecutionException;
 import org.apache.iotdb.cluster.exception.PartitionTableUnavailableException;
 import org.apache.iotdb.cluster.exception.SnapshotInstallationException;
 import org.apache.iotdb.cluster.exception.StartUpCheckFailureException;
+import org.apache.iotdb.cluster.exception.UnknownLogTypeException;
 import org.apache.iotdb.cluster.exception.UnsupportedPlanException;
 import org.apache.iotdb.cluster.log.Log;
 import org.apache.iotdb.cluster.log.LogApplier;
@@ -186,11 +187,6 @@ public class MetaGroupMember extends RaftMember {
    * members in this node
    */
   private static final int REPORT_INTERVAL_SEC = 10;
-  /**
-   * how many times is a data record replicated, also the number of nodes in a data group
-   */
-  private static final int REPLICATION_NUM =
-      ClusterDescriptor.getInstance().getConfig().getReplicationNum();
 
   /**
    * during snapshot, hardlinks of data files are created to for downloading. hardlinks will be
@@ -421,19 +417,23 @@ public class MetaGroupMember extends RaftMember {
    * Apply the addition of a new node. Register its identifier, add it to the node list and
    * partition table, serialize the partition table and update the DataGroupMembers.
    */
-  public void applyAddNode(Node newNode) {
+  public void applyAddNode(AddNodeLog addNodeLog) {
+
+    Node newNode = addNodeLog.getNewNode();
     synchronized (allNodes) {
-      if (!allNodes.contains(newNode)) {
+      if (partitionTable.deserialize(addNodeLog.getPartitionTable())) {
         logger.debug("Adding a new node {} into {}", newNode, allNodes);
-        registerNodeIdentifier(newNode, newNode.getNodeIdentifier());
-        allNodes.add(newNode);
+
+        if (!allNodes.contains(newNode)) {
+          registerNodeIdentifier(newNode, newNode.getNodeIdentifier());
+          allNodes.add(newNode);
+        }
 
         // update the partition table
-        NodeAdditionResult result = partitionTable.addNode(newNode);
-        ((SlotPartitionTable) partitionTable).setLastLogIndex(logManager.getLastLogIndex());
         savePartitionTable();
 
         // update local data members
+        NodeAdditionResult result = partitionTable.getNodeAdditionResult(newNode);
         getDataClusterServer().addNode(newNode, result);
       }
     }
@@ -856,7 +856,12 @@ public class MetaGroupMember extends RaftMember {
 
     // node adding is serialized to reduce potential concurrency problem
     synchronized (logManager) {
+      // update partition table
+      partitionTable.addNode(node);
+      ((SlotPartitionTable) partitionTable).setLastLogIndex(logManager.getLastLogIndex() + 1);
+
       AddNodeLog addNodeLog = new AddNodeLog();
+      addNodeLog.setPartitionTable(partitionTable.serialize());
       addNodeLog.setCurrLogTerm(getTerm().get());
       addNodeLog.setCurrLogIndex(logManager.getLastLogIndex() + 1);
 
@@ -868,11 +873,11 @@ public class MetaGroupMember extends RaftMember {
       while (true) {
         logger
             .info("Send the join request of {} to other nodes, retry time: {}", node, retryTime);
-        AppendLogResult result = sendLogToAllGroups(addNodeLog);
+        AppendLogResult result = sendLogToFollowers(addNodeLog);
         switch (result) {
           case OK:
-            logger.info("Join request of {} is accepted", node);
             commitLog(addNodeLog);
+            logger.info("Join request of {} is accepted", node);
 
             synchronized (partitionTable) {
               response.setPartitionTableBytes(partitionTable.serialize());
@@ -902,9 +907,9 @@ public class MetaGroupMember extends RaftMember {
     long localPartitionInterval = IoTDBDescriptor.getInstance().getConfig()
         .getPartitionInterval();
     int localHashSalt = ClusterConstant.HASH_SALT;
-    int localReplicationNum = ClusterDescriptor.getInstance().getConfig().getReplicationNum();
-    String localClusterName = ClusterDescriptor.getInstance().getConfig().getClusterName();
-    int localMultiRaftFactor = ClusterDescriptor.getInstance().getConfig().getMultiRaftFactor();
+    int localReplicationNum = config.getReplicationNum();
+    String localClusterName = config.getClusterName();
+    int localMultiRaftFactor = config.getMultiRaftFactor();
     boolean partitionIntervalEquals = true;
     boolean multiRaftFactorEquals = true;
     boolean hashSaltEquals = true;
@@ -1027,7 +1032,7 @@ public class MetaGroupMember extends RaftMember {
   }
 
   private CheckStatusResponse checkStatus(Node seedNode) {
-    if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
+    if (config.isUseAsyncServer()) {
       AsyncMetaClient client = (AsyncMetaClient) getAsyncClient(seedNode);
       if (client == null) {
         return null;
@@ -1061,29 +1066,29 @@ public class MetaGroupMember extends RaftMember {
    * Send the log the all data groups and return a success only when each group's quorum has
    * accepted this log.
    */
-  private AppendLogResult sendLogToAllGroups(Log log) {
-    List<Node> nodeRing = partitionTable.getAllNodes();
-
-    AtomicLong newLeaderTerm = new AtomicLong(term.get());
-    AtomicBoolean leaderShipStale = new AtomicBoolean(false);
-    AppendEntryRequest request = buildAppendEntryRequest(log, true);
-
-    // ask for votes from each node
-    int[] groupRemainings = askGroupVotes(nodeRing, request, leaderShipStale, log, newLeaderTerm);
-
-    if (!leaderShipStale.get()) {
-      // if all quorums of all groups have received this log, it is considered succeeded.
-      for (int remaining : groupRemainings) {
-        if (remaining > 0) {
-          return AppendLogResult.TIME_OUT;
-        }
-      }
-    } else {
-      return AppendLogResult.LEADERSHIP_STALE;
-    }
-
-    return AppendLogResult.OK;
-  }
+//  private AppendLogResult sendLogToAllGroups(Log log) {
+//    List<Node> nodeRing = partitionTable.getAllNodes();
+//
+//    AtomicLong newLeaderTerm = new AtomicLong(term.get());
+//    AtomicBoolean leaderShipStale = new AtomicBoolean(false);
+//    AppendEntryRequest request = buildAppendEntryRequest(log, true);
+//
+//    // ask for votes from each node
+//    int[] groupRemainings = askGroupVotes(nodeRing, request, leaderShipStale, log, newLeaderTerm);
+//
+//    if (!leaderShipStale.get()) {
+//      // if all quorums of all groups have received this log, it is considered succeeded.
+//      for (int remaining : groupRemainings) {
+//        if (remaining > 0) {
+//          return AppendLogResult.TIME_OUT;
+//        }
+//      }
+//    } else {
+//      return AppendLogResult.LEADERSHIP_STALE;
+//    }
+//
+//    return AppendLogResult.OK;
+//  }
 
   /**
    * Send "request" to each node in "nodeRing" and when a node returns a success, decrease all
@@ -1094,54 +1099,54 @@ public class MetaGroupMember extends RaftMember {
   @SuppressWarnings({"java:S2445", "java:S2274"})
   // groupRemaining is shared with the handlers,
   // and we do not wait infinitely to enable timeouts
-  private int[] askGroupVotes(List<Node> nodeRing,
-      AppendEntryRequest request, AtomicBoolean leaderShipStale, Log log,
-      AtomicLong newLeaderTerm) {
-    // each node will be the header of a group, we use the node to represent the group
-    int nodeSize = nodeRing.size();
-    // the decreasing counters of how many nodes in a group has received the log, each time a
-    // node receive the log, the counters of all groups it is in will decrease by 1
-    int[] groupRemainings = new int[nodeSize];
-    // a group is considered successfully received the log if such members receive the log
-    int groupQuorum = REPLICATION_NUM / 2 + 1;
-    Arrays.fill(groupRemainings, groupQuorum);
-
-    synchronized (groupRemainings) {
-      // ask a vote from every node
-      for (int i = 0; i < nodeSize; i++) {
-        Node node = nodeRing.get(i);
-        if (node.equals(thisNode)) {
-          // this node automatically gives an agreement, decrease counters of all groups the local
-          // node is in
-          for (int j = 0; j < REPLICATION_NUM; j++) {
-            int groupIndex = i - j;
-            if (groupIndex < 0) {
-              groupIndex += groupRemainings.length;
-            }
-            groupRemainings[groupIndex]--;
-          }
-        } else {
-          askRemoteGroupVote(node, groupRemainings, i, leaderShipStale, log, newLeaderTerm,
-              request);
-        }
-      }
-
-      try {
-        groupRemainings.wait(RaftServer.getWriteOperationTimeoutMS());
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        logger.error("Unexpected interruption when waiting for the group votes", e);
-      }
-    }
-    return groupRemainings;
-  }
+//  private int[] askGroupVotes(List<Node> nodeRing,
+//      AppendEntryRequest request, AtomicBoolean leaderShipStale, Log log,
+//      AtomicLong newLeaderTerm) {
+//    // each node will be the header of a group, we use the node to represent the group
+//    int nodeSize = nodeRing.size();
+//    // the decreasing counters of how many nodes in a group has received the log, each time a
+//    // node receive the log, the counters of all groups it is in will decrease by 1
+//    int[] groupRemainings = new int[nodeSize];
+//    // a group is considered successfully received the log if such members receive the log
+//    int groupQuorum = REPLICATION_NUM / 2 + 1;
+//    Arrays.fill(groupRemainings, groupQuorum);
+//
+//    synchronized (groupRemainings) {
+//      // ask a vote from every node
+//      for (int i = 0; i < nodeSize; i++) {
+//        Node node = nodeRing.get(i);
+//        if (node.equals(thisNode)) {
+//          // this node automatically gives an agreement, decrease counters of all groups the local
+//          // node is in
+//          for (int j = 0; j < REPLICATION_NUM; j++) {
+//            int groupIndex = i - j;
+//            if (groupIndex < 0) {
+//              groupIndex += groupRemainings.length;
+//            }
+//            groupRemainings[groupIndex]--;
+//          }
+//        } else {
+//          askRemoteGroupVote(node, groupRemainings, i, leaderShipStale, log, newLeaderTerm,
+//              request);
+//        }
+//      }
+//
+//      try {
+//        groupRemainings.wait(RaftServer.getWriteOperationTimeoutMS());
+//      } catch (InterruptedException e) {
+//        Thread.currentThread().interrupt();
+//        logger.error("Unexpected interruption when waiting for the group votes", e);
+//      }
+//    }
+//    return groupRemainings;
+//  }
 
   private void askRemoteGroupVote(Node node, int[] groupRemainings, int nodeIndex,
       AtomicBoolean leaderShipStale, Log log,
       AtomicLong newLeaderTerm, AppendEntryRequest request) {
     AppendGroupEntryHandler handler = new AppendGroupEntryHandler(groupRemainings,
         nodeIndex, node, leaderShipStale, log, newLeaderTerm, this);
-    if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
+    if (config.isUseAsyncServer()) {
       AsyncMetaClient client = (AsyncMetaClient) getAsyncClient(node);
       try {
         if (client != null) {
@@ -1469,7 +1474,7 @@ public class MetaGroupMember extends RaftMember {
     if (planGroupMap == null || planGroupMap.isEmpty()) {
       if ((plan instanceof InsertPlan || plan instanceof CreateTimeSeriesPlan
           || plan instanceof CreateMultiTimeSeriesPlan)
-          && ClusterDescriptor.getInstance().getConfig().isEnableAutoCreateSchema()) {
+          && config.isEnableAutoCreateSchema()) {
         logger.debug("{}: No associated storage group found for {}, auto-creating", name, plan);
         try {
           ((CMManager) IoTDB.metaManager).createSchema(plan);
@@ -1499,10 +1504,10 @@ public class MetaGroupMember extends RaftMember {
       syncLeaderWithConsistencyCheck(true);
       try {
         planGroupMap = router.splitAndRoutePlan(plan);
-      } catch (MetadataException ex) {
+      } catch (MetadataException | UnknownLogTypeException ex) {
         // ignore
       }
-    } catch (MetadataException e) {
+    } catch (MetadataException | UnknownLogTypeException e) {
       logger.error("Cannot route plan {}", plan, e);
     }
     return planGroupMap;
@@ -1534,7 +1539,7 @@ public class MetaGroupMember extends RaftMember {
     }
     if (plan instanceof InsertPlan
         && status.getCode() == TSStatusCode.TIMESERIES_NOT_EXIST.getStatusCode()
-        && ClusterDescriptor.getInstance().getConfig().isEnableAutoCreateSchema()) {
+        && config.isEnableAutoCreateSchema()) {
       TSStatus tmpStatus = createTimeseriesForFailedInsertion(planGroupMap, ((InsertPlan) plan));
       if (tmpStatus != null) {
         status = tmpStatus;
@@ -1773,7 +1778,7 @@ public class MetaGroupMember extends RaftMember {
       try {
         // only data plans are partitioned, so it must be processed by its data server instead of
         // meta server
-        if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
+        if (config.isUseAsyncServer()) {
           status = forwardDataPlanAsync(plan, node, group.getHeader());
         } else {
           status = forwardDataPlanSync(plan, node, group.getHeader());
@@ -1880,7 +1885,7 @@ public class MetaGroupMember extends RaftMember {
     }
 
     try {
-      if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
+      if (config.isUseAsyncServer()) {
         getNodeStatusAsync(nodeStatus);
       } else {
         getNodeStatusSync(nodeStatus);
@@ -1974,7 +1979,7 @@ public class MetaGroupMember extends RaftMember {
     }
 
     // if we cannot have enough replica after the removal, reject it
-    if (allNodes.size() <= ClusterDescriptor.getInstance().getConfig().getReplicationNum()) {
+    if (allNodes.size() <= config.getReplicationNum()) {
       return Response.RESPONSE_CLUSTER_TOO_SMALL;
     }
 
@@ -1996,7 +2001,12 @@ public class MetaGroupMember extends RaftMember {
 
     // node removal must be serialized to reduce potential concurrency problem
     synchronized (logManager) {
+      // update partition table
+      partitionTable.addNode(node);
+      ((SlotPartitionTable) partitionTable).setLastLogIndex(logManager.getLastLogIndex() + 1);
+
       RemoveNodeLog removeNodeLog = new RemoveNodeLog();
+      removeNodeLog.setPartitionTable(partitionTable.serialize());
       removeNodeLog.setCurrLogTerm(getTerm().get());
       removeNodeLog.setCurrLogIndex(logManager.getLastLogIndex() + 1);
 
@@ -2008,12 +2018,11 @@ public class MetaGroupMember extends RaftMember {
       while (true) {
         logger.info("Send the node removal request of {} to other nodes, retry time: {}", target,
             retryTime);
-        AppendLogResult result = sendLogToAllGroups(removeNodeLog);
-
+        AppendLogResult result = sendLogToFollowers(removeNodeLog);
         switch (result) {
           case OK:
-            logger.info("Removal request of {} is accepted", target);
             commitLog(removeNodeLog);
+            logger.info("Removal request of {} is accepted", target);
             return Response.RESPONSE_AGREE;
           case TIME_OUT:
             logger.info("Removal request of {} timed out", target);
@@ -2033,22 +2042,28 @@ public class MetaGroupMember extends RaftMember {
    * and catch-up service of data are kept alive for other nodes to pull data. If the removed node
    * is a leader, send an exile to the removed node so that it can know it is removed.
    *
-   * @param oldNode the node to be removed
    */
-  public void applyRemoveNode(Node oldNode) {
+  public void applyRemoveNode(RemoveNodeLog removeNodeLog) {
+
+    Node oldNode = removeNodeLog.getRemovedNode();
     synchronized (allNodes) {
-      if (allNodes.contains(oldNode)) {
+      if (partitionTable.deserialize(removeNodeLog.getPartitionTable())) {
         logger.debug("Removing a node {} from {}", oldNode, allNodes);
-        allNodes.remove(oldNode);
-        idNodeMap.remove(oldNode.nodeIdentifier);
 
-        // update the partition table
-        NodeRemovalResult result = partitionTable.removeNode(oldNode);
-        ((SlotPartitionTable) partitionTable).setLastLogIndex(logManager.getLastLogIndex());
+        if (allNodes.contains(oldNode)) {
+          allNodes.remove(oldNode);
+          idNodeMap.remove(oldNode.nodeIdentifier);
+
+        }
+
+        // save the updated partition table
+        savePartitionTable();
 
         // update DataGroupMembers, as the node is removed, the members of some groups are
         // changed and there will also be one less group
+        NodeRemovalResult result = partitionTable.getNodeRemovalResult();
         getDataClusterServer().removeNode(oldNode, result);
+
         // the leader is removed, start the next election ASAP
         if (oldNode.equals(leader.get())) {
           setCharacter(NodeCharacter.ELECTOR);
@@ -2065,21 +2080,19 @@ public class MetaGroupMember extends RaftMember {
         } else if (thisNode.equals(leader.get())) {
           // as the old node is removed, it cannot know this by heartbeat or log, so it should be
           // directly kicked out of the cluster
-          exileNode(oldNode);
+          exileNode(removeNodeLog);
         }
-
-        // save the updated partition table
-        savePartitionTable();
       }
     }
   }
 
-  private void exileNode(Node node) {
-    if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
+  private void exileNode(RemoveNodeLog removeNodeLog) {
+    Node node = removeNodeLog.getRemovedNode();
+    if (config.isUseAsyncServer()) {
       AsyncMetaClient asyncMetaClient = (AsyncMetaClient) getAsyncClient(node);
       try {
         if (asyncMetaClient != null) {
-          asyncMetaClient.exile(new GenericHandler<>(node, null));
+          asyncMetaClient.exile(removeNodeLog.serialize(), new GenericHandler<>(node, null));
         }
       } catch (TException e) {
         logger.warn("Cannot inform {} its removal", node, e);
@@ -2090,7 +2103,7 @@ public class MetaGroupMember extends RaftMember {
         return;
       }
       try {
-        client.exile();
+        client.exile(removeNodeLog.serialize());
       } catch (TException e) {
         client.getInputProtocol().getTransport().close();
         logger.warn("Cannot inform {} its removal", node, e);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index 0526285..57f22bc 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -93,6 +93,7 @@ import org.apache.iotdb.db.exception.metadata.PathNotExistException;
 import org.apache.iotdb.db.exception.metadata.StorageGroupAlreadySetException;
 import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.sys.LogPlan;
 import org.apache.iotdb.db.utils.TestOnly;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
@@ -899,13 +900,23 @@ public abstract class RaftMember {
       return StatusUtils.NODE_READ_ONLY;
     }
     long startTime = Timer.Statistic.RAFT_SENDER_APPEND_LOG.getOperationStartTime();
-    PhysicalPlanLog log = new PhysicalPlanLog();
+    Log log;
+    if (plan instanceof LogPlan) {
+      try {
+        log = LogParser.getINSTANCE().parse(((LogPlan) plan).getLog());
+      } catch (UnknownLogTypeException e) {
+        logger.error("Can not parse LogPlan {}", plan, e);
+        return StatusUtils.PARSE_LOG_ERROR;
+      }
+    } else {
+      log = new PhysicalPlanLog();
+      ((PhysicalPlanLog)log).setPlan(plan);
+    }
     // assign term and index to the new log and append it
     synchronized (logManager) {
       log.setCurrLogTerm(getTerm().get());
       log.setCurrLogIndex(logManager.getLastLogIndex() + 1);
 
-      log.setPlan(plan);
       plan.setIndex(log.getCurrLogIndex());
       logManager.append(log);
     }
@@ -1404,7 +1415,7 @@ public abstract class RaftMember {
   }
 
   private TSStatus handleLogExecutionException(
-      PhysicalPlanLog log, LogExecutionException e) {
+      Log log, LogExecutionException e) {
     Throwable cause = IOUtils.getRootCause(e);
     if (cause instanceof BatchProcessException) {
       return RpcUtils
@@ -1536,7 +1547,7 @@ public abstract class RaftMember {
         logger.debug("Has lose leadership, so need not to send log");
         return false;
       }
-      AppendLogResult result = sendLogToFollowers(log, allNodes.size() / 2);
+      AppendLogResult result = sendLogToFollowers(log);
       Timer.Statistic.RAFT_SENDER_SEND_LOG_TO_FOLLOWERS.calOperationCostTimeFromStart(startTime);
       switch (result) {
         case OK:
@@ -1568,10 +1579,11 @@ public abstract class RaftMember {
    *                       0, half of the cluster size will be used.
    * @return an AppendLogResult
    */
-  private AppendLogResult sendLogToFollowers(Log log, int requiredQuorum) {
+  protected AppendLogResult sendLogToFollowers(Log log) {
+    int requiredQuorum = allNodes.size() / 2;
     if (requiredQuorum <= 0) {
       // use half of the members' size as the quorum
-      return sendLogToFollowers(log, new AtomicInteger(allNodes.size() / 2));
+      return sendLogToFollowers(log, new AtomicInteger(requiredQuorum));
     } else {
       // make sure quorum does not exceed the number of members - 1
       return sendLogToFollowers(log, new AtomicInteger(Math.min(requiredQuorum,
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaAsyncService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaAsyncService.java
index 4ca6eb0..3b2df98 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaAsyncService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaAsyncService.java
@@ -19,10 +19,12 @@
 
 package org.apache.iotdb.cluster.server.service;
 
+import java.nio.ByteBuffer;
 import org.apache.iotdb.cluster.exception.AddSelfException;
 import org.apache.iotdb.cluster.exception.LeaderUnknownException;
 import org.apache.iotdb.cluster.exception.LogExecutionException;
 import org.apache.iotdb.cluster.exception.PartitionTableUnavailableException;
+import org.apache.iotdb.cluster.log.logtypes.RemoveNodeLog;
 import org.apache.iotdb.cluster.rpc.thrift.AddNodeResponse;
 import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
 import org.apache.iotdb.cluster.rpc.thrift.CheckStatusResponse;
@@ -195,8 +197,10 @@ public class MetaAsyncService extends BaseAsyncService implements TSMetaService.
    * @param resultHandler
    */
   @Override
-  public void exile(AsyncMethodCallback<Void> resultHandler) {
-    metaGroupMember.applyRemoveNode(metaGroupMember.getThisNode());
+  public void exile(ByteBuffer removeNodeLogBuffer, AsyncMethodCallback<Void> resultHandler) {
+    RemoveNodeLog removeNodeLog = new RemoveNodeLog();
+    removeNodeLog.deserialize(removeNodeLogBuffer);
+    metaGroupMember.applyRemoveNode(removeNodeLog);
     resultHandler.onComplete(null);
   }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaSyncService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaSyncService.java
index 3b5f445..48c0e58 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaSyncService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaSyncService.java
@@ -19,11 +19,13 @@
 
 package org.apache.iotdb.cluster.server.service;
 
+import java.nio.ByteBuffer;
 import org.apache.iotdb.cluster.client.sync.SyncMetaClient;
 import org.apache.iotdb.cluster.exception.AddSelfException;
 import org.apache.iotdb.cluster.exception.LeaderUnknownException;
 import org.apache.iotdb.cluster.exception.LogExecutionException;
 import org.apache.iotdb.cluster.exception.PartitionTableUnavailableException;
+import org.apache.iotdb.cluster.log.logtypes.RemoveNodeLog;
 import org.apache.iotdb.cluster.rpc.thrift.AddNodeResponse;
 import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
 import org.apache.iotdb.cluster.rpc.thrift.CheckStatusResponse;
@@ -188,7 +190,9 @@ public class MetaSyncService extends BaseSyncService implements TSMetaService.If
    * must tell it directly.
    */
   @Override
-  public void exile() {
-    metaGroupMember.applyRemoveNode(metaGroupMember.getThisNode());
+  public void exile(ByteBuffer removeNodeLogBuffer) {
+    RemoveNodeLog removeNodeLog = new RemoveNodeLog();
+    removeNodeLog.deserialize(removeNodeLogBuffer);
+    metaGroupMember.applyRemoveNode(removeNodeLog);
   }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/StatusUtils.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/StatusUtils.java
index 4d1205f..5a3168a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/StatusUtils.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/StatusUtils.java
@@ -45,6 +45,7 @@ public class StatusUtils {
   public static final TSStatus CONSISTENCY_FAILURE = getStatus(TSStatusCode.CONSISTENCY_FAILURE);
   public static final TSStatus TIMESERIES_NOT_EXIST_ERROR = getStatus(TSStatusCode.TIMESERIES_NOT_EXIST);
   public static final TSStatus NO_CONNECTION = getStatus(TSStatusCode.NO_CONNECTION);
+  public static final TSStatus PARSE_LOG_ERROR = getStatus(TSStatusCode.PARSE_LOG_ERROR);
 
 
   private static TSStatus getStatus(TSStatusCode statusCode) {
@@ -197,6 +198,9 @@ public class StatusUtils {
       case NO_CONNECTION:
         status.setMessage("Node cannot be reached.");
         break;
+      case PARSE_LOG_ERROR:
+        status.setMessage("Parse log error.");
+        break;
       default:
         status.setMessage("");
         break;
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/common/TestUtils.java b/cluster/src/test/java/org/apache/iotdb/cluster/common/TestUtils.java
index 771112b..1f1f3ba 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/common/TestUtils.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/common/TestUtils.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.cluster.common;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -65,6 +66,8 @@ public class TestUtils {
 
   public static long TEST_TIME_OUT_MS = 200;
 
+  public static ByteBuffer seralizePartitionTable = new SlotPartitionTable(getNode(0)).serialize();
+
   private TestUtils() {
     // util class
   }
@@ -83,6 +86,7 @@ public class TestUtils {
     for (int i = 0; i < logNum; i++) {
       AddNodeLog log = new AddNodeLog();
       log.setNewNode(getNode(i));
+      log.setPartitionTable(seralizePartitionTable);
       log.setCurrLogIndex(i);
       log.setCurrLogTerm(i);
       logList.add(log);
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/log/LogParserTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/log/LogParserTest.java
index 66a9615..76efe5f 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/log/LogParserTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/log/LogParserTest.java
@@ -42,6 +42,7 @@ public class LogParserTest {
   public void testAddNodeLog() throws UnknownLogTypeException {
     AddNodeLog log = new AddNodeLog();
     log.setNewNode(TestUtils.getNode(5));
+    log.setPartitionTable(TestUtils.seralizePartitionTable);
     log.setCurrLogIndex(8);
     log.setCurrLogTerm(8);
 
@@ -78,6 +79,7 @@ public class LogParserTest {
   @Test
   public void testRemoveNodeLog() throws UnknownLogTypeException {
     RemoveNodeLog log = new RemoveNodeLog();
+    log.setPartitionTable(TestUtils.seralizePartitionTable);
     log.setRemovedNode(TestUtils.getNode(0));
     log.setCurrLogIndex(8);
     log.setCurrLogTerm(8);
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/MetaLogApplierTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/MetaLogApplierTest.java
index 95c8fe4..fd1a87b 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/MetaLogApplierTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/MetaLogApplierTest.java
@@ -24,11 +24,13 @@ import static junit.framework.TestCase.assertFalse;
 import static junit.framework.TestCase.assertTrue;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Set;
 import org.apache.iotdb.cluster.common.IoTDBTest;
 import org.apache.iotdb.cluster.common.TestMetaGroupMember;
+import org.apache.iotdb.cluster.common.TestUtils;
 import org.apache.iotdb.cluster.log.LogApplier;
 import org.apache.iotdb.cluster.log.logtypes.AddNodeLog;
 import org.apache.iotdb.cluster.log.logtypes.PhysicalPlanLog;
@@ -54,13 +56,13 @@ public class MetaLogApplierTest extends IoTDBTest {
 
   private TestMetaGroupMember testMetaGroupMember = new TestMetaGroupMember() {
     @Override
-    public void applyAddNode(Node newNode) {
-      nodes.add(newNode);
+    public void applyAddNode(AddNodeLog addNodeLog) {
+      nodes.add(addNodeLog.getNewNode());
     }
 
     @Override
-    public void applyRemoveNode(Node oldNode) {
-      nodes.remove(oldNode);
+    public void applyRemoveNode(RemoveNodeLog removeNodeLog) {
+      nodes.remove(removeNodeLog.getRemovedNode());
     }
   };
 
@@ -82,6 +84,7 @@ public class MetaLogApplierTest extends IoTDBTest {
     Node node = new Node("localhost", 1111, 0, 2222, 55560);
     AddNodeLog log = new AddNodeLog();
     log.setNewNode(node);
+    log.setPartitionTable(TestUtils.seralizePartitionTable);
     applier.apply(log);
 
     assertTrue(nodes.contains(node));
@@ -94,6 +97,7 @@ public class MetaLogApplierTest extends IoTDBTest {
 
     Node node = testMetaGroupMember.getThisNode();
     RemoveNodeLog log = new RemoveNodeLog();
+    log.setPartitionTable(TestUtils.seralizePartitionTable);
     log.setRemovedNode(node);
     applier.apply(log);
 
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/log/logtypes/SerializeLogTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/log/logtypes/SerializeLogTest.java
index 09b42e4..d6fec31 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/log/logtypes/SerializeLogTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/log/logtypes/SerializeLogTest.java
@@ -87,6 +87,7 @@ public class SerializeLogTest {
   @Test
   public void testAddNodeLog() throws UnknownLogTypeException {
     AddNodeLog log = new AddNodeLog();
+    log.setPartitionTable(TestUtils.seralizePartitionTable);
     log.setCurrLogIndex(2);
     log.setCurrLogTerm(2);
     log.setNewNode(new Node("apache.iotdb.com", 1234, 1, 4321, 55560));
@@ -110,6 +111,7 @@ public class SerializeLogTest {
   @Test
   public void testRemoveNodeLog() throws UnknownLogTypeException {
     RemoveNodeLog log = new RemoveNodeLog();
+    log.setPartitionTable(TestUtils.seralizePartitionTable);
     log.setCurrLogIndex(2);
     log.setCurrLogTerm(2);
     log.setRemovedNode(TestUtils.getNode(0));
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/partition/SlotPartitionTableTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/partition/SlotPartitionTableTest.java
index b00e755..30315dc 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/partition/SlotPartitionTableTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/partition/SlotPartitionTableTest.java
@@ -499,7 +499,8 @@ public class SlotPartitionTableTest {
   @Test
   public void testRemoveNode() {
     List<Integer> nodeSlots = localTable.getNodeSlots(getNode(0), raftId);
-    NodeRemovalResult nodeRemovalResult = localTable.removeNode(getNode(0));
+    localTable.removeNode(getNode(0));
+    NodeRemovalResult nodeRemovalResult = localTable.getNodeRemovalResult();
     assertFalse(localTable.getAllNodes().contains(getNode(0)));
     PartitionGroup removedGroup = nodeRemovalResult.getRemovedGroup(0);
     for (int i = 0; i < 5; i++) {
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatThreadTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatThreadTest.java
index f6bb254..2275a63 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatThreadTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatThreadTest.java
@@ -63,12 +63,22 @@ public class MetaHeartbeatThreadTest extends HeartbeatThreadTest {
     }
 
     @Override
-    public NodeAdditionResult addNode(Node node) {
+    public void addNode(Node node) {
+      return;
+    }
+
+    @Override
+    public NodeAdditionResult getNodeAdditionResult(Node node) {
       return null;
     }
 
     @Override
-    public NodeRemovalResult removeNode(Node node) {
+    public void removeNode(Node node) {
+      return;
+    }
+
+    @Override
+    public NodeRemovalResult getNodeRemovalResult() {
       return null;
     }
 
@@ -93,8 +103,8 @@ public class MetaHeartbeatThreadTest extends HeartbeatThreadTest {
     }
 
     @Override
-    public void deserialize(ByteBuffer buffer) {
-
+    public boolean deserialize(ByteBuffer buffer) {
+      return true;
     }
 
     @Override
@@ -108,8 +118,13 @@ public class MetaHeartbeatThreadTest extends HeartbeatThreadTest {
     }
 
     @Override
+    public List<PartitionGroup> calculateGlobalGroups(List<Node> nodeRing) {
+      return null;
+    }
+
+    @Override
     public boolean judgeHoldSlot(Node node, int slot) {
-      return true;
+      return false;
     }
   };
 
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
index c920f06..ca4fd92 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
@@ -898,8 +898,8 @@ public class DataGroupMemberTest extends MemberTest {
   public void testRemoveLeader() {
     System.out.println("Start testRemoveLeader()");
     Node nodeToRemove = TestUtils.getNode(10);
-    SlotNodeRemovalResult nodeRemovalResult = (SlotNodeRemovalResult) testMetaMember.getPartitionTable()
-        .removeNode(nodeToRemove);
+    testMetaMember.getPartitionTable().removeNode(nodeToRemove);
+    SlotNodeRemovalResult nodeRemovalResult = (SlotNodeRemovalResult) testMetaMember.getPartitionTable().getNodeRemovalResult();
     dataGroupMember.setLeader(nodeToRemove);
     dataGroupMember.start();
 
@@ -926,8 +926,9 @@ public class DataGroupMemberTest extends MemberTest {
   public void testRemoveNonLeader() {
     System.out.println("Start testRemoveNonLeader()");
     Node nodeToRemove = TestUtils.getNode(10);
-    NodeRemovalResult nodeRemovalResult = testMetaMember.getPartitionTable()
+    testMetaMember.getPartitionTable()
         .removeNode(nodeToRemove);
+    NodeRemovalResult nodeRemovalResult = testMetaMember.getPartitionTable().getNodeRemovalResult();
     dataGroupMember.setLeader(TestUtils.getNode(20));
     dataGroupMember.start();
 
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
index 1badcd2..a1e563e 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
@@ -60,7 +60,9 @@ import org.apache.iotdb.cluster.exception.EmptyIntervalException;
 import org.apache.iotdb.cluster.exception.LogExecutionException;
 import org.apache.iotdb.cluster.exception.PartitionTableUnavailableException;
 import org.apache.iotdb.cluster.exception.StartUpCheckFailureException;
+import org.apache.iotdb.cluster.log.logtypes.AddNodeLog;
 import org.apache.iotdb.cluster.log.logtypes.CloseFileLog;
+import org.apache.iotdb.cluster.log.logtypes.RemoveNodeLog;
 import org.apache.iotdb.cluster.log.snapshot.MetaSimpleSnapshot;
 import org.apache.iotdb.cluster.metadata.CMManager;
 import org.apache.iotdb.cluster.partition.PartitionGroup;
@@ -423,7 +425,7 @@ public class MetaGroupMemberTest extends MemberTest {
             }
 
             @Override
-            public void exile(AsyncMethodCallback<Void> resultHandler) {
+            public void exile(ByteBuffer removeNodeLog, AsyncMethodCallback<Void> resultHandler) {
               System.out.printf("%s was exiled%n", node);
               exiledNode = node;
             }
@@ -431,7 +433,7 @@ public class MetaGroupMemberTest extends MemberTest {
             @Override
             public void removeNode(Node node, AsyncMethodCallback<Long> resultHandler) {
               new Thread(() -> {
-                testMetaMember.applyRemoveNode(node);
+                testMetaMember.applyRemoveNode(new RemoveNodeLog(TestUtils.seralizePartitionTable, node));
                 resultHandler.onComplete(Response.RESPONSE_AGREE);
               }).start();
             }
@@ -554,7 +556,7 @@ public class MetaGroupMemberTest extends MemberTest {
     System.out.println("Start testAddNode()");
     Node newNode = TestUtils.getNode(10);
     testMetaMember.onElectionWins();
-    testMetaMember.applyAddNode(newNode);
+    testMetaMember.applyAddNode(new AddNodeLog(TestUtils.seralizePartitionTable, newNode));
     assertTrue(partitionTable.getAllNodes().contains(newNode));
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 955deb4..a2083d1 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -2178,7 +2178,7 @@ public class StorageGroupProcessor {
    * @return load the file successfully
    * @UsedBy sync module, load external tsfile module.
    */
-  private boolean loadTsFileByType(LoadTsFileType type, File syncedTsFile,
+  private boolean loadTsFileByType(LoadTsFileType type, File tsFileToLoad,
       TsFileResource tsFileResource, long filePartitionId)
       throws LoadFileException, DiskSpaceInsufficientException {
     File targetFile;
@@ -2195,7 +2195,7 @@ public class StorageGroupProcessor {
         }
         tsFileManagement.add(tsFileResource, false);
         logger.info("Load tsfile in unsequence list, move file from {} to {}",
-            syncedTsFile.getAbsolutePath(), targetFile.getAbsolutePath());
+            tsFileToLoad.getAbsolutePath(), targetFile.getAbsolutePath());
         break;
       case LOAD_SEQUENCE:
         targetFile =
@@ -2209,7 +2209,7 @@ public class StorageGroupProcessor {
         }
         tsFileManagement.add(tsFileResource, true);
         logger.info("Load tsfile in sequence list, move file from {} to {}",
-            syncedTsFile.getAbsolutePath(), targetFile.getAbsolutePath());
+            tsFileToLoad.getAbsolutePath(), targetFile.getAbsolutePath());
         break;
       default:
         throw new LoadFileException(
@@ -2221,29 +2221,47 @@ public class StorageGroupProcessor {
       targetFile.getParentFile().mkdirs();
     }
     try {
-      FileUtils.moveFile(syncedTsFile, targetFile);
+      FileUtils.moveFile(tsFileToLoad, targetFile);
     } catch (IOException e) {
       logger.error("File renaming failed when loading tsfile. Origin: {}, Target: {}",
-          syncedTsFile.getAbsolutePath(), targetFile.getAbsolutePath(), e);
+          tsFileToLoad.getAbsolutePath(), targetFile.getAbsolutePath(), e);
       throw new LoadFileException(String.format(
           "File renaming failed when loading tsfile. Origin: %s, Target: %s, because %s",
-          syncedTsFile.getAbsolutePath(), targetFile.getAbsolutePath(), e.getMessage()));
+          tsFileToLoad.getAbsolutePath(), targetFile.getAbsolutePath(), e.getMessage()));
     }
 
-    File syncedResourceFile = fsFactory.getFile(
-        syncedTsFile.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX);
+    File resourceFileToLoad = fsFactory.getFile(
+        tsFileToLoad.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX);
     File targetResourceFile = fsFactory.getFile(
         targetFile.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX);
     try {
-      FileUtils.moveFile(syncedResourceFile, targetResourceFile);
+      FileUtils.moveFile(resourceFileToLoad, targetResourceFile);
     } catch (IOException e) {
       logger.error("File renaming failed when loading .resource file. Origin: {}, Target: {}",
-          syncedResourceFile.getAbsolutePath(), targetResourceFile.getAbsolutePath(), e);
+          resourceFileToLoad.getAbsolutePath(), targetResourceFile.getAbsolutePath(), e);
       throw new LoadFileException(String.format(
           "File renaming failed when loading .resource file. Origin: %s, Target: %s, because %s",
-          syncedResourceFile.getAbsolutePath(), targetResourceFile.getAbsolutePath(),
+          resourceFileToLoad.getAbsolutePath(), targetResourceFile.getAbsolutePath(),
           e.getMessage()));
     }
+
+    File modFileToLoad = fsFactory.getFile(
+        tsFileToLoad.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX);
+    if (modFileToLoad.exists()) {
+      File targetModFile = fsFactory.getFile(
+          targetFile.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX);
+      try {
+        FileUtils.moveFile(modFileToLoad, targetModFile);
+      } catch (IOException e) {
+        logger.error("File renaming failed when loading .mod file. Origin: {}, Target: {}",
+            resourceFileToLoad.getAbsolutePath(), targetModFile.getAbsolutePath(), e);
+        throw new LoadFileException(String.format(
+            "File renaming failed when loading .mod file. Origin: %s, Target: %s, because %s",
+            resourceFileToLoad.getAbsolutePath(), targetModFile.getAbsolutePath(),
+            e.getMessage()));
+      }
+    }
+
     return true;
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
index d40f1eb..0d64b30 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
@@ -45,6 +45,7 @@ import org.apache.iotdb.db.qp.physical.sys.DeleteTimeSeriesPlan;
 import org.apache.iotdb.db.qp.physical.sys.DropIndexPlan;
 import org.apache.iotdb.db.qp.physical.sys.FlushPlan;
 import org.apache.iotdb.db.qp.physical.sys.LoadConfigurationPlan;
+import org.apache.iotdb.db.qp.physical.sys.LogPlan;
 import org.apache.iotdb.db.qp.physical.sys.MNodePlan;
 import org.apache.iotdb.db.qp.physical.sys.MeasurementMNodePlan;
 import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
@@ -348,6 +349,10 @@ public abstract class PhysicalPlan {
           plan = new StorageGroupMNodePlan();
           plan.deserialize(buffer);
           break;
+        case CLUSTER_LOG:
+          plan = new LogPlan();
+          plan.deserialize(buffer);
+          break;
         default:
           throw new IOException("unrecognized log type " + type);
       }
@@ -361,7 +366,7 @@ public abstract class PhysicalPlan {
     REVOKE_USER_PRIVILEGE, GRANT_ROLE_PRIVILEGE, GRANT_USER_PRIVILEGE, GRANT_USER_ROLE, MODIFY_PASSWORD, DELETE_USER,
     DELETE_STORAGE_GROUP, SHOW_TIMESERIES, DELETE_TIMESERIES, LOAD_CONFIGURATION, CREATE_MULTI_TIMESERIES,
     ALTER_TIMESERIES, FLUSH, CREATE_INDEX, DROP_INDEX,
-    CHANGE_TAG_OFFSET, CHANGE_ALIAS, MNODE, MEASUREMENT_MNODE, STORAGE_GROUP_MNODE
+    CHANGE_TAG_OFFSET, CHANGE_ALIAS, MNODE, MEASUREMENT_MNODE, STORAGE_GROUP_MNODE, CLUSTER_LOG
   }
 
   public long getIndex() {
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/LogPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/LogPlan.java
new file mode 100644
index 0000000..725c803
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/LogPlan.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.qp.physical.sys;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.List;
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+
+/**
+ * It's used by cluster to wrap log to plan
+ */
+public class LogPlan extends PhysicalPlan {
+
+  private ByteBuffer log;
+
+  public LogPlan() {
+    super(false);
+  }
+
+  public LogPlan(ByteBuffer log) {
+    super(false);
+    this.log = log;
+  }
+
+  public ByteBuffer getLog() {
+    return log;
+  }
+
+  public void setLog(ByteBuffer log) {
+    this.log = log;
+  }
+
+  @Override
+  public List<PartialPath> getPaths() {
+    return Collections.emptyList();
+  }
+
+  @Override
+  public void serialize(DataOutputStream stream) throws IOException {
+    stream.writeByte((byte) PhysicalPlanType.CLUSTER_LOG.ordinal());
+    stream.writeInt(log.array().length);
+    stream.write(log.array());
+  }
+
+  @Override
+  public void serialize(ByteBuffer buffer) {
+    int len = buffer.getInt();
+    log = ByteBuffer.wrap(buffer.array(), buffer.position(), len);
+  }
+}
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index b839c53..c41834b 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -86,7 +86,8 @@ public enum TSStatusCode {
   NODE_READ_ONLY(704),
   CONSISTENCY_FAILURE(705),
   NO_CONNECTION(706),
-  NEED_REDIRECTION(707)
+  NEED_REDIRECTION(707),
+  PARSE_LOG_ERROR(708)
 
   ;
 
diff --git a/thrift/src/main/thrift/cluster.thrift b/thrift/src/main/thrift/cluster.thrift
index 2a24106..124a154 100644
--- a/thrift/src/main/thrift/cluster.thrift
+++ b/thrift/src/main/thrift/cluster.thrift
@@ -335,7 +335,6 @@ service RaftService {
 }
 
 
-
 service TSDataService extends RaftService {
 
   /**
@@ -468,7 +467,7 @@ service TSMetaService extends RaftService {
   * the commit command by heartbeat since it has been removed, so the leader should tell it
   * directly that it is no longer in the cluster.
   **/
-  void exile()
+  void exile(binary removeNodeLog)
 
   TNodeStatus queryNodeStatus()
 


[iotdb] 04/04: 1. fix ut tests 2. The two-stage relative order problem of double logs is solved.

Posted by lt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lta pushed a commit to branch cluster_scalability
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 327eb7e80d7165931b4fa41d4bb1c25eebd38f65
Author: lta <li...@163.com>
AuthorDate: Thu Jan 14 15:40:58 2021 +0800

    1. fix ut tests
    2. The two-stage relative order problem of double logs is solved.
---
 .../apache/iotdb/cluster/config/ClusterConfig.java |   2 +-
 .../iotdb/cluster/log/applier/MetaLogApplier.java  |   2 +-
 .../iotdb/cluster/log/logtypes/AddNodeLog.java     |  14 ++-
 .../iotdb/cluster/log/logtypes/RemoveNodeLog.java  |  14 ++-
 .../iotdb/cluster/log/manage/RaftLogManager.java   |   4 +
 .../iotdb/cluster/partition/NodeRemovalResult.java |   1 +
 .../partition/balancer/DefaultSlotBalancer.java    | 118 ++++++++++++++++++
 .../cluster/partition/balancer/SlotBalancer.java   |  43 +++++++
 .../iotdb/cluster/partition/slot/SlotManager.java  |  17 ++-
 .../cluster/partition/slot/SlotPartitionTable.java | 115 +++++++-----------
 .../iotdb/cluster/partition/slot/SlotStrategy.java |  18 ++-
 .../iotdb/cluster/query/ClusterPlanRouter.java     |   2 +-
 .../iotdb/cluster/server/DataClusterServer.java    |   4 +-
 .../cluster/server/member/DataGroupMember.java     |  15 ++-
 .../cluster/server/member/MetaGroupMember.java     | 132 +++------------------
 .../iotdb/cluster/server/member/RaftMember.java    |  22 ++--
 .../iotdb/cluster/common/TestAsyncDataClient.java  |  27 +++--
 .../org/apache/iotdb/cluster/common/TestUtils.java |   2 +
 .../cluster/server/member/DataGroupMemberTest.java |  30 ++++-
 .../iotdb/cluster/server/member/MemberTest.java    |  34 ++++--
 .../cluster/server/member/MetaGroupMemberTest.java |  55 +++++++--
 .../apache/iotdb/db/qp/physical/sys/LogPlan.java   |   1 +
 22 files changed, 421 insertions(+), 251 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
index 05745fd..621149d 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
@@ -46,7 +46,7 @@ public class ClusterConfig {
   private int replicationNum = 2;
 
   @ClusterConsistent
-  private int multiRaftFactor = 2;
+  private int multiRaftFactor = 1;
 
   @ClusterConsistent
   private String clusterName = "default";
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/MetaLogApplier.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/MetaLogApplier.java
index 94437ae..aaa03a4 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/MetaLogApplier.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/MetaLogApplier.java
@@ -82,7 +82,7 @@ public class MetaLogApplier extends BaseApplier {
     LogPlan plan = new LogPlan(log.serialize());
     TSStatus status = member.executeNonQueryPlan(plan);
     if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-      throw new ChangeMembershipException(String.format("apply %s failed", log));
+      throw new ChangeMembershipException(String.format("apply %s failed with status {%s}", log, status));
     }
   }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/AddNodeLog.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/AddNodeLog.java
index 824c3f2..380ba08 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/AddNodeLog.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/AddNodeLog.java
@@ -37,6 +37,8 @@ public class AddNodeLog extends Log {
 
   private Node newNode;
 
+  private long metaLogIndex;
+
   public AddNodeLog(ByteBuffer partitionTable, Node newNode) {
     this.partitionTable = partitionTable;
     this.newNode = newNode;
@@ -45,6 +47,14 @@ public class AddNodeLog extends Log {
   public AddNodeLog() {
   }
 
+  public long getMetaLogIndex() {
+    return metaLogIndex;
+  }
+
+  public void setMetaLogIndex(long metaLogIndex) {
+    this.metaLogIndex = metaLogIndex;
+  }
+
   public void setPartitionTable(ByteBuffer partitionTable) {
     this.partitionTable = partitionTable;
   }
@@ -68,10 +78,11 @@ public class AddNodeLog extends Log {
       dataOutputStream.writeByte(Types.ADD_NODE.ordinal());
       dataOutputStream.writeLong(getCurrLogIndex());
       dataOutputStream.writeLong(getCurrLogTerm());
+      dataOutputStream.writeLong(getMetaLogIndex());
 
       SerializeUtils.serialize(newNode, dataOutputStream);
 
-      dataOutputStream.write(partitionTable.array().length);
+      dataOutputStream.writeInt(partitionTable.array().length);
       dataOutputStream.write(partitionTable.array());
     } catch (IOException e) {
       // ignored
@@ -87,6 +98,7 @@ public class AddNodeLog extends Log {
     // ipLength(int), inBytes(byte[]), port(int), identifier(int), dataPort(int)
     setCurrLogIndex(buffer.getLong());
     setCurrLogTerm(buffer.getLong());
+    setMetaLogIndex(buffer.getLong());
 
     newNode = new Node();
     SerializeUtils.deserialize(newNode, buffer);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/RemoveNodeLog.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/RemoveNodeLog.java
index 800b77d..22af482 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/RemoveNodeLog.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/RemoveNodeLog.java
@@ -34,6 +34,8 @@ public class RemoveNodeLog extends Log {
 
   private Node removedNode;
 
+  private long metaLogIndex;
+
   public RemoveNodeLog(ByteBuffer partitionTable,
       Node removedNode) {
     this.partitionTable = partitionTable;
@@ -43,6 +45,14 @@ public class RemoveNodeLog extends Log {
   public RemoveNodeLog() {
   }
 
+  public long getMetaLogIndex() {
+    return metaLogIndex;
+  }
+
+  public void setMetaLogIndex(long metaLogIndex) {
+    this.metaLogIndex = metaLogIndex;
+  }
+
   public ByteBuffer getPartitionTable() {
     return partitionTable;
   }
@@ -58,10 +68,11 @@ public class RemoveNodeLog extends Log {
       dataOutputStream.writeByte(Types.REMOVE_NODE.ordinal());
       dataOutputStream.writeLong(getCurrLogIndex());
       dataOutputStream.writeLong(getCurrLogTerm());
+      dataOutputStream.writeLong(getMetaLogIndex());
 
       SerializeUtils.serialize(removedNode, dataOutputStream);
 
-      dataOutputStream.write(partitionTable.array().length);
+      dataOutputStream.writeInt(partitionTable.array().length);
       dataOutputStream.write(partitionTable.array());
     } catch (IOException e) {
       // ignored
@@ -73,6 +84,7 @@ public class RemoveNodeLog extends Log {
   public void deserialize(ByteBuffer buffer) {
     setCurrLogIndex(buffer.getLong());
     setCurrLogTerm(buffer.getLong());
+    setMetaLogIndex(buffer.getLong());
 
     removedNode = new Node();
     SerializeUtils.deserialize(removedNode, buffer);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
index bd65c26..c57ee06 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
@@ -946,4 +946,8 @@ public abstract class RaftLogManager {
   public long getBlockAppliedCommitIndex() {
     return blockAppliedCommitIndex;
   }
+
+  public RaftLogManager(LogApplier logApplier) {
+    this.logApplier = logApplier;
+  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/partition/NodeRemovalResult.java b/cluster/src/main/java/org/apache/iotdb/cluster/partition/NodeRemovalResult.java
index 4193ffd..86ff9a2 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/partition/NodeRemovalResult.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/partition/NodeRemovalResult.java
@@ -93,4 +93,5 @@ public class NodeRemovalResult {
       newGroupList.add(group);
     }
   }
+
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/partition/balancer/DefaultSlotBalancer.java b/cluster/src/main/java/org/apache/iotdb/cluster/partition/balancer/DefaultSlotBalancer.java
new file mode 100644
index 0000000..eb1825f
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/partition/balancer/DefaultSlotBalancer.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.cluster.partition.balancer;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import org.apache.iotdb.cluster.config.ClusterDescriptor;
+import org.apache.iotdb.cluster.partition.PartitionGroup;
+import org.apache.iotdb.cluster.partition.slot.SlotPartitionTable;
+import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.rpc.thrift.RaftNode;
+
+/**
+ * This balancer aims to avg slots to all raft groups.
+ */
+public class DefaultSlotBalancer implements SlotBalancer {
+
+  private int multiRaftFactor =
+      ClusterDescriptor.getInstance().getConfig().getMultiRaftFactor();
+  private SlotPartitionTable table;
+
+  public DefaultSlotBalancer(SlotPartitionTable partitionTable) {
+    this.table = partitionTable;
+  }
+
+  /**
+   * Move last slots from each group whose slot number is bigger than the new average to the new node.
+   */
+  @Override
+  public void moveSlotsToNew(Node newNode, List<Node> oldRing) {
+    Map<RaftNode, List<Integer>> nodeSlotMap = table.getAllNodeSlots();
+    Map<RaftNode, Map<Integer, PartitionGroup>> previousNodeMap = table.getPreviousNodeMap();
+    RaftNode[] slotNodes = table.getSlotNodes();
+
+    // as a node is added, the average slots for each node decrease
+    // move the slots to the new node if any previous node have more slots than the new average
+    int newAvg = table.getTotalSlotNumbers() / table.getAllNodes().size() / multiRaftFactor;
+    Map<RaftNode, List<Integer>> newNodeSlotMap = new HashMap<>();
+    int raftId = 0;
+    for (int i = 0; i < multiRaftFactor; i++) {
+      RaftNode raftNode = new RaftNode(newNode, i);
+      newNodeSlotMap.putIfAbsent(raftNode, new ArrayList<>());
+      previousNodeMap.putIfAbsent(raftNode, new HashMap<>());
+    }
+    for (Entry<RaftNode, List<Integer>> entry : nodeSlotMap.entrySet()) {
+      List<Integer> slots = entry.getValue();
+      int transferNum = slots.size() - newAvg;
+      if (transferNum > 0) {
+        RaftNode curNode = new RaftNode(newNode, raftId);
+        int numToMove = transferNum;
+        if (raftId != multiRaftFactor - 1) {
+          numToMove = Math.min(numToMove, newAvg - newNodeSlotMap.get(curNode).size());
+        }
+        List<Integer> slotsToMove = slots
+            .subList(slots.size() - transferNum, slots.size() - transferNum + numToMove);
+        newNodeSlotMap.get(curNode).addAll(slotsToMove);
+        for (Integer slot : slotsToMove) {
+          // record what node previously hold the integer
+          previousNodeMap.get(curNode).put(slot, table.getHeaderGroup(entry.getKey(), oldRing));
+          slotNodes[slot] = curNode;
+        }
+        transferNum -= numToMove;
+        if (transferNum > 0) {
+          curNode = new RaftNode(newNode, ++raftId);
+          slotsToMove = slots.subList(slots.size() - transferNum, slots.size());
+          newNodeSlotMap.get(curNode).addAll(slotsToMove);
+          for (Integer slot : slotsToMove) {
+            // record what node previously hold the integer
+            previousNodeMap.get(curNode).put(slot, table.getHeaderGroup(entry.getKey(), oldRing));
+            slotNodes[slot] = curNode;
+          }
+        }
+      }
+    }
+    nodeSlotMap.putAll(newNodeSlotMap);
+  }
+
+  @Override
+  public Map<RaftNode, List<Integer>> retrieveSlots(Node target) {
+    Map<RaftNode, List<Integer>> nodeSlotMap = table.getAllNodeSlots();
+    RaftNode[] slotNodes = table.getSlotNodes();
+    List<Node> nodeRing = table.getAllNodes();
+
+    Map<RaftNode, List<Integer>> newHolderSlotMap = new HashMap<>();
+    for(int raftId = 0 ; raftId < multiRaftFactor; raftId++) {
+      RaftNode raftNode = new RaftNode(target, raftId);
+      List<Integer> slots = nodeSlotMap.remove(raftNode);
+      for (int i = 0; i < slots.size(); i++) {
+        int slot = slots.get(i);
+        RaftNode newHolder = new RaftNode(nodeRing.get(i % nodeRing.size()), raftId);
+        slotNodes[slot] = newHolder;
+        nodeSlotMap.computeIfAbsent(newHolder, n -> new ArrayList<>()).add(slot);
+        newHolderSlotMap.computeIfAbsent(newHolder, n -> new ArrayList<>()).add(slot);
+      }
+    }
+    return newHolderSlotMap;
+  }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/partition/balancer/SlotBalancer.java b/cluster/src/main/java/org/apache/iotdb/cluster/partition/balancer/SlotBalancer.java
new file mode 100644
index 0000000..95f806b
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/partition/balancer/SlotBalancer.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.cluster.partition.balancer;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.rpc.thrift.RaftNode;
+
+/**
+ * When add/remove node, the slots need to be redistributed.
+ */
+public interface SlotBalancer {
+
+  /**
+   * When add a new node, new raft groups will take over some hash slots from another raft groups.
+   */
+  void moveSlotsToNew(Node newNode, List<Node> oldRing);
+
+  /**
+   * When remove a old node, all hash slots of the removed groups will assigned to other raft groups.
+   * @param target the node to be removed
+   */
+  Map<RaftNode, List<Integer>> retrieveSlots(Node target);
+
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotManager.java
index f0fc11c..7165145 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotManager.java
@@ -1,5 +1,20 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at      http://www.apache.org/licenses/LICENSE-2.0  Unless required by applicable law or ag [...]
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
 
 package org.apache.iotdb.cluster.partition.slot;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java b/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java
index f441e4a..255fb22 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java
@@ -1,5 +1,20 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at      http://www.apache.org/licenses/LICENSE-2.0  Unless required by applicable law or ag [...]
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
 
 package org.apache.iotdb.cluster.partition.slot;
@@ -27,6 +42,8 @@ import org.apache.iotdb.cluster.partition.NodeAdditionResult;
 import org.apache.iotdb.cluster.partition.NodeRemovalResult;
 import org.apache.iotdb.cluster.partition.PartitionGroup;
 import org.apache.iotdb.cluster.partition.PartitionTable;
+import org.apache.iotdb.cluster.partition.balancer.DefaultSlotBalancer;
+import org.apache.iotdb.cluster.partition.balancer.SlotBalancer;
 import org.apache.iotdb.cluster.partition.slot.SlotStrategy.DefaultStrategy;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.iotdb.cluster.rpc.thrift.RaftNode;
@@ -78,6 +95,8 @@ public class SlotPartitionTable implements PartitionTable {
   // last log index that modifies the partition table
   private long lastLogIndex = -1;
 
+  private SlotBalancer slotBalancer = new DefaultSlotBalancer(this);
+
   /**
    * only used for deserialize.
    *
@@ -105,6 +124,14 @@ public class SlotPartitionTable implements PartitionTable {
     SlotPartitionTable.slotStrategy = slotStrategy;
   }
 
+  public SlotBalancer getLoadBalancer() {
+    return slotBalancer;
+  }
+
+  public void setLoadBalancer(SlotBalancer slotBalancer) {
+    this.slotBalancer = slotBalancer;
+  }
+
   private void init(Collection<Node> nodes) {
     logger.info("Initializing a new partition table");
     nodeRing.addAll(nodes);
@@ -167,7 +194,7 @@ public class SlotPartitionTable implements PartitionTable {
     return ret;
   }
 
-  private PartitionGroup getHeaderGroup(RaftNode raftNode, List<Node> nodeRing) {
+  public PartitionGroup getHeaderGroup(RaftNode raftNode, List<Node> nodeRing) {
     PartitionGroup ret = new PartitionGroup(raftNode.getRaftId());
 
     // assuming the nodes are [1,2,3,4,5]
@@ -284,7 +311,7 @@ public class SlotPartitionTable implements PartitionTable {
 
     // the slots movement is only done logically, the new node itself will pull data from the
     // old node
-    moveSlotsToNew(node, oldRing);
+    slotBalancer.moveSlotsToNew(node, oldRing);
 
   }
 
@@ -304,55 +331,6 @@ public class SlotPartitionTable implements PartitionTable {
     return result;
   }
 
-
-  /**
-   * Move last slots from each group whose slot number is bigger than the new average to the new
-   * node.
-   *
-   * @param newNode
-   */
-  private void moveSlotsToNew(Node newNode, List<Node> oldRing) {
-    // as a node is added, the average slots for each node decrease
-    // move the slots to the new node if any previous node have more slots than the new average
-    int newAvg = totalSlotNumbers / nodeRing.size() / multiRaftFactor;
-    int raftId = 0;
-    for (int i = 0; i < multiRaftFactor; i++) {
-      RaftNode raftNode = new RaftNode(newNode, i);
-      nodeSlotMap.putIfAbsent(raftNode, new ArrayList<>());
-      previousNodeMap.putIfAbsent(raftNode, new HashMap<>());
-    }
-    for (Entry<RaftNode, List<Integer>> entry : nodeSlotMap.entrySet()) {
-      List<Integer> slots = entry.getValue();
-      int transferNum = slots.size() - newAvg;
-      if (transferNum > 0) {
-        RaftNode curNode = new RaftNode(newNode, raftId);
-        int numToMove = transferNum;
-        if (raftId != multiRaftFactor - 1) {
-          numToMove = Math.min(numToMove, newAvg - nodeSlotMap.get(curNode).size());
-        }
-        List<Integer> slotsToMove = slots
-            .subList(slots.size() - transferNum, slots.size() - transferNum + numToMove);
-        nodeSlotMap.get(curNode).addAll(slotsToMove);
-        for (Integer slot : slotsToMove) {
-          // record what node previously hold the integer
-          previousNodeMap.get(curNode).put(slot, getHeaderGroup(entry.getKey(), oldRing));
-          slotNodes[slot] = curNode;
-        }
-        transferNum -= numToMove;
-        if (transferNum > 0) {
-          curNode = new RaftNode(newNode, ++raftId);
-          slotsToMove = slots.subList(slots.size() - transferNum, slots.size());
-          nodeSlotMap.get(curNode).addAll(slotsToMove);
-          for (Integer slot : slotsToMove) {
-            // record what node previously hold the integer
-            previousNodeMap.get(curNode).put(slot, getHeaderGroup(entry.getKey(), oldRing));
-            slotNodes[slot] = curNode;
-          }
-        }
-      }
-    }
-  }
-
   @Override
   public List<PartitionGroup> getLocalGroups() {
     return localGroups;
@@ -381,8 +359,8 @@ public class SlotPartitionTable implements PartitionTable {
         Map<Integer, PartitionGroup> prevHolders = nodeMapEntry.getValue();
         dataOutputStream.writeInt(prevHolders.size());
         for (Entry<Integer, PartitionGroup> integerNodeEntry : prevHolders.entrySet()) {
-          dataOutputStream.writeInt(integerNodeEntry.getKey());
           integerNodeEntry.getValue().serialize(dataOutputStream);
+          dataOutputStream.writeInt(integerNodeEntry.getKey());
         }
       }
 
@@ -398,7 +376,7 @@ public class SlotPartitionTable implements PartitionTable {
     long newLastLogIndex = buffer.getLong();
 
     // judge whether the partition table of byte buffer is out of date
-    if (lastLogIndex >= newLastLogIndex) {
+    if (lastLogIndex != -1 && lastLogIndex >= newLastLogIndex) {
       return lastLogIndex <= newLastLogIndex;
     }
     lastLogIndex = newLastLogIndex;
@@ -455,6 +433,10 @@ public class SlotPartitionTable implements PartitionTable {
     return nodeRing;
   }
 
+  public Map<RaftNode, Map<Integer, PartitionGroup>> getPreviousNodeMap() {
+    return previousNodeMap;
+  }
+
   public Map<Integer, PartitionGroup> getPreviousNodeMap(RaftNode raftNode) {
     return previousNodeMap.get(raftNode);
   }
@@ -488,7 +470,8 @@ public class SlotPartitionTable implements PartitionTable {
         Objects.equals(nodeRing, that.nodeRing) &&
         Objects.equals(nodeSlotMap, that.nodeSlotMap) &&
         Arrays.equals(slotNodes, that.slotNodes) &&
-        Objects.equals(previousNodeMap, that.previousNodeMap);
+        Objects.equals(previousNodeMap, that.previousNodeMap) &&
+        lastLogIndex == that.lastLogIndex;
   }
 
   @Override
@@ -548,7 +531,7 @@ public class SlotPartitionTable implements PartitionTable {
 
       // the slots movement is only done logically, the new node itself will pull data from the
       // old node
-      Map<RaftNode, List<Integer>> raftNodeListMap = retrieveSlots(target);
+      Map<RaftNode, List<Integer>> raftNodeListMap = slotBalancer.retrieveSlots(target);
       result.addNewSlotOwners(raftNodeListMap);
       this.nodeRemovalResult = result;
     }
@@ -559,22 +542,6 @@ public class SlotPartitionTable implements PartitionTable {
     return nodeRemovalResult;
   }
 
-  private Map<RaftNode, List<Integer>> retrieveSlots(Node target) {
-    Map<RaftNode, List<Integer>> newHolderSlotMap = new HashMap<>();
-    for(int raftId = 0 ; raftId < multiRaftFactor; raftId++) {
-      RaftNode raftNode = new RaftNode(target, raftId);
-      List<Integer> slots = nodeSlotMap.remove(raftNode);
-      for (int i = 0; i < slots.size(); i++) {
-        int slot = slots.get(i);
-        RaftNode newHolder = new RaftNode(nodeRing.get(i % nodeRing.size()), raftId);
-        slotNodes[slot] = newHolder;
-        nodeSlotMap.computeIfAbsent(newHolder, n -> new ArrayList<>()).add(slot);
-        newHolderSlotMap.computeIfAbsent(newHolder, n -> new ArrayList<>()).add(slot);
-      }
-    }
-    return newHolderSlotMap;
-  }
-
   @Override
   public List<PartitionGroup> getGlobalGroups() {
     // preventing a thread from getting incomplete globalGroups
@@ -609,4 +576,8 @@ public class SlotPartitionTable implements PartitionTable {
   public synchronized void setLastLogIndex(long lastLogIndex) {
     this.lastLogIndex = Math.max(this.lastLogIndex, lastLogIndex);
   }
+
+  public RaftNode[] getSlotNodes() {
+    return slotNodes;
+  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotStrategy.java b/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotStrategy.java
index b5a45c0..5144bcb 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotStrategy.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotStrategy.java
@@ -1,8 +1,22 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at      http://www.apache.org/licenses/LICENSE-2.0  Unless required by applicable law or ag [...]
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
 
-
 package org.apache.iotdb.cluster.partition.slot;
 
 import static org.apache.iotdb.cluster.config.ClusterConstant.HASH_SALT;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanRouter.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanRouter.java
index 0c8cf25..487b88d 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanRouter.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanRouter.java
@@ -144,7 +144,7 @@ public class ClusterPlanRouter {
     throw new UnsupportedPlanException(plan);
   }
 
-  private Map<PhysicalPlan, PartitionGroup> splitAndRoutePlan(LogPlan plan)
+  protected Map<PhysicalPlan, PartitionGroup> splitAndRoutePlan(LogPlan plan)
       throws UnknownLogTypeException, UnsupportedPlanException {
     Map<PhysicalPlan, PartitionGroup> result = new HashMap<>();
     Log log = LogParser.getINSTANCE().parse(plan.getLog());
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
index 16c1da6..3d233ed 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
@@ -494,7 +494,7 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
 
   public void preAddNodeForDataGroup(AddNodeLog log, DataGroupMember targetDataGroupMember) {
    // Make sure the previous add/remove node log has been applied
-    metaGroupMember.syncLeader();
+    metaGroupMember.waitUtil(log.getMetaLogIndex() - 1);
 
     // Check the validity of the partition table
     if (!metaGroupMember.getPartitionTable().deserialize(log.getPartitionTable())) {
@@ -601,7 +601,7 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
 
   public void preRemoveNodeForDataGroup(RemoveNodeLog log, DataGroupMember targetDataGroupMember) {
    // Make sure the previous add/remove node log has been applied
-    metaGroupMember.syncLeader();
+    metaGroupMember.waitUtil(log.getMetaLogIndex() - 1);
 
     // Check the validity of the partition table
     if (!metaGroupMember.getPartitionTable().deserialize(log.getPartitionTable())) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
index 077d61d..df45b53 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
@@ -261,9 +261,9 @@ public class DataGroupMember extends RaftMember {
     }
   }
 
-  public void preAddNode(Node node) {
+  public boolean preAddNode(Node node) {
     if (allNodes.contains(node)) {
-      return;
+      return false;
     }
     synchronized (allNodes) {
       int insertIndex = -1;
@@ -288,6 +288,7 @@ public class DataGroupMember extends RaftMember {
         // the group
         logger.debug("{}: Node {} is inserted into the data group {}", name, node, allNodes);
       }
+      return insertIndex > 0;
     }
   }
 
@@ -299,6 +300,7 @@ public class DataGroupMember extends RaftMember {
    * otherwise
    */
   public boolean addNode(Node node, NodeAdditionResult result) {
+    syncLeader();
 
     // mark slots that do not belong to this group any more
     Set<Integer> lostSlots = ((SlotNodeAdditionResult) result).getLostSlots()
@@ -604,9 +606,9 @@ public class DataGroupMember extends RaftMember {
    * @param partitionId
    * @param isSeq
    */
-  void closePartition(String storageGroupName, long partitionId, boolean isSeq) {
+  boolean closePartition(String storageGroupName, long partitionId, boolean isSeq) {
     if (character != NodeCharacter.LEADER) {
-      return;
+      return false;
     }
     CloseFileLog log = new CloseFileLog(storageGroupName, partitionId, isSeq);
     synchronized (logManager) {
@@ -618,10 +620,11 @@ public class DataGroupMember extends RaftMember {
       logger.info("Send the close file request of {} to other nodes", log);
     }
     try {
-      appendLogInGroup(log);
+      return appendLogInGroup(log);
     } catch (LogExecutionException e) {
       logger.error("Cannot close partition {}#{} seq:{}", storageGroupName, partitionId, isSeq, e);
     }
+    return false;
   }
 
   public boolean flushFileWhenDoSnapshot(
@@ -762,6 +765,8 @@ public class DataGroupMember extends RaftMember {
    */
   @SuppressWarnings("java:S2445") // the reference of allNodes is unchanged
   public void removeNode(Node removedNode, NodeRemovalResult removalResult) {
+    syncLeader();
+
     synchronized (allNodes) {
       if (allNodes.contains(removedNode)) {
         // update the group if the deleted node was in it
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
index 22520e7..1534323 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
@@ -302,20 +302,24 @@ public class MetaGroupMember extends RaftMember {
    * closed by the method.
    *
    */
-  public void closePartition(String storageGroupName, long partitionId, boolean isSeq) {
+  public boolean closePartition(String storageGroupName, long partitionId, boolean isSeq) {
     RaftNode raftNode = partitionTable.routeToHeaderByTime(storageGroupName,
         partitionId * StorageEngine.getTimePartitionInterval());
     DataGroupMember localDataMember = getLocalDataMember(raftNode);
     if (localDataMember == null || localDataMember.getCharacter() != NodeCharacter.LEADER) {
-      return;
+      return false;
     }
-    localDataMember.closePartition(storageGroupName, partitionId, isSeq);
+    return localDataMember.closePartition(storageGroupName, partitionId, isSeq);
   }
 
   public DataClusterServer getDataClusterServer() {
     return dataClusterServer;
   }
 
+  public void setDataClusterServer(DataClusterServer dataClusterServer) {
+    this.dataClusterServer = dataClusterServer;
+  }
+
   public DataHeartbeatServer getDataHeartbeatServer() {
     return dataHeartbeatServer;
   }
@@ -864,6 +868,7 @@ public class MetaGroupMember extends RaftMember {
       addNodeLog.setPartitionTable(partitionTable.serialize());
       addNodeLog.setCurrLogTerm(getTerm().get());
       addNodeLog.setCurrLogIndex(logManager.getLastLogIndex() + 1);
+      addNodeLog.setMetaLogIndex(logManager.getLastLogIndex() + 1);
 
       addNodeLog.setNewNode(node);
 
@@ -1062,120 +1067,6 @@ public class MetaGroupMember extends RaftMember {
     return null;
   }
 
-  /**
-   * Send the log the all data groups and return a success only when each group's quorum has
-   * accepted this log.
-   */
-//  private AppendLogResult sendLogToAllGroups(Log log) {
-//    List<Node> nodeRing = partitionTable.getAllNodes();
-//
-//    AtomicLong newLeaderTerm = new AtomicLong(term.get());
-//    AtomicBoolean leaderShipStale = new AtomicBoolean(false);
-//    AppendEntryRequest request = buildAppendEntryRequest(log, true);
-//
-//    // ask for votes from each node
-//    int[] groupRemainings = askGroupVotes(nodeRing, request, leaderShipStale, log, newLeaderTerm);
-//
-//    if (!leaderShipStale.get()) {
-//      // if all quorums of all groups have received this log, it is considered succeeded.
-//      for (int remaining : groupRemainings) {
-//        if (remaining > 0) {
-//          return AppendLogResult.TIME_OUT;
-//        }
-//      }
-//    } else {
-//      return AppendLogResult.LEADERSHIP_STALE;
-//    }
-//
-//    return AppendLogResult.OK;
-//  }
-
-  /**
-   * Send "request" to each node in "nodeRing" and when a node returns a success, decrease all
-   * counters of the groups it is in of "groupRemainings"
-   *
-   * @return a int array indicating how many votes are left in each group to make an agreement
-   */
-  @SuppressWarnings({"java:S2445", "java:S2274"})
-  // groupRemaining is shared with the handlers,
-  // and we do not wait infinitely to enable timeouts
-//  private int[] askGroupVotes(List<Node> nodeRing,
-//      AppendEntryRequest request, AtomicBoolean leaderShipStale, Log log,
-//      AtomicLong newLeaderTerm) {
-//    // each node will be the header of a group, we use the node to represent the group
-//    int nodeSize = nodeRing.size();
-//    // the decreasing counters of how many nodes in a group has received the log, each time a
-//    // node receive the log, the counters of all groups it is in will decrease by 1
-//    int[] groupRemainings = new int[nodeSize];
-//    // a group is considered successfully received the log if such members receive the log
-//    int groupQuorum = REPLICATION_NUM / 2 + 1;
-//    Arrays.fill(groupRemainings, groupQuorum);
-//
-//    synchronized (groupRemainings) {
-//      // ask a vote from every node
-//      for (int i = 0; i < nodeSize; i++) {
-//        Node node = nodeRing.get(i);
-//        if (node.equals(thisNode)) {
-//          // this node automatically gives an agreement, decrease counters of all groups the local
-//          // node is in
-//          for (int j = 0; j < REPLICATION_NUM; j++) {
-//            int groupIndex = i - j;
-//            if (groupIndex < 0) {
-//              groupIndex += groupRemainings.length;
-//            }
-//            groupRemainings[groupIndex]--;
-//          }
-//        } else {
-//          askRemoteGroupVote(node, groupRemainings, i, leaderShipStale, log, newLeaderTerm,
-//              request);
-//        }
-//      }
-//
-//      try {
-//        groupRemainings.wait(RaftServer.getWriteOperationTimeoutMS());
-//      } catch (InterruptedException e) {
-//        Thread.currentThread().interrupt();
-//        logger.error("Unexpected interruption when waiting for the group votes", e);
-//      }
-//    }
-//    return groupRemainings;
-//  }
-
-  private void askRemoteGroupVote(Node node, int[] groupRemainings, int nodeIndex,
-      AtomicBoolean leaderShipStale, Log log,
-      AtomicLong newLeaderTerm, AppendEntryRequest request) {
-    AppendGroupEntryHandler handler = new AppendGroupEntryHandler(groupRemainings,
-        nodeIndex, node, leaderShipStale, log, newLeaderTerm, this);
-    if (config.isUseAsyncServer()) {
-      AsyncMetaClient client = (AsyncMetaClient) getAsyncClient(node);
-      try {
-        if (client != null) {
-          client.appendEntry(request, handler);
-        }
-      } catch (TException e) {
-        logger.error("Cannot send log to node {}", node, e);
-      }
-    } else {
-      SyncMetaClient client = (SyncMetaClient) getSyncClient(node);
-      if (client == null) {
-        logger.error("No available client for {}", node);
-        return;
-      }
-      getSerialToParallelPool().submit(() -> {
-        try {
-          handler.onComplete(client.appendEntry(request));
-        } catch (TException e) {
-          client.getInputProtocol().getTransport().close();
-          handler.onError(e);
-        } finally {
-          ClientUtils.putBackSyncClient(client);
-        }
-      });
-    }
-
-  }
-
-
   public Set<Node> getIdConflictNodes() {
     return idConflictNodes;
   }
@@ -1519,7 +1410,7 @@ public class MetaGroupMember extends RaftMember {
    *
    * @param planGroupMap sub-plan -> belong data group pairs
    */
-  private TSStatus forwardPlan(Map<PhysicalPlan, PartitionGroup> planGroupMap, PhysicalPlan plan) {
+  public TSStatus forwardPlan(Map<PhysicalPlan, PartitionGroup> planGroupMap, PhysicalPlan plan) {
     // the error codes from the groups that cannot execute the plan
     TSStatus status;
     if (planGroupMap.size() == 1) {
@@ -2009,6 +1900,7 @@ public class MetaGroupMember extends RaftMember {
       removeNodeLog.setPartitionTable(partitionTable.serialize());
       removeNodeLog.setCurrLogTerm(getTerm().get());
       removeNodeLog.setCurrLogIndex(logManager.getLastLogIndex() + 1);
+      removeNodeLog.setMetaLogIndex(logManager.getLastLogIndex() + 1);
 
       removeNodeLog.setRemovedNode(target);
 
@@ -2203,4 +2095,8 @@ public class MetaGroupMember extends RaftMember {
   public void setClientProvider(DataClientProvider dataClientProvider) {
     this.dataClientProvider = dataClientProvider;
   }
+
+  public void setRouter(ClusterPlanRouter router) {
+    this.router = router;
+  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index 57f22bc..121892b 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -275,6 +275,7 @@ public abstract class RaftMember {
     this.asyncHeartbeatClientPool = asyncHeartbeatPool;
     this.syncHeartbeatClientPool = syncHeartbeatPool;
     this.asyncSendLogClientPool = asyncSendLogClientPool;
+    this.stopStatus = new StopStatus();
   }
 
   /**
@@ -836,8 +837,6 @@ public abstract class RaftMember {
    * @return true if this node has caught up before timeout, false otherwise
    */
   private boolean waitUntilCatchUp() {
-    long startTime = System.currentTimeMillis();
-    long waitedTime = 0;
     long leaderCommitId;
     try {
       leaderCommitId = ClusterDescriptor.getInstance().getConfig().isUseAsyncServer() ?
@@ -854,16 +853,25 @@ public abstract class RaftMember {
       logger.error(MSG_NO_LEADER_COMMIT_INDEX, name, leader.get(), e);
       return false;
     }
+    logger.debug("Start to sync with leader, leader commit id is {}", leaderCommitId);
+    return waitUtil(leaderCommitId);
+  }
 
+  /**
+   * Wait until the local committed-and-applied index becomes no less than the target log index.
+   */
+  public boolean waitUtil(long targetLogIndex) {
+    long startTime = System.currentTimeMillis();
+    long waitedTime = 0;
     while (waitedTime < RaftServer.getSyncLeaderMaxWaitMs()) {
       try {
         long localAppliedId = logManager.getMaxHaveAppliedCommitIndex();
-        logger.debug("{}: synchronizing commitIndex {}/{}", name, localAppliedId, leaderCommitId);
-        if (leaderCommitId <= localAppliedId) {
+        logger.debug("{}: synchronizing commitIndex {}/{}", name, localAppliedId, targetLogIndex);
+        if (targetLogIndex <= localAppliedId) {
           // this node has caught up
           if (logger.isDebugEnabled()) {
             waitedTime = System.currentTimeMillis() - startTime;
-            logger.debug("{}: synchronized with the leader after {}ms", name, waitedTime);
+            logger.debug("{}: synchronized to target index {} after {}ms", name, targetLogIndex, waitedTime);
           }
           return true;
         }
@@ -879,7 +887,7 @@ public abstract class RaftMember {
         logger.error(MSG_NO_LEADER_COMMIT_INDEX, name, leader.get(), e);
       }
     }
-    logger.warn("{}: Failed to synchronize with the leader after {}ms", name, waitedTime);
+    logger.warn("{}: Failed to synchronize to target index {} after {}ms", name, targetLogIndex, waitedTime);
     return false;
   }
 
@@ -911,13 +919,13 @@ public abstract class RaftMember {
     } else {
       log = new PhysicalPlanLog();
       ((PhysicalPlanLog)log).setPlan(plan);
+      plan.setIndex(log.getCurrLogIndex());
     }
     // assign term and index to the new log and append it
     synchronized (logManager) {
       log.setCurrLogTerm(getTerm().get());
       log.setCurrLogIndex(logManager.getLastLogIndex() + 1);
 
-      plan.setIndex(log.getCurrLogIndex());
       logManager.append(log);
     }
     Timer.Statistic.RAFT_SENDER_APPEND_LOG.calOperationCostTimeFromStart(startTime);
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/common/TestAsyncDataClient.java b/cluster/src/test/java/org/apache/iotdb/cluster/common/TestAsyncDataClient.java
index 51f0c8a..62cd0c8 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/common/TestAsyncDataClient.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/common/TestAsyncDataClient.java
@@ -35,6 +35,7 @@ import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.iotdb.cluster.rpc.thrift.PreviousFillRequest;
 import org.apache.iotdb.cluster.rpc.thrift.PullSchemaRequest;
 import org.apache.iotdb.cluster.rpc.thrift.PullSchemaResp;
+import org.apache.iotdb.cluster.rpc.thrift.RaftNode;
 import org.apache.iotdb.cluster.rpc.thrift.SingleSeriesQueryRequest;
 import org.apache.iotdb.cluster.server.member.DataGroupMember;
 import org.apache.iotdb.cluster.server.member.MemberTest;
@@ -53,10 +54,10 @@ import org.apache.thrift.async.AsyncMethodCallback;
 public class TestAsyncDataClient extends AsyncDataClient {
 
   private PlanExecutor planExecutor;
-  private Map<Node, DataGroupMember> dataGroupMemberMap;
+  private Map<RaftNode, DataGroupMember> dataGroupMemberMap;
 
   public TestAsyncDataClient(Node node,
-      Map<Node, DataGroupMember> dataGroupMemberMap)
+      Map<RaftNode, DataGroupMember> dataGroupMemberMap)
       throws IOException {
     super(null, null, node, null);
     this.dataGroupMemberMap = dataGroupMemberMap;
@@ -70,35 +71,35 @@ public class TestAsyncDataClient extends AsyncDataClient {
   @Override
   public void fetchSingleSeries(Node header, int raftId, long readerId,
       AsyncMethodCallback<ByteBuffer> resultHandler) {
-    new Thread(() -> new DataAsyncService(dataGroupMemberMap.get(header)).fetchSingleSeries(header, raftId, readerId,
+    new Thread(() -> new DataAsyncService(dataGroupMemberMap.get(new RaftNode(header,0))).fetchSingleSeries(header, raftId, readerId,
         resultHandler)).start();
   }
 
   @Override
   public void getAggrResult(GetAggrResultRequest request,
       AsyncMethodCallback<List<ByteBuffer>> resultHandler) {
-    new Thread(() -> new DataAsyncService(dataGroupMemberMap.get(request.getHeader())).getAggrResult(request,
+    new Thread(() -> new DataAsyncService(dataGroupMemberMap.get(new RaftNode(request.getHeader(),0))).getAggrResult(request,
         resultHandler)).start();
   }
 
   @Override
   public void querySingleSeries(SingleSeriesQueryRequest request,
       AsyncMethodCallback<Long> resultHandler) {
-    new Thread(() -> new DataAsyncService(dataGroupMemberMap.get(request.getHeader())).querySingleSeries(request,
+    new Thread(() -> new DataAsyncService(dataGroupMemberMap.get(new RaftNode(request.getHeader(),0))).querySingleSeries(request,
         resultHandler)).start();
   }
 
   @Override
   public void fetchSingleSeriesByTimestamp(Node header, int raftId, long readerId, long time,
       AsyncMethodCallback<ByteBuffer> resultHandler) {
-    new Thread(() -> new DataAsyncService(dataGroupMemberMap.get(header)).fetchSingleSeriesByTimestamp(header, raftId,
+    new Thread(() -> new DataAsyncService(dataGroupMemberMap.get(new RaftNode(header,0))).fetchSingleSeriesByTimestamp(header, raftId,
         readerId, time, resultHandler)).start();
   }
 
   @Override
   public void getAllPaths(Node header, int raftId, List<String> paths, boolean withAlias,
       AsyncMethodCallback<GetAllPathsResult> resultHandler) {
-    new Thread(() -> new DataAsyncService(dataGroupMemberMap.get(header)).getAllPaths(header, raftId,
+    new Thread(() -> new DataAsyncService(dataGroupMemberMap.get(new RaftNode(header,0))).getAllPaths(header, raftId,
         paths, withAlias, resultHandler)).start();
   }
 
@@ -152,39 +153,39 @@ public class TestAsyncDataClient extends AsyncDataClient {
   @Override
   public void pullTimeSeriesSchema(PullSchemaRequest request,
       AsyncMethodCallback<PullSchemaResp> resultHandler) {
-    new Thread(() -> new DataAsyncService(dataGroupMemberMap.get(request.getHeader())).pullTimeSeriesSchema(request, resultHandler)).start();
+    new Thread(() -> new DataAsyncService(dataGroupMemberMap.get(new RaftNode(request.getHeader(),0))).pullTimeSeriesSchema(request, resultHandler)).start();
   }
 
   @Override
   public void pullMeasurementSchema(PullSchemaRequest request,
       AsyncMethodCallback<PullSchemaResp> resultHandler) {
-    new Thread(() -> new DataAsyncService(dataGroupMemberMap.get(request.getHeader())).pullMeasurementSchema(request,
+    new Thread(() -> new DataAsyncService(dataGroupMemberMap.get(new RaftNode(request.getHeader(),0))).pullMeasurementSchema(request,
         resultHandler)).start();
   }
 
   @Override
   public void querySingleSeriesByTimestamp(SingleSeriesQueryRequest request,
       AsyncMethodCallback<Long> resultHandler) {
-    new Thread(() -> new DataAsyncService(dataGroupMemberMap.get(request.getHeader())).querySingleSeriesByTimestamp(request,
+    new Thread(() -> new DataAsyncService(dataGroupMemberMap.get(new RaftNode(request.getHeader(),0))).querySingleSeriesByTimestamp(request,
         resultHandler)).start();
   }
 
   @Override
   public void getGroupByExecutor(GroupByRequest request, AsyncMethodCallback<Long> resultHandler) {
-    new Thread(() -> new DataAsyncService(dataGroupMemberMap.get(request.getHeader())).getGroupByExecutor(request,
+    new Thread(() -> new DataAsyncService(dataGroupMemberMap.get(new RaftNode(request.getHeader(),0))).getGroupByExecutor(request,
         resultHandler)).start();
   }
 
   @Override
   public void getGroupByResult(Node header, int raftId, long executorId, long startTime, long endTime,
       AsyncMethodCallback<List<ByteBuffer>> resultHandler) {
-    new Thread(() -> new DataAsyncService(dataGroupMemberMap.get(header)).getGroupByResult(header, raftId, executorId,
+    new Thread(() -> new DataAsyncService(dataGroupMemberMap.get(new RaftNode(header,0))).getGroupByResult(header, raftId, executorId,
         startTime, endTime, resultHandler)).start();
   }
 
   @Override
   public void previousFill(PreviousFillRequest request,
       AsyncMethodCallback<ByteBuffer> resultHandler) {
-    new Thread(() -> new DataAsyncService(dataGroupMemberMap.get(request.getHeader())).previousFill(request, resultHandler)).start();
+    new Thread(() -> new DataAsyncService(dataGroupMemberMap.get(new RaftNode(request.getHeader(),0))).previousFill(request, resultHandler)).start();
   }
 }
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/common/TestUtils.java b/cluster/src/test/java/org/apache/iotdb/cluster/common/TestUtils.java
index 1f1f3ba..17d04a1 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/common/TestUtils.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/common/TestUtils.java
@@ -103,6 +103,8 @@ public class TestUtils {
         .setReplicationNumber(ClusterDescriptor.getInstance().getConfig().getReplicationNum());
     startUpStatus
         .setClusterName(ClusterDescriptor.getInstance().getConfig().getClusterName());
+    startUpStatus
+        .setMultiRaftFactor(ClusterDescriptor.getInstance().getConfig().getMultiRaftFactor());
     List<Node> seedNodeList = new ArrayList<>();
     for (int i = 0; i < 100; i += 10) {
       seedNodeList.add(getNode(i));
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
index ca4fd92..fbffb7f 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
@@ -64,6 +64,7 @@ import org.apache.iotdb.cluster.partition.PartitionGroup;
 import org.apache.iotdb.cluster.partition.slot.SlotNodeAdditionResult;
 import org.apache.iotdb.cluster.partition.slot.SlotNodeRemovalResult;
 import org.apache.iotdb.cluster.query.RemoteQueryContext;
+import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
 import org.apache.iotdb.cluster.rpc.thrift.ElectionRequest;
 import org.apache.iotdb.cluster.rpc.thrift.GetAllPathsResult;
 import org.apache.iotdb.cluster.rpc.thrift.GroupByRequest;
@@ -71,6 +72,7 @@ import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.iotdb.cluster.rpc.thrift.PullSchemaRequest;
 import org.apache.iotdb.cluster.rpc.thrift.PullSnapshotRequest;
 import org.apache.iotdb.cluster.rpc.thrift.PullSnapshotResp;
+import org.apache.iotdb.cluster.rpc.thrift.RaftNode;
 import org.apache.iotdb.cluster.rpc.thrift.RaftService.AsyncClient;
 import org.apache.iotdb.cluster.rpc.thrift.SendSnapshotRequest;
 import org.apache.iotdb.cluster.rpc.thrift.SingleSeriesQueryRequest;
@@ -165,9 +167,9 @@ public class DataGroupMemberTest extends MemberTest {
     };
   }
 
-  DataGroupMember getDataGroupMember(Node node) {
-    PartitionGroup nodes = partitionTable.getHeaderGroup(node);
-    return dataGroupMemberMap.computeIfAbsent(node, n -> getDataGroupMember(n, nodes));
+  DataGroupMember getDataGroupMember(RaftNode raftNode) {
+    PartitionGroup nodes = partitionTable.getHeaderGroup(raftNode.getNode());
+    return dataGroupMemberMap.computeIfAbsent(raftNode, n -> getDataGroupMember(n.getNode(), nodes));
   }
 
   private DataGroupMember getDataGroupMember(Node node, PartitionGroup nodes) {
@@ -179,6 +181,11 @@ public class DataGroupMemberTest extends MemberTest {
       }
 
       @Override
+      public long appendEntry(AppendEntryRequest request) {
+        return Response.RESPONSE_AGREE;
+      }
+
+      @Override
       public void updateHardState(long currentTerm, Node leader) {
       }
 
@@ -267,16 +274,25 @@ public class DataGroupMemberTest extends MemberTest {
 
     try {
       Node newNodeBeforeGroup = TestUtils.getNode(-5);
+      assertFalse(firstMember.preAddNode(newNodeBeforeGroup));
+      assertFalse(midMember.preAddNode(newNodeBeforeGroup));
+      assertFalse(lastMember.preAddNode(newNodeBeforeGroup));
       assertFalse(firstMember.addNode(newNodeBeforeGroup, result));
       assertFalse(midMember.addNode(newNodeBeforeGroup, result));
       assertFalse(lastMember.addNode(newNodeBeforeGroup, result));
 
       Node newNodeInGroup = TestUtils.getNode(66);
+      assertTrue(firstMember.preAddNode(newNodeInGroup));
+      assertTrue(midMember.preAddNode(newNodeInGroup));
+      assertTrue(lastMember.preAddNode(newNodeInGroup));
       assertFalse(firstMember.addNode(newNodeInGroup, result));
       assertFalse(midMember.addNode(newNodeInGroup, result));
       assertTrue(lastMember.addNode(newNodeInGroup, result));
 
       Node newNodeAfterGroup = TestUtils.getNode(101);
+      assertFalse(firstMember.preAddNode(newNodeAfterGroup));
+      assertFalse(midMember.preAddNode(newNodeAfterGroup));
+      assertFalse(lastMember.preAddNode(newNodeAfterGroup));
       assertFalse(firstMember.addNode(newNodeAfterGroup, result));
       assertFalse(midMember.addNode(newNodeAfterGroup, result));
     } finally {
@@ -480,6 +496,7 @@ public class DataGroupMemberTest extends MemberTest {
   @Test
   public void testForwardPullSnapshot() {
     System.out.println("Start testForwardPullSnapshot()");
+    hasInitialSnapshots = true;
     dataGroupMember.setCharacter(NodeCharacter.FOLLOWER);
     dataGroupMember.setLeader(TestUtils.getNode(1));
     PullSnapshotRequest request = new PullSnapshotRequest();
@@ -904,13 +921,15 @@ public class DataGroupMemberTest extends MemberTest {
     dataGroupMember.start();
 
     try {
+      dataGroupMember.preRemoveNode(nodeToRemove);
       dataGroupMember.removeNode(nodeToRemove, nodeRemovalResult);
 
       assertEquals(NodeCharacter.ELECTOR, dataGroupMember.getCharacter());
       assertEquals(Long.MIN_VALUE, dataGroupMember.getLastHeartbeatReceivedTime());
       assertTrue(dataGroupMember.getAllNodes().contains(TestUtils.getNode(30)));
       assertFalse(dataGroupMember.getAllNodes().contains(nodeToRemove));
-      List<Integer> newSlots = nodeRemovalResult.getNewSlotOwners().get(TestUtils.getNode(0));
+      List<Integer> newSlots = nodeRemovalResult.getNewSlotOwners()
+          .get(new RaftNode(TestUtils.getNode(0), raftId));
       while (newSlots.size() != pulledSnapshots.size()) {
 
       }
@@ -933,13 +952,14 @@ public class DataGroupMemberTest extends MemberTest {
     dataGroupMember.start();
 
     try {
+      dataGroupMember.preRemoveNode(nodeToRemove);
       dataGroupMember.removeNode(nodeToRemove, nodeRemovalResult);
 
       assertEquals(0, dataGroupMember.getLastHeartbeatReceivedTime());
       assertTrue(dataGroupMember.getAllNodes().contains(TestUtils.getNode(30)));
       assertFalse(dataGroupMember.getAllNodes().contains(nodeToRemove));
       List<Integer> newSlots =
-          ((SlotNodeRemovalResult) nodeRemovalResult).getNewSlotOwners().get(TestUtils.getNode(0));
+          ((SlotNodeRemovalResult) nodeRemovalResult).getNewSlotOwners().get(new RaftNode(TestUtils.getNode(0), 0));
       while (newSlots.size() != pulledSnapshots.size()) {
 
       }
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MemberTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MemberTest.java
index 6065370..8c1beef 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MemberTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MemberTest.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.cluster.server.member;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -50,8 +51,10 @@ import org.apache.iotdb.cluster.metadata.MetaPuller;
 import org.apache.iotdb.cluster.partition.PartitionGroup;
 import org.apache.iotdb.cluster.partition.PartitionTable;
 import org.apache.iotdb.cluster.partition.slot.SlotPartitionTable;
+import org.apache.iotdb.cluster.query.ClusterPlanRouter;
 import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.rpc.thrift.RaftNode;
 import org.apache.iotdb.cluster.rpc.thrift.RaftService.AsyncClient;
 import org.apache.iotdb.cluster.rpc.thrift.TNodeStatus;
 import org.apache.iotdb.cluster.server.NodeCharacter;
@@ -60,6 +63,8 @@ import org.apache.iotdb.db.exception.metadata.MetadataException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.PartialPath;
 import org.apache.iotdb.db.qp.executor.PlanExecutor;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.sys.LogPlan;
 import org.apache.iotdb.db.service.IoTDB;
 import org.apache.iotdb.db.utils.SchemaUtils;
 import org.apache.thrift.async.AsyncMethodCallback;
@@ -73,7 +78,7 @@ public class MemberTest {
 
   public static AtomicLong dummyResponse = new AtomicLong(Response.RESPONSE_AGREE);
 
-  Map<Node, DataGroupMember> dataGroupMemberMap;
+  Map<RaftNode, DataGroupMember> dataGroupMemberMap;
   private Map<Node, MetaGroupMember> metaGroupMemberMap;
   PartitionGroup allNodes;
   protected MetaGroupMember testMetaMember;
@@ -95,7 +100,6 @@ public class MemberTest {
     prevUseAsyncServer = ClusterDescriptor.getInstance().getConfig().isUseAsyncServer();
     preLogBufferSize = ClusterDescriptor.getInstance().getConfig().getRaftLogBufferSize();
     ClusterDescriptor.getInstance().getConfig().setUseAsyncServer(true);
-    ClusterDescriptor.getInstance().getConfig().setRaftLogBufferSize(4096);
     testThreadPool = Executors.newFixedThreadPool(4);
     prevLeaderWait = RaftMember.getWaitLeaderTimeMs();
     RaftMember.setWaitLeaderTimeMs(10);
@@ -105,7 +109,12 @@ public class MemberTest {
       allNodes.add(TestUtils.getNode(i));
     }
 
-    partitionTable = new SlotPartitionTable(allNodes, TestUtils.getNode(0));
+    partitionTable = new SlotPartitionTable(allNodes, TestUtils.getNode(0)) {
+      @Override
+      public RaftNode routeToHeaderByTime(String storageGroupName, long timestamp) {
+        return new RaftNode(TestUtils.getNode(0), 0);
+      }
+    };
 
     dataGroupMemberMap = new HashMap<>();
     metaGroupMemberMap = new HashMap<>();
@@ -170,11 +179,15 @@ public class MemberTest {
   }
 
   DataGroupMember getDataGroupMember(Node node) {
+    return getDataGroupMember(new RaftNode(node, 0));
+  }
+
+  DataGroupMember getDataGroupMember(RaftNode node) {
     return dataGroupMemberMap.computeIfAbsent(node, this::newDataGroupMember);
   }
 
-  private DataGroupMember newDataGroupMember(Node node) {
-    DataGroupMember newMember = new TestDataGroupMember(node, partitionTable.getHeaderGroup(node)) {
+  private DataGroupMember newDataGroupMember(RaftNode raftNode) {
+    DataGroupMember newMember = new TestDataGroupMember(raftNode.getNode(), partitionTable.getHeaderGroup(raftNode)) {
 
       @Override
       public boolean syncLeader() {
@@ -200,9 +213,9 @@ public class MemberTest {
         return getAsyncClient(node);
       }
     };
-    newMember.setThisNode(node);
+    newMember.setThisNode(raftNode.getNode());
     newMember.setMetaGroupMember(testMetaMember);
-    newMember.setLeader(node);
+    newMember.setLeader(raftNode.getNode());
     newMember.setCharacter(NodeCharacter.LEADER);
     newMember
         .setLogManager(
@@ -232,11 +245,16 @@ public class MemberTest {
       @Override
       public DataGroupMember getLocalDataMember(Node header, int raftId,
           Object request) {
-        return getDataGroupMember(header);
+        return getDataGroupMember(new RaftNode(header, raftId));
       }
 
       @Override
       public DataGroupMember getLocalDataMember(Node header, int raftId) {
+        return getDataGroupMember(new RaftNode(header, raftId));
+      }
+
+      @Override
+      public DataGroupMember getLocalDataMember(RaftNode header) {
         return getDataGroupMember(header);
       }
 
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
index a1e563e..94f5067 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
@@ -51,6 +51,8 @@ import org.apache.iotdb.cluster.client.async.AsyncDataClient;
 import org.apache.iotdb.cluster.common.TestAsyncClient;
 import org.apache.iotdb.cluster.common.TestAsyncDataClient;
 import org.apache.iotdb.cluster.common.TestAsyncMetaClient;
+import org.apache.iotdb.cluster.common.TestLogApplier;
+import org.apache.iotdb.cluster.common.TestLogManager;
 import org.apache.iotdb.cluster.common.TestPartitionedLogManager;
 import org.apache.iotdb.cluster.common.TestSnapshot;
 import org.apache.iotdb.cluster.common.TestUtils;
@@ -60,6 +62,9 @@ import org.apache.iotdb.cluster.exception.EmptyIntervalException;
 import org.apache.iotdb.cluster.exception.LogExecutionException;
 import org.apache.iotdb.cluster.exception.PartitionTableUnavailableException;
 import org.apache.iotdb.cluster.exception.StartUpCheckFailureException;
+import org.apache.iotdb.cluster.exception.UnknownLogTypeException;
+import org.apache.iotdb.cluster.exception.UnsupportedPlanException;
+import org.apache.iotdb.cluster.log.Log;
 import org.apache.iotdb.cluster.log.logtypes.AddNodeLog;
 import org.apache.iotdb.cluster.log.logtypes.CloseFileLog;
 import org.apache.iotdb.cluster.log.logtypes.RemoveNodeLog;
@@ -68,6 +73,7 @@ import org.apache.iotdb.cluster.metadata.CMManager;
 import org.apache.iotdb.cluster.partition.PartitionGroup;
 import org.apache.iotdb.cluster.partition.PartitionTable;
 import org.apache.iotdb.cluster.partition.slot.SlotPartitionTable;
+import org.apache.iotdb.cluster.query.ClusterPlanRouter;
 import org.apache.iotdb.cluster.query.LocalQueryExecutor;
 import org.apache.iotdb.cluster.query.RemoteQueryContext;
 import org.apache.iotdb.cluster.query.manage.QueryCoordinator;
@@ -82,6 +88,7 @@ import org.apache.iotdb.cluster.rpc.thrift.HeartBeatResponse;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.iotdb.cluster.rpc.thrift.PullSchemaRequest;
 import org.apache.iotdb.cluster.rpc.thrift.PullSchemaResp;
+import org.apache.iotdb.cluster.rpc.thrift.RaftNode;
 import org.apache.iotdb.cluster.rpc.thrift.RaftService.AsyncClient;
 import org.apache.iotdb.cluster.rpc.thrift.SendSnapshotRequest;
 import org.apache.iotdb.cluster.rpc.thrift.StartUpStatus;
@@ -113,6 +120,7 @@ import org.apache.iotdb.db.qp.executor.PlanExecutor;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
 import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.LogPlan;
 import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.control.QueryResourceManager;
@@ -133,6 +141,7 @@ import org.apache.thrift.protocol.TCompactProtocol.Factory;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.Ignore;
 import org.junit.Test;
 
 public class MetaGroupMemberTest extends MemberTest {
@@ -177,6 +186,12 @@ public class MetaGroupMemberTest extends MemberTest {
 
     buildDataGroups(dataClusterServer);
     testMetaMember.getThisNode().setNodeIdentifier(0);
+    testMetaMember.setRouter(new ClusterPlanRouter(testMetaMember.getPartitionTable()){
+      @Override
+      protected Map<PhysicalPlan, PartitionGroup> splitAndRoutePlan(LogPlan plan) {
+        return Collections.singletonMap(plan, partitionTable.getHeaderGroup(testMetaMember.getThisNode()));
+      }
+    });
     mockDataClusterServer = false;
     QueryCoordinator.getINSTANCE().setMetaGroupMember(testMetaMember);
     exiledNode = null;
@@ -207,6 +222,11 @@ public class MetaGroupMemberTest extends MemberTest {
       }
 
       @Override
+      protected AppendLogResult sendLogToFollowers(Log log) {
+        return AppendLogResult.OK;
+      }
+
+      @Override
       public AsyncClient getAsyncClient(Node node) {
         return getClient(node);
       }
@@ -287,6 +307,7 @@ public class MetaGroupMemberTest extends MemberTest {
     return resp;
   }
 
+  @Override
   protected MetaGroupMember getMetaGroupMember(Node node) throws QueryProcessException {
     MetaGroupMember metaGroupMember = new MetaGroupMember(new Factory(), node) {
 
@@ -309,11 +330,16 @@ public class MetaGroupMemberTest extends MemberTest {
       @Override
       public DataGroupMember getLocalDataMember(Node header, int raftId,
           Object request) {
-        return getDataGroupMember(header);
+        return getDataGroupMember(new RaftNode(header, raftId));
       }
 
       @Override
       public DataGroupMember getLocalDataMember(Node header, int raftId) {
+        return getDataGroupMember(new RaftNode(header, raftId));
+      }
+
+      @Override
+      public DataGroupMember getLocalDataMember(RaftNode header) {
         return getDataGroupMember(header);
       }
 
@@ -433,7 +459,7 @@ public class MetaGroupMemberTest extends MemberTest {
             @Override
             public void removeNode(Node node, AsyncMethodCallback<Long> resultHandler) {
               new Thread(() -> {
-                testMetaMember.applyRemoveNode(new RemoveNodeLog(TestUtils.seralizePartitionTable, node));
+                testMetaMember.applyRemoveNode(new RemoveNodeLog(partitionTable.serialize(), node));
                 resultHandler.onComplete(Response.RESPONSE_AGREE);
               }).start();
             }
@@ -503,7 +529,7 @@ public class MetaGroupMemberTest extends MemberTest {
     }
 
     ExecutorService testThreadPool = Executors.newFixedThreadPool(4);
-    testMetaMember.closePartition(TestUtils.getTestSg(0), 0, true);
+    assertTrue(testMetaMember.closePartition(TestUtils.getTestSg(0), 0, true));
 
     StorageGroupProcessor processor =
         StorageEngine.getInstance().getProcessor(new PartialPath(TestUtils.getTestSg(0)));
@@ -522,15 +548,13 @@ public class MetaGroupMemberTest extends MemberTest {
       // the net work is down
       dummyResponse.set(Long.MIN_VALUE);
 
-      // network resume in 100ms
-      new Thread(() -> {
-        await().atLeast(200, TimeUnit.MILLISECONDS);
-        dummyResponse.set(Response.RESPONSE_AGREE);
-      }).start();
-
       System.out.println("Close the first file");
+      assertFalse(testMetaMember.closePartition(TestUtils.getTestSg(0), 0, true));
+      assertFalse(processor.getWorkSequenceTsFileProcessors().isEmpty());
 
-      testMetaMember.closePartition(TestUtils.getTestSg(0), 0, true);
+      // network resume in 100ms
+      dummyResponse.set(Response.RESPONSE_AGREE);
+      assertTrue(testMetaMember.closePartition(TestUtils.getTestSg(0), 0, true));
       assertTrue(processor.getWorkSequenceTsFileProcessors().isEmpty());
 
       System.out.println("Create the second file");
@@ -540,10 +564,11 @@ public class MetaGroupMemberTest extends MemberTest {
         PlanExecutor planExecutor = new PlanExecutor();
         planExecutor.processNonQuery(insertPlan);
       }
+
       // indicating the leader is stale
       System.out.println("Close the second file");
       dummyResponse.set(100);
-      testMetaMember.closePartition(TestUtils.getTestSg(0), 0, true);
+      assertFalse(testMetaMember.closePartition(TestUtils.getTestSg(0), 0, true));
       assertFalse(processor.getWorkSequenceTsFileProcessors().isEmpty());
     } finally {
       RaftServer.setConnectionTimeoutInMS(prevTimeout);
@@ -554,9 +579,10 @@ public class MetaGroupMemberTest extends MemberTest {
   @Test
   public void testAddNode() {
     System.out.println("Start testAddNode()");
-    Node newNode = TestUtils.getNode(10);
+    Node newNode = TestUtils.getNode(11);
+    testMetaMember.getPartitionTable().addNode(newNode);
     testMetaMember.onElectionWins();
-    testMetaMember.applyAddNode(new AddNodeLog(TestUtils.seralizePartitionTable, newNode));
+    testMetaMember.applyAddNode(new AddNodeLog(testMetaMember.getPartitionTable().serialize(), newNode));
     assertTrue(partitionTable.getAllNodes().contains(newNode));
   }
 
@@ -867,6 +893,7 @@ public class MetaGroupMemberTest extends MemberTest {
   public void testProcessValidHeartbeatReq() throws QueryProcessException {
     System.out.println("Start testProcessValidHeartbeatReq()");
     MetaGroupMember testMetaMember = getMetaGroupMember(TestUtils.getNode(10));
+    partitionTable = new SlotPartitionTable(allNodes, TestUtils.getNode(0));
     try {
       HeartBeatRequest request = new HeartBeatRequest();
       request.setRequireIdentifier(true);
@@ -1105,6 +1132,7 @@ public class MetaGroupMemberTest extends MemberTest {
     AtomicReference<Long> resultRef = new AtomicReference<>();
     testMetaMember.setLeader(testMetaMember.getThisNode());
     testMetaMember.setCharacter(LEADER);
+
     doRemoveNode(resultRef, testMetaMember.getThisNode());
 
     assertEquals(Response.RESPONSE_AGREE, (long) resultRef.get());
@@ -1203,6 +1231,7 @@ public class MetaGroupMemberTest extends MemberTest {
 
       @Override
       public void onError(Exception e) {
+        resultRef.set(-1L);
         e.printStackTrace();
       }
     });
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/LogPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/LogPlan.java
index 725c803..bdc19c5 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/LogPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/LogPlan.java
@@ -44,6 +44,7 @@ public class LogPlan extends PhysicalPlan {
   }
 
   public ByteBuffer getLog() {
+    log.clear();
     return log;
   }
 


[iotdb] 01/04: fix some issues of multi-raft

Posted by lt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lta pushed a commit to branch cluster_scalability
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 109d432adecdee83d2881e54d07a91ddc87e1761
Author: lta <li...@163.com>
AuthorDate: Tue Jan 5 11:48:53 2021 +0800

    fix some issues of multi-raft
---
 .../resources/conf/iotdb-cluster.properties        |  2 +-
 .../cluster/client/sync/SyncClientAdaptor.java     |  4 +--
 .../cluster/log/catchup/SnapshotCatchUpTask.java   |  1 +
 .../manage/FilePartitionedSnapshotLogManager.java  |  2 +-
 .../iotdb/cluster/log/snapshot/FileSnapshot.java   | 11 ++++---
 .../cluster/log/snapshot/PullSnapshotTask.java     |  3 +-
 .../log/snapshot/PullSnapshotTaskDescriptor.java   |  3 +-
 .../apache/iotdb/cluster/metadata/CMManager.java   |  1 +
 .../apache/iotdb/cluster/metadata/MetaPuller.java  |  1 +
 .../cluster/partition/slot/SlotPartitionTable.java |  9 ++---
 .../iotdb/cluster/server/DataClusterServer.java    | 18 +++++-----
 .../iotdb/cluster/server/MetaClusterServer.java    | 16 ++++-----
 .../cluster/server/member/DataGroupMember.java     |  9 ++---
 .../cluster/server/member/MetaGroupMember.java     | 38 +++++++++++++---------
 .../cluster/server/service/BaseAsyncService.java   |  4 +--
 .../cluster/server/service/BaseSyncService.java    |  4 +--
 .../apache/iotdb/cluster/utils/ClusterUtils.java   |  8 ++++-
 .../cluster/client/sync/SyncClientAdaptorTest.java |  7 ++--
 .../iotdb/cluster/common/TestAsyncDataClient.java  |  2 +-
 .../cluster/log/snapshot/DataSnapshotTest.java     |  7 ++--
 .../cluster/log/snapshot/PullSnapshotTaskTest.java |  2 +-
 .../cluster/server/member/DataGroupMemberTest.java |  2 +-
 .../org/apache/iotdb/db/engine/StorageEngine.java  |  4 +--
 thrift/src/main/thrift/cluster.thrift              |  6 ++--
 24 files changed, 93 insertions(+), 71 deletions(-)

diff --git a/cluster/src/assembly/resources/conf/iotdb-cluster.properties b/cluster/src/assembly/resources/conf/iotdb-cluster.properties
index 7cd8c22..4b9b2f5 100644
--- a/cluster/src/assembly/resources/conf/iotdb-cluster.properties
+++ b/cluster/src/assembly/resources/conf/iotdb-cluster.properties
@@ -63,7 +63,7 @@ max_concurrent_client_num=10000
 default_replica_num=2
 
 # sub raft num for multi-raft
-multi_raft_factor=2
+multi_raft_factor=1
 
 # cluster name to identify different clusters
 # all node's cluster_name in one cluster are the same
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptor.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptor.java
index 7a31957..019a08b 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptor.java
@@ -387,12 +387,12 @@ public class SyncClientAdaptor {
   }
 
   public static ByteBuffer readFile(AsyncDataClient client, String remotePath, long offset,
-      int fetchSize)
+      int fetchSize, int raftId)
       throws InterruptedException, TException {
     AtomicReference<ByteBuffer> result = new AtomicReference<>();
     GenericHandler<ByteBuffer> handler = new GenericHandler<>(client.getNode(), result);
 
-    client.readFile(remotePath, offset, fetchSize, handler);
+    client.readFile(remotePath, offset, fetchSize, raftId, handler);
     return handler.getResult(RaftServer.getWriteOperationTimeoutMS());
   }
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/catchup/SnapshotCatchUpTask.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/catchup/SnapshotCatchUpTask.java
index 548a2db..483ba64 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/catchup/SnapshotCatchUpTask.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/catchup/SnapshotCatchUpTask.java
@@ -59,6 +59,7 @@ public class SnapshotCatchUpTask extends LogCatchUpTask implements Callable<Bool
   private void doSnapshotCatchUp()
       throws TException, InterruptedException, LeaderUnknownException {
     SendSnapshotRequest request = new SendSnapshotRequest();
+    request.setRaftId(raftMember.getRaftGroupId());
     if (raftMember.getHeader() != null) {
       request.setHeader(raftMember.getHeader());
     }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/FilePartitionedSnapshotLogManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/FilePartitionedSnapshotLogManager.java
index a3b0153..79f3cd1 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/FilePartitionedSnapshotLogManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/FilePartitionedSnapshotLogManager.java
@@ -113,7 +113,7 @@ public class FilePartitionedSnapshotLogManager extends PartitionedSnapshotLogMan
     // 1.collect tsfile
     collectTsFiles();
 
-    //2.register the measurement
+    // 2.register the measurement
     for (Map.Entry<Integer, Collection<TimeseriesSchema>> entry : slotTimeseries.entrySet()) {
       int slotNum = entry.getKey();
       FileSnapshot snapshot = slotSnapshots.computeIfAbsent(slotNum,
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/FileSnapshot.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/FileSnapshot.java
index fe3a7a0..9f1b562 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/FileSnapshot.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/FileSnapshot.java
@@ -311,7 +311,7 @@ public class FileSnapshot extends Snapshot implements TimeseriesSchemaSnapshot {
         if (client != null) {
           try {
             client.removeHardLink(resource.getTsFile().getAbsolutePath(),
-                new GenericHandler<>(sourceNode, null));
+                dataGroupMember.getRaftGroupId(), new GenericHandler<>(sourceNode, null));
           } catch (TException e) {
             logger
                 .error("Cannot remove hardlink {} from {}", resource.getTsFile().getAbsolutePath(),
@@ -326,7 +326,8 @@ public class FileSnapshot extends Snapshot implements TimeseriesSchemaSnapshot {
           return;
         }
         try {
-          client.removeHardLink(resource.getTsFile().getAbsolutePath());
+          client.removeHardLink(resource.getTsFile().getAbsolutePath(),
+              dataGroupMember.getRaftGroupId());
         } catch (TException te) {
           client.getInputProtocol().getTransport().close();
           logger
@@ -516,7 +517,8 @@ public class FileSnapshot extends Snapshot implements TimeseriesSchemaSnapshot {
         if (client == null) {
           throw new IOException("No available client for " + node.toString());
         }
-        ByteBuffer buffer = SyncClientAdaptor.readFile(client, remotePath, offset, fetchSize);
+        ByteBuffer buffer = SyncClientAdaptor
+            .readFile(client, remotePath, offset, fetchSize, dataGroupMember.getRaftGroupId());
         int len = writeBuffer(buffer, dest);
         if (len == 0) {
           break;
@@ -552,7 +554,8 @@ public class FileSnapshot extends Snapshot implements TimeseriesSchemaSnapshot {
 
       try {
         while (true) {
-          ByteBuffer buffer = client.readFile(remotePath, offset, fetchSize);
+          ByteBuffer buffer = client.readFile(remotePath, offset, fetchSize,
+              dataGroupMember.getRaftGroupId());
           int len = writeBuffer(buffer, dest);
           if (len == 0) {
             break;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTask.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTask.java
index f0aa3f0..9dc6231 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTask.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTask.java
@@ -80,6 +80,7 @@ public class PullSnapshotTask<T extends Snapshot> implements Callable<Void> {
     this.newMember = newMember;
     this.snapshotFactory = snapshotFactory;
     this.snapshotSave = snapshotSave;
+    persistTask();
   }
 
   @SuppressWarnings("java:S3740") // type cannot be known ahead
@@ -162,9 +163,9 @@ public class PullSnapshotTask<T extends Snapshot> implements Callable<Void> {
 
   @Override
   public Void call() {
-    persistTask();
     request = new PullSnapshotRequest();
     request.setHeader(descriptor.getPreviousHolders().getHeader());
+    request.setRaftId(descriptor.getPreviousHolders().getId());
     request.setRequiredSlots(descriptor.getSlots());
     request.setRequireReadOnly(descriptor.isRequireReadOnly());
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTaskDescriptor.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTaskDescriptor.java
index 9e1fb90..441e960 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTaskDescriptor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTaskDescriptor.java
@@ -75,6 +75,7 @@ public class PullSnapshotTaskDescriptor {
       dataOutputStream.writeInt(slot);
     }
 
+    dataOutputStream.writeInt(previousHolders.getId());
     dataOutputStream.writeInt(previousHolders.size());
     for (Node previousHolder : previousHolders) {
       SerializeUtils.serialize(previousHolder, dataOutputStream);
@@ -90,8 +91,8 @@ public class PullSnapshotTaskDescriptor {
       slots.add(dataInputStream.readInt());
     }
 
+    previousHolders = new PartitionGroup(dataInputStream.readInt());
     int holderSize = dataInputStream.readInt();
-    previousHolders = new PartitionGroup();
     for (int i = 0; i < holderSize; i++) {
       Node node = new Node();
       SerializeUtils.deserialize(node, dataInputStream);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
index 01817a4..82ef895 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
@@ -679,6 +679,7 @@ public class CMManager extends MManager {
     // pull schemas from a remote node
     PullSchemaRequest pullSchemaRequest = new PullSchemaRequest();
     pullSchemaRequest.setHeader(partitionGroup.getHeader());
+    pullSchemaRequest.setRaftId(partitionGroup.getId());
     pullSchemaRequest.setPrefixPaths(prefixPaths);
 
     // decide the node access order with the help of QueryCoordinator
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/MetaPuller.java b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/MetaPuller.java
index e185e9d..16a4988 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/MetaPuller.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/MetaPuller.java
@@ -137,6 +137,7 @@ public class MetaPuller {
     // pull schemas from a remote node
     PullSchemaRequest pullSchemaRequest = new PullSchemaRequest();
     pullSchemaRequest.setHeader(partitionGroup.getHeader());
+    pullSchemaRequest.setRaftId(partitionGroup.getId());
     pullSchemaRequest.setPrefixPaths(prefixPaths.stream().map(PartialPath::getFullPath).collect(
         Collectors.toList()));
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java b/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java
index 5fcfbcb..f8f89b9 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java
@@ -265,7 +265,7 @@ public class SlotPartitionTable implements PartitionTable {
     }
 
     SlotNodeAdditionResult result = new SlotNodeAdditionResult();
-    for (int raftId = 0 ;raftId < multiRaftFactor; raftId++) {
+    for (int raftId = 0; raftId < multiRaftFactor; raftId++) {
       PartitionGroup newGroup = getHeaderGroup(new RaftNode(node, raftId));
       if (newGroup.contains(thisNode)) {
         localGroups.add(newGroup);
@@ -296,7 +296,7 @@ public class SlotPartitionTable implements PartitionTable {
     // move the slots to the new node if any previous node have more slots than the new average
     int newAvg = totalSlotNumbers / nodeRing.size() / multiRaftFactor;
     int raftId = 0;
-    for(int i = 0 ; i < multiRaftFactor; i++) {
+    for (int i = 0; i < multiRaftFactor; i++) {
       RaftNode raftNode = new RaftNode(newNode, i);
       nodeSlotMap.putIfAbsent(raftNode, new ArrayList<>());
       previousNodeMap.putIfAbsent(raftNode, new HashMap<>());
@@ -307,10 +307,11 @@ public class SlotPartitionTable implements PartitionTable {
       if (transferNum > 0) {
         RaftNode curNode = new RaftNode(newNode, raftId);
         int numToMove = transferNum;
-        if(raftId != multiRaftFactor - 1) {
+        if (raftId != multiRaftFactor - 1) {
           numToMove = Math.min(numToMove, newAvg - nodeSlotMap.get(curNode).size());
         }
-        List<Integer> slotsToMove = slots.subList(slots.size() - transferNum, slots.size() - transferNum + numToMove);
+        List<Integer> slotsToMove = slots
+            .subList(slots.size() - transferNum, slots.size() - transferNum + numToMove);
         nodeSlotMap.get(curNode).addAll(slotsToMove);
         for (Integer slot : slotsToMove) {
           // record what node previously hold the integer
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
index e37aa7e..81ae373 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
@@ -317,12 +317,12 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
   }
 
   @Override
-  public void readFile(String filePath, long offset, int length,
+  public void readFile(String filePath, long offset, int length, int raftId,
       AsyncMethodCallback<ByteBuffer> resultHandler) {
-    DataAsyncService service = getDataAsyncService(new RaftNode(thisNode, 0), resultHandler,
+    DataAsyncService service = getDataAsyncService(new RaftNode(thisNode, raftId), resultHandler,
         "Read file:" + filePath);
     if (service != null) {
-      service.readFile(filePath, offset, length, resultHandler);
+      service.readFile(filePath, offset, length, raftId, resultHandler);
     }
   }
 
@@ -849,8 +849,8 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
   }
 
   @Override
-  public ByteBuffer readFile(String filePath, long offset, int length) throws TException {
-    return getDataSyncService(new RaftNode(thisNode, 0)).readFile(filePath, offset, length);
+  public ByteBuffer readFile(String filePath, long offset, int length, int raftId) throws TException {
+    return getDataSyncService(new RaftNode(thisNode, raftId)).readFile(filePath, offset, length, raftId);
   }
 
   @Override
@@ -874,14 +874,14 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
   }
 
   @Override
-  public void removeHardLink(String hardLinkPath) throws TException {
-    getDataSyncService(new RaftNode(thisNode, 0)).removeHardLink(hardLinkPath);
+  public void removeHardLink(String hardLinkPath, int raftId) throws TException {
+    getDataSyncService(new RaftNode(thisNode, raftId)).removeHardLink(hardLinkPath, raftId);
   }
 
   @Override
-  public void removeHardLink(String hardLinkPath,
+  public void removeHardLink(String hardLinkPath, int raftId,
       AsyncMethodCallback<Void> resultHandler) {
-    getDataAsyncService(new RaftNode(thisNode, 0), resultHandler, hardLinkPath).removeHardLink(hardLinkPath,
+    getDataAsyncService(new RaftNode(thisNode, raftId), resultHandler, hardLinkPath).removeHardLink(hardLinkPath, raftId,
         resultHandler);
   }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java
index f8830c0..e4a7304 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java
@@ -222,9 +222,9 @@ public class MetaClusterServer extends RaftServer implements TSMetaService.Async
   }
 
   @Override
-  public void readFile(String filePath, long offset, int length,
+  public void readFile(String filePath, long offset, int length, int raftId,
       AsyncMethodCallback<ByteBuffer> resultHandler) {
-    asyncService.readFile(filePath, offset, length, resultHandler);
+    asyncService.readFile(filePath, offset, length, raftId, resultHandler);
   }
 
   @Override
@@ -324,8 +324,8 @@ public class MetaClusterServer extends RaftServer implements TSMetaService.Async
   }
 
   @Override
-  public ByteBuffer readFile(String filePath, long offset, int length) throws TException {
-    return syncService.readFile(filePath, offset, length);
+  public ByteBuffer readFile(String filePath, long offset, int length, int raftId) throws TException {
+    return syncService.readFile(filePath, offset, length, raftId);
   }
 
   @Override
@@ -334,13 +334,13 @@ public class MetaClusterServer extends RaftServer implements TSMetaService.Async
   }
 
   @Override
-  public void removeHardLink(String hardLinkPath) throws TException {
-    syncService.removeHardLink(hardLinkPath);
+  public void removeHardLink(String hardLinkPath, int raftId) throws TException {
+    syncService.removeHardLink(hardLinkPath, raftId);
   }
 
   @Override
-  public void removeHardLink(String hardLinkPath,
+  public void removeHardLink(String hardLinkPath, int raftId,
       AsyncMethodCallback<Void> resultHandler) {
-    asyncService.removeHardLink(hardLinkPath, resultHandler);
+    asyncService.removeHardLink(hardLinkPath, raftId, resultHandler);
   }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
index 1fb8336..2cd675f 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
@@ -572,8 +572,9 @@ public class DataGroupMember extends RaftMember {
    * @return the path of the directory that is provided exclusively for the member.
    */
   private String getMemberDir() {
-    return IoTDBDescriptor.getInstance().getConfig().getSystemDir() + File.separator +
-        "raft" + File.separator + getHeader().nodeIdentifier + File.separator;
+    return IoTDBDescriptor.getInstance().getConfig().getSystemDir() + File.separator + "raft"
+        + File.separator + getHeader().nodeIdentifier + File.separator + getRaftGroupId()
+        + File.separator;
   }
 
   public MetaGroupMember getMetaGroupMember() {
@@ -625,8 +626,8 @@ public class DataGroupMember extends RaftMember {
         RaftNode raftNode = metaGroupMember.getPartitionTable().routeToHeaderByTime(storageGroupName,
             partitionId * StorageEngine.getTimePartitionInterval());
         DataGroupMember localDataMember = metaGroupMember.getLocalDataMember(raftNode);
-        if (localDataMember.getHeader().equals(this.getHeader())) {
-          localListPair.add(new Pair<>(partitionId, pair.right));
+        if (localDataMember.getHeader().equals(thisNode)) {
+          localListPair.add(pair);
         }
       }
       try {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
index c97223b..d93efc0 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
@@ -532,6 +532,7 @@ public class MetaGroupMember extends RaftMember {
     newStartUpStatus
         .setReplicationNumber(ClusterDescriptor.getInstance().getConfig().getReplicationNum());
     newStartUpStatus.setClusterName(ClusterDescriptor.getInstance().getConfig().getClusterName());
+    newStartUpStatus.setMultiRaftFactor(ClusterDescriptor.getInstance().getConfig().getMultiRaftFactor());
     List<String> seedUrls = ClusterDescriptor.getInstance().getConfig().getSeedNodeUrls();
     List<Node> seedNodeList = new ArrayList<>();
     for (String seedUrl : seedUrls) {
@@ -545,7 +546,7 @@ public class MetaGroupMember extends RaftMember {
    * Send a join cluster request to "node". If the joining is accepted, set the partition table,
    * start DataClusterServer and ClientServer and initialize DataGroupMembers.
    *
-   * @return rue if the node has successfully joined the cluster, false otherwise.
+   * @return true if the node has successfully joined the cluster, false otherwise.
    */
   private boolean joinCluster(Node node, StartUpStatus startUpStatus)
       throws TException, InterruptedException, ConfigInconsistentException {
@@ -594,18 +595,17 @@ public class MetaGroupMember extends RaftMember {
   }
 
   private void handleConfigInconsistency(AddNodeResponse resp) throws ConfigInconsistentException {
-    if (logger.isInfoEnabled()) {
-      CheckStatusResponse checkStatusResponse = resp.getCheckStatusResponse();
-      String parameters =
-          (checkStatusResponse.isPartitionalIntervalEquals() ? "" : ", partition interval")
-              + (checkStatusResponse.isHashSaltEquals() ? "" : ", hash salt")
-              + (checkStatusResponse.isReplicationNumEquals() ? "" : ", replication number")
-              + (checkStatusResponse.isSeedNodeEquals() ? "" : ", seedNodes")
-              + (checkStatusResponse.isClusterNameEquals() ? "" : ", clusterName");
-      logger.error(
-          "The start up configuration{} conflicts the cluster. Please reset the configurations. ",
-          parameters.substring(1));
-    }
+    CheckStatusResponse checkStatusResponse = resp.getCheckStatusResponse();
+    String parameters =
+        (checkStatusResponse.isPartitionalIntervalEquals() ? "" : ", partition interval")
+            + (checkStatusResponse.isHashSaltEquals() ? "" : ", hash salt")
+            + (checkStatusResponse.isReplicationNumEquals() ? "" : ", replication number")
+            + (checkStatusResponse.isSeedNodeEquals() ? "" : ", seedNodes")
+            + (checkStatusResponse.isClusterNameEquals() ? "" : ", clusterName")
+            + (checkStatusResponse.isMultiRaftFactorEquals() ? "" : ", multiRaftFactor");
+    logger.error(
+        "The start up configuration{} conflicts the cluster. Please reset the configurations. ",
+        parameters.substring(1));
     throw new ConfigInconsistentException();
   }
 
@@ -897,6 +897,7 @@ public class MetaGroupMember extends RaftMember {
     long remotePartitionInterval = remoteStartUpStatus.getPartitionInterval();
     int remoteHashSalt = remoteStartUpStatus.getHashSalt();
     int remoteReplicationNum = remoteStartUpStatus.getReplicationNumber();
+    int remoteMultiRaftFactor = remoteStartUpStatus.getMultiRaftFactor();
     String remoteClusterName = remoteStartUpStatus.getClusterName();
     List<Node> remoteSeedNodeList = remoteStartUpStatus.getSeedNodeList();
     long localPartitionInterval = IoTDBDescriptor.getInstance().getConfig()
@@ -904,7 +905,9 @@ public class MetaGroupMember extends RaftMember {
     int localHashSalt = ClusterConstant.HASH_SALT;
     int localReplicationNum = ClusterDescriptor.getInstance().getConfig().getReplicationNum();
     String localClusterName = ClusterDescriptor.getInstance().getConfig().getClusterName();
+    int localMultiRaftFactor = ClusterDescriptor.getInstance().getConfig().getMultiRaftFactor();
     boolean partitionIntervalEquals = true;
+    boolean multiRaftFactorEquals = true;
     boolean hashSaltEquals = true;
     boolean replicationNumEquals = true;
     boolean seedNodeEquals = true;
@@ -915,6 +918,11 @@ public class MetaGroupMember extends RaftMember {
       logger.info("Remote partition interval conflicts with the leader's. Leader: {}, remote: {}",
           localPartitionInterval, remotePartitionInterval);
     }
+    if (localMultiRaftFactor != remoteMultiRaftFactor) {
+      multiRaftFactorEquals = false;
+      logger.info("Remote multi-raft factor conflicts with the leader's. Leader: {}, remote: {}",
+          localMultiRaftFactor, remoteMultiRaftFactor);
+    }
     if (localHashSalt != remoteHashSalt) {
       hashSaltEquals = false;
       logger.info("Remote hash salt conflicts with the leader's. Leader: {}, remote: {}",
@@ -938,11 +946,11 @@ public class MetaGroupMember extends RaftMember {
       }
     }
     if (!(partitionIntervalEquals && hashSaltEquals && replicationNumEquals && seedNodeEquals
-        && clusterNameEquals)) {
+        && clusterNameEquals && multiRaftFactorEquals)) {
       response.setRespNum((int) Response.RESPONSE_NEW_NODE_PARAMETER_CONFLICT);
       response.setCheckStatusResponse(
           new CheckStatusResponse(partitionIntervalEquals, hashSaltEquals,
-              replicationNumEquals, seedNodeEquals, clusterNameEquals));
+              replicationNumEquals, seedNodeEquals, clusterNameEquals, multiRaftFactorEquals));
       return false;
     }
     return true;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseAsyncService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseAsyncService.java
index 521050e..425a8d9 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseAsyncService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseAsyncService.java
@@ -104,7 +104,7 @@ public abstract class BaseAsyncService implements RaftService.AsyncIface {
   }
 
   @Override
-  public void readFile(String filePath, long offset, int length,
+  public void readFile(String filePath, long offset, int length, int raftId,
       AsyncMethodCallback<ByteBuffer> resultHandler) {
     try {
       resultHandler.onComplete(IOUtils.readFile(filePath, offset, length));
@@ -114,7 +114,7 @@ public abstract class BaseAsyncService implements RaftService.AsyncIface {
   }
 
   @Override
-  public void removeHardLink(String hardLinkPath,
+  public void removeHardLink(String hardLinkPath, int raftId,
       AsyncMethodCallback<Void> resultHandler) {
     try {
       Files.deleteIfExists(new File(hardLinkPath).toPath());
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseSyncService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseSyncService.java
index f9bda1f..92f7018 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseSyncService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseSyncService.java
@@ -122,7 +122,7 @@ public abstract class BaseSyncService implements RaftService.Iface {
   }
 
   @Override
-  public ByteBuffer readFile(String filePath, long offset, int length) throws TException {
+  public ByteBuffer readFile(String filePath, long offset, int length, int raftId) throws TException {
     try {
       return IOUtils.readFile(filePath, offset, length);
     } catch (IOException e) {
@@ -131,7 +131,7 @@ public abstract class BaseSyncService implements RaftService.Iface {
   }
 
   @Override
-  public void removeHardLink(String hardLinkPath) throws TException {
+  public void removeHardLink(String hardLinkPath, int raftId) throws TException {
     try {
       Files.deleteIfExists(new File(hardLinkPath).toPath());
     } catch (IOException e) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/ClusterUtils.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/ClusterUtils.java
index a777f5a..4d1c87a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/ClusterUtils.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/ClusterUtils.java
@@ -82,12 +82,18 @@ public class ClusterUtils {
     boolean replicationNumEquals = true;
     boolean seedNodeListEquals = true;
     boolean clusterNameEqual = true;
+    boolean multiRaftFactorEqual = true;
 
     if (localStartUpStatus.getPartitionInterval() != remoteStartUpStatus.getPartitionInterval()) {
       partitionIntervalEquals = false;
       logger.error("Remote partition interval conflicts with local. local: {}, remote: {}",
           localStartUpStatus.getPartitionInterval(), remoteStartUpStatus.getPartitionInterval());
     }
+    if (localStartUpStatus.getMultiRaftFactor() != remoteStartUpStatus.getMultiRaftFactor()) {
+      multiRaftFactorEqual = false;
+      logger.error("Remote multi-raft factor conflicts with local. local: {}, remote: {}",
+          localStartUpStatus.getMultiRaftFactor(), remoteStartUpStatus.getMultiRaftFactor());
+    }
     if (localStartUpStatus.getHashSalt() != remoteStartUpStatus.getHashSalt()) {
       hashSaltEquals = false;
       logger.error("Remote hash salt conflicts with local. local: {}, remote: {}",
@@ -115,7 +121,7 @@ public class ClusterUtils {
     }
 
     return new CheckStatusResponse(partitionIntervalEquals, hashSaltEquals,
-        replicationNumEquals, seedNodeListEquals, clusterNameEqual);
+        replicationNumEquals, seedNodeListEquals, clusterNameEqual, multiRaftFactorEqual);
   }
 
   public static boolean checkSeedNodes(boolean isClusterEstablished, List<Node> localSeedNodes,
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptorTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptorTest.java
index dcc582a..88deb3d 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptorTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptorTest.java
@@ -64,7 +64,6 @@ import org.apache.iotdb.db.qp.physical.sys.ShowTimeSeriesPlan;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.service.rpc.thrift.TSStatus;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.read.common.Path;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 import org.apache.iotdb.tsfile.write.schema.TimeseriesSchema;
 import org.apache.thrift.TException;
@@ -94,7 +93,7 @@ public class SyncClientAdaptorTest {
   @Before
   public void setUp() {
     nodeStatus = new TNodeStatus();
-    checkStatusResponse = new CheckStatusResponse(true, false, true, false, true);
+    checkStatusResponse = new CheckStatusResponse(true, false, true, false, true, true);
     addNodeResponse = new AddNodeResponse((int) Response.RESPONSE_AGREE);
     aggregateResults = Arrays.asList(ByteBuffer.wrap("1".getBytes()),
         ByteBuffer.wrap("2".getBytes()), ByteBuffer.wrap("2".getBytes()));
@@ -247,7 +246,7 @@ public class SyncClientAdaptorTest {
       }
 
       @Override
-      public void readFile(String filePath, long offset, int length,
+      public void readFile(String filePath, long offset, int length, int raftId,
           AsyncMethodCallback<ByteBuffer> resultHandler) {
         resultHandler.onComplete(readFileResult);
       }
@@ -338,7 +337,7 @@ public class SyncClientAdaptorTest {
         TestUtils.getNode(0), 0, paths));
     assertEquals(1L, (long) SyncClientAdaptor.getGroupByExecutor(dataClient, new GroupByRequest()));
     assertEquals(fillResult, SyncClientAdaptor.previousFill(dataClient, new PreviousFillRequest()));
-    assertEquals(readFileResult, SyncClientAdaptor.readFile(dataClient, "a file", 0, 1000));
+    assertEquals(readFileResult, SyncClientAdaptor.readFile(dataClient, "a file", 0, 1000, 0));
     assertEquals(aggregateResults, SyncClientAdaptor.getGroupByResult(dataClient,
         TestUtils.getNode(0), 0, 1, 1, 2));
     assertEquals(peekNextNotNullValueResult, SyncClientAdaptor.peekNextNotNullValue(dataClient,
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/common/TestAsyncDataClient.java b/cluster/src/test/java/org/apache/iotdb/cluster/common/TestAsyncDataClient.java
index 63ce1b7..51f0c8a 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/common/TestAsyncDataClient.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/common/TestAsyncDataClient.java
@@ -117,7 +117,7 @@ public class TestAsyncDataClient extends AsyncDataClient {
   }
 
   @Override
-  public void readFile(String filePath, long offset, int length,
+  public void readFile(String filePath, long offset, int length, int raftId,
       AsyncMethodCallback<ByteBuffer> resultHandler) {
     new Thread(() -> {
       File file = new File(filePath);
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/DataSnapshotTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/DataSnapshotTest.java
index b89d058..38393d1 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/DataSnapshotTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/DataSnapshotTest.java
@@ -62,7 +62,7 @@ public abstract class DataSnapshotTest {
       public AsyncClient getAsyncClient(Node node) {
         return new AsyncDataClient(null, null, null) {
           @Override
-          public void readFile(String filePath, long offset, int length,
+          public void readFile(String filePath, long offset, int length, int raftId,
               AsyncMethodCallback<ByteBuffer> resultHandler) {
             new Thread(() -> {
               if (addNetFailure && (failureCnt++) % failureFrequency == 0) {
@@ -79,8 +79,7 @@ public abstract class DataSnapshotTest {
           }
 
           @Override
-          public void removeHardLink(String hardLinkPath, AsyncMethodCallback<Void> resultHandler)
-              throws TException {
+          public void removeHardLink(String hardLinkPath, int raftId, AsyncMethodCallback<Void> resultHandler) {
             new Thread(() -> {
               try {
                 Files.deleteIfExists(new File(hardLinkPath).toPath());
@@ -121,7 +120,7 @@ public abstract class DataSnapshotTest {
           }
         })) {
           @Override
-          public ByteBuffer readFile(String filePath, long offset, int length) throws TException {
+          public ByteBuffer readFile(String filePath, long offset, int length, int raftId) throws TException {
             if (addNetFailure && (failureCnt++) % failureFrequency == 0) {
               // simulate failures
               throw new TException("Faked network failure");
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTaskTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTaskTest.java
index d9ce485..539e839 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTaskTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTaskTest.java
@@ -119,7 +119,7 @@ public class PullSnapshotTaskTest extends DataSnapshotTest {
           }
 
           @Override
-          public ByteBuffer readFile(String filePath, long offset, int length) throws TException {
+          public ByteBuffer readFile(String filePath, long offset, int length, int raftId) throws TException {
             try {
               return IOUtils.readFile(filePath, offset, length);
             } catch (IOException e) {
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
index 8180d4e..c920f06 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
@@ -222,7 +222,7 @@ public class DataGroupMemberTest extends MemberTest {
             }
 
             @Override
-            public void removeHardLink(String hardLinkPath, AsyncMethodCallback<Void> resultHandler) {
+            public void removeHardLink(String hardLinkPath, int raftId, AsyncMethodCallback<Void> resultHandler) {
               new Thread(() -> {
                 try {
                   Files.deleteIfExists(new File(hardLinkPath).toPath());
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index 781836f..b03f984 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -500,9 +500,7 @@ public class StorageEngine implements IService {
    * @throws StorageGroupNotSetException
    */
   public void closeStorageGroupProcessor(PartialPath storageGroupPath, long partitionId,
-      boolean isSeq,
-      boolean isSync)
-      throws StorageGroupNotSetException {
+      boolean isSeq, boolean isSync) throws StorageGroupNotSetException {
     StorageGroupProcessor processor = processorMap.get(storageGroupPath);
     if (processor == null) {
       throw new StorageGroupNotSetException(storageGroupPath.getFullPath());
diff --git a/thrift/src/main/thrift/cluster.thrift b/thrift/src/main/thrift/cluster.thrift
index 69a4dc5..9019680 100644
--- a/thrift/src/main/thrift/cluster.thrift
+++ b/thrift/src/main/thrift/cluster.thrift
@@ -132,6 +132,7 @@ struct StartUpStatus {
   3: required int replicationNumber
   4: required list<Node> seedNodeList
   5: required string clusterName
+  6: required int multiRaftFactor
 }
 
 // follower -> leader
@@ -141,6 +142,7 @@ struct CheckStatusResponse {
   3: required bool replicationNumEquals
   4: required bool seedNodeEquals
   5: required bool clusterNameEquals
+  6: required bool multiRaftFactorEquals
 }
 
 struct SendSnapshotRequest {
@@ -319,7 +321,7 @@ service RaftService {
   * bytes, only the remaining will be returned.
   * Notice that when the last chunk of the file is read, the file will be deleted immediately.
   **/
-  binary readFile(1:string filePath, 2:long offset, 3:int length)
+  binary readFile(1:string filePath, 2:long offset, 3:int length, 4: int raftId)
 
   /**
   * Test if a log of "index" and "term" exists.
@@ -330,7 +332,7 @@ service RaftService {
   * When a follower finds that it already has a file in a snapshot locally, it calls this
   * interface to notify the leader to remove the associated hardlink.
   **/
-  void removeHardLink(1: string hardLinkPath)
+  void removeHardLink(1: string hardLinkPath, 2: int raftId)
 }
 
 


[iotdb] 02/04: fix bugs of wrong previous groups, pull snapshot from self and wrong remove local data

Posted by lt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lta pushed a commit to branch cluster_scalability
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 325e4f556cb3d520b6a365a89317e997291979c2
Author: lta <li...@163.com>
AuthorDate: Mon Jan 11 11:26:00 2021 +0800

    fix bugs of wrong previous groups, pull snapshot from self and wrong remove local data
---
 .../iotdb/cluster/log/snapshot/FileSnapshot.java   |  6 +--
 .../cluster/log/snapshot/PullSnapshotTask.java     | 56 ++++++++++---------
 .../iotdb/cluster/partition/NodeRemovalResult.java |  5 +-
 .../iotdb/cluster/partition/PartitionTable.java    |  6 +++
 .../cluster/partition/slot/SlotPartitionTable.java | 59 ++++++++++++++------
 .../iotdb/cluster/server/DataClusterServer.java    | 30 ++++-------
 .../cluster/server/member/DataGroupMember.java     | 62 +++++++++++++++++-----
 .../cluster/server/member/MetaGroupMember.java     |  1 -
 .../iotdb/cluster/server/member/RaftMember.java    | 35 ++++++++++++
 .../server/heartbeat/MetaHeartbeatThreadTest.java  |  5 ++
 thrift/src/main/thrift/cluster.thrift              |  1 -
 11 files changed, 185 insertions(+), 81 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/FileSnapshot.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/FileSnapshot.java
index 9f1b562..b559879 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/FileSnapshot.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/FileSnapshot.java
@@ -227,12 +227,10 @@ public class FileSnapshot extends Snapshot implements TimeseriesSchemaSnapshot {
         throw new SnapshotInstallationException(e);
       }
 
-      for (FileSnapshot value : snapshotMap.values()) {
-        installFileSnapshotSchema(value);
-      }
-
       for (Entry<Integer, FileSnapshot> integerSnapshotEntry : snapshotMap.entrySet()) {
         Integer slot = integerSnapshotEntry.getKey();
+        FileSnapshot snapshot = integerSnapshotEntry.getValue();
+        installFileSnapshotSchema(snapshot);
         SlotStatus status = slotManager.getStatus(slot);
         if (status == SlotStatus.PULLING) {
           // as schemas are set, writes can proceed
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTask.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTask.java
index 9dc6231..4a79485 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTask.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTask.java
@@ -39,6 +39,7 @@ import org.apache.iotdb.cluster.client.sync.SyncDataClient;
 import org.apache.iotdb.cluster.config.ClusterDescriptor;
 import org.apache.iotdb.cluster.exception.SnapshotInstallationException;
 import org.apache.iotdb.cluster.log.Snapshot;
+import org.apache.iotdb.cluster.partition.PartitionGroup;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.iotdb.cluster.rpc.thrift.PullSnapshotRequest;
 import org.apache.iotdb.cluster.rpc.thrift.PullSnapshotResp;
@@ -163,30 +164,37 @@ public class PullSnapshotTask<T extends Snapshot> implements Callable<Void> {
 
   @Override
   public Void call() {
-    request = new PullSnapshotRequest();
-    request.setHeader(descriptor.getPreviousHolders().getHeader());
-    request.setRaftId(descriptor.getPreviousHolders().getId());
-    request.setRequiredSlots(descriptor.getSlots());
-    request.setRequireReadOnly(descriptor.isRequireReadOnly());
-
-    boolean finished = false;
-    int nodeIndex = -1;
-    while (!finished) {
-      try {
-        // sequentially pick up a node that may have this slot
-        nodeIndex = (nodeIndex + 1) % descriptor.getPreviousHolders().size();
-        finished = pullSnapshot(nodeIndex);
-        if (!finished) {
-          Thread
-              .sleep(ClusterDescriptor.getInstance().getConfig().getPullSnapshotRetryIntervalMs());
-        }
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        finished = true;
-      } catch (TException e) {
-        if (logger.isDebugEnabled()) {
-          logger.debug("Cannot pull slot {} from {}, retry", descriptor.getSlots(),
-              descriptor.getPreviousHolders().get(nodeIndex), e);
+    // If this node is a member of the previous holder group, it's unnecessary to pull data again
+    if (descriptor.getPreviousHolders().contains(newMember.getThisNode())) {
+      // inform the previous holders that one member has successfully pulled snapshot directly
+      newMember.registerPullSnapshotHint(descriptor);
+    } else {
+      request = new PullSnapshotRequest();
+      request.setHeader(descriptor.getPreviousHolders().getHeader());
+      request.setRaftId(descriptor.getPreviousHolders().getId());
+      request.setRequiredSlots(descriptor.getSlots());
+      request.setRequireReadOnly(descriptor.isRequireReadOnly());
+
+      boolean finished = false;
+      int nodeIndex = ((PartitionGroup) newMember.getAllNodes()).indexOf(newMember.getThisNode());
+      while (!finished) {
+        try {
+          // sequentially pick up a node that may have this slot
+          nodeIndex = (nodeIndex + 1) % descriptor.getPreviousHolders().size();
+          finished = pullSnapshot(nodeIndex);
+          if (!finished) {
+            Thread
+                .sleep(
+                    ClusterDescriptor.getInstance().getConfig().getPullSnapshotRetryIntervalMs());
+          }
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          finished = true;
+        } catch (TException e) {
+          if (logger.isDebugEnabled()) {
+            logger.debug("Cannot pull slot {} from {}, retry", descriptor.getSlots(),
+                descriptor.getPreviousHolders().get(nodeIndex), e);
+          }
         }
       }
     }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/partition/NodeRemovalResult.java b/cluster/src/main/java/org/apache/iotdb/cluster/partition/NodeRemovalResult.java
index 457af85..5493980 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/partition/NodeRemovalResult.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/partition/NodeRemovalResult.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.cluster.partition;
 
+import java.util.ArrayList;
 import java.util.List;
 
 /**
@@ -26,10 +27,10 @@ import java.util.List;
  */
 public class NodeRemovalResult {
 
-  private List<PartitionGroup> removedGroupList;
+  private List<PartitionGroup> removedGroupList = new ArrayList<>();
   // if the removed group contains the local node, the local node should join a new group to
   // preserve the replication number
-  private List<PartitionGroup> newGroupList;
+  private List<PartitionGroup> newGroupList = new ArrayList<>();
 
   public PartitionGroup getRemovedGroup(int raftId) {
     for (PartitionGroup group : removedGroupList) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionTable.java b/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionTable.java
index bd8e518..079aad1 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionTable.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionTable.java
@@ -95,6 +95,12 @@ public interface PartitionTable {
   List<PartitionGroup> getGlobalGroups();
 
   /**
+   * Judge whether the data of the given slot is held by the given node
+   * @param node target node
+   */
+  boolean judgeHoldSlot(Node node, int slot);
+
+  /**
    * @param path      can be an incomplete path (but should contain a storage group name) e.g., if
    *                  "root.sg" is a storage group, then path can not be "root".
    * @param timestamp
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java b/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java
index f8f89b9..2a5ae3c 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java
@@ -62,7 +62,7 @@ public class SlotPartitionTable implements PartitionTable {
   private RaftNode[] slotNodes = new RaftNode[ClusterConstant.SLOT_NUM];
   // the nodes that each slot belongs to before a new node is added, used for the new node to
   // find the data source
-  private Map<RaftNode, Map<Integer, RaftNode>> previousNodeMap = new ConcurrentHashMap<>();
+  private Map<RaftNode, Map<Integer, PartitionGroup>> previousNodeMap = new ConcurrentHashMap<>();
 
   //the filed is used for determining which nodes need to be a group.
   // the data groups which this node belongs to.
@@ -164,8 +164,7 @@ public class SlotPartitionTable implements PartitionTable {
     return ret;
   }
 
-  @Override
-  public PartitionGroup getHeaderGroup(RaftNode raftNode) {
+  private PartitionGroup getHeaderGroup(RaftNode raftNode, List<Node> nodeRing) {
     PartitionGroup ret = new PartitionGroup(raftNode.getRaftId());
 
     // assuming the nodes are [1,2,3,4,5]
@@ -187,6 +186,11 @@ public class SlotPartitionTable implements PartitionTable {
   }
 
   @Override
+  public PartitionGroup getHeaderGroup(RaftNode raftNode) {
+    return getHeaderGroup(raftNode, this.nodeRing);
+  }
+
+  @Override
   public PartitionGroup getHeaderGroup(Node node) {
     return getHeaderGroup(new RaftNode(node, 0));
   }
@@ -228,11 +232,13 @@ public class SlotPartitionTable implements PartitionTable {
 
   @Override
   public NodeAdditionResult addNode(Node node) {
+    List<Node> oldRing;
     synchronized (nodeRing) {
       if (nodeRing.contains(node)) {
         return null;
       }
 
+      oldRing = new ArrayList<>(nodeRing);
       nodeRing.add(node);
       nodeRing.sort(Comparator.comparingInt(Node::getNodeIdentifier));
 
@@ -277,7 +283,7 @@ public class SlotPartitionTable implements PartitionTable {
 
     // the slots movement is only done logically, the new node itself will pull data from the
     // old node
-    result.setLostSlots(moveSlotsToNew(node));
+    result.setLostSlots(moveSlotsToNew(node, oldRing));
 
     return result;
   }
@@ -290,7 +296,7 @@ public class SlotPartitionTable implements PartitionTable {
    * @param newNode
    * @return a map recording what slots each group lost.
    */
-  private Map<RaftNode, Set<Integer>> moveSlotsToNew(Node newNode) {
+  private Map<RaftNode, Set<Integer>> moveSlotsToNew(Node newNode, List<Node> oldRing) {
     Map<RaftNode, Set<Integer>> result = new HashMap<>();
     // as a node is added, the average slots for each node decrease
     // move the slots to the new node if any previous node have more slots than the new average
@@ -315,7 +321,7 @@ public class SlotPartitionTable implements PartitionTable {
         nodeSlotMap.get(curNode).addAll(slotsToMove);
         for (Integer slot : slotsToMove) {
           // record what node previously hold the integer
-          previousNodeMap.get(curNode).put(slot, entry.getKey());
+          previousNodeMap.get(curNode).put(slot, getHeaderGroup(entry.getKey(), oldRing));
           slotNodes[slot] = curNode;
         }
         result.computeIfAbsent(entry.getKey(), n -> new HashSet<>()).addAll(slotsToMove);
@@ -326,7 +332,7 @@ public class SlotPartitionTable implements PartitionTable {
           nodeSlotMap.get(curNode).addAll(slotsToMove);
           for (Integer slot : slotsToMove) {
             // record what node previously hold the integer
-            previousNodeMap.get(curNode).put(slot, entry.getKey());
+            previousNodeMap.get(curNode).put(slot, getHeaderGroup(entry.getKey(), oldRing));
             slotNodes[slot] = curNode;
           }
           result.get(entry.getKey()).addAll(slotsToMove);
@@ -357,15 +363,19 @@ public class SlotPartitionTable implements PartitionTable {
       }
 
       dataOutputStream.writeInt(previousNodeMap.size());
-      for (Entry<RaftNode, Map<Integer, RaftNode>> nodeMapEntry : previousNodeMap.entrySet()) {
+      for (Entry<RaftNode, Map<Integer, PartitionGroup>> nodeMapEntry : previousNodeMap.entrySet()) {
         dataOutputStream.writeInt(nodeMapEntry.getKey().getNode().getNodeIdentifier());
         dataOutputStream.writeInt(nodeMapEntry.getKey().getRaftId());
-        Map<Integer, RaftNode> prevHolders = nodeMapEntry.getValue();
+        Map<Integer, PartitionGroup> prevHolders = nodeMapEntry.getValue();
         dataOutputStream.writeInt(prevHolders.size());
-        for (Entry<Integer, RaftNode> integerNodeEntry : prevHolders.entrySet()) {
+        for (Entry<Integer, PartitionGroup> integerNodeEntry : prevHolders.entrySet()) {
           dataOutputStream.writeInt(integerNodeEntry.getKey());
-          dataOutputStream.writeInt(integerNodeEntry.getValue().getNode().getNodeIdentifier());
-          dataOutputStream.writeInt(integerNodeEntry.getValue().getRaftId());
+          PartitionGroup group = integerNodeEntry.getValue();
+          dataOutputStream.writeInt(group.getId());
+          dataOutputStream.writeInt(group.size());
+          for (Node node : group) {
+            dataOutputStream.writeInt(node.getNodeIdentifier());
+          }
         }
       }
 
@@ -402,12 +412,16 @@ public class SlotPartitionTable implements PartitionTable {
       int nodeId = buffer.getInt();
       RaftNode node = new RaftNode(idNodeMap.get(nodeId), buffer.getInt());
 
-      Map<Integer, RaftNode> prevHolders = new HashMap<>();
+      Map<Integer, PartitionGroup> prevHolders = new HashMap<>();
       int holderNum = buffer.getInt();
       for (int i1 = 0; i1 < holderNum; i1++) {
         int slot = buffer.getInt();
-        RaftNode holder = new RaftNode(idNodeMap.get(buffer.getInt()), buffer.getInt());
-        prevHolders.put(slot, holder);
+        PartitionGroup group = new PartitionGroup(buffer.getInt());
+        int nodeNum = buffer.getInt();
+        for (int i2 = 0 ; i2 < nodeNum; i2++) {
+          group.add(idNodeMap.get(buffer.getInt()));
+        }
+        prevHolders.put(slot, group);
       }
       previousNodeMap.put(node, prevHolders);
     }
@@ -429,7 +443,7 @@ public class SlotPartitionTable implements PartitionTable {
     return nodeRing;
   }
 
-  public Map<Integer, RaftNode> getPreviousNodeMap(RaftNode raftNode) {
+  public Map<Integer, PartitionGroup> getPreviousNodeMap(RaftNode raftNode) {
     return previousNodeMap.get(raftNode);
   }
 
@@ -503,6 +517,12 @@ public class SlotPartitionTable implements PartitionTable {
         // each node exactly joins replicationNum groups, so when a group is removed, the node
         // should join a new one
         int thisNodeIdx = nodeRing.indexOf(thisNode);
+
+        // check if this node is to be removed
+        if (thisNodeIdx == -1) {
+          continue;
+        }
+
         // this node must be the last node of the new group
         int headerNodeIdx = thisNodeIdx - (replicationNum - 1);
         headerNodeIdx = headerNodeIdx < 0 ? headerNodeIdx + nodeRing.size() : headerNodeIdx;
@@ -531,7 +551,7 @@ public class SlotPartitionTable implements PartitionTable {
         int slot = slots.get(i);
         RaftNode newHolder = new RaftNode(nodeRing.get(i % nodeRing.size()), raftId);
         slotNodes[slot] = newHolder;
-        nodeSlotMap.get(newHolder).add(slot);
+        nodeSlotMap.computeIfAbsent(newHolder, n -> new ArrayList<>()).add(slot);
         newHolderSlotMap.computeIfAbsent(newHolder, n -> new ArrayList<>()).add(slot);
       }
     }
@@ -549,6 +569,11 @@ public class SlotPartitionTable implements PartitionTable {
     }
   }
 
+  @Override
+  public boolean judgeHoldSlot(Node node, int slot) {
+    return getHeaderGroup(slotNodes[slot]).contains(node);
+  }
+
   private void calculateGlobalGroups() {
     globalGroups = new ArrayList<>();
     for (Node node : getAllNodes()) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
index 81ae373..b023c36 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
@@ -526,9 +526,15 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
     }
   }
 
+  /**
+   * Make sure the group will not receive new raft logs.
+   * @param header the raft node identifying the group being removed
+   * @param dataGroupMember
+   */
   private void removeMember(RaftNode header, DataGroupMember dataGroupMember) {
-    dataGroupMember.syncLeader();
+    dataGroupMember.getStopStatus().setSyncSuccess(dataGroupMember.syncLeader());
     dataGroupMember.setReadOnly();
+    dataGroupMember.waitFollowersToSync();
     dataGroupMember.stop();
     stoppedMemberManager.put(header, dataGroupMember);
   }
@@ -578,8 +584,7 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
   /**
    * Try removing a node from the groups of each DataGroupMember. If the node is the header of some
    * group, set the member to read only so that it can still provide data for other nodes that has
-   * not yet pulled its data. If the node is the local node, remove all members whose group is not
-   * headed by this node. Otherwise, just change the node list of the member and pull new data. And
+   * not yet pulled its data. Otherwise, just change the node list of the member and pull new data. And
    * create a new DataGroupMember if this node should join a new group because of this removal.
    *
    * @param node
@@ -591,25 +596,12 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
       while (entryIterator.hasNext()) {
         Entry<RaftNode, DataGroupMember> entry = entryIterator.next();
         DataGroupMember dataGroupMember = entry.getValue();
-        if (dataGroupMember.getHeader().equals(node)) {
-          // the group is removed as the node is removed, so new writes should be rejected as
-          // they belong to the new holder, but the member is kept alive for other nodes to pull
-          // snapshots
+        if (dataGroupMember.getHeader().equals(node) || node.equals(thisNode)) {
           entryIterator.remove();
           removeMember(entry.getKey(), entry.getValue());
         } else {
-          if (node.equals(thisNode)) {
-            // this node is removed, it is no more replica of other groups
-            List<Integer> nodeSlots =
-                ((SlotPartitionTable) partitionTable)
-                    .getNodeSlots(dataGroupMember.getHeader(), dataGroupMember.getRaftGroupId());
-            dataGroupMember.removeLocalData(nodeSlots);
-            entryIterator.remove();
-            dataGroupMember.stop();
-          } else {
-            // the group should be updated and pull new slots from the removed node
-            dataGroupMember.removeNode(node, removalResult);
-          }
+          // the group should be updated and pull new slots from the removed node
+          dataGroupMember.removeNode(node, removalResult);
         }
       }
       for (PartitionGroup newGroup : removalResult.getNewGroupList()) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
index 2cd675f..4737520 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
@@ -402,16 +402,27 @@ public class DataGroupMember extends RaftMember {
    * @param request
    */
   public PullSnapshotResp getSnapshot(PullSnapshotRequest request) throws IOException {
-    waitLeader();
-    if (character != NodeCharacter.LEADER && !readOnly) {
-      return null;
-    }
     // if the requester pulls the snapshots because the header of the group is removed, then the
     // member should no longer receive new data
     if (request.isRequireReadOnly()) {
       setReadOnly();
     }
 
+    boolean canGetSnapshot;
+    /**
+     * A snapshot can be obtained in either of two cases:
+     * 1. The raft member is stopped and its sync status is successful, i.e. it synced with the leader before stopping.
+     * 2. The raft member is not stopped and syncing with the leader succeeds.
+     */
+    if (stopStatus.stop) {
+      canGetSnapshot = stopStatus.syncSuccess;
+    } else {
+      canGetSnapshot = syncLeader();
+    }
+    if (!canGetSnapshot) {
+      return null;
+    }
+
     List<Integer> requiredSlots = request.getRequiredSlots();
     for (Integer requiredSlot : requiredSlots) {
       // wait if the data of the slot is in another node
@@ -467,28 +478,26 @@ public class DataGroupMember extends RaftMember {
     synchronized (logManager) {
       logger.info("{} pulling {} slots from remote", name, slots.size());
       PartitionedSnapshot<Snapshot> snapshot = (PartitionedSnapshot) logManager.getSnapshot();
-      Map<Integer, RaftNode> prevHolders = ((SlotPartitionTable) metaGroupMember.getPartitionTable())
+      Map<Integer, PartitionGroup> prevHolders = ((SlotPartitionTable) metaGroupMember.getPartitionTable())
           .getPreviousNodeMap(new RaftNode(newNode, getRaftGroupId()));
 
       // group the slots by their owners
-      Map<RaftNode, List<Integer>> holderSlotsMap = new HashMap<>();
+      Map<PartitionGroup, List<Integer>> holderSlotsMap = new HashMap<>();
       for (int slot : slots) {
         // skip the slot if the corresponding data is already replicated locally
         if (snapshot.getSnapshot(slot) == null) {
-          RaftNode raftNode = prevHolders.get(slot);
-          if (raftNode != null) {
-            holderSlotsMap.computeIfAbsent(raftNode, n -> new ArrayList<>()).add(slot);
+          PartitionGroup group = prevHolders.get(slot);
+          if (group != null) {
+            holderSlotsMap.computeIfAbsent(group, n -> new ArrayList<>()).add(slot);
           }
         }
       }
 
       // pull snapshots from each owner's data group
-      for (Entry<RaftNode, List<Integer>> entry : holderSlotsMap.entrySet()) {
-        RaftNode raftNode = entry.getKey();
+      for (Entry<PartitionGroup, List<Integer>> entry : holderSlotsMap.entrySet()) {
         List<Integer> nodeSlots = entry.getValue();
         PullSnapshotTaskDescriptor taskDescriptor =
-            new PullSnapshotTaskDescriptor(metaGroupMember.getPartitionTable()
-                .getHeaderGroup(raftNode), nodeSlots, false);
+            new PullSnapshotTaskDescriptor(entry.getKey(), nodeSlots, false);
         pullFileSnapshot(taskDescriptor, null);
       }
     }
@@ -760,6 +769,27 @@ public class DataGroupMember extends RaftMember {
     }
   }
 
+  public void waitFollowersToSync() {
+    if (character != NodeCharacter.LEADER) {
+      return;
+    }
+    for (Map.Entry<Node, Peer> entry: peerMap.entrySet()) {
+      Node node = entry.getKey();
+      Peer peer = entry.getValue();
+      while (peer.getMatchIndex() < logManager.getCommitLogIndex()) {
+        try {
+          Thread.sleep(10);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          logger.warn("{}: Unexpected interruption when waiting follower {} to sync, raft id is {}",
+              name, node, getRaftGroupId());
+        }
+      }
+      logger.info("{}: Follower {} has synced with leader, raft id is {}", name, node,
+          getRaftGroupId());
+    }
+  }
+
   /**
    * Generate a report containing the character, leader, term, last log term, last log index, header
    * and readOnly or not of this member.
@@ -800,6 +830,12 @@ public class DataGroupMember extends RaftMember {
   public boolean onSnapshotInstalled(List<Integer> slots) {
     List<Integer> removableSlots = new ArrayList<>();
     for (Integer slot : slots) {
+      /**
+       * If this slot is merely held by a different raft group on the same node, the slot's data should be kept.
+       */
+      if (metaGroupMember.getPartitionTable().judgeHoldSlot(thisNode, slot)) {
+        continue;
+      }
       int sentReplicaNum = slotManager.sentOneReplication(slot);
       if (sentReplicaNum >= ClusterDescriptor.getInstance().getConfig().getReplicationNum()) {
         removableSlots.add(slot);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
index d93efc0..7e73f61 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
@@ -382,7 +382,6 @@ public class MetaGroupMember extends RaftMember {
         logger.error("Unexpected interruption when waiting for hardlinkCleaner to end", e);
       }
     }
-
     logger.info("{}: stopped", name);
   }
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index 3a5b51b..0526285 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -249,6 +249,8 @@ public abstract class RaftMember {
    */
   private LogDispatcher logDispatcher;
 
+  protected StopStatus stopStatus;
+
   protected RaftMember() {
   }
 
@@ -260,6 +262,7 @@ public abstract class RaftMember {
     this.asyncHeartbeatClientPool = asyncHeartbeatPool;
     this.syncHeartbeatClientPool = syncHeartbeatPool;
     this.asyncSendLogClientPool = asyncClientPool;
+    this.stopStatus = new StopStatus();
   }
 
   protected RaftMember(String name, AsyncClientPool asyncPool, SyncClientPool syncPool,
@@ -365,9 +368,11 @@ public abstract class RaftMember {
         logger.error("Unexpected interruption when waiting for commitLogPool to end", e);
       }
     }
+    leader.set(ClusterConstant.EMPTY_NODE);
     catchUpService = null;
     heartBeatService = null;
     appendLogThreadPool = null;
+    stopStatus.setStop(true);
     logger.info("Member {} stopped", name);
   }
 
@@ -801,6 +806,9 @@ public abstract class RaftMember {
    * Wait until the leader of this node becomes known or time out.
    */
   public void waitLeader() {
+    if (stopStatus.isStop()) {
+      return;
+    }
     long startTime = System.currentTimeMillis();
     while (leader.get() == null || ClusterConstant.EMPTY_NODE.equals(leader.get())) {
       synchronized (waitLeaderCondition) {
@@ -1876,4 +1884,31 @@ public abstract class RaftMember {
     OK, TIME_OUT, LEADERSHIP_STALE
   }
 
+  public class StopStatus {
+
+    boolean stop;
+
+    boolean syncSuccess;
+
+    public boolean isStop() {
+      return stop;
+    }
+
+    public void setStop(boolean stop) {
+      this.stop = stop;
+    }
+
+    public boolean isSyncSuccess() {
+      return syncSuccess;
+    }
+
+    public void setSyncSuccess(boolean syncSuccess) {
+      this.syncSuccess = syncSuccess;
+    }
+  }
+
+  public StopStatus getStopStatus() {
+    return stopStatus;
+  }
+
 }
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatThreadTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatThreadTest.java
index e112d31..f6bb254 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatThreadTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatThreadTest.java
@@ -106,6 +106,11 @@ public class MetaHeartbeatThreadTest extends HeartbeatThreadTest {
     public List<PartitionGroup> getGlobalGroups() {
       return null;
     }
+
+    @Override
+    public boolean judgeHoldSlot(Node node, int slot) {
+      return true;
+    }
   };
 
   @Override
diff --git a/thrift/src/main/thrift/cluster.thrift b/thrift/src/main/thrift/cluster.thrift
index 9019680..2a24106 100644
--- a/thrift/src/main/thrift/cluster.thrift
+++ b/thrift/src/main/thrift/cluster.thrift
@@ -315,7 +315,6 @@ service RaftService {
   **/
   long requestCommitIndex(1:Node header, 2:int raftId)
 
-
   /**
   * Read a chunk of a file from the client. If the remaining of the file does not have enough
   * bytes, only the remaining will be returned.