You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ma...@apache.org on 2022/12/16 07:48:45 UTC
[iotdb] 01/01: limit the io rate of compaction
This is an automated email from the ASF dual-hosted git repository.
marklau99 pushed a commit to branch IOTDB-5209-rel/0.13
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 08cb0e77809754a2d73cf031ac12fd70356537fe
Author: Liu Xuxin <li...@outlook.com>
AuthorDate: Fri Dec 16 15:48:24 2022 +0800
limit the io rate of compaction
---
docs/UserGuide/Reference/Config-Manual.md | 16 ++++++++--------
docs/zh/UserGuide/Reference/Config-Manual.md | 16 ++++++++--------
.../src/assembly/resources/conf/iotdb-engine.properties | 4 ++--
.../main/java/org/apache/iotdb/db/conf/IoTDBConfig.java | 12 ++++++------
.../java/org/apache/iotdb/db/conf/IoTDBDescriptor.java | 8 ++++----
.../db/engine/compaction/CompactionTaskManager.java | 13 ++++++-------
.../iotdb/db/engine/compaction/CompactionUtils.java | 3 +++
.../compaction/cross/rewrite/task/SubCompactionTask.java | 2 ++
.../inner/utils/AlignedSeriesCompactionExecutor.java | 9 ++++-----
.../inner/utils/SingleSeriesCompactionExecutor.java | 12 ++++++------
.../compaction/writer/AbstractCompactionWriter.java | 3 +--
.../compaction/utils/CompactionConfigRestorer.java | 2 +-
12 files changed, 51 insertions(+), 49 deletions(-)
diff --git a/docs/UserGuide/Reference/Config-Manual.md b/docs/UserGuide/Reference/Config-Manual.md
index 5596cb2098..0694d40d31 100644
--- a/docs/UserGuide/Reference/Config-Manual.md
+++ b/docs/UserGuide/Reference/Config-Manual.md
@@ -913,14 +913,14 @@ The permission definitions are in ${IOTDB\_CONF}/conf/jmx.access.
|Default| 60000 |
|Effective|After restart system|
-* compaction\_write\_throughput\_mb\_per\_sec
-
-|Name| compaction\_write\_throughput\_mb\_per\_sec |
-|:---:|:---|
-|Description| The write rate of all compaction tasks in MB/s |
-|Type| Int32 |
-|Default| 16 |
-|Effective|After restart system|
+* compaction\_io\_rate\_per\_sec
+
+|Name| compaction\_io\_rate\_per\_sec |
+|:---:|:--------------------------------------------------|
+|Description| The io rate of all compaction tasks in one second |
+|Type| Int32 |
+|Default| 50 |
+|Effective| After restart system |
### Insertion
diff --git a/docs/zh/UserGuide/Reference/Config-Manual.md b/docs/zh/UserGuide/Reference/Config-Manual.md
index 33f71497b7..5fa2653b8b 100644
--- a/docs/zh/UserGuide/Reference/Config-Manual.md
+++ b/docs/zh/UserGuide/Reference/Config-Manual.md
@@ -952,14 +952,14 @@ Server,客户端的使用方式详见 [SQL 命令行终端(CLI)](https://i
|默认值| 60000 |
|改后生效方式|重启服务生效|
-* compaction\_write\_throughput\_mb\_per\_sec
-
-|名字| compaction\_write\_throughput\_mb\_per\_sec |
-|:---:|:---|
-|描述| 每秒可达到的写入吞吐量合并限制。|
-|类型| Int32 |
-|默认值| 16 |
-|改后生效方式| 重启服务生效|
+* compaction\_io\_rate\_per\_sec
+
+|名字| compaction\_io\_rate\_per\_sec |
+|:---:|:-------------------------------|
+|描述| 合并每秒 IO 的次数 |
+|类型| Int32 |
+|默认值| 50 |
+|改后生效方式| 重启服务生效 |
#### 插入配置
diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties b/server/src/assembly/resources/conf/iotdb-engine.properties
index 6e44b96b59..29206eefe3 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -509,9 +509,9 @@ timestamp_precision=ms
# Datatype: long, Unit: ms
# compaction_submission_interval_in_ms=60000
-# The limit of write throughput merge can reach per second
+# The limit of io rate of compaction can reach per second
# Datatype: int
-# compaction_write_throughput_mb_per_sec=16
+# compaction_io_rate_per_sec=50
# The maximum session idle time. unit: ms
# Idle sessions are the ones that performs neither query or non-query operations for a period of time
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index b6b9efa343..ad318a2567 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -597,8 +597,8 @@ public class IoTDBConfig {
*/
private long mergeIntervalSec = 0L;
- /** The limit of compaction merge can reach per second */
- private int compactionWriteThroughputMbPerSec = 16;
+ /** The limit of compaction io times can reach per second */
+ private int compactionIORatePerSec = 50;
/**
* How many thread will be set up to perform compaction, 10 by default. Set to 1 when less than or
@@ -1707,12 +1707,12 @@ public class IoTDBConfig {
insertMultiTabletEnableMultithreadingColumnThreshold;
}
- public int getCompactionWriteThroughputMbPerSec() {
- return compactionWriteThroughputMbPerSec;
+ public int getCompactionIORatePerSec() {
+ return compactionIORatePerSec;
}
- public void setCompactionWriteThroughputMbPerSec(int compactionWriteThroughputMbPerSec) {
- this.compactionWriteThroughputMbPerSec = compactionWriteThroughputMbPerSec;
+ public void setCompactionIORatePerSec(int compactionIORatePerSec) {
+ this.compactionIORatePerSec = compactionIORatePerSec;
}
public boolean isEnableMemControl() {
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index d12c871780..29f8e466b0 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -587,11 +587,11 @@ public class IoTDBDescriptor {
"max_cross_compaction_candidate_file_size",
Long.toString(conf.getMaxCrossCompactionCandidateFileSize()))));
- conf.setCompactionWriteThroughputMbPerSec(
+ conf.setCompactionIORatePerSec(
Integer.parseInt(
properties.getProperty(
"compaction_write_throughput_mb_per_sec",
- Integer.toString(conf.getCompactionWriteThroughputMbPerSec()))));
+ Integer.toString(conf.getCompactionIORatePerSec()))));
conf.setEnablePartialInsert(
Boolean.parseBoolean(
@@ -1343,11 +1343,11 @@ public class IoTDBDescriptor {
properties.getProperty(
"slow_query_threshold", Long.toString(conf.getSlowQueryThreshold()))));
// update merge_write_throughput_mb_per_sec
- conf.setCompactionWriteThroughputMbPerSec(
+ conf.setCompactionIORatePerSec(
Integer.parseInt(
properties.getProperty(
"merge_write_throughput_mb_per_sec",
- Integer.toString(conf.getCompactionWriteThroughputMbPerSec()))));
+ Integer.toString(conf.getCompactionIORatePerSec()))));
// update insert-tablet-plan's row limit for select-into
conf.setSelectIntoInsertTabletPlanRowLimit(
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java
index ddac1512ec..7fdd197fc0 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java
@@ -83,7 +83,7 @@ public class CompactionTaskManager implements IService {
private final long TASK_SUBMIT_INTERVAL =
IoTDBDescriptor.getInstance().getConfig().getCompactionSubmissionIntervalInMs();
- private final RateLimiter mergeWriteRateLimiter = RateLimiter.create(Double.MAX_VALUE);
+ private final RateLimiter compactionIORateLimiter = RateLimiter.create(Double.MAX_VALUE);
public static CompactionTaskManager getInstance() {
return INSTANCE;
@@ -274,10 +274,9 @@ public class CompactionTaskManager implements IService {
}
}
- public RateLimiter getMergeWriteRateLimiter() {
- setWriteMergeRate(
- IoTDBDescriptor.getInstance().getConfig().getCompactionWriteThroughputMbPerSec());
- return mergeWriteRateLimiter;
+ public RateLimiter getCompactionIORateLimiter() {
+ setWriteMergeRate(IoTDBDescriptor.getInstance().getConfig().getCompactionIORatePerSec());
+ return compactionIORateLimiter;
}
private void setWriteMergeRate(final double throughoutMbPerSec) {
@@ -286,8 +285,8 @@ public class CompactionTaskManager implements IService {
if (throughout == 0) {
throughout = Double.MAX_VALUE;
}
- if (mergeWriteRateLimiter.getRate() != throughout) {
- mergeWriteRateLimiter.setRate(throughout);
+ if (compactionIORateLimiter.getRate() != throughout) {
+ compactionIORateLimiter.setRate(throughout);
}
}
/** wait by throughoutMbPerSec limit to avoid continuous Write Or Read */
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionUtils.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionUtils.java
index dc2dfa92cb..5707fed2c8 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionUtils.java
@@ -150,6 +150,9 @@ public class CompactionUtils {
compactionWriter.startChunkGroup(device, true);
compactionWriter.startMeasurement(measurementSchemas, 0);
writeWithReader(compactionWriter, dataBatchReader, 0);
+ CompactionTaskManager.getInstance()
+ .getCompactionIORateLimiter()
+ .acquire(measurementSchemas.size() + 1);
compactionWriter.endMeasurement(0);
compactionWriter.endChunkGroup();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/task/SubCompactionTask.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/task/SubCompactionTask.java
index 00e7eaa011..f75e934b87 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/task/SubCompactionTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/task/SubCompactionTask.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.engine.compaction.cross.rewrite.task;
import org.apache.iotdb.db.conf.IoTDBConstant;
+import org.apache.iotdb.db.engine.compaction.CompactionTaskManager;
import org.apache.iotdb.db.engine.compaction.CompactionUtils;
import org.apache.iotdb.db.engine.compaction.writer.AbstractCompactionWriter;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
@@ -91,6 +92,7 @@ public class SubCompactionTask implements Callable<Void> {
if (dataBatchReader.hasNextBatch()) {
compactionWriter.startMeasurement(measurementSchemas, taskId);
CompactionUtils.writeWithReader(compactionWriter, dataBatchReader, taskId);
+ CompactionTaskManager.getInstance().getCompactionIORateLimiter().acquire(1);
compactionWriter.endMeasurement(taskId);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/AlignedSeriesCompactionExecutor.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/AlignedSeriesCompactionExecutor.java
index fd1ec218b2..990b9815c0 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/AlignedSeriesCompactionExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/AlignedSeriesCompactionExecutor.java
@@ -58,7 +58,7 @@ public class AlignedSeriesCompactionExecutor {
private final List<IMeasurementSchema> schemaList;
private long remainingPointInChunkWriter = 0L;
private final RateLimiter rateLimiter =
- CompactionTaskManager.getInstance().getMergeWriteRateLimiter();
+ CompactionTaskManager.getInstance().getCompactionIORateLimiter();
private final long chunkSizeThreshold =
IoTDBDescriptor.getInstance().getConfig().getTargetChunkSize();
@@ -120,6 +120,7 @@ public class AlignedSeriesCompactionExecutor {
TsFileAlignedSeriesReaderIterator readerIterator =
new TsFileAlignedSeriesReaderIterator(reader, alignedChunkMetadataList, schemaList);
while (readerIterator.hasNext()) {
+ rateLimiter.acquire(schemaList.size() + 1);
Pair<AlignedChunkReader, Long> chunkReaderAndChunkSize = readerIterator.nextReader();
CompactionMetricsManager.recordReadInfo(chunkReaderAndChunkSize.right);
compactOneAlignedChunk(chunkReaderAndChunkSize.left);
@@ -127,8 +128,7 @@ public class AlignedSeriesCompactionExecutor {
}
if (remainingPointInChunkWriter != 0L) {
- CompactionTaskManager.mergeRateLimiterAcquire(
- rateLimiter, chunkWriter.estimateMaxSeriesMemSize());
+ rateLimiter.acquire(schemaList.size() + 1);
CompactionMetricsManager.recordWriteInfo(
CompactionType.INNER_SEQ_COMPACTION,
ProcessChunkType.DESERIALIZE_CHUNK,
@@ -167,8 +167,7 @@ public class AlignedSeriesCompactionExecutor {
private void flushChunkWriterIfLargeEnough() throws IOException {
if (remainingPointInChunkWriter >= chunkPointNumThreshold
|| chunkWriter.estimateMaxSeriesMemSize() >= chunkSizeThreshold * schemaList.size()) {
- CompactionTaskManager.mergeRateLimiterAcquire(
- rateLimiter, chunkWriter.estimateMaxSeriesMemSize());
+ rateLimiter.acquire(schemaList.size() + 1);
CompactionMetricsManager.recordWriteInfo(
CompactionType.INNER_SEQ_COMPACTION,
ProcessChunkType.DESERIALIZE_CHUNK,
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/SingleSeriesCompactionExecutor.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/SingleSeriesCompactionExecutor.java
index 2d49094f44..fa9ee0efd8 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/SingleSeriesCompactionExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/SingleSeriesCompactionExecutor.java
@@ -58,11 +58,13 @@ public class SingleSeriesCompactionExecutor {
private Chunk cachedChunk;
private ChunkMetadata cachedChunkMetadata;
private RateLimiter compactionRateLimiter =
- CompactionTaskManager.getInstance().getMergeWriteRateLimiter();
+ CompactionTaskManager.getInstance().getCompactionIORateLimiter();
// record the min time and max time to update the target resource
private long minStartTimestamp = Long.MAX_VALUE;
private long maxEndTimestamp = Long.MIN_VALUE;
private long pointCountInChunkWriter = 0;
+ private final RateLimiter ioLimiter =
+ CompactionTaskManager.getInstance().getCompactionIORateLimiter();
private final long targetChunkSize =
IoTDBDescriptor.getInstance().getConfig().getTargetChunkSize();
@@ -282,7 +284,7 @@ public class SingleSeriesCompactionExecutor {
private void flushChunkToFileWriter(
Chunk chunk, ChunkMetadata chunkMetadata, boolean isCachedChunk) throws IOException {
- CompactionTaskManager.mergeRateLimiterAcquire(compactionRateLimiter, getChunkSize(chunk));
+ ioLimiter.acquire(1);
if (chunkMetadata.getStartTime() < minStartTimestamp) {
minStartTimestamp = chunkMetadata.getStartTime();
}
@@ -300,8 +302,7 @@ public class SingleSeriesCompactionExecutor {
private void flushChunkWriterIfLargeEnough() throws IOException {
if (pointCountInChunkWriter >= targetChunkPointNum
|| chunkWriter.estimateMaxSeriesMemSize() >= targetChunkSize) {
- CompactionTaskManager.mergeRateLimiterAcquire(
- compactionRateLimiter, chunkWriter.estimateMaxSeriesMemSize());
+ ioLimiter.acquire(1);
CompactionMetricsManager.recordWriteInfo(
CompactionType.INNER_SEQ_COMPACTION,
ProcessChunkType.DESERIALIZE_CHUNK,
@@ -322,8 +323,7 @@ public class SingleSeriesCompactionExecutor {
}
private void flushChunkWriter() throws IOException {
- CompactionTaskManager.mergeRateLimiterAcquire(
- compactionRateLimiter, chunkWriter.estimateMaxSeriesMemSize());
+ ioLimiter.acquire(1);
CompactionMetricsManager.recordWriteInfo(
CompactionType.INNER_SEQ_COMPACTION,
ProcessChunkType.DESERIALIZE_CHUNK,
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/AbstractCompactionWriter.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/AbstractCompactionWriter.java
index 72096069e1..683001b31c 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/AbstractCompactionWriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/AbstractCompactionWriter.java
@@ -142,7 +142,6 @@ public abstract class AbstractCompactionWriter implements AutoCloseable {
protected void flushChunkToFileWriter(TsFileIOWriter targetWriter, int subTaskId)
throws IOException {
- writeRateLimit(chunkWriters[subTaskId].estimateMaxSeriesMemSize());
synchronized (targetWriter) {
chunkWriters[subTaskId].writeToFileWriter(targetWriter);
}
@@ -173,7 +172,7 @@ public abstract class AbstractCompactionWriter implements AutoCloseable {
protected void writeRateLimit(long bytesLength) {
CompactionTaskManager.mergeRateLimiterAcquire(
- CompactionTaskManager.getInstance().getMergeWriteRateLimiter(), bytesLength);
+ CompactionTaskManager.getInstance().getCompactionIORateLimiter(), bytesLength);
}
public abstract List<TsFileIOWriter> getFileIOWriter();
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/utils/CompactionConfigRestorer.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/utils/CompactionConfigRestorer.java
index 014ad1afd5..2f6110e8a7 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/utils/CompactionConfigRestorer.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/utils/CompactionConfigRestorer.java
@@ -64,6 +64,6 @@ public class CompactionConfigRestorer {
config.setConcurrentCompactionThread(concurrentCompactionThread);
config.setCompactionScheduleIntervalInMs(compactionScheduleIntervalInMs);
config.setCompactionSubmissionIntervalInMs(compactionSubmissionIntervalInMs);
- config.setCompactionWriteThroughputMbPerSec(compactionWriteThroughputMbPerSec);
+ config.setCompactionIORatePerSec(compactionWriteThroughputMbPerSec);
}
}