You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2021/07/15 11:58:15 UTC

[iotdb] branch rel/0.12 updated: [IoTDB-1501][To rel/0.12] Fix compaction recover delete tsfile bug (#3568)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch rel/0.12
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/0.12 by this push:
     new cac70a5  [IoTDB-1501][To rel/0.12] Fix compaction recover delete tsfile bug (#3568)
cac70a5 is described below

commit cac70a501e3b9ded50ea124b8b4499a845ca19a3
Author: zhanglingzhe0820 <44...@qq.com>
AuthorDate: Thu Jul 15 19:57:51 2021 +0800

    [IoTDB-1501][To rel/0.12] Fix compaction recover delete tsfile bug (#3568)
---
 .../compaction/CompactionMergeTaskPoolManager.java | 24 +++---
 .../level/LevelCompactionTsFileManagement.java     | 89 +++++++++++++---------
 .../compaction/LevelCompactionRecoverTest.java     | 14 +++-
 3 files changed, 75 insertions(+), 52 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java
index 9b7949c..2a40452 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java
@@ -34,11 +34,10 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.util.Collections;
-import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.RejectedExecutionException;
@@ -56,7 +55,7 @@ public class CompactionMergeTaskPoolManager implements IService {
       new CompactionMergeTaskPoolManager();
   private ScheduledExecutorService scheduledPool;
   private ExecutorService pool;
-  private Map<String, Set<Future<Void>>> storageGroupTasks = new ConcurrentHashMap<>();
+  private Map<String, List<Future<Void>>> storageGroupTasks = new ConcurrentHashMap<>();
   private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
 
   private static ConcurrentHashMap<String, Boolean> sgCompactionStatus = new ConcurrentHashMap<>();
@@ -69,12 +68,13 @@ public class CompactionMergeTaskPoolManager implements IService {
   public void start() {
     if (pool == null) {
       this.pool =
-          IoTDBThreadPoolFactory.newScheduledThreadPool(
+          IoTDBThreadPoolFactory.newFixedThreadPool(
               IoTDBDescriptor.getInstance().getConfig().getCompactionThreadNum(),
               ThreadName.COMPACTION_SERVICE.getName());
       this.scheduledPool =
           IoTDBThreadPoolFactory.newScheduledThreadPool(
-              Integer.MAX_VALUE, ThreadName.COMPACTION_SERVICE.getName());
+              IoTDBDescriptor.getInstance().getConfig().getCompactionThreadNum(),
+              ThreadName.COMPACTION_SERVICE.getName());
     }
     logger.info("Compaction task manager started.");
   }
@@ -175,17 +175,15 @@ public class CompactionMergeTaskPoolManager implements IService {
    * corresponding storage group.
    */
   public void abortCompaction(String storageGroup) {
-    Set<Future<Void>> subTasks =
-        storageGroupTasks.getOrDefault(storageGroup, Collections.emptySet());
-    Iterator<Future<Void>> subIterator = subTasks.iterator();
-    while (subIterator.hasNext()) {
-      Future<Void> next = subIterator.next();
+    List<Future<Void>> subTasks =
+        storageGroupTasks.getOrDefault(storageGroup, Collections.emptyList());
+    for (Future<Void> next : subTasks) {
       if (!next.isDone() && !next.isCancelled()) {
         next.cancel(true);
         sgCompactionStatus.put(storageGroup, false);
       }
-      subIterator.remove();
     }
+    subTasks.clear();
   }
 
   public synchronized void clearCompactionStatus(String storageGroupName) {
@@ -213,7 +211,7 @@ public class CompactionMergeTaskPoolManager implements IService {
       sgCompactionStatus.put(storageGroup, true);
       Future<Void> future = pool.submit(storageGroupCompactionTask);
       storageGroupTasks
-          .computeIfAbsent(storageGroup, k -> new ConcurrentSkipListSet<>())
+          .computeIfAbsent(storageGroup, k -> new CopyOnWriteArrayList<>())
           .add(future);
     }
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
index f740c6b..fb5b601 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
@@ -472,6 +472,13 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
         } else {
           // get tsfile resource from list, as they have been recovered in StorageGroupProcessor
           TsFileResource targetResource = getRecoverTsFileResource(targetFile, isSeq);
+          if (targetResource == null) {
+            // new file already merged but old file not deleted
+            targetResource = getTsFileResource(targetFile, isSeq);
+            if (targetResource == null) {
+              throw new IOException();
+            }
+          }
           long timePartition = targetResource.getTimePartition();
           List<TsFileResource> sourceTsFileResources = new ArrayList<>();
           for (String file : sourceFileList) {
@@ -484,6 +491,7 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
           }
           int level = TsFileResource.getMergeLevel(new File(sourceFileList.get(0)).getName());
           RestorableTsFileIOWriter writer = new RestorableTsFileIOWriter(target);
+          List<Modification> modifications = new ArrayList<>();
           // if not complete compaction, resume merge
           if (writer.hasCrashed()) {
             if (offset > 0) {
@@ -492,7 +500,6 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
             writer.close();
             CompactionLogger compactionLogger =
                 new CompactionLogger(storageGroupDir, storageGroupName);
-            List<Modification> modifications = new ArrayList<>();
             CompactionUtils.merge(
                 targetResource,
                 sourceTsFileResources,
@@ -501,31 +508,39 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
                 deviceSet,
                 isSeq,
                 modifications);
-            // complete compaction and delete source file
-            writeLock();
-            try {
-              if (Thread.currentThread().isInterrupted()) {
-                throw new InterruptedException(
-                    String.format("%s [Compaction] abort", storageGroupName));
-              }
-              int targetLevel = TsFileResource.getMergeLevel(targetResource.getTsFile().getName());
-              if (isSeq) {
-                sequenceTsFileResources.get(timePartition).get(targetLevel).add(targetResource);
-                sequenceRecoverTsFileResources.clear();
-              } else {
-                unSequenceTsFileResources.get(timePartition).get(targetLevel).add(targetResource);
-                unSequenceRecoverTsFileResources.clear();
-              }
-              deleteLevelFilesInList(timePartition, sourceTsFileResources, level, isSeq);
-            } finally {
-              writeUnlock();
-            }
-            deleteLevelFilesInDisk(sourceTsFileResources);
-            renameLevelFilesMods(modifications, sourceTsFileResources, targetResource);
             compactionLogger.close();
+            // complete compaction and add target tsfile
+            int targetLevel = TsFileResource.getMergeLevel(targetResource.getTsFile().getName());
+            if (isSeq) {
+              sequenceTsFileResources.get(timePartition).get(targetLevel).add(targetResource);
+              sequenceRecoverTsFileResources.clear();
+            } else {
+              unSequenceTsFileResources.get(timePartition).get(targetLevel).add(targetResource);
+              unSequenceRecoverTsFileResources.clear();
+            }
           } else {
+            // complete compaction, just close writer
             writer.close();
           }
+          // complete compaction and delete source file
+          writeLock();
+          try {
+            if (Thread.currentThread().isInterrupted()) {
+              throw new InterruptedException(
+                  String.format("%s [Compaction] abort", storageGroupName));
+            }
+            deleteLevelFilesInList(timePartition, sourceTsFileResources, level, isSeq);
+          } finally {
+            writeUnlock();
+          }
+          for (TsFileResource tsFileResource : sourceTsFileResources) {
+            logger.error(
+                "{} recover storage group delete source file {}",
+                storageGroupName,
+                tsFileResource.getTsFile().getName());
+          }
+          deleteLevelFilesInDisk(sourceTsFileResources);
+          renameLevelFilesMods(modifications, sourceTsFileResources, targetResource);
         }
       }
     } catch (IOException | IllegalPathException | InterruptedException e) {
@@ -787,23 +802,25 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
     return newUnSequenceTsFileResources;
   }
 
-  private TsFileResource getRecoverTsFileResource(String filePath, boolean isSeq)
-      throws IOException {
-    if (isSeq) {
-      for (TsFileResource tsFileResource : sequenceRecoverTsFileResources) {
-        if (Files.isSameFile(tsFileResource.getTsFile().toPath(), new File(filePath).toPath())) {
-          return tsFileResource;
+  private TsFileResource getRecoverTsFileResource(String filePath, boolean isSeq) {
+    try {
+      if (isSeq) {
+        for (TsFileResource tsFileResource : sequenceRecoverTsFileResources) {
+          if (Files.isSameFile(tsFileResource.getTsFile().toPath(), new File(filePath).toPath())) {
+            return tsFileResource;
+          }
         }
-      }
-    } else {
-      for (TsFileResource tsFileResource : unSequenceRecoverTsFileResources) {
-        if (Files.isSameFile(tsFileResource.getTsFile().toPath(), new File(filePath).toPath())) {
-          return tsFileResource;
+      } else {
+        for (TsFileResource tsFileResource : unSequenceRecoverTsFileResources) {
+          if (Files.isSameFile(tsFileResource.getTsFile().toPath(), new File(filePath).toPath())) {
+            return tsFileResource;
+          }
         }
       }
+    } catch (IOException e) {
+      logger.error("cannot get tsfile resource path: {}", filePath);
     }
-    logger.error("cannot get tsfile resource path: {}", filePath);
-    throw new IOException();
+    return null;
   }
 
   private TsFileResource getTsFileResource(String filePath, boolean isSeq) {
@@ -868,6 +885,8 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
         if (targetFilePath != null) {
           File targetFile = new File(targetFilePath);
           if (targetFile.exists()) {
+            logger.error(
+                "{} restore delete target file {} ", storageGroupName, targetFile.getName());
             targetFile.delete();
           }
         }
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionRecoverTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionRecoverTest.java
index 7adffc2..ddd2005 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionRecoverTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionRecoverTest.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.engine.compaction;
 
 import org.apache.iotdb.db.conf.IoTDBConstant;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.constant.TestConstant;
 import org.apache.iotdb.db.engine.compaction.level.LevelCompactionTsFileManagement;
 import org.apache.iotdb.db.engine.compaction.utils.CompactionLogger;
@@ -139,7 +140,7 @@ public class LevelCompactionRecoverTest extends LevelCompactionTest {
         true,
         new ArrayList<>());
     compactionLogger.close();
-    levelCompactionTsFileManagement.addRecover(targetTsFileResource, true);
+    levelCompactionTsFileManagement.add(targetTsFileResource, true);
     levelCompactionTsFileManagement.recover();
     context = new QueryContext();
     path =
@@ -254,7 +255,7 @@ public class LevelCompactionRecoverTest extends LevelCompactionTest {
     }
     logStream.close();
 
-    levelCompactionTsFileManagement.addRecover(targetTsFileResource, true);
+    levelCompactionTsFileManagement.add(targetTsFileResource, true);
     levelCompactionTsFileManagement.recover();
     context = new QueryContext();
     path =
@@ -408,6 +409,9 @@ public class LevelCompactionRecoverTest extends LevelCompactionTest {
   @Test
   public void testCompactionMergeRecoverMergeFinishedUnseq()
       throws IOException, IllegalPathException {
+    int prevUnseqLevelNum = IoTDBDescriptor.getInstance().getConfig().getUnseqLevelNum();
+    IoTDBDescriptor.getInstance().getConfig().setUnseqLevelNum(2);
+
     LevelCompactionTsFileManagement levelCompactionTsFileManagement =
         new LevelCompactionTsFileManagement(COMPACTION_TEST_SG, tempSGDir.getPath());
     levelCompactionTsFileManagement.addAll(seqResources, true);
@@ -466,7 +470,7 @@ public class LevelCompactionRecoverTest extends LevelCompactionTest {
         false,
         new ArrayList<>());
     compactionLogger.close();
-    levelCompactionTsFileManagement.addRecover(targetTsFileResource, false);
+    levelCompactionTsFileManagement.add(targetTsFileResource, false);
     levelCompactionTsFileManagement.recover();
     context = new QueryContext();
     path =
@@ -493,6 +497,8 @@ public class LevelCompactionRecoverTest extends LevelCompactionTest {
       }
     }
     assertEquals(500, count);
+
+    IoTDBDescriptor.getInstance().getConfig().setUnseqLevelNum(prevUnseqLevelNum);
   }
 
   /** compaction recover merge start just log source file */
@@ -671,7 +677,7 @@ public class LevelCompactionRecoverTest extends LevelCompactionTest {
         new HashSet<>(),
         true,
         new ArrayList<>());
-    levelCompactionTsFileManagement.addRecover(targetTsFileResource, true);
+    levelCompactionTsFileManagement.add(targetTsFileResource, true);
     compactionLogger.close();
     levelCompactionTsFileManagement.recover();
     QueryContext context = new QueryContext();