You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2023/01/09 02:48:48 UTC

[iotdb] branch revert_limit_013 created (now d85e0a375f)

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a change to branch revert_limit_013
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at d85e0a375f Revert "[IOTDB-5209] Limit the read rate of compaction execution"

This branch includes the following new commits:

     new d85e0a375f Revert "[IOTDB-5209] Limit the read rate of compaction execution"

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/01: Revert "[IOTDB-5209] Limit the read rate of compaction execution"

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch revert_limit_013
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit d85e0a375fed335b2b84d79e79a78960ee13d083
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Mon Jan 9 10:48:33 2023 +0800

    Revert "[IOTDB-5209] Limit the read rate of compaction execution"
    
    This reverts commit c5d91a8c284eff1674b42fa049a2b3beec1df616.
---
 docs/UserGuide/Reference/Config-Manual.md                | 16 ++++++++--------
 docs/zh/UserGuide/Reference/Config-Manual.md             | 16 ++++++++--------
 .../src/assembly/resources/conf/iotdb-engine.properties  |  4 ++--
 .../main/java/org/apache/iotdb/db/conf/IoTDBConfig.java  | 12 ++++++------
 .../java/org/apache/iotdb/db/conf/IoTDBDescriptor.java   |  8 ++++----
 .../db/engine/compaction/CompactionTaskManager.java      | 13 +++++++------
 .../iotdb/db/engine/compaction/CompactionUtils.java      |  3 ---
 .../compaction/cross/rewrite/task/SubCompactionTask.java |  2 --
 .../inner/utils/AlignedSeriesCompactionExecutor.java     |  9 +++++----
 .../inner/utils/SingleSeriesCompactionExecutor.java      | 12 ++++++------
 .../compaction/writer/AbstractCompactionWriter.java      |  3 ++-
 .../compaction/utils/CompactionConfigRestorer.java       |  2 +-
 12 files changed, 49 insertions(+), 51 deletions(-)

diff --git a/docs/UserGuide/Reference/Config-Manual.md b/docs/UserGuide/Reference/Config-Manual.md
index 0694d40d31..5596cb2098 100644
--- a/docs/UserGuide/Reference/Config-Manual.md
+++ b/docs/UserGuide/Reference/Config-Manual.md
@@ -913,14 +913,14 @@ The permission definitions are in ${IOTDB\_CONF}/conf/jmx.access.
 |Default| 60000 |
 |Effective|After restart system|
 
-* compaction\_io\_rate\_per\_sec
-
-|Name| compaction\_io\_rate\_per\_sec                    |
-|:---:|:--------------------------------------------------|
-|Description| The io rate of all compaction tasks in one second |
-|Type| Int32                                             |
-|Default| 50                                                |
-|Effective| After restart system                              |
+* compaction\_write\_throughput\_mb\_per\_sec
+
+|Name| compaction\_write\_throughput\_mb\_per\_sec |
+|:---:|:---|
+|Description| The write rate of all compaction tasks in MB/s |
+|Type| Int32 |
+|Default| 16 |
+|Effective|After restart system|
 
 ### Insertion
 
diff --git a/docs/zh/UserGuide/Reference/Config-Manual.md b/docs/zh/UserGuide/Reference/Config-Manual.md
index 5fa2653b8b..33f71497b7 100644
--- a/docs/zh/UserGuide/Reference/Config-Manual.md
+++ b/docs/zh/UserGuide/Reference/Config-Manual.md
@@ -952,14 +952,14 @@ Server,客户端的使用方式详见 [SQL 命令行终端(CLI)](https://i
 |默认值| 60000 |
 |改后生效方式|重启服务生效|
 
-* compaction\_io\_rate\_per\_sec
-
-|名字| compaction\_io\_rate\_per\_sec |
-|:---:|:-------------------------------|
-|描述| 合并每秒 IO 的次数                    |
-|类型| Int32                          |
-|默认值| 50                             |
-|改后生效方式| 重启服务生效                         |
+* compaction\_write\_throughput\_mb\_per\_sec
+
+|名字| compaction\_write\_throughput\_mb\_per\_sec |
+|:---:|:---|
+|描述| 每秒可达到的写入吞吐量合并限制。|
+|类型| Int32 |
+|默认值| 16 |
+|改后生效方式| 重启服务生效|
 
 
 #### 插入配置
diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties b/server/src/assembly/resources/conf/iotdb-engine.properties
index 33f4926348..ac166426b3 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -509,9 +509,9 @@ timestamp_precision=ms
 # Datatype: long, Unit: ms
 # compaction_submission_interval_in_ms=60000
 
-# The limit of io rate of compaction can reach per second
+# The limit of write throughput merge can reach per second
 # Datatype: int
-# compaction_io_rate_per_sec=50
+# compaction_write_throughput_mb_per_sec=16
 
 # The maximum session idle time. unit: ms
 # Idle sessions are the ones that performs neither query or non-query operations for a period of time
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index c0ce1248d5..b195d3df55 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -600,8 +600,8 @@ public class IoTDBConfig {
    */
   private long mergeIntervalSec = 0L;
 
-  /** The limit of compaction io times can reach per second */
-  private int compactionIORatePerSec = 50;
+  /** The limit of compaction merge can reach per second */
+  private int compactionWriteThroughputMbPerSec = 16;
 
   /**
    * How many thread will be set up to perform compaction, 10 by default. Set to 1 when less than or
@@ -1718,12 +1718,12 @@ public class IoTDBConfig {
         insertMultiTabletEnableMultithreadingColumnThreshold;
   }
 
-  public int getCompactionIORatePerSec() {
-    return compactionIORatePerSec;
+  public int getCompactionWriteThroughputMbPerSec() {
+    return compactionWriteThroughputMbPerSec;
   }
 
-  public void setCompactionIORatePerSec(int compactionIORatePerSec) {
-    this.compactionIORatePerSec = compactionIORatePerSec;
+  public void setCompactionWriteThroughputMbPerSec(int compactionWriteThroughputMbPerSec) {
+    this.compactionWriteThroughputMbPerSec = compactionWriteThroughputMbPerSec;
   }
 
   public boolean isEnableMemControl() {
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 1d0a89d2b9..2616bbb15e 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -591,11 +591,11 @@ public class IoTDBDescriptor {
                 "max_cross_compaction_candidate_file_size",
                 Long.toString(conf.getMaxCrossCompactionCandidateFileSize()))));
 
-    conf.setCompactionIORatePerSec(
+    conf.setCompactionWriteThroughputMbPerSec(
         Integer.parseInt(
             properties.getProperty(
                 "compaction_write_throughput_mb_per_sec",
-                Integer.toString(conf.getCompactionIORatePerSec()))));
+                Integer.toString(conf.getCompactionWriteThroughputMbPerSec()))));
 
     conf.setEnablePartialInsert(
         Boolean.parseBoolean(
@@ -1364,11 +1364,11 @@ public class IoTDBDescriptor {
               properties.getProperty(
                   "slow_query_threshold", Long.toString(conf.getSlowQueryThreshold()))));
       // update merge_write_throughput_mb_per_sec
-      conf.setCompactionIORatePerSec(
+      conf.setCompactionWriteThroughputMbPerSec(
           Integer.parseInt(
               properties.getProperty(
                   "merge_write_throughput_mb_per_sec",
-                  Integer.toString(conf.getCompactionIORatePerSec()))));
+                  Integer.toString(conf.getCompactionWriteThroughputMbPerSec()))));
 
       // update insert-tablet-plan's row limit for select-into
       conf.setSelectIntoInsertTabletPlanRowLimit(
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java
index 7fdd197fc0..ddac1512ec 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java
@@ -83,7 +83,7 @@ public class CompactionTaskManager implements IService {
   private final long TASK_SUBMIT_INTERVAL =
       IoTDBDescriptor.getInstance().getConfig().getCompactionSubmissionIntervalInMs();
 
-  private final RateLimiter compactionIORateLimiter = RateLimiter.create(Double.MAX_VALUE);
+  private final RateLimiter mergeWriteRateLimiter = RateLimiter.create(Double.MAX_VALUE);
 
   public static CompactionTaskManager getInstance() {
     return INSTANCE;
@@ -274,9 +274,10 @@ public class CompactionTaskManager implements IService {
     }
   }
 
-  public RateLimiter getCompactionIORateLimiter() {
-    setWriteMergeRate(IoTDBDescriptor.getInstance().getConfig().getCompactionIORatePerSec());
-    return compactionIORateLimiter;
+  public RateLimiter getMergeWriteRateLimiter() {
+    setWriteMergeRate(
+        IoTDBDescriptor.getInstance().getConfig().getCompactionWriteThroughputMbPerSec());
+    return mergeWriteRateLimiter;
   }
 
   private void setWriteMergeRate(final double throughoutMbPerSec) {
@@ -285,8 +286,8 @@ public class CompactionTaskManager implements IService {
     if (throughout == 0) {
       throughout = Double.MAX_VALUE;
     }
-    if (compactionIORateLimiter.getRate() != throughout) {
-      compactionIORateLimiter.setRate(throughout);
+    if (mergeWriteRateLimiter.getRate() != throughout) {
+      mergeWriteRateLimiter.setRate(throughout);
     }
   }
   /** wait by throughoutMbPerSec limit to avoid continuous Write Or Read */
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionUtils.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionUtils.java
index 2a786d7d95..7c94873747 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionUtils.java
@@ -150,9 +150,6 @@ public class CompactionUtils {
       compactionWriter.startChunkGroup(device, true);
       compactionWriter.startMeasurement(measurementSchemas, 0);
       writeWithReader(compactionWriter, dataBatchReader, 0);
-      CompactionTaskManager.getInstance()
-          .getCompactionIORateLimiter()
-          .acquire(measurementSchemas.size() + 1);
       compactionWriter.endMeasurement(0);
       compactionWriter.endChunkGroup();
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/task/SubCompactionTask.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/task/SubCompactionTask.java
index f75e934b87..00e7eaa011 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/task/SubCompactionTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/task/SubCompactionTask.java
@@ -19,7 +19,6 @@
 package org.apache.iotdb.db.engine.compaction.cross.rewrite.task;
 
 import org.apache.iotdb.db.conf.IoTDBConstant;
-import org.apache.iotdb.db.engine.compaction.CompactionTaskManager;
 import org.apache.iotdb.db.engine.compaction.CompactionUtils;
 import org.apache.iotdb.db.engine.compaction.writer.AbstractCompactionWriter;
 import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
@@ -92,7 +91,6 @@ public class SubCompactionTask implements Callable<Void> {
         if (dataBatchReader.hasNextBatch()) {
           compactionWriter.startMeasurement(measurementSchemas, taskId);
           CompactionUtils.writeWithReader(compactionWriter, dataBatchReader, taskId);
-          CompactionTaskManager.getInstance().getCompactionIORateLimiter().acquire(1);
           compactionWriter.endMeasurement(taskId);
         }
       }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/AlignedSeriesCompactionExecutor.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/AlignedSeriesCompactionExecutor.java
index 990b9815c0..fd1ec218b2 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/AlignedSeriesCompactionExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/AlignedSeriesCompactionExecutor.java
@@ -58,7 +58,7 @@ public class AlignedSeriesCompactionExecutor {
   private final List<IMeasurementSchema> schemaList;
   private long remainingPointInChunkWriter = 0L;
   private final RateLimiter rateLimiter =
-      CompactionTaskManager.getInstance().getCompactionIORateLimiter();
+      CompactionTaskManager.getInstance().getMergeWriteRateLimiter();
 
   private final long chunkSizeThreshold =
       IoTDBDescriptor.getInstance().getConfig().getTargetChunkSize();
@@ -120,7 +120,6 @@ public class AlignedSeriesCompactionExecutor {
       TsFileAlignedSeriesReaderIterator readerIterator =
           new TsFileAlignedSeriesReaderIterator(reader, alignedChunkMetadataList, schemaList);
       while (readerIterator.hasNext()) {
-        rateLimiter.acquire(schemaList.size() + 1);
         Pair<AlignedChunkReader, Long> chunkReaderAndChunkSize = readerIterator.nextReader();
         CompactionMetricsManager.recordReadInfo(chunkReaderAndChunkSize.right);
         compactOneAlignedChunk(chunkReaderAndChunkSize.left);
@@ -128,7 +127,8 @@ public class AlignedSeriesCompactionExecutor {
     }
 
     if (remainingPointInChunkWriter != 0L) {
-      rateLimiter.acquire(schemaList.size() + 1);
+      CompactionTaskManager.mergeRateLimiterAcquire(
+          rateLimiter, chunkWriter.estimateMaxSeriesMemSize());
       CompactionMetricsManager.recordWriteInfo(
           CompactionType.INNER_SEQ_COMPACTION,
           ProcessChunkType.DESERIALIZE_CHUNK,
@@ -167,7 +167,8 @@ public class AlignedSeriesCompactionExecutor {
   private void flushChunkWriterIfLargeEnough() throws IOException {
     if (remainingPointInChunkWriter >= chunkPointNumThreshold
         || chunkWriter.estimateMaxSeriesMemSize() >= chunkSizeThreshold * schemaList.size()) {
-      rateLimiter.acquire(schemaList.size() + 1);
+      CompactionTaskManager.mergeRateLimiterAcquire(
+          rateLimiter, chunkWriter.estimateMaxSeriesMemSize());
       CompactionMetricsManager.recordWriteInfo(
           CompactionType.INNER_SEQ_COMPACTION,
           ProcessChunkType.DESERIALIZE_CHUNK,
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/SingleSeriesCompactionExecutor.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/SingleSeriesCompactionExecutor.java
index fa9ee0efd8..2d49094f44 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/SingleSeriesCompactionExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/SingleSeriesCompactionExecutor.java
@@ -58,13 +58,11 @@ public class SingleSeriesCompactionExecutor {
   private Chunk cachedChunk;
   private ChunkMetadata cachedChunkMetadata;
   private RateLimiter compactionRateLimiter =
-      CompactionTaskManager.getInstance().getCompactionIORateLimiter();
+      CompactionTaskManager.getInstance().getMergeWriteRateLimiter();
   // record the min time and max time to update the target resource
   private long minStartTimestamp = Long.MAX_VALUE;
   private long maxEndTimestamp = Long.MIN_VALUE;
   private long pointCountInChunkWriter = 0;
-  private final RateLimiter ioLimiter =
-      CompactionTaskManager.getInstance().getCompactionIORateLimiter();
 
   private final long targetChunkSize =
       IoTDBDescriptor.getInstance().getConfig().getTargetChunkSize();
@@ -284,7 +282,7 @@ public class SingleSeriesCompactionExecutor {
 
   private void flushChunkToFileWriter(
       Chunk chunk, ChunkMetadata chunkMetadata, boolean isCachedChunk) throws IOException {
-    ioLimiter.acquire(1);
+    CompactionTaskManager.mergeRateLimiterAcquire(compactionRateLimiter, getChunkSize(chunk));
     if (chunkMetadata.getStartTime() < minStartTimestamp) {
       minStartTimestamp = chunkMetadata.getStartTime();
     }
@@ -302,7 +300,8 @@ public class SingleSeriesCompactionExecutor {
   private void flushChunkWriterIfLargeEnough() throws IOException {
     if (pointCountInChunkWriter >= targetChunkPointNum
         || chunkWriter.estimateMaxSeriesMemSize() >= targetChunkSize) {
-      ioLimiter.acquire(1);
+      CompactionTaskManager.mergeRateLimiterAcquire(
+          compactionRateLimiter, chunkWriter.estimateMaxSeriesMemSize());
       CompactionMetricsManager.recordWriteInfo(
           CompactionType.INNER_SEQ_COMPACTION,
           ProcessChunkType.DESERIALIZE_CHUNK,
@@ -323,7 +322,8 @@ public class SingleSeriesCompactionExecutor {
   }
 
   private void flushChunkWriter() throws IOException {
-    ioLimiter.acquire(1);
+    CompactionTaskManager.mergeRateLimiterAcquire(
+        compactionRateLimiter, chunkWriter.estimateMaxSeriesMemSize());
     CompactionMetricsManager.recordWriteInfo(
         CompactionType.INNER_SEQ_COMPACTION,
         ProcessChunkType.DESERIALIZE_CHUNK,
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/AbstractCompactionWriter.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/AbstractCompactionWriter.java
index 683001b31c..72096069e1 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/AbstractCompactionWriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/AbstractCompactionWriter.java
@@ -142,6 +142,7 @@ public abstract class AbstractCompactionWriter implements AutoCloseable {
 
   protected void flushChunkToFileWriter(TsFileIOWriter targetWriter, int subTaskId)
       throws IOException {
+    writeRateLimit(chunkWriters[subTaskId].estimateMaxSeriesMemSize());
     synchronized (targetWriter) {
       chunkWriters[subTaskId].writeToFileWriter(targetWriter);
     }
@@ -172,7 +173,7 @@ public abstract class AbstractCompactionWriter implements AutoCloseable {
 
   protected void writeRateLimit(long bytesLength) {
     CompactionTaskManager.mergeRateLimiterAcquire(
-        CompactionTaskManager.getInstance().getCompactionIORateLimiter(), bytesLength);
+        CompactionTaskManager.getInstance().getMergeWriteRateLimiter(), bytesLength);
   }
 
   public abstract List<TsFileIOWriter> getFileIOWriter();
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/utils/CompactionConfigRestorer.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/utils/CompactionConfigRestorer.java
index 2f6110e8a7..014ad1afd5 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/utils/CompactionConfigRestorer.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/utils/CompactionConfigRestorer.java
@@ -64,6 +64,6 @@ public class CompactionConfigRestorer {
     config.setConcurrentCompactionThread(concurrentCompactionThread);
     config.setCompactionScheduleIntervalInMs(compactionScheduleIntervalInMs);
     config.setCompactionSubmissionIntervalInMs(compactionSubmissionIntervalInMs);
-    config.setCompactionIORatePerSec(compactionWriteThroughputMbPerSec);
+    config.setCompactionWriteThroughputMbPerSec(compactionWriteThroughputMbPerSec);
   }
 }