You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ma...@apache.org on 2022/12/12 05:15:25 UTC
[iotdb] branch rel/1.0 updated: [IOTDB-4595] Add monitoring for compaction temporal files
This is an automated email from the ASF dual-hosted git repository.
marklau99 pushed a commit to branch rel/1.0
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/1.0 by this push:
new 9df2aa939a [IOTDB-4595] Add monitoring for compaction temporal files
9df2aa939a is described below
commit 9df2aa939a3fcc99f93858cac57cec6e0e7bb155
Author: Liu Xuxin <37...@users.noreply.github.com>
AuthorDate: Mon Dec 12 13:15:19 2022 +0800
[IOTDB-4595] Add monitoring for compaction temporal files
---
.../iotdb/db/engine/TsFileMetricManager.java | 54 ++++++++++++++--
.../utils/AlignedSeriesCompactionExecutor.java | 6 ++
.../utils/SingleSeriesCompactionExecutor.java | 1 +
.../performer/impl/FastCompactionPerformer.java | 16 ++++-
.../impl/ReadChunkCompactionPerformer.java | 11 ++++
.../impl/ReadPointCompactionPerformer.java | 19 ++++++
.../writer/AbstractCompactionWriter.java | 2 +
.../writer/AbstractCrossCompactionWriter.java | 9 +++
.../writer/AbstractInnerCompactionWriter.java | 5 ++
.../iotdb/db/service/metrics/FileMetrics.java | 73 ++++++++++++++++++++++
10 files changed, 191 insertions(+), 5 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/TsFileMetricManager.java b/server/src/main/java/org/apache/iotdb/db/engine/TsFileMetricManager.java
index afc6018e43..e08a063e0b 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/TsFileMetricManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/TsFileMetricManager.java
@@ -27,10 +27,18 @@ import java.util.concurrent.atomic.AtomicLong;
/** This class collect the number and size of tsfile, and send it to the {@link FileMetrics} */
public class TsFileMetricManager {
private static final TsFileMetricManager INSTANCE = new TsFileMetricManager();
- private AtomicLong seqFileSize = new AtomicLong(0);
- private AtomicLong unseqFileSize = new AtomicLong(0);
- private AtomicInteger seqFileNum = new AtomicInteger(0);
- private AtomicInteger unseqFileNum = new AtomicInteger(0);
+ private final AtomicLong seqFileSize = new AtomicLong(0);
+ private final AtomicLong unseqFileSize = new AtomicLong(0);
+ private final AtomicInteger seqFileNum = new AtomicInteger(0);
+ private final AtomicInteger unseqFileNum = new AtomicInteger(0);
+
+ // compaction temporal files
+ private final AtomicLong innerSeqCompactionTempFileSize = new AtomicLong(0);
+ private final AtomicLong innerUnseqCompactionTempFileSize = new AtomicLong(0);
+ private final AtomicLong crossCompactionTempFileSize = new AtomicLong(0);
+ private final AtomicInteger innerSeqCompactionTempFileNum = new AtomicInteger(0);
+ private final AtomicInteger innerUnseqCompactionTempFileNum = new AtomicInteger(0);
+ private final AtomicInteger crossCompactionTempFileNum = new AtomicInteger(0);
private TsFileMetricManager() {}
@@ -65,4 +73,42 @@ public class TsFileMetricManager {
public long getFileNum(boolean seq) {
return seq ? seqFileNum.get() : unseqFileNum.get();
}
+
+ public void addCompactionTempFileSize(boolean innerSpace, boolean seq, long delta) {
+ if (innerSpace) {
+ long unused =
+ seq
+ ? innerSeqCompactionTempFileSize.addAndGet(delta)
+ : innerUnseqCompactionTempFileSize.addAndGet(delta);
+ } else {
+ crossCompactionTempFileSize.addAndGet(delta);
+ }
+ }
+
+ public void addCompactionTempFileNum(boolean innerSpace, boolean seq, int delta) {
+ if (innerSpace) {
+ long unused =
+ seq
+ ? innerSeqCompactionTempFileNum.addAndGet(delta)
+ : innerUnseqCompactionTempFileNum.addAndGet(delta);
+ } else {
+ crossCompactionTempFileNum.addAndGet(delta);
+ }
+ }
+
+ public long getCompactionTempFileSize(boolean innerSpace, boolean seq) {
+ if (innerSpace) {
+ return seq ? innerSeqCompactionTempFileSize.get() : innerUnseqCompactionTempFileSize.get();
+ } else {
+ return crossCompactionTempFileSize.get();
+ }
+ }
+
+ public int getCompactionTempFileNum(boolean innerSpace, boolean seq) {
+ if (innerSpace) {
+ return seq ? innerSeqCompactionTempFileNum.get() : innerUnseqCompactionTempFileNum.get();
+ } else {
+ return crossCompactionTempFileNum.get();
+ }
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/AlignedSeriesCompactionExecutor.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/AlignedSeriesCompactionExecutor.java
index 3de4c64a36..3f4d475679 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/AlignedSeriesCompactionExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/AlignedSeriesCompactionExecutor.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.engine.compaction.inner.utils;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.TsFileMetricManager;
import org.apache.iotdb.db.engine.cache.ChunkCache;
import org.apache.iotdb.db.engine.compaction.CompactionTaskManager;
import org.apache.iotdb.db.engine.compaction.constant.CompactionType;
@@ -127,6 +128,7 @@ public class AlignedSeriesCompactionExecutor {
}
public void execute() throws IOException {
+ long originTempFileSize = writer.getPos();
while (readerAndChunkMetadataList.size() > 0) {
Pair<TsFileSequenceReader, List<AlignedChunkMetadata>> readerListPair =
readerAndChunkMetadataList.removeFirst();
@@ -152,6 +154,10 @@ public class AlignedSeriesCompactionExecutor {
chunkWriter.writeToFileWriter(writer);
}
writer.checkMetadataSizeAndMayFlush();
+
+ // update temporal file metrics
+ TsFileMetricManager.getInstance()
+ .addCompactionTempFileSize(true, true, writer.getPos() - originTempFileSize);
}
private void compactOneAlignedChunk(AlignedChunkReader chunkReader) throws IOException {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/SingleSeriesCompactionExecutor.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/SingleSeriesCompactionExecutor.java
index d1d4a366e7..9575757906 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/SingleSeriesCompactionExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/SingleSeriesCompactionExecutor.java
@@ -111,6 +111,7 @@ public class SingleSeriesCompactionExecutor {
* series compaction may contain more than one chunk.
*/
public void execute() throws IOException {
+ long originTempFileSize = fileWriter.getPos();
while (readerAndChunkMetadataList.size() > 0) {
Pair<TsFileSequenceReader, List<ChunkMetadata>> readerListPair =
readerAndChunkMetadataList.removeFirst();
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/performer/impl/FastCompactionPerformer.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/performer/impl/FastCompactionPerformer.java
index 60650e6b0c..84f77b2a82 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/performer/impl/FastCompactionPerformer.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/performer/impl/FastCompactionPerformer.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.TsFileMetricManager;
import org.apache.iotdb.db.engine.compaction.CompactionTaskManager;
import org.apache.iotdb.db.engine.compaction.CompactionUtils;
import org.apache.iotdb.db.engine.compaction.cross.rewrite.task.FastCompactionPerformerSubTask;
@@ -84,6 +85,8 @@ public class FastCompactionPerformer
private boolean isCrossCompaction;
+ private long tempFileSize = 0L;
+
public FastCompactionPerformer(
List<TsFileResource> seqFiles,
List<TsFileResource> unseqFiles,
@@ -106,6 +109,8 @@ public class FastCompactionPerformer
@Override
public void perform()
throws IOException, MetadataException, StorageEngineException, InterruptedException {
+ TsFileMetricManager.getInstance()
+ .addCompactionTempFileNum(!isCrossCompaction, seqFiles.size() > 0, targetFiles.size());
try (MultiTsFileDeviceIterator deviceIterator =
new MultiTsFileDeviceIterator(seqFiles, unseqFiles, readerCacheMap);
AbstractCompactionWriter compactionWriter =
@@ -117,7 +122,6 @@ public class FastCompactionPerformer
Pair<String, Boolean> deviceInfo = deviceIterator.nextDevice();
String device = deviceInfo.left;
boolean isAligned = deviceInfo.right;
-
// sort the resources by the start time of current device from old to new, and remove
// resource that does not contain the current device. Notice: when the level of time index
// is file, there will be a false positive judgment problem, that is, the device does not
@@ -138,6 +142,12 @@ public class FastCompactionPerformer
compactionWriter.endChunkGroup();
// check whether to flush chunk metadata or not
compactionWriter.checkAndMayFlushChunkMetadata();
+ // Add temp file metrics
+ long currentTempFileSize = compactionWriter.getWriterSize();
+ TsFileMetricManager.getInstance()
+ .addCompactionTempFileSize(
+ !isCrossCompaction, seqFiles.size() > 0, currentTempFileSize - tempFileSize);
+ tempFileSize = currentTempFileSize;
sortedSourceFiles.clear();
}
compactionWriter.endFile();
@@ -150,6 +160,10 @@ public class FastCompactionPerformer
sortedSourceFiles = null;
readerCacheMap = null;
modificationCache = null;
+ TsFileMetricManager.getInstance()
+ .addCompactionTempFileNum(!isCrossCompaction, seqFiles.size() > 0, -targetFiles.size());
+ TsFileMetricManager.getInstance()
+ .addCompactionTempFileSize(!isCrossCompaction, seqFiles.size() > 0, -tempFileSize);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/performer/impl/ReadChunkCompactionPerformer.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/performer/impl/ReadChunkCompactionPerformer.java
index 389cb7f87c..443db403d4 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/performer/impl/ReadChunkCompactionPerformer.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/performer/impl/ReadChunkCompactionPerformer.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.TsFileMetricManager;
import org.apache.iotdb.db.engine.compaction.inner.utils.AlignedSeriesCompactionExecutor;
import org.apache.iotdb.db.engine.compaction.inner.utils.MultiTsFileDeviceIterator;
import org.apache.iotdb.db.engine.compaction.inner.utils.SingleSeriesCompactionExecutor;
@@ -49,6 +50,7 @@ public class ReadChunkCompactionPerformer implements ISeqCompactionPerformer {
private TsFileResource targetResource;
private List<TsFileResource> seqFiles;
private CompactionTaskSummary summary;
+ private long tempFileSize = 0L;
public ReadChunkCompactionPerformer(List<TsFileResource> sourceFiles, TsFileResource targetFile) {
this.seqFiles = sourceFiles;
@@ -70,6 +72,7 @@ public class ReadChunkCompactionPerformer implements ISeqCompactionPerformer {
(SystemInfo.getInstance().getMemorySizeForCompaction()
/ IoTDBDescriptor.getInstance().getConfig().getCompactionThreadCount()
* IoTDBDescriptor.getInstance().getConfig().getChunkMetadataSizeProportion());
+ TsFileMetricManager.getInstance().addCompactionTempFileNum(true, true, 1);
try (MultiTsFileDeviceIterator deviceIterator = new MultiTsFileDeviceIterator(seqFiles);
TsFileIOWriter writer =
new TsFileIOWriter(targetResource.getTsFile(), true, sizeForFileWriter)) {
@@ -83,6 +86,11 @@ public class ReadChunkCompactionPerformer implements ISeqCompactionPerformer {
} else {
compactNotAlignedSeries(device, targetResource, writer, deviceIterator);
}
+ // update temporal file metrics
+ long newTempFileSize = writer.getPos();
+ TsFileMetricManager.getInstance()
+ .addCompactionTempFileSize(true, true, newTempFileSize - tempFileSize);
+ tempFileSize = newTempFileSize;
}
for (TsFileResource tsFileResource : seqFiles) {
@@ -90,6 +98,9 @@ public class ReadChunkCompactionPerformer implements ISeqCompactionPerformer {
}
writer.endFile();
targetResource.close();
+ } finally {
+ TsFileMetricManager.getInstance().addCompactionTempFileSize(true, true, -tempFileSize);
+ TsFileMetricManager.getInstance().addCompactionTempFileNum(true, true, -1);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/performer/impl/ReadPointCompactionPerformer.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/performer/impl/ReadPointCompactionPerformer.java
index 34259131d1..3e7e2e9516 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/performer/impl/ReadPointCompactionPerformer.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/performer/impl/ReadPointCompactionPerformer.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.commons.path.AlignedPath;
import org.apache.iotdb.commons.path.MeasurementPath;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.TsFileMetricManager;
import org.apache.iotdb.db.engine.compaction.CompactionTaskManager;
import org.apache.iotdb.db.engine.compaction.CompactionUtils;
import org.apache.iotdb.db.engine.compaction.cross.rewrite.task.ReadPointPerformerSubTask;
@@ -75,6 +76,7 @@ public class ReadPointCompactionPerformer
private CompactionTaskSummary summary;
private List<TsFileResource> targetFiles = Collections.emptyList();
+ private long tempFileSize = 0L;
public ReadPointCompactionPerformer(
List<TsFileResource> seqFiles,
@@ -102,6 +104,8 @@ public class ReadPointCompactionPerformer
QueryResourceManager.getInstance()
.getQueryFileManager()
.addUsedFilesForQuery(queryId, queryDataSource);
+ TsFileMetricManager.getInstance()
+ .addCompactionTempFileNum(seqFiles.size() == 0, false, targetFiles.size());
try (AbstractCompactionWriter compactionWriter =
getCompactionWriter(seqFiles, unseqFiles, targetFiles)) {
// Do not close device iterator, because tsfile reader is managed by FileReaderManager.
@@ -128,6 +132,10 @@ public class ReadPointCompactionPerformer
} finally {
QueryResourceManager.getInstance().endQuery(queryId);
+ TsFileMetricManager.getInstance()
+ .addCompactionTempFileNum(seqFiles.size() == 0, false, -targetFiles.size());
+ TsFileMetricManager.getInstance()
+ .addCompactionTempFileSize(seqFiles.size() == 0, false, tempFileSize);
}
}
@@ -177,6 +185,11 @@ public class ReadPointCompactionPerformer
// check whether to flush chunk metadata or not
compactionWriter.checkAndMayFlushChunkMetadata();
}
+ // add temp file metrics
+ long currentWriterSize = compactionWriter.getWriterSize();
+ TsFileMetricManager.getInstance()
+ .addCompactionTempFileSize(seqFiles.size() == 0, false, currentWriterSize - tempFileSize);
+ tempFileSize = currentWriterSize;
}
private void compactNonAlignedSeries(
@@ -224,6 +237,12 @@ public class ReadPointCompactionPerformer
// check whether to flush chunk metadata or not
compactionWriter.checkAndMayFlushChunkMetadata();
}
+
+ // add temp file metrics
+ long currentWriterSize = compactionWriter.getWriterSize();
+ TsFileMetricManager.getInstance()
+ .addCompactionTempFileSize(seqFiles.size() == 0, false, currentWriterSize - tempFileSize);
+ tempFileSize = currentWriterSize;
}
/**
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/AbstractCompactionWriter.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/AbstractCompactionWriter.java
index 24a82136ce..1a0973602d 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/AbstractCompactionWriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/AbstractCompactionWriter.java
@@ -123,6 +123,8 @@ public abstract class AbstractCompactionWriter implements AutoCloseable {
public abstract void endFile() throws IOException;
+ public abstract long getWriterSize() throws IOException;
+
/**
* Update startTime and endTime of the current device in each target resources, and check whether
* to flush chunk metadatas or not.
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/AbstractCrossCompactionWriter.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/AbstractCrossCompactionWriter.java
index c00ca5d231..f1ef4433e1 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/AbstractCrossCompactionWriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/AbstractCrossCompactionWriter.java
@@ -230,4 +230,13 @@ public abstract class AbstractCrossCompactionWriter extends AbstractCompactionWr
fileIndex++;
}
}
+
+ @Override
+ public long getWriterSize() throws IOException {
+ long totalSize = 0;
+ for (TsFileIOWriter writer : targetFileWriters) {
+ totalSize += writer.getPos();
+ }
+ return totalSize;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/AbstractInnerCompactionWriter.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/AbstractInnerCompactionWriter.java
index 1d77d35ad8..5762c8f845 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/AbstractInnerCompactionWriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/AbstractInnerCompactionWriter.java
@@ -105,4 +105,9 @@ public abstract class AbstractInnerCompactionWriter extends AbstractCompactionWr
public void checkAndMayFlushChunkMetadata() throws IOException {
fileWriter.checkMetadataSizeAndMayFlush();
}
+
+ @Override
+ public long getWriterSize() throws IOException {
+ return fileWriter.getPos();
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/metrics/FileMetrics.java b/server/src/main/java/org/apache/iotdb/db/service/metrics/FileMetrics.java
index 765e00452f..b6436d70b7 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/metrics/FileMetrics.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/metrics/FileMetrics.java
@@ -49,6 +49,13 @@ public class FileMetrics implements IMetricSet {
private long unsequenceFileTotalSize = 0L;
private long unsequenceFileTotalCount = 0L;
+ private long innerSeqCompactionTempFileSize = 0L;
+ private long innerUnseqCompactionTempFileSize = 0L;
+ private long crossCompactionTempFileSize = 0L;
+ private long innerSeqCompactionTempFileNum = 0L;
+ private long innerUnseqCompactionTempFileNum = 0L;
+ private long crossCompactionTempFileNum = 0L;
+
@Override
public void bindTo(AbstractMetricService metricService) {
metricService.createAutoGauge(
@@ -93,6 +100,48 @@ public class FileMetrics implements IMetricSet {
FileMetrics::getUnsequenceFileTotalCount,
Tag.NAME.toString(),
"unseq");
+ metricService.createAutoGauge(
+ Metric.FILE_COUNT.toString(),
+ MetricLevel.IMPORTANT,
+ this,
+ FileMetrics::getInnerSeqCompactionTempFileNum,
+ Tag.NAME.toString(),
+ "inner-seq-temp-num");
+ metricService.createAutoGauge(
+ Metric.FILE_COUNT.toString(),
+ MetricLevel.IMPORTANT,
+ this,
+ FileMetrics::getInnerUnseqCompactionTempFileNum,
+ Tag.NAME.toString(),
+ "inner-unseq-temp-num");
+ metricService.createAutoGauge(
+ Metric.FILE_COUNT.toString(),
+ MetricLevel.IMPORTANT,
+ this,
+ FileMetrics::getCrossCompactionTempFileNum,
+ Tag.NAME.toString(),
+ "cross-temp-num");
+ metricService.createAutoGauge(
+ Metric.FILE_COUNT.toString(),
+ MetricLevel.IMPORTANT,
+ this,
+ FileMetrics::getInnerSeqCompactionTempFileSize,
+ Tag.NAME.toString(),
+ "inner-seq-temp-size");
+ metricService.createAutoGauge(
+ Metric.FILE_COUNT.toString(),
+ MetricLevel.IMPORTANT,
+ this,
+ FileMetrics::getInnerUnseqCompactionTempFileSize,
+ Tag.NAME.toString(),
+ "inner-unseq-temp-size");
+ metricService.createAutoGauge(
+ Metric.FILE_COUNT.toString(),
+ MetricLevel.IMPORTANT,
+ this,
+ FileMetrics::getCrossCompactionTempFileSize,
+ Tag.NAME.toString(),
+ "cross-temp-size");
// finally start to update the value of some metrics in async way
if (null == currentServiceFuture) {
@@ -162,4 +211,28 @@ public class FileMetrics implements IMetricSet {
public long getUnsequenceFileTotalCount() {
return unsequenceFileTotalCount;
}
+
+ public long getInnerSeqCompactionTempFileSize() {
+ return innerSeqCompactionTempFileSize;
+ }
+
+ public long getInnerUnseqCompactionTempFileSize() {
+ return innerUnseqCompactionTempFileSize;
+ }
+
+ public long getCrossCompactionTempFileSize() {
+ return crossCompactionTempFileSize;
+ }
+
+ public long getInnerSeqCompactionTempFileNum() {
+ return innerSeqCompactionTempFileNum;
+ }
+
+ public long getInnerUnseqCompactionTempFileNum() {
+ return innerUnseqCompactionTempFileNum;
+ }
+
+ public long getCrossCompactionTempFileNum() {
+ return crossCompactionTempFileNum;
+ }
}