You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2021/06/09 13:03:52 UTC

[iotdb] branch rel/0.11 updated: [IOTDB-1419][To rel/0.11] remove redundant clearCompactionStatus (#3371)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch rel/0.11
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/0.11 by this push:
     new 777186e  [IOTDB-1419][To rel/0.11] remove redundant clearCompactionStatus (#3371)
777186e is described below

commit 777186e962cd5e5cba3660ab7b6b9768f96a16af
Author: zhanglingzhe0820 <44...@qq.com>
AuthorDate: Wed Jun 9 21:03:17 2021 +0800

    [IOTDB-1419][To rel/0.11] remove redundant clearCompactionStatus (#3371)
---
 .../compaction/CompactionMergeTaskPoolManager.java | 10 ++++++-
 .../db/engine/compaction/TsFileManagement.java     |  4 +--
 .../level/LevelCompactionTsFileManagement.java     |  1 +
 .../engine/compaction/utils/CompactionUtils.java   |  3 +-
 .../engine/storagegroup/StorageGroupProcessor.java | 35 ++++++++++++----------
 .../writelog/recover/TsFileRecoverPerformer.java   |  2 +-
 6 files changed, 34 insertions(+), 21 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java
index 268c92e..f949cfa 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java
@@ -139,6 +139,14 @@ public class CompactionMergeTaskPoolManager implements IService {
     return ServiceType.COMPACTION_SERVICE;
   }
 
+  public synchronized void clearCompactionStatus(String storageGroupName) {
+    // for test: the status map may not have been created yet, so initialize it lazily
+    if (sgCompactionStatus == null) {
+      sgCompactionStatus = new ConcurrentHashMap<>();
+    }
+    sgCompactionStatus.put(storageGroupName, false);
+  }
+
   public synchronized void submitTask(StorageGroupCompactionTask storageGroupCompactionTask)
       throws RejectedExecutionException {
     if (pool != null && !pool.isTerminated()) {
@@ -147,8 +155,8 @@ public class CompactionMergeTaskPoolManager implements IService {
       if (isCompacting) {
         return;
       }
-      storageGroupCompactionTask.setSgCompactionStatus(sgCompactionStatus);
       sgCompactionStatus.put(storageGroup, true);
+      storageGroupCompactionTask.setSgCompactionStatus(sgCompactionStatus);
       pool.submit(storageGroupCompactionTask);
     }
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
index 339ac27..2625b92 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
@@ -181,10 +181,8 @@ public abstract class TsFileManagement {
     @Override
     public void run() {
       recover();
-      // in recover logic, we do not have to start next compaction task, and in this case the param
-      // time partition is useless, we can just pass 0L
+      // in recover logic the time partition parameter is not used, so we can just pass 0L
       closeCompactionMergeCallBack.call(false, 0L);
-      clearCompactionStatus();
     }
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
index 9f00b4b..208941d 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
@@ -702,6 +702,7 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
           logger.error("{} Compaction log close fail", storageGroupName + COMPACTION_LOG_NAME);
         }
       }
+      isMergeExecutedInCurrentTask = false;
       restoreCompaction();
       logger.error("Error occurred in Compaction Merge thread", e);
     } finally {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java
index 4662569..822b149 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java
@@ -256,8 +256,8 @@ public class CompactionUtils {
       throws IOException, IllegalPathException {
     Map<String, TsFileSequenceReader> tsFileSequenceReaderMap = new HashMap<>();
     Map<String, List<Modification>> modificationCache = new HashMap<>();
+    RestorableTsFileIOWriter writer = new RestorableTsFileIOWriter(targetResource.getTsFile());
     try {
-      RestorableTsFileIOWriter writer = new RestorableTsFileIOWriter(targetResource.getTsFile());
       RateLimiter compactionWriteRateLimiter =
           MergeManager.getINSTANCE().getMergeWriteRateLimiter();
       Set<String> tsFileDevicesMap =
@@ -405,6 +405,7 @@ public class CompactionUtils {
       writer.endFile();
       targetResource.close();
     } finally {
+      writer.close();
       for (TsFileSequenceReader reader : tsFileSequenceReaderMap.values()) {
         reader.close();
       }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 1ce64d3..9f0cf36 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -371,14 +371,9 @@ public class StorageGroupProcessor {
           .putAll(endTimeMap);
       globalLatestFlushedTimeForEachDevice.putAll(endTimeMap);
     }
-
-    if (IoTDBDescriptor.getInstance().getConfig().isEnableContinuousCompaction()) {
-      CompactionMergeTaskPoolManager.getInstance()
-          .submitTask(new CompactionAllPartitionTask(storageGroupName));
-    }
   }
 
-  public class CompactionAllPartitionTask extends StorageGroupCompactionTask{
+  public class CompactionAllPartitionTask extends StorageGroupCompactionTask {
 
     CompactionAllPartitionTask(String storageGroupName) {
       super(storageGroupName);
@@ -400,9 +395,9 @@ public class StorageGroupProcessor {
       try {
         CompactionMergeTaskPoolManager.getInstance()
             .submitTask(
-                tsFileManagement.new CompactionRecoverTask(this::closeCompactionMergeCallBack));
+                tsFileManagement.new CompactionRecoverTask(this::closeCompactionRecoverCallBack));
       } catch (RejectedExecutionException e) {
-        this.closeCompactionMergeCallBack(false, 0);
+        this.closeCompactionRecoverCallBack(false, 0);
         logger.error("{} compaction submit task failed", storageGroupName);
       }
     } else {
@@ -1861,12 +1856,12 @@ public class StorageGroupProcessor {
     }
     logger.info("signal closing storage group condition in {}", storageGroupName);
 
-    CompactionMergeTaskPoolManager.getInstance().submitTask(
-        new CompactionOnePartitionTask(storageGroupName, tsFileProcessor.getTimeRangeId()));
+    CompactionMergeTaskPoolManager.getInstance()
+        .submitTask(
+            new CompactionOnePartitionTask(storageGroupName, tsFileProcessor.getTimeRangeId()));
   }
 
-
-  public class CompactionOnePartitionTask extends StorageGroupCompactionTask{
+  public class CompactionOnePartitionTask extends StorageGroupCompactionTask {
 
     private long partition;
 
@@ -1878,8 +1873,7 @@ public class StorageGroupProcessor {
     @Override
     public void run() {
       syncCompactOnePartition(
-          partition,
-          IoTDBDescriptor.getInstance().getConfig().isForceFullMerge());
+          partition, IoTDBDescriptor.getInstance().getConfig().isForceFullMerge());
       clearCompactionStatus();
     }
   }
@@ -1890,7 +1884,8 @@ public class StorageGroupProcessor {
       // fork and filter current tsfile, then commit then to compaction merge
       tsFileManagement.forkCurrentFileList(timePartition);
       tsFileManagement.setForceFullMerge(fullMerge);
-      tsFileManagement.new CompactionOnePartitionUtil(this::closeCompactionMergeCallBack, timePartition)
+      tsFileManagement
+          .new CompactionOnePartitionUtil(this::closeCompactionMergeCallBack, timePartition)
           .run();
     } catch (IOException e) {
       this.closeCompactionMergeCallBack(false, timePartition);
@@ -1898,6 +1893,16 @@ public class StorageGroupProcessor {
     }
   }
 
+  /** recover-task callback: clear compaction status, then submit continuous compaction if enabled */
+  private void closeCompactionRecoverCallBack(boolean isMerge, long timePartitionId) {
+    CompactionMergeTaskPoolManager.getInstance().clearCompactionStatus(storageGroupName);
+    if (IoTDBDescriptor.getInstance().getConfig().isEnableContinuousCompaction()) {
+      logger.info("{} recover finished, submit continuous compaction task", storageGroupName);
+      CompactionMergeTaskPoolManager.getInstance()
+          .submitTask(new CompactionAllPartitionTask(storageGroupName));
+    }
+  }
+
   /** close compaction merge callback, to release some locks */
   private void closeCompactionMergeCallBack(boolean isMerge, long timePartitionId) {
     if (isMerge && IoTDBDescriptor.getInstance().getConfig().isEnableContinuousCompaction()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java b/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
index f2ccfe6..cd5e7b3 100644
--- a/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
+++ b/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
@@ -159,7 +159,7 @@ public class TsFileRecoverPerformer {
   private void recoverResourceFromFile() throws IOException {
     try {
       tsFileResource.deserialize();
-    } catch (IOException e) {
+    } catch (Exception e) {
       logger.warn("Cannot deserialize TsFileResource {}, construct it using "
           + "TsFileSequenceReader", tsFileResource.getTsFile(), e);
       recoverResourceFromReader();