You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ej...@apache.org on 2020/12/04 09:58:23 UTC

[iotdb] branch dev_sliding_mem_table created (now 5207bac)

This is an automated email from the ASF dual-hosted git repository.

ejttianyu pushed a change to branch dev_sliding_mem_table
in repository https://gitbox.apache.org/repos/asf/iotdb.git.


      at 5207bac  finish sliding window version 1

This branch includes the following new commits:

     new 5207bac  finish sliding window version 1

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/01: finish sliding window version 1

Posted by ej...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ejttianyu pushed a commit to branch dev_sliding_mem_table
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 5207bace0f75bc1beccb33994d5b9a6c7a81d464
Author: EJTTianyu <16...@qq.com>
AuthorDate: Fri Dec 4 17:57:47 2020 +0800

    finish sliding window version 1
---
 .../apache/iotdb/tsfile/TsFileSequenceRead.java    |   2 +-
 .../resources/conf/iotdb-engine.properties         |  13 +++
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  39 ++++++++
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |  11 +++
 .../apache/iotdb/db/engine/flush/FlushManager.java |  21 +++++
 .../engine/storagegroup/StorageGroupProcessor.java | 101 ++++++++++++++++-----
 .../db/engine/storagegroup/TsFileProcessor.java    |  73 ++++++++++++---
 .../iotdb/db/qp/physical/crud/InsertRowPlan.java   |  11 +++
 .../org/apache/iotdb/db/rescon/SystemInfo.java     |   5 +
 .../storagegroup/StorageGroupProcessorTest.java    |  25 +++++
 10 files changed, 263 insertions(+), 38 deletions(-)

diff --git a/example/tsfile/src/main/java/org/apache/iotdb/tsfile/TsFileSequenceRead.java b/example/tsfile/src/main/java/org/apache/iotdb/tsfile/TsFileSequenceRead.java
index da5bae6..4cbf862 100644
--- a/example/tsfile/src/main/java/org/apache/iotdb/tsfile/TsFileSequenceRead.java
+++ b/example/tsfile/src/main/java/org/apache/iotdb/tsfile/TsFileSequenceRead.java
@@ -41,7 +41,7 @@ public class TsFileSequenceRead {
 
   @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
   public static void main(String[] args) throws IOException {
-    String filename = "test.tsfile";
+    String filename = "/Users/tianyu/2019秋季学期/iotdb/server/target/data/sequence/root.vehicle.d0/0/1607075644401-1-0.tsfile";
     if (args.length >= 1) {
       filename = args[0];
     }
diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties b/server/src/assembly/resources/conf/iotdb-engine.properties
index b0a691b..1f296ef 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -232,6 +232,9 @@ primitive_array_size=128
 # Ratio of write memory for invoking flush disk, 0.3 by default
 flush_proportion=0.3
 
+# Ratio of write memory for invoking flush immediately, 0.6 by default
+force_flush_proportion=0.6
+
 # Ratio of write memory allocated for buffered arrays, 0.6 by default
 buffered_arrays_memory_proportion=0.6
 
@@ -255,6 +258,16 @@ max_waiting_time_when_insert_blocked=10000
 estimated_series_size=300
 
 ####################
+### Sliding Memory Table Configurations
+####################
+
+# Whether to enable sliding memory table
+enable_sliding_mem_table=true
+
+# Save the flushing memtable in the memory during the period, can help reduce the unseq ratio, Unit: millis.
+flush_wait_time=60000
+
+####################
 ### Upgrade Configurations
 ####################
 
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 72718e2..a337152 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -152,6 +152,11 @@ public class IoTDBConfig {
   private double flushProportion = 0.3;
 
   /**
+   * Force flush proportion for system
+   */
+  private double forceFlushProportion = 0.6;
+
+  /**
    * Reject proportion for system
    */
   private double rejectProportion = 0.8;
@@ -205,6 +210,16 @@ public class IoTDBConfig {
   private int estimatedSeriesSize = 300;
 
   /**
+   * Whether to enable sliding memory table
+   */
+  private boolean enableSlidingMemTable = true;
+
+  /**
+   * Save the flushing memtable in the memory during the period, can help reduce the unseq ratio, Unit: millis.
+   */
+  private int flushWaitTime = 60000;
+
+  /**
    * default base dir, stores all IoTDB runtime files
    */
   private static final String DEFAULT_BASE_DIR = "data";
@@ -989,6 +1004,22 @@ public class IoTDBConfig {
     this.forceWalPeriodInMs = forceWalPeriodInMs;
   }
 
+  public boolean isEnableSlidingMemTable() {
+    return enableSlidingMemTable;
+  }
+
+  public void setEnableSlidingMemTable(boolean enableSlidingMemTable) {
+    this.enableSlidingMemTable = enableSlidingMemTable;
+  }
+
+  public int getFlushWaitTime() {
+    return flushWaitTime;
+  }
+
+  public void setFlushWaitTime(int flushWaitTime) {
+    this.flushWaitTime = flushWaitTime;
+  }
+
   public String getSystemDir() {
     return systemDir;
   }
@@ -1261,6 +1292,14 @@ public class IoTDBConfig {
     this.flushProportion = flushProportion;
   }
 
+  public double getForceFlushProportion() {
+    return forceFlushProportion;
+  }
+
+  public void setForceFlushProportion(double forceFlushProportion) {
+    this.forceFlushProportion = forceFlushProportion;
+  }
+
   public double getRejectProportion() {
     return rejectProportion;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index d7a7058..ac2c686 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -205,6 +205,9 @@ public class IoTDBDescriptor {
       conf.setFlushProportion(Double.parseDouble(properties.getProperty("flush_proportion",
           Double.toString(conf.getFlushProportion()))));
 
+      conf.setForceFlushProportion(Double.parseDouble(properties.getProperty("force_flush_proportion",
+          Double.toString(conf.getForceFlushProportion()))));
+
       conf.setRejectProportion(Double.parseDouble(properties.getProperty("reject_proportion",
           Double.toString(conf.getRejectProportion()))));
 
@@ -295,6 +298,14 @@ public class IoTDBDescriptor {
           .getProperty("estimated_series_size",
               Integer.toString(conf.getEstimatedSeriesSize()))));
 
+      conf.setEnableSlidingMemTable(Boolean.parseBoolean(properties
+          .getProperty("enable_sliding_mem_table",
+              Boolean.toString(conf.isEnableSlidingMemTable()))));
+
+      conf.setFlushWaitTime(Integer.parseInt(properties
+          .getProperty("flush_wait_time",
+              Integer.toString(conf.getFlushWaitTime()))));
+
       conf.setMergeChunkPointNumberThreshold(Integer.parseInt(properties
           .getProperty("merge_chunk_point_number",
               Integer.toString(conf.getMergeChunkPointNumberThreshold()))));
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/FlushManager.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/FlushManager.java
index 0d49f2c..b102d03 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/FlushManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/FlushManager.java
@@ -19,11 +19,14 @@
 package org.apache.iotdb.db.engine.flush;
 
 import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.TimeUnit;
 import org.apache.iotdb.db.concurrent.WrappedRunnable;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.flush.pool.FlushSubTaskPoolManager;
 import org.apache.iotdb.db.engine.flush.pool.FlushTaskPoolManager;
 import org.apache.iotdb.db.engine.storagegroup.TsFileProcessor;
 import org.apache.iotdb.db.exception.StartupException;
+import org.apache.iotdb.db.rescon.SystemInfo;
 import org.apache.iotdb.db.service.IService;
 import org.apache.iotdb.db.service.JMXService;
 import org.apache.iotdb.db.service.ServiceType;
@@ -86,6 +89,24 @@ public class FlushManager implements FlushManagerMBean, IService {
     @Override
     public void runMayThrow() {
       TsFileProcessor tsFileProcessor = tsFileProcessorQueue.poll();
+      if (tsFileProcessor.isSequence() && IoTDBDescriptor.getInstance().getConfig()
+          .isEnableSlidingMemTable() && !SystemInfo.getInstance().forceFlush()) {
+        long startTime = System.currentTimeMillis();
+        while (!tsFileProcessor.isShouldClose()
+            && System.currentTimeMillis() - startTime < IoTDBDescriptor.getInstance().getConfig()
+            .getFlushWaitTime()) {
+          // wait
+          try {
+            TimeUnit.MILLISECONDS
+                .sleep(IoTDBDescriptor.getInstance().getConfig().getWaitingTimeWhenInsertBlocked());
+          } catch (InterruptedException e) {
+            logger.error("flush mem table wait error", e);
+          }
+        }
+      }
+      tsFileProcessor.setFlushingMemTable(null);
+      tsFileProcessor.setFlushMemTableAlive(false);
+      tsFileProcessor.getUpdateLatestFlushTimeCallback().call(tsFileProcessor);
       tsFileProcessor.flushOneMemTable();
       tsFileProcessor.setManagedByFlushManager(false);
       if (logger.isDebugEnabled()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 2b49583..b431823 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -148,6 +148,11 @@ public class StorageGroupProcessor {
    */
   private final ReadWriteLock insertLock = new ReentrantReadWriteLock();
   /**
+   * a read write lock for guaranteeing concurrent safety when latestTimeForEachDevice is updated
+   * using flushingLatestTimeForEachDevice
+   */
+  private final ReadWriteLock flushTimeUpdateLock = new ReentrantReadWriteLock();
+  /**
    * closeStorageGroupCondition is used to wait for all currently closing TsFiles to be done.
    */
   private final Object closeStorageGroupCondition = new Object();
@@ -177,10 +182,15 @@ public class StorageGroupProcessor {
 
   private CopyOnReadLinkedList<TsFileProcessor> closingUnSequenceTsFileProcessor = new CopyOnReadLinkedList<>();
   /*
+   * time partition id -> map, when a memory table is marked as to be flush, use latestTimeForEachDevice
+   * to cache the flushing time bound, and is used to update partitionLatestFlushedTimeForEachDevice
+   * when a flush is actually issued.
+   */
+  private Map<Long, Map<String, Long>> flushingLatestTimeForEachDevice = new HashMap<>();
+  /*
    * time partition id -> map, which contains
    * device -> global latest timestamp of each device latestTimeForEachDevice caches non-flushed
-   * changes upon timestamps of each device, and is used to update partitionLatestFlushedTimeForEachDevice
-   * when a flush is issued.
+   * changes upon timestamps of each device
    */
   private Map<Long, Map<String, Long>> latestTimeForEachDevice = new HashMap<>();
   /**
@@ -653,6 +663,7 @@ public class StorageGroupProcessor {
       throw new OutOfTTLException(insertRowPlan.getTime(), (System.currentTimeMillis() - dataTTL));
     }
     writeLock();
+    flushUpdateLock();
     try {
       // init map
       long timePartitionId = StorageEngine.getTimePartition(insertRowPlan.getTime());
@@ -676,6 +687,7 @@ public class StorageGroupProcessor {
 
     } finally {
       writeUnlock();
+      flushUpdateUnLock();
     }
   }
 
@@ -869,6 +881,10 @@ public class StorageGroupProcessor {
       return;
     }
 
+    if (sequence && config.isEnableSlidingMemTable()) {
+      insertRowPlan
+          .setToFlushingMemTable(isInsertToFlushingMemTable(timePartitionId, insertRowPlan));
+    }
     // insert TsFileProcessor
     tsFileProcessor.insert(insertRowPlan);
 
@@ -891,6 +907,17 @@ public class StorageGroupProcessor {
     }
   }
 
+  /**
+   * judge whether a insert plan should be inserted into the flushingMemtable
+   */
+  private boolean isInsertToFlushingMemTable(long timePartitionId, InsertRowPlan insertRowPlan){
+    if (!flushingLatestTimeForEachDevice.containsKey(timePartitionId)){
+      return false;
+    }
+    return flushingLatestTimeForEachDevice.get(timePartitionId).
+        getOrDefault(insertRowPlan.getDeviceId().getFullPath(), Long.MIN_VALUE) > insertRowPlan.getTime();
+  }
+
   private void tryToUpdateInsertLastCache(InsertRowPlan plan, Long latestFlushedTime) {
     if (!IoTDBDescriptor.getInstance().getConfig().isLastCacheEnabled()) {
       return;
@@ -916,7 +943,7 @@ public class StorageGroupProcessor {
   public void asyncFlushMemTableInTsFileProcessor(TsFileProcessor tsFileProcessor) {
     writeLock();
     try {
-      if (!closingSequenceTsFileProcessor.contains(tsFileProcessor) && 
+      if (!closingSequenceTsFileProcessor.contains(tsFileProcessor) &&
           !closingUnSequenceTsFileProcessor.contains(tsFileProcessor)) {
         fileFlushPolicy.apply(this, tsFileProcessor, tsFileProcessor.isSequence());
       }
@@ -1092,7 +1119,7 @@ public class StorageGroupProcessor {
   public void asyncCloseOneTsFileProcessor(boolean sequence, TsFileProcessor tsFileProcessor) {
     //for sequence tsfile, we update the endTimeMap only when the file is prepared to be closed.
     //for unsequence tsfile, we have maintained the endTimeMap when an insertion comes.
-    if (closingSequenceTsFileProcessor.contains(tsFileProcessor) || 
+    if (closingSequenceTsFileProcessor.contains(tsFileProcessor) ||
         closingUnSequenceTsFileProcessor.contains(tsFileProcessor)) {
       return;
     }
@@ -1372,6 +1399,17 @@ public class StorageGroupProcessor {
     insertLock.writeLock().unlock();
   }
 
+  public void flushUpdateLock(){
+    if (config.isEnableSlidingMemTable()) {
+      flushTimeUpdateLock.writeLock().lock();
+    }
+  }
+
+  public void flushUpdateUnLock(){
+    if (config.isEnableSlidingMemTable()) {
+      flushTimeUpdateLock.writeLock().unlock();
+    }
+  }
 
   /**
    * @param tsFileResources includes sealed and unsealed tsfile resources
@@ -1616,27 +1654,44 @@ public class StorageGroupProcessor {
   }
 
   private boolean updateLatestFlushTimeCallback(TsFileProcessor processor) {
-    // update the largest timestamp in the last flushing memtable
-    Map<String, Long> curPartitionDeviceLatestTime = latestTimeForEachDevice
-        .get(processor.getTimeRangeId());
-
-    if (curPartitionDeviceLatestTime == null) {
-      logger.warn("Partition: {} does't have latest time for each device. "
-              + "No valid record is written into memtable. Flushing tsfile is: {}",
-          processor.getTimeRangeId(), processor.getTsFileResource().getTsFile());
-      return false;
-    }
+    flushUpdateLock();
+    try {
+      // update the largest timestamp in the last flushing memtable
+      Map<String, Long> curPartitionDeviceLatestTime = latestTimeForEachDevice
+          .get(processor.getTimeRangeId());
+
+      if (curPartitionDeviceLatestTime == null) {
+        logger.warn("Partition: {} does't have latest time for each device. "
+                + "No valid record is written into memtable. Flushing tsfile is: {}",
+            processor.getTimeRangeId(), processor.getTsFileResource().getTsFile());
+        return false;
+      }
 
-    for (Entry<String, Long> entry : curPartitionDeviceLatestTime.entrySet()) {
-      partitionLatestFlushedTimeForEachDevice
-          .computeIfAbsent(processor.getTimeRangeId(), id -> new HashMap<>())
-          .put(entry.getKey(), entry.getValue());
-      updateNewlyFlushedPartitionLatestFlushedTimeForEachDevice(processor.getTimeRangeId(),
-          entry.getKey(), entry.getValue());
-      if (globalLatestFlushedTimeForEachDevice
-          .getOrDefault(entry.getKey(), Long.MIN_VALUE) < entry.getValue()) {
-        globalLatestFlushedTimeForEachDevice.put(entry.getKey(), entry.getValue());
+      if (processor.isFlushMemTableAlive()) {
+        for (Entry<String, Long> entry : curPartitionDeviceLatestTime.entrySet()) {
+          flushingLatestTimeForEachDevice
+              .computeIfAbsent(processor.getTimeRangeId(), id -> new HashMap<>())
+              .put(entry.getKey(), entry.getValue());
+        }
+      } else {
+        if (processor.isSequence()) {
+          curPartitionDeviceLatestTime = flushingLatestTimeForEachDevice
+              .get(processor.getTimeRangeId());
+        }
+        for (Entry<String, Long> entry : curPartitionDeviceLatestTime.entrySet()) {
+          partitionLatestFlushedTimeForEachDevice
+              .computeIfAbsent(processor.getTimeRangeId(), id -> new HashMap<>())
+              .put(entry.getKey(), entry.getValue());
+          updateNewlyFlushedPartitionLatestFlushedTimeForEachDevice(processor.getTimeRangeId(),
+              entry.getKey(), entry.getValue());
+          if (globalLatestFlushedTimeForEachDevice
+              .getOrDefault(entry.getKey(), Long.MIN_VALUE) < entry.getValue()) {
+            globalLatestFlushedTimeForEachDevice.put(entry.getKey(), entry.getValue());
+          }
+        }
       }
+    } finally {
+      flushUpdateUnLock();
     }
     return true;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index b810774..32dec07 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -30,7 +30,6 @@ import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
-
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -101,13 +100,18 @@ public class TsFileProcessor {
    */
   private volatile boolean managedByFlushManager;
   private final ReadWriteLock flushQueryLock = new ReentrantReadWriteLock();
+
   /**
    * It is set by the StorageGroupProcessor and checked by flush threads. (If shouldClose == true
    * and its flushingMemTables are all flushed, then the flush thread will close this file.)
    */
   private volatile boolean shouldClose;
+
+  private IMemTable flushingMemTable;
   private IMemTable workMemTable;
 
+  private boolean isFlushMemTableAlive = false;
+
   private final VersionController versionController;
 
   /**
@@ -180,8 +184,11 @@ public class TsFileProcessor {
       blockInsertionIfReject();
       checkMemCostAndAddToTspInfo(insertRowPlan);
     }
-
-    workMemTable.insert(insertRowPlan);
+    if (isFlushMemTableAlive && insertRowPlan.isToFlushingMemTable()){
+      flushingMemTable.insert(insertRowPlan);
+    } else {
+      workMemTable.insert(insertRowPlan);
+    }
     if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) {
       try {
         getLogNode().write(insertRowPlan);
@@ -258,7 +265,7 @@ public class TsFileProcessor {
     long textDataIncrement = 0L;
     long chunkMetadataIncrement = 0L;
     String deviceId = insertRowPlan.getDeviceId().getFullPath();
-    long unsealedResourceIncrement = 
+    long unsealedResourceIncrement =
         tsFileResource.estimateRamIncrement(deviceId);
     for (int i = 0; i < insertRowPlan.getDataTypes().length; i++) {
       // skip failed Measurements
@@ -496,11 +503,12 @@ public class TsFileProcessor {
     try {
 
       if (logger.isInfoEnabled()) {
-        if (workMemTable != null) {
+        if (flushingMemTable != null || workMemTable != null) {
           logger.info(
-              "{}: flush a working memtable in async close tsfile {}, memtable size: {}, tsfile size: {}",
+              "{}: flush a memtable in async close tsfile {}, flushing memtable size: {}, "
+                  + "working memtable size: {}, tsfile size: {}",
               storageGroupName, tsFileResource.getTsFile().getAbsolutePath(),
-              workMemTable.memSize(),
+              flushingMemTable == null ? 0 : flushingMemTable.memSize(), workMemTable.memSize(),
               tsFileResource.getTsFileSize());
         } else {
           logger.info("{}: flush a NotifyFlushMemTable in async close tsfile {}, tsfile size: {}",
@@ -522,14 +530,18 @@ public class TsFileProcessor {
 
       // we have to add the memtable into flushingList first and then set the shouldClose tag.
       // see https://issues.apache.org/jira/browse/IOTDB-510
-      IMemTable tmpMemTable = workMemTable == null || workMemTable.memSize() == 0 
-          ? new NotifyFlushMemTable() 
+//      IMemTable tmpFlushMemTable = flushingMemTable == null || flushingMemTable.memSize() == 0
+//          ? new NotifyFlushMemTable()
+//          : flushingMemTable;
+      IMemTable tmpWorkMemTable = workMemTable == null || workMemTable.memSize() == 0
+          ? new NotifyFlushMemTable()
           : workMemTable;
 
       try {
         // When invoke closing TsFile after insert data to memTable, we shouldn't flush until invoke
         // flushing memTable in System module.
-        addAMemtableIntoFlushingList(tmpMemTable);
+//        addAMemtableIntoFlushingList(tmpFlushMemTable);
+        addAMemtableIntoFlushingList(tmpWorkMemTable);
         shouldClose = true;
         tsFileResource.setCloseFlag();
       } catch (Exception e) {
@@ -557,7 +569,11 @@ public class TsFileProcessor {
           .debug(FLUSH_QUERY_WRITE_LOCKED, storageGroupName, tsFileResource.getTsFile().getName());
     }
     try {
-      tmpMemTable = workMemTable == null ? new NotifyFlushMemTable() : workMemTable;
+      if (flushingMemTable == null) {
+        tmpMemTable = workMemTable == null ? new NotifyFlushMemTable() : workMemTable;
+      } else {
+        tmpMemTable = flushingMemTable;
+      }
       if (logger.isDebugEnabled() && tmpMemTable.isSignalMemTable()) {
         logger.debug("{}: {} add a signal memtable into flushing memtable list when sync flush",
             storageGroupName, tsFileResource.getTsFile().getName());
@@ -606,6 +622,11 @@ public class TsFileProcessor {
       }
       logger.info("Async flush a memtable to tsfile: {}",
           tsFileResource.getTsFile().getAbsolutePath());
+      if (config.isEnableSlidingMemTable()){
+        while (flushingMemTable != null) {
+          TimeUnit.MILLISECONDS.sleep(waitingTimeWhenInsertBlocked);
+        }
+      }
       addAMemtableIntoFlushingList(workMemTable);
     } catch (Exception e) {
       logger.error("{}: {} add a memtable into flushing list failed", storageGroupName,
@@ -625,8 +646,7 @@ public class TsFileProcessor {
    * flushManager again.
    */
   private void addAMemtableIntoFlushingList(IMemTable tobeFlushed) throws IOException {
-    if (!tobeFlushed.isSignalMemTable() &&
-        (!updateLatestFlushTimeCallback.call(this) || tobeFlushed.memSize() == 0)) {
+    if (!tobeFlushed.isSignalMemTable() && tobeFlushed.memSize() == 0) {
       logger.warn("This normal memtable is empty, skip it in flush. {}: {} Memetable info: {}",
           storageGroupName, tsFileResource.getTsFile().getName(), tobeFlushed.getMemTableMap());
       return;
@@ -649,12 +669,16 @@ public class TsFileProcessor {
     if (!tobeFlushed.isSignalMemTable()) {
       totalMemTableSize += tobeFlushed.memSize();
     }
+    if (sequence && config.isEnableSlidingMemTable()) {
+      flushingMemTable = workMemTable;
+      isFlushMemTableAlive = true;
+    }
+    updateLatestFlushTimeCallback.call(this);
     workMemTable = null;
     shouldFlush = false;
     FlushManager.getInstance().registerTsFileProcessor(this);
   }
 
-
   /**
    * put back the memtable to MemTablePool and make metadata in writer visible
    */
@@ -1004,4 +1028,25 @@ public class TsFileProcessor {
   public void addCloseFileListeners(Collection<CloseFileListener> listeners) {
     closeFileListeners.addAll(listeners);
   }
+
+  public void setFlushingMemTable(IMemTable flushingMemTable) {
+    this.flushingMemTable = flushingMemTable;
+  }
+
+
+  public UpdateEndTimeCallBack getUpdateLatestFlushTimeCallback() {
+    return updateLatestFlushTimeCallback;
+  }
+
+  public boolean isFlushMemTableAlive() {
+    return isFlushMemTableAlive;
+  }
+
+  public void setFlushMemTableAlive(boolean flushMemTableAlive) {
+    isFlushMemTableAlive = flushMemTableAlive;
+  }
+
+  public boolean isShouldClose() {
+    return shouldClose;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java
index b41311e..65ee5fb 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java
@@ -61,6 +61,9 @@ public class InsertRowPlan extends InsertPlan {
 
   private List<Object> failedValues;
 
+  // judge whether a insert should use flushingMemTable or not
+  private boolean toFlushingMemTable = false;
+
   public InsertRowPlan() {
     super(OperatorType.INSERT);
   }
@@ -464,4 +467,12 @@ public class InsertRowPlan extends InsertPlan {
     failedValues = null;
     return this;
   }
+
+  public boolean isToFlushingMemTable() {
+    return toFlushingMemTable;
+  }
+
+  public void setToFlushingMemTable(boolean toFlushingMemTable) {
+    this.toFlushingMemTable = toFlushingMemTable;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java b/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java
index 74c00f0..c7eacff 100644
--- a/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java
+++ b/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java
@@ -45,6 +45,7 @@ public class SystemInfo {
   private Map<StorageGroupInfo, Long> reportedSgMemCostMap = new ConcurrentHashMap<>();
 
   private static final double FLUSH_PROPORTION = config.getFlushProportion();
+  private static final double FORCE_FLUSH_PROPORTION = config.getForceFlushProportion();
   private static final double REJECT_PROPORTION = config.getRejectProportion();
 
   /**
@@ -202,4 +203,8 @@ public class SystemInfo {
 
     private static SystemInfo instance = new SystemInfo();
   }
+
+  public boolean forceFlush(){
+    return totalSgMemCost.get() >= config.getAllocateMemoryForWrite() * FORCE_FLUSH_PROPORTION;
+  }
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
index 6e0efbc..87bf8c1 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
@@ -137,6 +137,31 @@ public class StorageGroupProcessorTest {
   }
 
   @Test
+  public void testSlidingMemTable()
+      throws WriteProcessException, IOException, MetadataException {
+    TSRecord record;
+
+    for (int j = 5; j <= 10; j++) {
+      record = new TSRecord(j, deviceId);
+      record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j)));
+      processor.insert(new InsertRowPlan(record));
+    }
+
+    for (TsFileProcessor tsfileProcessor : processor.getWorkSequenceTsFileProcessors()) {
+      tsfileProcessor.asyncFlush();
+    }
+
+    for (int j = 1; j <= 3; j++) {
+      record = new TSRecord(j, deviceId);
+      record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j)));
+      processor.insert(new InsertRowPlan(record));
+    }
+//    System.exit(0);
+    processor.syncCloseAllWorkingTsFileProcessors();
+
+  }
+
+  @Test
   public void testSequenceSyncClose()
       throws WriteProcessException, QueryProcessException, IllegalPathException {
     for (int j = 1; j <= 10; j++) {