Posted to commits@iotdb.apache.org by ja...@apache.org on 2020/07/23 06:52:32 UTC

[incubator-iotdb] 02/02: refactor recover process

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch improve/recover
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit a6282dcc9e1b46fa78eb5632e03763f59403c219
Author: JackieTien97 <Ja...@foxmail.com>
AuthorDate: Thu Jul 23 14:50:47 2020 +0800

    refactor recover process
---
 .../apache/iotdb/hadoop/fileSystem/HDFSOutput.java |  4 +-
 .../writelog/recover/TsFileRecoverPerformer.java   | 24 +++----
 .../iotdb/tsfile/read/TsFileSequenceReader.java    | 79 +++++++++++-----------
 .../tsfile/write/writer/LocalTsFileOutput.java     |  4 +-
 .../write/writer/RestorableTsFileIOWriter.java     | 42 +++++-------
 .../iotdb/tsfile/write/writer/TsFileOutput.java    |  4 +-
 .../write/writer/RestorableTsFileIOWriterTest.java | 14 ++--
 7 files changed, 83 insertions(+), 88 deletions(-)

diff --git a/hadoop/src/main/java/org/apache/iotdb/hadoop/fileSystem/HDFSOutput.java b/hadoop/src/main/java/org/apache/iotdb/hadoop/fileSystem/HDFSOutput.java
index 8693eb2..aec1a58 100644
--- a/hadoop/src/main/java/org/apache/iotdb/hadoop/fileSystem/HDFSOutput.java
+++ b/hadoop/src/main/java/org/apache/iotdb/hadoop/fileSystem/HDFSOutput.java
@@ -86,11 +86,11 @@ public class HDFSOutput implements TsFileOutput {
   }
 
   @Override
-  public void truncate(long position) throws IOException {
+  public void truncate(long size) throws IOException {
     if (fs.exists(path)) {
       fsDataOutputStream.close();
     }
-    fs.truncate(path, position);
+    fs.truncate(path, size);
     if (fs.exists(path)) {
       fsDataOutputStream = fs.append(path);
     }
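
The rename from position to size brings the HDFS implementation in line with the java.nio truncate semantics used by the rest of the writer stack. Because HDFS cannot truncate a file that still has an open output stream, the method closes the stream first, truncates, then reopens it in append mode. Below is a minimal sketch of that close-truncate-reopen pattern against the Hadoop FileSystem API; the class and field names are illustrative, only the call sequence mirrors the method above.

    // Minimal sketch of the close-truncate-reopen pattern; assumes an
    // already-configured FileSystem, an existing Path, and an open stream.
    import java.io.IOException;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    class HdfsTruncateSketch {

      private final FileSystem fs;
      private final Path path;
      private FSDataOutputStream stream;

      HdfsTruncateSketch(FileSystem fs, Path path, FSDataOutputStream stream) {
        this.fs = fs;
        this.path = path;
        this.stream = stream;
      }

      void truncateTo(long size) throws IOException {
        if (fs.exists(path)) {
          stream.close();              // HDFS refuses to truncate an open file
        }
        fs.truncate(path, size);       // drop every byte after the first `size`
        if (fs.exists(path)) {
          stream = fs.append(path);    // reopen so later writes continue at `size`
        }
      }
    }
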
diff --git a/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java b/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
index 22d2904..38ab3c2 100644
--- a/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
+++ b/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
@@ -127,20 +127,20 @@ public class TsFileRecoverPerformer {
       // due to failure, the last ChunkGroup may contain the same data as the WALs, so the time
       // map must be updated first to avoid duplicated insertion
       recoverResourceFromWriter(restorableTsFileIOWriter);
-    }
 
-    // redo logs
-    redoLogs(restorableTsFileIOWriter);
+      // redo logs
+      redoLogs(restorableTsFileIOWriter);
 
-    // clean logs
-    try {
-      MultiFileLogNodeManager.getInstance()
-          .deleteNode(logNodePrefix + SystemFileFactory.INSTANCE.getFile(filePath).getName());
-    } catch (IOException e) {
-      throw new StorageGroupProcessorException(e);
-    }
+      // clean logs
+      try {
+        MultiFileLogNodeManager.getInstance()
+            .deleteNode(logNodePrefix + SystemFileFactory.INSTANCE.getFile(filePath).getName());
+      } catch (IOException e) {
+        throw new StorageGroupProcessorException(e);
+      }
 
-    return restorableTsFileIOWriter;
+      return restorableTsFileIOWriter;
+    }
   }
 
   private void recoverResourceFromFile() throws IOException {
@@ -206,10 +206,10 @@ public class TsFileRecoverPerformer {
         // end the file if it is not the last file or it is closed before crush
         restorableTsFileIOWriter.endFile();
         resource.cleanCloseFlag();
+        resource.serialize();
       }
       // otherwise this file is not closed before crush, do nothing so we can continue writing
       // into it
-      resource.serialize();
     } catch (IOException | InterruptedException | ExecutionException e) {
       throw new StorageGroupProcessorException(e);
     }
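
The hunk above moves the WAL replay, the log cleanup and the return statement inside the branch that handles a crashed file, so a TsFile that was sealed normally no longer goes through redo or log deletion. A hedged sketch of the resulting control flow follows; the calls inside the crashed branch are taken from the diff, while the method shape, the hasCrashed() guard and the openWriter() helper are assumptions about the unchanged context.

    // Hedged sketch of the recover flow after this change; the guard and the
    // openWriter() helper are assumptions, the branch body comes from the diff.
    RestorableTsFileIOWriter recover() throws StorageGroupProcessorException, IOException {
      RestorableTsFileIOWriter writer = openWriter();   // hypothetical helper
      if (!writer.hasCrashed()) {
        // the file was sealed before the restart: nothing to replay
        return writer;
      } else {
        // the last ChunkGroup may duplicate data in the WALs, so rebuild the
        // resource from the writer before redoing anything
        recoverResourceFromWriter(writer);
        // replay the WALs into the recovered memtable
        redoLogs(writer);
        // the WALs are no longer needed once they have been replayed
        try {
          MultiFileLogNodeManager.getInstance()
              .deleteNode(logNodePrefix + SystemFileFactory.INSTANCE.getFile(filePath).getName());
        } catch (IOException e) {
          throw new StorageGroupProcessorException(e);
        }
        return writer;
      }
    }
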
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
index d259f1d..1cb124c 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
@@ -96,7 +96,7 @@ public class TsFileSequenceReader implements AutoCloseable {
   /**
    * construct function for TsFileSequenceReader.
    *
-   * @param file -given file name
+   * @param file             -given file name
    * @param loadMetadataSize -whether load meta data size
    */
   public TsFileSequenceReader(String file, boolean loadMetadataSize) throws IOException {
@@ -137,7 +137,7 @@ public class TsFileSequenceReader implements AutoCloseable {
   /**
    * construct function for TsFileSequenceReader.
    *
-   * @param input -given input
+   * @param input            -given input
    * @param loadMetadataSize -load meta data size
    */
   public TsFileSequenceReader(TsFileInput input, boolean loadMetadataSize) throws IOException {
@@ -155,10 +155,10 @@ public class TsFileSequenceReader implements AutoCloseable {
   /**
    * construct function for TsFileSequenceReader.
    *
-   * @param input the input of a tsfile. The current position should be a markder and then a chunk
-   * Header, rather than the magic number
-   * @param fileMetadataPos the position of the file metadata in the TsFileInput from the beginning
-   * of the input to the current position
+   * @param input            the input of a tsfile. The current position should be a markder and
+   *                         then a chunk Header, rather than the magic number
+   * @param fileMetadataPos  the position of the file metadata in the TsFileInput from the beginning
+   *                         of the input to the current position
    * @param fileMetadataSize the byte size of the file metadata in the input
    */
   public TsFileSequenceReader(TsFileInput input, long fileMetadataPos, int fileMetadataSize) {
@@ -508,9 +508,9 @@ public class TsFileSequenceReader implements AutoCloseable {
   /**
    * Traverse the metadata index from MetadataIndexEntry to get TimeseriesMetadatas
    *
-   * @param metadataIndex MetadataIndexEntry
-   * @param buffer byte buffer
-   * @param deviceId String
+   * @param metadataIndex         MetadataIndexEntry
+   * @param buffer                byte buffer
+   * @param deviceId              String
    * @param timeseriesMetadataMap map: deviceId -> timeseriesMetadata list
    */
   private void generateMetadataIndex(MetadataIndexEntry metadataIndex, ByteBuffer buffer,
@@ -584,12 +584,13 @@ public class TsFileSequenceReader implements AutoCloseable {
    * Get target MetadataIndexEntry and its end offset
    *
    * @param metadataIndex given MetadataIndexNode
-   * @param name target device / measurement name
-   * @param type target MetadataIndexNodeType, either INTERNAL_DEVICE or INTERNAL_MEASUREMENT. When
-   * searching for a device node,  return when it is not INTERNAL_DEVICE. Likewise, when searching
-   * for a measurement node, return when it is not INTERNAL_MEASUREMENT. This works for the
-   * situation when the index tree does NOT have the device level and ONLY has the measurement
-   * level.
+   * @param name          target device / measurement name
+   * @param type          target MetadataIndexNodeType, either INTERNAL_DEVICE or
+   *                      INTERNAL_MEASUREMENT. When searching for a device node,  return when it is
+   *                      not INTERNAL_DEVICE. Likewise, when searching for a measurement node,
+   *                      return when it is not INTERNAL_MEASUREMENT. This works for the situation
+   *                      when the index tree does NOT have the device level and ONLY has the
+   *                      measurement level.
    * @return target MetadataIndexEntry, endOffset pair
    */
   private Pair<MetadataIndexEntry, Long> getMetadataAndEndOffset(MetadataIndexNode metadataIndex,
@@ -616,7 +617,7 @@ public class TsFileSequenceReader implements AutoCloseable {
   /**
    * read data from current position of the input, and deserialize it to a CHUNK_GROUP_FOOTER.
    *
-   * @param position the offset of the chunk group footer in the file
+   * @param position   the offset of the chunk group footer in the file
    * @param markerRead true if the offset does not contains the marker , otherwise false
    * @return a CHUNK_GROUP_FOOTER
    * @throws IOException io error
@@ -649,9 +650,9 @@ public class TsFileSequenceReader implements AutoCloseable {
   /**
    * read the chunk's header.
    *
-   * @param position the file offset of this chunk's header
+   * @param position        the file offset of this chunk's header
    * @param chunkHeaderSize the size of chunk's header
-   * @param markerRead true if the offset does not contains the marker , otherwise false
+   * @param markerRead      true if the offset does not contains the marker , otherwise false
    */
   private ChunkHeader readChunkHeader(long position, int chunkHeaderSize, boolean markerRead)
       throws IOException {
@@ -756,8 +757,8 @@ public class TsFileSequenceReader implements AutoCloseable {
    * changed.
    *
    * @param position the start position of data in the tsFileInput, or the current position if
-   * position = -1
-   * @param size the size of data that want to read
+   *                 position = -1
+   * @param size     the size of data that want to read
    * @return data that been read.
    */
   private ByteBuffer readData(long position, int size) throws IOException {
@@ -783,8 +784,8 @@ public class TsFileSequenceReader implements AutoCloseable {
    * position.
    *
    * @param start the start position of data in the tsFileInput, or the current position if position
-   * = -1
-   * @param end the end position of data that want to read
+   *              = -1
+   * @param end   the end position of data that want to read
    * @return data that been read.
    */
   private ByteBuffer readData(long start, long end) throws IOException {
@@ -801,11 +802,11 @@ public class TsFileSequenceReader implements AutoCloseable {
   /**
    * Self Check the file and return the position before where the data is safe.
    *
-   * @param newSchema the schema on each time series in the file
+   * @param newSchema              the schema on each time series in the file
    * @param chunkGroupMetadataList ChunkGroupMetadata List
-   * @param versionInfo version pair List
-   * @param fastFinish if true and the file is complete, then newSchema and chunkGroupMetadataList
-   * parameter will be not modified.
+   * @param versionInfo            version pair List
+   * @param fastFinish             if true and the file is complete, then newSchema and
+   *                               chunkGroupMetadataList parameter will be not modified.
    * @return the position of the file that is fine. All data after the position in the file should
    * be truncated.
    */
@@ -835,15 +836,15 @@ public class TsFileSequenceReader implements AutoCloseable {
     if (fileSize < headerLength) {
       return TsFileCheckStatus.INCOMPATIBLE_FILE;
     }
-    String magic = readHeadMagic();
-    tsFileInput.position(headerLength);
-    if (!magic.equals(TSFileConfig.MAGIC_STRING)) {
+    if (!TSFileConfig.MAGIC_STRING.equals(readHeadMagic()) || !TSFileConfig.VERSION_NUMBER
+        .equals(readTailMagic())) {
       return TsFileCheckStatus.INCOMPATIBLE_FILE;
     }
 
+    tsFileInput.position(headerLength);
     if (fileSize == headerLength) {
-      return TsFileCheckStatus.ONLY_MAGIC_HEAD;
-    } else if (readTailMagic().equals(magic)) {
+      return headerLength;
+    } else if (isComplete()) {
       loadMetadataSize();
       if (fastFinish) {
         return TsFileCheckStatus.COMPLETE_FILE;
@@ -851,7 +852,7 @@ public class TsFileSequenceReader implements AutoCloseable {
     }
     boolean newChunkGroup = true;
     // not a complete file, we will recover it...
-    long truncatedPosition = headerLength;
+    long truncatedSize = headerLength;
     byte marker;
     int chunkCnt = 0;
     List<MeasurementSchema> measurementSchemaList = new ArrayList<>();
@@ -900,7 +901,7 @@ public class TsFileSequenceReader implements AutoCloseable {
             }
             chunkGroupMetadataList.add(new ChunkGroupMetadata(deviceID, chunkMetadataList));
             newChunkGroup = true;
-            truncatedPosition = this.position();
+            truncatedSize = this.position();
 
             totalChunkNum += chunkCnt;
             chunkCnt = 0;
@@ -909,7 +910,7 @@ public class TsFileSequenceReader implements AutoCloseable {
           case MetaMarker.VERSION:
             long version = readVersion();
             versionInfo.add(new Pair<>(position(), version));
-            truncatedPosition = this.position();
+            truncatedSize = this.position();
             break;
           default:
             // the disk file is corrupted, using this file may be dangerous
@@ -918,14 +919,14 @@ public class TsFileSequenceReader implements AutoCloseable {
       }
       // now we read the tail of the data section, so we are sure that the last
       // ChunkGroupFooter is complete.
-      truncatedPosition = this.position() - 1;
+      truncatedSize = this.position() - 1;
     } catch (Exception e) {
       logger.info("TsFile {} self-check cannot proceed at position {} " + "recovered, because : {}",
           file, this.position(), e.getMessage());
     }
     // Despite the completeness of the data section, we will discard current FileMetadata
     // so that we can continue to write data into this tsfile.
-    return truncatedPosition;
+    return truncatedSize;
   }
 
   public int getTotalChunkNum() {
@@ -992,7 +993,7 @@ public class TsFileSequenceReader implements AutoCloseable {
    * get device names which has valid chunks in [start, end)
    *
    * @param start start of the partition
-   * @param end end of the partition
+   * @param end   end of the partition
    * @return device names in range
    */
   public List<String> getDeviceNameInRange(long start, long end) throws IOException {
@@ -1010,8 +1011,8 @@ public class TsFileSequenceReader implements AutoCloseable {
    * Check if the device has at least one Chunk in this partition
    *
    * @param seriesMetadataMap chunkMetaDataList of each measurement
-   * @param start the start position of the space partition
-   * @param end the end position of the space partition
+   * @param start             the start position of the space partition
+   * @param end               the end position of the space partition
    */
   private boolean hasDataInPartition(Map<String, List<ChunkMetadata>> seriesMetadataMap,
       long start, long end) {
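
Besides the javadoc realignment and the position-to-size renames, the functional change in this file is in selfCheck: the head magic and the version number are now both validated up front, and a file that contains nothing but the header yields the header length instead of the ONLY_MAGIC_HEAD constant, so every non-error return value is a byte count the caller can truncate to. A hedged caller-side sketch of that contract follows; the reader and out locals are illustrative, the constants and the selfCheck signature come from the diff.

    // Hedged sketch of how a caller interprets the selfCheck return value
    // after this change; `reader` and `out` are illustrative locals.
    long truncatedSize = reader.selfCheck(knownSchemas, chunkGroupMetadataList, versionInfo, true);
    if (truncatedSize == TsFileCheckStatus.COMPLETE_FILE) {
      // sealed file: keep it read-only, there is nothing to repair
    } else if (truncatedSize == TsFileCheckStatus.INCOMPATIBLE_FILE) {
      // wrong magic string or wrong version number: refuse to write into it
    } else {
      // any other value is the size of the trustworthy prefix (including the
      // header-only case), so the broken tail can simply be cut off
      out.truncate(truncatedSize);
    }
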
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/LocalTsFileOutput.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/LocalTsFileOutput.java
index 1e6e105..8423dfb 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/LocalTsFileOutput.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/LocalTsFileOutput.java
@@ -79,8 +79,8 @@ public class LocalTsFileOutput implements TsFileOutput {
   }
 
   @Override
-  public void truncate(long position) throws IOException {
-    outputStream.getChannel().truncate(position);
+  public void truncate(long size) throws IOException {
+    outputStream.getChannel().truncate(size);
   }
 
 }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java
index 8a12b05..536f887 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java
@@ -28,7 +28,6 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
 import org.apache.iotdb.tsfile.exception.NotCompatibleTsFileException;
 import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetadata;
 import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
@@ -48,7 +47,7 @@ import org.slf4j.LoggerFactory;
 public class RestorableTsFileIOWriter extends TsFileIOWriter {
 
   private static final Logger logger = LoggerFactory.getLogger("FileMonitor");
-  private long truncatedPosition = -1;
+  private long truncatedSize = -1;
   private Map<Path, MeasurementSchema> knownSchemas = new HashMap<>();
 
   private int lastFlushedChunkGroupIndex = 0;
@@ -74,36 +73,30 @@ public class RestorableTsFileIOWriter extends TsFileIOWriter {
     // file doesn't exist
     if (file.length() == 0) {
       startFile();
+      canWrite = true;
+      crashed = true;
       return;
     }
 
     if (file.exists()) {
       try (TsFileSequenceReader reader = new TsFileSequenceReader(file.getAbsolutePath(), false)) {
 
-        // this tsfile is complete
-        if (reader.isComplete()) {
+        truncatedSize = reader
+            .selfCheck(knownSchemas, chunkGroupMetadataList, versionInfo, true);
+        totalChunkNum = reader.getTotalChunkNum();
+        if (truncatedSize == TsFileCheckStatus.COMPLETE_FILE) {
           crashed = false;
           canWrite = false;
           out.close();
-          return;
-        }
-
-        // uncompleted file
-        truncatedPosition = reader.selfCheck(knownSchemas, chunkGroupMetadataList, versionInfo, true);
-        totalChunkNum = reader.getTotalChunkNum();
-        if (truncatedPosition == TsFileCheckStatus.INCOMPATIBLE_FILE) {
+        } else if (truncatedSize == TsFileCheckStatus.INCOMPATIBLE_FILE) {
           out.close();
           throw new NotCompatibleTsFileException(
               String.format("%s is not in TsFile format.", file.getAbsolutePath()));
-        } else if (truncatedPosition == TsFileCheckStatus.ONLY_MAGIC_HEAD) {
-          crashed = true;
-          out.truncate(
-              (long) TSFileConfig.MAGIC_STRING.getBytes().length + TSFileConfig.VERSION_NUMBER
-                  .getBytes().length);
         } else {
           crashed = true;
+          canWrite = true;
           // remove broken data
-          out.truncate(truncatedPosition);
+          out.truncate(truncatedSize);
         }
       }
     }
@@ -140,8 +133,8 @@ public class RestorableTsFileIOWriter extends TsFileIOWriter {
     return new RestorableTsFileIOWriter(file);
   }
 
-  long getTruncatedPosition() {
-    return truncatedPosition;
+  long getTruncatedSize() {
+    return truncatedSize;
   }
 
   public Map<Path, MeasurementSchema> getKnownSchema() {
@@ -162,7 +155,8 @@ public class RestorableTsFileIOWriter extends TsFileIOWriter {
   public List<ChunkMetadata> getVisibleMetadataList(String deviceId, String measurementId,
       TSDataType dataType) {
     List<ChunkMetadata> chunkMetadataList = new ArrayList<>();
-    if (metadatasForQuery.containsKey(deviceId) && metadatasForQuery.get(deviceId).containsKey(measurementId)) {
+    if (metadatasForQuery.containsKey(deviceId) && metadatasForQuery.get(deviceId)
+        .containsKey(measurementId)) {
       for (ChunkMetadata chunkMetaData : metadatasForQuery.get(deviceId).get(measurementId)) {
         // filter: if a device'measurement is defined as float type, and data has been persistent.
         // Then someone deletes the timeseries and recreate it with Int type. We have to ignore
@@ -180,8 +174,8 @@ public class RestorableTsFileIOWriter extends TsFileIOWriter {
   }
 
   /**
-   * add all appendChunkMetadatas into memory. After calling this method, other classes can
-   * read these metadata.
+   * add all appendChunkMetadatas into memory. After calling this method, other classes can read
+   * these metadata.
    */
 
   public void makeMetadataVisible() {
@@ -212,8 +206,8 @@ public class RestorableTsFileIOWriter extends TsFileIOWriter {
   }
 
   /**
-   * get all the chunk's metadata which are appended after the last calling of this method, or
-   * after the class instance is initialized if this is the first time to call the method.
+   * get all the chunk's metadata which are appended after the last calling of this method, or after
+   * the class instance is initialized if this is the first time to call the method.
    *
    * @return a list of Device ChunkMetadataList Pair
    */
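
With ONLY_MAGIC_HEAD gone, the constructor now routes every recoverable file through the same truncate call and sets the crashed and canWrite flags explicitly, which is what the recover path in TsFileRecoverPerformer relies on. A short usage sketch follows, showing how the refactored writer is typically reopened after a crash; the file name is illustrative, while the constructor, canWrite() and the TsFileWriter wrapper all appear in this diff and in the tests below.

    // Hedged usage sketch: reopen an unsealed TsFile and keep writing into it.
    File file = new File("root.sg1-0-0-0.tsfile");   // illustrative file name
    RestorableTsFileIOWriter rWriter = new RestorableTsFileIOWriter(file);
    if (rWriter.canWrite()) {
      // the broken tail was already truncated in the constructor, so new
      // chunks are appended right after the last complete ChunkGroup
      TsFileWriter writer = new TsFileWriter(rWriter);
      // ... register schemas and write records, then seal the file
      writer.close();
    } else {
      // the file was sealed before the crash: nothing left to recover
      rWriter.close();
    }
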
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileOutput.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileOutput.java
index 5b9fab1..8da1f17 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileOutput.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileOutput.java
@@ -78,8 +78,8 @@ public interface TsFileOutput {
   /**
    * The same with {@link java.nio.channels.FileChannel#truncate(long)}.
    *
-   * @param position -position
+   * @param size The new size, a non-negative byte count
    */
-  void truncate(long position) throws IOException;
+  void truncate(long size) throws IOException;
 
 }
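
The interface javadoc points at java.nio.channels.FileChannel#truncate(long), where the argument is the new file size rather than a write position: truncating only ever shrinks a file, and a channel position beyond the new size is pulled back to it. A tiny standalone demo of that behaviour, with an illustrative file name, is shown below.

    // Minimal demo of the truncate semantics the interface refers to;
    // the file name is illustrative.
    import java.io.FileOutputStream;
    import java.io.IOException;

    public class TruncateDemo {

      public static void main(String[] args) throws IOException {
        try (FileOutputStream out = new FileOutputStream("demo.tsfile", true)) {
          // keep the first 64 bytes and drop everything after them; if the
          // file is already shorter than 64 bytes, nothing happens
          out.getChannel().truncate(64);
        }
      }
    }
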
diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriterTest.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriterTest.java
index 0c777c3..a11296f 100644
--- a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriterTest.java
+++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriterTest.java
@@ -81,10 +81,10 @@ public class RestorableTsFileIOWriterTest {
     RestorableTsFileIOWriter rWriter = new RestorableTsFileIOWriter(file);
     writer = new TsFileWriter(rWriter);
     writer.close();
-    assertEquals(TsFileCheckStatus.ONLY_MAGIC_HEAD, rWriter.getTruncatedPosition());
+    assertEquals(TsFileCheckStatus.ONLY_MAGIC_HEAD, rWriter.getTruncatedSize());
 
     rWriter = new RestorableTsFileIOWriter(file);
-    assertEquals(TsFileCheckStatus.COMPLETE_FILE, rWriter.getTruncatedPosition());
+    assertEquals(TsFileCheckStatus.COMPLETE_FILE, rWriter.getTruncatedSize());
     assertFalse(rWriter.canWrite());
     rWriter.close();
     assertTrue(file.delete());
@@ -101,7 +101,7 @@ public class RestorableTsFileIOWriterTest {
     writer = new TsFileWriter(rWriter);
     writer.close();
     assertEquals(TsFileIOWriter.magicStringBytes.length + TsFileIOWriter.versionNumberBytes.length,
-        rWriter.getTruncatedPosition());
+        rWriter.getTruncatedSize());
     assertTrue(file.delete());
   }
 
@@ -115,7 +115,7 @@ public class RestorableTsFileIOWriterTest {
     TsFileWriter writer = new TsFileWriter(rWriter);
     writer.close();
     assertEquals(TsFileIOWriter.magicStringBytes.length + TsFileIOWriter.versionNumberBytes.length,
-        rWriter.getTruncatedPosition());
+        rWriter.getTruncatedSize());
     assertTrue(file.delete());
   }
 
@@ -133,7 +133,7 @@ public class RestorableTsFileIOWriterTest {
     writer = new TsFileWriter(rWriter);
     writer.close();
     assertEquals(TsFileIOWriter.magicStringBytes.length + TsFileIOWriter.versionNumberBytes.length,
-        rWriter.getTruncatedPosition());
+        rWriter.getTruncatedSize());
     assertTrue(file.delete());
   }
 
@@ -159,7 +159,7 @@ public class RestorableTsFileIOWriterTest {
     writer = new TsFileWriter(rWriter);
     writer.close();
     // truncate version marker and version
-    assertEquals(pos - 1 - Long.BYTES, rWriter.getTruncatedPosition());
+    assertEquals(pos - 1 - Long.BYTES, rWriter.getTruncatedSize());
     assertTrue(file.delete());
   }
 
@@ -221,7 +221,7 @@ public class RestorableTsFileIOWriterTest {
     writer.close();
     assertNotEquals(
         TsFileIOWriter.magicStringBytes.length + TsFileIOWriter.versionNumberBytes.length,
-        rWriter.getTruncatedPosition());
+        rWriter.getTruncatedSize());
     TsFileSequenceReader reader = new TsFileSequenceReader(FILE_NAME);
     List<ChunkMetadata> chunkMetadataList = reader.getChunkMetadataList(new Path("d1.s1"));
     assertNotNull(chunkMetadataList);