You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xu...@apache.org on 2022/01/05 02:17:27 UTC

[iotdb] 01/02: init

This is an automated email from the ASF dual-hosted git repository.

xuekaifeng pushed a commit to branch xkf_id_table_flush_time
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 3355a9ecd2dc68871bd2e76f71daee9c10b7a314
Author: 151250176 <15...@smail.nju.edu.cn>
AuthorDate: Wed Jan 5 10:14:52 2022 +0800

    init
---
 .../storagegroup/IDTableFlushTimeManager.java      | 212 +++++++++++++++++++
 .../storagegroup/VirtualStorageGroupProcessor.java |  78 +++----
 .../apache/iotdb/db/metadata/idtable/IDTable.java  |  16 ++
 .../db/metadata/idtable/IDTableHashmapImpl.java    |  39 +++-
 .../db/metadata/idtable/entry/DeviceEntry.java     |  65 ++++++
 .../db/metadata/idtable/entry/DeviceIDFactory.java |  16 +-
 .../db/metadata/idtable/IDTableFlushTimeTest.java  | 224 +++++++++++++++++++++
 7 files changed, 605 insertions(+), 45 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/IDTableFlushTimeManager.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/IDTableFlushTimeManager.java
new file mode 100644
index 0000000..b7ff81b
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/IDTableFlushTimeManager.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.storagegroup;
+
+import org.apache.iotdb.db.metadata.idtable.IDTable;
+import org.apache.iotdb.db.metadata.idtable.entry.DeviceEntry;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/**
+ * This class manages last time and flush time for sequence and unsequence determination, backed
+ * by the ID table. This class is NOT thread safe; callers should ensure synchronization. This
+ * class does not support upgrade.
+ */
+public class IDTableFlushTimeManager implements ILastFlushTimeManager {
+  private static final Logger logger = LoggerFactory.getLogger(IDTableFlushTimeManager.class);
+
+  IDTable idTable;
+
+  public IDTableFlushTimeManager(IDTable idTable) {
+    this.idTable = idTable;
+  }
+
+  // region set
+  @Override
+  public void setLastTimeAll(long timePartitionId, Map<String, Long> lastTimeMap) {
+    for (Map.Entry<String, Long> entry : lastTimeMap.entrySet()) {
+      idTable.getDeviceEntry(entry.getKey()).putLastTimeMap(timePartitionId, entry.getValue());
+    }
+  }
+
+  @Override
+  public void setLastTime(long timePartitionId, String path, long time) {
+    idTable.getDeviceEntry(path).putLastTimeMap(timePartitionId, time);
+  }
+
+  @Override
+  public void setFlushedTimeAll(long timePartitionId, Map<String, Long> flushedTimeMap) {
+    for (Map.Entry<String, Long> entry : flushedTimeMap.entrySet()) {
+      idTable.getDeviceEntry(entry.getKey()).putFlushTimeMap(timePartitionId, entry.getValue());
+    }
+  }
+
+  @Override
+  public void setFlushedTime(long timePartitionId, String path, long time) {
+    idTable.getDeviceEntry(path).putFlushTimeMap(timePartitionId, time);
+  }
+
+  @Override
+  public void setGlobalFlushedTimeAll(Map<String, Long> globalFlushedTimeMap) {
+    for (Map.Entry<String, Long> entry : globalFlushedTimeMap.entrySet()) {
+      idTable.getDeviceEntry(entry.getKey()).setGlobalFlushTime(entry.getValue());
+    }
+  }
+
+  @Override
+  public void setGlobalFlushedTime(String path, long time) {
+    idTable.getDeviceEntry(path).setGlobalFlushTime(time);
+  }
+
+  // endregion
+
+  // region update
+
+  @Override
+  public void updateLastTime(long timePartitionId, String path, long time) {
+    idTable.getDeviceEntry(path).updateLastTimeMap(timePartitionId, time);
+  }
+
+  @Override
+  public void updateFlushedTime(long timePartitionId, String path, long time) {
+    idTable.getDeviceEntry(path).updateFlushTimeMap(timePartitionId, time);
+  }
+
+  @Override
+  public void updateGlobalFlushedTime(String path, long time) {
+    idTable.getDeviceEntry(path).updateGlobalFlushTime(time);
+  }
+
+  @Override
+  public void updateNewlyFlushedPartitionLatestFlushedTimeForEachDevice(
+      long partitionId, String deviceId, long time) {
+    throw new UnsupportedOperationException("IDTableFlushTimeManager doesn't support upgrade");
+  }
+
+  // endregion
+
+  // region ensure
+
+  @Override
+  public void ensureLastTimePartition(long timePartitionId) {
+    // intentionally a no-op: DeviceEntry creates per-partition entries on demand
+  }
+
+  @Override
+  public void ensureFlushedTimePartition(long timePartitionId) {
+    // intentionally a no-op: DeviceEntry creates per-partition entries on demand
+  }
+
+  @Override
+  public long ensureFlushedTimePartitionAndInit(long timePartitionId, String path, long initTime) {
+    return idTable.getDeviceEntry(path).updateFlushTimeMap(timePartitionId, initTime);
+  }
+
+  // endregion
+
+  // region upgrade support methods
+
+  @Override
+  public void applyNewlyFlushedTimeToFlushedTime() {
+    throw new UnsupportedOperationException("IDTableFlushTimeManager doesn't support upgrade");
+  }
+
+  /**
+   * update latest flush time for partition id
+   *
+   * @param partitionId partition id
+   * @param latestFlushTime latest flush time
+   * @return true if update latest flush time success
+   */
+  @Override
+  public boolean updateLatestFlushTimeToPartition(long partitionId, long latestFlushTime) {
+    for (DeviceEntry deviceEntry : idTable.getAllDeviceEntry()) {
+      deviceEntry.putLastTimeMap(partitionId, latestFlushTime);
+      deviceEntry.putFlushTimeMap(partitionId, latestFlushTime);
+      deviceEntry.updateGlobalFlushTime(latestFlushTime);
+    }
+
+    return true;
+  }
+
+  @Override
+  public boolean updateLatestFlushTime(long partitionId) {
+    boolean updated = false;
+
+    for (DeviceEntry deviceEntry : idTable.getAllDeviceEntry()) {
+      Long lastTime = deviceEntry.getLastTime(partitionId);
+      if (lastTime == null) {
+        continue;
+      }
+
+      updated = true;
+      deviceEntry.putFlushTimeMap(partitionId, lastTime);
+      deviceEntry.updateGlobalFlushTime(lastTime);
+    }
+
+    return updated;
+  }
+
+  // endregion
+
+  // region query
+  @Override
+  public long getFlushedTime(long timePartitionId, String path) {
+    return idTable.getDeviceEntry(path).getFLushTimeWithDefaultValue(timePartitionId);
+  }
+
+  @Override
+  public long getLastTime(long timePartitionId, String path) {
+    return idTable.getDeviceEntry(path).getLastTimeWithDefaultValue(timePartitionId);
+  }
+
+  @Override
+  public long getGlobalFlushedTime(String path) {
+    return idTable.getDeviceEntry(path).getGlobalFlushTime();
+  }
+
+  // endregion
+
+  // region clear
+  @Override
+  public void clearLastTime() {
+    for (DeviceEntry deviceEntry : idTable.getAllDeviceEntry()) {
+      deviceEntry.clearLastTime();
+    }
+  }
+
+  @Override
+  public void clearFlushedTime() {
+    for (DeviceEntry deviceEntry : idTable.getAllDeviceEntry()) {
+      deviceEntry.clearFlushTime();
+    }
+  }
+
+  @Override
+  public void clearGlobalFlushedTime() {
+    for (DeviceEntry deviceEntry : idTable.getAllDeviceEntry()) {
+      deviceEntry.setGlobalFlushTime(Long.MIN_VALUE);
+    }
+  }
+  // endregion
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/VirtualStorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/VirtualStorageGroupProcessor.java
index 12cd817..db351f1 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/VirtualStorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/VirtualStorageGroupProcessor.java
@@ -18,6 +18,39 @@
  */
 package org.apache.iotdb.db.engine.storagegroup;
 
+import static org.apache.iotdb.db.conf.IoTDBConstant.FILE_NAME_SEPARATOR;
+import static org.apache.iotdb.db.engine.compaction.cross.inplace.task.CrossSpaceMergeTask.MERGE_SUFFIX;
+import static org.apache.iotdb.db.engine.compaction.inner.utils.SizeTieredCompactionLogger.COMPACTION_LOG_NAME;
+import static org.apache.iotdb.db.engine.storagegroup.TsFileResource.TEMP_SUFFIX;
+import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFFIX;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.regex.Pattern;
+import org.apache.commons.io.FileUtils;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -89,44 +122,9 @@ import org.apache.iotdb.tsfile.fileSystem.fsFactory.FSFactory;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
-
-import org.apache.commons.io.FileUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.MappedByteBuffer;
-import java.nio.file.Files;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Date;
-import java.util.Deque;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.regex.Pattern;
-
-import static org.apache.iotdb.db.conf.IoTDBConstant.FILE_NAME_SEPARATOR;
-import static org.apache.iotdb.db.engine.compaction.cross.inplace.task.CrossSpaceMergeTask.MERGE_SUFFIX;
-import static org.apache.iotdb.db.engine.compaction.inner.utils.SizeTieredCompactionLogger.COMPACTION_LOG_NAME;
-import static org.apache.iotdb.db.engine.storagegroup.TsFileResource.TEMP_SUFFIX;
-import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFFIX;
-
 /**
  * For sequence data, a StorageGroupProcessor has some TsFileProcessors, in which there is only one
  * TsFileProcessor in the working status. <br>
@@ -262,7 +260,7 @@ public class VirtualStorageGroupProcessor {
   // DEFAULT_POOL_TRIM_INTERVAL_MILLIS
   private long timeWhenPoolNotEmpty = Long.MAX_VALUE;
 
-  private LastFlushTimeManager lastFlushTimeManager = new LastFlushTimeManager();
+  private ILastFlushTimeManager lastFlushTimeManager = new LastFlushTimeManager();
 
   /**
    * record the insertWriteLock in SG is being hold by which method, it will be empty string if on
@@ -396,9 +394,12 @@ public class VirtualStorageGroupProcessor {
     if (config.isEnableIDTable()) {
       try {
         idTable = IDTableManager.getInstance().getIDTable(new PartialPath(logicalStorageGroupName));
+        lastFlushTimeManager = new IDTableFlushTimeManager(idTable);
       } catch (IllegalPathException e) {
         logger.error("failed to create id table");
       }
+    } else {
+      lastFlushTimeManager = new LastFlushTimeManager();
     }
     recover();
     if (MetricConfigDescriptor.getInstance().getMetricConfig().getEnableMetric()) {
@@ -3233,4 +3234,9 @@ public class VirtualStorageGroupProcessor {
   public IDTable getIdTable() {
     return idTable;
   }
+
+  @TestOnly
+  public ILastFlushTimeManager getLastFlushTimeManager() {
+    return lastFlushTimeManager;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/idtable/IDTable.java b/server/src/main/java/org/apache/iotdb/db/metadata/idtable/IDTable.java
index 07f4df7..c04e063 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/idtable/IDTable.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/idtable/IDTable.java
@@ -40,6 +40,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.List;
 import java.util.Map;
 
 public interface IDTable {
@@ -139,6 +140,21 @@ public interface IDTable {
   void clear() throws IOException;
 
   /**
+   * get device entry from device path
+   *
+   * @param deviceName device name of the time series
+   * @return device entry of the timeseries
+   */
+  public DeviceEntry getDeviceEntry(String deviceName);
+
+  /**
+   * get all device entries
+   *
+   * @return all device entries
+   */
+  public List<DeviceEntry> getAllDeviceEntry();
+
+  /**
    * translate query path's device path to device id
    *
    * @param fullPath full query path
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/idtable/IDTableHashmapImpl.java b/server/src/main/java/org/apache/iotdb/db/metadata/idtable/IDTableHashmapImpl.java
index a4faf6a..fd3a99f 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/idtable/IDTableHashmapImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/idtable/IDTableHashmapImpl.java
@@ -48,7 +48,9 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 /** id table belongs to a storage group and mapping timeseries path to it's schema */
@@ -88,7 +90,7 @@ public class IDTableHashmapImpl implements IDTable {
    */
   public synchronized void createAlignedTimeseries(CreateAlignedTimeSeriesPlan plan)
       throws MetadataException {
-    DeviceEntry deviceEntry = getDeviceEntry(plan.getPrefixPath(), true);
+    DeviceEntry deviceEntry = getDeviceEntryWithAlignedCheck(plan.getPrefixPath(), true);
 
     for (int i = 0; i < plan.getMeasurements().size(); i++) {
       PartialPath fullPath =
@@ -112,7 +114,7 @@ public class IDTableHashmapImpl implements IDTable {
    * @throws MetadataException if the device is aligned, throw it
    */
   public synchronized void createTimeseries(CreateTimeSeriesPlan plan) throws MetadataException {
-    DeviceEntry deviceEntry = getDeviceEntry(plan.getPath().getDevicePath(), false);
+    DeviceEntry deviceEntry = getDeviceEntryWithAlignedCheck(plan.getPath().getDevicePath(), false);
     SchemaEntry schemaEntry =
         new SchemaEntry(
             plan.getDataType(),
@@ -137,7 +139,7 @@ public class IDTableHashmapImpl implements IDTable {
     IMeasurementMNode[] measurementMNodes = plan.getMeasurementMNodes();
 
     // 1. get device entry and check align
-    DeviceEntry deviceEntry = getDeviceEntry(devicePath, plan.isAligned());
+    DeviceEntry deviceEntry = getDeviceEntryWithAlignedCheck(devicePath, plan.isAligned());
 
     // 2. get schema of each measurement
     for (int i = 0; i < measurementList.length; i++) {
@@ -222,7 +224,7 @@ public class IDTableHashmapImpl implements IDTable {
   public synchronized void registerTrigger(PartialPath fullPath, IMeasurementMNode measurementMNode)
       throws MetadataException {
     boolean isAligned = measurementMNode.getParent().isAligned();
-    DeviceEntry deviceEntry = getDeviceEntry(fullPath.getDevicePath(), isAligned);
+    DeviceEntry deviceEntry = getDeviceEntryWithAlignedCheck(fullPath.getDevicePath(), isAligned);
 
     deviceEntry.getSchemaEntry(fullPath.getMeasurement()).setUsingTrigger();
   }
@@ -237,7 +239,7 @@ public class IDTableHashmapImpl implements IDTable {
   public synchronized void deregisterTrigger(
       PartialPath fullPath, IMeasurementMNode measurementMNode) throws MetadataException {
     boolean isAligned = measurementMNode.getParent().isAligned();
-    DeviceEntry deviceEntry = getDeviceEntry(fullPath.getDevicePath(), isAligned);
+    DeviceEntry deviceEntry = getDeviceEntryWithAlignedCheck(fullPath.getDevicePath(), isAligned);
 
     deviceEntry.getSchemaEntry(fullPath.getMeasurement()).setUnUsingTrigger();
   }
@@ -279,6 +281,31 @@ public class IDTableHashmapImpl implements IDTable {
   }
 
   /**
+   * get device entry from device path
+   *
+   * @param deviceName device name of the time series
+   * @return device entry of the timeseries, or null if the slot map holds no entry for it
+   */
+  @Override
+  public DeviceEntry getDeviceEntry(String deviceName) {
+    IDeviceID deviceID = DeviceIDFactory.getInstance().getDeviceID(deviceName);
+    int slot = calculateSlot(deviceID);
+
+    // reuse device entry in map
+    return idTables[slot].get(deviceID);
+  }
+
+  @Override
+  public List<DeviceEntry> getAllDeviceEntry() {
+    List<DeviceEntry> res = new ArrayList<>();
+    for (int i = 0; i < NUM_OF_SLOTS; i++) {
+      res.addAll(idTables[i].values());
+    }
+
+    return res;
+  }
+
+  /**
    * check whether a time series is exist if exist, check the type consistency if not exist, call
    * MManager to create it
    *
@@ -345,7 +372,7 @@ public class IDTableHashmapImpl implements IDTable {
    * @param isAligned whether the insert plan is aligned
    * @return device entry of the timeseries
    */
-  private DeviceEntry getDeviceEntry(PartialPath deviceName, boolean isAligned)
+  private DeviceEntry getDeviceEntryWithAlignedCheck(PartialPath deviceName, boolean isAligned)
       throws MetadataException {
     IDeviceID deviceID = DeviceIDFactory.getInstance().getDeviceID(deviceName);
     int slot = calculateSlot(deviceID);
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/idtable/entry/DeviceEntry.java b/server/src/main/java/org/apache/iotdb/db/metadata/idtable/entry/DeviceEntry.java
index eb7041c..c4f0369 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/idtable/entry/DeviceEntry.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/idtable/entry/DeviceEntry.java
@@ -32,9 +32,18 @@ public class DeviceEntry {
 
   boolean isAligned;
 
+  // for manages flush time
+  Map<Long, Long> lastTimeMap;
+
+  Map<Long, Long> flushTimeMap;
+
+  long globalFlushTime = Long.MIN_VALUE;
+
   public DeviceEntry(IDeviceID deviceID) {
     this.deviceID = deviceID;
     measurementMap = new HashMap<>();
+    lastTimeMap = new HashMap<>();
+    flushTimeMap = new HashMap<>();
   }
 
   /**
@@ -78,4 +87,60 @@ public class DeviceEntry {
   public IDeviceID getDeviceID() {
     return deviceID;
   }
+
+  // region support flush time
+  public void putLastTimeMap(long timePartition, long lastTime) {
+    lastTimeMap.put(timePartition, lastTime);
+  }
+
+  public void putFlushTimeMap(long timePartition, long flushTime) {
+    flushTimeMap.put(timePartition, flushTime);
+  }
+
+  public long updateLastTimeMap(long timePartition, long lastTime) {
+    return lastTimeMap.compute(
+        timePartition, (k, v) -> v == null ? lastTime : Math.max(v, lastTime));
+  }
+
+  public long updateFlushTimeMap(long timePartition, long flushTime) {
+    return flushTimeMap.compute(
+        timePartition, (k, v) -> v == null ? flushTime : Math.max(v, flushTime));
+  }
+
+  public void updateGlobalFlushTime(long flushTime) {
+    globalFlushTime = Math.max(globalFlushTime, flushTime);
+  }
+
+  public void setGlobalFlushTime(long globalFlushTime) {
+    this.globalFlushTime = globalFlushTime;
+  }
+
+  public Long getLastTime(long timePartition) {
+    return lastTimeMap.get(timePartition);
+  }
+
+  public Long getFlushTime(long timePartition) {
+    return flushTimeMap.get(timePartition);
+  }
+
+  public Long getLastTimeWithDefaultValue(long timePartition) {
+    return lastTimeMap.getOrDefault(timePartition, Long.MIN_VALUE);
+  }
+
+  public Long getFLushTimeWithDefaultValue(long timePartition) {
+    return flushTimeMap.getOrDefault(timePartition, Long.MIN_VALUE);
+  }
+
+  public long getGlobalFlushTime() {
+    return globalFlushTime;
+  }
+
+  public void clearLastTime() {
+    lastTimeMap.clear();
+  }
+
+  public void clearFlushTime() {
+    flushTimeMap.clear();
+  }
+  // endregion
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/idtable/entry/DeviceIDFactory.java b/server/src/main/java/org/apache/iotdb/db/metadata/idtable/entry/DeviceIDFactory.java
index ba47c35..c4a9676 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/idtable/entry/DeviceIDFactory.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/idtable/entry/DeviceIDFactory.java
@@ -27,7 +27,7 @@ import java.util.function.Function;
 
 /** factory to build device id according to configured algorithm */
 public class DeviceIDFactory {
-  Function<PartialPath, IDeviceID> getDeviceIDFunction;
+  Function<String, IDeviceID> getDeviceIDFunction;
 
   // region DeviceIDFactory Singleton
   private static class DeviceIDFactoryHolder {
@@ -54,9 +54,9 @@ public class DeviceIDFactory {
             .getConfig()
             .getDeviceIDTransformationMethod()
             .equals("SHA256")) {
-      getDeviceIDFunction = partialPath -> new SHA256DeviceID(partialPath.toString());
+      getDeviceIDFunction = SHA256DeviceID::new;
     } else {
-      getDeviceIDFunction = partialPath -> new PlainDeviceID(partialPath.toString());
+      getDeviceIDFunction = PlainDeviceID::new;
     }
   }
   // endregion
@@ -68,6 +68,16 @@ public class DeviceIDFactory {
    * @return device id of the timeseries
    */
   public IDeviceID getDeviceID(PartialPath devicePath) {
+    return getDeviceIDFunction.apply(devicePath.toString());
+  }
+
+  /**
+   * get device id by full path
+   *
+   * @param devicePath device path of the timeseries
+   * @return device id of the timeseries
+   */
+  public IDeviceID getDeviceID(String devicePath) {
     return getDeviceIDFunction.apply(devicePath);
   }
 
diff --git a/server/src/test/java/org/apache/iotdb/db/metadata/idtable/IDTableFlushTimeTest.java b/server/src/test/java/org/apache/iotdb/db/metadata/idtable/IDTableFlushTimeTest.java
new file mode 100644
index 0000000..32590f6
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/metadata/idtable/IDTableFlushTimeTest.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.metadata.idtable;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.db.engine.storagegroup.VirtualStorageGroupProcessor;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.qp.Planner;
+import org.apache.iotdb.db.qp.executor.PlanExecutor;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.crud.DeletePartitionPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class IDTableFlushTimeTest {
+  private PlanExecutor executor = new PlanExecutor();
+
+  private final Planner processor = new Planner();
+
+  private boolean isEnableIDTable = false;
+
+  private String originalDeviceIDTransformationMethod = null;
+
+  private boolean isEnableIDTableLogFile = false;
+
+  public IDTableFlushTimeTest() throws QueryProcessException {}
+
+  @Before
+  public void before() {
+    IoTDBDescriptor.getInstance().getConfig().setAutoCreateSchemaEnabled(true);
+    isEnableIDTable = IoTDBDescriptor.getInstance().getConfig().isEnableIDTable();
+    originalDeviceIDTransformationMethod =
+        IoTDBDescriptor.getInstance().getConfig().getDeviceIDTransformationMethod();
+    isEnableIDTableLogFile = IoTDBDescriptor.getInstance().getConfig().isEnableIDTableLogFile();
+
+    IoTDBDescriptor.getInstance().getConfig().setEnableIDTable(true);
+    IoTDBDescriptor.getInstance().getConfig().setDeviceIDTransformationMethod("SHA256");
+    IoTDBDescriptor.getInstance().getConfig().setEnableIDTableLogFile(true);
+    EnvironmentUtils.envSetUp();
+  }
+
+  @After
+  public void clean() throws IOException, StorageEngineException {
+    IoTDBDescriptor.getInstance().getConfig().setEnableIDTable(isEnableIDTable);
+    IoTDBDescriptor.getInstance()
+        .getConfig()
+        .setDeviceIDTransformationMethod(originalDeviceIDTransformationMethod);
+    IoTDBDescriptor.getInstance().getConfig().setEnableIDTableLogFile(isEnableIDTableLogFile);
+    EnvironmentUtils.cleanEnv();
+  }
+
+  @Test
+  public void testSequenceInsert()
+      throws MetadataException, QueryProcessException, StorageEngineException {
+    insertData(0);
+    insertData(10);
+    PhysicalPlan flushPlan = processor.parseSQLToPhysicalPlan("flush");
+    executor.processNonQuery(flushPlan);
+
+    insertData(20);
+
+    VirtualStorageGroupProcessor storageGroupProcessor =
+        StorageEngine.getInstance().getProcessor(new PartialPath("root.isp.d1"));
+    assertEquals(2, storageGroupProcessor.getSequenceFileTreeSet().size());
+    assertEquals(0, storageGroupProcessor.getUnSequenceFileList().size());
+  }
+
+  @Test
+  public void testUnSequenceInsert()
+      throws MetadataException, QueryProcessException, StorageEngineException {
+    insertData(100);
+    PhysicalPlan flushPlan = processor.parseSQLToPhysicalPlan("flush");
+    executor.processNonQuery(flushPlan);
+
+    insertData(20);
+
+    VirtualStorageGroupProcessor storageGroupProcessor =
+        StorageEngine.getInstance().getProcessor(new PartialPath("root.isp.d1"));
+    assertEquals(1, storageGroupProcessor.getSequenceFileTreeSet().size());
+    assertEquals(1, storageGroupProcessor.getUnSequenceFileList().size());
+  }
+
+  @Test
+  public void testSequenceAndUnSequenceInsert()
+      throws MetadataException, QueryProcessException, StorageEngineException {
+    // sequence
+    insertData(100);
+    PhysicalPlan flushPlan = processor.parseSQLToPhysicalPlan("flush");
+    executor.processNonQuery(flushPlan);
+
+    // sequence
+    insertData(120);
+    executor.processNonQuery(flushPlan);
+
+    // unsequence
+    insertData(20);
+    // sequence
+    insertData(130);
+    executor.processNonQuery(flushPlan);
+
+    // sequence
+    insertData(150);
+    // unsequence
+    insertData(90);
+
+    VirtualStorageGroupProcessor storageGroupProcessor =
+        StorageEngine.getInstance().getProcessor(new PartialPath("root.isp.d1"));
+    assertEquals(4, storageGroupProcessor.getSequenceFileTreeSet().size());
+    assertEquals(2, storageGroupProcessor.getUnSequenceFileList().size());
+    assertEquals(1, storageGroupProcessor.getWorkSequenceTsFileProcessors().size());
+    assertEquals(1, storageGroupProcessor.getWorkUnsequenceTsFileProcessors().size());
+  }
+
+  @Test
+  public void testDeletePartition()
+      throws MetadataException, QueryProcessException, StorageEngineException {
+    insertData(100);
+    PhysicalPlan flushPlan = processor.parseSQLToPhysicalPlan("flush");
+    executor.processNonQuery(flushPlan);
+    insertData(20);
+    insertData(120);
+
+    VirtualStorageGroupProcessor storageGroupProcessor =
+        StorageEngine.getInstance().getProcessor(new PartialPath("root.isp.d1"));
+
+    assertEquals(
+        103L, storageGroupProcessor.getLastFlushTimeManager().getFlushedTime(0L, "root.isp.d1"));
+    assertEquals(
+        123L, storageGroupProcessor.getLastFlushTimeManager().getLastTime(0L, "root.isp.d1"));
+    assertEquals(
+        103L, storageGroupProcessor.getLastFlushTimeManager().getGlobalFlushedTime("root.isp.d1"));
+
+    // delete time partition
+    Set<Long> deletedPartition = new HashSet<>();
+    deletedPartition.add(0L);
+    DeletePartitionPlan deletePartitionPlan =
+        new DeletePartitionPlan(new PartialPath("root.isp"), deletedPartition);
+    executor.processNonQuery(deletePartitionPlan);
+
+    assertEquals(
+        Long.MIN_VALUE,
+        storageGroupProcessor.getLastFlushTimeManager().getFlushedTime(0L, "root.isp.d1"));
+    assertEquals(
+        Long.MIN_VALUE,
+        storageGroupProcessor.getLastFlushTimeManager().getLastTime(0L, "root.isp.d1"));
+    assertEquals(
+        123L, storageGroupProcessor.getLastFlushTimeManager().getGlobalFlushedTime("root.isp.d1"));
+  }
+
+  private void insertData(long initTime) throws IllegalPathException, QueryProcessException {
+
+    long[] times = new long[] {initTime, initTime + 1, initTime + 2, initTime + 3};
+    List<Integer> dataTypes = new ArrayList<>();
+    dataTypes.add(TSDataType.DOUBLE.ordinal());
+    dataTypes.add(TSDataType.FLOAT.ordinal());
+    dataTypes.add(TSDataType.INT64.ordinal());
+    dataTypes.add(TSDataType.INT32.ordinal());
+    dataTypes.add(TSDataType.BOOLEAN.ordinal());
+    dataTypes.add(TSDataType.TEXT.ordinal());
+
+    Object[] columns = new Object[6];
+    columns[0] = new double[4];
+    columns[1] = new float[4];
+    columns[2] = new long[4];
+    columns[3] = new int[4];
+    columns[4] = new boolean[4];
+    columns[5] = new Binary[4];
+
+    for (int r = 0; r < 4; r++) {
+      ((double[]) columns[0])[r] = 10.0 + r;
+      ((float[]) columns[1])[r] = 20 + r;
+      ((long[]) columns[2])[r] = 100000 + r;
+      ((int[]) columns[3])[r] = 1000 + r;
+      ((boolean[]) columns[4])[r] = false;
+      ((Binary[]) columns[5])[r] = new Binary("mm" + r);
+    }
+
+    InsertTabletPlan tabletPlan =
+        new InsertTabletPlan(
+            new PartialPath("root.isp.d1"),
+            new String[] {"s1", "s2", "s3", "s4", "s5", "s6"},
+            dataTypes);
+    tabletPlan.setTimes(times);
+    tabletPlan.setColumns(columns);
+    tabletPlan.setRowCount(times.length);
+
+    PlanExecutor executor = new PlanExecutor();
+    executor.insertTablet(tabletPlan);
+  }
+}