Posted to commits@iotdb.apache.org by ji...@apache.org on 2019/01/26 02:45:05 UTC

[incubator-iotdb] branch fix_sonar_jt updated: fix other problems in FileNodeProcessor

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch fix_sonar_jt
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/fix_sonar_jt by this push:
     new 6e16e93  fix other problems in FileNodeProcessor
6e16e93 is described below

commit 6e16e93ea27b334bdbfd4a51c9c19b372637bcd4
Author: 江天 <jt...@163.com>
AuthorDate: Sat Jan 26 10:44:17 2019 +0800

    fix other problems in FileNodeProcessor
---
 .../iotdb/db/engine/filenode/FileNodeManager.java  |  17 +-
 .../db/engine/filenode/FileNodeProcessor.java      | 194 +++++++++++----------
 2 files changed, 110 insertions(+), 101 deletions(-)

diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java
index 0139e03..d206960 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java
@@ -418,14 +418,15 @@ public class FileNodeManager implements IStatistic, IService {
     if (bufferWriteProcessor
             .getFileSize() > IoTDBDescriptor.getInstance()
             .getConfig().bufferwriteFileSizeThreshold) {
-      String memSize = MemUtils.bytesCntToStr(bufferWriteProcessor.getFileSize());
-      String memThrehold = MemUtils.bytesCntToStr(
-              IoTDBDescriptor.getInstance().getConfig().bufferwriteFileSizeThreshold);
-      LOGGER.info(
-              "The filenode processor {} will close the bufferwrite processor, "
-                      + "because the size[{}] of tsfile {} reaches the threshold {}",
-              filenodeName, memSize,
-              bufferWriteProcessor.getFileName(), memThrehold);
+      if (LOGGER.isInfoEnabled()) {
+        LOGGER.info(
+                "The filenode processor {} will close the bufferwrite processor, "
+                        + "because the size[{}] of tsfile {} reaches the threshold {}",
+                filenodeName, MemUtils.bytesCntToStr(bufferWriteProcessor.getFileSize()),
+                bufferWriteProcessor.getFileName(), MemUtils.bytesCntToStr(
+                        IoTDBDescriptor.getInstance().getConfig().bufferwriteFileSizeThreshold));
+      }
+
       fileNodeProcessor.closeBufferWrite();
     }
   }
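
Note: the isInfoEnabled() guard matters because SLF4J's {} placeholders only defer string concatenation; the MemUtils.bytesCntToStr(...) arguments would still be computed on every call without it. A minimal sketch of the pattern, with expensiveSummary() as a hypothetical stand-in for that conversion:

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    class GuardedLogging {
      private static final Logger LOGGER = LoggerFactory.getLogger(GuardedLogging.class);

      static String expensiveSummary() {
        // hypothetical stand-in for MemUtils.bytesCntToStr(...)
        return "1.2 MB";
      }

      void report() {
        if (LOGGER.isInfoEnabled()) {
          // expensiveSummary() runs only when INFO is enabled; the {} placeholder
          // defers string building, but NOT the evaluation of this argument
          LOGGER.info("current tsfile size is {}", expensiveSummary());
        }
      }
    }
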
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
index 50f2b40..abac706 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *          http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
@@ -72,8 +72,6 @@ import org.apache.iotdb.db.utils.FileSchemaUtils;
 import org.apache.iotdb.db.utils.MemUtils;
 import org.apache.iotdb.db.utils.TimeValuePair;
 import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
-import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
-import org.apache.iotdb.tsfile.common.constant.JsonFormatConstant;
 import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
 import org.apache.iotdb.tsfile.file.footer.ChunkGroupFooter;
 import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
@@ -88,7 +86,6 @@ import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.iotdb.tsfile.write.chunk.ChunkBuffer;
 import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl;
 import org.apache.iotdb.tsfile.write.record.TSRecord;
-import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint;
 import org.apache.iotdb.tsfile.write.record.datapoint.LongDataPoint;
 import org.apache.iotdb.tsfile.write.schema.FileSchema;
 import org.apache.iotdb.tsfile.write.schema.JsonConverter;
@@ -108,7 +105,6 @@ public class FileNodeProcessor extends Processor implements IStatistic {
           + " will be overflowed in the filenode processor {}, ";
   private static final String RESTORE_FILE_SUFFIX = ".restore";
   private static final Logger LOGGER = LoggerFactory.getLogger(FileNodeProcessor.class);
-  private static final TSFileConfig TsFileConf = TSFileDescriptor.getInstance().getConfig();
   private static final IoTDBConfig TsFileDBConf = IoTDBDescriptor.getInstance().getConfig();
   private static final MManager mManager = MManager.getInstance();
   private static final Directories directories = Directories.getInstance();
@@ -120,15 +116,15 @@ public class FileNodeProcessor extends Processor implements IStatistic {
   private volatile boolean isOverflowed;
   private Map<String, Long> lastUpdateTimeMap;
   private Map<String, Long> flushLastUpdateTimeMap;
-  private Map<String, List<IntervalFileNode>> invertedindexOfFiles;
+  private Map<String, List<IntervalFileNode>> invertedIndexOfFiles;
   private IntervalFileNode emptyIntervalFileNode;
   private IntervalFileNode currentIntervalFileNode;
   private List<IntervalFileNode> newFileNodes;
   private FileNodeProcessorStatus isMerging;
   // this is used when work->merge operation
-  private int numOfMergeFile = 0;
-  private FileNodeProcessorStore fileNodeProcessorStore = null;
-  private String fileNodeRestoreFilePath = null;
+  private int numOfMergeFile;
+  private FileNodeProcessorStore fileNodeProcessorStore;
+  private String fileNodeRestoreFilePath;
   private final Object fileNodeRestoreLock = new Object();
   private String baseDirPath;
   // last merge time
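
Note: the dropped initializers in this hunk (= 0, = null) are redundant in Java, since instance fields already default to 0 for primitives and null for references; that is exactly what Sonar flags. A trimmed-down illustration:

    class FieldDefaults {
      private int numOfMergeFile;        // ints default to 0
      private String restoreFilePath;    // references default to null

      boolean untouched() {
        // true for a fresh instance, with no explicit "= 0" / "= null" needed
        return numOfMergeFile == 0 && restoreFilePath == null;
      }
    }
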
@@ -142,41 +138,34 @@ public class FileNodeProcessor extends Processor implements IStatistic {
   // system recovery
   private boolean shouldRecovery = false;
   // statistic monitor parameters
-  private Map<String, Action> parameters = null;
+  private Map<String, Action> parameters;
   private FileSchema fileSchema;
-  private Action flushFileNodeProcessorAction = new Action() {
-
-    @Override
-    public void act() throws ActionException {
-      synchronized (fileNodeProcessorStore) {
-        try {
-          writeStoreToDisk(fileNodeProcessorStore);
-        } catch (FileNodeProcessorException e) {
-          throw new ActionException(e);
-        }
+  private Action flushFileNodeProcessorAction = () -> {
+    synchronized (fileNodeProcessorStore) {
+      try {
+        writeStoreToDisk(fileNodeProcessorStore);
+      } catch (FileNodeProcessorException e) {
+        throw new ActionException(e);
       }
     }
   };
-  private Action bufferwriteFlushAction = new Action() {
-
-    @Override
-    public void act() throws ActionException {
-      // update the lastUpdateTime Notice: Thread safe
-      synchronized (fileNodeProcessorStore) {
-        // deep copy
-        Map<String, Long> tempLastUpdateMap = new HashMap<>(lastUpdateTimeMap);
-        // update flushLastUpdateTimeMap
-        for (Entry<String, Long> entry : lastUpdateTimeMap.entrySet()) {
-          flushLastUpdateTimeMap.put(entry.getKey(), entry.getValue() + 1);
-        }
-        fileNodeProcessorStore.setLastUpdateTimeMap(tempLastUpdateMap);
+  private Action bufferwriteFlushAction = () -> {
+    // update the lastUpdateTime Notice: Thread safe
+    synchronized (fileNodeProcessorStore) {
+      // deep copy
+      Map<String, Long> tempLastUpdateMap = new HashMap<>(lastUpdateTimeMap);
+      // update flushLastUpdateTimeMap
+      for (Entry<String, Long> entry : lastUpdateTimeMap.entrySet()) {
+        flushLastUpdateTimeMap.put(entry.getKey(), entry.getValue() + 1);
       }
+      fileNodeProcessorStore.setLastUpdateTimeMap(tempLastUpdateMap);
     }
   };
+
   private Action bufferwriteCloseAction = new Action() {
 
     @Override
-    public void act() throws ActionException {
+    public void act() {
       synchronized (fileNodeProcessorStore) {
         fileNodeProcessorStore.setLastUpdateTimeMap(lastUpdateTimeMap);
         addLastTimeToIntervalFile();
@@ -197,18 +186,14 @@ public class FileNodeProcessor extends Processor implements IStatistic {
       }
     }
   };
-  private Action overflowFlushAction = new Action() {
-
-    @Override
-    public void act() throws ActionException {
+  private Action overflowFlushAction = () -> {
 
-      // update the new IntervalFileNode List and emptyIntervalFile.
-      // Notice: thread safe
-      synchronized (fileNodeProcessorStore) {
-        fileNodeProcessorStore.setOverflowed(isOverflowed);
-        fileNodeProcessorStore.setEmptyIntervalFileNode(emptyIntervalFileNode);
-        fileNodeProcessorStore.setNewFileNodes(newFileNodes);
-      }
+    // update the new IntervalFileNode List and emptyIntervalFile.
+    // Notice: thread safe
+    synchronized (fileNodeProcessorStore) {
+      fileNodeProcessorStore.setOverflowed(isOverflowed);
+      fileNodeProcessorStore.setEmptyIntervalFileNode(emptyIntervalFileNode);
+      fileNodeProcessorStore.setNewFileNodes(newFileNodes);
     }
   };
   // Token for query which used to
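
Note: the anonymous Action classes collapse into lambdas because Action is, by all appearances here, a single-method interface declaring void act() throws ActionException; the synchronized bodies carry over unchanged. A self-contained sketch under that assumption (the real Action lives elsewhere in the codebase):

    @FunctionalInterface
    interface Action {
      void act() throws ActionException;
    }

    class ActionException extends Exception {
      ActionException(Throwable cause) {
        super(cause);
      }
    }

    class Flusher {
      private final Object store = new Object();

      // anonymous-class form, as before the commit
      private final Action verbose = new Action() {
        @Override
        public void act() throws ActionException {
          synchronized (store) {
            // flush work
          }
        }
      };

      // equivalent lambda form, as after the commit
      private final Action concise = () -> {
        synchronized (store) {
          // flush work
        }
      };
    }
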
@@ -247,7 +232,8 @@ public class FileNodeProcessor extends Processor implements IStatistic {
     if (!dataDir.exists()) {
       dataDir.mkdirs();
       LOGGER.info(
-              "The data directory of the filenode processor {} doesn't exist. Create new directory {}",
+              "The data directory of the filenode processor {} doesn't exist. Create new " +
+                      "directory {}",
               getProcessorName(), baseDirPath);
     }
     fileNodeRestoreFilePath = new File(dataDir, processorName + RESTORE_FILE_SUFFIX).getPath();
@@ -255,7 +241,8 @@ public class FileNodeProcessor extends Processor implements IStatistic {
       fileNodeProcessorStore = readStoreFromDisk();
     } catch (FileNodeProcessorException e) {
       LOGGER.error(
-              "The fileNode processor {} encountered an error when recoverying restore information.",
+              "The fileNode processor {} encountered an error when recoverying restore " +
+                      "information.",
               processorName, e);
       throw new FileNodeProcessorException(e);
     }
@@ -265,7 +252,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
     newFileNodes = fileNodeProcessorStore.getNewFileNodes();
     isMerging = fileNodeProcessorStore.getFileNodeProcessorStatus();
     numOfMergeFile = fileNodeProcessorStore.getNumOfMergeFile();
-    invertedindexOfFiles = new HashMap<>();
+    invertedIndexOfFiles = new HashMap<>();
     // deep clone
     flushLastUpdateTimeMap = new HashMap<>();
     for (Entry<String, Long> entry : lastUpdateTimeMap.entrySet()) {
@@ -326,7 +313,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
     HashMap<String, TSRecord> tsRecordHashMap = new HashMap<>();
     TSRecord tsRecord = new TSRecord(curTime, statStorageDeltaName);
     Map<String, AtomicLong> hashMap = getStatParamsHashMap();
-    tsRecord.dataPointList = new ArrayList<DataPoint>();
+    tsRecord.dataPointList = new ArrayList<>();
     for (Map.Entry<String, AtomicLong> entry : hashMap.entrySet()) {
       tsRecord.dataPointList.add(new LongDataPoint(entry.getKey(), entry.getValue().get()));
     }
@@ -357,10 +344,10 @@ public class FileNodeProcessor extends Processor implements IStatistic {
   void setIntervalFileNodeStartTime(String deviceId) {
     if (currentIntervalFileNode.getStartTime(deviceId) == -1) {
       currentIntervalFileNode.setStartTime(deviceId, flushLastUpdateTimeMap.get(deviceId));
-      if (!invertedindexOfFiles.containsKey(deviceId)) {
-        invertedindexOfFiles.put(deviceId, new ArrayList<>());
+      if (!invertedIndexOfFiles.containsKey(deviceId)) {
+        invertedIndexOfFiles.put(deviceId, new ArrayList<>());
       }
-      invertedindexOfFiles.get(deviceId).add(currentIntervalFileNode);
+      invertedIndexOfFiles.get(deviceId).add(currentIntervalFileNode);
     }
   }
 
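
Note: the containsKey/put/get sequence kept by this hunk is the classic multimap idiom; since Java 8 it can be collapsed into a single Map.computeIfAbsent call. A sketch of the equivalent, using String in place of IntervalFileNode to stay self-contained:

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    class InvertedIndex {
      // String stands in for IntervalFileNode to keep the sketch runnable
      private final Map<String, List<String>> invertedIndexOfFiles = new HashMap<>();

      void register(String deviceId, String fileNode) {
        // one call replaces the containsKey / put / get sequence above
        invertedIndexOfFiles.computeIfAbsent(deviceId, k -> new ArrayList<>()).add(fileNode);
      }
    }
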
@@ -382,17 +369,17 @@ public class FileNodeProcessor extends Processor implements IStatistic {
 
   private void addAllFileIntoIndex(List<IntervalFileNode> fileList) {
     // clear map
-    invertedindexOfFiles.clear();
+    invertedIndexOfFiles.clear();
     // add all file to index
     for (IntervalFileNode fileNode : fileList) {
       if (fileNode.getStartTimeMap().isEmpty()) {
         continue;
       }
       for (String deviceId : fileNode.getStartTimeMap().keySet()) {
-        if (!invertedindexOfFiles.containsKey(deviceId)) {
-          invertedindexOfFiles.put(deviceId, new ArrayList<>());
+        if (!invertedIndexOfFiles.containsKey(deviceId)) {
+          invertedIndexOfFiles.put(deviceId, new ArrayList<>());
         }
-        invertedindexOfFiles.get(deviceId).add(fileNode);
+        invertedIndexOfFiles.get(deviceId).add(fileNode);
       }
     }
   }
@@ -441,10 +428,13 @@ public class FileNodeProcessor extends Processor implements IStatistic {
               .put(FileNodeConstants.FILENODE_PROCESSOR_FLUSH_ACTION, flushFileNodeProcessorAction);
       String baseDir = directories
               .getTsFileFolder(newFileNodes.get(newFileNodes.size() - 1).getBaseDirIndex());
-      LOGGER.info(
-              "The filenode processor {} will recovery the bufferwrite processor, "
-                      + "the bufferwrite file is {}",
-              getProcessorName(), fileNames[fileNames.length - 1]);
+      if (LOGGER.isInfoEnabled()) {
+        LOGGER.info(
+                "The filenode processor {} will recovery the bufferwrite processor, "
+                        + "the bufferwrite file is {}",
+                getProcessorName(), fileNames[fileNames.length - 1]);
+      }
+
       try {
         bufferWriteProcessor = new BufferWriteProcessor(baseDir, getProcessorName(),
                 fileNames[fileNames.length - 1], parameters, fileSchema);
@@ -538,12 +528,12 @@ public class FileNodeProcessor extends Processor implements IStatistic {
    */
   public OverflowProcessor getOverflowProcessor(String processorName) throws IOException {
     if (overflowProcessor == null) {
-      Map<String, Action> parameters = new HashMap<>();
+      Map<String, Action> paramparams = new HashMap<>();
       // construct processor or restore
-      parameters.put(FileNodeConstants.OVERFLOW_FLUSH_ACTION, overflowFlushAction);
-      parameters
+      paramparams.put(FileNodeConstants.OVERFLOW_FLUSH_ACTION, overflowFlushAction);
+      paramparams
               .put(FileNodeConstants.FILENODE_PROCESSOR_FLUSH_ACTION, flushFileNodeProcessorAction);
-      overflowProcessor = new OverflowProcessor(processorName, parameters, fileSchema);
+      overflowProcessor = new OverflowProcessor(processorName, paramparams, fileSchema);
     }
     return overflowProcessor;
   }
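
Note: the local here is renamed (to paramparams) because it shadowed the parameters field of the class; inside the method, the local silently hides the field, so writes never reach it. A minimal illustration of the hazard, with hypothetical names:

    import java.util.HashMap;
    import java.util.Map;

    class ShadowingExample {
      private final Map<String, String> parameters = new HashMap<>();

      void configure() {
        // this local hides the field for the rest of the method
        Map<String, String> parameters = new HashMap<>();
        parameters.put("key", "value"); // writes to the LOCAL map
        // this.parameters is still empty at this point
      }
    }
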
@@ -611,7 +601,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
    * For insert overflow.
    */
   public void changeTypeToChanged(String deviceId, long timestamp) {
-    if (!invertedindexOfFiles.containsKey(deviceId)) {
+    if (!invertedIndexOfFiles.containsKey(deviceId)) {
       LOGGER.warn(
               WARN_NO_SUCH_OVERFLOWED_FILE
                       + "the data is [device:{},time:{}]",
@@ -620,7 +610,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
       emptyIntervalFileNode.setEndTime(deviceId, getLastUpdateTime(deviceId));
       emptyIntervalFileNode.changeTypeToChanged(isMerging);
     } else {
-      List<IntervalFileNode> temp = invertedindexOfFiles.get(deviceId);
+      List<IntervalFileNode> temp = invertedIndexOfFiles.get(deviceId);
       int index = searchIndexNodeByTimestamp(deviceId, timestamp, temp);
       changeTypeToChanged(temp.get(index), deviceId);
     }
@@ -637,7 +627,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
    * For update overflow.
    */
   public void changeTypeToChanged(String deviceId, long startTime, long endTime) {
-    if (!invertedindexOfFiles.containsKey(deviceId)) {
+    if (!invertedIndexOfFiles.containsKey(deviceId)) {
       LOGGER.warn(
               WARN_NO_SUCH_OVERFLOWED_FILE
                       + "the data is [device:{}, start time:{}, end time:{}]",
@@ -646,7 +636,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
       emptyIntervalFileNode.setEndTime(deviceId, getLastUpdateTime(deviceId));
       emptyIntervalFileNode.changeTypeToChanged(isMerging);
     } else {
-      List<IntervalFileNode> temp = invertedindexOfFiles.get(deviceId);
+      List<IntervalFileNode> temp = invertedIndexOfFiles.get(deviceId);
       int left = searchIndexNodeByTimestamp(deviceId, startTime, temp);
       int right = searchIndexNodeByTimestamp(deviceId, endTime, temp);
       for (int i = left; i <= right; i++) {
@@ -659,7 +649,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
    * For delete overflow.
    */
   public void changeTypeToChangedForDelete(String deviceId, long timestamp) {
-    if (!invertedindexOfFiles.containsKey(deviceId)) {
+    if (!invertedIndexOfFiles.containsKey(deviceId)) {
       LOGGER.warn(
               WARN_NO_SUCH_OVERFLOWED_FILE
                       + "the data is [device:{}, delete time:{}]",
@@ -668,7 +658,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
       emptyIntervalFileNode.setEndTime(deviceId, getLastUpdateTime(deviceId));
       emptyIntervalFileNode.changeTypeToChanged(isMerging);
     } else {
-      List<IntervalFileNode> temp = invertedindexOfFiles.get(deviceId);
+      List<IntervalFileNode> temp = invertedIndexOfFiles.get(deviceId);
       int index = searchIndexNodeByTimestamp(deviceId, timestamp, temp);
       for (int i = 0; i <= index; i++) {
         temp.get(i).changeTypeToChanged(isMerging);
@@ -713,13 +703,15 @@ public class FileNodeProcessor extends Processor implements IStatistic {
 
   /**
    * remove multiple pass lock.
+   * TODO: use the return value or remove it.
    */
   public boolean removeMultiPassLock(int token) {
     if (newMultiPassTokenSet.contains(token)) {
       newMultiPassLock.readLock().unlock();
       newMultiPassTokenSet.remove(token);
       LOGGER
-              .debug("Remove multi token:{}, nspath:{}, new set:{}, lock:{}", token, getProcessorName(),
+              .debug("Remove multi token:{}, nspath:{}, new set:{}, lock:{}", token,
+                      getProcessorName(),
                       newMultiPassTokenSet, newMultiPassLock);
       return true;
     } else if (oldMultiPassTokenSet != null && oldMultiPassTokenSet.contains(token)) {
@@ -743,7 +735,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
   public <T extends Comparable<T>> QueryDataSource query(String deviceId, String measurementId)
           throws FileNodeProcessorException {
     // query overflow data
-    TSDataType dataType = null;
+    TSDataType dataType;
     try {
       dataType = mManager.getSeriesType(deviceId + "." + measurementId);
     } catch (PathErrorException e) {
@@ -814,9 +806,14 @@ public class FileNodeProcessor extends Processor implements IStatistic {
       }
       if (targetFile.exists()) {
         throw new FileNodeProcessorException(
-                String.format("The appended target file %s already exists.", appendFile.getFilePath()));
+                String.format("The appended target file %s already exists.",
+                        appendFile.getFilePath()));
+      }
+      if (!originFile.renameTo(targetFile)) {
+        LOGGER.warn("File renaming failed when appending new file. Origin: {}, target: {}",
+                originFile.getPath(),
+                targetFile.getPath());
       }
-      originFile.renameTo(targetFile);
       // append the new tsfile
       this.newFileNodes.add(appendFile);
       // update the lastUpdateTime
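
Note: checking renameTo is necessary because File.renameTo reports failure only through its boolean result (permissions, an existing target, cross-device moves). An alternative sketch that surfaces the actual cause, assuming java.nio is acceptable here:

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.Paths;

    class MoveSketch {
      static void moveOrFail(String origin, String target) throws IOException {
        Path from = Paths.get(origin);
        Path to = Paths.get(target);
        // unlike File.renameTo, this throws an IOException carrying the
        // concrete reason instead of returning false
        Files.move(from, to);
      }
    }
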
@@ -914,12 +911,14 @@ public class FileNodeProcessor extends Processor implements IStatistic {
     if (overflowProcessor != null) {
       if (overflowProcessor.getFileSize() < IoTDBDescriptor.getInstance()
               .getConfig().overflowFileSizeThreshold) {
-        LOGGER.info(
-                "Skip this merge taks submission, because the size{} of overflow processor {} "
-                        + "does not reaches the threshold {}.",
-                MemUtils.bytesCntToStr(overflowProcessor.getFileSize()), getProcessorName(),
-                MemUtils.bytesCntToStr(
-                        IoTDBDescriptor.getInstance().getConfig().overflowFileSizeThreshold));
+        if (LOGGER.isInfoEnabled()) {
+          LOGGER.info(
+                  "Skip this merge taks submission, because the size{} of overflow processor {} "
+                          + "does not reaches the threshold {}.",
+                  MemUtils.bytesCntToStr(overflowProcessor.getFileSize()), getProcessorName(),
+                  MemUtils.bytesCntToStr(
+                          IoTDBDescriptor.getInstance().getConfig().overflowFileSizeThreshold));
+        }
         return null;
       }
     } else {
@@ -937,7 +936,8 @@ public class FileNodeProcessor extends Processor implements IStatistic {
     } else {
       if (!isOverflowed) {
         LOGGER.info(
-                "Skip this merge taks submission, because the filenode processor {} is not overflowed.",
+                "Skip this merge taks submission, because the filenode processor {} is not " +
+                        "overflowed.",
                 getProcessorName());
       } else {
         LOGGER.warn(
@@ -1110,8 +1110,8 @@ public class FileNodeProcessor extends Processor implements IStatistic {
     while (iterator.hasNext()) {
       Entry<String, Long> entry = iterator.next();
       String deviceId = entry.getKey();
-      if (invertedindexOfFiles.containsKey(deviceId)) {
-        invertedindexOfFiles.get(deviceId).get(0).setOverflowChangeType(OverflowChangeType.CHANGED);
+      if (invertedIndexOfFiles.containsKey(deviceId)) {
+        invertedIndexOfFiles.get(deviceId).get(0).setOverflowChangeType(OverflowChangeType.CHANGED);
         startTimeMap.remove(deviceId);
         iterator.remove();
       }
@@ -1163,7 +1163,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
       Map<String, Long> startTimeMap = new HashMap<>();
       Map<String, Long> endTimeMap = new HashMap<>();
       for (String deviceId : intervalFileNode.getEndTimeMap().keySet()) {
-        List<IntervalFileNode> temp = invertedindexOfFiles.get(deviceId);
+        List<IntervalFileNode> temp = invertedIndexOfFiles.get(deviceId);
         int index = temp.indexOf(intervalFileNode);
         int size = temp.size();
         // start time
@@ -1245,7 +1245,8 @@ public class FileNodeProcessor extends Processor implements IStatistic {
           writeStoreToDisk(fileNodeProcessorStore);
         } catch (FileNodeProcessorException e) {
           LOGGER.error(
-                  "Merge: failed to write filenode information to revocery file, the filenode is {}.",
+                  "Merge: failed to write filenode information to revocery file, the filenode is " +
+                          "{}.",
                   getProcessorName(), e);
           throw new FileNodeProcessorException(
                   "Merge: write filenode information to revocery file failed, the filenode is "
@@ -1259,8 +1260,8 @@ public class FileNodeProcessor extends Processor implements IStatistic {
 
   private void updateEmpty(IntervalFileNode empty, List<IntervalFileNode> result) {
     for (String deviceId : empty.getStartTimeMap().keySet()) {
-      if (invertedindexOfFiles.containsKey(deviceId)) {
-        IntervalFileNode temp = invertedindexOfFiles.get(deviceId).get(0);
+      if (invertedIndexOfFiles.containsKey(deviceId)) {
+        IntervalFileNode temp = invertedIndexOfFiles.get(deviceId).get(0);
         if (temp.getMergeChanged().contains(deviceId)) {
           empty.setOverflowChangeType(OverflowChangeType.CHANGED);
           break;
@@ -1383,8 +1384,8 @@ public class FileNodeProcessor extends Processor implements IStatistic {
         continue;
       }
       for (File file : files) {
-        if (!bufferFiles.contains(file.getPath())) {
-          file.delete();
+        if (!bufferFiles.contains(file.getPath()) && !file.delete()) {
+          LOGGER.warn("Cannot delete BufferWrite file {}", file.getPath());
         }
       }
     }
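
Note: same idea as the renameTo fix above: File.delete signals failure only via its return value, which this hunk now logs. Under the same java.nio assumption, Files.deleteIfExists raises the concrete reason instead:

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Paths;

    class DeleteSketch {
      static void deleteStale(String path) {
        try {
          // real failures (permissions, open handles on Windows) throw IOException;
          // a file that is already gone just yields false, not an error
          Files.deleteIfExists(Paths.get(path));
        } catch (IOException e) {
          // log and continue, mirroring the commit's behaviour
          System.err.println("Cannot delete BufferWrite file " + path + ": " + e);
        }
      }
    }
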
@@ -1412,7 +1413,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
       List<Path> pathList = new ArrayList<>();
       mergeIsRowGroupHasData = false;
       mergeStartPos = -1;
-      ChunkGroupFooter footer = null;
+      ChunkGroupFooter footer;
       int numOfChunk = 0;
       try {
         List<String> pathStrings = mManager.getLeafNodePathInNextLevel(deviceId);
@@ -1427,7 +1428,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
         continue;
       }
       for (Path path : pathList) {
-        // query one measurenment in the special deviceId
+        // query one measurement in the special deviceId
         String measurementId = path.getMeasurement();
         TSDataType dataType = mManager.getSeriesType(path.getFullPath());
         OverflowSeriesDataSource overflowSeriesDataSource = overflowProcessor.queryMerge(deviceId,
@@ -1795,7 +1796,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
             Objects.equals(statParamsHashMap, that.statParamsHashMap) &&
             Objects.equals(lastUpdateTimeMap, that.lastUpdateTimeMap) &&
             Objects.equals(flushLastUpdateTimeMap, that.flushLastUpdateTimeMap) &&
-            Objects.equals(invertedindexOfFiles, that.invertedindexOfFiles) &&
+            Objects.equals(invertedIndexOfFiles, that.invertedIndexOfFiles) &&
             Objects.equals(emptyIntervalFileNode, that.emptyIntervalFileNode) &&
             Objects.equals(currentIntervalFileNode, that.currentIntervalFileNode) &&
             Objects.equals(newFileNodes, that.newFileNodes) &&
@@ -1819,7 +1820,14 @@ public class FileNodeProcessor extends Processor implements IStatistic {
 
   @Override
   public int hashCode() {
-    return Objects.hash(super.hashCode(), statStorageDeltaName, statParamsHashMap, isOverflowed, lastUpdateTimeMap, flushLastUpdateTimeMap, invertedindexOfFiles, emptyIntervalFileNode, currentIntervalFileNode, newFileNodes, isMerging, numOfMergeFile, fileNodeProcessorStore, fileNodeRestoreFilePath, baseDirPath, lastMergeTime, bufferWriteProcessor, overflowProcessor, oldMultiPassTokenSet, newMultiPassTokenSet, oldMultiPassLock, newMultiPassLock, shouldRecovery, parameters, fileSchema, flu [...]
+    return Objects.hash(super.hashCode(), statStorageDeltaName, statParamsHashMap, isOverflowed,
+            lastUpdateTimeMap, flushLastUpdateTimeMap, invertedIndexOfFiles,
+            emptyIntervalFileNode, currentIntervalFileNode, newFileNodes, isMerging,
+            numOfMergeFile, fileNodeProcessorStore, fileNodeRestoreFilePath, baseDirPath,
+            lastMergeTime, bufferWriteProcessor, overflowProcessor, oldMultiPassTokenSet,
+            newMultiPassTokenSet, oldMultiPassLock, newMultiPassLock, shouldRecovery, parameters,
+            fileSchema, flushFileNodeProcessorAction, bufferwriteFlushAction,
+            bufferwriteCloseAction, overflowFlushAction, multiPassLockToken);
   }
 
   public class MergeRunnale implements Runnable {