You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2021/06/10 05:44:38 UTC
[iotdb] branch rel/0.12 updated: [IOTDB-1419][To rel/0.12] remove redundant clearCompactionStatus (#3370)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch rel/0.12
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/0.12 by this push:
new 7111d5a [IOTDB-1419][To rel/0.12] remove redundant clearCompactionStatus (#3370)
7111d5a is described below
commit 7111d5a54493cf5dea65bf6bee72a74844a59ae8
Author: zhanglingzhe0820 <44...@qq.com>
AuthorDate: Thu Jun 10 13:44:01 2021 +0800
[IOTDB-1419][To rel/0.12] remove redundant clearCompactionStatus (#3370)
---
.../compaction/CompactionMergeTaskPoolManager.java | 15 +++++++++++-
.../db/engine/compaction/TsFileManagement.java | 4 +---
.../level/LevelCompactionTsFileManagement.java | 1 +
.../engine/compaction/utils/CompactionUtils.java | 3 ++-
.../engine/storagegroup/StorageGroupProcessor.java | 28 +++++++---------------
.../writelog/recover/TsFileRecoverPerformer.java | 2 +-
6 files changed, 28 insertions(+), 25 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java
index c04661c..8461bd5 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java
@@ -41,6 +41,7 @@ import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import static org.apache.iotdb.db.engine.compaction.utils.CompactionLogger.COMPACTION_LOG_NAME;
@@ -52,7 +53,7 @@ public class CompactionMergeTaskPoolManager implements IService {
LoggerFactory.getLogger(CompactionMergeTaskPoolManager.class);
private static final CompactionMergeTaskPoolManager INSTANCE =
new CompactionMergeTaskPoolManager();
- private ExecutorService pool;
+ private ScheduledExecutorService pool;
private Map<String, Set<Future<Void>>> storageGroupTasks = new ConcurrentHashMap<>();
private static ConcurrentHashMap<String, Boolean> sgCompactionStatus = new ConcurrentHashMap<>();
@@ -178,6 +179,18 @@ public class CompactionMergeTaskPoolManager implements IService {
}
}
+ public synchronized void clearCompactionStatus(String storageGroupName) {
+ // for test
+ if (sgCompactionStatus == null) {
+ sgCompactionStatus = new ConcurrentHashMap<>();
+ }
+ sgCompactionStatus.put(storageGroupName, false);
+ }
+
+ public void init(Runnable function) {
+ pool.scheduleWithFixedDelay(function, 1, 1, TimeUnit.SECONDS);
+ }
+
public synchronized void submitTask(StorageGroupCompactionTask storageGroupCompactionTask)
throws RejectedExecutionException {
if (pool != null && !pool.isTerminated()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
index 8dc2e16..d997e04 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
@@ -187,10 +187,8 @@ public abstract class TsFileManagement {
@Override
public Void call() {
recover();
- // in recover logic, we do not have to start next compaction task, and in this case the param
- // time partition is useless, we can just pass 0L
+ // in recover logic, the param time partition is useless, we can just pass 0L
closeCompactionMergeCallBack.call(false, 0L);
- clearCompactionStatus();
return null;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
index f013669..50ec39f 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
@@ -735,6 +735,7 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
logger.error("{} Compaction log close fail", storageGroupName + COMPACTION_LOG_NAME);
}
}
+ isMergeExecutedInCurrentTask = false;
restoreCompaction();
logger.error("Error occurred in Compaction Merge thread", e);
} finally {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java
index 4f81329..aaeaf81 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java
@@ -261,8 +261,8 @@ public class CompactionUtils {
List<Modification> modifications)
throws IOException, IllegalPathException {
Map<String, TsFileSequenceReader> tsFileSequenceReaderMap = new HashMap<>();
+ RestorableTsFileIOWriter writer = new RestorableTsFileIOWriter(targetResource.getTsFile());
try {
- RestorableTsFileIOWriter writer = new RestorableTsFileIOWriter(targetResource.getTsFile());
Map<String, List<Modification>> modificationCache = new HashMap<>();
RateLimiter compactionWriteRateLimiter =
MergeManager.getINSTANCE().getMergeWriteRateLimiter();
@@ -423,6 +423,7 @@ public class CompactionUtils {
writer.endFile();
targetResource.close();
} finally {
+ writer.close();
for (TsFileSequenceReader reader : tsFileSequenceReaderMap.values()) {
reader.close();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index ff626d6..53b731e 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -543,7 +543,7 @@ public class StorageGroupProcessor {
.submitTask(
tsFileManagement.new CompactionRecoverTask(this::closeCompactionRecoverCallBack));
} catch (RejectedExecutionException e) {
- this.closeCompactionMergeCallBack(false, 0);
+ this.closeCompactionRecoverCallBack(false, 0);
logger.error(
"{} - {} compaction submit task failed",
logicalStorageGroupName,
@@ -2010,30 +2010,20 @@ public class StorageGroupProcessor {
/** close recover compaction merge callback, to start continuous compaction */
private void closeCompactionRecoverCallBack(boolean isMerge, long timePartitionId) {
- logger.info(
- "{}-{} recover finished, submit continuous compaction task",
- logicalStorageGroupName,
- virtualStorageGroupId);
+ CompactionMergeTaskPoolManager.getInstance().clearCompactionStatus(logicalStorageGroupName);
if (IoTDBDescriptor.getInstance().getConfig().isEnableContinuousCompaction()) {
- CompactionMergeTaskPoolManager.getInstance()
- .submitTask(new CompactionAllPartitionTask(logicalStorageGroupName));
- }
- }
-
- /** close compaction merge callback, to release some locks */
- private void closeCompactionMergeCallBack(boolean isMerge, long timePartitionId) {
- if (isMerge && IoTDBDescriptor.getInstance().getConfig().isEnableContinuousCompaction()) {
- syncCompactOnePartition(
- timePartitionId, IoTDBDescriptor.getInstance().getConfig().isForceFullMerge());
- } else {
logger.info(
- "{}-{} partition:{}, do not have to submit a new compaction merge task",
+ "{}-{} recover finished, submit continuous compaction task",
logicalStorageGroupName,
- virtualStorageGroupId,
- timePartitionId);
+ virtualStorageGroupId);
+
+ CompactionMergeTaskPoolManager.getInstance().init(this::merge);
}
}
+ /** close compaction merge callback, to release some locks */
+ private void closeCompactionMergeCallBack(boolean isMerge, long timePartitionId) {}
+
/**
* count all Tsfiles in the storage group which need to be upgraded
*
diff --git a/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java b/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
index cea833d..72ea41c 100644
--- a/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
+++ b/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
@@ -156,7 +156,7 @@ public class TsFileRecoverPerformer {
private void recoverResourceFromFile() throws IOException {
try {
tsFileResource.deserialize();
- } catch (IOException e) {
+ } catch (Exception e) {
logger.warn(
"Cannot deserialize TsFileResource {}, construct it using " + "TsFileSequenceReader",
tsFileResource.getTsFile(),