You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2021/06/09 13:03:52 UTC
[iotdb] branch rel/0.11 updated: [IOTDB-1419][To rel/0.11] remove redundant clearCompactionStatus (#3371)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch rel/0.11
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/0.11 by this push:
new 777186e [IOTDB-1419][To rel/0.11] remove redundant clearCompactionStatus (#3371)
777186e is described below
commit 777186e962cd5e5cba3660ab7b6b9768f96a16af
Author: zhanglingzhe0820 <44...@qq.com>
AuthorDate: Wed Jun 9 21:03:17 2021 +0800
[IOTDB-1419][To rel/0.11] remove redundant clearCompactionStatus (#3371)
---
.../compaction/CompactionMergeTaskPoolManager.java | 10 ++++++-
.../db/engine/compaction/TsFileManagement.java | 4 +--
.../level/LevelCompactionTsFileManagement.java | 1 +
.../engine/compaction/utils/CompactionUtils.java | 3 +-
.../engine/storagegroup/StorageGroupProcessor.java | 35 ++++++++++++----------
.../writelog/recover/TsFileRecoverPerformer.java | 2 +-
6 files changed, 34 insertions(+), 21 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java
index 268c92e..f949cfa 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java
@@ -139,6 +139,14 @@ public class CompactionMergeTaskPoolManager implements IService {
return ServiceType.COMPACTION_SERVICE;
}
+ public synchronized void clearCompactionStatus(String storageGroupName) {
+ // for test
+ if (sgCompactionStatus == null) {
+ sgCompactionStatus = new ConcurrentHashMap<>();
+ }
+ sgCompactionStatus.put(storageGroupName, false);
+ }
+
public synchronized void submitTask(StorageGroupCompactionTask storageGroupCompactionTask)
throws RejectedExecutionException {
if (pool != null && !pool.isTerminated()) {
@@ -147,8 +155,8 @@ public class CompactionMergeTaskPoolManager implements IService {
if (isCompacting) {
return;
}
- storageGroupCompactionTask.setSgCompactionStatus(sgCompactionStatus);
sgCompactionStatus.put(storageGroup, true);
+ storageGroupCompactionTask.setSgCompactionStatus(sgCompactionStatus);
pool.submit(storageGroupCompactionTask);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
index 339ac27..2625b92 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
@@ -181,10 +181,8 @@ public abstract class TsFileManagement {
@Override
public void run() {
recover();
- // in recover logic, we do not have to start next compaction task, and in this case the param
- // time partition is useless, we can just pass 0L
+ // in recover logic, the param time partition is useless, we can just pass 0L
closeCompactionMergeCallBack.call(false, 0L);
- clearCompactionStatus();
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
index 9f00b4b..208941d 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
@@ -702,6 +702,7 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
logger.error("{} Compaction log close fail", storageGroupName + COMPACTION_LOG_NAME);
}
}
+ isMergeExecutedInCurrentTask = false;
restoreCompaction();
logger.error("Error occurred in Compaction Merge thread", e);
} finally {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java
index 4662569..822b149 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java
@@ -256,8 +256,8 @@ public class CompactionUtils {
throws IOException, IllegalPathException {
Map<String, TsFileSequenceReader> tsFileSequenceReaderMap = new HashMap<>();
Map<String, List<Modification>> modificationCache = new HashMap<>();
+ RestorableTsFileIOWriter writer = new RestorableTsFileIOWriter(targetResource.getTsFile());
try {
- RestorableTsFileIOWriter writer = new RestorableTsFileIOWriter(targetResource.getTsFile());
RateLimiter compactionWriteRateLimiter =
MergeManager.getINSTANCE().getMergeWriteRateLimiter();
Set<String> tsFileDevicesMap =
@@ -405,6 +405,7 @@ public class CompactionUtils {
writer.endFile();
targetResource.close();
} finally {
+ writer.close();
for (TsFileSequenceReader reader : tsFileSequenceReaderMap.values()) {
reader.close();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 1ce64d3..9f0cf36 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -371,14 +371,9 @@ public class StorageGroupProcessor {
.putAll(endTimeMap);
globalLatestFlushedTimeForEachDevice.putAll(endTimeMap);
}
-
- if (IoTDBDescriptor.getInstance().getConfig().isEnableContinuousCompaction()) {
- CompactionMergeTaskPoolManager.getInstance()
- .submitTask(new CompactionAllPartitionTask(storageGroupName));
- }
}
- public class CompactionAllPartitionTask extends StorageGroupCompactionTask{
+ public class CompactionAllPartitionTask extends StorageGroupCompactionTask {
CompactionAllPartitionTask(String storageGroupName) {
super(storageGroupName);
@@ -400,9 +395,9 @@ public class StorageGroupProcessor {
try {
CompactionMergeTaskPoolManager.getInstance()
.submitTask(
- tsFileManagement.new CompactionRecoverTask(this::closeCompactionMergeCallBack));
+ tsFileManagement.new CompactionRecoverTask(this::closeCompactionRecoverCallBack));
} catch (RejectedExecutionException e) {
- this.closeCompactionMergeCallBack(false, 0);
+ this.closeCompactionRecoverCallBack(false, 0);
logger.error("{} compaction submit task failed", storageGroupName);
}
} else {
@@ -1861,12 +1856,12 @@ public class StorageGroupProcessor {
}
logger.info("signal closing storage group condition in {}", storageGroupName);
- CompactionMergeTaskPoolManager.getInstance().submitTask(
- new CompactionOnePartitionTask(storageGroupName, tsFileProcessor.getTimeRangeId()));
+ CompactionMergeTaskPoolManager.getInstance()
+ .submitTask(
+ new CompactionOnePartitionTask(storageGroupName, tsFileProcessor.getTimeRangeId()));
}
-
- public class CompactionOnePartitionTask extends StorageGroupCompactionTask{
+ public class CompactionOnePartitionTask extends StorageGroupCompactionTask {
private long partition;
@@ -1878,8 +1873,7 @@ public class StorageGroupProcessor {
@Override
public void run() {
syncCompactOnePartition(
- partition,
- IoTDBDescriptor.getInstance().getConfig().isForceFullMerge());
+ partition, IoTDBDescriptor.getInstance().getConfig().isForceFullMerge());
clearCompactionStatus();
}
}
@@ -1890,7 +1884,8 @@ public class StorageGroupProcessor {
// fork and filter current tsfile, then commit then to compaction merge
tsFileManagement.forkCurrentFileList(timePartition);
tsFileManagement.setForceFullMerge(fullMerge);
- tsFileManagement.new CompactionOnePartitionUtil(this::closeCompactionMergeCallBack, timePartition)
+ tsFileManagement
+ .new CompactionOnePartitionUtil(this::closeCompactionMergeCallBack, timePartition)
.run();
} catch (IOException e) {
this.closeCompactionMergeCallBack(false, timePartition);
@@ -1898,6 +1893,16 @@ public class StorageGroupProcessor {
}
}
+ /** close recover compaction merge callback, to start continuous compaction */
+ private void closeCompactionRecoverCallBack(boolean isMerge, long timePartitionId) {
+ CompactionMergeTaskPoolManager.getInstance().clearCompactionStatus(storageGroupName);
+ if (IoTDBDescriptor.getInstance().getConfig().isEnableContinuousCompaction()) {
+ logger.info("{} recover finished, submit continuous compaction task", storageGroupName);
+ CompactionMergeTaskPoolManager.getInstance()
+ .submitTask(new CompactionAllPartitionTask(storageGroupName));
+ }
+ }
+
/** close compaction merge callback, to release some locks */
private void closeCompactionMergeCallBack(boolean isMerge, long timePartitionId) {
if (isMerge && IoTDBDescriptor.getInstance().getConfig().isEnableContinuousCompaction()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java b/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
index f2ccfe6..cd5e7b3 100644
--- a/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
+++ b/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
@@ -159,7 +159,7 @@ public class TsFileRecoverPerformer {
private void recoverResourceFromFile() throws IOException {
try {
tsFileResource.deserialize();
- } catch (IOException e) {
+ } catch (Exception e) {
logger.warn("Cannot deserialize TsFileResource {}, construct it using "
+ "TsFileSequenceReader", tsFileResource.getTsFile(), e);
recoverResourceFromReader();