You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2021/06/10 07:00:55 UTC

[iotdb] branch avoid_stack_overflow_0.11 created (now 8638e9d)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a change to branch avoid_stack_overflow_0.11
in repository https://gitbox.apache.org/repos/asf/iotdb.git.


      at 8638e9d  avoid stack overflow risk in recover

This branch includes the following new commits:

     new 8638e9d  avoid stack overflow risk in recover

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


[iotdb] 01/01: avoid stack overflow risk in recover

Posted by qi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch avoid_stack_overflow_0.11
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 8638e9db23f5a169c9bf70ef6ac5c470fb9a986e
Author: qiaojialin <64...@qq.com>
AuthorDate: Thu Jun 10 15:00:18 2021 +0800

    avoid stack overflow risk in recover
---
 .../compaction/CompactionMergeTaskPoolManager.java    |  7 ++++++-
 .../iotdb/db/engine/compaction/TsFileManagement.java  |  6 +-----
 .../db/engine/storagegroup/StorageGroupProcessor.java | 19 +++----------------
 3 files changed, 10 insertions(+), 22 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java
index f949cfa..19f2ac7 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java
@@ -25,6 +25,7 @@ import java.io.File;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.db.concurrent.ThreadName;
@@ -46,7 +47,7 @@ public class CompactionMergeTaskPoolManager implements IService {
   private static final Logger logger = LoggerFactory
       .getLogger(CompactionMergeTaskPoolManager.class);
   private static final CompactionMergeTaskPoolManager INSTANCE = new CompactionMergeTaskPoolManager();
-  private ExecutorService pool;
+  private ScheduledExecutorService pool;
 
   private static ConcurrentHashMap<String, Boolean> sgCompactionStatus = new ConcurrentHashMap<>();
   public static CompactionMergeTaskPoolManager getInstance() {
@@ -147,6 +148,10 @@ public class CompactionMergeTaskPoolManager implements IService {
     sgCompactionStatus.put(storageGroupName, false);
   }
 
+  public void init(Runnable function) {
+    pool.scheduleWithFixedDelay(function, 1, 1, TimeUnit.SECONDS);
+  }
+
   public synchronized void submitTask(StorageGroupCompactionTask storageGroupCompactionTask)
       throws RejectedExecutionException {
     if (pool != null && !pool.isTerminated()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
index 2625b92..953de2a 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
@@ -154,18 +154,14 @@ public abstract class TsFileManagement {
 
   public class CompactionOnePartitionUtil {
 
-    private CloseCompactionMergeCallBack closeCompactionMergeCallBack;
     private long timePartitionId;
 
-    public CompactionOnePartitionUtil(
-        CloseCompactionMergeCallBack closeCompactionMergeCallBack, long timePartitionId) {
-      this.closeCompactionMergeCallBack = closeCompactionMergeCallBack;
+    public CompactionOnePartitionUtil(long timePartitionId) {
       this.timePartitionId = timePartitionId;
     }
 
     public void run() {
       merge(timePartitionId);
-      closeCompactionMergeCallBack.call(isMergeExecutedInCurrentTask, timePartitionId);
     }
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 9f0cf36..9d7357f 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -391,13 +391,12 @@ public class StorageGroupProcessor {
 
   private void recoverCompaction() {
     if (!CompactionMergeTaskPoolManager.getInstance().isTerminated()) {
-      logger.info("{} submit a compaction merge task", storageGroupName);
       try {
         CompactionMergeTaskPoolManager.getInstance()
             .submitTask(
                 tsFileManagement.new CompactionRecoverTask(this::closeCompactionRecoverCallBack));
+        logger.info("{} submit a compaction merge task", storageGroupName);
       } catch (RejectedExecutionException e) {
-        this.closeCompactionRecoverCallBack(false, 0);
         logger.error("{} compaction submit task failed", storageGroupName);
       }
     } else {
@@ -1884,11 +1883,8 @@ public class StorageGroupProcessor {
       // fork and filter current tsfile, then commit then to compaction merge
       tsFileManagement.forkCurrentFileList(timePartition);
       tsFileManagement.setForceFullMerge(fullMerge);
-      tsFileManagement
-          .new CompactionOnePartitionUtil(this::closeCompactionMergeCallBack, timePartition)
-          .run();
+      tsFileManagement.new CompactionOnePartitionUtil(timePartition).run();
     } catch (IOException e) {
-      this.closeCompactionMergeCallBack(false, timePartition);
       logger.error("{} compaction submit task failed", storageGroupName);
     }
   }
@@ -1898,16 +1894,7 @@ public class StorageGroupProcessor {
     CompactionMergeTaskPoolManager.getInstance().clearCompactionStatus(storageGroupName);
     if (IoTDBDescriptor.getInstance().getConfig().isEnableContinuousCompaction()) {
       logger.info("{} recover finished, submit continuous compaction task", storageGroupName);
-      CompactionMergeTaskPoolManager.getInstance()
-          .submitTask(new CompactionAllPartitionTask(storageGroupName));
-    }
-  }
-
-  /** close compaction merge callback, to release some locks */
-  private void closeCompactionMergeCallBack(boolean isMerge, long timePartitionId) {
-    if (isMerge && IoTDBDescriptor.getInstance().getConfig().isEnableContinuousCompaction()) {
-      syncCompactOnePartition(
-          timePartitionId, IoTDBDescriptor.getInstance().getConfig().isForceFullMerge());
+      CompactionMergeTaskPoolManager.getInstance().init(this::merge);
     }
   }