You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2021/08/20 01:59:48 UTC

[iotdb] branch rel/0.12 updated: [IOTDB-1496][IOTDB-1569][To rel/0.12] Timed flush memtable & Timed close TsFileProcessor (#3777)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch rel/0.12
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/0.12 by this push:
     new b8e3d84  [IOTDB-1496][IOTDB-1569][To rel/0.12] Timed flush memtable & Timed close TsFileProcessor (#3777)
b8e3d84 is described below

commit b8e3d84535115a5aee853550d1a1b87ede9d46af
Author: Alan Choo <43...@users.noreply.github.com>
AuthorDate: Fri Aug 20 09:59:18 2021 +0800

    [IOTDB-1496][IOTDB-1569][To rel/0.12] Timed flush memtable & Timed close TsFileProcessor (#3777)
---
 RELEASE_NOTES.md                                   |   2 +
 .../resources/conf/iotdb-engine.properties         |  44 +++++
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 108 +++++++++++++
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |  94 +++++++++++
 .../org/apache/iotdb/db/engine/StorageEngine.java  | 154 +++++++++++++++---
 .../iotdb/db/engine/memtable/AbstractMemTable.java |   7 +
 .../apache/iotdb/db/engine/memtable/IMemTable.java |   2 +
 .../engine/storagegroup/StorageGroupProcessor.java |  86 ++++++++++
 .../db/engine/storagegroup/TsFileProcessor.java    |  13 ++
 .../virtualSg/VirtualStorageGroupManager.java      |  27 ++++
 .../apache/iotdb/db/rescon/MemTableManager.java    |   4 +
 .../storagegroup/StorageGroupProcessorTest.java    | 178 +++++++++++++++++++--
 .../apache/iotdb/db/utils/EnvironmentUtils.java    |   4 +
 13 files changed, 695 insertions(+), 28 deletions(-)

diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md
index 29d5a6b..1ad2908 100644
--- a/RELEASE_NOTES.md
+++ b/RELEASE_NOTES.md
@@ -28,8 +28,10 @@
 * [IOTDB-1466] Support device template
 * [IOTDB-1491] UDTF query supported in cluster
 * [IOTDB-1536] Support fuzzy query
+* [IOTDB-1496] Timed flush memtable
 * [IOTDB-1561] Support fill by specific value
 * [IOTDB-1565] Add sql: set system to readonly/writable
+* [IOTDB-1569] Timed close TsFileProcessor
 * TTL can be set to the prefix path of storage group
 * add JMX monitor to all ThreadPools in the server module 
 
diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties b/server/src/assembly/resources/conf/iotdb-engine.properties
index 3f7178e..ea4d4e5 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -199,6 +199,50 @@ timestamp_precision=ms
 # When a memTable's size (in byte) exceeds this, the memtable is flushed to disk. The default threshold is 1 GB.
 # memtable_size_threshold=1073741824
 
+# Whether to periodically flush the memtables of sequence tsfiles.
+# Datatype: boolean
+# enable_timed_flush_seq_memtable=false
+
+# A memtable that was created earlier than (current time - this interval) will be flushed to disk.
+# Only the memtables of sequence tsfiles are checked.
+# The default flush interval is 12 * 60 * 60 * 1000 ms (12 hours).
+# Datatype: long
+# seq_memtable_flush_interval_in_ms=43200000
+
+# The interval to check whether sequence memtables need flushing.
+# The default flush check interval is 1 * 60 * 60 * 1000. (unit: ms)
+# Datatype: long
+# seq_memtable_flush_check_interval_in_ms=3600000
+
+# Whether to periodically flush the memtables of unsequence tsfiles.
+# Datatype: boolean
+# enable_timed_flush_unseq_memtable=true
+
+# A memtable that was created earlier than (current time - this interval) will be flushed to disk.
+# Only the memtables of unsequence tsfiles are checked.
+# The default flush interval is 12 * 60 * 60 * 1000 ms (12 hours).
+# Datatype: long
+# unseq_memtable_flush_interval_in_ms=43200000
+
+# The interval to check whether unsequence memtables need flushing.
+# The default flush check interval is 1 * 60 * 60 * 1000. (unit: ms)
+# Datatype: long
+# unseq_memtable_flush_check_interval_in_ms=3600000
+
+# Whether to periodically close tsfiles.
+# Datatype: boolean
+# enable_timed_close_tsfile=true
+
+# If a TsFileProcessor's working memtable is null and its last working memtable flush time is older than current time minus this, the TsFileProcessor will be closed.
+# The default close interval is 12 * 60 * 60 * 1000. (unit: ms)
+# Datatype: long
+# close_tsfile_interval_after_flushing_in_ms=43200000
+
+# The interval to check whether tsfiles need closing.
+# The default close check interval is 1 * 60 * 60 * 1000. (unit: ms)
+# Datatype: long
+# close_tsfile_check_interval_in_ms=3600000
+
 # When the average point number of timeseries in memtable exceeds this, the memtable is flushed to disk. The default threshold is 10000.
 # avg_series_point_number_threshold=10000
 
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 4ffd446..aa63643 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -274,6 +274,42 @@ public class IoTDBConfig {
   /** When a memTable's size (in byte) exceeds this, the memtable is flushed to disk. */
   private long memtableSizeThreshold = 1024 * 1024 * 1024L;
 
+  /** Whether to periodically flush the memtables of sequence tsfiles. */
+  private boolean enableTimedFlushSeqMemtable = false;
+
+  /**
+   * A memtable created earlier than (current time - this interval) will be flushed to disk. Only
+   * the memtables of sequence tsfiles are checked. Unit: ms
+   */
+  private long seqMemtableFlushInterval = 12 * 60 * 60 * 1000L;
+
+  /** The interval to check whether sequence memtables need flushing. Unit: ms */
+  private long seqMemtableFlushCheckInterval = 60 * 60 * 1000L;
+
+  /** Whether to periodically flush the memtables of unsequence tsfiles. */
+  private boolean enableTimedFlushUnseqMemtable = true;
+
+  /**
+   * A memtable created earlier than (current time - this interval) will be flushed to disk. Only
+   * the memtables of unsequence tsfiles are checked. Unit: ms
+   */
+  private long unseqMemtableFlushInterval = 12 * 60 * 60 * 1000L;
+
+  /** The interval to check whether unsequence memtables need flushing. Unit: ms */
+  private long unseqMemtableFlushCheckInterval = 60 * 60 * 1000L;
+
+  /** Whether to periodically close tsfiles. */
+  private boolean enableTimedCloseTsFile = true;
+
+  /**
+   * If a TsFileProcessor's working memtable is null and its last working memtable flush time is
+   * older than current time minus this, the TsFileProcessor will be closed. Unit: ms
+   */
+  private long closeTsFileIntervalAfterFlushing = 12 * 60 * 60 * 1000L;
+
+  /** The interval to check whether tsfiles need closing. Unit: ms */
+  private long closeTsFileCheckInterval = 60 * 60 * 1000L;
+
   /** When average series point number reaches this, flush the memtable to disk */
   private int avgSeriesPointNumberThreshold = 10000;
 
@@ -1403,6 +1439,78 @@ public class IoTDBConfig {
     this.memtableSizeThreshold = memtableSizeThreshold;
   }
 
+  public boolean isEnableTimedFlushSeqMemtable() {
+    return enableTimedFlushSeqMemtable;
+  }
+
+  public void setEnableTimedFlushSeqMemtable(boolean enableTimedFlushSeqMemtable) {
+    this.enableTimedFlushSeqMemtable = enableTimedFlushSeqMemtable;
+  }
+
+  public long getSeqMemtableFlushInterval() {
+    return seqMemtableFlushInterval;
+  }
+
+  public void setSeqMemtableFlushInterval(long seqMemtableFlushInterval) {
+    this.seqMemtableFlushInterval = seqMemtableFlushInterval;
+  }
+
+  public long getSeqMemtableFlushCheckInterval() {
+    return seqMemtableFlushCheckInterval;
+  }
+
+  public void setSeqMemtableFlushCheckInterval(long seqMemtableFlushCheckInterval) {
+    this.seqMemtableFlushCheckInterval = seqMemtableFlushCheckInterval;
+  }
+
+  public boolean isEnableTimedFlushUnseqMemtable() {
+    return enableTimedFlushUnseqMemtable;
+  }
+
+  public void setEnableTimedFlushUnseqMemtable(boolean enableTimedFlushUnseqMemtable) {
+    this.enableTimedFlushUnseqMemtable = enableTimedFlushUnseqMemtable;
+  }
+
+  public long getUnseqMemtableFlushInterval() {
+    return unseqMemtableFlushInterval;
+  }
+
+  public void setUnseqMemtableFlushInterval(long unseqMemtableFlushInterval) {
+    this.unseqMemtableFlushInterval = unseqMemtableFlushInterval;
+  }
+
+  public long getUnseqMemtableFlushCheckInterval() {
+    return unseqMemtableFlushCheckInterval;
+  }
+
+  public void setUnseqMemtableFlushCheckInterval(long unseqMemtableFlushCheckInterval) {
+    this.unseqMemtableFlushCheckInterval = unseqMemtableFlushCheckInterval;
+  }
+
+  public boolean isEnableTimedCloseTsFile() {
+    return enableTimedCloseTsFile;
+  }
+
+  public void setEnableTimedCloseTsFile(boolean enableTimedCloseTsFile) {
+    this.enableTimedCloseTsFile = enableTimedCloseTsFile;
+  }
+
+  public long getCloseTsFileIntervalAfterFlushing() {
+    return closeTsFileIntervalAfterFlushing;
+  }
+
+  public void setCloseTsFileIntervalAfterFlushing(long closeTsFileIntervalAfterFlushing) {
+    this.closeTsFileIntervalAfterFlushing = closeTsFileIntervalAfterFlushing;
+  }
+
+  public long getCloseTsFileCheckInterval() {
+    return closeTsFileCheckInterval;
+  }
+
+  public void setCloseTsFileCheckInterval(long closeTsFileCheckInterval) {
+    this.closeTsFileCheckInterval = closeTsFileCheckInterval;
+  }
+
   public int getAvgSeriesPointNumberThreshold() {
     return avgSeriesPointNumberThreshold;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 2bdb253..7696908 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -19,6 +19,7 @@
 package org.apache.iotdb.db.conf;
 
 import org.apache.iotdb.db.conf.directories.DirectoryManager;
+import org.apache.iotdb.db.engine.StorageEngine;
 import org.apache.iotdb.db.engine.compaction.CompactionStrategy;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.utils.FilePathUtils;
@@ -756,6 +757,9 @@ public class IoTDBDescriptor {
               properties.getProperty("kerberos_principal", conf.getKerberosPrincipal()));
       TSFileDescriptor.getInstance().getConfig().setBatchSize(conf.getBatchSize());
 
+      // timed flush memtable, timed close tsfile
+      loadTimedService(properties);
+
       // set tsfile-format config
       loadTsFileProps(properties);
 
@@ -968,6 +972,92 @@ public class IoTDBDescriptor {
                         TSFileDescriptor.getInstance().getConfig().getMaxDegreeOfIndexNode()))));
   }
 
+  // timed flush memtable, timed close tsfile
+  private void loadTimedService(Properties properties) {
+    conf.setEnableTimedFlushSeqMemtable(
+        Boolean.parseBoolean(
+            properties.getProperty(
+                "enable_timed_flush_seq_memtable",
+                Boolean.toString(conf.isEnableTimedFlushSeqMemtable()))));
+
+    long seqMemTableFlushInterval =
+        Long.parseLong(
+            properties
+                .getProperty(
+                    "seq_memtable_flush_interval_in_ms",
+                    Long.toString(conf.getSeqMemtableFlushInterval()))
+                .trim());
+    if (seqMemTableFlushInterval > 0) {
+      conf.setSeqMemtableFlushInterval(seqMemTableFlushInterval);
+    }
+
+    long seqMemTableFlushCheckInterval =
+        Long.parseLong(
+            properties
+                .getProperty(
+                    "seq_memtable_flush_check_interval_in_ms",
+                    Long.toString(conf.getSeqMemtableFlushCheckInterval()))
+                .trim());
+    if (seqMemTableFlushCheckInterval > 0) {
+      conf.setSeqMemtableFlushCheckInterval(seqMemTableFlushCheckInterval);
+    }
+
+    conf.setEnableTimedFlushUnseqMemtable(
+        Boolean.parseBoolean(
+            properties.getProperty(
+                "enable_timed_flush_unseq_memtable",
+                Boolean.toString(conf.isEnableTimedFlushUnseqMemtable()))));
+
+    long unseqMemTableFlushInterval =
+        Long.parseLong(
+            properties
+                .getProperty(
+                    "unseq_memtable_flush_interval_in_ms",
+                    Long.toString(conf.getUnseqMemtableFlushInterval()))
+                .trim());
+    if (unseqMemTableFlushInterval > 0) {
+      conf.setUnseqMemtableFlushInterval(unseqMemTableFlushInterval);
+    }
+
+    long unseqMemTableFlushCheckInterval =
+        Long.parseLong(
+            properties
+                .getProperty(
+                    "unseq_memtable_flush_check_interval_in_ms",
+                    Long.toString(conf.getUnseqMemtableFlushCheckInterval()))
+                .trim());
+    if (unseqMemTableFlushCheckInterval > 0) {
+      conf.setUnseqMemtableFlushCheckInterval(unseqMemTableFlushCheckInterval);
+    }
+
+    conf.setEnableTimedCloseTsFile(
+        Boolean.parseBoolean(
+            properties.getProperty(
+                "enable_timed_close_tsfile", Boolean.toString(conf.isEnableTimedCloseTsFile()))));
+
+    long closeTsFileIntervalAfterFlushing =
+        Long.parseLong(
+            properties
+                .getProperty(
+                    "close_tsfile_interval_after_flushing_in_ms",
+                    Long.toString(conf.getCloseTsFileIntervalAfterFlushing()))
+                .trim());
+    if (closeTsFileIntervalAfterFlushing > 0) {
+      conf.setCloseTsFileIntervalAfterFlushing(closeTsFileIntervalAfterFlushing);
+    }
+
+    long closeTsFileCheckInterval =
+        Long.parseLong(
+            properties
+                .getProperty(
+                    "close_tsfile_check_interval_in_ms",
+                    Long.toString(conf.getCloseTsFileCheckInterval()))
+                .trim());
+    if (closeTsFileCheckInterval > 0) {
+      conf.setCloseTsFileCheckInterval(closeTsFileCheckInterval);
+    }
+  }
+
   public void loadHotModifiedProps(Properties properties) throws QueryProcessException {
     try {
       // update data dirs
@@ -987,6 +1077,10 @@ public class IoTDBDescriptor {
       // update WAL conf
       loadWALProps(properties);
 
+      // update timed flush & close conf
+      loadTimedService(properties);
+      StorageEngine.getInstance().rebootTimedService();
+
       long seqTsFileSize =
           Long.parseLong(
               properties
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index eafaf0e..49750f2 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -99,6 +99,7 @@ public class StorageEngine implements IService {
 
   private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
   private static final long TTL_CHECK_INTERVAL = 60 * 1000L;
+
   /**
    * Time range for dividing storage group, the time unit is the same with IoTDB's
    * TimestampPrecision
@@ -122,6 +123,10 @@ public class StorageEngine implements IService {
 
   private ExecutorService recoverAllSgThreadPool;
   private ScheduledExecutorService ttlCheckThread;
+  private ScheduledExecutorService seqMemtableTimedFlushCheckThread;
+  private ScheduledExecutorService unseqMemtableTimedFlushCheckThread;
+  private ScheduledExecutorService tsFileTimedCloseCheckThread;
+
   private TsFileFlushPolicy fileFlushPolicy = new DirectFlushPolicy();
   private ExecutorService recoveryThreadPool;
   // add customized listeners here for flush and close events
@@ -295,6 +300,9 @@ public class StorageEngine implements IService {
     ttlCheckThread = IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("TTL");
     ttlCheckThread.scheduleAtFixedRate(
         this::checkTTL, TTL_CHECK_INTERVAL, TTL_CHECK_INTERVAL, TimeUnit.MILLISECONDS);
+    logger.info("start ttl check thread successfully.");
+
+    startTimedService();
   }
 
   private void checkTTL() {
@@ -313,20 +321,79 @@ public class StorageEngine implements IService {
     }
   }
 
+  private void startTimedService() {
+    // timed flush sequence memtable
+    if (config.isEnableTimedFlushSeqMemtable()) {
+      seqMemtableTimedFlushCheckThread =
+          IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("TimedFlushSeqMemtable");
+      seqMemtableTimedFlushCheckThread.scheduleAtFixedRate(
+          this::timedFlushSeqMemTable,
+          config.getSeqMemtableFlushCheckInterval(),
+          config.getSeqMemtableFlushCheckInterval(),
+          TimeUnit.MILLISECONDS);
+      logger.info("start sequence memtable timed flush check thread successfully.");
+    }
+    // timed flush unsequence memtable
+    if (config.isEnableTimedFlushUnseqMemtable()) {
+      unseqMemtableTimedFlushCheckThread =
+          IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("TimedFlushUnSeqMemtable");
+      unseqMemtableTimedFlushCheckThread.scheduleAtFixedRate(
+          this::timedFlushUnseqMemTable,
+          config.getUnseqMemtableFlushCheckInterval(),
+          config.getUnseqMemtableFlushCheckInterval(),
+          TimeUnit.MILLISECONDS);
+      logger.info("start unsequence memtable timed flush check thread successfully.");
+    }
+    // timed close tsfile
+    if (config.isEnableTimedCloseTsFile()) {
+      tsFileTimedCloseCheckThread =
+          IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("TimedCloseTsFileProcessor");
+      tsFileTimedCloseCheckThread.scheduleAtFixedRate(
+          this::timedCloseTsFileProcessor,
+          config.getCloseTsFileCheckInterval(),
+          config.getCloseTsFileCheckInterval(),
+          TimeUnit.MILLISECONDS);
+      logger.info("start tsfile timed close check thread successfully.");
+    }
+  }
+
+  private void timedFlushSeqMemTable() {
+    try {
+      for (VirtualStorageGroupManager processor : processorMap.values()) {
+        processor.timedFlushSeqMemTable();
+      }
+    } catch (Exception e) {
+      logger.error("An error occurred when timed flushing sequence memtables", e);
+    }
+  }
+
+  private void timedFlushUnseqMemTable() {
+    try {
+      for (VirtualStorageGroupManager processor : processorMap.values()) {
+        processor.timedFlushUnseqMemTable();
+      }
+    } catch (Exception e) {
+      logger.error("An error occurred when timed flushing unsequence memtables", e);
+    }
+  }
+
+  private void timedCloseTsFileProcessor() {
+    try {
+      for (VirtualStorageGroupManager processor : processorMap.values()) {
+        processor.timedCloseTsFileProcessor();
+      }
+    } catch (Exception e) {
+      logger.error("An error occurred when timed closing tsfiles interval", e);
+    }
+  }
+
   @Override
   public void stop() {
     syncCloseAllProcessor();
-    if (ttlCheckThread != null) {
-      ttlCheckThread.shutdownNow();
-      try {
-        ttlCheckThread.awaitTermination(60, TimeUnit.SECONDS);
-      } catch (InterruptedException e) {
-        logger.warn("TTL check thread still doesn't exit after 60s");
-        Thread.currentThread().interrupt();
-        throw new StorageEngineFailureException(
-            "StorageEngine failed to stop because of " + "ttlCheckThread.", e);
-      }
-    }
+    stopTimedService(ttlCheckThread, "TTlCheckThread");
+    stopTimedService(seqMemtableTimedFlushCheckThread, "SeqMemtableTimedFlushCheckThread");
+    stopTimedService(unseqMemtableTimedFlushCheckThread, "UnseqMemtableTimedFlushCheckThread");
+    stopTimedService(tsFileTimedCloseCheckThread, "TsFileTimedCloseCheckThread");
     recoveryThreadPool.shutdownNow();
     if (!recoverAllSgThreadPool.isShutdown()) {
       recoverAllSgThreadPool.shutdownNow();
@@ -336,7 +403,7 @@ public class StorageEngine implements IService {
         logger.warn("recoverAllSgThreadPool thread still doesn't exit after 60s");
         Thread.currentThread().interrupt();
         throw new StorageEngineFailureException(
-            "StorageEngine failed to stop because of " + "recoverAllSgThreadPool.", e);
+            "StorageEngine failed to stop because of recoverAllSgThreadPool.", e);
       }
     }
     for (PartialPath storageGroup : IoTDB.metaManager.getAllStorageGroupPaths()) {
@@ -345,6 +412,20 @@ public class StorageEngine implements IService {
     this.reset();
   }
 
+  private void stopTimedService(ScheduledExecutorService pool, String poolName) {
+    if (pool != null) {
+      pool.shutdownNow();
+      try {
+        pool.awaitTermination(60, TimeUnit.SECONDS);
+      } catch (InterruptedException e) {
+        logger.warn("{} still doesn't exit after 60s", poolName);
+        Thread.currentThread().interrupt();
+        throw new StorageEngineFailureException(
+            String.format("StorageEngine failed to stop because of %s.", poolName), e);
+      }
+    }
+  }
+
   @Override
   public void shutdown(long milliseconds) throws ShutdownException {
     try {
@@ -352,17 +433,54 @@ public class StorageEngine implements IService {
     } catch (TsFileProcessorException e) {
       throw new ShutdownException(e);
     }
-    if (ttlCheckThread != null) {
-      ttlCheckThread.shutdownNow();
+    shutdownTimedService(ttlCheckThread, "TTlCheckThread");
+    shutdownTimedService(seqMemtableTimedFlushCheckThread, "SeqMemtableTimedFlushCheckThread");
+    shutdownTimedService(unseqMemtableTimedFlushCheckThread, "UnseqMemtableTimedFlushCheckThread");
+    shutdownTimedService(tsFileTimedCloseCheckThread, "TsFileTimedCloseCheckThread");
+    recoveryThreadPool.shutdownNow();
+    this.reset();
+  }
+
+  private void shutdownTimedService(ScheduledExecutorService pool, String poolName) {
+    if (pool != null) {
+      pool.shutdownNow();
       try {
-        ttlCheckThread.awaitTermination(30, TimeUnit.SECONDS);
+        pool.awaitTermination(30, TimeUnit.SECONDS);
       } catch (InterruptedException e) {
-        logger.warn("TTL check thread still doesn't exit after 30s");
+        logger.warn("{} still doesn't exit after 30s", poolName);
         Thread.currentThread().interrupt();
       }
     }
-    recoveryThreadPool.shutdownNow();
-    this.reset();
+  }
+
+  /** Reboot the timed flush threads of sequence/unsequence memtables and the timed close thread. */
+  public void rebootTimedService() throws ShutdownException {
+    logger.info("Start rebooting all timed service.");
+
+    // exclude ttl check thread
+    stopTimedServiceAndThrow(seqMemtableTimedFlushCheckThread, "SeqMemtableTimedFlushCheckThread");
+    stopTimedServiceAndThrow(
+        unseqMemtableTimedFlushCheckThread, "UnseqMemtableTimedFlushCheckThread");
+    stopTimedServiceAndThrow(tsFileTimedCloseCheckThread, "TsFileTimedCloseCheckThread");
+
+    logger.info("Stop all timed service successfully, and now restart them.");
+
+    startTimedService();
+
+    logger.info("Reboot all timed service successfully");
+  }
+
+  private void stopTimedServiceAndThrow(ScheduledExecutorService pool, String poolName)
+      throws ShutdownException {
+    if (pool != null) {
+      pool.shutdownNow();
+      try {
+        pool.awaitTermination(30, TimeUnit.SECONDS);
+      } catch (InterruptedException e) {
+        logger.warn("{} still doesn't exit after 30s", poolName);
+        throw new ShutdownException(e);
+      }
+    }
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
index b49fc31..6bf79e6 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
@@ -71,6 +71,8 @@ public abstract class AbstractMemTable implements IMemTable {
 
   private long minPlanIndex = Long.MAX_VALUE;
 
+  private long createdTime = System.currentTimeMillis();
+
   public AbstractMemTable() {
     this.memTableMap = new HashMap<>();
   }
@@ -359,4 +361,9 @@ public abstract class AbstractMemTable implements IMemTable {
     maxPlanIndex = Math.max(index, maxPlanIndex);
     minPlanIndex = Math.min(index, minPlanIndex);
   }
+
+  @Override
+  public long getCreatedTime() {
+    return createdTime;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java
index ce5de4e..98532aa 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java
@@ -143,4 +143,6 @@ public interface IMemTable {
   long getMaxPlanIndex();
 
   long getMinPlanIndex();
+
+  long getCreatedTime();
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 9964d46..ab8ccdc 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -1421,6 +1421,92 @@ public class StorageGroupProcessor {
     }
   }
 
+  public void timedFlushSeqMemTable() {
+    writeLock("timedFlushSeqMemTable");
+    try {
+      // only check sequence tsfiles' memtables
+      List<TsFileProcessor> tsFileProcessors =
+          new ArrayList<>(workSequenceTsFileProcessors.values());
+      long timeLowerBound = System.currentTimeMillis() - config.getSeqMemtableFlushInterval();
+
+      for (TsFileProcessor tsFileProcessor : tsFileProcessors) {
+        if (tsFileProcessor.getWorkMemTableCreatedTime() < timeLowerBound) {
+          logger.info(
+              "Exceed sequence memtable flush interval, so flush working memtable of time partition {} in storage group {}[{}]",
+              tsFileProcessor.getTimeRangeId(),
+              logicalStorageGroupName,
+              virtualStorageGroupId);
+          fileFlushPolicy.apply(this, tsFileProcessor, tsFileProcessor.isSequence());
+        }
+      }
+    } finally {
+      writeUnlock();
+    }
+  }
+
+  public void timedFlushUnseqMemTable() {
+    writeLock("timedFlushUnseqMemTable");
+    try {
+      // only check unsequence tsfiles' memtables
+      List<TsFileProcessor> tsFileProcessors =
+          new ArrayList<>(workUnsequenceTsFileProcessors.values());
+      long timeLowerBound = System.currentTimeMillis() - config.getUnseqMemtableFlushInterval();
+
+      for (TsFileProcessor tsFileProcessor : tsFileProcessors) {
+        if (tsFileProcessor.getWorkMemTableCreatedTime() < timeLowerBound) {
+          logger.info(
+              "Exceed unsequence memtable flush interval, so flush working memtable of time partition {} in storage group {}[{}]",
+              tsFileProcessor.getTimeRangeId(),
+              logicalStorageGroupName,
+              virtualStorageGroupId);
+          fileFlushPolicy.apply(this, tsFileProcessor, tsFileProcessor.isSequence());
+        }
+      }
+    } finally {
+      writeUnlock();
+    }
+  }
+
+  public void timedCloseTsFileProcessor() {
+    writeLock("timedCloseTsFileProcessor");
+    try {
+      List<TsFileProcessor> seqTsFileProcessors =
+          new ArrayList<>(workSequenceTsFileProcessors.values());
+      long timeLowerBound =
+          System.currentTimeMillis() - config.getCloseTsFileIntervalAfterFlushing();
+      for (TsFileProcessor tsFileProcessor : seqTsFileProcessors) {
+        // working memtable is null(no more write ops) and last flush time exceeds close interval
+        if (tsFileProcessor.getWorkMemTableCreatedTime() == Long.MAX_VALUE
+            && tsFileProcessor.getLastWorkMemtableFlushTime() < timeLowerBound) {
+          logger.info(
+              "Exceed tsfile close interval, so close TsFileProcessor of time partition {} in storage group {}[{}]",
+              tsFileProcessor.getTimeRangeId(),
+              logicalStorageGroupName,
+              virtualStorageGroupId);
+          asyncCloseOneTsFileProcessor(true, tsFileProcessor);
+        }
+      }
+
+      List<TsFileProcessor> unSeqTsFileProcessors =
+          new ArrayList<>(workUnsequenceTsFileProcessors.values());
+      timeLowerBound = System.currentTimeMillis() - config.getCloseTsFileIntervalAfterFlushing();
+      for (TsFileProcessor tsFileProcessor : unSeqTsFileProcessors) {
+        // working memtable is null(no more write ops) and last flush time exceeds close interval
+        if (tsFileProcessor.getWorkMemTableCreatedTime() == Long.MAX_VALUE
+            && tsFileProcessor.getLastWorkMemtableFlushTime() < timeLowerBound) {
+          logger.info(
+              "Exceed tsfile close interval, so close TsFileProcessor of time partition {} in storage group {}[{}]",
+              tsFileProcessor.getTimeRangeId(),
+              logicalStorageGroupName,
+              virtualStorageGroupId);
+          asyncCloseOneTsFileProcessor(false, tsFileProcessor);
+        }
+      }
+    } finally {
+      writeUnlock();
+    }
+  }
+
   /** This method will be blocked until all tsfile processors are closed. */
   public void syncCloseAllWorkingTsFileProcessors() {
     synchronized (closeStorageGroupCondition) {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index e35cc3a..73fcb81 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -113,6 +113,9 @@ public class TsFileProcessor {
 
   private IMemTable workMemTable;
 
+  /** the time when the working memtable was last flushed */
+  private long lastWorkMemtableFlushTime;
+
   /** this callback is called before the workMemtable is added into the flushingMemTables. */
   private final UpdateEndTimeCallBack updateLatestFlushTimeCallback;
 
@@ -745,6 +748,7 @@ public class TsFileProcessor {
       totalMemTableSize += tobeFlushed.memSize();
     }
     workMemTable = null;
+    lastWorkMemtableFlushTime = System.currentTimeMillis();
     FlushManager.getInstance().registerTsFileProcessor(this);
   }
 
@@ -1217,6 +1221,15 @@ public class TsFileProcessor {
     return workMemTable != null ? workMemTable.getTVListsRamCost() : 0;
   }
 
+  /** Return Long.MAX_VALUE if workMemTable is null */
+  public long getWorkMemTableCreatedTime() {
+    return workMemTable != null ? workMemTable.getCreatedTime() : Long.MAX_VALUE;
+  }
+
+  public long getLastWorkMemtableFlushTime() {
+    return lastWorkMemtableFlushTime;
+  }
+
   public boolean isSequence() {
     return sequence;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/VirtualStorageGroupManager.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/VirtualStorageGroupManager.java
index 996b857..8ad42bd 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/VirtualStorageGroupManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/VirtualStorageGroupManager.java
@@ -85,6 +85,33 @@ public class VirtualStorageGroupManager {
     }
   }
 
+  /** Calls timedFlushSeqMemTable() on every non-null virtual storage group processor. */
+  public void timedFlushSeqMemTable() {
+    for (StorageGroupProcessor storageGroupProcessor : virtualStorageGroupProcessor) {
+      if (storageGroupProcessor != null) {
+        storageGroupProcessor.timedFlushSeqMemTable();
+      }
+    }
+  }
+
+  /** Calls timedFlushUnseqMemTable() on every non-null virtual storage group processor. */
+  public void timedFlushUnseqMemTable() {
+    for (StorageGroupProcessor storageGroupProcessor : virtualStorageGroupProcessor) {
+      if (storageGroupProcessor != null) {
+        storageGroupProcessor.timedFlushUnseqMemTable();
+      }
+    }
+  }
+
+  /** Calls timedCloseTsFileProcessor() on every non-null virtual storage group processor. */
+  public void timedCloseTsFileProcessor() {
+    for (StorageGroupProcessor storageGroupProcessor : virtualStorageGroupProcessor) {
+      if (storageGroupProcessor != null) {
+        storageGroupProcessor.timedCloseTsFileProcessor();
+      }
+    }
+  }
+
   /**
    * get processor from device id
    *
diff --git a/server/src/main/java/org/apache/iotdb/db/rescon/MemTableManager.java b/server/src/main/java/org/apache/iotdb/db/rescon/MemTableManager.java
index 856ffc8..461b64b 100644
--- a/server/src/main/java/org/apache/iotdb/db/rescon/MemTableManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/rescon/MemTableManager.java
@@ -102,6 +102,10 @@ public class MemTableManager {
     notifyAll();
   }
 
+  public synchronized void close() {
+    currentMemtableNumber = 0; // resets the memtable counter only; no notifyAll() here
+  }
+
   private static class InstanceHolder {
 
     private static final MemTableManager INSTANCE = new MemTableManager();
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
index 20b4cf7..6493755 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
@@ -22,11 +22,14 @@ import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.constant.TestConstant;
 import org.apache.iotdb.db.engine.MetadataManagerHelper;
+import org.apache.iotdb.db.engine.StorageEngine;
 import org.apache.iotdb.db.engine.compaction.CompactionStrategy;
+import org.apache.iotdb.db.engine.flush.FlushManager;
 import org.apache.iotdb.db.engine.flush.TsFileFlushPolicy;
 import org.apache.iotdb.db.engine.merge.manage.MergeManager;
 import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
+import org.apache.iotdb.db.exception.ShutdownException;
 import org.apache.iotdb.db.exception.StorageGroupProcessorException;
 import org.apache.iotdb.db.exception.WriteProcessException;
 import org.apache.iotdb.db.exception.metadata.IllegalPathException;
@@ -37,6 +40,7 @@ import org.apache.iotdb.db.metadata.mnode.MeasurementMNode;
 import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
 import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.rescon.MemTableManager;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
@@ -50,6 +54,8 @@ import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -57,6 +63,8 @@ import java.util.Collections;
 import java.util.List;
 
 public class StorageGroupProcessorTest {
+  private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+  private static final Logger logger = LoggerFactory.getLogger(StorageGroupProcessorTest.class);
 
   private String storageGroup = "root.vehicle.d0";
   private String systemDir = TestConstant.OUTPUT_DATA_DIR.concat("info");
@@ -67,9 +75,7 @@ public class StorageGroupProcessorTest {
 
   @Before
   public void setUp() throws Exception {
-    IoTDBDescriptor.getInstance()
-        .getConfig()
-        .setCompactionStrategy(CompactionStrategy.NO_COMPACTION);
+    config.setCompactionStrategy(CompactionStrategy.NO_COMPACTION);
     MetadataManagerHelper.initMetadata();
     EnvironmentUtils.envSetUp();
     processor = new DummySGP(systemDir, storageGroup);
@@ -83,9 +89,7 @@ public class StorageGroupProcessorTest {
     EnvironmentUtils.cleanDir(TestConstant.OUTPUT_DATA_DIR);
     MergeManager.getINSTANCE().stop();
     EnvironmentUtils.cleanEnv();
-    IoTDBDescriptor.getInstance()
-        .getConfig()
-        .setCompactionStrategy(CompactionStrategy.LEVEL_COMPACTION);
+    config.setCompactionStrategy(CompactionStrategy.LEVEL_COMPACTION);
   }
 
   private void insertToStorageGroupProcessor(TSRecord record)
@@ -296,7 +300,6 @@ public class StorageGroupProcessorTest {
   @Test
   public void testEnableDiscardOutOfOrderDataForInsertRowPlan()
       throws WriteProcessException, QueryProcessException, IllegalPathException, IOException {
-    IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
     boolean defaultValue = config.isEnableDiscardOutOfOrderData();
     config.setEnableDiscardOutOfOrderData(true);
 
@@ -338,7 +341,6 @@ public class StorageGroupProcessorTest {
   @Test
   public void testEnableDiscardOutOfOrderDataForInsertTablet1()
       throws QueryProcessException, IllegalPathException, IOException {
-    IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
     boolean defaultEnableDiscard = config.isEnableDiscardOutOfOrderData();
     long defaultTimePartition = config.getPartitionInterval();
     boolean defaultEnablePartition = config.isEnablePartition();
@@ -420,7 +422,6 @@ public class StorageGroupProcessorTest {
   @Test
   public void testEnableDiscardOutOfOrderDataForInsertTablet2()
       throws QueryProcessException, IllegalPathException, IOException {
-    IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
     boolean defaultEnableDiscard = config.isEnableDiscardOutOfOrderData();
     long defaultTimePartition = config.getPartitionInterval();
     boolean defaultEnablePartition = config.isEnablePartition();
@@ -502,7 +503,6 @@ public class StorageGroupProcessorTest {
   @Test
   public void testEnableDiscardOutOfOrderDataForInsertTablet3()
       throws QueryProcessException, IllegalPathException, IOException {
-    IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
     boolean defaultEnableDiscard = config.isEnableDiscardOutOfOrderData();
     long defaultTimePartition = config.getPartitionInterval();
     boolean defaultEnablePartition = config.isEnablePartition();
@@ -616,6 +616,164 @@ public class StorageGroupProcessorTest {
     }
   }
 
+  @Test
+  public void testTimedFlushSeqMemTable()
+      throws IllegalPathException, InterruptedException, WriteProcessException, ShutdownException {
+    // step 1: insert one point so that exactly one (sequence) memtable exists
+    TSRecord record = new TSRecord(10000, deviceId);
+    record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(1000)));
+    processor.insert(new InsertRowPlan(record));
+    Assert.assertEquals(1, MemTableManager.getInstance().getCurrentMemtableNumber());
+
+    // change config & reboot timed service; NOTE(review): restore below is not in a finally
+    boolean prevEnableTimedFlushSeqMemtable = config.isEnableTimedFlushSeqMemtable();
+    long preFLushInterval = config.getSeqMemtableFlushInterval();
+    config.setEnableTimedFlushSeqMemtable(true);
+    config.setSeqMemtableFlushInterval(5);
+    StorageEngine.getInstance().rebootTimedService();
+
+    Thread.sleep(500); // presumably so the memtable outlives the interval set above; confirm
+
+    // call the timed flush directly on the processor under test
+    processor.timedFlushSeqMemTable();
+
+    // poll until the flush is done; NOTE(review): unbounded, a stuck flush hangs the test
+    Assert.assertEquals(1, processor.getWorkSequenceTsFileProcessors().size());
+    TsFileProcessor tsFileProcessor = processor.getWorkSequenceTsFileProcessors().iterator().next();
+    FlushManager flushManager = FlushManager.getInstance();
+    int waitCnt = 0;
+    while (tsFileProcessor.getFlushingMemTableSize() != 0
+        || tsFileProcessor.isManagedByFlushManager()
+        || flushManager.getNumberOfPendingTasks() != 0
+        || flushManager.getNumberOfPendingSubTasks() != 0
+        || flushManager.getNumberOfWorkingTasks() != 0
+        || flushManager.getNumberOfWorkingSubTasks() != 0) {
+      Thread.sleep(500);
+      ++waitCnt;
+      if (waitCnt % 10 == 0) { // every 10 polls of 500 ms, i.e. roughly every 5 s
+        logger.info("already wait {} s", waitCnt / 2);
+      }
+    }
+
+    Assert.assertEquals(0, MemTableManager.getInstance().getCurrentMemtableNumber());
+
+    config.setEnableTimedFlushSeqMemtable(prevEnableTimedFlushSeqMemtable);
+    config.setSeqMemtableFlushInterval(preFLushInterval);
+  }
+
+  @Test
+  public void testTimedFlushUnseqMemTable()
+      throws IllegalPathException, InterruptedException, WriteProcessException, ShutdownException {
+    // step 1: create one sequence memtable (time 10000) & sync-close it; count returns to 0
+    TSRecord record = new TSRecord(10000, deviceId);
+    record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(1000)));
+    processor.insert(new InsertRowPlan(record));
+    Assert.assertEquals(1, MemTableManager.getInstance().getCurrentMemtableNumber());
+    processor.syncCloseAllWorkingTsFileProcessors();
+    Assert.assertEquals(0, MemTableManager.getInstance().getCurrentMemtableNumber());
+
+    // step 2: create one unsequence memtable (time 1 is older than the point written above)
+    record = new TSRecord(1, deviceId);
+    record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(1000)));
+    processor.insert(new InsertRowPlan(record));
+    Assert.assertEquals(1, MemTableManager.getInstance().getCurrentMemtableNumber());
+
+    // change config & reboot timed service; NOTE(review): restore below is not in a finally
+    boolean prevEnableTimedFlushUnseqMemtable = config.isEnableTimedFlushUnseqMemtable();
+    long preFLushInterval = config.getUnseqMemtableFlushInterval();
+    config.setEnableTimedFlushUnseqMemtable(true);
+    config.setUnseqMemtableFlushInterval(5);
+    StorageEngine.getInstance().rebootTimedService();
+
+    Thread.sleep(500); // presumably so the memtable outlives the interval set above; confirm
+
+    // call the timed flush directly on the processor under test
+    processor.timedFlushUnseqMemTable();
+
+    // poll until the flush is done; NOTE(review): unbounded, a stuck flush hangs the test
+    Assert.assertEquals(1, processor.getWorkUnsequenceTsFileProcessors().size());
+    TsFileProcessor tsFileProcessor =
+        processor.getWorkUnsequenceTsFileProcessors().iterator().next();
+    FlushManager flushManager = FlushManager.getInstance();
+    int waitCnt = 0;
+    while (tsFileProcessor.getFlushingMemTableSize() != 0
+        || tsFileProcessor.isManagedByFlushManager()
+        || flushManager.getNumberOfPendingTasks() != 0
+        || flushManager.getNumberOfPendingSubTasks() != 0
+        || flushManager.getNumberOfWorkingTasks() != 0
+        || flushManager.getNumberOfWorkingSubTasks() != 0) {
+      Thread.sleep(500);
+      ++waitCnt;
+      if (waitCnt % 10 == 0) { // every 10 polls of 500 ms, i.e. roughly every 5 s
+        logger.info("already wait {} s", waitCnt / 2);
+      }
+    }
+
+    Assert.assertEquals(0, MemTableManager.getInstance().getCurrentMemtableNumber());
+
+    config.setEnableTimedFlushUnseqMemtable(prevEnableTimedFlushUnseqMemtable);
+    config.setUnseqMemtableFlushInterval(preFLushInterval);
+  }
+
+  @Test
+  public void testTimedCloseTsFile()
+      throws IllegalPathException, InterruptedException, WriteProcessException, ShutdownException {
+    // step 1: insert one point so that exactly one (sequence) memtable exists
+    TSRecord record = new TSRecord(10000, deviceId);
+    record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(1000)));
+    processor.insert(new InsertRowPlan(record));
+    Assert.assertEquals(1, MemTableManager.getInstance().getCurrentMemtableNumber());
+
+    // change config & reboot timed service; NOTE(review): restore below is not in a finally
+    boolean prevEnableTimedFlushSeqMemtable = config.isEnableTimedFlushSeqMemtable();
+    long preFLushInterval = config.getSeqMemtableFlushInterval();
+    config.setEnableTimedFlushSeqMemtable(true);
+    config.setSeqMemtableFlushInterval(5);
+    boolean prevEnableTimedCloseTsFile = config.isEnableTimedCloseTsFile();
+    long prevCloseTsFileInterval = config.getCloseTsFileIntervalAfterFlushing();
+    config.setEnableTimedCloseTsFile(true);
+    config.setCloseTsFileIntervalAfterFlushing(5);
+    StorageEngine.getInstance().rebootTimedService();
+
+    Thread.sleep(500); // presumably so the memtable outlives the interval set above; confirm
+
+    // call the timed flush directly on the processor under test
+    processor.timedFlushSeqMemTable();
+
+    // poll until the flush is done; NOTE(review): unbounded, a stuck flush hangs the test
+    Assert.assertEquals(1, processor.getWorkSequenceTsFileProcessors().size());
+    TsFileProcessor tsFileProcessor = processor.getWorkSequenceTsFileProcessors().iterator().next();
+    FlushManager flushManager = FlushManager.getInstance();
+    int waitCnt = 0;
+    while (tsFileProcessor.getFlushingMemTableSize() != 0
+        || tsFileProcessor.isManagedByFlushManager()
+        || flushManager.getNumberOfPendingTasks() != 0
+        || flushManager.getNumberOfPendingSubTasks() != 0
+        || flushManager.getNumberOfWorkingTasks() != 0
+        || flushManager.getNumberOfWorkingSubTasks() != 0) {
+      Thread.sleep(500);
+      ++waitCnt;
+      if (waitCnt % 10 == 0) { // every 10 polls of 500 ms, i.e. roughly every 5 s
+        logger.info("already wait {} s", waitCnt / 2);
+      }
+    }
+
+    Assert.assertEquals(0, MemTableManager.getInstance().getCurrentMemtableNumber());
+    Assert.assertFalse(tsFileProcessor.alreadyMarkedClosing());
+
+    // the flush alone left the file open (asserted above); now trigger the timed close
+    processor.timedCloseTsFileProcessor();
+
+    Thread.sleep(500); // presumably gives the close time to be marked; confirm it is async
+
+    Assert.assertTrue(tsFileProcessor.alreadyMarkedClosing());
+
+    config.setEnableTimedFlushSeqMemtable(prevEnableTimedFlushSeqMemtable);
+    config.setSeqMemtableFlushInterval(preFLushInterval);
+    config.setEnableTimedCloseTsFile(prevEnableTimedCloseTsFile);
+    config.setCloseTsFileIntervalAfterFlushing(prevCloseTsFileInterval);
+  }
+
   class DummySGP extends StorageGroupProcessor {
 
     DummySGP(String systemInfoDir, String storageGroupName) throws StorageGroupProcessorException {
diff --git a/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java b/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
index c459760..f7a9310 100644
--- a/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
+++ b/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
@@ -35,6 +35,7 @@ import org.apache.iotdb.db.query.control.FileReaderManager;
 import org.apache.iotdb.db.query.control.QueryResourceManager;
 import org.apache.iotdb.db.query.control.TracingManager;
 import org.apache.iotdb.db.query.udf.service.UDFRegistrationService;
+import org.apache.iotdb.db.rescon.MemTableManager;
 import org.apache.iotdb.db.rescon.PrimitiveArrayManager;
 import org.apache.iotdb.db.rescon.SystemInfo;
 import org.apache.iotdb.db.service.IoTDB;
@@ -147,6 +148,9 @@ public class EnvironmentUtils {
     // clear system info
     SystemInfo.getInstance().close();
 
+    // clear memtable manager info
+    MemTableManager.getInstance().close();
+
     // delete all directory
     cleanAllDir();
     config.setSeqTsFileSize(oldSeqTsFileSize);