You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2021/06/10 07:47:24 UTC
[iotdb] branch rel/0.11 updated: [IOTDB-1419][To rel/0.11] avoid
stack overflow risk in recover (#3384)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch rel/0.11
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/0.11 by this push:
new f57782a [IOTDB-1419][To rel/0.11] avoid stack overflow risk in recover (#3384)
f57782a is described below
commit f57782a0cfdfc59be252b8f671f35d5f227b7039
Author: Jialin Qiao <qj...@mails.tsinghua.edu.cn>
AuthorDate: Thu Jun 10 02:46:54 2021 -0500
[IOTDB-1419][To rel/0.11] avoid stack overflow risk in recover (#3384)
---
.../engine/compaction/CompactionMergeTaskPoolManager.java | 7 ++++++-
.../iotdb/db/engine/compaction/TsFileManagement.java | 6 ++++--
.../db/engine/storagegroup/StorageGroupProcessor.java | 15 ++++-----------
3 files changed, 14 insertions(+), 14 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java
index f949cfa..19f2ac7 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java
@@ -25,6 +25,7 @@ import java.io.File;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.db.concurrent.ThreadName;
@@ -46,7 +47,7 @@ public class CompactionMergeTaskPoolManager implements IService {
private static final Logger logger = LoggerFactory
.getLogger(CompactionMergeTaskPoolManager.class);
private static final CompactionMergeTaskPoolManager INSTANCE = new CompactionMergeTaskPoolManager();
- private ExecutorService pool;
+ private ScheduledExecutorService pool;
private static ConcurrentHashMap<String, Boolean> sgCompactionStatus = new ConcurrentHashMap<>();
public static CompactionMergeTaskPoolManager getInstance() {
@@ -147,6 +148,10 @@ public class CompactionMergeTaskPoolManager implements IService {
sgCompactionStatus.put(storageGroupName, false);
}
+ public void init(Runnable function) {
+ pool.scheduleWithFixedDelay(function, 1, 1, TimeUnit.SECONDS);
+ }
+
public synchronized void submitTask(StorageGroupCompactionTask storageGroupCompactionTask)
throws RejectedExecutionException {
if (pool != null && !pool.isTerminated()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
index 2625b92..da92f4f 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
@@ -154,11 +154,13 @@ public abstract class TsFileManagement {
public class CompactionOnePartitionUtil {
- private CloseCompactionMergeCallBack closeCompactionMergeCallBack;
private long timePartitionId;
+ private CloseCompactionMergeCallBack closeCompactionMergeCallBack;
+
public CompactionOnePartitionUtil(
- CloseCompactionMergeCallBack closeCompactionMergeCallBack, long timePartitionId) {
+ CloseCompactionMergeCallBack closeCompactionMergeCallBack,
+ long timePartitionId) {
this.closeCompactionMergeCallBack = closeCompactionMergeCallBack;
this.timePartitionId = timePartitionId;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 9f0cf36..6b70bf5 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -391,13 +391,12 @@ public class StorageGroupProcessor {
private void recoverCompaction() {
if (!CompactionMergeTaskPoolManager.getInstance().isTerminated()) {
- logger.info("{} submit a compaction merge task", storageGroupName);
try {
CompactionMergeTaskPoolManager.getInstance()
.submitTask(
tsFileManagement.new CompactionRecoverTask(this::closeCompactionRecoverCallBack));
+ logger.info("{} submit a compaction merge task", storageGroupName);
} catch (RejectedExecutionException e) {
- this.closeCompactionRecoverCallBack(false, 0);
logger.error("{} compaction submit task failed", storageGroupName);
}
} else {
@@ -1888,7 +1887,6 @@ public class StorageGroupProcessor {
.new CompactionOnePartitionUtil(this::closeCompactionMergeCallBack, timePartition)
.run();
} catch (IOException e) {
- this.closeCompactionMergeCallBack(false, timePartition);
logger.error("{} compaction submit task failed", storageGroupName);
}
}
@@ -1898,18 +1896,13 @@ public class StorageGroupProcessor {
CompactionMergeTaskPoolManager.getInstance().clearCompactionStatus(storageGroupName);
if (IoTDBDescriptor.getInstance().getConfig().isEnableContinuousCompaction()) {
logger.info("{} recover finished, submit continuous compaction task", storageGroupName);
- CompactionMergeTaskPoolManager.getInstance()
- .submitTask(new CompactionAllPartitionTask(storageGroupName));
+ CompactionMergeTaskPoolManager.getInstance().init(this::merge);
}
}
/** close compaction merge callback, to release some locks */
- private void closeCompactionMergeCallBack(boolean isMerge, long timePartitionId) {
- if (isMerge && IoTDBDescriptor.getInstance().getConfig().isEnableContinuousCompaction()) {
- syncCompactOnePartition(
- timePartitionId, IoTDBDescriptor.getInstance().getConfig().isForceFullMerge());
- }
- }
+ private void closeCompactionMergeCallBack(boolean isMerge, long timePartitionId) {}
+
/**
* count all Tsfiles in the storage group which need to be upgraded