You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2019/06/24 11:15:40 UTC

[incubator-iotdb] branch feature_async_close_tsfile updated: fix recover process

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch feature_async_close_tsfile
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/feature_async_close_tsfile by this push:
     new c59bf4a  fix recover process
c59bf4a is described below

commit c59bf4a443e4f210c5672274cc908bd6c851ebaf
Author: qiaojialin <64...@qq.com>
AuthorDate: Mon Jun 24 19:15:27 2019 +0800

    fix recover process
---
 .../db/engine/filenodeV2/FileNodeManagerV2.java    |  1 +
 .../db/engine/filenodeV2/FileNodeProcessorV2.java  | 15 ++++--
 .../db/engine/filenodeV2/TsFileResourceV2.java     | 59 ++++++++++++++++++++++
 .../filenodeV2/UnsealedTsFileProcessorV2.java      |  3 ++
 .../recover/SeqTsFileRecoverPerformer.java         | 31 ++++++++----
 .../tsfile/common/constant/SystemConstant.java     |  1 +
 .../iotdb/tsfile/read/TsFileRestorableReader.java  |  2 +-
 .../apache/iotdb/tsfile/write/TsFileWriter.java    | 10 ----
 .../write/writer/NativeRestorableIOWriter.java     | 37 ++++++++------
 .../iotdb/tsfile/write/writer/TsFileIOWriter.java  |  1 -
 .../write/writer/NativeRestorableIOWriterTest.java |  4 +-
 11 files changed, 121 insertions(+), 43 deletions(-)

diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeManagerV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeManagerV2.java
index 3a8f4f0..b27e172 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeManagerV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeManagerV2.java
@@ -365,6 +365,7 @@ public class FileNodeManagerV2 implements IService {
   }
 
   /**
+   * flush command
    * Sync asyncCloseOneProcessor all file node processors.
    */
   public void syncCloseAllProcessor() throws FileNodeManagerException {
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
index 40a9151..890a9da 100755
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
@@ -18,6 +18,8 @@
  */
 package org.apache.iotdb.db.engine.filenodeV2;
 
+import static org.apache.iotdb.tsfile.common.constant.SystemConstant.TSFILE_SUFFIX;
+
 import java.io.File;
 import java.io.IOException;
 import java.nio.file.Paths;
@@ -33,6 +35,7 @@ import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.Supplier;
+import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.conf.directories.DirectoryManager;
 import org.apache.iotdb.db.engine.filenode.CopyOnReadLinkedList;
@@ -144,7 +147,7 @@ public class FileNodeProcessorV2 {
       if (!fileFolder.exists()) {
         continue;
       }
-      for (File tsfile: fileFolder.listFiles(file->!file.getName().contains("mods"))) {
+      for (File tsfile: fileFolder.listFiles(file->file.getName().endsWith(TSFILE_SUFFIX))) {
         tsFiles.add(tsfile);
       }
     }
@@ -157,11 +160,16 @@ public class FileNodeProcessorV2 {
       if (!fileFolder.exists()) {
         continue;
       }
-      for (File tsfile: fileFolder.listFiles(file->!file.getName().contains("mods"))) {
+      for (File tsfile: fileFolder.listFiles(file->file.getName().endsWith(TSFILE_SUFFIX))) {
         tsFiles.add(tsfile);
       }
     }
     recoverUnseqFiles(tsFiles);
+
+    for (TsFileResourceV2 resource: sequenceFileList) {
+      latestTimeForEachDevice.putAll(resource.getEndTimeMap());
+      latestFlushedTimeForEachDevice.putAll(resource.getEndTimeMap());
+    }
   }
 
   private void recoverSeqFiles(List<File> tsfiles) throws ProcessorException {
@@ -304,7 +312,8 @@ public class FileNodeProcessorV2 {
     new File(baseDir, storageGroupName).mkdirs();
 
     String filePath = Paths.get(baseDir, storageGroupName,
-        System.currentTimeMillis() + "-" + versionController.nextVersion()).toString();
+        System.currentTimeMillis() + "-" + versionController.nextVersion()).toString()
+        + TSFILE_SUFFIX;
 
     return new UnsealedTsFileProcessorV2(storageGroupName, new File(filePath),
         fileSchema, versionController, this::closeUnsealedTsFileProcessorCallback, this::updateLatestFlushTimeCallback);
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/TsFileResourceV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/TsFileResourceV2.java
index 86abcf2..2bfb04f 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/TsFileResourceV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/TsFileResourceV2.java
@@ -18,19 +18,33 @@
  */
 package org.apache.iotdb.db.engine.filenodeV2;
 
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
 import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
 import org.apache.iotdb.db.engine.filenode.TsFileResource;
 import org.apache.iotdb.db.engine.modification.ModificationFile;
 import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
 import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
 public class TsFileResourceV2 {
 
   private File file;
 
+  public static final String RESOURCE_SUFFIX = ".resource";
+
   /**
    * device -> start time
    */
@@ -81,6 +95,51 @@ public class TsFileResourceV2 {
     this.readOnlyMemChunk = readOnlyMemChunk;
   }
 
+  public TsFileResourceV2(File file, Map<String, Long> startTimeMap, Map<String, Long> endTimeMap) {
+    this.file = file;
+    this.startTimeMap = startTimeMap;
+    this.endTimeMap = endTimeMap;
+    this.closed = true;
+  }
+
+
+  public void serialize() throws IOException {
+    try (OutputStream outputStream = new BufferedOutputStream(new FileOutputStream(file + RESOURCE_SUFFIX))){
+      ReadWriteIOUtils.write(this.startTimeMap.size(), outputStream);
+      for (Entry<String, Long> entry : this.startTimeMap.entrySet()) {
+        ReadWriteIOUtils.write(entry.getKey(), outputStream);
+        ReadWriteIOUtils.write(entry.getValue(), outputStream);
+      }
+      ReadWriteIOUtils.write(this.endTimeMap.size(), outputStream);
+      for (Entry<String, Long> entry : this.endTimeMap.entrySet()) {
+        ReadWriteIOUtils.write(entry.getKey(), outputStream);
+        ReadWriteIOUtils.write(entry.getValue(), outputStream);
+      }
+    }
+  }
+
+
+  public void deSerialize() throws IOException {
+    try (InputStream inputStream = new BufferedInputStream(new FileInputStream(file + RESOURCE_SUFFIX))) {
+      int size = ReadWriteIOUtils.readInt(inputStream);
+      Map<String, Long> startTimes = new HashMap<>();
+      for (int i = 0; i < size; i++) {
+        String path = ReadWriteIOUtils.readString(inputStream);
+        long time = ReadWriteIOUtils.readLong(inputStream);
+        startTimes.put(path, time);
+      }
+      size = ReadWriteIOUtils.readInt(inputStream);
+      Map<String, Long> endTimes = new HashMap<>();
+      for (int i = 0; i < size; i++) {
+        String path = ReadWriteIOUtils.readString(inputStream);
+        long time = ReadWriteIOUtils.readLong(inputStream);
+        endTimes.put(path, time);
+      }
+      this.startTimeMap = startTimes;
+      this.endTimeMap = endTimes;
+    }
+  }
+
   public void updateStartTime(String device, long time) {
     startTimeMap.putIfAbsent(device, time);
     long startTime = startTimeMap.get(device);
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java
index d3fde32..bac903d 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java
@@ -323,6 +323,9 @@ public class UnsealedTsFileProcessorV2 {
 
   private void endFile() throws IOException {
     long closeStartTime = System.currentTimeMillis();
+
+    tsFileResource.serialize();
+
     writer.endFile(fileSchema);
     //FIXME suppose the flush-thread-pool is 2.
     // then if a flush task and a endFile task are running in the same time
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverPerformer.java b/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverPerformer.java
index 6f1f342..95703a0 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverPerformer.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverPerformer.java
@@ -19,12 +19,12 @@
 
 package org.apache.iotdb.db.writelog.recover;
 
+import static org.apache.iotdb.db.engine.filenodeV2.TsFileResourceV2.RESOURCE_SUFFIX;
+
 import java.io.File;
-import java.io.FileOutputStream;
 import java.io.IOException;
-import java.nio.channels.FileChannel;
-import org.apache.iotdb.db.engine.filenode.TsFileResource;
 import org.apache.iotdb.db.engine.filenodeV2.TsFileResourceV2;
+import org.apache.iotdb.db.engine.filenodeV2.UnsealedTsFileProcessorV2;
 import org.apache.iotdb.db.engine.memtable.IMemTable;
 import org.apache.iotdb.db.engine.memtable.MemTableFlushTask;
 import org.apache.iotdb.db.engine.memtable.PrimitiveMemTable;
@@ -78,17 +78,27 @@ public class SeqTsFileRecoverPerformer {
     // remove corrupted part of the TsFile
     NativeRestorableIOWriter restorableTsFileIOWriter = null;
     try {
-      restorableTsFileIOWriter = new NativeRestorableIOWriter(insertFile, true);
+      restorableTsFileIOWriter = new NativeRestorableIOWriter(insertFile);
     } catch (IOException e) {
       throw new ProcessorException(e);
     }
 
-    // due to failure, the last ChunkGroup may contain the same data as the WALs, so the time
-    // map must be updated first to avoid duplicated insertion
-    for (ChunkGroupMetaData chunkGroupMetaData : restorableTsFileIOWriter.getChunkGroupMetaDatas()) {
-      for (ChunkMetaData chunkMetaData : chunkGroupMetaData.getChunkMetaDataList()) {
-        tsFileResource.updateTime(chunkGroupMetaData.getDeviceID(), chunkMetaData.getStartTime());
-        tsFileResource.updateTime(chunkGroupMetaData.getDeviceID(), chunkMetaData.getEndTime());
+    if (!restorableTsFileIOWriter.hasCrashed()) {
+      try {
+        tsFileResource.deSerialize();
+        return;
+      } catch (IOException e) {
+        throw new ProcessorException("recover the resource file failed: " + insertFilePath
+            + RESOURCE_SUFFIX, e);
+      }
+    } else {
+      // due to failure, the last ChunkGroup may contain the same data as the WALs, so the time
+      // map must be updated first to avoid duplicated insertion
+      for (ChunkGroupMetaData chunkGroupMetaData : restorableTsFileIOWriter.getChunkGroupMetaDatas()) {
+        for (ChunkMetaData chunkMetaData : chunkGroupMetaData.getChunkMetaDataList()) {
+          tsFileResource.updateTime(chunkGroupMetaData.getDeviceID(), chunkMetaData.getStartTime());
+          tsFileResource.updateTime(chunkGroupMetaData.getDeviceID(), chunkMetaData.getEndTime());
+        }
       }
     }
 
@@ -106,6 +116,7 @@ public class SeqTsFileRecoverPerformer {
     // close file
     try {
       restorableTsFileIOWriter.endFile(fileSchema);
+      tsFileResource.serialize();
     } catch (IOException e) {
       throw new ProcessorException("Cannot setCloseMark file when recovering", e);
     }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/common/constant/SystemConstant.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/common/constant/SystemConstant.java
index d3a6f56..fe3063b 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/common/constant/SystemConstant.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/common/constant/SystemConstant.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.tsfile.common.constant;
 
 public class SystemConstant {
 
+  public static final String TSFILE_SUFFIX = ".tsfile";
   public static final String TSFILE_HOME = "TSFILE_HOME";
   public static final String TSFILE_CONF = "TSFILE_CONF";
   public static final String PATH_SEPARATOR = ".";
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileRestorableReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileRestorableReader.java
index 44b29a6..16b339c 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileRestorableReader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileRestorableReader.java
@@ -61,7 +61,7 @@ public class TsFileRestorableReader extends TsFileSequenceReader {
     if (!isComplete()) {
       // Try to close it
       LOGGER.info("File {} has no correct tail magic, try to repair...", file);
-      NativeRestorableIOWriter rWriter = new NativeRestorableIOWriter(new File(file), false);
+      NativeRestorableIOWriter rWriter = new NativeRestorableIOWriter(new File(file));
       TsFileWriter writer = new TsFileWriter(rWriter);
       // This writes the right magic string
       writer.close();
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/TsFileWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/TsFileWriter.java
index 52cfdf9..b626020 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/TsFileWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/TsFileWriter.java
@@ -118,16 +118,6 @@ public class TsFileWriter implements AutoCloseable{
    * init this TsFileWriter.
    *
    * @param file the File to be written by this TsFileWriter
-   * @param conf the configuration of this TsFile
-   */
-  public TsFileWriter(File file, TSFileConfig conf) throws IOException {
-    this(new TsFileIOWriter(file), new FileSchema(), conf);
-  }
-
-  /**
-   * init this TsFileWriter.
-   *
-   * @param file the File to be written by this TsFileWriter
    * @param schema the schema of this TsFile
    * @param conf the configuration of this TsFile
    */
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/NativeRestorableIOWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/NativeRestorableIOWriter.java
index 5d4b4e4..6b86691 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/NativeRestorableIOWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/NativeRestorableIOWriter.java
@@ -48,6 +48,8 @@ public class NativeRestorableIOWriter extends TsFileIOWriter {
 
   private int lastFlushedChunkGroupIndex = 0;
 
+  private boolean crashed;
+
   /**
    * all chunk group metadata which have been serialized on disk.
    */
@@ -58,40 +60,40 @@ public class NativeRestorableIOWriter extends TsFileIOWriter {
     return truncatedPosition;
   }
 
-  public NativeRestorableIOWriter(File file) throws IOException {
-    this(file, false);
-  }
-
   /**
    * @param file a given tsfile path you want to (continue to) write
-   * @param append if true, then the file can support appending data even though the file is complete (i.e., tail magic string exists)
    * @throws IOException if write failed, or the file is broken but autoRepair==false.
    */
-  public NativeRestorableIOWriter(File file, boolean append) throws IOException {
-    super();
+  public NativeRestorableIOWriter(File file) throws IOException {
     this.out = new DefaultTsFileOutput(file, true);
+
+    // file doesn't exist or is empty (length is 0)
     if (file.length() == 0) {
-      //this is a new file
+      startFile();
       return;
     }
+
     if (file.exists()) {
       try (TsFileSequenceReader reader = new TsFileSequenceReader(file.getAbsolutePath(), false)) {
-        if (reader.isComplete() && !append) {
+
+        // this tsfile is complete
+        if (reader.isComplete()) {
+          crashed = false;
           canWrite = false;
           out.close();
           return;
         }
-        truncatedPosition = reader.selfCheck(knownSchemas, chunkGroupMetaDataList, !append);
-        if (truncatedPosition == TsFileCheckStatus.COMPLETE_FILE && !append) {
-            this.canWrite = false;
-            out.close();
-        } else if (truncatedPosition == TsFileCheckStatus.INCOMPATIBLE_FILE) {
+
+        // incomplete file
+        truncatedPosition = reader.selfCheck(knownSchemas, chunkGroupMetaDataList, true);
+        if (truncatedPosition == TsFileCheckStatus.INCOMPATIBLE_FILE) {
           out.close();
-          throw new IOException(
-              String.format("%s is not in TsFile format.", file.getAbsolutePath()));
+          throw new IOException(String.format("%s is not in TsFile format.", file.getAbsolutePath()));
         } else if (truncatedPosition == TsFileCheckStatus.ONLY_MAGIC_HEAD) {
+          crashed = true;
           out.truncate(TSFileConfig.MAGIC_STRING.length());
         } else {
+          crashed = true;
           //remove broken data
           out.truncate(truncatedPosition + 1);
         }
@@ -157,6 +159,9 @@ public class NativeRestorableIOWriter extends TsFileIOWriter {
     }
   }
 
+  public boolean hasCrashed() {
+    return crashed;
+  }
 
   /**
    * get all the chunkGroups' metadata which are appended after the last calling of this method, or
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
index 19995c7..4c491a7 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
@@ -88,7 +88,6 @@ public class TsFileIOWriter {
    */
   public TsFileIOWriter(File file) throws IOException {
     this.out = new DefaultTsFileOutput(file);
-    startFile();
   }
 
   /**
diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/NativeRestorableIOWriterTest.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/NativeRestorableIOWriterTest.java
index 64b6376..b75f84f 100644
--- a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/NativeRestorableIOWriterTest.java
+++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/NativeRestorableIOWriterTest.java
@@ -80,7 +80,7 @@ public class NativeRestorableIOWriterTest {
     rWriter = new NativeRestorableIOWriter(file);
     assertEquals(TsFileCheckStatus.COMPLETE_FILE, rWriter.getTruncatedPosition());
     assertFalse(rWriter.canWrite());
-    rWriter = new NativeRestorableIOWriter(file, true);
+    rWriter = new NativeRestorableIOWriter(file);
     assertEquals(TSFileConfig.MAGIC_STRING.length(), rWriter.getTruncatedPosition());
     writer = new TsFileWriter(rWriter);
     writer.close();
@@ -322,7 +322,7 @@ public class NativeRestorableIOWriterTest {
     assertFalse(rWriter.canWrite());
     rWriter.close();
 
-    rWriter = new NativeRestorableIOWriter(file, true);
+    rWriter = new NativeRestorableIOWriter(file);
     assertTrue(rWriter.canWrite());
     writer = new TsFileWriter(rWriter);
     writer.write(new TSRecord(3, "d1").addTuple(new FloatDataPoint("s1", 5))