You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ej...@apache.org on 2020/12/04 09:58:24 UTC

[iotdb] 01/01: finish sliding window version 1

This is an automated email from the ASF dual-hosted git repository.

ejttianyu pushed a commit to branch dev_sliding_mem_table
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 5207bace0f75bc1beccb33994d5b9a6c7a81d464
Author: EJTTianyu <16...@qq.com>
AuthorDate: Fri Dec 4 17:57:47 2020 +0800

    finish sliding window version 1
---
 .../apache/iotdb/tsfile/TsFileSequenceRead.java    |   2 +-
 .../resources/conf/iotdb-engine.properties         |  13 +++
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  39 ++++++++
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |  11 +++
 .../apache/iotdb/db/engine/flush/FlushManager.java |  21 +++++
 .../engine/storagegroup/StorageGroupProcessor.java | 101 ++++++++++++++++-----
 .../db/engine/storagegroup/TsFileProcessor.java    |  73 ++++++++++++---
 .../iotdb/db/qp/physical/crud/InsertRowPlan.java   |  11 +++
 .../org/apache/iotdb/db/rescon/SystemInfo.java     |   5 +
 .../storagegroup/StorageGroupProcessorTest.java    |  25 +++++
 10 files changed, 263 insertions(+), 38 deletions(-)

diff --git a/example/tsfile/src/main/java/org/apache/iotdb/tsfile/TsFileSequenceRead.java b/example/tsfile/src/main/java/org/apache/iotdb/tsfile/TsFileSequenceRead.java
index da5bae6..4cbf862 100644
--- a/example/tsfile/src/main/java/org/apache/iotdb/tsfile/TsFileSequenceRead.java
+++ b/example/tsfile/src/main/java/org/apache/iotdb/tsfile/TsFileSequenceRead.java
@@ -41,7 +41,7 @@ public class TsFileSequenceRead {
 
   @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
   public static void main(String[] args) throws IOException {
-    String filename = "test.tsfile";
+    String filename = "/Users/tianyu/2019秋季学期/iotdb/server/target/data/sequence/root.vehicle.d0/0/1607075644401-1-0.tsfile";
     if (args.length >= 1) {
       filename = args[0];
     }
diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties b/server/src/assembly/resources/conf/iotdb-engine.properties
index b0a691b..1f296ef 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -232,6 +232,9 @@ primitive_array_size=128
 # Ratio of write memory for invoking flush disk, 0.3 by default
 flush_proportion=0.3
 
+# Ratio of write memory for invoking flush immediately, 0.6 by default
+force_flush_proportion=0.6
+
 # Ratio of write memory allocated for buffered arrays, 0.6 by default
 buffered_arrays_memory_proportion=0.6
 
@@ -255,6 +258,16 @@ max_waiting_time_when_insert_blocked=10000
 estimated_series_size=300
 
 ####################
+### Sliding Memory Table Configurations
+####################
+
+# Whether to enable sliding memory table
+enable_sliding_mem_table=true
+
+# Save the flushing memtable in the memory during the period, can help reduce the unseq ratio, Unit: millis.
+flush_wait_time=60000
+
+####################
 ### Upgrade Configurations
 ####################
 
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 72718e2..a337152 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -152,6 +152,11 @@ public class IoTDBConfig {
   private double flushProportion = 0.3;
 
   /**
+   * Force flush proportion for system
+   */
+  private double forceFlushProportion = 0.6;
+
+  /**
    * Reject proportion for system
    */
   private double rejectProportion = 0.8;
@@ -205,6 +210,16 @@ public class IoTDBConfig {
   private int estimatedSeriesSize = 300;
 
   /**
+   * Whether to enable sliding memory table
+   */
+  private boolean enableSlidingMemTable = true;
+
+  /**
+   * Save the flushing memtable in the memory during the period, can help reduce the unseq ratio, Unit: millis.
+   */
+  private int flushWaitTime = 60000;
+
+  /**
    * default base dir, stores all IoTDB runtime files
    */
   private static final String DEFAULT_BASE_DIR = "data";
@@ -989,6 +1004,22 @@ public class IoTDBConfig {
     this.forceWalPeriodInMs = forceWalPeriodInMs;
   }
 
+  public boolean isEnableSlidingMemTable() {
+    return enableSlidingMemTable;
+  }
+
+  public void setEnableSlidingMemTable(boolean enableSlidingMemTable) {
+    this.enableSlidingMemTable = enableSlidingMemTable;
+  }
+
+  public int getFlushWaitTime() {
+    return flushWaitTime;
+  }
+
+  public void setFlushWaitTime(int flushWaitTime) {
+    this.flushWaitTime = flushWaitTime;
+  }
+
   public String getSystemDir() {
     return systemDir;
   }
@@ -1261,6 +1292,14 @@ public class IoTDBConfig {
     this.flushProportion = flushProportion;
   }
 
+  public double getForceFlushProportion() {
+    return forceFlushProportion;
+  }
+
+  public void setForceFlushProportion(double forceFlushProportion) {
+    this.forceFlushProportion = forceFlushProportion;
+  }
+
   public double getRejectProportion() {
     return rejectProportion;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index d7a7058..ac2c686 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -205,6 +205,9 @@ public class IoTDBDescriptor {
       conf.setFlushProportion(Double.parseDouble(properties.getProperty("flush_proportion",
           Double.toString(conf.getFlushProportion()))));
 
+      conf.setForceFlushProportion(Double.parseDouble(properties.getProperty("force_flush_proportion",
+          Double.toString(conf.getForceFlushProportion()))));
+
       conf.setRejectProportion(Double.parseDouble(properties.getProperty("reject_proportion",
           Double.toString(conf.getRejectProportion()))));
 
@@ -295,6 +298,14 @@ public class IoTDBDescriptor {
           .getProperty("estimated_series_size",
               Integer.toString(conf.getEstimatedSeriesSize()))));
 
+      conf.setEnableSlidingMemTable(Boolean.parseBoolean(properties
+          .getProperty("enable_sliding_mem_table",
+              Boolean.toString(conf.isEnableSlidingMemTable()))));
+
+      conf.setFlushWaitTime(Integer.parseInt(properties
+          .getProperty("flush_wait_time",
+              Integer.toString(conf.getFlushWaitTime()))));
+
       conf.setMergeChunkPointNumberThreshold(Integer.parseInt(properties
           .getProperty("merge_chunk_point_number",
               Integer.toString(conf.getMergeChunkPointNumberThreshold()))));
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/FlushManager.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/FlushManager.java
index 0d49f2c..b102d03 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/FlushManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/FlushManager.java
@@ -19,11 +19,14 @@
 package org.apache.iotdb.db.engine.flush;
 
 import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.TimeUnit;
 import org.apache.iotdb.db.concurrent.WrappedRunnable;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.flush.pool.FlushSubTaskPoolManager;
 import org.apache.iotdb.db.engine.flush.pool.FlushTaskPoolManager;
 import org.apache.iotdb.db.engine.storagegroup.TsFileProcessor;
 import org.apache.iotdb.db.exception.StartupException;
+import org.apache.iotdb.db.rescon.SystemInfo;
 import org.apache.iotdb.db.service.IService;
 import org.apache.iotdb.db.service.JMXService;
 import org.apache.iotdb.db.service.ServiceType;
@@ -86,6 +89,24 @@ public class FlushManager implements FlushManagerMBean, IService {
     @Override
     public void runMayThrow() {
       TsFileProcessor tsFileProcessor = tsFileProcessorQueue.poll();
+      if (tsFileProcessor.isSequence() && IoTDBDescriptor.getInstance().getConfig()
+          .isEnableSlidingMemTable() && !SystemInfo.getInstance().forceFlush()) {
+        long startTime = System.currentTimeMillis();
+        while (!tsFileProcessor.isShouldClose()
+            && System.currentTimeMillis() - startTime < IoTDBDescriptor.getInstance().getConfig()
+            .getFlushWaitTime()) {
+          // wait
+          try {
+            TimeUnit.MILLISECONDS
+                .sleep(IoTDBDescriptor.getInstance().getConfig().getWaitingTimeWhenInsertBlocked());
+          } catch (InterruptedException e) {
+            logger.error("flush mem table wait error", e);
+          }
+        }
+      }
+      tsFileProcessor.setFlushingMemTable(null);
+      tsFileProcessor.setFlushMemTableAlive(false);
+      tsFileProcessor.getUpdateLatestFlushTimeCallback().call(tsFileProcessor);
       tsFileProcessor.flushOneMemTable();
       tsFileProcessor.setManagedByFlushManager(false);
       if (logger.isDebugEnabled()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 2b49583..b431823 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -148,6 +148,11 @@ public class StorageGroupProcessor {
    */
   private final ReadWriteLock insertLock = new ReentrantReadWriteLock();
   /**
+   * a read write lock for guaranteeing concurrent safety when latestTimeForEachDevice is updated
+   * using flushingLatestTimeForEachDevice
+   */
+  private final ReadWriteLock flushTimeUpdateLock = new ReentrantReadWriteLock();
+  /**
    * closeStorageGroupCondition is used to wait for all currently closing TsFiles to be done.
    */
   private final Object closeStorageGroupCondition = new Object();
@@ -177,10 +182,15 @@ public class StorageGroupProcessor {
 
   private CopyOnReadLinkedList<TsFileProcessor> closingUnSequenceTsFileProcessor = new CopyOnReadLinkedList<>();
   /*
+   * time partition id -> map, when a memory table is marked as to be flush, use latestTimeForEachDevice
+   * to cache the flushing time bound, and is used to update partitionLatestFlushedTimeForEachDevice
+   * when a flush is actually issued.
+   */
+  private Map<Long, Map<String, Long>> flushingLatestTimeForEachDevice = new HashMap<>();
+  /*
    * time partition id -> map, which contains
    * device -> global latest timestamp of each device latestTimeForEachDevice caches non-flushed
-   * changes upon timestamps of each device, and is used to update partitionLatestFlushedTimeForEachDevice
-   * when a flush is issued.
+   * changes upon timestamps of each device
    */
   private Map<Long, Map<String, Long>> latestTimeForEachDevice = new HashMap<>();
   /**
@@ -653,6 +663,7 @@ public class StorageGroupProcessor {
       throw new OutOfTTLException(insertRowPlan.getTime(), (System.currentTimeMillis() - dataTTL));
     }
     writeLock();
+    flushUpdateLock();
     try {
       // init map
       long timePartitionId = StorageEngine.getTimePartition(insertRowPlan.getTime());
@@ -676,6 +687,7 @@ public class StorageGroupProcessor {
 
     } finally {
       writeUnlock();
+      flushUpdateUnLock();
     }
   }
 
@@ -869,6 +881,10 @@ public class StorageGroupProcessor {
       return;
     }
 
+    if (sequence && config.isEnableSlidingMemTable()) {
+      insertRowPlan
+          .setToFlushingMemTable(isInsertToFlushingMemTable(timePartitionId, insertRowPlan));
+    }
     // insert TsFileProcessor
     tsFileProcessor.insert(insertRowPlan);
 
@@ -891,6 +907,17 @@ public class StorageGroupProcessor {
     }
   }
 
+  /**
+   * judge whether a insert plan should be inserted into the flushingMemtable
+   */
+  private boolean isInsertToFlushingMemTable(long timePartitionId, InsertRowPlan insertRowPlan){
+    if (!flushingLatestTimeForEachDevice.containsKey(timePartitionId)){
+      return false;
+    }
+    return flushingLatestTimeForEachDevice.get(timePartitionId).
+        getOrDefault(insertRowPlan.getDeviceId().getFullPath(), Long.MIN_VALUE) > insertRowPlan.getTime();
+  }
+
   private void tryToUpdateInsertLastCache(InsertRowPlan plan, Long latestFlushedTime) {
     if (!IoTDBDescriptor.getInstance().getConfig().isLastCacheEnabled()) {
       return;
@@ -916,7 +943,7 @@ public class StorageGroupProcessor {
   public void asyncFlushMemTableInTsFileProcessor(TsFileProcessor tsFileProcessor) {
     writeLock();
     try {
-      if (!closingSequenceTsFileProcessor.contains(tsFileProcessor) && 
+      if (!closingSequenceTsFileProcessor.contains(tsFileProcessor) &&
           !closingUnSequenceTsFileProcessor.contains(tsFileProcessor)) {
         fileFlushPolicy.apply(this, tsFileProcessor, tsFileProcessor.isSequence());
       }
@@ -1092,7 +1119,7 @@ public class StorageGroupProcessor {
   public void asyncCloseOneTsFileProcessor(boolean sequence, TsFileProcessor tsFileProcessor) {
     //for sequence tsfile, we update the endTimeMap only when the file is prepared to be closed.
     //for unsequence tsfile, we have maintained the endTimeMap when an insertion comes.
-    if (closingSequenceTsFileProcessor.contains(tsFileProcessor) || 
+    if (closingSequenceTsFileProcessor.contains(tsFileProcessor) ||
         closingUnSequenceTsFileProcessor.contains(tsFileProcessor)) {
       return;
     }
@@ -1372,6 +1399,17 @@ public class StorageGroupProcessor {
     insertLock.writeLock().unlock();
   }
 
+  public void flushUpdateLock(){
+    if (config.isEnableSlidingMemTable()) {
+      flushTimeUpdateLock.writeLock().lock();
+    }
+  }
+
+  public void flushUpdateUnLock(){
+    if (config.isEnableSlidingMemTable()) {
+      flushTimeUpdateLock.writeLock().unlock();
+    }
+  }
 
   /**
    * @param tsFileResources includes sealed and unsealed tsfile resources
@@ -1616,27 +1654,44 @@ public class StorageGroupProcessor {
   }
 
   private boolean updateLatestFlushTimeCallback(TsFileProcessor processor) {
-    // update the largest timestamp in the last flushing memtable
-    Map<String, Long> curPartitionDeviceLatestTime = latestTimeForEachDevice
-        .get(processor.getTimeRangeId());
-
-    if (curPartitionDeviceLatestTime == null) {
-      logger.warn("Partition: {} does't have latest time for each device. "
-              + "No valid record is written into memtable. Flushing tsfile is: {}",
-          processor.getTimeRangeId(), processor.getTsFileResource().getTsFile());
-      return false;
-    }
+    flushUpdateLock();
+    try {
+      // update the largest timestamp in the last flushing memtable
+      Map<String, Long> curPartitionDeviceLatestTime = latestTimeForEachDevice
+          .get(processor.getTimeRangeId());
+
+      if (curPartitionDeviceLatestTime == null) {
+        logger.warn("Partition: {} does't have latest time for each device. "
+                + "No valid record is written into memtable. Flushing tsfile is: {}",
+            processor.getTimeRangeId(), processor.getTsFileResource().getTsFile());
+        return false;
+      }
 
-    for (Entry<String, Long> entry : curPartitionDeviceLatestTime.entrySet()) {
-      partitionLatestFlushedTimeForEachDevice
-          .computeIfAbsent(processor.getTimeRangeId(), id -> new HashMap<>())
-          .put(entry.getKey(), entry.getValue());
-      updateNewlyFlushedPartitionLatestFlushedTimeForEachDevice(processor.getTimeRangeId(),
-          entry.getKey(), entry.getValue());
-      if (globalLatestFlushedTimeForEachDevice
-          .getOrDefault(entry.getKey(), Long.MIN_VALUE) < entry.getValue()) {
-        globalLatestFlushedTimeForEachDevice.put(entry.getKey(), entry.getValue());
+      if (processor.isFlushMemTableAlive()) {
+        for (Entry<String, Long> entry : curPartitionDeviceLatestTime.entrySet()) {
+          flushingLatestTimeForEachDevice
+              .computeIfAbsent(processor.getTimeRangeId(), id -> new HashMap<>())
+              .put(entry.getKey(), entry.getValue());
+        }
+      } else {
+        if (processor.isSequence()) {
+          curPartitionDeviceLatestTime = flushingLatestTimeForEachDevice
+              .get(processor.getTimeRangeId());
+        }
+        for (Entry<String, Long> entry : curPartitionDeviceLatestTime.entrySet()) {
+          partitionLatestFlushedTimeForEachDevice
+              .computeIfAbsent(processor.getTimeRangeId(), id -> new HashMap<>())
+              .put(entry.getKey(), entry.getValue());
+          updateNewlyFlushedPartitionLatestFlushedTimeForEachDevice(processor.getTimeRangeId(),
+              entry.getKey(), entry.getValue());
+          if (globalLatestFlushedTimeForEachDevice
+              .getOrDefault(entry.getKey(), Long.MIN_VALUE) < entry.getValue()) {
+            globalLatestFlushedTimeForEachDevice.put(entry.getKey(), entry.getValue());
+          }
+        }
       }
+    } finally {
+      flushUpdateUnLock();
     }
     return true;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index b810774..32dec07 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -30,7 +30,6 @@ import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
-
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -101,13 +100,18 @@ public class TsFileProcessor {
    */
   private volatile boolean managedByFlushManager;
   private final ReadWriteLock flushQueryLock = new ReentrantReadWriteLock();
+
   /**
    * It is set by the StorageGroupProcessor and checked by flush threads. (If shouldClose == true
    * and its flushingMemTables are all flushed, then the flush thread will close this file.)
    */
   private volatile boolean shouldClose;
+
+  private IMemTable flushingMemTable;
   private IMemTable workMemTable;
 
+  private boolean isFlushMemTableAlive = false;
+
   private final VersionController versionController;
 
   /**
@@ -180,8 +184,11 @@ public class TsFileProcessor {
       blockInsertionIfReject();
       checkMemCostAndAddToTspInfo(insertRowPlan);
     }
-
-    workMemTable.insert(insertRowPlan);
+    if (isFlushMemTableAlive && insertRowPlan.isToFlushingMemTable()){
+      flushingMemTable.insert(insertRowPlan);
+    } else {
+      workMemTable.insert(insertRowPlan);
+    }
     if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) {
       try {
         getLogNode().write(insertRowPlan);
@@ -258,7 +265,7 @@ public class TsFileProcessor {
     long textDataIncrement = 0L;
     long chunkMetadataIncrement = 0L;
     String deviceId = insertRowPlan.getDeviceId().getFullPath();
-    long unsealedResourceIncrement = 
+    long unsealedResourceIncrement =
         tsFileResource.estimateRamIncrement(deviceId);
     for (int i = 0; i < insertRowPlan.getDataTypes().length; i++) {
       // skip failed Measurements
@@ -496,11 +503,12 @@ public class TsFileProcessor {
     try {
 
       if (logger.isInfoEnabled()) {
-        if (workMemTable != null) {
+        if (flushingMemTable != null || workMemTable != null) {
           logger.info(
-              "{}: flush a working memtable in async close tsfile {}, memtable size: {}, tsfile size: {}",
+              "{}: flush a memtable in async close tsfile {}, flushing memtable size: {}, "
+                  + "working memtable size: {}, tsfile size: {}",
               storageGroupName, tsFileResource.getTsFile().getAbsolutePath(),
-              workMemTable.memSize(),
+              flushingMemTable == null ? 0 : flushingMemTable.memSize(), workMemTable.memSize(),
               tsFileResource.getTsFileSize());
         } else {
           logger.info("{}: flush a NotifyFlushMemTable in async close tsfile {}, tsfile size: {}",
@@ -522,14 +530,18 @@ public class TsFileProcessor {
 
       // we have to add the memtable into flushingList first and then set the shouldClose tag.
       // see https://issues.apache.org/jira/browse/IOTDB-510
-      IMemTable tmpMemTable = workMemTable == null || workMemTable.memSize() == 0 
-          ? new NotifyFlushMemTable() 
+//      IMemTable tmpFlushMemTable = flushingMemTable == null || flushingMemTable.memSize() == 0
+//          ? new NotifyFlushMemTable()
+//          : flushingMemTable;
+      IMemTable tmpWorkMemTable = workMemTable == null || workMemTable.memSize() == 0
+          ? new NotifyFlushMemTable()
           : workMemTable;
 
       try {
         // When invoke closing TsFile after insert data to memTable, we shouldn't flush until invoke
         // flushing memTable in System module.
-        addAMemtableIntoFlushingList(tmpMemTable);
+//        addAMemtableIntoFlushingList(tmpFlushMemTable);
+        addAMemtableIntoFlushingList(tmpWorkMemTable);
         shouldClose = true;
         tsFileResource.setCloseFlag();
       } catch (Exception e) {
@@ -557,7 +569,11 @@ public class TsFileProcessor {
           .debug(FLUSH_QUERY_WRITE_LOCKED, storageGroupName, tsFileResource.getTsFile().getName());
     }
     try {
-      tmpMemTable = workMemTable == null ? new NotifyFlushMemTable() : workMemTable;
+      if (flushingMemTable == null) {
+        tmpMemTable = workMemTable == null ? new NotifyFlushMemTable() : workMemTable;
+      } else {
+        tmpMemTable = flushingMemTable;
+      }
       if (logger.isDebugEnabled() && tmpMemTable.isSignalMemTable()) {
         logger.debug("{}: {} add a signal memtable into flushing memtable list when sync flush",
             storageGroupName, tsFileResource.getTsFile().getName());
@@ -606,6 +622,11 @@ public class TsFileProcessor {
       }
       logger.info("Async flush a memtable to tsfile: {}",
           tsFileResource.getTsFile().getAbsolutePath());
+      if (config.isEnableSlidingMemTable()){
+        while (flushingMemTable != null) {
+          TimeUnit.MILLISECONDS.sleep(waitingTimeWhenInsertBlocked);
+        }
+      }
       addAMemtableIntoFlushingList(workMemTable);
     } catch (Exception e) {
       logger.error("{}: {} add a memtable into flushing list failed", storageGroupName,
@@ -625,8 +646,7 @@ public class TsFileProcessor {
    * flushManager again.
    */
   private void addAMemtableIntoFlushingList(IMemTable tobeFlushed) throws IOException {
-    if (!tobeFlushed.isSignalMemTable() &&
-        (!updateLatestFlushTimeCallback.call(this) || tobeFlushed.memSize() == 0)) {
+    if (!tobeFlushed.isSignalMemTable() && tobeFlushed.memSize() == 0) {
       logger.warn("This normal memtable is empty, skip it in flush. {}: {} Memetable info: {}",
           storageGroupName, tsFileResource.getTsFile().getName(), tobeFlushed.getMemTableMap());
       return;
@@ -649,12 +669,16 @@ public class TsFileProcessor {
     if (!tobeFlushed.isSignalMemTable()) {
       totalMemTableSize += tobeFlushed.memSize();
     }
+    if (sequence && config.isEnableSlidingMemTable()) {
+      flushingMemTable = workMemTable;
+      isFlushMemTableAlive = true;
+    }
+    updateLatestFlushTimeCallback.call(this);
     workMemTable = null;
     shouldFlush = false;
     FlushManager.getInstance().registerTsFileProcessor(this);
   }
 
-
   /**
    * put back the memtable to MemTablePool and make metadata in writer visible
    */
@@ -1004,4 +1028,25 @@ public class TsFileProcessor {
   public void addCloseFileListeners(Collection<CloseFileListener> listeners) {
     closeFileListeners.addAll(listeners);
   }
+
+  public void setFlushingMemTable(IMemTable flushingMemTable) {
+    this.flushingMemTable = flushingMemTable;
+  }
+
+
+  public UpdateEndTimeCallBack getUpdateLatestFlushTimeCallback() {
+    return updateLatestFlushTimeCallback;
+  }
+
+  public boolean isFlushMemTableAlive() {
+    return isFlushMemTableAlive;
+  }
+
+  public void setFlushMemTableAlive(boolean flushMemTableAlive) {
+    isFlushMemTableAlive = flushMemTableAlive;
+  }
+
+  public boolean isShouldClose() {
+    return shouldClose;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java
index b41311e..65ee5fb 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java
@@ -61,6 +61,9 @@ public class InsertRowPlan extends InsertPlan {
 
   private List<Object> failedValues;
 
+  // judge whether a insert should use flushingMemTable or not
+  private boolean toFlushingMemTable = false;
+
   public InsertRowPlan() {
     super(OperatorType.INSERT);
   }
@@ -464,4 +467,12 @@ public class InsertRowPlan extends InsertPlan {
     failedValues = null;
     return this;
   }
+
+  public boolean isToFlushingMemTable() {
+    return toFlushingMemTable;
+  }
+
+  public void setToFlushingMemTable(boolean toFlushingMemTable) {
+    this.toFlushingMemTable = toFlushingMemTable;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java b/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java
index 74c00f0..c7eacff 100644
--- a/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java
+++ b/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java
@@ -45,6 +45,7 @@ public class SystemInfo {
   private Map<StorageGroupInfo, Long> reportedSgMemCostMap = new ConcurrentHashMap<>();
 
   private static final double FLUSH_PROPORTION = config.getFlushProportion();
+  private static final double FORCE_FLUSH_PROPORTION = config.getForceFlushProportion();
   private static final double REJECT_PROPORTION = config.getRejectProportion();
 
   /**
@@ -202,4 +203,8 @@ public class SystemInfo {
 
     private static SystemInfo instance = new SystemInfo();
   }
+
+  public boolean forceFlush(){
+    return totalSgMemCost.get() >= config.getAllocateMemoryForWrite() * FORCE_FLUSH_PROPORTION;
+  }
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
index 6e0efbc..87bf8c1 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
@@ -137,6 +137,31 @@ public class StorageGroupProcessorTest {
   }
 
   @Test
+  public void testSlidingMemTable()
+      throws WriteProcessException, IOException, MetadataException {
+    TSRecord record;
+
+    for (int j = 5; j <= 10; j++) {
+      record = new TSRecord(j, deviceId);
+      record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j)));
+      processor.insert(new InsertRowPlan(record));
+    }
+
+    for (TsFileProcessor tsfileProcessor : processor.getWorkSequenceTsFileProcessors()) {
+      tsfileProcessor.asyncFlush();
+    }
+
+    for (int j = 1; j <= 3; j++) {
+      record = new TSRecord(j, deviceId);
+      record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j)));
+      processor.insert(new InsertRowPlan(record));
+    }
+//    System.exit(0);
+    processor.syncCloseAllWorkingTsFileProcessors();
+
+  }
+
+  @Test
   public void testSequenceSyncClose()
       throws WriteProcessException, QueryProcessException, IllegalPathException {
     for (int j = 1; j <= 10; j++) {