Posted to commits@iotdb.apache.org by ja...@apache.org on 2020/07/23 06:52:30 UTC

[incubator-iotdb] branch improve/recover created (now a6282dc)

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a change to branch improve/recover
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git.


      at a6282dc  refactor recover process

This branch includes the following new commits:

     new f2d9c8b  combine the duplicate method
     new a6282dc  refactor recover process

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[incubator-iotdb] 02/02: refactor recover process

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch improve/recover
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit a6282dcc9e1b46fa78eb5632e03763f59403c219
Author: JackieTien97 <Ja...@foxmail.com>
AuthorDate: Thu Jul 23 14:50:47 2020 +0800

    refactor recover process
---
 .../apache/iotdb/hadoop/fileSystem/HDFSOutput.java |  4 +-
 .../writelog/recover/TsFileRecoverPerformer.java   | 24 +++----
 .../iotdb/tsfile/read/TsFileSequenceReader.java    | 79 +++++++++++-----------
 .../tsfile/write/writer/LocalTsFileOutput.java     |  4 +-
 .../write/writer/RestorableTsFileIOWriter.java     | 42 +++++-------
 .../iotdb/tsfile/write/writer/TsFileOutput.java    |  4 +-
 .../write/writer/RestorableTsFileIOWriterTest.java | 14 ++--
 7 files changed, 83 insertions(+), 88 deletions(-)

diff --git a/hadoop/src/main/java/org/apache/iotdb/hadoop/fileSystem/HDFSOutput.java b/hadoop/src/main/java/org/apache/iotdb/hadoop/fileSystem/HDFSOutput.java
index 8693eb2..aec1a58 100644
--- a/hadoop/src/main/java/org/apache/iotdb/hadoop/fileSystem/HDFSOutput.java
+++ b/hadoop/src/main/java/org/apache/iotdb/hadoop/fileSystem/HDFSOutput.java
@@ -86,11 +86,11 @@ public class HDFSOutput implements TsFileOutput {
   }
 
   @Override
-  public void truncate(long position) throws IOException {
+  public void truncate(long size) throws IOException {
     if (fs.exists(path)) {
       fsDataOutputStream.close();
     }
-    fs.truncate(path, position);
+    fs.truncate(path, size);
     if (fs.exists(path)) {
       fsDataOutputStream = fs.append(path);
     }
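
The parameter rename above brings HDFSOutput in line with the
java.nio.channels.FileChannel#truncate contract: the argument is the new
size in bytes, not a position to seek to. A minimal sketch of the pattern,
assuming a Hadoop FileSystem and Path as in HDFSOutput -- HDFS cannot
truncate a file while a writer holds it open, so the stream is closed
first and reopened in append mode afterwards:

    import java.io.IOException;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    class HdfsTruncateSketch {
      private final FileSystem fs;
      private final Path path;
      private FSDataOutputStream fsDataOutputStream;

      HdfsTruncateSketch(FileSystem fs, Path path, FSDataOutputStream out) {
        this.fs = fs;
        this.path = path;
        this.fsDataOutputStream = out;
      }

      void truncate(long size) throws IOException {
        if (fs.exists(path)) {
          fsDataOutputStream.close();             // release the writer first
        }
        fs.truncate(path, size);                  // keep only the first `size` bytes
        if (fs.exists(path)) {
          fsDataOutputStream = fs.append(path);   // resume writing at the new end
        }
      }
    }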
diff --git a/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java b/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
index 22d2904..38ab3c2 100644
--- a/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
+++ b/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
@@ -127,20 +127,20 @@ public class TsFileRecoverPerformer {
       // due to failure, the last ChunkGroup may contain the same data as the WALs, so the time
       // map must be updated first to avoid duplicated insertion
       recoverResourceFromWriter(restorableTsFileIOWriter);
-    }
 
-    // redo logs
-    redoLogs(restorableTsFileIOWriter);
+      // redo logs
+      redoLogs(restorableTsFileIOWriter);
 
-    // clean logs
-    try {
-      MultiFileLogNodeManager.getInstance()
-          .deleteNode(logNodePrefix + SystemFileFactory.INSTANCE.getFile(filePath).getName());
-    } catch (IOException e) {
-      throw new StorageGroupProcessorException(e);
-    }
+      // clean logs
+      try {
+        MultiFileLogNodeManager.getInstance()
+            .deleteNode(logNodePrefix + SystemFileFactory.INSTANCE.getFile(filePath).getName());
+      } catch (IOException e) {
+        throw new StorageGroupProcessorException(e);
+      }
 
-    return restorableTsFileIOWriter;
+      return restorableTsFileIOWriter;
+    }
   }
 
   private void recoverResourceFromFile() throws IOException {
@@ -206,10 +206,10 @@ public class TsFileRecoverPerformer {
        // end the file if it is not the last file or it is closed before crash
         restorableTsFileIOWriter.endFile();
         resource.cleanCloseFlag();
+        resource.serialize();
       }
      // otherwise this file is not closed before crash, do nothing so we can continue writing
       // into it
-      resource.serialize();
     } catch (IOException | InterruptedException | ExecutionException e) {
       throw new StorageGroupProcessorException(e);
     }
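
The net effect of this hunk: WAL replay and cleanup now run only on the
branch where the file was left unsealed by a crash, after the resource's
time map has been rebuilt from the writer, so replaying the logs cannot
duplicate rows already present in the last ChunkGroup. A condensed sketch
of the resulting flow, using the helpers shown above; `writerNeedsRecovery`
is a hypothetical stand-in for the elided if-condition:

    if (writerNeedsRecovery) {  // hypothetical guard, see the diff context above
      // rebuild the time map from what the writer salvaged, so WAL replay
      // cannot re-insert data already in the last ChunkGroup
      recoverResourceFromWriter(restorableTsFileIOWriter);

      // redo logs
      redoLogs(restorableTsFileIOWriter);

      // clean logs: the WAL node is obsolete once its contents are in the TsFile
      try {
        MultiFileLogNodeManager.getInstance()
            .deleteNode(logNodePrefix + SystemFileFactory.INSTANCE.getFile(filePath).getName());
      } catch (IOException e) {
        throw new StorageGroupProcessorException(e);
      }

      return restorableTsFileIOWriter;
    }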
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
index d259f1d..1cb124c 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
@@ -96,7 +96,7 @@ public class TsFileSequenceReader implements AutoCloseable {
   /**
    * construct function for TsFileSequenceReader.
    *
-   * @param file -given file name
+   * @param file             -given file name
    * @param loadMetadataSize -whether load meta data size
    */
   public TsFileSequenceReader(String file, boolean loadMetadataSize) throws IOException {
@@ -137,7 +137,7 @@ public class TsFileSequenceReader implements AutoCloseable {
   /**
    * construct function for TsFileSequenceReader.
    *
-   * @param input -given input
+   * @param input            -given input
    * @param loadMetadataSize -load meta data size
    */
   public TsFileSequenceReader(TsFileInput input, boolean loadMetadataSize) throws IOException {
@@ -155,10 +155,10 @@ public class TsFileSequenceReader implements AutoCloseable {
   /**
    * construct function for TsFileSequenceReader.
    *
-   * @param input the input of a tsfile. The current position should be a markder and then a chunk
-   * Header, rather than the magic number
-   * @param fileMetadataPos the position of the file metadata in the TsFileInput from the beginning
-   * of the input to the current position
+   * @param input            the input of a tsfile. The current position should be a marker and
+   *                         then a chunk header, rather than the magic number
+   * @param fileMetadataPos  the position of the file metadata in the TsFileInput from the beginning
+   *                         of the input to the current position
    * @param fileMetadataSize the byte size of the file metadata in the input
    */
   public TsFileSequenceReader(TsFileInput input, long fileMetadataPos, int fileMetadataSize) {
@@ -508,9 +508,9 @@ public class TsFileSequenceReader implements AutoCloseable {
   /**
    * Traverse the metadata index from MetadataIndexEntry to get TimeseriesMetadatas
    *
-   * @param metadataIndex MetadataIndexEntry
-   * @param buffer byte buffer
-   * @param deviceId String
+   * @param metadataIndex         MetadataIndexEntry
+   * @param buffer                byte buffer
+   * @param deviceId              String
    * @param timeseriesMetadataMap map: deviceId -> timeseriesMetadata list
    */
   private void generateMetadataIndex(MetadataIndexEntry metadataIndex, ByteBuffer buffer,
@@ -584,12 +584,13 @@ public class TsFileSequenceReader implements AutoCloseable {
    * Get target MetadataIndexEntry and its end offset
    *
    * @param metadataIndex given MetadataIndexNode
-   * @param name target device / measurement name
-   * @param type target MetadataIndexNodeType, either INTERNAL_DEVICE or INTERNAL_MEASUREMENT. When
-   * searching for a device node,  return when it is not INTERNAL_DEVICE. Likewise, when searching
-   * for a measurement node, return when it is not INTERNAL_MEASUREMENT. This works for the
-   * situation when the index tree does NOT have the device level and ONLY has the measurement
-   * level.
+   * @param name          target device / measurement name
+   * @param type          target MetadataIndexNodeType, either INTERNAL_DEVICE or
+   *                      INTERNAL_MEASUREMENT. When searching for a device node,  return when it is
+   *                      not INTERNAL_DEVICE. Likewise, when searching for a measurement node,
+   *                      return when it is not INTERNAL_MEASUREMENT. This works for the situation
+   *                      when the index tree does NOT have the device level and ONLY has the
+   *                      measurement level.
    * @return target MetadataIndexEntry, endOffset pair
    */
   private Pair<MetadataIndexEntry, Long> getMetadataAndEndOffset(MetadataIndexNode metadataIndex,
@@ -616,7 +617,7 @@ public class TsFileSequenceReader implements AutoCloseable {
   /**
    * read data from current position of the input, and deserialize it to a CHUNK_GROUP_FOOTER.
    *
-   * @param position the offset of the chunk group footer in the file
+   * @param position   the offset of the chunk group footer in the file
   * @param markerRead true if the offset does not contain the marker, otherwise false
    * @return a CHUNK_GROUP_FOOTER
    * @throws IOException io error
@@ -649,9 +650,9 @@ public class TsFileSequenceReader implements AutoCloseable {
   /**
    * read the chunk's header.
    *
-   * @param position the file offset of this chunk's header
+   * @param position        the file offset of this chunk's header
    * @param chunkHeaderSize the size of chunk's header
-   * @param markerRead true if the offset does not contains the marker , otherwise false
+   * @param markerRead      true if the offset does not contain the marker, otherwise false
    */
   private ChunkHeader readChunkHeader(long position, int chunkHeaderSize, boolean markerRead)
       throws IOException {
@@ -756,8 +757,8 @@ public class TsFileSequenceReader implements AutoCloseable {
    * changed.
    *
    * @param position the start position of data in the tsFileInput, or the current position if
-   * position = -1
-   * @param size the size of data that want to read
+   *                 position = -1
+   * @param size     the size of the data to read
   * @return the data that has been read.
    */
   private ByteBuffer readData(long position, int size) throws IOException {
@@ -783,8 +784,8 @@ public class TsFileSequenceReader implements AutoCloseable {
    * position.
    *
    * @param start the start position of data in the tsFileInput, or the current position if position
-   * = -1
-   * @param end the end position of data that want to read
+   *              = -1
+   * @param end   the end position of the data to read
   * @return the data that has been read.
    */
   private ByteBuffer readData(long start, long end) throws IOException {
@@ -801,11 +802,11 @@ public class TsFileSequenceReader implements AutoCloseable {
   /**
    * Self Check the file and return the position before where the data is safe.
    *
-   * @param newSchema the schema on each time series in the file
+   * @param newSchema              the schema on each time series in the file
    * @param chunkGroupMetadataList ChunkGroupMetadata List
-   * @param versionInfo version pair List
-   * @param fastFinish if true and the file is complete, then newSchema and chunkGroupMetadataList
-   * parameter will be not modified.
+   * @param versionInfo            version pair List
+   * @param fastFinish             if true and the file is complete, then newSchema and
+   *                               chunkGroupMetadataList parameters will not be modified.
    * @return the position of the file that is fine. All data after the position in the file should
    * be truncated.
    */
@@ -835,15 +836,15 @@ public class TsFileSequenceReader implements AutoCloseable {
     if (fileSize < headerLength) {
       return TsFileCheckStatus.INCOMPATIBLE_FILE;
     }
-    String magic = readHeadMagic();
-    tsFileInput.position(headerLength);
-    if (!magic.equals(TSFileConfig.MAGIC_STRING)) {
+    if (!TSFileConfig.MAGIC_STRING.equals(readHeadMagic()) || !TSFileConfig.VERSION_NUMBER
+        .equals(readTailMagic())) {
       return TsFileCheckStatus.INCOMPATIBLE_FILE;
     }
 
+    tsFileInput.position(headerLength);
     if (fileSize == headerLength) {
-      return TsFileCheckStatus.ONLY_MAGIC_HEAD;
-    } else if (readTailMagic().equals(magic)) {
+      return headerLength;
+    } else if (isComplete()) {
       loadMetadataSize();
       if (fastFinish) {
         return TsFileCheckStatus.COMPLETE_FILE;
@@ -851,7 +852,7 @@ public class TsFileSequenceReader implements AutoCloseable {
     }
     boolean newChunkGroup = true;
     // not a complete file, we will recover it...
-    long truncatedPosition = headerLength;
+    long truncatedSize = headerLength;
     byte marker;
     int chunkCnt = 0;
     List<MeasurementSchema> measurementSchemaList = new ArrayList<>();
@@ -900,7 +901,7 @@ public class TsFileSequenceReader implements AutoCloseable {
             }
             chunkGroupMetadataList.add(new ChunkGroupMetadata(deviceID, chunkMetadataList));
             newChunkGroup = true;
-            truncatedPosition = this.position();
+            truncatedSize = this.position();
 
             totalChunkNum += chunkCnt;
             chunkCnt = 0;
@@ -909,7 +910,7 @@ public class TsFileSequenceReader implements AutoCloseable {
           case MetaMarker.VERSION:
             long version = readVersion();
             versionInfo.add(new Pair<>(position(), version));
-            truncatedPosition = this.position();
+            truncatedSize = this.position();
             break;
           default:
             // the disk file is corrupted, using this file may be dangerous
@@ -918,14 +919,14 @@ public class TsFileSequenceReader implements AutoCloseable {
       }
       // now we read the tail of the data section, so we are sure that the last
       // ChunkGroupFooter is complete.
-      truncatedPosition = this.position() - 1;
+      truncatedSize = this.position() - 1;
     } catch (Exception e) {
       logger.info("TsFile {} self-check cannot proceed at position {} " + "recovered, because : {}",
           file, this.position(), e.getMessage());
     }
     // Despite the completeness of the data section, we will discard current FileMetadata
     // so that we can continue to write data into this tsfile.
-    return truncatedPosition;
+    return truncatedSize;
   }
 
   public int getTotalChunkNum() {
@@ -992,7 +993,7 @@ public class TsFileSequenceReader implements AutoCloseable {
    * get device names which has valid chunks in [start, end)
    *
    * @param start start of the partition
-   * @param end end of the partition
+   * @param end   end of the partition
    * @return device names in range
    */
   public List<String> getDeviceNameInRange(long start, long end) throws IOException {
@@ -1010,8 +1011,8 @@ public class TsFileSequenceReader implements AutoCloseable {
    * Check if the device has at least one Chunk in this partition
    *
    * @param seriesMetadataMap chunkMetaDataList of each measurement
-   * @param start the start position of the space partition
-   * @param end the end position of the space partition
+   * @param start             the start position of the space partition
+   * @param end               the end position of the space partition
    */
   private boolean hasDataInPartition(Map<String, List<ChunkMetadata>> seriesMetadataMap,
       long start, long end) {
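
Taken together, these hunks give selfCheck a single numeric contract:
TsFileCheckStatus.COMPLETE_FILE for a sealed file,
TsFileCheckStatus.INCOMPATIBLE_FILE when the head magic or version check
fails, and otherwise the byte count up to which the data is safe -- a file
holding only the magic head now simply yields headerLength instead of the
ONLY_MAGIC_HEAD sentinel it used to return. A minimal caller sketch under
that contract (essentially what the RestorableTsFileIOWriter change below
does):

    long truncatedSize = reader.selfCheck(knownSchemas, chunkGroupMetadataList, versionInfo, true);
    if (truncatedSize == TsFileCheckStatus.COMPLETE_FILE) {
      // sealed file: nothing to recover, close the output and serve reads only
    } else if (truncatedSize == TsFileCheckStatus.INCOMPATIBLE_FILE) {
      // not a TsFile at all: refuse to write into it
    } else {
      out.truncate(truncatedSize);  // drop the broken tail, then keep writing
    }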
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/LocalTsFileOutput.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/LocalTsFileOutput.java
index 1e6e105..8423dfb 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/LocalTsFileOutput.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/LocalTsFileOutput.java
@@ -79,8 +79,8 @@ public class LocalTsFileOutput implements TsFileOutput {
   }
 
   @Override
-  public void truncate(long position) throws IOException {
-    outputStream.getChannel().truncate(position);
+  public void truncate(long size) throws IOException {
+    outputStream.getChannel().truncate(size);
   }
 
 }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java
index 8a12b05..536f887 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java
@@ -28,7 +28,6 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
 import org.apache.iotdb.tsfile.exception.NotCompatibleTsFileException;
 import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetadata;
 import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
@@ -48,7 +47,7 @@ import org.slf4j.LoggerFactory;
 public class RestorableTsFileIOWriter extends TsFileIOWriter {
 
   private static final Logger logger = LoggerFactory.getLogger("FileMonitor");
-  private long truncatedPosition = -1;
+  private long truncatedSize = -1;
   private Map<Path, MeasurementSchema> knownSchemas = new HashMap<>();
 
   private int lastFlushedChunkGroupIndex = 0;
@@ -74,36 +73,30 @@ public class RestorableTsFileIOWriter extends TsFileIOWriter {
     // file doesn't exist
     if (file.length() == 0) {
       startFile();
+      canWrite = true;
+      crashed = true;
       return;
     }
 
     if (file.exists()) {
       try (TsFileSequenceReader reader = new TsFileSequenceReader(file.getAbsolutePath(), false)) {
 
-        // this tsfile is complete
-        if (reader.isComplete()) {
+        truncatedSize = reader
+            .selfCheck(knownSchemas, chunkGroupMetadataList, versionInfo, true);
+        totalChunkNum = reader.getTotalChunkNum();
+        if (truncatedSize == TsFileCheckStatus.COMPLETE_FILE) {
           crashed = false;
           canWrite = false;
           out.close();
-          return;
-        }
-
-        // uncompleted file
-        truncatedPosition = reader.selfCheck(knownSchemas, chunkGroupMetadataList, versionInfo, true);
-        totalChunkNum = reader.getTotalChunkNum();
-        if (truncatedPosition == TsFileCheckStatus.INCOMPATIBLE_FILE) {
+        } else if (truncatedSize == TsFileCheckStatus.INCOMPATIBLE_FILE) {
           out.close();
           throw new NotCompatibleTsFileException(
               String.format("%s is not in TsFile format.", file.getAbsolutePath()));
-        } else if (truncatedPosition == TsFileCheckStatus.ONLY_MAGIC_HEAD) {
-          crashed = true;
-          out.truncate(
-              (long) TSFileConfig.MAGIC_STRING.getBytes().length + TSFileConfig.VERSION_NUMBER
-                  .getBytes().length);
         } else {
           crashed = true;
+          canWrite = true;
           // remove broken data
-          out.truncate(truncatedPosition);
+          out.truncate(truncatedSize);
         }
       }
     }
@@ -140,8 +133,8 @@ public class RestorableTsFileIOWriter extends TsFileIOWriter {
     return new RestorableTsFileIOWriter(file);
   }
 
-  long getTruncatedPosition() {
-    return truncatedPosition;
+  long getTruncatedSize() {
+    return truncatedSize;
   }
 
   public Map<Path, MeasurementSchema> getKnownSchema() {
@@ -162,7 +155,8 @@ public class RestorableTsFileIOWriter extends TsFileIOWriter {
   public List<ChunkMetadata> getVisibleMetadataList(String deviceId, String measurementId,
       TSDataType dataType) {
     List<ChunkMetadata> chunkMetadataList = new ArrayList<>();
-    if (metadatasForQuery.containsKey(deviceId) && metadatasForQuery.get(deviceId).containsKey(measurementId)) {
+    if (metadatasForQuery.containsKey(deviceId) && metadatasForQuery.get(deviceId)
+        .containsKey(measurementId)) {
       for (ChunkMetadata chunkMetaData : metadatasForQuery.get(deviceId).get(measurementId)) {
        // filter: if a device's measurement is defined as float type, and data has been persisted.
         // Then someone deletes the timeseries and recreate it with Int type. We have to ignore
@@ -180,8 +174,8 @@ public class RestorableTsFileIOWriter extends TsFileIOWriter {
   }
 
   /**
-   * add all appendChunkMetadatas into memory. After calling this method, other classes can
-   * read these metadata.
+   * add all appendChunkMetadatas into memory. After calling this method, other classes can read
+   * these metadata.
    */
 
   public void makeMetadataVisible() {
@@ -212,8 +206,8 @@ public class RestorableTsFileIOWriter extends TsFileIOWriter {
   }
 
   /**
-   * get all the chunk's metadata which are appended after the last calling of this method, or
-   * after the class instance is initialized if this is the first time to call the method.
+   * get all the chunk's metadata which are appended after the last calling of this method, or after
+   * the class instance is initialized if this is the first time to call the method.
    *
    * @return a list of Device ChunkMetadataList Pair
    */
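
For reference, a hedged reading of the writer state produced by the
reworked constructor above (not documentation from the source tree):

    // empty file        -> startFile(); crashed = true,  canWrite = true
    // COMPLETE_FILE     -> crashed = false, canWrite = false, output closed
    // INCOMPATIBLE_FILE -> output closed, NotCompatibleTsFileException thrown
    // any other size    -> out.truncate(truncatedSize); crashed = true, canWrite = true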
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileOutput.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileOutput.java
index 5b9fab1..8da1f17 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileOutput.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileOutput.java
@@ -78,8 +78,8 @@ public interface TsFileOutput {
   /**
    * The same with {@link java.nio.channels.FileChannel#truncate(long)}.
    *
-   * @param position -position
+   * @param size The new size, a non-negative byte count
    */
-  void truncate(long position) throws IOException;
+  void truncate(long size) throws IOException;
 
 }
diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriterTest.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriterTest.java
index 0c777c3..a11296f 100644
--- a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriterTest.java
+++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriterTest.java
@@ -81,10 +81,10 @@ public class RestorableTsFileIOWriterTest {
     RestorableTsFileIOWriter rWriter = new RestorableTsFileIOWriter(file);
     writer = new TsFileWriter(rWriter);
     writer.close();
-    assertEquals(TsFileCheckStatus.ONLY_MAGIC_HEAD, rWriter.getTruncatedPosition());
+    assertEquals(TsFileCheckStatus.ONLY_MAGIC_HEAD, rWriter.getTruncatedSize());
 
     rWriter = new RestorableTsFileIOWriter(file);
-    assertEquals(TsFileCheckStatus.COMPLETE_FILE, rWriter.getTruncatedPosition());
+    assertEquals(TsFileCheckStatus.COMPLETE_FILE, rWriter.getTruncatedSize());
     assertFalse(rWriter.canWrite());
     rWriter.close();
     assertTrue(file.delete());
@@ -101,7 +101,7 @@ public class RestorableTsFileIOWriterTest {
     writer = new TsFileWriter(rWriter);
     writer.close();
     assertEquals(TsFileIOWriter.magicStringBytes.length + TsFileIOWriter.versionNumberBytes.length,
-        rWriter.getTruncatedPosition());
+        rWriter.getTruncatedSize());
     assertTrue(file.delete());
   }
 
@@ -115,7 +115,7 @@ public class RestorableTsFileIOWriterTest {
     TsFileWriter writer = new TsFileWriter(rWriter);
     writer.close();
     assertEquals(TsFileIOWriter.magicStringBytes.length + TsFileIOWriter.versionNumberBytes.length,
-        rWriter.getTruncatedPosition());
+        rWriter.getTruncatedSize());
     assertTrue(file.delete());
   }
 
@@ -133,7 +133,7 @@ public class RestorableTsFileIOWriterTest {
     writer = new TsFileWriter(rWriter);
     writer.close();
     assertEquals(TsFileIOWriter.magicStringBytes.length + TsFileIOWriter.versionNumberBytes.length,
-        rWriter.getTruncatedPosition());
+        rWriter.getTruncatedSize());
     assertTrue(file.delete());
   }
 
@@ -159,7 +159,7 @@ public class RestorableTsFileIOWriterTest {
     writer = new TsFileWriter(rWriter);
     writer.close();
     // truncate version marker and version
-    assertEquals(pos - 1 - Long.BYTES, rWriter.getTruncatedPosition());
+    assertEquals(pos - 1 - Long.BYTES, rWriter.getTruncatedSize());
     assertTrue(file.delete());
   }
 
@@ -221,7 +221,7 @@ public class RestorableTsFileIOWriterTest {
     writer.close();
     assertNotEquals(
         TsFileIOWriter.magicStringBytes.length + TsFileIOWriter.versionNumberBytes.length,
-        rWriter.getTruncatedPosition());
+        rWriter.getTruncatedSize());
     TsFileSequenceReader reader = new TsFileSequenceReader(FILE_NAME);
     List<ChunkMetadata> chunkMetadataList = reader.getChunkMetadataList(new Path("d1.s1"));
     assertNotNull(chunkMetadataList);


[incubator-iotdb] 01/02: combine the duplicate method

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch improve/recover
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit f2d9c8b25b03aa231962e82e019243591982d8c3
Author: JackieTien97 <Ja...@foxmail.com>
AuthorDate: Thu Jul 23 11:17:40 2020 +0800

    combine the duplicate method
---
 .../engine/storagegroup/StorageGroupProcessor.java | 226 ++++++++++-----------
 .../writelog/recover/TsFileRecoverPerformer.java   |   2 +-
 2 files changed, 106 insertions(+), 122 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index f1818ec..a6cf002 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -72,7 +72,6 @@ import org.apache.iotdb.db.exception.metadata.MetadataException;
 import org.apache.iotdb.db.exception.query.OutOfTTLException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.MManager;
-import org.apache.iotdb.db.metadata.mnode.InternalMNode;
 import org.apache.iotdb.db.metadata.mnode.LeafMNode;
 import org.apache.iotdb.db.metadata.mnode.MNode;
 import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
@@ -228,22 +227,20 @@ public class StorageGroupProcessor {
   private TsFileFlushPolicy fileFlushPolicy;
 
   /**
-   * partitionDirectFileVersions records the versions of the direct TsFiles (generated by close,
-   * not including the files generated by merge) of each partition.
-   * As data file close is managed by the leader in the distributed version, the files with the
-   * same version(s) have the same data, despite that the inner structure (the size and
-   * organization of chunks) may be different, so we can easily find what remote files we do not
-   * have locally.
-   * partition number -> version number set
+   * partitionDirectFileVersions records the versions of the direct TsFiles (generated by close, not
+   * including the files generated by merge) of each partition. As data file close is managed by the
+   * leader in the distributed version, the files with the same version(s) have the same data,
+   * despite that the inner structure (the size and organization of chunks) may be different, so we
+   * can easily find what remote files we do not have locally. partition number -> version number
+   * set
    */
   private Map<Long, Set<Long>> partitionDirectFileVersions = new HashMap<>();
 
   /**
-   * The max file versions in each partition. By recording this, if several IoTDB instances have
-   * the same policy of closing file and their ingestion is identical, then files of the same
-   * version in different IoTDB instance will have identical data, providing convenience for data
-   * comparison across different instances.
-   * partition number -> max version number
+   * The max file versions in each partition. By recording this, if several IoTDB instances have the
+   * same policy of closing file and their ingestion is identical, then files of the same version in
+   * different IoTDB instance will have identical data, providing convenience for data comparison
+   * across different instances. partition number -> max version number
    */
   private Map<Long, Long> partitionMaxFileVersions = new HashMap<>();
 
@@ -271,27 +268,29 @@ public class StorageGroupProcessor {
     try {
       // collect candidate TsFiles from sequential and unsequential data directory
       Pair<List<TsFileResource>, List<TsFileResource>> seqTsFilesPair = getAllFiles(
-              DirectoryManager.getInstance().getAllSequenceFileFolders());
+          DirectoryManager.getInstance().getAllSequenceFileFolders());
       List<TsFileResource> tmpSeqTsFiles = seqTsFilesPair.left;
       List<TsFileResource> oldSeqTsFiles = seqTsFilesPair.right;
       upgradeSeqFileList.addAll(oldSeqTsFiles);
       Pair<List<TsFileResource>, List<TsFileResource>> unseqTsFilesPair = getAllFiles(
-              DirectoryManager.getInstance().getAllUnSequenceFileFolders());
+          DirectoryManager.getInstance().getAllUnSequenceFileFolders());
       List<TsFileResource> tmpUnseqTsFiles = unseqTsFilesPair.left;
       List<TsFileResource> oldUnseqTsFiles = unseqTsFilesPair.right;
       upgradeUnseqFileList.addAll(oldUnseqTsFiles);
 
-      recoverSeqFiles(tmpSeqTsFiles);
-      recoverUnseqFiles(tmpUnseqTsFiles);
+      recoverTsFiles(tmpSeqTsFiles, true, workSequenceTsFileProcessors, sequenceFileTreeSet);
+      recoverTsFiles(tmpUnseqTsFiles, false, workUnsequenceTsFileProcessors, unSequenceFileList);
 
       for (TsFileResource resource : sequenceFileTreeSet) {
         long partitionNum = resource.getTimePartition();
-        partitionDirectFileVersions.computeIfAbsent(partitionNum, p -> new HashSet<>()).addAll(resource.getHistoricalVersions());
+        partitionDirectFileVersions.computeIfAbsent(partitionNum, p -> new HashSet<>())
+            .addAll(resource.getHistoricalVersions());
         updatePartitionFileVersion(partitionNum, Collections.max(resource.getHistoricalVersions()));
       }
       for (TsFileResource resource : unSequenceFileList) {
         long partitionNum = resource.getTimePartition();
-        partitionDirectFileVersions.computeIfAbsent(partitionNum, p -> new HashSet<>()).addAll(resource.getHistoricalVersions());
+        partitionDirectFileVersions.computeIfAbsent(partitionNum, p -> new HashSet<>())
+            .addAll(resource.getHistoricalVersions());
         updatePartitionFileVersion(partitionNum, Collections.max(resource.getHistoricalVersions()));
       }
 
@@ -316,7 +315,6 @@ public class StorageGroupProcessor {
       throw new StorageGroupProcessorException(e);
     }
 
-
     for (TsFileResource resource : sequenceFileTreeSet) {
       long timePartitionId = resource.getTimePartition();
       Map<String, Long> endTimeMap = new HashMap<>();
@@ -345,11 +343,11 @@ public class StorageGroupProcessor {
   /**
    * use old seq file to update latestTimeForEachDevice, globalLatestFlushedTimeForEachDevice,
    * partitionLatestFlushedTimeForEachDevice and timePartitionIdVersionControllerMap
-   *
    */
   private void updateLastestFlushedTime() throws IOException {
 
-    VersionController versionController = new SimpleFileVersionController(storageGroupSysDir.getPath());
+    VersionController versionController = new SimpleFileVersionController(
+        storageGroupSysDir.getPath());
     long currentVersion = versionController.currVersion();
     for (TsFileResource resource : upgradeSeqFileList) {
       for (Entry<String, Integer> entry : resource.getDeviceToIndexMap().entrySet()) {
@@ -358,24 +356,27 @@ public class StorageGroupProcessor {
         long endTime = resource.getEndTime(index);
         long endTimePartitionId = StorageEngine.getTimePartition(endTime);
         latestTimeForEachDevice.computeIfAbsent(endTimePartitionId, l -> new HashMap<>())
-                .put(deviceId, endTime);
+            .put(deviceId, endTime);
         globalLatestFlushedTimeForEachDevice.put(deviceId, endTime);
 
         // set all the covered partition's LatestFlushedTime to Long.MAX_VALUE
         long partitionId = StorageEngine.getTimePartition(resource.getStartTime(index));
         while (partitionId <= endTimePartitionId) {
           partitionLatestFlushedTimeForEachDevice.computeIfAbsent(partitionId, l -> new HashMap<>())
-                  .put(deviceId, Long.MAX_VALUE);
+              .put(deviceId, Long.MAX_VALUE);
           if (!timePartitionIdVersionControllerMap.containsKey(partitionId)) {
-            File directory = SystemFileFactory.INSTANCE.getFile(storageGroupSysDir, String.valueOf(partitionId));
-            if(!directory.exists()){
+            File directory = SystemFileFactory.INSTANCE
+                .getFile(storageGroupSysDir, String.valueOf(partitionId));
+            if (!directory.exists()) {
               directory.mkdirs();
             }
-            File versionFile = SystemFileFactory.INSTANCE.getFile(directory, SimpleFileVersionController.FILE_PREFIX + currentVersion);
+            File versionFile = SystemFileFactory.INSTANCE
+                .getFile(directory, SimpleFileVersionController.FILE_PREFIX + currentVersion);
             if (!versionFile.createNewFile()) {
               logger.warn("Version file {} has already been created ", versionFile);
             }
-            timePartitionIdVersionControllerMap.put(partitionId, new SimpleFileVersionController(storageGroupSysDir.getPath(), partitionId));
+            timePartitionIdVersionControllerMap.put(partitionId,
+                new SimpleFileVersionController(storageGroupSysDir.getPath(), partitionId));
           }
           partitionId++;
         }
@@ -401,7 +402,8 @@ public class StorageGroupProcessor {
         });
   }
 
-  private Pair<List<TsFileResource>, List<TsFileResource>> getAllFiles(List<String> folders) throws IOException {
+  private Pair<List<TsFileResource>, List<TsFileResource>> getAllFiles(List<String> folders)
+      throws IOException {
     List<File> tsFiles = new ArrayList<>();
     List<File> upgradeFiles = new ArrayList<>();
     for (String baseDir : folders) {
@@ -419,9 +421,12 @@ public class StorageGroupProcessor {
       // the process was interrupted before the merged files could be named
       continueFailedRenames(fileFolder, MERGE_SUFFIX);
 
-      File[] oldTsfileArray = fsFactory.listFilesBySuffix(fileFolder.getAbsolutePath(), TSFILE_SUFFIX);
-      File[] oldResourceFileArray = fsFactory.listFilesBySuffix(fileFolder.getAbsolutePath(), TsFileResource.RESOURCE_SUFFIX);
-      File[] oldModificationFileArray = fsFactory.listFilesBySuffix(fileFolder.getAbsolutePath(), ModificationFile.FILE_SUFFIX);
+      File[] oldTsfileArray = fsFactory
+          .listFilesBySuffix(fileFolder.getAbsolutePath(), TSFILE_SUFFIX);
+      File[] oldResourceFileArray = fsFactory
+          .listFilesBySuffix(fileFolder.getAbsolutePath(), TsFileResource.RESOURCE_SUFFIX);
+      File[] oldModificationFileArray = fsFactory
+          .listFilesBySuffix(fileFolder.getAbsolutePath(), ModificationFile.FILE_SUFFIX);
       File upgradeFolder = fsFactory.getFile(fileFolder, IoTDBConstant.UPGRADE_FOLDER_NAME);
       // move the old files to upgrade folder if exists
       if (oldTsfileArray.length != 0 || oldResourceFileArray.length != 0) {
@@ -511,13 +516,14 @@ public class StorageGroupProcessor {
     }
   }
 
-  private void recoverSeqFiles(List<TsFileResource> tsFiles) {
+  private void recoverTsFiles(List<TsFileResource> tsFiles, boolean isSeq,
+      TreeMap<Long, TsFileProcessor> treeMap, Collection<TsFileResource> tsFileResources) {
     for (int i = 0; i < tsFiles.size(); i++) {
       TsFileResource tsFileResource = tsFiles.get(i);
       long timePartitionId = tsFileResource.getTimePartition();
 
       TsFileRecoverPerformer recoverPerformer = new TsFileRecoverPerformer(storageGroupName + "-",
-          getVersionControllerByTimePartitionId(timePartitionId), tsFileResource, false,
+          getVersionControllerByTimePartitionId(timePartitionId), tsFileResource, !isSeq,
           i == tsFiles.size() - 1);
 
       RestorableTsFileIOWriter writer;
@@ -535,49 +541,14 @@ public class StorageGroupProcessor {
         TsFileProcessor tsFileProcessor = new TsFileProcessor(storageGroupName, tsFileResource,
             getVersionControllerByTimePartitionId(timePartitionId),
             this::closeUnsealedTsFileProcessorCallBack,
-            this::updateLatestFlushTimeCallback, true, writer);
-        workSequenceTsFileProcessors.put(timePartitionId, tsFileProcessor);
-        tsFileResource.setProcessor(tsFileProcessor);
-        tsFileResource.removeResourceFile();
-        tsFileProcessor.setTimeRangeId(timePartitionId);
-        writer.makeMetadataVisible();
-      }
-      sequenceFileTreeSet.add(tsFileResource);
-    }
-  }
-
-  private void recoverUnseqFiles(List<TsFileResource> tsFiles) {
-    for (int i = 0; i < tsFiles.size(); i++) {
-      TsFileResource tsFileResource = tsFiles.get(i);
-      long timePartitionId = tsFileResource.getTimePartition();
-
-      TsFileRecoverPerformer recoverPerformer = new TsFileRecoverPerformer(storageGroupName + "-",
-          getVersionControllerByTimePartitionId(timePartitionId), tsFileResource, true,
-          i == tsFiles.size() - 1);
-      RestorableTsFileIOWriter writer;
-      try {
-        writer = recoverPerformer.recover();
-      } catch (StorageGroupProcessorException e) {
-        logger.warn("Skip TsFile: {} because of error in recover: ", tsFileResource.getPath(), e);
-        continue;
-      }
-      if (i != tsFiles.size() - 1 || !writer.canWrite()) {
-        // not the last file or cannot write, just close it
-        tsFileResource.setClosed(true);
-      } else if (writer.canWrite()) {
-        // the last file is not closed, continue writing to it
-        TsFileProcessor tsFileProcessor = new TsFileProcessor(storageGroupName, tsFileResource,
-            getVersionControllerByTimePartitionId(timePartitionId),
-            this::closeUnsealedTsFileProcessorCallBack,
-            this::unsequenceFlushCallback, false, writer);
-        workUnsequenceTsFileProcessors
-            .put(timePartitionId, tsFileProcessor);
+            this::updateLatestFlushTimeCallback, isSeq, writer);
+        treeMap.put(timePartitionId, tsFileProcessor);
         tsFileResource.setProcessor(tsFileProcessor);
         tsFileResource.removeResourceFile();
         tsFileProcessor.setTimeRangeId(timePartitionId);
         writer.makeMetadataVisible();
       }
-      unSequenceFileList.add(tsFileResource);
+      tsFileResources.add(tsFileResource);
     }
   }
 
@@ -609,7 +580,8 @@ public class StorageGroupProcessor {
       long timePartitionId = StorageEngine.getTimePartition(insertPlan.getTime());
 
       latestTimeForEachDevice.computeIfAbsent(timePartitionId, l -> new HashMap<>());
-      partitionLatestFlushedTimeForEachDevice.computeIfAbsent(timePartitionId, id -> new HashMap<>());
+      partitionLatestFlushedTimeForEachDevice
+          .computeIfAbsent(timePartitionId, id -> new HashMap<>());
 
       // insert to sequence or unSequence file
       insertToTsFileProcessor(insertPlan,
@@ -648,7 +620,8 @@ public class StorageGroupProcessor {
       // before is first start point
       int before = loc;
       // before time partition
-      long beforeTimePartition = StorageEngine.getTimePartition(insertTabletPlan.getTimes()[before]);
+      long beforeTimePartition = StorageEngine
+          .getTimePartition(insertTabletPlan.getTimes()[before]);
       // init map
       long lastFlushTime = partitionLatestFlushedTimeForEachDevice.
           computeIfAbsent(beforeTimePartition, id -> new HashMap<>()).
@@ -709,15 +682,15 @@ public class StorageGroupProcessor {
   }
 
   /**
-   * insert batch to tsfile processor thread-safety that the caller need to guarantee
-   * The rows to be inserted are in the range [start, end)
+   * insert a batch into the tsfile processor; thread-safety must be guaranteed by the caller. The
+   * rows to be inserted are in the range [start, end)
    *
    * @param insertTabletPlan insert a tablet of a device
-   * @param sequence whether is sequence
-   * @param start start index of rows to be inserted in insertTabletPlan
-   * @param end end index of rows to be inserted in insertTabletPlan
-   * @param results result array
-   * @param timePartitionId time partition id
+   * @param sequence         whether is sequence
+   * @param start            start index of rows to be inserted in insertTabletPlan
+   * @param end              end index of rows to be inserted in insertTabletPlan
+   * @param results          result array
+   * @param timePartitionId  time partition id
    */
   private void insertTabletToTsFileProcessor(InsertTabletPlan insertTabletPlan,
       int start, int end, boolean sequence, TSStatus[] results, long timePartitionId)
@@ -845,10 +818,10 @@ public class StorageGroupProcessor {
   /**
    * get processor from hashmap, flush oldest processor if necessary
    *
-   * @param timeRangeId time partition range
+   * @param timeRangeId            time partition range
    * @param tsFileProcessorTreeMap tsFileProcessorTreeMap
-   * @param fileList file list to add new processor
-   * @param sequence whether is sequence or not
+   * @param fileList               file list to add new processor
+   * @param sequence               whether is sequence or not
    */
   private TsFileProcessor getOrCreateTsFileProcessorIntern(long timeRangeId,
       TreeMap<Long, TsFileProcessor> tsFileProcessorTreeMap,
@@ -1228,7 +1201,7 @@ public class StorageGroupProcessor {
                   .query(deviceId, measurementId, schema.getType(), schema.getEncodingType(),
                       schema.getProps(), context);
 
-          tsfileResourcesForQuery.add(new TsFileResource(tsFileResource.getFile(), 
+          tsfileResourcesForQuery.add(new TsFileResource(tsFileResource.getFile(),
               tsFileResource.getDeviceToIndexMap(),
               tsFileResource.getStartTimes(), tsFileResource.getEndTimes(), pair.left,
               pair.right));
@@ -1265,7 +1238,8 @@ public class StorageGroupProcessor {
 
     int deviceIndex = tsFileResource.getDeviceToIndexMap().get(deviceId);
     long startTime = tsFileResource.getStartTime(deviceIndex);
-    long endTime = tsFileResource.isClosed() || !isSeq ? tsFileResource.getEndTime(deviceIndex) : Long.MAX_VALUE;
+    long endTime = tsFileResource.isClosed() || !isSeq ? tsFileResource.getEndTime(deviceIndex)
+        : Long.MAX_VALUE;
 
     if (!isAlive(endTime)) {
       return false;
@@ -1282,9 +1256,9 @@ public class StorageGroupProcessor {
    * Delete data whose timestamp <= 'timestamp' and belongs to the time series
    * deviceId.measurementId.
    *
-   * @param deviceId the deviceId of the timeseries to be deleted.
+   * @param deviceId      the deviceId of the timeseries to be deleted.
    * @param measurementId the measurementId of the timeseries to be deleted.
-   * @param timestamp the delete range is (0, timestamp].
+   * @param timestamp     the delete range is (0, timestamp].
    */
   public void delete(String deviceId, String measurementId, long timestamp) throws IOException {
     // TODO: how to avoid partial deletion?
@@ -1339,7 +1313,8 @@ public class StorageGroupProcessor {
     }
   }
 
-  private void logDeletion(long timestamp, String deviceId, String measurementId, long timePartitionId)
+  private void logDeletion(long timestamp, String deviceId, String measurementId,
+      long timePartitionId)
       throws IOException {
     if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) {
       DeletePlan deletionPlan = new DeletePlan(timestamp, new Path(deviceId, measurementId));
@@ -1421,7 +1396,8 @@ public class StorageGroupProcessor {
       partitionLatestFlushedTimeForEachDevice
           .computeIfAbsent(processor.getTimeRangeId(), id -> new HashMap<>())
           .put(entry.getKey(), entry.getValue());
-      updateNewlyFlushedPartitionLatestFlushedTimeForEachDevice(processor.getTimeRangeId(), entry.getKey(), entry.getValue());
+      updateNewlyFlushedPartitionLatestFlushedTimeForEachDevice(processor.getTimeRangeId(),
+          entry.getKey(), entry.getValue());
       if (globalLatestFlushedTimeForEachDevice
           .getOrDefault(entry.getKey(), Long.MIN_VALUE) < entry.getValue()) {
         globalLatestFlushedTimeForEachDevice.put(entry.getKey(), entry.getValue());
@@ -1434,10 +1410,11 @@ public class StorageGroupProcessor {
   /**
    * used for upgrading
    */
-  public void updateNewlyFlushedPartitionLatestFlushedTimeForEachDevice(long partitionId, String deviceId, long time) {
+  public void updateNewlyFlushedPartitionLatestFlushedTimeForEachDevice(long partitionId,
+      String deviceId, long time) {
     newlyFlushedPartitionLatestFlushedTimeForEachDevice
-            .computeIfAbsent(partitionId, id -> new HashMap<>())
-            .compute(deviceId, (k, v) -> v == null ? time : Math.max(v, time));
+        .computeIfAbsent(partitionId, id -> new HashMap<>())
+        .compute(deviceId, (k, v) -> v == null ? time : Math.max(v, time));
   }
 
   /**
@@ -1490,9 +1467,9 @@ public class StorageGroupProcessor {
     List<TsFileResource> upgradedResources = tsFileResource.getUpgradedResources();
     for (TsFileResource resource : upgradedResources) {
       long partitionId = resource.getTimePartition();
-      resource.getDeviceToIndexMap().forEach((device, index) -> 
-        updateNewlyFlushedPartitionLatestFlushedTimeForEachDevice(partitionId, device, 
-            resource.getEndTime(index))
+      resource.getDeviceToIndexMap().forEach((device, index) ->
+          updateNewlyFlushedPartitionLatestFlushedTimeForEachDevice(partitionId, device,
+              resource.getEndTime(index))
       );
     }
     insertLock.writeLock().lock();
@@ -1506,7 +1483,7 @@ public class StorageGroupProcessor {
     }
     mergeLock.writeLock().unlock();
     insertLock.writeLock().unlock();
-    
+
     // after upgrade complete, update partitionLatestFlushedTimeForEachDevice
     if (countUpgradeFiles() == 0) {
       for (Entry<Long, Map<String, Long>> entry : newlyFlushedPartitionLatestFlushedTimeForEachDevice
@@ -1678,7 +1655,7 @@ public class StorageGroupProcessor {
         if (seqFile.getWriteQueryLock().writeLock().isHeldByCurrentThread()) {
           seqFile.getWriteQueryLock().writeLock().unlock();
         }
-        if(mergeLock.writeLock().isHeldByCurrentThread()) {
+        if (mergeLock.writeLock().isHeldByCurrentThread()) {
           mergeLock.writeLock().unlock();
         }
       }
@@ -1717,7 +1694,7 @@ public class StorageGroupProcessor {
     mergeLock.writeLock().lock();
     try {
       if (loadTsFileByType(LoadTsFileType.LOAD_SEQUENCE, tsfileToBeInserted, newTsFileResource,
-          newFilePartitionId)){
+          newFilePartitionId)) {
         updateLatestTimeMap(newTsFileResource);
       }
     } catch (DiskSpaceInsufficientException e) {
@@ -1772,7 +1749,8 @@ public class StorageGroupProcessor {
           if (!newFileName.equals(tsfileToBeInserted.getName())) {
             logger.info("Tsfile {} must be renamed to {} for loading into the sequence list.",
                 tsfileToBeInserted.getName(), newFileName);
-            newTsFileResource.setFile(fsFactory.getFile(tsfileToBeInserted.getParentFile(), newFileName));
+            newTsFileResource
+                .setFile(fsFactory.getFile(tsfileToBeInserted.getParentFile(), newFileName));
           }
         }
         loadTsFileByType(LoadTsFileType.LOAD_SEQUENCE, tsfileToBeInserted, newTsFileResource,
@@ -1784,7 +1762,8 @@ public class StorageGroupProcessor {
       long partitionNum = newTsFileResource.getTimePartition();
       partitionDirectFileVersions.computeIfAbsent(partitionNum, p -> new HashSet<>())
           .addAll(newTsFileResource.getHistoricalVersions());
-      updatePartitionFileVersion(partitionNum, Collections.max(newTsFileResource.getHistoricalVersions()));
+      updatePartitionFileVersion(partitionNum,
+          Collections.max(newTsFileResource.getHistoricalVersions()));
     } catch (DiskSpaceInsufficientException e) {
       logger.error(
           "Failed to append the tsfile {} to storage group processor {} because the disk space is insufficient.",
@@ -1798,12 +1777,14 @@ public class StorageGroupProcessor {
   }
 
   /**
-   * Find the position of "newTsFileResource" in the sequence files if it can be inserted into them.
+   * Find the position of "newTsFileResource" in the sequence files if it can be inserted into
+   * them.
+   *
    * @param newTsFileResource
    * @param newFilePartitionId
-   * @return POS_ALREADY_EXIST(-2) if some file has the same name as the one to be inserted
-   *         POS_OVERLAP(-3) if some file overlaps the new file
-   *         an insertion position i >= -1 if the new file can be inserted between [i, i+1]
+   * @return POS_ALREADY_EXIST(-2) if some file has the same name as the one to be inserted,
+   * POS_OVERLAP(-3) if some file overlaps the new file, or an insertion position i >= -1 if the
+   * new file can be inserted between [i, i+1]
    */
   private int findInsertionPosition(TsFileResource newTsFileResource, long newFilePartitionId,
       List<TsFileResource> sequenceList) {
@@ -1844,11 +1825,11 @@ public class StorageGroupProcessor {
 
   /**
    * Compare each device in the two files to find the time relation of them.
+   *
    * @param fileA
    * @param fileB
-   * @return -1 if fileA is totally older than fileB (A < B)
-   *          0 if fileA is partially older than fileB and partially newer than fileB (A X B)
-   *          1 if fileA is totally newer than fileB (B < A)
+   * @return -1 if fileA is totally older than fileB (A < B), 0 if fileA is partially older than
+   * fileB and partially newer than fileB (A X B), or 1 if fileA is totally newer than fileB (B < A)
    */
   private int compareTsFileDevices(TsFileResource fileA, TsFileResource fileB) {
     boolean hasPre = false, hasSubsequence = false;
@@ -1885,9 +1866,9 @@ public class StorageGroupProcessor {
 
   /**
    * If the historical versions of a file is a sub-set of the given file's, remove it to reduce
-   * unnecessary merge. Only used when the file sender and the receiver share the same file
-   * close policy.
-   * Warning: DO NOT REMOVE
+   * unnecessary merge. Only used when the file sender and the receiver share the same file close
+   * policy. Warning: DO NOT REMOVE
+   *
    * @param resource
    */
   @SuppressWarnings("unused")
@@ -1948,9 +1929,9 @@ public class StorageGroupProcessor {
    * returns directly; otherwise, the time stamp is the mean of the timestamps of the two files, the
    * version number is the version number in the tsfile with a larger timestamp.
    *
-   * @param tsfileName origin tsfile name
-   * @param insertIndex the new file will be inserted between the files [insertIndex, insertIndex
-   *                   + 1]
+   * @param tsfileName  origin tsfile name
+   * @param insertIndex the new file will be inserted between the files [insertIndex, insertIndex +
+   *                    1]
    * @return appropriate filename
    */
   private String getFileNameForLoadingFile(String tsfileName, int insertIndex,
@@ -2014,12 +1995,12 @@ public class StorageGroupProcessor {
   /**
    * Execute the loading process by the type.
    *
-   * @param type load type
-   * @param tsFileResource tsfile resource to be loaded
+   * @param type            load type
+   * @param tsFileResource  tsfile resource to be loaded
    * @param filePartitionId the partition id of the new file
-   * @UsedBy sync module, load external tsfile module.
    * @return load the file successfully
+   * @UsedBy sync module, load external tsfile module.
    */
   private boolean loadTsFileByType(LoadTsFileType type, File syncedTsFile,
       TsFileResource tsFileResource, long filePartitionId)
@@ -2027,9 +2008,11 @@ public class StorageGroupProcessor {
     File targetFile;
     switch (type) {
       case LOAD_UNSEQUENCE:
-        targetFile = fsFactory.getFile(DirectoryManager.getInstance().getNextFolderForUnSequenceFile(),
-            storageGroupName + File.separatorChar + filePartitionId + File.separator + tsFileResource
-                .getFile().getName());
+        targetFile = fsFactory
+            .getFile(DirectoryManager.getInstance().getNextFolderForUnSequenceFile(),
+                storageGroupName + File.separatorChar + filePartitionId + File.separator
+                    + tsFileResource
+                    .getFile().getName());
         tsFileResource.setFile(targetFile);
         if (unSequenceFileList.contains(tsFileResource)) {
           logger.error("The file {} has already been loaded in unsequence list", tsFileResource);
@@ -2088,7 +2071,8 @@ public class StorageGroupProcessor {
     }
     partitionDirectFileVersions.computeIfAbsent(filePartitionId,
         p -> new HashSet<>()).addAll(tsFileResource.getHistoricalVersions());
-    updatePartitionFileVersion(filePartitionId, Collections.max(tsFileResource.getHistoricalVersions()));
+    updatePartitionFileVersion(filePartitionId,
+        Collections.max(tsFileResource.getHistoricalVersions()));
     return true;
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java b/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
index c4818e3..22d2904 100644
--- a/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
+++ b/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
@@ -76,7 +76,7 @@ public class TsFileRecoverPerformer {
    * 1. recover the TsFile by RestorableTsFileIOWriter and truncate the file to remaining corrected
    * data 2. redo the WALs to recover unpersisted data 3. flush and close the file 4. clean WALs
    *
-   * @return a RestorableTsFileIOWriter if the file is not closed before crush, so this writer can
+   * @return a RestorableTsFileIOWriter if the file is not closed before crash, so this writer can
    * be used to continue writing
    */
   public RestorableTsFileIOWriter recover() throws StorageGroupProcessorException {