You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2021/08/12 12:49:03 UTC

[iotdb] branch rel/0.12 updated: [To rel/0.12] Fix compaction recover loses data (#3733)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch rel/0.12
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/0.12 by this push:
     new 62377da  [To rel/0.12] Fix compaction recover loses data (#3733)
62377da is described below

commit 62377da7b3f00351e8796ddad6f29418004d1fa7
Author: liuxuxin <37...@users.noreply.github.com>
AuthorDate: Thu Aug 12 20:48:37 2021 +0800

    [To rel/0.12] Fix compaction recover loses data (#3733)
---
 .../level/LevelCompactionTsFileManagement.java     | 19 +++++-----
 .../engine/compaction/utils/CompactionUtils.java   |  7 ++--
 .../engine/storagegroup/StorageGroupProcessor.java | 15 +++++---
 .../writelog/recover/TsFileRecoverPerformer.java   |  2 +-
 .../compaction/LevelCompactionRecoverTest.java     | 15 +++++---
 .../write/writer/RestorableTsFileIOWriter.java     | 41 ++++++++++++++++++++++
 .../iotdb/tsfile/write/writer/TsFileOutput.java    |  4 ++-
 7 files changed, 81 insertions(+), 22 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
index 52141cb..3a6363a 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
@@ -452,13 +452,12 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
             }
           }
           long timePartition = targetTsFileResource.getTimePartition();
-          RestorableTsFileIOWriter writer = new RestorableTsFileIOWriter(target);
+          RestorableTsFileIOWriter writer = new RestorableTsFileIOWriter(target, false);
           // if not complete compaction, resume merge
           if (writer.hasCrashed()) {
             if (offset > 0) {
-              writer.getIOWriterOut().truncate(offset - 1);
+              writer.getIOWriterOut().truncate(offset);
             }
-            writer.close();
             CompactionLogger compactionLogger =
                 new CompactionLogger(storageGroupDir, storageGroupName);
             List<Modification> modifications = new ArrayList<>();
@@ -469,7 +468,8 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
                 compactionLogger,
                 deviceSet,
                 isSeq,
-                modifications);
+                modifications,
+                writer);
             compactionLogger.close();
           } else {
             writer.close();
@@ -497,14 +497,13 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
             sourceTsFileResources.add(sourceTsFileResource);
           }
           int level = TsFileResource.getMergeLevel(new File(sourceFileList.get(0)).getName());
-          RestorableTsFileIOWriter writer = new RestorableTsFileIOWriter(target);
+          RestorableTsFileIOWriter writer = new RestorableTsFileIOWriter(target, false);
           List<Modification> modifications = new ArrayList<>();
           // if not complete compaction, resume merge
           if (writer.hasCrashed()) {
             if (offset > 0) {
-              writer.getIOWriterOut().truncate(offset - 1);
+              writer.getIOWriterOut().truncate(offset);
             }
-            writer.close();
             CompactionLogger compactionLogger =
                 new CompactionLogger(storageGroupDir, storageGroupName);
             CompactionUtils.merge(
@@ -514,7 +513,8 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
                 compactionLogger,
                 deviceSet,
                 isSeq,
-                modifications);
+                modifications,
+                writer);
             compactionLogger.close();
             // complete compaction and add target tsfile
             int targetLevel = TsFileResource.getMergeLevel(targetResource.getTsFile().getName());
@@ -718,7 +718,8 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
                 compactionLogger,
                 new HashSet<>(),
                 sequence,
-                modifications);
+                modifications,
+                null);
             logger.info(
                 "{} [Compaction] merged level-{}'s {} TsFiles to next level, and start to delete old files",
                 storageGroupName,
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java
index 4fafa9b..7eb7e46 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java
@@ -258,10 +258,13 @@ public class CompactionUtils {
       CompactionLogger compactionLogger,
       Set<String> devices,
       boolean sequence,
-      List<Modification> modifications)
+      List<Modification> modifications,
+      RestorableTsFileIOWriter writer)
       throws IOException, IllegalPathException {
     Map<String, TsFileSequenceReader> tsFileSequenceReaderMap = new HashMap<>();
-    RestorableTsFileIOWriter writer = new RestorableTsFileIOWriter(targetResource.getTsFile());
+    if (writer == null) {
+      writer = new RestorableTsFileIOWriter(targetResource.getTsFile());
+    }
     try {
       Map<String, List<Modification>> modificationCache = new HashMap<>();
       RateLimiter compactionWriteRateLimiter =
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 1537952..631b0f9 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -268,6 +268,8 @@ public class StorageGroupProcessor {
 
   private String insertWriteLockHolder = "";
 
+  private volatile boolean compacting = false;
+
   /** get the direct byte buffer from pool, each fetch contains two ByteBuffer */
   public ByteBuffer[] getWalDirectByteBuffer() {
     ByteBuffer[] res = new ByteBuffer[2];
@@ -2074,12 +2076,17 @@ public class StorageGroupProcessor {
   }
 
   public void merge() {
-    if (!tsFileManagement.recovered) {
-      // doing recovered task
+    if (!tsFileManagement.recovered || compacting) {
+      // recovering or doing compaction
+      // stop running new compaction
       return;
     }
-    if (config.getCompactionStrategy() == CompactionStrategy.LEVEL_COMPACTION) {
-      new CompactionAllPartitionTask(logicalStorageGroupName).call();
+    try {
+      if (config.getCompactionStrategy() == CompactionStrategy.LEVEL_COMPACTION) {
+        new CompactionAllPartitionTask(logicalStorageGroupName).call();
+      }
+    } finally {
+      compacting = false;
     }
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java b/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
index da53108..9c7fd7a 100644
--- a/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
+++ b/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
@@ -99,7 +99,7 @@ public class TsFileRecoverPerformer {
     // remove corrupted part of the TsFile
     RestorableTsFileIOWriter restorableTsFileIOWriter;
     try {
-      restorableTsFileIOWriter = new RestorableTsFileIOWriter(file);
+      restorableTsFileIOWriter = new RestorableTsFileIOWriter(file, false);
     } catch (NotCompatibleTsFileException e) {
       boolean result = file.delete();
       logger.warn("TsFile {} is incompatible. Delete it successfully {}", filePath, result);
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionRecoverTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionRecoverTest.java
index cc2cb39..35ddc01 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionRecoverTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionRecoverTest.java
@@ -138,7 +138,8 @@ public class LevelCompactionRecoverTest extends LevelCompactionTest {
         compactionLogger,
         new HashSet<>(),
         true,
-        new ArrayList<>());
+        new ArrayList<>(),
+        null);
     compactionLogger.close();
     levelCompactionTsFileManagement.add(targetTsFileResource, true);
     levelCompactionTsFileManagement.recover();
@@ -229,7 +230,8 @@ public class LevelCompactionRecoverTest extends LevelCompactionTest {
         compactionLogger,
         new HashSet<>(),
         true,
-        new ArrayList<>());
+        new ArrayList<>(),
+        null);
     compactionLogger.close();
 
     BufferedReader logReader =
@@ -344,7 +346,8 @@ public class LevelCompactionRecoverTest extends LevelCompactionTest {
         compactionLogger,
         new HashSet<>(),
         true,
-        new ArrayList<>());
+        new ArrayList<>(),
+        null);
     compactionLogger.close();
 
     BufferedReader logReader =
@@ -468,7 +471,8 @@ public class LevelCompactionRecoverTest extends LevelCompactionTest {
         compactionLogger,
         new HashSet<>(),
         false,
-        new ArrayList<>());
+        new ArrayList<>(),
+        null);
     compactionLogger.close();
     levelCompactionTsFileManagement.add(targetTsFileResource, false);
     levelCompactionTsFileManagement.recover();
@@ -675,7 +679,8 @@ public class LevelCompactionRecoverTest extends LevelCompactionTest {
         compactionLogger,
         new HashSet<>(),
         true,
-        new ArrayList<>());
+        new ArrayList<>(),
+        null);
     levelCompactionTsFileManagement.add(targetTsFileResource, true);
     compactionLogger.close();
     levelCompactionTsFileManagement.recover();
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java
index 26f8494..9f14127 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java
@@ -112,6 +112,47 @@ public class RestorableTsFileIOWriter extends TsFileIOWriter {
     }
   }
 
+  public RestorableTsFileIOWriter(File file, boolean truncate) throws IOException {
+    if (logger.isDebugEnabled()) {
+      logger.debug("{} is opened.", file.getName());
+    }
+    this.file = file;
+    this.out = FSFactoryProducer.getFileOutputFactory().getTsFileOutput(file.getPath(), true);
+
+    // file doesn't exist or is empty
+    if (file.length() == 0) {
+      startFile();
+      crashed = true;
+      canWrite = true;
+      return;
+    }
+
+    if (file.exists()) {
+      try (TsFileSequenceReader reader = new TsFileSequenceReader(file.getAbsolutePath(), false)) {
+
+        truncatedSize = reader.selfCheck(knownSchemas, chunkGroupMetadataList, true);
+        minPlanIndex = reader.getMinPlanIndex();
+        maxPlanIndex = reader.getMaxPlanIndex();
+        if (truncatedSize == TsFileCheckStatus.COMPLETE_FILE) {
+          crashed = false;
+          canWrite = false;
+          out.close();
+        } else if (truncatedSize == TsFileCheckStatus.INCOMPATIBLE_FILE) {
+          out.close();
+          throw new NotCompatibleTsFileException(
+              String.format("%s is not in TsFile format.", file.getAbsolutePath()));
+        } else {
+          crashed = true;
+          canWrite = true;
+          // remove broken data
+          if (truncate) {
+            out.truncate(truncatedSize);
+          }
+        }
+      }
+    }
+  }
+
   /**
    * Given a TsFile, generate a writable RestorableTsFileIOWriter. That is, for a complete TsFile,
    * the function erases all FileMetadata and supports writing new data; For a incomplete TsFile,
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileOutput.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileOutput.java
index 5e0b433..3173206 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileOutput.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileOutput.java
@@ -83,7 +83,9 @@ public interface TsFileOutput {
   void flush() throws IOException;
 
   /**
-   * The same with {@link java.nio.channels.FileChannel#truncate(long)}.
+   * Same as {@link java.nio.channels.FileChannel#truncate(long)}: every byte from the given
+   * position (inclusive) to the end of the file is discarded. For example, calling truncate(60)
+   * on an 80-byte file drops bytes 60 to 79, and the file keeps the data in range [0, 59].
    *
    * @param size size The new size, a non-negative byte count
    */