You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2019/06/20 02:56:54 UTC

[incubator-iotdb] branch feature_async_close_tsfile updated: refactor recovery of SeqTsFiles

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch feature_async_close_tsfile
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/feature_async_close_tsfile by this push:
     new 2cfbb3f  refactor recovery of SeqTsFiles
2cfbb3f is described below

commit 2cfbb3fc2e961aaf362aa835b698fe2dd9dd06ca
Author: 江天 <jt...@163.com>
AuthorDate: Thu Jun 20 10:54:50 2019 +0800

    refactor recovery of SeqTsFiles
---
 .../db/engine/bufferwrite/BufferWriteProcessor.java | 16 +++++++---------
 .../iotdb/db/engine/filenode/FileNodeProcessor.java | 17 +++++++++--------
 .../db/writelog/recover/TsFileRecoverPerformer.java | 21 ++++++++++++++-------
 3 files changed, 30 insertions(+), 24 deletions(-)

diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java
index c9129ef..d16dc32 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java
@@ -144,15 +144,6 @@ public class BufferWriteProcessor extends Processor {
     this.insertFilePath = Paths.get(baseDir, processorName, fileName).toString();
     bufferWriteRelativePath = processorName + File.separatorChar + fileName;
 
-    TsFileRecoverPerformer recoverPerformer =
-        new TsFileRecoverPerformer(insertFilePath, logNodePrefix(),
-            fileSchema, versionController, currentTsFileResource,
-            currentTsFileResource != null ? currentTsFileResource.getModFile() : null);
-    try {
-      recoverPerformer.recover();
-    } catch (ProcessorException e) {
-      throw new BufferWriteProcessorException(e);
-    }
     open();
     try {
       getLogNode();
@@ -482,6 +473,9 @@ public class BufferWriteProcessor extends Processor {
 
   @Override
   public synchronized void close() throws BufferWriteProcessorException {
+    if (writer == null) {
+      return;
+    }
     try {
       // flushMetadata data (if there are flushing task, flushMetadata() will be blocked) and wait for finishing flushMetadata async
       LOGGER.info("Submit a BufferWrite ({}) setCloseMark task.", getProcessorName());
@@ -590,6 +584,10 @@ public class BufferWriteProcessor extends Processor {
   }
 
   public String logNodePrefix() {
+    return logNodePrefix(processorName);
+  }
+
+  public static String logNodePrefix(String processorName) {
     return processorName + "-BufferWrite-";
   }
 
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
index 082063a..3177431 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
@@ -88,6 +88,7 @@ import org.apache.iotdb.db.utils.ImmediateFuture;
 import org.apache.iotdb.db.utils.MemUtils;
 import org.apache.iotdb.db.utils.QueryUtils;
 import org.apache.iotdb.db.utils.TimeValuePair;
+import org.apache.iotdb.db.writelog.recover.TsFileRecoverPerformer;
 import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
 import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
 import org.apache.iotdb.tsfile.file.footer.ChunkGroupFooter;
@@ -470,15 +471,15 @@ public class FileNodeProcessor extends Processor implements IStatistic {
 
     for (int i = 0; i < newFileNodes.size(); i++) {
       TsFileResource tsFile = newFileNodes.get(i);
-      String baseDir = directories
-          .getTsFileFolder(tsFile.getBaseDirIndex());
       try {
-        // recover in initialization
-        new BufferWriteProcessor(baseDir,
-            getProcessorName(),
-            tsFile.getFile().getName(), parameters, bufferwriteCloseConsumer,
-            versionController, fileSchema, tsFile);
-      } catch (BufferWriteProcessorException e) {
+        String filePath = tsFile.getFilePath();
+        String logNodePrefix = BufferWriteProcessor.logNodePrefix(processorName);
+        TsFileRecoverPerformer recoverPerformer =
+            new TsFileRecoverPerformer(filePath, logNodePrefix,
+                fileSchema, versionController, tsFile,
+                tsFile.getModFile());
+        recoverPerformer.recover();
+      } catch (ProcessorException e) {
         LOGGER.error(
             "The filenode processor {} failed to recover the bufferwrite processor, "
                 + "the last bufferwrite file is {}.",
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java b/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
index 37b8c6f..8041aa4 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
@@ -32,6 +32,8 @@ import org.apache.iotdb.db.engine.version.VersionController;
 import org.apache.iotdb.db.exception.ProcessorException;
 import org.apache.iotdb.db.writelog.manager.MultiFileLogNodeManager;
 import org.apache.iotdb.db.writelog.node.WriteLogNode;
+import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetaData;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
 import org.apache.iotdb.tsfile.write.schema.FileSchema;
 import org.apache.iotdb.tsfile.write.writer.NativeRestorableIOWriter;
 
@@ -43,6 +45,7 @@ public class TsFileRecoverPerformer {
   private VersionController versionController;
   private LogReplayer logReplayer;
   private IMemTable recoverMemTable;
+  private TsFileResource tsFileResource;
 
   public TsFileRecoverPerformer(String insertFilePath, String processorName,
       FileSchema fileSchema, VersionController versionController,
@@ -56,12 +59,20 @@ public class TsFileRecoverPerformer {
         currentTsFileResource, fileSchema, recoverMemTable);
   }
 
-  public boolean recover() throws ProcessorException {
+  public void recover() throws ProcessorException {
     File insertFile = new File(insertFilePath);
     if (!insertFile.exists()) {
-      return false;
+      return;
     }
     NativeRestorableIOWriter restorableTsFileIOWriter = recoverFile(insertFile);
+    // due to failure, the last ChunkGroup may contain the same data with the WALs, so the time
+    // map must be updated first to avoid duplicated insertion
+    for (ChunkGroupMetaData chunkGroupMetaData : restorableTsFileIOWriter.getChunkGroupMetaDatas()) {
+      for (ChunkMetaData chunkMetaData : chunkGroupMetaData.getChunkMetaDataList()) {
+        tsFileResource.updateTime(chunkGroupMetaData.getDeviceID(), chunkMetaData.getStartTime());
+        tsFileResource.updateTime(chunkGroupMetaData.getDeviceID(), chunkMetaData.getEndTime());
+      }
+    }
 
     logReplayer.replayLogs();
 
@@ -77,15 +88,11 @@ public class TsFileRecoverPerformer {
 
     removeTruncatePosition(insertFile);
 
-    WriteLogNode logNode;
     try {
-      logNode = MultiFileLogNodeManager.getInstance().getNode(
-          processorName + new File(insertFilePath).getName());
-      logNode.delete();
+      MultiFileLogNodeManager.getInstance().deleteNode(processorName + new File(insertFilePath).getName());
     } catch (IOException e) {
       throw new ProcessorException(e);
     }
-    return true;
   }