You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2019/05/14 04:08:01 UTC

[incubator-iotdb] branch refactor_overflow updated: partially complete TsFileProcessor

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch refactor_overflow
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/refactor_overflow by this push:
     new 817f141  partially complete TsFileProcessor
817f141 is described below

commit 817f14157dd607c9f2239062ffb8e5aa8f4ab2de
Author: 江天 <jt...@163.com>
AuthorDate: Tue May 14 12:06:27 2019 +0800

    partially complete TsFileProcessor
---
 .../java/org/apache/iotdb/db/engine/Processor.java |   3 +-
 .../engine/bufferwrite/BufferWriteProcessor.java   |   7 +-
 .../db/engine/filenode/FileNodeProcessor.java      |  32 +--
 .../db/engine/overflow/io/OverflowProcessor.java   |  10 +-
 .../db/engine/overflowdata/OverflowProcessor.java  |  28 +--
 .../db/engine/querycontext/QueryDataSource.java    |   6 +-
 ...SeriesDataSource.java => SeriesDataSource.java} |   4 +-
 .../iotdb/db/engine/sgmanager/OperationResult.java |   4 +-
 .../db/engine/sgmanager/StorageGroupProcessor.java | 105 ++++----
 .../db/engine/tsfiledata/TsFileProcessor.java      | 270 ++++++++++++---------
 .../iotdb/db/exception/TooManyChunksException.java |  19 ++
 .../TsFileProcessorException.java}                 |  27 ++-
 .../db/query/control/QueryResourceManager.java     |   4 +-
 .../query/reader/sequence/SequenceDataReader.java  |   6 +-
 .../sequence/SequenceDataReaderByTimestamp.java    |   4 +-
 .../writelog/manager/MultiFileLogNodeManager.java  |   7 +-
 .../db/writelog/manager/WriteLogNodeManager.java   |   3 +-
 .../db/writelog/node/ExclusiveWriteLogNode.java    |   6 +-
 .../recover/ExclusiveLogRecoverPerformer.java      |  38 +--
 19 files changed, 304 insertions(+), 279 deletions(-)

diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/Processor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/Processor.java
index 5c515ff..223523a 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/Processor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/Processor.java
@@ -25,6 +25,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.iotdb.db.engine.bufferwrite.BufferWriteProcessor;
 import org.apache.iotdb.db.engine.filenode.FileNodeProcessor;
 import org.apache.iotdb.db.exception.ProcessorException;
+import org.apache.iotdb.db.exception.TsFileProcessorException;
 
 /**
  * Processor is used for implementing different processor with different operation.<br>
@@ -185,7 +186,7 @@ public abstract class Processor {
    * @throws IOException
    * @throws ProcessorException
    */
-  public abstract void close() throws ProcessorException;
+  public abstract void close() throws TsFileProcessorException;
 
   public abstract long memoryUsage();
 }
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java
index 1809cfb..b99d0a2 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java
@@ -43,6 +43,7 @@ import org.apache.iotdb.db.engine.pool.FlushManager;
 import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
 import org.apache.iotdb.db.engine.version.VersionController;
 import org.apache.iotdb.db.exception.BufferWriteProcessorException;
+import org.apache.iotdb.db.exception.TsFileProcessorException;
 import org.apache.iotdb.db.qp.constant.DatetimeUtils;
 import org.apache.iotdb.db.utils.ImmediateFuture;
 import org.apache.iotdb.db.utils.MemUtils;
@@ -399,7 +400,7 @@ public class BufferWriteProcessor extends Processor {
   }
 
   @Override
-  public void close() throws BufferWriteProcessorException {
+  public void close() throws TsFileProcessorException {
     if (isClosed) {
       return;
     }
@@ -431,11 +432,11 @@ public class BufferWriteProcessor extends Processor {
     } catch (IOException e) {
       LOGGER.error("Close the bufferwrite processor error, the bufferwrite is {}.",
           getProcessorName(), e);
-      throw new BufferWriteProcessorException(e);
+      throw new TsFileProcessorException(e);
     } catch (Exception e) {
       LOGGER
           .error("Failed to close the bufferwrite processor when calling the action function.", e);
-      throw new BufferWriteProcessorException(e);
+      throw new TsFileProcessorException(e);
     }
   }
 
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
index 209c1f1..acaea24 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
@@ -58,7 +58,7 @@ import org.apache.iotdb.db.engine.modification.Modification;
 import org.apache.iotdb.db.engine.modification.ModificationFile;
 import org.apache.iotdb.db.engine.overflow.io.OverflowProcessor;
 import org.apache.iotdb.db.engine.pool.MergeManager;
-import org.apache.iotdb.db.engine.querycontext.GlobalSortedSeriesDataSource;
+import org.apache.iotdb.db.engine.querycontext.SeriesDataSource;
 import org.apache.iotdb.db.engine.querycontext.OverflowInsertFile;
 import org.apache.iotdb.db.engine.querycontext.OverflowSeriesDataSource;
 import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
@@ -72,6 +72,7 @@ import org.apache.iotdb.db.exception.FileNodeProcessorException;
 import org.apache.iotdb.db.exception.OverflowProcessorException;
 import org.apache.iotdb.db.exception.PathErrorException;
 import org.apache.iotdb.db.exception.ProcessorException;
+import org.apache.iotdb.db.exception.TsFileProcessorException;
 import org.apache.iotdb.db.metadata.MManager;
 import org.apache.iotdb.db.monitor.IStatistic;
 import org.apache.iotdb.db.monitor.MonitorConstants;
@@ -143,7 +144,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
    */
   private long lastMergeTime = -1;
   private BufferWriteProcessor bufferWriteProcessor = null;
-  private OverflowProcessor overflowProcessor = null;
+  private OverflowProcessor overflowProcessor1 = null;
   private Set<Integer> oldMultiPassTokenSet = null;
   private Set<Integer> newMultiPassTokenSet = new HashSet<>();
 
@@ -484,12 +485,10 @@ public class FileNodeProcessor extends Processor implements IStatistic {
     // restore the overflow processor
     LOGGER.info("The filenode processor {} will recovery the overflow processor.",
         getProcessorName());
-    parameters.put(FileNodeConstants.OVERFLOW_FLUSH_ACTION, overflowFlushAction);
-    parameters.put(FileNodeConstants.FILENODE_PROCESSOR_FLUSH_ACTION, flushFileNodeProcessorAction);
     try {
-      overflowProcessor = new OverflowProcessor(getProcessorName(), parameters, fileSchema,
-          versionController);
-    } catch (IOException e) {
+      overflowProcessor1 = new OverflowProcessor(getProcessorName(), overflowFlushAction,
+          flushFileNodeProcessorAction, ()->{}, versionController, fileSchema);
+    } catch (TsFileProcessorException | IOException e) {
       LOGGER.error("The filenode processor {} failed to recovery the overflow processor.",
           getProcessorName());
       throw new FileNodeProcessorException(e);
@@ -554,19 +553,20 @@ public class FileNodeProcessor extends Processor implements IStatistic {
   /**
    * get overflow processor by processor name.
    */
-  public OverflowProcessor getOverflowProcessor(String processorName) throws IOException {
-    if (overflowProcessor == null) {
+  public OverflowProcessor getOverflowProcessor(String processorName)
+      throws IOException, TsFileProcessorException {
+    if (overflowProcessor1 == null) {
       Map<String, Action> params = new HashMap<>();
       // construct processor or restore
       params.put(FileNodeConstants.OVERFLOW_FLUSH_ACTION, overflowFlushAction);
       params
           .put(FileNodeConstants.FILENODE_PROCESSOR_FLUSH_ACTION, flushFileNodeProcessorAction);
-      overflowProcessor = new OverflowProcessor(processorName, params, fileSchema,
-          versionController);
-    } else if (overflowProcessor.isClosed()) {
-      overflowProcessor.reopen();
+      overflowProcessor1 = new OverflowProcessor(getProcessorName(), overflowFlushAction,
+          flushFileNodeProcessorAction, ()->{}, versionController, fileSchema);
+    } else if (overflowProcessor1.isClosed()) {
+      overflowProcessor1.reopen();
     }
-    return overflowProcessor;
+    return overflowProcessor1;
   }
 
   /**
@@ -833,10 +833,10 @@ public class FileNodeProcessor extends Processor implements IStatistic {
 
       unsealedTsFile.setTimeSeriesChunkMetaDatas(bufferwritedata.right);
     }
-    GlobalSortedSeriesDataSource globalSortedSeriesDataSource = new GlobalSortedSeriesDataSource(
+    SeriesDataSource seriesDataSource = new SeriesDataSource(
         new Path(deviceId + "." + measurementId), bufferwriteDataInFiles, unsealedTsFile,
         bufferwritedata.left);
-    return new QueryDataSource(globalSortedSeriesDataSource, overflowSeriesDataSource);
+    return new QueryDataSource(seriesDataSource, overflowSeriesDataSource);
 
   }
 
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java
index 9d04990..f052876 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java
@@ -50,6 +50,7 @@ import org.apache.iotdb.db.engine.querycontext.OverflowSeriesDataSource;
 import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
 import org.apache.iotdb.db.engine.version.VersionController;
 import org.apache.iotdb.db.exception.OverflowProcessorException;
+import org.apache.iotdb.db.exception.TsFileProcessorException;
 import org.apache.iotdb.db.qp.constant.DatetimeUtils;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.control.FileReaderManager;
@@ -122,8 +123,7 @@ public class OverflowProcessor extends Processor {
     if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) {
       logNode = MultiFileLogNodeManager.getInstance().getNode(
           processorName + IoTDBConstant.OVERFLOW_LOG_NODE_SUFFIX,
-          getOverflowRestoreFile(),
-          FileNodeManager.getInstance().getRestoreFilePath(processorName));
+          getOverflowRestoreFile());
     }
   }
 
@@ -587,7 +587,7 @@ public class OverflowProcessor extends Processor {
   }
 
   @Override
-  public void close() throws OverflowProcessorException {
+  public void close() throws TsFileProcessorException {
     if (isClosed) {
       return;
     }
@@ -602,7 +602,7 @@ public class OverflowProcessor extends Processor {
           getProcessorName(), e);
       Thread.currentThread().interrupt();
     } catch (IOException e) {
-      throw new OverflowProcessorException(e);
+      throw new TsFileProcessorException(e);
     }
     if (LOGGER.isInfoEnabled()) {
       LOGGER.info("The overflow processor {} ends close operation.", getProcessorName());
@@ -618,7 +618,7 @@ public class OverflowProcessor extends Processor {
     try {
       clear();
     } catch (IOException e) {
-      throw new OverflowProcessorException(e);
+      throw new TsFileProcessorException(e);
     }
     isClosed = true;
   }
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflowdata/OverflowProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflowdata/OverflowProcessor.java
index 22f28dd..eae9799 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflowdata/OverflowProcessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflowdata/OverflowProcessor.java
@@ -20,25 +20,12 @@
 package org.apache.iotdb.db.engine.overflowdata;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
 import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.directories.Directories;
-import org.apache.iotdb.db.engine.bufferwrite.Action;
-import org.apache.iotdb.db.engine.querycontext.OverflowInsertFile;
-import org.apache.iotdb.db.engine.querycontext.OverflowSeriesDataSource;
-import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
-import org.apache.iotdb.db.engine.sgmanager.OperationResult;
 import org.apache.iotdb.db.engine.tsfiledata.TsFileProcessor;
 import org.apache.iotdb.db.engine.version.VersionController;
-import org.apache.iotdb.db.exception.BufferWriteProcessorException;
-import org.apache.iotdb.db.qp.physical.crud.UpdatePlan;
-import org.apache.iotdb.db.query.context.QueryContext;
-import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.read.common.Path;
-import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.db.exception.TsFileProcessorException;
 import org.apache.iotdb.tsfile.write.schema.FileSchema;
 
 public class OverflowProcessor extends TsFileProcessor {
@@ -48,16 +35,12 @@ public class OverflowProcessor extends TsFileProcessor {
    *
    * @param processorName processor name
    * @param fileSchemaRef file schema
-   * @throws BufferWriteProcessorException BufferWriteProcessorException
+   * @throws TsFileProcessorException TsFileProcessorException
    */
-  public OverflowProcessor(String processorName,
-      Action beforeFlushAction,
-      Action afterFlushAction,
-      Action afterCloseAction,
-      VersionController versionController,
+  public OverflowProcessor(String processorName, VersionController versionController,
       FileSchema fileSchemaRef)
-      throws BufferWriteProcessorException, IOException {
-    super(processorName, beforeFlushAction, afterFlushAction, afterCloseAction, versionController,
+      throws TsFileProcessorException, IOException {
+    super(processorName, versionController,
         fileSchemaRef);
   }
 
@@ -80,5 +63,4 @@ public class OverflowProcessor extends TsFileProcessor {
   protected String getLogSuffix() {
     return IoTDBConstant.OVERFLOW_LOG_NODE_SUFFIX;
   }
-
 }
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/querycontext/QueryDataSource.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/querycontext/QueryDataSource.java
index f63915e..84dac5f 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/querycontext/QueryDataSource.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/querycontext/QueryDataSource.java
@@ -21,18 +21,18 @@ package org.apache.iotdb.db.engine.querycontext;
 public class QueryDataSource {
 
   // sequence data source
-  private GlobalSortedSeriesDataSource seriesDataSource;
+  private SeriesDataSource seriesDataSource;
 
   // unSequence data source
   private OverflowSeriesDataSource overflowSeriesDataSource;
 
-  public QueryDataSource(GlobalSortedSeriesDataSource seriesDataSource,
+  public QueryDataSource(SeriesDataSource seriesDataSource,
       OverflowSeriesDataSource overflowSeriesDataSource) {
     this.seriesDataSource = seriesDataSource;
     this.overflowSeriesDataSource = overflowSeriesDataSource;
   }
 
-  public GlobalSortedSeriesDataSource getSeqDataSource() {
+  public SeriesDataSource getSeqDataSource() {
     return seriesDataSource;
   }
 
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/querycontext/GlobalSortedSeriesDataSource.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/querycontext/SeriesDataSource.java
similarity index 94%
rename from iotdb/src/main/java/org/apache/iotdb/db/engine/querycontext/GlobalSortedSeriesDataSource.java
rename to iotdb/src/main/java/org/apache/iotdb/db/engine/querycontext/SeriesDataSource.java
index 176a30e..933c0bc 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/querycontext/GlobalSortedSeriesDataSource.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/querycontext/SeriesDataSource.java
@@ -22,7 +22,7 @@ import java.util.List;
 import org.apache.iotdb.db.engine.filenode.TsFileResource;
 import org.apache.iotdb.tsfile.read.common.Path;
 
-public class GlobalSortedSeriesDataSource {
+public class SeriesDataSource {
 
   private Path seriesPath;
 
@@ -35,7 +35,7 @@ public class GlobalSortedSeriesDataSource {
   // seq mem-table
   private ReadOnlyMemChunk readableChunk;
 
-  public GlobalSortedSeriesDataSource(Path seriesPath, List<TsFileResource> sealedTsFiles,
+  public SeriesDataSource(Path seriesPath, List<TsFileResource> sealedTsFiles,
       UnsealedTsFile unsealedTsFile,
       ReadOnlyMemChunk readableChunk) {
     this.seriesPath = seriesPath;
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/sgmanager/OperationResult.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/sgmanager/OperationResult.java
index 27ce0ff..1a2deb0 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/sgmanager/OperationResult.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/sgmanager/OperationResult.java
@@ -27,5 +27,7 @@ public enum OperationResult {
   //the write operation is reject because of the timestamp is not allowed
   WRITE_REJECT_BY_TIME,
   //the write operation is reject because there is no available memory
-  WRITE_REJECT_BY_MEM;
+  WRITE_REJECT_BY_MEM,
+  //attempts to write a closed FileProcessor
+  WRITE_REJECT_BY_CLOSED_PROCESSOR,
 }
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/sgmanager/StorageGroupProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/sgmanager/StorageGroupProcessor.java
index ed2fdca..07674f5 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/sgmanager/StorageGroupProcessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/sgmanager/StorageGroupProcessor.java
@@ -20,35 +20,21 @@
 package org.apache.iotdb.db.engine.sgmanager;
 
 import java.io.File;
-import java.io.FileInputStream;
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicLong;
-import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.Processor;
-import org.apache.iotdb.db.engine.bufferwrite.Action;
-import org.apache.iotdb.db.engine.filenode.FileNodeProcessorStatus;
-import org.apache.iotdb.db.engine.filenode.FileNodeProcessorStore;
-import org.apache.iotdb.db.engine.filenode.TsFileResource;
-import org.apache.iotdb.db.engine.modification.Modification;
 import org.apache.iotdb.db.engine.overflowdata.OverflowProcessor;
-import org.apache.iotdb.db.engine.querycontext.GlobalSortedSeriesDataSource;
-import org.apache.iotdb.db.engine.querycontext.OverflowSeriesDataSource;
 import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
-import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
-import org.apache.iotdb.db.engine.querycontext.UnsealedTsFile;
+import org.apache.iotdb.db.engine.querycontext.SeriesDataSource;
 import org.apache.iotdb.db.engine.tsfiledata.TsFileProcessor;
 import org.apache.iotdb.db.engine.version.SimpleFileVersionController;
 import org.apache.iotdb.db.engine.version.VersionController;
-import org.apache.iotdb.db.exception.BufferWriteProcessorException;
 import org.apache.iotdb.db.exception.FileNodeManagerException;
-import org.apache.iotdb.db.exception.FileNodeProcessorException;
-import org.apache.iotdb.db.exception.ProcessorException;
+import org.apache.iotdb.db.exception.TsFileProcessorException;
 import org.apache.iotdb.db.metadata.MManager;
 import org.apache.iotdb.db.monitor.IStatistic;
 import org.apache.iotdb.db.monitor.MonitorConstants;
@@ -56,13 +42,8 @@ import org.apache.iotdb.db.monitor.StatMonitor;
 import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
 import org.apache.iotdb.db.qp.physical.crud.UpdatePlan;
 import org.apache.iotdb.db.query.context.QueryContext;
-import org.apache.iotdb.db.utils.QueryUtils;
 import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
-import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.read.common.Path;
 import org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression;
-import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.iotdb.tsfile.write.record.TSRecord;
 import org.apache.iotdb.tsfile.write.schema.FileSchema;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
@@ -73,23 +54,19 @@ public class StorageGroupProcessor extends Processor implements IStatistic {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(StorageGroupProcessor.class);
 
-  TsFileProcessor tsFileProcessor;
-  OverflowProcessor overflowProcessor;
+  private TsFileProcessor tsFileProcessor;
+  private OverflowProcessor overflowProcessor;
 
   //the version controller is shared by tsfile and overflow processor.
   private VersionController versionController;
 
   private FileSchema fileSchema;
-
-  Action beforeFlushAction = () -> {};
-  Action afterFlushAction = () -> {};
-  Action afterCloseAction = () -> {};
-
   /**
    * Construct processor using name space seriesPath
    */
+  @SuppressWarnings("ResultOfMethodCallIgnored")
   public StorageGroupProcessor(String processorName)
-      throws IOException, BufferWriteProcessorException, WriteProcessException, FileNodeProcessorException {
+      throws IOException, WriteProcessException, TsFileProcessorException {
     super(processorName);
 
     this.fileSchema = constructFileSchema(processorName);
@@ -102,12 +79,10 @@ public class StorageGroupProcessor extends Processor implements IStatistic {
           getProcessorName(), systemFolder.getAbsolutePath());
     }
     versionController = new SimpleFileVersionController(systemFolder.getAbsolutePath());
-    tsFileProcessor = new TsFileProcessor(processorName, beforeFlushAction, afterFlushAction,
-        afterCloseAction, versionController, fileSchema);
-    overflowProcessor = new OverflowProcessor(processorName, beforeFlushAction, afterFlushAction,
-        afterCloseAction, versionController, fileSchema);
+    tsFileProcessor = new TsFileProcessor(processorName, versionController, fileSchema);
+    overflowProcessor = new OverflowProcessor(processorName, versionController, fileSchema);
 
-    // RegistStatService
+    // RegisterStatService
     if (IoTDBDescriptor.getInstance().getConfig().isEnableStatMonitor()) {
       String statStorageDeltaName =
           MonitorConstants.STAT_STORAGE_GROUP_PREFIX + MonitorConstants.MONITOR_PATH_SEPARATOR
@@ -119,31 +94,49 @@ public class StorageGroupProcessor extends Processor implements IStatistic {
     }
   }
 
-  public OperationResult insert(InsertPlan insertPlan) throws IOException, BufferWriteProcessorException {
-    OperationResult result = tsFileProcessor.insert(insertPlan);
+  public OperationResult insert(InsertPlan insertPlan) throws IOException, TsFileProcessorException {
+    OperationResult result = getTsFileProcessor().insert(insertPlan);
     if (result == OperationResult.WRITE_REJECT_BY_TIME) {
-      result = overflowProcessor.insert(insertPlan);
+      result = getOverflowProcessor().insert(insertPlan);
     }
     return result;
   }
 
-  public void update(UpdatePlan plan) {
-    tsFileProcessor.update(plan);
-    overflowProcessor.update(plan);
+  public void update(UpdatePlan plan) throws TsFileProcessorException {
+    getTsFileProcessor().update(plan);
+    getOverflowProcessor().update(plan);
   }
 
-  public void delete(String device, String measurementId, long timestamp) throws IOException {
-    tsFileProcessor.delete(device, measurementId, timestamp);
-    overflowProcessor.delete(device, measurementId, timestamp);
+  public void delete(String device, String measurementId, long timestamp)
+      throws TsFileProcessorException {
+    try {
+      getTsFileProcessor().delete(device, measurementId, timestamp);
+    } catch (IOException e) {
+      throw new TsFileProcessorException(e);
+    }
+    try {
+      getOverflowProcessor().delete(device, measurementId, timestamp);
+    } catch (IOException e) {
+      throw new TsFileProcessorException(e);
+    }
   }
 
   /**
    * query data.
    */
   public QueryDataSource query(SingleSeriesExpression seriesExpression, QueryContext context)
-      throws FileNodeManagerException, IOException {
-    GlobalSortedSeriesDataSource tsfileData = tsFileProcessor.query(seriesExpression, context);
-    GlobalSortedSeriesDataSource overflowData = overflowProcessor.query(seriesExpression, context);
+      throws TsFileProcessorException {
+    try {
+      SeriesDataSource tsfileData = getTsFileProcessor().query(seriesExpression, context);
+    } catch (IOException e) {
+      throw new TsFileProcessorException(e);
+    }
+    try {
+      SeriesDataSource overflowData = getOverflowProcessor().query(seriesExpression, context);
+    } catch (IOException e) {
+      throw new TsFileProcessorException(e);
+    }
+
     return null;
   }
 
@@ -163,12 +156,12 @@ public class StorageGroupProcessor extends Processor implements IStatistic {
   }
 
   @Override
-  public Future<Boolean> flush() throws IOException {
+  public Future<Boolean> flush() {
     return null;
   }
 
   @Override
-  public void close() throws ProcessorException {
+  public void close()  {
 
   }
 
@@ -199,7 +192,7 @@ public class StorageGroupProcessor extends Processor implements IStatistic {
     return null;
   }
 
-  private FileSchema constructFileSchema(String processorName) throws WriteProcessException {
+  private FileSchema constructFileSchema(String processorName) {
     List<MeasurementSchema> columnSchemaList;
     columnSchemaList = MManager.getInstance().getSchemaForFileName(processorName);
 
@@ -210,4 +203,18 @@ public class StorageGroupProcessor extends Processor implements IStatistic {
     return schema;
 
   }
+
+  public TsFileProcessor getTsFileProcessor() throws TsFileProcessorException {
+    if (tsFileProcessor.isClosed()) {
+      tsFileProcessor.reopen();
+    }
+    return tsFileProcessor;
+  }
+
+  public OverflowProcessor getOverflowProcessor() throws TsFileProcessorException {
+    if (overflowProcessor.isClosed()) {
+      overflowProcessor.reopen();
+    }
+    return overflowProcessor;
+  }
 }
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/tsfiledata/TsFileProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/tsfiledata/TsFileProcessor.java
index 85af489..ce2e803 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/tsfiledata/TsFileProcessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/tsfiledata/TsFileProcessor.java
@@ -30,7 +30,6 @@ import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
@@ -42,10 +41,8 @@ import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.conf.directories.Directories;
 import org.apache.iotdb.db.engine.Processor;
-import org.apache.iotdb.db.engine.bufferwrite.Action;
 import org.apache.iotdb.db.engine.bufferwrite.FileNodeConstants;
 import org.apache.iotdb.db.engine.bufferwrite.RestorableTsFileIOWriter;
-import org.apache.iotdb.db.engine.filenode.FileNodeManager;
 import org.apache.iotdb.db.engine.filenode.TsFileResource;
 import org.apache.iotdb.db.engine.memcontrol.BasicMemController;
 import org.apache.iotdb.db.engine.memcontrol.BasicMemController.UsageLevel;
@@ -56,12 +53,12 @@ import org.apache.iotdb.db.engine.memtable.PrimitiveMemTable;
 import org.apache.iotdb.db.engine.modification.Deletion;
 import org.apache.iotdb.db.engine.modification.Modification;
 import org.apache.iotdb.db.engine.pool.FlushManager;
-import org.apache.iotdb.db.engine.querycontext.GlobalSortedSeriesDataSource;
 import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
+import org.apache.iotdb.db.engine.querycontext.SeriesDataSource;
 import org.apache.iotdb.db.engine.querycontext.UnsealedTsFile;
 import org.apache.iotdb.db.engine.sgmanager.OperationResult;
 import org.apache.iotdb.db.engine.version.VersionController;
-import org.apache.iotdb.db.exception.BufferWriteProcessorException;
+import org.apache.iotdb.db.exception.TsFileProcessorException;
 import org.apache.iotdb.db.qp.constant.DatetimeUtils;
 import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
 import org.apache.iotdb.db.qp.physical.crud.UpdatePlan;
@@ -76,7 +73,6 @@ import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.Path;
 import org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression;
-import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.iotdb.tsfile.write.schema.FileSchema;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 import org.slf4j.Logger;
@@ -118,10 +114,7 @@ public class TsFileProcessor extends Processor {
 
   private IMemTable workMemTable;
   private IMemTable flushMemTable;
-  private RestorableTsFileIOWriter writer;
-  private Action beforeFlushAction;
-  private Action afterCloseAction;
-  private Action afterFlushAction;
+  protected RestorableTsFileIOWriter writer;
   private File insertFile;
   private TsFileResource currentResource;
 
@@ -139,26 +132,59 @@ public class TsFileProcessor extends Processor {
   private WriteLogNode logNode;
   private VersionController versionController;
 
+  private boolean isClosed = true;
+
   /**
    * constructor of BufferWriteProcessor. data will be stored in baseDir/processorName/ folder.
    *
    * @param processorName processor name
    * @param fileSchemaRef file schema
-   * @throws BufferWriteProcessorException BufferWriteProcessorException
+   * @throws TsFileProcessorException TsFileProcessorException
    */
   @SuppressWarnings({"squid:S2259", "squid:S3776"})
-  public TsFileProcessor(String processorName,
-      Action beforeFlushAction, Action afterFlushAction, Action afterCloseAction,
-      VersionController versionController,
-      FileSchema fileSchemaRef) throws BufferWriteProcessorException, IOException {
+  public TsFileProcessor(String processorName, VersionController versionController,
+      FileSchema fileSchemaRef) throws TsFileProcessorException {
     super(processorName);
     this.fileSchemaRef = fileSchemaRef;
     this.processorName = processorName;
 
-    this.beforeFlushAction = beforeFlushAction;
-    this.afterCloseAction = afterCloseAction;
-    this.afterFlushAction = afterFlushAction;
-    workMemTable = new PrimitiveMemTable();
+    reopen();
+
+    this.versionController = versionController;
+    if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) {
+      try {
+        logNode = MultiFileLogNodeManager.getInstance().getNode(
+            processorName + getLogSuffix(),
+            writer.getRestoreFilePath());
+      } catch (IOException e) {
+        throw new TsFileProcessorException(e);
+      }
+    }
+  }
+
+  public void reopen() throws TsFileProcessorException {
+    if (!isClosed) {
+      return;
+    }
+
+    if (workMemTable == null) {
+      workMemTable = new PrimitiveMemTable();
+    } else {
+      workMemTable.clear();
+    }
+
+    boolean noResources = tsFileResources == null;
+    if (noResources) {
+      initResources();
+    } else {
+      initCurrentTsFile(generateNewTsFilePath());
+    }
+
+    isClosed = false;
+  }
+
+  @SuppressWarnings({"ResultOfMethodCallIgnored"})
+  private void initResources() throws TsFileProcessorException {
     tsFileResources = new ArrayList<>();
     inverseIndexOfResource = new HashMap<>();
     lastFlushedTimeForEachDevice = new HashMap<>();
@@ -169,76 +195,88 @@ public class TsFileProcessor extends Processor {
     int unclosedFileCount = 0;
     for (String folderPath : getAllDataFolders()) {
       File dataFolder = new File(folderPath, processorName);
-      if (dataFolder.exists()) {
+      if (dataFolder.exists()) {
         // we do not add the unclosed tsfile into tsFileResources.
         File[] unclosedFiles = dataFolder
             .listFiles(x -> x.getName().contains(RestorableTsFileIOWriter.RESTORE_SUFFIX));
-        unclosedFileCount += unclosedFiles.length;
+        if (unclosedFiles != null) {
+          unclosedFileCount += unclosedFiles.length;
+        }
         if (unclosedFileCount > 1) {
           break;
-        } else if (unclosedFileCount == 1) {
+        } else if (unclosedFiles != null && unclosedFiles.length == 1) {
           unclosedFileName = unclosedFiles[0].getName()
               .split(RestorableTsFileIOWriter.RESTORE_SUFFIX)[0];
           unclosedFile = new File(unclosedFiles[0].getParentFile(), unclosedFileName);
         }
-        File[] datas = dataFolder
-            .listFiles(x -> !x.getName().contains(RestorableTsFileIOWriter.RESTORE_SUFFIX)
-                && x.getName().split(FileNodeConstants.BUFFERWRITE_FILE_SEPARATOR).length == 2);
-        Arrays.sort(datas, Comparator.comparingLong(x -> Long
-            .parseLong(x.getName().split(FileNodeConstants.BUFFERWRITE_FILE_SEPARATOR)[0])));
-        for (File tsfile : datas) {
-          //TODO we'd better define a file suffix for TsFile, e.g., .ts
-          String[] names = tsfile.getName().split(FileNodeConstants.BUFFERWRITE_FILE_SEPARATOR);
-          long time = Long.parseLong(names[0]);
-          if (fileNamePrefix < time) {
-            fileNamePrefix = time;
-          }
-          if (unclosedFileCount == 0 || !tsfile.getName().equals(unclosedFileName)) {
-            TsFileResource resource = new TsFileResource(tsfile, true);
-            tsFileResources.add(resource);
-            //maintain the inverse index and fileNamePrefix
-            for (String device : resource.getDevices()) {
-              inverseIndexOfResource.computeIfAbsent(device, k -> new ArrayList<>()).add(resource);
-              lastFlushedTimeForEachDevice
-                  .merge(device, resource.getEndTime(device), (x, y) -> x > y ? x : y);
-            }
-          }
-        }
+        addResources(dataFolder, unclosedFileName);
+
       } else {
         //processor folder does not exist
         dataFolder.mkdirs();
       }
     }
     if (unclosedFileCount > 1) {
-      throw new BufferWriteProcessorException(String
+      throw new TsFileProcessorException(String
           .format("TsProcessor %s has more than one unclosed TsFile. please repair it",
               processorName));
-    } else if (unclosedFileCount == 0) {
+    }
+    if (unclosedFile == null) {
       unclosedFile = generateNewTsFilePath();
     }
 
     initCurrentTsFile(unclosedFile);
+  }
 
-    this.versionController = versionController;
-    if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) {
+  // add TsFiles in dataFolder to tsFileResources and update device inserted time map
+  private void addResources(File dataFolder, String unclosedFileName)
+      throws TsFileProcessorException {
+    File[] tsFiles = dataFolder
+        .listFiles(x -> !x.getName().contains(RestorableTsFileIOWriter.RESTORE_SUFFIX)
+            && x.getName().split(FileNodeConstants.BUFFERWRITE_FILE_SEPARATOR).length == 2);
+    if (tsFiles == null || tsFiles.length == 0) {
+      return;
+    }
+    Arrays.sort(tsFiles, Comparator.comparingLong(x -> Long
+        .parseLong(x.getName().split(FileNodeConstants.BUFFERWRITE_FILE_SEPARATOR)[0])));
+
+    for (File tsfile : tsFiles) {
+      addResource(tsfile, unclosedFileName);
+    }
+  }
+
+  // add one TsFile to tsFileResources and update device inserted time map
+  private void addResource(File tsfile, String unclosedFileName) throws TsFileProcessorException {
+    //TODO we'd better define a file suffix for TsFile, e.g., .ts
+    String[] names = tsfile.getName().split(FileNodeConstants.BUFFERWRITE_FILE_SEPARATOR);
+    long time = Long.parseLong(names[0]);
+    if (fileNamePrefix < time) {
+      fileNamePrefix = time;
+    }
+    if (!tsfile.getName().equals(unclosedFileName)) {
+      TsFileResource resource;
       try {
-        logNode = MultiFileLogNodeManager.getInstance().getNode(
-            processorName + getLogSuffix(),
-            writer.getRestoreFilePath(),
-            FileNodeManager.getInstance().getRestoreFilePath(processorName));
+        resource = new TsFileResource(tsfile, true);
       } catch (IOException e) {
-        throw new BufferWriteProcessorException(e);
+        throw new TsFileProcessorException(e);
+      }
+      tsFileResources.add(resource);
+      //maintain the inverse index and fileNamePrefix
+      for (String device : resource.getDevices()) {
+        inverseIndexOfResource.computeIfAbsent(device, k -> new ArrayList<>()).add(resource);
+        lastFlushedTimeForEachDevice
+            .merge(device, resource.getEndTime(device), (x, y) -> x > y ? x : y);
       }
     }
   }
 
 
-  private File generateNewTsFilePath() throws BufferWriteProcessorException {
+  private File generateNewTsFilePath() throws TsFileProcessorException {
     String dataDir = getNextDataFolder();
     File dataFolder = new File(dataDir, processorName);
     if (!dataFolder.exists()) {
       if (!dataFolder.mkdirs()) {
-        throw new BufferWriteProcessorException(
+        throw new TsFileProcessorException(
             String.format("Can not create TsFileProcess related folder: %s", dataFolder));
       }
       LOGGER.debug("The bufferwrite processor data dir doesn't exists, create new directory {}.",
@@ -250,13 +288,13 @@ public class TsFileProcessor extends Processor {
   }
 
 
-  private void initCurrentTsFile(File file) throws BufferWriteProcessorException {
+  private void initCurrentTsFile(File file) throws TsFileProcessorException {
     this.insertFile = file;
     try {
       writer = new RestorableTsFileIOWriter(processorName, insertFile.getAbsolutePath());
       this.currentResource = new TsFileResource(insertFile, writer);
     } catch (IOException e) {
-      throw new BufferWriteProcessorException(e);
+      throw new TsFileProcessorException(e);
     }
 
     minWrittenTimeForEachDeviceInCurrentFile.clear();
@@ -275,9 +313,12 @@ public class TsFileProcessor extends Processor {
    * @param plan data to be written
    * @return OperationResult (WRITE_SUCCESS, WRITE_REJECT_BY_TIME, WRITE_IN_WARNING_MEM and
    * WRITE_REJECT_BY_MEM)
-   * @throws BufferWriteProcessorException if a flushing operation occurs and failed.
+   * @throws TsFileProcessorException if a flushing operation occurs and failed.
    */
-  public OperationResult insert(InsertPlan plan) throws BufferWriteProcessorException, IOException {
+  public OperationResult insert(InsertPlan plan) throws TsFileProcessorException, IOException {
+    if (isClosed) {
+      return OperationResult.WRITE_REJECT_BY_CLOSED_PROCESSOR;
+    }
     if (!canWrite(plan.getDeviceId(), plan.getTime())) {
       return OperationResult.WRITE_REJECT_BY_TIME;
     }
@@ -293,11 +334,13 @@ public class TsFileProcessor extends Processor {
       memUsage += MemUtils.getPointSize(type, measurement);
     }
     UsageLevel level = BasicMemController.getInstance().acquireUsage(this, memUsage);
+    OperationResult result;
     switch (level) {
       case SAFE:
         doInsert(plan);
         checkMemThreshold4Flush(memUsage);
-        return OperationResult.WRITE_SUCCESS;
+        result = OperationResult.WRITE_SUCCESS;
+        break;
       case WARNING:
         if(LOGGER.isWarnEnabled()) {
           LOGGER.warn("Memory usage will exceed warning threshold, current : {}.",
@@ -307,21 +350,25 @@ public class TsFileProcessor extends Processor {
         try {
           flush();
         } catch (IOException e) {
-          throw new BufferWriteProcessorException(e);
+          throw new TsFileProcessorException(e);
         }
-        return OperationResult.WRITE_IN_WARNING_MEM;
+        result = OperationResult.WRITE_IN_WARNING_MEM;
+        break;
       case DANGEROUS:
         if (LOGGER.isWarnEnabled()) {
           LOGGER.warn("Memory usage will exceed dangerous threshold, current : {}.",
               MemUtils.bytesCntToStr(BasicMemController.getInstance().getTotalUsage()));
         }
-        return OperationResult.WRITE_REJECT_BY_MEM;
+        result = OperationResult.WRITE_REJECT_BY_MEM;
+        break;
       default:
-        return OperationResult.WRITE_REJECT_BY_MEM;
+        result = OperationResult.WRITE_REJECT_BY_MEM;
     }
+    return result;
   }
 
-  private void doInsert(InsertPlan plan) {
+  private void doInsert(InsertPlan plan) throws TsFileProcessorException {
+    writeLog(plan);
     String deviceId = plan.getDeviceId();
     long time = plan.getTime();
     TSDataType type;
@@ -342,11 +389,11 @@ public class TsFileProcessor extends Processor {
   }
 
   public OperationResult update(UpdatePlan plan) {
-    String device = plan.getPath().getDevice();
-    String measurement = plan.getPath().getMeasurement();
-    List<Pair<Long, Long>> intervals = plan.getIntervals();
+//    String device = plan.getPath().getDevice();
+//    String measurement = plan.getPath().getMeasurement();
+//    List<Pair<Long, Long>> intervals = plan.getIntervals();
     //TODO modify workMemtable, flushMemtable, and existing TsFiles
-    return OperationResult.WRITE_REJECT_BY_MEM;
+    throw new UnsupportedOperationException("Update unimplemented!");
   }
 
   /**
@@ -389,7 +436,7 @@ public class TsFileProcessor extends Processor {
   }
 
 
-  private void checkMemThreshold4Flush(long addedMemory) throws BufferWriteProcessorException {
+  private void checkMemThreshold4Flush(long addedMemory) throws TsFileProcessorException {
     long newMem = memSize.addAndGet(addedMemory);
     if (newMem > TSFileConfig.groupSizeInByte) {
       if (LOGGER.isInfoEnabled()) {
@@ -402,7 +449,7 @@ public class TsFileProcessor extends Processor {
         flush();
       } catch (IOException e) {
         LOGGER.error("Flush bufferwrite error.", e);
-        throw new BufferWriteProcessorException(e);
+        throw new TsFileProcessorException(e);
       }
     }
   }
@@ -443,13 +490,6 @@ public class TsFileProcessor extends Processor {
     }
     fileNamePrefix = System.nanoTime();
 
-    // update the lastUpdatetime, prepare for flush
-    try {
-      beforeFlushAction.act();
-    } catch (Exception e) {
-      LOGGER.error("Failed to flush memtable into tsfile when calling the action function.");
-      throw new IOException(e);
-    }
     if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) {
       logNode.notifyStartFlush();
     }
@@ -486,7 +526,6 @@ public class TsFileProcessor extends Processor {
         return true;
       }
 
-      afterFlushAction.act();
       if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) {
         logNode.notifyEndFlush(null);
       }
@@ -499,7 +538,7 @@ public class TsFileProcessor extends Processor {
     } finally {
       try {
         switchFlushToWork();
-      } catch (BufferWriteProcessorException e) {
+      } catch (TsFileProcessorException e) {
         LOGGER.error(e.getMessage());
         result = false;
       }
@@ -533,7 +572,7 @@ public class TsFileProcessor extends Processor {
     }
   }
 
-  private void switchFlushToWork() throws BufferWriteProcessorException {
+  private void switchFlushToWork() throws TsFileProcessorException {
     flushQueryLock.lock();
     try {
       //we update the index of currentTsResource.
@@ -562,15 +601,15 @@ public class TsFileProcessor extends Processor {
 
   /**
    * this method do not call flush() to flush data in memory to disk.
-   * @throws BufferWriteProcessorException
+   * @throws TsFileProcessorException if closing the current TsFile or opening its replacement fails
    */
-  private void closeCurrentTsFileAndOpenNewOne() throws BufferWriteProcessorException {
+  private void closeCurrentTsFileAndOpenNewOne() throws TsFileProcessorException {
     closeCurrentFile();
     initCurrentTsFile(generateNewTsFilePath());
   }
 
   //very dangerous, how to make sure this function is thread safe (no other functions are running)
-  private void closeCurrentFile() throws BufferWriteProcessorException {
+  private void closeCurrentFile() throws TsFileProcessorException {
     try {
       long closeStartTime = System.currentTimeMillis();
       // end file
@@ -580,10 +619,6 @@ public class TsFileProcessor extends Processor {
         Files.delete(Paths.get(insertFile.getAbsolutePath()));
       } else {
         writer.endFile(fileSchemaRef);
-        // update the IntervalFile for interval list
-        afterCloseAction.act();
-        // flush the changed information for filenode
-        afterFlushAction.act();
 
         tsFileResources.add(currentResource);
         //maintain the inverse index
@@ -592,6 +627,7 @@ public class TsFileProcessor extends Processor {
               .add(currentResource);
         }
       }
+      writer = null;
 
       // delete the restore for this bufferwrite processor
       if (LOGGER.isInfoEnabled()) {
@@ -615,11 +651,11 @@ public class TsFileProcessor extends Processor {
     } catch (IOException e) {
       LOGGER.error("Close the bufferwrite processor error, the bufferwrite is {}.",
           getProcessorName(), e);
-      throw new BufferWriteProcessorException(e);
+      throw new TsFileProcessorException(e);
     } catch (Exception e) {
       LOGGER
           .error("Failed to close the bufferwrite processor when calling the action function.", e);
-      throw new BufferWriteProcessorException(e);
+      throw new TsFileProcessorException(e);
     }
   }
 
@@ -647,7 +683,7 @@ public class TsFileProcessor extends Processor {
   /**
    * query data.
    */
-  public GlobalSortedSeriesDataSource query(SingleSeriesExpression expression,
+  public SeriesDataSource query(SingleSeriesExpression expression,
       QueryContext context) throws IOException {
     MeasurementSchema mSchema;
     TSDataType dataType;
@@ -659,11 +695,13 @@ public class TsFileProcessor extends Processor {
     dataType = mSchema.getType();
 
     // tsfile data
+    //TODO in the old version, tsfile is deep copied. I do not know why
     List<TsFileResource> dataFiles = new ArrayList<>();
-    for (TsFileResource tsfile : tsFileResources) {
-      //TODO in the old version, tsfile is deep copied. I do not know why
-      dataFiles.add(tsfile);
-    }
+    tsFileResources.forEach(k -> {
+      if (k.containsDevice(deviceId)) {
+        dataFiles.add(k);
+      }
+    });
     // bufferwrite data
     //TODO unsealedTsFile class is a little redundant.
     UnsealedTsFile unsealedTsFile = null;
@@ -680,9 +718,9 @@ public class TsFileProcessor extends Processor {
 
       unsealedTsFile.setTimeSeriesChunkMetaDatas(chunks);
     }
-    return new GlobalSortedSeriesDataSource(
-        new Path(deviceId + "." + measurementId), dataFiles, unsealedTsFile,
-        queryDataInMemtable(deviceId, measurementId, dataType, mSchema.getProps()));
+    return new SeriesDataSource(
+        new Path(deviceId + IoTDBConstant.PATH_SEPARATOR + measurementId), dataFiles,
+        unsealedTsFile, queryDataInMemtable(deviceId, measurementId, dataType, mSchema.getProps()));
   }
 
   /**
@@ -712,14 +750,6 @@ public class TsFileProcessor extends Processor {
   }
 
 
-  public String getInsertFilePath() {
-    return insertFile.getAbsolutePath();
-  }
-
-  public WriteLogNode getLogNode() {
-    return logNode;
-  }
-
   /**
    * used for test. We can know when the flush() is called.
    *
@@ -739,7 +769,7 @@ public class TsFileProcessor extends Processor {
   }
 
   @Override
-  public void close() throws BufferWriteProcessorException {
+  public void close() throws TsFileProcessorException {
     closeCurrentFile();
     try {
       if (currentResource != null) {
@@ -750,14 +780,14 @@ public class TsFileProcessor extends Processor {
       }
 
     } catch (IOException e) {
-      throw new BufferWriteProcessorException(e);
+      throw new TsFileProcessorException(e);
     }
   }
 
   /**
    * remove all data of this processor. Used For UT
    */
-  public void removeMe() throws BufferWriteProcessorException, IOException {
+  void removeMe() throws TsFileProcessorException, IOException {
     try {
       flushFuture.get(10000, TimeUnit.MILLISECONDS);
     } catch (ExecutionException | TimeoutException e) {
@@ -769,8 +799,9 @@ public class TsFileProcessor extends Processor {
     close();
     for (String folder : Directories.getInstance().getAllTsFileFolders()) {
       File dataFolder = new File(folder, processorName);
-      if (dataFolder.exists()) {
-        for (File file: dataFolder.listFiles()) {
+      File[] files;
+      if (dataFolder.exists() && (files = dataFolder.listFiles()) != null) {
+        for (File file: files) {
           Files.deleteIfExists(Paths.get(file.getAbsolutePath()));
         }
       }
@@ -817,4 +848,19 @@ public class TsFileProcessor extends Processor {
   public int hashCode() {
     return super.hashCode();
   }
+
+  public boolean isClosed() {
+    return isClosed;
+  }
+
+  private void writeLog(InsertPlan plan)
+      throws TsFileProcessorException {
+    try {
+      if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) {
+        logNode.write(plan);
+      }
+    } catch (IOException e) {
+      throw new TsFileProcessorException(e);
+    }
+  }
 }
\ No newline at end of file
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/exception/TooManyChunksException.java b/iotdb/src/main/java/org/apache/iotdb/db/exception/TooManyChunksException.java
index c0062f1..1e21c89 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/exception/TooManyChunksException.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/exception/TooManyChunksException.java
@@ -1,3 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
 package org.apache.iotdb.db.exception;
 
 import java.io.File;
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/sgmanager/OperationResult.java b/iotdb/src/main/java/org/apache/iotdb/db/exception/TsFileProcessorException.java
similarity index 67%
copy from iotdb/src/main/java/org/apache/iotdb/db/engine/sgmanager/OperationResult.java
copy to iotdb/src/main/java/org/apache/iotdb/db/exception/TsFileProcessorException.java
index 27ce0ff..88ffba3 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/sgmanager/OperationResult.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/exception/TsFileProcessorException.java
@@ -17,15 +17,22 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.engine.sgmanager;
+package org.apache.iotdb.db.exception;
 
-public enum OperationResult {
-  //success
-  WRITE_SUCCESS,
-  //success but the memory is in warning status
-  WRITE_IN_WARNING_MEM,
-  //the write operation is reject because of the timestamp is not allowed
-  WRITE_REJECT_BY_TIME,
-  //the write operation is reject because there is no available memory
-  WRITE_REJECT_BY_MEM;
+public class TsFileProcessorException extends Exception {
+
+  public TsFileProcessorException() {
+  }
+
+  public TsFileProcessorException(String message) {
+    super(message);
+  }
+
+  public TsFileProcessorException(String message, Throwable cause) {
+    super(message, cause);
+  }
+
+  public TsFileProcessorException(Throwable cause) {
+    super(cause);
+  }
 }
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java b/iotdb/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java
index 42be491..cb55dfd 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java
@@ -28,7 +28,7 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.iotdb.db.engine.filenode.FileNodeManager;
-import org.apache.iotdb.db.engine.querycontext.GlobalSortedSeriesDataSource;
+import org.apache.iotdb.db.engine.querycontext.SeriesDataSource;
 import org.apache.iotdb.db.engine.querycontext.OverflowSeriesDataSource;
 import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.engine.tsfiledata.TsFileProcessor;
@@ -212,7 +212,7 @@ public class QueryResourceManager {
     overflowSeriesDataSource.setOverflowInsertFileList(Collections.EMPTY_LIST);
 
     SingleSeriesExpression singleSeriesExpression = new SingleSeriesExpression(selectedPath, null);
-    GlobalSortedSeriesDataSource dataSource =processor.query(singleSeriesExpression, context);
+    SeriesDataSource dataSource =processor.query(singleSeriesExpression, context);
     QueryDataSource queryDataSource = new QueryDataSource(dataSource, overflowSeriesDataSource);
     // add used files to current thread request cached map
     filePathsManager.addUsedFilesForGivenJob(context.getJobId(), queryDataSource);
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/reader/sequence/SequenceDataReader.java b/iotdb/src/main/java/org/apache/iotdb/db/query/reader/sequence/SequenceDataReader.java
index b691d7f..668d81e 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/reader/sequence/SequenceDataReader.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/reader/sequence/SequenceDataReader.java
@@ -22,7 +22,7 @@ package org.apache.iotdb.db.query.reader.sequence;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
-import org.apache.iotdb.db.engine.querycontext.GlobalSortedSeriesDataSource;
+import org.apache.iotdb.db.engine.querycontext.SeriesDataSource;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.reader.IAggregateReader;
 import org.apache.iotdb.db.query.reader.IBatchReader;
@@ -53,7 +53,7 @@ public class SequenceDataReader implements IBatchReader, IAggregateReader {
    * @param isReverse true-traverse chunks from behind forward, false-traverse chunks from front to
    * back.
    */
-  public SequenceDataReader(GlobalSortedSeriesDataSource sources, Filter filter,
+  public SequenceDataReader(SeriesDataSource sources, Filter filter,
       QueryContext context, boolean isReverse) throws IOException {
     seriesReaders = new ArrayList<>();
 
@@ -100,7 +100,7 @@ public class SequenceDataReader implements IBatchReader, IAggregateReader {
   /**
    * init with globalSortedSeriesDataSource, filter, context and isReverse.
    */
-  public SequenceDataReader(GlobalSortedSeriesDataSource sources, Filter filter,
+  public SequenceDataReader(SeriesDataSource sources, Filter filter,
       QueryContext context) throws IOException {
     this(sources, filter, context, false);
   }
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/reader/sequence/SequenceDataReaderByTimestamp.java b/iotdb/src/main/java/org/apache/iotdb/db/query/reader/sequence/SequenceDataReaderByTimestamp.java
index a5fdc44..0eff726 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/reader/sequence/SequenceDataReaderByTimestamp.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/reader/sequence/SequenceDataReaderByTimestamp.java
@@ -22,7 +22,7 @@ package org.apache.iotdb.db.query.reader.sequence;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
-import org.apache.iotdb.db.engine.querycontext.GlobalSortedSeriesDataSource;
+import org.apache.iotdb.db.engine.querycontext.SeriesDataSource;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.reader.mem.MemChunkReaderByTimestamp;
 import org.apache.iotdb.db.query.reader.merge.EngineReaderByTimeStamp;
@@ -36,7 +36,7 @@ public class SequenceDataReaderByTimestamp implements EngineReaderByTimeStamp {
   /**
    * init with globalSortedSeriesDataSource and filter.
    */
-  public SequenceDataReaderByTimestamp(GlobalSortedSeriesDataSource sources, QueryContext context)
+  public SequenceDataReaderByTimestamp(SeriesDataSource sources, QueryContext context)
       throws IOException {
     seriesReaders = new ArrayList<>();
 
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/writelog/manager/MultiFileLogNodeManager.java b/iotdb/src/main/java/org/apache/iotdb/db/writelog/manager/MultiFileLogNodeManager.java
index 8bf9fb9..ec8527a 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/writelog/manager/MultiFileLogNodeManager.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/writelog/manager/MultiFileLogNodeManager.java
@@ -106,12 +106,11 @@ public class MultiFileLogNodeManager implements WriteLogNodeManager, IService {
 
 
   @Override
-  public WriteLogNode getNode(String identifier, String restoreFilePath,
-      String processorStoreFilePath)
+  public WriteLogNode getNode(String identifier, String restoreFilePath)
       throws IOException {
     WriteLogNode node = nodeMap.get(identifier);
-    if (node == null && restoreFilePath != null && processorStoreFilePath != null) {
-      node = new ExclusiveWriteLogNode(identifier, restoreFilePath, processorStoreFilePath);
+    if (node == null && restoreFilePath != null) {
+      node = new ExclusiveWriteLogNode(identifier, restoreFilePath);
       WriteLogNode oldNode = nodeMap.putIfAbsent(identifier, node);
       if (oldNode != null) {
         return oldNode;
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/writelog/manager/WriteLogNodeManager.java b/iotdb/src/main/java/org/apache/iotdb/db/writelog/manager/WriteLogNodeManager.java
index 57b3e44..a2823bb 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/writelog/manager/WriteLogNodeManager.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/writelog/manager/WriteLogNodeManager.java
@@ -35,9 +35,8 @@ public interface WriteLogNodeManager {
    *
    * @param identifier -identifier, the format: "{storageGroupName}-bufferwrite/overflow"
    * @param restoreFilePath -restore file path of the data file. e.g, data/settled/{storageGroupName}/{tsfileName}.restore
-   * @param processorStoreFilePath -processor store file path. e.g., data/system/info/{storageGroupName}/{storageGroupName}.restore
    */
-  WriteLogNode getNode(String identifier, String restoreFilePath, String processorStoreFilePath)
+  WriteLogNode getNode(String identifier, String restoreFilePath)
       throws IOException;
 
   /**
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/writelog/node/ExclusiveWriteLogNode.java b/iotdb/src/main/java/org/apache/iotdb/db/writelog/node/ExclusiveWriteLogNode.java
index 2c1fd72..434e901 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/writelog/node/ExclusiveWriteLogNode.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/writelog/node/ExclusiveWriteLogNode.java
@@ -72,15 +72,13 @@ public class ExclusiveWriteLogNode implements WriteLogNode, Comparable<Exclusive
    *
    * @param identifier             ExclusiveWriteLogNode identifier
    * @param restoreFilePath        restore file path
-   * @param processorStoreFilePath processor store file path
    */
-  public ExclusiveWriteLogNode(String identifier, String restoreFilePath,
-                               String processorStoreFilePath) {
+  public ExclusiveWriteLogNode(String identifier, String restoreFilePath) {
     this.identifier = identifier;
     this.logDirectory = config.getWalFolder() + File.separator + this.identifier;
     new File(logDirectory).mkdirs();
 
-    recoverPerformer = new ExclusiveLogRecoverPerformer(restoreFilePath, processorStoreFilePath,
+    recoverPerformer = new ExclusiveLogRecoverPerformer(restoreFilePath,
         this);
     currentFileWriter = new LogWriter(logDirectory + File.separator + WAL_FILE_NAME);
   }
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/ExclusiveLogRecoverPerformer.java b/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/ExclusiveLogRecoverPerformer.java
index 15e88dd..893bb1a 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/ExclusiveLogRecoverPerformer.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/ExclusiveLogRecoverPerformer.java
@@ -55,7 +55,6 @@ public class ExclusiveLogRecoverPerformer implements RecoverPerformer {
   private ExclusiveWriteLogNode writeLogNode;
   private String recoveryFlagPath;
   private String restoreFilePath;
-  private String processorStoreFilePath;
   private RecoverStage currStage;
   private LogReplayer replayer = new ConcreteLogReplayer();
   private RecoverPerformer fileNodeRecoverPerformer;
@@ -65,10 +64,8 @@ public class ExclusiveLogRecoverPerformer implements RecoverPerformer {
   /**
    * constructor of ExclusiveLogRecoverPerformer.
    */
-  public ExclusiveLogRecoverPerformer(String restoreFilePath, String processorStoreFilePath,
-      ExclusiveWriteLogNode logNode) {
+  public ExclusiveLogRecoverPerformer(String restoreFilePath, ExclusiveWriteLogNode logNode) {
     this.restoreFilePath = restoreFilePath;
-    this.processorStoreFilePath = processorStoreFilePath;
     this.writeLogNode = logNode;
     this.fileNodeRecoverPerformer = new FileNodeRecoverPerformer(writeLogNode.getIdentifier());
     this.isOverflow = logNode.getFileNodeName().contains(IoTDBConstant.OVERFLOW_LOG_NODE_SUFFIX);
@@ -207,19 +204,6 @@ public class ExclusiveLogRecoverPerformer implements RecoverPerformer {
       }
     }
 
-    String recoverProcessorStoreFilePath = processorStoreFilePath + RECOVER_SUFFIX;
-    File recoverProcessorStoreFile = new File(recoverProcessorStoreFilePath);
-    File processorStoreFile = new File(processorStoreFilePath);
-    if (!recoverProcessorStoreFile.exists() && processorStoreFile.exists()) {
-      try {
-        FileUtils.copyFile(processorStoreFile, recoverProcessorStoreFile);
-      } catch (Exception e) {
-        logger.error("Log node {} cannot backup processor file",
-            writeLogNode.getLogDirectory(), e);
-        throw new RecoverException("Cannot backup processor file, recovery aborted.");
-      }
-    }
-
     setFlag(BACK_UP);
     currStage = RECOVER_FILE;
     logger.info("Log node {} backup ended", writeLogNode.getLogDirectory());
@@ -239,19 +223,6 @@ public class ExclusiveLogRecoverPerformer implements RecoverPerformer {
       throw new RecoverException("Cannot recover restore file, recovery aborted.");
     }
 
-    String recoverProcessorStoreFilePath = processorStoreFilePath + RECOVER_SUFFIX;
-    File recoverProcessorStoreFile = new File(recoverProcessorStoreFilePath);
-    try {
-      if (recoverProcessorStoreFile.exists()) {
-        FileUtils.copyFile(recoverProcessorStoreFile, new File(processorStoreFilePath));
-      }
-    } catch (Exception e) {
-      logger.error("Log node {} cannot recover processor file, because{}",
-          writeLogNode.getLogDirectory(),
-          e.getMessage());
-      throw new RecoverException("Cannot recover processor file, recovery aborted.");
-    }
-
     fileNodeRecoverPerformer.recover();
 
     currStage = REPLAY_LOG;
@@ -336,13 +307,6 @@ public class ExclusiveLogRecoverPerformer implements RecoverPerformer {
             .error("Log node {} cannot delete backup restore file", writeLogNode.getLogDirectory());
         failedFiles.add(recoverRestoreFilePath);
     }
-    String recoverProcessorStoreFilePath = processorStoreFilePath + RECOVER_SUFFIX;
-    File recoverProcessorStoreFile = new File(recoverProcessorStoreFilePath);
-    if (recoverProcessorStoreFile.exists() && !recoverProcessorStoreFile.delete()) {
-        logger.error("Log node {} cannot delete backup processor store file",
-            writeLogNode.getLogDirectory());
-        failedFiles.add(recoverProcessorStoreFilePath);
-    }
     // clean log file
     File oldLogFile = new File(
         writeLogNode.getLogDirectory() + File.separator + ExclusiveWriteLogNode.WAL_FILE_NAME