You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2021/03/24 01:36:27 UTC

[iotdb] branch cluster_scalability updated: This commit fix following issues: 1. Meta leader do not apply partition table immediately, it should deserialize when apply the log. 2. sendLogToAllDataLog parallelization 3. Add time-consuming statistics 4. modify the wait time of retry from 1s to 10ms 5. Put sendLogToAllGroup into meta applier

This is an automated email from the ASF dual-hosted git repository.

lta pushed a commit to branch cluster_scalability
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/cluster_scalability by this push:
     new feb1f64  This commit fix following issues: 1. Meta leader do not apply partition table immediately, it should deserialize when apply the log. 2. sendLogToAllDataLog parallelization 3. Add time-consuming statistics 4. modify the wait time of retry from 1s to 10ms 5. Put sendLogToAllGroup into meta applier
feb1f64 is described below

commit feb1f642f3c4eb13b7ec16448676a50cc37c9da2
Author: lta <li...@163.com>
AuthorDate: Wed Mar 24 09:30:37 2021 +0800

    This commit fix following issues:
    1. Meta leader do not apply partition table immediately, it should deserialize when apply the log.
    2. sendLogToAllDataLog parallelization
    3. Add time-consuming statistics
    4. modify the wait time of retry from 1s to 10ms
    5. Put sendLogToAllGroup into meta applier
---
 .../java/org/apache/iotdb/cluster/ClusterMain.java |  14 +-
 .../iotdb/cluster/config/ClusterConstant.java      |   2 +
 .../org/apache/iotdb/cluster/log/LogParser.java    |   8 +
 .../iotdb/cluster/log/applier/DataLogApplier.java  |  10 +-
 .../iotdb/cluster/log/applier/MetaLogApplier.java  |  36 +++-
 .../iotdb/cluster/log/logtypes/AddNodeLog.java     |   4 +-
 .../iotdb/cluster/log/logtypes/RemoveNodeLog.java  |   4 +-
 .../cluster/log/snapshot/PullSnapshotTask.java     |   3 +
 .../cluster/partition/slot/SlotPartitionTable.java |  12 +-
 .../iotdb/cluster/query/ClusterPlanRouter.java     |  23 ++-
 .../iotdb/cluster/server/DataClusterServer.java    |  18 +-
 .../cluster/server/heartbeat/HeartbeatThread.java  |   3 +
 .../server/heartbeat/MetaHeartbeatThread.java      |   3 +
 .../cluster/server/member/MetaGroupMember.java     | 225 +++++++++++++--------
 .../iotdb/cluster/server/member/RaftMember.java    |  21 +-
 .../cluster/server/service/MetaAsyncService.java   |   2 +-
 .../cluster/utils/nodetool/ClusterMonitor.java     |   2 +-
 .../apache/iotdb/cluster/log/LogParserTest.java    |   1 +
 .../cluster/server/member/MetaGroupMemberTest.java |   7 +-
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |   3 +-
 20 files changed, 258 insertions(+), 143 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/ClusterMain.java b/cluster/src/main/java/org/apache/iotdb/cluster/ClusterMain.java
index 391a7cf..820d623 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/ClusterMain.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/ClusterMain.java
@@ -86,6 +86,7 @@ public class ClusterMain {
     }
 
     IoTDBDescriptor.getInstance().getConfig().setSyncEnable(false);
+    IoTDBDescriptor.getInstance().getConfig().setEnablePartition(true);
     logger.info("Running mode {}", mode);
     if (MODE_START.equals(mode)) {
       try {
@@ -101,10 +102,13 @@ public class ClusterMain {
       }
     } else if (MODE_ADD.equals(mode)) {
       try {
+        long startTime = System.currentTimeMillis();
         metaServer = new MetaClusterServer();
         // preStartCustomize();
         metaServer.start();
         metaServer.joinCluster();
+        logger.info("Adding this node {} to cluster costs {} ms",
+            metaServer.getMember().getThisNode(), (System.currentTimeMillis() - startTime));
       } catch (TTransportException | StartupException | QueryProcessException | StartUpCheckFailureException | ConfigInconsistentException e) {
         metaServer.stop();
         logger.error("Fail to join cluster", e);
@@ -252,6 +256,7 @@ public class ClusterMain {
       }
       AsyncMetaClient client = new AsyncMetaClient(factory, new TAsyncClientManager(), node, null);
       Long response = null;
+      long startTime = System.currentTimeMillis();
       try {
         logger.info("Start removing node {} with the help of node {}", nodeToRemove, node);
         response = SyncClientAdaptor.removeNode(client, nodeToRemove);
@@ -262,15 +267,15 @@ public class ClusterMain {
         logger.warn("Cannot send remove node request through {}, try next node", node);
       }
       if (response != null) {
-        handleNodeRemovalResp(response, nodeToRemove);
+        handleNodeRemovalResp(response, nodeToRemove, startTime);
         return;
       }
     }
   }
 
-  private static void handleNodeRemovalResp(Long response, Node nodeToRemove) {
+  private static void handleNodeRemovalResp(Long response, Node nodeToRemove, long startTime) {
     if (response == Response.RESPONSE_AGREE) {
-      logger.info("Node {} is successfully removed", nodeToRemove);
+      logger.info("Node {} is successfully removed, cost {}ms", nodeToRemove, (System.currentTimeMillis() - startTime));
     } else if (response == Response.RESPONSE_CLUSTER_TOO_SMALL) {
       logger.error("Cluster size is too small, cannot remove any node");
     } else if (response == Response.RESPONSE_REJECT) {
@@ -278,6 +283,9 @@ public class ClusterMain {
     } else if (response == Response.RESPONSE_CHANGE_MEMBERSHIP_CONFLICT) {
       logger.warn(
           "The cluster is performing other change membership operations. Change membership operations should be performed one by one. Please try again later");
+    } else if (response == Response.RESPONSE_DATA_MIGRATION_NOT_FINISH) {
+      logger.warn(
+          "The data migration of the previous membership change operation is not finished. Please try again later");
     } else {
       logger.error("Unexpected response {}", response);
     }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConstant.java b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConstant.java
index 8d3a4da..8e3f304 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConstant.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConstant.java
@@ -35,6 +35,8 @@ public class ClusterConstant {
 
   public static final int LOG_NUM_IN_BATCH = 100;
 
+  public static final int RETRY_WAIT_TIME_MS = 10;
+
   public static final Node EMPTY_NODE = new Node();
 
   private ClusterConstant() {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogParser.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogParser.java
index 4a7afc4..1af0830 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogParser.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogParser.java
@@ -64,6 +64,10 @@ public class LogParser {
       case ADD_NODE:
         AddNodeLog addNodeLog = new AddNodeLog();
         addNodeLog.deserialize(buffer);
+        if (logger.isDebugEnabled()) {
+          logger.info("The last meta log index of log {} is {}", addNodeLog,
+              addNodeLog.getMetaLogIndex());
+        }
         log = addNodeLog;
         break;
       case PHYSICAL_PLAN:
@@ -79,6 +83,10 @@ public class LogParser {
       case REMOVE_NODE:
         RemoveNodeLog removeNodeLog = new RemoveNodeLog();
         removeNodeLog.deserialize(buffer);
+        if (logger.isDebugEnabled()) {
+          logger.debug("The last meta log index of log {} is {}", removeNodeLog,
+              removeNodeLog.getMetaLogIndex());
+        }
         log = removeNodeLog;
         break;
       case EMPTY_CONTENT:
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/DataLogApplier.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/DataLogApplier.java
index 84fad46..fbaffaa 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/DataLogApplier.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/DataLogApplier.java
@@ -107,11 +107,8 @@ public class DataLogApplier extends BaseApplier {
 
   private void applyInsert(InsertPlan plan)
       throws StorageGroupNotSetException, QueryProcessException, StorageEngineException {
-    // check if the corresponding slot is being pulled
-    PartialPath sg;
-    long time = plan.getMinTime();
     try {
-      sg = IoTDB.metaManager.getStorageGroupPath(plan.getDeviceId());
+      IoTDB.metaManager.getStorageGroupPath(plan.getDeviceId());
     } catch (StorageGroupNotSetException e) {
       // the sg may not exist because the node does not catch up with the leader, retry after
       // synchronization
@@ -120,12 +117,7 @@ public class DataLogApplier extends BaseApplier {
       } catch (CheckConsistencyException ce) {
         throw new QueryProcessException(ce.getMessage());
       }
-      sg = IoTDB.metaManager.getStorageGroupPath(plan.getDeviceId());
     }
-    int slotId = SlotPartitionTable.getSlotStrategy().calculateSlotByTime(sg.getFullPath(), time,
-        ClusterConstant.SLOT_NUM);
-    // the slot may not be writable because it is pulling file versions, wait until it is done
-    dataGroupMember.getSlotManager().waitSlotForWrite(slotId);
     applyPhysicalPlan(plan, dataGroupMember);
   }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/MetaLogApplier.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/MetaLogApplier.java
index 9b98a73..4635d65 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/MetaLogApplier.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/MetaLogApplier.java
@@ -19,15 +19,14 @@
 
 package org.apache.iotdb.cluster.log.applier;
 
+import org.apache.iotdb.cluster.exception.ChangeMembershipException;
 import org.apache.iotdb.cluster.log.Log;
 import org.apache.iotdb.cluster.log.logtypes.AddNodeLog;
 import org.apache.iotdb.cluster.log.logtypes.EmptyContentLog;
 import org.apache.iotdb.cluster.log.logtypes.PhysicalPlanLog;
 import org.apache.iotdb.cluster.log.logtypes.RemoveNodeLog;
+import org.apache.iotdb.cluster.server.NodeCharacter;
 import org.apache.iotdb.cluster.server.member.MetaGroupMember;
-import org.apache.iotdb.db.exception.StorageEngineException;
-import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
-import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -49,21 +48,46 @@ public class MetaLogApplier extends BaseApplier {
     try {
       logger.debug("MetaMember [{}] starts applying Log {}", metaGroupMember.getName(), log);
       if (log instanceof AddNodeLog) {
-        member.applyAddNode((AddNodeLog) log);
+        applyAddNodeLog((AddNodeLog) log);
       } else if (log instanceof PhysicalPlanLog) {
         applyPhysicalPlan(((PhysicalPlanLog) log).getPlan(), null);
       } else if (log instanceof RemoveNodeLog) {
-        member.applyRemoveNode(((RemoveNodeLog) log));
+        applyRemoveNodeLog((RemoveNodeLog) log);
       } else if (log instanceof EmptyContentLog) {
         // Do nothing
       } else {
         logger.error("Unsupported log: {} {}", log.getClass().getName(), log);
       }
-    } catch (StorageEngineException | StorageGroupNotSetException | QueryProcessException e) {
+    } catch (Exception e) {
       logger.debug("Exception occurred when executing {}", log, e);
       log.setException(e);
     } finally {
       log.setApplied(true);
     }
   }
+
+  private void applyAddNodeLog(AddNodeLog log)
+      throws ChangeMembershipException, InterruptedException {
+    if (!metaGroupMember.getPartitionTable().deserialize(log.getPartitionTable())) {
+      logger.info("Ignore previous change membership log");
+      // ignore previous change membership log
+      return;
+    }
+    if (metaGroupMember.getCharacter() == NodeCharacter.LEADER) {
+      metaGroupMember.sendLogToAllDataGroups(log);
+    }
+    member.applyAddNode(log);
+  }
+
+  private void applyRemoveNodeLog(RemoveNodeLog log)
+      throws ChangeMembershipException, InterruptedException {
+    if (!metaGroupMember.getPartitionTable().deserialize(log.getPartitionTable())) {
+      // ignore previous change membership log
+      return;
+    }
+    if (metaGroupMember.getCharacter() == NodeCharacter.LEADER) {
+      metaGroupMember.sendLogToAllDataGroups(log);
+    }
+    member.applyRemoveNode(log);
+  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/AddNodeLog.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/AddNodeLog.java
index 8a2fcab..3e5268b 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/AddNodeLog.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/AddNodeLog.java
@@ -128,9 +128,7 @@ public class AddNodeLog extends Log {
 
   @Override
   public String toString() {
-    return "AddNodeLog{" +
-        "newNode=" + newNode +
-        '}';
+    return "AddNodeLog{" + "newNode=" + newNode.toString() + '}';
   }
 
   @Override
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/RemoveNodeLog.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/RemoveNodeLog.java
index 4b147eb..f5a7b23 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/RemoveNodeLog.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/RemoveNodeLog.java
@@ -122,9 +122,7 @@ public class RemoveNodeLog extends Log {
 
   @Override
   public String toString() {
-    return "RemoveNodeLog{" +
-        "removedNode=" + removedNode +
-        '}';
+    return "RemoveNodeLog{" + "removedNode=" + removedNode.toString() + '}';
   }
 
   @Override
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTask.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTask.java
index 16e5bd8..265ed32 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTask.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTask.java
@@ -174,6 +174,8 @@ public class PullSnapshotTask<T extends Snapshot> implements Callable<Void> {
     request.setRequiredSlots(descriptor.getSlots());
     request.setRequireReadOnly(descriptor.isRequireReadOnly());
 
+    long startTime = System.currentTimeMillis();
+    logger.info("{}: data migration starts.", newMember.getName());
     boolean finished = false;
     int nodeIndex = ((PartitionGroup) newMember.getAllNodes()).indexOf(newMember.getThisNode()) - 1;
     while (!finished) {
@@ -200,6 +202,7 @@ public class PullSnapshotTask<T extends Snapshot> implements Callable<Void> {
         }
       }
     }
+    logger.info("{}: data migration ends, cost {}ms", newMember.getName(), (System.currentTimeMillis() - startTime));
     removeTask();
     return null;
   }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java b/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java
index b305b32..c4e2f07 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java
@@ -92,7 +92,7 @@ public class SlotPartitionTable implements PartitionTable {
   private List<PartitionGroup> globalGroups;
 
   // the last meta log index that modifies the partition table
-  private long lastMetaLogIndex = -1;
+  private volatile long lastMetaLogIndex = -1;
 
   private SlotBalancer slotBalancer = new DefaultSlotBalancer(this);
 
@@ -379,6 +379,9 @@ public class SlotPartitionTable implements PartitionTable {
   public synchronized boolean deserialize(ByteBuffer buffer) {
     long newLastLogIndex = buffer.getLong();
 
+    if (logger.isDebugEnabled()) {
+      logger.debug("Partition table: lastMetaLogIndex {}, newLastLogIndex {}", lastMetaLogIndex, newLastLogIndex);
+    }
     // judge whether the partition table of byte buffer is out of date
     if (lastMetaLogIndex != -1 && lastMetaLogIndex >= newLastLogIndex) {
       return lastMetaLogIndex == newLastLogIndex;
@@ -575,11 +578,14 @@ public class SlotPartitionTable implements PartitionTable {
   }
 
   @Override
-  public synchronized long getLastMetaLogIndex() {
+  public long getLastMetaLogIndex() {
     return lastMetaLogIndex;
   }
 
-  public synchronized void setLastMetaLogIndex(long lastMetaLogIndex) {
+  public void setLastMetaLogIndex(long lastMetaLogIndex) {
+    if (logger.isDebugEnabled()) {
+      logger.debug("Set last meta log index of partition table to {}", lastMetaLogIndex);
+    }
     this.lastMetaLogIndex = Math.max(this.lastMetaLogIndex, lastMetaLogIndex);
   }
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanRouter.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanRouter.java
index 8fbc772..02e3de4 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanRouter.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanRouter.java
@@ -28,7 +28,6 @@ import java.util.Map;
 import org.apache.iotdb.cluster.exception.UnknownLogTypeException;
 import org.apache.iotdb.cluster.exception.UnsupportedPlanException;
 import org.apache.iotdb.cluster.log.Log;
-import org.apache.iotdb.cluster.log.LogParser;
 import org.apache.iotdb.cluster.log.logtypes.AddNodeLog;
 import org.apache.iotdb.cluster.log.logtypes.RemoveNodeLog;
 import org.apache.iotdb.cluster.partition.PartitionGroup;
@@ -132,8 +131,6 @@ public class ClusterPlanRouter {
       return splitAndRoutePlan((AlterTimeSeriesPlan) plan);
     } else if (plan instanceof CreateMultiTimeSeriesPlan) {
       return splitAndRoutePlan((CreateMultiTimeSeriesPlan) plan);
-    } else if (plan instanceof LogPlan) {
-      return splitAndRoutePlan((LogPlan)plan);
     }
     //the if clause can be removed after the program is stable
     if (PartitionUtils.isLocalNonQueryPlan(plan)) {
@@ -147,20 +144,24 @@ public class ClusterPlanRouter {
     throw new UnsupportedPlanException(plan);
   }
 
-  protected Map<PhysicalPlan, PartitionGroup> splitAndRoutePlan(LogPlan plan)
-      throws UnknownLogTypeException, UnsupportedPlanException {
+  public Map<PhysicalPlan, PartitionGroup> splitAndRouteChangeMembershipLog(Log log) {
     Map<PhysicalPlan, PartitionGroup> result = new HashMap<>();
-    Log log = LogParser.getINSTANCE().parse(plan.getLog());
+    LogPlan plan = new LogPlan(log.serialize());
     List<Node> oldRing = new ArrayList<>(partitionTable.getAllNodes());
     if (log instanceof AddNodeLog) {
       oldRing.remove(((AddNodeLog) log).getNewNode());
     } else if (log instanceof RemoveNodeLog) {
-      oldRing.add(((RemoveNodeLog) log).getRemovedNode());
-      oldRing.sort(Comparator.comparingInt(Node::getNodeIdentifier));
-    } else {
-      throw new UnsupportedPlanException(plan);
+      if (!oldRing.contains(((RemoveNodeLog) log).getRemovedNode())) {
+        oldRing.add(((RemoveNodeLog) log).getRemovedNode());
+        oldRing.sort(Comparator.comparingInt(Node::getNodeIdentifier));
+      }
     }
-    for (PartitionGroup partitionGroup: partitionTable.calculateGlobalGroups(oldRing)) {
+    for (PartitionGroup partitionGroup : partitionTable.calculateGlobalGroups(oldRing)) {
+      // It doesn't need to notify the data group which will be removed from cluster.
+      if (log instanceof RemoveNodeLog && partitionGroup.getHeader()
+          .equals(((RemoveNodeLog) log).getRemovedNode())) {
+        continue;
+      }
       result.put(new LogPlan(plan), partitionGroup);
     }
     return result;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
index 6d10edb..d5a33a5 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
@@ -561,13 +561,17 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
       if (logger.isDebugEnabled()) {
         logger.debug("Data cluster server: start to handle new groups when adding new node {}", node);
       }
-      for (PartitionGroup newGroup: result.getNewGroupList()) {
-        if (newGroup.contains(thisNode)) {
-          logger.info("Adding this node into a new group {}", newGroup);
-          DataGroupMember dataGroupMember = dataMemberFactory.create(newGroup, thisNode);
-          addDataGroupMember(dataGroupMember);
-          dataGroupMember.pullNodeAdditionSnapshots(((SlotPartitionTable) partitionTable).getNodeSlots(node,
-              newGroup.getId()), node);
+      // pull snapshot has already done when the new node starts.
+      if (!node.equals(thisNode)) {
+        for (PartitionGroup newGroup : result.getNewGroupList()) {
+          if (newGroup.contains(thisNode)) {
+            logger.info("Adding this node into a new group {}", newGroup);
+            DataGroupMember dataGroupMember = dataMemberFactory.create(newGroup, thisNode);
+            addDataGroupMember(dataGroupMember);
+            dataGroupMember
+                .pullNodeAdditionSnapshots(((SlotPartitionTable) partitionTable).getNodeSlots(node,
+                    newGroup.getId()), node);
+          }
         }
       }
     }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/HeartbeatThread.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/HeartbeatThread.java
index 82eb020..c590e9b 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/HeartbeatThread.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/HeartbeatThread.java
@@ -266,6 +266,9 @@ public class HeartbeatThread implements Runnable {
   @SuppressWarnings({"java:S2274"})
   // enable timeout
   void startElection() {
+    if (localMember.isSkipElection()) {
+      return;
+    }
     synchronized (localMember.getTerm()) {
       long nextTerm = localMember.getTerm().incrementAndGet();
       localMember.setVoteFor(localMember.getThisNode());
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatThread.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatThread.java
index 16bca7a..35a69c6 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatThread.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatThread.java
@@ -73,6 +73,9 @@ public class MetaHeartbeatThread extends HeartbeatThread {
 
   @Override
   void startElection() {
+//    if (localMetaMember.getThisNode().metaPort != 9003 && localMetaMember.getThisNode().metaPort != 9005) {
+//      return;
+//    }
     super.startElection();
     localMetaMember.getAppendLogThreadPool().submit(() -> localMetaMember.processEmptyContentLog());
   }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
index f9d625d..4144b55 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
@@ -47,6 +47,7 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Objects;
 import java.util.Set;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -141,7 +142,6 @@ import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
 import org.apache.iotdb.db.qp.physical.sys.CreateMultiTimeSeriesPlan;
 import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
 import org.apache.iotdb.db.qp.physical.sys.DeleteTimeSeriesPlan;
-import org.apache.iotdb.db.qp.physical.sys.LogPlan;
 import org.apache.iotdb.db.service.IoTDB;
 import org.apache.iotdb.db.utils.TestOnly;
 import org.apache.iotdb.rpc.RpcUtils;
@@ -332,10 +332,6 @@ public class MetaGroupMember extends RaftMember {
     return dataClusterServer;
   }
 
-  public void setDataClusterServer(DataClusterServer dataClusterServer) {
-    this.dataClusterServer = dataClusterServer;
-  }
-
   public DataHeartbeatServer getDataHeartbeatServer() {
     return dataHeartbeatServer;
   }
@@ -443,25 +439,23 @@ public class MetaGroupMember extends RaftMember {
     long startTime = System.currentTimeMillis();
     Node newNode = addNodeLog.getNewNode();
     synchronized (allNodes) {
-      if (partitionTable.deserialize(addNodeLog.getPartitionTable())) {
-        if (logger.isDebugEnabled()) {
-          logger.debug("{}: adding a new node {} into {}", name, newNode, allNodes);
-        }
+      if (logger.isDebugEnabled()) {
+        logger.debug("{}: adding a new node {} into {}", name, newNode, allNodes);
+      }
 
-        if (!allNodes.contains(newNode)) {
-          registerNodeIdentifier(newNode, newNode.getNodeIdentifier());
-          allNodes.add(newNode);
-        }
+      if (!allNodes.contains(newNode)) {
+        registerNodeIdentifier(newNode, newNode.getNodeIdentifier());
+        allNodes.add(newNode);
+      }
 
-        // update the partition table
-        savePartitionTable();
+      // update the partition table
+      savePartitionTable();
 
-        // update local data members
-        NodeAdditionResult result = partitionTable.getNodeAdditionResult(newNode);
-        getDataClusterServer().addNode(newNode, result);
-        if (logger.isDebugEnabled()) {
-          logger.debug("{}: success to add a new node {} into {}", name, newNode, allNodes);
-        }
+      // update local data members
+      NodeAdditionResult result = partitionTable.getNodeAdditionResult(newNode);
+      getDataClusterServer().addNode(newNode, result);
+      if (logger.isDebugEnabled()) {
+        logger.debug("{}: success to add a new node {} into {}", name, newNode, allNodes);
       }
     }
     logger.info("{}: execute adding node {} cost {} ms", name, newNode,
@@ -892,8 +886,7 @@ public class MetaGroupMember extends RaftMember {
    * @return true if the process is over, false if the request should be forwarded
    */
   private boolean processAddNodeLocally(Node newNode, StartUpStatus startUpStatus,
-      AddNodeResponse response)
-      throws LogExecutionException, ChangeMembershipException, InterruptedException, UnsupportedPlanException {
+      AddNodeResponse response) throws LogExecutionException, InterruptedException {
     if (character != NodeCharacter.LEADER) {
       return false;
     }
@@ -942,10 +935,12 @@ public class MetaGroupMember extends RaftMember {
     // node adding is serialized to reduce potential concurrency problem
     synchronized (logManager) {
       // update partition table
-      partitionTable.addNode(newNode);
-      ((SlotPartitionTable) partitionTable).setLastMetaLogIndex(logManager.getLastLogIndex() + 1);
+      PartitionTable table = new SlotPartitionTable(thisNode);
+      table.deserialize(partitionTable.serialize());
+      table.addNode(newNode);
+      ((SlotPartitionTable) table).setLastMetaLogIndex(logManager.getLastLogIndex() + 1);
 
-      addNodeLog.setPartitionTable(partitionTable.serialize());
+      addNodeLog.setPartitionTable(table.serialize());
       addNodeLog.setCurrLogTerm(getTerm().get());
       addNodeLog.setCurrLogIndex(logManager.getLastLogIndex() + 1);
       addNodeLog.setMetaLogIndex(logManager.getLastLogIndex() + 1);
@@ -962,7 +957,7 @@ public class MetaGroupMember extends RaftMember {
       AppendLogResult result = sendLogToFollowers(addNodeLog);
       switch (result) {
         case OK:
-          sendLogToAllDataGroups(addNodeLog);
+//          sendLogToAllDataGroups(addNodeLog);
           commitLog(addNodeLog);
           logger.info("{}: Join request of {} is accepted", name, newNode);
 
@@ -973,6 +968,12 @@ public class MetaGroupMember extends RaftMember {
           logger.info("{}: Sending join response of {}", name, newNode);
           return true;
         case TIME_OUT:
+          logger.debug("{}: log {} timed out, retrying...", name, addNodeLog);
+          try {
+            Thread.sleep(ClusterConstant.RETRY_WAIT_TIME_MS);
+          } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+          }
           logger.info("{}: Join request of {} timed out", name, newNode);
           retryTime++;
           break;
@@ -997,7 +998,7 @@ public class MetaGroupMember extends RaftMember {
       if (++retryTime == 5) {
         break;
       }
-      Thread.sleep(20);
+      Thread.sleep(ClusterConstant.RETRY_WAIT_TIME_MS);
     }
     return false;
   }
@@ -1031,7 +1032,12 @@ public class MetaGroupMember extends RaftMember {
           }
           return;
         case TIME_OUT:
-          logger.info("{}: add empty content log timed out", name);
+          logger.info("{}: add empty content log timed out, retry.", name);
+          try {
+            Thread.sleep(ClusterConstant.RETRY_WAIT_TIME_MS);
+          } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+          }
           retryTime++;
           break;
         case LEADERSHIP_STALE:
@@ -2021,8 +2027,7 @@ public class MetaGroupMember extends RaftMember {
    * @param node the node to be removed.
    * @return Long.MIN_VALUE if further forwarding is required, or the execution result
    */
-  private long processRemoveNodeLocally(Node node)
-      throws LogExecutionException, ChangeMembershipException, InterruptedException, UnsupportedPlanException {
+  private long processRemoveNodeLocally(Node node) throws LogExecutionException, InterruptedException {
     if (character != NodeCharacter.LEADER) {
       return Response.RESPONSE_NULL;
     }
@@ -2057,14 +2062,27 @@ public class MetaGroupMember extends RaftMember {
       return Response.RESPONSE_CHANGE_MEMBERSHIP_CONFLICT;
     }
 
+//    // If it is to remove the leader of meta group, transfer leader authority.
+//    if (node.equals(thisNode)) {
+//      logger.info("Remove the leader of meta group, it should step down and transfer leadership. Remove node: {}", node);
+//      setSkipElection(true);
+//      setCharacter(NodeCharacter.ELECTOR);
+//      setLeader(null);
+//      waitLeader();
+//      setSkipElection(false);
+//      return Response.RESPONSE_NULL;
+//    }
+
     RemoveNodeLog removeNodeLog = new RemoveNodeLog();
     // node removal must be serialized to reduce potential concurrency problem
     synchronized (logManager) {
       // update partition table
-      partitionTable.removeNode(target);
-      ((SlotPartitionTable) partitionTable).setLastMetaLogIndex(logManager.getLastLogIndex() + 1);
+      PartitionTable table = new SlotPartitionTable(thisNode);
+      table.deserialize(partitionTable.serialize());
+      table.removeNode(target);
+      ((SlotPartitionTable) table).setLastMetaLogIndex(logManager.getLastLogIndex() + 1);
 
-      removeNodeLog.setPartitionTable(partitionTable.serialize());
+      removeNodeLog.setPartitionTable(table.serialize());
       removeNodeLog.setCurrLogTerm(getTerm().get());
       removeNodeLog.setCurrLogIndex(logManager.getLastLogIndex() + 1);
       removeNodeLog.setMetaLogIndex(logManager.getLastLogIndex() + 1);
@@ -2081,12 +2099,16 @@ public class MetaGroupMember extends RaftMember {
       AppendLogResult result = sendLogToFollowers(removeNodeLog);
       switch (result) {
         case OK:
-          sendLogToAllDataGroups(removeNodeLog);
           commitLog(removeNodeLog);
           logger.info("{}: Removal request of {} is accepted", name, target);
           return Response.RESPONSE_AGREE;
         case TIME_OUT:
           logger.info("{}: Removal request of {} timed out", name, target);
+          try {
+            Thread.sleep(ClusterConstant.RETRY_WAIT_TIME_MS);
+          } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+          }
           retryTime++;
           break;
         // retry
@@ -2097,23 +2119,58 @@ public class MetaGroupMember extends RaftMember {
     }
   }
 
-  private void sendLogToAllDataGroups(Log log)
-      throws ChangeMembershipException, UnsupportedPlanException, InterruptedException {
+  public void sendLogToAllDataGroups(Log log)
+      throws ChangeMembershipException, InterruptedException {
+    if (logger.isDebugEnabled()) {
+      logger.debug("Send log {} to all data groups: start", log);
+    }
+
+    Map<PhysicalPlan, PartitionGroup> planGroupMap = router.splitAndRouteChangeMembershipLog(log);
+    List<String> errorCodePartitionGroups = new ArrayList<>();
+    CountDownLatch counter = new CountDownLatch(planGroupMap.size());
+    for (Map.Entry<PhysicalPlan, PartitionGroup> entry : planGroupMap.entrySet()) {
+      getAppendLogThreadPool()
+          .submit(() -> forwardChangeMembershipPlan(log, entry, errorCodePartitionGroups, counter));
+    }
+    counter.await();
+    if (!errorCodePartitionGroups.isEmpty()) {
+      throw new ChangeMembershipException(String
+          .format("apply %s failed with status {%s}", log, errorCodePartitionGroups.toString()));
+    }
+    if (logger.isDebugEnabled()) {
+      logger.debug("Send log {} to all data groups: end", log);
+    }
+  }
+
+  private void forwardChangeMembershipPlan(Log log, Map.Entry<PhysicalPlan, PartitionGroup> entry,
+      List<String> errorCodePartitionGroups, CountDownLatch counter) {
     int retryTime = 0;
-    TSStatus status = null;
-    while(retryTime++ <= 5) {
-      logger.debug("Send log {} to all data groups, retry time: {}", log, retryTime);
-      LogPlan plan = new LogPlan(log.serialize());
-      status = processPartitionedPlan(plan);
-      if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-        break;
+    try {
+      while (true) {
+        if (logger.isDebugEnabled()) {
+          logger.debug("Send change membership log {} to data group {}, retry time: {}", log, entry.getValue(),
+              retryTime);
+        }
+        try {
+          TSStatus status = forwardToSingleGroup(entry);
+          if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+            if(logger.isDebugEnabled()) {
+              logger.debug("Success to send change membership log {} to data group {}", log, entry.getValue());
+            }
+            return;
+          }
+          Thread.sleep(ClusterConstant.RETRY_WAIT_TIME_MS);
+        } catch (InterruptedException e) {
+          synchronized (errorCodePartitionGroups) {
+            errorCodePartitionGroups.add(e.getMessage());
+          }
+          return;
+        }
+        retryTime++;
       }
-      Thread.sleep(100);
-    }
-    if (status == null || status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-      throw new ChangeMembershipException(String.format("apply %s failed with status {%s}", log, status));
+    } finally {
+      counter.countDown();
     }
-    logger.debug("Send log {} to all data groups: end", log);
   }
 
   /**
@@ -2128,48 +2185,46 @@ public class MetaGroupMember extends RaftMember {
     long startTime = System.currentTimeMillis();
     Node oldNode = removeNodeLog.getRemovedNode();
     synchronized (allNodes) {
-      if (partitionTable.deserialize(removeNodeLog.getPartitionTable())) {
-        if (logger.isDebugEnabled()) {
-          logger.debug("{}: Removing a node {} from {}", name, oldNode, allNodes);
-        }
+      if (logger.isDebugEnabled()) {
+        logger.debug("{}: Removing a node {} from {}", name, oldNode, allNodes);
+      }
 
-        if (allNodes.contains(oldNode)) {
-          allNodes.remove(oldNode);
-          idNodeMap.remove(oldNode.nodeIdentifier);
-        }
+      if (allNodes.contains(oldNode)) {
+        allNodes.remove(oldNode);
+        idNodeMap.remove(oldNode.nodeIdentifier);
+      }
 
-        // save the updated partition table
-        savePartitionTable();
+      // save the updated partition table
+      savePartitionTable();
 
-        // update DataGroupMembers, as the node is removed, the members of some groups are
-        // changed and there will also be one less group
-        NodeRemovalResult result = partitionTable.getNodeRemovalResult();
-        getDataClusterServer().removeNode(oldNode, result);
+      // update DataGroupMembers, as the node is removed, the members of some groups are
+      // changed and there will also be one less group
+      NodeRemovalResult result = partitionTable.getNodeRemovalResult();
+      getDataClusterServer().removeNode(oldNode, result);
 
-        // the leader is removed, start the next election ASAP
-        if (oldNode.equals(leader.get())) {
-          setCharacter(NodeCharacter.ELECTOR);
-          setLeader(ClusterConstant.EMPTY_NODE);
-          lastHeartbeatReceivedTime = Long.MIN_VALUE;
-        }
+      // the leader is removed, start the next election ASAP
+      if (oldNode.equals(leader.get())) {
+        setCharacter(NodeCharacter.ELECTOR);
+        setLeader(ClusterConstant.EMPTY_NODE);
+        lastHeartbeatReceivedTime = Long.MIN_VALUE;
+      }
 
-        if (oldNode.equals(thisNode)) {
-          // use super.stop() so that the data server will not be closed because other nodes may
-          // want to pull data from this node
-          super.stop();
-          if (clientServer != null) {
-            clientServer.stop();
-          }
-          logger.info("{} has been removed from the cluster", name);
-        } else if (thisNode.equals(leader.get())) {
-          // as the old node is removed, it cannot know this by heartbeat or log, so it should be
-          // directly kicked out of the cluster
-          exileNode(removeNodeLog);
+      if (oldNode.equals(thisNode)) {
+        // use super.stop() so that the data server will not be closed because other nodes may
+        // want to pull data from this node
+        super.stop();
+        if (clientServer != null) {
+          clientServer.stop();
         }
+        logger.info("{} has been removed from the cluster", name);
+      } else if (thisNode.equals(leader.get())) {
+        // as the old node is removed, it cannot know this by heartbeat or log, so it should be
+        // directly kicked out of the cluster
+        exileNode(removeNodeLog);
+      }
 
-        if (logger.isDebugEnabled()) {
-          logger.debug("{}: Success to remove a node {} from {}", name, oldNode, allNodes);
-        }
+      if (logger.isDebugEnabled()) {
+        logger.debug("{}: Success to remove a node {} from {}", name, oldNode, allNodes);
       }
 
       logger.info("{}: execute removing node {} cost {} ms", name, oldNode,
@@ -2234,6 +2289,7 @@ public class MetaGroupMember extends RaftMember {
    * @return key: data group; value: slot num in data migration
    */
   public Map<PartitionGroup, Integer> collectAllPartitionMigrationStatus() {
+    syncLeader();
     Map<PartitionGroup, Integer> res = new HashMap<>();
     for (Node node: allNodes) {
       Map<PartitionGroup, Integer> oneNodeRes;
@@ -2257,6 +2313,7 @@ public class MetaGroupMember extends RaftMember {
    * @return key: data group; value: slot num in data migration
    */
   public Map<PartitionGroup, Integer> collectMigrationStatus() {
+    logger.info("{}: start to collect migration status.", name);
     Map<PartitionGroup, Integer> groupSlotMap = new HashMap<>();
     Map<RaftNode, DataGroupMember> headerMap = getDataClusterServer().getHeaderGroupMap();
     waitUtil(getPartitionTable().getLastMetaLogIndex());
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index bfdfb70..89fd12d 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -61,9 +61,7 @@ import org.apache.iotdb.cluster.log.LogDispatcher;
 import org.apache.iotdb.cluster.log.LogDispatcher.SendLogRequest;
 import org.apache.iotdb.cluster.log.LogParser;
 import org.apache.iotdb.cluster.log.catchup.CatchUpTask;
-import org.apache.iotdb.cluster.log.logtypes.AddNodeLog;
 import org.apache.iotdb.cluster.log.logtypes.PhysicalPlanLog;
-import org.apache.iotdb.cluster.log.logtypes.RemoveNodeLog;
 import org.apache.iotdb.cluster.log.manage.RaftLogManager;
 import org.apache.iotdb.cluster.partition.PartitionGroup;
 import org.apache.iotdb.cluster.rpc.thrift.AppendEntriesRequest;
@@ -242,7 +240,12 @@ public abstract class RaftMember {
    */
   private LogDispatcher logDispatcher;
 
-  private boolean hasSyncedLeaderBeforeRemoved = false;
+  private volatile boolean hasSyncedLeaderBeforeRemoved = false;
+
+  /**
+   * If this node can not be the leader, this parameter will be set true.
+   */
+  private volatile boolean skipElection = false;
 
   protected RaftMember() {
   }
@@ -1478,7 +1481,7 @@ public abstract class RaftMember {
         term.set(newTerm);
         setVoteFor(null);
         setCharacter(NodeCharacter.ELECTOR);
-        setLeader(ClusterConstant.EMPTY_NODE);
+        setLeader(null);
         updateHardState(newTerm, getVoteFor());
       }
 
@@ -1560,7 +1563,7 @@ public abstract class RaftMember {
         case TIME_OUT:
           logger.debug("{}: log {} timed out, retrying...", name, log);
           try {
-            Thread.sleep(1000);
+            Thread.sleep(ClusterConstant.RETRY_WAIT_TIME_MS);
           } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
           }
@@ -1909,4 +1912,12 @@ public abstract class RaftMember {
   public void setHasSyncedLeaderBeforeRemoved(boolean hasSyncedLeaderAfterRemoved) {
     this.hasSyncedLeaderBeforeRemoved = hasSyncedLeaderAfterRemoved;
   }
+
+  public boolean isSkipElection() {
+    return skipElection;
+  }
+
+  public void setSkipElection(boolean skipElection) {
+    this.skipElection = skipElection;
+  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaAsyncService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaAsyncService.java
index 1ee9ab5..fceb078 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaAsyncService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaAsyncService.java
@@ -154,7 +154,7 @@ public class MetaAsyncService extends BaseAsyncService implements TSMetaService.
 
   @Override
   public void removeNode(Node node, AsyncMethodCallback<Long> resultHandler) {
-    long result = Response.RESPONSE_NULL;
+    long result;
     try {
       result = metaGroupMember.removeNode(node);
     } catch (PartitionTableUnavailableException | LogExecutionException | ChangeMembershipException | InterruptedException | UnsupportedPlanException e) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/ClusterMonitor.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/ClusterMonitor.java
index 6b58665..3a061db 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/ClusterMonitor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/ClusterMonitor.java
@@ -81,7 +81,7 @@ public class ClusterMonitor implements ClusterMonitorMBean, IService {
     }
     List<Pair<Node, NodeCharacter>> res = new ArrayList<>();
     for (Node node : metaMember.getPartitionTable().getAllNodes()) {
-      if (node.equals(metaMember.getThisNode())) {
+      if (node.equals(metaMember.getLeader())) {
         res.add(new Pair<>(node, NodeCharacter.LEADER));
       } else {
         res.add(new Pair<>(node, NodeCharacter.FOLLOWER));
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/log/LogParserTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/log/LogParserTest.java
index 59874b4..259bffb 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/log/LogParserTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/log/LogParserTest.java
@@ -107,6 +107,7 @@ public class LogParserTest {
   @Test
   public void testLogPlan() throws IOException, IllegalPathException, UnknownLogTypeException {
     AddNodeLog log = new AddNodeLog(TestUtils.seralizePartitionTable, TestUtils.getNode(0));
+    log.setMetaLogIndex(1);
     LogPlan logPlan = new LogPlan(log.serialize());
     ByteBuffer buffer = ByteBuffer.wrap(PlanSerializer.getInstance().serialize(logPlan));
     PhysicalPlan plan = PhysicalPlan.Factory.create(buffer);
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
index 2402ff8..2eb61bd 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
@@ -192,12 +192,7 @@ public class MetaGroupMemberTest extends MemberTest {
 
     buildDataGroups(dataClusterServer);
     testMetaMember.getThisNode().setNodeIdentifier(0);
-    testMetaMember.setRouter(new ClusterPlanRouter(testMetaMember.getPartitionTable()){
-      @Override
-      protected Map<PhysicalPlan, PartitionGroup> splitAndRoutePlan(LogPlan plan) {
-        return Collections.singletonMap(plan, partitionTable.getHeaderGroup(testMetaMember.getThisNode()));
-      }
-    });
+    testMetaMember.setRouter(new ClusterPlanRouter(testMetaMember.getPartitionTable()));
     mockDataClusterServer = false;
     NodeStatusManager.getINSTANCE().setMetaGroupMember(testMetaMember);
     exiledNode = null;
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 988469a..b9e5ef6 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -769,7 +769,8 @@ public class IoTDBConfig {
   private int mtreeSnapshotThresholdTime = 3600;
 
   /**
-   * Time range for partitioning data inside each storage group, the unit is second
+   * Time range for partitioning data inside each storage group, the unit is second.
+   * Default time is a week.
    */
   private long partitionInterval = 604800;