Posted to commits@iotdb.apache.org by lt...@apache.org on 2021/03/02 14:31:27 UTC

[iotdb] branch cluster_scalability updated: This commit fixes the following issues: 1. fix a bug where partition groups of a removed node were still generated in PartitionTable localGroups (handle the -1 node index). 2. fix a null pointer when getting the storage group processor while handling onSnapshotApplied. 3. remove the TsFile from TsFileManager when removing local data. 4. fix a wrong index assignment in the physical plan.

This is an automated email from the ASF dual-hosted git repository.

lta pushed a commit to branch cluster_scalability
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/cluster_scalability by this push:
     new 5c4ed2d  This commit fixes the following issues: 1. fix a bug where partition groups of a removed node were still generated in PartitionTable localGroups (handle the -1 node index). 2. fix a null pointer when getting the storage group processor while handling onSnapshotApplied. 3. remove the TsFile from TsFileManager when removing local data. 4. fix a wrong index assignment in the physical plan.
5c4ed2d is described below

commit 5c4ed2dd6a3a97078085ca866f923ce41d5ceeef
Author: lta <li...@163.com>
AuthorDate: Tue Mar 2 22:26:56 2021 +0800

    This commit fixes the following issues:
    1. fix a bug where partition groups of a removed node were still generated in PartitionTable localGroups (handle the -1 node index).
    2. fix a null pointer when getting the storage group processor while handling onSnapshotApplied.
    3. remove the TsFile from TsFileManager when removing local data.
    4. fix a wrong index assignment in the physical plan.
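
    Item 2 also switches SyncClientAdaptor.onSnapshotApplied to a wait/notify handshake on a
    shared AtomicReference. A minimal sketch of that pattern is below; the class and method
    names are illustrative only and are not the actual IoTDB APIs, and unlike the committed
    code the sketch waits in a loop to guard against spurious wakeups.

    import java.util.concurrent.atomic.AtomicReference;

    public class WaitForResultSketch {

      // Caller side: park on the shared reference until the async callback publishes a value.
      public static Boolean waitFor(AtomicReference<Boolean> result) throws InterruptedException {
        synchronized (result) {
          while (result.get() == null) { // loop guards against spurious wakeups
            result.wait();
          }
        }
        return result.get();
      }

      // Callback side: publish the value and wake the waiting caller.
      public static void publish(AtomicReference<Boolean> result, Boolean value) {
        synchronized (result) {
          result.set(value);
          result.notifyAll();
        }
      }
    }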
---
 .../cluster/client/sync/SyncClientAdaptor.java     | 12 +++-
 .../manage/FilePartitionedSnapshotLogManager.java  | 36 +++++++----
 .../log/manage/PartitionedSnapshotLogManager.java  | 11 +++-
 .../iotdb/cluster/log/snapshot/FileSnapshot.java   |  6 +-
 .../cluster/log/snapshot/PullSnapshotTask.java     | 62 +++++++++----------
 .../iotdb/cluster/partition/PartitionTable.java    |  5 ++
 .../iotdb/cluster/partition/slot/SlotManager.java  | 44 +++++++++++---
 .../cluster/partition/slot/SlotPartitionTable.java | 31 ++++++----
 .../cluster/server/PullSnapshotHintService.java    |  2 +-
 .../cluster/server/member/DataGroupMember.java     | 69 ++++++++++++----------
 .../cluster/server/member/MetaGroupMember.java     |  8 +--
 .../iotdb/cluster/server/member/RaftMember.java    | 41 ++++++++-----
 .../server/heartbeat/MetaHeartbeatThreadTest.java  |  5 ++
 .../cluster/server/member/MetaGroupMemberTest.java |  6 +-
 .../org/apache/iotdb/db/engine/StorageEngine.java  |  4 +-
 .../engine/storagegroup/StorageGroupProcessor.java | 14 +++--
 16 files changed, 219 insertions(+), 137 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptor.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptor.java
index b028766..ce2b048 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptor.java
@@ -425,7 +425,7 @@ public class SyncClientAdaptor {
         client.getNode(), slots, factory));
     synchronized (snapshotRef) {
       if (snapshotRef.get() == null) {
-        snapshotRef.wait(RaftServer.getReadOperationTimeoutMS());
+        snapshotRef.wait();
       }
     }
     return snapshotRef.get();
@@ -447,10 +447,16 @@ public class SyncClientAdaptor {
 
   public static boolean onSnapshotApplied(AsyncDataClient client, Node header, int raftId, List<Integer> slots)
       throws TException, InterruptedException {
-    AtomicReference<Boolean> result = new AtomicReference<>(false);
+    AtomicReference<Boolean> result = new AtomicReference<>();
     GenericHandler<Boolean> handler = new GenericHandler<>(client.getNode(), result);
 
     client.onSnapshotApplied(header, raftId, slots, handler);
-    return handler.getResult(RaftServer.getWriteOperationTimeoutMS());
+
+    synchronized (result) {
+      if (result.get() == null) {
+        result.wait();
+      }
+    }
+    return result.get();
   }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/FilePartitionedSnapshotLogManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/FilePartitionedSnapshotLogManager.java
index 682da96..4c3da5e 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/FilePartitionedSnapshotLogManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/FilePartitionedSnapshotLogManager.java
@@ -26,7 +26,7 @@ import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-
+import org.apache.iotdb.cluster.config.ClusterConstant;
 import org.apache.iotdb.cluster.exception.EntryCompactedException;
 import org.apache.iotdb.cluster.log.Log;
 import org.apache.iotdb.cluster.log.LogApplier;
@@ -37,6 +37,7 @@ import org.apache.iotdb.cluster.log.snapshot.FileSnapshot.Factory;
 import org.apache.iotdb.cluster.partition.PartitionTable;
 import org.apache.iotdb.cluster.partition.slot.SlotPartitionTable;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.rpc.thrift.RaftNode;
 import org.apache.iotdb.cluster.server.member.DataGroupMember;
 import org.apache.iotdb.db.engine.StorageEngine;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
@@ -64,7 +65,7 @@ public class FilePartitionedSnapshotLogManager extends PartitionedSnapshotLogMan
   /**
    * send FlushPlan to all nodes in one dataGroup
    */
-  private void syncFlushAllProcessor() {
+  private void syncFlushAllProcessor(List<Integer> requiredSlots) {
     logger.info("{}: Start flush all storage group processor in one data group", getName());
     Map<String, List<Pair<Long, Boolean>>> storageGroupPartitions = StorageEngine.getInstance()
         .getWorkingStorageGroupPartitions();
@@ -72,12 +73,18 @@ public class FilePartitionedSnapshotLogManager extends PartitionedSnapshotLogMan
       logger.info("{}: no need to flush processor", getName());
       return;
     }
-    dataGroupMember.flushFileWhenDoSnapshot(storageGroupPartitions);
+    dataGroupMember.flushFileWhenDoSnapshot(storageGroupPartitions, requiredSlots);
   }
 
   @Override
   @SuppressWarnings("java:S1135") // ignore todos
   public void takeSnapshot() throws IOException {
+    takeSnapshotForSpecificSlots(((SlotPartitionTable) partitionTable)
+        .getNodeSlots(new RaftNode(dataGroupMember.getHeader(), dataGroupMember.getRaftGroupId())));
+  }
+
+  @Override
+  public void takeSnapshotForSpecificSlots(List<Integer> requiredSlots) throws IOException {
     try {
       logger.info("{}: Taking snapshots, flushing IoTDB", getName());
       // record current commit index and prevent further logs from being applied, so the
@@ -86,14 +93,14 @@ public class FilePartitionedSnapshotLogManager extends PartitionedSnapshotLogMan
       // wait until all logs before BlockAppliedCommitIndex are applied
       super.takeSnapshot();
       // flush data to disk so that the disk files will represent a complete state
-      syncFlushAllProcessor();
+      syncFlushAllProcessor(requiredSlots);
       logger.info("{}: Taking snapshots, IoTDB is flushed", getName());
       // TODO-cluster https://issues.apache.org/jira/browse/IOTDB-820
       synchronized (this) {
-        collectTimeseriesSchemas();
+        collectTimeseriesSchemas(requiredSlots);
         snapshotLastLogIndex = getBlockAppliedCommitIndex();
         snapshotLastLogTerm = getTerm(snapshotLastLogIndex);
-        collectTsFilesAndFillTimeseriesSchemas();
+        collectTsFilesAndFillTimeseriesSchemas(requiredSlots);
         logger.info("{}: Snapshot is taken", getName());
       }
     } catch (EntryCompactedException e) {
@@ -112,9 +119,9 @@ public class FilePartitionedSnapshotLogManager extends PartitionedSnapshotLogMan
    *
    * @throws IOException
    */
-  private void collectTsFilesAndFillTimeseriesSchemas() throws IOException {
+  private void collectTsFilesAndFillTimeseriesSchemas(List<Integer> requiredSlots) throws IOException {
     // 1.collect tsfile
-    collectTsFiles();
+    collectTsFiles(requiredSlots);
 
     // 2.register the measurement
     for (Map.Entry<Integer, Collection<TimeseriesSchema>> entry : slotTimeseries.entrySet()) {
@@ -127,7 +134,7 @@ public class FilePartitionedSnapshotLogManager extends PartitionedSnapshotLogMan
     }
   }
 
-  private void collectTsFiles() throws IOException {
+  private void collectTsFiles(List<Integer> requiredSlots) throws IOException {
     slotSnapshots.clear();
     Map<PartialPath, Map<Long, List<TsFileResource>>> allClosedStorageGroupTsFile = StorageEngine
         .getInstance().getAllClosedStorageGroupTsFile();
@@ -140,13 +147,13 @@ public class FilePartitionedSnapshotLogManager extends PartitionedSnapshotLogMan
       for (Entry<Long, List<TsFileResource>> storageGroupFiles : storageGroupsFiles.entrySet()) {
         Long partitionNum = storageGroupFiles.getKey();
         List<TsFileResource> resourceList = storageGroupFiles.getValue();
-        if (!collectTsFiles(partitionNum, resourceList, storageGroupName, createdHardlinks)) {
+        if (!collectTsFiles(partitionNum, resourceList, storageGroupName, createdHardlinks, requiredSlots)) {
           // some file is deleted during the collecting, clean created hardlinks and restart
           // from the beginning
           for (TsFileResource createdHardlink : createdHardlinks) {
             createdHardlink.remove();
           }
-          collectTsFiles();
+          collectTsFiles(requiredSlots);
           return;
         }
       }
@@ -165,9 +172,12 @@ public class FilePartitionedSnapshotLogManager extends PartitionedSnapshotLogMan
    * @throws IOException
    */
   private boolean collectTsFiles(Long partitionNum, List<TsFileResource> resourceList,
-      PartialPath storageGroupName, List<TsFileResource> createdHardlinks) throws IOException {
+      PartialPath storageGroupName, List<TsFileResource> createdHardlinks, List<Integer> requiredSlots) throws IOException {
     int slotNum = SlotPartitionTable.getSlotStrategy().calculateSlotByPartitionNum(storageGroupName.getFullPath(),
-        partitionNum, ((SlotPartitionTable) partitionTable).getTotalSlotNumbers());
+        partitionNum, ClusterConstant.SLOT_NUM);
+    if (!requiredSlots.contains(slotNum)) {
+      return true;
+    }
     FileSnapshot snapshot = slotSnapshots.computeIfAbsent(slotNum,
         s -> new FileSnapshot());
     for (TsFileResource tsFileResource : resourceList) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/PartitionedSnapshotLogManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/PartitionedSnapshotLogManager.java
index 1ca26a2..0cf6c77 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/PartitionedSnapshotLogManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/PartitionedSnapshotLogManager.java
@@ -19,12 +19,14 @@
 
 package org.apache.iotdb.cluster.log.manage;
 
+import java.io.IOException;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import org.apache.iotdb.cluster.config.ClusterConstant;
 import org.apache.iotdb.cluster.log.LogApplier;
 import org.apache.iotdb.cluster.log.Snapshot;
 import org.apache.iotdb.cluster.log.manage.serializable.SyncLogDequeSerializer;
@@ -68,6 +70,8 @@ public abstract class PartitionedSnapshotLogManager<T extends Snapshot> extends
     this.dataGroupMember = dataGroupMember;
   }
 
+  public void takeSnapshotForSpecificSlots(List<Integer> requiredSlots) throws IOException {}
+
   @Override
   public Snapshot getSnapshot(long minIndex) {
     // copy snapshots
@@ -83,14 +87,17 @@ public abstract class PartitionedSnapshotLogManager<T extends Snapshot> extends
     }
   }
 
-  void collectTimeseriesSchemas() {
+  void collectTimeseriesSchemas(List<Integer> requiredSlots) {
     slotTimeseries.clear();
     List<StorageGroupMNode> allSgNodes = IoTDB.metaManager.getAllStorageGroupNodes();
     for (MNode sgNode : allSgNodes) {
       String storageGroupName = sgNode.getFullPath();
       int slot = SlotPartitionTable.getSlotStrategy().calculateSlotByTime(storageGroupName, 0,
-          ((SlotPartitionTable) partitionTable).getTotalSlotNumbers());
+          ClusterConstant.SLOT_NUM);
 
+      if (!requiredSlots.contains(slot)) {
+        continue;
+      }
       Collection<TimeseriesSchema> schemas = slotTimeseries.computeIfAbsent(slot,
           s -> new HashSet<>());
       IoTDB.metaManager.collectTimeseriesSchema(sgNode, schemas);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/FileSnapshot.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/FileSnapshot.java
index 9e159b8..25a0c0f 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/FileSnapshot.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/FileSnapshot.java
@@ -235,10 +235,11 @@ public class FileSnapshot extends Snapshot implements TimeseriesSchemaSnapshot {
         SlotStatus status = slotManager.getStatus(slot);
         if (status == SlotStatus.PULLING) {
           // as schemas are set, writes can proceed
-          slotManager.setToPullingWritable(slot);
+          slotManager.setToPullingWritable(slot, false);
           logger.debug("{}: slot {} is now pulling writable", name, slot);
         }
       }
+      slotManager.save();
 
       for (Entry<Integer, FileSnapshot> integerSnapshotEntry : snapshotMap.entrySet()) {
         Integer slot = integerSnapshotEntry.getKey();
@@ -249,6 +250,7 @@ public class FileSnapshot extends Snapshot implements TimeseriesSchemaSnapshot {
           throw new SnapshotInstallationException(e);
         }
       }
+      slotManager.save();
     }
 
     private void installFileSnapshotSchema(FileSnapshot snapshot) {
@@ -280,7 +282,7 @@ public class FileSnapshot extends Snapshot implements TimeseriesSchemaSnapshot {
         }
       }
       // all files are loaded, the slot can be queried without accessing the previous holder
-      slotManager.setToNull(slot);
+      slotManager.setToNull(slot, false);
       logger.info("{}: slot {} is ready", name, slot);
     }
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTask.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTask.java
index 752e3e3..1dc3247 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTask.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTask.java
@@ -100,10 +100,11 @@ public class PullSnapshotTask<T extends Snapshot> implements Callable<Void> {
       List<Integer> noSnapshotSlots = new ArrayList<>();
       for (Integer slot : descriptor.getSlots()) {
         if (!result.containsKey(slot)) {
-          newMember.getSlotManager().setToNull(slot);
+          newMember.getSlotManager().setToNull(slot, false);
           noSnapshotSlots.add(slot);
         }
       }
+      newMember.getSlotManager().save();
       if (!noSnapshotSlots.isEmpty() && logger.isInfoEnabled()) {
         logger.info("{}: {} and other {} slots do not have snapshot", newMember.getName(),
             noSnapshotSlots.get(0), noSnapshotSlots.size() - 1);
@@ -164,40 +165,31 @@ public class PullSnapshotTask<T extends Snapshot> implements Callable<Void> {
 
   @Override
   public Void call() {
-    // If this node is the member of previous holder, it's unnecessary to pull data again
-    if (descriptor.getPreviousHolders().contains(newMember.getThisNode())) {
-      for (Integer slot: descriptor.getSlots()) {
-        newMember.getSlotManager().setToNull(slot);
-      }
-      // inform the previous holders that one member has successfully pulled snapshot directly
-      newMember.registerPullSnapshotHint(descriptor);
-    } else {
-      request = new PullSnapshotRequest();
-      request.setHeader(descriptor.getPreviousHolders().getHeader());
-      request.setRaftId(descriptor.getPreviousHolders().getId());
-      request.setRequiredSlots(descriptor.getSlots());
-      request.setRequireReadOnly(descriptor.isRequireReadOnly());
-
-      boolean finished = false;
-      int nodeIndex = ((PartitionGroup) newMember.getAllNodes()).indexOf(newMember.getThisNode()) - 1;
-      while (!finished) {
-        try {
-          // sequentially pick up a node that may have this slot
-          nodeIndex = (nodeIndex + 1) % descriptor.getPreviousHolders().size();
-          finished = pullSnapshot(nodeIndex);
-          if (!finished) {
-            Thread
-                .sleep(
-                    ClusterDescriptor.getInstance().getConfig().getPullSnapshotRetryIntervalMs());
-          }
-        } catch (InterruptedException e) {
-          Thread.currentThread().interrupt();
-          finished = true;
-        } catch (TException e) {
-          if (logger.isDebugEnabled()) {
-            logger.debug("Cannot pull slot {} from {}, retry", descriptor.getSlots(),
-                descriptor.getPreviousHolders().get(nodeIndex), e);
-          }
+    request = new PullSnapshotRequest();
+    request.setHeader(descriptor.getPreviousHolders().getHeader());
+    request.setRaftId(descriptor.getPreviousHolders().getId());
+    request.setRequiredSlots(descriptor.getSlots());
+    request.setRequireReadOnly(descriptor.isRequireReadOnly());
+
+    boolean finished = false;
+    int nodeIndex = ((PartitionGroup) newMember.getAllNodes()).indexOf(newMember.getThisNode()) - 1;
+    while (!finished) {
+      try {
+        // sequentially pick up a node that may have this slot
+        nodeIndex = (nodeIndex + 1) % descriptor.getPreviousHolders().size();
+        finished = pullSnapshot(nodeIndex);
+        if (!finished) {
+          Thread
+              .sleep(
+                  ClusterDescriptor.getInstance().getConfig().getPullSnapshotRetryIntervalMs());
+        }
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        finished = true;
+      } catch (TException e) {
+        if (logger.isDebugEnabled()) {
+          logger.debug("Cannot pull slot {} from {}, retry", descriptor.getSlots(),
+              descriptor.getPreviousHolders().get(nodeIndex), e);
         }
       }
     }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionTable.java b/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionTable.java
index 97d88b7..67fb4f0 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionTable.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionTable.java
@@ -113,6 +113,11 @@ public interface PartitionTable {
   boolean judgeHoldSlot(Node node, int slot);
 
   /**
+   * get the last meta log index that modifies the partition table
+   */
+  long getLastMetaLogIndex();
+
+  /**
    * @param path      can be an incomplete path (but should contain a storage group name) e.g., if
    *                  "root.sg" is a storage group, then path can not be "root".
    * @param timestamp
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotManager.java
index 7165145..1104dc8 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotManager.java
@@ -145,11 +145,18 @@ public class SlotManager {
    * @param source
    */
   public void setToPulling(int slotId, Node source) {
-    SlotDescriptor slotDescriptor = idSlotMap.get(slotId);
+    setToPulling(slotId, source, true);
+  }
+
+  public void setToPulling(int slotId, Node source, boolean needSave) {
+      SlotDescriptor slotDescriptor = idSlotMap.get(slotId);
     synchronized (slotDescriptor) {
       slotDescriptor.slotStatus = SlotStatus.PULLING;
       slotDescriptor.source = source;
     }
+    if (needSave) {
+      save();
+    }
   }
 
   /**
@@ -157,12 +164,18 @@ public class SlotManager {
    * @param slotId
    */
   public void setToPullingWritable(int slotId) {
+    setToPullingWritable(slotId, true);
+  }
+
+  public void setToPullingWritable(int slotId, boolean needSave) {
     SlotDescriptor slotDescriptor = idSlotMap.get(slotId);
     synchronized (slotDescriptor) {
       slotDescriptor.slotStatus = SlotStatus.PULLING_WRITABLE;
       slotDescriptor.notifyAll();
     }
-    save();
+    if (needSave) {
+      save();
+    }
   }
 
   /**
@@ -170,16 +183,26 @@ public class SlotManager {
    * @param slotId
    */
   public void setToNull(int slotId) {
+    setToNull(slotId, true);
+  }
+
+  public void setToNull(int slotId, boolean needSave) {
     SlotDescriptor slotDescriptor = idSlotMap.get(slotId);
     synchronized (slotDescriptor) {
       slotDescriptor.slotStatus = SlotStatus.NULL;
       slotDescriptor.source = null;
       slotDescriptor.notifyAll();
     }
-    save();
+    if (needSave) {
+      save();
+    }
   }
 
   public void setToSending(int slotId) {
+    setToSending(slotId, true);
+  }
+
+  public void setToSending(int slotId, boolean needSave) {
     // only NULL slots can be set to SENDING
     waitSlot(slotId);
     SlotDescriptor slotDescriptor = idSlotMap.get(slotId);
@@ -187,7 +210,9 @@ public class SlotManager {
       slotDescriptor.slotStatus = SlotStatus.SENDING;
       slotDescriptor.snapshotReceivedCount = 0;
     }
-    save();
+    if (needSave) {
+      save();
+    }
   }
 
   private void setToSent(int slotId) {
@@ -195,7 +220,6 @@ public class SlotManager {
     synchronized (slotDescriptor) {
       slotDescriptor.slotStatus = SlotStatus.SENT;
     }
-    save();
   }
 
   /**
@@ -206,13 +230,19 @@ public class SlotManager {
    * invocation).
    */
   public int sentOneReplication(int slotId) {
+    return sentOneReplication(slotId, true);
+  }
+
+  public int sentOneReplication(int slotId, boolean needSave) {
     SlotDescriptor slotDescriptor = idSlotMap.get(slotId);
     synchronized (slotDescriptor) {
       int sentReplicaNum = ++slotDescriptor.snapshotReceivedCount;
       if (sentReplicaNum >= ClusterDescriptor.getInstance().getConfig().getReplicationNum()) {
         setToSent(slotId);
       }
-      save();
+      if (needSave) {
+        save();
+      }
       return sentReplicaNum;
     }
   }
@@ -243,7 +273,7 @@ public class SlotManager {
     }
   }
 
-  private synchronized void save() {
+  public synchronized void save() {
     if (slotFilePath == null) {
       return;
     }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java b/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java
index bd764b2..3dea23e 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java
@@ -79,7 +79,6 @@ public class SlotPartitionTable implements PartitionTable {
   private RaftNode[] slotNodes = new RaftNode[ClusterConstant.SLOT_NUM];
   // the nodes that each slot belongs to before a new node is added, used for the new node to
   // find the data source
-  // find the data source
   private Map<RaftNode, Map<Integer, PartitionGroup>> previousNodeMap = new ConcurrentHashMap<>();
 
   private NodeRemovalResult nodeRemovalResult = new SlotNodeRemovalResult();
@@ -92,8 +91,8 @@ public class SlotPartitionTable implements PartitionTable {
 
   private List<PartitionGroup> globalGroups;
 
-  // last log index that modifies the partition table
-  private long lastLogIndex = -1;
+  // the last meta log index that modifies the partition table
+  private long lastMetaLogIndex = -1;
 
   private SlotBalancer slotBalancer = new DefaultSlotBalancer(this);
 
@@ -178,6 +177,10 @@ public class SlotPartitionTable implements PartitionTable {
     List<PartitionGroup> ret = new ArrayList<>();
 
     int nodeIndex = nodeRing.indexOf(node);
+    if (nodeIndex == -1) {
+      logger.info("PartitionGroups is empty due to this node has been removed from the cluster!");
+      return ret;
+    }
     for (int i = 0; i < replicationNum; i++) {
       // the previous replicationNum nodes (including the node itself) are the headers of the
       // groups the node is in
@@ -312,6 +315,7 @@ public class SlotPartitionTable implements PartitionTable {
     // the slots movement is only done logically, the new node itself will pull data from the
     // old node
     slotBalancer.moveSlotsToNew(node, oldRing);
+    this.nodeRemovalResult = new SlotNodeRemovalResult();
 
   }
 
@@ -343,7 +347,7 @@ public class SlotPartitionTable implements PartitionTable {
     DataOutputStream dataOutputStream = new DataOutputStream(outputStream);
 
     try {
-      dataOutputStream.writeLong(lastLogIndex);
+      dataOutputStream.writeLong(lastMetaLogIndex);
       dataOutputStream.writeInt(totalSlotNumbers);
       dataOutputStream.writeInt(nodeSlotMap.size());
       for (Entry<RaftNode, List<Integer>> entry : nodeSlotMap.entrySet()) {
@@ -376,10 +380,10 @@ public class SlotPartitionTable implements PartitionTable {
     long newLastLogIndex = buffer.getLong();
 
     // judge whether the partition table of byte buffer is out of date
-    if (lastLogIndex != -1 && lastLogIndex >= newLastLogIndex) {
-      return lastLogIndex <= newLastLogIndex;
+    if (lastMetaLogIndex != -1 && lastMetaLogIndex >= newLastLogIndex) {
+      return lastMetaLogIndex <= newLastLogIndex;
     }
-    lastLogIndex = newLastLogIndex;
+    lastMetaLogIndex = newLastLogIndex;
     logger.info("Initializing the partition table from buffer");
     totalSlotNumbers = buffer.getInt();
     int size = buffer.getInt();
@@ -432,7 +436,7 @@ public class SlotPartitionTable implements PartitionTable {
 
   @Override
   public boolean checkChangeMembershipValidity(long targetLogIndex) {
-    return lastLogIndex == targetLogIndex;
+    return lastMetaLogIndex == targetLogIndex;
   }
 
   @Override
@@ -478,7 +482,7 @@ public class SlotPartitionTable implements PartitionTable {
         Objects.equals(nodeSlotMap, that.nodeSlotMap) &&
         Arrays.equals(slotNodes, that.slotNodes) &&
         Objects.equals(previousNodeMap, that.previousNodeMap) &&
-        lastLogIndex == that.lastLogIndex;
+        lastMetaLogIndex == that.lastMetaLogIndex;
   }
 
   @Override
@@ -575,12 +579,13 @@ public class SlotPartitionTable implements PartitionTable {
     return result;
   }
 
-  public synchronized long getLastLogIndex() {
-    return lastLogIndex;
+  @Override
+  public synchronized long getLastMetaLogIndex() {
+    return lastMetaLogIndex;
   }
 
-  public synchronized void setLastLogIndex(long lastLogIndex) {
-    this.lastLogIndex = Math.max(this.lastLogIndex, lastLogIndex);
+  public synchronized void setLastMetaLogIndex(long lastMetaLogIndex) {
+    this.lastMetaLogIndex = Math.max(this.lastMetaLogIndex, lastMetaLogIndex);
   }
 
   public RaftNode[] getSlotNodes() {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/PullSnapshotHintService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/PullSnapshotHintService.java
index d60ba42..0cc1452 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/PullSnapshotHintService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/PullSnapshotHintService.java
@@ -52,7 +52,7 @@ public class PullSnapshotHintService {
 
   public void start() {
     this.service = Executors.newScheduledThreadPool(1);
-    this.service.scheduleAtFixedRate(this::sendHints, 0, 1, TimeUnit.MINUTES);
+    this.service.scheduleAtFixedRate(this::sendHints, 0, 10, TimeUnit.MILLISECONDS);
   }
 
   public void stop() {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
index 2bd460d..a9f39be 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
@@ -70,7 +70,6 @@ import org.apache.iotdb.cluster.partition.slot.SlotNodeRemovalResult;
 import org.apache.iotdb.cluster.partition.slot.SlotPartitionTable;
 import org.apache.iotdb.cluster.query.LocalQueryExecutor;
 import org.apache.iotdb.cluster.query.manage.ClusterQueryManager;
-import org.apache.iotdb.cluster.query.manage.QueryCoordinator;
 import org.apache.iotdb.cluster.rpc.thrift.ElectionRequest;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.iotdb.cluster.rpc.thrift.PullSnapshotRequest;
@@ -78,14 +77,14 @@ import org.apache.iotdb.cluster.rpc.thrift.PullSnapshotResp;
 import org.apache.iotdb.cluster.rpc.thrift.RaftNode;
 import org.apache.iotdb.cluster.rpc.thrift.SendSnapshotRequest;
 import org.apache.iotdb.cluster.server.NodeCharacter;
+import org.apache.iotdb.cluster.server.PullSnapshotHintService;
+import org.apache.iotdb.cluster.server.Response;
+import org.apache.iotdb.cluster.server.heartbeat.DataHeartbeatThread;
 import org.apache.iotdb.cluster.server.monitor.NodeReport.DataMemberReport;
 import org.apache.iotdb.cluster.server.monitor.NodeStatusManager;
 import org.apache.iotdb.cluster.server.monitor.Peer;
-import org.apache.iotdb.cluster.server.PullSnapshotHintService;
-import org.apache.iotdb.cluster.server.Response;
 import org.apache.iotdb.cluster.server.monitor.Timer;
 import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
-import org.apache.iotdb.cluster.server.heartbeat.DataHeartbeatThread;
 import org.apache.iotdb.cluster.utils.StatusUtils;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.StorageEngine;
@@ -197,6 +196,7 @@ public class DataGroupMember extends RaftMember {
     heartBeatService.submit(new DataHeartbeatThread(this));
     pullSnapshotService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
     pullSnapshotHintService = new PullSnapshotHintService(this);
+    pullSnapshotHintService.start();
     resumePullSnapshotTasks();
   }
 
@@ -461,7 +461,7 @@ public class DataGroupMember extends RaftMember {
     synchronized (logManager) {
       PullSnapshotResp resp = new PullSnapshotResp();
       Map<Integer, ByteBuffer> resultMap = new HashMap<>();
-      logManager.takeSnapshot();
+      ((PartitionedSnapshotLogManager)logManager).takeSnapshotForSpecificSlots(requiredSlots);
 
       PartitionedSnapshot<Snapshot> allSnapshot = (PartitionedSnapshot) logManager.getSnapshot();
       for (int requiredSlot : requiredSlots) {
@@ -521,6 +521,12 @@ public class DataGroupMember extends RaftMember {
    *                     ot null otherwise
    */
   private void pullFileSnapshot(PullSnapshotTaskDescriptor descriptor, File snapshotSave) {
+    // If this node is the member of previous holder, it's unnecessary to pull data again
+    if (descriptor.getPreviousHolders().contains(thisNode)) {
+      // inform the previous holders that one member has successfully pulled snapshot directly
+      registerPullSnapshotHint(descriptor);
+      return;
+    }
     Iterator<Integer> iterator = descriptor.getSlots().iterator();
     while (iterator.hasNext()) {
       Integer nodeSlot = iterator.next();
@@ -530,9 +536,10 @@ public class DataGroupMember extends RaftMember {
         iterator.remove();
       } else {
         // mark the slot as pulling to control reads and writes of the pulling slot
-        slotManager.setToPulling(nodeSlot, descriptor.getPreviousHolders().getHeader());
+        slotManager.setToPulling(nodeSlot, descriptor.getPreviousHolders().getHeader(), false);
       }
     }
+    slotManager.save();
 
     if (descriptor.getSlots().isEmpty()) {
       return;
@@ -629,7 +636,7 @@ public class DataGroupMember extends RaftMember {
   }
 
   public boolean flushFileWhenDoSnapshot(
-      Map<String, List<Pair<Long, Boolean>>> storageGroupPartitions) {
+      Map<String, List<Pair<Long, Boolean>>> storageGroupPartitions, List<Integer> requiredSlots) {
     if (character != NodeCharacter.LEADER) {
       return false;
     }
@@ -641,11 +648,11 @@ public class DataGroupMember extends RaftMember {
       String storageGroupName = entry.getKey();
       List<Pair<Long, Boolean>> tmpPairList = entry.getValue();
       for (Pair<Long, Boolean> pair : tmpPairList) {
-        long partitionId = pair.left;
-        RaftNode raftNode = metaGroupMember.getPartitionTable().routeToHeaderByTime(storageGroupName,
-            partitionId * StorageEngine.getTimePartitionInterval());
-        DataGroupMember localDataMember = metaGroupMember.getLocalDataMember(raftNode);
-        if (localDataMember.getHeader().equals(thisNode)) {
+        long timestamp = pair.left * StorageEngine.getTimePartitionInterval();
+        int slotId = SlotPartitionTable.getSlotStrategy()
+            .calculateSlotByTime(storageGroupName, timestamp,
+                ClusterConstant.SLOT_NUM);
+        if (requiredSlots.contains(slotId)) {
           localListPair.add(pair);
         }
       }
@@ -670,7 +677,6 @@ public class DataGroupMember extends RaftMember {
     return false;
   }
 
-
   /**
    * Execute a non-query plan. If the member is a leader, a log for the plan will be created and
    * process through the raft procedure, otherwise the plan will be forwarded to the leader.
@@ -727,6 +733,12 @@ public class DataGroupMember extends RaftMember {
       int slot = SlotPartitionTable
           .getSlotStrategy().calculateSlotByPartitionNum(storageGroupName, timePartitionId,
               ClusterConstant.SLOT_NUM);
+      /**
+       * If this slot is just held by different raft groups in the same node, it should keep the data of slot.
+       */
+      if (metaGroupMember.getPartitionTable().judgeHoldSlot(thisNode, slot)) {
+        return false;
+      }
       return slotSet.contains(slot);
     };
     for (PartialPath sg : allStorageGroupNames) {
@@ -738,8 +750,9 @@ public class DataGroupMember extends RaftMember {
       }
     }
     for (Integer slot : slots) {
-      slotManager.setToNull(slot);
+      slotManager.setToNull(slot, false);
     }
+    slotManager.save();
 
     if (logger.isInfoEnabled()) {
       logger.info("{}: data of {} and other {} slots are removed", name, slots.get(0),
@@ -783,15 +796,16 @@ public class DataGroupMember extends RaftMember {
             setLastHeartbeatReceivedTime(Long.MIN_VALUE);
           }
         }
-        List<Integer> slotsToPull = ((SlotNodeRemovalResult) removalResult).getNewSlotOwners()
-            .get(new RaftNode(getHeader(), getRaftGroupId()));
-        if (slotsToPull != null) {
-          // pull the slots that should be taken over
-          PullSnapshotTaskDescriptor taskDescriptor = new PullSnapshotTaskDescriptor(
-              removalResult.getRemovedGroup(getRaftGroupId()),
-              slotsToPull, true);
-          pullFileSnapshot(taskDescriptor, null);
-        }
+      }
+
+      List<Integer> slotsToPull = ((SlotNodeRemovalResult) removalResult).getNewSlotOwners()
+          .get(new RaftNode(getHeader(), getRaftGroupId()));
+      if (slotsToPull != null) {
+        // pull the slots that should be taken over
+        PullSnapshotTaskDescriptor taskDescriptor = new PullSnapshotTaskDescriptor(
+            removalResult.getRemovedGroup(getRaftGroupId()),
+            slotsToPull, true);
+        pullFileSnapshot(taskDescriptor, null);
       }
     }
   }
@@ -860,17 +874,12 @@ public class DataGroupMember extends RaftMember {
   public boolean onSnapshotInstalled(List<Integer> slots) {
     List<Integer> removableSlots = new ArrayList<>();
     for (Integer slot : slots) {
-      /**
-       * If this slot is just held by different raft groups in the same node, it should keep the data of slot.
-       */
-      if (metaGroupMember.getPartitionTable().judgeHoldSlot(thisNode, slot)) {
-        continue;
-      }
-      int sentReplicaNum = slotManager.sentOneReplication(slot);
+      int sentReplicaNum = slotManager.sentOneReplication(slot, false);
       if (sentReplicaNum >= config.getReplicationNum()) {
         removableSlots.add(slot);
       }
     }
+    slotManager.save();
     removeLocalData(removableSlots);
     return true;
   }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
index 5dddac7..391381c 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
@@ -678,8 +678,8 @@ public class MetaGroupMember extends RaftMember {
     newTable.deserialize(partitionTableBuffer);
     // avoid overwriting current partition table with a previous one
     if (partitionTable != null) {
-      long currIndex = ((SlotPartitionTable) partitionTable).getLastLogIndex();
-      long incomingIndex = newTable.getLastLogIndex();
+      long currIndex = ((SlotPartitionTable) partitionTable).getLastMetaLogIndex();
+      long incomingIndex = newTable.getLastMetaLogIndex();
       logger.info("Current partition table index {}, new partition table index {}", currIndex,
           incomingIndex);
       if (currIndex >= incomingIndex) {
@@ -924,7 +924,7 @@ public class MetaGroupMember extends RaftMember {
     synchronized (logManager) {
       // update partition table
       partitionTable.addNode(newNode);
-      ((SlotPartitionTable) partitionTable).setLastLogIndex(logManager.getLastLogIndex() + 1);
+      ((SlotPartitionTable) partitionTable).setLastMetaLogIndex(logManager.getLastLogIndex() + 1);
 
       AddNodeLog addNodeLog = new AddNodeLog();
       addNodeLog.setPartitionTable(partitionTable.serialize());
@@ -1977,7 +1977,7 @@ public class MetaGroupMember extends RaftMember {
     synchronized (logManager) {
       // update partition table
       partitionTable.removeNode(target);
-      ((SlotPartitionTable) partitionTable).setLastLogIndex(logManager.getLastLogIndex() + 1);
+      ((SlotPartitionTable) partitionTable).setLastMetaLogIndex(logManager.getLastLogIndex() + 1);
 
       RemoveNodeLog removeNodeLog = new RemoveNodeLog();
       removeNodeLog.setPartitionTable(partitionTable.serialize());
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index 4fe3274..83eec55 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -883,21 +883,22 @@ public abstract class RaftMember {
       return StatusUtils.NODE_READ_ONLY;
     }
     long startTime = Timer.Statistic.RAFT_SENDER_APPEND_LOG.getOperationStartTime();
+
     Log log;
-    if (plan instanceof LogPlan) {
-      try {
-        log = LogParser.getINSTANCE().parse(((LogPlan) plan).getLog());
-      } catch (UnknownLogTypeException e) {
-        logger.error("Can not parse LogPlan {}", plan, e);
-        return StatusUtils.PARSE_LOG_ERROR;
-      }
-    } else {
-      log = new PhysicalPlanLog();
-      ((PhysicalPlanLog)log).setPlan(plan);
-      plan.setIndex(log.getCurrLogIndex());
-    }
     // assign term and index to the new log and append it
     synchronized (logManager) {
+      if (plan instanceof LogPlan) {
+        try {
+          log = LogParser.getINSTANCE().parse(((LogPlan) plan).getLog());
+        } catch (UnknownLogTypeException e) {
+          logger.error("Can not parse LogPlan {}", plan, e);
+          return StatusUtils.PARSE_LOG_ERROR;
+        }
+      } else {
+        log = new PhysicalPlanLog();
+        ((PhysicalPlanLog) log).setPlan(plan);
+        plan.setIndex(logManager.getLastLogIndex() + 1);
+      }
       log.setCurrLogTerm(getTerm().get());
       log.setCurrLogIndex(logManager.getLastLogIndex() + 1);
 
@@ -920,20 +921,30 @@ public abstract class RaftMember {
     if (readOnly) {
       return StatusUtils.NODE_READ_ONLY;
     }
-    PhysicalPlanLog log = new PhysicalPlanLog();
     // assign term and index to the new log and append it
     SendLogRequest sendLogRequest;
 
     long startTime = Statistic.RAFT_SENDER_COMPETE_LOG_MANAGER_BEFORE_APPEND_V2
         .getOperationStartTime();
+    Log log;
     synchronized (logManager) {
       Statistic.RAFT_SENDER_COMPETE_LOG_MANAGER_BEFORE_APPEND_V2
           .calOperationCostTimeFromStart(startTime);
 
+      if (plan instanceof LogPlan) {
+        try {
+          log = LogParser.getINSTANCE().parse(((LogPlan) plan).getLog());
+        } catch (UnknownLogTypeException e) {
+          logger.error("Can not parse LogPlan {}", plan, e);
+          return StatusUtils.PARSE_LOG_ERROR;
+        }
+      } else {
+        log = new PhysicalPlanLog();
+        ((PhysicalPlanLog) log).setPlan(plan);
+        plan.setIndex(logManager.getLastLogIndex() + 1);
+      }
       log.setCurrLogTerm(getTerm().get());
       log.setCurrLogIndex(logManager.getLastLogIndex() + 1);
-      log.setPlan(plan);
-      plan.setIndex(log.getCurrLogIndex());
 
       startTime = Timer.Statistic.RAFT_SENDER_APPEND_LOG_V2.getOperationStartTime();
       logManager.append(log);
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatThreadTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatThreadTest.java
index 2a6542f..6d79710 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatThreadTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatThreadTest.java
@@ -131,6 +131,11 @@ public class MetaHeartbeatThreadTest extends HeartbeatThreadTest {
     public boolean judgeHoldSlot(Node node, int slot) {
       return false;
     }
+
+    @Override
+    public long getLastMetaLogIndex() {
+      return 0;
+    }
   };
 
   @Override
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
index 1b9b437..af143c4 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
@@ -51,8 +51,6 @@ import org.apache.iotdb.cluster.client.async.AsyncDataClient;
 import org.apache.iotdb.cluster.common.TestAsyncClient;
 import org.apache.iotdb.cluster.common.TestAsyncDataClient;
 import org.apache.iotdb.cluster.common.TestAsyncMetaClient;
-import org.apache.iotdb.cluster.common.TestLogApplier;
-import org.apache.iotdb.cluster.common.TestLogManager;
 import org.apache.iotdb.cluster.common.TestPartitionedLogManager;
 import org.apache.iotdb.cluster.common.TestSnapshot;
 import org.apache.iotdb.cluster.common.TestUtils;
@@ -64,7 +62,6 @@ import org.apache.iotdb.cluster.exception.EmptyIntervalException;
 import org.apache.iotdb.cluster.exception.LogExecutionException;
 import org.apache.iotdb.cluster.exception.PartitionTableUnavailableException;
 import org.apache.iotdb.cluster.exception.StartUpCheckFailureException;
-import org.apache.iotdb.cluster.exception.UnknownLogTypeException;
 import org.apache.iotdb.cluster.exception.UnsupportedPlanException;
 import org.apache.iotdb.cluster.log.Log;
 import org.apache.iotdb.cluster.log.logtypes.AddNodeLog;
@@ -143,7 +140,6 @@ import org.apache.thrift.protocol.TCompactProtocol.Factory;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
-import org.junit.Ignore;
 import org.junit.Test;
 
 public class MetaGroupMemberTest extends MemberTest {
@@ -725,7 +721,7 @@ public class MetaGroupMemberTest extends MemberTest {
 
     // 4. prepare the partition table
     SlotPartitionTable partitionTable = (SlotPartitionTable) TestUtils.getPartitionTable(3);
-    partitionTable.setLastLogIndex(0);
+    partitionTable.setLastMetaLogIndex(0);
 
     ByteBuffer beforePartitionTableBuffer = partitionTable.serialize();
     // 5. serialize
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index 1170733..243319c 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -861,7 +861,9 @@ public class StorageEngine implements IService {
 
   public void removePartitions(PartialPath storageGroupPath, TimePartitionFilter filter)
       throws StorageEngineException {
-    processorMap.get(storageGroupPath).removePartitions(filter);
+    if (processorMap.get(storageGroupPath) != null) {
+      processorMap.get(storageGroupPath).removePartitions(filter);
+    }
   }
 
   public Map<PartialPath, VirtualStorageGroupManager> getProcessorMap() {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 2c6f300..641f113 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -2620,12 +2620,12 @@ public class StorageGroupProcessor {
       // abort ongoing merges
       MergeManager.getINSTANCE().abortMerge(logicalStorageGroupName);
       // close all working files that should be removed
-      removePartitions(filter, workSequenceTsFileProcessors.entrySet());
-      removePartitions(filter, workUnsequenceTsFileProcessors.entrySet());
+      removePartitions(filter, workSequenceTsFileProcessors.entrySet(), true);
+      removePartitions(filter, workUnsequenceTsFileProcessors.entrySet(), false);
 
       // remove data files
-      removePartitions(filter, tsFileManagement.getIterator(true));
-      removePartitions(filter, tsFileManagement.getIterator(false));
+      removePartitions(filter, tsFileManagement.getIterator(true), true);
+      removePartitions(filter, tsFileManagement.getIterator(false), false);
 
     } finally {
       insertLock.writeLock().unlock();
@@ -2635,7 +2635,7 @@ public class StorageGroupProcessor {
 
   //may remove the processorEntrys
   private void removePartitions(TimePartitionFilter filter,
-      Set<Entry<Long, TsFileProcessor>> processorEntrys) {
+      Set<Entry<Long, TsFileProcessor>> processorEntrys, boolean sequence) {
     for (Iterator<Entry<Long, TsFileProcessor>> iterator = processorEntrys.iterator();
         iterator.hasNext(); ) {
       Entry<Long, TsFileProcessor> longTsFileProcessorEntry = iterator.next();
@@ -2644,6 +2644,7 @@ public class StorageGroupProcessor {
       if (filter.satisfy(logicalStorageGroupName, partitionId)) {
         processor.syncClose();
         iterator.remove();
+        tsFileManagement.remove(processor.getTsFileResource(), sequence);
         updateLatestFlushTimeToPartition(partitionId, Long.MIN_VALUE);
         logger.debug("{} is removed during deleting partitions",
             processor.getTsFileResource().getTsFilePath());
@@ -2652,12 +2653,13 @@ public class StorageGroupProcessor {
   }
 
   //may remove the iterator's data
-  private void removePartitions(TimePartitionFilter filter, Iterator<TsFileResource> iterator) {
+  private void removePartitions(TimePartitionFilter filter, Iterator<TsFileResource> iterator, boolean sequence) {
     while (iterator.hasNext()) {
       TsFileResource tsFileResource = iterator.next();
       if (filter.satisfy(logicalStorageGroupName, tsFileResource.getTimePartition())) {
         tsFileResource.remove();
         iterator.remove();
+        tsFileManagement.remove(tsFileResource, sequence);
         updateLatestFlushTimeToPartition(tsFileResource.getTimePartition(), Long.MIN_VALUE);
         logger.debug("{} is removed during deleting partitions", tsFileResource.getTsFilePath());
       }