You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by sp...@apache.org on 2023/02/10 08:21:46 UTC
[iotdb] branch rel/1.0 updated: [To Rel/1.0] [IOTDB-5470] Refactor compaction metrics (#9039)
This is an automated email from the ASF dual-hosted git repository.
spricoder pushed a commit to branch rel/1.0
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/1.0 by this push:
new d5e379051c [To Rel/1.0] [IOTDB-5470] Refactor compaction metrics (#9039)
d5e379051c is described below
commit d5e379051c1175fd40527a1abc91b000f597c043
Author: Liu Xuxin <37...@users.noreply.github.com>
AuthorDate: Fri Feb 10 16:21:38 2023 +0800
[To Rel/1.0] [IOTDB-5470] Refactor compaction metrics (#9039)
* cherry pick IOTDB-5470 and fix conflict
* update dashboard
---
.../Apache-IoTDB-DataNode-Dashboard.json | 20 +-
.../execute/task/AbstractCompactionTask.java | 14 ++
.../execute/task/CrossSpaceCompactionTask.java | 6 +-
.../execute/task/InnerSpaceCompactionTask.java | 6 +-
.../readchunk/AlignedSeriesCompactionExecutor.java | 26 +--
.../readchunk/SingleSeriesCompactionExecutor.java | 42 ++--
.../utils/writer/AbstractCompactionWriter.java | 15 +-
.../compaction/schedule/CompactionTaskManager.java | 15 +-
.../compaction/schedule/CompactionWorker.java | 7 +-
.../db/service/metrics/CompactionMetrics.java | 232 +++++++++++++++++++++
.../db/service/metrics/DataNodeMetricsHelper.java | 1 +
.../metrics/recorder/CompactionMetricsManager.java | 225 ++++++++++++++++++++
.../recorder/CompactionMetricsRecorder.java | 190 -----------------
13 files changed, 545 insertions(+), 254 deletions(-)
diff --git a/docs/UserGuide/Monitor-Alert/Apache-IoTDB-DataNode-Dashboard.json b/docs/UserGuide/Monitor-Alert/Apache-IoTDB-DataNode-Dashboard.json
index 5ac1e2f629..f481e8f979 100644
--- a/docs/UserGuide/Monitor-Alert/Apache-IoTDB-DataNode-Dashboard.json
+++ b/docs/UserGuide/Monitor-Alert/Apache-IoTDB-DataNode-Dashboard.json
@@ -63,7 +63,7 @@
"fiscalYearStartMonth": 0,
"graphTooltip": 1,
"id": null,
- "iteration": 1675061569475,
+ "iteration": 1675999111798,
"links": [],
"liveNow": false,
"panels": [
@@ -1501,10 +1501,6 @@
"type": "timeseries"
},
{
- "datasource": {
- "type": "prometheus",
- "uid": "${DS_PROMETHEUS}"
- },
"fieldConfig": {
"defaults": {
"color": {
@@ -1579,7 +1575,7 @@
},
"editorMode": "code",
"exemplar": true,
- "expr": "sum(rate(compaction_task_count_total{instance=\"$instance\", name = \"inner_compaction\", type=\"sequence\"}[1m]))*60",
+ "expr": "sum(rate(compaction_task_count{instance=\"$instance\", name = \"inner_seq\"}[1m]))*60",
"interval": "",
"legendFormat": "sequence",
"range": true,
@@ -1591,8 +1587,10 @@
"uid": "${DS_PROMETHEUS}"
},
"editorMode": "code",
- "expr": "sum(rate(compaction_task_count_total{instance=\"$instance\", name = \"inner_compaction\", type=\"unsequence\"}[1m]))*60",
+ "exemplar": true,
+ "expr": "sum(rate(compaction_task_count{instance=\"$instance\", name = \"inner_unseq\"}[1m]))*60",
"hide": false,
+ "interval": "",
"legendFormat": "unsequence",
"range": true,
"refId": "B"
@@ -1603,8 +1601,10 @@
"uid": "${DS_PROMETHEUS}"
},
"editorMode": "code",
- "expr": "sum(rate(compaction_task_count_total{instance=\"$instance\", name = \"cross_compaction\", type=\"cross\"}[1m]))*60",
+ "exemplar": true,
+ "expr": "sum(rate(compaction_task_count{instance=\"$instance\", name = \"cross\"}[1m]))*60",
"hide": false,
+ "interval": "",
"legendFormat": "cross",
"range": true,
"refId": "C"
@@ -4794,8 +4794,8 @@
]
},
"time": {
- "from": "2023-01-30T05:44:55.019Z",
- "to": "2023-01-30T07:44:55.023Z"
+ "from": "now-5m",
+ "to": "now"
},
"timepicker": {
"refresh_intervals": [
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/task/AbstractCompactionTask.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/task/AbstractCompactionTask.java
index 0583811d6e..482194705d 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/task/AbstractCompactionTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/task/AbstractCompactionTask.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.db.engine.compaction.execute.performer.ICompactionPerformer;
import org.apache.iotdb.db.engine.compaction.schedule.CompactionTaskManager;
import org.apache.iotdb.db.engine.storagegroup.TsFileManager;
+import org.apache.iotdb.db.service.metrics.recorder.CompactionMetricsManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -48,6 +49,8 @@ public abstract class AbstractCompactionTask {
protected int hashCode = -1;
protected CompactionTaskSummary summary;
protected long serialId;
+ protected boolean crossTask;
+ protected boolean innerSeqTask;
public AbstractCompactionTask(
String storageGroupName,
@@ -71,6 +74,7 @@ public abstract class AbstractCompactionTask {
public void start() {
currentTaskNum.incrementAndGet();
boolean isSuccess = false;
+ CompactionMetricsManager.getInstance().reportTaskStartRunning(crossTask, innerSeqTask);
try {
summary.start();
doCompaction();
@@ -79,6 +83,8 @@ public abstract class AbstractCompactionTask {
this.currentTaskNum.decrementAndGet();
summary.finish(isSuccess);
CompactionTaskManager.getInstance().removeRunningTaskFuture(this);
+ CompactionMetricsManager.getInstance()
+ .reportTaskFinishOrAbort(crossTask, innerSeqTask, summary.getTimeCost());
}
}
@@ -150,4 +156,12 @@ public abstract class AbstractCompactionTask {
}
protected abstract void createSummary();
+
+ public boolean isCrossTask() {
+ return crossTask;
+ }
+
+ public boolean isInnerSeqTask() {
+ return innerSeqTask;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/task/CrossSpaceCompactionTask.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/task/CrossSpaceCompactionTask.java
index a87cdd3d29..59efe6c880 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/task/CrossSpaceCompactionTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/task/CrossSpaceCompactionTask.java
@@ -34,7 +34,7 @@ import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.engine.storagegroup.TsFileResourceList;
import org.apache.iotdb.db.engine.storagegroup.TsFileResourceStatus;
import org.apache.iotdb.db.rescon.SystemInfo;
-import org.apache.iotdb.db.service.metrics.recorder.CompactionMetricsRecorder;
+import org.apache.iotdb.db.service.metrics.recorder.CompactionMetricsManager;
import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
@@ -90,6 +90,8 @@ public class CrossSpaceCompactionTask extends AbstractCompactionTask {
this.performer = performer;
this.hashCode = this.toString().hashCode();
this.memoryCost = memoryCost;
+ this.crossTask = true;
+ this.innerSeqTask = false;
createSummary();
}
@@ -234,7 +236,7 @@ public class CrossSpaceCompactionTask extends AbstractCompactionTask {
TsFileMetricManager.getInstance()
.deleteFile(unsequenceFileSize, false, selectedUnsequenceFiles.size());
- CompactionMetricsRecorder.updateSummary(summary);
+ CompactionMetricsManager.getInstance().updateSummary(summary);
long costTime = (System.currentTimeMillis() - startTime) / 1000;
LOGGER.info(
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/task/InnerSpaceCompactionTask.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/task/InnerSpaceCompactionTask.java
index a5c4b9fbd5..c1bb3eb505 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/task/InnerSpaceCompactionTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/task/InnerSpaceCompactionTask.java
@@ -33,7 +33,7 @@ import org.apache.iotdb.db.engine.storagegroup.TsFileNameGenerator;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.engine.storagegroup.TsFileResourceList;
import org.apache.iotdb.db.engine.storagegroup.TsFileResourceStatus;
-import org.apache.iotdb.db.service.metrics.recorder.CompactionMetricsRecorder;
+import org.apache.iotdb.db.service.metrics.recorder.CompactionMetricsManager;
import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
import org.apache.iotdb.tsfile.exception.write.TsFileNotCompleteException;
@@ -95,6 +95,8 @@ public class InnerSpaceCompactionTask extends AbstractCompactionTask {
tsFileResourceList = tsFileManager.getOrCreateUnsequenceListByTimePartition(timePartition);
}
this.hashCode = this.toString().hashCode();
+ this.innerSeqTask = sequence;
+ this.crossTask = false;
collectSelectedFilesInfo();
createSummary();
}
@@ -246,7 +248,7 @@ public class InnerSpaceCompactionTask extends AbstractCompactionTask {
TsFileMetricManager.getInstance()
.deleteFile(totalSizeOfDeletedFile, sequence, selectedTsFileResourceList.size());
- CompactionMetricsRecorder.updateSummary(summary);
+ CompactionMetricsManager.getInstance().updateSummary(summary);
double costTime = (System.currentTimeMillis() - startTime) / 1000.0d;
LOGGER.info(
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/readchunk/AlignedSeriesCompactionExecutor.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/readchunk/AlignedSeriesCompactionExecutor.java
index af90e81cd2..9e292a45af 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/readchunk/AlignedSeriesCompactionExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/readchunk/AlignedSeriesCompactionExecutor.java
@@ -26,7 +26,7 @@ import org.apache.iotdb.db.engine.compaction.schedule.CompactionTaskManager;
import org.apache.iotdb.db.engine.compaction.schedule.constant.CompactionType;
import org.apache.iotdb.db.engine.compaction.schedule.constant.ProcessChunkType;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
-import org.apache.iotdb.db.service.metrics.recorder.CompactionMetricsRecorder;
+import org.apache.iotdb.db.service.metrics.recorder.CompactionMetricsManager;
import org.apache.iotdb.tsfile.file.header.ChunkHeader;
import org.apache.iotdb.tsfile.file.metadata.AlignedChunkMetadata;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
@@ -145,7 +145,7 @@ public class AlignedSeriesCompactionExecutor {
readerIterator.nextReader();
summary.increaseProcessChunkNum(nextAlignedChunkInfo.getNotNullChunkNum());
summary.increaseProcessPointNum(nextAlignedChunkInfo.getTotalPointNum());
- CompactionMetricsRecorder.recordReadInfo(nextAlignedChunkInfo.getTotalSize());
+ CompactionMetricsManager.getInstance().recordReadInfo(nextAlignedChunkInfo.getTotalSize());
compactOneAlignedChunk(
nextAlignedChunkInfo.getReader(), nextAlignedChunkInfo.getNotNullChunkNum());
}
@@ -154,11 +154,12 @@ public class AlignedSeriesCompactionExecutor {
if (remainingPointInChunkWriter != 0L) {
CompactionTaskManager.mergeRateLimiterAcquire(
rateLimiter, chunkWriter.estimateMaxSeriesMemSize());
- CompactionMetricsRecorder.recordWriteInfo(
- CompactionType.INNER_SEQ_COMPACTION,
- ProcessChunkType.DESERIALIZE_CHUNK,
- true,
- chunkWriter.estimateMaxSeriesMemSize());
+ CompactionMetricsManager.getInstance()
+ .recordWriteInfo(
+ CompactionType.INNER_SEQ_COMPACTION,
+ ProcessChunkType.DESERIALIZE_CHUNK,
+ true,
+ chunkWriter.estimateMaxSeriesMemSize());
chunkWriter.writeToFileWriter(writer);
}
writer.checkMetadataSizeAndMayFlush();
@@ -199,11 +200,12 @@ public class AlignedSeriesCompactionExecutor {
|| chunkWriter.estimateMaxSeriesMemSize() >= chunkSizeThreshold * schemaList.size()) {
CompactionTaskManager.mergeRateLimiterAcquire(
rateLimiter, chunkWriter.estimateMaxSeriesMemSize());
- CompactionMetricsRecorder.recordWriteInfo(
- CompactionType.INNER_SEQ_COMPACTION,
- ProcessChunkType.DESERIALIZE_CHUNK,
- true,
- chunkWriter.estimateMaxSeriesMemSize());
+ CompactionMetricsManager.getInstance()
+ .recordWriteInfo(
+ CompactionType.INNER_SEQ_COMPACTION,
+ ProcessChunkType.DESERIALIZE_CHUNK,
+ true,
+ chunkWriter.estimateMaxSeriesMemSize());
chunkWriter.writeToFileWriter(writer);
remainingPointInChunkWriter = 0L;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/readchunk/SingleSeriesCompactionExecutor.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/readchunk/SingleSeriesCompactionExecutor.java
index d5184dcea4..d8d3d71172 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/readchunk/SingleSeriesCompactionExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/readchunk/SingleSeriesCompactionExecutor.java
@@ -25,7 +25,7 @@ import org.apache.iotdb.db.engine.compaction.schedule.CompactionTaskManager;
import org.apache.iotdb.db.engine.compaction.schedule.constant.CompactionType;
import org.apache.iotdb.db.engine.compaction.schedule.constant.ProcessChunkType;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
-import org.apache.iotdb.db.service.metrics.recorder.CompactionMetricsRecorder;
+import org.apache.iotdb.db.service.metrics.recorder.CompactionMetricsManager;
import org.apache.iotdb.tsfile.file.header.ChunkHeader;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
import org.apache.iotdb.tsfile.read.TimeValuePair;
@@ -128,9 +128,10 @@ public class SingleSeriesCompactionExecutor {
if (this.chunkWriter == null) {
constructChunkWriterFromReadChunk(currentChunk);
}
- CompactionMetricsRecorder.recordReadInfo(
- (long) currentChunk.getHeader().getSerializedSize()
- + currentChunk.getHeader().getDataSize());
+ CompactionMetricsManager.getInstance()
+ .recordReadInfo(
+ (long) currentChunk.getHeader().getSerializedSize()
+ + currentChunk.getHeader().getDataSize());
// if this chunk is modified, deserialize it into points
if (chunkMetadata.getDeleteIntervalList() != null) {
@@ -321,11 +322,12 @@ public class SingleSeriesCompactionExecutor {
if (chunkMetadata.getEndTime() > maxEndTimestamp) {
maxEndTimestamp = chunkMetadata.getEndTime();
}
- CompactionMetricsRecorder.recordWriteInfo(
- CompactionType.INNER_SEQ_COMPACTION,
- isCachedChunk ? ProcessChunkType.MERGE_CHUNK : ProcessChunkType.FLUSH_CHUNK,
- false,
- getChunkSize(chunk));
+ CompactionMetricsManager.getInstance()
+ .recordWriteInfo(
+ CompactionType.INNER_SEQ_COMPACTION,
+ isCachedChunk ? ProcessChunkType.MERGE_CHUNK : ProcessChunkType.FLUSH_CHUNK,
+ false,
+ getChunkSize(chunk));
fileWriter.writeChunk(chunk, chunkMetadata);
}
@@ -334,11 +336,12 @@ public class SingleSeriesCompactionExecutor {
|| chunkWriter.estimateMaxSeriesMemSize() >= targetChunkSize) {
CompactionTaskManager.mergeRateLimiterAcquire(
compactionRateLimiter, chunkWriter.estimateMaxSeriesMemSize());
- CompactionMetricsRecorder.recordWriteInfo(
- CompactionType.INNER_SEQ_COMPACTION,
- ProcessChunkType.DESERIALIZE_CHUNK,
- false,
- chunkWriter.estimateMaxSeriesMemSize());
+ CompactionMetricsManager.getInstance()
+ .recordWriteInfo(
+ CompactionType.INNER_SEQ_COMPACTION,
+ ProcessChunkType.DESERIALIZE_CHUNK,
+ false,
+ chunkWriter.estimateMaxSeriesMemSize());
chunkWriter.writeToFileWriter(fileWriter);
pointCountInChunkWriter = 0L;
}
@@ -356,11 +359,12 @@ public class SingleSeriesCompactionExecutor {
private void flushChunkWriter() throws IOException {
CompactionTaskManager.mergeRateLimiterAcquire(
compactionRateLimiter, chunkWriter.estimateMaxSeriesMemSize());
- CompactionMetricsRecorder.recordWriteInfo(
- CompactionType.INNER_SEQ_COMPACTION,
- ProcessChunkType.DESERIALIZE_CHUNK,
- false,
- chunkWriter.estimateMaxSeriesMemSize());
+ CompactionMetricsManager.getInstance()
+ .recordWriteInfo(
+ CompactionType.INNER_SEQ_COMPACTION,
+ ProcessChunkType.DESERIALIZE_CHUNK,
+ false,
+ chunkWriter.estimateMaxSeriesMemSize());
chunkWriter.writeToFileWriter(fileWriter);
pointCountInChunkWriter = 0L;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/AbstractCompactionWriter.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/AbstractCompactionWriter.java
index 92ff3008e5..553167b1c9 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/AbstractCompactionWriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/AbstractCompactionWriter.java
@@ -22,7 +22,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.compaction.schedule.CompactionTaskManager;
import org.apache.iotdb.db.engine.compaction.schedule.constant.CompactionType;
import org.apache.iotdb.db.engine.compaction.schedule.constant.ProcessChunkType;
-import org.apache.iotdb.db.service.metrics.recorder.CompactionMetricsRecorder;
+import org.apache.iotdb.db.service.metrics.recorder.CompactionMetricsManager;
import org.apache.iotdb.tsfile.exception.write.PageException;
import org.apache.iotdb.tsfile.file.header.PageHeader;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
@@ -296,11 +296,14 @@ public abstract class AbstractCompactionWriter implements AutoCloseable {
if (iChunkWriter.checkIsChunkSizeOverThreshold(targetChunkSize, targetChunkPointNum, false)) {
sealChunk(fileWriter, iChunkWriter, subTaskId);
lastCheckIndex = 0;
- CompactionMetricsRecorder.recordWriteInfo(
- isCrossSpace ? CompactionType.CROSS_COMPACTION : CompactionType.INNER_UNSEQ_COMPACTION,
- ProcessChunkType.DESERIALIZE_CHUNK,
- isAlign,
- iChunkWriter.estimateMaxSeriesMemSize());
+ CompactionMetricsManager.getInstance()
+ .recordWriteInfo(
+ isCrossSpace
+ ? CompactionType.CROSS_COMPACTION
+ : CompactionType.INNER_UNSEQ_COMPACTION,
+ ProcessChunkType.DESERIALIZE_CHUNK,
+ isAlign,
+ iChunkWriter.estimateMaxSeriesMemSize());
}
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionTaskManager.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionTaskManager.java
index 0e51e046a1..76bf9db322 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionTaskManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionTaskManager.java
@@ -31,8 +31,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.compaction.execute.task.AbstractCompactionTask;
import org.apache.iotdb.db.engine.compaction.execute.task.CompactionTaskSummary;
import org.apache.iotdb.db.engine.compaction.schedule.comparator.DefaultCompactionTaskComparatorImpl;
-import org.apache.iotdb.db.engine.compaction.schedule.constant.CompactionTaskStatus;
-import org.apache.iotdb.db.service.metrics.recorder.CompactionMetricsRecorder;
+import org.apache.iotdb.db.service.metrics.recorder.CompactionMetricsManager;
import org.apache.iotdb.db.utils.datastructure.FixedPriorityBlockingQueue;
import com.google.common.util.concurrent.RateLimiter;
@@ -97,8 +96,8 @@ public class CompactionTaskManager implements IService {
AbstractCompactionTask::resetCompactionCandidateStatusForAllSourceFiles);
candidateCompactionTaskQueue.regsitPollLastHook(
x ->
- CompactionMetricsRecorder.recordTaskInfo(
- x, CompactionTaskStatus.POLL_FROM_QUEUE, candidateCompactionTaskQueue.size()));
+ CompactionMetricsManager.getInstance()
+ .reportPollTaskFromWaitingQueue(x.isCrossTask(), x.isInnerSeqTask()));
init = true;
}
logger.info("Compaction task manager started.");
@@ -225,8 +224,9 @@ public class CompactionTaskManager implements IService {
candidateCompactionTaskQueue.put(compactionTask);
// add metrics
- CompactionMetricsRecorder.recordTaskInfo(
- compactionTask, CompactionTaskStatus.ADD_TO_QUEUE, candidateCompactionTaskQueue.size());
+ CompactionMetricsManager.getInstance()
+ .reportAddTaskToWaitingQueue(
+ compactionTask.isCrossTask(), compactionTask.isInnerSeqTask());
return true;
}
@@ -272,9 +272,6 @@ public class CompactionTaskManager implements IService {
if (storageGroupTasks.containsKey(regionWithSG)) {
storageGroupTasks.get(regionWithSG).remove(task);
}
- // add metrics
- CompactionMetricsRecorder.recordTaskInfo(
- task, CompactionTaskStatus.FINISHED, currentTaskNum.get());
finishedTaskNum.incrementAndGet();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionWorker.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionWorker.java
index 18e7875349..b38f247f01 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionWorker.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionWorker.java
@@ -20,8 +20,7 @@ package org.apache.iotdb.db.engine.compaction.schedule;
import org.apache.iotdb.db.engine.compaction.execute.task.AbstractCompactionTask;
import org.apache.iotdb.db.engine.compaction.execute.task.CompactionTaskSummary;
-import org.apache.iotdb.db.engine.compaction.schedule.constant.CompactionTaskStatus;
-import org.apache.iotdb.db.service.metrics.recorder.CompactionMetricsRecorder;
+import org.apache.iotdb.db.service.metrics.recorder.CompactionMetricsManager;
import org.apache.iotdb.db.utils.datastructure.FixedPriorityBlockingQueue;
import org.jetbrains.annotations.NotNull;
@@ -55,10 +54,10 @@ public class CompactionWorker implements Runnable {
log.warn("CompactionThread-{} terminates because interruption", threadId);
return;
}
+ CompactionMetricsManager.getInstance()
+ .reportPollTaskFromWaitingQueue(task.isCrossTask(), task.isInnerSeqTask());
if (task != null) {
// add metrics
- CompactionMetricsRecorder.recordTaskInfo(
- task, CompactionTaskStatus.POLL_FROM_QUEUE, compactionTaskQueue.size());
if (task.checkValidAndSetMerging()) {
CompactionTaskSummary summary = task.getSummary();
CompactionTaskFuture future = new CompactionTaskFuture(summary);
diff --git a/server/src/main/java/org/apache/iotdb/db/service/metrics/CompactionMetrics.java b/server/src/main/java/org/apache/iotdb/db/service/metrics/CompactionMetrics.java
new file mode 100644
index 0000000000..6ef7ad973a
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/service/metrics/CompactionMetrics.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.service.metrics;
+
+import org.apache.iotdb.commons.service.metric.enums.Metric;
+import org.apache.iotdb.commons.service.metric.enums.Tag;
+import org.apache.iotdb.db.service.metrics.recorder.CompactionMetricsManager;
+import org.apache.iotdb.metrics.AbstractMetricService;
+import org.apache.iotdb.metrics.metricsets.IMetricSet;
+import org.apache.iotdb.metrics.utils.MetricLevel;
+import org.apache.iotdb.metrics.utils.MetricType;
+
+public class CompactionMetrics implements IMetricSet {
+ private final CompactionMetricsManager COMPACTION_METRICS_MANAGER =
+ CompactionMetricsManager.getInstance();
+
+ @Override
+ public void bindTo(AbstractMetricService metricService) {
+ bindTaskInfo(metricService);
+ bindPerformanceInfo(metricService);
+ }
+
+ @Override
+ public void unbindFrom(AbstractMetricService metricService) {
+ unbindTaskInfo(metricService);
+ unbindPerformanceInfo(metricService);
+ }
+
+ private void bindTaskInfo(AbstractMetricService metricService) {
+ metricService.createAutoGauge(
+ Metric.QUEUE.toString(),
+ MetricLevel.IMPORTANT,
+ COMPACTION_METRICS_MANAGER,
+ CompactionMetricsManager::getWaitingCrossCompactionTaskNum,
+ Tag.NAME.toString(),
+ "compaction_cross",
+ Tag.STATUS.toString(),
+ "waiting");
+ metricService.createAutoGauge(
+ Metric.QUEUE.toString(),
+ MetricLevel.IMPORTANT,
+ COMPACTION_METRICS_MANAGER,
+ CompactionMetricsManager::getWaitingSeqInnerCompactionTaskNum,
+ Tag.NAME.toString(),
+ "compaction_inner_seq",
+ Tag.STATUS.toString(),
+ "waiting");
+ metricService.createAutoGauge(
+ Metric.QUEUE.toString(),
+ MetricLevel.IMPORTANT,
+ COMPACTION_METRICS_MANAGER,
+ CompactionMetricsManager::getWaitingUnseqInnerCompactionTaskNum,
+ Tag.NAME.toString(),
+ "compaction_inner_unseq",
+ Tag.STATUS.toString(),
+ "waiting");
+ metricService.createAutoGauge(
+ Metric.QUEUE.toString(),
+ MetricLevel.IMPORTANT,
+ COMPACTION_METRICS_MANAGER,
+ CompactionMetricsManager::getRunningCrossCompactionTaskNum,
+ Tag.NAME.toString(),
+ "compaction_cross",
+ Tag.STATUS.toString(),
+ "running");
+ metricService.createAutoGauge(
+ Metric.QUEUE.toString(),
+ MetricLevel.IMPORTANT,
+ COMPACTION_METRICS_MANAGER,
+ CompactionMetricsManager::getRunningSeqInnerCompactionTaskNum,
+ Tag.NAME.toString(),
+ "compaction_inner_seq",
+ Tag.STATUS.toString(),
+ "running");
+ metricService.createAutoGauge(
+ Metric.QUEUE.toString(),
+ MetricLevel.IMPORTANT,
+ COMPACTION_METRICS_MANAGER,
+ CompactionMetricsManager::getRunningUnseqInnerCompactionTaskNum,
+ Tag.NAME.toString(),
+ "compaction_inner_unseq",
+ Tag.STATUS.toString(),
+ "running");
+ metricService.createAutoGauge(
+ Metric.COMPACTION_TASK_COUNT.toString(),
+ MetricLevel.IMPORTANT,
+ COMPACTION_METRICS_MANAGER,
+ CompactionMetricsManager::getFinishSeqInnerCompactionTaskNum,
+ Tag.NAME.toString(),
+ "inner_seq");
+ metricService.createAutoGauge(
+ Metric.COMPACTION_TASK_COUNT.toString(),
+ MetricLevel.IMPORTANT,
+ COMPACTION_METRICS_MANAGER,
+ CompactionMetricsManager::getFinishUnseqInnerCompactionTaskNum,
+ Tag.NAME.toString(),
+ "inner_unseq");
+ metricService.createAutoGauge(
+ Metric.COMPACTION_TASK_COUNT.toString(),
+ MetricLevel.IMPORTANT,
+ COMPACTION_METRICS_MANAGER,
+ CompactionMetricsManager::getFinishCrossCompactionTaskNum,
+ Tag.NAME.toString(),
+ "cross");
+ metricService.getOrCreateTimer(
+ Metric.COST_TASK.toString(),
+ MetricLevel.IMPORTANT,
+ Tag.NAME.toString(),
+ "inner_seq_compaction");
+ metricService.getOrCreateTimer(
+ Metric.COST_TASK.toString(),
+ MetricLevel.IMPORTANT,
+ Tag.NAME.toString(),
+ "inner_unseq_compaction");
+ metricService.getOrCreateTimer(
+ Metric.COST_TASK.toString(),
+ MetricLevel.IMPORTANT,
+ Tag.NAME.toString(),
+ "cross_compaction");
+ }
+
+ private void unbindTaskInfo(AbstractMetricService metricService) {
+ metricService.remove(
+ MetricType.AUTO_GAUGE,
+ Metric.QUEUE.toString(),
+ Tag.NAME.toString(),
+ "compaction_cross",
+ Tag.STATUS.toString(),
+ "waiting");
+ metricService.remove(
+ MetricType.AUTO_GAUGE,
+ Metric.QUEUE.toString(),
+ Tag.NAME.toString(),
+ "compaction_inner_seq",
+ Tag.STATUS.toString(),
+ "waiting");
+ metricService.remove(
+ MetricType.AUTO_GAUGE,
+ Metric.QUEUE.toString(),
+ Tag.NAME.toString(),
+ "compaction_inner_unseq",
+ Tag.STATUS.toString(),
+ "waiting");
+ metricService.remove(
+ MetricType.AUTO_GAUGE,
+ Metric.QUEUE.toString(),
+ Tag.NAME.toString(),
+ "compaction_cross",
+ Tag.STATUS.toString(),
+ "running");
+ metricService.remove(
+ MetricType.AUTO_GAUGE,
+ Metric.QUEUE.toString(),
+ Tag.NAME.toString(),
+ "compaction_inner_seq",
+ Tag.STATUS.toString(),
+ "running");
+ metricService.remove(
+ MetricType.AUTO_GAUGE,
+ Metric.QUEUE.toString(),
+ Tag.NAME.toString(),
+ "compaction_inner_unseq",
+ Tag.STATUS.toString(),
+ "running");
+ metricService.remove(
+ MetricType.AUTO_GAUGE,
+ Metric.COMPACTION_TASK_COUNT.toString(),
+ Tag.NAME.toString(),
+ "inner_seq");
+ metricService.remove(
+ MetricType.AUTO_GAUGE,
+ Metric.COMPACTION_TASK_COUNT.toString(),
+ Tag.NAME.toString(),
+ "inner_unseq");
+ metricService.remove(
+ MetricType.AUTO_GAUGE,
+ Metric.COMPACTION_TASK_COUNT.toString(),
+ Tag.NAME.toString(),
+ "cross");
+ metricService.remove(
+ MetricType.TIMER, Metric.COST_TASK.toString(), Tag.NAME.toString(), "inner_seq_compaction");
+ metricService.remove(
+ MetricType.TIMER,
+ Metric.COST_TASK.toString(),
+ Tag.NAME.toString(),
+ "inner_unseq_compaction");
+ metricService.remove(
+ MetricType.TIMER, Metric.COST_TASK.toString(), Tag.NAME.toString(), "cross_compaction");
+ }
+
+ private void bindPerformanceInfo(AbstractMetricService metricService) {
+ metricService.getOrCreateCounter(
+ "Compacted_Point_Num", MetricLevel.IMPORTANT, Tag.NAME.toString(), "compaction");
+ metricService.getOrCreateCounter(
+ "Compacted_Chunk_Num", MetricLevel.IMPORTANT, Tag.NAME.toString(), "compaction");
+ metricService.getOrCreateCounter(
+ "Directly_Flush_Chunk_Num", MetricLevel.NORMAL, Tag.NAME.toString(), "compaction");
+ metricService.getOrCreateCounter(
+ "Deserialized_Chunk_Num", MetricLevel.NORMAL, Tag.NAME.toString(), "compaction");
+ metricService.getOrCreateCounter(
+ "Merged_Chunk_Num", MetricLevel.NORMAL, Tag.NAME.toString(), "compaction");
+ }
+
+ private void unbindPerformanceInfo(AbstractMetricService metricService) {
+ metricService.remove(
+ MetricType.COUNTER, "Compacted_Point_Num", Tag.NAME.toString(), "compaction");
+ metricService.remove(
+ MetricType.COUNTER, "Compacted_Chunk_Num", Tag.NAME.toString(), "compaction");
+ metricService.remove(
+ MetricType.COUNTER, "Directly_Flush_Chunk_Num", Tag.NAME.toString(), "compaction");
+ metricService.remove(
+ MetricType.COUNTER, "Deserialized_Chunk_Num", Tag.NAME.toString(), "compaction");
+ metricService.remove(MetricType.COUNTER, "Merged_Chunk_Num", Tag.NAME.toString(), "compaction");
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/metrics/DataNodeMetricsHelper.java b/server/src/main/java/org/apache/iotdb/db/service/metrics/DataNodeMetricsHelper.java
index 1674b22eb4..7efd413183 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/metrics/DataNodeMetricsHelper.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/metrics/DataNodeMetricsHelper.java
@@ -29,6 +29,7 @@ public class DataNodeMetricsHelper {
MetricService.getInstance().addMetricSet(new JvmMetrics());
MetricService.getInstance().addMetricSet(new LogbackMetrics());
MetricService.getInstance().addMetricSet(new FileMetrics());
+ MetricService.getInstance().addMetricSet(new CompactionMetrics());
MetricService.getInstance().addMetricSet(new ProcessMetrics());
MetricService.getInstance().addMetricSet(new SystemMetrics(true));
}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/metrics/recorder/CompactionMetricsManager.java b/server/src/main/java/org/apache/iotdb/db/service/metrics/recorder/CompactionMetricsManager.java
new file mode 100644
index 0000000000..6f531373aa
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/service/metrics/recorder/CompactionMetricsManager.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.service.metrics.recorder;
+
+import org.apache.iotdb.commons.service.metric.MetricService;
+import org.apache.iotdb.commons.service.metric.enums.Metric;
+import org.apache.iotdb.commons.service.metric.enums.Tag;
+import org.apache.iotdb.db.engine.compaction.execute.task.CompactionTaskSummary;
+import org.apache.iotdb.db.engine.compaction.schedule.constant.CompactionType;
+import org.apache.iotdb.db.engine.compaction.schedule.constant.ProcessChunkType;
+import org.apache.iotdb.metrics.utils.MetricLevel;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Singleton that records compaction metrics.
+ *
+ * <p>Data volume, per-task summaries and task cost are forwarded to {@link MetricService}. The
+ * numbers of waiting, running and finished tasks are kept in in-memory {@link AtomicInteger}
+ * counters, one per task category (cross space, sequence inner space, unsequence inner space),
+ * and are exposed through the getters at the end of this class.
+ */
+public class CompactionMetricsManager {
+  private static final CompactionMetricsManager INSTANCE = new CompactionMetricsManager();
+
+  // Tasks that were added to the waiting queue and not yet polled from it.
+  private final AtomicInteger waitingSeqInnerCompactionTaskNum = new AtomicInteger(0);
+  private final AtomicInteger waitingUnseqInnerCompactionTaskNum = new AtomicInteger(0);
+  private final AtomicInteger waitingCrossCompactionTaskNum = new AtomicInteger(0);
+
+  // Tasks that were reported as started and not yet reported as finished or aborted.
+  private final AtomicInteger runningSeqInnerCompactionTaskNum = new AtomicInteger(0);
+  private final AtomicInteger runningUnseqInnerCompactionTaskNum = new AtomicInteger(0);
+  private final AtomicInteger runningCrossCompactionTaskNum = new AtomicInteger(0);
+
+  // Tasks that were reported as finished or aborted since this instance was created.
+  private final AtomicInteger finishSeqInnerCompactionTaskNum = new AtomicInteger(0);
+  private final AtomicInteger finishUnseqInnerCompactionTaskNum = new AtomicInteger(0);
+  private final AtomicInteger finishCrossCompactionTaskNum = new AtomicInteger(0);
+
+  private CompactionMetricsManager() {}
+
+  /** Returns the single shared instance of this manager. */
+  public static CompactionMetricsManager getInstance() {
+    return INSTANCE;
+  }
+
+  /**
+   * Records data written by compaction.
+   *
+   * <p>{@code byteNum / 1024} (integer division, the remainder is dropped) is added to two {@code
+   * DATA_WRITTEN} counters: one tagged with the compaction type and the alignment / chunk
+   * processing type, and one tagged as the compaction total.
+   *
+   * @param compactionType type of the compaction that produced the data
+   * @param processChunkType how the written chunk was processed
+   * @param aligned whether the written series is aligned
+   * @param byteNum number of bytes written
+   */
+  public void recordWriteInfo(
+      CompactionType compactionType,
+      ProcessChunkType processChunkType,
+      boolean aligned,
+      long byteNum) {
+    final long scaledSize = byteNum / 1024L;
+    final String alignment = aligned ? "ALIGNED" : "NOT_ALIGNED";
+
+    // Fine-grained counter: one series per compaction type and alignment/processing combination.
+    MetricService.getInstance()
+        .count(
+            scaledSize,
+            Metric.DATA_WRITTEN.toString(),
+            MetricLevel.IMPORTANT,
+            Tag.NAME.toString(),
+            "compaction_" + compactionType.toString(),
+            Tag.TYPE.toString(),
+            alignment + "_" + processChunkType.toString());
+
+    // Aggregate counter over every compaction write.
+    MetricService.getInstance()
+        .count(
+            scaledSize,
+            Metric.DATA_WRITTEN.toString(),
+            MetricLevel.IMPORTANT,
+            Tag.NAME.toString(),
+            "compaction",
+            Tag.TYPE.toString(),
+            "total");
+  }
+
+  /**
+   * Records data read by compaction by adding {@code byteNum}, unscaled, to the {@code DATA_READ}
+   * counter tagged {@code "compaction"}.
+   *
+   * @param byteNum number of bytes read
+   */
+  public void recordReadInfo(long byteNum) {
+    countCompactionMetric(byteNum, Metric.DATA_READ.toString(), MetricLevel.IMPORTANT);
+  }
+
+  /**
+   * Adds the point and chunk statistics of one compaction task to the corresponding counters.
+   *
+   * @param summary summary of the task whose statistics are recorded
+   */
+  public void updateSummary(CompactionTaskSummary summary) {
+    countCompactionMetric(
+        summary.getProcessPointNum(), "Compacted_Point_Num", MetricLevel.IMPORTANT);
+    countCompactionMetric(
+        summary.getProcessChunkNum(), "Compacted_Chunk_Num", MetricLevel.IMPORTANT);
+    countCompactionMetric(
+        summary.getDirectlyFlushChunkNum(), "Directly_Flush_Chunk_Num", MetricLevel.NORMAL);
+    countCompactionMetric(
+        summary.getDeserializeChunkCount(), "Deserialized_Chunk_Num", MetricLevel.NORMAL);
+    countCompactionMetric(summary.getMergedChunkNum(), "Merged_Chunk_Num", MetricLevel.NORMAL);
+  }
+
+  /**
+   * Increments the waiting-task counter of the given task category.
+   *
+   * @param isCrossTask whether the task is a cross space compaction task
+   * @param isSeq for an inner space task, whether it compacts sequence files; ignored when {@code
+   *     isCrossTask} is true
+   */
+  public void reportAddTaskToWaitingQueue(boolean isCrossTask, boolean isSeq) {
+    waitingCounterOf(isCrossTask, isSeq).incrementAndGet();
+  }
+
+  /**
+   * Decrements the waiting-task counter of the given task category.
+   *
+   * @param isCrossTask whether the task is a cross space compaction task
+   * @param isSeq for an inner space task, whether it compacts sequence files; ignored when {@code
+   *     isCrossTask} is true
+   */
+  public void reportPollTaskFromWaitingQueue(boolean isCrossTask, boolean isSeq) {
+    waitingCounterOf(isCrossTask, isSeq).decrementAndGet();
+  }
+
+  /**
+   * Increments the running-task counter of the given task category.
+   *
+   * @param isCrossTask whether the task is a cross space compaction task
+   * @param isSeq for an inner space task, whether it compacts sequence files; ignored when {@code
+   *     isCrossTask} is true
+   */
+  public void reportTaskStartRunning(boolean isCrossTask, boolean isSeq) {
+    runningCounterOf(isCrossTask, isSeq).incrementAndGet();
+  }
+
+  /**
+   * Moves one task of the given category from "running" to "finished" and records its cost.
+   *
+   * <p>The running counter is decremented, the finished counter is incremented, and {@code
+   * timeCost} is recorded in the {@code COST_TASK} timer under a category-specific name.
+   *
+   * @param isCrossTask whether the task is a cross space compaction task
+   * @param isSeq for an inner space task, whether it compacts sequence files; ignored when {@code
+   *     isCrossTask} is true
+   * @param timeCost time the task took, in milliseconds
+   */
+  public void reportTaskFinishOrAbort(boolean isCrossTask, boolean isSeq, long timeCost) {
+    final AtomicInteger runningCounter = runningCounterOf(isCrossTask, isSeq);
+    final AtomicInteger finishCounter;
+    final String timerName;
+    if (isCrossTask) {
+      finishCounter = finishCrossCompactionTaskNum;
+      timerName = "cross_compaction";
+    } else if (isSeq) {
+      finishCounter = finishSeqInnerCompactionTaskNum;
+      timerName = "inner_seq_compaction";
+    } else {
+      finishCounter = finishUnseqInnerCompactionTaskNum;
+      timerName = "inner_unseq_compaction";
+    }
+
+    runningCounter.decrementAndGet();
+    finishCounter.incrementAndGet();
+    MetricService.getInstance()
+        .timer(
+            timeCost,
+            TimeUnit.MILLISECONDS,
+            Metric.COST_TASK.toString(),
+            MetricLevel.IMPORTANT,
+            Tag.NAME.toString(),
+            timerName);
+  }
+
+  /** Returns the current number of waiting sequence inner space compaction tasks. */
+  public int getWaitingSeqInnerCompactionTaskNum() {
+    return waitingSeqInnerCompactionTaskNum.get();
+  }
+
+  /** Returns the current number of waiting unsequence inner space compaction tasks. */
+  public int getWaitingUnseqInnerCompactionTaskNum() {
+    return waitingUnseqInnerCompactionTaskNum.get();
+  }
+
+  /** Returns the current number of waiting cross space compaction tasks. */
+  public int getWaitingCrossCompactionTaskNum() {
+    return waitingCrossCompactionTaskNum.get();
+  }
+
+  /** Returns the current number of running sequence inner space compaction tasks. */
+  public int getRunningSeqInnerCompactionTaskNum() {
+    return runningSeqInnerCompactionTaskNum.get();
+  }
+
+  /** Returns the current number of running unsequence inner space compaction tasks. */
+  public int getRunningUnseqInnerCompactionTaskNum() {
+    return runningUnseqInnerCompactionTaskNum.get();
+  }
+
+  /** Returns the current number of running cross space compaction tasks. */
+  public int getRunningCrossCompactionTaskNum() {
+    return runningCrossCompactionTaskNum.get();
+  }
+
+  /** Returns the number of sequence inner space tasks reported as finished or aborted. */
+  public int getFinishSeqInnerCompactionTaskNum() {
+    return finishSeqInnerCompactionTaskNum.get();
+  }
+
+  /** Returns the number of unsequence inner space tasks reported as finished or aborted. */
+  public int getFinishUnseqInnerCompactionTaskNum() {
+    return finishUnseqInnerCompactionTaskNum.get();
+  }
+
+  /** Returns the number of cross space tasks reported as finished or aborted. */
+  public int getFinishCrossCompactionTaskNum() {
+    return finishCrossCompactionTaskNum.get();
+  }
+
+  /** Adds {@code delta} to the counter {@code metricName} tagged with name "compaction". */
+  private static void countCompactionMetric(long delta, String metricName, MetricLevel level) {
+    MetricService.getInstance()
+        .count(delta, metricName, level, Tag.NAME.toString(), "compaction");
+  }
+
+  /** Picks the waiting-task counter for the given category; cross space takes precedence. */
+  private AtomicInteger waitingCounterOf(boolean isCrossTask, boolean isSeq) {
+    if (isCrossTask) {
+      return waitingCrossCompactionTaskNum;
+    }
+    return isSeq ? waitingSeqInnerCompactionTaskNum : waitingUnseqInnerCompactionTaskNum;
+  }
+
+  /** Picks the running-task counter for the given category; cross space takes precedence. */
+  private AtomicInteger runningCounterOf(boolean isCrossTask, boolean isSeq) {
+    if (isCrossTask) {
+      return runningCrossCompactionTaskNum;
+    }
+    return isSeq ? runningSeqInnerCompactionTaskNum : runningUnseqInnerCompactionTaskNum;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/metrics/recorder/CompactionMetricsRecorder.java b/server/src/main/java/org/apache/iotdb/db/service/metrics/recorder/CompactionMetricsRecorder.java
deleted file mode 100644
index 568fb76fb0..0000000000
--- a/server/src/main/java/org/apache/iotdb/db/service/metrics/recorder/CompactionMetricsRecorder.java
+++ /dev/null
@@ -1,190 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.service.metrics.recorder;
-
-import org.apache.iotdb.commons.service.metric.MetricService;
-import org.apache.iotdb.commons.service.metric.enums.Metric;
-import org.apache.iotdb.commons.service.metric.enums.Tag;
-import org.apache.iotdb.db.engine.compaction.execute.task.AbstractCompactionTask;
-import org.apache.iotdb.db.engine.compaction.execute.task.CompactionTaskSummary;
-import org.apache.iotdb.db.engine.compaction.execute.task.CrossSpaceCompactionTask;
-import org.apache.iotdb.db.engine.compaction.execute.task.InnerSpaceCompactionTask;
-import org.apache.iotdb.db.engine.compaction.schedule.constant.CompactionTaskStatus;
-import org.apache.iotdb.db.engine.compaction.schedule.constant.CompactionType;
-import org.apache.iotdb.db.engine.compaction.schedule.constant.ProcessChunkType;
-import org.apache.iotdb.metrics.utils.MetricLevel;
-
-import java.util.concurrent.TimeUnit;
-
-public class CompactionMetricsRecorder {
-
- public static void recordWriteInfo(
- CompactionType compactionType,
- ProcessChunkType processChunkType,
- boolean aligned,
- long byteNum) {
- MetricService.getInstance()
- .count(
- byteNum / 1024L,
- Metric.DATA_WRITTEN.toString(),
- MetricLevel.IMPORTANT,
- Tag.NAME.toString(),
- "compaction_" + compactionType.toString(),
- Tag.TYPE.toString(),
- (aligned ? "ALIGNED" : "NOT_ALIGNED") + "_" + processChunkType.toString());
- MetricService.getInstance()
- .count(
- byteNum / 1024L,
- Metric.DATA_WRITTEN.toString(),
- MetricLevel.IMPORTANT,
- Tag.NAME.toString(),
- "compaction",
- Tag.TYPE.toString(),
- "total");
- }
-
- public static void recordReadInfo(long byteNum) {
- MetricService.getInstance()
- .count(
- byteNum,
- Metric.DATA_READ.toString(),
- MetricLevel.IMPORTANT,
- Tag.NAME.toString(),
- "compaction");
- }
-
- public static void updateSummary(CompactionTaskSummary summary) {
- MetricService.getInstance()
- .count(
- summary.getProcessPointNum(),
- "Compacted_Point_Num",
- MetricLevel.IMPORTANT,
- Tag.NAME.toString(),
- "compaction");
- MetricService.getInstance()
- .count(
- summary.getProcessChunkNum(),
- "Compacted_Chunk_Num",
- MetricLevel.IMPORTANT,
- Tag.NAME.toString(),
- "compaction");
- MetricService.getInstance()
- .count(
- summary.getDirectlyFlushChunkNum(),
- "Directly_Flush_Chunk_Num",
- MetricLevel.NORMAL,
- Tag.NAME.toString(),
- "compaction");
- MetricService.getInstance()
- .count(
- summary.getDeserializeChunkCount(),
- "Deserialized_Chunk_Num",
- MetricLevel.NORMAL,
- Tag.NAME.toString(),
- "compaction");
- MetricService.getInstance()
- .count(
- summary.getMergedChunkNum(),
- "Merged_Chunk_Num",
- MetricLevel.NORMAL,
- Tag.NAME.toString(),
- "compaction");
- }
-
- public static void recordTaskInfo(
- AbstractCompactionTask task, CompactionTaskStatus status, int size) {
- String taskType = "unknown";
- boolean isInnerTask = false;
- if (task instanceof InnerSpaceCompactionTask) {
- isInnerTask = true;
- taskType = "inner";
- } else if (task instanceof CrossSpaceCompactionTask) {
- taskType = "cross";
- }
-
- switch (status) {
- case ADD_TO_QUEUE:
- case POLL_FROM_QUEUE:
- MetricService.getInstance()
- .getOrCreateGauge(
- Metric.QUEUE.toString(),
- MetricLevel.IMPORTANT,
- Tag.NAME.toString(),
- "compaction_" + taskType,
- Tag.STATUS.toString(),
- "waiting")
- .set(size);
- break;
- case READY_TO_EXECUTE:
- MetricService.getInstance()
- .getOrCreateGauge(
- Metric.QUEUE.toString(),
- MetricLevel.IMPORTANT,
- Tag.NAME.toString(),
- "compaction_" + taskType,
- Tag.STATUS.toString(),
- "running")
- .set(size);
- break;
- case FINISHED:
- MetricService.getInstance()
- .getOrCreateGauge(
- Metric.QUEUE.toString(),
- MetricLevel.IMPORTANT,
- Tag.NAME.toString(),
- "compaction_" + taskType,
- Tag.STATUS.toString(),
- "running")
- .set(size);
- MetricService.getInstance()
- .timer(
- task.getTimeCost(),
- TimeUnit.MILLISECONDS,
- Metric.COST_TASK.toString(),
- MetricLevel.IMPORTANT,
- Tag.NAME.toString(),
- isInnerTask ? "inner_compaction" : "cross_compaction");
- if (isInnerTask) {
- MetricService.getInstance()
- .count(
- 1,
- Metric.COMPACTION_TASK_COUNT.toString(),
- MetricLevel.IMPORTANT,
- Tag.NAME.toString(),
- "inner_compaction",
- Tag.TYPE.toString(),
- ((InnerSpaceCompactionTask) task).isSequence() ? "sequence" : "unsequence");
- } else {
- MetricService.getInstance()
- .count(
- 1,
- Metric.COMPACTION_TASK_COUNT.toString(),
- MetricLevel.IMPORTANT,
- Tag.NAME.toString(),
- "cross_compaction",
- Tag.TYPE.toString(),
- "cross");
- }
- break;
- default:
- // do nothing
- break;
- }
- }
-}