You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text copy.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2021/07/27 10:13:32 UTC

[iotdb] branch rel/0.12 updated: [To rel/0.12] Revert commit bf6d83cd (#3534) and commit cac70a50 (#3568) (#3634)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch rel/0.12
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/0.12 by this push:
     new 30e3dad  [To rel/0.12] Revert commit bf6d83cd (#3534) and commit cac70a50 (#3568) (#3634)
30e3dad is described below

commit 30e3dad13738bed588a020ed41c293eeae96d618
Author: Steve Yurong Su (宇荣) <ro...@apache.org>
AuthorDate: Tue Jul 27 05:13:06 2021 -0500

    [To rel/0.12] Revert commit bf6d83cd (#3534) and commit cac70a50 (#3568) (#3634)
---
 .../compaction/CompactionMergeTaskPoolManager.java | 31 ++++----
 .../level/LevelCompactionTsFileManagement.java     | 89 +++++++++-------------
 .../compaction/LevelCompactionRecoverTest.java     | 14 +---
 3 files changed, 52 insertions(+), 82 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java
index 2a40452..9f7ff5a 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java
@@ -34,10 +34,11 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.util.Collections;
-import java.util.List;
+import java.util.Iterator;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.RejectedExecutionException;
@@ -53,9 +54,8 @@ public class CompactionMergeTaskPoolManager implements IService {
       LoggerFactory.getLogger(CompactionMergeTaskPoolManager.class);
   private static final CompactionMergeTaskPoolManager INSTANCE =
       new CompactionMergeTaskPoolManager();
-  private ScheduledExecutorService scheduledPool;
-  private ExecutorService pool;
-  private Map<String, List<Future<Void>>> storageGroupTasks = new ConcurrentHashMap<>();
+  private ScheduledExecutorService pool;
+  private Map<String, Set<Future<Void>>> storageGroupTasks = new ConcurrentHashMap<>();
   private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
 
   private static ConcurrentHashMap<String, Boolean> sgCompactionStatus = new ConcurrentHashMap<>();
@@ -68,10 +68,6 @@ public class CompactionMergeTaskPoolManager implements IService {
   public void start() {
     if (pool == null) {
       this.pool =
-          IoTDBThreadPoolFactory.newFixedThreadPool(
-              IoTDBDescriptor.getInstance().getConfig().getCompactionThreadNum(),
-              ThreadName.COMPACTION_SERVICE.getName());
-      this.scheduledPool =
           IoTDBThreadPoolFactory.newScheduledThreadPool(
               IoTDBDescriptor.getInstance().getConfig().getCompactionThreadNum(),
               ThreadName.COMPACTION_SERVICE.getName());
@@ -82,7 +78,6 @@ public class CompactionMergeTaskPoolManager implements IService {
   @Override
   public void stop() {
     if (pool != null) {
-      scheduledPool.shutdownNow();
       pool.shutdownNow();
       logger.info("Waiting for task pool to shut down");
       waitTermination();
@@ -93,7 +88,6 @@ public class CompactionMergeTaskPoolManager implements IService {
   @Override
   public void waitAndStop(long milliseconds) {
     if (pool != null) {
-      awaitTermination(scheduledPool, milliseconds);
       awaitTermination(pool, milliseconds);
       logger.info("Waiting for task pool to shut down");
       waitTermination();
@@ -148,7 +142,6 @@ public class CompactionMergeTaskPoolManager implements IService {
         logger.warn("CompactionManager has wait for {} seconds to stop", time / 1000);
       }
     }
-    scheduledPool = null;
     pool = null;
     storageGroupTasks.clear();
     logger.info("CompactionManager stopped");
@@ -175,15 +168,17 @@ public class CompactionMergeTaskPoolManager implements IService {
    * corresponding storage group.
    */
   public void abortCompaction(String storageGroup) {
-    List<Future<Void>> subTasks =
-        storageGroupTasks.getOrDefault(storageGroup, Collections.emptyList());
-    for (Future<Void> next : subTasks) {
+    Set<Future<Void>> subTasks =
+        storageGroupTasks.getOrDefault(storageGroup, Collections.emptySet());
+    Iterator<Future<Void>> subIterator = subTasks.iterator();
+    while (subIterator.hasNext()) {
+      Future<Void> next = subIterator.next();
       if (!next.isDone() && !next.isCancelled()) {
         next.cancel(true);
         sgCompactionStatus.put(storageGroup, false);
       }
+      subIterator.remove();
     }
-    subTasks.clear();
   }
 
   public synchronized void clearCompactionStatus(String storageGroupName) {
@@ -195,7 +190,7 @@ public class CompactionMergeTaskPoolManager implements IService {
   }
 
   public void init(Runnable function) {
-    scheduledPool.scheduleWithFixedDelay(
+    pool.scheduleWithFixedDelay(
         function, 1000, config.getCompactionInterval(), TimeUnit.MILLISECONDS);
   }
 
@@ -211,7 +206,7 @@ public class CompactionMergeTaskPoolManager implements IService {
       sgCompactionStatus.put(storageGroup, true);
       Future<Void> future = pool.submit(storageGroupCompactionTask);
       storageGroupTasks
-          .computeIfAbsent(storageGroup, k -> new CopyOnWriteArrayList<>())
+          .computeIfAbsent(storageGroup, k -> new ConcurrentSkipListSet<>())
           .add(future);
     }
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
index fb5b601..f740c6b 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
@@ -472,13 +472,6 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
         } else {
           // get tsfile resource from list, as they have been recovered in StorageGroupProcessor
           TsFileResource targetResource = getRecoverTsFileResource(targetFile, isSeq);
-          if (targetResource == null) {
-            // new file already merged but old file not deleted
-            targetResource = getTsFileResource(targetFile, isSeq);
-            if (targetResource == null) {
-              throw new IOException();
-            }
-          }
           long timePartition = targetResource.getTimePartition();
           List<TsFileResource> sourceTsFileResources = new ArrayList<>();
           for (String file : sourceFileList) {
@@ -491,7 +484,6 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
           }
           int level = TsFileResource.getMergeLevel(new File(sourceFileList.get(0)).getName());
           RestorableTsFileIOWriter writer = new RestorableTsFileIOWriter(target);
-          List<Modification> modifications = new ArrayList<>();
           // if not complete compaction, resume merge
           if (writer.hasCrashed()) {
             if (offset > 0) {
@@ -500,6 +492,7 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
             writer.close();
             CompactionLogger compactionLogger =
                 new CompactionLogger(storageGroupDir, storageGroupName);
+            List<Modification> modifications = new ArrayList<>();
             CompactionUtils.merge(
                 targetResource,
                 sourceTsFileResources,
@@ -508,39 +501,31 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
                 deviceSet,
                 isSeq,
                 modifications);
-            compactionLogger.close();
-            // complete compaction and add target tsfile
-            int targetLevel = TsFileResource.getMergeLevel(targetResource.getTsFile().getName());
-            if (isSeq) {
-              sequenceTsFileResources.get(timePartition).get(targetLevel).add(targetResource);
-              sequenceRecoverTsFileResources.clear();
-            } else {
-              unSequenceTsFileResources.get(timePartition).get(targetLevel).add(targetResource);
-              unSequenceRecoverTsFileResources.clear();
+            // complete compaction and delete source file
+            writeLock();
+            try {
+              if (Thread.currentThread().isInterrupted()) {
+                throw new InterruptedException(
+                    String.format("%s [Compaction] abort", storageGroupName));
+              }
+              int targetLevel = TsFileResource.getMergeLevel(targetResource.getTsFile().getName());
+              if (isSeq) {
+                sequenceTsFileResources.get(timePartition).get(targetLevel).add(targetResource);
+                sequenceRecoverTsFileResources.clear();
+              } else {
+                unSequenceTsFileResources.get(timePartition).get(targetLevel).add(targetResource);
+                unSequenceRecoverTsFileResources.clear();
+              }
+              deleteLevelFilesInList(timePartition, sourceTsFileResources, level, isSeq);
+            } finally {
+              writeUnlock();
             }
+            deleteLevelFilesInDisk(sourceTsFileResources);
+            renameLevelFilesMods(modifications, sourceTsFileResources, targetResource);
+            compactionLogger.close();
           } else {
-            // complete compaction, just close writer
             writer.close();
           }
-          // complete compaction and delete source file
-          writeLock();
-          try {
-            if (Thread.currentThread().isInterrupted()) {
-              throw new InterruptedException(
-                  String.format("%s [Compaction] abort", storageGroupName));
-            }
-            deleteLevelFilesInList(timePartition, sourceTsFileResources, level, isSeq);
-          } finally {
-            writeUnlock();
-          }
-          for (TsFileResource tsFileResource : sourceTsFileResources) {
-            logger.error(
-                "{} recover storage group delete source file {}",
-                storageGroupName,
-                tsFileResource.getTsFile().getName());
-          }
-          deleteLevelFilesInDisk(sourceTsFileResources);
-          renameLevelFilesMods(modifications, sourceTsFileResources, targetResource);
         }
       }
     } catch (IOException | IllegalPathException | InterruptedException e) {
@@ -802,25 +787,23 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
     return newUnSequenceTsFileResources;
   }
 
-  private TsFileResource getRecoverTsFileResource(String filePath, boolean isSeq) {
-    try {
-      if (isSeq) {
-        for (TsFileResource tsFileResource : sequenceRecoverTsFileResources) {
-          if (Files.isSameFile(tsFileResource.getTsFile().toPath(), new File(filePath).toPath())) {
-            return tsFileResource;
-          }
+  private TsFileResource getRecoverTsFileResource(String filePath, boolean isSeq)
+      throws IOException {
+    if (isSeq) {
+      for (TsFileResource tsFileResource : sequenceRecoverTsFileResources) {
+        if (Files.isSameFile(tsFileResource.getTsFile().toPath(), new File(filePath).toPath())) {
+          return tsFileResource;
         }
-      } else {
-        for (TsFileResource tsFileResource : unSequenceRecoverTsFileResources) {
-          if (Files.isSameFile(tsFileResource.getTsFile().toPath(), new File(filePath).toPath())) {
-            return tsFileResource;
-          }
+      }
+    } else {
+      for (TsFileResource tsFileResource : unSequenceRecoverTsFileResources) {
+        if (Files.isSameFile(tsFileResource.getTsFile().toPath(), new File(filePath).toPath())) {
+          return tsFileResource;
         }
       }
-    } catch (IOException e) {
-      logger.error("cannot get tsfile resource path: {}", filePath);
     }
-    return null;
+    logger.error("cannot get tsfile resource path: {}", filePath);
+    throw new IOException();
   }
 
   private TsFileResource getTsFileResource(String filePath, boolean isSeq) {
@@ -885,8 +868,6 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
         if (targetFilePath != null) {
           File targetFile = new File(targetFilePath);
           if (targetFile.exists()) {
-            logger.error(
-                "{} restore delete target file {} ", storageGroupName, targetFile.getName());
             targetFile.delete();
           }
         }
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionRecoverTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionRecoverTest.java
index ddd2005..7adffc2 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionRecoverTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionRecoverTest.java
@@ -20,7 +20,6 @@
 package org.apache.iotdb.db.engine.compaction;
 
 import org.apache.iotdb.db.conf.IoTDBConstant;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.constant.TestConstant;
 import org.apache.iotdb.db.engine.compaction.level.LevelCompactionTsFileManagement;
 import org.apache.iotdb.db.engine.compaction.utils.CompactionLogger;
@@ -140,7 +139,7 @@ public class LevelCompactionRecoverTest extends LevelCompactionTest {
         true,
         new ArrayList<>());
     compactionLogger.close();
-    levelCompactionTsFileManagement.add(targetTsFileResource, true);
+    levelCompactionTsFileManagement.addRecover(targetTsFileResource, true);
     levelCompactionTsFileManagement.recover();
     context = new QueryContext();
     path =
@@ -255,7 +254,7 @@ public class LevelCompactionRecoverTest extends LevelCompactionTest {
     }
     logStream.close();
 
-    levelCompactionTsFileManagement.add(targetTsFileResource, true);
+    levelCompactionTsFileManagement.addRecover(targetTsFileResource, true);
     levelCompactionTsFileManagement.recover();
     context = new QueryContext();
     path =
@@ -409,9 +408,6 @@ public class LevelCompactionRecoverTest extends LevelCompactionTest {
   @Test
   public void testCompactionMergeRecoverMergeFinishedUnseq()
       throws IOException, IllegalPathException {
-    int prevUnseqLevelNum = IoTDBDescriptor.getInstance().getConfig().getUnseqLevelNum();
-    IoTDBDescriptor.getInstance().getConfig().setUnseqLevelNum(2);
-
     LevelCompactionTsFileManagement levelCompactionTsFileManagement =
         new LevelCompactionTsFileManagement(COMPACTION_TEST_SG, tempSGDir.getPath());
     levelCompactionTsFileManagement.addAll(seqResources, true);
@@ -470,7 +466,7 @@ public class LevelCompactionRecoverTest extends LevelCompactionTest {
         false,
         new ArrayList<>());
     compactionLogger.close();
-    levelCompactionTsFileManagement.add(targetTsFileResource, false);
+    levelCompactionTsFileManagement.addRecover(targetTsFileResource, false);
     levelCompactionTsFileManagement.recover();
     context = new QueryContext();
     path =
@@ -497,8 +493,6 @@ public class LevelCompactionRecoverTest extends LevelCompactionTest {
       }
     }
     assertEquals(500, count);
-
-    IoTDBDescriptor.getInstance().getConfig().setUnseqLevelNum(prevUnseqLevelNum);
   }
 
   /** compaction recover merge start just log source file */
@@ -677,7 +671,7 @@ public class LevelCompactionRecoverTest extends LevelCompactionTest {
         new HashSet<>(),
         true,
         new ArrayList<>());
-    levelCompactionTsFileManagement.add(targetTsFileResource, true);
+    levelCompactionTsFileManagement.addRecover(targetTsFileResource, true);
     compactionLogger.close();
     levelCompactionTsFileManagement.recover();
     QueryContext context = new QueryContext();