Posted to commits@iotdb.apache.org by qi...@apache.org on 2019/06/11 09:03:58 UTC

[incubator-iotdb] branch feature_async_close_tsfile updated: resolve duplicated data in flushing memtable and writer, refactor flushing memtable data deletion

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch feature_async_close_tsfile
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/feature_async_close_tsfile by this push:
     new 1d8a267  resolve duplicated data in flushing memtable and writer, refactor flushing memtable data deletion
     new 774a5f0  Merge branch 'feature_async_close_tsfile' of github.com:apache/incubator-iotdb into feature_async_close_tsfile
1d8a267 is described below

commit 1d8a2679f479968972b44b4539617c0a6742a595
Author: qiaojialin <64...@qq.com>
AuthorDate: Tue Jun 11 17:00:07 2019 +0800

    resolve duplicated data in flushing memtable and writer, refactor flushing memtable data deletion
---
 .../engine/bufferwrite/BufferWriteProcessor.java   | 48 ++++---------
 .../db/engine/filenode/FileNodeProcessor.java      |  2 +-
 .../iotdb/db/engine/memtable/AbstractMemTable.java | 37 ++++++++--
 .../apache/iotdb/db/engine/memtable/IMemTable.java | 12 +++-
 .../db/engine/memtable/IWritableMemChunk.java      |  2 +
 .../db/engine/memtable/MemTableFlushTask.java      |  2 +-
 .../iotdb/db/engine/memtable/WritableMemChunk.java | 10 ++-
 .../db/engine/overflow/io/OverflowMemtable.java    |  6 +-
 .../db/engine/overflow/io/OverflowProcessor.java   |  4 +-
 .../db/writelog/node/ExclusiveWriteLogNode.java    | 16 +++--
 .../iotdb/db/writelog/node/WriteLogNode.java       |  6 +-
 .../recover/ExclusiveLogRecoverPerformer.java      |  2 +-
 .../writelog/recover/FileNodeRecoverPerformer.java | 55 --------------
 .../recover/StorageGroupRecoverPerformer.java      | 84 ++++++++++++++++++++++
 .../org/apache/iotdb/db/writelog/RecoverTest.java  |  2 +-
 .../apache/iotdb/db/writelog/WriteLogNodeTest.java |  4 +-
 16 files changed, 177 insertions(+), 115 deletions(-)
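
The core change, implemented in AbstractMemTable, WritableMemChunk and the Deletion
modification class: deleting from a flushing memtable no longer copies the memtable;
the deletion is recorded as a Modification and applied lazily at query time as a time
offset on the memchunk. The standalone sketch below is a simplified illustration of
that approach; its class and method names (FlushingDeletionSketch, MemChunk, write/read)
and the long-only value type are placeholders, not the code in this patch.

    import java.util.ArrayList;
    import java.util.List;

    // Simplified, standalone illustration of the deletion handling introduced in this
    // patch; the real implementation lives in AbstractMemTable, WritableMemChunk and
    // the Deletion modification class.
    public class FlushingDeletionSketch {

      // A deletion marks every point of a path with time <= timestamp as deleted.
      static class Deletion {
        final String path;
        final long timestamp;
        Deletion(String path, long timestamp) {
          this.path = path;
          this.timestamp = timestamp;
        }
      }

      static class MemChunk {
        final List<long[]> points = new ArrayList<>(); // each entry is {time, value}
        long timeOffset = 0; // points with time < timeOffset are treated as deleted

        void write(long time, long value) {
          points.add(new long[]{time, value});
        }

        // Query-time filtering: only points at or after timeOffset are visible.
        List<long[]> read() {
          List<long[]> visible = new ArrayList<>();
          for (long[] p : points) {
            if (p[0] >= timeOffset) {
              visible.add(p);
            }
          }
          return visible;
        }
      }

      static class MemTable {
        final List<Deletion> modifications = new ArrayList<>();
        final MemChunk chunk = new MemChunk();

        // Deleting from a flushing memtable only records the modification; no copy is made.
        void delete(Deletion deletion) {
          modifications.add(deletion);
        }

        // The first undeleted time is one past the largest deletion timestamp for the path.
        long findUndeletedTime(String path) {
          long undeleted = 0;
          for (Deletion d : modifications) {
            if (d.path.equals(path) && d.timestamp > undeleted) {
              undeleted = d.timestamp;
            }
          }
          return undeleted + 1;
        }

        List<long[]> query(String path) {
          chunk.timeOffset = findUndeletedTime(path);
          return chunk.read();
        }
      }

      public static void main(String[] args) {
        MemTable table = new MemTable();
        table.chunk.write(1, 10);
        table.chunk.write(2, 20);
        table.chunk.write(3, 30);
        table.delete(new Deletion("root.sg.d1.s1", 2)); // delete all points with time <= 2
        System.out.println(table.query("root.sg.d1.s1").size()); // 1: only time 3 remains
      }
    }

Recording the deletion instead of copying keeps the flushing memtable untouched while
the flush thread reads it; the cutoff is applied only when the memtable is queried.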

diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java
index b331a57..4148359 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java
@@ -18,6 +18,8 @@
  */
 package org.apache.iotdb.db.engine.bufferwrite;
 
+import static org.apache.iotdb.db.conf.IoTDBConstant.PATH_SEPARATOR;
+
 import java.io.File;
 import java.io.IOException;
 import java.nio.file.Paths;
@@ -45,11 +47,13 @@ import org.apache.iotdb.db.engine.memtable.MemSeriesLazyMerger;
 import org.apache.iotdb.db.engine.memtable.MemTableFlushTask;
 import org.apache.iotdb.db.engine.memtable.MemTablePool;
 import org.apache.iotdb.db.engine.memtable.PrimitiveMemTable;
+import org.apache.iotdb.db.engine.modification.Deletion;
 import org.apache.iotdb.db.engine.pool.FlushManager;
 import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
 import org.apache.iotdb.db.engine.version.VersionController;
 import org.apache.iotdb.db.exception.BufferWriteProcessorException;
 import org.apache.iotdb.db.qp.constant.DatetimeUtils;
+import org.apache.iotdb.db.utils.FileUtils;
 import org.apache.iotdb.db.utils.ImmediateFuture;
 import org.apache.iotdb.db.utils.MemUtils;
 import org.apache.iotdb.db.writelog.manager.MultiFileLogNodeManager;
@@ -76,7 +80,6 @@ public class BufferWriteProcessor extends Processor {
   private AtomicLong memSize = new AtomicLong();
   private long memThreshold = TSFileConfig.groupSizeInByte;
   private IMemTable workMemTable;
-  private IMemTable flushMemTable;
 
   // each flush task has a flushId, IO task should scheduled by this id
   private long flushId = -1;
@@ -277,7 +280,7 @@ public class BufferWriteProcessor extends Processor {
     flushQueryLock.lock();
     try {
       MemSeriesLazyMerger memSeriesLazyMerger = new MemSeriesLazyMerger();
-      if (flushMemTable != null) {
+      if (!flushingMemTables.isEmpty()) {
         memSeriesLazyMerger.addMemSeries(flushMemTable.query(deviceId, measurementId, dataType, props));
       }
       memSeriesLazyMerger.addMemSeries(workMemTable.query(deviceId, measurementId, dataType, props));
@@ -292,27 +295,12 @@ public class BufferWriteProcessor extends Processor {
     }
   }
 
-  private void switchWorkToFlush() {
-    LOGGER.info("BufferWrite Processor {} try to get flushQueryLock for switchWorkToFlush", getProcessorName());
-    flushQueryLock.lock();
-    LOGGER.info("BufferWrite Processor {} get flushQueryLock for switchWorkToFlush successfully", getProcessorName());
-    try {
-      IMemTable temp = flushMemTable == null ? MemTablePool.getInstance().getEmptyMemTable() : flushMemTable;
-      flushMemTable = workMemTable;
-      workMemTable = temp;
-      isFlush = true;
-    } finally {
-      flushQueryLock.unlock();
-      LOGGER.info("BufferWrite Processor {} release the flushQueryLock for switchWorkToFlush successfully", getProcessorName());
-    }
-  }
 
   private void switchFlushToWork() {
     LOGGER.info("BufferWrite Processor {} try to get flushQueryLock for switchFlushToWork", getProcessorName());
     flushQueryLock.lock();
     LOGGER.info("BufferWrite Processor {} get flushQueryLock for switchFlushToWork", getProcessorName());
     try {
-      flushMemTable.clear();
       writer.appendMetadata();
       isFlush = false;
     } finally {
@@ -361,7 +349,7 @@ public class BufferWriteProcessor extends Processor {
 
       filenodeFlushAction.act();
       if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) {
-        logNode.notifyEndFlush(null, walTaskId);
+        logNode.notifyEndFlush(null, walTaskId, new File(writer.getInsertFilePath()).getName());
       }
       result = true;
     } catch (Exception e) {
@@ -413,17 +401,6 @@ public class BufferWriteProcessor extends Processor {
     }
     lastFlushTime = System.currentTimeMillis();
     // check value count
-    // waiting for the end of last flush operation.
-    try {
-      long startTime = System.currentTimeMillis();
-      flushFuture.get();
-      long timeCost = System.currentTimeMillis() - startTime;
-      if (timeCost > 10) {
-        LOGGER.info("BufferWrite Processor {} wait for the previous flushing task for {} ms.", getProcessorName(), timeCost);
-      }
-    } catch (InterruptedException | ExecutionException e) {
-      throw new IOException(e);
-    }
     if (valueCount > 0) {
       // update the lastUpdatetime, prepare for flush
       try {
@@ -434,16 +411,14 @@ public class BufferWriteProcessor extends Processor {
       }
       final long walTaskId;
       if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) {
-        walTaskId = logNode.notifyStartFlush();
+        walTaskId = logNode.notifyStartFlush(new File(writer.getInsertFilePath()).getName());
         LOGGER.info("BufferWrite Processor {} has notified WAL for flushing.", getProcessorName());
       } else {
         walTaskId = 0;
       }
       valueCount = 0;
 
-      synchronized (flushingMemTables) {
-        flushingMemTables.add(workMemTable);
-      }
+      flushingMemTables.add(workMemTable);
       IMemTable tmpMemTableToFlush = workMemTable;
       workMemTable = MemTablePool.getInstance().getEmptyMemTable();
 
@@ -638,8 +613,11 @@ public class BufferWriteProcessor extends Processor {
     workMemTable.delete(deviceId, measurementId, timestamp);
     if (isFlush()) {
       // flushing MemTable cannot be directly modified since another thread is reading it
-      flushMemTable = flushMemTable.copy();
-      flushMemTable.delete(deviceId, measurementId, timestamp);
+      for (IMemTable memTable : flushingMemTables) {
+        if (memTable.containSeries(deviceId, measurementId)) {
+          memTable.delete(new Deletion(deviceId + PATH_SEPARATOR + measurementId, 0, timestamp));
+        }
+      }
     }
   }
 
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
index 78b41ae..ef36dfa 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
@@ -115,7 +115,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
 
   private static final String WARN_NO_SUCH_OVERFLOWED_FILE = "Can not find any tsfile which"
       + " will be overflowed in the filenode processor {}, ";
-  private static final String RESTORE_FILE_SUFFIX = ".restore";
+  public static final String RESTORE_FILE_SUFFIX = ".restore";
   private static final Logger LOGGER = LoggerFactory.getLogger(FileNodeProcessor.class);
   private static final IoTDBConfig TsFileDBConf = IoTDBDescriptor.getInstance().getConfig();
   private static final MManager mManager = MManager.getInstance();
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
index 7e57a6b..1a4f755 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
@@ -18,9 +18,14 @@
  */
 package org.apache.iotdb.db.engine.memtable;
 
+import static org.apache.iotdb.db.conf.IoTDBConstant.PATH_SEPARATOR;
+
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import org.apache.iotdb.db.engine.modification.Deletion;
+import org.apache.iotdb.db.engine.modification.Modification;
 import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
 import org.apache.iotdb.db.utils.TimeValuePair;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -28,6 +33,8 @@ import org.apache.iotdb.tsfile.utils.Binary;
 
 public abstract class AbstractMemTable implements IMemTable {
 
+  private List<Modification> modifications = new ArrayList<>();
+
   private final Map<String, Map<String, IWritableMemChunk>> memTableMap;
 
   public AbstractMemTable() {
@@ -104,14 +111,33 @@ public abstract class AbstractMemTable implements IMemTable {
   @Override
   public ReadOnlyMemChunk query(String deviceId, String measurement, TSDataType dataType,
       Map<String, String> props) {
-    return new ReadOnlyMemChunk(dataType, getSeriesData(deviceId, measurement, dataType), props);
+
+    return new ReadOnlyMemChunk(dataType, getSeriesData(deviceId,
+        measurement, dataType), props);
+  }
+
+  private long findUndeletedTime(String deviceId, String measurement) {
+    String path = deviceId + PATH_SEPARATOR + measurement;
+    long undeletedTime = 0;
+    for (Modification modification : modifications) {
+      if (modification instanceof Deletion) {
+        Deletion deletion = (Deletion) modification;
+        if (deletion.getPath().equals(path) && deletion.getTimestamp() > undeletedTime) {
+          undeletedTime = deletion.getTimestamp();
+        }
+      }
+    }
+    return undeletedTime + 1;
   }
 
   private TimeValuePairSorter getSeriesData(String deviceId, String measurement, TSDataType dataType) {
     if (!checkPath(deviceId, measurement)) {
       return new WritableMemChunk(dataType);
     }
-    return memTableMap.get(deviceId).get(measurement);
+    long undeletedTime = findUndeletedTime(deviceId, measurement);
+    IWritableMemChunk memChunk = memTableMap.get(deviceId).get(measurement);
+    memChunk.setTimeOffset(undeletedTime);
+    return memChunk;
   }
 
   @Override
@@ -119,8 +145,6 @@ public abstract class AbstractMemTable implements IMemTable {
     Map<String, IWritableMemChunk> deviceMap = memTableMap.get(deviceId);
     if (deviceMap != null) {
       IWritableMemChunk chunk = deviceMap.get(measurementId);
-      //TODO: if the memtable is thread safe, then we do not need to copy data again,
-      // otherwise current implementation is error.
       IWritableMemChunk newChunk = filterChunk(chunk, timestamp);
       if (newChunk != null) {
         deviceMap.put(measurementId, newChunk);
@@ -130,6 +154,11 @@ public abstract class AbstractMemTable implements IMemTable {
     return false;
   }
 
+  @Override
+  public boolean delete(Deletion deletion) {
+    return this.modifications.add(deletion);
+  }
+
   /**
    * If chunk contains data with timestamp less than 'timestamp', create a copy and delete all those
    * data. Otherwise return null.
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java
index bca8d05..bc28c56 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java
@@ -19,6 +19,7 @@
 package org.apache.iotdb.db.engine.memtable;
 
 import java.util.Map;
+import org.apache.iotdb.db.engine.modification.Deletion;
 import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.utils.Binary;
@@ -64,7 +65,7 @@ public interface IMemTable {
 
   /**
    * Delete data in it whose timestamp <= 'timestamp' and belonging to timeseries
-   * deviceId.measurementId.
+   * deviceId.measurementId. Only called for non-flushing MemTable.
    *
    * @param deviceId the deviceId of the timeseries to be deleted.
    * @param measurementId the measurementId of the timeseries to be deleted.
@@ -74,6 +75,15 @@ public interface IMemTable {
   boolean delete(String deviceId, String measurementId, long timestamp);
 
   /**
+   * Delete data in it whose timestamp <= 'timestamp' and belonging to timeseries
+   * deviceId.measurementId. Only called for flushing MemTable.
+   *
+   * @param deletion an object representing this deletion
+   * @return true if there is data that has been deleted, otherwise false.
+   */
+  boolean delete(Deletion deletion);
+
+  /**
    * Make a copy of this MemTable.
    *
    * @return a MemTable with the same data as this one.
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java
index fbb1fa5..267f26f 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java
@@ -44,4 +44,6 @@ public interface IWritableMemChunk extends TimeValuePairSorter {
   int count();
 
   TSDataType getType();
+
+  void setTimeOffset(long offset);
 }
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTask.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTask.java
index 27926c0..1d7b56b 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTask.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTask.java
@@ -138,8 +138,8 @@ public class MemTableFlushTask {
         ioTime += System.currentTimeMillis() - starTime;
       }
     }
-    flushCallBack.afterFlush(memTable, tsFileIoWriter);
     MemTablePool.getInstance().release(memTable);
+    flushCallBack.afterFlush(memTable, tsFileIoWriter);
     tsFileIoWriter.getFlushID().getAndIncrement();
     LOGGER.info("BufferWrite Processor {}, flushing a memtable into disk:  io cost {} ms.",
         processName, ioTime);
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java
index 3c36f98..62d3f11 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java
@@ -34,6 +34,7 @@ public class WritableMemChunk implements IWritableMemChunk {
 
   private TSDataType dataType;
   private PrimitiveArrayList list;
+  private long timeOffset = 0;
 
   public WritableMemChunk(TSDataType dataType) {
     this.dataType = dataType;
@@ -128,7 +129,9 @@ public class WritableMemChunk implements IWritableMemChunk {
     int length = list.size();
     Map<Long, TsPrimitiveType> map = new HashMap<>(length, 1.0f);
     for (int i = 0; i < length; i++) {
-      map.put(list.getTimestamp(i), TsPrimitiveType.getByType(dataType, list.getValue(i)));
+      if (list.getTimestamp(i) >= timeOffset) {
+        map.put(list.getTimestamp(i), TsPrimitiveType.getByType(dataType, list.getValue(i)));
+      }
     }
     List<TimeValuePair> ret = new ArrayList<>(map.size());
     map.forEach((k, v) -> ret.add(new TimeValuePairInMemTable(k, v)));
@@ -151,4 +154,9 @@ public class WritableMemChunk implements IWritableMemChunk {
     return dataType;
   }
 
+  @Override
+  public void setTimeOffset(long offset) {
+    timeOffset = offset;
+  }
+
 }
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowMemtable.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowMemtable.java
index 5fadb54..5c3d3f5 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowMemtable.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowMemtable.java
@@ -18,10 +18,13 @@
  */
 package org.apache.iotdb.db.engine.overflow.io;
 
+import static org.apache.iotdb.db.conf.IoTDBConstant.PATH_SEPARATOR;
+
 import java.util.HashMap;
 import java.util.Map;
 import org.apache.iotdb.db.engine.memtable.IMemTable;
 import org.apache.iotdb.db.engine.memtable.PrimitiveMemTable;
+import org.apache.iotdb.db.engine.modification.Deletion;
 import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.statistics.LongStatistics;
@@ -76,8 +79,7 @@ public class OverflowMemtable {
 
   public void delete(String deviceId, String measurementId, long timestamp, boolean isFlushing) {
     if (isFlushing) {
-      memTable = memTable.copy();
-      memTable.delete(deviceId, measurementId, timestamp);
+      memTable.delete(new Deletion(deviceId + PATH_SEPARATOR + measurementId, 0, timestamp));
     } else {
       memTable.delete(deviceId, measurementId, timestamp);
     }
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java
index 8b19e5c..b6bab39 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java
@@ -496,7 +496,7 @@ public class OverflowProcessor extends Processor {
       filenodeFlushAction.act();
       // write-ahead log
       if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) {
-        logNode.notifyEndFlush(null, walTaskId);
+        logNode.notifyEndFlush(null, walTaskId, workResource.getInsertFile().getName());
       }
       result = true;
     } catch (IOException e) {
@@ -559,7 +559,7 @@ public class OverflowProcessor extends Processor {
       long taskId = 0;
       if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) {
         try {
-          taskId = logNode.notifyStartFlush();
+          taskId = logNode.notifyStartFlush(workResource.getInsertFile().getName());
         } catch (IOException e) {
           LOGGER.error("Overflow processor {} encountered an error when notifying log node, {}",
               getProcessorName(), e);
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/writelog/node/ExclusiveWriteLogNode.java b/iotdb/src/main/java/org/apache/iotdb/db/writelog/node/ExclusiveWriteLogNode.java
index 3c7fb4d..8ebddb4 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/writelog/node/ExclusiveWriteLogNode.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/writelog/node/ExclusiveWriteLogNode.java
@@ -161,15 +161,16 @@ public class ExclusiveWriteLogNode implements WriteLogNode, Comparable<Exclusive
    * Warning : caller must have lock.
    */
   @Override
-  public long notifyStartFlush() {
+  public long notifyStartFlush(String tsFileName) {
     close();
-    File oldLogFile = new File(logDirectory + File.separator + WAL_FILE_NAME);
+    File oldLogFile = new File(logDirectory + File.separator + tsFileName + WAL_FILE_NAME);
     if (!oldLogFile.exists()) {
       return 0;
     }
 
     long id = taskId.incrementAndGet();
-    File newLogFile = new File(logDirectory + File.separator + WAL_FILE_NAME + OLD_SUFFIX + id);
+    File newLogFile = new File(logDirectory + File.separator + tsFileName + WAL_FILE_NAME +
+        OLD_SUFFIX + id);
     if (!oldLogFile.renameTo(newLogFile)) {
       logger.error("Log node {} renaming log file failed!", identifier);
     } else {
@@ -183,8 +184,8 @@ public class ExclusiveWriteLogNode implements WriteLogNode, Comparable<Exclusive
    * Warning : caller must have lock.
    */
   @Override
-  public void notifyEndFlush(List<LogPosition> logPositions, long taskId) {
-    discard(taskId);
+  public void notifyEndFlush(List<LogPosition> logPositions, long taskId, String tsFileName) {
+    discard(taskId, tsFileName);
   }
 
   @Override
@@ -270,8 +271,9 @@ public class ExclusiveWriteLogNode implements WriteLogNode, Comparable<Exclusive
     }
   }
 
-  private void discard(long id) {
-    File oldLogFile = new File(logDirectory + File.separator + WAL_FILE_NAME + OLD_SUFFIX + id);
+  private void discard(long id, String tsFileName) {
+    File oldLogFile = new File(logDirectory + File.separator + tsFileName + WAL_FILE_NAME
+        + OLD_SUFFIX + id);
     if (!oldLogFile.exists()) {
       logger.info("No old log to be deleted");
     } else {
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/writelog/node/WriteLogNode.java b/iotdb/src/main/java/org/apache/iotdb/db/writelog/node/WriteLogNode.java
index b2e7fc3..dc40731 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/writelog/node/WriteLogNode.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/writelog/node/WriteLogNode.java
@@ -58,17 +58,19 @@ public interface WriteLogNode {
 
   /**
    * When a FileNode attempts to start a flush, this method must be called to rename log file.
+   * @param tsFileName the name of the tsfile to which the data in this wal belongs
    * @return the task id ( being used in the renamed log file)
    */
-  long notifyStartFlush() throws IOException;
+  long notifyStartFlush(String tsFileName) throws IOException;
 
   /**
    * When the flush of a FileNode ends, this method must be called to check if log file needs
    * cleaning.
    * @param logPositions (deprecated)
+   * @param tsFileName the name of the tsfile to which the data in this wal belongs
    * @param  taskId the task id that notifyStartFlush() returns.
    */
-  void notifyEndFlush(List<LogPosition> logPositions, long taskId);
+  void notifyEndFlush(List<LogPosition> logPositions, long taskId, String tsFileName);
 
   /**
    * return identifier of the log node.
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/ExclusiveLogRecoverPerformer.java b/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/ExclusiveLogRecoverPerformer.java
index 9d4b958..65c29a8 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/ExclusiveLogRecoverPerformer.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/ExclusiveLogRecoverPerformer.java
@@ -70,7 +70,7 @@ public class ExclusiveLogRecoverPerformer implements RecoverPerformer {
     this.restoreFilePath = restoreFilePath;
     this.processorStoreFilePath = processorStoreFilePath;
     this.writeLogNode = logNode;
-    this.fileNodeRecoverPerformer = new FileNodeRecoverPerformer(writeLogNode.getIdentifier());
+    this.fileNodeRecoverPerformer = new StorageGroupRecoverPerformer(writeLogNode.getIdentifier());
     this.isOverflow = logNode.getFileNodeName().contains(IoTDBConstant.OVERFLOW_LOG_NODE_SUFFIX);
   }
 
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/FileNodeRecoverPerformer.java b/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/FileNodeRecoverPerformer.java
deleted file mode 100644
index de339f6..0000000
--- a/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/FileNodeRecoverPerformer.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.writelog.recover;
-
-import org.apache.iotdb.db.engine.filenode.FileNodeManager;
-import org.apache.iotdb.db.exception.FileNodeManagerException;
-import org.apache.iotdb.db.exception.RecoverException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class FileNodeRecoverPerformer implements RecoverPerformer {
-
-  private static final Logger logger = LoggerFactory.getLogger(FileNodeRecoverPerformer.class);
-
-  /**
-   * If the storage group is "root.a.b", then the identifier of a bufferwrite processor will be
-   * "root.a.b-bufferwrite", and the identifier of an overflow processor will be
-   * "root.a.b-overflow".
-   */
-  private String identifier;
-
-  public FileNodeRecoverPerformer(String identifier) {
-    this.identifier = identifier;
-  }
-
-  @Override
-  public void recover() throws RecoverException {
-    try {
-      FileNodeManager.getInstance().recoverFileNode(getFileNodeName());
-    } catch (FileNodeManagerException e) {
-      logger.error("Cannot recover filenode {}", identifier);
-      throw new RecoverException(e);
-    }
-  }
-
-  public String getFileNodeName() {
-    return identifier.split("-")[0];
-  }
-}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/StorageGroupRecoverPerformer.java b/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/StorageGroupRecoverPerformer.java
new file mode 100644
index 0000000..34ea50c
--- /dev/null
+++ b/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/StorageGroupRecoverPerformer.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.writelog.recover;
+
+import java.io.File;
+import java.util.List;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.conf.directories.Directories;
+import org.apache.iotdb.db.engine.filenode.FileNodeProcessor;
+import org.apache.iotdb.db.exception.RecoverException;
+import org.apache.iotdb.db.service.IoTDB;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class StorageGroupRecoverPerformer implements RecoverPerformer {
+
+  private static final Logger logger = LoggerFactory.getLogger(StorageGroupRecoverPerformer.class);
+
+  /**
+   * If the storage group is "root.a.b", then the identifier of a bufferwrite processor will be
+   * "root.a.b-bufferwrite", and the identifier of an overflow processor will be
+   * "root.a.b-overflow".
+   */
+  private String identifier;
+
+  private String storageGroupName;
+
+  public StorageGroupRecoverPerformer(String identifier) {
+    this.identifier = identifier;
+    this.storageGroupName = identifier.split("-")[0];
+  }
+
+  @Override
+  public void recover() {
+    recoverBufferWrite();
+  }
+
+  private void recoverBufferWrite() {
+    List<String> bufferWritePathList = Directories.getInstance().getAllTsFileFolders();
+    File logDirectory = new File(IoTDBDescriptor.getInstance().getConfig().getWalFolder()
+        + File.separator + this.identifier);
+    for (String bufferWritePath : bufferWritePathList) {
+      File bufferDir = new File(bufferWritePath, storageGroupName);
+      // free and close the streams under this bufferwrite directory
+      if (!bufferDir.exists()) {
+        continue;
+      }
+      // only TsFiles with a restore file should be recovered
+      File[] bufferFiles = bufferDir.listFiles((fileName) ->
+          fileName.getName().contains(FileNodeProcessor.RESTORE_FILE_SUFFIX));
+      if (bufferFiles != null) {
+        for (File bufferFile : bufferFiles) {
+          File[] walFiles = null;
+          if (logDirectory.exists()) {
+            walFiles = logDirectory.listFiles((filename) -> {
+              return filename.getName().contains(bufferFile.getName());
+            });
+          }
+        }
+      }
+    }
+  }
+
+  public String getStorageGroupName() {
+    return identifier.split("-")[0];
+  }
+}
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/writelog/RecoverTest.java b/iotdb/src/test/java/org/apache/iotdb/db/writelog/RecoverTest.java
index 9a9617c..45bd737 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/writelog/RecoverTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/writelog/RecoverTest.java
@@ -96,7 +96,7 @@ public class RecoverTest {
 
       logNode.write(bwInsertPlan);
       logNode.write(updatePlan);
-      logNode.notifyStartFlush();
+      logNode.notifyStartFlush("test");
       logNode.write(deletePlan);
       logNode.forceSync();
 
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/writelog/WriteLogNodeTest.java b/iotdb/src/test/java/org/apache/iotdb/db/writelog/WriteLogNodeTest.java
index 321387f..9a4fc61 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/writelog/WriteLogNodeTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/writelog/WriteLogNodeTest.java
@@ -164,13 +164,13 @@ public class WriteLogNodeTest {
         config.getWalFolder() + File.separator + "root.logTestDevice" + File.separator + "wal");
     assertTrue(walFile.exists());
 
-    long taskId = logNode.notifyStartFlush();
+    long taskId = logNode.notifyStartFlush("test");
     File oldWalFile = new File(
         config.getWalFolder() + File.separator + "root.logTestDevice" + File.separator + "wal-old"+taskId);
     assertTrue(oldWalFile.exists());
     assertTrue(oldWalFile.length() > 0);
 
-    logNode.notifyEndFlush(null, taskId);
+    logNode.notifyEndFlush(null, taskId, "test");
     assertTrue(!oldWalFile.exists());
     assertEquals(0, walFile.length());