You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ma...@apache.org on 2023/02/09 08:53:41 UTC

[iotdb] branch IOTDB-5470 created (now c34828c46a)

This is an automated email from the ASF dual-hosted git repository.

marklau99 pushed a change to branch IOTDB-5470
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at c34828c46a refactor compaction metrics

This branch includes the following new commits:

     new c34828c46a refactor compaction metrics

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/01: refactor compaction metrics

Posted by ma...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

marklau99 pushed a commit to branch IOTDB-5470
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit c34828c46a0e7e488c0baa28bacc8dac135875fd
Author: Liu Xuxin <li...@outlook.com>
AuthorDate: Thu Feb 9 16:53:23 2023 +0800

    refactor compaction metrics
---
 .../execute/task/AbstractCompactionTask.java       |  14 +
 .../execute/task/CrossSpaceCompactionTask.java     |   6 +-
 .../execute/task/InnerSpaceCompactionTask.java     |   6 +-
 .../readchunk/AlignedSeriesCompactionExecutor.java |  26 +-
 .../readchunk/SingleSeriesCompactionExecutor.java  |  42 +--
 .../utils/writer/AbstractCompactionWriter.java     |  15 +-
 .../compaction/schedule/CompactionTaskManager.java |  15 +-
 .../compaction/schedule/CompactionWorker.java      |   7 +-
 .../db/service/metrics/DataNodeMetricsHelper.java  |   2 +
 .../metrics/recorder/CompactionMetrics.java        | 376 +++++++++++++++++++++
 .../recorder/CompactionMetricsRecorder.java        | 190 -----------
 11 files changed, 455 insertions(+), 244 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/task/AbstractCompactionTask.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/task/AbstractCompactionTask.java
index 0583811d6e..ed47faedcc 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/task/AbstractCompactionTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/task/AbstractCompactionTask.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.db.engine.compaction.execute.performer.ICompactionPerformer;
 import org.apache.iotdb.db.engine.compaction.schedule.CompactionTaskManager;
 import org.apache.iotdb.db.engine.storagegroup.TsFileManager;
+import org.apache.iotdb.db.service.metrics.recorder.CompactionMetrics;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -48,6 +49,8 @@ public abstract class AbstractCompactionTask {
   protected int hashCode = -1;
   protected CompactionTaskSummary summary;
   protected long serialId;
+  protected boolean crossTask;
+  protected boolean innerSeqTask;
 
   public AbstractCompactionTask(
       String storageGroupName,
@@ -71,6 +74,7 @@ public abstract class AbstractCompactionTask {
   public void start() {
     currentTaskNum.incrementAndGet();
     boolean isSuccess = false;
+    CompactionMetrics.getInstance().reportTaskStartRunning(crossTask, innerSeqTask);
     try {
       summary.start();
       doCompaction();
@@ -79,6 +83,8 @@ public abstract class AbstractCompactionTask {
       this.currentTaskNum.decrementAndGet();
       summary.finish(isSuccess);
       CompactionTaskManager.getInstance().removeRunningTaskFuture(this);
+      CompactionMetrics.getInstance()
+          .reportTaskFinishOrAbort(crossTask, innerSeqTask, summary.getTimeCost());
     }
   }
 
@@ -150,4 +156,12 @@ public abstract class AbstractCompactionTask {
   }
 
   protected abstract void createSummary();
+
+  public boolean isCrossTask() {
+    return crossTask;
+  }
+
+  public boolean isInnerSeqTask() {
+    return innerSeqTask;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/task/CrossSpaceCompactionTask.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/task/CrossSpaceCompactionTask.java
index 034fd59b1d..6649627669 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/task/CrossSpaceCompactionTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/task/CrossSpaceCompactionTask.java
@@ -34,7 +34,7 @@ import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResourceList;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResourceStatus;
 import org.apache.iotdb.db.rescon.SystemInfo;
-import org.apache.iotdb.db.service.metrics.recorder.CompactionMetricsRecorder;
+import org.apache.iotdb.db.service.metrics.recorder.CompactionMetrics;
 
 import org.apache.commons.io.FileUtils;
 import org.slf4j.Logger;
@@ -90,6 +90,8 @@ public class CrossSpaceCompactionTask extends AbstractCompactionTask {
     this.performer = performer;
     this.hashCode = this.toString().hashCode();
     this.memoryCost = memoryCost;
+    this.crossTask = true;
+    this.innerSeqTask = false;
     createSummary();
   }
 
@@ -232,7 +234,7 @@ public class CrossSpaceCompactionTask extends AbstractCompactionTask {
         TsFileMetricManager.getInstance()
             .deleteFile(unsequenceFileSize, false, selectedUnsequenceFiles.size());
 
-        CompactionMetricsRecorder.updateSummary(summary);
+        CompactionMetrics.getInstance().updateSummary(summary);
 
         long costTime = (System.currentTimeMillis() - startTime) / 1000;
 
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/task/InnerSpaceCompactionTask.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/task/InnerSpaceCompactionTask.java
index a5c4b9fbd5..8336e082fd 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/task/InnerSpaceCompactionTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/task/InnerSpaceCompactionTask.java
@@ -33,7 +33,7 @@ import org.apache.iotdb.db.engine.storagegroup.TsFileNameGenerator;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResourceList;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResourceStatus;
-import org.apache.iotdb.db.service.metrics.recorder.CompactionMetricsRecorder;
+import org.apache.iotdb.db.service.metrics.recorder.CompactionMetrics;
 import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
 import org.apache.iotdb.tsfile.exception.write.TsFileNotCompleteException;
 
@@ -95,6 +95,8 @@ public class InnerSpaceCompactionTask extends AbstractCompactionTask {
       tsFileResourceList = tsFileManager.getOrCreateUnsequenceListByTimePartition(timePartition);
     }
     this.hashCode = this.toString().hashCode();
+    this.innerSeqTask = sequence;
+    this.crossTask = false;
     collectSelectedFilesInfo();
     createSummary();
   }
@@ -246,7 +248,7 @@ public class InnerSpaceCompactionTask extends AbstractCompactionTask {
       TsFileMetricManager.getInstance()
           .deleteFile(totalSizeOfDeletedFile, sequence, selectedTsFileResourceList.size());
 
-      CompactionMetricsRecorder.updateSummary(summary);
+      CompactionMetrics.getInstance().updateSummary(summary);
 
       double costTime = (System.currentTimeMillis() - startTime) / 1000.0d;
       LOGGER.info(
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/readchunk/AlignedSeriesCompactionExecutor.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/readchunk/AlignedSeriesCompactionExecutor.java
index af90e81cd2..b07010b72a 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/readchunk/AlignedSeriesCompactionExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/readchunk/AlignedSeriesCompactionExecutor.java
@@ -26,7 +26,7 @@ import org.apache.iotdb.db.engine.compaction.schedule.CompactionTaskManager;
 import org.apache.iotdb.db.engine.compaction.schedule.constant.CompactionType;
 import org.apache.iotdb.db.engine.compaction.schedule.constant.ProcessChunkType;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
-import org.apache.iotdb.db.service.metrics.recorder.CompactionMetricsRecorder;
+import org.apache.iotdb.db.service.metrics.recorder.CompactionMetrics;
 import org.apache.iotdb.tsfile.file.header.ChunkHeader;
 import org.apache.iotdb.tsfile.file.metadata.AlignedChunkMetadata;
 import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
@@ -145,7 +145,7 @@ public class AlignedSeriesCompactionExecutor {
             readerIterator.nextReader();
         summary.increaseProcessChunkNum(nextAlignedChunkInfo.getNotNullChunkNum());
         summary.increaseProcessPointNum(nextAlignedChunkInfo.getTotalPointNum());
-        CompactionMetricsRecorder.recordReadInfo(nextAlignedChunkInfo.getTotalSize());
+        CompactionMetrics.getInstance().recordReadInfo(nextAlignedChunkInfo.getTotalSize());
         compactOneAlignedChunk(
             nextAlignedChunkInfo.getReader(), nextAlignedChunkInfo.getNotNullChunkNum());
       }
@@ -154,11 +154,12 @@ public class AlignedSeriesCompactionExecutor {
     if (remainingPointInChunkWriter != 0L) {
       CompactionTaskManager.mergeRateLimiterAcquire(
           rateLimiter, chunkWriter.estimateMaxSeriesMemSize());
-      CompactionMetricsRecorder.recordWriteInfo(
-          CompactionType.INNER_SEQ_COMPACTION,
-          ProcessChunkType.DESERIALIZE_CHUNK,
-          true,
-          chunkWriter.estimateMaxSeriesMemSize());
+      CompactionMetrics.getInstance()
+          .recordWriteInfo(
+              CompactionType.INNER_SEQ_COMPACTION,
+              ProcessChunkType.DESERIALIZE_CHUNK,
+              true,
+              chunkWriter.estimateMaxSeriesMemSize());
       chunkWriter.writeToFileWriter(writer);
     }
     writer.checkMetadataSizeAndMayFlush();
@@ -199,11 +200,12 @@ public class AlignedSeriesCompactionExecutor {
         || chunkWriter.estimateMaxSeriesMemSize() >= chunkSizeThreshold * schemaList.size()) {
       CompactionTaskManager.mergeRateLimiterAcquire(
           rateLimiter, chunkWriter.estimateMaxSeriesMemSize());
-      CompactionMetricsRecorder.recordWriteInfo(
-          CompactionType.INNER_SEQ_COMPACTION,
-          ProcessChunkType.DESERIALIZE_CHUNK,
-          true,
-          chunkWriter.estimateMaxSeriesMemSize());
+      CompactionMetrics.getInstance()
+          .recordWriteInfo(
+              CompactionType.INNER_SEQ_COMPACTION,
+              ProcessChunkType.DESERIALIZE_CHUNK,
+              true,
+              chunkWriter.estimateMaxSeriesMemSize());
       chunkWriter.writeToFileWriter(writer);
       remainingPointInChunkWriter = 0L;
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/readchunk/SingleSeriesCompactionExecutor.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/readchunk/SingleSeriesCompactionExecutor.java
index d5184dcea4..2a278a5fb5 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/readchunk/SingleSeriesCompactionExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/readchunk/SingleSeriesCompactionExecutor.java
@@ -25,7 +25,7 @@ import org.apache.iotdb.db.engine.compaction.schedule.CompactionTaskManager;
 import org.apache.iotdb.db.engine.compaction.schedule.constant.CompactionType;
 import org.apache.iotdb.db.engine.compaction.schedule.constant.ProcessChunkType;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
-import org.apache.iotdb.db.service.metrics.recorder.CompactionMetricsRecorder;
+import org.apache.iotdb.db.service.metrics.recorder.CompactionMetrics;
 import org.apache.iotdb.tsfile.file.header.ChunkHeader;
 import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
 import org.apache.iotdb.tsfile.read.TimeValuePair;
@@ -128,9 +128,10 @@ public class SingleSeriesCompactionExecutor {
         if (this.chunkWriter == null) {
           constructChunkWriterFromReadChunk(currentChunk);
         }
-        CompactionMetricsRecorder.recordReadInfo(
-            (long) currentChunk.getHeader().getSerializedSize()
-                + currentChunk.getHeader().getDataSize());
+        CompactionMetrics.getInstance()
+            .recordReadInfo(
+                (long) currentChunk.getHeader().getSerializedSize()
+                    + currentChunk.getHeader().getDataSize());
 
         // if this chunk is modified, deserialize it into points
         if (chunkMetadata.getDeleteIntervalList() != null) {
@@ -321,11 +322,12 @@ public class SingleSeriesCompactionExecutor {
     if (chunkMetadata.getEndTime() > maxEndTimestamp) {
       maxEndTimestamp = chunkMetadata.getEndTime();
     }
-    CompactionMetricsRecorder.recordWriteInfo(
-        CompactionType.INNER_SEQ_COMPACTION,
-        isCachedChunk ? ProcessChunkType.MERGE_CHUNK : ProcessChunkType.FLUSH_CHUNK,
-        false,
-        getChunkSize(chunk));
+    CompactionMetrics.getInstance()
+        .recordWriteInfo(
+            CompactionType.INNER_SEQ_COMPACTION,
+            isCachedChunk ? ProcessChunkType.MERGE_CHUNK : ProcessChunkType.FLUSH_CHUNK,
+            false,
+            getChunkSize(chunk));
     fileWriter.writeChunk(chunk, chunkMetadata);
   }
 
@@ -334,11 +336,12 @@ public class SingleSeriesCompactionExecutor {
         || chunkWriter.estimateMaxSeriesMemSize() >= targetChunkSize) {
       CompactionTaskManager.mergeRateLimiterAcquire(
           compactionRateLimiter, chunkWriter.estimateMaxSeriesMemSize());
-      CompactionMetricsRecorder.recordWriteInfo(
-          CompactionType.INNER_SEQ_COMPACTION,
-          ProcessChunkType.DESERIALIZE_CHUNK,
-          false,
-          chunkWriter.estimateMaxSeriesMemSize());
+      CompactionMetrics.getInstance()
+          .recordWriteInfo(
+              CompactionType.INNER_SEQ_COMPACTION,
+              ProcessChunkType.DESERIALIZE_CHUNK,
+              false,
+              chunkWriter.estimateMaxSeriesMemSize());
       chunkWriter.writeToFileWriter(fileWriter);
       pointCountInChunkWriter = 0L;
     }
@@ -356,11 +359,12 @@ public class SingleSeriesCompactionExecutor {
   private void flushChunkWriter() throws IOException {
     CompactionTaskManager.mergeRateLimiterAcquire(
         compactionRateLimiter, chunkWriter.estimateMaxSeriesMemSize());
-    CompactionMetricsRecorder.recordWriteInfo(
-        CompactionType.INNER_SEQ_COMPACTION,
-        ProcessChunkType.DESERIALIZE_CHUNK,
-        false,
-        chunkWriter.estimateMaxSeriesMemSize());
+    CompactionMetrics.getInstance()
+        .recordWriteInfo(
+            CompactionType.INNER_SEQ_COMPACTION,
+            ProcessChunkType.DESERIALIZE_CHUNK,
+            false,
+            chunkWriter.estimateMaxSeriesMemSize());
     chunkWriter.writeToFileWriter(fileWriter);
     pointCountInChunkWriter = 0L;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/AbstractCompactionWriter.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/AbstractCompactionWriter.java
index 92ff3008e5..efa84edce8 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/AbstractCompactionWriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/AbstractCompactionWriter.java
@@ -22,7 +22,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.compaction.schedule.CompactionTaskManager;
 import org.apache.iotdb.db.engine.compaction.schedule.constant.CompactionType;
 import org.apache.iotdb.db.engine.compaction.schedule.constant.ProcessChunkType;
-import org.apache.iotdb.db.service.metrics.recorder.CompactionMetricsRecorder;
+import org.apache.iotdb.db.service.metrics.recorder.CompactionMetrics;
 import org.apache.iotdb.tsfile.exception.write.PageException;
 import org.apache.iotdb.tsfile.file.header.PageHeader;
 import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
@@ -296,11 +296,14 @@ public abstract class AbstractCompactionWriter implements AutoCloseable {
       if (iChunkWriter.checkIsChunkSizeOverThreshold(targetChunkSize, targetChunkPointNum, false)) {
         sealChunk(fileWriter, iChunkWriter, subTaskId);
         lastCheckIndex = 0;
-        CompactionMetricsRecorder.recordWriteInfo(
-            isCrossSpace ? CompactionType.CROSS_COMPACTION : CompactionType.INNER_UNSEQ_COMPACTION,
-            ProcessChunkType.DESERIALIZE_CHUNK,
-            isAlign,
-            iChunkWriter.estimateMaxSeriesMemSize());
+        CompactionMetrics.getInstance()
+            .recordWriteInfo(
+                isCrossSpace
+                    ? CompactionType.CROSS_COMPACTION
+                    : CompactionType.INNER_UNSEQ_COMPACTION,
+                ProcessChunkType.DESERIALIZE_CHUNK,
+                isAlign,
+                iChunkWriter.estimateMaxSeriesMemSize());
       }
     }
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionTaskManager.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionTaskManager.java
index 0e51e046a1..dd5a2aa9e5 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionTaskManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionTaskManager.java
@@ -31,8 +31,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.compaction.execute.task.AbstractCompactionTask;
 import org.apache.iotdb.db.engine.compaction.execute.task.CompactionTaskSummary;
 import org.apache.iotdb.db.engine.compaction.schedule.comparator.DefaultCompactionTaskComparatorImpl;
-import org.apache.iotdb.db.engine.compaction.schedule.constant.CompactionTaskStatus;
-import org.apache.iotdb.db.service.metrics.recorder.CompactionMetricsRecorder;
+import org.apache.iotdb.db.service.metrics.recorder.CompactionMetrics;
 import org.apache.iotdb.db.utils.datastructure.FixedPriorityBlockingQueue;
 
 import com.google.common.util.concurrent.RateLimiter;
@@ -97,8 +96,8 @@ public class CompactionTaskManager implements IService {
           AbstractCompactionTask::resetCompactionCandidateStatusForAllSourceFiles);
       candidateCompactionTaskQueue.regsitPollLastHook(
           x ->
-              CompactionMetricsRecorder.recordTaskInfo(
-                  x, CompactionTaskStatus.POLL_FROM_QUEUE, candidateCompactionTaskQueue.size()));
+              CompactionMetrics.getInstance()
+                  .reportPollTaskFromWaitingQueue(x.isCrossTask(), x.isInnerSeqTask()));
       init = true;
     }
     logger.info("Compaction task manager started.");
@@ -225,8 +224,9 @@ public class CompactionTaskManager implements IService {
       candidateCompactionTaskQueue.put(compactionTask);
 
       // add metrics
-      CompactionMetricsRecorder.recordTaskInfo(
-          compactionTask, CompactionTaskStatus.ADD_TO_QUEUE, candidateCompactionTaskQueue.size());
+      CompactionMetrics.getInstance()
+          .reportAddTaskToWaitingQueue(
+              compactionTask.isCrossTask(), compactionTask.isInnerSeqTask());
 
       return true;
     }
@@ -272,9 +272,6 @@ public class CompactionTaskManager implements IService {
     if (storageGroupTasks.containsKey(regionWithSG)) {
       storageGroupTasks.get(regionWithSG).remove(task);
     }
-    // add metrics
-    CompactionMetricsRecorder.recordTaskInfo(
-        task, CompactionTaskStatus.FINISHED, currentTaskNum.get());
     finishedTaskNum.incrementAndGet();
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionWorker.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionWorker.java
index 18e7875349..aa37314eee 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionWorker.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionWorker.java
@@ -20,8 +20,7 @@ package org.apache.iotdb.db.engine.compaction.schedule;
 
 import org.apache.iotdb.db.engine.compaction.execute.task.AbstractCompactionTask;
 import org.apache.iotdb.db.engine.compaction.execute.task.CompactionTaskSummary;
-import org.apache.iotdb.db.engine.compaction.schedule.constant.CompactionTaskStatus;
-import org.apache.iotdb.db.service.metrics.recorder.CompactionMetricsRecorder;
+import org.apache.iotdb.db.service.metrics.recorder.CompactionMetrics;
 import org.apache.iotdb.db.utils.datastructure.FixedPriorityBlockingQueue;
 
 import org.jetbrains.annotations.NotNull;
@@ -55,10 +54,10 @@ public class CompactionWorker implements Runnable {
           log.warn("CompactionThread-{} terminates because interruption", threadId);
           return;
         }
+        CompactionMetrics.getInstance()
+            .reportPollTaskFromWaitingQueue(task.isCrossTask(), task.isInnerSeqTask());
         if (task != null) {
           // add metrics
-          CompactionMetricsRecorder.recordTaskInfo(
-              task, CompactionTaskStatus.POLL_FROM_QUEUE, compactionTaskQueue.size());
           if (task.checkValidAndSetMerging()) {
             CompactionTaskSummary summary = task.getSummary();
             CompactionTaskFuture future = new CompactionTaskFuture(summary);
diff --git a/server/src/main/java/org/apache/iotdb/db/service/metrics/DataNodeMetricsHelper.java b/server/src/main/java/org/apache/iotdb/db/service/metrics/DataNodeMetricsHelper.java
index 074b7345a6..55b7b4b303 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/metrics/DataNodeMetricsHelper.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/metrics/DataNodeMetricsHelper.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.db.mpp.metric.QueryExecutionMetricSet;
 import org.apache.iotdb.db.mpp.metric.QueryPlanCostMetricSet;
 import org.apache.iotdb.db.mpp.metric.QueryResourceMetricSet;
 import org.apache.iotdb.db.mpp.metric.SeriesScanCostMetricSet;
+import org.apache.iotdb.db.service.metrics.recorder.CompactionMetrics;
 import org.apache.iotdb.metrics.metricsets.jvm.JvmMetrics;
 import org.apache.iotdb.metrics.metricsets.logback.LogbackMetrics;
 
@@ -36,6 +37,7 @@ public class DataNodeMetricsHelper {
     MetricService.getInstance().addMetricSet(new JvmMetrics());
     MetricService.getInstance().addMetricSet(new LogbackMetrics());
     MetricService.getInstance().addMetricSet(new FileMetrics());
+    MetricService.getInstance().addMetricSet(CompactionMetrics.getInstance());
     MetricService.getInstance().addMetricSet(new ProcessMetrics());
     MetricService.getInstance().addMetricSet(new SystemMetrics(true));
 
diff --git a/server/src/main/java/org/apache/iotdb/db/service/metrics/recorder/CompactionMetrics.java b/server/src/main/java/org/apache/iotdb/db/service/metrics/recorder/CompactionMetrics.java
new file mode 100644
index 0000000000..127136dffc
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/service/metrics/recorder/CompactionMetrics.java
@@ -0,0 +1,376 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.service.metrics.recorder;
+
+import org.apache.iotdb.commons.service.metric.MetricService;
+import org.apache.iotdb.commons.service.metric.enums.Metric;
+import org.apache.iotdb.commons.service.metric.enums.Tag;
+import org.apache.iotdb.db.engine.compaction.execute.task.CompactionTaskSummary;
+import org.apache.iotdb.db.engine.compaction.schedule.constant.CompactionType;
+import org.apache.iotdb.db.engine.compaction.schedule.constant.ProcessChunkType;
+import org.apache.iotdb.metrics.AbstractMetricService;
+import org.apache.iotdb.metrics.metricsets.IMetricSet;
+import org.apache.iotdb.metrics.utils.MetricLevel;
+import org.apache.iotdb.metrics.utils.MetricType;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class CompactionMetrics implements IMetricSet {
+  private static final CompactionMetrics INSTANCE = new CompactionMetrics();
+  private final AtomicInteger waitingSeqInnerCompactionTaskNum = new AtomicInteger(0);
+  private final AtomicInteger waitingUnseqInnerCompactionTaskNum = new AtomicInteger(0);
+  private final AtomicInteger waitingCrossCompactionTaskNum = new AtomicInteger(0);
+  private final AtomicInteger runningSeqInnerCompactionTaskNum = new AtomicInteger(0);
+  private final AtomicInteger runningUnseqInnerCompactionTaskNum = new AtomicInteger(0);
+  private final AtomicInteger runningCrossCompactionTaskNum = new AtomicInteger(0);
+  private final AtomicInteger finishSeqInnerCompactionTaskNum = new AtomicInteger(0);
+  private final AtomicInteger finishUnseqInnerCompactionTaskNum = new AtomicInteger(0);
+  private final AtomicInteger finishCrossCompactionTaskNum = new AtomicInteger(0);
+
+  private CompactionMetrics() {}
+
+  public static CompactionMetrics getInstance() {
+    return INSTANCE;
+  }
+
+  @Override
+  public void bindTo(AbstractMetricService metricService) {
+    bindTaskInfo(metricService);
+  }
+
+  @Override
+  public void unbindFrom(AbstractMetricService metricService) {
+    unbindTaskInfo(metricService);
+  }
+
+  private void bindTaskInfo(AbstractMetricService metricService) {
+    metricService.createAutoGauge(
+        Metric.QUEUE.toString(),
+        MetricLevel.IMPORTANT,
+        this,
+        CompactionMetrics::getWaitingCrossCompactionTaskNum,
+        Tag.NAME.toString(),
+        "compaction_cross",
+        Tag.STATUS.toString(),
+        "waiting");
+    metricService.createAutoGauge(
+        Metric.QUEUE.toString(),
+        MetricLevel.IMPORTANT,
+        this,
+        CompactionMetrics::getWaitingSeqInnerCompactionTaskNum,
+        Tag.NAME.toString(),
+        "compaction_inner_seq",
+        Tag.STATUS.toString(),
+        "waiting");
+    metricService.createAutoGauge(
+        Metric.QUEUE.toString(),
+        MetricLevel.IMPORTANT,
+        this,
+        CompactionMetrics::getWaitingUnseqInnerCompactionTaskNum,
+        Tag.NAME.toString(),
+        "compaction_inner_unseq",
+        Tag.STATUS.toString(),
+        "waiting");
+    metricService.createAutoGauge(
+        Metric.QUEUE.toString(),
+        MetricLevel.IMPORTANT,
+        this,
+        CompactionMetrics::getRunningCrossCompactionTaskNum,
+        Tag.NAME.toString(),
+        "compaction_cross",
+        Tag.STATUS.toString(),
+        "running");
+    metricService.createAutoGauge(
+        Metric.QUEUE.toString(),
+        MetricLevel.IMPORTANT,
+        this,
+        CompactionMetrics::getRunningSeqInnerCompactionTaskNum,
+        Tag.NAME.toString(),
+        "compaction_inner_seq",
+        Tag.STATUS.toString(),
+        "running");
+    metricService.createAutoGauge(
+        Metric.QUEUE.toString(),
+        MetricLevel.IMPORTANT,
+        this,
+        CompactionMetrics::getRunningUnseqInnerCompactionTaskNum,
+        Tag.NAME.toString(),
+        "compaction_inner_unseq",
+        Tag.STATUS.toString(),
+        "running");
+    metricService.createAutoGauge(
+        Metric.COMPACTION_TASK_COUNT.toString(),
+        MetricLevel.IMPORTANT,
+        this,
+        CompactionMetrics::getFinishSeqInnerCompactionTaskNum,
+        Tag.NAME.toString(),
+        "inner_seq");
+    metricService.createAutoGauge(
+        Metric.COMPACTION_TASK_COUNT.toString(),
+        MetricLevel.IMPORTANT,
+        this,
+        CompactionMetrics::getFinishUnseqInnerCompactionTaskNum,
+        Tag.NAME.toString(),
+        "inner_unseq");
+    metricService.createAutoGauge(
+        Metric.COMPACTION_TASK_COUNT.toString(),
+        MetricLevel.IMPORTANT,
+        this,
+        CompactionMetrics::getFinishCrossCompactionTaskNum,
+        Tag.NAME.toString(),
+        "cross");
+  }
+
+  private void unbindTaskInfo(AbstractMetricService metricService) {
+    metricService.remove(
+        MetricType.AUTO_GAUGE,
+        Metric.QUEUE.toString(),
+        Tag.NAME.toString(),
+        "compaction_cross",
+        Tag.STATUS.toString(),
+        "waiting");
+    metricService.remove(
+        MetricType.AUTO_GAUGE,
+        Metric.QUEUE.toString(),
+        Tag.NAME.toString(),
+        "compaction_inner_seq",
+        Tag.STATUS.toString(),
+        "waiting");
+    metricService.remove(
+        MetricType.AUTO_GAUGE,
+        Metric.QUEUE.toString(),
+        Tag.NAME.toString(),
+        "compaction_inner_unseq",
+        Tag.STATUS.toString(),
+        "waiting");
+    metricService.remove(
+        MetricType.AUTO_GAUGE,
+        Metric.QUEUE.toString(),
+        Tag.NAME.toString(),
+        "compaction_cross",
+        Tag.STATUS.toString(),
+        "running");
+    metricService.remove(
+        MetricType.AUTO_GAUGE,
+        Metric.QUEUE.toString(),
+        Tag.NAME.toString(),
+        "compaction_inner_seq",
+        Tag.STATUS.toString(),
+        "running");
+    metricService.remove(
+        MetricType.AUTO_GAUGE,
+        Metric.QUEUE.toString(),
+        Tag.NAME.toString(),
+        "compaction_inner_unseq",
+        Tag.STATUS.toString(),
+        "running");
+    metricService.remove(
+        MetricType.AUTO_GAUGE,
+        Metric.COMPACTION_TASK_COUNT.toString(),
+        Tag.NAME.toString(),
+        "inner_seq");
+    metricService.remove(
+        MetricType.AUTO_GAUGE,
+        Metric.COMPACTION_TASK_COUNT.toString(),
+        Tag.NAME.toString(),
+        "inner_unseq");
+    metricService.remove(
+        MetricType.AUTO_GAUGE,
+        Metric.COMPACTION_TASK_COUNT.toString(),
+        Tag.NAME.toString(),
+        "cross");
+  }
+
+  public void recordWriteInfo(
+      CompactionType compactionType,
+      ProcessChunkType processChunkType,
+      boolean aligned,
+      long byteNum) {
+    MetricService.getInstance()
+        .count(
+            byteNum / 1024L,
+            Metric.DATA_WRITTEN.toString(),
+            MetricLevel.IMPORTANT,
+            Tag.NAME.toString(),
+            "compaction_" + compactionType.toString(),
+            Tag.TYPE.toString(),
+            (aligned ? "ALIGNED" : "NOT_ALIGNED") + "_" + processChunkType.toString());
+    MetricService.getInstance()
+        .count(
+            byteNum / 1024L,
+            Metric.DATA_WRITTEN.toString(),
+            MetricLevel.IMPORTANT,
+            Tag.NAME.toString(),
+            "compaction",
+            Tag.TYPE.toString(),
+            "total");
+  }
+
+  public void recordReadInfo(long byteNum) {
+    MetricService.getInstance()
+        .count(
+            byteNum,
+            Metric.DATA_READ.toString(),
+            MetricLevel.IMPORTANT,
+            Tag.NAME.toString(),
+            "compaction");
+  }
+
+  /** Adds the point/chunk counts carried by {@code summary} to the compaction counters. */
+  public void updateSummary(CompactionTaskSummary summary) {
+    final MetricService service = MetricService.getInstance();
+    final String nameTag = Tag.NAME.toString();
+    // Overall volume processed by the task (IMPORTANT level).
+    service.count(
+        summary.getProcessPointNum(),
+        "Compacted_Point_Num",
+        MetricLevel.IMPORTANT,
+        nameTag,
+        "compaction");
+    service.count(
+        summary.getProcessChunkNum(),
+        "Compacted_Chunk_Num",
+        MetricLevel.IMPORTANT,
+        nameTag,
+        "compaction");
+    // Breakdown of how chunks were handled (NORMAL level).
+    service.count(
+        summary.getDirectlyFlushChunkNum(),
+        "Directly_Flush_Chunk_Num",
+        MetricLevel.NORMAL,
+        nameTag,
+        "compaction");
+    service.count(
+        summary.getDeserializeChunkCount(),
+        "Deserialized_Chunk_Num",
+        MetricLevel.NORMAL,
+        nameTag,
+        "compaction");
+    service.count(
+        summary.getMergedChunkNum(),
+        "Merged_Chunk_Num",
+        MetricLevel.NORMAL,
+        nameTag,
+        "compaction");
+  }
+
+  /** Adds one to the waiting counter of the task's category (cross, seq inner, unseq inner). */
+  public void reportAddTaskToWaitingQueue(boolean isCrossTask, boolean isSeq) {
+    if (isCrossTask) {
+      waitingCrossCompactionTaskNum.incrementAndGet();
+      return;
+    }
+    (isSeq ? waitingSeqInnerCompactionTaskNum : waitingUnseqInnerCompactionTaskNum)
+        .incrementAndGet();
+  }
+
+  /** Takes one off the waiting counter of the task's category (cross, seq inner, unseq inner). */
+  public void reportPollTaskFromWaitingQueue(boolean isCrossTask, boolean isSeq) {
+    if (isCrossTask) {
+      waitingCrossCompactionTaskNum.decrementAndGet();
+      return;
+    }
+    (isSeq ? waitingSeqInnerCompactionTaskNum : waitingUnseqInnerCompactionTaskNum)
+        .decrementAndGet();
+  }
+
+  /** Adds one to the running counter of the task's category (cross, seq inner, unseq inner). */
+  public void reportTaskStartRunning(boolean isCrossTask, boolean isSeq) {
+    if (isCrossTask) {
+      runningCrossCompactionTaskNum.incrementAndGet();
+      return;
+    }
+    (isSeq ? runningSeqInnerCompactionTaskNum : runningUnseqInnerCompactionTaskNum)
+        .incrementAndGet();
+  }
+
+  /**
+   * Moves one task of the given category from "running" to "finished" and times it.
+   *
+   * <p>No success flag is taken, so finished and aborted tasks are counted alike.
+   *
+   * @param isCrossTask whether the task is a cross-space compaction
+   * @param isSeq for a non-cross task, whether the sequence counters are used
+   * @param timeCost task duration, recorded in milliseconds
+   */
+  public void reportTaskFinishOrAbort(boolean isCrossTask, boolean isSeq, long timeCost) {
+    // Name tag of the COST_TASK timer that receives this task's duration.
+    final String timerName;
+    if (isCrossTask) {
+      runningCrossCompactionTaskNum.decrementAndGet();
+      finishCrossCompactionTaskNum.incrementAndGet();
+      timerName = "cross_compaction";
+    } else if (isSeq) {
+      runningSeqInnerCompactionTaskNum.decrementAndGet();
+      finishSeqInnerCompactionTaskNum.incrementAndGet();
+      timerName = "inner_seq_compaction";
+    } else {
+      runningUnseqInnerCompactionTaskNum.decrementAndGet();
+      finishUnseqInnerCompactionTaskNum.incrementAndGet();
+      timerName = "inner_unseq_compaction";
+    }
+
+    // Record the duration under the category-specific name.
+    MetricService.getInstance()
+        .timer(
+            timeCost,
+            TimeUnit.MILLISECONDS,
+            Metric.COST_TASK.toString(),
+            MetricLevel.IMPORTANT,
+            Tag.NAME.toString(),
+            timerName);
+  }
+
+  public int getWaitingSeqInnerCompactionTaskNum() {
+    return waitingSeqInnerCompactionTaskNum.get(); // queued seq inner tasks not yet polled
+  }
+
+  public int getWaitingUnseqInnerCompactionTaskNum() {
+    return waitingUnseqInnerCompactionTaskNum.get(); // queued unseq inner tasks not yet polled
+  }
+
+  public int getWaitingCrossCompactionTaskNum() {
+    return waitingCrossCompactionTaskNum.get(); // queued cross tasks not yet polled
+  }
+
+  public int getRunningSeqInnerCompactionTaskNum() {
+    return runningSeqInnerCompactionTaskNum.get(); // seq inner tasks started, not yet ended
+  }
+
+  public int getRunningUnseqInnerCompactionTaskNum() {
+    return runningUnseqInnerCompactionTaskNum.get(); // unseq inner tasks started, not yet ended
+  }
+
+  public int getRunningCrossCompactionTaskNum() {
+    return runningCrossCompactionTaskNum.get(); // cross tasks started, not yet ended
+  }
+
+  public int getFinishSeqInnerCompactionTaskNum() {
+    return finishSeqInnerCompactionTaskNum.get(); // seq inner tasks reported finished/aborted
+  }
+
+  public int getFinishUnseqInnerCompactionTaskNum() {
+    return finishUnseqInnerCompactionTaskNum.get(); // unseq inner tasks reported finished/aborted
+  }
+
+  public int getFinishCrossCompactionTaskNum() {
+    return finishCrossCompactionTaskNum.get(); // cross tasks reported finished/aborted
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/metrics/recorder/CompactionMetricsRecorder.java b/server/src/main/java/org/apache/iotdb/db/service/metrics/recorder/CompactionMetricsRecorder.java
deleted file mode 100644
index 568fb76fb0..0000000000
--- a/server/src/main/java/org/apache/iotdb/db/service/metrics/recorder/CompactionMetricsRecorder.java
+++ /dev/null
@@ -1,190 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.service.metrics.recorder;
-
-import org.apache.iotdb.commons.service.metric.MetricService;
-import org.apache.iotdb.commons.service.metric.enums.Metric;
-import org.apache.iotdb.commons.service.metric.enums.Tag;
-import org.apache.iotdb.db.engine.compaction.execute.task.AbstractCompactionTask;
-import org.apache.iotdb.db.engine.compaction.execute.task.CompactionTaskSummary;
-import org.apache.iotdb.db.engine.compaction.execute.task.CrossSpaceCompactionTask;
-import org.apache.iotdb.db.engine.compaction.execute.task.InnerSpaceCompactionTask;
-import org.apache.iotdb.db.engine.compaction.schedule.constant.CompactionTaskStatus;
-import org.apache.iotdb.db.engine.compaction.schedule.constant.CompactionType;
-import org.apache.iotdb.db.engine.compaction.schedule.constant.ProcessChunkType;
-import org.apache.iotdb.metrics.utils.MetricLevel;
-
-import java.util.concurrent.TimeUnit;
-
-public class CompactionMetricsRecorder {
-
-  public static void recordWriteInfo(
-      CompactionType compactionType,
-      ProcessChunkType processChunkType,
-      boolean aligned,
-      long byteNum) {
-    MetricService.getInstance()
-        .count(
-            byteNum / 1024L,
-            Metric.DATA_WRITTEN.toString(),
-            MetricLevel.IMPORTANT,
-            Tag.NAME.toString(),
-            "compaction_" + compactionType.toString(),
-            Tag.TYPE.toString(),
-            (aligned ? "ALIGNED" : "NOT_ALIGNED") + "_" + processChunkType.toString());
-    MetricService.getInstance()
-        .count(
-            byteNum / 1024L,
-            Metric.DATA_WRITTEN.toString(),
-            MetricLevel.IMPORTANT,
-            Tag.NAME.toString(),
-            "compaction",
-            Tag.TYPE.toString(),
-            "total");
-  }
-
-  public static void recordReadInfo(long byteNum) {
-    MetricService.getInstance()
-        .count(
-            byteNum,
-            Metric.DATA_READ.toString(),
-            MetricLevel.IMPORTANT,
-            Tag.NAME.toString(),
-            "compaction");
-  }
-
-  public static void updateSummary(CompactionTaskSummary summary) {
-    MetricService.getInstance()
-        .count(
-            summary.getProcessPointNum(),
-            "Compacted_Point_Num",
-            MetricLevel.IMPORTANT,
-            Tag.NAME.toString(),
-            "compaction");
-    MetricService.getInstance()
-        .count(
-            summary.getProcessChunkNum(),
-            "Compacted_Chunk_Num",
-            MetricLevel.IMPORTANT,
-            Tag.NAME.toString(),
-            "compaction");
-    MetricService.getInstance()
-        .count(
-            summary.getDirectlyFlushChunkNum(),
-            "Directly_Flush_Chunk_Num",
-            MetricLevel.NORMAL,
-            Tag.NAME.toString(),
-            "compaction");
-    MetricService.getInstance()
-        .count(
-            summary.getDeserializeChunkCount(),
-            "Deserialized_Chunk_Num",
-            MetricLevel.NORMAL,
-            Tag.NAME.toString(),
-            "compaction");
-    MetricService.getInstance()
-        .count(
-            summary.getMergedChunkNum(),
-            "Merged_Chunk_Num",
-            MetricLevel.NORMAL,
-            Tag.NAME.toString(),
-            "compaction");
-  }
-
-  public static void recordTaskInfo(
-      AbstractCompactionTask task, CompactionTaskStatus status, int size) {
-    String taskType = "unknown";
-    boolean isInnerTask = false;
-    if (task instanceof InnerSpaceCompactionTask) {
-      isInnerTask = true;
-      taskType = "inner";
-    } else if (task instanceof CrossSpaceCompactionTask) {
-      taskType = "cross";
-    }
-
-    switch (status) {
-      case ADD_TO_QUEUE:
-      case POLL_FROM_QUEUE:
-        MetricService.getInstance()
-            .getOrCreateGauge(
-                Metric.QUEUE.toString(),
-                MetricLevel.IMPORTANT,
-                Tag.NAME.toString(),
-                "compaction_" + taskType,
-                Tag.STATUS.toString(),
-                "waiting")
-            .set(size);
-        break;
-      case READY_TO_EXECUTE:
-        MetricService.getInstance()
-            .getOrCreateGauge(
-                Metric.QUEUE.toString(),
-                MetricLevel.IMPORTANT,
-                Tag.NAME.toString(),
-                "compaction_" + taskType,
-                Tag.STATUS.toString(),
-                "running")
-            .set(size);
-        break;
-      case FINISHED:
-        MetricService.getInstance()
-            .getOrCreateGauge(
-                Metric.QUEUE.toString(),
-                MetricLevel.IMPORTANT,
-                Tag.NAME.toString(),
-                "compaction_" + taskType,
-                Tag.STATUS.toString(),
-                "running")
-            .set(size);
-        MetricService.getInstance()
-            .timer(
-                task.getTimeCost(),
-                TimeUnit.MILLISECONDS,
-                Metric.COST_TASK.toString(),
-                MetricLevel.IMPORTANT,
-                Tag.NAME.toString(),
-                isInnerTask ? "inner_compaction" : "cross_compaction");
-        if (isInnerTask) {
-          MetricService.getInstance()
-              .count(
-                  1,
-                  Metric.COMPACTION_TASK_COUNT.toString(),
-                  MetricLevel.IMPORTANT,
-                  Tag.NAME.toString(),
-                  "inner_compaction",
-                  Tag.TYPE.toString(),
-                  ((InnerSpaceCompactionTask) task).isSequence() ? "sequence" : "unsequence");
-        } else {
-          MetricService.getInstance()
-              .count(
-                  1,
-                  Metric.COMPACTION_TASK_COUNT.toString(),
-                  MetricLevel.IMPORTANT,
-                  Tag.NAME.toString(),
-                  "cross_compaction",
-                  Tag.TYPE.toString(),
-                  "cross");
-        }
-        break;
-      default:
-        // do nothing
-        break;
-    }
-  }
-}