You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ma...@apache.org on 2022/12/14 09:00:46 UTC

[iotdb] branch IOTDB-5209 updated: limit the io rate for fast compaction performer

This is an automated email from the ASF dual-hosted git repository.

marklau99 pushed a commit to branch IOTDB-5209
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/IOTDB-5209 by this push:
     new 2c7be4e4e5 limit the io rate for fast compaction performer
2c7be4e4e5 is described below

commit 2c7be4e4e59d43edab6cb41007a669d2184ecab5
Author: Liu Xuxin <li...@outlook.com>
AuthorDate: Wed Dec 14 16:50:38 2022 +0800

    limit the io rate for fast compaction performer
---
 docs/UserGuide/Reference/Common-Config-Manual.md         | 16 ++++++++--------
 docs/zh/UserGuide/Reference/Common-Config-Manual.md      | 16 ++++++++--------
 .../db/engine/compaction/CompactionTaskManager.java      | 10 ----------
 .../cross/utils/AlignedSeriesCompactionExecutor.java     |  8 ++++++++
 .../cross/utils/NonAlignedSeriesCompactionExecutor.java  |  2 ++
 .../compaction/writer/AbstractCompactionWriter.java      | 12 ++++--------
 6 files changed, 30 insertions(+), 34 deletions(-)

diff --git a/docs/UserGuide/Reference/Common-Config-Manual.md b/docs/UserGuide/Reference/Common-Config-Manual.md
index e2c79e093e..4d1d24281c 100644
--- a/docs/UserGuide/Reference/Common-Config-Manual.md
+++ b/docs/UserGuide/Reference/Common-Config-Manual.md
@@ -990,14 +990,14 @@ Different configuration parameters take effect in the following three ways:
 |   Default   | 60000                                    |
 |  Effective  | After restart system                     |
 
-* compaction\_write\_throughput\_mb\_per\_sec
-
-|Name| compaction\_write\_throughput\_mb\_per\_sec |
-|:---:|:---|
-|Description| The write rate of all compaction tasks in MB/s |
-|Type| int32 |
-|Default| 16 |
-|Effective|After restart system|
+* compaction\_io\_rate\_per\_sec
+
+|Name| compaction\_io\_rate\_per\_sec                 |
+|:---:|:-----------------------------------------------|
+|Description| The io rate of all compaction tasks per second |
+|Type| int32                                          |
+|Default| 50                                             |
+|Effective| After restart system                           |
 
 * sub\_compaction\_thread\_count
 
diff --git a/docs/zh/UserGuide/Reference/Common-Config-Manual.md b/docs/zh/UserGuide/Reference/Common-Config-Manual.md
index 0c6c28fcfb..fa91601017 100644
--- a/docs/zh/UserGuide/Reference/Common-Config-Manual.md
+++ b/docs/zh/UserGuide/Reference/Common-Config-Manual.md
@@ -1045,14 +1045,14 @@ IoTDB ConfigNode 和 DataNode 的公共配置参数位于 `conf` 目录下。
 |    默认值    | 60000                                    |
 | 改后生效方式 | 重启服务生效                             |
 
-* compaction\_write\_throughput\_mb\_per\_sec
-
-|名字| compaction\_write\_throughput\_mb\_per\_sec |
-|:---:|:---|
-|描述| 每秒可达到的写入吞吐量合并限制。|
-|类型| int32 |
-|默认值| 16 |
-|改后生效方式| 重启服务生效|
+* compaction\_io\_rate\_per\_sec
+
+|名字| compaction\_io\_rate\_per\_sec |
+|:---:|:-------------------------------|
+|描述| 每秒合并随机IO的次数。                   |
+|类型| int32                          |
+|默认值| 50                             |
+|改后生效方式| 重启服务生效                         |
 
 * sub\_compaction\_thread\_count
 
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java
index e6201b448b..1e5424bc51 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java
@@ -255,16 +255,6 @@ public class CompactionTaskManager implements IService {
       mergeWriteRateLimiter.setRate(throughout);
     }
   }
-  /** wait by compactionIORatePerSec limit to avoid continuous Write Or Read */
-  public static void compactionIORateLimiterAcquire(RateLimiter limiter, long bytesLength) {
-    while (bytesLength >= Integer.MAX_VALUE) {
-      limiter.acquire(Integer.MAX_VALUE);
-      bytesLength -= Integer.MAX_VALUE;
-    }
-    if (bytesLength > 0) {
-      limiter.acquire((int) bytesLength);
-    }
-  }
 
   public synchronized void removeRunningTaskFuture(AbstractCompactionTask task) {
     String regionWithSG = getSGWithRegionId(task.getStorageGroupName(), task.getDataRegionId());
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/utils/AlignedSeriesCompactionExecutor.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/utils/AlignedSeriesCompactionExecutor.java
index 72b364adb8..ac9e3b17c0 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/utils/AlignedSeriesCompactionExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/utils/AlignedSeriesCompactionExecutor.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.engine.compaction.cross.utils;
 
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.engine.compaction.CompactionTaskManager;
 import org.apache.iotdb.db.engine.compaction.task.SubCompactionTaskSummary;
 import org.apache.iotdb.db.engine.compaction.writer.AbstractCompactionWriter;
 import org.apache.iotdb.db.engine.modification.Modification;
@@ -40,6 +41,8 @@ import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;
 import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
 
+import com.google.common.util.concurrent.RateLimiter;
+
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -54,6 +57,9 @@ public class AlignedSeriesCompactionExecutor extends SeriesCompactionExecutor {
 
   private final List<IMeasurementSchema> measurementSchemas;
 
+  private final RateLimiter rateLimiter =
+      CompactionTaskManager.getInstance().getCompactionIORateLimiter();
+
   public AlignedSeriesCompactionExecutor(
       AbstractCompactionWriter compactionWriter,
       Map<String, Map<TsFileResource, Pair<Long, Long>>> timeseriesMetadataOffsetMap,
@@ -281,6 +287,7 @@ public class AlignedSeriesCompactionExecutor extends SeriesCompactionExecutor {
   void readChunk(ChunkMetadataElement chunkMetadataElement) throws IOException {
     AlignedChunkMetadata alignedChunkMetadata =
         (AlignedChunkMetadata) chunkMetadataElement.chunkMetadata;
+    rateLimiter.acquire(1);
     chunkMetadataElement.chunk =
         readerCacheMap
             .get(chunkMetadataElement.fileElement.resource)
@@ -292,6 +299,7 @@ public class AlignedSeriesCompactionExecutor extends SeriesCompactionExecutor {
         valueChunks.add(null);
         continue;
       }
+      rateLimiter.acquire(1);
       valueChunks.add(
           readerCacheMap
               .get(chunkMetadataElement.fileElement.resource)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/utils/NonAlignedSeriesCompactionExecutor.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/utils/NonAlignedSeriesCompactionExecutor.java
index da26d515c7..1c0d3a1e75 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/utils/NonAlignedSeriesCompactionExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/utils/NonAlignedSeriesCompactionExecutor.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.engine.compaction.cross.utils;
 
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.engine.compaction.CompactionTaskManager;
 import org.apache.iotdb.db.engine.compaction.task.SubCompactionTaskSummary;
 import org.apache.iotdb.db.engine.compaction.writer.AbstractCompactionWriter;
 import org.apache.iotdb.db.engine.modification.Modification;
@@ -171,6 +172,7 @@ public class NonAlignedSeriesCompactionExecutor extends SeriesCompactionExecutor
 
   @Override
   void readChunk(ChunkMetadataElement chunkMetadataElement) throws IOException {
+    CompactionTaskManager.getInstance().getCompactionIORateLimiter().acquire(1);
     chunkMetadataElement.chunk =
         readerCacheMap
             .get(chunkMetadataElement.fileElement.resource)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/AbstractCompactionWriter.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/AbstractCompactionWriter.java
index 9a54d45c64..4c19e415dd 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/AbstractCompactionWriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/AbstractCompactionWriter.java
@@ -162,8 +162,7 @@ public abstract class AbstractCompactionWriter implements AutoCloseable {
 
   protected void sealChunk(TsFileIOWriter targetWriter, IChunkWriter iChunkWriter, int subTaskId)
       throws IOException {
-    CompactionTaskManager.compactionIORateLimiterAcquire(
-        compactionRateLimiter, iChunkWriter.estimateMaxSeriesMemSize());
+    CompactionTaskManager.getInstance().getCompactionIORateLimiter().acquire(1);
     synchronized (targetWriter) {
       iChunkWriter.writeToFileWriter(targetWriter);
     }
@@ -184,12 +183,11 @@ public abstract class AbstractCompactionWriter implements AutoCloseable {
   protected void flushNonAlignedChunkToFileWriter(
       TsFileIOWriter targetWriter, Chunk chunk, ChunkMetadata chunkMetadata, int subTaskId)
       throws IOException {
-    CompactionTaskManager.compactionIORateLimiterAcquire(
-        compactionRateLimiter, getChunkSize(chunk));
     synchronized (targetWriter) {
       // seal last chunk to file writer
       chunkWriters[subTaskId].writeToFileWriter(targetWriter);
       chunkPointNumArray[subTaskId] = 0;
+      CompactionTaskManager.getInstance().getCompactionIORateLimiter().acquire(1);
       targetWriter.writeChunk(chunk, chunkMetadata);
     }
   }
@@ -209,8 +207,7 @@ public abstract class AbstractCompactionWriter implements AutoCloseable {
       chunkPointNumArray[subTaskId] = 0;
 
       // flush time chunk
-      CompactionTaskManager.compactionIORateLimiterAcquire(
-          compactionRateLimiter, getChunkSize(timeChunk));
+      CompactionTaskManager.getInstance().getCompactionIORateLimiter().acquire(1);
       targetWriter.writeChunk(timeChunk, (ChunkMetadata) timeChunkMetadata);
 
       // flush value chunks
@@ -227,8 +224,7 @@ public abstract class AbstractCompactionWriter implements AutoCloseable {
               valueChunkWriter.getStatistics());
           continue;
         }
-        CompactionTaskManager.compactionIORateLimiterAcquire(
-            compactionRateLimiter, getChunkSize(valueChunk));
+        CompactionTaskManager.getInstance().getCompactionIORateLimiter().acquire(1);
         targetWriter.writeChunk(valueChunk, (ChunkMetadata) valueChunkMetadatas.get(i));
       }
     }