You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/12/28 06:35:20 UTC

[iotdb] branch test_write_5126711 created (now ffbfa036b0)

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a change to branch test_write_5126711
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at ffbfa036b0 Revert "[IOTDB-5209] Limit the read rate of compaction execution (#8461)"

This branch includes the following new commits:

     new ffbfa036b0 Revert "[IOTDB-5209] Limit the read rate of compaction execution (#8461)"

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/01: Revert "[IOTDB-5209] Limit the read rate of compaction execution (#8461)"

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch test_write_5126711
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit ffbfa036b04b914bf8046422beb60d2391b7d9fa
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Wed Dec 28 14:32:50 2022 +0800

    Revert "[IOTDB-5209] Limit the read rate of compaction execution (#8461)"
    
    This reverts commit 5126711dff14a818a76a7f5dcbd4b29a4251431b.
---
 docs/UserGuide/Reference/Common-Config-Manual.md         | 16 ++++++++--------
 docs/zh/UserGuide/Reference/Common-Config-Manual.md      | 16 ++++++++--------
 .../src/assembly/resources/conf/iotdb-common.properties  |  4 ++--
 .../main/java/org/apache/iotdb/db/conf/IoTDBConfig.java  | 12 ++++++------
 .../java/org/apache/iotdb/db/conf/IoTDBDescriptor.java   | 11 ++++++-----
 .../db/engine/compaction/CompactionTaskManager.java      | 15 +++++++++++++--
 .../cross/utils/AlignedSeriesCompactionExecutor.java     |  8 --------
 .../cross/utils/NonAlignedSeriesCompactionExecutor.java  |  2 --
 .../inner/utils/AlignedSeriesCompactionExecutor.java     |  9 +++++----
 .../inner/utils/SingleSeriesCompactionExecutor.java      | 11 ++++++-----
 .../compaction/writer/AbstractCompactionWriter.java      | 12 +++++++-----
 .../compaction/utils/CompactionConfigRestorer.java       |  2 +-
 12 files changed, 62 insertions(+), 56 deletions(-)

diff --git a/docs/UserGuide/Reference/Common-Config-Manual.md b/docs/UserGuide/Reference/Common-Config-Manual.md
index 4d1d24281c..e2c79e093e 100644
--- a/docs/UserGuide/Reference/Common-Config-Manual.md
+++ b/docs/UserGuide/Reference/Common-Config-Manual.md
@@ -990,14 +990,14 @@ Different configuration parameters take effect in the following three ways:
 |   Default   | 60000                                    |
 |  Effective  | After restart system                     |
 
-* compaction\_io\_rate\_per\_sec
-
-|Name| compaction\_io\_rate\_per\_sec                 |
-|:---:|:-----------------------------------------------|
-|Description| The io rate of all compaction tasks per second |
-|Type| int32                                          |
-|Default| 50                                             |
-|Effective| After restart system                           |
+* compaction\_write\_throughput\_mb\_per\_sec
+
+|Name| compaction\_write\_throughput\_mb\_per\_sec |
+|:---:|:---|
+|Description| The write rate of all compaction tasks in MB/s |
+|Type| int32 |
+|Default| 16 |
+|Effective|After restart system|
 
 * sub\_compaction\_thread\_count
 
diff --git a/docs/zh/UserGuide/Reference/Common-Config-Manual.md b/docs/zh/UserGuide/Reference/Common-Config-Manual.md
index fa91601017..0c6c28fcfb 100644
--- a/docs/zh/UserGuide/Reference/Common-Config-Manual.md
+++ b/docs/zh/UserGuide/Reference/Common-Config-Manual.md
@@ -1045,14 +1045,14 @@ IoTDB ConfigNode 和 DataNode 的公共配置参数位于 `conf` 目录下。
 |    默认值    | 60000                                    |
 | 改后生效方式 | 重启服务生效                             |
 
-* compaction\_io\_rate\_per\_sec
-
-|名字| compaction\_io\_rate\_per\_sec |
-|:---:|:-------------------------------|
-|描述| 每秒合并随机IO的次数。                   |
-|类型| int32                          |
-|默认值| 50                             |
-|改后生效方式| 重启服务生效                         |
+* compaction\_write\_throughput\_mb\_per\_sec
+
+|名字| compaction\_write\_throughput\_mb\_per\_sec |
+|:---:|:---|
+|描述| 每秒可达到的写入吞吐量合并限制。|
+|类型| int32 |
+|默认值| 16 |
+|改后生效方式| 重启服务生效|
 
 * sub\_compaction\_thread\_count
 
diff --git a/node-commons/src/assembly/resources/conf/iotdb-common.properties b/node-commons/src/assembly/resources/conf/iotdb-common.properties
index d0302b5c77..d932a8e9f3 100644
--- a/node-commons/src/assembly/resources/conf/iotdb-common.properties
+++ b/node-commons/src/assembly/resources/conf/iotdb-common.properties
@@ -619,9 +619,9 @@
 # Datatype: long, Unit: ms
 # compaction_submission_interval_in_ms=60000
 
-# The limit of io rate can reach per second
+# The limit of write throughput merge can reach per second
 # Datatype: int
-# compaction_io_rate_per_sec=50
+# compaction_write_throughput_mb_per_sec=16
 
 # The number of sub compaction threads to be set up to perform compaction.
 # Currently only works for nonAligned data in cross space compaction and unseq inner space compaction.
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index ffed1b6e82..0b3be2e18b 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -658,8 +658,8 @@ public class IoTDBConfig {
    */
   private long mergeIntervalSec = 0L;
 
-  /** The limit of io rate can reach per second */
-  private int compactionIORatePerSec = 50;
+  /** The limit of compaction merge can reach per second */
+  private int compactionWriteThroughputMbPerSec = 16;
 
   /**
    * How many thread will be set up to perform compaction, 10 by default. Set to 1 when less than or
@@ -1920,12 +1920,12 @@ public class IoTDBConfig {
     this.intoOperationExecutionThreadCount = intoOperationExecutionThreadCount;
   }
 
-  public int getCompactionIORatePerSec() {
-    return compactionIORatePerSec;
+  public int getCompactionWriteThroughputMbPerSec() {
+    return compactionWriteThroughputMbPerSec;
   }
 
-  public void setCompactionIORatePerSec(int compactionIORatePerSec) {
-    this.compactionIORatePerSec = compactionIORatePerSec;
+  public void setCompactionWriteThroughputMbPerSec(int compactionWriteThroughputMbPerSec) {
+    this.compactionWriteThroughputMbPerSec = compactionWriteThroughputMbPerSec;
   }
 
   public boolean isEnableMemControl() {
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 37f44e2f41..596da574af 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -655,10 +655,11 @@ public class IoTDBDescriptor {
                 "max_cross_compaction_candidate_file_size",
                 Long.toString(conf.getMaxCrossCompactionCandidateFileSize()))));
 
-    conf.setCompactionIORatePerSec(
+    conf.setCompactionWriteThroughputMbPerSec(
         Integer.parseInt(
             properties.getProperty(
-                "compaction_io_rate_per_sec", Integer.toString(conf.getCompactionIORatePerSec()))));
+                "compaction_write_throughput_mb_per_sec",
+                Integer.toString(conf.getCompactionWriteThroughputMbPerSec()))));
 
     conf.setEnableCompactionValidation(
         Boolean.parseBoolean(
@@ -1435,11 +1436,11 @@ public class IoTDBDescriptor {
               properties.getProperty(
                   "slow_query_threshold", Long.toString(conf.getSlowQueryThreshold()))));
       // update merge_write_throughput_mb_per_sec
-      conf.setCompactionIORatePerSec(
+      conf.setCompactionWriteThroughputMbPerSec(
           Integer.parseInt(
               properties.getProperty(
-                  "compaction_io_rate_per_sec",
-                  Integer.toString(conf.getCompactionIORatePerSec()))));
+                  "merge_write_throughput_mb_per_sec",
+                  Integer.toString(conf.getCompactionWriteThroughputMbPerSec()))));
 
       // update insert-tablet-plan's row limit for select-into
       conf.setSelectIntoInsertTabletPlanRowLimit(
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java
index 1e5424bc51..ea7e70805f 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java
@@ -240,8 +240,9 @@ public class CompactionTaskManager implements IService {
         .containsKey(task);
   }
 
-  public RateLimiter getCompactionIORateLimiter() {
-    setWriteMergeRate(IoTDBDescriptor.getInstance().getConfig().getCompactionIORatePerSec());
+  public RateLimiter getMergeWriteRateLimiter() {
+    setWriteMergeRate(
+        IoTDBDescriptor.getInstance().getConfig().getCompactionWriteThroughputMbPerSec());
     return mergeWriteRateLimiter;
   }
 
@@ -255,6 +256,16 @@ public class CompactionTaskManager implements IService {
       mergeWriteRateLimiter.setRate(throughout);
     }
   }
+  /** wait by throughoutMbPerSec limit to avoid continuous Write Or Read */
+  public static void mergeRateLimiterAcquire(RateLimiter limiter, long bytesLength) {
+    while (bytesLength >= Integer.MAX_VALUE) {
+      limiter.acquire(Integer.MAX_VALUE);
+      bytesLength -= Integer.MAX_VALUE;
+    }
+    if (bytesLength > 0) {
+      limiter.acquire((int) bytesLength);
+    }
+  }
 
   public synchronized void removeRunningTaskFuture(AbstractCompactionTask task) {
     String regionWithSG = getSGWithRegionId(task.getStorageGroupName(), task.getDataRegionId());
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/utils/AlignedSeriesCompactionExecutor.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/utils/AlignedSeriesCompactionExecutor.java
index ac9e3b17c0..72b364adb8 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/utils/AlignedSeriesCompactionExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/utils/AlignedSeriesCompactionExecutor.java
@@ -20,7 +20,6 @@ package org.apache.iotdb.db.engine.compaction.cross.utils;
 
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.db.engine.compaction.CompactionTaskManager;
 import org.apache.iotdb.db.engine.compaction.task.SubCompactionTaskSummary;
 import org.apache.iotdb.db.engine.compaction.writer.AbstractCompactionWriter;
 import org.apache.iotdb.db.engine.modification.Modification;
@@ -41,8 +40,6 @@ import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;
 import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
 
-import com.google.common.util.concurrent.RateLimiter;
-
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -57,9 +54,6 @@ public class AlignedSeriesCompactionExecutor extends SeriesCompactionExecutor {
 
   private final List<IMeasurementSchema> measurementSchemas;
 
-  private final RateLimiter rateLimiter =
-      CompactionTaskManager.getInstance().getCompactionIORateLimiter();
-
   public AlignedSeriesCompactionExecutor(
       AbstractCompactionWriter compactionWriter,
       Map<String, Map<TsFileResource, Pair<Long, Long>>> timeseriesMetadataOffsetMap,
@@ -287,7 +281,6 @@ public class AlignedSeriesCompactionExecutor extends SeriesCompactionExecutor {
   void readChunk(ChunkMetadataElement chunkMetadataElement) throws IOException {
     AlignedChunkMetadata alignedChunkMetadata =
         (AlignedChunkMetadata) chunkMetadataElement.chunkMetadata;
-    rateLimiter.acquire(1);
     chunkMetadataElement.chunk =
         readerCacheMap
             .get(chunkMetadataElement.fileElement.resource)
@@ -299,7 +292,6 @@ public class AlignedSeriesCompactionExecutor extends SeriesCompactionExecutor {
         valueChunks.add(null);
         continue;
       }
-      rateLimiter.acquire(1);
       valueChunks.add(
           readerCacheMap
               .get(chunkMetadataElement.fileElement.resource)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/utils/NonAlignedSeriesCompactionExecutor.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/utils/NonAlignedSeriesCompactionExecutor.java
index 1c0d3a1e75..da26d515c7 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/utils/NonAlignedSeriesCompactionExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/utils/NonAlignedSeriesCompactionExecutor.java
@@ -20,7 +20,6 @@ package org.apache.iotdb.db.engine.compaction.cross.utils;
 
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.db.engine.compaction.CompactionTaskManager;
 import org.apache.iotdb.db.engine.compaction.task.SubCompactionTaskSummary;
 import org.apache.iotdb.db.engine.compaction.writer.AbstractCompactionWriter;
 import org.apache.iotdb.db.engine.modification.Modification;
@@ -172,7 +171,6 @@ public class NonAlignedSeriesCompactionExecutor extends SeriesCompactionExecutor
 
   @Override
   void readChunk(ChunkMetadataElement chunkMetadataElement) throws IOException {
-    CompactionTaskManager.getInstance().getCompactionIORateLimiter().acquire(1);
     chunkMetadataElement.chunk =
         readerCacheMap
             .get(chunkMetadataElement.fileElement.resource)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/AlignedSeriesCompactionExecutor.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/AlignedSeriesCompactionExecutor.java
index 3ecc19d11e..3f4d475679 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/AlignedSeriesCompactionExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/AlignedSeriesCompactionExecutor.java
@@ -63,7 +63,7 @@ public class AlignedSeriesCompactionExecutor {
   private final List<IMeasurementSchema> schemaList;
   private long remainingPointInChunkWriter = 0L;
   private final RateLimiter rateLimiter =
-      CompactionTaskManager.getInstance().getCompactionIORateLimiter();
+      CompactionTaskManager.getInstance().getMergeWriteRateLimiter();
 
   private final long chunkSizeThreshold =
       IoTDBDescriptor.getInstance().getConfig().getTargetChunkSize();
@@ -139,13 +139,13 @@ public class AlignedSeriesCompactionExecutor {
       while (readerIterator.hasNext()) {
         Pair<AlignedChunkReader, Long> chunkReaderAndChunkSize = readerIterator.nextReader();
         CompactionMetricsRecorder.recordReadInfo(chunkReaderAndChunkSize.right);
-        rateLimiter.acquire(schemaList.size() + 1);
         compactOneAlignedChunk(chunkReaderAndChunkSize.left);
       }
     }
 
     if (remainingPointInChunkWriter != 0L) {
-      rateLimiter.acquire(chunkWriter.getValueChunkWriterList().size() + 1);
+      CompactionTaskManager.mergeRateLimiterAcquire(
+          rateLimiter, chunkWriter.estimateMaxSeriesMemSize());
       CompactionMetricsRecorder.recordWriteInfo(
           CompactionType.INNER_SEQ_COMPACTION,
           ProcessChunkType.DESERIALIZE_CHUNK,
@@ -187,7 +187,8 @@ public class AlignedSeriesCompactionExecutor {
   private void flushChunkWriterIfLargeEnough() throws IOException {
     if (remainingPointInChunkWriter >= chunkPointNumThreshold
         || chunkWriter.estimateMaxSeriesMemSize() >= chunkSizeThreshold * schemaList.size()) {
-      rateLimiter.acquire(chunkWriter.getValueChunkWriterList().size() + 1);
+      CompactionTaskManager.mergeRateLimiterAcquire(
+          rateLimiter, chunkWriter.estimateMaxSeriesMemSize());
       CompactionMetricsRecorder.recordWriteInfo(
           CompactionType.INNER_SEQ_COMPACTION,
           ProcessChunkType.DESERIALIZE_CHUNK,
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/SingleSeriesCompactionExecutor.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/SingleSeriesCompactionExecutor.java
index e8f7a1186a..9575757906 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/SingleSeriesCompactionExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/SingleSeriesCompactionExecutor.java
@@ -58,7 +58,7 @@ public class SingleSeriesCompactionExecutor {
   private Chunk cachedChunk;
   private ChunkMetadata cachedChunkMetadata;
   private RateLimiter compactionRateLimiter =
-      CompactionTaskManager.getInstance().getCompactionIORateLimiter();
+      CompactionTaskManager.getInstance().getMergeWriteRateLimiter();
   // record the min time and max time to update the target resource
   private long minStartTimestamp = Long.MAX_VALUE;
   private long maxEndTimestamp = Long.MIN_VALUE;
@@ -118,7 +118,6 @@ public class SingleSeriesCompactionExecutor {
       TsFileSequenceReader reader = readerListPair.left;
       List<ChunkMetadata> chunkMetadataList = readerListPair.right;
       for (ChunkMetadata chunkMetadata : chunkMetadataList) {
-        compactionRateLimiter.acquire(1);
         Chunk currentChunk = reader.readMemChunk(chunkMetadata);
         if (this.chunkWriter == null) {
           constructChunkWriterFromReadChunk(currentChunk);
@@ -300,7 +299,7 @@ public class SingleSeriesCompactionExecutor {
 
   private void flushChunkToFileWriter(
       Chunk chunk, ChunkMetadata chunkMetadata, boolean isCachedChunk) throws IOException {
-    compactionRateLimiter.acquire(1);
+    CompactionTaskManager.mergeRateLimiterAcquire(compactionRateLimiter, getChunkSize(chunk));
     if (chunkMetadata.getStartTime() < minStartTimestamp) {
       minStartTimestamp = chunkMetadata.getStartTime();
     }
@@ -318,7 +317,8 @@ public class SingleSeriesCompactionExecutor {
   private void flushChunkWriterIfLargeEnough() throws IOException {
     if (pointCountInChunkWriter >= targetChunkPointNum
         || chunkWriter.estimateMaxSeriesMemSize() >= targetChunkSize) {
-      compactionRateLimiter.acquire(1);
+      CompactionTaskManager.mergeRateLimiterAcquire(
+          compactionRateLimiter, chunkWriter.estimateMaxSeriesMemSize());
       CompactionMetricsRecorder.recordWriteInfo(
           CompactionType.INNER_SEQ_COMPACTION,
           ProcessChunkType.DESERIALIZE_CHUNK,
@@ -339,7 +339,8 @@ public class SingleSeriesCompactionExecutor {
   }
 
   private void flushChunkWriter() throws IOException {
-    compactionRateLimiter.acquire(1);
+    CompactionTaskManager.mergeRateLimiterAcquire(
+        compactionRateLimiter, chunkWriter.estimateMaxSeriesMemSize());
     CompactionMetricsRecorder.recordWriteInfo(
         CompactionType.INNER_SEQ_COMPACTION,
         ProcessChunkType.DESERIALIZE_CHUNK,
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/AbstractCompactionWriter.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/AbstractCompactionWriter.java
index e95a386b72..1a0973602d 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/AbstractCompactionWriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/AbstractCompactionWriter.java
@@ -49,7 +49,7 @@ public abstract class AbstractCompactionWriter implements AutoCloseable {
   protected int subTaskNum = IoTDBDescriptor.getInstance().getConfig().getSubCompactionTaskNum();
 
   private RateLimiter compactionRateLimiter =
-      CompactionTaskManager.getInstance().getCompactionIORateLimiter();
+      CompactionTaskManager.getInstance().getMergeWriteRateLimiter();
 
   // check if there is unseq error point during writing
   protected long[] lastTime = new long[subTaskNum];
@@ -164,7 +164,8 @@ public abstract class AbstractCompactionWriter implements AutoCloseable {
 
   protected void sealChunk(TsFileIOWriter targetWriter, IChunkWriter iChunkWriter, int subTaskId)
       throws IOException {
-    compactionRateLimiter.acquire(1);
+    CompactionTaskManager.mergeRateLimiterAcquire(
+        compactionRateLimiter, iChunkWriter.estimateMaxSeriesMemSize());
     synchronized (targetWriter) {
       iChunkWriter.writeToFileWriter(targetWriter);
     }
@@ -185,11 +186,11 @@ public abstract class AbstractCompactionWriter implements AutoCloseable {
   protected void flushNonAlignedChunkToFileWriter(
       TsFileIOWriter targetWriter, Chunk chunk, ChunkMetadata chunkMetadata, int subTaskId)
       throws IOException {
+    CompactionTaskManager.mergeRateLimiterAcquire(compactionRateLimiter, getChunkSize(chunk));
     synchronized (targetWriter) {
       // seal last chunk to file writer
       chunkWriters[subTaskId].writeToFileWriter(targetWriter);
       chunkPointNumArray[subTaskId] = 0;
-      compactionRateLimiter.acquire(1);
       targetWriter.writeChunk(chunk, chunkMetadata);
     }
   }
@@ -209,7 +210,7 @@ public abstract class AbstractCompactionWriter implements AutoCloseable {
       chunkPointNumArray[subTaskId] = 0;
 
       // flush time chunk
-      compactionRateLimiter.acquire(1);
+      CompactionTaskManager.mergeRateLimiterAcquire(compactionRateLimiter, getChunkSize(timeChunk));
       targetWriter.writeChunk(timeChunk, (ChunkMetadata) timeChunkMetadata);
 
       // flush value chunks
@@ -226,7 +227,8 @@ public abstract class AbstractCompactionWriter implements AutoCloseable {
               valueChunkWriter.getStatistics());
           continue;
         }
-        compactionRateLimiter.acquire(1);
+        CompactionTaskManager.mergeRateLimiterAcquire(
+            compactionRateLimiter, getChunkSize(valueChunk));
         targetWriter.writeChunk(valueChunk, (ChunkMetadata) valueChunkMetadatas.get(i));
       }
     }
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/utils/CompactionConfigRestorer.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/utils/CompactionConfigRestorer.java
index 4b854df976..6df41eac2a 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/utils/CompactionConfigRestorer.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/utils/CompactionConfigRestorer.java
@@ -75,7 +75,7 @@ public class CompactionConfigRestorer {
     config.setCompactionThreadCount(concurrentCompactionThread);
     config.setCompactionScheduleIntervalInMs(compactionScheduleIntervalInMs);
     config.setCompactionSubmissionIntervalInMs(compactionSubmissionIntervalInMs);
-    config.setCompactionIORatePerSec(compactionWriteThroughputMbPerSec);
+    config.setCompactionWriteThroughputMbPerSec(compactionWriteThroughputMbPerSec);
     config.setCrossCompactionPerformer(oldCrossPerformer);
     config.setInnerSeqCompactionPerformer(oldInnerSeqPerformer);
     config.setInnerUnseqCompactionPerformer(oldInnerUnseqPerformer);