You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2019/12/31 12:56:53 UTC

[incubator-iotdb] branch continue_file_after_recovery created (now 55ab786)

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a change to branch continue_file_after_recovery
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git.


      at 55ab786  continue writing the last unclosed file

This branch includes the following new commits:

     new 55ab786  continue writing the last unclosed file

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[incubator-iotdb] 01/01: continue writing the last unclosed file

Posted by ji...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch continue_file_after_recovery
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 55ab7868ad5ff65b6d496f8217a8d029899c55bf
Author: jt2594838 <jt...@163.com>
AuthorDate: Tue Dec 31 20:55:55 2019 +0800

    continue writing the last unclosed file
---
 .../engine/storagegroup/StorageGroupProcessor.java | 39 ++++++++++----
 .../db/engine/storagegroup/TsFileProcessor.java    | 17 +++++-
 .../db/engine/storagegroup/TsFileResource.java     | 29 +++++++++++
 .../writelog/recover/TsFileRecoverPerformer.java   | 25 ++++++---
 .../db/writelog/recover/SeqTsFileRecoverTest.java  | 60 ++++++++++++++++++++--
 .../writelog/recover/UnseqTsFileRecoverTest.java   |  2 +-
 6 files changed, 151 insertions(+), 21 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 741b902..0089895 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -85,6 +85,7 @@ import org.apache.iotdb.tsfile.read.common.Path;
 import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 import org.apache.iotdb.tsfile.write.schema.Schema;
+import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -287,24 +288,42 @@ public class StorageGroupProcessor {
   }
 
   private void recoverSeqFiles(List<TsFileResource> tsFiles) throws StorageGroupProcessorException {
-    for (TsFileResource tsFileResource : tsFiles) {
+    for (int i = 0; i < tsFiles.size(); i++) {
+      TsFileResource tsFileResource = tsFiles.get(i);
       sequenceFileList.add(tsFileResource);
       TsFileRecoverPerformer recoverPerformer = new TsFileRecoverPerformer(storageGroupName + "-"
-          , schema, versionController, tsFileResource, false);
-      recoverPerformer.recover();
-      tsFileResource.setClosed(true);
+          , schema, versionController, tsFileResource, false, i == tsFiles.size() - 1);
+      RestorableTsFileIOWriter writer = recoverPerformer.recover();
+      if (i == tsFiles.size() - 1 && writer != null && writer.canWrite()) {
+        // the last file was not sealed before the crash, continue writing to it
+        workSequenceTsFileProcessor = new TsFileProcessor(storageGroupName, tsFileResource,
+            schema, versionController, this::closeUnsealedTsFileProcessor,
+            this::updateLatestFlushTimeCallback, true, writer);
+      } else {
+        // not the last file, or already sealed (recover() returns null on early exits): close it
+        tsFileResource.setClosed(true);
+      }
     }
   }
 
   private void recoverUnseqFiles(List<TsFileResource> tsFiles)
       throws StorageGroupProcessorException {
-    for (TsFileResource tsFileResource : tsFiles) {
+    for (int i = 0; i < tsFiles.size(); i++) {
+      TsFileResource tsFileResource = tsFiles.get(i);
       unSequenceFileList.add(tsFileResource);
-      TsFileRecoverPerformer recoverPerformer = new TsFileRecoverPerformer(storageGroupName + "-",
-          schema,
-          versionController, tsFileResource, true);
-      recoverPerformer.recover();
-      tsFileResource.setClosed(true);
+      TsFileRecoverPerformer recoverPerformer = new TsFileRecoverPerformer(storageGroupName + "-"
+          , schema, versionController, tsFileResource, true, i == tsFiles.size() - 1);
+      RestorableTsFileIOWriter writer = recoverPerformer.recover();
+      if (i == tsFiles.size() - 1 && writer != null && writer.canWrite()) {
+        // the last file was not sealed before the crash, continue writing to it
+        workUnSequenceTsFileProcessor = new TsFileProcessor(storageGroupName, tsFileResource,
+            schema, versionController, this::closeUnsealedTsFileProcessor,
+            () -> true, false, writer);
+      } else {
+        // not the last file, or already sealed (recover() returns null on early exits): close it
+        tsFileResource.setClosed(true);
+      }
+
     }
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index eec8828..26698a7 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -48,7 +48,6 @@ import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor.CloseTsFile
 import org.apache.iotdb.db.engine.version.VersionController;
 import org.apache.iotdb.db.exception.TsFileProcessorException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
-import org.apache.iotdb.db.metadata.MManager;
 import org.apache.iotdb.db.qp.constant.DatetimeUtils;
 import org.apache.iotdb.db.qp.physical.crud.BatchInsertPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
@@ -133,6 +132,20 @@ public class TsFileProcessor {
     logger.info("create a new tsfile processor {}", tsfile.getAbsolutePath());
   }
 
+  public TsFileProcessor(String storageGroupName, TsFileResource tsFileResource, Schema schema,
+      VersionController versionController, CloseTsFileCallBack closeUnsealedTsFileProcessor,
+      Supplier<Boolean> updateLatestFlushTimeCallback, boolean sequence, RestorableTsFileIOWriter writer) {
+    this.storageGroupName = storageGroupName;
+    this.tsFileResource = tsFileResource;
+    this.schema = schema;
+    this.versionController = versionController;
+    this.writer = writer;
+    this.closeTsFileCallback = closeUnsealedTsFileProcessor;
+    this.updateLatestFlushTimeCallback = updateLatestFlushTimeCallback;
+    this.sequence = sequence;
+    logger.info("reopen a tsfile processor {}", tsFileResource.getFile().getAbsolutePath());
+  }
+
   /**
    * insert data in an InsertPlan into the workingMemtable.
    *
@@ -284,6 +297,7 @@ public class TsFileProcessor {
         return;
       }
       shouldClose = true;
+      tsFileResource.setCloseFlag();
       // when a flush thread serves this TsFileProcessor (because the processor is submitted by
       // registerTsFileProcessor()), the thread will seal the corresponding TsFile and
       // execute other cleanup works if (shouldClose == true and flushingMemTables is empty).
@@ -492,6 +506,7 @@ public class TsFileProcessor {
 
     tsFileResource.serialize();
     writer.endFile(schema);
+    tsFileResource.cleanCloseFlag();
 
     // remove this processor from Closing list in StorageGroupProcessor,
     // mark the TsFileResource closed, no need writer anymore
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
index 32817cf..656851e 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
@@ -39,14 +39,19 @@ import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
 import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
 import org.apache.iotdb.tsfile.fileSystem.fsFactory.FSFactory;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class TsFileResource {
 
+  private static final Logger logger = LoggerFactory.getLogger(TsFileResource.class);
+
   // tsfile
   private File file;
 
   public static final String RESOURCE_SUFFIX = ".resource";
   static final String TEMP_SUFFIX = ".temp";
+  private static final String CLOSING_SUFFIX = ".closing";
 
   /**
    * device -> start time
@@ -327,4 +332,28 @@ public class TsFileResource {
     }
     return false;
   }
+
+  /**
+   * Set a file flag indicating that the file is being closed, so that during recovery we know
+   * the file should be sealed instead of being reopened for writing.
+   */
+  public void setCloseFlag() {
+    try {
+      new File(file.getAbsoluteFile() + CLOSING_SUFFIX).createNewFile();
+    } catch (IOException e) {
+      logger.error("Cannot create close flag for {}", file, e);
+    }
+  }
+
+  /** Remove the close flag once the file has been successfully sealed. */
+  public void cleanCloseFlag() {
+    File flag = new File(file.getAbsoluteFile() + CLOSING_SUFFIX);
+    if (flag.exists() && !flag.delete()) {
+      logger.warn("Cannot remove close flag {}", flag);
+    }
+  }
+
+  public boolean isCloseFlagSet() {
+    return new File(file.getAbsoluteFile() + CLOSING_SUFFIX).exists();
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java b/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
index 148cf73..2faac3b 100644
--- a/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
+++ b/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
@@ -61,23 +61,27 @@ public class TsFileRecoverPerformer {
   private LogReplayer logReplayer;
   private TsFileResource tsFileResource;
   private boolean acceptUnseq;
+  private boolean isLastFile;
 
   public TsFileRecoverPerformer(String logNodePrefix,
       Schema schema, VersionController versionController,
-      TsFileResource currentTsFileResource, boolean acceptUnseq) {
+      TsFileResource currentTsFileResource, boolean acceptUnseq, boolean isLastFile) {
     this.insertFilePath = currentTsFileResource.getFile().getPath();
     this.logNodePrefix = logNodePrefix;
     this.schema = schema;
     this.versionController = versionController;
     this.tsFileResource = currentTsFileResource;
     this.acceptUnseq = acceptUnseq;
+    this.isLastFile = isLastFile;
   }
 
   /**
    * 1. recover the TsFile by RestorableTsFileIOWriter and truncate the file to remaining corrected
    * data 2. redo the WALs to recover unpersisted data 3. flush and close the file 4. clean WALs
+   * @return the RestorableTsFileIOWriter used for recovery (still writable only if the file was
+   * not being closed before the crash), or null on the early-return paths such as a missing file
    */
-  public void recover() throws StorageGroupProcessorException {
+  public RestorableTsFileIOWriter recover() throws StorageGroupProcessorException {
 
     IMemTable recoverMemTable = new PrimitiveMemTable();
     this.logReplayer = new LogReplayer(logNodePrefix, insertFilePath, tsFileResource.getModFile(),
@@ -86,7 +90,7 @@ public class TsFileRecoverPerformer {
     File insertFile = FSFactoryProducer.getFSFactory().getFile(insertFilePath);
     if (!insertFile.exists()) {
       logger.error("TsFile {} is missing, will skip its recovery.", insertFilePath);
-      return;
+      return null;
     }
     // remove corrupted part of the TsFile
     RestorableTsFileIOWriter restorableTsFileIOWriter;
@@ -126,7 +130,7 @@ public class TsFileRecoverPerformer {
           // write .resource file
           tsFileResource.serialize();
         }
-        return;
+        return null;
       } catch (IOException e) {
         throw new StorageGroupProcessorException(
             "recover the resource file failed: " + insertFilePath
@@ -148,6 +152,8 @@ public class TsFileRecoverPerformer {
     } catch (IOException e) {
       throw new StorageGroupProcessorException(e);
     }
+
+    return restorableTsFileIOWriter;
   }
 
   private void recoverResourceFromFile() throws IOException {
@@ -210,8 +216,15 @@ public class TsFileRecoverPerformer {
             restorableTsFileIOWriter, tsFileResource.getFile().getParentFile().getName());
         tableFlushTask.syncFlushMemTable();
       }
-      // close file
-      restorableTsFileIOWriter.endFile(schema);
+
+      if (!isLastFile || tsFileResource.isCloseFlagSet()) {
+        // end the file if it is not the last file or it was being closed before the crash
+        restorableTsFileIOWriter.endFile(schema);
+        tsFileResource.cleanCloseFlag();
+      }
+      // otherwise the last file was not being closed before the crash; leave it unsealed so
+      // that writing can continue into it
+
       tsFileResource.serialize();
     } catch (IOException | InterruptedException | ExecutionException e) {
       throw new StorageGroupProcessorException(e);
diff --git a/server/src/test/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverTest.java b/server/src/test/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverTest.java
index cbfd1fa..70645b5 100644
--- a/server/src/test/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverTest.java
@@ -20,6 +20,8 @@
 package org.apache.iotdb.db.writelog.recover;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 import java.io.File;
 import java.io.IOException;
@@ -50,6 +52,7 @@ import org.apache.iotdb.tsfile.write.record.TSRecord;
 import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 import org.apache.iotdb.tsfile.write.schema.Schema;
+import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -136,11 +139,62 @@ public class SeqTsFileRecoverTest {
   }
 
   @Test
-  public void test() throws StorageGroupProcessorException, IOException {
+  public void testNonLastRecovery() throws StorageGroupProcessorException, IOException {
     TsFileRecoverPerformer performer = new TsFileRecoverPerformer(logNodePrefix, schema,
-        versionController, resource, true);
+        versionController, resource, true, false);
     ActiveTimeSeriesCounter.getInstance().init(logNodePrefix);
-    performer.recover();
+    RestorableTsFileIOWriter writer = performer.recover();
+    assertFalse(writer.canWrite());
+
+    assertEquals(2, (long) resource.getStartTimeMap().get("device99"));
+    assertEquals(100, (long) resource.getEndTimeMap().get("device99"));
+    for (int i = 0; i < 10; i++) {
+      assertEquals(0, (long) resource.getStartTimeMap().get("device" + i));
+      assertEquals(19, (long) resource.getEndTimeMap().get("device" + i));
+    }
+
+    ReadOnlyTsFile readOnlyTsFile = new ReadOnlyTsFile(new TsFileSequenceReader(tsF.getPath()));
+    List<Path> pathList = new ArrayList<>();
+    for (int j = 0; j < 10; j++) {
+      for (int k = 0; k < 10; k++) {
+        pathList.add(new Path("device" + j, "sensor" + k));
+      }
+    }
+    QueryExpression queryExpression = QueryExpression.create(pathList, null);
+    QueryDataSet dataSet = readOnlyTsFile.query(queryExpression);
+    for (int i = 0; i < 20; i++) {
+      RowRecord record = dataSet.next();
+      assertEquals(i, record.getTimestamp());
+      List<Field> fields = record.getFields();
+      assertEquals(100, fields.size());
+      for (int j = 0; j < 100; j++) {
+        assertEquals(j % 10, fields.get(j).getLongV());
+      }
+    }
+
+    pathList = new ArrayList<>();
+    pathList.add(new Path("device99", "sensor1"));
+    pathList.add(new Path("device99", "sensor4"));
+    queryExpression = QueryExpression.create(pathList, null);
+    dataSet = readOnlyTsFile.query(queryExpression);
+    Assert.assertTrue(dataSet.hasNext());
+    RowRecord record = dataSet.next();
+    Assert.assertEquals("2\t0\tnull", record.toString());
+    Assert.assertTrue(dataSet.hasNext());
+    record = dataSet.next();
+    Assert.assertEquals("100\tnull\t0", record.toString());
+
+    readOnlyTsFile.close();
+  }
+
+  @Test
+  public void testLastRecovery() throws StorageGroupProcessorException, IOException {
+    TsFileRecoverPerformer performer = new TsFileRecoverPerformer(logNodePrefix, schema,
+        versionController, resource, true, true);
+    ActiveTimeSeriesCounter.getInstance().init(logNodePrefix);
+    RestorableTsFileIOWriter writer = performer.recover();
+    assertTrue(writer.canWrite());
+    writer.endFile(schema);
 
     assertEquals(2, (long) resource.getStartTimeMap().get("device99"));
     assertEquals(100, (long) resource.getEndTimeMap().get("device99"));
diff --git a/server/src/test/java/org/apache/iotdb/db/writelog/recover/UnseqTsFileRecoverTest.java b/server/src/test/java/org/apache/iotdb/db/writelog/recover/UnseqTsFileRecoverTest.java
index 351228c..e68f4c4 100644
--- a/server/src/test/java/org/apache/iotdb/db/writelog/recover/UnseqTsFileRecoverTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/writelog/recover/UnseqTsFileRecoverTest.java
@@ -146,7 +146,7 @@ public class UnseqTsFileRecoverTest {
   @Test
   public void test() throws StorageGroupProcessorException, IOException {
     TsFileRecoverPerformer performer = new TsFileRecoverPerformer(logNodePrefix, schema,
-        versionController, resource, true);
+        versionController, resource, true, false);
     ActiveTimeSeriesCounter.getInstance().init(logNodePrefix);
     performer.recover();