You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2019/04/18 14:27:09 UTC

[incubator-iotdb] branch add_disabled_mem_control updated: reuse BufferWriteProcessor and OverflowProcessor to reduce memory waste.

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch add_disabled_mem_control
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/add_disabled_mem_control by this push:
     new 2b5016f  reuse BufferWriteProcessor and OverflowProcessor to reduce memory waste.
2b5016f is described below

commit 2b5016f5c2f8078a600e0bfe491df75b6d2d4d7b
Author: 江天 <jt...@163.com>
AuthorDate: Thu Apr 18 22:25:01 2019 +0800

    reuse BufferWriteProcessor and OverflowProcessor to reduce memory waste.
---
 .../engine/bufferwrite/BufferWriteProcessor.java   | 133 +++++++++++----------
 .../db/engine/filenode/FileNodeProcessor.java      |  70 +++++------
 .../db/engine/overflow/io/OverflowProcessor.java   |  74 ++++++++++--
 .../db/exception/FileNodeProcessorException.java   |   4 +
 4 files changed, 167 insertions(+), 114 deletions(-)

diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java
index 2bc394b..a0932ee 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java
@@ -84,6 +84,9 @@ public class BufferWriteProcessor extends Processor {
   private WriteLogNode logNode;
   private VersionController versionController;
 
+  private boolean isClosed = false;
+  private boolean isFlush = false;
+
   /**
    * constructor of BufferWriteProcessor.
    *
@@ -100,13 +103,37 @@ public class BufferWriteProcessor extends Processor {
     super(processorName);
     this.fileSchema = fileSchema;
     this.baseDir = baseDir;
-    this.fileName = fileName;
 
+    bufferwriteFlushAction = parameters.get(FileNodeConstants.BUFFERWRITE_FLUSH_ACTION);
+    bufferwriteCloseAction = parameters.get(FileNodeConstants.BUFFERWRITE_CLOSE_ACTION);
+    filenodeFlushAction = parameters.get(FileNodeConstants.FILENODE_PROCESSOR_FLUSH_ACTION);
+
+
+    if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) {
+      try {
+        logNode = MultiFileLogNodeManager.getInstance().getNode(
+            processorName + IoTDBConstant.BUFFERWRITE_LOG_NODE_SUFFIX,
+            getBufferwriteRestoreFilePath(),
+            FileNodeManager.getInstance().getRestoreFilePath(processorName));
+      } catch (IOException e) {
+        throw new BufferWriteProcessorException(e);
+      }
+    }
+    this.versionController = versionController;
+
+    reopen(fileName);
+  }
+
+  public void reopen(String fileName) throws BufferWriteProcessorException {
+    if (!isClosed) {
+      return;
+    }
+    this.fileName = fileName;
     String bDir = baseDir;
     if (bDir.length() > 0 && bDir.charAt(bDir.length() - 1) != File.separatorChar) {
       bDir = bDir + File.separatorChar;
     }
-    String dataDirPath = bDir + processorName;
+    String dataDirPath = bDir + getProcessorName();
     File dataDir = new File(dataDirPath);
     if (!dataDir.exists()) {
       dataDir.mkdirs();
@@ -114,29 +141,23 @@ public class BufferWriteProcessor extends Processor {
           dataDirPath);
     }
     this.insertFilePath = new File(dataDir, fileName).getPath();
-    bufferWriteRelativePath = processorName + File.separatorChar + fileName;
+    bufferWriteRelativePath = getProcessorName() + File.separatorChar + fileName;
     try {
-      writer = new RestorableTsFileIOWriter(processorName, insertFilePath);
+      writer = new RestorableTsFileIOWriter(getProcessorName(), insertFilePath);
     } catch (IOException e) {
       throw new BufferWriteProcessorException(e);
     }
+    if (workMemTable == null) {
+      workMemTable = new PrimitiveMemTable();
+    } else {
+      workMemTable.clear();
+    }
+  }
 
-    bufferwriteFlushAction = parameters.get(FileNodeConstants.BUFFERWRITE_FLUSH_ACTION);
-    bufferwriteCloseAction = parameters.get(FileNodeConstants.BUFFERWRITE_CLOSE_ACTION);
-    filenodeFlushAction = parameters.get(FileNodeConstants.FILENODE_PROCESSOR_FLUSH_ACTION);
-    workMemTable = new PrimitiveMemTable();
-
-    if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) {
-      try {
-        logNode = MultiFileLogNodeManager.getInstance().getNode(
-            processorName + IoTDBConstant.BUFFERWRITE_LOG_NODE_SUFFIX,
-            getBufferwriteRestoreFilePath(),
-            FileNodeManager.getInstance().getRestoreFilePath(processorName));
-      } catch (IOException e) {
-        throw new BufferWriteProcessorException(e);
-      }
+  public void checkOpen() throws BufferWriteProcessorException {
+    if (isClosed) {
+      throw new BufferWriteProcessorException("BufferWriteProcessor already closed");
     }
-    this.versionController = versionController;
   }
 
   /**
@@ -153,6 +174,7 @@ public class BufferWriteProcessor extends Processor {
   public boolean write(String deviceId, String measurementId, long timestamp, TSDataType dataType,
       String value)
       throws BufferWriteProcessorException {
+    checkOpen();
     TSRecord record = new TSRecord(timestamp, deviceId);
     DataPoint dataPoint = DataPoint.getDataPoint(dataType, measurementId, value);
     record.addTuple(dataPoint);
@@ -168,6 +190,7 @@ public class BufferWriteProcessor extends Processor {
    * @throws BufferWriteProcessorException if a flushing operation occurs and failed.
    */
   public boolean write(TSRecord tsRecord) throws BufferWriteProcessorException {
+    checkOpen();
     long memUsage = MemUtils.getRecordSize(tsRecord);
     BasicMemController.UsageLevel level = BasicMemController.getInstance()
         .acquireUsage(this, memUsage);
@@ -233,7 +256,9 @@ public class BufferWriteProcessor extends Processor {
    * @return corresponding chunk data and chunk metadata in memory
    */
   public Pair<ReadOnlyMemChunk, List<ChunkMetaData>> queryBufferWriteData(String deviceId,
-      String measurementId, TSDataType dataType, Map<String, String> props) {
+      String measurementId, TSDataType dataType, Map<String, String> props)
+      throws BufferWriteProcessorException {
+    checkOpen();
     flushQueryLock.lock();
     try {
       MemSeriesLazyMerger memSeriesLazyMerger = new MemSeriesLazyMerger();
@@ -255,10 +280,10 @@ public class BufferWriteProcessor extends Processor {
   private void switchWorkToFlush() {
     flushQueryLock.lock();
     try {
-      if (flushMemTable == null) {
-        flushMemTable = workMemTable;
-        workMemTable = new PrimitiveMemTable();
-      }
+      IMemTable temp = flushMemTable == null ? new PrimitiveMemTable() : flushMemTable;
+      flushMemTable = workMemTable;
+      workMemTable = temp;
+      isFlush = true;
     } finally {
       flushQueryLock.unlock();
     }
@@ -268,8 +293,8 @@ public class BufferWriteProcessor extends Processor {
     flushQueryLock.lock();
     try {
       flushMemTable.clear();
-      flushMemTable = null;
       writer.appendMetadata();
+      isClosed = false;
     } finally {
       flushQueryLock.unlock();
     }
@@ -329,6 +354,9 @@ public class BufferWriteProcessor extends Processor {
   // keyword synchronized is added in this method, so that only one flush task can be submitted now.
   @Override
   public synchronized Future<Boolean> flush() throws IOException {
+    if (isClosed) {
+      throw new IOException("BufferWriteProcessor closed");
+    }
     // statistic information for flush
     if (lastFlushTime > 0) {
       if (LOGGER.isInfoEnabled()) {
@@ -381,12 +409,18 @@ public class BufferWriteProcessor extends Processor {
 
   @Override
   public void close() throws BufferWriteProcessorException {
+    if (isClosed) {
+      return;
+    }
     try {
       long closeStartTime = System.currentTimeMillis();
       // flush data and wait for finishing flush
       flush().get();
       // end file
       writer.endFile(fileSchema);
+      writer = null;
+      workMemTable.clear();
+
       // update the IntervalFile for interval list
       bufferwriteCloseAction.act();
       // flush the changed information for filenode
@@ -402,6 +436,7 @@ public class BufferWriteProcessor extends Processor {
             DatetimeUtils.convertMillsecondToZonedDateTime(closeEndTime),
             closeEndTime - closeStartTime);
       }
+      isClosed = true;
     } catch (IOException e) {
       LOGGER.error("Close the bufferwrite processor error, the bufferwrite is {}.",
           getProcessorName(), e);
@@ -424,13 +459,7 @@ public class BufferWriteProcessor extends Processor {
    * @return True if flushing
    */
   public boolean isFlush() {
-    // starting a flush task has two steps: set the flushMemtable, and then set the flushFuture
-    // So, the following case exists: flushMemtable != null but flushFuture is done (because the
-    // flushFuture refers to the last finished flush.
-    // And, the following case exists,too: flushMemtable == null, but flushFuture is not done.
-    // (flushTask() is not finished, but switchToWork() has done)
-    // So, checking flushMemTable is more meaningful than flushFuture.isDone().
-    return  flushMemTable != null;
+    return isFlush;
   }
 
   /**
@@ -454,38 +483,6 @@ public class BufferWriteProcessor extends Processor {
     return file.length() + memoryUsage();
   }
 
-  /**
-   * Close current TsFile and open a new one for future writes. Block new writes and wait until
-   * current writes finish.
-   */
-  public void rollToNewFile() {
-    // TODO : [MemControl] implement this
-  }
-
-  /**
-   * Check if this TsFile has too big metadata or file. If true, close current file and open a new
-   * one.
-   */
-  private boolean checkSize() {
-    IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
-    long metaSize = getMetaSize();
-    long fileSize = getFileSize();
-    if (metaSize >= config.getBufferwriteMetaSizeThreshold()
-        || fileSize >= config.getBufferwriteFileSizeThreshold()) {
-      LOGGER.info(
-          "The bufferwrite processor {}, size({}) of the file {} reaches threshold {}, "
-              + "size({}) of metadata reaches threshold {}.",
-          getProcessorName(), MemUtils.bytesCntToStr(fileSize), this.fileName,
-          MemUtils.bytesCntToStr(config.getBufferwriteFileSizeThreshold()),
-          MemUtils.bytesCntToStr(metaSize),
-          MemUtils.bytesCntToStr(config.getBufferwriteFileSizeThreshold()));
-
-      rollToNewFile();
-      return true;
-    }
-    return false;
-  }
-
   public String getBaseDir() {
     return baseDir;
   }
@@ -538,7 +535,9 @@ public class BufferWriteProcessor extends Processor {
    * @param measurementId the measurementId of the timeseries to be deleted.
    * @param timestamp the upper-bound of deletion time.
    */
-  public void delete(String deviceId, String measurementId, long timestamp) {
+  public void delete(String deviceId, String measurementId, long timestamp)
+      throws BufferWriteProcessorException {
+    checkOpen();
     workMemTable.delete(deviceId, measurementId, timestamp);
     if (isFlush()) {
       // flushing MemTable cannot be directly modified since another thread is reading it
@@ -572,4 +571,8 @@ public class BufferWriteProcessor extends Processor {
   public String toString() {
     return "BufferWriteProcessor in " + insertFilePath;
   }
+
+  public boolean isClosed() {
+    return isClosed;
+  }
 }
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
index 47aca8e..488f456 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
@@ -534,9 +534,16 @@ public class FileNodeProcessor extends Processor implements IStatistic {
                 + System.currentTimeMillis(),
             params, versionController, fileSchema);
       } catch (BufferWriteProcessorException e) {
-        LOGGER.error("The filenode processor {} failed to get the bufferwrite processor.",
-            processorName, e);
-        throw new FileNodeProcessorException(e);
+        throw new FileNodeProcessorException(String
+            .format("The filenode processor %s failed to get the bufferwrite processor.",
+                processorName),e);
+      }
+    } else if (bufferWriteProcessor.isClosed()){
+      try {
+        bufferWriteProcessor.reopen(insertTime + FileNodeConstants.BUFFERWRITE_FILE_SEPARATOR
+            + System.currentTimeMillis());
+      } catch (BufferWriteProcessorException e) {
+        throw new FileNodeProcessorException("Cannot reopen BufferWriteProcessor", e);
       }
     }
     return bufferWriteProcessor;
@@ -565,6 +572,8 @@ public class FileNodeProcessor extends Processor implements IStatistic {
           .put(FileNodeConstants.FILENODE_PROCESSOR_FLUSH_ACTION, flushFileNodeProcessorAction);
       overflowProcessor = new OverflowProcessor(processorName, params, fileSchema,
           versionController);
+    } else if (overflowProcessor.isClosed()){
+      overflowProcessor.reopen();
     }
     return overflowProcessor;
   }
@@ -573,8 +582,8 @@ public class FileNodeProcessor extends Processor implements IStatistic {
    * get overflow processor.
    */
   public OverflowProcessor getOverflowProcessor() {
-    if (overflowProcessor == null) {
-      LOGGER.error("The overflow processor is null when getting the overflowProcessor");
+    if (overflowProcessor == null || overflowProcessor.isClosed()) {
+      LOGGER.error("The overflow processor is null or closed when getting the overflowProcessor");
     }
     return overflowProcessor;
   }
@@ -583,15 +592,6 @@ public class FileNodeProcessor extends Processor implements IStatistic {
     return overflowProcessor != null;
   }
 
-  public void setBufferwriteProcessroToClosed() {
-
-    bufferWriteProcessor = null;
-  }
-
-  public boolean hasBufferwriteProcessor() {
-
-    return bufferWriteProcessor != null;
-  }
 
   /**
    * set last update time.
@@ -797,18 +797,13 @@ public class FileNodeProcessor extends Processor implements IStatistic {
         && !newFileNodes.get(newFileNodes.size() - 1).getStartTimeMap().isEmpty()) {
       unsealedTsFile = new UnsealedTsFile();
       unsealedTsFile.setFilePath(newFileNodes.get(newFileNodes.size() - 1).getFilePath());
-      if (bufferWriteProcessor == null) {
-        LOGGER.error(
-            "The last of tsfile {} in filenode processor {} is not closed, "
-                + "but the bufferwrite processor is null.",
-            newFileNodes.get(newFileNodes.size() - 1).getRelativePath(), getProcessorName());
-        throw new FileNodeProcessorException(String.format(
-            "The last of tsfile %s in filenode processor %s is not closed, "
-                + "but the bufferwrite processor is null.",
-            newFileNodes.get(newFileNodes.size() - 1).getRelativePath(), getProcessorName()));
+
+      try {
+        bufferwritedata = bufferWriteProcessor
+            .queryBufferWriteData(deviceId, measurementId, dataType, mSchema.getProps());
+      } catch (BufferWriteProcessorException e) {
+        throw new FileNodeProcessorException(e);
       }
-      bufferwritedata = bufferWriteProcessor
-          .queryBufferWriteData(deviceId, measurementId, dataType, mSchema.getProps());
 
       try {
         List<Modification> pathModifications = context.getPathModifications(
@@ -954,7 +949,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
     }
     lastMergeTime = System.currentTimeMillis();
 
-    if (overflowProcessor != null) {
+    if (overflowProcessor != null && !overflowProcessor.isClosed()) {
       if (overflowProcessor.getFileSize() < IoTDBDescriptor.getInstance()
           .getConfig().getOverflowFileSizeThreshold()) {
         if (LOGGER.isInfoEnabled()) {
@@ -1714,10 +1709,10 @@ public class FileNodeProcessor extends Processor implements IStatistic {
   public FileNodeFlushFuture flush() throws IOException {
     Future<Boolean> bufferWriteFlushFuture = null;
     Future<Boolean> overflowFlushFuture = null;
-    if (bufferWriteProcessor != null) {
+    if (bufferWriteProcessor != null && !bufferWriteProcessor.isClosed()) {
       bufferWriteFlushFuture = bufferWriteProcessor.flush();
     }
-    if (overflowProcessor != null) {
+    if (overflowProcessor != null && !overflowProcessor.isClosed()) {
       overflowFlushFuture = overflowProcessor.flush();
     }
     return new FileNodeFlushFuture(bufferWriteFlushFuture, overflowFlushFuture);
@@ -1727,7 +1722,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
    * Close the bufferwrite processor.
    */
   public void closeBufferWrite() throws FileNodeProcessorException {
-    if (bufferWriteProcessor == null) {
+    if (bufferWriteProcessor == null || bufferWriteProcessor.isClosed()) {
       return;
     }
     try {
@@ -1735,7 +1730,6 @@ public class FileNodeProcessor extends Processor implements IStatistic {
         waitForBufferWriteClose();
       }
       bufferWriteProcessor.close();
-      bufferWriteProcessor = null;
     } catch (BufferWriteProcessorException e) {
       throw new FileNodeProcessorException(e);
     }
@@ -1756,7 +1750,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
    * Close the overflow processor.
    */
   public void closeOverflow() throws FileNodeProcessorException {
-    if (overflowProcessor == null) {
+    if (overflowProcessor == null || overflowProcessor.isClosed()) {
       return;
     }
     try {
@@ -1764,9 +1758,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
         waitForOverflowClose();
       }
       overflowProcessor.close();
-      overflowProcessor.clear();
-      overflowProcessor = null;
-    } catch (OverflowProcessorException | IOException e) {
+    } catch (OverflowProcessorException e) {
       throw new FileNodeProcessorException(e);
     }
   }
@@ -1897,7 +1889,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
       // delete data in memory
       OverflowProcessor ofProcessor = getOverflowProcessor(getProcessorName());
       ofProcessor.delete(deviceId, measurementId, timestamp, version, updatedModFiles);
-      if (bufferWriteProcessor != null) {
+      if (bufferWriteProcessor != null && !bufferWriteProcessor.isClosed()) {
         bufferWriteProcessor.delete(deviceId, measurementId, timestamp);
       }
     } catch (Exception e) {
@@ -1945,8 +1937,12 @@ public class FileNodeProcessor extends Processor implements IStatistic {
       }
       throw e;
     }
-    if (bufferWriteProcessor != null) {
-      bufferWriteProcessor.delete(deviceId, measurementId, timestamp);
+    if (bufferWriteProcessor != null && !bufferWriteProcessor.isClosed()) {
+      try {
+        bufferWriteProcessor.delete(deviceId, measurementId, timestamp);
+      } catch (BufferWriteProcessorException e) {
+        throw new IOException(e);
+      }
     }
   }
 
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java
index 2e7302f..1d98d66 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java
@@ -96,6 +96,9 @@ public class OverflowProcessor extends Processor {
   private WriteLogNode logNode;
   private VersionController versionController;
 
+  private boolean isClosed = false;
+  private boolean isFlush = false;
+
   public OverflowProcessor(String processorName, Map<String, Action> parameters,
       FileSchema fileSchema, VersionController versionController)
       throws IOException {
@@ -108,14 +111,7 @@ public class OverflowProcessor extends Processor {
       overflowDirPath = overflowDirPath + File.separatorChar;
     }
     this.parentPath = overflowDirPath + processorName;
-    File processorDataDir = new File(parentPath);
-    if (!processorDataDir.exists()) {
-      processorDataDir.mkdirs();
-    }
-    // recover file
-    recovery(processorDataDir);
-    // memory
-    workSupport = new OverflowMemtable();
+
     overflowFlushAction = parameters.get(FileNodeConstants.OVERFLOW_FLUSH_ACTION);
     filenodeFlushAction = parameters
         .get(FileNodeConstants.FILENODE_PROCESSOR_FLUSH_ACTION);
@@ -126,8 +122,36 @@ public class OverflowProcessor extends Processor {
           getOverflowRestoreFile(),
           FileNodeManager.getInstance().getRestoreFilePath(processorName));
     }
+
+    reopen();
   }
 
+  public void reopen() throws IOException {
+    if (!isClosed) {
+      return;
+    }
+    // recover file
+    File processorDataDir = new File(parentPath);
+    if (!processorDataDir.exists()) {
+      processorDataDir.mkdirs();
+    }
+    recovery(processorDataDir);
+
+    // memory
+    if (workSupport == null) {
+      workSupport = new OverflowMemtable();
+    } else {
+      workSupport.clear();
+    }
+
+  }
+  public void checkOpen() throws OverflowProcessorException {
+    if (isClosed) {
+      throw new OverflowProcessorException("OverflowProcessor already closed");
+    }
+  }
+
+
   private void recovery(File parentFile) throws IOException {
     String[] subFilePaths = clearFile(parentFile.list());
     if (subFilePaths.length == 0) {
@@ -173,6 +197,11 @@ public class OverflowProcessor extends Processor {
    * insert one time-series record
    */
   public void insert(TSRecord tsRecord) throws IOException {
+    try {
+      checkOpen();
+    } catch (OverflowProcessorException e) {
+      throw new IOException(e);
+    }
     // memory control
     long memUage = MemUtils.getRecordSize(tsRecord);
     UsageLevel usageLevel = BasicMemController.getInstance().acquireUsage(this, memUage);
@@ -261,6 +290,11 @@ public class OverflowProcessor extends Processor {
    */
   public void delete(String deviceId, String measurementId, long timestamp, long version,
       List<ModificationFile> updatedModFiles) throws IOException {
+    try {
+      checkOpen();
+    } catch (OverflowProcessorException e) {
+      throw new IOException(e);
+    }
     workResource.delete(deviceId, measurementId, timestamp, version, updatedModFiles);
     workSupport.delete(deviceId, measurementId, timestamp, false);
     if (isFlush()) {
@@ -400,8 +434,10 @@ public class OverflowProcessor extends Processor {
   private void switchWorkToFlush() {
     queryFlushLock.lock();
     try {
+      OverflowMemtable temp = flushSupport == null ? new OverflowMemtable() : flushSupport;
       flushSupport = workSupport;
-      workSupport = new OverflowMemtable();
+      workSupport = temp;
+      isFlush = true;
     } finally {
       queryFlushLock.unlock();
     }
@@ -412,7 +448,7 @@ public class OverflowProcessor extends Processor {
     try {
       flushSupport.clear();
       workResource.appendMetadatas();
-      flushSupport = null;
+      isFlush = false;
     } finally {
       queryFlushLock.unlock();
     }
@@ -444,8 +480,7 @@ public class OverflowProcessor extends Processor {
   }
 
   public boolean isFlush() {
-    //see BufferWriteProcess.isFlush()
-    return  flushSupport != null;
+    return isFlush;
   }
 
   private boolean flushTask(String displayMessage) {
@@ -546,6 +581,9 @@ public class OverflowProcessor extends Processor {
 
   @Override
   public void close() throws OverflowProcessorException {
+    if (isClosed) {
+      return;
+    }
     LOGGER.info("The overflow processor {} starts close operation.", getProcessorName());
     long closeStartTime = System.currentTimeMillis();
     // flush data
@@ -570,14 +608,22 @@ public class OverflowProcessor extends Processor {
           DatetimeUtils.convertMillsecondToZonedDateTime(closeEndTime),
           closeEndTime - closeStartTime);
     }
+    try {
+      clear();
+    } catch (IOException e) {
+      throw new OverflowProcessorException(e);
+    }
+    isClosed = true;
   }
 
   public void clear() throws IOException {
     if (workResource != null) {
       workResource.close();
+      workResource = null;
     }
     if (mergeResource != null) {
       mergeResource.close();
+      mergeResource = null;
     }
   }
 
@@ -705,4 +751,8 @@ public class OverflowProcessor extends Processor {
   public String toString() {
     return "OverflowProcessor in " + parentPath;
   }
+
+  public boolean isClosed() {
+    return isClosed;
+  }
 }
\ No newline at end of file
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/exception/FileNodeProcessorException.java b/iotdb/src/main/java/org/apache/iotdb/db/exception/FileNodeProcessorException.java
index 213d3c0..d3cf362 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/exception/FileNodeProcessorException.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/exception/FileNodeProcessorException.java
@@ -37,4 +37,8 @@ public class FileNodeProcessorException extends ProcessorException {
   public FileNodeProcessorException(Throwable throwable) {
     super(throwable.getMessage());
   }
+
+  public FileNodeProcessorException(String msg, Throwable e) {
+    super(msg, e);
+  }
 }