You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2022/11/10 08:04:09 UTC

[iotdb] branch master updated: [IOTDB-4784] Control total memory for enabling time partition for storage engine (#7792)

This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 487b3edc16 [IOTDB-4784] Control total memory for enabling time partition for storage engine (#7792)
487b3edc16 is described below

commit 487b3edc16be602e0648225e7ee07845c23cd7c0
Author: Mrquan <50...@users.noreply.github.com>
AuthorDate: Thu Nov 10 16:04:02 2022 +0800

    [IOTDB-4784] Control total memory for enabling time partition for storage engine (#7792)
---
 .../resources/conf/iotdb-common.properties         |  11 +-
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  24 ++-
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |  30 ++-
 .../iotdb/db/engine/memtable/AbstractMemTable.java |   9 +
 .../engine/memtable/AlignedWritableMemChunk.java   |  10 +
 .../memtable/AlignedWritableMemChunkGroup.java     |   5 +
 .../apache/iotdb/db/engine/memtable/IMemTable.java |   2 +
 .../db/engine/memtable/IWritableMemChunk.java      |   6 +-
 .../db/engine/memtable/IWritableMemChunkGroup.java |   2 +
 .../iotdb/db/engine/memtable/WritableMemChunk.java |   9 +-
 .../db/engine/memtable/WritableMemChunkGroup.java  |  25 ++-
 .../iotdb/db/engine/storagegroup/DataRegion.java   | 240 ++++++++++++---------
 .../db/engine/storagegroup/DataRegionInfo.java     |   2 +-
 ...hTimeManager.java => HashLastFlushTimeMap.java} | 235 ++++++++++----------
 ...meManager.java => IDTableLastFlushTimeMap.java} | 179 +++++++--------
 ...lushTimeManager.java => ILastFlushTimeMap.java} |  34 +--
 .../db/engine/storagegroup/TimePartitionInfo.java  |  66 ++++++
 .../engine/storagegroup/TimePartitionManager.java  | 165 ++++++++++++++
 .../db/engine/storagegroup/TsFileManager.java      |   6 +
 .../db/engine/storagegroup/TsFileProcessor.java    |   9 +-
 .../db/engine/storagegroup/TsFileResource.java     |   6 +
 .../db/metadata/idtable/entry/DeviceEntry.java     |  37 +---
 .../org/apache/iotdb/db/rescon/SystemInfo.java     |  31 +--
 .../db/utils/datastructure/AlignedTVList.java      |   4 +-
 .../iotdb/db/utils/datastructure/BinaryTVList.java |  17 +-
 .../db/utils/datastructure/BooleanTVList.java      |  13 +-
 .../iotdb/db/utils/datastructure/DoubleTVList.java |  13 +-
 .../iotdb/db/utils/datastructure/FloatTVList.java  |  13 +-
 .../iotdb/db/utils/datastructure/IntTVList.java    |  13 +-
 .../iotdb/db/utils/datastructure/LongTVList.java   |  13 +-
 .../iotdb/db/utils/datastructure/TVList.java       |  20 +-
 .../db/engine/storagegroup/DataRegionTest.java     |   2 +-
 .../storagegroup/IDTableLastFlushTimeMapTest.java  | 129 +++++++++++
 .../engine/storagegroup/LastFlushTimeMapTest.java  | 223 +++++++++++++++++++
 .../iotdb/db/engine/storagegroup/TTLTest.java      |   5 +-
 .../storagegroup/TimePartitionManagerTest.java     | 129 +++++++++++
 .../engine/storagegroup/TsFileProcessorV2Test.java |  12 +-
 .../db/metadata/idtable/IDTableFlushTimeTest.java  | 199 -----------------
 38 files changed, 1273 insertions(+), 675 deletions(-)

diff --git a/node-commons/src/assembly/resources/conf/iotdb-common.properties b/node-commons/src/assembly/resources/conf/iotdb-common.properties
index 8122732058..772de865b2 100644
--- a/node-commons/src/assembly/resources/conf/iotdb-common.properties
+++ b/node-commons/src/assembly/resources/conf/iotdb-common.properties
@@ -130,20 +130,25 @@
 # Datatype: boolean
 # enable_mem_control=true
 
-# Memory Allocation Ratio: Write, Read, Schema, Consensus and Free Memory.
+# Memory Allocation Ratio: StorageEngine, QueryEngine, SchemaEngine, Consensus and Free Memory.
 # The parameter form is a:b:c:d:e, where a, b, c, d and e are integers. for example: 1:1:1:1:1 , 6:2:1:1:1
 # If you have high level of writing pressure and low level of reading pressure, please adjust it to for example 6:1:1:1:2
-# write_read_schema_free_memory_proportion=3:3:1:1:2
+# storage_query_schema_consensus_free_memory_proportion=3:3:1:1:2
 
 # Schema Memory Allocation Ratio: SchemaRegion, SchemaCache, PartitionCache and LastCache.
 # The parameter form is a:b:c:d, where a, b, c and d are integers. for example: 1:1:1:1 , 6:2:1:1
 # In cluster mode, we recommend 5:3:1:1. In standalone mode, we recommend 8:1:0:1
 # schema_memory_allocate_proportion=5:3:1:1
 
-# Memory allocation ratio in StorageEngine: MemTable, Compaction
+# Memory allocation ratio in StorageEngine: Write, Compaction
 # The parameter form is a:b:c:d, where a, b, c and d are integers. for example: 8:2 , 7:3
 # storage_engine_memory_proportion=8:2
 
+# Memory allocation ratio in writing: Memtable, TimePartitionInfo
+# Memtable is the total memory size of all memtables
+# TimePartitionInfo is the total memory size of the last flush times of all data regions
+# write_memory_proportion=19:1
+
 # Max number of concurrent writing time partitions in one storage group
 # This parameter is used to control total memTable number when memory control is disabled
 # The max number of memTable is 4 * concurrent_writing_time_partition * storage group number
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 882caaf97f..04696f99bd 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -150,15 +150,15 @@ public class IoTDBConfig {
   private double rejectProportion = 0.8;
 
   /** The proportion of write memory for memtable */
-  private double writeProportion = 0.8;
+  private double writeProportionForMemtable = 0.8;
 
   /** The proportion of write memory for compaction */
   private double compactionProportion = 0.2;
 
   /**
    * If memory cost of data region increased more than proportion of {@linkplain
-   * IoTDBConfig#getAllocateMemoryForStorageEngine()}*{@linkplain IoTDBConfig#getWriteProportion()},
-   * report to system.
+   * IoTDBConfig#getAllocateMemoryForStorageEngine()}*{@linkplain
+   * IoTDBConfig#getWriteProportionForMemtable()}, report to system.
    */
   private double writeMemoryVariationReportProportion = 0.001;
 
@@ -501,6 +501,8 @@ public class IoTDBConfig {
   /** Memory allocated proportion for timeIndex */
   private long allocateMemoryForTimeIndex = allocateMemoryForRead * 200 / 1001;
 
+  /** Memory allocated for time partition info */
+  private long allocateMemoryForTimePartitionInfo = allocateMemoryForStorageEngine * 50 / 1001;
   /**
    * If true, we will estimate each query's possible memory footprint before executing it and deny
    * it if its estimated memory exceeds current free memory
@@ -2108,6 +2110,14 @@ public class IoTDBConfig {
     this.allocateMemoryForTimeIndex = allocateMemoryForTimeIndex;
   }
 
+  public long getAllocateMemoryForTimePartitionInfo() {
+    return allocateMemoryForTimePartitionInfo;
+  }
+
+  public void setAllocateMemoryForTimePartitionInfo(long allocateMemoryForTimePartitionInfo) {
+    this.allocateMemoryForTimePartitionInfo = allocateMemoryForTimePartitionInfo;
+  }
+
   public boolean isEnableQueryMemoryEstimation() {
     return enableQueryMemoryEstimation;
   }
@@ -3269,12 +3279,12 @@ public class IoTDBConfig {
     this.driverTaskExecutionTimeSliceInMs = driverTaskExecutionTimeSliceInMs;
   }
 
-  public double getWriteProportion() {
-    return writeProportion;
+  public double getWriteProportionForMemtable() {
+    return writeProportionForMemtable;
   }
 
-  public void setWriteProportion(double writeProportion) {
-    this.writeProportion = writeProportion;
+  public void setWriteProportionForMemtable(double writeProportionForMemtable) {
+    this.writeProportionForMemtable = writeProportionForMemtable;
   }
 
   public double getCompactionProportion() {
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index ef945bb55b..9222ce9621 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -1591,7 +1591,7 @@ public class IoTDBDescriptor {
 
   private void initMemoryAllocate(Properties properties) {
     String memoryAllocateProportion =
-        properties.getProperty("write_read_schema_free_memory_proportion");
+        properties.getProperty("storage_query_schema_consensus_free_memory_proportion");
     if (memoryAllocateProportion != null) {
       String[] proportions = memoryAllocateProportion.split(":");
       int proportionSum = 0;
@@ -1685,14 +1685,34 @@ public class IoTDBDescriptor {
   private void initStorageEngineAllocate(Properties properties) {
     String allocationRatio = properties.getProperty("storage_engine_memory_proportion", "8:2");
     String[] proportions = allocationRatio.split(":");
-    int proportionForMemTable = Integer.parseInt(proportions[0].trim());
+    int proportionForWrite = Integer.parseInt(proportions[0].trim());
     int proportionForCompaction = Integer.parseInt(proportions[1].trim());
-    conf.setWriteProportion(
+
+    double writeProportion =
+        ((double) (proportionForWrite) / (double) (proportionForCompaction + proportionForWrite));
+
+    String allocationRatioForWrite = properties.getProperty("write_memory_proportion", "19:1");
+    proportions = allocationRatioForWrite.split(":");
+    int proportionForMemTable = Integer.parseInt(proportions[0].trim());
+    int proportionForTimePartitionInfo = Integer.parseInt(proportions[1].trim());
+
+    double memtableProportionForWrite =
         ((double) (proportionForMemTable)
-            / (double) (proportionForCompaction + proportionForMemTable)));
+            / (double) (proportionForMemTable + proportionForTimePartitionInfo));
+    Double.parseDouble(properties.getProperty("flush_time_memory_proportion", "0.05"));
+    double timePartitionInfoForWrite =
+        ((double) (proportionForTimePartitionInfo)
+            / (double) (proportionForMemTable + proportionForTimePartitionInfo));
+    conf.setWriteProportionForMemtable(writeProportion * memtableProportionForWrite);
+
+    conf.setAllocateMemoryForTimePartitionInfo(
+        (long)
+            ((writeProportion * timePartitionInfoForWrite)
+                * conf.getAllocateMemoryForStorageEngine()));
+
     conf.setCompactionProportion(
         ((double) (proportionForCompaction)
-            / (double) (proportionForCompaction + proportionForMemTable)));
+            / (double) (proportionForCompaction + proportionForWrite)));
   }
 
   private void initSchemaMemoryAllocate(Properties properties) {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
index e34b79a173..a7790b7764 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
@@ -631,6 +631,15 @@ public abstract class AbstractMemTable implements IMemTable {
     }
   }
 
+  @Override
+  public Map<String, Long> getMaxTime() {
+    Map<String, Long> latestTimeForEachDevice = new HashMap<>();
+    for (Entry<IDeviceID, IWritableMemChunkGroup> entry : memTableMap.entrySet()) {
+      latestTimeForEachDevice.put(entry.getKey().toStringID(), entry.getValue().getMaxTime());
+    }
+    return latestTimeForEachDevice;
+  }
+
   public static class Factory {
     private Factory() {}
 
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AlignedWritableMemChunk.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AlignedWritableMemChunk.java
index 845fc8c4b3..dabd3b472d 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AlignedWritableMemChunk.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AlignedWritableMemChunk.java
@@ -237,6 +237,11 @@ public class AlignedWritableMemChunk implements IWritableMemChunk {
     return null;
   }
 
+  @Override
+  public long getMaxTime() {
+    return list.getMaxTime();
+  }
+
   @Override
   public synchronized TVList getSortedTvListForQuery() {
     sortTVList();
@@ -445,6 +450,11 @@ public class AlignedWritableMemChunk implements IWritableMemChunk {
         .getTimestamp();
   }
 
+  @Override
+  public boolean isEmpty() {
+    return list.rowCount() == 0;
+  }
+
   @Override
   public int serializedSize() {
     int size = 0;
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AlignedWritableMemChunkGroup.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AlignedWritableMemChunkGroup.java
index 25856adf27..878f3b691e 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AlignedWritableMemChunkGroup.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AlignedWritableMemChunkGroup.java
@@ -135,6 +135,11 @@ public class AlignedWritableMemChunkGroup implements IWritableMemChunkGroup {
     return memChunk.getTVList().rowCount();
   }
 
+  @Override
+  public long getMaxTime() {
+    return memChunk.getMaxTime();
+  }
+
   public AlignedWritableMemChunk getAlignedMemChunk() {
     return memChunk;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java
index 9101efdf7c..293d1276b9 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java
@@ -169,4 +169,6 @@ public interface IMemTable extends WALEntryValue {
   FlushStatus getFlushStatus();
 
   void setFlushStatus(FlushStatus flushStatus);
+
+  Map<String, Long> getMaxTime();
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java
index 5237e53535..6eec2dc559 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java
@@ -124,8 +124,8 @@ public interface IWritableMemChunk extends WALEntryValue {
     return null;
   }
 
-  default long getMinTime() {
-    return Long.MIN_VALUE;
+  default long getMaxTime() {
+    return Long.MAX_VALUE;
   }
 
   /** @return how many points are deleted */
@@ -140,4 +140,6 @@ public interface IWritableMemChunk extends WALEntryValue {
   long getFirstPoint();
 
   long getLastPoint();
+
+  boolean isEmpty();
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunkGroup.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunkGroup.java
index e3d089f5a7..3ef362cdaf 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunkGroup.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunkGroup.java
@@ -52,4 +52,6 @@ public interface IWritableMemChunkGroup extends WALEntryValue {
       PartialPath originalPath, PartialPath devicePath, long startTimestamp, long endTimestamp);
 
   long getCurrentTVListSize(String measurement);
+
+  long getMaxTime();
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java
index c8bd6144f4..11b6f55d4d 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java
@@ -246,8 +246,8 @@ public class WritableMemChunk implements IWritableMemChunk {
   }
 
   @Override
-  public long getMinTime() {
-    return list.getMinTime();
+  public long getMaxTime() {
+    return list.getMaxTime();
   }
 
   @Override
@@ -268,6 +268,11 @@ public class WritableMemChunk implements IWritableMemChunk {
         .getTimestamp();
   }
 
+  @Override
+  public boolean isEmpty() {
+    return list.rowCount() == 0;
+  }
+
   @Override
   public int delete(long lowerBound, long upperBound) {
     return list.delete(lowerBound, upperBound);
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunkGroup.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunkGroup.java
index 128a3d6403..e8f2d12e3c 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunkGroup.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunkGroup.java
@@ -129,16 +129,28 @@ public class WritableMemChunkGroup implements IWritableMemChunkGroup {
         IWritableMemChunk chunk = entry.getValue();
         if (startTimestamp == Long.MIN_VALUE && endTimestamp == Long.MAX_VALUE) {
           iter.remove();
+          deletedPointsNumber += chunk.count();
+          chunk.release();
+        } else {
+          deletedPointsNumber += chunk.delete(startTimestamp, endTimestamp);
+          if (chunk.count() == 0) {
+            iter.remove();
+          }
         }
-        deletedPointsNumber += chunk.delete(startTimestamp, endTimestamp);
       }
     } else {
       IWritableMemChunk chunk = memChunkMap.get(targetMeasurement);
       if (chunk != null) {
         if (startTimestamp == Long.MIN_VALUE && endTimestamp == Long.MAX_VALUE) {
           memChunkMap.remove(targetMeasurement);
+          deletedPointsNumber += chunk.count();
+          chunk.release();
+        } else {
+          deletedPointsNumber += chunk.delete(startTimestamp, endTimestamp);
+          if (chunk.count() == 0) {
+            memChunkMap.remove(targetMeasurement);
+          }
         }
-        deletedPointsNumber += chunk.delete(startTimestamp, endTimestamp);
       }
     }
 
@@ -150,6 +162,15 @@ public class WritableMemChunkGroup implements IWritableMemChunkGroup {
     return memChunkMap.get(measurement).getTVList().rowCount();
   }
 
+  @Override
+  public long getMaxTime() {
+    long maxTime = Long.MIN_VALUE;
+    for (IWritableMemChunk memChunk : memChunkMap.values()) {
+      maxTime = Math.max(maxTime, memChunk.getMaxTime());
+    }
+    return maxTime;
+  }
+
   @Override
   public int serializedSize() {
     int size = 0;
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
index aef9ff8ac5..75b26d34ff 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.commons.concurrent.ThreadName;
 import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.commons.consensus.DataRegionId;
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.exception.MetadataException;
 import org.apache.iotdb.commons.file.SystemFileFactory;
@@ -254,7 +255,7 @@ public class DataRegion {
   /** flush listeners */
   private List<FlushListener> customFlushListeners = Collections.emptyList();
 
-  private ILastFlushTimeManager lastFlushTimeManager;
+  private ILastFlushTimeMap lastFlushTimeMap;
 
   /**
    * record the insertWriteLock in SG is being hold by which method, it will be empty string if on
@@ -300,9 +301,9 @@ public class DataRegion {
     // if use id table, we use id table flush time manager
     if (config.isEnableIDTable()) {
       idTable = IDTableManager.getInstance().getIDTableDirectly(storageGroupName);
-      lastFlushTimeManager = new IDTableFlushTimeManager(idTable);
+      lastFlushTimeMap = new IDTableLastFlushTimeMap(idTable, tsFileManager);
     } else {
-      lastFlushTimeManager = new LastFlushTimeManager();
+      lastFlushTimeMap = new HashLastFlushTimeMap(tsFileManager);
     }
 
     // recover tsfiles unless consensus protocol is ratis and storage engine is not ready
@@ -364,7 +365,7 @@ public class DataRegion {
 
   private Map<Long, List<TsFileResource>> splitResourcesByPartition(
       List<TsFileResource> resources) {
-    Map<Long, List<TsFileResource>> ret = new HashMap<>();
+    Map<Long, List<TsFileResource>> ret = new TreeMap<>();
     for (TsFileResource resource : resources) {
       ret.computeIfAbsent(resource.getTimePartition(), l -> new ArrayList<>()).add(resource);
     }
@@ -487,15 +488,25 @@ public class DataRegion {
       }
       WALRecoverManager.getInstance().getAllDataRegionScannedLatch().countDown();
       // recover sealed TsFiles
-      for (List<TsFileResource> value : partitionTmpSeqTsFiles.values()) {
-        for (TsFileResource tsFileResource : value) {
-          recoverSealedTsFiles(tsFileResource, DataRegionRecoveryContext, true);
+      if (!partitionTmpSeqTsFiles.isEmpty()) {
+        long latestPartitionId =
+            ((TreeMap<Long, List<TsFileResource>>) partitionTmpSeqTsFiles).lastKey();
+        for (Entry<Long, List<TsFileResource>> partitionFiles : partitionTmpSeqTsFiles.entrySet()) {
+          recoverFilesInPartition(
+              partitionFiles.getKey(),
+              DataRegionRecoveryContext,
+              partitionFiles.getValue(),
+              true,
+              partitionFiles.getKey() == latestPartitionId);
         }
       }
-      for (List<TsFileResource> value : partitionTmpUnseqTsFiles.values()) {
-        for (TsFileResource tsFileResource : value) {
-          recoverSealedTsFiles(tsFileResource, DataRegionRecoveryContext, false);
-        }
+      for (Entry<Long, List<TsFileResource>> partitionFiles : partitionTmpUnseqTsFiles.entrySet()) {
+        recoverFilesInPartition(
+            partitionFiles.getKey(),
+            DataRegionRecoveryContext,
+            partitionFiles.getValue(),
+            false,
+            false);
       }
       // wait until all unsealed TsFiles have been recovered
       for (WALRecoverListener recoverListener : recoverListeners) {
@@ -549,9 +560,8 @@ public class DataRegion {
         long endTime = resource.getEndTime(deviceId);
         endTimeMap.put(deviceId.intern(), endTime);
       }
-      lastFlushTimeManager.setMultiDeviceLastTime(timePartitionId, endTimeMap);
-      lastFlushTimeManager.setMultiDeviceFlushedTime(timePartitionId, endTimeMap);
-      lastFlushTimeManager.setMultiDeviceGlobalFlushedTime(endTimeMap);
+      lastFlushTimeMap.setMultiDeviceFlushedTime(timePartitionId, endTimeMap);
+      lastFlushTimeMap.setMultiDeviceGlobalFlushedTime(endTimeMap);
     }
   }
 
@@ -601,13 +611,12 @@ public class DataRegion {
       for (String deviceId : resource.getDevices()) {
         long endTime = resource.getEndTime(deviceId);
         long endTimePartitionId = StorageEngineV2.getTimePartition(endTime);
-        lastFlushTimeManager.setOneDeviceLastTime(endTimePartitionId, deviceId, endTime);
-        lastFlushTimeManager.setOneDeviceGlobalFlushedTime(deviceId, endTime);
+        lastFlushTimeMap.setOneDeviceGlobalFlushedTime(deviceId, endTime);
 
         // set all the covered partition's LatestFlushedTime
         long partitionId = StorageEngineV2.getTimePartition(resource.getStartTime(deviceId));
         while (partitionId <= endTimePartitionId) {
-          lastFlushTimeManager.setOneDeviceFlushedTime(partitionId, deviceId, endTime);
+          lastFlushTimeMap.setOneDeviceFlushedTime(partitionId, deviceId, endTime);
           if (!timePartitionIdVersionControllerMap.containsKey(partitionId)) {
             File directory =
                 SystemFileFactory.INSTANCE.getFile(storageGroupSysDir, String.valueOf(partitionId));
@@ -754,13 +763,16 @@ public class DataRegion {
       // the last file is not closed, continue writing to it
       RestorableTsFileIOWriter writer = recoverPerformer.getWriter();
       long timePartitionId = tsFileResource.getTimePartition();
+      TimePartitionManager.getInstance()
+          .updateAfterOpeningTsFileProcessor(
+              new DataRegionId(Integer.parseInt(dataRegionId)), timePartitionId);
       TsFileProcessor tsFileProcessor =
           new TsFileProcessor(
               dataRegionId,
               dataRegionInfo,
               tsFileResource,
               this::closeUnsealedTsFileProcessorCallBack,
-              isSeq ? this::updateLatestFlushTimeCallback : this::unsequenceFlushCallback,
+              isSeq ? this::sequenceFlushCallback : this::unsequenceFlushCallback,
               isSeq,
               writer);
       if (isSeq) {
@@ -787,7 +799,6 @@ public class DataRegion {
         }
         tsFileProcessorInfo.addTSPMemCost(chunkMetadataSize);
       }
-      updateLastFlushTime(tsFileResource, isSeq);
     }
     tsFileManager.add(tsFileResource, recoverPerformer.isSequence());
   }
@@ -811,7 +822,6 @@ public class DataRegion {
       }
       sealedTsFile.close();
       tsFileManager.add(sealedTsFile, isSeq);
-      updateLastFlushTime(sealedTsFile, isSeq);
       tsFileResourceManager.registerSealedTsFileResource(sealedTsFile);
     } catch (DataRegionException | IOException e) {
       logger.error("Fail to recover sealed TsFile {}, skip it.", sealedTsFile.getTsFilePath(), e);
@@ -821,6 +831,32 @@ public class DataRegion {
     }
   }
 
+  private void recoverFilesInPartition(
+      long partitionId,
+      DataRegionRecoveryContext context,
+      List<TsFileResource> resourceList,
+      boolean isSeq,
+      boolean isLatestPartition) {
+    for (TsFileResource tsFileResource : resourceList) {
+      recoverSealedTsFiles(tsFileResource, context, isSeq);
+    }
+    if (isLatestPartition && isSeq) {
+      lastFlushTimeMap.checkAndCreateFlushedTimePartition(partitionId);
+      for (TsFileResource tsFileResource : resourceList) {
+        updateLastFlushTime(tsFileResource, true);
+      }
+      TimePartitionManager.getInstance()
+          .registerTimePartitionInfo(
+              new TimePartitionInfo(
+                  new DataRegionId(Integer.valueOf(dataRegionId)),
+                  partitionId,
+                  false,
+                  Long.MAX_VALUE,
+                  lastFlushTimeMap.getMemSize(partitionId),
+                  true));
+    }
+  }
+
   // ({systemTime}-{versionNum}-{mergeNum}.tsfile)
   private int compareFileName(File o1, File o2) {
     String[] items1 = o1.getName().replace(TSFILE_SUFFIX, "").split(FILE_NAME_SEPARATOR);
@@ -854,11 +890,21 @@ public class DataRegion {
       // init map
       long timePartitionId = StorageEngineV2.getTimePartition(insertRowNode.getTime());
 
-      lastFlushTimeManager.ensureFlushedTimePartition(timePartitionId);
+      if (!lastFlushTimeMap.checkAndCreateFlushedTimePartition(timePartitionId)) {
+        TimePartitionManager.getInstance()
+            .registerTimePartitionInfo(
+                new TimePartitionInfo(
+                    new DataRegionId(Integer.valueOf(dataRegionId)),
+                    timePartitionId,
+                    true,
+                    Long.MAX_VALUE,
+                    0,
+                    tsFileManager.isLatestTimePartition(timePartitionId)));
+      }
 
       boolean isSequence =
           insertRowNode.getTime()
-              > lastFlushTimeManager.getFlushedTime(
+              > lastFlushTimeMap.getFlushedTime(
                   timePartitionId, insertRowNode.getDevicePath().getFullPath());
 
       // is unsequence and user set config to discard out of order data
@@ -867,8 +913,6 @@ public class DataRegion {
         return;
       }
 
-      lastFlushTimeManager.ensureLastTimePartition(timePartitionId);
-
       // fire trigger before insertion
       // TriggerEngine.fire(TriggerEvent.BEFORE_INSERT, insertRowNode);
       // insert to sequence or unSequence file
@@ -932,9 +976,23 @@ public class DataRegion {
       long beforeTimePartition =
           StorageEngineV2.getTimePartition(insertTabletNode.getTimes()[before]);
       // init map
+
+      if (!lastFlushTimeMap.checkAndCreateFlushedTimePartition(beforeTimePartition)) {
+        TimePartitionManager.getInstance()
+            .registerTimePartitionInfo(
+                new TimePartitionInfo(
+                    new DataRegionId(Integer.valueOf(dataRegionId)),
+                    beforeTimePartition,
+                    true,
+                    Long.MAX_VALUE,
+                    0,
+                    tsFileManager.isLatestTimePartition(beforeTimePartition)));
+      }
+
       long lastFlushTime =
-          lastFlushTimeManager.ensureFlushedTimePartitionAndInit(
-              beforeTimePartition, insertTabletNode.getDevicePath().getFullPath(), Long.MIN_VALUE);
+          lastFlushTimeMap.getFlushedTime(
+              beforeTimePartition, insertTabletNode.getDevicePath().getFullPath());
+
       // if is sequence
       boolean isSequence = false;
       while (loc < insertTabletNode.getRowCount()) {
@@ -965,7 +1023,7 @@ public class DataRegion {
                 && noFailure;
       }
       long globalLatestFlushedTime =
-          lastFlushTimeManager.getGlobalFlushedTime(insertTabletNode.getDevicePath().getFullPath());
+          lastFlushTimeMap.getGlobalFlushedTime(insertTabletNode.getDevicePath().getFullPath());
       tryToUpdateBatchInsertLastCache(insertTabletNode, globalLatestFlushedTime);
 
       if (!noFailure) {
@@ -1034,15 +1092,6 @@ public class DataRegion {
       return false;
     }
 
-    lastFlushTimeManager.ensureLastTimePartition(timePartitionId);
-    // try to update the latest time of the device of this tsRecord
-    if (sequence) {
-      lastFlushTimeManager.updateLastTime(
-          timePartitionId,
-          insertTabletNode.getDevicePath().getFullPath(),
-          insertTabletNode.getTimes()[end - 1]);
-    }
-
     // check memtable size and may async try to flush the work memtable
     if (tsFileProcessor.shouldFlush()) {
       fileFlushPolicy.apply(this, tsFileProcessor, sequence);
@@ -1078,12 +1127,8 @@ public class DataRegion {
 
     tsFileProcessor.insert(insertRowNode);
 
-    // try to update the latest time of the device of this tsRecord
-    lastFlushTimeManager.updateLastTime(
-        timePartitionId, insertRowNode.getDevicePath().getFullPath(), insertRowNode.getTime());
-
     long globalLatestFlushTime =
-        lastFlushTimeManager.getGlobalFlushedTime(insertRowNode.getDevicePath().getFullPath());
+        lastFlushTimeMap.getGlobalFlushedTime(insertRowNode.getDevicePath().getFullPath());
 
     tryToUpdateInsertLastCache(insertRowNode, globalLatestFlushTime);
 
@@ -1206,6 +1251,9 @@ public class DataRegion {
 
     if (null == res) {
       // build new processor, memory control module will control the number of memtables
+      TimePartitionManager.getInstance()
+          .updateAfterOpeningTsFileProcessor(
+              new DataRegionId(Integer.valueOf(dataRegionId)), timeRangeId);
       res = newTsFileProcessor(sequence, timeRangeId);
       tsFileProcessorTreeMap.put(timeRangeId, res);
       tsFileManager.add(res.getTsFileResource(), sequence);
@@ -1243,7 +1291,7 @@ public class DataRegion {
               fsFactory.getFileWithParent(filePath),
               dataRegionInfo,
               this::closeUnsealedTsFileProcessorCallBack,
-              this::updateLatestFlushTimeCallback,
+              this::sequenceFlushCallback,
               true);
     } else {
       tsFileProcessor =
@@ -1335,7 +1383,6 @@ public class DataRegion {
         tsFileProcessor.getTsFileResource().getTsFile().getAbsolutePath());
     if (sequence) {
       closingSequenceTsFileProcessor.add(tsFileProcessor);
-      updateEndTimeMap(tsFileProcessor);
       tsFileProcessor.asyncClose();
 
       workSequenceTsFileProcessors.remove(tsFileProcessor.getTimeRangeId());
@@ -1414,9 +1461,8 @@ public class DataRegion {
       this.workSequenceTsFileProcessors.clear();
       this.workUnsequenceTsFileProcessors.clear();
       this.tsFileManager.clear();
-      lastFlushTimeManager.clearFlushedTime();
-      lastFlushTimeManager.clearGlobalFlushedTime();
-      lastFlushTimeManager.clearLastTime();
+      lastFlushTimeMap.clearFlushedTime();
+      lastFlushTimeMap.clearGlobalFlushedTime();
     } finally {
       writeUnlock();
     }
@@ -2029,61 +2075,44 @@ public class DataRegion {
     }
   }
 
-  /**
-   * when close an TsFileProcessor, update its EndTimeMap immediately
-   *
-   * @param tsFileProcessor processor to be closed
-   */
-  private void updateEndTimeMap(TsFileProcessor tsFileProcessor) {
-    TsFileResource resource = tsFileProcessor.getTsFileResource();
-    for (String deviceId : resource.getDevices()) {
-      resource.updateEndTime(
-          deviceId, lastFlushTimeManager.getLastTime(tsFileProcessor.getTimeRangeId(), deviceId));
-    }
-  }
-
-  private boolean unsequenceFlushCallback(TsFileProcessor processor) {
+  /** Unsequence flush hook: reports flush time and flush-time-map size; updateMap is unused. */
+  private boolean unsequenceFlushCallback(
+      TsFileProcessor processor, Map<String, Long> updateMap, long systemFlushTime) {
+    long partitionId = processor.getTimeRangeId();
+    DataRegionId regionId = new DataRegionId(Integer.valueOf(dataRegionId));
+    long mapMemSize = lastFlushTimeMap.getMemSize(partitionId);
+    boolean seqActive = workSequenceTsFileProcessors.get(partitionId) != null;
+    TimePartitionManager.getInstance()
+        .updateAfterFlushing(regionId, partitionId, systemFlushTime, mapMemSize, seqActive);
     return true;
   }
 
-  private boolean updateLatestFlushTimeCallback(TsFileProcessor processor) {
-    boolean res = lastFlushTimeManager.updateLatestFlushTime(processor.getTimeRangeId());
+  private boolean sequenceFlushCallback(
+      TsFileProcessor processor, Map<String, Long> updateMap, long systemFlushTime) {
+    boolean res = lastFlushTimeMap.updateLatestFlushTime(processor.getTimeRangeId(), updateMap);
     if (!res) {
       logger.warn(
           "Partition: {} does't have latest time for each device. "
               + "No valid record is written into memtable. Flushing tsfile is: {}",
           processor.getTimeRangeId(),
           processor.getTsFileResource().getTsFile());
+      return res;
     }
 
-    return res;
-  }
-
-  /**
-   * update latest flush time for partition id
-   *
-   * @param partitionId partition id
-   * @param latestFlushTime lastest flush time
-   * @return true if update latest flush time success
-   */
-  private boolean updateLatestFlushTimeToPartition(long partitionId, long latestFlushTime) {
-    boolean res =
-        lastFlushTimeManager.updateLatestFlushTimeToPartition(partitionId, latestFlushTime);
-    if (!res) {
-      logger.warn(
-          "Partition: {} does't have latest time for each device. "
-              + "No valid record is written into memtable.  latest flush time is: {}",
-          partitionId,
-          latestFlushTime);
-    }
-
+    TimePartitionManager.getInstance()
+        .updateAfterFlushing(
+            new DataRegionId(Integer.valueOf(dataRegionId)),
+            processor.getTimeRangeId(),
+            systemFlushTime,
+            lastFlushTimeMap.getMemSize(processor.getTimeRangeId()),
+            workUnsequenceTsFileProcessors.get(processor.getTimeRangeId()) != null);
     return res;
   }
 
   /** used for upgrading */
   public void updateNewlyFlushedPartitionLatestFlushedTimeForEachDevice(
       long partitionId, String deviceId, long time) {
-    lastFlushTimeManager.updateNewlyFlushedPartitionLatestFlushedTimeForEachDevice(
+    lastFlushTimeMap.updateNewlyFlushedPartitionLatestFlushedTimeForEachDevice(
         partitionId, deviceId, time);
   }
 
@@ -2163,7 +2192,7 @@ public class DataRegion {
         writeUnlock();
       }
       // after upgrade complete, update partitionLatestFlushedTimeForEachDevice
-      lastFlushTimeManager.applyNewlyFlushedTimeToFlushedTime();
+      lastFlushTimeMap.applyNewlyFlushedTimeToFlushedTime();
     }
   }
 
@@ -2579,9 +2608,8 @@ public class DataRegion {
     for (String device : newTsFileResource.getDevices()) {
       long endTime = newTsFileResource.getEndTime(device);
       long timePartitionId = StorageEngineV2.getTimePartition(endTime);
-      lastFlushTimeManager.updateLastTime(timePartitionId, device, endTime);
-      lastFlushTimeManager.updateFlushedTime(timePartitionId, device, endTime);
-      lastFlushTimeManager.updateGlobalFlushedTime(device, endTime);
+      lastFlushTimeMap.updateFlushedTime(timePartitionId, device, endTime);
+      lastFlushTimeMap.updateGlobalFlushedTime(device, endTime);
     }
   }
 
@@ -3006,13 +3034,15 @@ public class DataRegion {
         iterator.hasNext(); ) {
       Entry<Long, TsFileProcessor> longTsFileProcessorEntry = iterator.next();
       long partitionId = longTsFileProcessorEntry.getKey();
+      lastFlushTimeMap.removePartition(partitionId);
+      TimePartitionManager.getInstance()
+          .removePartition(new DataRegionId(Integer.valueOf(dataRegionId)), partitionId);
       TsFileProcessor processor = longTsFileProcessorEntry.getValue();
       if (filter.satisfy(storageGroupName, partitionId)) {
         processor.syncClose();
         iterator.remove();
         processor.getTsFileResource().remove();
         tsFileManager.remove(processor.getTsFileResource(), sequence);
-        updateLatestFlushTimeToPartition(partitionId, Long.MIN_VALUE);
         logger.debug(
             "{} is removed during deleting partitions",
             processor.getTsFileResource().getTsFilePath());
@@ -3028,7 +3058,10 @@ public class DataRegion {
       if (filter.satisfy(storageGroupName, tsFileResource.getTimePartition())) {
         tsFileResource.remove();
         tsFileManager.remove(tsFileResource, sequence);
-        updateLatestFlushTimeToPartition(tsFileResource.getTimePartition(), Long.MIN_VALUE);
+        lastFlushTimeMap.removePartition(tsFileResource.getTimePartition());
+        TimePartitionManager.getInstance()
+            .removePartition(
+                new DataRegionId(Integer.valueOf(dataRegionId)), tsFileResource.getTimePartition());
         logger.debug("{} is removed during deleting partitions", tsFileResource.getTsFilePath());
       }
     }
@@ -3072,14 +3105,25 @@ public class DataRegion {
         // init map
         long timePartitionId = StorageEngineV2.getTimePartition(insertRowNode.getTime());
 
-        lastFlushTimeManager.ensureFlushedTimePartition(timePartitionId);
+        if (!lastFlushTimeMap.checkAndCreateFlushedTimePartition(timePartitionId)) {
+          TimePartitionManager.getInstance()
+              .registerTimePartitionInfo(
+                  new TimePartitionInfo(
+                      new DataRegionId(Integer.valueOf(dataRegionId)),
+                      timePartitionId,
+                      true,
+                      Long.MAX_VALUE,
+                      0,
+                      tsFileManager.isLatestTimePartition(timePartitionId)));
+        }
+
         // as the plans have been ordered, and we have get the write lock,
         // So, if a plan is sequenced, then all the rest plans are sequenced.
         //
         if (!isSequence) {
           isSequence =
               insertRowNode.getTime()
-                  > lastFlushTimeManager.getFlushedTime(
+                  > lastFlushTimeMap.getFlushedTime(
                       timePartitionId, insertRowNode.getDevicePath().getFullPath());
         }
         // is unsequence and user set config to discard out of order data
@@ -3088,8 +3132,6 @@ public class DataRegion {
           return;
         }
 
-        lastFlushTimeManager.ensureLastTimePartition(timePartitionId);
-
         // fire trigger before insertion
         // TriggerEngine.fire(TriggerEvent.BEFORE_INSERT, plan);
         // insert to sequence or unSequence file
@@ -3251,7 +3293,7 @@ public class DataRegion {
   @FunctionalInterface
   public interface UpdateEndTimeCallBack {
 
-    boolean call(TsFileProcessor caller);
+    boolean call(TsFileProcessor caller, Map<String, Long> updateMap, long systemFlushTime);
   }
 
   @FunctionalInterface
@@ -3336,13 +3378,17 @@ public class DataRegion {
     }
   }
 
+  public void releaseFlushTimeMap(long timePartitionId) {
+    lastFlushTimeMap.removePartition(timePartitionId);
+  }
+
   public long getMemCost() {
     return dataRegionInfo.getMemCost();
   }
 
   @TestOnly
-  public ILastFlushTimeManager getLastFlushTimeManager() {
-    return lastFlushTimeManager;
+  public ILastFlushTimeMap getLastFlushTimeMap() {
+    return lastFlushTimeMap;
   }
 
   @TestOnly
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegionInfo.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegionInfo.java
index f01139166b..fc89a2f125 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegionInfo.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegionInfo.java
@@ -41,7 +41,7 @@ public class DataRegionInfo {
       (long)
           (IoTDBDescriptor.getInstance().getConfig().getWriteMemoryVariationReportProportion()
               * IoTDBDescriptor.getInstance().getConfig().getAllocateMemoryForStorageEngine()
-              * IoTDBDescriptor.getInstance().getConfig().getWriteProportion());
+              * IoTDBDescriptor.getInstance().getConfig().getWriteProportionForMemtable());
 
   private final AtomicLong lastReportedSize = new AtomicLong();
 
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/LastFlushTimeManager.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/HashLastFlushTimeMap.java
similarity index 52%
rename from server/src/main/java/org/apache/iotdb/db/engine/storagegroup/LastFlushTimeManager.java
rename to server/src/main/java/org/apache/iotdb/db/engine/storagegroup/HashLastFlushTimeMap.java
index f16f3181f8..b110cd6768 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/LastFlushTimeManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/HashLastFlushTimeMap.java
@@ -23,22 +23,30 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 
-/**
- * This class manages last time and flush time for sequence and unsequence determination This class
- * This class is NOT thread safe, caller should ensure synchronization
- */
-public class LastFlushTimeManager implements ILastFlushTimeManager {
-  private static final Logger logger = LoggerFactory.getLogger(LastFlushTimeManager.class);
-  /*
-   * time partition id -> map, which contains
-   * device -> global latest timestamp of each device latestTimeForEachDevice caches non-flushed
-   * changes upon timestamps of each device, and is used to update partitionLatestFlushedTimeForEachDevice
-   * when a flush is issued.
+public class HashLastFlushTimeMap implements ILastFlushTimeMap {
+
+  private static final Logger logger = LoggerFactory.getLogger(HashLastFlushTimeMap.class);
+
+  /**
+   * String basic total, 40B
+   *
+   * <ul>
+   *   <li>Object header: Mark Word + Classic Pointer, 12B
+   *   <li>char[] reference 4B
+   *   <li>hash code, 4B
+   *   <li>padding 4B
+   *   <li>char[] header + length 16B
+   * </ul>
    */
-  private Map<Long, Map<String, Long>> latestTimeForEachDevice = new HashMap<>();
+  static final long STRING_BASE_SIZE = 40;
+
+  static final long LONG_SIZE = 24; // presumably the size of one boxed Long; TODO confirm
+
+  static final long HASHMAP_NODE_BASIC_SIZE = 14 + STRING_BASE_SIZE + LONG_SIZE;
+
   /**
    * time partition id -> map, which contains device -> largest timestamp of the latest memtable to
    * be submitted to asyncTryToFlush partitionLatestFlushedTimeForEachDevice determines whether a
@@ -58,31 +66,46 @@ public class LastFlushTimeManager implements ILastFlushTimeManager {
    */
   private Map<String, Long> globalLatestFlushedTimeForEachDevice = new HashMap<>();
 
-  // region set
-  @Override
-  public void setMultiDeviceLastTime(long timePartitionId, Map<String, Long> lastTimeMap) {
-    latestTimeForEachDevice
-        .computeIfAbsent(timePartitionId, l -> new HashMap<>())
-        .putAll(lastTimeMap);
-  }
+  /** used for recovering flush time from tsfile resource */
+  TsFileManager tsFileManager;
 
-  @Override
-  public void setOneDeviceLastTime(long timePartitionId, String path, long time) {
-    latestTimeForEachDevice.computeIfAbsent(timePartitionId, l -> new HashMap<>()).put(path, time);
+  /** record memory cost of map for each partitionId */
+  private Map<Long, Long> memCostForEachPartition = new HashMap<>();
+
+  public HashLastFlushTimeMap(TsFileManager tsFileManager) {
+    this.tsFileManager = tsFileManager;
   }
 
   @Override
   public void setMultiDeviceFlushedTime(long timePartitionId, Map<String, Long> flushedTimeMap) {
-    partitionLatestFlushedTimeForEachDevice
-        .computeIfAbsent(timePartitionId, l -> new HashMap<>())
-        .putAll(flushedTimeMap);
+    Map<String, Long> deviceTimes = partitionLatestFlushedTimeForEachDevice.get(timePartitionId);
+    if (deviceTimes == null) {
+      // no map exists for this partition: the values are silently dropped
+      return;
+    }
+    long addedCost = 0;
+    for (Map.Entry<String, Long> flushed : flushedTimeMap.entrySet()) {
+      String device = flushed.getKey();
+      // a null return means the device had no previous value, so the map grew by one node
+      if (deviceTimes.put(device, flushed.getValue()) == null) {
+        addedCost += HASHMAP_NODE_BASIC_SIZE + 2L * device.length();
+      }
+    }
+    memCostForEachPartition.merge(timePartitionId, addedCost, Long::sum);
   }
 
   @Override
   public void setOneDeviceFlushedTime(long timePartitionId, String path, long time) {
-    partitionLatestFlushedTimeForEachDevice
-        .computeIfAbsent(timePartitionId, l -> new HashMap<>())
-        .put(path, time);
+    Map<String, Long> deviceTimes = partitionLatestFlushedTimeForEachDevice.get(timePartitionId);
+    // no map exists for this partition: the value is silently dropped
+    if (deviceTimes == null) {
+      return;
+    }
+    if (deviceTimes.put(path, time) == null) {
+      // the device had no previous value, so a node was added: raise the partition's cost
+      long nodeCost = HASHMAP_NODE_BASIC_SIZE + 2L * path.length();
+      memCostForEachPartition.merge(timePartitionId, nodeCost, Long::sum);
+    }
   }
 
   @Override
@@ -95,22 +118,24 @@ public class LastFlushTimeManager implements ILastFlushTimeManager {
     globalLatestFlushedTimeForEachDevice.put(path, time);
   }
 
-  // endregion
-
-  // region update
-
-  @Override
-  public void updateLastTime(long timePartitionId, String path, long time) {
-    latestTimeForEachDevice
-        .computeIfAbsent(timePartitionId, id -> new HashMap<>())
-        .compute(path, (k, v) -> v == null ? time : Math.max(v, time));
-  }
-
   @Override
   public void updateFlushedTime(long timePartitionId, String path, long time) {
-    partitionLatestFlushedTimeForEachDevice
-        .computeIfAbsent(timePartitionId, id -> new HashMap<>())
-        .compute(path, (k, v) -> v == null ? time : Math.max(v, time));
+    // an existing flush time is only ever raised by this method, never lowered
+    Map<String, Long> deviceTimes = partitionLatestFlushedTimeForEachDevice.get(timePartitionId);
+    if (deviceTimes == null) {
+      // no map exists for this partition: nothing to update
+      return;
+    }
+    Long previous = deviceTimes.get(path);
+    if (previous == null) {
+      // first value for this device: a node is added, so charge it to the partition
+      long nodeCost = HASHMAP_NODE_BASIC_SIZE + 2L * path.length();
+      memCostForEachPartition.merge(timePartitionId, nodeCost, Long::sum);
+      deviceTimes.put(path, time);
+    } else {
+      // keep whichever flush time is larger
+      deviceTimes.put(path, Math.max(previous, time));
+    }
   }
 
   @Override
@@ -127,39 +152,23 @@ public class LastFlushTimeManager implements ILastFlushTimeManager {
         .compute(deviceId, (k, v) -> v == null ? time : Math.max(v, time));
   }
 
-  // endregion
-
-  // region ensure
-
   @Override
-  public void ensureLastTimePartition(long timePartitionId) {
-    latestTimeForEachDevice.computeIfAbsent(timePartitionId, id -> new HashMap<>());
-  }
-
-  @Override
-  public void ensureFlushedTimePartition(long timePartitionId) {
-    partitionLatestFlushedTimeForEachDevice.computeIfAbsent(timePartitionId, id -> new HashMap<>());
-  }
-
-  @Override
-  public long ensureFlushedTimePartitionAndInit(long timePartitionId, String path, long initTime) {
-    return partitionLatestFlushedTimeForEachDevice
-        .computeIfAbsent(timePartitionId, id -> new HashMap<>())
-        .computeIfAbsent(path, id -> initTime);
+  public boolean checkAndCreateFlushedTimePartition(long timePartitionId) {
+    // putIfAbsent yields the existing map when the partition is already tracked and null
+    // when it has just installed the fresh one, so a non-null result means "already existed";
+    // false therefore tells the caller that the partition was created by this call
+    return partitionLatestFlushedTimeForEachDevice.putIfAbsent(timePartitionId, new HashMap<>())
+        != null;
   }
 
-  // endregion
-
-  // region upgrade support methods
-
   @Override
   public void applyNewlyFlushedTimeToFlushedTime() {
-    for (Entry<Long, Map<String, Long>> entry :
+    for (Map.Entry<Long, Map<String, Long>> entry :
         newlyFlushedPartitionLatestFlushedTimeForEachDevice.entrySet()) {
       long timePartitionId = entry.getKey();
       Map<String, Long> latestFlushTimeForPartition =
           partitionLatestFlushedTimeForEachDevice.getOrDefault(timePartitionId, new HashMap<>());
-      for (Entry<String, Long> endTimeMap : entry.getValue().entrySet()) {
+      for (Map.Entry<String, Long> endTimeMap : entry.getValue().entrySet()) {
         String device = endTimeMap.getKey();
         long endTime = endTimeMap.getValue();
         if (latestFlushTimeForPartition.getOrDefault(device, Long.MIN_VALUE) < endTime) {
@@ -171,49 +180,9 @@ public class LastFlushTimeManager implements ILastFlushTimeManager {
     }
   }
 
-  /**
-   * update latest flush time for partition id
-   *
-   * @param partitionId partition id
-   * @param latestFlushTime lastest flush time
-   * @return true if update latest flush time success
-   */
-  @Override
-  public boolean updateLatestFlushTimeToPartition(long partitionId, long latestFlushTime) {
-    Map<String, Long> curPartitionDeviceLatestTime = latestTimeForEachDevice.get(partitionId);
-
-    if (curPartitionDeviceLatestTime == null) {
-      return false;
-    }
-
-    for (Entry<String, Long> entry : curPartitionDeviceLatestTime.entrySet()) {
-      // set lastest flush time to latestTimeForEachDevice
-      entry.setValue(latestFlushTime);
-
-      partitionLatestFlushedTimeForEachDevice
-          .computeIfAbsent(partitionId, id -> new HashMap<>())
-          .put(entry.getKey(), entry.getValue());
-      newlyFlushedPartitionLatestFlushedTimeForEachDevice
-          .computeIfAbsent(partitionId, id -> new HashMap<>())
-          .put(entry.getKey(), entry.getValue());
-      if (globalLatestFlushedTimeForEachDevice.getOrDefault(entry.getKey(), Long.MIN_VALUE)
-          < entry.getValue()) {
-        globalLatestFlushedTimeForEachDevice.put(entry.getKey(), entry.getValue());
-      }
-    }
-    return true;
-  }
-
   @Override
-  public boolean updateLatestFlushTime(long partitionId) {
-    // update the largest timestamp in the last flushing memtable
-    Map<String, Long> curPartitionDeviceLatestTime = latestTimeForEachDevice.get(partitionId);
-
-    if (curPartitionDeviceLatestTime == null) {
-      return false;
-    }
-
-    for (Entry<String, Long> entry : curPartitionDeviceLatestTime.entrySet()) {
+  public boolean updateLatestFlushTime(long partitionId, Map<String, Long> updateMap) {
+    for (Map.Entry<String, Long> entry : updateMap.entrySet()) {
       partitionLatestFlushedTimeForEachDevice
           .computeIfAbsent(partitionId, id -> new HashMap<>())
           .put(entry.getKey(), entry.getValue());
@@ -227,19 +196,11 @@ public class LastFlushTimeManager implements ILastFlushTimeManager {
     return true;
   }
 
-  // endregion
-
-  // region query
   @Override
   public long getFlushedTime(long timePartitionId, String path) {
     return partitionLatestFlushedTimeForEachDevice
         .get(timePartitionId)
-        .getOrDefault(path, Long.MIN_VALUE);
-  }
-
-  @Override
-  public long getLastTime(long timePartitionId, String path) {
-    return latestTimeForEachDevice.get(timePartitionId).getOrDefault(path, Long.MIN_VALUE);
+        .computeIfAbsent(path, k -> recoverFlushTime(timePartitionId, path));
   }
 
   @Override
@@ -247,14 +208,6 @@ public class LastFlushTimeManager implements ILastFlushTimeManager {
     return globalLatestFlushedTimeForEachDevice.getOrDefault(path, Long.MIN_VALUE);
   }
 
-  // endregion
-
-  // region clear
-  @Override
-  public void clearLastTime() {
-    latestTimeForEachDevice.clear();
-  }
-
   @Override
   public void clearFlushedTime() {
     partitionLatestFlushedTimeForEachDevice.clear();
@@ -264,5 +217,33 @@ public class LastFlushTimeManager implements ILastFlushTimeManager {
   public void clearGlobalFlushedTime() {
     globalLatestFlushedTimeForEachDevice.clear();
   }
-  // endregion
+
+  @Override
+  public void removePartition(long partitionId) {
+    partitionLatestFlushedTimeForEachDevice.remove(partitionId);
+    memCostForEachPartition.remove(partitionId);
+  }
+
+  private long recoverFlushTime(long partitionId, String devicePath) {
+    // getFlushedTime caches every value returned here, so a new node is always created:
+    // charge its cost on both the "found in a TsFile" and the "not found" path
+    long memCost = HASHMAP_NODE_BASIC_SIZE + 2L * devicePath.length();
+    memCostForEachPartition.merge(partitionId, memCost, Long::sum);
+    List<TsFileResource> resources = tsFileManager.getSequenceListByTimePartition(partitionId);
+    for (int i = resources.size() - 1; i >= 0; i--) {
+      TsFileResource resource = resources.get(i);
+      if (resource.timeIndex.mayContainsDevice(devicePath)) {
+        return resource.timeIndex.getEndTime(devicePath);
+      }
+    }
+    return Long.MIN_VALUE;
+  }
+
+  /** Returns the memory cost recorded for the partition, or 0 when nothing is recorded. */
+  @Override
+  public long getMemSize(long partitionId) {
+    // one lookup instead of containsKey followed by get
+    Long recorded = memCostForEachPartition.get(partitionId);
+    return recorded == null ? 0 : recorded;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/IDTableFlushTimeManager.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/IDTableLastFlushTimeMap.java
similarity index 51%
rename from server/src/main/java/org/apache/iotdb/db/engine/storagegroup/IDTableFlushTimeManager.java
rename to server/src/main/java/org/apache/iotdb/db/engine/storagegroup/IDTableLastFlushTimeMap.java
index 61388ce66f..8e690290cd 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/IDTableFlushTimeManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/IDTableLastFlushTimeMap.java
@@ -22,43 +22,51 @@ package org.apache.iotdb.db.engine.storagegroup;
 import org.apache.iotdb.db.metadata.idtable.IDTable;
 import org.apache.iotdb.db.metadata.idtable.entry.DeviceEntry;
 
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
+import java.util.Set;
+
+public class IDTableLastFlushTimeMap implements ILastFlushTimeMap {
+
+  static final long LONG_SIZE = 24; // presumably the size of one boxed Long; TODO confirm
+
+  static final long HASHMAP_NODE_BASIC_SIZE = 14 + LONG_SIZE + LONG_SIZE;
 
-/**
- * This class manages last time and flush time for sequence and unsequence determination This class
- * This class is NOT thread safe, caller should ensure synchronization This class not support
- * upgrade
- */
-public class IDTableFlushTimeManager implements ILastFlushTimeManager {
   IDTable idTable;
 
-  public IDTableFlushTimeManager(IDTable idTable) {
-    this.idTable = idTable;
-  }
+  TsFileManager tsFileManager;
 
-  // region set
-  @Override
-  public void setMultiDeviceLastTime(long timePartitionId, Map<String, Long> lastTimeMap) {
-    for (Map.Entry<String, Long> entry : lastTimeMap.entrySet()) {
-      idTable.getDeviceEntry(entry.getKey()).putLastTimeMap(timePartitionId, entry.getValue());
-    }
-  }
+  /** record memory cost of map for each partitionId */
+  private Map<Long, Long> memCostForEachPartition = new HashMap<>();
 
-  @Override
-  public void setOneDeviceLastTime(long timePartitionId, String path, long time) {
-    idTable.getDeviceEntry(path).putLastTimeMap(timePartitionId, time);
+  private Set<Long> partitionSet = new HashSet<>();
+
+  public IDTableLastFlushTimeMap(IDTable idTable, TsFileManager tsFileManager) {
+    this.idTable = idTable;
+    this.tsFileManager = tsFileManager;
   }
 
   @Override
   public void setMultiDeviceFlushedTime(long timePartitionId, Map<String, Long> flushedTimeMap) {
     for (Map.Entry<String, Long> entry : flushedTimeMap.entrySet()) {
-      idTable.getDeviceEntry(entry.getKey()).putFlushTimeMap(timePartitionId, entry.getValue());
+      // NOTE(review): a null return is taken to mean "no previous flush time stored for this
+      // device and partition"; confirm against DeviceEntry.putFlushTimeMap
+      DeviceEntry device = idTable.getDeviceEntry(entry.getKey());
+      if (device.putFlushTimeMap(timePartitionId, entry.getValue()) == null) {
+        memCostForEachPartition.merge(timePartitionId, HASHMAP_NODE_BASIC_SIZE, Long::sum);
+      }
     }
   }
 
   @Override
   public void setOneDeviceFlushedTime(long timePartitionId, String path, long time) {
-    idTable.getDeviceEntry(path).putFlushTimeMap(timePartitionId, time);
+    // NOTE(review): a null return is taken to mean "no previous value"; confirm in DeviceEntry
+    DeviceEntry device = idTable.getDeviceEntry(path);
+    if (device.putFlushTimeMap(timePartitionId, time) == null) {
+      memCostForEachPartition.merge(timePartitionId, HASHMAP_NODE_BASIC_SIZE, Long::sum);
+    }
   }
 
   @Override
@@ -73,17 +81,13 @@ public class IDTableFlushTimeManager implements ILastFlushTimeManager {
     idTable.getDeviceEntry(path).setGlobalFlushTime(time);
   }
 
-  // endregion
-
-  // region update
-
-  @Override
-  public void updateLastTime(long timePartitionId, String path, long time) {
-    idTable.getDeviceEntry(path).updateLastTimeMap(timePartitionId, time);
-  }
-
   @Override
   public void updateFlushedTime(long timePartitionId, String path, long time) {
+    // charge one node to the partition when the device has no flush time stored for it yet
+    boolean firstValue = idTable.getDeviceEntry(path).getFlushTime(timePartitionId) == null;
+    if (firstValue) {
+      memCostForEachPartition.merge(timePartitionId, HASHMAP_NODE_BASIC_SIZE, Long::sum);
+    }
     idTable.getDeviceEntry(path).updateFlushTimeMap(timePartitionId, time);
   }
 
@@ -98,110 +102,87 @@ public class IDTableFlushTimeManager implements ILastFlushTimeManager {
     throw new UnsupportedOperationException("IDTableFlushTimeManager doesn't support upgrade");
   }
 
-  // endregion
-
-  // region ensure
-
-  @Override
-  public void ensureLastTimePartition(long timePartitionId) {
-    // do nothing is correct
-  }
-
   @Override
-  public void ensureFlushedTimePartition(long timePartitionId) {
-    // do nothing is correct
+  public boolean checkAndCreateFlushedTimePartition(long timePartitionId) {
+    return !partitionSet.add(timePartitionId);
   }
 
-  @Override
-  public long ensureFlushedTimePartitionAndInit(long timePartitionId, String path, long initTime) {
-    return idTable.getDeviceEntry(path).updateFlushTimeMap(timePartitionId, initTime);
-  }
-
-  // endregion
-
-  // region upgrade support methods
-
   @Override
   public void applyNewlyFlushedTimeToFlushedTime() {
     throw new UnsupportedOperationException("IDTableFlushTimeManager doesn't support upgrade");
   }
 
-  /**
-   * update latest flush time for partition id
-   *
-   * @param partitionId partition id
-   * @param latestFlushTime lastest flush time
-   * @return true if update latest flush time success
-   */
   @Override
-  public boolean updateLatestFlushTimeToPartition(long partitionId, long latestFlushTime) {
-    for (DeviceEntry deviceEntry : idTable.getAllDeviceEntry()) {
-      deviceEntry.putLastTimeMap(partitionId, latestFlushTime);
-      deviceEntry.putFlushTimeMap(partitionId, latestFlushTime);
-      deviceEntry.updateGlobalFlushTime(latestFlushTime);
+  public boolean updateLatestFlushTime(long partitionId, Map<String, Long> updateMap) {
+    for (Map.Entry<String, Long> entry : updateMap.entrySet()) {
+      DeviceEntry deviceEntry = idTable.getDeviceEntry(entry.getKey());
+      deviceEntry.updateFlushTimeMap(partitionId, entry.getValue());
+      if (deviceEntry.getGlobalFlushTime() < entry.getValue()) {
+        deviceEntry.setGlobalFlushTime(entry.getValue());
+      }
     }
-
     return true;
   }
 
   @Override
-  public boolean updateLatestFlushTime(long partitionId) {
-    boolean updated = false;
-
-    for (DeviceEntry deviceEntry : idTable.getAllDeviceEntry()) {
-      Long lastTime = deviceEntry.getLastTime(partitionId);
-      if (lastTime == null) {
-        continue;
-      }
-
-      updated = true;
-      deviceEntry.putFlushTimeMap(partitionId, lastTime);
-      deviceEntry.updateGlobalFlushTime(lastTime);
+  public long getFlushedTime(long timePartitionId, String path) {
+    Long flushTime = idTable.getDeviceEntry(path).getFlushTime(timePartitionId);
+    if (flushTime != null) {
+      return flushTime;
     }
-
-    return updated;
+    long time = recoverFlushTime(timePartitionId, path);
+    idTable.getDeviceEntry(path).updateFlushTimeMap(timePartitionId, time);
+    return time;
   }
 
-  // endregion
-
-  // region query
   @Override
-  public long getFlushedTime(long timePartitionId, String path) {
-    return idTable.getDeviceEntry(path).getFLushTimeWithDefaultValue(timePartitionId);
+  public long getGlobalFlushedTime(String path) {
+    return idTable.getDeviceEntry(path).getGlobalFlushTime();
   }
 
   @Override
-  public long getLastTime(long timePartitionId, String path) {
-    return idTable.getDeviceEntry(path).getLastTimeWithDefaultValue(timePartitionId);
+  public void clearFlushedTime() {
+    for (DeviceEntry deviceEntry : idTable.getAllDeviceEntry()) {
+      deviceEntry.clearFlushTime();
+    }
   }
 
   @Override
-  public long getGlobalFlushedTime(String path) {
-    return idTable.getDeviceEntry(path).getGlobalFlushTime();
+  public void clearGlobalFlushedTime() {
+    for (DeviceEntry deviceEntry : idTable.getAllDeviceEntry()) {
+      deviceEntry.setGlobalFlushTime(Long.MIN_VALUE);
+    }
   }
 
-  // endregion
-
-  // region clear
   @Override
-  public void clearLastTime() {
+  public void removePartition(long partitionId) {
     for (DeviceEntry deviceEntry : idTable.getAllDeviceEntry()) {
-      deviceEntry.clearLastTime();
+      deviceEntry.removePartition(partitionId);
     }
+    partitionSet.remove(partitionId);
+    memCostForEachPartition.remove(partitionId);
   }
 
-  @Override
-  public void clearFlushedTime() {
-    for (DeviceEntry deviceEntry : idTable.getAllDeviceEntry()) {
-      deviceEntry.clearFlushTime();
+  private long recoverFlushTime(long partitionId, String devicePath) {
+    // getFlushedTime stores whatever is returned here, so the entry is charged on every path,
+    // not only when no TsFile knows the device
+    memCostForEachPartition.merge(partitionId, HASHMAP_NODE_BASIC_SIZE, Long::sum);
+    List<TsFileResource> resources = tsFileManager.getSequenceListByTimePartition(partitionId);
+    for (int i = resources.size() - 1; i >= 0; i--) {
+      TsFileResource resource = resources.get(i);
+      if (resource.timeIndex.mayContainsDevice(devicePath)) {
+        return resource.timeIndex.getEndTime(devicePath);
+      }
     }
+
+    return Long.MIN_VALUE;
   }
 
   @Override
-  public void clearGlobalFlushedTime() {
-    for (DeviceEntry deviceEntry : idTable.getAllDeviceEntry()) {
-      deviceEntry.setGlobalFlushTime(Long.MIN_VALUE);
+  public long getMemSize(long partitionId) {
+    if (memCostForEachPartition.containsKey(partitionId)) {
+      return memCostForEachPartition.get(partitionId);
     }
+    return 0;
   }
-  // endregion
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/ILastFlushTimeManager.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/ILastFlushTimeMap.java
similarity index 67%
rename from server/src/main/java/org/apache/iotdb/db/engine/storagegroup/ILastFlushTimeManager.java
rename to server/src/main/java/org/apache/iotdb/db/engine/storagegroup/ILastFlushTimeMap.java
index 23b988aaf7..a98dd06e6f 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/ILastFlushTimeManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/ILastFlushTimeMap.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
@@ -16,18 +16,15 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.iotdb.db.engine.storagegroup;
 
 import java.util.Map;
 
 /** This interface manages last time and flush time for sequence and unsequence determination */
-public interface ILastFlushTimeManager {
+public interface ILastFlushTimeMap {
 
   // region set
-  void setMultiDeviceLastTime(long timePartitionId, Map<String, Long> lastTimeMap);
-
-  void setOneDeviceLastTime(long timePartitionId, String path, long time);
-
   void setMultiDeviceFlushedTime(long timePartitionId, Map<String, Long> flushedTimeMap);
 
   void setOneDeviceFlushedTime(long timePartitionId, String path, long time);
@@ -38,7 +35,6 @@ public interface ILastFlushTimeManager {
   // endregion
 
   // region update
-  void updateLastTime(long timePartitionId, String path, long time);
 
   void updateFlushedTime(long timePartitionId, String path, long time);
 
@@ -49,41 +45,29 @@ public interface ILastFlushTimeManager {
   // endregion
 
   // region ensure
-  void ensureLastTimePartition(long timePartitionId);
+  boolean checkAndCreateFlushedTimePartition(long timePartitionId);
 
-  void ensureFlushedTimePartition(long timePartitionId);
-
-  long ensureFlushedTimePartitionAndInit(long timePartitionId, String path, long initTime);
   // endregion
 
   // region support upgrade methods
   void applyNewlyFlushedTimeToFlushedTime();
 
-  /**
-   * update latest flush time for partition id
-   *
-   * @param partitionId partition id
-   * @param latestFlushTime lastest flush time
-   * @return true if update latest flush time success
-   */
-  boolean updateLatestFlushTimeToPartition(long partitionId, long latestFlushTime);
-
-  boolean updateLatestFlushTime(long partitionId);
+  boolean updateLatestFlushTime(long partitionId, Map<String, Long> updateMap);
   // endregion
 
   // region query
   long getFlushedTime(long timePartitionId, String path);
 
-  long getLastTime(long timePartitionId, String path);
-
   long getGlobalFlushedTime(String path);
   // endregion
 
   // region clear
-  void clearLastTime();
-
   void clearFlushedTime();
 
   void clearGlobalFlushedTime();
   // endregion
+
+  void removePartition(long partitionId);
+
+  long getMemSize(long partitionId);
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TimePartitionInfo.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TimePartitionInfo.java
new file mode 100644
index 0000000000..4d01a573ad
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TimePartitionInfo.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.storagegroup;
+
+import org.apache.iotdb.commons.consensus.DataRegionId;
+
+/** Time partition info records necessary info of a time partition for a data region */
+public class TimePartitionInfo {
+  DataRegionId dataRegionId;
+
+  long partitionId;
+
+  boolean isActive;
+
+  long lastSystemFlushTime;
+
+  boolean isLatestPartition;
+
+  long memSize;
+
+  public TimePartitionInfo(
+      DataRegionId dataRegionId,
+      long partitionId,
+      boolean isActive,
+      long lastSystemFlushTime,
+      long memsize,
+      boolean isLatestPartition) {
+    this.dataRegionId = dataRegionId;
+    this.partitionId = partitionId;
+    this.isActive = isActive;
+    this.lastSystemFlushTime = lastSystemFlushTime;
+    this.memSize = memsize;
+    this.isLatestPartition = isLatestPartition;
+  }
+
+  public int comparePriority(TimePartitionInfo timePartitionInfo) {
+    int cmp = Boolean.compare(isActive, timePartitionInfo.isActive);
+    if (cmp != 0) {
+      return cmp;
+    }
+
+    cmp = Boolean.compare(isLatestPartition, timePartitionInfo.isLatestPartition);
+    if (cmp != 0) {
+      return cmp;
+    }
+
+    return Long.compare(lastSystemFlushTime, timePartitionInfo.lastSystemFlushTime);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TimePartitionManager.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TimePartitionManager.java
new file mode 100644
index 0000000000..1e26a46b8a
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TimePartitionManager.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.storagegroup;
+
+import org.apache.iotdb.commons.consensus.DataRegionId;
+import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.StorageEngineV2;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+/** Manage all the time partitions for all data regions and control the total memory of them */
+public class TimePartitionManager {
+  final Map<DataRegionId, Map<Long, TimePartitionInfo>> timePartitionInfoMap;
+
+  long memCost = 0;
+  long timePartitionInfoMemoryThreshold =
+      IoTDBDescriptor.getInstance().getConfig().getAllocateMemoryForTimePartitionInfo();
+
+  private TimePartitionManager() {
+    timePartitionInfoMap = new HashMap<>();
+  }
+
+  public void registerTimePartitionInfo(TimePartitionInfo timePartitionInfo) {
+    synchronized (timePartitionInfoMap) {
+      TreeMap<Long, TimePartitionInfo> timePartitionInfoMapForRegion =
+          (TreeMap<Long, TimePartitionInfo>)
+              timePartitionInfoMap.computeIfAbsent(
+                  timePartitionInfo.dataRegionId, k -> new TreeMap<>());
+
+      Map.Entry<Long, TimePartitionInfo> entry =
+          timePartitionInfoMapForRegion.floorEntry(timePartitionInfo.partitionId);
+      if (entry != null) {
+        entry.getValue().isLatestPartition = false;
+      }
+
+      timePartitionInfoMapForRegion.put(timePartitionInfo.partitionId, timePartitionInfo);
+    }
+  }
+
+  public void updateAfterFlushing(
+      DataRegionId dataRegionId,
+      long timePartitionId,
+      long systemFlushTime,
+      long memSize,
+      boolean isActive) {
+    synchronized (timePartitionInfoMap) {
+      TimePartitionInfo timePartitionInfo =
+          timePartitionInfoMap
+              .computeIfAbsent(dataRegionId, k -> new TreeMap<>())
+              .get(timePartitionId);
+      if (timePartitionInfo != null) {
+        timePartitionInfo.lastSystemFlushTime = systemFlushTime;
+        memCost += memSize - timePartitionInfo.memSize;
+        timePartitionInfo.memSize = memSize;
+        timePartitionInfo.isActive = isActive;
+        if (memCost > timePartitionInfoMemoryThreshold) {
+          evictOldPartition();
+        }
+      }
+    }
+  }
+
+  public void updateAfterOpeningTsFileProcessor(DataRegionId dataRegionId, long timePartitionId) {
+    synchronized (timePartitionInfoMap) {
+      TimePartitionInfo timePartitionInfo =
+          timePartitionInfoMap
+              .computeIfAbsent(dataRegionId, k -> new TreeMap<>())
+              .get(timePartitionId);
+      if (timePartitionInfo != null) {
+        timePartitionInfo.isActive = true;
+      }
+    }
+  }
+
+  /** Evicts lowest-priority partitions, each at most once, until memCost fits the threshold. */
+  private void evictOldPartition() {
+    TreeSet<TimePartitionInfo> treeSet = new TreeSet<>(TimePartitionInfo::comparePriority);
+    synchronized (timePartitionInfoMap) {
+      for (Map<Long, TimePartitionInfo> partitions : timePartitionInfoMap.values()) {
+        treeSet.addAll(partitions.values());
+      }
+      // pollFirst() removes the candidate, so it is never charged twice and the loop terminates.
+      while (memCost > timePartitionInfoMemoryThreshold && !treeSet.isEmpty()) {
+        TimePartitionInfo timePartitionInfo = treeSet.pollFirst();
+        memCost -= timePartitionInfo.memSize;
+        DataRegion dataRegion =
+            StorageEngineV2.getInstance().getDataRegion(timePartitionInfo.dataRegionId);
+        if (dataRegion != null) {
+          dataRegion.releaseFlushTimeMap(timePartitionInfo.partitionId);
+        }
+        timePartitionInfoMap
+            .get(timePartitionInfo.dataRegionId)
+            .remove(timePartitionInfo.partitionId);
+      }
+    }
+  }
+
+  public void removePartition(DataRegionId dataRegionId, long partitionId) {
+    synchronized (timePartitionInfoMap) {
+      Map<Long, TimePartitionInfo> timePartitionInfoMapForDataRegion =
+          timePartitionInfoMap.get(dataRegionId);
+      if (timePartitionInfoMapForDataRegion != null) {
+        TimePartitionInfo timePartitionInfo = timePartitionInfoMapForDataRegion.get(partitionId);
+        if (timePartitionInfo != null) {
+          timePartitionInfoMapForDataRegion.remove(partitionId);
+          memCost -= timePartitionInfo.memSize;
+        }
+      }
+    }
+  }
+
+  public TimePartitionInfo getTimePartitionInfo(DataRegionId dataRegionId, long timePartitionId) {
+    synchronized (timePartitionInfoMap) {
+      Map<Long, TimePartitionInfo> timePartitionInfoMapForDataRegion =
+          timePartitionInfoMap.get(dataRegionId);
+      if (timePartitionInfoMapForDataRegion == null) {
+        return null;
+      }
+      return timePartitionInfoMapForDataRegion.get(timePartitionId);
+    }
+  }
+
+  public void clear() {
+    synchronized (timePartitionInfoMap) {
+      timePartitionInfoMap.clear();
+      memCost = 0;
+    }
+  }
+
+  @TestOnly
+  public void setTimePartitionInfoMemoryThreshold(long timePartitionInfoMemoryThreshold) {
+    this.timePartitionInfoMemoryThreshold = timePartitionInfoMemoryThreshold;
+  }
+
+  public static TimePartitionManager getInstance() {
+    return TimePartitionManager.InstanceHolder.instance;
+  }
+
+  /** Holds the singleton; the field is final so the shared instance can never be replaced. */
+  private static class InstanceHolder {
+    private static final TimePartitionManager instance = new TimePartitionManager();
+    private InstanceHolder() {}
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileManager.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileManager.java
index 58d13bb673..725bcb13f0 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileManager.java
@@ -442,4 +442,10 @@ public class TsFileManager {
       return false;
     }
   }
+
+  // Determines whether the given time partition is the latest (largest) one.
+  public boolean isLatestTimePartition(long timePartitionId) {
+    return (sequenceFiles.higherKey(timePartitionId) == null
+        && unsequenceFiles.higherKey(timePartitionId) == null);
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index 35eda60ca8..66a72980c3 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -951,8 +951,15 @@ public class TsFileProcessor {
    * flushManager again.
    */
   private void addAMemtableIntoFlushingList(IMemTable tobeFlushed) throws IOException {
+    Map<String, Long> lastTimeForEachDevice = new HashMap<>();
+    if (sequence) {
+      lastTimeForEachDevice = tobeFlushed.getMaxTime();
+      tsFileResource.updateEndTime(lastTimeForEachDevice);
+    }
     if (!tobeFlushed.isSignalMemTable()
-        && (!updateLatestFlushTimeCallback.call(this) || tobeFlushed.memSize() == 0)) {
+        && (tobeFlushed.memSize() == 0
+            || (!updateLatestFlushTimeCallback.call(
+                this, lastTimeForEachDevice, System.currentTimeMillis())))) {
       logger.warn(
           "This normal memtable is empty, skip it in flush. {}: {} Memetable info: {}",
           storageGroupName,
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
index 0f2fd7424c..eb234e24ad 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
@@ -1076,6 +1076,12 @@ public class TsFileResource {
     }
   }
 
+  public void updateEndTime(Map<String, Long> times) {
+    for (Map.Entry<String, Long> entry : times.entrySet()) {
+      timeIndex.updateEndTime(entry.getKey(), entry.getValue());
+    }
+  }
+
   /** @return is this tsfile resource in a TsFileResourceList */
   public boolean isFileInList() {
     return prev != null || next != null;
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/idtable/entry/DeviceEntry.java b/server/src/main/java/org/apache/iotdb/db/metadata/idtable/entry/DeviceEntry.java
index 3ef77edec7..9da30cfb55 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/idtable/entry/DeviceEntry.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/idtable/entry/DeviceEntry.java
@@ -34,10 +34,6 @@ public class DeviceEntry {
 
   boolean isAligned;
 
-  // for managing last time
-  // time partition -> last time
-  Map<Long, Long> lastTimeMapOfEachPartition;
-
   // for managing flush time
   // time partition -> flush time
   Map<Long, Long> flushTimeMapOfEachPartition;
@@ -47,7 +43,6 @@ public class DeviceEntry {
   public DeviceEntry(IDeviceID deviceID) {
     this.deviceID = deviceID;
     measurementMap = new ConcurrentHashMap<>();
-    lastTimeMapOfEachPartition = new HashMap<>();
     flushTimeMapOfEachPartition = new HashMap<>();
   }
 
@@ -94,17 +89,9 @@ public class DeviceEntry {
   }
 
   // region support flush time
-  public void putLastTimeMap(long timePartition, long lastTime) {
-    lastTimeMapOfEachPartition.put(timePartition, lastTime);
-  }
-
-  public void putFlushTimeMap(long timePartition, long flushTime) {
-    flushTimeMapOfEachPartition.put(timePartition, flushTime);
-  }
 
-  public long updateLastTimeMap(long timePartition, long lastTime) {
-    return lastTimeMapOfEachPartition.compute(
-        timePartition, (k, v) -> v == null ? lastTime : Math.max(v, lastTime));
+  public Long putFlushTimeMap(long timePartition, long flushTime) {
+    return flushTimeMapOfEachPartition.put(timePartition, flushTime);
   }
 
   public long updateFlushTimeMap(long timePartition, long flushTime) {
@@ -120,18 +107,10 @@ public class DeviceEntry {
     this.globalFlushTime = globalFlushTime;
   }
 
-  public Long getLastTime(long timePartition) {
-    return lastTimeMapOfEachPartition.get(timePartition);
-  }
-
   public Long getFlushTime(long timePartition) {
     return flushTimeMapOfEachPartition.get(timePartition);
   }
 
-  public Long getLastTimeWithDefaultValue(long timePartition) {
-    return lastTimeMapOfEachPartition.getOrDefault(timePartition, Long.MIN_VALUE);
-  }
-
   public Long getFLushTimeWithDefaultValue(long timePartition) {
     return flushTimeMapOfEachPartition.getOrDefault(timePartition, Long.MIN_VALUE);
   }
@@ -140,8 +119,8 @@ public class DeviceEntry {
     return globalFlushTime;
   }
 
-  public void clearLastTime() {
-    lastTimeMapOfEachPartition.clear();
+  public void removePartition(long partitionId) {
+    flushTimeMapOfEachPartition.remove(partitionId);
   }
 
   public void clearFlushTime() {
@@ -166,18 +145,12 @@ public class DeviceEntry {
         && globalFlushTime == that.globalFlushTime
         && deviceID.equals(that.deviceID)
         && measurementMap.equals(that.measurementMap)
-        && lastTimeMapOfEachPartition.equals(that.lastTimeMapOfEachPartition)
         && flushTimeMapOfEachPartition.equals(that.flushTimeMapOfEachPartition);
   }
 
   @Override
   public int hashCode() {
     return Objects.hash(
-        deviceID,
-        measurementMap,
-        isAligned,
-        lastTimeMapOfEachPartition,
-        flushTimeMapOfEachPartition,
-        globalFlushTime);
+        deviceID, measurementMap, isAligned, flushTimeMapOfEachPartition, globalFlushTime);
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java b/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java
index 59b6defdd0..32d7648426 100644
--- a/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java
+++ b/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java
@@ -46,7 +46,7 @@ public class SystemInfo {
   private long totalStorageGroupMemCost = 0L;
   private volatile boolean rejected = false;
 
-  private long memorySizeForWrite;
+  private long memorySizeForMemtable;
   private long memorySizeForCompaction;
   private Map<DataRegionInfo, Long> reportedStorageGroupMemCostMap = new HashMap<>();
 
@@ -55,8 +55,8 @@ public class SystemInfo {
 
   private ExecutorService flushTaskSubmitThreadPool =
       IoTDBThreadPoolFactory.newSingleThreadExecutor(ThreadName.FLUSH_TASK_SUBMIT.getName());
-  private double FLUSH_THERSHOLD = memorySizeForWrite * config.getFlushProportion();
-  private double REJECT_THERSHOLD = memorySizeForWrite * config.getRejectProportion();
+  private double FLUSH_THERSHOLD = memorySizeForMemtable * config.getFlushProportion();
+  private double REJECT_THERSHOLD = memorySizeForMemtable * config.getRejectProportion();
 
   private volatile boolean isEncodingFasterThanIo = true;
 
@@ -106,14 +106,14 @@ public class SystemInfo {
           REJECT_THERSHOLD);
       rejected = true;
       if (chooseMemTablesToMarkFlush(tsFileProcessor)) {
-        if (totalStorageGroupMemCost < memorySizeForWrite) {
+        if (totalStorageGroupMemCost < memorySizeForMemtable) {
           return true;
         } else {
           throw new WriteProcessRejectException(
               "Total Storage Group MemCost "
                   + totalStorageGroupMemCost
                   + " is over than memorySizeForWriting "
-                  + memorySizeForWrite);
+                  + memorySizeForMemtable);
         }
       } else {
         return false;
@@ -198,12 +198,13 @@ public class SystemInfo {
   }
 
   public void allocateWriteMemory() {
-    memorySizeForWrite =
-        (long) (config.getAllocateMemoryForStorageEngine() * config.getWriteProportion());
+    memorySizeForMemtable =
+        (long)
+            (config.getAllocateMemoryForStorageEngine() * config.getWriteProportionForMemtable());
     memorySizeForCompaction =
         (long) (config.getAllocateMemoryForStorageEngine() * config.getCompactionProportion());
-    FLUSH_THERSHOLD = memorySizeForWrite * config.getFlushProportion();
-    REJECT_THERSHOLD = memorySizeForWrite * config.getRejectProportion();
+    FLUSH_THERSHOLD = memorySizeForMemtable * config.getFlushProportion();
+    REJECT_THERSHOLD = memorySizeForMemtable * config.getRejectProportion();
   }
 
   @TestOnly
@@ -284,15 +285,15 @@ public class SystemInfo {
   }
 
   public synchronized void applyTemporaryMemoryForFlushing(long estimatedTemporaryMemSize) {
-    memorySizeForWrite -= estimatedTemporaryMemSize;
-    FLUSH_THERSHOLD = memorySizeForWrite * config.getFlushProportion();
-    REJECT_THERSHOLD = memorySizeForWrite * config.getRejectProportion();
+    memorySizeForMemtable -= estimatedTemporaryMemSize;
+    FLUSH_THERSHOLD = memorySizeForMemtable * config.getFlushProportion();
+    REJECT_THERSHOLD = memorySizeForMemtable * config.getRejectProportion();
   }
 
   public synchronized void releaseTemporaryMemoryForFlushing(long estimatedTemporaryMemSize) {
-    memorySizeForWrite += estimatedTemporaryMemSize;
-    FLUSH_THERSHOLD = memorySizeForWrite * config.getFlushProportion();
-    REJECT_THERSHOLD = memorySizeForWrite * config.getRejectProportion();
+    memorySizeForMemtable += estimatedTemporaryMemSize;
+    FLUSH_THERSHOLD = memorySizeForMemtable * config.getFlushProportion();
+    REJECT_THERSHOLD = memorySizeForMemtable * config.getRejectProportion();
   }
 
   public long getTotalMemTableSize() {
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
index b6ff3044e7..01d900a8a3 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
@@ -170,7 +170,7 @@ public abstract class AlignedTVList extends TVList {
     checkExpansion();
     int arrayIndex = rowCount / ARRAY_SIZE;
     int elementIndex = rowCount % ARRAY_SIZE;
-    minTime = Math.min(minTime, timestamp);
+    maxTime = Math.max(maxTime, timestamp);
     timestamps.get(arrayIndex)[elementIndex] = timestamp;
     for (int i = 0; i < values.size(); i++) {
       Object columnValue = columnIndexArray[i] < 0 ? null : value[columnIndexArray[i]];
@@ -704,7 +704,7 @@ public abstract class AlignedTVList extends TVList {
     checkExpansion();
     int idx = start;
 
-    updateMinTimeAndSorted(time, start, end);
+    updateMaxTimeAndSorted(time, start, end);
 
     while (idx < end) {
       int inputRemaining = end - idx;
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/BinaryTVList.java b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/BinaryTVList.java
index 6c4e500495..1e644e41a6 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/BinaryTVList.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/BinaryTVList.java
@@ -87,7 +87,7 @@ public abstract class BinaryTVList extends TVList {
     checkExpansion();
     int arrayIndex = rowCount / ARRAY_SIZE;
     int elementIndex = rowCount % ARRAY_SIZE;
-    minTime = Math.min(minTime, timestamp);
+    maxTime = Math.max(maxTime, timestamp);
     timestamps.get(arrayIndex)[elementIndex] = timestamp;
     values.get(arrayIndex)[elementIndex] = value;
     rowCount++;
@@ -105,12 +105,12 @@ public abstract class BinaryTVList extends TVList {
   @Override
   public int delete(long lowerBound, long upperBound) {
     int newSize = 0;
-    minTime = Long.MAX_VALUE;
+    maxTime = Long.MIN_VALUE;
     for (int i = 0; i < rowCount; i++) {
       long time = getTime(i);
       if (time < lowerBound || time > upperBound) {
         set(i, newSize++);
-        minTime = Math.min(time, minTime);
+        maxTime = Math.max(maxTime, time);
       } else {
         memoryBinaryChunkSize -= getBinarySize(getBinary(i));
       }
@@ -215,10 +215,10 @@ public abstract class BinaryTVList extends TVList {
       timeIdxOffset = start;
       // drop null at the end of value array
       int nullCnt =
-          dropNullValThenUpdateMinTimeAndSorted(time, value, bitMap, start, end, timeIdxOffset);
+          dropNullValThenUpdateMaxTimeAndSorted(time, value, bitMap, start, end, timeIdxOffset);
       end -= nullCnt;
     } else {
-      updateMinTimeAndSorted(time, start, end);
+      updateMaxTimeAndSorted(time, start, end);
     }
 
     // update raw size
@@ -252,7 +252,7 @@ public abstract class BinaryTVList extends TVList {
   }
 
   // move null values to the end of time array and value array, then return number of null values
-  int dropNullValThenUpdateMinTimeAndSorted(
+  int dropNullValThenUpdateMaxTimeAndSorted(
       long[] time, Binary[] values, BitMap bitMap, int start, int end, int tIdxOffset) {
     long inPutMinTime = Long.MAX_VALUE;
     boolean inputSorted = true;
@@ -269,14 +269,15 @@ public abstract class BinaryTVList extends TVList {
         time[tIdx - nullCnt] = time[tIdx];
         values[vIdx - nullCnt] = values[vIdx];
       }
-      // update minTime and sorted
+      // update maxTime and sorted
       tIdx = tIdx - nullCnt;
       inPutMinTime = Math.min(inPutMinTime, time[tIdx]);
+      maxTime = Math.max(maxTime, time[tIdx]);
       if (inputSorted && tIdx > 0 && time[tIdx - 1] > time[tIdx]) {
         inputSorted = false;
       }
     }
-    minTime = Math.min(inPutMinTime, minTime);
+
     sorted = sorted && inputSorted && (rowCount == 0 || inPutMinTime >= getTime(rowCount - 1));
     return nullCnt;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/BooleanTVList.java b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/BooleanTVList.java
index 5e6f7ea366..3af4613bc6 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/BooleanTVList.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/BooleanTVList.java
@@ -80,7 +80,7 @@ public abstract class BooleanTVList extends TVList {
     checkExpansion();
     int arrayIndex = rowCount / ARRAY_SIZE;
     int elementIndex = rowCount % ARRAY_SIZE;
-    minTime = Math.min(minTime, timestamp);
+    maxTime = Math.max(maxTime, timestamp);
     timestamps.get(arrayIndex)[elementIndex] = timestamp;
     values.get(arrayIndex)[elementIndex] = value;
     rowCount++;
@@ -174,10 +174,10 @@ public abstract class BooleanTVList extends TVList {
       timeIdxOffset = start;
       // drop null at the end of value array
       int nullCnt =
-          dropNullValThenUpdateMinTimeAndSorted(time, value, bitMap, start, end, timeIdxOffset);
+          dropNullValThenUpdateMaxTimeAndSorted(time, value, bitMap, start, end, timeIdxOffset);
       end -= nullCnt;
     } else {
-      updateMinTimeAndSorted(time, start, end);
+      updateMaxTimeAndSorted(time, start, end);
     }
 
     while (idx < end) {
@@ -206,7 +206,7 @@ public abstract class BooleanTVList extends TVList {
   }
 
   // move null values to the end of time array and value array, then return number of null values
-  int dropNullValThenUpdateMinTimeAndSorted(
+  int dropNullValThenUpdateMaxTimeAndSorted(
       long[] time, boolean[] values, BitMap bitMap, int start, int end, int tIdxOffset) {
     long inPutMinTime = Long.MAX_VALUE;
     boolean inputSorted = true;
@@ -223,14 +223,15 @@ public abstract class BooleanTVList extends TVList {
         time[tIdx - nullCnt] = time[tIdx];
         values[vIdx - nullCnt] = values[vIdx];
       }
-      // update minTime and sorted
+      // update maxTime and sorted
       tIdx = tIdx - nullCnt;
       inPutMinTime = Math.min(inPutMinTime, time[tIdx]);
+      maxTime = Math.max(maxTime, time[tIdx]);
       if (inputSorted && tIdx > 0 && time[tIdx - 1] > time[tIdx]) {
         inputSorted = false;
       }
     }
-    minTime = Math.min(inPutMinTime, minTime);
+
     sorted = sorted && inputSorted && (rowCount == 0 || inPutMinTime >= getTime(rowCount - 1));
     return nullCnt;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/DoubleTVList.java b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/DoubleTVList.java
index 41ae89264c..98800c457e 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/DoubleTVList.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/DoubleTVList.java
@@ -80,7 +80,7 @@ public abstract class DoubleTVList extends TVList {
     checkExpansion();
     int arrayIndex = rowCount / ARRAY_SIZE;
     int elementIndex = rowCount % ARRAY_SIZE;
-    minTime = Math.min(minTime, timestamp);
+    maxTime = Math.max(maxTime, timestamp);
     timestamps.get(arrayIndex)[elementIndex] = timestamp;
     values.get(arrayIndex)[elementIndex] = value;
     rowCount++;
@@ -179,10 +179,10 @@ public abstract class DoubleTVList extends TVList {
       timeIdxOffset = start;
       // drop null at the end of value array
       int nullCnt =
-          dropNullValThenUpdateMinTimeAndSorted(time, value, bitMap, start, end, timeIdxOffset);
+          dropNullValThenUpdateMaxTimeAndSorted(time, value, bitMap, start, end, timeIdxOffset);
       end -= nullCnt;
     } else {
-      updateMinTimeAndSorted(time, start, end);
+      updateMaxTimeAndSorted(time, start, end);
     }
 
     while (idx < end) {
@@ -211,7 +211,7 @@ public abstract class DoubleTVList extends TVList {
   }
 
   // move null values to the end of time array and value array, then return number of null values
-  int dropNullValThenUpdateMinTimeAndSorted(
+  int dropNullValThenUpdateMaxTimeAndSorted(
       long[] time, double[] values, BitMap bitMap, int start, int end, int tIdxOffset) {
     long inPutMinTime = Long.MAX_VALUE;
     boolean inputSorted = true;
@@ -228,14 +228,15 @@ public abstract class DoubleTVList extends TVList {
         time[tIdx - nullCnt] = time[tIdx];
         values[vIdx - nullCnt] = values[vIdx];
       }
-      // update minTime and sorted
+      // update maxTime and sorted
       tIdx = tIdx - nullCnt;
       inPutMinTime = Math.min(inPutMinTime, time[tIdx]);
+      maxTime = Math.max(maxTime, time[tIdx]);
       if (inputSorted && tIdx > 0 && time[tIdx - 1] > time[tIdx]) {
         inputSorted = false;
       }
     }
-    minTime = Math.min(inPutMinTime, minTime);
+
     sorted = sorted && inputSorted && (rowCount == 0 || inPutMinTime >= getTime(rowCount - 1));
     return nullCnt;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/FloatTVList.java b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/FloatTVList.java
index 05e2b4289e..da39321830 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/FloatTVList.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/FloatTVList.java
@@ -80,7 +80,7 @@ public abstract class FloatTVList extends TVList {
     checkExpansion();
     int arrayIndex = rowCount / ARRAY_SIZE;
     int elementIndex = rowCount % ARRAY_SIZE;
-    minTime = Math.min(minTime, timestamp);
+    maxTime = Math.max(maxTime, timestamp);
     timestamps.get(arrayIndex)[elementIndex] = timestamp;
     values.get(arrayIndex)[elementIndex] = value;
     rowCount++;
@@ -179,10 +179,10 @@ public abstract class FloatTVList extends TVList {
       timeIdxOffset = start;
       // drop null at the end of value array
       int nullCnt =
-          dropNullValThenUpdateMinTimeAndSorted(time, value, bitMap, start, end, timeIdxOffset);
+          dropNullValThenUpdateMaxTimeAndSorted(time, value, bitMap, start, end, timeIdxOffset);
       end -= nullCnt;
     } else {
-      updateMinTimeAndSorted(time, start, end);
+      updateMaxTimeAndSorted(time, start, end);
     }
 
     while (idx < end) {
@@ -211,7 +211,7 @@ public abstract class FloatTVList extends TVList {
   }
 
   // move null values to the end of time array and value array, then return number of null values
-  int dropNullValThenUpdateMinTimeAndSorted(
+  int dropNullValThenUpdateMaxTimeAndSorted(
       long[] time, float[] values, BitMap bitMap, int start, int end, int tIdxOffset) {
     long inPutMinTime = Long.MAX_VALUE;
     boolean inputSorted = true;
@@ -228,14 +228,15 @@ public abstract class FloatTVList extends TVList {
         time[tIdx - nullCnt] = time[tIdx];
         values[vIdx - nullCnt] = values[vIdx];
       }
-      // update minTime and sorted
+      // update maxTime and sorted
       tIdx = tIdx - nullCnt;
       inPutMinTime = Math.min(inPutMinTime, time[tIdx]);
+      maxTime = Math.max(maxTime, time[tIdx]);
       if (inputSorted && tIdx > 0 && time[tIdx - 1] > time[tIdx]) {
         inputSorted = false;
       }
     }
-    minTime = Math.min(inPutMinTime, minTime);
+
     sorted = sorted && inputSorted && (rowCount == 0 || inPutMinTime >= getTime(rowCount - 1));
     return nullCnt;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/IntTVList.java b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/IntTVList.java
index 9ea657755c..341deebbeb 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/IntTVList.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/IntTVList.java
@@ -79,7 +79,7 @@ public abstract class IntTVList extends TVList {
     checkExpansion();
     int arrayIndex = rowCount / ARRAY_SIZE;
     int elementIndex = rowCount % ARRAY_SIZE;
-    minTime = Math.min(minTime, timestamp);
+    maxTime = Math.max(maxTime, timestamp);
     timestamps.get(arrayIndex)[elementIndex] = timestamp;
     values.get(arrayIndex)[elementIndex] = value;
     rowCount++;
@@ -172,10 +172,10 @@ public abstract class IntTVList extends TVList {
       timeIdxOffset = start;
       // drop null at the end of value array
       int nullCnt =
-          dropNullValThenUpdateMinTimeAndSorted(time, value, bitMap, start, end, timeIdxOffset);
+          dropNullValThenUpdateMaxTimeAndSorted(time, value, bitMap, start, end, timeIdxOffset);
       end -= nullCnt;
     } else {
-      updateMinTimeAndSorted(time, start, end);
+      updateMaxTimeAndSorted(time, start, end);
     }
 
     while (idx < end) {
@@ -204,7 +204,7 @@ public abstract class IntTVList extends TVList {
   }
 
   // move null values to the end of time array and value array, then return number of null values
-  int dropNullValThenUpdateMinTimeAndSorted(
+  int dropNullValThenUpdateMaxTimeAndSorted(
       long[] time, int[] values, BitMap bitMap, int start, int end, int tIdxOffset) {
     long inPutMinTime = Long.MAX_VALUE;
     boolean inputSorted = true;
@@ -221,14 +221,15 @@ public abstract class IntTVList extends TVList {
         time[tIdx - nullCnt] = time[tIdx];
         values[vIdx - nullCnt] = values[vIdx];
       }
-      // update minTime and sorted
+      // update maxTime and sorted
       tIdx = tIdx - nullCnt;
       inPutMinTime = Math.min(inPutMinTime, time[tIdx]);
+      maxTime = Math.max(maxTime, time[tIdx]);
       if (inputSorted && tIdx > 0 && time[tIdx - 1] > time[tIdx]) {
         inputSorted = false;
       }
     }
-    minTime = Math.min(inPutMinTime, minTime);
+
     sorted = sorted && inputSorted && (rowCount == 0 || inPutMinTime >= getTime(rowCount - 1));
     return nullCnt;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/LongTVList.java b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/LongTVList.java
index 41ce036ab7..7004543ac9 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/LongTVList.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/LongTVList.java
@@ -79,7 +79,7 @@ public abstract class LongTVList extends TVList {
     checkExpansion();
     int arrayIndex = rowCount / ARRAY_SIZE;
     int elementIndex = rowCount % ARRAY_SIZE;
-    minTime = Math.min(minTime, timestamp);
+    maxTime = Math.max(maxTime, timestamp);
     timestamps.get(arrayIndex)[elementIndex] = timestamp;
     values.get(arrayIndex)[elementIndex] = value;
     rowCount++;
@@ -172,10 +172,10 @@ public abstract class LongTVList extends TVList {
       timeIdxOffset = start;
       // drop null at the end of value array
       int nullCnt =
-          dropNullValThenUpdateMinTimeAndSorted(time, value, bitMap, start, end, timeIdxOffset);
+          dropNullValThenUpdateMaxTimeAndSorted(time, value, bitMap, start, end, timeIdxOffset);
       end -= nullCnt;
     } else {
-      updateMinTimeAndSorted(time, start, end);
+      updateMaxTimeAndSorted(time, start, end);
     }
 
     while (idx < end) {
@@ -204,7 +204,7 @@ public abstract class LongTVList extends TVList {
   }
 
   // move null values to the end of time array and value array, then return number of null values
-  int dropNullValThenUpdateMinTimeAndSorted(
+  int dropNullValThenUpdateMaxTimeAndSorted(
       long[] time, long[] values, BitMap bitMap, int start, int end, int tIdxOffset) {
     long inPutMinTime = Long.MAX_VALUE;
     boolean inputSorted = true;
@@ -221,14 +221,15 @@ public abstract class LongTVList extends TVList {
         time[tIdx - nullCnt] = time[tIdx];
         values[vIdx - nullCnt] = values[vIdx];
       }
-      // update minTime and sorted
+      // update maxTime and sorted
       tIdx = tIdx - nullCnt;
       inPutMinTime = Math.min(inPutMinTime, time[tIdx]);
+      maxTime = Math.max(maxTime, time[tIdx]);
       if (inputSorted && tIdx > 0 && time[tIdx - 1] > time[tIdx]) {
         inputSorted = false;
       }
     }
-    minTime = Math.min(inPutMinTime, minTime);
+
     sorted = sorted && inputSorted && (rowCount == 0 || inPutMinTime >= getTime(rowCount - 1));
     return nullCnt;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
index cb0f75c573..c6105744fa 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
@@ -57,7 +57,7 @@ public abstract class TVList implements WALEntryValue {
   protected int rowCount;
 
   protected boolean sorted = true;
-  protected long minTime;
+  protected long maxTime;
   // record reference count of this tv list
   // currently this reference will only be increase because we can't know when to decrease it
   protected AtomicInteger referenceCount;
@@ -66,7 +66,7 @@ public abstract class TVList implements WALEntryValue {
   public TVList() {
     timestamps = new ArrayList<>();
     rowCount = 0;
-    minTime = Long.MAX_VALUE;
+    maxTime = Long.MIN_VALUE;
     referenceCount = new AtomicInteger();
   }
 
@@ -228,8 +228,8 @@ public abstract class TVList implements WALEntryValue {
     throw new UnsupportedOperationException(ERR_DATATYPE_NOT_CONSISTENT);
   }
 
-  public long getMinTime() {
-    return minTime;
+  public long getMaxTime() {
+    return maxTime;
   }
 
   public long getVersion() {
@@ -256,12 +256,12 @@ public abstract class TVList implements WALEntryValue {
 
   public int delete(long lowerBound, long upperBound) {
     int newSize = 0;
-    minTime = Long.MAX_VALUE;
+    maxTime = Long.MIN_VALUE;
     for (int i = 0; i < rowCount; i++) {
       long time = getTime(i);
       if (time < lowerBound || time > upperBound) {
         set(i, newSize++);
-        minTime = Math.min(time, minTime);
+        maxTime = Math.max(time, maxTime);
       }
     }
     int deletedNumber = rowCount - newSize;
@@ -285,13 +285,13 @@ public abstract class TVList implements WALEntryValue {
     }
     cloneList.rowCount = rowCount;
     cloneList.sorted = sorted;
-    cloneList.minTime = minTime;
+    cloneList.maxTime = maxTime;
   }
 
   public void clear() {
     rowCount = 0;
     sorted = true;
-    minTime = Long.MAX_VALUE;
+    maxTime = Long.MIN_VALUE;
     clearTime();
     clearValue();
   }
@@ -324,17 +324,17 @@ public abstract class TVList implements WALEntryValue {
     return cloneArray;
   }
 
-  void updateMinTimeAndSorted(long[] time, int start, int end) {
+  void updateMaxTimeAndSorted(long[] time, int start, int end) {
     int length = time.length;
     long inPutMinTime = Long.MAX_VALUE;
     boolean inputSorted = true;
     for (int i = start; i < end; i++) {
       inPutMinTime = Math.min(inPutMinTime, time[i]);
+      maxTime = Math.max(maxTime, time[i]);
       if (inputSorted && i < length - 1 && time[i] > time[i + 1]) {
         inputSorted = false;
       }
     }
-    minTime = Math.min(inPutMinTime, minTime);
     sorted = sorted && inputSorted && (rowCount == 0 || inPutMinTime >= getTime(rowCount - 1));
   }
 
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/DataRegionTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/DataRegionTest.java
index c19fad496c..8a5888601e 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/DataRegionTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/DataRegionTest.java
@@ -1111,7 +1111,7 @@ public class DataRegionTest {
     // delete data which is in work memtable
     dataRegion.deleteByDevice(new PartialPath("root.vehicle.d0.s0"), 50, 100, 0, null);
     dataRegion.deleteByDevice(new PartialPath("root.vehicle.d0.s0"), 50, 150, 0, null);
-    dataRegion.deleteByDevice(new PartialPath("root.vehicle.d0.s0"), 100, 300, 0, null);
+    dataRegion.deleteByDevice(new PartialPath("root.vehicle.d0.s0"), 100, 190, 0, null);
 
     dataRegion.syncCloseAllWorkingTsFileProcessors();
     Assert.assertFalse(tsFileResource.getModFile().exists());
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/IDTableLastFlushTimeMapTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/IDTableLastFlushTimeMapTest.java
new file mode 100644
index 0000000000..dd3fa63b04
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/IDTableLastFlushTimeMapTest.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.storagegroup;
+
+public class IDTableLastFlushTimeMapTest extends LastFlushTimeMapTest {
+  // NOTE: every member below is commented out, so this class currently adds no tests of its own.
+  //  private boolean isEnableIDTable = false;
+  //
+  //  private String originalDeviceIDTransformationMethod = null;
+  //
+  //  private boolean isEnableIDTableLogFile = false;
+  //
+  //  public IDTableLastFlushTimeMapTest() throws QueryProcessException {}
+  //
+  //  @Before
+  //  public void before() {
+  //    IoTDBDescriptor.getInstance().getConfig().setAutoCreateSchemaEnabled(true);
+  //    isEnableIDTable = IoTDBDescriptor.getInstance().getConfig().isEnableIDTable();
+  //    originalDeviceIDTransformationMethod =
+  //        IoTDBDescriptor.getInstance().getConfig().getDeviceIDTransformationMethod();
+  //    isEnableIDTableLogFile = IoTDBDescriptor.getInstance().getConfig().isEnableIDTableLogFile();
+  //
+  //    IoTDBDescriptor.getInstance().getConfig().setEnableIDTable(true);
+  //    IoTDBDescriptor.getInstance().getConfig().setDeviceIDTransformationMethod("SHA256");
+  //    IoTDBDescriptor.getInstance().getConfig().setEnableIDTableLogFile(true);
+  //    super.before();
+  //  }
+  //
+  //  @After
+  //  public void clean() throws IOException, StorageEngineException {
+  //    IoTDBDescriptor.getInstance().getConfig().setEnableIDTable(isEnableIDTable);
+  //    IoTDBDescriptor.getInstance()
+  //        .getConfig()
+  //        .setDeviceIDTransformationMethod(originalDeviceIDTransformationMethod);
+  //    IoTDBDescriptor.getInstance().getConfig().setEnableIDTableLogFile(isEnableIDTableLogFile);
+  //    super.clean();
+  //  }
+  //
+  //  @Override
+  //  @Test
+  //  public void testMemoryCalculation()
+  //      throws QueryProcessException, IllegalPathException, StorageEngineException,
+  //          StorageGroupNotSetException {
+  //    insertRecord("root.sg.d1", 100L);
+  //    CreateMultiTimeSeriesPlan plan = new CreateMultiTimeSeriesPlan();
+  //    plan.setPaths(
+  //        Arrays.asList(
+  //            new PartialPath("root.sg.d100.s1"),
+  //            new PartialPath("root.sg.d101.s1"),
+  //            new PartialPath("root.sg.d102.s1"),
+  //            new PartialPath("root.sg.d103.s1"),
+  //            new PartialPath("root.sg.d104.s1")));
+  //    plan.setDataTypes(
+  //        Arrays.asList(
+  //            TSDataType.INT64,
+  //            TSDataType.INT64,
+  //            TSDataType.INT64,
+  //            TSDataType.INT64,
+  //            TSDataType.INT64));
+  //    plan.setEncodings(
+  //        Arrays.asList(
+  //            TSEncoding.GORILLA,
+  //            TSEncoding.GORILLA,
+  //            TSEncoding.GORILLA,
+  //            TSEncoding.GORILLA,
+  //            TSEncoding.GORILLA));
+  //    plan.setCompressors(
+  //        Arrays.asList(
+  //            CompressionType.SNAPPY,
+  //            CompressionType.SNAPPY,
+  //            CompressionType.SNAPPY,
+  //            CompressionType.SNAPPY,
+  //            CompressionType.SNAPPY));
+  //    executor.processNonQuery(plan);
+  //
+  //    DataRegion storageGroupProcessor =
+  //        StorageEngine.getInstance().getProcessor(new PartialPath("root.sg"));
+  //    assertEquals(62l, storageGroupProcessor.getLastFlushTimeMap().getMemSize(0L));
+  //
+  //    storageGroupProcessor.getLastFlushTimeMap().getFlushedTime(0L, "root.sg.d100");
+  //    storageGroupProcessor.getLastFlushTimeMap().getFlushedTime(0L, "root.sg.d101");
+  //    assertEquals(186l, storageGroupProcessor.getLastFlushTimeMap().getMemSize(0L));
+  //
+  //    storageGroupProcessor.getLastFlushTimeMap().setOneDeviceFlushedTime(0L, "root.sg.d102", 0L);
+  //    HashMap<String, Long> updateMap = new HashMap<>();
+  //    updateMap.put("root.sg.d103", 1L);
+  //    updateMap.put("root.sg.d100", 1L);
+  //    storageGroupProcessor.getLastFlushTimeMap().setMultiDeviceFlushedTime(0L, updateMap);
+  //    storageGroupProcessor.getLastFlushTimeMap().updateFlushedTime(0L, "root.sg.d103", 2L);
+  //    storageGroupProcessor.getLastFlushTimeMap().updateFlushedTime(0L, "root.sg.d104", 2L);
+  //    assertEquals(372L, storageGroupProcessor.getLastFlushTimeMap().getMemSize(0L));
+  //  }
+  //
+  //  @Test
+  //  public void testRecoverFlushTime()
+  //      throws QueryProcessException, IllegalPathException, StorageEngineException,
+  //          StorageGroupNotSetException {
+  //    insertData(100);
+  //    PhysicalPlan flushPlan = processor.parseSQLToPhysicalPlan("flush");
+  //    executor.processNonQuery(flushPlan);
+  //    DataRegion storageGroupProcessor =
+  //        StorageEngine.getInstance().getProcessor(new PartialPath("root.isp"));
+  //    String deviceId = DeviceIDFactory.getInstance().getDeviceID("root.isp.d1").toStringID();
+  //    assertEquals(103L, storageGroupProcessor.getLastFlushTimeMap().getFlushedTime(0l,
+  // deviceId));
+  //
+  //    storageGroupProcessor.getLastFlushTimeMap().removePartition(0l);
+  //    storageGroupProcessor.getLastFlushTimeMap().checkAndCreateFlushedTimePartition(0l);
+  //    assertEquals(103L, storageGroupProcessor.getLastFlushTimeMap().getFlushedTime(0l,
+  // deviceId));
+  //  }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/LastFlushTimeMapTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/LastFlushTimeMapTest.java
new file mode 100644
index 0000000000..2d945c6722
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/LastFlushTimeMapTest.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.storagegroup;
+
+public class LastFlushTimeMapTest { // NOTE: every member below is commented out; no test runs
+  //  protected PlanExecutor executor = new PlanExecutor();
+  //
+  //  protected final Planner processor = new Planner();
+  //
+  //  public LastFlushTimeMapTest() throws QueryProcessException {}
+  //
+  //  @Before
+  //  public void before() {
+  //    IoTDBDescriptor.getInstance().getConfig().setAutoCreateSchemaEnabled(true);
+  //    EnvironmentUtils.envSetUp();
+  //  }
+  //
+  //  @Test
+  //  public void testSequenceInsert()
+  //      throws MetadataException, QueryProcessException, StorageEngineException {
+  //    insertData(0);
+  //    insertData(10);
+  //    PhysicalPlan flushPlan = processor.parseSQLToPhysicalPlan("flush");
+  //    executor.processNonQuery(flushPlan);
+  //
+  //    insertData(20);
+  //
+  //    DataRegion storageGroupProcessor =
+  //        StorageEngine.getInstance().getProcessor(new PartialPath("root.isp.d1"));
+  //    assertEquals(2, storageGroupProcessor.getSequenceFileList().size());
+  //    assertEquals(0, storageGroupProcessor.getUnSequenceFileList().size());
+  //  }
+  //
+  //  @Test
+  //  public void testUnSequenceInsert()
+  //      throws MetadataException, QueryProcessException, StorageEngineException {
+  //    insertData(100);
+  //    PhysicalPlan flushPlan = processor.parseSQLToPhysicalPlan("flush");
+  //    executor.processNonQuery(flushPlan);
+  //
+  //    insertData(20);
+  //
+  //    DataRegion storageGroupProcessor =
+  //        StorageEngine.getInstance().getProcessor(new PartialPath("root.isp.d1"));
+  //    assertEquals(1, storageGroupProcessor.getSequenceFileList().size());
+  //    assertEquals(1, storageGroupProcessor.getUnSequenceFileList().size());
+  //  }
+  //
+  //  @Test
+  //  public void testSequenceAndUnSequenceInsert()
+  //      throws MetadataException, QueryProcessException, StorageEngineException {
+  //    // sequence
+  //    insertData(100);
+  //    PhysicalPlan flushPlan = processor.parseSQLToPhysicalPlan("flush");
+  //    executor.processNonQuery(flushPlan);
+  //
+  //    // sequence
+  //    insertData(120);
+  //    executor.processNonQuery(flushPlan);
+  //
+  //    // unsequence
+  //    insertData(20);
+  //    // sequence
+  //    insertData(130);
+  //    executor.processNonQuery(flushPlan);
+  //
+  //    // sequence
+  //    insertData(150);
+  //    // unsequence
+  //    insertData(90);
+  //
+  //    DataRegion storageGroupProcessor =
+  //        StorageEngine.getInstance().getProcessor(new PartialPath("root.isp.d1"));
+  //    assertEquals(4, storageGroupProcessor.getSequenceFileList().size());
+  //    assertEquals(2, storageGroupProcessor.getUnSequenceFileList().size());
+  //    assertEquals(1, storageGroupProcessor.getWorkSequenceTsFileProcessors().size());
+  //    assertEquals(1, storageGroupProcessor.getWorkUnsequenceTsFileProcessors().size());
+  //  }
+  //
+  //  @Test
+  //  public void testDeletePartition()
+  //      throws MetadataException, QueryProcessException, StorageEngineException {
+  //    insertData(100);
+  //    PhysicalPlan flushPlan = processor.parseSQLToPhysicalPlan("flush");
+  //    executor.processNonQuery(flushPlan);
+  //    insertData(20);
+  //    insertData(120);
+  //
+  //    DataRegion storageGroupProcessor =
+  //        StorageEngine.getInstance().getProcessor(new PartialPath("root.isp.d1"));
+  //
+  //    assertEquals(
+  //        103L, storageGroupProcessor.getLastFlushTimeMap().getFlushedTime(0L, "root.isp.d1"));
+  //    assertEquals(
+  //        103L, storageGroupProcessor.getLastFlushTimeMap().getGlobalFlushedTime("root.isp.d1"));
+  //
+  //    // delete time partition
+  //    Set<Long> deletedPartition = new HashSet<>();
+  //    deletedPartition.add(0L);
+  //    DeletePartitionPlan deletePartitionPlan =
+  //        new DeletePartitionPlan(new PartialPath("root.isp"), deletedPartition);
+  //    executor.processNonQuery(deletePartitionPlan);
+  //
+  //    assertEquals(
+  //        123L, storageGroupProcessor.getLastFlushTimeMap().getGlobalFlushedTime("root.isp.d1"));
+  //  }
+  //
+  //  @Test
+  //  public void testMemoryCalculation()
+  //      throws QueryProcessException, IllegalPathException, StorageEngineException,
+  //          StorageGroupNotSetException {
+  //    insertRecord("root.sg.d1", 100L);
+  //    DataRegion storageGroupProcessor =
+  //        StorageEngine.getInstance().getProcessor(new PartialPath("root.sg"));
+  //    assertEquals(98l, storageGroupProcessor.getLastFlushTimeMap().getMemSize(0L));
+  //
+  //    storageGroupProcessor.getLastFlushTimeMap().getFlushedTime(0L, "root.sg.d100");
+  //    storageGroupProcessor.getLastFlushTimeMap().getFlushedTime(0L, "root.sg.d101");
+  //    assertEquals(302l, storageGroupProcessor.getLastFlushTimeMap().getMemSize(0L));
+  //
+  //    storageGroupProcessor.getLastFlushTimeMap().setOneDeviceFlushedTime(0L, "root.sg.d102", 0L);
+  //    HashMap<String, Long> updateMap = new HashMap<>();
+  //    updateMap.put("root.sg.d103", 1L);
+  //    updateMap.put("root.sg.d100", 1L);
+  //    storageGroupProcessor.getLastFlushTimeMap().setMultiDeviceFlushedTime(0L, updateMap);
+  //    storageGroupProcessor.getLastFlushTimeMap().updateFlushedTime(0L, "root.sg.d103", 2L);
+  //    storageGroupProcessor.getLastFlushTimeMap().updateFlushedTime(0L, "root.sg.d104", 2L);
+  //    assertEquals(608L, storageGroupProcessor.getLastFlushTimeMap().getMemSize(0L));
+  //  }
+  //
+  //  @Test
+  //  public void testRecoverFlushTime()
+  //      throws QueryProcessException, IllegalPathException, StorageEngineException,
+  //          StorageGroupNotSetException {
+  //    insertData(100);
+  //    PhysicalPlan flushPlan = processor.parseSQLToPhysicalPlan("flush");
+  //    executor.processNonQuery(flushPlan);
+  //    DataRegion storageGroupProcessor =
+  //        StorageEngine.getInstance().getProcessor(new PartialPath("root.isp"));
+  //    assertEquals(
+  //        103L, storageGroupProcessor.getLastFlushTimeMap().getFlushedTime(0l, "root.isp.d1"));
+  //
+  //    storageGroupProcessor.getLastFlushTimeMap().removePartition(0l);
+  //    storageGroupProcessor.getLastFlushTimeMap().checkAndCreateFlushedTimePartition(0l);
+  //    assertEquals(
+  //        103L, storageGroupProcessor.getLastFlushTimeMap().getFlushedTime(0l, "root.isp.d1"));
+  //  }
+  //
+  //  @After
+  //  public void clean() throws IOException, StorageEngineException {
+  //    EnvironmentUtils.cleanEnv();
+  //  }
+  //
+  //  protected void insertData(long initTime) throws IllegalPathException, QueryProcessException {
+  //
+  //    long[] times = new long[] {initTime, initTime + 1, initTime + 2, initTime + 3};
+  //    List<Integer> dataTypes = new ArrayList<>();
+  //    dataTypes.add(TSDataType.DOUBLE.ordinal());
+  //    dataTypes.add(TSDataType.FLOAT.ordinal());
+  //    dataTypes.add(TSDataType.INT64.ordinal());
+  //    dataTypes.add(TSDataType.INT32.ordinal());
+  //    dataTypes.add(TSDataType.BOOLEAN.ordinal());
+  //    dataTypes.add(TSDataType.TEXT.ordinal());
+  //
+  //    Object[] columns = new Object[6];
+  //    columns[0] = new double[4];
+  //    columns[1] = new float[4];
+  //    columns[2] = new long[4];
+  //    columns[3] = new int[4];
+  //    columns[4] = new boolean[4];
+  //    columns[5] = new Binary[4];
+  //
+  //    for (int r = 0; r < 4; r++) {
+  //      ((double[]) columns[0])[r] = 10.0 + r;
+  //      ((float[]) columns[1])[r] = 20 + r;
+  //      ((long[]) columns[2])[r] = 100000 + r;
+  //      ((int[]) columns[3])[r] = 1000 + r;
+  //      ((boolean[]) columns[4])[r] = false;
+  //      ((Binary[]) columns[5])[r] = new Binary("mm" + r);
+  //    }
+  //
+  //    InsertTabletPlan tabletPlan =
+  //        new InsertTabletPlan(
+  //            new PartialPath("root.isp.d1"),
+  //            new String[] {"s1", "s2", "s3", "s4", "s5", "s6"},
+  //            dataTypes);
+  //    tabletPlan.setTimes(times);
+  //    tabletPlan.setColumns(columns);
+  //    tabletPlan.setRowCount(times.length);
+  //
+  //    executor.insertTablet(tabletPlan);
+  //  }
+  //
+  //  protected void insertRecord(String devicePath, long time)
+  //      throws IllegalPathException, QueryProcessException {
+  //    InsertRowPlan insertRowPlan =
+  //        new InsertRowPlan(
+  //            new PartialPath(devicePath),
+  //            time,
+  //            new String[] {"s1", "s2", "s3"},
+  //            new TSDataType[] {TSDataType.INT32, TSDataType.INT32, TSDataType.INT32},
+  //            new String[] {"1", "1", "1"});
+  //
+  //    executor.insert(insertRowPlan);
+  //  }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java
index 8f96c9223d..6cc26b0a11 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java
@@ -21,6 +21,7 @@
 package org.apache.iotdb.db.engine.storagegroup;
 
 import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.commons.consensus.DataRegionId;
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.exception.MetadataException;
 import org.apache.iotdb.commons.path.MeasurementPath;
@@ -74,7 +75,9 @@ import static org.junit.Assert.assertTrue;
 public class TTLTest {
 
   private String sg1 = "root.TTL_SG1";
+  private DataRegionId dataRegionId1 = new DataRegionId(1);
   private String sg2 = "root.TTL_SG2";
+  private DataRegionId dataRegionId2 = new DataRegionId(1);
   private long ttl = 12345;
   private DataRegion dataRegion;
   private String s1 = "s1";
@@ -105,7 +108,7 @@ public class TTLTest {
     dataRegion =
         new DataRegion(
             IoTDBDescriptor.getInstance().getConfig().getSystemDir(),
-            sg1,
+            String.valueOf(dataRegionId1.getId()),
             new DirectFlushPolicy(),
             sg1);
     IoTDB.schemaProcessor.createTimeseries(
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TimePartitionManagerTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TimePartitionManagerTest.java
new file mode 100644
index 0000000000..1b5f1f15b2
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TimePartitionManagerTest.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.storagegroup;
+
+import org.apache.iotdb.commons.consensus.DataRegionId;
+import org.apache.iotdb.commons.exception.MetadataException;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link TimePartitionManager}: registering partition info, updating it after a flush
+ * or after a TsFileProcessor is opened, and dropping entries once the memory threshold is hit.
+ */
+public class TimePartitionManagerTest {
+
+  private final TimePartitionManager timePartitionManager = TimePartitionManager.getInstance();
+  private static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig();
+  // threshold value captured in setUp() so tearDown() can put it back
+  private long prevTimePartitionInfoMemoryThreshold;
+
+  public TimePartitionManagerTest() throws QueryProcessException {}
+
+  /** Saves the configured memory budget, then lowers the manager's threshold to 100. */
+  @Before
+  public void setUp() throws IOException, WriteProcessException, MetadataException {
+    prevTimePartitionInfoMemoryThreshold = CONFIG.getAllocateMemoryForTimePartitionInfo();
+    timePartitionManager.setTimePartitionInfoMemoryThreshold(100L);
+  }
+
+  /** Restores the threshold saved in setUp() and calls clear() on the manager. */
+  @After
+  public void tearDown() throws IOException, StorageEngineException {
+    timePartitionManager.setTimePartitionInfoMemoryThreshold(prevTimePartitionInfoMemoryThreshold);
+    timePartitionManager.clear();
+  }
+
+  /** Registering partition 1 for a region must make partition 0 stop being the latest one. */
+  @Test
+  public void testRegisterPartitionInfo() {
+    TimePartitionInfo timePartitionInfo1 =
+        new TimePartitionInfo(new DataRegionId(1), 0L, true, Long.MAX_VALUE, 0, true);
+    timePartitionManager.registerTimePartitionInfo(timePartitionInfo1);
+
+    assertEquals(
+        timePartitionInfo1, timePartitionManager.getTimePartitionInfo(new DataRegionId(1), 0L));
+
+    TimePartitionInfo timePartitionInfo2 =
+        new TimePartitionInfo(new DataRegionId(1), 1L, true, Long.MAX_VALUE, 0, true);
+    timePartitionManager.registerTimePartitionInfo(timePartitionInfo2);
+
+    assertFalse(
+        timePartitionManager.getTimePartitionInfo(new DataRegionId(1), 0L).isLatestPartition);
+    assertTrue(
+        timePartitionManager.getTimePartitionInfo(new DataRegionId(1), 1L).isLatestPartition);
+  }
+
+  /**
+   * Checks the fields written by updateAfterFlushing and that updateAfterOpeningTsFileProcessor
+   * marks the partition active again.
+   */
+  @Test
+  public void testUpdate() {
+    TimePartitionInfo timePartitionInfo =
+        new TimePartitionInfo(new DataRegionId(1), 0L, true, Long.MAX_VALUE, 0, true);
+    timePartitionManager.registerTimePartitionInfo(timePartitionInfo);
+
+    timePartitionManager.updateAfterFlushing(new DataRegionId(1), 0L, 100L, 100L, false);
+
+    TimePartitionInfo timePartitionInfo1 =
+        timePartitionManager.getTimePartitionInfo(new DataRegionId(1), 0L);
+
+    assertTrue(timePartitionInfo1.isLatestPartition);
+    // JUnit's assertEquals takes (expected, actual); that order keeps failure messages correct
+    assertEquals(100L, timePartitionInfo1.lastSystemFlushTime);
+    assertEquals(100, timePartitionInfo1.memSize);
+    assertFalse(timePartitionInfo1.isActive);
+
+    timePartitionManager.updateAfterOpeningTsFileProcessor(new DataRegionId(1), 0L);
+    TimePartitionInfo timePartitionInfo2 =
+        timePartitionManager.getTimePartitionInfo(new DataRegionId(1), 0L);
+    assertTrue(timePartitionInfo2.isActive);
+  }
+
+  /**
+   * Drives the sizes passed to updateAfterFlushing past the threshold of 100 set in setUp() and
+   * expects previously registered entries to be gone afterwards, one per extra update.
+   *
+   * <p>NOTE(review): the arguments look like (region, partition, flush time, memory size,
+   * active) and the entries removed look like inactive ones in flush-time order; confirm
+   * against TimePartitionManager.
+   */
+  @Test
+  public void testMemoryControl() {
+    for (int i = 0; i < 5; i++) {
+      TimePartitionInfo timePartitionInfo =
+          new TimePartitionInfo(new DataRegionId(i), 0L, true, Long.MAX_VALUE, 0, true);
+      timePartitionManager.registerTimePartitionInfo(timePartitionInfo);
+    }
+    timePartitionManager.updateAfterFlushing(new DataRegionId(0), 0L, 100L, 20L, false);
+    timePartitionManager.updateAfterFlushing(new DataRegionId(1), 0L, 101L, 20L, true);
+    timePartitionManager.updateAfterFlushing(new DataRegionId(2), 0L, 102L, 20L, false);
+    timePartitionManager.updateAfterFlushing(new DataRegionId(3), 0L, 103L, 20L, false);
+    timePartitionManager.updateAfterFlushing(new DataRegionId(4), 0L, 104L, 20L, true);
+    timePartitionManager.registerTimePartitionInfo(
+        new TimePartitionInfo(new DataRegionId(0), 1L, true, Long.MAX_VALUE, 0, true));
+
+    timePartitionManager.updateAfterFlushing(new DataRegionId(0), 1L, 105L, 20L, true);
+
+    Assert.assertNull(timePartitionManager.getTimePartitionInfo(new DataRegionId(0), 0L));
+
+    timePartitionManager.updateAfterFlushing(new DataRegionId(0), 1L, 106L, 40L, true);
+
+    Assert.assertNull(timePartitionManager.getTimePartitionInfo(new DataRegionId(2), 0L));
+
+    timePartitionManager.updateAfterFlushing(new DataRegionId(0), 1L, 107L, 60L, true);
+
+    Assert.assertNull(timePartitionManager.getTimePartitionInfo(new DataRegionId(3), 0L));
+  }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorV2Test.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorV2Test.java
index 6b006f8966..ca944ce80c 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorV2Test.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorV2Test.java
@@ -110,7 +110,7 @@ public class TsFileProcessorV2Test {
             SystemFileFactory.INSTANCE.getFile(filePath),
             sgInfo,
             this::closeTsFileProcessor,
-            (tsFileProcessor) -> true,
+            (tsFileProcessor, updateMap, systemFlushTime) -> true,
             true);
 
     TsFileProcessorInfo tsFileProcessorInfo = new TsFileProcessorInfo(sgInfo);
@@ -169,7 +169,7 @@ public class TsFileProcessorV2Test {
             SystemFileFactory.INSTANCE.getFile(filePath),
             sgInfo,
             this::closeTsFileProcessor,
-            (tsFileProcessor) -> true,
+            (tsFileProcessor, updateMap, systemFlushTime) -> true,
             true);
 
     TsFileProcessorInfo tsFileProcessorInfo = new TsFileProcessorInfo(sgInfo);
@@ -256,7 +256,7 @@ public class TsFileProcessorV2Test {
             SystemFileFactory.INSTANCE.getFile(filePath),
             sgInfo,
             this::closeTsFileProcessor,
-            (tsFileProcessor) -> true,
+            (tsFileProcessor, updateMap, systemFlushTime) -> true,
             true);
 
     TsFileProcessorInfo tsFileProcessorInfo = new TsFileProcessorInfo(sgInfo);
@@ -299,7 +299,7 @@ public class TsFileProcessorV2Test {
             SystemFileFactory.INSTANCE.getFile(filePath),
             sgInfo,
             this::closeTsFileProcessor,
-            (tsFileProcessor) -> true,
+            (tsFileProcessor, updateMap, systemFlushTime) -> true,
             true);
     TsFileProcessorInfo tsFileProcessorInfo = new TsFileProcessorInfo(sgInfo);
     processor.setTsFileProcessorInfo(tsFileProcessorInfo);
@@ -335,7 +335,7 @@ public class TsFileProcessorV2Test {
             SystemFileFactory.INSTANCE.getFile(filePath),
             sgInfo,
             this::closeTsFileProcessor,
-            (tsFileProcessor) -> true,
+            (tsFileProcessor, updateMap, systemFlushTime) -> true,
             true);
     TsFileProcessorInfo tsFileProcessorInfo = new TsFileProcessorInfo(sgInfo);
     processor.setTsFileProcessorInfo(tsFileProcessorInfo);
@@ -371,7 +371,7 @@ public class TsFileProcessorV2Test {
             SystemFileFactory.INSTANCE.getFile(filePath),
             sgInfo,
             this::closeTsFileProcessor,
-            (tsFileProcessor) -> true,
+            (tsFileProcessor, updateMap, systemFlushTime) -> true,
             true);
 
     TsFileProcessorInfo tsFileProcessorInfo = new TsFileProcessorInfo(sgInfo);
diff --git a/server/src/test/java/org/apache/iotdb/db/metadata/idtable/IDTableFlushTimeTest.java b/server/src/test/java/org/apache/iotdb/db/metadata/idtable/IDTableFlushTimeTest.java
deleted file mode 100644
index 4eab19cb67..0000000000
--- a/server/src/test/java/org/apache/iotdb/db/metadata/idtable/IDTableFlushTimeTest.java
+++ /dev/null
@@ -1,199 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.metadata.idtable;
-
-// public class IDTableFlushTimeTest {
-//  private PlanExecutor executor = new PlanExecutor();
-//
-//  private final Planner processor = new Planner();
-//
-//  private boolean isEnableIDTable = false;
-//
-//  private String originalDeviceIDTransformationMethod = null;
-//
-//  private boolean isEnableIDTableLogFile = false;
-//
-//  public IDTableFlushTimeTest() throws QueryProcessException {}
-//
-//  @Before
-//  public void before() {
-//    IoTDBDescriptor.getInstance().getConfig().setAutoCreateSchemaEnabled(true);
-//    isEnableIDTable = IoTDBDescriptor.getInstance().getConfig().isEnableIDTable();
-//    originalDeviceIDTransformationMethod =
-//        IoTDBDescriptor.getInstance().getConfig().getDeviceIDTransformationMethod();
-//    isEnableIDTableLogFile = IoTDBDescriptor.getInstance().getConfig().isEnableIDTableLogFile();
-//
-//    IoTDBDescriptor.getInstance().getConfig().setEnableIDTable(true);
-//    IoTDBDescriptor.getInstance().getConfig().setDeviceIDTransformationMethod("SHA256");
-//    IoTDBDescriptor.getInstance().getConfig().setEnableIDTableLogFile(true);
-//    EnvironmentUtils.envSetUp();
-//  }
-//
-//  @After
-//  public void clean() throws IOException, StorageEngineException {
-//    IoTDBDescriptor.getInstance().getConfig().setEnableIDTable(isEnableIDTable);
-//    IoTDBDescriptor.getInstance()
-//        .getConfig()
-//        .setDeviceIDTransformationMethod(originalDeviceIDTransformationMethod);
-//    IoTDBDescriptor.getInstance().getConfig().setEnableIDTableLogFile(isEnableIDTableLogFile);
-//    EnvironmentUtils.cleanEnv();
-//  }
-//
-//  @Test
-//  public void testSequenceInsert()
-//      throws MetadataException, QueryProcessException, StorageEngineException {
-//    insertData(0);
-//    insertData(10);
-//    PhysicalPlan flushPlan = processor.parseSQLToPhysicalPlan("flush");
-//    executor.processNonQuery(flushPlan);
-//
-//    insertData(20);
-//
-//    DataRegion storageGroupProcessor =
-//        StorageEngine.getInstance().getProcessor(new PartialPath("root.isp.d1"));
-//    assertEquals(2, storageGroupProcessor.getSequenceFileList().size());
-//    assertEquals(0, storageGroupProcessor.getUnSequenceFileList().size());
-//  }
-//
-//  @Test
-//  public void testUnSequenceInsert()
-//      throws MetadataException, QueryProcessException, StorageEngineException {
-//    insertData(100);
-//    PhysicalPlan flushPlan = processor.parseSQLToPhysicalPlan("flush");
-//    executor.processNonQuery(flushPlan);
-//
-//    insertData(20);
-//
-//    DataRegion storageGroupProcessor =
-//        StorageEngine.getInstance().getProcessor(new PartialPath("root.isp.d1"));
-//    assertEquals(1, storageGroupProcessor.getSequenceFileList().size());
-//    assertEquals(1, storageGroupProcessor.getUnSequenceFileList().size());
-//  }
-//
-//  @Test
-//  public void testSequenceAndUnSequenceInsert()
-//      throws MetadataException, QueryProcessException, StorageEngineException {
-//    // sequence
-//    insertData(100);
-//    PhysicalPlan flushPlan = processor.parseSQLToPhysicalPlan("flush");
-//    executor.processNonQuery(flushPlan);
-//
-//    // sequence
-//    insertData(120);
-//    executor.processNonQuery(flushPlan);
-//
-//    // unsequence
-//    insertData(20);
-//    // sequence
-//    insertData(130);
-//    executor.processNonQuery(flushPlan);
-//
-//    // sequence
-//    insertData(150);
-//    // unsequence
-//    insertData(90);
-//
-//    DataRegion storageGroupProcessor =
-//        StorageEngine.getInstance().getProcessor(new PartialPath("root.isp.d1"));
-//    assertEquals(4, storageGroupProcessor.getSequenceFileList().size());
-//    assertEquals(2, storageGroupProcessor.getUnSequenceFileList().size());
-//    assertEquals(1, storageGroupProcessor.getWorkSequenceTsFileProcessors().size());
-//    assertEquals(1, storageGroupProcessor.getWorkUnsequenceTsFileProcessors().size());
-//  }
-//
-//  @Test
-//  public void testDeletePartition()
-//      throws MetadataException, QueryProcessException, StorageEngineException {
-//    insertData(100);
-//    PhysicalPlan flushPlan = processor.parseSQLToPhysicalPlan("flush");
-//    executor.processNonQuery(flushPlan);
-//    insertData(20);
-//    insertData(120);
-//
-//    DataRegion storageGroupProcessor =
-//        StorageEngine.getInstance().getProcessor(new PartialPath("root.isp.d1"));
-//
-//    assertEquals(
-//        103L, storageGroupProcessor.getLastFlushTimeManager().getFlushedTime(0L, "root.isp.d1"));
-//    assertEquals(
-//        123L, storageGroupProcessor.getLastFlushTimeManager().getLastTime(0L, "root.isp.d1"));
-//    assertEquals(
-//        103L,
-// storageGroupProcessor.getLastFlushTimeManager().getGlobalFlushedTime("root.isp.d1"));
-//
-//    // delete time partition
-//    Set<Long> deletedPartition = new HashSet<>();
-//    deletedPartition.add(0L);
-//    DeletePartitionPlan deletePartitionPlan =
-//        new DeletePartitionPlan(new PartialPath("root.isp"), deletedPartition);
-//    executor.processNonQuery(deletePartitionPlan);
-//
-//    assertEquals(
-//        Long.MIN_VALUE,
-//        storageGroupProcessor.getLastFlushTimeManager().getFlushedTime(0L, "root.isp.d1"));
-//    assertEquals(
-//        Long.MIN_VALUE,
-//        storageGroupProcessor.getLastFlushTimeManager().getLastTime(0L, "root.isp.d1"));
-//    assertEquals(
-//        123L,
-// storageGroupProcessor.getLastFlushTimeManager().getGlobalFlushedTime("root.isp.d1"));
-//  }
-//
-//  private void insertData(long initTime) throws IllegalPathException, QueryProcessException {
-//
-//    long[] times = new long[] {initTime, initTime + 1, initTime + 2, initTime + 3};
-//    List<Integer> dataTypes = new ArrayList<>();
-//    dataTypes.add(TSDataType.DOUBLE.ordinal());
-//    dataTypes.add(TSDataType.FLOAT.ordinal());
-//    dataTypes.add(TSDataType.INT64.ordinal());
-//    dataTypes.add(TSDataType.INT32.ordinal());
-//    dataTypes.add(TSDataType.BOOLEAN.ordinal());
-//    dataTypes.add(TSDataType.TEXT.ordinal());
-//
-//    Object[] columns = new Object[6];
-//    columns[0] = new double[4];
-//    columns[1] = new float[4];
-//    columns[2] = new long[4];
-//    columns[3] = new int[4];
-//    columns[4] = new boolean[4];
-//    columns[5] = new Binary[4];
-//
-//    for (int r = 0; r < 4; r++) {
-//      ((double[]) columns[0])[r] = 10.0 + r;
-//      ((float[]) columns[1])[r] = 20 + r;
-//      ((long[]) columns[2])[r] = 100000 + r;
-//      ((int[]) columns[3])[r] = 1000 + r;
-//      ((boolean[]) columns[4])[r] = false;
-//      ((Binary[]) columns[5])[r] = new Binary("mm" + r);
-//    }
-//
-//    InsertTabletPlan tabletPlan =
-//        new InsertTabletPlan(
-//            new PartialPath("root.isp.d1"),
-//            new String[] {"s1", "s2", "s3", "s4", "s5", "s6"},
-//            dataTypes);
-//    tabletPlan.setTimes(times);
-//    tabletPlan.setColumns(columns);
-//    tabletPlan.setRowCount(times.length);
-//
-//    PlanExecutor executor = new PlanExecutor();
-//    executor.insertTablet(tabletPlan);
-//  }
-// }