You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2021/03/06 05:59:40 UTC

[iotdb] branch cluster_scalability updated: This commit fixes the following issues: 1. Improve node-tool, add a function to query the status of data migration 2. When adding/removing a node, add logic to query whether data migration for the previous change membership operation is finished. If not, refuse the operation. 3. In the second stage of change membership, use preAddNode/preRemoveNode instead of syncLeader to make sure the order. 4. Fix a bug of install snapshot when data migration. In this case, it's wrong to judge overlap just by the range o [...]

This is an automated email from the ASF dual-hosted git repository.

lta pushed a commit to branch cluster_scalability
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/cluster_scalability by this push:
     new 4505b95  This commit fixes the following issues: 1. Improve node-tool, add a function to query the status of data migration 2. When adding/removing a node, add logic to query whether data migration for the previous change membership operation is finished. If not, refuse the operation. 3. In the second stage of change membership, use preAddNode/preRemoveNode instead of syncLeader to make sure the order. 4. Fix a bug of install snapshot when data migration. In this case, it's wrong to judge overlap  [...]
4505b95 is described below

commit 4505b9589fdc4f8bdae970a763de98ac6a54c6a3
Author: lta <li...@163.com>
AuthorDate: Sat Mar 6 13:48:43 2021 +0800

    This commit fixes the following issues:
    1. Improve node-tool, add a function to query the status of data migration
    2. When adding/removing a node, add logic to query whether data migration for the previous change membership operation is finished. If not, refuse the operation.
    3. In the second stage of change membership, use preAddNode/preRemoveNode instead of syncLeader to ensure ordering.
    4. Fix a bug of installing a snapshot during data migration. In this case, it's wrong to judge overlap just by the range of plan index because the files are not in the same raft group. So add logic to distinguish snapshots for catch-up from those for data migration. For catch-up, new tsfiles in data migration will renumber the plan indexes.
    5. Fix a bug of repeatedly pulling slots, which may lead to errors.
    6. Improvement: remove the syncLeader call when installing a snapshot during data migration because it's unnecessary.
    7. When querying, the case that a slot is in data migration should be considered, which means the resources of the local data and the previous holder's data should be merged.
---
 .../resources/conf/iotdb-cluster.properties        |   2 +-
 .../exception/ChangeMembershipException.java       |   2 +-
 .../exception/RedirectMetaLeaderException.java     |  38 ------
 .../iotdb/cluster/log/snapshot/FileSnapshot.java   |  46 ++++---
 .../cluster/log/snapshot/PartitionedSnapshot.java  |   1 -
 .../cluster/log/snapshot/SnapshotInstaller.java    |   1 +
 .../iotdb/cluster/partition/slot/SlotManager.java  |  38 ++++--
 .../cluster/query/ClusterDataQueryExecutor.java    |   4 +-
 .../iotdb/cluster/query/LocalQueryExecutor.java    |   5 +-
 .../query/aggregate/ClusterAggregateExecutor.java  |   2 +-
 .../cluster/query/filter/SlotTsFileFilter.java     |  12 +-
 .../groupby/ClusterGroupByVFilterDataSet.java      |   2 +-
 .../cluster/query/reader/ClusterReaderFactory.java | 140 +++++++++++++++------
 .../cluster/query/reader/ClusterTimeGenerator.java |   2 +-
 .../iotdb/cluster/server/DataClusterServer.java    |   7 +-
 .../iotdb/cluster/server/MetaClusterServer.java    |  10 ++
 .../apache/iotdb/cluster/server/RaftServer.java    |   2 +-
 .../org/apache/iotdb/cluster/server/Response.java  |   2 +
 .../server/heartbeat/MetaHeartbeatThread.java      |   1 -
 .../cluster/server/member/DataGroupMember.java     |  73 ++++++-----
 .../cluster/server/member/MetaGroupMember.java     | 117 ++++++++++++++++-
 .../iotdb/cluster/server/member/RaftMember.java    |   4 +
 .../cluster/server/service/DataAsyncService.java   |   3 +-
 .../cluster/server/service/DataSyncService.java    |   3 +-
 .../cluster/server/service/MetaAsyncService.java   |   9 +-
 .../cluster/server/service/MetaSyncService.java    |   9 +-
 .../apache/iotdb/cluster/utils/ClusterUtils.java   |  32 +++++
 .../cluster/utils/nodetool/ClusterMonitor.java     |  22 ++--
 .../utils/nodetool/ClusterMonitorMBean.java        |   5 +-
 .../cluster/utils/nodetool/function/Migration.java |   9 +-
 .../utils/nodetool/function/NodeToolCmd.java       |  14 ++-
 .../cluster/utils/nodetool/function/Partition.java |   2 +-
 .../iotdb/cluster/common/TestDataGroupMember.java  |   4 +-
 .../iotdb/cluster/partition/SlotManagerTest.java   |   6 +-
 .../cluster/server/member/DataGroupMemberTest.java |   4 +-
 .../cluster/server/member/MetaGroupMemberTest.java |   4 +-
 .../engine/storagegroup/StorageGroupProcessor.java |   4 +-
 .../db/engine/storagegroup/TsFileResource.java     |   6 +-
 thrift/src/main/thrift/cluster.thrift              |  11 +-
 39 files changed, 451 insertions(+), 207 deletions(-)

diff --git a/cluster/src/assembly/resources/conf/iotdb-cluster.properties b/cluster/src/assembly/resources/conf/iotdb-cluster.properties
index 4b9b2f5..f38870e 100644
--- a/cluster/src/assembly/resources/conf/iotdb-cluster.properties
+++ b/cluster/src/assembly/resources/conf/iotdb-cluster.properties
@@ -114,7 +114,7 @@ enable_auto_create_schema=true
 consistency_level=mid
 
 # Whether to use asynchronous server
-is_use_async_server=false
+is_use_async_server=true
 
 # Whether to use asynchronous applier
 is_use_async_applier=true
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/exception/ChangeMembershipException.java b/cluster/src/main/java/org/apache/iotdb/cluster/exception/ChangeMembershipException.java
index f50e668..a377fda 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/exception/ChangeMembershipException.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/exception/ChangeMembershipException.java
@@ -25,6 +25,6 @@ package org.apache.iotdb.cluster.exception;
 public class ChangeMembershipException extends Exception {
 
   public ChangeMembershipException(String errMsg) {
-    super(String.format("change membership fail, error message=%s ", errMsg));
+    super(String.format("Change membership fails, error message=%s ", errMsg));
   }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/exception/RedirectMetaLeaderException.java b/cluster/src/main/java/org/apache/iotdb/cluster/exception/RedirectMetaLeaderException.java
deleted file mode 100644
index 6097d96..0000000
--- a/cluster/src/main/java/org/apache/iotdb/cluster/exception/RedirectMetaLeaderException.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.cluster.exception;
-
-import org.apache.iotdb.cluster.rpc.thrift.Node;
-
-/**
- * Redirect to meta leader.
- */
-public class RedirectMetaLeaderException extends Exception {
-
-  private Node metaLeader;
-
-  public RedirectMetaLeaderException(Node leader) {
-    super(String.format("Redirect to meta leader %s", leader));
-    this.metaLeader = leader;
-  }
-
-  public Node getMetaLeader() {
-    return metaLeader;
-  }
-}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/FileSnapshot.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/FileSnapshot.java
index 25a0c0f..a731d32 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/FileSnapshot.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/FileSnapshot.java
@@ -39,7 +39,6 @@ import org.apache.iotdb.cluster.client.async.AsyncDataClient;
 import org.apache.iotdb.cluster.client.sync.SyncClientAdaptor;
 import org.apache.iotdb.cluster.client.sync.SyncDataClient;
 import org.apache.iotdb.cluster.config.ClusterDescriptor;
-import org.apache.iotdb.cluster.exception.CheckConsistencyException;
 import org.apache.iotdb.cluster.exception.PullFileException;
 import org.apache.iotdb.cluster.exception.SnapshotInstallationException;
 import org.apache.iotdb.cluster.log.Snapshot;
@@ -198,15 +197,7 @@ public class FileSnapshot extends Snapshot implements TimeseriesSchemaSnapshot {
         logger.info("Starting to install a snapshot {} into slot[{}]", snapshot, slot);
         installFileSnapshotSchema(snapshot);
         logger.info("Schemas in snapshot are registered");
-
-        SlotStatus status = slotManager.getStatus(slot);
-        if (status == SlotStatus.PULLING) {
-          // as the schemas are set, writes can proceed
-          slotManager.setToPullingWritable(slot);
-          logger.debug("{}: slot {} is now pulling writable", name, slot);
-        }
-
-        installFileSnapshotFiles(snapshot, slot);
+        installFileSnapshotFiles(snapshot, slot, false);
       } catch (PullFileException e) {
         throw new SnapshotInstallationException(e);
       }
@@ -221,13 +212,8 @@ public class FileSnapshot extends Snapshot implements TimeseriesSchemaSnapshot {
 
     private void installSnapshot(Map<Integer, FileSnapshot> snapshotMap)
         throws SnapshotInstallationException {
-      // ensure StorageGroups are synchronized
-      try {
-        dataGroupMember.getMetaGroupMember().syncLeaderWithConsistencyCheck(true);
-      } catch (CheckConsistencyException e) {
-        throw new SnapshotInstallationException(e);
-      }
-
+      // In data migration, meta group member does not need to synchronize the leader,
+      // because data migration must be carried out after meta group applied add/remove node log.
       for (Entry<Integer, FileSnapshot> integerSnapshotEntry : snapshotMap.entrySet()) {
         Integer slot = integerSnapshotEntry.getKey();
         FileSnapshot snapshot = integerSnapshotEntry.getValue();
@@ -245,7 +231,7 @@ public class FileSnapshot extends Snapshot implements TimeseriesSchemaSnapshot {
         Integer slot = integerSnapshotEntry.getKey();
         FileSnapshot snapshot = integerSnapshotEntry.getValue();
         try {
-          installFileSnapshotFiles(snapshot, slot);
+          installFileSnapshotFiles(snapshot, slot, true);
         } catch (PullFileException e) {
           throw new SnapshotInstallationException(e);
         }
@@ -261,7 +247,7 @@ public class FileSnapshot extends Snapshot implements TimeseriesSchemaSnapshot {
       }
     }
 
-    private void installFileSnapshotFiles(FileSnapshot snapshot, int slot)
+    private void installFileSnapshotFiles(FileSnapshot snapshot, int slot, boolean isDataMigration)
         throws PullFileException {
       List<RemoteTsFileResource> remoteTsFileResources = snapshot.getDataFiles();
       // pull file
@@ -271,18 +257,28 @@ public class FileSnapshot extends Snapshot implements TimeseriesSchemaSnapshot {
         logger.info("Pulling {}/{} files, current: {}", i + 1, remoteTsFileResources.size(),
             resource);
         try {
-          if (!isFileAlreadyPulled(resource)) {
+          if (isDataMigration) {
+            // This means that the minimum plan index and maximum plan index of some files are the same,
+            // so the logic of judging index coincidence needs to remove the case of equal
+            resource.setMinPlanIndex(dataGroupMember.getLogManager().getLastLogIndex());
+            resource.setMaxPlanIndex(dataGroupMember.getLogManager().getLastLogIndex());
             loadRemoteFile(resource);
           } else {
-            // notify the snapshot provider to remove the hardlink
-            removeRemoteHardLink(resource);
+            if (isFileAlreadyPulled(resource)) {
+              loadRemoteFile(resource);
+            } else {
+              // notify the snapshot provider to remove the hardlink
+              removeRemoteHardLink(resource);
+            }
           }
         } catch (IllegalPathException e) {
           throw new PullFileException(resource.getTsFilePath(), resource.getSource(), e);
         }
       }
-      // all files are loaded, the slot can be queried without accessing the previous holder
-      slotManager.setToNull(slot, false);
+      if (isDataMigration) {
+        // all files are loaded, the slot can be queried without accessing the previous holder
+        slotManager.setToNull(slot, false);
+      }
       logger.info("{}: slot {} is ready", name, slot);
     }
 
@@ -581,7 +577,7 @@ public class FileSnapshot extends Snapshot implements TimeseriesSchemaSnapshot {
   @Override
   public void truncateBefore(long minIndex) {
     dataFiles.removeIf(res -> {
-      boolean toBeTruncated = res.getMaxPlanIndex() <= minIndex;
+      boolean toBeTruncated = res.getMaxPlanIndex() < minIndex;
       if (toBeTruncated) {
         // also remove the hardlink
         res.remove();
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PartitionedSnapshot.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PartitionedSnapshot.java
index 605d086..54eaf98 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PartitionedSnapshot.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PartitionedSnapshot.java
@@ -141,7 +141,6 @@ public class PartitionedSnapshot<T extends Snapshot> extends Snapshot {
     }
 
     @Override
-
     public void install(PartitionedSnapshot snapshot, int slot)
         throws SnapshotInstallationException {
       installPartitionedSnapshot(snapshot);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/SnapshotInstaller.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/SnapshotInstaller.java
index 86d6321..1ee55ed 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/SnapshotInstaller.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/SnapshotInstaller.java
@@ -28,5 +28,6 @@ public interface SnapshotInstaller<T extends Snapshot> {
 
   void install(T snapshot, int slot) throws SnapshotInstallationException;
 
+  // for data migration
   void install(Map<Integer, T> snapshotMap) throws SnapshotInstallationException;
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotManager.java
index 9b26e7a..bad45f0 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotManager.java
@@ -54,10 +54,13 @@ public class SlotManager {
    */
   private Map<Integer, SlotDescriptor> idSlotMap;
 
-  public SlotManager(long totalSlotNumber, String memberDir) {
+  private String memberName;
+
+  public SlotManager(long totalSlotNumber, String memberDir, String memberName) {
     if (memberDir != null) {
       this.slotFilePath = memberDir + File.separator + SLOT_FILE_NAME;
     }
+    this.memberName = memberName;
     if (!load()) {
       init(totalSlotNumber);
     }
@@ -104,16 +107,15 @@ public class SlotManager {
    */
   public void waitSlotForWrite(int slotId) throws StorageEngineException {
     SlotDescriptor slotDescriptor = idSlotMap.get(slotId);
+    long startTime = System.currentTimeMillis();
     while (true) {
       synchronized (slotDescriptor) {
-        if (slotDescriptor.slotStatus == SlotStatus.SENDING
-            || slotDescriptor.slotStatus == SlotStatus.SENT) {
-          throw new StorageEngineException(String.format("Slot %d no longer belongs to the node",
-              slotId));
-        }
-        if (slotDescriptor.slotStatus != SlotStatus.NULL &&
-            slotDescriptor.slotStatus != SlotStatus.PULLING_WRITABLE) {
+        if (slotDescriptor.slotStatus == SlotStatus.PULLING) {
           try {
+            if ((System.currentTimeMillis() - startTime) >= 5000) {
+              throw new StorageEngineException(String.format("The status of slot %d is still PULLING after 5s.",
+                  slotId));
+            }
             slotDescriptor.wait(SLOT_WAIT_INTERVAL_MS);
           } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
@@ -127,6 +129,14 @@ public class SlotManager {
   }
 
   /**
+   * If a slot in the status of PULLING or PULLING_WRITABLE, reads of it should merge the source
+   */
+  public boolean checkSlotInDataMigrationStatus(int slotId) {
+    SlotDescriptor slotDescriptor = idSlotMap.get(slotId);
+    return slotDescriptor.slotStatus == SlotStatus.PULLING || slotDescriptor.slotStatus == SlotStatus.PULLING_WRITABLE;
+  }
+
+  /**
    *
    * @param slotId
    * @return the SlotStatus of a slot
@@ -295,6 +305,18 @@ public class SlotManager {
     }
   }
 
+  public int getSloNumInDataMigration() {
+    int res = 0;
+    for (Entry<Integer, SlotDescriptor> entry: idSlotMap.entrySet()) {
+      SlotDescriptor descriptor = entry.getValue();
+      if (descriptor.slotStatus == SlotStatus.PULLING || descriptor.slotStatus == SlotStatus.PULLING_WRITABLE) {
+        logger.info("{}: slot {} is in data migration, status is {}",memberName, entry.getKey(), descriptor.slotStatus);
+        res++;
+      }
+    }
+    return res;
+  }
+
   private void serialize(DataOutputStream outputStream) throws IOException {
     outputStream.writeInt(idSlotMap.size());
     for (Entry<Integer, SlotDescriptor> integerSlotDescriptorEntry : idSlotMap.entrySet()) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterDataQueryExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterDataQueryExecutor.java
index 3af10a8..4ff9424 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterDataQueryExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterDataQueryExecutor.java
@@ -72,7 +72,7 @@ public class ClusterDataQueryExecutor extends RawDataQueryExecutor {
       try {
         reader = readerFactory.getSeriesReader(path,
             queryPlan.getAllMeasurementsInDevice(path.getDevice()), dataType, timeFilter,
-            null, context, queryPlan.isAscending());
+            null, context, queryPlan.isAscending(), null);
       } catch (EmptyIntervalException e) {
         logger.info(e.getMessage());
         return Collections.emptyList();
@@ -92,7 +92,7 @@ public class ClusterDataQueryExecutor extends RawDataQueryExecutor {
       QueryContext context)
       throws StorageEngineException, QueryProcessException {
     return readerFactory.getReaderByTimestamp(path, deviceMeasurements, dataType, context,
-        queryPlan.isAscending());
+        queryPlan.isAscending(), null);
   }
 
   @Override
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/LocalQueryExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/LocalQueryExecutor.java
index 49e20af..0b114ae 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/LocalQueryExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/LocalQueryExecutor.java
@@ -29,6 +29,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
 import org.apache.iotdb.cluster.exception.CheckConsistencyException;
+import org.apache.iotdb.cluster.exception.EmptyIntervalException;
 import org.apache.iotdb.cluster.exception.ReaderNotFoundException;
 import org.apache.iotdb.cluster.metadata.CMManager;
 import org.apache.iotdb.cluster.partition.slot.SlotPartitionTable;
@@ -186,7 +187,7 @@ public class LocalQueryExecutor {
     logger.debug("{}: local queryId for {}#{} is {}", name, request.getQueryId(),
         request.getPath(), queryContext.getQueryId());
     IBatchReader batchReader = readerFactory.getSeriesBatchReader(path, deviceMeasurements,
-        dataType, timeFilter, valueFilter, queryContext, dataGroupMember, request.ascending);
+        dataType, timeFilter, valueFilter, queryContext, dataGroupMember, request.ascending, request.requiredSlots);
 
     // if the reader contains no data, send a special id of -1 to prevent the requester from
     // meaninglessly fetching data
@@ -317,7 +318,7 @@ public class LocalQueryExecutor {
     logger.debug("{}: local queryId for {}#{} is {}", name, request.getQueryId(),
         request.getPath(), queryContext.getQueryId());
     IReaderByTimestamp readerByTimestamp = readerFactory.getReaderByTimestamp(path,
-        deviceMeasurements, dataType, queryContext, dataGroupMember, request.ascending);
+        deviceMeasurements, dataType, queryContext, dataGroupMember, request.ascending, request.requiredSlots);
     if (readerByTimestamp != null) {
       long readerId = queryManager.registerReaderByTime(readerByTimestamp);
       queryContext.registerLocalReader(readerId);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/aggregate/ClusterAggregateExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/aggregate/ClusterAggregateExecutor.java
index 9d41f22..91280f0 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/aggregate/ClusterAggregateExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/aggregate/ClusterAggregateExecutor.java
@@ -91,6 +91,6 @@ public class ClusterAggregateExecutor extends AggregationExecutor {
       throws StorageEngineException, QueryProcessException {
     return readerFactory.getReaderByTimestamp(path,
         dataQueryPlan.getAllMeasurementsInDevice(path.getDevice()),
-        dataType, context, dataQueryPlan.isAscending());
+        dataType, context, dataQueryPlan.isAscending(), null);
   }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/filter/SlotTsFileFilter.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/filter/SlotTsFileFilter.java
index 3bae96e..049cd9b 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/filter/SlotTsFileFilter.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/filter/SlotTsFileFilter.java
@@ -19,7 +19,9 @@
 
 package org.apache.iotdb.cluster.query.filter;
 
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 import org.apache.iotdb.cluster.config.ClusterConstant;
 import org.apache.iotdb.cluster.partition.slot.SlotPartitionTable;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
@@ -32,18 +34,22 @@ import org.slf4j.LoggerFactory;
 public class SlotTsFileFilter implements TsFileFilter {
 
   private static final Logger logger = LoggerFactory.getLogger(SlotTsFileFilter.class);
-  private List<Integer> slots;
+  private Set<Integer> slots;
 
-  public SlotTsFileFilter(List<Integer> slots) {
+  public SlotTsFileFilter(Set<Integer> slots) {
     this.slots = slots;
   }
 
+  public SlotTsFileFilter(List<Integer> slots) {
+    this.slots = new HashSet<>(slots);
+  }
+
   @Override
   public boolean fileNotSatisfy(TsFileResource resource) {
     return fileNotInSlots(resource, slots);
   }
 
-  private static boolean fileNotInSlots(TsFileResource resource, List<Integer> nodeSlots) {
+  private static boolean fileNotInSlots(TsFileResource resource, Set<Integer> nodeSlots) {
     Pair<String, Long> sgNameAndPartitionIdPair = FilePathUtils
         .getLogicalSgNameAndTimePartitionIdPair(resource);
     int slot = SlotPartitionTable.getSlotStrategy()
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/groupby/ClusterGroupByVFilterDataSet.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/groupby/ClusterGroupByVFilterDataSet.java
index 085fd05..18f2cc0 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/groupby/ClusterGroupByVFilterDataSet.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/groupby/ClusterGroupByVFilterDataSet.java
@@ -69,6 +69,6 @@ public class ClusterGroupByVFilterDataSet extends GroupByWithValueFilterDataSet
       TsFileFilter fileFilter) throws StorageEngineException, QueryProcessException {
     return readerFactory.getReaderByTimestamp(path,
         dataQueryPlan.getAllMeasurementsInDevice(path.getDevice()), dataType, context,
-        dataQueryPlan.isAscending());
+        dataQueryPlan.isAscending(), null);
   }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java
index c5f6ee6..df3e3e3 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java
@@ -23,7 +23,10 @@ package org.apache.iotdb.cluster.query.reader;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 import org.apache.iotdb.cluster.client.async.AsyncDataClient;
 import org.apache.iotdb.cluster.client.sync.SyncClientAdaptor;
@@ -55,6 +58,7 @@ import org.apache.iotdb.db.query.aggregation.AggregationType;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.control.QueryResourceManager;
 import org.apache.iotdb.db.query.dataset.groupby.GroupByExecutor;
+import org.apache.iotdb.db.query.externalsort.adapter.ByTimestampReaderAdapter;
 import org.apache.iotdb.db.query.factory.AggregateResultFactory;
 import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
 import org.apache.iotdb.db.query.reader.series.ManagedSeriesReader;
@@ -89,7 +93,7 @@ public class ClusterReaderFactory {
    * cluster. This will query every group and merge the result from them.
    */
   public IReaderByTimestamp getReaderByTimestamp(PartialPath path,
-      Set<String> deviceMeasurements, TSDataType dataType, QueryContext context, boolean ascending)
+      Set<String> deviceMeasurements, TSDataType dataType, QueryContext context, boolean ascending, Set<Integer> requiredSlots)
       throws StorageEngineException, QueryProcessException {
     // make sure the partition table is new
     try {
@@ -111,7 +115,7 @@ public class ClusterReaderFactory {
     for (PartitionGroup partitionGroup : partitionGroups) {
       // query each group to get a reader in that group
       readers.add(getSeriesReaderByTime(partitionGroup, path, deviceMeasurements, context,
-          dataType, ascending));
+          dataType, ascending, requiredSlots));
     }
     // merge the readers
     return new MergedReaderByTime(readers);
@@ -123,7 +127,7 @@ public class ClusterReaderFactory {
    * to one node in that group.
    */
   private IReaderByTimestamp getSeriesReaderByTime(PartitionGroup partitionGroup, PartialPath path,
-      Set<String> deviceMeasurements, QueryContext context, TSDataType dataType, boolean ascending)
+      Set<String> deviceMeasurements, QueryContext context, TSDataType dataType, boolean ascending, Set<Integer> requiredSlots)
       throws StorageEngineException, QueryProcessException {
     if (partitionGroup.contains(metaGroupMember.getThisNode())) {
       // the target storage group contains this node, perform a local query
@@ -135,10 +139,10 @@ public class ClusterReaderFactory {
             context.getQueryId());
       }
       return getReaderByTimestamp(path, deviceMeasurements, dataType, context, dataGroupMember,
-          ascending);
+          ascending, requiredSlots);
     } else {
       return getRemoteReaderByTimestamp(path, deviceMeasurements, dataType, partitionGroup,
-          context, ascending);
+          context, ascending, requiredSlots);
     }
   }
 
@@ -150,9 +154,9 @@ public class ClusterReaderFactory {
   private IReaderByTimestamp getRemoteReaderByTimestamp(
       Path path, Set<String> deviceMeasurements, TSDataType dataType,
       PartitionGroup partitionGroup,
-      QueryContext context, boolean ascending) throws StorageEngineException {
+      QueryContext context, boolean ascending, Set<Integer> requiredSlots) throws StorageEngineException {
     SingleSeriesQueryRequest request = constructSingleQueryRequest(null,
-        null, dataType, path, deviceMeasurements, partitionGroup, context, ascending);
+        null, dataType, path, deviceMeasurements, partitionGroup, context, ascending, requiredSlots);
 
     // reorder the nodes by their communication delays
     List<Node> reorderedNodes = QueryCoordinator.getINSTANCE().reorderNodes(partitionGroup);
@@ -182,7 +186,7 @@ public class ClusterReaderFactory {
    */
   public ManagedSeriesReader getSeriesReader(PartialPath path,
       Set<String> deviceMeasurements, TSDataType dataType,
-      Filter timeFilter, Filter valueFilter, QueryContext context, boolean ascending)
+      Filter timeFilter, Filter valueFilter, QueryContext context, boolean ascending, Set<Integer> requiredSlots)
       throws StorageEngineException, EmptyIntervalException {
     // make sure the partition table is new
     try {
@@ -199,7 +203,7 @@ public class ClusterReaderFactory {
       // build a reader for each group and merge them
       for (PartitionGroup partitionGroup : partitionGroups) {
         IPointReader seriesReader = getSeriesReader(partitionGroup, path,
-            deviceMeasurements, timeFilter, valueFilter, context, dataType, ascending);
+            deviceMeasurements, timeFilter, valueFilter, context, dataType, ascending, requiredSlots);
         mergeReader.addReader(seriesReader, 0);
       }
     } catch (IOException | QueryProcessException e) {
@@ -218,7 +222,7 @@ public class ClusterReaderFactory {
    */
   private IPointReader getSeriesReader(PartitionGroup partitionGroup, PartialPath path,
       Set<String> deviceMeasurements, Filter timeFilter, Filter valueFilter,
-      QueryContext context, TSDataType dataType, boolean ascending)
+      QueryContext context, TSDataType dataType, boolean ascending, Set<Integer> requiredSlots)
       throws IOException,
       StorageEngineException, QueryProcessException {
     if (partitionGroup.contains(metaGroupMember.getThisNode())) {
@@ -229,7 +233,7 @@ public class ClusterReaderFactory {
                   context.getQueryId()));
       IPointReader seriesPointReader = getSeriesPointReader(path, deviceMeasurements, dataType,
           timeFilter, valueFilter,
-          context, dataGroupMember, ascending);
+          context, dataGroupMember, ascending, requiredSlots);
       if (logger.isDebugEnabled()) {
         logger.debug("{}: creating a local reader for {}#{} of {}, empty: {}",
             metaGroupMember.getName(),
@@ -240,7 +244,7 @@ public class ClusterReaderFactory {
       return seriesPointReader;
     } else {
       return getRemoteSeriesPointReader(timeFilter, valueFilter, dataType, path,
-          deviceMeasurements, partitionGroup, context, ascending);
+          deviceMeasurements, partitionGroup, context, ascending, requiredSlots);
     }
   }
 
@@ -258,7 +262,7 @@ public class ClusterReaderFactory {
    */
   public IPointReader getSeriesPointReader(PartialPath path, Set<String> allSensors,
       TSDataType dataType, Filter timeFilter, Filter valueFilter, QueryContext context,
-      DataGroupMember dataGroupMember, boolean ascending)
+      DataGroupMember dataGroupMember, boolean ascending, Set<Integer> requiredSlots)
       throws StorageEngineException, QueryProcessException {
     // pull the newest data
     try {
@@ -268,7 +272,7 @@ public class ClusterReaderFactory {
     }
     return new SeriesRawDataPointReader(
         getSeriesReader(path, allSensors, dataType, timeFilter,
-            valueFilter, context, dataGroupMember.getHeader(), dataGroupMember.getRaftGroupId(), ascending));
+            valueFilter, context, dataGroupMember.getHeader(), dataGroupMember.getRaftGroupId(), ascending, requiredSlots));
 
   }
 
@@ -287,15 +291,18 @@ public class ClusterReaderFactory {
   private SeriesReader getSeriesReader(PartialPath path, Set<String> allSensors, TSDataType
       dataType,
       Filter timeFilter,
-      Filter valueFilter, QueryContext context, Node header, int raftId, boolean ascending)
+      Filter valueFilter, QueryContext context, Node header, int raftId, boolean ascending, Set<Integer> requiredSlots)
       throws StorageEngineException, QueryProcessException {
     ClusterQueryUtils.checkPathExistence(path);
-    List<Integer> nodeSlots =
-        ((SlotPartitionTable) metaGroupMember.getPartitionTable()).getNodeSlots(header, raftId);
+    if (requiredSlots == null) {
+      List<Integer> nodeSlots =
+          ((SlotPartitionTable) metaGroupMember.getPartitionTable()).getNodeSlots(header, raftId);
+      requiredSlots = new HashSet<>(nodeSlots);
+    }
     QueryDataSource queryDataSource =
         QueryResourceManager.getInstance().getQueryDataSource(path, context, timeFilter);
     return new SeriesReader(path, allSensors, dataType, context, queryDataSource,
-        timeFilter, valueFilter, new SlotTsFileFilter(nodeSlots), ascending);
+        timeFilter, valueFilter, new SlotTsFileFilter(requiredSlots), ascending);
   }
 
   /**
@@ -310,12 +317,12 @@ public class ClusterReaderFactory {
   private IPointReader getRemoteSeriesPointReader(Filter timeFilter,
       Filter valueFilter, TSDataType dataType, Path path,
       Set<String> deviceMeasurements, PartitionGroup partitionGroup,
-      QueryContext context, boolean ascending)
+      QueryContext context, boolean ascending, Set<Integer> requiredSlots)
       throws StorageEngineException {
     SingleSeriesQueryRequest request = constructSingleQueryRequest(timeFilter, valueFilter,
-        dataType, path, deviceMeasurements, partitionGroup, context, ascending);
+        dataType, path, deviceMeasurements, partitionGroup, context, ascending, requiredSlots);
 
-    // reorder the nodes such that the nodes that suit the query best (have lowest latenct or
+    // reorder the nodes such that the nodes that suit the query best (have lowest latency or
     // highest throughput) will be put to the front
     List<Node> orderedNodes = QueryCoordinator.getINSTANCE().reorderNodes(partitionGroup);
 
@@ -337,7 +344,7 @@ public class ClusterReaderFactory {
   private SingleSeriesQueryRequest constructSingleQueryRequest(Filter timeFilter,
       Filter valueFilter, TSDataType dataType, Path path,
       Set<String> deviceMeasurements, PartitionGroup partitionGroup,
-      QueryContext context, boolean ascending) {
+      QueryContext context, boolean ascending, Set<Integer> requiredSlots) {
     SingleSeriesQueryRequest request = new SingleSeriesQueryRequest();
     request.setRaftId(partitionGroup.getId());
     if (timeFilter != null) {
@@ -353,6 +360,7 @@ public class ClusterReaderFactory {
     request.setDataTypeOrdinal(dataType.ordinal());
     request.setDeviceMeasurements(deviceMeasurements);
     request.setAscending(ascending);
+    request.setRequiredSlots(requiredSlots);
     return request;
   }
 
@@ -522,7 +530,7 @@ public class ClusterReaderFactory {
    */
   public IBatchReader getSeriesBatchReader(PartialPath path, Set<String> allSensors,
       TSDataType dataType, Filter timeFilter,
-      Filter valueFilter, QueryContext context, DataGroupMember dataGroupMember, boolean ascending)
+      Filter valueFilter, QueryContext context, DataGroupMember dataGroupMember, boolean ascending, Set<Integer> requiredSlots)
       throws StorageEngineException, QueryProcessException, IOException {
     // pull the newest data
     try {
@@ -531,12 +539,40 @@ public class ClusterReaderFactory {
       throw new StorageEngineException(e);
     }
 
-    SeriesReader seriesReader = getSeriesReader(path, allSensors, dataType, timeFilter,
-        valueFilter, context, dataGroupMember.getHeader(), dataGroupMember.getRaftGroupId(), ascending);
-    if (seriesReader.isEmpty()) {
-      return null;
+    // find the groups that should be queried due to data migration.
+    // when a slot is in the PULLING or PULLING_WRITABLE state, reads of it must merge results to guarantee integrity.
+    Map<PartitionGroup, Set<Integer>> holderSlotMap = dataGroupMember.getPreviousHolderSlotMap();
+
+    // If requiredSlots is not null, this data group is the previous holder of the required slots, so there is no need to merge other resources.
+    if (requiredSlots == null && !holderSlotMap.isEmpty()) {
+      // merge remote reader and local reader
+      ManagedMergeReader mergeReader = new ManagedMergeReader(dataType);
+
+      // add local reader
+      IPointReader seriesPointReader = getSeriesPointReader(path, allSensors, dataType, timeFilter,
+          valueFilter, context, dataGroupMember, ascending, requiredSlots);
+      mergeReader.addReader(seriesPointReader, 0);
+
+      // add readers of the previous holders because the data is still being migrated
+      logger.debug("{}: Sending data query of {} to {} groups due to data is in the state of data migration", metaGroupMember.getName(), path,
+          holderSlotMap.size());
+      for (Entry<PartitionGroup, Set<Integer>> entry : holderSlotMap.entrySet()) {
+        IPointReader seriesReader = getSeriesReader(entry.getKey(), path,
+            allSensors, timeFilter, valueFilter, context, dataType, ascending, entry.getValue());
+        mergeReader.addReader(seriesReader, 0);
+      }
+
+      return mergeReader;
+    } else {
+      // just local reader is enough
+      SeriesReader seriesReader = getSeriesReader(path, allSensors, dataType, timeFilter,
+          valueFilter, context, dataGroupMember.getHeader(), dataGroupMember.getRaftGroupId(),
+          ascending, requiredSlots);
+      if (seriesReader.isEmpty()) {
+        return null;
+      }
+      return new SeriesRawDataBatchReader(seriesReader);
     }
-    return new SeriesRawDataBatchReader(seriesReader);
   }
 
   /**
@@ -550,24 +586,56 @@ public class ClusterReaderFactory {
    * @throws StorageEngineException
    */
   public IReaderByTimestamp getReaderByTimestamp(PartialPath path, Set<String> allSensors,
-      TSDataType dataType, QueryContext context, DataGroupMember dataGroupMember, boolean ascending)
+      TSDataType dataType, QueryContext context, DataGroupMember dataGroupMember, boolean ascending, Set<Integer> requiredSlots)
       throws StorageEngineException, QueryProcessException {
     try {
       dataGroupMember.syncLeaderWithConsistencyCheck(false);
     } catch (CheckConsistencyException e) {
       throw new StorageEngineException(e);
     }
-    SeriesReader seriesReader = getSeriesReader(path, allSensors, dataType,
-        TimeFilter.gtEq(Long.MIN_VALUE), null, context, dataGroupMember.getHeader(),
-        dataGroupMember.getRaftGroupId(), ascending);
+
+    // find the groups that should be queried due to data migration.
+    // when a slot is in the PULLING or PULLING_WRITABLE state, reads of it must merge results to guarantee integrity.
+    Map<PartitionGroup, Set<Integer>> holderSlotMap = dataGroupMember.getPreviousHolderSlotMap();
     try {
-      if (seriesReader.isEmpty()) {
-        return null;
+      // If requiredSlots is not null, this data group is the previous holder of the required slots, so there is no need to merge other resources.
+      if (requiredSlots == null && !holderSlotMap.isEmpty()) {
+        // merge remote reader and local reader
+        ManagedMergeReader mergeReader = new ManagedMergeReader(dataType);
+
+        // add local reader
+        IPointReader seriesPointReader = getSeriesPointReader(path, allSensors, dataType,
+            TimeFilter.gtEq(Long.MIN_VALUE), null, context, dataGroupMember, ascending,
+            requiredSlots);
+        mergeReader.addReader(seriesPointReader, 0);
+
+        // add readers of the previous holders because the data is still being migrated
+        logger.debug(
+            "{}: Sending data query of {} to {} groups due to data is in the state of data migration",
+            metaGroupMember.getName(), path,
+            holderSlotMap.size());
+        for (Entry<PartitionGroup, Set<Integer>> entry : holderSlotMap.entrySet()) {
+          IPointReader seriesReader = getSeriesReader(entry.getKey(), path,
+              allSensors, TimeFilter.gtEq(Long.MIN_VALUE), null, context, dataType, ascending,
+              entry.getValue());
+          mergeReader.addReader(seriesReader, 0);
+        }
+
+        return new ByTimestampReaderAdapter(mergeReader);
+      } else {
+        // just local reader is enough
+        SeriesReader seriesReader = getSeriesReader(path, allSensors, dataType,
+            TimeFilter.gtEq(Long.MIN_VALUE), null, context, dataGroupMember.getHeader(),
+            dataGroupMember.getRaftGroupId(), ascending, requiredSlots);
+
+        if (seriesReader.isEmpty()) {
+          return null;
+        }
+
+        return new SeriesReaderByTimestamp(seriesReader, ascending);
       }
     } catch (IOException e) {
       throw new QueryProcessException(e, TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
     }
-    return new SeriesReaderByTimestamp(seriesReader, ascending);
-
   }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterTimeGenerator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterTimeGenerator.java
index 9252bf2..2e6cc06 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterTimeGenerator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterTimeGenerator.java
@@ -67,7 +67,7 @@ public class ClusterTimeGenerator extends ServerTimeGenerator {
               null).left.get(0);
       return readerFactory.getSeriesReader(path,
           queryPlan.getAllMeasurementsInDevice(path.getDevice()), dataType,
-          null, filter, context, queryPlan.isAscending());
+          null, filter, context, queryPlan.isAscending(), null);
     } catch (Exception e) {
       throw new IOException(e);
     }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
index be56c8d..6d10edb 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
@@ -667,13 +667,13 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
           entryIterator.remove();
           removeMember(entry.getKey(), dataGroupMember, dataGroupMember.getHeader().equals(node));
         } else {
-          // the group should be updated and pull new slots from the removed node
-          dataGroupMember.removeNode(node, removalResult);
+          // the group should be updated
+          dataGroupMember.removeNode(node);
         }
       }
 
       if (logger.isDebugEnabled()) {
-        logger.debug("Data cluster server: start to handle new groups when removing node {}", node);
+        logger.debug("Data cluster server: start to handle new groups and pulling data when removing node {}", node);
       }
       // if the removed group contains the local node, the local node should join a new group to
       // preserve the replication number
@@ -684,6 +684,7 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
           DataGroupMember dataGroupMember = dataMemberFactory.create(group, thisNode);
           addDataGroupMember(dataGroupMember);
         }
+        // pull new slots from the removed node
         headerGroupMap.get(header).pullSlots(removalResult);
       }
     }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java
index f97d181..4e57fe5 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java
@@ -227,6 +227,11 @@ public class MetaClusterServer extends RaftServer implements TSMetaService.Async
   }
 
   @Override
+  public void collectMigrationStatus(AsyncMethodCallback<ByteBuffer> resultHandler) {
+    asyncService.collectMigrationStatus(resultHandler);
+  }
+
+  @Override
   public void readFile(String filePath, long offset, int length, int raftId,
       AsyncMethodCallback<ByteBuffer> resultHandler) {
     asyncService.readFile(filePath, offset, length, raftId, resultHandler);
@@ -294,6 +299,11 @@ public class MetaClusterServer extends RaftServer implements TSMetaService.Async
   }
 
   @Override
+  public ByteBuffer collectMigrationStatus() {
+    return syncService.collectMigrationStatus();
+  }
+
+  @Override
   public HeartBeatResponse sendHeartbeat(HeartBeatRequest request) {
     return syncService.sendHeartbeat(request);
   }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/RaftServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/RaftServer.java
index 4a061ca..2e925c1 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/RaftServer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/RaftServer.java
@@ -61,7 +61,7 @@ public abstract class RaftServer implements RaftService.AsyncIface, RaftService.
       ClusterDescriptor.getInstance().getConfig().getReadOperationTimeoutMS();
   private static int writeOperationTimeoutMS =
       ClusterDescriptor.getInstance().getConfig().getWriteOperationTimeoutMS();
-  private static int syncLeaderMaxWaitMs = 10 * 1000;
+  private static int syncLeaderMaxWaitMs = 20 * 1000;
   private static long heartBeatIntervalMs = 1000L;
 
   ClusterConfig config = ClusterDescriptor.getInstance().getConfig();
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/Response.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/Response.java
index 8a9b710..50f41ea 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/Response.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/Response.java
@@ -51,6 +51,8 @@ public class Response {
   public static final long RESPONSE_NEW_NODE_PARAMETER_CONFLICT = -10;
   // add/remove node operations should one by one
   public static final long RESPONSE_CHANGE_MEMBERSHIP_CONFLICT = -11;
+  // the data migration of previous add/remove node operations is not finished.
+  public static final long RESPONSE_DATA_MIGRATION_NOT_FINISH = -12;
   // the request is not executed locally anc should be forwarded
   public static final long RESPONSE_NULL = Long.MIN_VALUE;
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatThread.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatThread.java
index 89aa9d5..16bca7a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatThread.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatThread.java
@@ -20,7 +20,6 @@
 package org.apache.iotdb.cluster.server.heartbeat;
 
 import org.apache.iotdb.cluster.rpc.thrift.Node;
-import org.apache.iotdb.cluster.server.RaftServer;
 import org.apache.iotdb.cluster.server.member.MetaGroupMember;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
index 5691d14..5142796 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
@@ -105,7 +105,6 @@ import org.apache.iotdb.service.rpc.thrift.EndPoint;
 import org.apache.iotdb.service.rpc.thrift.TSStatus;
 import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.thrift.protocol.TProtocolFactory;
-import org.apache.thrift.transport.TTransportException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -171,7 +170,7 @@ public class DataGroupMember extends RaftMember {
     this.metaGroupMember = metaGroupMember;
     allNodes = nodes;
     setQueryManager(new ClusterQueryManager());
-    slotManager = new SlotManager(ClusterConstant.SLOT_NUM, getMemberDir());
+    slotManager = new SlotManager(ClusterConstant.SLOT_NUM, getMemberDir(), getName());
     LogApplier applier = new DataLogApplier(metaGroupMember, this);
     if (ClusterDescriptor.getInstance().getConfig().isUseAsyncApplier()) {
       applier = new AsyncDataLogApplier(applier, name);
@@ -187,8 +186,6 @@ public class DataGroupMember extends RaftMember {
   /**
    * Start heartbeat, catch-up, pull snapshot services and start all unfinished pull-snapshot-tasks.
    * Calling the method twice does not induce side effects.
-   *
-   * @throws TTransportException
    */
   @Override
   public void start() {
@@ -269,10 +266,10 @@ public class DataGroupMember extends RaftMember {
     if (logger.isDebugEnabled()) {
       logger.debug("{}: start to pre adding node {}", name, node);
     }
-    if (allNodes.contains(node)) {
-      return false;
-    }
     synchronized (allNodes) {
+      if (allNodes.contains(node)) {
+        return false;
+      }
       int insertIndex = -1;
       // find the position to insert the new node, the nodes are ordered by their identifiers
       for (int i = 0; i < allNodes.size() - 1; i++) {
@@ -302,7 +299,6 @@ public class DataGroupMember extends RaftMember {
   /**
    * Try to add a Node into the group to which the member belongs.
    *
-   * @param node
    * @return true if this node should leave the group because of the addition of the node, false
    * otherwise
    */
@@ -310,7 +306,6 @@ public class DataGroupMember extends RaftMember {
     if (logger.isDebugEnabled()) {
       logger.debug("{}: start to add node {}", name, node);
     }
-    syncLeader();
 
     // mark slots that do not belong to this group any more
     Set<Integer> lostSlots = ((SlotNodeAdditionResult) result).getLostSlots()
@@ -321,6 +316,7 @@ public class DataGroupMember extends RaftMember {
     slotManager.save();
 
     synchronized (allNodes) {
+      preAddNode(node);
       if (allNodes.contains(node) && allNodes.size() > config.getReplicationNum()) {
         // remove the last node because the group size is fixed to replication number
         Node removedNode = allNodes.remove(allNodes.size() - 1);
@@ -344,7 +340,6 @@ public class DataGroupMember extends RaftMember {
    * member, a node must have both meta and data logs no older than then local member, or it will be
    * turned down.
    *
-   * @param electionRequest
    * @return Response.RESPONSE_META_LOG_STALE if the meta logs of the elector fall behind
    * Response.RESPONSE_LOG_MISMATCH if the data logs of the elector fall behind Response.SUCCESS if
    * the vote is given to the elector the term of local member if the elector's term is no bigger
@@ -400,8 +395,6 @@ public class DataGroupMember extends RaftMember {
   /**
    * Deserialize and install a snapshot sent by the leader. The type of the snapshot must be
    * currently PartitionedSnapshot with FileSnapshot inside.
-   *
-   * @param request
    */
   public void receiveSnapshot(SendSnapshotRequest request) throws SnapshotInstallationException {
     logger.info("{}: received a snapshot from {} with size {}", name, request.getHeader(),
@@ -418,8 +411,6 @@ public class DataGroupMember extends RaftMember {
 
   /**
    * Send the requested snapshots to the applier node.
-   *
-   * @param request
    */
   public PullSnapshotResp getSnapshot(PullSnapshotRequest request) throws IOException {
     // if the requester pulls the snapshots because the header of the group is removed, then the
@@ -472,7 +463,8 @@ public class DataGroupMember extends RaftMember {
     synchronized (logManager) {
       PullSnapshotResp resp = new PullSnapshotResp();
       Map<Integer, ByteBuffer> resultMap = new HashMap<>();
-      ((PartitionedSnapshotLogManager)logManager).takeSnapshotForSpecificSlots(requiredSlots, false);
+      ((PartitionedSnapshotLogManager) logManager)
+          .takeSnapshotForSpecificSlots(requiredSlots, false);
 
       PartitionedSnapshot<Snapshot> allSnapshot = (PartitionedSnapshot) logManager.getSnapshot();
       for (int requiredSlot : requiredSlots) {
@@ -490,9 +482,6 @@ public class DataGroupMember extends RaftMember {
 
   /**
    * Pull snapshots from the previous holders after newNode joins the cluster.
-   *
-   * @param slots
-   * @param newNode
    */
   public void pullNodeAdditionSnapshots(List<Integer> slots, Node newNode) {
     // group the slots by their owners
@@ -528,14 +517,14 @@ public class DataGroupMember extends RaftMember {
    * Pull FileSnapshots (timeseries schemas and lists of TsFiles) of "nodeSlots" from one of the
    * "prevHolders". The actual pulling will be performed in a separate thread.
    *
-   * @param descriptor
-   * @param snapshotSave set to the corresponding disk file if the task is resumed from disk, or set
-   *                     ot null otherwise
+   * @param snapshotSave set to the corresponding disk file if the task is resumed from disk, or set to null otherwise
    */
   private void pullFileSnapshot(PullSnapshotTaskDescriptor descriptor, File snapshotSave) {
     // If this node is the member of previous holder, it's unnecessary to pull data again
     if (descriptor.getPreviousHolders().contains(thisNode)) {
-      logger.info("{}: {} and other {} don't need to pull because there already has such data locally", name,
+      logger.info(
+          "{}: {} and other {} don't need to pull because there already has such data locally",
+          name,
           descriptor.getSlots().get(0), descriptor.getSlots().size() - 1);
       // inform the previous holders that one member has successfully pulled snapshot directly
       registerPullSnapshotHint(descriptor);
@@ -623,10 +612,6 @@ public class DataGroupMember extends RaftMember {
   /**
    * If the member is the leader, let all members in the group close the specified partition of a
    * storage group, else just return false.
-   *
-   * @param storageGroupName
-   * @param partitionId
-   * @param isSeq
    */
   boolean closePartition(String storageGroupName, long partitionId, boolean isSeq) {
     if (character != NodeCharacter.LEADER) {
@@ -697,7 +682,6 @@ public class DataGroupMember extends RaftMember {
    * process through the raft procedure, otherwise the plan will be forwarded to the leader.
    *
    * @param plan a non-query plan.
-   * @return
    */
   @Override
   public TSStatus executeNonQueryPlan(PhysicalPlan plan) {
@@ -782,7 +766,8 @@ public class DataGroupMember extends RaftMember {
     synchronized (allNodes) {
       if (allNodes.contains(removedNode) && allNodes.size() == config.getReplicationNum()) {
         // update the group if the deleted node was in it
-        PartitionGroup newGroup = metaGroupMember.getPartitionTable().getHeaderGroup(new RaftNode(getHeader(), getRaftGroupId()));
+        PartitionGroup newGroup = metaGroupMember.getPartitionTable()
+            .getHeaderGroup(new RaftNode(getHeader(), getRaftGroupId()));
         if (newGroup == null) {
           return;
         }
@@ -799,13 +784,13 @@ public class DataGroupMember extends RaftMember {
    * group.
    */
   @SuppressWarnings("java:S2445") // the reference of allNodes is unchanged
-  public void removeNode(Node removedNode, NodeRemovalResult removalResult) {
+  public void removeNode(Node removedNode) {
     if (logger.isDebugEnabled()) {
       logger.debug("{}: start to remove node {}", name, removedNode);
     }
-    syncLeader();
 
     synchronized (allNodes) {
+      preRemoveNode(removedNode);
       if (allNodes.contains(removedNode)) {
         // update the group if the deleted node was in it
         allNodes.remove(removedNode);
@@ -819,8 +804,6 @@ public class DataGroupMember extends RaftMember {
         }
       }
     }
-
-    pullSlots(removalResult);
   }
 
   public void pullSlots(NodeRemovalResult removalResult) {
@@ -841,7 +824,8 @@ public class DataGroupMember extends RaftMember {
   @Override
   protected long appendEntry(long prevLogIndex, long prevLogTerm, long leaderCommit, Log log) {
     long resp = super.appendEntry(prevLogIndex, prevLogTerm, leaderCommit, log);
-    if (resp == Response.RESPONSE_AGREE && (log instanceof AddNodeLog || log instanceof RemoveNodeLog)) {
+    if (resp == Response.RESPONSE_AGREE && (log instanceof AddNodeLog
+        || log instanceof RemoveNodeLog)) {
       try {
         commitLog(log);
       } catch (LogExecutionException e) {
@@ -852,8 +836,8 @@ public class DataGroupMember extends RaftMember {
   }
 
   /**
-   * When the header of a partition group is removed, it needs to wait all followers to sync data because
-   * there has no new leader.
+   * When the header of a partition group is removed, it needs to wait all followers to sync data
+   * because there has no new leader.
    */
   public void waitFollowersToSync() {
     try {
@@ -887,8 +871,6 @@ public class DataGroupMember extends RaftMember {
   /**
    * Generate a report containing the character, leader, term, last log term, last log index, header
    * and readOnly or not of this member.
-   *
-   * @return
    */
   public DataMemberReport genReport() {
     long prevLastLogIndex = lastReportedLogIndex;
@@ -945,6 +927,23 @@ public class DataGroupMember extends RaftMember {
     pullSnapshotHintService.registerHint(descriptor);
   }
 
+  public Map<PartitionGroup, Set<Integer>> getPreviousHolderSlotMap() {
+    Map<PartitionGroup, Set<Integer>> holderSlotMap = new HashMap<>();
+    RaftNode raftNode = new RaftNode(getHeader(), getRaftGroupId());
+    Map<RaftNode, Map<Integer, PartitionGroup>> previousHolderMap = ((SlotPartitionTable)getMetaGroupMember().getPartitionTable()).getPreviousNodeMap();
+    if (previousHolderMap.containsKey(raftNode)) {
+      for (Entry<Integer, PartitionGroup> entry: previousHolderMap.get(raftNode).entrySet()) {
+        int slot = entry.getKey();
+        PartitionGroup holder = entry.getValue();
+        if (slotManager.checkSlotInDataMigrationStatus(slot)) {
+          holderSlotMap.computeIfAbsent(holder, n -> new HashSet<>()).add(slot);
+        }
+      }
+    }
+    return holderSlotMap;
+  }
+
+
   public LocalQueryExecutor getLocalQueryExecutor() {
     return localQueryExecutor;
   }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
index c004b8a..f9d625d 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
@@ -53,6 +53,7 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 import org.apache.iotdb.cluster.client.DataClientProvider;
 import org.apache.iotdb.cluster.client.async.AsyncClientPool;
 import org.apache.iotdb.cluster.client.async.AsyncMetaClient;
@@ -619,6 +620,9 @@ public class MetaGroupMember extends RaftMember {
     } else if (resp.getRespNum() == Response.RESPONSE_CHANGE_MEMBERSHIP_CONFLICT) {
       logger.warn(
           "The cluster is performing other change membership operations. Change membership operations should be performed one by one. Please try again later");
+    } else if (resp.getRespNum() == Response.RESPONSE_DATA_MIGRATION_NOT_FINISH) {
+      logger.warn(
+          "The data migration of the previous membership change operation is not finished. Please try again later");
     } else {
       logger
           .warn("Joining the cluster is rejected by {} for response {}", node, resp.getRespNum());
@@ -686,7 +690,7 @@ public class MetaGroupMember extends RaftMember {
     newTable.deserialize(partitionTableBuffer);
     // avoid overwriting current partition table with a previous one
     if (partitionTable != null) {
-      long currIndex = ((SlotPartitionTable) partitionTable).getLastMetaLogIndex();
+      long currIndex = partitionTable.getLastMetaLogIndex();
       long incomingIndex = newTable.getLastMetaLogIndex();
       logger.info("Current partition table index {}, new partition table index {}", currIndex,
           incomingIndex);
@@ -893,6 +897,12 @@ public class MetaGroupMember extends RaftMember {
     if (character != NodeCharacter.LEADER) {
       return false;
     }
+
+    if (!waitDataMigrationEnd()) {
+      response.setRespNum((int)Response.RESPONSE_DATA_MIGRATION_NOT_FINISH);
+      return true;
+    }
+
     boolean nodeExistInPartitionTable = false;
     for (Node node : partitionTable.getAllNodes()) {
       if (node.ip.equals(newNode.ip) && newNode.dataPort == node.dataPort
@@ -974,6 +984,25 @@ public class MetaGroupMember extends RaftMember {
   }
 
   /**
+   * Check whether data migration triggered by a previous change-membership operation is still in progress.
+   */
+  private boolean waitDataMigrationEnd() throws InterruptedException {
+    // retry up to 5 times
+    int retryTime = 0;
+    while(true) {
+      Map<PartitionGroup, Integer> res = collectAllPartitionMigrationStatus();
+      if (res != null && res.isEmpty()) {
+        return true;
+      }
+      if (++retryTime == 5) {
+        break;
+      }
+      Thread.sleep(20);
+    }
+    return false;
+  }
+
+  /**
    * Process empty log for leader to commit all previous log.
    */
   public void processEmptyContentLog() {
@@ -1912,6 +1941,45 @@ public class MetaGroupMember extends RaftMember {
     }
   }
 
+
+  public Map<PartitionGroup, Integer> collectMigrationStatus(Node node) {
+    try {
+      if (config.isUseAsyncServer()) {
+        return collectMigrationStatusAsync(node);
+      } else {
+        return collectMigrationStatusSync(node);
+      }
+    } catch (TException | InterruptedException e) {
+      logger.warn("Cannot get the status of all nodes", e);
+    }
+    return null;
+  }
+
+  private Map<PartitionGroup, Integer> collectMigrationStatusAsync(Node node)
+      throws TException, InterruptedException {
+    AtomicReference<ByteBuffer> resultRef = new AtomicReference<>();
+    GenericHandler<ByteBuffer> migrationStatusHandler = new GenericHandler<>(node, resultRef);
+    AsyncMetaClient client = (AsyncMetaClient) getAsyncClient(node);
+    if (client == null) {
+      return null;
+    }
+    client.collectMigrationStatus(migrationStatusHandler);
+    synchronized (resultRef) {
+      if (resultRef.get() == null) {
+        resultRef.wait(RaftServer.getConnectionTimeoutInMS());
+      }
+    }
+    return ClusterUtils.deserializeMigrationStatus(resultRef.get());
+  }
+
+  private Map<PartitionGroup, Integer> collectMigrationStatusSync(Node node) throws TException {
+    SyncMetaClient client = (SyncMetaClient) getSyncClient(node);
+    if (client == null) {
+      return null;
+    }
+    return ClusterUtils.deserializeMigrationStatus(client.collectMigrationStatus());
+  }
+
   @TestOnly
   public void setPartitionTable(PartitionTable partitionTable) {
     this.partitionTable = partitionTable;
@@ -1964,6 +2032,10 @@ public class MetaGroupMember extends RaftMember {
       return Response.RESPONSE_CLUSTER_TOO_SMALL;
     }
 
+    if (!waitDataMigrationEnd()) {
+      return Response.RESPONSE_DATA_MIGRATION_NOT_FINISH;
+    }
+
     // find the node to be removed in the node list
     Node target = null;
     synchronized (allNodes) {
@@ -2077,6 +2149,7 @@ public class MetaGroupMember extends RaftMember {
         // the leader is removed, start the next election ASAP
         if (oldNode.equals(leader.get())) {
           setCharacter(NodeCharacter.ELECTOR);
+          setLeader(ClusterConstant.EMPTY_NODE);
           lastHeartbeatReceivedTime = Long.MIN_VALUE;
         }
 
@@ -2156,6 +2229,48 @@ public class MetaGroupMember extends RaftMember {
     return report;
   }
 
+  /**
+   * Collect the data migration status of every data group across all cluster nodes.
+   * @return key: data group; value: maximum number of slots still in data migration on any node
+   */
+  public Map<PartitionGroup, Integer> collectAllPartitionMigrationStatus() {
+    Map<PartitionGroup, Integer> res = new HashMap<>();
+    for (Node node: allNodes) {
+      Map<PartitionGroup, Integer> oneNodeRes;
+      if (node.equals(thisNode)) {
+        oneNodeRes = collectMigrationStatus();
+      } else {
+        oneNodeRes = collectMigrationStatus(node);
+      }
+      if (oneNodeRes == null) {
+        return null;
+      }
+      for (Entry<PartitionGroup, Integer> entry: oneNodeRes.entrySet()) {
+        res.put(entry.getKey(), Math.max(res.getOrDefault(entry.getKey(), 0), entry.getValue()));
+      }
+    }
+    return res;
+  }
+
+  /**
+   * Collect the data migration status of the data groups hosted on the local node only.
+   * @return key: data group; value: number of slots still in data migration
+   */
+  public Map<PartitionGroup, Integer> collectMigrationStatus() {
+    Map<PartitionGroup, Integer> groupSlotMap = new HashMap<>();
+    Map<RaftNode, DataGroupMember> headerMap = getDataClusterServer().getHeaderGroupMap();
+    waitUtil(getPartitionTable().getLastMetaLogIndex());
+    synchronized (headerMap) {
+      for (DataGroupMember dataMember : headerMap.values()) {
+        int num = dataMember.getSlotManager().getSloNumInDataMigration();
+        if (num > 0) {
+          groupSlotMap.put(dataMember.getPartitionGroup(), num);
+        }
+      }
+    }
+    return groupSlotMap;
+  }
+
   @Override
   public void setAllNodes(PartitionGroup allNodes) {
     super.setAllNodes(new PartitionGroup(allNodes));
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index 700945e..bfdfb70 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -642,6 +642,10 @@ public abstract class RaftMember {
     return allNodes;
   }
 
+  public PartitionGroup getPartitionGroup() {
+    return allNodes;
+  }
+
   public void setAllNodes(PartitionGroup allNodes) {
     this.allNodes = allNodes;
   }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataAsyncService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataAsyncService.java
index 486274e..3befc79 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataAsyncService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataAsyncService.java
@@ -24,6 +24,7 @@ import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.Set;
 import org.apache.iotdb.cluster.client.async.AsyncDataClient;
+import org.apache.iotdb.cluster.config.ClusterConstant;
 import org.apache.iotdb.cluster.exception.CheckConsistencyException;
 import org.apache.iotdb.cluster.exception.LeaderUnknownException;
 import org.apache.iotdb.cluster.exception.ReaderNotFoundException;
@@ -92,7 +93,7 @@ public class DataAsyncService extends BaseAsyncService implements TSDataService.
       AsyncMethodCallback<PullSnapshotResp> resultHandler) {
     // if this node has been set readOnly, then it must have been synchronized with the leader
     // otherwise forward the request to the leader
-    if (dataGroupMember.getLeader() != null) {
+    if (dataGroupMember.getLeader() != null && !ClusterConstant.EMPTY_NODE.equals(dataGroupMember.getLeader())) {
       logger.debug("{} forwarding a pull snapshot request to the leader {}", name,
           dataGroupMember.getLeader());
       AsyncDataClient client = (AsyncDataClient) dataGroupMember
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataSyncService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataSyncService.java
index bbf3430..c16ad68 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataSyncService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataSyncService.java
@@ -24,6 +24,7 @@ import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.Set;
 import org.apache.iotdb.cluster.client.sync.SyncDataClient;
+import org.apache.iotdb.cluster.config.ClusterConstant;
 import org.apache.iotdb.cluster.exception.CheckConsistencyException;
 import org.apache.iotdb.cluster.exception.LeaderUnknownException;
 import org.apache.iotdb.cluster.exception.ReaderNotFoundException;
@@ -88,7 +89,7 @@ public class DataSyncService extends BaseSyncService implements TSDataService.If
   private PullSnapshotResp forwardPullSnapshot(PullSnapshotRequest request) throws TException {
     // if this node has been set readOnly, then it must have been synchronized with the leader
     // otherwise forward the request to the leader
-    if (dataGroupMember.getLeader() != null) {
+    if (dataGroupMember.getLeader() != null && !ClusterConstant.EMPTY_NODE.equals(dataGroupMember.getLeader())) {
       logger.debug("{} forwarding a pull snapshot request to the leader {}", name,
           dataGroupMember.getLeader());
       SyncDataClient client =
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaAsyncService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaAsyncService.java
index e1c899a..1ee9ab5 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaAsyncService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaAsyncService.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.cluster.server.service;
 
 import java.nio.ByteBuffer;
+import org.apache.iotdb.cluster.config.ClusterConstant;
 import org.apache.iotdb.cluster.exception.AddSelfException;
 import org.apache.iotdb.cluster.exception.ChangeMembershipException;
 import org.apache.iotdb.cluster.exception.LeaderUnknownException;
@@ -81,7 +82,8 @@ public class MetaAsyncService extends BaseAsyncService implements TSMetaService.
       return;
     }
 
-    if (member.getCharacter() == NodeCharacter.FOLLOWER && member.getLeader() != null) {
+    if (member.getCharacter() == NodeCharacter.FOLLOWER && member.getLeader() != null
+        && !ClusterConstant.EMPTY_NODE.equals(member.getLeader())) {
       logger.info("Forward the join request of {} to leader {}", node, member.getLeader());
       if (forwardAddNode(node, startUpStatus, resultHandler)) {
         return;
@@ -146,6 +148,11 @@ public class MetaAsyncService extends BaseAsyncService implements TSMetaService.
   }
 
   @Override
+  public void collectMigrationStatus(AsyncMethodCallback<ByteBuffer> resultHandler) {
+    resultHandler.onComplete(ClusterUtils.serializeMigrationStatus(metaGroupMember.collectMigrationStatus()));
+  }
+
+  @Override
   public void removeNode(Node node, AsyncMethodCallback<Long> resultHandler) {
     long result = Response.RESPONSE_NULL;
     try {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaSyncService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaSyncService.java
index dfb2aee..e738ee3 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaSyncService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaSyncService.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.cluster.server.service;
 
 import java.nio.ByteBuffer;
 import org.apache.iotdb.cluster.client.sync.SyncMetaClient;
+import org.apache.iotdb.cluster.config.ClusterConstant;
 import org.apache.iotdb.cluster.exception.AddSelfException;
 import org.apache.iotdb.cluster.exception.ChangeMembershipException;
 import org.apache.iotdb.cluster.exception.LeaderUnknownException;
@@ -78,7 +79,8 @@ public class MetaSyncService extends BaseSyncService implements TSMetaService.If
       return addNodeResponse;
     }
 
-    if (member.getCharacter() == NodeCharacter.FOLLOWER && member.getLeader() != null) {
+    if (member.getCharacter() == NodeCharacter.FOLLOWER && member.getLeader() != null
+        && !ClusterConstant.EMPTY_NODE.equals(member.getLeader())) {
       logger.info("Forward the join request of {} to leader {}", node, member.getLeader());
       addNodeResponse = forwardAddNode(node, startUpStatus);
       if (addNodeResponse != null) {
@@ -140,6 +142,11 @@ public class MetaSyncService extends BaseSyncService implements TSMetaService.If
   }
 
   @Override
+  public ByteBuffer collectMigrationStatus() {
+    return ClusterUtils.serializeMigrationStatus(metaGroupMember.collectMigrationStatus());
+  }
+
+  @Override
   public long removeNode(Node node) throws TException {
     long result;
     try {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/ClusterUtils.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/ClusterUtils.java
index 6b42d67..afd030b 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/ClusterUtils.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/ClusterUtils.java
@@ -19,8 +19,15 @@
 
 package org.apache.iotdb.cluster.utils;
 
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Objects;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadFactory;
@@ -332,4 +339,29 @@ public class ClusterUtils {
     return partitionGroup;
   }
 
+  public static ByteBuffer serializeMigrationStatus(Map<PartitionGroup, Integer> migrationStatus) {
+    ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+    try (DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream)) {
+      dataOutputStream.writeInt(migrationStatus.size());
+      for (Entry<PartitionGroup, Integer> entry: migrationStatus.entrySet()) {
+        entry.getKey().serialize(dataOutputStream);
+        dataOutputStream.writeInt(entry.getValue());
+      }
+    } catch (IOException e) {
+      // ignored
+    }
+    return ByteBuffer.wrap(byteArrayOutputStream.toByteArray());
+  }
+
+  public static Map<PartitionGroup, Integer> deserializeMigrationStatus(ByteBuffer buffer) {
+    Map<PartitionGroup, Integer> migrationStatus = new HashMap<>();
+    int size = buffer.getInt();
+    while (size-- > 0) {
+      PartitionGroup partitionGroup = new PartitionGroup();
+      partitionGroup.deserialize(buffer);
+      migrationStatus.put(partitionGroup, buffer.getInt());
+    }
+    return migrationStatus;
+  }
+
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/ClusterMonitor.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/ClusterMonitor.java
index 09464b4..6b58665 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/ClusterMonitor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/ClusterMonitor.java
@@ -18,16 +18,17 @@
  */
 package org.apache.iotdb.cluster.utils.nodetool;
 
+import static org.apache.iotdb.cluster.utils.nodetool.function.NodeToolCmd.BUILDING_CLUSTER_INFO;
+import static org.apache.iotdb.cluster.utils.nodetool.function.NodeToolCmd.META_LEADER_UNKNOWN_INFO;
+
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import org.apache.commons.collections4.map.MultiKeyMap;
 import org.apache.iotdb.cluster.ClusterMain;
+import org.apache.iotdb.cluster.config.ClusterConstant;
 import org.apache.iotdb.cluster.config.ClusterDescriptor;
-import org.apache.iotdb.cluster.exception.LeaderUnknownException;
-import org.apache.iotdb.cluster.exception.RedirectMetaLeaderException;
 import org.apache.iotdb.cluster.partition.PartitionGroup;
 import org.apache.iotdb.cluster.partition.PartitionTable;
 import org.apache.iotdb.cluster.partition.slot.SlotPartitionTable;
@@ -38,6 +39,7 @@ import org.apache.iotdb.cluster.server.NodeCharacter;
 import org.apache.iotdb.cluster.server.member.DataGroupMember;
 import org.apache.iotdb.cluster.server.monitor.Timer;
 import org.apache.iotdb.cluster.server.member.MetaGroupMember;
+import org.apache.iotdb.cluster.utils.nodetool.function.NodeToolCmd;
 import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.exception.StartupException;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
@@ -112,17 +114,19 @@ public class ClusterMonitor implements ClusterMonitorMBean, IService {
 
   @Override
   public Map<PartitionGroup, Integer> getSlotNumInDataMigration()
-      throws RedirectMetaLeaderException, LeaderUnknownException {
+      throws Exception {
     MetaGroupMember member = getMetaGroupMember();
+    if (member.getPartitionTable() == null) {
+      throw new Exception(BUILDING_CLUSTER_INFO);
+    }
     if (member.getCharacter() != NodeCharacter.LEADER) {
-      if (member.getCharacter() == null) {
-        throw new LeaderUnknownException(member.getAllNodes());
+      if (member.getLeader() == null || member.getLeader().equals(ClusterConstant.EMPTY_NODE)) {
+        throw new Exception(META_LEADER_UNKNOWN_INFO);
       } else {
-        throw new RedirectMetaLeaderException(member.getLeader());
+        throw new Exception(NodeToolCmd.redirectToQueryMetaLeader(member.getLeader()));
       }
-    } else {
-      return Collections.EMPTY_MAP;
     }
+    return member.collectAllPartitionMigrationStatus();
   }
 
   @Override
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/ClusterMonitorMBean.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/ClusterMonitorMBean.java
index 3c4db07..90af73d 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/ClusterMonitorMBean.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/ClusterMonitorMBean.java
@@ -21,8 +21,6 @@ package org.apache.iotdb.cluster.utils.nodetool;
 import java.util.List;
 import java.util.Map;
 import org.apache.commons.collections4.map.MultiKeyMap;
-import org.apache.iotdb.cluster.exception.LeaderUnknownException;
-import org.apache.iotdb.cluster.exception.RedirectMetaLeaderException;
 import org.apache.iotdb.cluster.partition.PartitionGroup;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.iotdb.cluster.server.NodeCharacter;
@@ -44,8 +42,7 @@ public interface ClusterMonitorMBean {
    * Query how many slots are still PULLING or PULLING_WRITABLE, it means whether user can add/remove a node.
    * @return key: group, value: slot num that still in the process of data migration
    */
-  Map<PartitionGroup, Integer> getSlotNumInDataMigration()
-      throws RedirectMetaLeaderException, LeaderUnknownException;
+  Map<PartitionGroup, Integer> getSlotNumInDataMigration() throws Exception;
 
   /**
    * Get data partition information of input path and time range.
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/function/Migration.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/function/Migration.java
index c3c7bad..491bfad 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/function/Migration.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/function/Migration.java
@@ -24,7 +24,6 @@ import io.airlift.airline.Command;
 import java.util.Map;
 import java.util.Map.Entry;
 import org.apache.iotdb.cluster.exception.LeaderUnknownException;
-import org.apache.iotdb.cluster.exception.RedirectMetaLeaderException;
 import org.apache.iotdb.cluster.partition.PartitionGroup;
 import org.apache.iotdb.cluster.utils.nodetool.ClusterMonitorMBean;
 
@@ -36,7 +35,7 @@ public class Migration extends NodeToolCmd {
     try {
       Map<PartitionGroup, Integer> groupSlotsMap = proxy.getSlotNumInDataMigration();
       if (groupSlotsMap == null) {
-        msgPrintln(BUILDING_CLUSTER_INFO);
+        msgPrintln(FAIL_TO_GET_ALL_SLOT_STATUS_INFO);
         return;
       }
       if (groupSlotsMap.isEmpty()) {
@@ -48,13 +47,13 @@ public class Migration extends NodeToolCmd {
         for (Entry<PartitionGroup, Integer> entry : groupSlotsMap.entrySet()) {
           PartitionGroup group = entry.getKey();
           msgPrintln(String
-              .format("%-20d->%30s", entry.getValue(), group));
+              .format("%-20d->%30s", entry.getValue(), partitionGroupToString(group)));
         }
       }
-    } catch (RedirectMetaLeaderException e) {
-      msgPrintln(redirectToQueryMetaLeader(e.getMetaLeader()));
     } catch (LeaderUnknownException e) {
       msgPrintln(META_LEADER_UNKNOWN_INFO);
+    } catch (Exception e) {
+      msgPrintln(e.getMessage());
     }
   }
 }
\ No newline at end of file
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/function/NodeToolCmd.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/function/NodeToolCmd.java
index 0043c15..fc1d644 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/function/NodeToolCmd.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/function/NodeToolCmd.java
@@ -61,9 +61,11 @@ public abstract class NodeToolCmd implements Runnable {
 
   private static final String JMX_URL_FORMAT = "service:jmx:rmi:///jndi/rmi://%s:%s/jmxrmi";
 
-  static final String BUILDING_CLUSTER_INFO = "The cluster is being created.";
+  public static final String BUILDING_CLUSTER_INFO = "The cluster is being created.";
 
-  static final String META_LEADER_UNKNOWN_INFO = "Meta group leader is unknown, please try again later.";
+  public static final String META_LEADER_UNKNOWN_INFO = "Meta group leader is unknown, please try again later.";
+
+  static final String FAIL_TO_GET_ALL_SLOT_STATUS_INFO = "Fail to get all slot status, please check node status and try again later.";
 
   @Override
   public void run() {
@@ -100,20 +102,20 @@ public abstract class NodeToolCmd implements Runnable {
     return mbsc;
   }
 
-  public String nodeCharacterToString(Node node, NodeCharacter character) {
+  public static String nodeCharacterToString(Node node, NodeCharacter character) {
     return String.format("%s (%s)", nodeToString(node), character);
   }
 
-  public String nodeToString(Node node) {
+  public static String nodeToString(Node node) {
     return String.format("%s:%d:%d:%d", node.getIp(), node.getMetaPort(), node.getDataPort(),
         node.getClientPort());
   }
 
-  public String redirectToQueryMetaLeader(Node node) {
+  public static String redirectToQueryMetaLeader(Node node) {
     return String.format("Please redirect to query meta group leader %s", nodeToString(node));
   }
 
-  public String partitionGroupToString(PartitionGroup group) {
+  public static String partitionGroupToString(PartitionGroup group) {
     StringBuilder stringBuilder = new StringBuilder("[");
     if (!group.isEmpty()) {
       stringBuilder.append(nodeToString(group.get(0)));
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/function/Partition.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/function/Partition.java
index ef2a3b1..92b7546 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/function/Partition.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/function/Partition.java
@@ -78,7 +78,7 @@ public class Partition extends NodeToolCmd {
     } else {
       timeRangeMapRaftGroup.forEach(
           (timeRange, raftGroup) -> msgPrintln(String.format("DATA<%s, %d, %d>\t->\t%s", path,
-              (long) timeRange.getKey(0), (long) timeRange.getKey(1),
+              timeRange.getKey(0), timeRange.getKey(1),
               partitionGroupToString(raftGroup))));
     }
   }
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/common/TestDataGroupMember.java b/cluster/src/test/java/org/apache/iotdb/cluster/common/TestDataGroupMember.java
index 6f182d2..8ebca64 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/common/TestDataGroupMember.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/common/TestDataGroupMember.java
@@ -33,7 +33,7 @@ public class TestDataGroupMember extends DataGroupMember {
   public TestDataGroupMember() {
     super();
     setQueryManager(new ClusterQueryManager());
-    this.slotManager = new SlotManager(ClusterConstant.SLOT_NUM, null);
+    this.slotManager = new SlotManager(ClusterConstant.SLOT_NUM, null, "");
     this.allNodes = new PartitionGroup(Collections.singletonList(TestUtils.getNode(0)));
   }
 
@@ -41,7 +41,7 @@ public class TestDataGroupMember extends DataGroupMember {
     super();
     this.thisNode = thisNode;
     this.allNodes = allNodes;
-    this.slotManager = new SlotManager(ClusterConstant.SLOT_NUM, null);
+    this.slotManager = new SlotManager(ClusterConstant.SLOT_NUM, null, "");
     setQueryManager(new ClusterQueryManager());
   }
 }
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/partition/SlotManagerTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/partition/SlotManagerTest.java
index 218db7f..83912bc 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/partition/SlotManagerTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/partition/SlotManagerTest.java
@@ -52,7 +52,7 @@ public class SlotManagerTest {
     ClusterDescriptor.getInstance().getConfig().setEnableRaftLogPersistence(true);
     ClusterDescriptor.getInstance().getConfig().setReplicationNum(2);
     int testSlotNum = 100;
-    slotManager = new SlotManager(testSlotNum, null);
+    slotManager = new SlotManager(testSlotNum, null, "");
   }
 
   @Test
@@ -118,7 +118,7 @@ public class SlotManagerTest {
     File dummyMemberDir = new File("test");
     dummyMemberDir.mkdirs();
     try {
-      slotManager = new SlotManager(5, dummyMemberDir.getPath());
+      slotManager = new SlotManager(5, dummyMemberDir.getPath(), "");
       slotManager.setToNull(0);
       slotManager.setToPulling(1, TestUtils.getNode(1));
       slotManager.setToPulling(2, TestUtils.getNode(2));
@@ -130,7 +130,7 @@ public class SlotManagerTest {
         slotManager.sentOneReplication(4);
       }
 
-      SlotManager recovered = new SlotManager(5, dummyMemberDir.getPath());
+      SlotManager recovered = new SlotManager(5, dummyMemberDir.getPath(), "");
       assertEquals(NULL, recovered.getStatus(0));
       assertEquals(PULLING, recovered.getStatus(1));
       assertEquals(PULLING_WRITABLE, recovered.getStatus(2));
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
index 4653822..08f96d9 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
@@ -934,7 +934,7 @@ public class DataGroupMemberTest extends MemberTest {
 
     try {
       dataGroupMember.preRemoveNode(nodeToRemove);
-      dataGroupMember.removeNode(nodeToRemove, nodeRemovalResult);
+      dataGroupMember.removeNode(nodeToRemove);
 
       assertEquals(NodeCharacter.ELECTOR, dataGroupMember.getCharacter());
       assertEquals(Long.MIN_VALUE, dataGroupMember.getLastHeartbeatReceivedTime());
@@ -965,7 +965,7 @@ public class DataGroupMemberTest extends MemberTest {
 
     try {
       dataGroupMember.preRemoveNode(nodeToRemove);
-      dataGroupMember.removeNode(nodeToRemove, nodeRemovalResult);
+      dataGroupMember.removeNode(nodeToRemove);
 
       assertEquals(0, dataGroupMember.getLastHeartbeatReceivedTime());
       assertTrue(dataGroupMember.getAllNodes().contains(TestUtils.getNode(30)));
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
index af143c4..2402ff8 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
@@ -864,7 +864,7 @@ public class MetaGroupMemberTest extends MemberTest {
         IReaderByTimestamp readerByTimestamp = readerFactory
             .getReaderByTimestamp(new PartialPath(TestUtils.getTestSeries(i, 0)),
                 Collections.singleton(TestUtils.getTestMeasurement(0)), TSDataType.DOUBLE,
-                context, true);
+                context, true, null);
         for (int j = 0; j < 10; j++) {
           assertEquals(j * 1.0, (double) readerByTimestamp.getValueInTimestamp(j), 0.00001);
         }
@@ -913,7 +913,7 @@ public class MetaGroupMemberTest extends MemberTest {
             .getSeriesReader(new PartialPath(TestUtils.getTestSeries(i, 0)),
                 Collections.singleton(TestUtils.getTestMeasurement(0)), TSDataType.DOUBLE,
                 TimeFilter.gtEq(5),
-                ValueFilter.ltEq(8.0), context, true);
+                ValueFilter.ltEq(8.0), context, true, null);
         assertTrue(reader.hasNextBatch());
         BatchData batchData = reader.nextBatch();
         for (int j = 5; j < 9; j++) {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 2349228..1180388 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -2580,7 +2580,7 @@ public class StorageGroupProcessor {
       Collection<TsFileResource> existingFiles) {
     for (TsFileResource resource : existingFiles) {
       if (resource.getTimePartition() == partitionNum
-          && resource.getMaxPlanIndex() >= tsFileResource.getMaxPlanIndex()) {
+          && resource.getMaxPlanIndex() > tsFileResource.getMaxPlanIndex()) {
         logger.info("{} is covered by a closed file {}: [{}, {}] [{}, {}]", tsFileResource,
             resource, tsFileResource.minPlanIndex, tsFileResource.maxPlanIndex,
             resource.minPlanIndex, resource.maxPlanIndex);
@@ -2596,7 +2596,7 @@ public class StorageGroupProcessor {
       if (workingProcesssor.getTimeRangeId() == partitionNum) {
         TsFileResource workResource = workingProcesssor.getTsFileResource();
         boolean isCovered =
-            workResource.getMaxPlanIndex() >= tsFileResource
+            workResource.getMaxPlanIndex() > tsFileResource
                 .getMaxPlanIndex();
         if (isCovered) {
           logger.info("{} is covered by a working file {}: [{}, {}] [{}, {}]", tsFileResource,
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
index 82f31f3..926c559 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
@@ -747,12 +747,12 @@ public class TsFileResource {
   }
 
   public boolean isPlanIndexOverlap(TsFileResource another) {
-    return another.maxPlanIndex >= this.minPlanIndex &&
-        another.minPlanIndex <= this.maxPlanIndex;
+    return another.maxPlanIndex > this.minPlanIndex &&
+        another.minPlanIndex < this.maxPlanIndex;
   }
 
   public boolean isPlanRangeCovers(TsFileResource another) {
-    return this.minPlanIndex <= another.minPlanIndex && another.maxPlanIndex <= this.maxPlanIndex;
+    return this.minPlanIndex < another.minPlanIndex && another.maxPlanIndex < this.maxPlanIndex;
   }
 
   public void setMaxPlanIndex(long maxPlanIndex) {
diff --git a/thrift/src/main/thrift/cluster.thrift b/thrift/src/main/thrift/cluster.thrift
index 6e6703d..53f5ae4 100644
--- a/thrift/src/main/thrift/cluster.thrift
+++ b/thrift/src/main/thrift/cluster.thrift
@@ -196,6 +196,7 @@ struct SingleSeriesQueryRequest {
   10: required bool ascending
   11: required int fetchSize
   12: required int deduplicatedPathNum
+  13: required set<int> requiredSlots
 }
 
 struct PreviousFillRequest {
@@ -471,13 +472,21 @@ service TSMetaService extends RaftService {
 
   TNodeStatus queryNodeStatus()
 
+  /**
+  * Check whether this node is alive.
+  **/
   Node checkAlive()
 
   /**
+  * Collect the data migration status, which decides whether users may change the cluster membership.
+  **/
+  binary collectMigrationStatus()
+
+  /**
   * When a node starts, it send handshakes to all other nodes so they know the node is alive
   * again. Notice that heartbeats exists only between leaders and followers, so coordinators
   * cannot know when another node resumes, and handshakes are mainly used to update node status
   * on coordinator side.
   **/
-  void handshake(Node sender);
+  void handshake(Node sender)
 }