You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2020/11/11 01:56:19 UTC
[iotdb] branch master updated: Fix compaction delete resource file bug (#1997)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 5bbbdbd Fix compaction delete resource file bug (#1997)
5bbbdbd is described below
commit 5bbbdbd3539385dfa98c135db503d5eb7d5427a6
Author: zhanglingzhe0820 <44...@qq.com>
AuthorDate: Wed Nov 11 09:56:06 2020 +0800
Fix compaction delete resource file bug (#1997)
---
.../compaction/CompactionMergeTaskPoolManager.java | 12 ++++-
.../db/engine/compaction/TsFileManagement.java | 7 +--
.../level/LevelCompactionTsFileManagement.java | 54 ++++++++++++++--------
.../iotdb/db/engine/merge/manage/MergeManager.java | 25 ++++++++--
.../iotdb/db/engine/merge/task/MergeFileTask.java | 5 --
5 files changed, 70 insertions(+), 33 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java
index f030300..830ea56 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java
@@ -78,9 +78,17 @@ public class CompactionMergeTaskPoolManager implements IService {
private void waitTermination() {
long startTime = System.currentTimeMillis();
while (!pool.isTerminated()) {
- // wait
+ int timeMillis = 0;
+ try {
+ Thread.sleep(200);
+ } catch (InterruptedException e) {
+ logger.error("CompactionMergeTaskPoolManager {} shutdown",
+ ThreadName.COMPACTION_SERVICE.getName(), e);
+ Thread.currentThread().interrupt();
+ }
+ timeMillis += 200;
long time = System.currentTimeMillis() - startTime;
- if (time % 60_000 == 0) {
+ if (timeMillis % 60_000 == 0) {
logger.warn("CompactionManager has wait for {} seconds to stop", time / 1000);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
index 245769a..bdb304a 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
@@ -190,16 +190,17 @@ public abstract class TsFileManagement {
}
return;
}
- logger.info("{} will close all files for starting a merge (fullmerge = {})", storageGroupName,
- fullMerge);
+ isUnseqMerging = true;
if (seqMergeList.isEmpty()) {
logger.info("{} no seq files to be merged", storageGroupName);
+ isUnseqMerging = false;
return;
}
if (unSeqMergeList.isEmpty()) {
logger.info("{} no unseq files to be merged", storageGroupName);
+ isUnseqMerging = false;
return;
}
@@ -213,6 +214,7 @@ public abstract class TsFileManagement {
if (mergeFiles.length == 0) {
logger.info("{} cannot select merge candidates under the budget {}", storageGroupName,
budget);
+ isUnseqMerging = false;
return;
}
// avoid pending tasks holds the metadata and streams
@@ -229,7 +231,6 @@ public abstract class TsFileManagement {
tsFileResource.setMerging(true);
}
- isUnseqMerging = true;
mergeStartTime = System.currentTimeMillis();
MergeTask mergeTask = new MergeTask(mergeResource, storageGroupDir,
this::mergeEndAction, taskName, fullMerge, fileSelector.getConcurrentMergeNum(),
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
index f046f6f..3d36f06 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
@@ -87,12 +87,22 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
logger.debug("{} [compaction] merge starts to delete file", storageGroupName);
for (TsFileResource mergeTsFile : mergeTsFiles) {
deleteLevelFile(mergeTsFile);
+ logger
+ .info("{} [Compaction] delete TsFile {}", storageGroupName, mergeTsFile.getTsFilePath());
}
for (int i = 0; i < seqLevelNum; i++) {
- sequenceTsFileResources.get(timePartitionId).get(i).removeAll(mergeTsFiles);
+ if (sequenceTsFileResources.containsKey(timePartitionId)) {
+ if (sequenceTsFileResources.get(timePartitionId).size() > i) {
+ sequenceTsFileResources.get(timePartitionId).get(i).removeAll(mergeTsFiles);
+ }
+ }
}
for (int i = 0; i < unseqLevelNum; i++) {
- unSequenceTsFileResources.get(timePartitionId).get(i).removeAll(mergeTsFiles);
+ if (unSequenceTsFileResources.containsKey(timePartitionId)) {
+ if (unSequenceTsFileResources.get(timePartitionId).size() > i) {
+ unSequenceTsFileResources.get(timePartitionId).get(i).removeAll(mergeTsFiles);
+ }
+ }
}
}
@@ -341,6 +351,10 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
TsFileResource targetResource = new TsFileResource(targetFile);
long timePartition = targetResource.getTimePartition();
RestorableTsFileIOWriter writer = new RestorableTsFileIOWriter(targetFile);
+ List<TsFileResource> sourceTsFileResources = new ArrayList<>();
+ for (File file : sourceFileList) {
+ sourceTsFileResources.add(new TsFileResource(file));
+ }
if (sourceFileList.isEmpty()) {
return;
}
@@ -353,22 +367,15 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
writer.close();
if (isSeq) {
CompactionUtils
- .merge(targetResource,
- new ArrayList<>(sequenceTsFileResources.get(timePartition).get(level)),
- storageGroupName,
- new CompactionLogger(storageGroupDir, storageGroupName), deviceSet,
- true);
- deleteLevelFiles(timePartition,
- sequenceTsFileResources.get(timePartition).get(level));
+ .merge(targetResource, sourceTsFileResources, storageGroupName,
+ new CompactionLogger(storageGroupDir, storageGroupName), deviceSet, true);
+ deleteLevelFiles(timePartition, sourceTsFileResources);
sequenceTsFileResources.get(timePartition).get(level + 1).add(targetResource);
} else {
CompactionUtils
- .merge(targetResource, unSequenceTsFileResources.get(timePartition).get(level),
- storageGroupName,
- new CompactionLogger(storageGroupDir, storageGroupName), deviceSet,
- false);
- deleteLevelFiles(timePartition,
- unSequenceTsFileResources.get(timePartition).get(level));
+ .merge(targetResource, sourceTsFileResources, storageGroupName,
+ new CompactionLogger(storageGroupDir, storageGroupName), deviceSet, false);
+ deleteLevelFiles(timePartition, sourceTsFileResources);
unSequenceTsFileResources.get(timePartition).get(level + 1).add(targetResource);
}
}
@@ -445,6 +452,7 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
} catch (InterruptedException e) {
logger.error("{} [Compaction] shutdown", storageGroupName, e);
Thread.currentThread().interrupt();
+ return;
}
}
long startTimeMillis = System.currentTimeMillis();
@@ -466,16 +474,24 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
i + 1);
compactionLogger.logSequence(sequence);
compactionLogger.logFile(TARGET_NAME, newLevelFile);
- logger.info("{} [Compaction] merge level-{}'s {} tsfiles to next level",
- storageGroupName, i, mergeResources.get(i).size());
+ List<TsFileResource> toMergeTsFiles = mergeResources.get(i);
+ logger.info("{} [Compaction] merge level-{}'s {} TsFiles to next level",
+ storageGroupName, i, toMergeTsFiles.size());
+ for (TsFileResource toMergeTsFile : toMergeTsFiles) {
+ logger.info("{} [Compaction] start to merge TsFile {}", storageGroupName,
+ toMergeTsFile);
+ }
TsFileResource newResource = new TsFileResource(newLevelFile);
CompactionUtils
- .merge(newResource, mergeResources.get(i), storageGroupName, compactionLogger,
+ .merge(newResource, toMergeTsFiles, storageGroupName, compactionLogger,
new HashSet<>(), sequence);
+ logger.info(
+ "{} [Compaction] merged level-{}'s {} TsFiles to next level, and start to delete old files",
+ storageGroupName, i, toMergeTsFiles.size());
writeLock();
try {
- deleteLevelFiles(timePartition, mergeResources.get(i));
+ deleteLevelFiles(timePartition, toMergeTsFiles);
compactionLogger.logMergeFinish();
if (sequence) {
sequenceTsFileResources.get(timePartition).get(i + 1).add(newResource);
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeManager.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeManager.java
index 158bef6..ac0e38d 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeManager.java
@@ -37,6 +37,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.iotdb.db.concurrent.ThreadName;
import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.StorageEngine;
@@ -168,9 +169,17 @@ public class MergeManager implements IService, MergeManagerMBean {
logger.info("Waiting for task pool to shut down");
long startTime = System.currentTimeMillis();
while (!mergeTaskPool.isTerminated() || !mergeChunkSubTaskPool.isTerminated()) {
- // wait
+ int timeMillis = 0;
+ try {
+ Thread.sleep(200);
+ } catch (InterruptedException e) {
+ logger.error("CompactionMergeTaskPoolManager {} shutdown",
+ ThreadName.COMPACTION_SERVICE.getName(), e);
+ Thread.currentThread().interrupt();
+ }
+ timeMillis += 200;
long time = System.currentTimeMillis() - startTime;
- if (time % 60_000 == 0) {
+ if (timeMillis % 60_000 == 0) {
logger.warn("MergeManager has wait for {} seconds to stop", time / 1000);
}
}
@@ -197,9 +206,17 @@ public class MergeManager implements IService, MergeManagerMBean {
logger.info("Waiting for task pool to shut down");
long startTime = System.currentTimeMillis();
while (!mergeTaskPool.isTerminated() || !mergeChunkSubTaskPool.isTerminated()) {
- // wait
+ int timeMillis = 0;
+ try {
+ Thread.sleep(200);
+ } catch (InterruptedException e) {
+ logger.error("CompactionMergeTaskPoolManager {} shutdown",
+ ThreadName.COMPACTION_SERVICE.getName(), e);
+ Thread.currentThread().interrupt();
+ }
+ timeMillis += 200;
long time = System.currentTimeMillis() - startTime;
- if (time % 60_000 == 0) {
+ if (timeMillis % 60_000 == 0) {
logger.warn("MergeManager has wait for {} seconds to stop", time / 1000);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java
index eddc290..693e58f 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java
@@ -188,11 +188,6 @@ class MergeFileTask {
seqFile.serialize();
mergeLogger.logFileMergeEnd();
logger.debug("{} moved merged chunks of {} to the old file", taskName, seqFile);
-
- File newMergedFile = newFileWriter.getFile();
- newMergedFile.delete();
- fsFactory.moveFile(seqFile.getTsFile(), newMergedFile);
- seqFile.setFile(newMergedFile);
} catch (Exception e) {
restoreOldFile(seqFile);
throw e;