You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2021/07/27 09:00:42 UTC

[iotdb] branch 0.12-revert created (now 696a202)

This is an automated email from the ASF dual-hosted git repository.

rong pushed a change to branch 0.12-revert
in repository https://gitbox.apache.org/repos/asf/iotdb.git.


      at 696a202  Revert "[To rel/0.12] fix compaction block flush bug (#3534)"

This branch includes the following new commits:

     new b9873c4  Revert "[IoTDB-1501][To rel/0.12] Fix compaction recover delete tsfile bug (#3568)"
     new 696a202  Revert "[To rel/0.12] fix compaction block flush bug (#3534)"

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


[iotdb] 01/02: Revert "[IoTDB-1501][To rel/0.12] Fix compaction recover delete tsfile bug (#3568)"

Posted by ro...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch 0.12-revert
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit b9873c466c1c832cd557d7100593cf9906771165
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Tue Jul 27 16:20:42 2021 +0800

    Revert "[IoTDB-1501][To rel/0.12] Fix compaction recover delete tsfile bug (#3568)"
    
    This reverts commit cac70a50
---
 .../compaction/CompactionMergeTaskPoolManager.java | 24 +++---
 .../level/LevelCompactionTsFileManagement.java     | 89 +++++++++-------------
 .../compaction/LevelCompactionRecoverTest.java     | 14 +---
 3 files changed, 52 insertions(+), 75 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java
index 2a40452..9b7949c 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java
@@ -34,10 +34,11 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.util.Collections;
-import java.util.List;
+import java.util.Iterator;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.RejectedExecutionException;
@@ -55,7 +56,7 @@ public class CompactionMergeTaskPoolManager implements IService {
       new CompactionMergeTaskPoolManager();
   private ScheduledExecutorService scheduledPool;
   private ExecutorService pool;
-  private Map<String, List<Future<Void>>> storageGroupTasks = new ConcurrentHashMap<>();
+  private Map<String, Set<Future<Void>>> storageGroupTasks = new ConcurrentHashMap<>();
   private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
 
   private static ConcurrentHashMap<String, Boolean> sgCompactionStatus = new ConcurrentHashMap<>();
@@ -68,13 +69,12 @@ public class CompactionMergeTaskPoolManager implements IService {
   public void start() {
     if (pool == null) {
       this.pool =
-          IoTDBThreadPoolFactory.newFixedThreadPool(
+          IoTDBThreadPoolFactory.newScheduledThreadPool(
               IoTDBDescriptor.getInstance().getConfig().getCompactionThreadNum(),
               ThreadName.COMPACTION_SERVICE.getName());
       this.scheduledPool =
           IoTDBThreadPoolFactory.newScheduledThreadPool(
-              IoTDBDescriptor.getInstance().getConfig().getCompactionThreadNum(),
-              ThreadName.COMPACTION_SERVICE.getName());
+              Integer.MAX_VALUE, ThreadName.COMPACTION_SERVICE.getName());
     }
     logger.info("Compaction task manager started.");
   }
@@ -175,15 +175,17 @@ public class CompactionMergeTaskPoolManager implements IService {
    * corresponding storage group.
    */
   public void abortCompaction(String storageGroup) {
-    List<Future<Void>> subTasks =
-        storageGroupTasks.getOrDefault(storageGroup, Collections.emptyList());
-    for (Future<Void> next : subTasks) {
+    Set<Future<Void>> subTasks =
+        storageGroupTasks.getOrDefault(storageGroup, Collections.emptySet());
+    Iterator<Future<Void>> subIterator = subTasks.iterator();
+    while (subIterator.hasNext()) {
+      Future<Void> next = subIterator.next();
       if (!next.isDone() && !next.isCancelled()) {
         next.cancel(true);
         sgCompactionStatus.put(storageGroup, false);
       }
+      subIterator.remove();
     }
-    subTasks.clear();
   }
 
   public synchronized void clearCompactionStatus(String storageGroupName) {
@@ -211,7 +213,7 @@ public class CompactionMergeTaskPoolManager implements IService {
       sgCompactionStatus.put(storageGroup, true);
       Future<Void> future = pool.submit(storageGroupCompactionTask);
       storageGroupTasks
-          .computeIfAbsent(storageGroup, k -> new CopyOnWriteArrayList<>())
+          .computeIfAbsent(storageGroup, k -> new ConcurrentSkipListSet<>())
           .add(future);
     }
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
index fb5b601..f740c6b 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
@@ -472,13 +472,6 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
         } else {
           // get tsfile resource from list, as they have been recovered in StorageGroupProcessor
           TsFileResource targetResource = getRecoverTsFileResource(targetFile, isSeq);
-          if (targetResource == null) {
-            // new file already merged but old file not deleted
-            targetResource = getTsFileResource(targetFile, isSeq);
-            if (targetResource == null) {
-              throw new IOException();
-            }
-          }
           long timePartition = targetResource.getTimePartition();
           List<TsFileResource> sourceTsFileResources = new ArrayList<>();
           for (String file : sourceFileList) {
@@ -491,7 +484,6 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
           }
           int level = TsFileResource.getMergeLevel(new File(sourceFileList.get(0)).getName());
           RestorableTsFileIOWriter writer = new RestorableTsFileIOWriter(target);
-          List<Modification> modifications = new ArrayList<>();
           // if not complete compaction, resume merge
           if (writer.hasCrashed()) {
             if (offset > 0) {
@@ -500,6 +492,7 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
             writer.close();
             CompactionLogger compactionLogger =
                 new CompactionLogger(storageGroupDir, storageGroupName);
+            List<Modification> modifications = new ArrayList<>();
             CompactionUtils.merge(
                 targetResource,
                 sourceTsFileResources,
@@ -508,39 +501,31 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
                 deviceSet,
                 isSeq,
                 modifications);
-            compactionLogger.close();
-            // complete compaction and add target tsfile
-            int targetLevel = TsFileResource.getMergeLevel(targetResource.getTsFile().getName());
-            if (isSeq) {
-              sequenceTsFileResources.get(timePartition).get(targetLevel).add(targetResource);
-              sequenceRecoverTsFileResources.clear();
-            } else {
-              unSequenceTsFileResources.get(timePartition).get(targetLevel).add(targetResource);
-              unSequenceRecoverTsFileResources.clear();
+            // complete compaction and delete source file
+            writeLock();
+            try {
+              if (Thread.currentThread().isInterrupted()) {
+                throw new InterruptedException(
+                    String.format("%s [Compaction] abort", storageGroupName));
+              }
+              int targetLevel = TsFileResource.getMergeLevel(targetResource.getTsFile().getName());
+              if (isSeq) {
+                sequenceTsFileResources.get(timePartition).get(targetLevel).add(targetResource);
+                sequenceRecoverTsFileResources.clear();
+              } else {
+                unSequenceTsFileResources.get(timePartition).get(targetLevel).add(targetResource);
+                unSequenceRecoverTsFileResources.clear();
+              }
+              deleteLevelFilesInList(timePartition, sourceTsFileResources, level, isSeq);
+            } finally {
+              writeUnlock();
             }
+            deleteLevelFilesInDisk(sourceTsFileResources);
+            renameLevelFilesMods(modifications, sourceTsFileResources, targetResource);
+            compactionLogger.close();
           } else {
-            // complete compaction, just close writer
             writer.close();
           }
-          // complete compaction and delete source file
-          writeLock();
-          try {
-            if (Thread.currentThread().isInterrupted()) {
-              throw new InterruptedException(
-                  String.format("%s [Compaction] abort", storageGroupName));
-            }
-            deleteLevelFilesInList(timePartition, sourceTsFileResources, level, isSeq);
-          } finally {
-            writeUnlock();
-          }
-          for (TsFileResource tsFileResource : sourceTsFileResources) {
-            logger.error(
-                "{} recover storage group delete source file {}",
-                storageGroupName,
-                tsFileResource.getTsFile().getName());
-          }
-          deleteLevelFilesInDisk(sourceTsFileResources);
-          renameLevelFilesMods(modifications, sourceTsFileResources, targetResource);
         }
       }
     } catch (IOException | IllegalPathException | InterruptedException e) {
@@ -802,25 +787,23 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
     return newUnSequenceTsFileResources;
   }
 
-  private TsFileResource getRecoverTsFileResource(String filePath, boolean isSeq) {
-    try {
-      if (isSeq) {
-        for (TsFileResource tsFileResource : sequenceRecoverTsFileResources) {
-          if (Files.isSameFile(tsFileResource.getTsFile().toPath(), new File(filePath).toPath())) {
-            return tsFileResource;
-          }
+  private TsFileResource getRecoverTsFileResource(String filePath, boolean isSeq)
+      throws IOException {
+    if (isSeq) {
+      for (TsFileResource tsFileResource : sequenceRecoverTsFileResources) {
+        if (Files.isSameFile(tsFileResource.getTsFile().toPath(), new File(filePath).toPath())) {
+          return tsFileResource;
         }
-      } else {
-        for (TsFileResource tsFileResource : unSequenceRecoverTsFileResources) {
-          if (Files.isSameFile(tsFileResource.getTsFile().toPath(), new File(filePath).toPath())) {
-            return tsFileResource;
-          }
+      }
+    } else {
+      for (TsFileResource tsFileResource : unSequenceRecoverTsFileResources) {
+        if (Files.isSameFile(tsFileResource.getTsFile().toPath(), new File(filePath).toPath())) {
+          return tsFileResource;
         }
       }
-    } catch (IOException e) {
-      logger.error("cannot get tsfile resource path: {}", filePath);
     }
-    return null;
+    logger.error("cannot get tsfile resource path: {}", filePath);
+    throw new IOException();
   }
 
   private TsFileResource getTsFileResource(String filePath, boolean isSeq) {
@@ -885,8 +868,6 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
         if (targetFilePath != null) {
           File targetFile = new File(targetFilePath);
           if (targetFile.exists()) {
-            logger.error(
-                "{} restore delete target file {} ", storageGroupName, targetFile.getName());
             targetFile.delete();
           }
         }
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionRecoverTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionRecoverTest.java
index ddd2005..7adffc2 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionRecoverTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionRecoverTest.java
@@ -20,7 +20,6 @@
 package org.apache.iotdb.db.engine.compaction;
 
 import org.apache.iotdb.db.conf.IoTDBConstant;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.constant.TestConstant;
 import org.apache.iotdb.db.engine.compaction.level.LevelCompactionTsFileManagement;
 import org.apache.iotdb.db.engine.compaction.utils.CompactionLogger;
@@ -140,7 +139,7 @@ public class LevelCompactionRecoverTest extends LevelCompactionTest {
         true,
         new ArrayList<>());
     compactionLogger.close();
-    levelCompactionTsFileManagement.add(targetTsFileResource, true);
+    levelCompactionTsFileManagement.addRecover(targetTsFileResource, true);
     levelCompactionTsFileManagement.recover();
     context = new QueryContext();
     path =
@@ -255,7 +254,7 @@ public class LevelCompactionRecoverTest extends LevelCompactionTest {
     }
     logStream.close();
 
-    levelCompactionTsFileManagement.add(targetTsFileResource, true);
+    levelCompactionTsFileManagement.addRecover(targetTsFileResource, true);
     levelCompactionTsFileManagement.recover();
     context = new QueryContext();
     path =
@@ -409,9 +408,6 @@ public class LevelCompactionRecoverTest extends LevelCompactionTest {
   @Test
   public void testCompactionMergeRecoverMergeFinishedUnseq()
       throws IOException, IllegalPathException {
-    int prevUnseqLevelNum = IoTDBDescriptor.getInstance().getConfig().getUnseqLevelNum();
-    IoTDBDescriptor.getInstance().getConfig().setUnseqLevelNum(2);
-
     LevelCompactionTsFileManagement levelCompactionTsFileManagement =
         new LevelCompactionTsFileManagement(COMPACTION_TEST_SG, tempSGDir.getPath());
     levelCompactionTsFileManagement.addAll(seqResources, true);
@@ -470,7 +466,7 @@ public class LevelCompactionRecoverTest extends LevelCompactionTest {
         false,
         new ArrayList<>());
     compactionLogger.close();
-    levelCompactionTsFileManagement.add(targetTsFileResource, false);
+    levelCompactionTsFileManagement.addRecover(targetTsFileResource, false);
     levelCompactionTsFileManagement.recover();
     context = new QueryContext();
     path =
@@ -497,8 +493,6 @@ public class LevelCompactionRecoverTest extends LevelCompactionTest {
       }
     }
     assertEquals(500, count);
-
-    IoTDBDescriptor.getInstance().getConfig().setUnseqLevelNum(prevUnseqLevelNum);
   }
 
   /** compaction recover merge start just log source file */
@@ -677,7 +671,7 @@ public class LevelCompactionRecoverTest extends LevelCompactionTest {
         new HashSet<>(),
         true,
         new ArrayList<>());
-    levelCompactionTsFileManagement.add(targetTsFileResource, true);
+    levelCompactionTsFileManagement.addRecover(targetTsFileResource, true);
     compactionLogger.close();
     levelCompactionTsFileManagement.recover();
     QueryContext context = new QueryContext();

[iotdb] 02/02: Revert "[To rel/0.12] fix compaction block flush bug (#3534)"

Posted by ro...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch 0.12-revert
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 696a202f2b8be19ba3f5b6b981bac54047396e07
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Tue Jul 27 16:20:48 2021 +0800

    Revert "[To rel/0.12] fix compaction block flush bug (#3534)"
    
    This reverts commit bf6d83cd
---
 .../db/engine/compaction/CompactionMergeTaskPoolManager.java  | 11 ++---------
 1 file changed, 2 insertions(+), 9 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java
index 9b7949c..9f7ff5a 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java
@@ -54,8 +54,7 @@ public class CompactionMergeTaskPoolManager implements IService {
       LoggerFactory.getLogger(CompactionMergeTaskPoolManager.class);
   private static final CompactionMergeTaskPoolManager INSTANCE =
       new CompactionMergeTaskPoolManager();
-  private ScheduledExecutorService scheduledPool;
-  private ExecutorService pool;
+  private ScheduledExecutorService pool;
   private Map<String, Set<Future<Void>>> storageGroupTasks = new ConcurrentHashMap<>();
   private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
 
@@ -72,9 +71,6 @@ public class CompactionMergeTaskPoolManager implements IService {
           IoTDBThreadPoolFactory.newScheduledThreadPool(
               IoTDBDescriptor.getInstance().getConfig().getCompactionThreadNum(),
               ThreadName.COMPACTION_SERVICE.getName());
-      this.scheduledPool =
-          IoTDBThreadPoolFactory.newScheduledThreadPool(
-              Integer.MAX_VALUE, ThreadName.COMPACTION_SERVICE.getName());
     }
     logger.info("Compaction task manager started.");
   }
@@ -82,7 +78,6 @@ public class CompactionMergeTaskPoolManager implements IService {
   @Override
   public void stop() {
     if (pool != null) {
-      scheduledPool.shutdownNow();
       pool.shutdownNow();
       logger.info("Waiting for task pool to shut down");
       waitTermination();
@@ -93,7 +88,6 @@ public class CompactionMergeTaskPoolManager implements IService {
   @Override
   public void waitAndStop(long milliseconds) {
     if (pool != null) {
-      awaitTermination(scheduledPool, milliseconds);
       awaitTermination(pool, milliseconds);
       logger.info("Waiting for task pool to shut down");
       waitTermination();
@@ -148,7 +142,6 @@ public class CompactionMergeTaskPoolManager implements IService {
         logger.warn("CompactionManager has wait for {} seconds to stop", time / 1000);
       }
     }
-    scheduledPool = null;
     pool = null;
     storageGroupTasks.clear();
     logger.info("CompactionManager stopped");
@@ -197,7 +190,7 @@ public class CompactionMergeTaskPoolManager implements IService {
   }
 
   public void init(Runnable function) {
-    scheduledPool.scheduleWithFixedDelay(
+    pool.scheduleWithFixedDelay(
         function, 1000, config.getCompactionInterval(), TimeUnit.MILLISECONDS);
   }