You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hx...@apache.org on 2019/02/21 07:08:59 UTC

[incubator-iotdb] branch master updated: Fix add future for flush operation (#43)

This is an automated email from the ASF dual-hosted git repository.

hxd pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new b534412  Fix add future for flush operation (#43)
b534412 is described below

commit b534412aba1cf4143995e578c593aae7402bb190
Author: Xiangdong Huang <hx...@qq.com>
AuthorDate: Thu Feb 21 15:08:55 2019 +0800

    Fix add future for flush operation (#43)
    
    * add Future for async flushing memtable, replace FlushStatus by Future
    
    * use flushMemTable != null to determine whether a flushTask is running, rather than using future.isDone()
    
    * replace assertEquals with assertTrue or assertFalse; add logger.error for some exceptions; change the time unit of lastFlushTime to nanoseconds (System.nanoTime)
---
 .../java/org/apache/iotdb/db/engine/Processor.java |   8 +-
 .../engine/bufferwrite/BufferWriteProcessor.java   | 181 +++++++++---------
 .../db/engine/filenode/FileNodeFlushFuture.java    |  91 +++++++++
 .../iotdb/db/engine/filenode/FileNodeManager.java  |   4 +-
 .../db/engine/filenode/FileNodeProcessor.java      |  12 +-
 .../db/engine/overflow/io/OverflowProcessor.java   | 211 +++++++++++----------
 .../apache/iotdb/db/engine/pool/FlushManager.java  |  10 +-
 .../apache/iotdb/db/qp/constant/DatetimeUtils.java |   6 +
 .../ImmediateFuture.java}                          |  43 +++--
 .../org/apache/iotdb/db/engine/ProcessorTest.java  |   7 +-
 .../bufferwrite/BufferWriteProcessorNewTest.java   |  37 ++--
 .../bufferwrite/BufferWriteProcessorTest.java      |  48 +++--
 .../engine/overflow/io/OverflowProcessorTest.java  |   1 -
 13 files changed, 421 insertions(+), 238 deletions(-)

diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/Processor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/Processor.java
index 57e1523..cba7463 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/Processor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/Processor.java
@@ -19,6 +19,7 @@
 package org.apache.iotdb.db.engine;
 
 import java.io.IOException;
+import java.util.concurrent.Future;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.iotdb.db.engine.bufferwrite.BufferWriteProcessor;
@@ -170,7 +171,12 @@ public abstract class Processor {
    */
   public abstract boolean canBeClosed();
 
-  public abstract boolean flush() throws IOException;
+  /**
+   * call flush operation asynchronously
+   * @return a future that returns true if successfully, otherwise false.
+   * @throws IOException
+   */
+  public abstract Future<Boolean> flush() throws IOException;
 
   /**
    * Close the processor.<br>
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java
index 13dcf57..e2cf818 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java
@@ -20,10 +20,9 @@ package org.apache.iotdb.db.engine.bufferwrite;
 
 import java.io.File;
 import java.io.IOException;
-import java.time.Instant;
-import java.time.ZonedDateTime;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantLock;
@@ -39,9 +38,10 @@ import org.apache.iotdb.db.engine.memtable.MemTableFlushUtil;
 import org.apache.iotdb.db.engine.memtable.PrimitiveMemTable;
 import org.apache.iotdb.db.engine.pool.FlushManager;
 import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
-import org.apache.iotdb.db.engine.utils.FlushStatus;
 import org.apache.iotdb.db.engine.version.VersionController;
 import org.apache.iotdb.db.exception.BufferWriteProcessorException;
+import org.apache.iotdb.db.qp.constant.DatetimeUtils;
+import org.apache.iotdb.db.utils.ImmediateFuture;
 import org.apache.iotdb.db.utils.MemUtils;
 import org.apache.iotdb.db.writelog.manager.MultiFileLogNodeManager;
 import org.apache.iotdb.db.writelog.node.WriteLogNode;
@@ -60,8 +60,7 @@ public class BufferWriteProcessor extends Processor {
   private static final Logger LOGGER = LoggerFactory.getLogger(BufferWriteProcessor.class);
   private RestorableTsFileIOWriter writer;
   private FileSchema fileSchema;
-  private volatile FlushStatus flushStatus = new FlushStatus();
-  private volatile boolean isFlush;
+  private volatile Future<Boolean> flushFuture = new ImmediateFuture<>(true);
   private ReentrantLock flushQueryLock = new ReentrantLock();
   private AtomicLong memSize = new AtomicLong();
   private long memThreshold = TSFileDescriptor.getInstance().getConfig().groupSizeInByte;
@@ -71,6 +70,7 @@ public class BufferWriteProcessor extends Processor {
   private Action bufferwriteCloseAction;
   private Action filenodeFlushAction;
 
+  //lastFlushTime time unit: nanosecond
   private long lastFlushTime = -1;
   private long valueCount = 0;
 
@@ -224,7 +224,7 @@ public class BufferWriteProcessor extends Processor {
     flushQueryLock.lock();
     try {
       MemSeriesLazyMerger memSeriesLazyMerger = new MemSeriesLazyMerger();
-      if (isFlush) {
+      if (flushMemTable != null) {
         memSeriesLazyMerger.addMemSeries(flushMemTable.query(deviceId, measurementId, dataType));
       }
       memSeriesLazyMerger.addMemSeries(workMemTable.query(deviceId, measurementId, dataType));
@@ -244,7 +244,6 @@ public class BufferWriteProcessor extends Processor {
         workMemTable = new PrimitiveMemTable();
       }
     } finally {
-      isFlush = true;
       flushQueryLock.unlock();
     }
   }
@@ -256,15 +255,24 @@ public class BufferWriteProcessor extends Processor {
       flushMemTable = null;
       writer.appendMetadata();
     } finally {
-      isFlush = false;
       flushQueryLock.unlock();
     }
   }
 
-  private void flushOperation(String flushFunction, long version) {
+
+  /**
+   * the caller mast guarantee no other concurrent caller entering this function.
+   *
+   * @param displayMessage message that will appear in system log.
+   * @param version the operation version that will tagged on the to be flushed memtable
+   * (i.e., ChunkGroup)
+   * @return true if successfully.
+   */
+  private boolean flushTask(String displayMessage, long version) {
+    boolean result;
     long flushStartTime = System.currentTimeMillis();
     LOGGER.info("The bufferwrite processor {} starts flushing {}.", getProcessorName(),
-        flushFunction);
+        displayMessage);
     try {
       if (flushMemTable != null && !flushMemTable.isEmpty()) {
         // flush data
@@ -278,64 +286,56 @@ public class BufferWriteProcessor extends Processor {
       if (IoTDBDescriptor.getInstance().getConfig().enableWal) {
         logNode.notifyEndFlush(null);
       }
-    } catch (IOException e) {
-      LOGGER.error("The bufferwrite processor {} failed to flush {}.", getProcessorName(),
-          flushFunction, e);
+      result = true;
     } catch (Exception e) {
       LOGGER.error(
           "The bufferwrite processor {} failed to flush {}, when calling the filenodeFlushAction.",
-          getProcessorName(), flushFunction, e);
+          getProcessorName(), displayMessage, e);
+      result = false;
     } finally {
-      synchronized (flushStatus) {
-        flushStatus.setUnFlushing();
-        switchFlushToWork();
-        flushStatus.notifyAll();
-        LOGGER.info("The bufferwrite processor {} ends flushing {}.", getProcessorName(),
-            flushFunction);
-      }
+      switchFlushToWork();
+      LOGGER.info("The bufferwrite processor {} ends flushing {}.", getProcessorName(),
+            displayMessage);
+    }
+    if (LOGGER.isInfoEnabled()) {
+      long flushEndTime = System.currentTimeMillis();
+      LOGGER.info(
+          "The bufferwrite processor {} flush {}, start time is {}, flush end time is {}, "
+              + "flush time consumption is {}ms",
+          getProcessorName(), displayMessage,
+          DatetimeUtils.convertMillsecondToZonedDateTime(flushStartTime),
+          DatetimeUtils.convertMillsecondToZonedDateTime(flushEndTime),
+          flushEndTime - flushStartTime);
     }
-    long flushEndTime = System.currentTimeMillis();
-    long flushInterval = flushEndTime - flushStartTime;
-    ZonedDateTime startDateTime = ZonedDateTime.ofInstant(Instant.ofEpochMilli(flushStartTime),
-        IoTDBDescriptor.getInstance().getConfig().getZoneID());
-    ZonedDateTime endDateTime = ZonedDateTime.ofInstant(Instant.ofEpochMilli(flushEndTime),
-        IoTDBDescriptor.getInstance().getConfig().getZoneID());
-    LOGGER.info(
-        "The bufferwrite processor {} flush {}, start time is {}, flush end time is {}, "
-            + "flush time consumption is {}ms",
-        getProcessorName(), flushFunction, startDateTime, endDateTime, flushInterval);
+    return result;
   }
 
-  private Future<?> flush(boolean synchronization) throws IOException {
+  // keyword synchronized is added in this method, so that only one flush task can be submitted now.
+  @Override
+  public synchronized Future<Boolean> flush() throws IOException {
     // statistic information for flush
     if (lastFlushTime > 0) {
-      long thisFlushTime = System.currentTimeMillis();
-      long flushTimeInterval = thisFlushTime - lastFlushTime;
-      ZonedDateTime lastDateTime = ZonedDateTime.ofInstant(Instant.ofEpochMilli(lastFlushTime),
-          IoTDBDescriptor.getInstance().getConfig().getZoneID());
-      ZonedDateTime thisDateTime = ZonedDateTime.ofInstant(Instant.ofEpochMilli(thisFlushTime),
-          IoTDBDescriptor.getInstance().getConfig().getZoneID());
-      LOGGER.info(
-          "The bufferwrite processor {}: last flush time is {}, this flush time is {}, "
-              + "flush time interval is {}s",
-          getProcessorName(), lastDateTime, thisDateTime, flushTimeInterval / 1000);
+      if (LOGGER.isInfoEnabled()) {
+        long thisFlushTime = System.currentTimeMillis();
+        LOGGER.info(
+            "The bufferwrite processor {}: last flush time is {}, this flush time is {}, "
+                + "flush time interval is {}s", getProcessorName(),
+            DatetimeUtils.convertMillsecondToZonedDateTime(lastFlushTime / 1000),
+            DatetimeUtils.convertMillsecondToZonedDateTime(thisFlushTime),
+            (thisFlushTime - lastFlushTime / 1000) / 1000);
+      }
     }
-    lastFlushTime = System.currentTimeMillis();
+    lastFlushTime = System.nanoTime();
     // check value count
     if (valueCount > 0) {
       // waiting for the end of last flush operation.
-      synchronized (flushStatus) {
-        while (flushStatus.isFlushing()) {
-          try {
-            flushStatus.wait();
-          } catch (InterruptedException e) {
-            LOGGER.error(
-                "Encounter an interrupt error when waitting for the flushing, "
-                    + "the bufferwrite processor is {}.",
-                getProcessorName(), e);
-            Thread.currentThread().interrupt();
-          }
-        }
+      try {
+        flushFuture.get();
+      } catch (InterruptedException | ExecutionException e) {
+        LOGGER.error("Encounter an interrupt error when waitting for the flushing, "
+                + "the bufferwrite processor is {}.",
+            getProcessorName(), e);
+        Thread.currentThread().interrupt();
       }
       // update the lastUpdatetime, prepare for flush
       try {
@@ -348,26 +348,17 @@ public class BufferWriteProcessor extends Processor {
         logNode.notifyStartFlush();
       }
       valueCount = 0;
-      flushStatus.setFlushing();
       switchWorkToFlush();
       long version = versionController.nextVersion();
       BasicMemController.getInstance().reportFree(this, memSize.get());
       memSize.set(0);
       // switch
-      if (synchronization) {
-        flushOperation("synchronously", version);
-      } else {
-        FlushManager.getInstance().submit(() -> flushOperation("asynchronously", version));
-      }
+      flushFuture = FlushManager.getInstance().submit(() -> flushTask("asynchronously",
+          version));
+    } else {
+      flushFuture = new ImmediateFuture<>(true);
     }
-    // TODO return a meaningful Future
-    return null;
-  }
-
-  @Override
-  public boolean flush() throws IOException {
-    flush(false);
-    return false;
+    return flushFuture;
   }
 
   @Override
@@ -379,8 +370,8 @@ public class BufferWriteProcessor extends Processor {
   public void close() throws BufferWriteProcessorException {
     try {
       long closeStartTime = System.currentTimeMillis();
-      // flush data
-      flush(true);
+      // flush data and wait for finishing flush
+      flush().get();
       // end file
       writer.endFile(fileSchema);
       // update the IntervalFile for interval list
@@ -388,16 +379,16 @@ public class BufferWriteProcessor extends Processor {
       // flush the changed information for filenode
       filenodeFlushAction.act();
       // delete the restore for this bufferwrite processor
-      long closeEndTime = System.currentTimeMillis();
-      long closeInterval = closeEndTime - closeStartTime;
-      ZonedDateTime startDateTime = ZonedDateTime.ofInstant(Instant.ofEpochMilli(closeStartTime),
-          IoTDBDescriptor.getInstance().getConfig().getZoneID());
-      ZonedDateTime endDateTime = ZonedDateTime.ofInstant(Instant.ofEpochMilli(closeEndTime),
-          IoTDBDescriptor.getInstance().getConfig().getZoneID());
-      LOGGER.info(
-          "Close bufferwrite processor {}, the file name is {}, start time is {}, end time is {}, "
-              + "time consumption is {}ms",
-          getProcessorName(), fileName, startDateTime, endDateTime, closeInterval);
+      if (LOGGER.isInfoEnabled()) {
+        long closeEndTime = System.currentTimeMillis();
+        LOGGER.info(
+            "Close bufferwrite processor {}, the file name is {}, start time is {}, end time is {}, "
+                + "time consumption is {}ms",
+            getProcessorName(), fileName,
+            DatetimeUtils.convertMillsecondToZonedDateTime(closeStartTime),
+            DatetimeUtils.convertMillsecondToZonedDateTime(closeEndTime),
+            closeEndTime - closeStartTime);
+      }
     } catch (IOException e) {
       LOGGER.error("Close the bufferwrite processor error, the bufferwrite is {}.",
           getProcessorName(), e);
@@ -420,9 +411,13 @@ public class BufferWriteProcessor extends Processor {
    * @return True if flushing
    */
   public boolean isFlush() {
-    synchronized (flushStatus) {
-      return flushStatus.isFlushing();
-    }
+    // starting a flush task has two steps: set the flushMemtable, and then set the flushFuture
+    // So, the following case exists: flushMemtable != null but flushFuture is done (because the
+    // flushFuture refers to the last finished flush.
+    // And, the following case exists,too: flushMemtable == null, but flushFuture is not done.
+    // (flushTask() is not finished, but switchToWork() has done)
+    // So, checking flushMemTable is more meaningful than flushFuture.isDone().
+    return  flushMemTable != null;
   }
 
   /**
@@ -507,6 +502,22 @@ public class BufferWriteProcessor extends Processor {
   }
 
   /**
+   * used for test. We can know when the flush() is called.
+   * @return the last flush() time. Time unit: nanosecond.
+   */
+  public long getLastFlushTime() {
+    return lastFlushTime;
+  }
+
+  /**
+   * used for test. We can block to wait for finishing flushing.
+   * @return the future of the flush() task.
+   */
+  public Future<Boolean> getFlushFuture() {
+    return flushFuture;
+  }
+
+  /**
    * Delete data whose timestamp <= 'timestamp' and belonging to timeseries deviceId.measurementId.
    * Delete data in both working MemTable and flushing MemTable.
    *
@@ -516,7 +527,7 @@ public class BufferWriteProcessor extends Processor {
    */
   public void delete(String deviceId, String measurementId, long timestamp) {
     workMemTable.delete(deviceId, measurementId, timestamp);
-    if (isFlush) {
+    if (isFlush()) {
       // flushing MemTable cannot be directly modified since another thread is reading it
       flushMemTable = flushMemTable.copy();
       flushMemTable.delete(deviceId, measurementId, timestamp);
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeFlushFuture.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeFlushFuture.java
new file mode 100644
index 0000000..660c949
--- /dev/null
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeFlushFuture.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.filenode;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.apache.iotdb.db.utils.ImmediateFuture;
+
+public class FileNodeFlushFuture implements Future<Boolean> {
+  Future<Boolean> bufferWriteFlushFuture;
+  Future<Boolean> overflowFlushFuture;
+  boolean hasOverflowFlushTask;
+
+  public FileNodeFlushFuture(Future<Boolean> bufferWriteFlushFuture, Future<Boolean> overflowFlushFuture){
+    if(bufferWriteFlushFuture != null) {
+      this.bufferWriteFlushFuture = bufferWriteFlushFuture;
+    } else {
+      this.bufferWriteFlushFuture = new ImmediateFuture<>(true);
+    }
+    if(overflowFlushFuture !=null) {
+      this.overflowFlushFuture = overflowFlushFuture;
+      hasOverflowFlushTask = true;
+    } else {
+      this.overflowFlushFuture = new ImmediateFuture<>(true);
+      hasOverflowFlushTask = false;
+    }
+  }
+
+  /**
+   * @param mayInterruptIfRunning true if the thread executing this task should be interrupted;
+   * otherwise, in-progress tasks are allowed to complete
+   * @return true if both of the two sub-future are canceled successfully.
+   * @see Future#cancel(boolean) The difference is that this Future consists of two sub-Futures. If
+   * the first sub-future is canceled successfully but the second sub-future fails, the result is
+   * false.
+   */
+  @Override
+  public boolean cancel(boolean mayInterruptIfRunning) {
+    boolean result = bufferWriteFlushFuture.cancel(mayInterruptIfRunning);
+    result = result & overflowFlushFuture.cancel(mayInterruptIfRunning);
+    return result;
+  }
+
+  @Override
+  public boolean isCancelled() {
+    return bufferWriteFlushFuture.isCancelled() && overflowFlushFuture.isCancelled();
+  }
+
+  @Override
+  public boolean isDone() {
+    return bufferWriteFlushFuture.isDone() && overflowFlushFuture.isDone();
+  }
+
+  @Override
+  public Boolean get() throws InterruptedException, ExecutionException {
+    boolean result = bufferWriteFlushFuture.get();
+    result = result & overflowFlushFuture.get();
+    return result;
+  }
+
+  @Override
+  public Boolean get(long timeout, TimeUnit unit)
+      throws InterruptedException, ExecutionException, TimeoutException {
+    boolean result = bufferWriteFlushFuture.get(timeout, unit);
+    result = result && overflowFlushFuture.get(timeout, unit);
+    return result;
+  }
+
+  public boolean isHasOverflowFlushTask() {
+    return hasOverflowFlushTask;
+  }
+}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java
index 7f6de86..c2f4126 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java
@@ -1067,7 +1067,7 @@ public class FileNodeManager implements IStatistic, IService {
         continue;
       }
       try {
-        boolean isMerge = processor.flush();
+        boolean isMerge = processor.flush().isHasOverflowFlushTask();
         if (isMerge) {
           processor.submitToMerge();
         }
@@ -1093,7 +1093,7 @@ public class FileNodeManager implements IStatistic, IService {
       }
       processor.writeLock();
       try {
-        boolean isMerge = processor.flush();
+        boolean isMerge = processor.flush().isHasOverflowFlushTask();
         if (isMerge) {
           processor.submitToMerge();
         }
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
index a4fe3ee..b0921f3 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
@@ -928,7 +928,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
    * @return null -can't submit the merge task, because this filenode is not overflowed or it is
    * merging now. Future - submit the merge task successfully.
    */
-  public Future submitToMerge() {
+  Future submitToMerge() {
     ZoneId zoneId = IoTDBDescriptor.getInstance().getConfig().getZoneID();
     if (lastMergeTime > 0) {
       long thisMergeTime = System.currentTimeMillis();
@@ -1695,14 +1695,16 @@ public class FileNodeProcessor extends Processor implements IStatistic {
   }
 
   @Override
-  public boolean flush() throws IOException {
+  public FileNodeFlushFuture flush() throws IOException {
+    Future<Boolean> bufferWriteFlushFuture = null;
+    Future<Boolean> overflowFlushFuture = null;
     if (bufferWriteProcessor != null) {
-      bufferWriteProcessor.flush();
+      bufferWriteFlushFuture = bufferWriteProcessor.flush();
     }
     if (overflowProcessor != null) {
-      return overflowProcessor.flush();
+      overflowFlushFuture = overflowProcessor.flush();
     }
-    return false;
+    return new FileNodeFlushFuture(bufferWriteFlushFuture, overflowFlushFuture);
   }
 
   /**
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java
index 01801d9..331fa33 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java
@@ -27,10 +27,10 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantLock;
-
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -47,15 +47,15 @@ import org.apache.iotdb.db.engine.querycontext.MergeSeriesDataSource;
 import org.apache.iotdb.db.engine.querycontext.OverflowInsertFile;
 import org.apache.iotdb.db.engine.querycontext.OverflowSeriesDataSource;
 import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
-import org.apache.iotdb.db.engine.utils.FlushStatus;
-import org.apache.iotdb.db.engine.version.VersionController;
 import org.apache.iotdb.db.exception.OverflowProcessorException;
+import org.apache.iotdb.db.qp.constant.DatetimeUtils;
+import org.apache.iotdb.db.utils.ImmediateFuture;
+import org.apache.iotdb.db.engine.version.VersionController;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.utils.MemUtils;
 import org.apache.iotdb.db.writelog.manager.MultiFileLogNodeManager;
 import org.apache.iotdb.db.writelog.node.WriteLogNode;
 import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
-import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
 import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.Path;
@@ -70,14 +70,13 @@ public class OverflowProcessor extends Processor {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(OverflowProcessor.class);
   private static final IoTDBConfig TsFileDBConf = IoTDBDescriptor.getInstance().getConfig();
-  private static final TSFileConfig TsFileConf = TSFileDescriptor.getInstance().getConfig();
   private OverflowResource workResource;
   private OverflowResource mergeResource;
 
   private OverflowSupport workSupport;
   private OverflowSupport flushSupport;
 
-  private volatile FlushStatus flushStatus = new FlushStatus();
+  private volatile Future<Boolean> flushFuture = new ImmediateFuture<>(true);
   private volatile boolean isMerge;
   private int valueCount;
   private String parentPath;
@@ -243,7 +242,7 @@ public class OverflowProcessor extends Processor {
       List<ModificationFile> updatedModFiles) throws IOException {
     workResource.delete(deviceId, measurementId, timestamp, version, updatedModFiles);
     workSupport.delete(deviceId, measurementId, timestamp, false);
-    if (flushStatus.isFlushing()) {
+    if (isFlush()) {
       mergeResource.delete(deviceId, measurementId, timestamp, version, updatedModFiles);
       flushSupport.delete(deviceId, measurementId, timestamp, true);
     }
@@ -300,15 +299,20 @@ public class OverflowProcessor extends Processor {
       TSDataType dataType) {
 
     MemSeriesLazyMerger memSeriesLazyMerger = new MemSeriesLazyMerger();
-    if (flushStatus.isFlushing()) {
+    queryFlushLock.lock();
+    try {
+      if (flushSupport != null && isFlush()) {
+        memSeriesLazyMerger
+            .addMemSeries(
+                flushSupport.queryOverflowInsertInMemory(deviceId, measurementId, dataType));
+      }
       memSeriesLazyMerger
-          .addMemSeries(
-              flushSupport.queryOverflowInsertInMemory(deviceId, measurementId, dataType));
+          .addMemSeries(workSupport.queryOverflowInsertInMemory(deviceId, measurementId,
+              dataType));
+      return new ReadOnlyMemChunk(dataType, memSeriesLazyMerger);
+    } finally {
+      queryFlushLock.unlock();
     }
-    memSeriesLazyMerger
-        .addMemSeries(workSupport.queryOverflowInsertInMemory(deviceId, measurementId,
-            dataType));
-    return new ReadOnlyMemChunk(dataType, memSeriesLazyMerger);
   }
 
   /**
@@ -416,17 +420,16 @@ public class OverflowProcessor extends Processor {
   }
 
   public boolean isFlush() {
-    synchronized (flushStatus) {
-      return flushStatus.isFlushing();
-    }
+    //see BufferWriteProcess.isFlush()
+    return  flushSupport != null;
   }
 
-  private void flushOperation(String flushFunction) {
+  private boolean flushTask(String displayMessage) {
+    boolean result;
     long flushStartTime = System.currentTimeMillis();
     try {
-      LOGGER
-          .info("The overflow processor {} starts flushing {}.", getProcessorName(),
-              flushFunction);
+      LOGGER.info("The overflow processor {} starts flushing {}.", getProcessorName(),
+                  displayMessage);
       // flush data
       workResource
           .flush(fileSchema, flushSupport.getMemTabale(),
@@ -436,35 +439,37 @@ public class OverflowProcessor extends Processor {
       if (IoTDBDescriptor.getInstance().getConfig().enableWal) {
         logNode.notifyEndFlush(null);
       }
+      result = true;
     } catch (IOException e) {
       LOGGER.error("Flush overflow processor {} rowgroup to file error in {}. Thread {} exits.",
-          getProcessorName(), flushFunction, Thread.currentThread().getName(), e);
+          getProcessorName(), displayMessage, Thread.currentThread().getName(), e);
+      result = false;
     } catch (Exception e) {
       LOGGER.error("FilenodeFlushAction action failed. Thread {} exits.",
           Thread.currentThread().getName(), e);
+      result = false;
     } finally {
-      synchronized (flushStatus) {
-        flushStatus.setUnFlushing();
         // switch from flush to work.
         switchFlushToWork();
-        flushStatus.notifyAll();
-      }
     }
     // log flush time
-    LOGGER.info("The overflow processor {} ends flushing {}.", getProcessorName(), flushFunction);
-    long flushEndTime = System.currentTimeMillis();
-    long timeInterval = flushEndTime - flushStartTime;
-    ZonedDateTime startDateTime = ZonedDateTime.ofInstant(Instant.ofEpochMilli(flushStartTime),
-        IoTDBDescriptor.getInstance().getConfig().getZoneID());
-    ZonedDateTime endDateTime = ZonedDateTime.ofInstant(Instant.ofEpochMilli(flushEndTime),
-        IoTDBDescriptor.getInstance().getConfig().getZoneID());
-    LOGGER.info(
-        "The overflow processor {} flush {}, start time is {}, flush end time is {}," +
-            " time consumption is {}ms",
-        getProcessorName(), flushFunction, startDateTime, endDateTime, timeInterval);
+    if (LOGGER.isInfoEnabled()) {
+      LOGGER
+          .info("The overflow processor {} ends flushing {}.", getProcessorName(), displayMessage);
+      long flushEndTime = System.currentTimeMillis();
+      LOGGER.info(
+          "The overflow processor {} flush {}, start time is {}, flush end time is {}," +
+              " time consumption is {}ms",
+          getProcessorName(), displayMessage,
+          DatetimeUtils.convertMillsecondToZonedDateTime(flushStartTime),
+          DatetimeUtils.convertMillsecondToZonedDateTime(flushEndTime),
+          flushEndTime - flushStartTime);
+    }
+    return result;
   }
 
-  private Future<?> flush(boolean synchronization) throws OverflowProcessorException {
+  @Override
+  public synchronized Future<Boolean> flush() throws IOException {
     // statistic information for flush
     if (lastFlushTime > 0) {
       long thisFLushTime = System.currentTimeMillis();
@@ -481,21 +486,20 @@ public class OverflowProcessor extends Processor {
     lastFlushTime = System.currentTimeMillis();
     // value count
     if (valueCount > 0) {
-      synchronized (flushStatus) {
-        while (flushStatus.isFlushing()) {
-          try {
-            flushStatus.wait();
-          } catch (InterruptedException e) {
-            LOGGER.error("Waiting the flushstate error in flush row group to store.", e);
-          }
-        }
+      try {
+        flushFuture.get();
+      } catch (InterruptedException | ExecutionException e) {
+        LOGGER.error("Encounter an interrupt error when waitting for the flushing, "
+                + "the bufferwrite processor is {}.",
+            getProcessorName(), e);
+        Thread.currentThread().interrupt();
       }
       try {
         // backup newIntervalFile list and emptyIntervalFileNode
         overflowFlushAction.act();
       } catch (Exception e) {
         LOGGER.error("Flush the overflow rowGroup to file faied, when overflowFlushAction act");
-        throw new OverflowProcessorException(e);
+        throw new IOException(e);
       }
 
       if (IoTDBDescriptor.getInstance().getConfig().enableWal) {
@@ -510,30 +514,14 @@ public class OverflowProcessor extends Processor {
       memSize.set(0);
       valueCount = 0;
       // switch from work to flush
-      flushStatus.setFlushing();
       switchWorkToFlush();
-      if (synchronization) {
-        flushOperation("synchronously");
-      } else {
-        FlushManager.getInstance().submit(new Runnable() {
-          @Override
-          public void run() {
-            flushOperation("asynchronously");
-          }
-        });
-      }
+      flushFuture = FlushManager.getInstance().submit( () ->
+          flushTask("asynchronously"));
+    } else {
+      flushFuture = new ImmediateFuture<>(true);
     }
-    return null;
-  }
+    return flushFuture;
 
-  @Override
-  public boolean flush() throws IOException {
-    try {
-      flush(false);
-    } catch (OverflowProcessorException e) {
-      throw new IOException(e);
-    }
-    return false;
   }
 
   @Override
@@ -541,17 +529,27 @@ public class OverflowProcessor extends Processor {
     LOGGER.info("The overflow processor {} starts close operation.", getProcessorName());
     long closeStartTime = System.currentTimeMillis();
     // flush data
-    flush(true);
-    LOGGER.info("The overflow processor {} ends close operation.", getProcessorName());
-    // log close time
-    long closeEndTime = System.currentTimeMillis();
-    LOGGER.info(
-        "The close operation of overflow processor {} starts at {} and ends at {}."
-            + " It comsumes {}ms.",
-        getProcessorName(), ZonedDateTime.ofInstant(Instant.ofEpochMilli(closeStartTime),
-            IoTDBDescriptor.getInstance().getConfig().getZoneID()),
-        ZonedDateTime.ofInstant(Instant.ofEpochMilli(closeStartTime),
-            IoTDBDescriptor.getInstance().getConfig().getZoneID()), closeEndTime - closeStartTime);
+    try {
+      flush().get();
+    } catch (InterruptedException | ExecutionException e) {
+      LOGGER.error("Encounter an interrupt error when waiting for the flushing, "
+              + "the overflow processor is {}.",
+          getProcessorName(), e);
+      Thread.currentThread().interrupt();
+    } catch (IOException e) {
+      throw new OverflowProcessorException(e);
+    }
+    if (LOGGER.isInfoEnabled()) {
+      LOGGER.info("The overflow processor {} ends close operation.", getProcessorName());
+      // log close time
+      long closeEndTime = System.currentTimeMillis();
+      LOGGER.info(
+          "The close operation of overflow processor {} starts at {} and ends at {}."
+              + " It comsumes {}ms.",
+          getProcessorName(), DatetimeUtils.convertMillsecondToZonedDateTime(closeStartTime),
+          DatetimeUtils.convertMillsecondToZonedDateTime(closeEndTime),
+          closeEndTime - closeStartTime);
+    }
   }
 
   public void clear() throws IOException {
@@ -641,29 +639,46 @@ public class OverflowProcessor extends Processor {
     }
     OverflowProcessor that = (OverflowProcessor) o;
     return isMerge == that.isMerge &&
-        valueCount == that.valueCount &&
-        lastFlushTime == that.lastFlushTime &&
-        memThreshold == that.memThreshold &&
-        Objects.equals(workResource, that.workResource) &&
-        Objects.equals(mergeResource, that.mergeResource) &&
-        Objects.equals(workSupport, that.workSupport) &&
-        Objects.equals(flushSupport, that.flushSupport) &&
-        Objects.equals(flushStatus, that.flushStatus) &&
-        Objects.equals(parentPath, that.parentPath) &&
-        Objects.equals(dataPathCount, that.dataPathCount) &&
-        Objects.equals(queryFlushLock, that.queryFlushLock) &&
-        Objects.equals(overflowFlushAction, that.overflowFlushAction) &&
-        Objects.equals(filenodeFlushAction, that.filenodeFlushAction) &&
-        Objects.equals(fileSchema, that.fileSchema) &&
-        Objects.equals(memSize, that.memSize) &&
-        Objects.equals(logNode, that.logNode);
+            valueCount == that.valueCount &&
+            lastFlushTime == that.lastFlushTime &&
+            memThreshold == that.memThreshold &&
+            Objects.equals(workResource, that.workResource) &&
+            Objects.equals(mergeResource, that.mergeResource) &&
+            Objects.equals(workSupport, that.workSupport) &&
+            Objects.equals(flushSupport, that.flushSupport) &&
+            Objects.equals(flushFuture, that.flushFuture) &&
+            Objects.equals(parentPath, that.parentPath) &&
+            Objects.equals(dataPathCount, that.dataPathCount) &&
+            Objects.equals(queryFlushLock, that.queryFlushLock) &&
+            Objects.equals(overflowFlushAction, that.overflowFlushAction) &&
+            Objects.equals(filenodeFlushAction, that.filenodeFlushAction) &&
+            Objects.equals(fileSchema, that.fileSchema) &&
+            Objects.equals(memSize, that.memSize) &&
+            Objects.equals(logNode, that.logNode) &&
+            Objects.equals(flushFuture, that.flushFuture);
   }
 
   @Override
   public int hashCode() {
     return Objects.hash(super.hashCode(), workResource, mergeResource, workSupport,
-        flushSupport, flushStatus, isMerge, valueCount, parentPath, lastFlushTime,
-        dataPathCount, queryFlushLock, overflowFlushAction, filenodeFlushAction, fileSchema,
-        memThreshold, memSize, logNode);
+            flushSupport, flushFuture, isMerge, valueCount, parentPath, lastFlushTime,
+            dataPathCount, queryFlushLock, overflowFlushAction, filenodeFlushAction, fileSchema,
+            memThreshold, memSize, logNode); // flushFuture is already hashed above
+  }
+
+  /**
+   * used for test. We can block to wait for finishing flushing.
+   * @return the future of the flush() task.
+   */
+  public Future<Boolean> getFlushFuture() {
+    return flushFuture;
+  }
+
+  /**
+   * used for test. We can know when the flush() is called.
+   * @return the last flush() time.
+   */
+  public long getLastFlushTime() {
+    return lastFlushTime;
   }
 }
\ No newline at end of file
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/pool/FlushManager.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/pool/FlushManager.java
index 0e91b61..9a1e323 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/pool/FlushManager.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/pool/FlushManager.java
@@ -18,8 +18,10 @@
  */
 package org.apache.iotdb.db.engine.pool;
 
+import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
@@ -111,8 +113,12 @@ public class FlushManager {
     }
   }
 
-  public synchronized void submit(Runnable task) {
-    pool.execute(task);
+  public synchronized Future<?> submit(Runnable task) {
+    return pool.submit(task);
+  }
+
+  public synchronized <T>Future<T> submit(Callable<T> task){
+    return pool.submit(task);
   }
 
   public int getActiveCnt() {
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/qp/constant/DatetimeUtils.java b/iotdb/src/main/java/org/apache/iotdb/db/qp/constant/DatetimeUtils.java
index 5011071..266c439 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/qp/constant/DatetimeUtils.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/constant/DatetimeUtils.java
@@ -27,6 +27,7 @@ import java.time.format.DateTimeFormatter;
 import java.time.format.DateTimeFormatterBuilder;
 import java.time.format.SignStyle;
 import java.time.temporal.ChronoField;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.qp.LogicalOperatorException;
 
 public class DatetimeUtils {
@@ -206,4 +207,9 @@ public class DatetimeUtils {
   public static ZoneOffset toZoneOffset(ZoneId zoneId) {
     return zoneId.getRules().getOffset(Instant.now());
   }
+
+  public static ZonedDateTime convertMillsecondToZonedDateTime(long millisecond) {
+    return ZonedDateTime.ofInstant(Instant.ofEpochMilli(millisecond),
+        IoTDBDescriptor.getInstance().getConfig().getZoneID());
+  }
 }
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/utils/FlushStatus.java b/iotdb/src/main/java/org/apache/iotdb/db/utils/ImmediateFuture.java
similarity index 51%
rename from iotdb/src/main/java/org/apache/iotdb/db/engine/utils/FlushStatus.java
rename to iotdb/src/main/java/org/apache/iotdb/db/utils/ImmediateFuture.java
index f437c96..340a427 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/utils/FlushStatus.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/utils/ImmediateFuture.java
@@ -16,30 +16,43 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.engine.utils;
 
-/**
- * This class is used to represent the state of flush. It's can be used in the bufferwrite
- * flush{@code SequenceFileManager} and overflow flush{@code OverFlowProcessor}.
- */
-public class FlushStatus {
+package org.apache.iotdb.db.utils;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
-  private boolean isFlushing;
+public class ImmediateFuture<T> implements Future<T> { // a Future that is complete on creation
 
-  public FlushStatus() {
-    this.isFlushing = false;
+  private final T result; // the value handed back by both get() variants
+  public ImmediateFuture(T result) {
+    this.result = result;
+  }
+  @Override
+  public boolean cancel(boolean mayInterruptIfRunning) {
+    return false; // nothing to cancel: the result is already available
   }
 
-  public boolean isFlushing() {
-    return isFlushing;
+  @Override
+  public boolean isCancelled() {
+    return false; // cancel() never succeeds, so this future is never cancelled
   }
 
-  public void setFlushing() {
-    this.isFlushing = true;
+  @Override
+  public boolean isDone() {
+    return true;
   }
 
-  public void setUnFlushing() {
-    this.isFlushing = false;
+  @Override
+  public T get() throws InterruptedException, ExecutionException {
+    return result;
   }
 
+  @Override
+  public T get(long timeout, TimeUnit unit)
+      throws InterruptedException, ExecutionException, TimeoutException {
+    return result;
+  }
 }
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/ProcessorTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/ProcessorTest.java
index 22cc44a..75fa7d1 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/ProcessorTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/ProcessorTest.java
@@ -22,8 +22,10 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 
 import java.io.IOException;
+import java.util.concurrent.Future;
 import org.apache.iotdb.db.exception.ProcessorException;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.db.utils.ImmediateFuture;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -106,9 +108,8 @@ public class ProcessorTest {
     }
 
     @Override
-    public boolean flush() throws IOException {
-      // TODO Auto-generated method stub
-      return false;
+    public Future<Boolean> flush() throws IOException {
+      return new ImmediateFuture<>(true);
     }
 
     @Override
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessorNewTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessorNewTest.java
index cdd9ec3..3b218f9 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessorNewTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessorNewTest.java
@@ -19,7 +19,9 @@
 
 package org.apache.iotdb.db.engine.bufferwrite;
 
+import static junit.framework.TestCase.assertTrue;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 
 import java.io.File;
 import java.io.IOException;
@@ -27,7 +29,9 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import org.apache.iotdb.db.conf.directories.Directories;
 import org.apache.iotdb.db.engine.MetadataManagerHelper;
 import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
@@ -41,11 +45,14 @@ import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.utils.Pair;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class BufferWriteProcessorNewTest {
-
+  private static final Logger LOGGER = LoggerFactory.getLogger(BufferWriteProcessorNewTest.class);
   Action bfflushaction = new Action() {
 
     @Override
@@ -99,15 +106,15 @@ public class BufferWriteProcessorNewTest {
         FileSchemaUtils.constructFileSchema(processorName));
     assertEquals(filename, bufferwrite.getFileName());
     assertEquals(processorName + File.separator + filename, bufferwrite.getFileRelativePath());
-    assertEquals(true, bufferwrite.isNewProcessor());
+    assertTrue(bufferwrite.isNewProcessor());
     bufferwrite.setNewProcessor(false);
-    assertEquals(false, bufferwrite.isNewProcessor());
+    assertFalse(bufferwrite.isNewProcessor());
     Pair<ReadOnlyMemChunk, List<ChunkMetaData>> pair = bufferwrite
         .queryBufferWriteData(processorName,
             measurementId, dataType);
     ReadOnlyMemChunk left = pair.left;
     List<ChunkMetaData> right = pair.right;
-    assertEquals(true, left.isEmpty());
+    assertTrue(left.isEmpty());
     assertEquals(0, right.size());
     for (int i = 1; i <= 100; i++) {
       bufferwrite.write(processorName, measurementId, i, dataType, String.valueOf(i));
@@ -116,7 +123,7 @@ public class BufferWriteProcessorNewTest {
     pair = bufferwrite.queryBufferWriteData(processorName, measurementId, dataType);
     left = pair.left;
     right = pair.right;
-    assertEquals(false, left.isEmpty());
+    assertFalse(left.isEmpty());
     int num = 1;
     Iterator<TimeValuePair> iterator = left.getIterator();
     for (; num <= 100; num++) {
@@ -125,19 +132,25 @@ public class BufferWriteProcessorNewTest {
       assertEquals(num, timeValuePair.getTimestamp());
       assertEquals(num, timeValuePair.getValue().getInt());
     }
-    assertEquals(false, bufferwrite.isFlush());
+    assertFalse(bufferwrite.isFlush());
+    long lastFlushTime = bufferwrite.getLastFlushTime();
     // flush asynchronously
     bufferwrite.flush();
-    assertEquals(true, bufferwrite.isFlush());
-    assertEquals(true, bufferwrite.canBeClosed());
+    assertTrue(bufferwrite.getLastFlushTime() != lastFlushTime);
+    assertTrue(bufferwrite.canBeClosed());
     // waiting for the end of flush.
-    while (bufferwrite.isFlush()) {
-      TimeUnit.SECONDS.sleep(1);
+    try {
+      bufferwrite.getFlushFuture().get(10, TimeUnit.SECONDS);
+    } catch (Exception e) {
+      //because UT uses a mock flush operation, 10 seconds should be enough.
+      LOGGER.error(e.getMessage(),e);
+      Assert.fail("mock flush spends more than 10 seconds... "
+          + "Please modify the value or change a better test environment");
     }
     pair = bufferwrite.queryBufferWriteData(processorName, measurementId, dataType);
     left = pair.left;
     right = pair.right;
-    assertEquals(true, left.isEmpty());
+    assertTrue(left.isEmpty());
     assertEquals(1, right.size());
     assertEquals(measurementId, right.get(0).getMeasurementUid());
     assertEquals(dataType, right.get(0).getTsDataType());
@@ -150,7 +163,7 @@ public class BufferWriteProcessorNewTest {
     pair = bufferWriteProcessor.queryBufferWriteData(processorName, measurementId, dataType);
     left = pair.left;
     right = pair.right;
-    assertEquals(true, left.isEmpty());
+    assertTrue(left.isEmpty());
     assertEquals(1, right.size());
     assertEquals(measurementId, right.get(0).getMeasurementUid());
     assertEquals(dataType, right.get(0).getTsDataType());
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessorTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessorTest.java
index 06d47f1..ea2d743 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessorTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessorTest.java
@@ -20,7 +20,11 @@
 package org.apache.iotdb.db.engine.bufferwrite;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
 
+import ch.qos.logback.core.util.TimeUtil;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
@@ -28,7 +32,9 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import org.apache.iotdb.db.conf.directories.Directories;
 import org.apache.iotdb.db.engine.MetadataManagerHelper;
 import org.apache.iotdb.db.engine.PathUtils;
@@ -49,9 +55,13 @@ import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class BufferWriteProcessorTest {
 
+  private static final Logger LOGGER = LoggerFactory.getLogger(BufferWriteProcessorTest.class);
+
   Action bfflushaction = new Action() {
 
     @Override
@@ -124,7 +134,7 @@ public class BufferWriteProcessorTest {
     // check file
     String restoreFilePath = insertPath + ".restore";
     File restoreFile = new File(dataFile, restoreFilePath);
-    assertEquals(true, restoreFile.exists());
+    assertTrue(restoreFile.exists());
     File insertFile = new File(dataFile, insertPath);
     long insertFileLength = insertFile.length();
     FileOutputStream fileOutputStream = new FileOutputStream(insertFile.getPath(), true);
@@ -140,18 +150,18 @@ public class BufferWriteProcessorTest {
         directories.getFolderForTest(), deviceId,
         insertPath, parameters, SysTimeVersionController.INSTANCE,
         FileSchemaUtils.constructFileSchema(deviceId));
-    assertEquals(true, insertFile.exists());
+    assertTrue(insertFile.exists());
     assertEquals(insertFileLength, insertFile.length());
     Pair<ReadOnlyMemChunk, List<ChunkMetaData>> pair = bufferWriteProcessor
         .queryBufferWriteData(deviceId,
             measurementId, dataType);
-    assertEquals(true, pair.left.isEmpty());
+    assertTrue(pair.left.isEmpty());
     assertEquals(1, pair.right.size());
     ChunkMetaData chunkMetaData = pair.right.get(0);
     assertEquals(measurementId, chunkMetaData.getMeasurementUid());
     assertEquals(dataType, chunkMetaData.getTsDataType());
     bufferWriteProcessor.close();
-    assertEquals(false, restoreFile.exists());
+    assertFalse(restoreFile.exists());
   }
 
   @Test
@@ -169,7 +179,7 @@ public class BufferWriteProcessorTest {
     // check file
     String restoreFilePath = insertPath + ".restore";
     File restoreFile = new File(dataFile, restoreFilePath);
-    assertEquals(true, restoreFile.exists());
+    assertTrue(restoreFile.exists());
     BufferWriteProcessor bufferWriteProcessor = new BufferWriteProcessor(
         directories.getFolderForTest(), deviceId,
         insertPath, parameters, SysTimeVersionController.INSTANCE,
@@ -177,14 +187,14 @@ public class BufferWriteProcessorTest {
     Pair<ReadOnlyMemChunk, List<ChunkMetaData>> pair = bufferWriteProcessor
         .queryBufferWriteData(deviceId,
             measurementId, dataType);
-    assertEquals(true, pair.left.isEmpty());
+    assertTrue(pair.left.isEmpty());
     assertEquals(1, pair.right.size());
     ChunkMetaData chunkMetaData = pair.right.get(0);
     assertEquals(measurementId, chunkMetaData.getMeasurementUid());
     assertEquals(dataType, chunkMetaData.getTsDataType());
     bufferWriteProcessor.close();
     bufferwrite.close();
-    assertEquals(false, restoreFile.exists());
+    assertFalse(restoreFile.exists());
   }
 
   @Test
@@ -193,26 +203,36 @@ public class BufferWriteProcessorTest {
     bufferwrite = new BufferWriteProcessor(directories.getFolderForTest(), deviceId, insertPath,
         parameters, SysTimeVersionController.INSTANCE,
         FileSchemaUtils.constructFileSchema(deviceId));
-    assertEquals(false, bufferwrite.isFlush());
-    assertEquals(true, bufferwrite.canBeClosed());
+    assertFalse(bufferwrite.isFlush());
+    assertTrue(bufferwrite.canBeClosed());
     assertEquals(0, bufferwrite.memoryUsage());
     assertEquals(TsFileIOWriter.magicStringBytes.length, bufferwrite.getFileSize());
     assertEquals(0, bufferwrite.getMetaSize());
+    long lastFlushTime = bufferwrite.getLastFlushTime();
     for (int i = 1; i <= 85; i++) {
       bufferwrite.write(deviceId, measurementId, i, dataType, String.valueOf(i));
       assertEquals(i * 12, bufferwrite.memoryUsage());
     }
+    assertEquals(lastFlushTime, bufferwrite.getLastFlushTime());
     bufferwrite.write(deviceId, measurementId, 86, dataType, String.valueOf(86));
-    assertEquals(true, bufferwrite.isFlush());
+    //assert a flush() is called.
+    assertNotEquals(bufferwrite.getLastFlushTime(), lastFlushTime);
     // sleep to the end of flush
-    TimeUnit.SECONDS.sleep(2);
-    assertEquals(false, bufferwrite.isFlush());
+    try {
+      bufferwrite.getFlushFuture().get(10, TimeUnit.SECONDS);
+    } catch (Exception e) {
+      //because UT uses a mock flush operation, 10 seconds should be enough.
+      LOGGER.error(e.getMessage(), e);
+      Assert.fail("mock flush spends more than 10 seconds... "
+          + "Please modify the value or change a better test environment");
+    }
+    assertFalse(bufferwrite.isFlush());
     assertEquals(0, bufferwrite.memoryUsage());
     // query result
     Pair<ReadOnlyMemChunk, List<ChunkMetaData>> pair = bufferwrite
         .queryBufferWriteData(deviceId, measurementId,
             dataType);
-    assertEquals(true, pair.left.isEmpty());
+    assertTrue(pair.left.isEmpty());
     assertEquals(1, pair.right.size());
     ChunkMetaData chunkMetaData = pair.right.get(0);
     assertEquals(measurementId, chunkMetaData.getMeasurementUid());
@@ -223,7 +243,7 @@ public class BufferWriteProcessorTest {
     }
     pair = bufferwrite.queryBufferWriteData(deviceId, measurementId, dataType);
     ReadOnlyMemChunk rawSeriesChunk = (ReadOnlyMemChunk) pair.left;
-    assertEquals(false, rawSeriesChunk.isEmpty());
+    assertFalse(rawSeriesChunk.isEmpty());
     assertEquals(87, rawSeriesChunk.getMinTimestamp());
     Assert.assertEquals(87, rawSeriesChunk.getValueAtMinTime().getInt());
     assertEquals(100, rawSeriesChunk.getMaxTimestamp());
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessorTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessorTest.java
index 4c4ee05..867e276 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessorTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessorTest.java
@@ -95,7 +95,6 @@ public class OverflowProcessorTest {
     assertEquals(0,
         overflowSeriesDataSource.getOverflowInsertFileList().get(0).getChunkMetaDataList().size());
     processor.flush();
-    assertEquals(false, processor.isFlush());
     assertEquals(false, processor.isMerge());
     // write insert data
     OverflowTestUtils.produceInsertData(processor);