You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2023/04/19 03:36:27 UTC

[iotdb] branch rel/1.1 updated: [To rel/1.1][IOTDB-5776]Update memory estimation of cross space compaction (#9628)

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch rel/1.1
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/1.1 by this push:
     new 21338fe774 [To rel/1.1][IOTDB-5776]Update memory estimation of cross space compaction (#9628)
21338fe774 is described below

commit 21338fe7741abbf673c5f6652e69510d58614ad9
Author: 周沛辰 <45...@users.noreply.github.com>
AuthorDate: Wed Apr 19 11:36:21 2023 +0800

    [To rel/1.1][IOTDB-5776]Update memory estimation of cross space compaction (#9628)
---
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  15 +++
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |   6 ++
 .../estimator/AbstractCompactionEstimator.java     |   6 ++
 .../ReadPointCrossCompactionEstimator.java         | 108 +++++++++++++--------
 .../impl/RewriteCrossSpaceCompactionSelector.java  |   7 +-
 .../utils/CrossCompactionTaskResource.java         |  12 +++
 .../org/apache/iotdb/db/rescon/SystemInfo.java     |  15 +--
 .../cross/RewriteCompactionFileSelectorTest.java   |   1 +
 8 files changed, 118 insertions(+), 52 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 5650b54638..73af082000 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -428,6 +428,13 @@ public class IoTDBConfig {
    */
   private CompactionPriority compactionPriority = CompactionPriority.BALANCE;
 
+  /**
+   * Whether to enable compaction memory control. If true and the estimated memory size of one
+   * compaction task exceeds the threshold, the system will block the compaction. Currently it only
+   * works for cross space compaction.
+   */
+  private boolean enableCompactionMemControl = true;
+
   private double chunkMetadataSizeProportion = 0.1;
 
   /** The target tsfile size in compaction, 1 GB by default */
@@ -2734,6 +2741,14 @@ public class IoTDBConfig {
     this.compactionPriority = compactionPriority;
   }
 
+  public boolean isEnableCompactionMemControl() {
+    return enableCompactionMemControl;
+  }
+
+  public void setEnableCompactionMemControl(boolean enableCompactionMemControl) {
+    this.enableCompactionMemControl = enableCompactionMemControl;
+  }
+
   public long getTargetCompactionFileSize() {
     return targetCompactionFileSize;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 5b99924801..c45cfd7ee6 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -500,6 +500,12 @@ public class IoTDBDescriptor {
             properties.getProperty(
                 "compaction_priority", conf.getCompactionPriority().toString())));
 
+    conf.setEnableCompactionMemControl(
+        Boolean.parseBoolean(
+            properties.getProperty(
+                "enable_compaction_mem_control",
+                Boolean.toString(conf.isEnableCompactionMemControl()))));
+
     int subtaskNum =
         Integer.parseInt(
             properties.getProperty(
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/estimator/AbstractCompactionEstimator.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/estimator/AbstractCompactionEstimator.java
index f4b085deb9..50be5b5872 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/estimator/AbstractCompactionEstimator.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/estimator/AbstractCompactionEstimator.java
@@ -18,6 +18,8 @@
  */
 package org.apache.iotdb.db.engine.compaction.selector.estimator;
 
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
 
@@ -34,6 +36,10 @@ public abstract class AbstractCompactionEstimator {
 
   protected Map<TsFileResource, TsFileSequenceReader> fileReaderCache = new HashMap<>();
 
+  protected IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+
+  protected int compressionRatio = 5;
+
   /**
    * Estimate the memory cost of compacting the unseq file and its corresponding overlapped seq
    * files in cross space compaction task.
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/estimator/ReadPointCrossCompactionEstimator.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/estimator/ReadPointCrossCompactionEstimator.java
index d50e5a3af2..2ba25aab69 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/estimator/ReadPointCrossCompactionEstimator.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/estimator/ReadPointCrossCompactionEstimator.java
@@ -23,13 +23,11 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
 import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
-import org.apache.iotdb.tsfile.utils.Pair;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
@@ -41,13 +39,10 @@ public class ReadPointCrossCompactionEstimator extends AbstractCrossSpaceEstimat
   // task
   private long maxCostOfReadingSeqFile;
 
-  // left is the max chunk num in chunkgroup of unseq file, right is the total chunk num of unseq
-  // file.
-  private Pair<Integer, Integer> maxUnseqChunkNumInDevice;
+  // the max cost of writing target file
+  private long maxCostOfWritingTargetFile;
 
-  // it stores all chunk info of seq files. Left is the max chunk num in chunkgroup of seq file,
-  // right is the total chunk num of seq file.
-  private final List<Pair<Integer, Integer>> maxSeqChunkNumInDeviceList;
+  private int maxConcurrentSeriesNum = 1;
 
   // the number of timeseries being compacted at the same time
   private final int subCompactionTaskNum =
@@ -55,7 +50,7 @@ public class ReadPointCrossCompactionEstimator extends AbstractCrossSpaceEstimat
 
   public ReadPointCrossCompactionEstimator() {
     this.maxCostOfReadingSeqFile = 0;
-    this.maxSeqChunkNumInDeviceList = new ArrayList<>();
+    this.maxCostOfWritingTargetFile = 0;
   }
 
   @Override
@@ -65,7 +60,6 @@ public class ReadPointCrossCompactionEstimator extends AbstractCrossSpaceEstimat
     cost += calculateReadingUnseqFile(unseqResource);
     cost += calculateReadingSeqFiles(seqResources);
     cost += calculatingWritingTargetFiles(seqResources, unseqResource);
-    maxSeqChunkNumInDeviceList.clear();
     return cost;
   }
 
@@ -75,25 +69,28 @@ public class ReadPointCrossCompactionEstimator extends AbstractCrossSpaceEstimat
    */
   private long calculateReadingUnseqFile(TsFileResource unseqResource) throws IOException {
     TsFileSequenceReader reader = getFileReader(unseqResource);
-    int[] fileInfo = getSeriesAndDeviceChunkNum(reader);
+    FileInfo fileInfo = getSeriesAndDeviceChunkNum(reader);
     // it is max aligned series num of one device when tsfile contains aligned series,
     // else is sub compaction task num.
-    int concurrentSeriesNum = fileInfo[2] == -1 ? subCompactionTaskNum : fileInfo[2];
-    maxUnseqChunkNumInDevice = new Pair<>(fileInfo[3], fileInfo[0]);
-    // it means the max size of a timeseries in this file when reading all of its chunk into memory.
-    // Not only reading chunk into chunk cache, but also need to deserialize data point into merge
-    // reader, so we have to double the cost here.
-    if (fileInfo[0] == 0) { // If totalChunkNum ==0, i.e. this unSeq tsFile has no chunk.
+    int concurrentSeriesNum =
+        fileInfo.maxAlignedSeriesNumInDevice == -1
+            ? subCompactionTaskNum
+            : fileInfo.maxAlignedSeriesNumInDevice;
+    maxConcurrentSeriesNum = Math.max(maxConcurrentSeriesNum, concurrentSeriesNum);
+    if (fileInfo.totalChunkNum == 0) { // If totalChunkNum ==0, i.e. this unSeq tsFile has no chunk.
       logger.warn(
           "calculateReadingUnseqFile(), find 1 empty unSeq tsFile: {}.",
           unseqResource.getTsFilePath());
       return 0;
     }
-    return 2 * concurrentSeriesNum * (unseqResource.getTsFileSize() * fileInfo[1] / fileInfo[0]);
+    // The max size of a timeseries in this file when all of its chunks are read into memory.
+    return compressionRatio
+        * concurrentSeriesNum
+        * (unseqResource.getTsFileSize() * fileInfo.maxSeriesChunkNum / fileInfo.totalChunkNum);
   }
 
   /**
-   * Calculate memory cost of reading source seq files in the cross space compaction. Double the
+   * Calculate memory cost of reading source seq files in the cross space compaction. Select the
    * maximum size of the timeseries to be compacted at the same time in one seq file, because only
    * one seq file will be queried at the same time.
    */
@@ -101,38 +98,41 @@ public class ReadPointCrossCompactionEstimator extends AbstractCrossSpaceEstimat
     long cost = 0;
     for (TsFileResource seqResource : seqResources) {
       TsFileSequenceReader reader = getFileReader(seqResource);
-      int[] fileInfo = getSeriesAndDeviceChunkNum(reader);
+      FileInfo fileInfo = getSeriesAndDeviceChunkNum(reader);
       // it is max aligned series num of one device when tsfile contains aligned series,
       // else is sub compaction task num.
-      int concurrentSeriesNum = fileInfo[2] == -1 ? subCompactionTaskNum : fileInfo[2];
-      long seqFileCost = 0;
-      if (fileInfo[0] == 0) { // If totalChunkNum ==0, i.e. this seq tsFile has no chunk.
+      int concurrentSeriesNum =
+          fileInfo.maxAlignedSeriesNumInDevice == -1
+              ? subCompactionTaskNum
+              : fileInfo.maxAlignedSeriesNumInDevice;
+      maxConcurrentSeriesNum = Math.max(maxConcurrentSeriesNum, concurrentSeriesNum);
+      long seqFileCost;
+      if (fileInfo.totalChunkNum == 0) { // If totalChunkNum ==0, i.e. this seq tsFile has no chunk.
         logger.warn(
             "calculateReadingSeqFiles(), find 1 empty seq tsFile: {}.",
             seqResource.getTsFilePath());
         seqFileCost = 0;
       } else {
-        seqFileCost =
-            concurrentSeriesNum * (seqResource.getTsFileSize() * fileInfo[1] / fileInfo[0]);
+        // We need to multiply the compression ratio here.
+        seqFileCost = compressionRatio * concurrentSeriesNum * config.getTargetChunkSize();
       }
 
       if (seqFileCost > maxCostOfReadingSeqFile) {
         // Only one seq file will be read at the same time.
         // not only reading chunk into chunk cache, but also need to deserialize data point into
-        // merge reader, so we have to double the cost here.
-        cost -= 2 * maxCostOfReadingSeqFile;
-        cost += 2 * seqFileCost;
+        // merge reader. We have to add the cost in merge reader here and the cost of chunk cache is
+        // unnecessary.
+        cost -= maxCostOfReadingSeqFile;
+        cost += seqFileCost;
         maxCostOfReadingSeqFile = seqFileCost;
       }
-      maxSeqChunkNumInDeviceList.add(new Pair<>(fileInfo[3], fileInfo[0]));
     }
     return cost;
   }
 
   /**
    * Calculate memory cost of writing target files in the cross space compaction. Including metadata
-   * size of all seq files, max chunk group size of each seq file and max chunk group size of
-   * corresponding overlapped unseq file.
+   * size of all source files and size of concurrent target chunks.
    */
   private long calculatingWritingTargetFiles(
       List<TsFileResource> seqResources, TsFileResource unseqResource) throws IOException {
@@ -141,17 +141,16 @@ public class ReadPointCrossCompactionEstimator extends AbstractCrossSpaceEstimat
       TsFileSequenceReader reader = getFileReader(seqResource);
       // add seq file metadata size
       cost += reader.getFileMetadataSize();
-      // add max chunk group size of this seq tsfile
-      int totalSeqChunkNum = maxSeqChunkNumInDeviceList.get(0).right;
-      if (totalSeqChunkNum > 0) {
-        cost +=
-            seqResource.getTsFileSize() * maxSeqChunkNumInDeviceList.get(0).left / totalSeqChunkNum;
-      }
     }
-    // add max chunk group size of overlapped unseq tsfile
-    int totalUnSeqChunkNum = maxUnseqChunkNumInDevice.right;
-    if (totalUnSeqChunkNum > 0) {
-      cost += unseqResource.getTsFileSize() * maxUnseqChunkNumInDevice.left / totalUnSeqChunkNum;
+    // add unseq file metadata size
+    cost += getFileReader(unseqResource).getFileMetadataSize();
+
+    // concurrent series chunk size
+    long writingTargetCost = maxConcurrentSeriesNum * config.getTargetChunkSize();
+    if (writingTargetCost > maxCostOfWritingTargetFile) {
+      cost -= maxCostOfWritingTargetFile;
+      cost += writingTargetCost;
+      maxCostOfWritingTargetFile = writingTargetCost;
     }
 
     return cost;
@@ -169,7 +168,7 @@ public class ReadPointCrossCompactionEstimator extends AbstractCrossSpaceEstimat
    *
    * <p>max chunk num of one device in this tsfile
    */
-  private int[] getSeriesAndDeviceChunkNum(TsFileSequenceReader reader) throws IOException {
+  private FileInfo getSeriesAndDeviceChunkNum(TsFileSequenceReader reader) throws IOException {
     int totalChunkNum = 0;
     int maxChunkNum = 0;
     int maxAlignedSeriesNumInDevice = -1;
@@ -190,6 +189,29 @@ public class ReadPointCrossCompactionEstimator extends AbstractCrossSpaceEstimat
       }
       maxDeviceChunkNum = Math.max(maxDeviceChunkNum, deviceChunkNum);
     }
-    return new int[] {totalChunkNum, maxChunkNum, maxAlignedSeriesNumInDevice, maxDeviceChunkNum};
+    return new FileInfo(totalChunkNum, maxChunkNum, maxAlignedSeriesNumInDevice, maxDeviceChunkNum);
+  }
+
+  private class FileInfo {
+    // total chunk num in this tsfile
+    public int totalChunkNum = 0;
+    // max chunk num of one timeseries in this tsfile
+    public int maxSeriesChunkNum = 0;
+    // max aligned series num in one device. If there is no aligned series in this file, the
+    // value is -1.
+    public int maxAlignedSeriesNumInDevice = -1;
+    // max chunk num of one device in this tsfile
+    public int maxDeviceChunkNum = 0;
+
+    public FileInfo(
+        int totalChunkNum,
+        int maxSeriesChunkNum,
+        int maxAlignedSeriesNumInDevice,
+        int maxDeviceChunkNum) {
+      this.totalChunkNum = totalChunkNum;
+      this.maxSeriesChunkNum = maxSeriesChunkNum;
+      this.maxAlignedSeriesNumInDevice = maxAlignedSeriesNumInDevice;
+      this.maxDeviceChunkNum = maxDeviceChunkNum;
+    }
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/impl/RewriteCrossSpaceCompactionSelector.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/impl/RewriteCrossSpaceCompactionSelector.java
index f96f52ac64..9f63c7f108 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/impl/RewriteCrossSpaceCompactionSelector.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/impl/RewriteCrossSpaceCompactionSelector.java
@@ -279,7 +279,7 @@ public class RewriteCrossSpaceCompactionSelector implements ICrossSpaceSelector
         return Collections.emptyList();
       }
       LOGGER.info(
-          "{} [Compaction] Total source files: {} seqFiles, {} unseqFiles. Candidate source files: {} seqFiles, {} unseqFiles. Selected source files: {} seqFiles, {} unseqFiles, total memory cost {}, time consumption {}ms.",
+          "{} [Compaction] Total source files: {} seqFiles, {} unseqFiles. Candidate source files: {} seqFiles, {} unseqFiles. Selected source files: {} seqFiles, {} unseqFiles, total memory cost {} MB, total selected file size is {} MB, total selected seq file size is {} MB, total selected unseq file size is {} MB, time consumption {}ms.",
           logicalStorageGroupName + "-" + dataRegionId,
           sequenceFileList.size(),
           unsequenceFileList.size(),
@@ -287,7 +287,10 @@ public class RewriteCrossSpaceCompactionSelector implements ICrossSpaceSelector
           candidate.getUnseqFiles().size(),
           taskResources.getSeqFiles().size(),
           taskResources.getUnseqFiles().size(),
-          taskResources.getTotalMemoryCost(),
+          (float) (taskResources.getTotalMemoryCost()) / 1024 / 1024,
+          (float) (taskResources.getTotalFileSize()) / 1024 / 1024,
+          taskResources.getTotalSeqFileSize() / 1024 / 1024,
+          taskResources.getTotalUnseqFileSize() / 1024 / 1024,
           System.currentTimeMillis() - startTime);
       hasPrintedLog = false;
       return Collections.singletonList(taskResources);
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/utils/CrossCompactionTaskResource.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/utils/CrossCompactionTaskResource.java
index 3a7e0afef5..1c9ae6f3d7 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/utils/CrossCompactionTaskResource.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/utils/CrossCompactionTaskResource.java
@@ -32,6 +32,8 @@ public class CrossCompactionTaskResource {
 
   private long totalMemoryCost;
   private long totalFileSize;
+  private float totalSeqFileSize;
+  private float totalUnseqFileSize;
   private long totalFileNums;
 
   public CrossCompactionTaskResource() {
@@ -72,6 +74,7 @@ public class CrossCompactionTaskResource {
 
   private void addUnseqFile(TsFileResource file) {
     unseqFiles.add(file);
+    totalUnseqFileSize += file.getTsFileSize();
     countStatistic(file);
   }
 
@@ -81,6 +84,7 @@ public class CrossCompactionTaskResource {
 
   private void addSeqFile(TsFileResource file) {
     seqFiles.add(file);
+    totalSeqFileSize += file.getTsFileSize();
     countStatistic(file);
   }
 
@@ -105,6 +109,14 @@ public class CrossCompactionTaskResource {
     return totalFileSize;
   }
 
+  public float getTotalSeqFileSize() {
+    return totalSeqFileSize;
+  }
+
+  public float getTotalUnseqFileSize() {
+    return totalUnseqFileSize;
+  }
+
   public long getTotalFileNums() {
     return totalFileNums;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java b/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java
index cad4629046..d85531a509 100644
--- a/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java
+++ b/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java
@@ -182,13 +182,14 @@ public class SystemInfo {
   }
 
   public void addCompactionMemoryCost(long memoryCost) throws InterruptedException {
-    if (config.isEnableMemControl()) {
-      long originSize = this.compactionMemoryCost.get();
-      while (originSize + memoryCost > memorySizeForCompaction
-          || !compactionMemoryCost.compareAndSet(originSize, originSize + memoryCost)) {
-        Thread.sleep(100);
-        originSize = this.compactionMemoryCost.get();
-      }
+    if (!config.isEnableCompactionMemControl()) {
+      return;
+    }
+    long originSize = this.compactionMemoryCost.get();
+    while (originSize + memoryCost > memorySizeForCompaction
+        || !compactionMemoryCost.compareAndSet(originSize, originSize + memoryCost)) {
+      Thread.sleep(100);
+      originSize = this.compactionMemoryCost.get();
     }
   }
 
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/RewriteCompactionFileSelectorTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/RewriteCompactionFileSelectorTest.java
index 11bb18a7de..f94c78e0a1 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/RewriteCompactionFileSelectorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/RewriteCompactionFileSelectorTest.java
@@ -67,6 +67,7 @@ public class RewriteCompactionFileSelectorTest extends MergeTest {
   public void setUp() throws IOException, MetadataException, WriteProcessException {
     super.setUp();
     IoTDBDescriptor.getInstance().getConfig().setMinCrossCompactionUnseqFileLevel(0);
+    IoTDBDescriptor.getInstance().getConfig().setCompactionThreadCount(1);
   }
 
   @After