You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2020/11/11 01:56:19 UTC

[iotdb] branch master updated: Fix compaction delete resource file bug (#1997)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 5bbbdbd  Fix compaction delete resource file bug (#1997)
5bbbdbd is described below

commit 5bbbdbd3539385dfa98c135db503d5eb7d5427a6
Author: zhanglingzhe0820 <44...@qq.com>
AuthorDate: Wed Nov 11 09:56:06 2020 +0800

    Fix compaction delete resource file bug (#1997)
---
 .../compaction/CompactionMergeTaskPoolManager.java | 12 ++++-
 .../db/engine/compaction/TsFileManagement.java     |  7 +--
 .../level/LevelCompactionTsFileManagement.java     | 54 ++++++++++++++--------
 .../iotdb/db/engine/merge/manage/MergeManager.java | 25 ++++++++--
 .../iotdb/db/engine/merge/task/MergeFileTask.java  |  5 --
 5 files changed, 70 insertions(+), 33 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java
index f030300..830ea56 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java
@@ -78,9 +78,17 @@ public class CompactionMergeTaskPoolManager implements IService {
   private void waitTermination() {
     long startTime = System.currentTimeMillis();
     while (!pool.isTerminated()) {
-      // wait
+      int timeMillis = 0;
+      try {
+        Thread.sleep(200);
+      } catch (InterruptedException e) {
+        logger.error("CompactionMergeTaskPoolManager {} shutdown",
+            ThreadName.COMPACTION_SERVICE.getName(), e);
+        Thread.currentThread().interrupt();
+      }
+      timeMillis += 200;
       long time = System.currentTimeMillis() - startTime;
-      if (time % 60_000 == 0) {
+      if (timeMillis % 60_000 == 0) {
         logger.warn("CompactionManager has wait for {} seconds to stop", time / 1000);
       }
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
index 245769a..bdb304a 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
@@ -190,16 +190,17 @@ public abstract class TsFileManagement {
       }
       return;
     }
-    logger.info("{} will close all files for starting a merge (fullmerge = {})", storageGroupName,
-        fullMerge);
+    isUnseqMerging = true;
 
     if (seqMergeList.isEmpty()) {
       logger.info("{} no seq files to be merged", storageGroupName);
+      isUnseqMerging = false;
       return;
     }
 
     if (unSeqMergeList.isEmpty()) {
       logger.info("{} no unseq files to be merged", storageGroupName);
+      isUnseqMerging = false;
       return;
     }
 
@@ -213,6 +214,7 @@ public abstract class TsFileManagement {
       if (mergeFiles.length == 0) {
         logger.info("{} cannot select merge candidates under the budget {}", storageGroupName,
             budget);
+        isUnseqMerging = false;
         return;
       }
       // avoid pending tasks holds the metadata and streams
@@ -229,7 +231,6 @@ public abstract class TsFileManagement {
         tsFileResource.setMerging(true);
       }
 
-      isUnseqMerging = true;
       mergeStartTime = System.currentTimeMillis();
       MergeTask mergeTask = new MergeTask(mergeResource, storageGroupDir,
           this::mergeEndAction, taskName, fullMerge, fileSelector.getConcurrentMergeNum(),
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
index f046f6f..3d36f06 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
@@ -87,12 +87,22 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
     logger.debug("{} [compaction] merge starts to delete file", storageGroupName);
     for (TsFileResource mergeTsFile : mergeTsFiles) {
       deleteLevelFile(mergeTsFile);
+      logger
+          .info("{} [Compaction] delete TsFile {}", storageGroupName, mergeTsFile.getTsFilePath());
     }
     for (int i = 0; i < seqLevelNum; i++) {
-      sequenceTsFileResources.get(timePartitionId).get(i).removeAll(mergeTsFiles);
+      if (sequenceTsFileResources.containsKey(timePartitionId)) {
+        if (sequenceTsFileResources.get(timePartitionId).size() > i) {
+          sequenceTsFileResources.get(timePartitionId).get(i).removeAll(mergeTsFiles);
+        }
+      }
     }
     for (int i = 0; i < unseqLevelNum; i++) {
-      unSequenceTsFileResources.get(timePartitionId).get(i).removeAll(mergeTsFiles);
+      if (unSequenceTsFileResources.containsKey(timePartitionId)) {
+        if (unSequenceTsFileResources.get(timePartitionId).size() > i) {
+          unSequenceTsFileResources.get(timePartitionId).get(i).removeAll(mergeTsFiles);
+        }
+      }
     }
   }
 
@@ -341,6 +351,10 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
           TsFileResource targetResource = new TsFileResource(targetFile);
           long timePartition = targetResource.getTimePartition();
           RestorableTsFileIOWriter writer = new RestorableTsFileIOWriter(targetFile);
+          List<TsFileResource> sourceTsFileResources = new ArrayList<>();
+          for (File file : sourceFileList) {
+            sourceTsFileResources.add(new TsFileResource(file));
+          }
           if (sourceFileList.isEmpty()) {
             return;
           }
@@ -353,22 +367,15 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
               writer.close();
               if (isSeq) {
                 CompactionUtils
-                    .merge(targetResource,
-                        new ArrayList<>(sequenceTsFileResources.get(timePartition).get(level)),
-                        storageGroupName,
-                        new CompactionLogger(storageGroupDir, storageGroupName), deviceSet,
-                        true);
-                deleteLevelFiles(timePartition,
-                    sequenceTsFileResources.get(timePartition).get(level));
+                    .merge(targetResource, sourceTsFileResources, storageGroupName,
+                        new CompactionLogger(storageGroupDir, storageGroupName), deviceSet, true);
+                deleteLevelFiles(timePartition, sourceTsFileResources);
                 sequenceTsFileResources.get(timePartition).get(level + 1).add(targetResource);
               } else {
                 CompactionUtils
-                    .merge(targetResource, unSequenceTsFileResources.get(timePartition).get(level),
-                        storageGroupName,
-                        new CompactionLogger(storageGroupDir, storageGroupName), deviceSet,
-                        false);
-                deleteLevelFiles(timePartition,
-                    unSequenceTsFileResources.get(timePartition).get(level));
+                    .merge(targetResource, sourceTsFileResources, storageGroupName,
+                        new CompactionLogger(storageGroupDir, storageGroupName), deviceSet, false);
+                deleteLevelFiles(timePartition, sourceTsFileResources);
                 unSequenceTsFileResources.get(timePartition).get(level + 1).add(targetResource);
               }
             }
@@ -445,6 +452,7 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
       } catch (InterruptedException e) {
         logger.error("{} [Compaction] shutdown", storageGroupName, e);
         Thread.currentThread().interrupt();
+        return;
       }
     }
     long startTimeMillis = System.currentTimeMillis();
@@ -466,16 +474,24 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
                 i + 1);
             compactionLogger.logSequence(sequence);
             compactionLogger.logFile(TARGET_NAME, newLevelFile);
-            logger.info("{} [Compaction] merge level-{}'s {} tsfiles to next level",
-                storageGroupName, i, mergeResources.get(i).size());
+            List<TsFileResource> toMergeTsFiles = mergeResources.get(i);
+            logger.info("{} [Compaction] merge level-{}'s {} TsFiles to next level",
+                storageGroupName, i, toMergeTsFiles.size());
+            for (TsFileResource toMergeTsFile : toMergeTsFiles) {
+              logger.info("{} [Compaction] start to merge TsFile {}", storageGroupName,
+                  toMergeTsFile);
+            }
 
             TsFileResource newResource = new TsFileResource(newLevelFile);
             CompactionUtils
-                .merge(newResource, mergeResources.get(i), storageGroupName, compactionLogger,
+                .merge(newResource, toMergeTsFiles, storageGroupName, compactionLogger,
                     new HashSet<>(), sequence);
+            logger.info(
+                "{} [Compaction] merged level-{}'s {} TsFiles to next level, and start to delete old files",
+                storageGroupName, i, toMergeTsFiles.size());
             writeLock();
             try {
-              deleteLevelFiles(timePartition, mergeResources.get(i));
+              deleteLevelFiles(timePartition, toMergeTsFiles);
               compactionLogger.logMergeFinish();
               if (sequence) {
                 sequenceTsFileResources.get(timePartition).get(i + 1).add(newResource);
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeManager.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeManager.java
index 158bef6..ac0e38d 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeManager.java
@@ -37,6 +37,7 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.iotdb.db.concurrent.ThreadName;
 import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.StorageEngine;
@@ -168,9 +169,17 @@ public class MergeManager implements IService, MergeManagerMBean {
       logger.info("Waiting for task pool to shut down");
       long startTime = System.currentTimeMillis();
       while (!mergeTaskPool.isTerminated() || !mergeChunkSubTaskPool.isTerminated()) {
-        // wait
+        int timeMillis = 0;
+        try {
+          Thread.sleep(200);
+        } catch (InterruptedException e) {
+          logger.error("CompactionMergeTaskPoolManager {} shutdown",
+              ThreadName.COMPACTION_SERVICE.getName(), e);
+          Thread.currentThread().interrupt();
+        }
+        timeMillis += 200;
         long time = System.currentTimeMillis() - startTime;
-        if (time % 60_000 == 0) {
+        if (timeMillis % 60_000 == 0) {
           logger.warn("MergeManager has wait for {} seconds to stop", time / 1000);
         }
       }
@@ -197,9 +206,17 @@ public class MergeManager implements IService, MergeManagerMBean {
       logger.info("Waiting for task pool to shut down");
       long startTime = System.currentTimeMillis();
       while (!mergeTaskPool.isTerminated() || !mergeChunkSubTaskPool.isTerminated()) {
-        // wait
+        int timeMillis = 0;
+        try {
+          Thread.sleep(200);
+        } catch (InterruptedException e) {
+          logger.error("CompactionMergeTaskPoolManager {} shutdown",
+              ThreadName.COMPACTION_SERVICE.getName(), e);
+          Thread.currentThread().interrupt();
+        }
+        timeMillis += 200;
         long time = System.currentTimeMillis() - startTime;
-        if (time % 60_000 == 0) {
+        if (timeMillis % 60_000 == 0) {
           logger.warn("MergeManager has wait for {} seconds to stop", time / 1000);
         }
       }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java
index eddc290..693e58f 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java
@@ -188,11 +188,6 @@ class MergeFileTask {
       seqFile.serialize();
       mergeLogger.logFileMergeEnd();
       logger.debug("{} moved merged chunks of {} to the old file", taskName, seqFile);
-
-      File newMergedFile = newFileWriter.getFile();
-      newMergedFile.delete();
-      fsFactory.moveFile(seqFile.getTsFile(), newMergedFile);
-      seqFile.setFile(newMergedFile);
     } catch (Exception e) {
       restoreOldFile(seqFile);
       throw e;