You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2021/11/03 07:14:33 UTC

[iotdb] branch master updated: [IOTDB-1604] Fix data increase when shut down iotdb in 0.12 and restart iotdb-0.13 (#4240)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 0eeb0ce  [IOTDB-1604] Fix data increase when shut down iotdb in 0.12 and restart iotdb-0.13 (#4240)
0eeb0ce is described below

commit 0eeb0ce4aed2f203825689e0ce5abd1e1122c716
Author: liuxuxin <37...@users.noreply.github.com>
AuthorDate: Wed Nov 3 15:14:02 2021 +0800

    [IOTDB-1604] Fix data increase when shut down iotdb in 0.12 and restart iotdb-0.13 (#4240)
---
 .../SizeTieredCompactionRecoverTask.java           | 58 +++++++++++++++++++---
 .../inner/utils/InnerSpaceCompactionUtils.java     |  2 +-
 .../compaction/task/AbstractCompactionTask.java    |  7 ++-
 .../engine/storagegroup/StorageGroupProcessor.java | 30 ++++++++++-
 4 files changed, 87 insertions(+), 10 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionRecoverTask.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionRecoverTask.java
index 42e357f..bb85289 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionRecoverTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionRecoverTask.java
@@ -74,7 +74,10 @@ public class SizeTieredCompactionRecoverTask extends SizeTieredCompactionTask {
   public void doCompaction() {
     // read log -> Set<Device> -> doCompaction -> clear
     try {
+      LOGGER.info(
+          "{} [Compaction][Recover] compaction log is {}", fullStorageGroupName, compactionLogFile);
       if (compactionLogFile.exists()) {
+        LOGGER.info("{} [Compaction][Recover] log exists, start recover", fullStorageGroupName);
         SizeTieredCompactionLogAnalyzer logAnalyzer =
             new SizeTieredCompactionLogAnalyzer(compactionLogFile);
         logAnalyzer.analyze();
@@ -85,27 +88,64 @@ public class SizeTieredCompactionRecoverTask extends SizeTieredCompactionTask {
         }
         File targetFile = new File(targetFileName);
         File resourceFile = new File(targetFileName + ".resource");
+        LOGGER.info("{} [Compaction][Recover] target file is {}", fullStorageGroupName, targetFile);
         if (!targetFile.exists()) {
           if (resourceFile.exists()) {
             if (!resourceFile.delete()) {
-              LOGGER.warn("Fail to delete tsfile resource {}", resourceFile);
+              LOGGER.warn(
+                  "{} [Compaction][Recover] Fail to delete tsfile resource {}",
+                  fullStorageGroupName,
+                  resourceFile);
+            } else {
+              LOGGER.info(
+                  "{} [Compaction][Recover] Deleted target file resource {}",
+                  fullStorageGroupName,
+                  resourceFile);
             }
           }
+          LOGGER.info(
+              "{} [Compaction][Recover] Target file {} not exists, return",
+              fullStorageGroupName,
+              targetFile);
           return;
         }
 
         RestorableTsFileIOWriter writer = new RestorableTsFileIOWriter(targetFile, false);
         if (writer.hasCrashed()) {
+          LOGGER.info(
+              "{} [Compaction][Recover] target file {} crash, start to delete it",
+              fullStorageGroupName,
+              targetFile);
           // the target tsfile is crashed, it is not completed
           writer.close();
           if (!targetFile.delete()) {
-            LOGGER.warn("Fail to delete uncompleted file {}", targetFile);
+            LOGGER.warn(
+                "{} [Compaction][Recover] Fail to delete uncompleted file {}",
+                fullStorageGroupName,
+                targetFile);
+          } else {
+            LOGGER.info(
+                "{} [Compaction][Recover] remove target file {}", fullStorageGroupName, targetFile);
           }
-          if (!resourceFile.delete()) {
-            LOGGER.warn("Fail to delete tsfile resource {}", resourceFile);
+          if (resourceFile.exists()) {
+            if (!resourceFile.delete()) {
+              LOGGER.warn(
+                  "{} [Compaction][Recover] Fail to delete tsfile resource {}",
+                  fullStorageGroupName,
+                  resourceFile);
+            } else {
+              LOGGER.info(
+                  "{} [Compaction][Recover] delete target file resource {}",
+                  fullStorageGroupName,
+                  resourceFile);
+            }
           }
         } else {
           // the target tsfile is completed
+          LOGGER.info(
+              "{} [Compaction][Recover] target file {} is completed, remove source files",
+              fullStorageGroupName,
+              targetFile);
           TsFileResource targetResource = new TsFileResource(targetFile);
           List<TsFileResource> sourceTsFileResources = new ArrayList<>();
           for (String sourceFileName : sourceFileList) {
@@ -123,9 +163,15 @@ public class SizeTieredCompactionRecoverTask extends SizeTieredCompactionTask {
     } finally {
       if (compactionLogFile.exists()) {
         if (!compactionLogFile.delete()) {
-          LOGGER.warn("fail to delete {}", compactionLogFile);
+          LOGGER.warn(
+              "{} [Compaction][Recover] fail to delete {}",
+              fullStorageGroupName,
+              compactionLogFile);
         } else {
-          LOGGER.info("delete compaction log {}", compactionLogFile);
+          LOGGER.info(
+              "{} [Compaction][Recover] delete compaction log {}",
+              fullStorageGroupName,
+              compactionLogFile);
         }
       }
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/InnerSpaceCompactionUtils.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/InnerSpaceCompactionUtils.java
index 8d17795..e2dd85e 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/InnerSpaceCompactionUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/InnerSpaceCompactionUtils.java
@@ -551,7 +551,7 @@ public class InnerSpaceCompactionUtils {
 
   public static void deleteTsFilesInDisk(
       Collection<TsFileResource> mergeTsFiles, String storageGroupName) {
-    logger.debug("{} [compaction] merge starts to delete real file", storageGroupName);
+    logger.info("{} [compaction] merge starts to delete real file ", storageGroupName);
     for (TsFileResource mergeTsFile : mergeTsFiles) {
       deleteTsFile(mergeTsFile);
       logger.info(
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/task/AbstractCompactionTask.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/task/AbstractCompactionTask.java
index 1555971..70eccf9 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/task/AbstractCompactionTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/task/AbstractCompactionTask.java
@@ -20,6 +20,8 @@
 package org.apache.iotdb.db.engine.compaction.task;
 
 import org.apache.iotdb.db.engine.compaction.CompactionScheduler;
+import org.apache.iotdb.db.engine.compaction.cross.inplace.InplaceCompactionRecoverTask;
+import org.apache.iotdb.db.engine.compaction.inner.sizetiered.SizeTieredCompactionRecoverTask;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -56,7 +58,10 @@ public abstract class AbstractCompactionTask implements Callable<Void> {
     } catch (Exception e) {
       LOGGER.error(e.getMessage(), e);
     } finally {
-      CompactionScheduler.decPartitionCompaction(fullStorageGroupName, timePartition);
+      if (!(this instanceof InplaceCompactionRecoverTask)
+          && !(this instanceof SizeTieredCompactionRecoverTask)) {
+        CompactionScheduler.decPartitionCompaction(fullStorageGroupName, timePartition);
+      }
       this.currentTaskNum.decrementAndGet();
     }
     return null;
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index a48d308..f35a7d2 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -118,6 +118,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import static org.apache.iotdb.db.conf.IoTDBConstant.FILE_NAME_SEPARATOR;
 import static org.apache.iotdb.db.engine.compaction.cross.inplace.task.CrossSpaceMergeTask.MERGE_SUFFIX;
+import static org.apache.iotdb.db.engine.compaction.inner.utils.SizeTieredCompactionLogger.COMPACTION_LOG_NAME;
 import static org.apache.iotdb.db.engine.storagegroup.TsFileResource.TEMP_SUFFIX;
 import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFFIX;
 
@@ -533,6 +534,7 @@ public class StorageGroupProcessor {
   }
 
   private void recoverInnerSpaceCompaction(boolean isSequence) throws Exception {
+    // search compaction log for SizeTieredCompaction
     List<String> dirs;
     if (isSequence) {
       dirs = DirectoryManager.getInstance().getAllSequenceFileFolders();
@@ -575,6 +577,26 @@ public class StorageGroupProcessor {
         }
       }
     }
+
+    // search compaction log for old LevelCompaction
+    File logFile =
+        FSFactoryProducer.getFSFactory()
+            .getFile(
+                storageGroupSysDir.getAbsolutePath(),
+                logicalStorageGroupName + COMPACTION_LOG_NAME);
+    if (logFile.exists()) {
+      IoTDBDescriptor.getInstance()
+          .getConfig()
+          .getInnerCompactionStrategy()
+          .getCompactionRecoverTask(
+              tsFileManager.getStorageGroupName(),
+              tsFileManager.getVirtualStorageGroup(),
+              -1,
+              logFile,
+              logFile.getParent(),
+              isSequence)
+          .call();
+    }
   }
 
   private void submitTimedCompactionTask() {
@@ -744,7 +766,7 @@ public class StorageGroupProcessor {
     }
   }
 
-  private void recoverTsFiles(List<TsFileResource> tsFiles, boolean isSeq) {
+  private void recoverTsFiles(List<TsFileResource> tsFiles, boolean isSeq) throws IOException {
     for (int i = 0; i < tsFiles.size(); i++) {
       TsFileResource tsFileResource = tsFiles.get(i);
       long timePartitionId = tsFileResource.getTimePartition();
@@ -759,7 +781,7 @@ public class StorageGroupProcessor {
               isSeq,
               i == tsFiles.size() - 1);
 
-      RestorableTsFileIOWriter writer;
+      RestorableTsFileIOWriter writer = null;
       try {
         // this tsfile is not zero level, no need to perform redo wal
         if (TsFileResource.getInnerCompactionCount(tsFileResource.getTsFile().getName()) > 0) {
@@ -841,6 +863,10 @@ public class StorageGroupProcessor {
         logger.warn(
             "Skip TsFile: {} because of error in recover: ", tsFileResource.getTsFilePath(), e);
         continue;
+      } finally {
+        if (writer != null) {
+          writer.close();
+        }
       }
     }
   }