You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2023/01/07 02:57:39 UTC

[iotdb] branch revert_5126711 created (now 23fc8827ab)

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a change to branch revert_5126711
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at 23fc8827ab Revert "[IOTDB-5209] Limit the read rate of compaction execution (#8461)"

This branch includes the following new commits:

     new 23fc8827ab Revert "[IOTDB-5209] Limit the read rate of compaction execution (#8461)"

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/01: Revert "[IOTDB-5209] Limit the read rate of compaction execution (#8461)"

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch revert_5126711
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 23fc8827ab792a6bace0c5968556128d3d89bf67
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Sat Jan 7 10:47:40 2023 +0800

    Revert "[IOTDB-5209] Limit the read rate of compaction execution (#8461)"
    
    This reverts commit 5126711dff14a818a76a7f5dcbd4b29a4251431b.
---
 docs/UserGuide/Reference/Common-Config-Manual.md         | 16 ++++++++--------
 docs/zh/UserGuide/Reference/Common-Config-Manual.md      | 16 ++++++++--------
 .../src/assembly/resources/conf/iotdb-common.properties  |  4 ++--
 .../main/java/org/apache/iotdb/db/conf/IoTDBConfig.java  | 12 ++++++------
 .../java/org/apache/iotdb/db/conf/IoTDBDescriptor.java   | 11 ++++++-----
 .../executor/fast/AlignedSeriesCompactionExecutor.java   |  8 --------
 .../fast/NonAlignedSeriesCompactionExecutor.java         |  2 --
 .../readchunk/AlignedSeriesCompactionExecutor.java       |  9 +++++----
 .../readchunk/SingleSeriesCompactionExecutor.java        | 11 ++++++-----
 .../execute/utils/writer/AbstractCompactionWriter.java   | 12 +++++++-----
 .../compaction/schedule/CompactionTaskManager.java       | 15 +++++++++++++--
 .../compaction/utils/CompactionConfigRestorer.java       |  2 +-
 12 files changed, 62 insertions(+), 56 deletions(-)

diff --git a/docs/UserGuide/Reference/Common-Config-Manual.md b/docs/UserGuide/Reference/Common-Config-Manual.md
index 4d1d24281c..e2c79e093e 100644
--- a/docs/UserGuide/Reference/Common-Config-Manual.md
+++ b/docs/UserGuide/Reference/Common-Config-Manual.md
@@ -990,14 +990,14 @@ Different configuration parameters take effect in the following three ways:
 |   Default   | 60000                                    |
 |  Effective  | After restart system                     |
 
-* compaction\_io\_rate\_per\_sec
-
-|Name| compaction\_io\_rate\_per\_sec                 |
-|:---:|:-----------------------------------------------|
-|Description| The io rate of all compaction tasks per second |
-|Type| int32                                          |
-|Default| 50                                             |
-|Effective| After restart system                           |
+* compaction\_write\_throughput\_mb\_per\_sec
+
+|Name| compaction\_write\_throughput\_mb\_per\_sec |
+|:---:|:---|
+|Description| The write rate of all compaction tasks in MB/s |
+|Type| int32 |
+|Default| 16 |
+|Effective|After restart system|
 
 * sub\_compaction\_thread\_count
 
diff --git a/docs/zh/UserGuide/Reference/Common-Config-Manual.md b/docs/zh/UserGuide/Reference/Common-Config-Manual.md
index fa91601017..0c6c28fcfb 100644
--- a/docs/zh/UserGuide/Reference/Common-Config-Manual.md
+++ b/docs/zh/UserGuide/Reference/Common-Config-Manual.md
@@ -1045,14 +1045,14 @@ IoTDB ConfigNode 和 DataNode 的公共配置参数位于 `conf` 目录下。
 |    默认值    | 60000                                    |
 | 改后生效方式 | 重启服务生效                             |
 
-* compaction\_io\_rate\_per\_sec
-
-|名字| compaction\_io\_rate\_per\_sec |
-|:---:|:-------------------------------|
-|描述| 每秒合并随机IO的次数。                   |
-|类型| int32                          |
-|默认值| 50                             |
-|改后生效方式| 重启服务生效                         |
+* compaction\_write\_throughput\_mb\_per\_sec
+
+|名字| compaction\_write\_throughput\_mb\_per\_sec |
+|:---:|:---|
+|描述| 每秒可达到的写入吞吐量合并限制。|
+|类型| int32 |
+|默认值| 16 |
+|改后生效方式| 重启服务生效|
 
 * sub\_compaction\_thread\_count
 
diff --git a/node-commons/src/assembly/resources/conf/iotdb-common.properties b/node-commons/src/assembly/resources/conf/iotdb-common.properties
index ec90b9915b..ee3616784d 100644
--- a/node-commons/src/assembly/resources/conf/iotdb-common.properties
+++ b/node-commons/src/assembly/resources/conf/iotdb-common.properties
@@ -622,9 +622,9 @@ cluster_name=defaultCluster
 # Datatype: long, Unit: ms
 # compaction_submission_interval_in_ms=60000
 
-# The limit of io rate can reach per second
+# The limit of write throughput merge can reach per second
 # Datatype: int
-# compaction_io_rate_per_sec=50
+# compaction_write_throughput_mb_per_sec=16
 
 # The number of sub compaction threads to be set up to perform compaction.
 # Currently only works for nonAligned data in cross space compaction and unseq inner space compaction.
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index d562199db4..ad3a906a6f 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -661,8 +661,8 @@ public class IoTDBConfig {
    */
   private long mergeIntervalSec = 0L;
 
-  /** The limit of io rate can reach per second */
-  private int compactionIORatePerSec = 50;
+  /** The limit of compaction merge can reach per second */
+  private int compactionWriteThroughputMbPerSec = 16;
 
   /**
    * How many thread will be set up to perform compaction, 10 by default. Set to 1 when less than or
@@ -1916,12 +1916,12 @@ public class IoTDBConfig {
     this.intoOperationExecutionThreadCount = intoOperationExecutionThreadCount;
   }
 
-  public int getCompactionIORatePerSec() {
-    return compactionIORatePerSec;
+  public int getCompactionWriteThroughputMbPerSec() {
+    return compactionWriteThroughputMbPerSec;
   }
 
-  public void setCompactionIORatePerSec(int compactionIORatePerSec) {
-    this.compactionIORatePerSec = compactionIORatePerSec;
+  public void setCompactionWriteThroughputMbPerSec(int compactionWriteThroughputMbPerSec) {
+    this.compactionWriteThroughputMbPerSec = compactionWriteThroughputMbPerSec;
   }
 
   public boolean isEnableMemControl() {
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 43f5da3278..6657bdae25 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -650,10 +650,11 @@ public class IoTDBDescriptor {
                 "max_cross_compaction_candidate_file_size",
                 Long.toString(conf.getMaxCrossCompactionCandidateFileSize()))));
 
-    conf.setCompactionIORatePerSec(
+    conf.setCompactionWriteThroughputMbPerSec(
         Integer.parseInt(
             properties.getProperty(
-                "compaction_io_rate_per_sec", Integer.toString(conf.getCompactionIORatePerSec()))));
+                "compaction_write_throughput_mb_per_sec",
+                Integer.toString(conf.getCompactionWriteThroughputMbPerSec()))));
 
     conf.setEnableCompactionValidation(
         Boolean.parseBoolean(
@@ -1430,11 +1431,11 @@ public class IoTDBDescriptor {
               properties.getProperty(
                   "slow_query_threshold", Long.toString(conf.getSlowQueryThreshold()))));
       // update merge_write_throughput_mb_per_sec
-      conf.setCompactionIORatePerSec(
+      conf.setCompactionWriteThroughputMbPerSec(
           Integer.parseInt(
               properties.getProperty(
-                  "compaction_io_rate_per_sec",
-                  Integer.toString(conf.getCompactionIORatePerSec()))));
+                  "merge_write_throughput_mb_per_sec",
+                  Integer.toString(conf.getCompactionWriteThroughputMbPerSec()))));
       // update insert-tablet-plan's row limit for select-into
       conf.setSelectIntoInsertTabletPlanRowLimit(
           Integer.parseInt(
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/fast/AlignedSeriesCompactionExecutor.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/fast/AlignedSeriesCompactionExecutor.java
index 88b7229d2a..fae9603ada 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/fast/AlignedSeriesCompactionExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/fast/AlignedSeriesCompactionExecutor.java
@@ -25,7 +25,6 @@ import org.apache.iotdb.db.engine.compaction.execute.utils.executor.fast.element
 import org.apache.iotdb.db.engine.compaction.execute.utils.executor.fast.element.FileElement;
 import org.apache.iotdb.db.engine.compaction.execute.utils.executor.fast.element.PageElement;
 import org.apache.iotdb.db.engine.compaction.execute.utils.writer.AbstractCompactionWriter;
-import org.apache.iotdb.db.engine.compaction.schedule.CompactionTaskManager;
 import org.apache.iotdb.db.engine.modification.Modification;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.exception.WriteProcessException;
@@ -44,8 +43,6 @@ import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;
 import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
 
-import com.google.common.util.concurrent.RateLimiter;
-
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -60,9 +57,6 @@ public class AlignedSeriesCompactionExecutor extends SeriesCompactionExecutor {
 
   private final List<IMeasurementSchema> measurementSchemas;
 
-  private final RateLimiter rateLimiter =
-      CompactionTaskManager.getInstance().getCompactionIORateLimiter();
-
   public AlignedSeriesCompactionExecutor(
       AbstractCompactionWriter compactionWriter,
       Map<String, Map<TsFileResource, Pair<Long, Long>>> timeseriesMetadataOffsetMap,
@@ -290,7 +284,6 @@ public class AlignedSeriesCompactionExecutor extends SeriesCompactionExecutor {
   void readChunk(ChunkMetadataElement chunkMetadataElement) throws IOException {
     AlignedChunkMetadata alignedChunkMetadata =
         (AlignedChunkMetadata) chunkMetadataElement.chunkMetadata;
-    rateLimiter.acquire(1);
     chunkMetadataElement.chunk =
         readerCacheMap
             .get(chunkMetadataElement.fileElement.resource)
@@ -302,7 +295,6 @@ public class AlignedSeriesCompactionExecutor extends SeriesCompactionExecutor {
         valueChunks.add(null);
         continue;
       }
-      rateLimiter.acquire(1);
       valueChunks.add(
           readerCacheMap
               .get(chunkMetadataElement.fileElement.resource)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/fast/NonAlignedSeriesCompactionExecutor.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/fast/NonAlignedSeriesCompactionExecutor.java
index acbce311db..8782345081 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/fast/NonAlignedSeriesCompactionExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/fast/NonAlignedSeriesCompactionExecutor.java
@@ -25,7 +25,6 @@ import org.apache.iotdb.db.engine.compaction.execute.utils.executor.fast.element
 import org.apache.iotdb.db.engine.compaction.execute.utils.executor.fast.element.FileElement;
 import org.apache.iotdb.db.engine.compaction.execute.utils.executor.fast.element.PageElement;
 import org.apache.iotdb.db.engine.compaction.execute.utils.writer.AbstractCompactionWriter;
-import org.apache.iotdb.db.engine.compaction.schedule.CompactionTaskManager;
 import org.apache.iotdb.db.engine.modification.Modification;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.exception.WriteProcessException;
@@ -175,7 +174,6 @@ public class NonAlignedSeriesCompactionExecutor extends SeriesCompactionExecutor
 
   @Override
   void readChunk(ChunkMetadataElement chunkMetadataElement) throws IOException {
-    CompactionTaskManager.getInstance().getCompactionIORateLimiter().acquire(1);
     chunkMetadataElement.chunk =
         readerCacheMap
             .get(chunkMetadataElement.fileElement.resource)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/readchunk/AlignedSeriesCompactionExecutor.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/readchunk/AlignedSeriesCompactionExecutor.java
index 58c4e4bef0..a35163ee7f 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/readchunk/AlignedSeriesCompactionExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/readchunk/AlignedSeriesCompactionExecutor.java
@@ -63,7 +63,7 @@ public class AlignedSeriesCompactionExecutor {
   private final List<IMeasurementSchema> schemaList;
   private long remainingPointInChunkWriter = 0L;
   private final RateLimiter rateLimiter =
-      CompactionTaskManager.getInstance().getCompactionIORateLimiter();
+      CompactionTaskManager.getInstance().getMergeWriteRateLimiter();
 
   private final long chunkSizeThreshold =
       IoTDBDescriptor.getInstance().getConfig().getTargetChunkSize();
@@ -139,13 +139,13 @@ public class AlignedSeriesCompactionExecutor {
       while (readerIterator.hasNext()) {
         Pair<AlignedChunkReader, Long> chunkReaderAndChunkSize = readerIterator.nextReader();
         CompactionMetricsRecorder.recordReadInfo(chunkReaderAndChunkSize.right);
-        rateLimiter.acquire(schemaList.size() + 1);
         compactOneAlignedChunk(chunkReaderAndChunkSize.left);
       }
     }
 
     if (remainingPointInChunkWriter != 0L) {
-      rateLimiter.acquire(chunkWriter.getValueChunkWriterList().size() + 1);
+      CompactionTaskManager.mergeRateLimiterAcquire(
+          rateLimiter, chunkWriter.estimateMaxSeriesMemSize());
       CompactionMetricsRecorder.recordWriteInfo(
           CompactionType.INNER_SEQ_COMPACTION,
           ProcessChunkType.DESERIALIZE_CHUNK,
@@ -187,7 +187,8 @@ public class AlignedSeriesCompactionExecutor {
   private void flushChunkWriterIfLargeEnough() throws IOException {
     if (remainingPointInChunkWriter >= chunkPointNumThreshold
         || chunkWriter.estimateMaxSeriesMemSize() >= chunkSizeThreshold * schemaList.size()) {
-      rateLimiter.acquire(chunkWriter.getValueChunkWriterList().size() + 1);
+      CompactionTaskManager.mergeRateLimiterAcquire(
+          rateLimiter, chunkWriter.estimateMaxSeriesMemSize());
       CompactionMetricsRecorder.recordWriteInfo(
           CompactionType.INNER_SEQ_COMPACTION,
           ProcessChunkType.DESERIALIZE_CHUNK,
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/readchunk/SingleSeriesCompactionExecutor.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/readchunk/SingleSeriesCompactionExecutor.java
index b8bd1f5b42..e636e30086 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/readchunk/SingleSeriesCompactionExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/readchunk/SingleSeriesCompactionExecutor.java
@@ -58,7 +58,7 @@ public class SingleSeriesCompactionExecutor {
   private Chunk cachedChunk;
   private ChunkMetadata cachedChunkMetadata;
   private RateLimiter compactionRateLimiter =
-      CompactionTaskManager.getInstance().getCompactionIORateLimiter();
+      CompactionTaskManager.getInstance().getMergeWriteRateLimiter();
   // record the min time and max time to update the target resource
   private long minStartTimestamp = Long.MAX_VALUE;
   private long maxEndTimestamp = Long.MIN_VALUE;
@@ -118,7 +118,6 @@ public class SingleSeriesCompactionExecutor {
       TsFileSequenceReader reader = readerListPair.left;
       List<ChunkMetadata> chunkMetadataList = readerListPair.right;
       for (ChunkMetadata chunkMetadata : chunkMetadataList) {
-        compactionRateLimiter.acquire(1);
         Chunk currentChunk = reader.readMemChunk(chunkMetadata);
         if (this.chunkWriter == null) {
           constructChunkWriterFromReadChunk(currentChunk);
@@ -301,7 +300,7 @@ public class SingleSeriesCompactionExecutor {
 
   private void flushChunkToFileWriter(
       Chunk chunk, ChunkMetadata chunkMetadata, boolean isCachedChunk) throws IOException {
-    compactionRateLimiter.acquire(1);
+    CompactionTaskManager.mergeRateLimiterAcquire(compactionRateLimiter, getChunkSize(chunk));
     if (chunkMetadata.getStartTime() < minStartTimestamp) {
       minStartTimestamp = chunkMetadata.getStartTime();
     }
@@ -319,7 +318,8 @@ public class SingleSeriesCompactionExecutor {
   private void flushChunkWriterIfLargeEnough() throws IOException {
     if (pointCountInChunkWriter >= targetChunkPointNum
         || chunkWriter.estimateMaxSeriesMemSize() >= targetChunkSize) {
-      compactionRateLimiter.acquire(1);
+      CompactionTaskManager.mergeRateLimiterAcquire(
+          compactionRateLimiter, chunkWriter.estimateMaxSeriesMemSize());
       CompactionMetricsRecorder.recordWriteInfo(
           CompactionType.INNER_SEQ_COMPACTION,
           ProcessChunkType.DESERIALIZE_CHUNK,
@@ -340,7 +340,8 @@ public class SingleSeriesCompactionExecutor {
   }
 
   private void flushChunkWriter() throws IOException {
-    compactionRateLimiter.acquire(1);
+    CompactionTaskManager.mergeRateLimiterAcquire(
+        compactionRateLimiter, chunkWriter.estimateMaxSeriesMemSize());
     CompactionMetricsRecorder.recordWriteInfo(
         CompactionType.INNER_SEQ_COMPACTION,
         ProcessChunkType.DESERIALIZE_CHUNK,
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/AbstractCompactionWriter.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/AbstractCompactionWriter.java
index 76b94be225..92ff3008e5 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/AbstractCompactionWriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/AbstractCompactionWriter.java
@@ -49,7 +49,7 @@ public abstract class AbstractCompactionWriter implements AutoCloseable {
   protected int subTaskNum = IoTDBDescriptor.getInstance().getConfig().getSubCompactionTaskNum();
 
   private RateLimiter compactionRateLimiter =
-      CompactionTaskManager.getInstance().getCompactionIORateLimiter();
+      CompactionTaskManager.getInstance().getMergeWriteRateLimiter();
 
   // check if there is unseq error point during writing
   protected long[] lastTime = new long[subTaskNum];
@@ -164,7 +164,8 @@ public abstract class AbstractCompactionWriter implements AutoCloseable {
 
   protected void sealChunk(TsFileIOWriter targetWriter, IChunkWriter iChunkWriter, int subTaskId)
       throws IOException {
-    compactionRateLimiter.acquire(1);
+    CompactionTaskManager.mergeRateLimiterAcquire(
+        compactionRateLimiter, iChunkWriter.estimateMaxSeriesMemSize());
     synchronized (targetWriter) {
       iChunkWriter.writeToFileWriter(targetWriter);
     }
@@ -185,11 +186,11 @@ public abstract class AbstractCompactionWriter implements AutoCloseable {
   protected void flushNonAlignedChunkToFileWriter(
       TsFileIOWriter targetWriter, Chunk chunk, ChunkMetadata chunkMetadata, int subTaskId)
       throws IOException {
+    CompactionTaskManager.mergeRateLimiterAcquire(compactionRateLimiter, getChunkSize(chunk));
     synchronized (targetWriter) {
       // seal last chunk to file writer
       chunkWriters[subTaskId].writeToFileWriter(targetWriter);
       chunkPointNumArray[subTaskId] = 0;
-      compactionRateLimiter.acquire(1);
       targetWriter.writeChunk(chunk, chunkMetadata);
     }
   }
@@ -209,7 +210,7 @@ public abstract class AbstractCompactionWriter implements AutoCloseable {
       chunkPointNumArray[subTaskId] = 0;
 
       // flush time chunk
-      compactionRateLimiter.acquire(1);
+      CompactionTaskManager.mergeRateLimiterAcquire(compactionRateLimiter, getChunkSize(timeChunk));
       targetWriter.writeChunk(timeChunk, (ChunkMetadata) timeChunkMetadata);
 
       // flush value chunks
@@ -226,7 +227,8 @@ public abstract class AbstractCompactionWriter implements AutoCloseable {
               valueChunkWriter.getStatistics());
           continue;
         }
-        compactionRateLimiter.acquire(1);
+        CompactionTaskManager.mergeRateLimiterAcquire(
+            compactionRateLimiter, getChunkSize(valueChunk));
         targetWriter.writeChunk(valueChunk, (ChunkMetadata) valueChunkMetadatas.get(i));
       }
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionTaskManager.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionTaskManager.java
index b8f798795b..0e51e046a1 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionTaskManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionTaskManager.java
@@ -240,8 +240,9 @@ public class CompactionTaskManager implements IService {
         .containsKey(task);
   }
 
-  public RateLimiter getCompactionIORateLimiter() {
-    setWriteMergeRate(IoTDBDescriptor.getInstance().getConfig().getCompactionIORatePerSec());
+  public RateLimiter getMergeWriteRateLimiter() {
+    setWriteMergeRate(
+        IoTDBDescriptor.getInstance().getConfig().getCompactionWriteThroughputMbPerSec());
     return mergeWriteRateLimiter;
   }
 
@@ -255,6 +256,16 @@ public class CompactionTaskManager implements IService {
       mergeWriteRateLimiter.setRate(throughout);
     }
   }
+  /** wait by throughoutMbPerSec limit to avoid continuous Write Or Read */
+  public static void mergeRateLimiterAcquire(RateLimiter limiter, long bytesLength) {
+    while (bytesLength >= Integer.MAX_VALUE) {
+      limiter.acquire(Integer.MAX_VALUE);
+      bytesLength -= Integer.MAX_VALUE;
+    }
+    if (bytesLength > 0) {
+      limiter.acquire((int) bytesLength);
+    }
+  }
 
   public synchronized void removeRunningTaskFuture(AbstractCompactionTask task) {
     String regionWithSG = getSGWithRegionId(task.getStorageGroupName(), task.getDataRegionId());
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/utils/CompactionConfigRestorer.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/utils/CompactionConfigRestorer.java
index 6ab662e42e..81fbbc37e9 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/utils/CompactionConfigRestorer.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/utils/CompactionConfigRestorer.java
@@ -75,7 +75,7 @@ public class CompactionConfigRestorer {
     config.setCompactionThreadCount(concurrentCompactionThread);
     config.setCompactionScheduleIntervalInMs(compactionScheduleIntervalInMs);
     config.setCompactionSubmissionIntervalInMs(compactionSubmissionIntervalInMs);
-    config.setCompactionIORatePerSec(compactionWriteThroughputMbPerSec);
+    config.setCompactionWriteThroughputMbPerSec(compactionWriteThroughputMbPerSec);
     config.setCrossCompactionPerformer(oldCrossPerformer);
     config.setInnerSeqCompactionPerformer(oldInnerSeqPerformer);
     config.setInnerUnseqCompactionPerformer(oldInnerUnseqPerformer);