You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2023/04/19 03:36:27 UTC
[iotdb] branch rel/1.1 updated: [To rel/1.1][IOTDB-5776]Update memory estimation of cross space compaction (#9628)
This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch rel/1.1
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/1.1 by this push:
new 21338fe774 [To rel/1.1][IOTDB-5776]Update memory estimation of cross space compaction (#9628)
21338fe774 is described below
commit 21338fe7741abbf673c5f6652e69510d58614ad9
Author: 周沛辰 <45...@users.noreply.github.com>
AuthorDate: Wed Apr 19 11:36:21 2023 +0800
[To rel/1.1][IOTDB-5776]Update memory estimation of cross space compaction (#9628)
---
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 15 +++
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 6 ++
.../estimator/AbstractCompactionEstimator.java | 6 ++
.../ReadPointCrossCompactionEstimator.java | 108 +++++++++++++--------
.../impl/RewriteCrossSpaceCompactionSelector.java | 7 +-
.../utils/CrossCompactionTaskResource.java | 12 +++
.../org/apache/iotdb/db/rescon/SystemInfo.java | 15 +--
.../cross/RewriteCompactionFileSelectorTest.java | 1 +
8 files changed, 118 insertions(+), 52 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 5650b54638..73af082000 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -428,6 +428,13 @@ public class IoTDBConfig {
*/
private CompactionPriority compactionPriority = CompactionPriority.BALANCE;
+ /**
+ * Enable compaction memory control or not. If true and estimated memory size of one compaction
+ * task exceeds the threshold, system will block the compaction. It only works for cross space
+ * compaction currently.
+ */
+ private boolean enableCompactionMemControl = true;
+
private double chunkMetadataSizeProportion = 0.1;
/** The target tsfile size in compaction, 1 GB by default */
@@ -2734,6 +2741,14 @@ public class IoTDBConfig {
this.compactionPriority = compactionPriority;
}
+ public boolean isEnableCompactionMemControl() {
+ return enableCompactionMemControl;
+ }
+
+ public void setEnableCompactionMemControl(boolean enableCompactionMemControl) {
+ this.enableCompactionMemControl = enableCompactionMemControl;
+ }
+
public long getTargetCompactionFileSize() {
return targetCompactionFileSize;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 5b99924801..c45cfd7ee6 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -500,6 +500,12 @@ public class IoTDBDescriptor {
properties.getProperty(
"compaction_priority", conf.getCompactionPriority().toString())));
+ conf.setEnableCompactionMemControl(
+ Boolean.parseBoolean(
+ properties.getProperty(
+ "enable_compaction_mem_control",
+ Boolean.toString(conf.isEnableCompactionMemControl()))));
+
int subtaskNum =
Integer.parseInt(
properties.getProperty(
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/estimator/AbstractCompactionEstimator.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/estimator/AbstractCompactionEstimator.java
index f4b085deb9..50be5b5872 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/estimator/AbstractCompactionEstimator.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/estimator/AbstractCompactionEstimator.java
@@ -18,6 +18,8 @@
*/
package org.apache.iotdb.db.engine.compaction.selector.estimator;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
@@ -34,6 +36,10 @@ public abstract class AbstractCompactionEstimator {
protected Map<TsFileResource, TsFileSequenceReader> fileReaderCache = new HashMap<>();
+ protected IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+
+ protected int compressionRatio = 5;
+
/**
* Estimate the memory cost of compacting the unseq file and its corresponding overlapped seq
* files in cross space compaction task.
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/estimator/ReadPointCrossCompactionEstimator.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/estimator/ReadPointCrossCompactionEstimator.java
index d50e5a3af2..2ba25aab69 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/estimator/ReadPointCrossCompactionEstimator.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/estimator/ReadPointCrossCompactionEstimator.java
@@ -23,13 +23,11 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
-import org.apache.iotdb.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -41,13 +39,10 @@ public class ReadPointCrossCompactionEstimator extends AbstractCrossSpaceEstimat
// task
private long maxCostOfReadingSeqFile;
- // left is the max chunk num in chunkgroup of unseq file, right is the total chunk num of unseq
- // file.
- private Pair<Integer, Integer> maxUnseqChunkNumInDevice;
+ // the max cost of writing target file
+ private long maxCostOfWritingTargetFile;
- // it stores all chunk info of seq files. Left is the max chunk num in chunkgroup of seq file,
- // right is the total chunk num of seq file.
- private final List<Pair<Integer, Integer>> maxSeqChunkNumInDeviceList;
+ private int maxConcurrentSeriesNum = 1;
// the number of timeseries being compacted at the same time
private final int subCompactionTaskNum =
@@ -55,7 +50,7 @@ public class ReadPointCrossCompactionEstimator extends AbstractCrossSpaceEstimat
public ReadPointCrossCompactionEstimator() {
this.maxCostOfReadingSeqFile = 0;
- this.maxSeqChunkNumInDeviceList = new ArrayList<>();
+ this.maxCostOfWritingTargetFile = 0;
}
@Override
@@ -65,7 +60,6 @@ public class ReadPointCrossCompactionEstimator extends AbstractCrossSpaceEstimat
cost += calculateReadingUnseqFile(unseqResource);
cost += calculateReadingSeqFiles(seqResources);
cost += calculatingWritingTargetFiles(seqResources, unseqResource);
- maxSeqChunkNumInDeviceList.clear();
return cost;
}
@@ -75,25 +69,28 @@ public class ReadPointCrossCompactionEstimator extends AbstractCrossSpaceEstimat
*/
private long calculateReadingUnseqFile(TsFileResource unseqResource) throws IOException {
TsFileSequenceReader reader = getFileReader(unseqResource);
- int[] fileInfo = getSeriesAndDeviceChunkNum(reader);
+ FileInfo fileInfo = getSeriesAndDeviceChunkNum(reader);
// it is max aligned series num of one device when tsfile contains aligned series,
// else is sub compaction task num.
- int concurrentSeriesNum = fileInfo[2] == -1 ? subCompactionTaskNum : fileInfo[2];
- maxUnseqChunkNumInDevice = new Pair<>(fileInfo[3], fileInfo[0]);
- // it means the max size of a timeseries in this file when reading all of its chunk into memory.
- // Not only reading chunk into chunk cache, but also need to deserialize data point into merge
- // reader, so we have to double the cost here.
- if (fileInfo[0] == 0) { // If totalChunkNum ==0, i.e. this unSeq tsFile has no chunk.
+ int concurrentSeriesNum =
+ fileInfo.maxAlignedSeriesNumInDevice == -1
+ ? subCompactionTaskNum
+ : fileInfo.maxAlignedSeriesNumInDevice;
+ maxConcurrentSeriesNum = Math.max(maxConcurrentSeriesNum, concurrentSeriesNum);
+ if (fileInfo.totalChunkNum == 0) { // If totalChunkNum ==0, i.e. this unSeq tsFile has no chunk.
logger.warn(
"calculateReadingUnseqFile(), find 1 empty unSeq tsFile: {}.",
unseqResource.getTsFilePath());
return 0;
}
- return 2 * concurrentSeriesNum * (unseqResource.getTsFileSize() * fileInfo[1] / fileInfo[0]);
+ // it means the max size of a timeseries in this file when reading all of its chunks into memory.
+ return compressionRatio
+ * concurrentSeriesNum
+ * (unseqResource.getTsFileSize() * fileInfo.maxSeriesChunkNum / fileInfo.totalChunkNum);
}
/**
- * Calculate memory cost of reading source seq files in the cross space compaction. Double the
+ * Calculate memory cost of reading source seq files in the cross space compaction. Select the
* maximum size of the timeseries to be compacted at the same time in one seq file, because only
* one seq file will be queried at the same time.
*/
@@ -101,38 +98,41 @@ public class ReadPointCrossCompactionEstimator extends AbstractCrossSpaceEstimat
long cost = 0;
for (TsFileResource seqResource : seqResources) {
TsFileSequenceReader reader = getFileReader(seqResource);
- int[] fileInfo = getSeriesAndDeviceChunkNum(reader);
+ FileInfo fileInfo = getSeriesAndDeviceChunkNum(reader);
// it is max aligned series num of one device when tsfile contains aligned series,
// else is sub compaction task num.
- int concurrentSeriesNum = fileInfo[2] == -1 ? subCompactionTaskNum : fileInfo[2];
- long seqFileCost = 0;
- if (fileInfo[0] == 0) { // If totalChunkNum ==0, i.e. this seq tsFile has no chunk.
+ int concurrentSeriesNum =
+ fileInfo.maxAlignedSeriesNumInDevice == -1
+ ? subCompactionTaskNum
+ : fileInfo.maxAlignedSeriesNumInDevice;
+ maxConcurrentSeriesNum = Math.max(maxConcurrentSeriesNum, concurrentSeriesNum);
+ long seqFileCost;
+ if (fileInfo.totalChunkNum == 0) { // If totalChunkNum ==0, i.e. this seq tsFile has no chunk.
logger.warn(
"calculateReadingSeqFiles(), find 1 empty seq tsFile: {}.",
seqResource.getTsFilePath());
seqFileCost = 0;
} else {
- seqFileCost =
- concurrentSeriesNum * (seqResource.getTsFileSize() * fileInfo[1] / fileInfo[0]);
+ // We need to multiply the compression ratio here.
+ seqFileCost = compressionRatio * concurrentSeriesNum * config.getTargetChunkSize();
}
if (seqFileCost > maxCostOfReadingSeqFile) {
// Only one seq file will be read at the same time.
// not only reading chunk into chunk cache, but also need to deserialize data point into
- // merge reader, so we have to double the cost here.
- cost -= 2 * maxCostOfReadingSeqFile;
- cost += 2 * seqFileCost;
+ // merge reader. We have to add the cost in merge reader here and the cost of chunk cache is
+ // unnecessary.
+ cost -= maxCostOfReadingSeqFile;
+ cost += seqFileCost;
maxCostOfReadingSeqFile = seqFileCost;
}
- maxSeqChunkNumInDeviceList.add(new Pair<>(fileInfo[3], fileInfo[0]));
}
return cost;
}
/**
* Calculate memory cost of writing target files in the cross space compaction. Including metadata
- * size of all seq files, max chunk group size of each seq file and max chunk group size of
- * corresponding overlapped unseq file.
+ * size of all source files and size of concurrent target chunks.
*/
private long calculatingWritingTargetFiles(
List<TsFileResource> seqResources, TsFileResource unseqResource) throws IOException {
@@ -141,17 +141,16 @@ public class ReadPointCrossCompactionEstimator extends AbstractCrossSpaceEstimat
TsFileSequenceReader reader = getFileReader(seqResource);
// add seq file metadata size
cost += reader.getFileMetadataSize();
- // add max chunk group size of this seq tsfile
- int totalSeqChunkNum = maxSeqChunkNumInDeviceList.get(0).right;
- if (totalSeqChunkNum > 0) {
- cost +=
- seqResource.getTsFileSize() * maxSeqChunkNumInDeviceList.get(0).left / totalSeqChunkNum;
- }
}
- // add max chunk group size of overlapped unseq tsfile
- int totalUnSeqChunkNum = maxUnseqChunkNumInDevice.right;
- if (totalUnSeqChunkNum > 0) {
- cost += unseqResource.getTsFileSize() * maxUnseqChunkNumInDevice.left / totalUnSeqChunkNum;
+ // add unseq file metadata size
+ cost += getFileReader(unseqResource).getFileMetadataSize();
+
+ // concurrent series chunk size
+ long writingTargetCost = maxConcurrentSeriesNum * config.getTargetChunkSize();
+ if (writingTargetCost > maxCostOfWritingTargetFile) {
+ cost -= maxCostOfWritingTargetFile;
+ cost += writingTargetCost;
+ maxCostOfWritingTargetFile = writingTargetCost;
}
return cost;
@@ -169,7 +168,7 @@ public class ReadPointCrossCompactionEstimator extends AbstractCrossSpaceEstimat
*
* <p>max chunk num of one device in this tsfile
*/
- private int[] getSeriesAndDeviceChunkNum(TsFileSequenceReader reader) throws IOException {
+ private FileInfo getSeriesAndDeviceChunkNum(TsFileSequenceReader reader) throws IOException {
int totalChunkNum = 0;
int maxChunkNum = 0;
int maxAlignedSeriesNumInDevice = -1;
@@ -190,6 +189,29 @@ public class ReadPointCrossCompactionEstimator extends AbstractCrossSpaceEstimat
}
maxDeviceChunkNum = Math.max(maxDeviceChunkNum, deviceChunkNum);
}
- return new int[] {totalChunkNum, maxChunkNum, maxAlignedSeriesNumInDevice, maxDeviceChunkNum};
+ return new FileInfo(totalChunkNum, maxChunkNum, maxAlignedSeriesNumInDevice, maxDeviceChunkNum);
+ }
+
+ private class FileInfo {
+ // total chunk num in this tsfile
+ public int totalChunkNum = 0;
+ // max chunk num of one timeseries in this tsfile
+ public int maxSeriesChunkNum = 0;
+ // max aligned series num in one device. If there is no aligned series in this file, this
+ // is -1.
+ public int maxAlignedSeriesNumInDevice = -1;
+ // max chunk num of one device in this tsfile
+ public int maxDeviceChunkNum = 0;
+
+ public FileInfo(
+ int totalChunkNum,
+ int maxSeriesChunkNum,
+ int maxAlignedSeriesNumInDevice,
+ int maxDeviceChunkNum) {
+ this.totalChunkNum = totalChunkNum;
+ this.maxSeriesChunkNum = maxSeriesChunkNum;
+ this.maxAlignedSeriesNumInDevice = maxAlignedSeriesNumInDevice;
+ this.maxDeviceChunkNum = maxDeviceChunkNum;
+ }
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/impl/RewriteCrossSpaceCompactionSelector.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/impl/RewriteCrossSpaceCompactionSelector.java
index f96f52ac64..9f63c7f108 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/impl/RewriteCrossSpaceCompactionSelector.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/impl/RewriteCrossSpaceCompactionSelector.java
@@ -279,7 +279,7 @@ public class RewriteCrossSpaceCompactionSelector implements ICrossSpaceSelector
return Collections.emptyList();
}
LOGGER.info(
- "{} [Compaction] Total source files: {} seqFiles, {} unseqFiles. Candidate source files: {} seqFiles, {} unseqFiles. Selected source files: {} seqFiles, {} unseqFiles, total memory cost {}, time consumption {}ms.",
+ "{} [Compaction] Total source files: {} seqFiles, {} unseqFiles. Candidate source files: {} seqFiles, {} unseqFiles. Selected source files: {} seqFiles, {} unseqFiles, total memory cost {} MB, total selected file size is {} MB, total selected seq file size is {} MB, total selected unseq file size is {} MB, time consumption {}ms.",
logicalStorageGroupName + "-" + dataRegionId,
sequenceFileList.size(),
unsequenceFileList.size(),
@@ -287,7 +287,10 @@ public class RewriteCrossSpaceCompactionSelector implements ICrossSpaceSelector
candidate.getUnseqFiles().size(),
taskResources.getSeqFiles().size(),
taskResources.getUnseqFiles().size(),
- taskResources.getTotalMemoryCost(),
+ (float) (taskResources.getTotalMemoryCost()) / 1024 / 1024,
+ (float) (taskResources.getTotalFileSize()) / 1024 / 1024,
+ taskResources.getTotalSeqFileSize() / 1024 / 1024,
+ taskResources.getTotalUnseqFileSize() / 1024 / 1024,
System.currentTimeMillis() - startTime);
hasPrintedLog = false;
return Collections.singletonList(taskResources);
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/utils/CrossCompactionTaskResource.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/utils/CrossCompactionTaskResource.java
index 3a7e0afef5..1c9ae6f3d7 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/utils/CrossCompactionTaskResource.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/utils/CrossCompactionTaskResource.java
@@ -32,6 +32,8 @@ public class CrossCompactionTaskResource {
private long totalMemoryCost;
private long totalFileSize;
+ private float totalSeqFileSize;
+ private float totalUnseqFileSize;
private long totalFileNums;
public CrossCompactionTaskResource() {
@@ -72,6 +74,7 @@ public class CrossCompactionTaskResource {
private void addUnseqFile(TsFileResource file) {
unseqFiles.add(file);
+ totalUnseqFileSize += file.getTsFileSize();
countStatistic(file);
}
@@ -81,6 +84,7 @@ public class CrossCompactionTaskResource {
private void addSeqFile(TsFileResource file) {
seqFiles.add(file);
+ totalSeqFileSize += file.getTsFileSize();
countStatistic(file);
}
@@ -105,6 +109,14 @@ public class CrossCompactionTaskResource {
return totalFileSize;
}
+ public float getTotalSeqFileSize() {
+ return totalSeqFileSize;
+ }
+
+ public float getTotalUnseqFileSize() {
+ return totalUnseqFileSize;
+ }
+
public long getTotalFileNums() {
return totalFileNums;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java b/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java
index cad4629046..d85531a509 100644
--- a/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java
+++ b/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java
@@ -182,13 +182,14 @@ public class SystemInfo {
}
public void addCompactionMemoryCost(long memoryCost) throws InterruptedException {
- if (config.isEnableMemControl()) {
- long originSize = this.compactionMemoryCost.get();
- while (originSize + memoryCost > memorySizeForCompaction
- || !compactionMemoryCost.compareAndSet(originSize, originSize + memoryCost)) {
- Thread.sleep(100);
- originSize = this.compactionMemoryCost.get();
- }
+ if (!config.isEnableCompactionMemControl()) {
+ return;
+ }
+ long originSize = this.compactionMemoryCost.get();
+ while (originSize + memoryCost > memorySizeForCompaction
+ || !compactionMemoryCost.compareAndSet(originSize, originSize + memoryCost)) {
+ Thread.sleep(100);
+ originSize = this.compactionMemoryCost.get();
}
}
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/RewriteCompactionFileSelectorTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/RewriteCompactionFileSelectorTest.java
index 11bb18a7de..f94c78e0a1 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/RewriteCompactionFileSelectorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/RewriteCompactionFileSelectorTest.java
@@ -67,6 +67,7 @@ public class RewriteCompactionFileSelectorTest extends MergeTest {
public void setUp() throws IOException, MetadataException, WriteProcessException {
super.setUp();
IoTDBDescriptor.getInstance().getConfig().setMinCrossCompactionUnseqFileLevel(0);
+ IoTDBDescriptor.getInstance().getConfig().setCompactionThreadCount(1);
}
@After