You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2021/06/10 05:44:38 UTC

[iotdb] branch rel/0.12 updated: [IOTDB-1419][To rel/0.12] remove redundant clearCompactionStatus (#3370)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch rel/0.12
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/0.12 by this push:
     new 7111d5a  [IOTDB-1419][To rel/0.12] remove redundant clearCompactionStatus (#3370)
7111d5a is described below

commit 7111d5a54493cf5dea65bf6bee72a74844a59ae8
Author: zhanglingzhe0820 <44...@qq.com>
AuthorDate: Thu Jun 10 13:44:01 2021 +0800

    [IOTDB-1419][To rel/0.12] remove redundant clearCompactionStatus (#3370)
---
 .../compaction/CompactionMergeTaskPoolManager.java | 15 +++++++++++-
 .../db/engine/compaction/TsFileManagement.java     |  4 +---
 .../level/LevelCompactionTsFileManagement.java     |  1 +
 .../engine/compaction/utils/CompactionUtils.java   |  3 ++-
 .../engine/storagegroup/StorageGroupProcessor.java | 28 +++++++---------------
 .../writelog/recover/TsFileRecoverPerformer.java   |  2 +-
 6 files changed, 28 insertions(+), 25 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java
index c04661c..8461bd5 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java
@@ -41,6 +41,7 @@ import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.iotdb.db.engine.compaction.utils.CompactionLogger.COMPACTION_LOG_NAME;
@@ -52,7 +53,7 @@ public class CompactionMergeTaskPoolManager implements IService {
       LoggerFactory.getLogger(CompactionMergeTaskPoolManager.class);
   private static final CompactionMergeTaskPoolManager INSTANCE =
       new CompactionMergeTaskPoolManager();
-  private ExecutorService pool;
+  private ScheduledExecutorService pool;
   private Map<String, Set<Future<Void>>> storageGroupTasks = new ConcurrentHashMap<>();
 
   private static ConcurrentHashMap<String, Boolean> sgCompactionStatus = new ConcurrentHashMap<>();
@@ -178,6 +179,18 @@ public class CompactionMergeTaskPoolManager implements IService {
     }
   }
 
+  public synchronized void clearCompactionStatus(String storageGroupName) {
+    // for test
+    if (sgCompactionStatus == null) {
+      sgCompactionStatus = new ConcurrentHashMap<>();
+    }
+    sgCompactionStatus.put(storageGroupName, false);
+  }
+
+  public void init(Runnable function) {
+    pool.scheduleWithFixedDelay(function, 1, 1, TimeUnit.SECONDS);
+  }
+
   public synchronized void submitTask(StorageGroupCompactionTask storageGroupCompactionTask)
       throws RejectedExecutionException {
     if (pool != null && !pool.isTerminated()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
index 8dc2e16..d997e04 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
@@ -187,10 +187,8 @@ public abstract class TsFileManagement {
     @Override
     public Void call() {
       recover();
-      // in recover logic, we do not have to start next compaction task, and in this case the param
-      // time partition is useless, we can just pass 0L
+      // in recover logic, the param time partition is useless, we can just pass 0L
       closeCompactionMergeCallBack.call(false, 0L);
-      clearCompactionStatus();
       return null;
     }
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
index f013669..50ec39f 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
@@ -735,6 +735,7 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
           logger.error("{} Compaction log close fail", storageGroupName + COMPACTION_LOG_NAME);
         }
       }
+      isMergeExecutedInCurrentTask = false;
       restoreCompaction();
       logger.error("Error occurred in Compaction Merge thread", e);
     } finally {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java
index 4f81329..aaeaf81 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java
@@ -261,8 +261,8 @@ public class CompactionUtils {
       List<Modification> modifications)
       throws IOException, IllegalPathException {
     Map<String, TsFileSequenceReader> tsFileSequenceReaderMap = new HashMap<>();
+    RestorableTsFileIOWriter writer = new RestorableTsFileIOWriter(targetResource.getTsFile());
     try {
-      RestorableTsFileIOWriter writer = new RestorableTsFileIOWriter(targetResource.getTsFile());
       Map<String, List<Modification>> modificationCache = new HashMap<>();
       RateLimiter compactionWriteRateLimiter =
           MergeManager.getINSTANCE().getMergeWriteRateLimiter();
@@ -423,6 +423,7 @@ public class CompactionUtils {
       writer.endFile();
       targetResource.close();
     } finally {
+      writer.close();
       for (TsFileSequenceReader reader : tsFileSequenceReaderMap.values()) {
         reader.close();
       }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index ff626d6..53b731e 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -543,7 +543,7 @@ public class StorageGroupProcessor {
             .submitTask(
                 tsFileManagement.new CompactionRecoverTask(this::closeCompactionRecoverCallBack));
       } catch (RejectedExecutionException e) {
-        this.closeCompactionMergeCallBack(false, 0);
+        this.closeCompactionRecoverCallBack(false, 0);
         logger.error(
             "{} - {} compaction submit task failed",
             logicalStorageGroupName,
@@ -2010,30 +2010,20 @@ public class StorageGroupProcessor {
 
   /** close recover compaction merge callback, to start continuous compaction */
   private void closeCompactionRecoverCallBack(boolean isMerge, long timePartitionId) {
-    logger.info(
-        "{}-{} recover finished, submit continuous compaction task",
-        logicalStorageGroupName,
-        virtualStorageGroupId);
+    CompactionMergeTaskPoolManager.getInstance().clearCompactionStatus(logicalStorageGroupName);
     if (IoTDBDescriptor.getInstance().getConfig().isEnableContinuousCompaction()) {
-      CompactionMergeTaskPoolManager.getInstance()
-          .submitTask(new CompactionAllPartitionTask(logicalStorageGroupName));
-    }
-  }
-
-  /** close compaction merge callback, to release some locks */
-  private void closeCompactionMergeCallBack(boolean isMerge, long timePartitionId) {
-    if (isMerge && IoTDBDescriptor.getInstance().getConfig().isEnableContinuousCompaction()) {
-      syncCompactOnePartition(
-          timePartitionId, IoTDBDescriptor.getInstance().getConfig().isForceFullMerge());
-    } else {
       logger.info(
-          "{}-{} partition:{}, do not have to submit a new compaction merge task",
+          "{}-{} recover finished, submit continuous compaction task",
           logicalStorageGroupName,
-          virtualStorageGroupId,
-          timePartitionId);
+          virtualStorageGroupId);
+
+      CompactionMergeTaskPoolManager.getInstance().init(this::merge);
     }
   }
 
+  /** close compaction merge callback, to release some locks */
+  private void closeCompactionMergeCallBack(boolean isMerge, long timePartitionId) {}
+
   /**
    * count all Tsfiles in the storage group which need to be upgraded
    *
diff --git a/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java b/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
index cea833d..72ea41c 100644
--- a/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
+++ b/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
@@ -156,7 +156,7 @@ public class TsFileRecoverPerformer {
   private void recoverResourceFromFile() throws IOException {
     try {
       tsFileResource.deserialize();
-    } catch (IOException e) {
+    } catch (Exception e) {
       logger.warn(
           "Cannot deserialize TsFileResource {}, construct it using " + "TsFileSequenceReader",
           tsFileResource.getTsFile(),