You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2019/06/24 13:29:14 UTC

[incubator-iotdb] 01/02: refactor RecoveryPerformer

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch feature_async_close_tsfile
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 44561c9157861e7dfe607fc4d5c0a6c9d09675cb
Author: 江天 <jt...@163.com>
AuthorDate: Mon Jun 24 21:26:20 2019 +0800

    refactor RecoveryPerformer
---
 .../db/engine/filenode/FileNodeProcessor.java      |   6 +-
 .../db/engine/filenodeV2/FileNodeProcessorV2.java  |  11 +-
 .../iotdb/db/writelog/recover/LogReplayer.java     |  13 ++-
 ...rPerformer.java => TsFileRecoverPerformer.java} |  12 ++-
 .../recover/UnSeqTsFileRecoverPerformer.java       | 117 ---------------------
 .../db/writelog/recover/SeqTsFileRecoverTest.java  |   5 +-
 .../writelog/recover/UnseqTsFileRecoverTest.java   |   4 +-
 7 files changed, 31 insertions(+), 137 deletions(-)

diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
index 21a4321..e7df8dd 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
@@ -88,7 +88,7 @@
 //import org.apache.iotdb.db.utils.MemUtils;
 //import org.apache.iotdb.db.utils.QueryUtils;
 //import org.apache.iotdb.db.utils.TimeValuePair;
-//import org.apache.iotdb.db.writelog.recover.SeqTsFileRecoverPerformer;
+//import org.apache.iotdb.db.writelog.recover.TsFileRecoverPerformer;
 //import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
 //import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
 //import org.apache.iotdb.tsfile.file.footer.ChunkGroupFooter;
@@ -472,8 +472,8 @@
 ////      try {
 ////        String filePath = tsFile.getFilePath();
 ////        String logNodePrefix = BufferWriteProcessor.logNodePrefix(processorName);
-////        SeqTsFileRecoverPerformer recoverPerformer =
-////            new SeqTsFileRecoverPerformer(logNodePrefix,
+////        TsFileRecoverPerformer recoverPerformer =
+////            new TsFileRecoverPerformer(logNodePrefix,
 ////                fileSchema, versionController, tsFile);
 ////        recoverPerformer.recover();
 ////      } catch (ProcessorException e) {
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
index 42aeb87..97f8966 100755
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
@@ -22,7 +22,6 @@ import java.io.File;
 import java.io.IOException;
 import java.nio.file.Paths;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
@@ -48,8 +47,7 @@ import org.apache.iotdb.db.exception.UnsealedTsFileProcessorException;
 import org.apache.iotdb.db.metadata.MManager;
 import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
-import org.apache.iotdb.db.writelog.recover.SeqTsFileRecoverPerformer;
-import org.apache.iotdb.db.writelog.recover.UnSeqTsFileRecoverPerformer;
+import org.apache.iotdb.db.writelog.recover.TsFileRecoverPerformer;
 import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
 import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -169,7 +167,8 @@ public class FileNodeProcessorV2 {
     for (File tsfile: tsfiles) {
       TsFileResourceV2 tsFileResource = new TsFileResourceV2(tsfile);
       sequenceFileList.add(tsFileResource);
-      SeqTsFileRecoverPerformer recoverPerformer = new SeqTsFileRecoverPerformer(storageGroupName + "-", fileSchema, versionController, tsFileResource);
+      TsFileRecoverPerformer recoverPerformer = new TsFileRecoverPerformer(storageGroupName + "-"
+          , fileSchema, versionController, tsFileResource, false);
       recoverPerformer.recover();
     }
   }
@@ -179,8 +178,8 @@ public class FileNodeProcessorV2 {
     for (File tsfile: tsfiles) {
       TsFileResourceV2 tsFileResource = new TsFileResourceV2(tsfile);
       unSequenceFileList.add(tsFileResource);
-      UnSeqTsFileRecoverPerformer recoverPerformer = new UnSeqTsFileRecoverPerformer(storageGroupName + "-", fileSchema,
-          versionController, tsFileResource);
+      TsFileRecoverPerformer recoverPerformer = new TsFileRecoverPerformer(storageGroupName + "-", fileSchema,
+          versionController, tsFileResource, true);
       recoverPerformer.recover();
     }
   }
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/LogReplayer.java b/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/LogReplayer.java
index 65cdc3f..ded5d40 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/LogReplayer.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/LogReplayer.java
@@ -21,7 +21,9 @@ package org.apache.iotdb.db.writelog.recover;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import org.apache.iotdb.db.engine.filenode.TsFileResource;
 import org.apache.iotdb.db.engine.filenodeV2.TsFileResourceV2;
 import org.apache.iotdb.db.engine.memtable.IMemTable;
@@ -60,6 +62,9 @@ public class LogReplayer {
   // overflow file tolerates duplicated data
   private boolean acceptDuplication;
 
+  private Map<String, Long> tempStartTimeMap = new HashMap<>();
+  private Map<String, Long> tempEndTimeMap = new HashMap<>();
+
   public LogReplayer(String logNodePrefix, String insertFilePath,
       ModificationFile modFile,
       VersionController versionController,
@@ -103,6 +108,8 @@ public class LogReplayer {
     } catch (IOException e) {
       throw new ProcessorException("Cannot replay logs", e);
     }
+    tempStartTimeMap.forEach((k, v) -> currentTsFileResource.updateTime(k, v));
+    tempEndTimeMap.forEach((k, v) -> currentTsFileResource.updateTime(k, v));
   }
 
   private void replayDelete(DeletePlan deletePlan) throws IOException {
@@ -121,7 +128,11 @@ public class LogReplayer {
           !acceptDuplication) {
         return;
       }
-      currentTsFileResource.updateTime(insertPlan.getDeviceId(), insertPlan.getTime());
+      tempStartTimeMap.putIfAbsent(insertPlan.getDeviceId(), insertPlan.getTime());
+      Long endTime = tempEndTimeMap.get(insertPlan.getDeviceId());
+      if (endTime == null || endTime < insertPlan.getTime()) {
+        tempEndTimeMap.put(insertPlan.getDeviceId(), insertPlan.getTime());
+      }
     }
     String[] measurementList = insertPlan.getMeasurements();
     TSDataType[] dataTypes = new TSDataType[measurementList.length];
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverPerformer.java b/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
similarity index 92%
rename from iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverPerformer.java
rename to iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
index 6f1f342..84579c8 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverPerformer.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
@@ -37,10 +37,10 @@ import org.apache.iotdb.tsfile.write.schema.FileSchema;
 import org.apache.iotdb.tsfile.write.writer.NativeRestorableIOWriter;
 
 /**
- * SeqTsFileRecoverPerformer recovers a SeqTsFile to correct status, redoes the WALs since last
+ * TsFileRecoverPerformer recovers a seq or unseq TsFile to correct status, redoes WALs since last
  * crash and removes the redone logs.
  */
-public class SeqTsFileRecoverPerformer {
+public class TsFileRecoverPerformer {
 
   private String insertFilePath;
   private String logNodePrefix;
@@ -48,15 +48,17 @@ public class SeqTsFileRecoverPerformer {
   private VersionController versionController;
   private LogReplayer logReplayer;
   private TsFileResourceV2 tsFileResource;
+  private boolean acceptUnseq;
 
-  public SeqTsFileRecoverPerformer(String logNodePrefix,
+  public TsFileRecoverPerformer(String logNodePrefix,
       FileSchema fileSchema, VersionController versionController,
-      TsFileResourceV2 currentTsFileResource) {
+      TsFileResourceV2 currentTsFileResource, boolean acceptUnseq) {
     this.insertFilePath = currentTsFileResource.getFile().getPath();
     this.logNodePrefix = logNodePrefix;
     this.fileSchema = fileSchema;
     this.versionController = versionController;
     this.tsFileResource = currentTsFileResource;
+    this.acceptUnseq = acceptUnseq;
   }
 
   /**
@@ -70,7 +72,7 @@ public class SeqTsFileRecoverPerformer {
     IMemTable recoverMemTable = new PrimitiveMemTable();
     this.logReplayer = new LogReplayer(logNodePrefix, insertFilePath, tsFileResource.getModFile(),
         versionController,
-        tsFileResource, fileSchema, recoverMemTable, false);
+        tsFileResource, fileSchema, recoverMemTable, acceptUnseq);
     File insertFile = new File(insertFilePath);
     if (!insertFile.exists()) {
       return;
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/UnSeqTsFileRecoverPerformer.java b/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/UnSeqTsFileRecoverPerformer.java
deleted file mode 100644
index 116fadf..0000000
--- a/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/UnSeqTsFileRecoverPerformer.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.writelog.recover;
-
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.nio.channels.FileChannel;
-import org.apache.iotdb.db.engine.filenode.TsFileResource;
-import org.apache.iotdb.db.engine.filenodeV2.TsFileResourceV2;
-import org.apache.iotdb.db.engine.memtable.IMemTable;
-import org.apache.iotdb.db.engine.memtable.MemTableFlushTask;
-import org.apache.iotdb.db.engine.memtable.PrimitiveMemTable;
-import org.apache.iotdb.db.engine.version.VersionController;
-import org.apache.iotdb.db.exception.ProcessorException;
-import org.apache.iotdb.db.writelog.manager.MultiFileLogNodeManager;
-import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetaData;
-import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
-import org.apache.iotdb.tsfile.write.schema.FileSchema;
-import org.apache.iotdb.tsfile.write.writer.NativeRestorableIOWriter;
-
-/**
- * SeqTsFileRecoverPerformer recovers a SeqTsFile to correct status, redoes the WALs since last
- * crash and removes the redone logs.
- */
-public class UnSeqTsFileRecoverPerformer {
-
-  private String insertFilePath;
-  private String logNodePrefix;
-  private FileSchema fileSchema;
-  private VersionController versionController;
-  private LogReplayer logReplayer;
-  private TsFileResourceV2 tsFileResource;
-
-  public UnSeqTsFileRecoverPerformer(String logNodePrefix,
-      FileSchema fileSchema, VersionController versionController,
-      TsFileResourceV2 currentTsFileResource) {
-    this.insertFilePath = currentTsFileResource.getFile().getPath();
-    this.logNodePrefix = logNodePrefix;
-    this.fileSchema = fileSchema;
-    this.versionController = versionController;
-    this.tsFileResource = currentTsFileResource;
-  }
-
-  /**
-   * 1. redo the WALs to recover unpersisted data
-   * 2. flush and close the file
-   * 3. clean WALs
-   * @throws ProcessorException
-   */
-  public void recover() throws ProcessorException {
-    IMemTable recoverMemTable = new PrimitiveMemTable();
-    this.logReplayer = new LogReplayer(logNodePrefix, insertFilePath, tsFileResource.getModFile(),
-        versionController,
-        tsFileResource, fileSchema, recoverMemTable, true);
-    File insertFile = new File(insertFilePath);
-    if (!insertFile.exists()) {
-      return;
-    }
-    // remove corrupted part of the TsFile
-    NativeRestorableIOWriter restorableTsFileIOWriter = null;
-    try {
-      restorableTsFileIOWriter = new NativeRestorableIOWriter(insertFile);
-    } catch (IOException e) {
-      throw new ProcessorException(e);
-    }
-
-    for (ChunkGroupMetaData chunkGroupMetaData : restorableTsFileIOWriter.getChunkGroupMetaDatas()) {
-      for (ChunkMetaData chunkMetaData : chunkGroupMetaData.getChunkMetaDataList()) {
-        tsFileResource.updateTime(chunkGroupMetaData.getDeviceID(), chunkMetaData.getStartTime());
-        tsFileResource.updateTime(chunkGroupMetaData.getDeviceID(), chunkMetaData.getEndTime());
-      }
-    }
-
-    // redo logs
-    logReplayer.replayLogs();
-    if (recoverMemTable.isEmpty()) {
-      return;
-    }
-
-    // flush logs
-    MemTableFlushTask tableFlushTask = new MemTableFlushTask(restorableTsFileIOWriter,
-        logNodePrefix, 0, (a,b) -> {});
-    tableFlushTask.flushMemTable(fileSchema, recoverMemTable, versionController.nextVersion());
-
-    // close file
-    try {
-      restorableTsFileIOWriter.endFile(fileSchema);
-    } catch (IOException e) {
-      throw new ProcessorException("Cannot setCloseMark file when recovering", e);
-    }
-
-    // clean logs
-    try {
-      MultiFileLogNodeManager.getInstance().deleteNode(logNodePrefix + new File(insertFilePath).getName());
-    } catch (IOException e) {
-      throw new ProcessorException(e);
-    }
-  }
-}
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverTest.java b/iotdb/src/test/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverTest.java
index 334d690..f622299 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverTest.java
@@ -26,7 +26,6 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import org.apache.commons.io.FileUtils;
-import org.apache.iotdb.db.engine.filenode.TsFileResource;
 import org.apache.iotdb.db.engine.filenodeV2.TsFileResourceV2;
 import org.apache.iotdb.db.engine.version.VersionController;
 import org.apache.iotdb.db.exception.ProcessorException;
@@ -122,8 +121,8 @@ public class SeqTsFileRecoverTest {
 
   @Test
   public void test() throws ProcessorException, IOException {
-    SeqTsFileRecoverPerformer performer = new SeqTsFileRecoverPerformer(logNodePrefix, schema,
-        versionController, resource);
+    TsFileRecoverPerformer performer = new TsFileRecoverPerformer(logNodePrefix, schema,
+        versionController, resource, true);
     performer.recover();
 
     ReadOnlyTsFile readOnlyTsFile = new ReadOnlyTsFile(new TsFileSequenceReader(tsF.getPath()));
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/writelog/recover/UnseqTsFileRecoverTest.java b/iotdb/src/test/java/org/apache/iotdb/db/writelog/recover/UnseqTsFileRecoverTest.java
index 9cb55f0..8f49300 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/writelog/recover/UnseqTsFileRecoverTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/writelog/recover/UnseqTsFileRecoverTest.java
@@ -127,8 +127,8 @@ public class UnseqTsFileRecoverTest {
 
   @Test
   public void test() throws ProcessorException, IOException {
-    UnSeqTsFileRecoverPerformer performer = new UnSeqTsFileRecoverPerformer(logNodePrefix, schema,
-        versionController, resource);
+    TsFileRecoverPerformer performer = new TsFileRecoverPerformer(logNodePrefix, schema,
+        versionController, resource, true);
     performer.recover();
 
     TsFileSequenceReader fileReader = new TsFileSequenceReader(tsF.getPath(), true);