You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ta...@apache.org on 2021/05/10 11:18:26 UTC
[iotdb] branch master updated: Add continuous compaction in level
compaction strategy (#2080)
This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 819b195 Add continuous compaction in level compaction strategy (#2080)
819b195 is described below
commit 819b19551d34406dade92527bfb36d10501bb6a2
Author: zhanglingzhe0820 <44...@qq.com>
AuthorDate: Mon May 10 19:18:02 2021 +0800
Add continuous compaction in level compaction strategy (#2080)
* add enable unseq compaction
* add continuous compaction in level compaction strategy
* fix compaction bug
* add load configure
* update continuous compaction to new compaction task
* fix comment
* update config and variable name
* fix format
* fix compile
* resolve conversation
Co-authored-by: zhanglingzhe <su...@foxmail.com>
---
.../resources/conf/iotdb-engine.properties | 4 ++
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 14 ++++++
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 13 ++++++
.../db/engine/compaction/TsFileManagement.java | 9 +++-
.../level/LevelCompactionTsFileManagement.java | 51 ++++++++++++----------
.../engine/storagegroup/StorageGroupProcessor.java | 30 ++++++++++---
.../compaction/LevelCompactionCacheTest.java | 3 +-
.../engine/compaction/LevelCompactionLogTest.java | 3 +-
.../compaction/LevelCompactionMergeTest.java | 3 +-
.../compaction/LevelCompactionMoreDataTest.java | 3 +-
.../NoCompactionTsFileManagementTest.java | 4 +-
.../org/apache/iotdb/db/script/EnvScriptIT.java | 6 ++-
12 files changed, 105 insertions(+), 38 deletions(-)
diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties b/server/src/assembly/resources/conf/iotdb-engine.properties
index e7cd8ce..3a7725a 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -374,6 +374,10 @@ timestamp_precision=ms
# enable_unseq_compaction=true
# Works when the compaction_strategy is LEVEL_COMPACTION.
+# Whether to start the next compaction task automatically after one compaction task finishes
+# enable_continuous_compaction=true
+
+# Works when the compaction_strategy is LEVEL_COMPACTION.
# The max seq file num of each level.
# When the num of files in one level exceeds this,
# the files in this level will merge to one and put to upper level.
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index a21e783..23a1994 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -322,6 +322,12 @@ public class IoTDBConfig {
private boolean enableUnseqCompaction = true;
/**
+ * Works when the compaction_strategy is LEVEL_COMPACTION. Whether to start the next compaction
+ * task automatically after one compaction task finishes
+ */
+ private boolean enableContinuousCompaction = true;
+
+ /**
* Works when the compaction_strategy is LEVEL_COMPACTION. The max seq file num of each level.
* When the num of files in one level exceeds this, the files in this level will merge to one and
* put to upper level.
@@ -1474,6 +1480,14 @@ public class IoTDBConfig {
this.enableUnseqCompaction = enableUnseqCompaction;
}
+ public boolean isEnableContinuousCompaction() {
+ return enableContinuousCompaction;
+ }
+
+ public void setEnableContinuousCompaction(boolean enableContinuousCompaction) {
+ this.enableContinuousCompaction = enableContinuousCompaction;
+ }
+
public int getSeqFileNumInEachLevel() {
return seqFileNumInEachLevel;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 988f865..4590e33 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -321,6 +321,12 @@ public class IoTDBDescriptor {
properties.getProperty(
"enable_unseq_compaction", Boolean.toString(conf.isEnableUnseqCompaction()))));
+ conf.setEnableContinuousCompaction(
+ Boolean.parseBoolean(
+ properties.getProperty(
+ "enable_continuous_compaction",
+ Boolean.toString(conf.isEnableContinuousCompaction()))));
+
conf.setSeqLevelNum(
Integer.parseInt(
properties.getProperty("seq_level_num", Integer.toString(conf.getSeqLevelNum()))));
@@ -1029,6 +1035,13 @@ public class IoTDBDescriptor {
// update slow_query_threshold
conf.setSlowQueryThreshold(Long.parseLong(properties.getProperty("slow_query_threshold")));
+ // update enable_continuous_compaction
+ conf.setEnableContinuousCompaction(
+ Boolean.parseBoolean(properties.getProperty("enable_continuous_compaction")));
+
+ // update merge_write_throughput_mb_per_sec
+ conf.setMergeWriteThroughputMbPerSec(
+ Integer.parseInt(properties.getProperty("merge_write_throughput_mb_per_sec")));
} catch (Exception e) {
throw new QueryProcessException(String.format("Fail to reload configuration because %s", e));
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
index 3b68e37..a5ea1da 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
@@ -72,6 +72,9 @@ public abstract class TsFileManagement {
private long mergeStartTime;
+ /** whether a chunk merge was executed in the current task */
+ protected boolean isMergeExecutedInCurrentTask = false;
+
protected boolean isForceFullMerge = IoTDBDescriptor.getInstance().getConfig().isForceFullMerge();
public TsFileManagement(String storageGroupName, String storageGroupDir) {
@@ -166,7 +169,7 @@ public abstract class TsFileManagement {
@Override
public Void call() {
merge(timePartitionId);
- closeCompactionMergeCallBack.call();
+ closeCompactionMergeCallBack.call(isMergeExecutedInCurrentTask, timePartitionId);
return null;
}
}
@@ -182,7 +185,9 @@ public abstract class TsFileManagement {
@Override
public Void call() {
recover();
- closeCompactionMergeCallBack.call();
+ // in the recover logic we do not have to start the next compaction task, so the time
+ // partition parameter is unused and we can just pass 0L
+ closeCompactionMergeCallBack.call(false, 0L);
return null;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
index 61f928d..6bac52a 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
@@ -564,25 +564,20 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
forkTsFileList(
forkedSequenceTsFileResources,
sequenceTsFileResources.computeIfAbsent(timePartition, this::newSequenceTsFileResources),
- seqLevelNum,
- seqFileNumInEachLevel);
+ seqLevelNum);
// we have to copy all unseq file
forkTsFileList(
forkedUnSequenceTsFileResources,
unSequenceTsFileResources.computeIfAbsent(
timePartition, this::newUnSequenceTsFileResources),
- unseqLevelNum + 1,
- unseqFileNumInEachLevel);
+ unseqLevelNum + 1);
} finally {
readUnLock();
}
}
private void forkTsFileList(
- List<List<TsFileResource>> forkedTsFileResources,
- List rawTsFileResources,
- int currMaxLevel,
- int currFileNumInEachLevel) {
+ List<List<TsFileResource>> forkedTsFileResources, List rawTsFileResources, int currMaxLevel) {
forkedTsFileResources.clear();
for (int i = 0; i < currMaxLevel - 1; i++) {
List<TsFileResource> forkedLevelTsFileResources = new ArrayList<>();
@@ -591,9 +586,6 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
for (TsFileResource tsFileResource : levelRawTsFileResources) {
if (tsFileResource.isClosed()) {
forkedLevelTsFileResources.add(tsFileResource);
- if (forkedLevelTsFileResources.size() > currFileNumInEachLevel) {
- break;
- }
}
}
forkedTsFileResources.add(forkedLevelTsFileResources);
@@ -602,25 +594,31 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
@Override
protected void merge(long timePartition) {
- merge(forkedSequenceTsFileResources, true, timePartition, seqLevelNum, seqFileNumInEachLevel);
- if (enableUnseqCompaction && unseqLevelNum <= 1 && forkedUnSequenceTsFileResources.size() > 0) {
+ isMergeExecutedInCurrentTask =
+ merge(
+ forkedSequenceTsFileResources, true, timePartition, seqLevelNum, seqFileNumInEachLevel);
+ if (enableUnseqCompaction
+ && unseqLevelNum <= 1
+ && forkedUnSequenceTsFileResources.get(0).size() > 0) {
+ isMergeExecutedInCurrentTask = true;
merge(
isForceFullMerge,
getTsFileListByTimePartition(true, timePartition),
forkedUnSequenceTsFileResources.get(0),
Long.MAX_VALUE);
} else {
- merge(
- forkedUnSequenceTsFileResources,
- false,
- timePartition,
- unseqLevelNum,
- unseqFileNumInEachLevel);
+ isMergeExecutedInCurrentTask =
+ merge(
+ forkedUnSequenceTsFileResources,
+ false,
+ timePartition,
+ unseqLevelNum,
+ unseqFileNumInEachLevel);
}
}
@SuppressWarnings("squid:S3776")
- private void merge(
+ private boolean merge(
List<List<TsFileResource>> mergeResources,
boolean sequence,
long timePartition,
@@ -633,16 +631,21 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
} catch (InterruptedException e) {
logger.error("{} [Compaction] shutdown", storageGroupName, e);
Thread.currentThread().interrupt();
- return;
+ return false;
}
}
isSeqMerging = true;
long startTimeMillis = System.currentTimeMillis();
+ // whether a chunk merge is executed in the loop below
+ boolean isMergeExecutedInCurrentTask = false;
CompactionLogger compactionLogger = null;
try {
logger.info("{} start to filter compaction condition", storageGroupName);
for (int i = 0; i < currMaxLevel - 1; i++) {
- if (currMaxFileNumInEachLevel <= mergeResources.get(i).size()) {
+ List<TsFileResource> currLevelTsFileResource = mergeResources.get(i);
+ if (currMaxFileNumInEachLevel <= currLevelTsFileResource.size()) {
+ // just merge part of the file
+ isMergeExecutedInCurrentTask = true;
// level is numbered from 0
if (enableUnseqCompaction && !sequence && i == currMaxLevel - 2) {
// do not merge current unseq file level to upper level and just merge all of them to
@@ -663,7 +666,8 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
TsFileResource.modifyTsFileNameMergeCnt(mergeResources.get(i).get(0).getTsFile());
compactionLogger.logSequence(sequence);
compactionLogger.logFile(TARGET_NAME, newLevelFile);
- List<TsFileResource> toMergeTsFiles = mergeResources.get(i);
+ List<TsFileResource> toMergeTsFiles =
+ mergeResources.get(i).subList(0, currMaxFileNumInEachLevel);
logger.info(
"{} [Compaction] merge level-{}'s {} TsFiles to next level",
storageGroupName,
@@ -740,6 +744,7 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
sequence,
System.currentTimeMillis() - startTimeMillis);
}
+ return isMergeExecutedInCurrentTask;
}
private List<SortedSet<TsFileResource>> newSequenceTsFileResources(Long k) {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 3cea096..2e0ac5c 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -494,7 +494,8 @@ public class StorageGroupProcessor {
throw new StorageGroupProcessorException(e);
}
- for (TsFileResource resource : tsFileManagement.getTsFileList(true)) {
+ List<TsFileResource> seqTsFileResources = tsFileManagement.getTsFileList(true);
+ for (TsFileResource resource : seqTsFileResources) {
long timePartitionId = resource.getTimePartition();
Map<String, Long> endTimeMap = new HashMap<>();
for (String deviceId : resource.getDevices()) {
@@ -509,20 +510,30 @@ public class StorageGroupProcessor {
.putAll(endTimeMap);
globalLatestFlushedTimeForEachDevice.putAll(endTimeMap);
}
+
+ if (IoTDBDescriptor.getInstance().getConfig().isEnableContinuousCompaction()
+ && seqTsFileResources.size() > 0) {
+ for (long timePartitionId : timePartitionIdVersionControllerMap.keySet()) {
+ executeCompaction(
+ timePartitionId, IoTDBDescriptor.getInstance().getConfig().isForceFullMerge());
+ }
+ }
}
private void recoverCompaction() {
if (!CompactionMergeTaskPoolManager.getInstance().isTerminated()) {
compactionMergeWorking = true;
logger.info(
- "{} - {} submit a compaction merge task", logicalStorageGroupName, virtualStorageGroupId);
+ "{} - {} submit a compaction recover merge task",
+ logicalStorageGroupName,
+ virtualStorageGroupId);
try {
CompactionMergeTaskPoolManager.getInstance()
.submitTask(
logicalStorageGroupName,
tsFileManagement.new CompactionRecoverTask(this::closeCompactionMergeCallBack));
} catch (RejectedExecutionException e) {
- this.closeCompactionMergeCallBack();
+ this.closeCompactionMergeCallBack(false, 0);
logger.error(
"{} - {} compaction submit task failed",
logicalStorageGroupName,
@@ -1979,7 +1990,7 @@ public class StorageGroupProcessor {
tsFileManagement
.new CompactionMergeTask(this::closeCompactionMergeCallBack, timePartition));
} catch (IOException | RejectedExecutionException e) {
- this.closeCompactionMergeCallBack();
+ this.closeCompactionMergeCallBack(false, timePartition);
logger.error(
"{} compaction submit task failed",
logicalStorageGroupName + "-" + virtualStorageGroupId,
@@ -1993,8 +2004,13 @@ public class StorageGroupProcessor {
}
/** close compaction merge callback, to release some locks */
- private void closeCompactionMergeCallBack() {
- this.compactionMergeWorking = false;
+ private void closeCompactionMergeCallBack(boolean isMerge, long timePartitionId) {
+ if (isMerge && IoTDBDescriptor.getInstance().getConfig().isEnableContinuousCompaction()) {
+ executeCompaction(
+ timePartitionId, IoTDBDescriptor.getInstance().getConfig().isForceFullMerge());
+ } else {
+ this.compactionMergeWorking = false;
+ }
}
/**
@@ -2961,7 +2977,7 @@ public class StorageGroupProcessor {
@FunctionalInterface
public interface CloseCompactionMergeCallBack {
- void call();
+ void call(boolean isMergeExecutedInCurrentTask, long timePartitionId);
}
@FunctionalInterface
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionCacheTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionCacheTest.java
index 46c2771..d632cb9 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionCacheTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionCacheTest.java
@@ -120,7 +120,8 @@ public class LevelCompactionCacheTest extends LevelCompactionTest {
}
/** close compaction merge callback, to release some locks */
- private void closeCompactionMergeCallBack() {
+ private void closeCompactionMergeCallBack(
+ boolean isMergeExecutedInCurrentTask, long timePartitionId) {
this.compactionMergeWorking = false;
}
}
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionLogTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionLogTest.java
index d6d9c99..2170fbd 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionLogTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionLogTest.java
@@ -80,7 +80,8 @@ public class LevelCompactionLogTest extends LevelCompactionTest {
}
/** close compaction merge callback, to release some locks */
- private void closeCompactionMergeCallBack() {
+ private void closeCompactionMergeCallBack(
+ boolean isMergeExecutedInCurrentTask, long timePartitionId) {
this.compactionMergeWorking = false;
}
}
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionMergeTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionMergeTest.java
index 3d9aecd..9b3984b 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionMergeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionMergeTest.java
@@ -226,7 +226,8 @@ public class LevelCompactionMergeTest extends LevelCompactionTest {
}
/** close compaction merge callback, to release some locks */
- private void closeCompactionMergeCallBack() {
+ private void closeCompactionMergeCallBack(
+ boolean isMergeExecutedInCurrentTask, long timePartitionId) {
this.compactionMergeWorking = false;
}
}
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionMoreDataTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionMoreDataTest.java
index 1411eac..75e576b 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionMoreDataTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionMoreDataTest.java
@@ -232,7 +232,8 @@ public class LevelCompactionMoreDataTest extends LevelCompactionTest {
}
/** close compaction merge callback, to release some locks */
- private void closeCompactionMergeCallBack() {
+ private void closeCompactionMergeCallBack(
+ boolean isMergeExecutedInCurrentTask, long timePartitionId) {
this.compactionMergeWorking = false;
}
}
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/NoCompactionTsFileManagementTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/NoCompactionTsFileManagementTest.java
index 6407c9e..8ad9f18 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/NoCompactionTsFileManagementTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/NoCompactionTsFileManagementTest.java
@@ -153,7 +153,9 @@ public class NoCompactionTsFileManagementTest extends LevelCompactionTest {
noCompactionTsFileManagement.forkCurrentFileList(0);
noCompactionTsFileManagement.recover();
CompactionMergeTask compactionMergeTask =
- noCompactionTsFileManagement.new CompactionMergeTask(() -> {}, 0);
+ noCompactionTsFileManagement
+ .new CompactionMergeTask(
+ (boolean isMergeExecutedInCurrentTask, long timePartitionId) -> {}, 0);
compactionMergeTask.call();
assertEquals(1, noCompactionTsFileManagement.size(true));
assertEquals(1, noCompactionTsFileManagement.size(false));
diff --git a/server/src/test/java/org/apache/iotdb/db/script/EnvScriptIT.java b/server/src/test/java/org/apache/iotdb/db/script/EnvScriptIT.java
index ef8961a..4051f06 100644
--- a/server/src/test/java/org/apache/iotdb/db/script/EnvScriptIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/script/EnvScriptIT.java
@@ -22,7 +22,11 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-import java.io.*;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;