You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2023/01/07 02:47:59 UTC

[iotdb] branch revert_compaction_limit created (now 81e9f5a973)

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a change to branch revert_compaction_limit
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at 81e9f5a973 Revert "[IOTDB-5209] Limit the read rate of compaction execution (#8461)"

This branch includes the following new commits:

     new 81e9f5a973 Revert "[IOTDB-5209] Limit the read rate of compaction execution (#8461)"

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/01: Revert "[IOTDB-5209] Limit the read rate of compaction execution (#8461)"

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch revert_compaction_limit
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 81e9f5a973baf538a706e18231ff070ac48f0c43
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Sat Jan 7 10:47:40 2023 +0800

    Revert "[IOTDB-5209] Limit the read rate of compaction execution (#8461)"
    
    This reverts commit 5126711dff14a818a76a7f5dcbd4b29a4251431b.
---
 docs/UserGuide/Reference/Common-Config-Manual.md         | 16 ++++++++--------
 docs/zh/UserGuide/Reference/Common-Config-Manual.md      | 16 ++++++++--------
 .../src/assembly/resources/conf/iotdb-common.properties  |  4 ++--
 .../main/java/org/apache/iotdb/db/conf/IoTDBConfig.java  | 12 ++++++------
 .../java/org/apache/iotdb/db/conf/IoTDBDescriptor.java   | 11 ++++++-----
 .../db/engine/compaction/CompactionTaskManager.java      | 15 +++++++++++++--
 .../cross/utils/AlignedSeriesCompactionExecutor.java     |  8 --------
 .../cross/utils/NonAlignedSeriesCompactionExecutor.java  |  2 --
 .../inner/utils/AlignedSeriesCompactionExecutor.java     |  9 +++++----
 .../inner/utils/SingleSeriesCompactionExecutor.java      | 11 ++++++-----
 .../compaction/writer/AbstractCompactionWriter.java      | 12 +++++++-----
 .../compaction/utils/CompactionConfigRestorer.java       |  2 +-
 12 files changed, 62 insertions(+), 56 deletions(-)

diff --git a/docs/UserGuide/Reference/Common-Config-Manual.md b/docs/UserGuide/Reference/Common-Config-Manual.md
index 4d1d24281c..e2c79e093e 100644
--- a/docs/UserGuide/Reference/Common-Config-Manual.md
+++ b/docs/UserGuide/Reference/Common-Config-Manual.md
@@ -990,14 +990,14 @@ Different configuration parameters take effect in the following three ways:
 |   Default   | 60000                                    |
 |  Effective  | After restart system                     |
 
-* compaction\_io\_rate\_per\_sec
-
-|Name| compaction\_io\_rate\_per\_sec                 |
-|:---:|:-----------------------------------------------|
-|Description| The io rate of all compaction tasks per second |
-|Type| int32                                          |
-|Default| 50                                             |
-|Effective| After restart system                           |
+* compaction\_write\_throughput\_mb\_per\_sec
+
+|Name| compaction\_write\_throughput\_mb\_per\_sec |
+|:---:|:---|
+|Description| The write rate of all compaction tasks in MB/s |
+|Type| int32 |
+|Default| 16 |
+|Effective|After restart system|
 
 * sub\_compaction\_thread\_count
 
diff --git a/docs/zh/UserGuide/Reference/Common-Config-Manual.md b/docs/zh/UserGuide/Reference/Common-Config-Manual.md
index fa91601017..0c6c28fcfb 100644
--- a/docs/zh/UserGuide/Reference/Common-Config-Manual.md
+++ b/docs/zh/UserGuide/Reference/Common-Config-Manual.md
@@ -1045,14 +1045,14 @@ IoTDB ConfigNode 和 DataNode 的公共配置参数位于 `conf` 目录下。
 |    默认值    | 60000                                    |
 | 改后生效方式 | 重启服务生效                             |
 
-* compaction\_io\_rate\_per\_sec
-
-|名字| compaction\_io\_rate\_per\_sec |
-|:---:|:-------------------------------|
-|描述| 每秒合并随机IO的次数。                   |
-|类型| int32                          |
-|默认值| 50                             |
-|改后生效方式| 重启服务生效                         |
+* compaction\_write\_throughput\_mb\_per\_sec
+
+|名字| compaction\_write\_throughput\_mb\_per\_sec |
+|:---:|:---|
+|描述| 每秒可达到的写入吞吐量合并限制。|
+|类型| int32 |
+|默认值| 16 |
+|改后生效方式| 重启服务生效|
 
 * sub\_compaction\_thread\_count
 
diff --git a/node-commons/src/assembly/resources/conf/iotdb-common.properties b/node-commons/src/assembly/resources/conf/iotdb-common.properties
index ee21625cfd..16e30a896a 100644
--- a/node-commons/src/assembly/resources/conf/iotdb-common.properties
+++ b/node-commons/src/assembly/resources/conf/iotdb-common.properties
@@ -627,9 +627,9 @@ cluster_name=defaultCluster
 # Datatype: long, Unit: ms
 # compaction_submission_interval_in_ms=60000
 
-# The limit of io rate can reach per second
+# The maximum write throughput (in MB) that compaction can reach per second
 # Datatype: int
-# compaction_io_rate_per_sec=50
+# compaction_write_throughput_mb_per_sec=16
 
 # The number of sub compaction threads to be set up to perform compaction.
 # Currently only works for nonAligned data in cross space compaction and unseq inner space compaction.
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 8991c91116..8664cf8ddc 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -667,8 +667,8 @@ public class IoTDBConfig {
    */
   private long mergeIntervalSec = 0L;
 
-  /** The limit of io rate can reach per second */
-  private int compactionIORatePerSec = 50;
+  /** The maximum write throughput (in MB) that compaction can reach per second */
+  private int compactionWriteThroughputMbPerSec = 16;
 
   /**
    * How many thread will be set up to perform compaction, 10 by default. Set to 1 when less than or
@@ -1937,12 +1937,12 @@ public class IoTDBConfig {
     this.intoOperationExecutionThreadCount = intoOperationExecutionThreadCount;
   }
 
-  public int getCompactionIORatePerSec() {
-    return compactionIORatePerSec;
+  public int getCompactionWriteThroughputMbPerSec() {
+    return compactionWriteThroughputMbPerSec;
   }
 
-  public void setCompactionIORatePerSec(int compactionIORatePerSec) {
-    this.compactionIORatePerSec = compactionIORatePerSec;
+  public void setCompactionWriteThroughputMbPerSec(int compactionWriteThroughputMbPerSec) {
+    this.compactionWriteThroughputMbPerSec = compactionWriteThroughputMbPerSec;
   }
 
   public boolean isEnableMemControl() {
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 2c897229ab..c0f4135f9d 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -658,10 +658,11 @@ public class IoTDBDescriptor {
                 "max_cross_compaction_candidate_file_size",
                 Long.toString(conf.getMaxCrossCompactionCandidateFileSize()))));
 
-    conf.setCompactionIORatePerSec(
+    conf.setCompactionWriteThroughputMbPerSec(
         Integer.parseInt(
             properties.getProperty(
-                "compaction_io_rate_per_sec", Integer.toString(conf.getCompactionIORatePerSec()))));
+                "compaction_write_throughput_mb_per_sec",
+                Integer.toString(conf.getCompactionWriteThroughputMbPerSec()))));
 
     conf.setEnableCompactionValidation(
         Boolean.parseBoolean(
@@ -1438,11 +1439,11 @@ public class IoTDBDescriptor {
               properties.getProperty(
                   "slow_query_threshold", Long.toString(conf.getSlowQueryThreshold()))));
       // update merge_write_throughput_mb_per_sec
-      conf.setCompactionIORatePerSec(
+      conf.setCompactionWriteThroughputMbPerSec(
           Integer.parseInt(
               properties.getProperty(
-                  "compaction_io_rate_per_sec",
-                  Integer.toString(conf.getCompactionIORatePerSec()))));
+                  "merge_write_throughput_mb_per_sec",
+                  Integer.toString(conf.getCompactionWriteThroughputMbPerSec()))));
 
       // update insert-tablet-plan's row limit for select-into
       conf.setSelectIntoInsertTabletPlanRowLimit(
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java
index 1e5424bc51..ea7e70805f 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java
@@ -240,8 +240,9 @@ public class CompactionTaskManager implements IService {
         .containsKey(task);
   }
 
-  public RateLimiter getCompactionIORateLimiter() {
-    setWriteMergeRate(IoTDBDescriptor.getInstance().getConfig().getCompactionIORatePerSec());
+  public RateLimiter getMergeWriteRateLimiter() {
+    setWriteMergeRate(
+        IoTDBDescriptor.getInstance().getConfig().getCompactionWriteThroughputMbPerSec());
     return mergeWriteRateLimiter;
   }
 
@@ -255,6 +256,16 @@ public class CompactionTaskManager implements IService {
       mergeWriteRateLimiter.setRate(throughout);
     }
   }
+  /** Wait according to the throughput (MB/s) limit to avoid continuous writing or reading. */
+  public static void mergeRateLimiterAcquire(RateLimiter limiter, long bytesLength) {
+    while (bytesLength >= Integer.MAX_VALUE) {
+      limiter.acquire(Integer.MAX_VALUE);
+      bytesLength -= Integer.MAX_VALUE;
+    }
+    if (bytesLength > 0) {
+      limiter.acquire((int) bytesLength);
+    }
+  }
 
   public synchronized void removeRunningTaskFuture(AbstractCompactionTask task) {
     String regionWithSG = getSGWithRegionId(task.getStorageGroupName(), task.getDataRegionId());
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/utils/AlignedSeriesCompactionExecutor.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/utils/AlignedSeriesCompactionExecutor.java
index ac9e3b17c0..72b364adb8 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/utils/AlignedSeriesCompactionExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/utils/AlignedSeriesCompactionExecutor.java
@@ -20,7 +20,6 @@ package org.apache.iotdb.db.engine.compaction.cross.utils;
 
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.db.engine.compaction.CompactionTaskManager;
 import org.apache.iotdb.db.engine.compaction.task.SubCompactionTaskSummary;
 import org.apache.iotdb.db.engine.compaction.writer.AbstractCompactionWriter;
 import org.apache.iotdb.db.engine.modification.Modification;
@@ -41,8 +40,6 @@ import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;
 import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
 
-import com.google.common.util.concurrent.RateLimiter;
-
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -57,9 +54,6 @@ public class AlignedSeriesCompactionExecutor extends SeriesCompactionExecutor {
 
   private final List<IMeasurementSchema> measurementSchemas;
 
-  private final RateLimiter rateLimiter =
-      CompactionTaskManager.getInstance().getCompactionIORateLimiter();
-
   public AlignedSeriesCompactionExecutor(
       AbstractCompactionWriter compactionWriter,
       Map<String, Map<TsFileResource, Pair<Long, Long>>> timeseriesMetadataOffsetMap,
@@ -287,7 +281,6 @@ public class AlignedSeriesCompactionExecutor extends SeriesCompactionExecutor {
   void readChunk(ChunkMetadataElement chunkMetadataElement) throws IOException {
     AlignedChunkMetadata alignedChunkMetadata =
         (AlignedChunkMetadata) chunkMetadataElement.chunkMetadata;
-    rateLimiter.acquire(1);
     chunkMetadataElement.chunk =
         readerCacheMap
             .get(chunkMetadataElement.fileElement.resource)
@@ -299,7 +292,6 @@ public class AlignedSeriesCompactionExecutor extends SeriesCompactionExecutor {
         valueChunks.add(null);
         continue;
       }
-      rateLimiter.acquire(1);
       valueChunks.add(
           readerCacheMap
               .get(chunkMetadataElement.fileElement.resource)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/utils/NonAlignedSeriesCompactionExecutor.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/utils/NonAlignedSeriesCompactionExecutor.java
index 1c0d3a1e75..da26d515c7 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/utils/NonAlignedSeriesCompactionExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/utils/NonAlignedSeriesCompactionExecutor.java
@@ -20,7 +20,6 @@ package org.apache.iotdb.db.engine.compaction.cross.utils;
 
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.db.engine.compaction.CompactionTaskManager;
 import org.apache.iotdb.db.engine.compaction.task.SubCompactionTaskSummary;
 import org.apache.iotdb.db.engine.compaction.writer.AbstractCompactionWriter;
 import org.apache.iotdb.db.engine.modification.Modification;
@@ -172,7 +171,6 @@ public class NonAlignedSeriesCompactionExecutor extends SeriesCompactionExecutor
 
   @Override
   void readChunk(ChunkMetadataElement chunkMetadataElement) throws IOException {
-    CompactionTaskManager.getInstance().getCompactionIORateLimiter().acquire(1);
     chunkMetadataElement.chunk =
         readerCacheMap
             .get(chunkMetadataElement.fileElement.resource)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/AlignedSeriesCompactionExecutor.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/AlignedSeriesCompactionExecutor.java
index 3ecc19d11e..3f4d475679 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/AlignedSeriesCompactionExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/AlignedSeriesCompactionExecutor.java
@@ -63,7 +63,7 @@ public class AlignedSeriesCompactionExecutor {
   private final List<IMeasurementSchema> schemaList;
   private long remainingPointInChunkWriter = 0L;
   private final RateLimiter rateLimiter =
-      CompactionTaskManager.getInstance().getCompactionIORateLimiter();
+      CompactionTaskManager.getInstance().getMergeWriteRateLimiter();
 
   private final long chunkSizeThreshold =
       IoTDBDescriptor.getInstance().getConfig().getTargetChunkSize();
@@ -139,13 +139,13 @@ public class AlignedSeriesCompactionExecutor {
       while (readerIterator.hasNext()) {
         Pair<AlignedChunkReader, Long> chunkReaderAndChunkSize = readerIterator.nextReader();
         CompactionMetricsRecorder.recordReadInfo(chunkReaderAndChunkSize.right);
-        rateLimiter.acquire(schemaList.size() + 1);
         compactOneAlignedChunk(chunkReaderAndChunkSize.left);
       }
     }
 
     if (remainingPointInChunkWriter != 0L) {
-      rateLimiter.acquire(chunkWriter.getValueChunkWriterList().size() + 1);
+      CompactionTaskManager.mergeRateLimiterAcquire(
+          rateLimiter, chunkWriter.estimateMaxSeriesMemSize());
       CompactionMetricsRecorder.recordWriteInfo(
           CompactionType.INNER_SEQ_COMPACTION,
           ProcessChunkType.DESERIALIZE_CHUNK,
@@ -187,7 +187,8 @@ public class AlignedSeriesCompactionExecutor {
   private void flushChunkWriterIfLargeEnough() throws IOException {
     if (remainingPointInChunkWriter >= chunkPointNumThreshold
         || chunkWriter.estimateMaxSeriesMemSize() >= chunkSizeThreshold * schemaList.size()) {
-      rateLimiter.acquire(chunkWriter.getValueChunkWriterList().size() + 1);
+      CompactionTaskManager.mergeRateLimiterAcquire(
+          rateLimiter, chunkWriter.estimateMaxSeriesMemSize());
       CompactionMetricsRecorder.recordWriteInfo(
           CompactionType.INNER_SEQ_COMPACTION,
           ProcessChunkType.DESERIALIZE_CHUNK,
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/SingleSeriesCompactionExecutor.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/SingleSeriesCompactionExecutor.java
index e8f7a1186a..9575757906 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/SingleSeriesCompactionExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/SingleSeriesCompactionExecutor.java
@@ -58,7 +58,7 @@ public class SingleSeriesCompactionExecutor {
   private Chunk cachedChunk;
   private ChunkMetadata cachedChunkMetadata;
   private RateLimiter compactionRateLimiter =
-      CompactionTaskManager.getInstance().getCompactionIORateLimiter();
+      CompactionTaskManager.getInstance().getMergeWriteRateLimiter();
   // record the min time and max time to update the target resource
   private long minStartTimestamp = Long.MAX_VALUE;
   private long maxEndTimestamp = Long.MIN_VALUE;
@@ -118,7 +118,6 @@ public class SingleSeriesCompactionExecutor {
       TsFileSequenceReader reader = readerListPair.left;
       List<ChunkMetadata> chunkMetadataList = readerListPair.right;
       for (ChunkMetadata chunkMetadata : chunkMetadataList) {
-        compactionRateLimiter.acquire(1);
         Chunk currentChunk = reader.readMemChunk(chunkMetadata);
         if (this.chunkWriter == null) {
           constructChunkWriterFromReadChunk(currentChunk);
@@ -300,7 +299,7 @@ public class SingleSeriesCompactionExecutor {
 
   private void flushChunkToFileWriter(
       Chunk chunk, ChunkMetadata chunkMetadata, boolean isCachedChunk) throws IOException {
-    compactionRateLimiter.acquire(1);
+    CompactionTaskManager.mergeRateLimiterAcquire(compactionRateLimiter, getChunkSize(chunk));
     if (chunkMetadata.getStartTime() < minStartTimestamp) {
       minStartTimestamp = chunkMetadata.getStartTime();
     }
@@ -318,7 +317,8 @@ public class SingleSeriesCompactionExecutor {
   private void flushChunkWriterIfLargeEnough() throws IOException {
     if (pointCountInChunkWriter >= targetChunkPointNum
         || chunkWriter.estimateMaxSeriesMemSize() >= targetChunkSize) {
-      compactionRateLimiter.acquire(1);
+      CompactionTaskManager.mergeRateLimiterAcquire(
+          compactionRateLimiter, chunkWriter.estimateMaxSeriesMemSize());
       CompactionMetricsRecorder.recordWriteInfo(
           CompactionType.INNER_SEQ_COMPACTION,
           ProcessChunkType.DESERIALIZE_CHUNK,
@@ -339,7 +339,8 @@ public class SingleSeriesCompactionExecutor {
   }
 
   private void flushChunkWriter() throws IOException {
-    compactionRateLimiter.acquire(1);
+    CompactionTaskManager.mergeRateLimiterAcquire(
+        compactionRateLimiter, chunkWriter.estimateMaxSeriesMemSize());
     CompactionMetricsRecorder.recordWriteInfo(
         CompactionType.INNER_SEQ_COMPACTION,
         ProcessChunkType.DESERIALIZE_CHUNK,
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/AbstractCompactionWriter.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/AbstractCompactionWriter.java
index e95a386b72..1a0973602d 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/AbstractCompactionWriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/AbstractCompactionWriter.java
@@ -49,7 +49,7 @@ public abstract class AbstractCompactionWriter implements AutoCloseable {
   protected int subTaskNum = IoTDBDescriptor.getInstance().getConfig().getSubCompactionTaskNum();
 
   private RateLimiter compactionRateLimiter =
-      CompactionTaskManager.getInstance().getCompactionIORateLimiter();
+      CompactionTaskManager.getInstance().getMergeWriteRateLimiter();
 
   // check if there is unseq error point during writing
   protected long[] lastTime = new long[subTaskNum];
@@ -164,7 +164,8 @@ public abstract class AbstractCompactionWriter implements AutoCloseable {
 
   protected void sealChunk(TsFileIOWriter targetWriter, IChunkWriter iChunkWriter, int subTaskId)
       throws IOException {
-    compactionRateLimiter.acquire(1);
+    CompactionTaskManager.mergeRateLimiterAcquire(
+        compactionRateLimiter, iChunkWriter.estimateMaxSeriesMemSize());
     synchronized (targetWriter) {
       iChunkWriter.writeToFileWriter(targetWriter);
     }
@@ -185,11 +186,11 @@ public abstract class AbstractCompactionWriter implements AutoCloseable {
   protected void flushNonAlignedChunkToFileWriter(
       TsFileIOWriter targetWriter, Chunk chunk, ChunkMetadata chunkMetadata, int subTaskId)
       throws IOException {
+    CompactionTaskManager.mergeRateLimiterAcquire(compactionRateLimiter, getChunkSize(chunk));
     synchronized (targetWriter) {
       // seal last chunk to file writer
       chunkWriters[subTaskId].writeToFileWriter(targetWriter);
       chunkPointNumArray[subTaskId] = 0;
-      compactionRateLimiter.acquire(1);
       targetWriter.writeChunk(chunk, chunkMetadata);
     }
   }
@@ -209,7 +210,7 @@ public abstract class AbstractCompactionWriter implements AutoCloseable {
       chunkPointNumArray[subTaskId] = 0;
 
       // flush time chunk
-      compactionRateLimiter.acquire(1);
+      CompactionTaskManager.mergeRateLimiterAcquire(compactionRateLimiter, getChunkSize(timeChunk));
       targetWriter.writeChunk(timeChunk, (ChunkMetadata) timeChunkMetadata);
 
       // flush value chunks
@@ -226,7 +227,8 @@ public abstract class AbstractCompactionWriter implements AutoCloseable {
               valueChunkWriter.getStatistics());
           continue;
         }
-        compactionRateLimiter.acquire(1);
+        CompactionTaskManager.mergeRateLimiterAcquire(
+            compactionRateLimiter, getChunkSize(valueChunk));
         targetWriter.writeChunk(valueChunk, (ChunkMetadata) valueChunkMetadatas.get(i));
       }
     }
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/utils/CompactionConfigRestorer.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/utils/CompactionConfigRestorer.java
index ada6af6de5..613308c5d7 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/utils/CompactionConfigRestorer.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/utils/CompactionConfigRestorer.java
@@ -74,7 +74,7 @@ public class CompactionConfigRestorer {
     config.setCompactionThreadCount(concurrentCompactionThread);
     config.setCompactionScheduleIntervalInMs(compactionScheduleIntervalInMs);
     config.setCompactionSubmissionIntervalInMs(compactionSubmissionIntervalInMs);
-    config.setCompactionIORatePerSec(compactionWriteThroughputMbPerSec);
+    config.setCompactionWriteThroughputMbPerSec(compactionWriteThroughputMbPerSec);
     config.setCrossCompactionPerformer(oldCrossPerformer);
     config.setInnerSeqCompactionPerformer(oldInnerSeqPerformer);
     config.setInnerUnseqCompactionPerformer(oldInnerUnseqPerformer);