You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2021/06/01 12:22:44 UTC
[iotdb] 01/01: clean merge code
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch clear_merge_code
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 20ce1dcb4860839a3509bde11e8e0467e2612301
Author: qiaojialin <64...@qq.com>
AuthorDate: Tue Jun 1 20:21:59 2021 +0800
clean merge code
---
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 10 +-
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 4 +-
...PoolManager.java => CompactionTaskManager.java} | 12 +-
.../db/engine/compaction/TsFileManagement.java | 109 +++---------
.../level/LevelCompactionTsFileManagement.java | 191 +++++++++------------
.../iotdb/db/engine/merge/manage/MergeManager.java | 2 +-
.../db/engine/merge/manage/MergeResource.java | 48 ++++--
.../iotdb/db/engine/merge/recover/LogAnalyzer.java | 7 +-
.../engine/merge/selector/IMergeFileSelector.java | 4 +
.../iotdb/db/engine/merge/task/MergeTask.java | 183 ++++++++++----------
.../db/engine/merge/task/RecoverMergeTask.java | 1 +
.../engine/storagegroup/StorageGroupProcessor.java | 52 +++---
.../java/org/apache/iotdb/db/service/IoTDB.java | 4 +-
.../apache/iotdb/db/engine/merge/MergeLogTest.java | 3 +-
.../iotdb/db/engine/merge/MergeManagerTest.java | 11 +-
.../iotdb/db/engine/merge/MergeOverLapTest.java | 3 +-
.../iotdb/db/engine/merge/MergePerfTest.java | 4 +-
.../iotdb/db/engine/merge/MergeTaskTest.java | 33 ++--
.../storagegroup/StorageGroupProcessorTest.java | 2 +-
.../iotdb/db/integration/IoTDBRestartIT.java | 4 +-
.../apache/iotdb/db/utils/EnvironmentUtils.java | 4 +-
21 files changed, 302 insertions(+), 389 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 6f9498b..61b8997 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -519,7 +519,7 @@ public class IoTDBConfig {
* despite how much they are overflowed). This may increase merge overhead depending on how much
* the SeqFiles are overflowed.
*/
- private boolean forceFullMerge = true;
+ private boolean fullMerge = true;
/** The limit of compaction merge can reach per second */
private int mergeWriteThroughputMbPerSec = 8;
@@ -1404,12 +1404,12 @@ public class IoTDBConfig {
this.enablePartialInsert = enablePartialInsert;
}
- public boolean isForceFullMerge() {
- return forceFullMerge;
+ public boolean isFullMerge() {
+ return fullMerge;
}
- void setForceFullMerge(boolean forceFullMerge) {
- this.forceFullMerge = forceFullMerge;
+ void setFullMerge(boolean fullMerge) {
+ this.fullMerge = fullMerge;
}
public int getCompactionThreadNum() {
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 645a534..6077a3d 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -479,10 +479,10 @@ public class IoTDBDescriptor {
Long.parseLong(
properties.getProperty(
"merge_interval_sec", Long.toString(conf.getMergeIntervalSec()))));
- conf.setForceFullMerge(
+ conf.setFullMerge(
Boolean.parseBoolean(
properties.getProperty(
- "force_full_merge", Boolean.toString(conf.isForceFullMerge()))));
+ "force_full_merge", Boolean.toString(conf.isFullMerge()))));
conf.setCompactionThreadNum(
Integer.parseInt(
properties.getProperty(
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java
similarity index 93%
rename from server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java
rename to server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java
index cdf6f4a..53dc9cf 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java
@@ -46,17 +46,17 @@ import java.util.concurrent.TimeUnit;
import static org.apache.iotdb.db.engine.compaction.utils.CompactionLogger.COMPACTION_LOG_NAME;
-/** CompactionMergeTaskPoolManager provides a ThreadPool to queue and run all compaction tasks. */
-public class CompactionMergeTaskPoolManager implements IService {
+/** CompactionTaskManager provides a ThreadPool to queue and run all compaction tasks. */
+public class CompactionTaskManager implements IService {
private static final Logger logger =
- LoggerFactory.getLogger(CompactionMergeTaskPoolManager.class);
- private static final CompactionMergeTaskPoolManager INSTANCE =
- new CompactionMergeTaskPoolManager();
+ LoggerFactory.getLogger(CompactionTaskManager.class);
+ private static final CompactionTaskManager INSTANCE =
+ new CompactionTaskManager();
private ExecutorService pool;
private Map<String, Set<Future<Void>>> storageGroupTasks = new ConcurrentHashMap<>();
- public static CompactionMergeTaskPoolManager getInstance() {
+ public static CompactionTaskManager getInstance() {
return INSTANCE;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
index aea6e8b..439c50d 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
@@ -22,7 +22,6 @@ package org.apache.iotdb.db.engine.compaction;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.cache.ChunkCache;
import org.apache.iotdb.db.engine.cache.TimeSeriesMetadataCache;
-import org.apache.iotdb.db.engine.merge.manage.MergeManager;
import org.apache.iotdb.db.engine.merge.manage.MergeResource;
import org.apache.iotdb.db.engine.merge.selector.IMergeFileSelector;
import org.apache.iotdb.db.engine.merge.selector.MaxFileMergeFileSelector;
@@ -33,7 +32,6 @@ import org.apache.iotdb.db.engine.modification.Modification;
import org.apache.iotdb.db.engine.modification.ModificationFile;
import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor.CloseCompactionMergeCallBack;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
-import org.apache.iotdb.db.exception.MergeException;
import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
import org.slf4j.Logger;
@@ -62,20 +60,16 @@ public abstract class TsFileManagement {
/** Serialize queries, delete resource files, compaction cleanup files */
private final ReadWriteLock compactionMergeLock = new ReentrantReadWriteLock();
- public volatile boolean isUnseqMerging = false;
- public volatile boolean isSeqMerging = false;
/**
* This is the modification file of the result of the current merge. Because the merged file may
* be invisible at this moment, without this, deletion/update during merge could be lost.
*/
public ModificationFile mergingModification;
- private long mergeStartTime;
-
/** whether execute merge chunk in this task */
protected boolean isMergeExecutedInCurrentTask = false;
- protected boolean isForceFullMerge = IoTDBDescriptor.getInstance().getConfig().isForceFullMerge();
+ protected boolean isForceFullMerge = IoTDBDescriptor.getInstance().getConfig().isFullMerge();
private final int maxOpenFileNumInEachUnseqCompaction =
IoTDBDescriptor.getInstance().getConfig().getMaxOpenFileNumInEachUnseqCompaction();
@@ -84,7 +78,7 @@ public abstract class TsFileManagement {
this.storageGroupDir = storageGroupDir;
}
- public void setForceFullMerge(boolean forceFullMerge) {
+ public void setFullMerge(boolean forceFullMerge) {
isForceFullMerge = forceFullMerge;
}
@@ -176,11 +170,11 @@ public abstract class TsFileManagement {
}
}
- public class CompactionRecoverTask implements Callable<Void> {
+ public class LevelCompactionRecoverTask implements Callable<Void> {
private CloseCompactionMergeCallBack closeCompactionMergeCallBack;
- public CompactionRecoverTask(CloseCompactionMergeCallBack closeCompactionMergeCallBack) {
+ public LevelCompactionRecoverTask(CloseCompactionMergeCallBack closeCompactionMergeCallBack) {
this.closeCompactionMergeCallBack = closeCompactionMergeCallBack;
}
@@ -194,107 +188,63 @@ public abstract class TsFileManagement {
}
}
- public synchronized void merge(
+ protected void doUnseqMerge(
boolean fullMerge,
List<TsFileResource> seqMergeList,
- List<TsFileResource> unSeqMergeList,
- long dataTTL) {
- if (isUnseqMerging) {
- if (logger.isInfoEnabled()) {
- logger.info(
- "{} Last merge is ongoing, currently consumed time: {}ms",
- storageGroupName,
- (System.currentTimeMillis() - mergeStartTime));
- }
- return;
- }
- // wait until seq merge has finished
- while (isSeqMerging) {
- try {
- wait(200);
- } catch (InterruptedException e) {
- logger.error("{} [Compaction] shutdown", storageGroupName, e);
- Thread.currentThread().interrupt();
- return;
- }
- }
- isUnseqMerging = true;
-
- if (seqMergeList.isEmpty()) {
- logger.info("{} no seq files to be merged", storageGroupName);
- isUnseqMerging = false;
- return;
- }
+ List<TsFileResource> unSeqMergeList) {
- if (unSeqMergeList.isEmpty()) {
- logger.info("{} no unseq files to be merged", storageGroupName);
- isUnseqMerging = false;
+ if (seqMergeList.isEmpty() || unSeqMergeList.isEmpty()) {
+ logger.info("{} no files to be merged, seqFiles={}, unseqFiles={}",
+ storageGroupName, seqMergeList.size(), unSeqMergeList.size());
return;
}
+ // the number of unseq files in one merge should not exceed maxOpenFileNumInEachUnseqCompaction
if (unSeqMergeList.size() > maxOpenFileNumInEachUnseqCompaction) {
- logger.info(
- "{} too much unseq files to be merged, reduce it to {}",
- storageGroupName,
- maxOpenFileNumInEachUnseqCompaction);
unSeqMergeList = unSeqMergeList.subList(0, maxOpenFileNumInEachUnseqCompaction);
}
- long budget = IoTDBDescriptor.getInstance().getConfig().getMergeMemoryBudget();
- long timeLowerBound = System.currentTimeMillis() - dataTTL;
+ long timeLowerBound = System.currentTimeMillis() - IoTDBDescriptor.getInstance().getConfig().getDefaultTTL();
MergeResource mergeResource = new MergeResource(seqMergeList, unSeqMergeList, timeLowerBound);
- IMergeFileSelector fileSelector = getMergeFileSelector(budget, mergeResource);
+ IMergeFileSelector fileSelector = getMergeFileSelector(mergeResource);
try {
List[] mergeFiles = fileSelector.select();
if (mergeFiles.length == 0) {
logger.info(
- "{} cannot select merge candidates under the budget {}", storageGroupName, budget);
- isUnseqMerging = false;
+ "{} cannot select merge candidates under the budget {}",
+ storageGroupName, IoTDBDescriptor.getInstance().getConfig().getMergeMemoryBudget());
return;
}
- // avoid pending tasks holds the metadata and streams
- mergeResource.clear();
- String taskName = storageGroupName + "-" + System.currentTimeMillis();
- // do not cache metadata until true candidates are chosen, or too much metadata will be
- // cached during selection
- mergeResource.setCacheDeviceMeta(true);
-
- for (TsFileResource tsFileResource : mergeResource.getSeqFiles()) {
- tsFileResource.setMerging(true);
- }
- for (TsFileResource tsFileResource : mergeResource.getUnseqFiles()) {
- tsFileResource.setMerging(true);
- }
- mergeStartTime = System.currentTimeMillis();
+ mergeResource.startMerging();
+
MergeTask mergeTask =
new MergeTask(
mergeResource,
storageGroupDir,
this::mergeEndAction,
- taskName,
fullMerge,
fileSelector.getConcurrentMergeNum(),
storageGroupName);
+
mergingModification =
new ModificationFile(storageGroupDir + File.separator + MERGING_MODIFICATION_FILE_NAME);
- MergeManager.getINSTANCE().submitMainTask(mergeTask);
- if (logger.isInfoEnabled()) {
- logger.info(
- "{} submits a merge task {}, merging {} seqFiles, {} unseqFiles",
- storageGroupName,
- taskName,
- mergeFiles[0].size(),
- mergeFiles[1].size());
- }
- } catch (MergeException | IOException e) {
+ logger.info(
+ "{} start merge {} seqFiles, {} unseqFiles", storageGroupName,
+ mergeFiles[0].size(),
+ mergeFiles[1].size());
+
+ mergeTask.doMerge();
+
+ } catch (Exception e) {
logger.error("{} cannot select file for merge", storageGroupName, e);
}
}
- private IMergeFileSelector getMergeFileSelector(long budget, MergeResource resource) {
+ private IMergeFileSelector getMergeFileSelector(MergeResource resource) {
+ long budget = IoTDBDescriptor.getInstance().getConfig().getMergeMemoryBudget();
MergeFileStrategy strategy = IoTDBDescriptor.getInstance().getConfig().getMergeFileStrategy();
switch (strategy) {
case MAX_FILE_NUM:
@@ -402,14 +352,12 @@ public abstract class TsFileManagement {
if (Thread.currentThread().isInterrupted() || unseqFiles.isEmpty()) {
// merge task abort, or merge runtime exception arose, just end this merge
- isUnseqMerging = false;
logger.info("{} a merge task abnormally ends", storageGroupName);
return;
}
removeUnseqFiles(unseqFiles);
- for (int i = 0; i < seqFiles.size(); i++) {
- TsFileResource seqFile = seqFiles.get(i);
+ for (TsFileResource seqFile : seqFiles) {
// get both seqFile lock and merge lock
doubleWriteLock(seqFile);
@@ -430,7 +378,6 @@ public abstract class TsFileManagement {
try {
removeMergingModification();
- isUnseqMerging = false;
Files.delete(mergeLog.toPath());
} catch (IOException e) {
logger.error(
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
index 98f1c2e..0b6730d 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
@@ -64,7 +64,7 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
private static final Logger logger =
LoggerFactory.getLogger(LevelCompactionTsFileManagement.class);
- private final int seqLevelNum =
+ private final int totalSeqLevelNum =
Math.max(IoTDBDescriptor.getInstance().getConfig().getSeqLevelNum(), 1);
private final int seqFileNumInEachLevel =
Math.max(IoTDBDescriptor.getInstance().getConfig().getSeqFileNumInEachLevel(), 1);
@@ -267,7 +267,7 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
long timePartitionId = tsFileResource.getTimePartition();
int level = getMergeLevel(tsFileResource.getTsFile());
if (sequence) {
- if (level <= seqLevelNum - 1) {
+ if (level <= totalSeqLevelNum - 1) {
// current file has normal level
sequenceTsFileResources
.computeIfAbsent(timePartitionId, this::newSequenceTsFileResources)
@@ -277,7 +277,7 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
// current file has too high level
sequenceTsFileResources
.computeIfAbsent(timePartitionId, this::newSequenceTsFileResources)
- .get(seqLevelNum - 1)
+ .get(totalSeqLevelNum - 1)
.add(tsFileResource);
}
} else {
@@ -397,7 +397,7 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
if (sequence) {
for (List<SortedSet<TsFileResource>> partitionSequenceTsFileResource :
sequenceTsFileResources.values()) {
- for (int i = seqLevelNum - 1; i >= 0; i--) {
+ for (int i = totalSeqLevelNum - 1; i >= 0; i--) {
result += partitionSequenceTsFileResource.get(i).size();
}
}
@@ -564,7 +564,7 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
forkTsFileList(
forkedSequenceTsFileResources,
sequenceTsFileResources.computeIfAbsent(timePartition, this::newSequenceTsFileResources),
- seqLevelNum);
+ totalSeqLevelNum);
// we have to copy all unseq file
forkTsFileList(
forkedUnSequenceTsFileResources,
@@ -594,47 +594,28 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
@Override
protected void merge(long timePartition) {
- isMergeExecutedInCurrentTask =
- merge(
- forkedSequenceTsFileResources, true, timePartition, seqLevelNum, seqFileNumInEachLevel);
- if (enableUnseqCompaction
- && unseqLevelNum <= 1
- && forkedUnSequenceTsFileResources.get(0).size() > 0) {
- isMergeExecutedInCurrentTask = true;
- merge(
- isForceFullMerge,
- getTsFileListByTimePartition(true, timePartition),
- forkedUnSequenceTsFileResources.get(0),
- Long.MAX_VALUE);
- } else {
- isMergeExecutedInCurrentTask =
- merge(
- forkedUnSequenceTsFileResources,
- false,
- timePartition,
- unseqLevelNum,
- unseqFileNumInEachLevel);
+ // do unseq compaction if there is an unseq file in the max unseq level
+ if (enableUnseqCompaction && forkedUnSequenceTsFileResources.get(unseqLevelNum - 1).size() > 0) {
+ doUnseqMerge(isForceFullMerge, getTsFileListByTimePartition(true, timePartition),
+ forkedUnSequenceTsFileResources.get(unseqLevelNum - 1));
}
+
+ doLevelCompaction(forkedSequenceTsFileResources, true, timePartition, totalSeqLevelNum,
+ seqFileNumInEachLevel);
+
+ doLevelCompaction(forkedUnSequenceTsFileResources, false, timePartition, unseqLevelNum,
+ unseqFileNumInEachLevel);
+
}
- @SuppressWarnings("squid:S3776")
- private boolean merge(
- List<List<TsFileResource>> mergeResources,
+ @SuppressWarnings("squid:S3776") //MERGE TODO: move to a LevelCompactionExecutor
+ private boolean doLevelCompaction(
+ List<List<TsFileResource>> mergeResources, // each level is a List<TsFileResource>
boolean sequence,
long timePartition,
int currMaxLevel,
int currMaxFileNumInEachLevel) {
// wait until unseq merge has finished
- while (isUnseqMerging) {
- try {
- Thread.sleep(200);
- } catch (InterruptedException e) {
- logger.error("{} [Compaction] shutdown", storageGroupName, e);
- Thread.currentThread().interrupt();
- return false;
- }
- }
- isSeqMerging = true;
long startTimeMillis = System.currentTimeMillis();
// whether execute merge chunk in the loop below
boolean isMergeExecutedInCurrentTask = false;
@@ -646,83 +627,72 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
if (currMaxFileNumInEachLevel <= currLevelTsFileResource.size()) {
// just merge part of the file
isMergeExecutedInCurrentTask = true;
- // level is numbered from 0
- if (enableUnseqCompaction && !sequence && i == currMaxLevel - 2) {
- // do not merge current unseq file level to upper level and just merge all of them to
- // seq file
- isSeqMerging = false;
- merge(
- isForceFullMerge,
- getTsFileListByTimePartition(true, timePartition),
- mergeResources.get(i),
- Long.MAX_VALUE);
- } else {
- compactionLogger = new CompactionLogger(storageGroupDir, storageGroupName);
- // log source file list and target file for recover
- for (TsFileResource mergeResource : mergeResources.get(i)) {
- compactionLogger.logFile(SOURCE_NAME, mergeResource.getTsFile());
- }
- File newLevelFile =
- TsFileResource.modifyTsFileNameMergeCnt(mergeResources.get(i).get(0).getTsFile());
- compactionLogger.logSequence(sequence);
- compactionLogger.logFile(TARGET_NAME, newLevelFile);
- List<TsFileResource> toMergeTsFiles =
- mergeResources.get(i).subList(0, currMaxFileNumInEachLevel);
+ compactionLogger = new CompactionLogger(storageGroupDir, storageGroupName);
+ // log source file list and target file for recover
+ for (TsFileResource mergeResource : mergeResources.get(i)) {
+ compactionLogger.logFile(SOURCE_NAME, mergeResource.getTsFile());
+ }
+ File newLevelFile =
+ TsFileResource.modifyTsFileNameMergeCnt(mergeResources.get(i).get(0).getTsFile());
+ compactionLogger.logSequence(sequence);
+ compactionLogger.logFile(TARGET_NAME, newLevelFile);
+ List<TsFileResource> toMergeTsFiles =
+ mergeResources.get(i).subList(0, currMaxFileNumInEachLevel);
+ logger.info(
+ "{} [Compaction] merge level-{}'s {} TsFiles to next level",
+ storageGroupName,
+ i,
+ toMergeTsFiles.size());
+ for (TsFileResource toMergeTsFile : toMergeTsFiles) {
logger.info(
- "{} [Compaction] merge level-{}'s {} TsFiles to next level",
- storageGroupName,
- i,
- toMergeTsFiles.size());
- for (TsFileResource toMergeTsFile : toMergeTsFiles) {
- logger.info(
- "{} [Compaction] start to merge TsFile {}", storageGroupName, toMergeTsFile);
- }
+ "{} [Compaction] start to merge TsFile {}", storageGroupName, toMergeTsFile);
+ }
- TsFileResource newResource = new TsFileResource(newLevelFile);
- List<Modification> modifications = new ArrayList<>();
- // merge, read from source files and write to target file
- CompactionUtils.merge(
- newResource,
- toMergeTsFiles,
- storageGroupName,
- compactionLogger,
- new HashSet<>(),
- sequence,
- modifications);
- logger.info(
- "{} [Compaction] merged level-{}'s {} TsFiles to next level, and start to delete old files",
- storageGroupName,
- i,
- toMergeTsFiles.size());
- writeLock();
- try {
- if (Thread.currentThread().isInterrupted()) {
- throw new InterruptedException(
- String.format("%s [Compaction] abort", storageGroupName));
- }
+ TsFileResource newResource = new TsFileResource(newLevelFile);
+ List<Modification> modifications = new ArrayList<>();
+ // merge, read from source files and write to target file
+ CompactionUtils.merge(
+ newResource,
+ toMergeTsFiles,
+ storageGroupName,
+ compactionLogger,
+ new HashSet<>(),
+ sequence,
+ modifications);
+ logger.info(
+ "{} [Compaction] merged level-{}'s {} TsFiles to next level, and start to delete old files",
+ storageGroupName,
+ i,
+ toMergeTsFiles.size());
+ writeLock();
+ try {
+ if (Thread.currentThread().isInterrupted()) {
+ throw new InterruptedException(
+ String.format("%s [Compaction] abort", storageGroupName));
+ }
- if (sequence) {
- sequenceTsFileResources.get(timePartition).get(i + 1).add(newResource);
- } else {
- unSequenceTsFileResources.get(timePartition).get(i + 1).add(newResource);
- }
- deleteLevelFilesInList(timePartition, toMergeTsFiles, i, sequence);
- if (mergeResources.size() > i + 1) {
- mergeResources.get(i + 1).add(newResource);
- }
- } finally {
- writeUnlock();
+ if (sequence) {
+ sequenceTsFileResources.get(timePartition).get(i + 1).add(newResource);
+ } else {
+ unSequenceTsFileResources.get(timePartition).get(i + 1).add(newResource);
}
- deleteLevelFilesInDisk(toMergeTsFiles);
- renameLevelFilesMods(modifications, toMergeTsFiles, newResource);
- compactionLogger.close();
- File logFile =
- FSFactoryProducer.getFSFactory()
- .getFile(storageGroupDir, storageGroupName + COMPACTION_LOG_NAME);
- if (logFile.exists()) {
- Files.delete(logFile.toPath());
+ deleteLevelFilesInList(timePartition, toMergeTsFiles, i, sequence);
+ if (mergeResources.size() > i + 1) {
+ mergeResources.get(i + 1).add(newResource);
}
+ } finally {
+ writeUnlock();
+ }
+ deleteLevelFilesInDisk(toMergeTsFiles);
+ renameLevelFilesMods(modifications, toMergeTsFiles, newResource);
+ compactionLogger.close();
+ File logFile =
+ FSFactoryProducer.getFSFactory()
+ .getFile(storageGroupDir, storageGroupName + COMPACTION_LOG_NAME);
+ if (logFile.exists()) {
+ Files.delete(logFile.toPath());
}
+ break;
}
}
} catch (Exception e) {
@@ -736,7 +706,6 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
restoreCompaction();
logger.error("Error occurred in Compaction Merge thread", e);
} finally {
- isSeqMerging = false;
// reset the merge working state to false
logger.info(
"{} [Compaction] merge end time isSeq = {}, consumption: {} ms",
@@ -749,7 +718,7 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
private List<SortedSet<TsFileResource>> newSequenceTsFileResources(Long k) {
List<SortedSet<TsFileResource>> newSequenceTsFileResources = new ArrayList<>();
- for (int i = 0; i < seqLevelNum; i++) {
+ for (int i = 0; i < totalSeqLevelNum; i++) {
newSequenceTsFileResources.add(
new TreeSet<>(
(o1, o2) -> {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeManager.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeManager.java
index f9112e1..84c6859 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeManager.java
@@ -253,7 +253,7 @@ public class MergeManager implements IService, MergeManagerMBean {
private void mergeAll() {
try {
StorageEngine.getInstance()
- .mergeAll(IoTDBDescriptor.getInstance().getConfig().isForceFullMerge());
+ .mergeAll(IoTDBDescriptor.getInstance().getConfig().isFullMerge());
} catch (StorageEngineException e) {
logger.error("Cannot perform a global merge because", e);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeResource.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeResource.java
index d7bc827..050d83e 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeResource.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeResource.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.query.reader.resource.CachedUnseqResourceMergeReader;
import org.apache.iotdb.db.utils.MergeUtils;
+import org.apache.iotdb.db.utils.TestOnly;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
@@ -91,26 +92,27 @@ public class MergeResource {
this.unseqFiles = unseqFiles.stream().filter(this::filterResource).collect(Collectors.toList());
}
- public void clear() throws IOException {
- for (TsFileSequenceReader sequenceReader : fileReaderCache.values()) {
- sequenceReader.close();
- }
- for (RestorableTsFileIOWriter writer : fileWriterCache.values()) {
- writer.close();
- }
-
- fileReaderCache.clear();
- fileWriterCache.clear();
- modificationCache.clear();
- measurementSchemaMap.clear();
- chunkWriterCache.clear();
- }
-
public IMeasurementSchema getSchema(PartialPath path) {
return measurementSchemaMap.get(path);
}
/**
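+ * startMerging() is called after the merge candidates have been selected.
+ *
+ * <p>It enables device metadata caching and marks every seq and unseq file as merging. Metadata
+ * is not cached before the true candidates are chosen, or too much would be cached during selection.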
+ */
+ public void startMerging() {
+ cacheDeviceMeta = true;
+ for (TsFileResource tsFileResource : seqFiles) {
+ tsFileResource.setMerging(true);
+ }
+ for (TsFileResource tsFileResource : unseqFiles) {
+ tsFileResource.setMerging(true);
+ }
+ }
+
+ /**
* Construct a new or get an existing RestorableTsFileIOWriter of a merge temp file for a SeqFile.
* The path of the merge temp file will be the seqFile's + ".merge".
*
@@ -290,4 +292,20 @@ public class MergeResource {
public Map<String, Pair<Long, Long>> getStartEndTime(TsFileResource tsFileResource) {
return startEndTimeCache.getOrDefault(tsFileResource, new HashMap<>());
}
+
+ @TestOnly
+ public void clear() throws IOException {
+ for (TsFileSequenceReader sequenceReader : fileReaderCache.values()) {
+ sequenceReader.close();
+ }
+ for (RestorableTsFileIOWriter writer : fileWriterCache.values()) {
+ writer.close();
+ }
+
+ fileReaderCache.clear();
+ fileWriterCache.clear();
+ modificationCache.clear();
+ measurementSchemaMap.clear();
+ chunkWriterCache.clear();
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/recover/LogAnalyzer.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/recover/LogAnalyzer.java
index 9693d3c..e6f84fc 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/recover/LogAnalyzer.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/recover/LogAnalyzer.java
@@ -78,6 +78,7 @@ public class LogAnalyzer {
private Status status;
+ // MERGE TODO: add two methods: List<String> getSeqTsFiles() List<String> getUnseqTsFiles()
public LogAnalyzer(
MergeResource resource, String taskName, File logFile, String storageGroupName) {
this.resource = resource;
@@ -102,10 +103,11 @@ public class LogAnalyzer {
analyzeUnseqFiles(bufferedReader);
- List<PartialPath> storageGroupPaths =
+ // get all timeseries in this storage group
+ List<PartialPath> timeseriesPaths =
IoTDB.metaManager.getAllTimeseriesPath(new PartialPath(storageGroupName + ".*"));
unmergedPaths = new ArrayList<>();
- unmergedPaths.addAll(storageGroupPaths);
+ unmergedPaths.addAll(timeseriesPaths);
analyzeMergedSeries(bufferedReader, unmergedPaths);
@@ -120,6 +122,7 @@ public class LogAnalyzer {
return;
}
long startTime = System.currentTimeMillis();
+ //
List<TsFileResource> mergeSeqFiles = new ArrayList<>();
while ((currLine = bufferedReader.readLine()) != null) {
if (STR_UNSEQ_FILES.equals(currLine)) {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/selector/IMergeFileSelector.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/selector/IMergeFileSelector.java
index 077a58b..63fc04c 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/selector/IMergeFileSelector.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/selector/IMergeFileSelector.java
@@ -29,6 +29,10 @@ import java.util.List;
*/
public interface IMergeFileSelector {
+ /**
+ * @return an array of two lists: [0] the selected seqFileList, [1] the selected unseqFileList
+ * @throws MergeException
+ */
List[] select() throws MergeException;
int getConcurrentMergeNum();
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeTask.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeTask.java
index 381651c..89f4574 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeTask.java
@@ -51,7 +51,7 @@ import java.util.concurrent.Callable;
* merged chunks in the temp files back to the seqFiles or move the unmerged chunks in the seqFiles
* into temp files and replace the seqFiles with the temp files. 3. remove unseqFiles
*/
-public class MergeTask implements Callable<Void> {
+public class MergeTask {
public static final String MERGE_SUFFIX = ".merge";
private static final Logger logger = LoggerFactory.getLogger(MergeTask.class);
@@ -67,7 +67,7 @@ public class MergeTask implements Callable<Void> {
States states = States.START;
MergeMultiChunkTask chunkTask;
MergeFileTask fileTask;
- private MergeCallback callback;
+ protected MergeCallback callback;
MergeTask(
List<TsFileResource> seqFiles,
@@ -90,29 +90,18 @@ public class MergeTask implements Callable<Void> {
MergeResource mergeResource,
String storageGroupSysDir,
MergeCallback callback,
- String taskName,
boolean fullMerge,
int concurrentMergeSeriesNum,
String storageGroupName) {
this.resource = mergeResource;
this.storageGroupSysDir = storageGroupSysDir;
this.callback = callback;
- this.taskName = taskName;
+ this.taskName = storageGroupName;
this.fullMerge = fullMerge;
this.concurrentMergeSeriesNum = concurrentMergeSeriesNum;
this.storageGroupName = storageGroupName;
}
- @Override
- public Void call() throws Exception {
- try {
- doMerge();
- } catch (Exception e) {
- logger.error("Runtime exception in merge {}", taskName, e);
- abort();
- }
- return null;
- }
private void abort() throws IOException {
states = States.ABORTED;
@@ -125,101 +114,103 @@ public class MergeTask implements Callable<Void> {
new File(storageGroupSysDir, MergeLogger.MERGE_LOG_NAME));
}
- private void doMerge() throws IOException, MetadataException {
- if (resource.getSeqFiles().isEmpty()) {
- logger.info("{} no sequence file to merge into, so will abort task.", taskName);
- abort();
- return;
- }
- if (logger.isInfoEnabled()) {
- logger.info(
- "{} starts to merge {} seqFiles, {} unseqFiles",
- taskName,
- resource.getSeqFiles().size(),
- resource.getUnseqFiles().size());
- }
- long startTime = System.currentTimeMillis();
- long totalFileSize =
- MergeUtils.collectFileSizes(resource.getSeqFiles(), resource.getUnseqFiles());
- mergeLogger = new MergeLogger(storageGroupSysDir);
+ public void doMerge() throws IOException {
+ try {
+ if (resource.getSeqFiles().isEmpty()) {
+ logger.info("{} no sequence file to merge into, so will abort task.", taskName);
+ abort();
+ return;
+ }
+ if (logger.isInfoEnabled()) {
+ logger.info(
+ "{} starts to merge {} seqFiles, {} unseqFiles",
+ taskName,
+ resource.getSeqFiles().size(),
+ resource.getUnseqFiles().size());
+ }
+ long startTime = System.currentTimeMillis();
+ long totalFileSize =
+ MergeUtils.collectFileSizes(resource.getSeqFiles(), resource.getUnseqFiles());
+ mergeLogger = new MergeLogger(storageGroupSysDir);
- mergeLogger.logFiles(resource);
+ mergeLogger.logFiles(resource);
- Set<PartialPath> devices = IoTDB.metaManager.getDevices(new PartialPath(storageGroupName));
- Map<PartialPath, IMeasurementSchema> measurementSchemaMap = new HashMap<>();
- List<PartialPath> unmergedSeries = new ArrayList<>();
- for (PartialPath device : devices) {
- MNode deviceNode = IoTDB.metaManager.getNodeByPath(device);
- // todo add template merge logic
- for (Entry<String, MNode> entry : deviceNode.getChildren().entrySet()) {
- PartialPath path = device.concatNode(entry.getKey());
- measurementSchemaMap.put(path, ((MeasurementMNode) entry.getValue()).getSchema());
- unmergedSeries.add(path);
+ Set<PartialPath> devices = IoTDB.metaManager.getDevices(new PartialPath(storageGroupName));
+ Map<PartialPath, IMeasurementSchema> measurementSchemaMap = new HashMap<>();
+ List<PartialPath> unmergedSeries = new ArrayList<>();
+ for (PartialPath device : devices) {
+ MNode deviceNode = IoTDB.metaManager.getNodeByPath(device);
+ // todo add template merge logic
+ for (Entry<String, MNode> entry : deviceNode.getChildren().entrySet()) {
+ PartialPath path = device.concatNode(entry.getKey());
+ measurementSchemaMap.put(path, ((MeasurementMNode) entry.getValue()).getSchema());
+ unmergedSeries.add(path);
+ }
}
- }
- resource.setMeasurementSchemaMap(measurementSchemaMap);
+ resource.setMeasurementSchemaMap(measurementSchemaMap);
- mergeLogger.logMergeStart();
+ mergeLogger.logMergeStart();
- chunkTask =
- new MergeMultiChunkTask(
- mergeContext,
- taskName,
- mergeLogger,
- resource,
- fullMerge,
- unmergedSeries,
- concurrentMergeSeriesNum,
- storageGroupName);
- states = States.MERGE_CHUNKS;
- chunkTask.mergeSeries();
- if (Thread.interrupted()) {
- logger.info("Merge task {} aborted", taskName);
- abort();
- return;
- }
+ chunkTask =
+ new MergeMultiChunkTask(
+ mergeContext,
+ taskName,
+ mergeLogger,
+ resource,
+ fullMerge,
+ unmergedSeries,
+ concurrentMergeSeriesNum,
+ storageGroupName);
+ states = States.MERGE_CHUNKS;
+ chunkTask.mergeSeries();
+ if (Thread.interrupted()) {
+ logger.info("Merge task {} aborted", taskName);
+ abort();
+ return;
+ }
- fileTask =
- new MergeFileTask(taskName, mergeContext, mergeLogger, resource, resource.getSeqFiles());
- states = States.MERGE_FILES;
- chunkTask = null;
- fileTask.mergeFiles();
- if (Thread.interrupted()) {
- logger.info("Merge task {} aborted", taskName);
- abort();
- return;
- }
+ fileTask =
+ new MergeFileTask(taskName, mergeContext, mergeLogger, resource, resource.getSeqFiles());
+ states = States.MERGE_FILES;
+ chunkTask = null;
+ fileTask.mergeFiles();
+ if (Thread.interrupted()) {
+ logger.info("Merge task {} aborted", taskName);
+ abort();
+ return;
+ }
- states = States.CLEAN_UP;
- fileTask = null;
- cleanUp(true);
- if (logger.isInfoEnabled()) {
- double elapsedTime = (double) (System.currentTimeMillis() - startTime) / 1000.0;
- double byteRate = totalFileSize / elapsedTime / 1024 / 1024;
- double seriesRate = unmergedSeries.size() / elapsedTime;
- double chunkRate = mergeContext.getTotalChunkWritten() / elapsedTime;
- double fileRate =
- (resource.getSeqFiles().size() + resource.getUnseqFiles().size()) / elapsedTime;
- double ptRate = mergeContext.getTotalPointWritten() / elapsedTime;
- logger.info(
- "{} ends after {}s, byteRate: {}MB/s, seriesRate {}/s, chunkRate: {}/s, "
- + "fileRate: {}/s, ptRate: {}/s",
- taskName,
- elapsedTime,
- byteRate,
- seriesRate,
- chunkRate,
- fileRate,
- ptRate);
+ states = States.CLEAN_UP;
+ fileTask = null;
+ cleanUp(true);
+ if (logger.isInfoEnabled()) {
+ double elapsedTime = (double) (System.currentTimeMillis() - startTime) / 1000.0;
+ double byteRate = totalFileSize / elapsedTime / 1024 / 1024;
+ double seriesRate = unmergedSeries.size() / elapsedTime;
+ double chunkRate = mergeContext.getTotalChunkWritten() / elapsedTime;
+ double fileRate =
+ (resource.getSeqFiles().size() + resource.getUnseqFiles().size()) / elapsedTime;
+ double ptRate = mergeContext.getTotalPointWritten() / elapsedTime;
+ logger.info(
+ "{} ends after {}s, byteRate: {}MB/s, seriesRate {}/s, chunkRate: {}/s, "
+ + "fileRate: {}/s, ptRate: {}/s",
+ taskName,
+ elapsedTime,
+ byteRate,
+ seriesRate,
+ chunkRate,
+ fileRate,
+ ptRate);
+ }
+ } catch (Exception e) {
+ logger.error("Runtime exception in merge {}", taskName, e);
+ abort();
}
}
void cleanUp(boolean executeCallback) throws IOException {
logger.info("{} is cleaning up", taskName);
- resource.clear();
- mergeContext.clear();
-
if (mergeLogger != null) {
mergeLogger.close();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/RecoverMergeTask.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/RecoverMergeTask.java
index 32cd897..cb1e39e 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/RecoverMergeTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/RecoverMergeTask.java
@@ -53,6 +53,7 @@ public class RecoverMergeTask extends MergeTask {
private LogAnalyzer analyzer;
+ // MERGE TODO: get seqFiles and unseqFiles to be recovered from merge.log
public RecoverMergeTask(
List<TsFileResource> seqFiles,
List<TsFileResource> unseqFiles,
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 26261df..75b2175 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -23,7 +23,7 @@ import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.conf.directories.DirectoryManager;
import org.apache.iotdb.db.engine.StorageEngine;
-import org.apache.iotdb.db.engine.compaction.CompactionMergeTaskPoolManager;
+import org.apache.iotdb.db.engine.compaction.CompactionTaskManager;
import org.apache.iotdb.db.engine.compaction.TsFileManagement;
import org.apache.iotdb.db.engine.compaction.level.LevelCompactionTsFileManagement;
import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
@@ -174,8 +174,8 @@ public class StorageGroupProcessor {
private final TreeMap<Long, TsFileProcessor> workSequenceTsFileProcessors = new TreeMap<>();
/** time partition id in the storage group -> tsFileProcessor for this time partition */
private final TreeMap<Long, TsFileProcessor> workUnsequenceTsFileProcessors = new TreeMap<>();
- /** compactionMergeWorking is used to wait for last compaction to be done. */
- private volatile boolean compactionMergeWorking = false;
+ /** isCompactionWorking is used to wait for last compaction to be done. */
+ private volatile boolean isCompactionWorking = false;
// upgrading sequence TsFile resource list
private List<TsFileResource> upgradeSeqFileList = new LinkedList<>();
@@ -446,33 +446,37 @@ public class StorageGroupProcessor {
recoverTsFiles(value, false);
}
- String taskName =
+ String unseqMergeTaskName =
logicalStorageGroupName + "-" + virtualStorageGroupId + "-" + System.currentTimeMillis();
File mergingMods =
SystemFileFactory.INSTANCE.getFile(storageGroupSysDir, MERGING_MODIFICATION_FILE_NAME);
if (mergingMods.exists()) {
this.tsFileManagement.mergingModification = new ModificationFile(mergingMods.getPath());
}
+
+ // MERGE TODO: only pass the tsfileManagement into RecoverMergeTask
RecoverMergeTask recoverMergeTask =
new RecoverMergeTask(
new ArrayList<>(tsFileManagement.getTsFileList(true)),
tsFileManagement.getTsFileList(false),
storageGroupSysDir.getPath(),
tsFileManagement::mergeEndAction,
- taskName,
- IoTDBDescriptor.getInstance().getConfig().isForceFullMerge(),
+ unseqMergeTaskName,
+ IoTDBDescriptor.getInstance().getConfig().isFullMerge(),
logicalStorageGroupName + "-" + virtualStorageGroupId);
logger.info(
"{} - {} a RecoverMergeTask {} starts...",
logicalStorageGroupName,
virtualStorageGroupId,
- taskName);
+ unseqMergeTaskName);
recoverMergeTask.recoverMerge(
IoTDBDescriptor.getInstance().getConfig().isContinueMergeAfterReboot());
if (!IoTDBDescriptor.getInstance().getConfig().isContinueMergeAfterReboot()) {
mergingMods.delete();
}
- recoverCompaction();
+
+ // MERGE TODO: move unseqMergeRecover into TsFileManagement.CompactionRecoverTask()
+ recoverLevelCompaction();
for (TsFileResource resource : tsFileManagement.getTsFileList(true)) {
long partitionNum = resource.getTimePartition();
updatePartitionFileVersion(partitionNum, resource.getVersion());
@@ -511,27 +515,25 @@ public class StorageGroupProcessor {
globalLatestFlushedTimeForEachDevice.putAll(endTimeMap);
}
- if (IoTDBDescriptor.getInstance().getConfig().isEnableContinuousCompaction()
- && seqTsFileResources.size() > 0) {
+ if (IoTDBDescriptor.getInstance().getConfig().isEnableContinuousCompaction()) {
for (long timePartitionId : timePartitionIdVersionControllerMap.keySet()) {
- executeCompaction(
- timePartitionId, IoTDBDescriptor.getInstance().getConfig().isForceFullMerge());
+ executeCompaction(timePartitionId, IoTDBDescriptor.getInstance().getConfig().isFullMerge());
}
}
}
- private void recoverCompaction() {
- if (!CompactionMergeTaskPoolManager.getInstance().isTerminated()) {
- compactionMergeWorking = true;
+ private void recoverLevelCompaction() {
+ if (!CompactionTaskManager.getInstance().isTerminated()) {
+ isCompactionWorking = true;
logger.info(
"{} - {} submit a compaction recover merge task",
logicalStorageGroupName,
virtualStorageGroupId);
try {
- CompactionMergeTaskPoolManager.getInstance()
+ CompactionTaskManager.getInstance()
.submitTask(
logicalStorageGroupName,
- tsFileManagement.new CompactionRecoverTask(this::closeCompactionMergeCallBack));
+ tsFileManagement.new LevelCompactionRecoverTask(this::closeCompactionMergeCallBack));
} catch (RejectedExecutionException e) {
this.closeCompactionMergeCallBack(false, 0);
logger.error(
@@ -1971,20 +1973,20 @@ public class StorageGroupProcessor {
executeCompaction(
tsFileProcessor.getTimeRangeId(),
- IoTDBDescriptor.getInstance().getConfig().isForceFullMerge());
+ IoTDBDescriptor.getInstance().getConfig().isFullMerge());
}
private void executeCompaction(long timePartition, boolean fullMerge) {
- if (!compactionMergeWorking && !CompactionMergeTaskPoolManager.getInstance().isTerminated()) {
- compactionMergeWorking = true;
+ if (!isCompactionWorking && !CompactionTaskManager.getInstance().isTerminated()) {
+ isCompactionWorking = true;
logger.info(
"{} submit a compaction merge task",
logicalStorageGroupName + "-" + virtualStorageGroupId);
try {
// fork and filter current tsfile, then commit then to compaction merge
tsFileManagement.forkCurrentFileList(timePartition);
- tsFileManagement.setForceFullMerge(fullMerge);
- CompactionMergeTaskPoolManager.getInstance()
+ tsFileManagement.setFullMerge(fullMerge);
+ CompactionTaskManager.getInstance()
.submitTask(
logicalStorageGroupName,
tsFileManagement
@@ -2007,9 +2009,9 @@ public class StorageGroupProcessor {
private void closeCompactionMergeCallBack(boolean isMerge, long timePartitionId) {
if (isMerge && IoTDBDescriptor.getInstance().getConfig().isEnableContinuousCompaction()) {
executeCompaction(
- timePartitionId, IoTDBDescriptor.getInstance().getConfig().isForceFullMerge());
+ timePartitionId, IoTDBDescriptor.getInstance().getConfig().isFullMerge());
} else {
- this.compactionMergeWorking = false;
+ this.isCompactionWorking = false;
}
}
@@ -2842,7 +2844,7 @@ public class StorageGroupProcessor {
writeLock();
try {
// abort ongoing comapctions and merges
- CompactionMergeTaskPoolManager.getInstance().abortCompaction(logicalStorageGroupName);
+ CompactionTaskManager.getInstance().abortCompaction(logicalStorageGroupName);
MergeManager.getINSTANCE().abortMerge(logicalStorageGroupName);
// close all working files that should be removed
removePartitions(filter, workSequenceTsFileProcessors.entrySet());
diff --git a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
index 51a64ac..e4458ff 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
@@ -25,7 +25,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.cost.statistic.Measurement;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.engine.cache.CacheHitRatioMonitor;
-import org.apache.iotdb.db.engine.compaction.CompactionMergeTaskPoolManager;
+import org.apache.iotdb.db.engine.compaction.CompactionTaskManager;
import org.apache.iotdb.db.engine.flush.FlushManager;
import org.apache.iotdb.db.engine.merge.manage.MergeManager;
import org.apache.iotdb.db.engine.trigger.service.TriggerRegistrationService;
@@ -108,7 +108,7 @@ public class IoTDB implements IoTDBMBean {
registerManager.register(TVListAllocator.getInstance());
registerManager.register(CacheHitRatioMonitor.getInstance());
registerManager.register(MergeManager.getINSTANCE());
- registerManager.register(CompactionMergeTaskPoolManager.getInstance());
+ registerManager.register(CompactionTaskManager.getInstance());
JMXService.registerMBean(getInstance(), mbeanName);
registerManager.register(StorageEngine.getInstance());
registerManager.register(TemporaryQueryDataFileService.getInstance());
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeLogTest.java b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeLogTest.java
index cf7441f..1c0dfeb 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeLogTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeLogTest.java
@@ -69,11 +69,10 @@ public class MergeLogTest extends MergeTest {
new MergeResource(seqResources.subList(0, 1), unseqResources.subList(0, 1)),
tempSGDir.getPath(),
this::testCallBack,
- "test",
false,
1,
MERGE_TEST_SG);
- mergeTask.call();
+ mergeTask.doMerge();
}
private void testCallBack(
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeManagerTest.java b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeManagerTest.java
index 9842f53..2232521 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeManagerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeManagerTest.java
@@ -99,20 +99,11 @@ public class MergeManagerTest extends MergeTest {
private String progress = "0";
public FakedMainMergeTask(int serialNum) {
- super(null, null, null, null, false, 0, null);
+ super(null, null, null, false, 0, null);
this.serialNum = serialNum;
}
@Override
- public Void call() {
- while (!Thread.currentThread().isInterrupted()) {
- // wait until interrupt
- }
- progress = "1";
- return null;
- }
-
- @Override
public String getStorageGroupName() {
return "test";
}
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeOverLapTest.java b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeOverLapTest.java
index 6c70fe7..0684f46 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeOverLapTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeOverLapTest.java
@@ -185,11 +185,10 @@ public class MergeOverLapTest extends MergeTest {
new MergeResource(seqResources, unseqResources),
tempSGDir.getPath(),
(k, v, l) -> {},
- "test",
true,
1,
MERGE_TEST_SG);
- mergeTask.call();
+ mergeTask.doMerge();
QueryContext context = new QueryContext();
PartialPath path =
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergePerfTest.java b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergePerfTest.java
index ca816cf..84d4c5f 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergePerfTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergePerfTest.java
@@ -48,8 +48,8 @@ public class MergePerfTest extends MergeTest {
resource.setCacheDeviceMeta(true);
MergeTask mergeTask =
new MergeTask(
- resource, tempSGDir.getPath(), (k, v, l) -> {}, "test", fullMerge, 100, MERGE_TEST_SG);
- mergeTask.call();
+ resource, tempSGDir.getPath(), (k, v, l) -> {}, fullMerge, 100, MERGE_TEST_SG);
+ mergeTask.doMerge();
timeConsumption = System.currentTimeMillis() - timeConsumption;
tearDown();
FileUtils.deleteDirectory(tempSGDir);
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTaskTest.java b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTaskTest.java
index d4f5dfe..5f7c036 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTaskTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTaskTest.java
@@ -79,11 +79,10 @@ public class MergeTaskTest extends MergeTest {
new MergeResource(seqResources, unseqResources),
tempSGDir.getPath(),
(k, v, l) -> {},
- "test",
false,
1,
MERGE_TEST_SG);
- mergeTask.call();
+ mergeTask.doMerge();
QueryContext context = new QueryContext();
PartialPath path =
@@ -123,11 +122,10 @@ public class MergeTaskTest extends MergeTest {
(k, v, l) -> {
assertEquals(499, k.get(2).getEndTime("root.mergeTest.device1"));
},
- "test",
false,
1,
MERGE_TEST_SG);
- mergeTask.call();
+ mergeTask.doMerge();
}
@Test
@@ -177,11 +175,10 @@ public class MergeTaskTest extends MergeTest {
(k, v, l) -> {
assertEquals(49, k.get(0).getEndTime("root.mergeTest.device1"));
},
- "test",
false,
1,
MERGE_TEST_SG);
- mergeTask.call();
+ mergeTask.doMerge();
}
@Test
@@ -191,11 +188,10 @@ public class MergeTaskTest extends MergeTest {
new MergeResource(seqResources, unseqResources),
tempSGDir.getPath(),
(k, v, l) -> {},
- "test",
true,
1,
MERGE_TEST_SG);
- mergeTask.call();
+ mergeTask.doMerge();
QueryContext context = new QueryContext();
PartialPath path =
@@ -235,11 +231,10 @@ public class MergeTaskTest extends MergeTest {
new MergeResource(seqResources, unseqResources),
tempSGDir.getPath(),
(k, v, l) -> {},
- "test",
false,
1,
MERGE_TEST_SG);
- mergeTask.call();
+ mergeTask.doMerge();
QueryContext context = new QueryContext();
PartialPath path =
@@ -275,11 +270,10 @@ public class MergeTaskTest extends MergeTest {
new MergeResource(seqResources, unseqResources.subList(0, 1)),
tempSGDir.getPath(),
(k, v, l) -> {},
- "test",
false,
1,
MERGE_TEST_SG);
- mergeTask.call();
+ mergeTask.doMerge();
QueryContext context = new QueryContext();
PartialPath path =
@@ -319,11 +313,10 @@ public class MergeTaskTest extends MergeTest {
new MergeResource(seqResources, unseqResources.subList(5, 6)),
tempSGDir.getPath(),
(k, v, l) -> {},
- "test",
false,
1,
MERGE_TEST_SG);
- mergeTask.call();
+ mergeTask.doMerge();
QueryContext context = new QueryContext();
PartialPath path =
@@ -359,11 +352,10 @@ public class MergeTaskTest extends MergeTest {
new MergeResource(seqResources, unseqResources.subList(0, 5)),
tempSGDir.getPath(),
(k, v, l) -> {},
- "test",
false,
1,
MERGE_TEST_SG);
- mergeTask.call();
+ mergeTask.doMerge();
QueryContext context = new QueryContext();
PartialPath path =
@@ -424,11 +416,10 @@ public class MergeTaskTest extends MergeTest {
e.printStackTrace();
}
},
- "test",
false,
1,
MERGE_TEST_SG);
- mergeTask.call();
+ mergeTask.doMerge();
QueryContext context = new QueryContext();
PartialPath path =
@@ -474,11 +465,10 @@ public class MergeTaskTest extends MergeTest {
new MergeResource(testSeqResources, testUnseqResource),
tempSGDir.getPath(),
(k, v, l) -> {},
- "test",
false,
1,
MERGE_TEST_SG);
- mergeTask.call();
+ mergeTask.doMerge();
QueryContext context = new QueryContext();
PartialPath path =
@@ -540,11 +530,10 @@ public class MergeTaskTest extends MergeTest {
(k, v, l) -> {
assertEquals(99, k.get(0).getEndTime("root.mergeTest.device1"));
},
- "test",
false,
1,
MERGE_TEST_SG);
- mergeTask.call();
+ mergeTask.doMerge();
}
private void prepareFileWithLastSensor(
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
index 3d7e2b6..8ad49c7 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
@@ -610,7 +610,7 @@ public class StorageGroupProcessorTest {
}
processor.syncCloseAllWorkingTsFileProcessors();
- processor.merge(IoTDBDescriptor.getInstance().getConfig().isForceFullMerge());
+ processor.merge(IoTDBDescriptor.getInstance().getConfig().isFullMerge());
while (processor.getTsFileManagement().isUnseqMerging) {
// wait
}
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBRestartIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBRestartIT.java
index d01b9d9..289ce56 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBRestartIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBRestartIT.java
@@ -19,7 +19,7 @@
package org.apache.iotdb.db.integration;
import org.apache.iotdb.db.engine.StorageEngine;
-import org.apache.iotdb.db.engine.compaction.CompactionMergeTaskPoolManager;
+import org.apache.iotdb.db.engine.compaction.CompactionTaskManager;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.jdbc.Config;
@@ -379,7 +379,7 @@ public class IoTDBRestartIT {
}
try {
- CompactionMergeTaskPoolManager.getInstance().waitAllCompactionFinish();
+ CompactionTaskManager.getInstance().waitAllCompactionFinish();
Thread.sleep(10000);
EnvironmentUtils.restartDaemon();
} catch (Exception e) {
diff --git a/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java b/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
index fc29451..9892780 100644
--- a/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
+++ b/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
@@ -27,7 +27,7 @@ import org.apache.iotdb.db.constant.TestConstant;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.engine.cache.ChunkCache;
import org.apache.iotdb.db.engine.cache.TimeSeriesMetadataCache;
-import org.apache.iotdb.db.engine.compaction.CompactionMergeTaskPoolManager;
+import org.apache.iotdb.db.engine.compaction.CompactionTaskManager;
import org.apache.iotdb.db.engine.trigger.service.TriggerRegistrationService;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.TriggerManagementException;
@@ -86,7 +86,7 @@ public class EnvironmentUtils {
public static void cleanEnv() throws IOException, StorageEngineException {
// wait all compaction finished
- CompactionMergeTaskPoolManager.getInstance().waitAllCompactionFinish();
+ CompactionTaskManager.getInstance().waitAllCompactionFinish();
// deregister all user defined classes
try {