You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ma...@apache.org on 2023/02/09 08:53:42 UTC
[iotdb] 01/01: refactor compaction metrics
This is an automated email from the ASF dual-hosted git repository.
marklau99 pushed a commit to branch IOTDB-5470
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit c34828c46a0e7e488c0baa28bacc8dac135875fd
Author: Liu Xuxin <li...@outlook.com>
AuthorDate: Thu Feb 9 16:53:23 2023 +0800
refactor compaction metrics
---
.../execute/task/AbstractCompactionTask.java | 14 +
.../execute/task/CrossSpaceCompactionTask.java | 6 +-
.../execute/task/InnerSpaceCompactionTask.java | 6 +-
.../readchunk/AlignedSeriesCompactionExecutor.java | 26 +-
.../readchunk/SingleSeriesCompactionExecutor.java | 42 +--
.../utils/writer/AbstractCompactionWriter.java | 15 +-
.../compaction/schedule/CompactionTaskManager.java | 15 +-
.../compaction/schedule/CompactionWorker.java | 7 +-
.../db/service/metrics/DataNodeMetricsHelper.java | 2 +
.../metrics/recorder/CompactionMetrics.java | 376 +++++++++++++++++++++
.../recorder/CompactionMetricsRecorder.java | 190 -----------
11 files changed, 455 insertions(+), 244 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/task/AbstractCompactionTask.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/task/AbstractCompactionTask.java
index 0583811d6e..ed47faedcc 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/task/AbstractCompactionTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/task/AbstractCompactionTask.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.db.engine.compaction.execute.performer.ICompactionPerformer;
import org.apache.iotdb.db.engine.compaction.schedule.CompactionTaskManager;
import org.apache.iotdb.db.engine.storagegroup.TsFileManager;
+import org.apache.iotdb.db.service.metrics.recorder.CompactionMetrics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -48,6 +49,8 @@ public abstract class AbstractCompactionTask {
protected int hashCode = -1;
protected CompactionTaskSummary summary;
protected long serialId;
+ protected boolean crossTask;
+ protected boolean innerSeqTask;
public AbstractCompactionTask(
String storageGroupName,
@@ -71,6 +74,7 @@ public abstract class AbstractCompactionTask {
public void start() {
currentTaskNum.incrementAndGet();
boolean isSuccess = false;
+ CompactionMetrics.getInstance().reportTaskStartRunning(crossTask, innerSeqTask);
try {
summary.start();
doCompaction();
@@ -79,6 +83,8 @@ public abstract class AbstractCompactionTask {
this.currentTaskNum.decrementAndGet();
summary.finish(isSuccess);
CompactionTaskManager.getInstance().removeRunningTaskFuture(this);
+ CompactionMetrics.getInstance()
+ .reportTaskFinishOrAbort(crossTask, innerSeqTask, summary.getTimeCost());
}
}
@@ -150,4 +156,12 @@ public abstract class AbstractCompactionTask {
}
protected abstract void createSummary();
+
+ public boolean isCrossTask() {
+ return crossTask;
+ }
+
+ public boolean isInnerSeqTask() {
+ return innerSeqTask;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/task/CrossSpaceCompactionTask.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/task/CrossSpaceCompactionTask.java
index 034fd59b1d..6649627669 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/task/CrossSpaceCompactionTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/task/CrossSpaceCompactionTask.java
@@ -34,7 +34,7 @@ import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.engine.storagegroup.TsFileResourceList;
import org.apache.iotdb.db.engine.storagegroup.TsFileResourceStatus;
import org.apache.iotdb.db.rescon.SystemInfo;
-import org.apache.iotdb.db.service.metrics.recorder.CompactionMetricsRecorder;
+import org.apache.iotdb.db.service.metrics.recorder.CompactionMetrics;
import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
@@ -90,6 +90,8 @@ public class CrossSpaceCompactionTask extends AbstractCompactionTask {
this.performer = performer;
this.hashCode = this.toString().hashCode();
this.memoryCost = memoryCost;
+ this.crossTask = true;
+ this.innerSeqTask = false;
createSummary();
}
@@ -232,7 +234,7 @@ public class CrossSpaceCompactionTask extends AbstractCompactionTask {
TsFileMetricManager.getInstance()
.deleteFile(unsequenceFileSize, false, selectedUnsequenceFiles.size());
- CompactionMetricsRecorder.updateSummary(summary);
+ CompactionMetrics.getInstance().updateSummary(summary);
long costTime = (System.currentTimeMillis() - startTime) / 1000;
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/task/InnerSpaceCompactionTask.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/task/InnerSpaceCompactionTask.java
index a5c4b9fbd5..8336e082fd 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/task/InnerSpaceCompactionTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/task/InnerSpaceCompactionTask.java
@@ -33,7 +33,7 @@ import org.apache.iotdb.db.engine.storagegroup.TsFileNameGenerator;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.engine.storagegroup.TsFileResourceList;
import org.apache.iotdb.db.engine.storagegroup.TsFileResourceStatus;
-import org.apache.iotdb.db.service.metrics.recorder.CompactionMetricsRecorder;
+import org.apache.iotdb.db.service.metrics.recorder.CompactionMetrics;
import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
import org.apache.iotdb.tsfile.exception.write.TsFileNotCompleteException;
@@ -95,6 +95,8 @@ public class InnerSpaceCompactionTask extends AbstractCompactionTask {
tsFileResourceList = tsFileManager.getOrCreateUnsequenceListByTimePartition(timePartition);
}
this.hashCode = this.toString().hashCode();
+ this.innerSeqTask = sequence;
+ this.crossTask = false;
collectSelectedFilesInfo();
createSummary();
}
@@ -246,7 +248,7 @@ public class InnerSpaceCompactionTask extends AbstractCompactionTask {
TsFileMetricManager.getInstance()
.deleteFile(totalSizeOfDeletedFile, sequence, selectedTsFileResourceList.size());
- CompactionMetricsRecorder.updateSummary(summary);
+ CompactionMetrics.getInstance().updateSummary(summary);
double costTime = (System.currentTimeMillis() - startTime) / 1000.0d;
LOGGER.info(
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/readchunk/AlignedSeriesCompactionExecutor.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/readchunk/AlignedSeriesCompactionExecutor.java
index af90e81cd2..b07010b72a 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/readchunk/AlignedSeriesCompactionExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/readchunk/AlignedSeriesCompactionExecutor.java
@@ -26,7 +26,7 @@ import org.apache.iotdb.db.engine.compaction.schedule.CompactionTaskManager;
import org.apache.iotdb.db.engine.compaction.schedule.constant.CompactionType;
import org.apache.iotdb.db.engine.compaction.schedule.constant.ProcessChunkType;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
-import org.apache.iotdb.db.service.metrics.recorder.CompactionMetricsRecorder;
+import org.apache.iotdb.db.service.metrics.recorder.CompactionMetrics;
import org.apache.iotdb.tsfile.file.header.ChunkHeader;
import org.apache.iotdb.tsfile.file.metadata.AlignedChunkMetadata;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
@@ -145,7 +145,7 @@ public class AlignedSeriesCompactionExecutor {
readerIterator.nextReader();
summary.increaseProcessChunkNum(nextAlignedChunkInfo.getNotNullChunkNum());
summary.increaseProcessPointNum(nextAlignedChunkInfo.getTotalPointNum());
- CompactionMetricsRecorder.recordReadInfo(nextAlignedChunkInfo.getTotalSize());
+ CompactionMetrics.getInstance().recordReadInfo(nextAlignedChunkInfo.getTotalSize());
compactOneAlignedChunk(
nextAlignedChunkInfo.getReader(), nextAlignedChunkInfo.getNotNullChunkNum());
}
@@ -154,11 +154,12 @@ public class AlignedSeriesCompactionExecutor {
if (remainingPointInChunkWriter != 0L) {
CompactionTaskManager.mergeRateLimiterAcquire(
rateLimiter, chunkWriter.estimateMaxSeriesMemSize());
- CompactionMetricsRecorder.recordWriteInfo(
- CompactionType.INNER_SEQ_COMPACTION,
- ProcessChunkType.DESERIALIZE_CHUNK,
- true,
- chunkWriter.estimateMaxSeriesMemSize());
+ CompactionMetrics.getInstance()
+ .recordWriteInfo(
+ CompactionType.INNER_SEQ_COMPACTION,
+ ProcessChunkType.DESERIALIZE_CHUNK,
+ true,
+ chunkWriter.estimateMaxSeriesMemSize());
chunkWriter.writeToFileWriter(writer);
}
writer.checkMetadataSizeAndMayFlush();
@@ -199,11 +200,12 @@ public class AlignedSeriesCompactionExecutor {
|| chunkWriter.estimateMaxSeriesMemSize() >= chunkSizeThreshold * schemaList.size()) {
CompactionTaskManager.mergeRateLimiterAcquire(
rateLimiter, chunkWriter.estimateMaxSeriesMemSize());
- CompactionMetricsRecorder.recordWriteInfo(
- CompactionType.INNER_SEQ_COMPACTION,
- ProcessChunkType.DESERIALIZE_CHUNK,
- true,
- chunkWriter.estimateMaxSeriesMemSize());
+ CompactionMetrics.getInstance()
+ .recordWriteInfo(
+ CompactionType.INNER_SEQ_COMPACTION,
+ ProcessChunkType.DESERIALIZE_CHUNK,
+ true,
+ chunkWriter.estimateMaxSeriesMemSize());
chunkWriter.writeToFileWriter(writer);
remainingPointInChunkWriter = 0L;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/readchunk/SingleSeriesCompactionExecutor.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/readchunk/SingleSeriesCompactionExecutor.java
index d5184dcea4..2a278a5fb5 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/readchunk/SingleSeriesCompactionExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/readchunk/SingleSeriesCompactionExecutor.java
@@ -25,7 +25,7 @@ import org.apache.iotdb.db.engine.compaction.schedule.CompactionTaskManager;
import org.apache.iotdb.db.engine.compaction.schedule.constant.CompactionType;
import org.apache.iotdb.db.engine.compaction.schedule.constant.ProcessChunkType;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
-import org.apache.iotdb.db.service.metrics.recorder.CompactionMetricsRecorder;
+import org.apache.iotdb.db.service.metrics.recorder.CompactionMetrics;
import org.apache.iotdb.tsfile.file.header.ChunkHeader;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
import org.apache.iotdb.tsfile.read.TimeValuePair;
@@ -128,9 +128,10 @@ public class SingleSeriesCompactionExecutor {
if (this.chunkWriter == null) {
constructChunkWriterFromReadChunk(currentChunk);
}
- CompactionMetricsRecorder.recordReadInfo(
- (long) currentChunk.getHeader().getSerializedSize()
- + currentChunk.getHeader().getDataSize());
+ CompactionMetrics.getInstance()
+ .recordReadInfo(
+ (long) currentChunk.getHeader().getSerializedSize()
+ + currentChunk.getHeader().getDataSize());
// if this chunk is modified, deserialize it into points
if (chunkMetadata.getDeleteIntervalList() != null) {
@@ -321,11 +322,12 @@ public class SingleSeriesCompactionExecutor {
if (chunkMetadata.getEndTime() > maxEndTimestamp) {
maxEndTimestamp = chunkMetadata.getEndTime();
}
- CompactionMetricsRecorder.recordWriteInfo(
- CompactionType.INNER_SEQ_COMPACTION,
- isCachedChunk ? ProcessChunkType.MERGE_CHUNK : ProcessChunkType.FLUSH_CHUNK,
- false,
- getChunkSize(chunk));
+ CompactionMetrics.getInstance()
+ .recordWriteInfo(
+ CompactionType.INNER_SEQ_COMPACTION,
+ isCachedChunk ? ProcessChunkType.MERGE_CHUNK : ProcessChunkType.FLUSH_CHUNK,
+ false,
+ getChunkSize(chunk));
fileWriter.writeChunk(chunk, chunkMetadata);
}
@@ -334,11 +336,12 @@ public class SingleSeriesCompactionExecutor {
|| chunkWriter.estimateMaxSeriesMemSize() >= targetChunkSize) {
CompactionTaskManager.mergeRateLimiterAcquire(
compactionRateLimiter, chunkWriter.estimateMaxSeriesMemSize());
- CompactionMetricsRecorder.recordWriteInfo(
- CompactionType.INNER_SEQ_COMPACTION,
- ProcessChunkType.DESERIALIZE_CHUNK,
- false,
- chunkWriter.estimateMaxSeriesMemSize());
+ CompactionMetrics.getInstance()
+ .recordWriteInfo(
+ CompactionType.INNER_SEQ_COMPACTION,
+ ProcessChunkType.DESERIALIZE_CHUNK,
+ false,
+ chunkWriter.estimateMaxSeriesMemSize());
chunkWriter.writeToFileWriter(fileWriter);
pointCountInChunkWriter = 0L;
}
@@ -356,11 +359,12 @@ public class SingleSeriesCompactionExecutor {
private void flushChunkWriter() throws IOException {
CompactionTaskManager.mergeRateLimiterAcquire(
compactionRateLimiter, chunkWriter.estimateMaxSeriesMemSize());
- CompactionMetricsRecorder.recordWriteInfo(
- CompactionType.INNER_SEQ_COMPACTION,
- ProcessChunkType.DESERIALIZE_CHUNK,
- false,
- chunkWriter.estimateMaxSeriesMemSize());
+ CompactionMetrics.getInstance()
+ .recordWriteInfo(
+ CompactionType.INNER_SEQ_COMPACTION,
+ ProcessChunkType.DESERIALIZE_CHUNK,
+ false,
+ chunkWriter.estimateMaxSeriesMemSize());
chunkWriter.writeToFileWriter(fileWriter);
pointCountInChunkWriter = 0L;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/AbstractCompactionWriter.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/AbstractCompactionWriter.java
index 92ff3008e5..efa84edce8 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/AbstractCompactionWriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/AbstractCompactionWriter.java
@@ -22,7 +22,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.compaction.schedule.CompactionTaskManager;
import org.apache.iotdb.db.engine.compaction.schedule.constant.CompactionType;
import org.apache.iotdb.db.engine.compaction.schedule.constant.ProcessChunkType;
-import org.apache.iotdb.db.service.metrics.recorder.CompactionMetricsRecorder;
+import org.apache.iotdb.db.service.metrics.recorder.CompactionMetrics;
import org.apache.iotdb.tsfile.exception.write.PageException;
import org.apache.iotdb.tsfile.file.header.PageHeader;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
@@ -296,11 +296,14 @@ public abstract class AbstractCompactionWriter implements AutoCloseable {
if (iChunkWriter.checkIsChunkSizeOverThreshold(targetChunkSize, targetChunkPointNum, false)) {
sealChunk(fileWriter, iChunkWriter, subTaskId);
lastCheckIndex = 0;
- CompactionMetricsRecorder.recordWriteInfo(
- isCrossSpace ? CompactionType.CROSS_COMPACTION : CompactionType.INNER_UNSEQ_COMPACTION,
- ProcessChunkType.DESERIALIZE_CHUNK,
- isAlign,
- iChunkWriter.estimateMaxSeriesMemSize());
+ CompactionMetrics.getInstance()
+ .recordWriteInfo(
+ isCrossSpace
+ ? CompactionType.CROSS_COMPACTION
+ : CompactionType.INNER_UNSEQ_COMPACTION,
+ ProcessChunkType.DESERIALIZE_CHUNK,
+ isAlign,
+ iChunkWriter.estimateMaxSeriesMemSize());
}
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionTaskManager.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionTaskManager.java
index 0e51e046a1..dd5a2aa9e5 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionTaskManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionTaskManager.java
@@ -31,8 +31,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.compaction.execute.task.AbstractCompactionTask;
import org.apache.iotdb.db.engine.compaction.execute.task.CompactionTaskSummary;
import org.apache.iotdb.db.engine.compaction.schedule.comparator.DefaultCompactionTaskComparatorImpl;
-import org.apache.iotdb.db.engine.compaction.schedule.constant.CompactionTaskStatus;
-import org.apache.iotdb.db.service.metrics.recorder.CompactionMetricsRecorder;
+import org.apache.iotdb.db.service.metrics.recorder.CompactionMetrics;
import org.apache.iotdb.db.utils.datastructure.FixedPriorityBlockingQueue;
import com.google.common.util.concurrent.RateLimiter;
@@ -97,8 +96,8 @@ public class CompactionTaskManager implements IService {
AbstractCompactionTask::resetCompactionCandidateStatusForAllSourceFiles);
candidateCompactionTaskQueue.regsitPollLastHook(
x ->
- CompactionMetricsRecorder.recordTaskInfo(
- x, CompactionTaskStatus.POLL_FROM_QUEUE, candidateCompactionTaskQueue.size()));
+ CompactionMetrics.getInstance()
+ .reportPollTaskFromWaitingQueue(x.isCrossTask(), x.isInnerSeqTask()));
init = true;
}
logger.info("Compaction task manager started.");
@@ -225,8 +224,9 @@ public class CompactionTaskManager implements IService {
candidateCompactionTaskQueue.put(compactionTask);
// add metrics
- CompactionMetricsRecorder.recordTaskInfo(
- compactionTask, CompactionTaskStatus.ADD_TO_QUEUE, candidateCompactionTaskQueue.size());
+ CompactionMetrics.getInstance()
+ .reportAddTaskToWaitingQueue(
+ compactionTask.isCrossTask(), compactionTask.isInnerSeqTask());
return true;
}
@@ -272,9 +272,6 @@ public class CompactionTaskManager implements IService {
if (storageGroupTasks.containsKey(regionWithSG)) {
storageGroupTasks.get(regionWithSG).remove(task);
}
- // add metrics
- CompactionMetricsRecorder.recordTaskInfo(
- task, CompactionTaskStatus.FINISHED, currentTaskNum.get());
finishedTaskNum.incrementAndGet();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionWorker.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionWorker.java
index 18e7875349..aa37314eee 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionWorker.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionWorker.java
@@ -20,8 +20,7 @@ package org.apache.iotdb.db.engine.compaction.schedule;
import org.apache.iotdb.db.engine.compaction.execute.task.AbstractCompactionTask;
import org.apache.iotdb.db.engine.compaction.execute.task.CompactionTaskSummary;
-import org.apache.iotdb.db.engine.compaction.schedule.constant.CompactionTaskStatus;
-import org.apache.iotdb.db.service.metrics.recorder.CompactionMetricsRecorder;
+import org.apache.iotdb.db.service.metrics.recorder.CompactionMetrics;
import org.apache.iotdb.db.utils.datastructure.FixedPriorityBlockingQueue;
import org.jetbrains.annotations.NotNull;
@@ -55,10 +54,10 @@ public class CompactionWorker implements Runnable {
log.warn("CompactionThread-{} terminates because interruption", threadId);
return;
}
if (task != null) {
// add metrics
- CompactionMetricsRecorder.recordTaskInfo(
- task, CompactionTaskStatus.POLL_FROM_QUEUE, compactionTaskQueue.size());
+ CompactionMetrics.getInstance()
+ .reportPollTaskFromWaitingQueue(task.isCrossTask(), task.isInnerSeqTask());
if (task.checkValidAndSetMerging()) {
CompactionTaskSummary summary = task.getSummary();
CompactionTaskFuture future = new CompactionTaskFuture(summary);
diff --git a/server/src/main/java/org/apache/iotdb/db/service/metrics/DataNodeMetricsHelper.java b/server/src/main/java/org/apache/iotdb/db/service/metrics/DataNodeMetricsHelper.java
index 074b7345a6..55b7b4b303 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/metrics/DataNodeMetricsHelper.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/metrics/DataNodeMetricsHelper.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.db.mpp.metric.QueryExecutionMetricSet;
import org.apache.iotdb.db.mpp.metric.QueryPlanCostMetricSet;
import org.apache.iotdb.db.mpp.metric.QueryResourceMetricSet;
import org.apache.iotdb.db.mpp.metric.SeriesScanCostMetricSet;
+import org.apache.iotdb.db.service.metrics.recorder.CompactionMetrics;
import org.apache.iotdb.metrics.metricsets.jvm.JvmMetrics;
import org.apache.iotdb.metrics.metricsets.logback.LogbackMetrics;
@@ -36,6 +37,7 @@ public class DataNodeMetricsHelper {
MetricService.getInstance().addMetricSet(new JvmMetrics());
MetricService.getInstance().addMetricSet(new LogbackMetrics());
MetricService.getInstance().addMetricSet(new FileMetrics());
+ MetricService.getInstance().addMetricSet(CompactionMetrics.getInstance());
MetricService.getInstance().addMetricSet(new ProcessMetrics());
MetricService.getInstance().addMetricSet(new SystemMetrics(true));
diff --git a/server/src/main/java/org/apache/iotdb/db/service/metrics/recorder/CompactionMetrics.java b/server/src/main/java/org/apache/iotdb/db/service/metrics/recorder/CompactionMetrics.java
new file mode 100644
index 0000000000..127136dffc
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/service/metrics/recorder/CompactionMetrics.java
@@ -0,0 +1,376 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.service.metrics.recorder;
+
+import org.apache.iotdb.commons.service.metric.MetricService;
+import org.apache.iotdb.commons.service.metric.enums.Metric;
+import org.apache.iotdb.commons.service.metric.enums.Tag;
+import org.apache.iotdb.db.engine.compaction.execute.task.CompactionTaskSummary;
+import org.apache.iotdb.db.engine.compaction.schedule.constant.CompactionType;
+import org.apache.iotdb.db.engine.compaction.schedule.constant.ProcessChunkType;
+import org.apache.iotdb.metrics.AbstractMetricService;
+import org.apache.iotdb.metrics.metricsets.IMetricSet;
+import org.apache.iotdb.metrics.utils.MetricLevel;
+import org.apache.iotdb.metrics.utils.MetricType;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class CompactionMetrics implements IMetricSet {
+ private static final CompactionMetrics INSTANCE = new CompactionMetrics();
+ private final AtomicInteger waitingSeqInnerCompactionTaskNum = new AtomicInteger(0);
+ private final AtomicInteger waitingUnseqInnerCompactionTaskNum = new AtomicInteger(0);
+ private final AtomicInteger waitingCrossCompactionTaskNum = new AtomicInteger(0);
+ private final AtomicInteger runningSeqInnerCompactionTaskNum = new AtomicInteger(0);
+ private final AtomicInteger runningUnseqInnerCompactionTaskNum = new AtomicInteger(0);
+ private final AtomicInteger runningCrossCompactionTaskNum = new AtomicInteger(0);
+ private final AtomicInteger finishSeqInnerCompactionTaskNum = new AtomicInteger(0);
+ private final AtomicInteger finishUnseqInnerCompactionTaskNum = new AtomicInteger(0);
+ private final AtomicInteger finishCrossCompactionTaskNum = new AtomicInteger(0);
+
+ private CompactionMetrics() {}
+
+ public static CompactionMetrics getInstance() {
+ return INSTANCE;
+ }
+
+ @Override
+ public void bindTo(AbstractMetricService metricService) {
+ bindTaskInfo(metricService);
+ }
+
+ @Override
+ public void unbindFrom(AbstractMetricService metricService) {
+ unbindTaskInfo(metricService);
+ }
+
+ private void bindTaskInfo(AbstractMetricService metricService) {
+ metricService.createAutoGauge(
+ Metric.QUEUE.toString(),
+ MetricLevel.IMPORTANT,
+ this,
+ CompactionMetrics::getWaitingCrossCompactionTaskNum,
+ Tag.NAME.toString(),
+ "compaction_cross",
+ Tag.STATUS.toString(),
+ "waiting");
+ metricService.createAutoGauge(
+ Metric.QUEUE.toString(),
+ MetricLevel.IMPORTANT,
+ this,
+ CompactionMetrics::getWaitingSeqInnerCompactionTaskNum,
+ Tag.NAME.toString(),
+ "compaction_inner_seq",
+ Tag.STATUS.toString(),
+ "waiting");
+ metricService.createAutoGauge(
+ Metric.QUEUE.toString(),
+ MetricLevel.IMPORTANT,
+ this,
+ CompactionMetrics::getWaitingUnseqInnerCompactionTaskNum,
+ Tag.NAME.toString(),
+ "compaction_inner_unseq",
+ Tag.STATUS.toString(),
+ "waiting");
+ metricService.createAutoGauge(
+ Metric.QUEUE.toString(),
+ MetricLevel.IMPORTANT,
+ this,
+ CompactionMetrics::getRunningCrossCompactionTaskNum,
+ Tag.NAME.toString(),
+ "compaction_cross",
+ Tag.STATUS.toString(),
+ "running");
+ metricService.createAutoGauge(
+ Metric.QUEUE.toString(),
+ MetricLevel.IMPORTANT,
+ this,
+ CompactionMetrics::getRunningSeqInnerCompactionTaskNum,
+ Tag.NAME.toString(),
+ "compaction_inner_seq",
+ Tag.STATUS.toString(),
+ "running");
+ metricService.createAutoGauge(
+ Metric.QUEUE.toString(),
+ MetricLevel.IMPORTANT,
+ this,
+ CompactionMetrics::getRunningUnseqInnerCompactionTaskNum,
+ Tag.NAME.toString(),
+ "compaction_inner_unseq",
+ Tag.STATUS.toString(),
+ "running");
+ metricService.createAutoGauge(
+ Metric.COMPACTION_TASK_COUNT.toString(),
+ MetricLevel.IMPORTANT,
+ this,
+ CompactionMetrics::getFinishSeqInnerCompactionTaskNum,
+ Tag.NAME.toString(),
+ "inner_seq");
+ metricService.createAutoGauge(
+ Metric.COMPACTION_TASK_COUNT.toString(),
+ MetricLevel.IMPORTANT,
+ this,
+ CompactionMetrics::getFinishUnseqInnerCompactionTaskNum,
+ Tag.NAME.toString(),
+ "inner_unseq");
+ metricService.createAutoGauge(
+ Metric.COMPACTION_TASK_COUNT.toString(),
+ MetricLevel.IMPORTANT,
+ this,
+ CompactionMetrics::getFinishCrossCompactionTaskNum,
+ Tag.NAME.toString(),
+ "cross");
+ }
+
+ private void unbindTaskInfo(AbstractMetricService metricService) {
+ metricService.remove(
+ MetricType.AUTO_GAUGE,
+ Metric.QUEUE.toString(),
+ Tag.NAME.toString(),
+ "compaction_cross",
+ Tag.STATUS.toString(),
+ "waiting");
+ metricService.remove(
+ MetricType.AUTO_GAUGE,
+ Metric.QUEUE.toString(),
+ Tag.NAME.toString(),
+ "compaction_inner_seq",
+ Tag.STATUS.toString(),
+ "waiting");
+ metricService.remove(
+ MetricType.AUTO_GAUGE,
+ Metric.QUEUE.toString(),
+ Tag.NAME.toString(),
+ "compaction_inner_unseq",
+ Tag.STATUS.toString(),
+ "waiting");
+ metricService.remove(
+ MetricType.AUTO_GAUGE,
+ Metric.QUEUE.toString(),
+ Tag.NAME.toString(),
+ "compaction_cross",
+ Tag.STATUS.toString(),
+ "running");
+ metricService.remove(
+ MetricType.AUTO_GAUGE,
+ Metric.QUEUE.toString(),
+ Tag.NAME.toString(),
+ "compaction_inner_seq",
+ Tag.STATUS.toString(),
+ "running");
+ metricService.remove(
+ MetricType.AUTO_GAUGE,
+ Metric.QUEUE.toString(),
+ Tag.NAME.toString(),
+ "compaction_inner_unseq",
+ Tag.STATUS.toString(),
+ "running");
+ metricService.remove(
+ MetricType.AUTO_GAUGE,
+ Metric.COMPACTION_TASK_COUNT.toString(),
+ Tag.NAME.toString(),
+ "inner_seq");
+ metricService.remove(
+ MetricType.AUTO_GAUGE,
+ Metric.COMPACTION_TASK_COUNT.toString(),
+ Tag.NAME.toString(),
+ "inner_unseq");
+ metricService.remove(
+ MetricType.AUTO_GAUGE,
+ Metric.COMPACTION_TASK_COUNT.toString(),
+ Tag.NAME.toString(),
+ "cross");
+ }
+
+ public void recordWriteInfo(
+ CompactionType compactionType,
+ ProcessChunkType processChunkType,
+ boolean aligned,
+ long byteNum) {
+ MetricService.getInstance()
+ .count(
+ byteNum / 1024L,
+ Metric.DATA_WRITTEN.toString(),
+ MetricLevel.IMPORTANT,
+ Tag.NAME.toString(),
+ "compaction_" + compactionType.toString(),
+ Tag.TYPE.toString(),
+ (aligned ? "ALIGNED" : "NOT_ALIGNED") + "_" + processChunkType.toString());
+ MetricService.getInstance()
+ .count(
+ byteNum / 1024L,
+ Metric.DATA_WRITTEN.toString(),
+ MetricLevel.IMPORTANT,
+ Tag.NAME.toString(),
+ "compaction",
+ Tag.TYPE.toString(),
+ "total");
+ }
+
+ public void recordReadInfo(long byteNum) {
+ MetricService.getInstance()
+ .count(
+ byteNum,
+ Metric.DATA_READ.toString(),
+ MetricLevel.IMPORTANT,
+ Tag.NAME.toString(),
+ "compaction");
+ }
+
+ public void updateSummary(CompactionTaskSummary summary) {
+ MetricService.getInstance()
+ .count(
+ summary.getProcessPointNum(),
+ "Compacted_Point_Num",
+ MetricLevel.IMPORTANT,
+ Tag.NAME.toString(),
+ "compaction");
+ MetricService.getInstance()
+ .count(
+ summary.getProcessChunkNum(),
+ "Compacted_Chunk_Num",
+ MetricLevel.IMPORTANT,
+ Tag.NAME.toString(),
+ "compaction");
+ MetricService.getInstance()
+ .count(
+ summary.getDirectlyFlushChunkNum(),
+ "Directly_Flush_Chunk_Num",
+ MetricLevel.NORMAL,
+ Tag.NAME.toString(),
+ "compaction");
+ MetricService.getInstance()
+ .count(
+ summary.getDeserializeChunkCount(),
+ "Deserialized_Chunk_Num",
+ MetricLevel.NORMAL,
+ Tag.NAME.toString(),
+ "compaction");
+ MetricService.getInstance()
+ .count(
+ summary.getMergedChunkNum(),
+ "Merged_Chunk_Num",
+ MetricLevel.NORMAL,
+ Tag.NAME.toString(),
+ "compaction");
+ }
+
+ public void reportAddTaskToWaitingQueue(boolean isCrossTask, boolean isSeq) {
+ if (isCrossTask) {
+ waitingCrossCompactionTaskNum.incrementAndGet();
+ } else if (isSeq) {
+ waitingSeqInnerCompactionTaskNum.incrementAndGet();
+ } else {
+ waitingUnseqInnerCompactionTaskNum.incrementAndGet();
+ }
+ }
+
+ public void reportPollTaskFromWaitingQueue(boolean isCrossTask, boolean isSeq) {
+ if (isCrossTask) {
+ waitingCrossCompactionTaskNum.decrementAndGet();
+ } else if (isSeq) {
+ waitingSeqInnerCompactionTaskNum.decrementAndGet();
+ } else {
+ waitingUnseqInnerCompactionTaskNum.decrementAndGet();
+ }
+ }
+
+ public void reportTaskStartRunning(boolean isCrossTask, boolean isSeq) {
+ if (isCrossTask) {
+ runningCrossCompactionTaskNum.incrementAndGet();
+ } else if (isSeq) {
+ runningSeqInnerCompactionTaskNum.incrementAndGet();
+ } else {
+ runningUnseqInnerCompactionTaskNum.incrementAndGet();
+ }
+ }
+
+ public void reportTaskFinishOrAbort(boolean isCrossTask, boolean isSeq, long timeCost) {
+ if (isCrossTask) {
+ runningCrossCompactionTaskNum.decrementAndGet();
+ finishCrossCompactionTaskNum.incrementAndGet();
+ MetricService.getInstance()
+ .timer(
+ timeCost,
+ TimeUnit.MILLISECONDS,
+ Metric.COST_TASK.toString(),
+ MetricLevel.IMPORTANT,
+ Tag.NAME.toString(),
+ "cross_compaction");
+ } else if (isSeq) {
+ runningSeqInnerCompactionTaskNum.decrementAndGet();
+ finishSeqInnerCompactionTaskNum.incrementAndGet();
+ MetricService.getInstance()
+ .timer(
+ timeCost,
+ TimeUnit.MILLISECONDS,
+ Metric.COST_TASK.toString(),
+ MetricLevel.IMPORTANT,
+ Tag.NAME.toString(),
+ "inner_seq_compaction");
+ } else {
+ runningUnseqInnerCompactionTaskNum.decrementAndGet();
+ finishUnseqInnerCompactionTaskNum.incrementAndGet();
+ MetricService.getInstance()
+ .timer(
+ timeCost,
+ TimeUnit.MILLISECONDS,
+ Metric.COST_TASK.toString(),
+ MetricLevel.IMPORTANT,
+ Tag.NAME.toString(),
+ "inner_unseq_compaction");
+ }
+ }
+
+ public int getWaitingSeqInnerCompactionTaskNum() {
+ return waitingSeqInnerCompactionTaskNum.get();
+ }
+
+ public int getWaitingUnseqInnerCompactionTaskNum() {
+ return waitingUnseqInnerCompactionTaskNum.get();
+ }
+
+ public int getWaitingCrossCompactionTaskNum() {
+ return waitingCrossCompactionTaskNum.get();
+ }
+
+ public int getRunningSeqInnerCompactionTaskNum() {
+ return runningSeqInnerCompactionTaskNum.get();
+ }
+
+ public int getRunningUnseqInnerCompactionTaskNum() {
+ return runningUnseqInnerCompactionTaskNum.get();
+ }
+
+ public int getRunningCrossCompactionTaskNum() {
+ return runningCrossCompactionTaskNum.get();
+ }
+
+ public int getFinishSeqInnerCompactionTaskNum() {
+ return finishSeqInnerCompactionTaskNum.get();
+ }
+
+ public int getFinishUnseqInnerCompactionTaskNum() {
+ return finishUnseqInnerCompactionTaskNum.get();
+ }
+
+ public int getFinishCrossCompactionTaskNum() {
+ return finishCrossCompactionTaskNum.get();
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/metrics/recorder/CompactionMetricsRecorder.java b/server/src/main/java/org/apache/iotdb/db/service/metrics/recorder/CompactionMetricsRecorder.java
deleted file mode 100644
index 568fb76fb0..0000000000
--- a/server/src/main/java/org/apache/iotdb/db/service/metrics/recorder/CompactionMetricsRecorder.java
+++ /dev/null
@@ -1,190 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.service.metrics.recorder;
-
-import org.apache.iotdb.commons.service.metric.MetricService;
-import org.apache.iotdb.commons.service.metric.enums.Metric;
-import org.apache.iotdb.commons.service.metric.enums.Tag;
-import org.apache.iotdb.db.engine.compaction.execute.task.AbstractCompactionTask;
-import org.apache.iotdb.db.engine.compaction.execute.task.CompactionTaskSummary;
-import org.apache.iotdb.db.engine.compaction.execute.task.CrossSpaceCompactionTask;
-import org.apache.iotdb.db.engine.compaction.execute.task.InnerSpaceCompactionTask;
-import org.apache.iotdb.db.engine.compaction.schedule.constant.CompactionTaskStatus;
-import org.apache.iotdb.db.engine.compaction.schedule.constant.CompactionType;
-import org.apache.iotdb.db.engine.compaction.schedule.constant.ProcessChunkType;
-import org.apache.iotdb.metrics.utils.MetricLevel;
-
-import java.util.concurrent.TimeUnit;
-
-/**
- * Static helpers that forward compaction statistics to {@link MetricService}: written and read
- * data volume, per-task summary counters, queue gauges, task cost timers and task counters.
- */
-public class CompactionMetricsRecorder {
-
-  /** Value of the NAME tag shared by the overall compaction counters. */
-  private static final String COMPACTION = "compaction";
-
-  private CompactionMetricsRecorder() {
-    // Utility class: every member is static, so instantiation is pointless.
-  }
-
-  /**
-   * Adds {@code byteNum / 1024} to two {@code DATA_WRITTEN} counters: one tagged with the
-   * compaction type and the aligned/chunk-type combination, and one overall "total" counter.
-   *
-   * @param compactionType appended to {@code "compaction_"} to form the name tag
-   * @param processChunkType suffix of the type tag
-   * @param aligned selects the {@code "ALIGNED"} or {@code "NOT_ALIGNED"} prefix of the type tag
-   * @param byteNum byte count to record
-   */
-  public static void recordWriteInfo(
-      CompactionType compactionType,
-      ProcessChunkType processChunkType,
-      boolean aligned,
-      long byteNum) {
-    // Integer division: a remainder below 1024 bytes is dropped on every call.
-    long writtenKb = byteNum / 1024L;
-    MetricService.getInstance()
-        .count(
-            writtenKb,
-            Metric.DATA_WRITTEN.toString(),
-            MetricLevel.IMPORTANT,
-            Tag.NAME.toString(),
-            "compaction_" + compactionType.toString(),
-            Tag.TYPE.toString(),
-            (aligned ? "ALIGNED" : "NOT_ALIGNED") + "_" + processChunkType.toString());
-    MetricService.getInstance()
-        .count(
-            writtenKb,
-            Metric.DATA_WRITTEN.toString(),
-            MetricLevel.IMPORTANT,
-            Tag.NAME.toString(),
-            COMPACTION,
-            Tag.TYPE.toString(),
-            "total");
-  }
-
-  /**
-   * Adds {@code byteNum} to the {@code DATA_READ} counter tagged with name "compaction". Unlike
-   * the write counters, the value is not divided by 1024.
-   *
-   * @param byteNum byte count to record
-   */
-  public static void recordReadInfo(long byteNum) {
-    MetricService.getInstance()
-        .count(
-            byteNum,
-            Metric.DATA_READ.toString(),
-            MetricLevel.IMPORTANT,
-            Tag.NAME.toString(),
-            COMPACTION);
-  }
-
-  /**
-   * Adds the five figures carried by a task summary (processed points, processed chunks, directly
-   * flushed chunks, deserialized chunks, merged chunks) to their respective counters.
-   *
-   * @param summary summary whose figures are added to the counters
-   */
-  public static void updateSummary(CompactionTaskSummary summary) {
-    countSummaryValue(summary.getProcessPointNum(), "Compacted_Point_Num", MetricLevel.IMPORTANT);
-    countSummaryValue(summary.getProcessChunkNum(), "Compacted_Chunk_Num", MetricLevel.IMPORTANT);
-    countSummaryValue(
-        summary.getDirectlyFlushChunkNum(), "Directly_Flush_Chunk_Num", MetricLevel.NORMAL);
-    countSummaryValue(
-        summary.getDeserializeChunkCount(), "Deserialized_Chunk_Num", MetricLevel.NORMAL);
-    countSummaryValue(summary.getMergedChunkNum(), "Merged_Chunk_Num", MetricLevel.NORMAL);
-  }
-
-  /** Adds {@code value} to the counter {@code metricName}, tagged with name "compaction". */
-  private static void countSummaryValue(long value, String metricName, MetricLevel level) {
-    MetricService.getInstance().count(value, metricName, level, Tag.NAME.toString(), COMPACTION);
-  }
-
-  /**
-   * Updates the queue gauges for a task status change and, when the task has finished, also
-   * records its time cost and increments the finished-task counter.
-   *
-   * <p>The task is classified as "inner", "cross" or "unknown" from its runtime class. That same
-   * classification now labels the gauge, the timer and the counter; previously a task of unknown
-   * class was timed and counted under the "cross_compaction" label.
-   *
-   * @param task task whose status changed
-   * @param status new status; statuses other than ADD_TO_QUEUE, POLL_FROM_QUEUE,
-   *     READY_TO_EXECUTE and FINISHED are ignored
-   * @param size value written to the "waiting" or "running" gauge
-   */
-  public static void recordTaskInfo(
-      AbstractCompactionTask task, CompactionTaskStatus status, int size) {
-    String taskType = "unknown";
-    if (task instanceof InnerSpaceCompactionTask) {
-      taskType = "inner";
-    } else if (task instanceof CrossSpaceCompactionTask) {
-      taskType = "cross";
-    }
-
-    switch (status) {
-      case ADD_TO_QUEUE:
-      case POLL_FROM_QUEUE:
-        setQueueGauge(taskType, "waiting", size);
-        break;
-      case READY_TO_EXECUTE:
-        setQueueGauge(taskType, "running", size);
-        break;
-      case FINISHED:
-        setQueueGauge(taskType, "running", size);
-        recordFinishedTask(task, taskType);
-        break;
-      default:
-        // no metric is attached to the remaining statuses
-        break;
-    }
-  }
-
-  /** Sets the QUEUE gauge tagged {@code "compaction_" + taskType} and {@code status}. */
-  private static void setQueueGauge(String taskType, String status, int size) {
-    MetricService.getInstance()
-        .getOrCreateGauge(
-            Metric.QUEUE.toString(),
-            MetricLevel.IMPORTANT,
-            Tag.NAME.toString(),
-            "compaction_" + taskType,
-            Tag.STATUS.toString(),
-            status)
-        .set(size);
-  }
-
-  /** Records the time cost of a finished task and increments the matching task counter. */
-  private static void recordFinishedTask(AbstractCompactionTask task, String taskType) {
-    // "inner_compaction" / "cross_compaction" as before, "unknown_compaction" for anything else.
-    String taskName = taskType + "_compaction";
-    MetricService.getInstance()
-        .timer(
-            task.getTimeCost(),
-            TimeUnit.MILLISECONDS,
-            Metric.COST_TASK.toString(),
-            MetricLevel.IMPORTANT,
-            Tag.NAME.toString(),
-            taskName);
-
-    String countType = taskType;
-    if (task instanceof InnerSpaceCompactionTask) {
-      // Inner tasks are counted separately for sequence and unsequence space.
-      countType = ((InnerSpaceCompactionTask) task).isSequence() ? "sequence" : "unsequence";
-    }
-    MetricService.getInstance()
-        .count(
-            1,
-            Metric.COMPACTION_TASK_COUNT.toString(),
-            MetricLevel.IMPORTANT,
-            Tag.NAME.toString(),
-            taskName,
-            Tag.TYPE.toString(),
-            countType);
-  }
-}