You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2023/05/10 08:27:32 UTC

[iotdb] branch rel/1.1 updated: [To rel/1.1] Refactor compaction task metrics (#9783)

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch rel/1.1
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/1.1 by this push:
     new e917076637 [To rel/1.1] Refactor compaction task metrics (#9783)
e917076637 is described below

commit e917076637fddc26ba0450607d0c33b82efb2ae3
Author: Liu Xuxin <37...@users.noreply.github.com>
AuthorDate: Wed May 10 16:27:24 2023 +0800

    [To rel/1.1] Refactor compaction task metrics (#9783)
---
 .../compaction/constant/CompactionTaskStatus.java  | 27 ++++++++
 .../compaction/constant/CompactionTaskType.java    | 26 +++++++
 .../execute/task/AbstractCompactionTask.java       |  1 -
 .../compaction/schedule/CompactionTaskManager.java | 70 ++++++++++++++++---
 .../compaction/schedule/CompactionWorker.java      |  3 -
 .../metrics/recorder/CompactionMetricsManager.java | 81 +++++++++++++---------
 .../datastructure/FixedPriorityBlockingQueue.java  | 10 +++
 7 files changed, 171 insertions(+), 47 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/constant/CompactionTaskStatus.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/constant/CompactionTaskStatus.java
new file mode 100644
index 0000000000..c6b3d3323f
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/constant/CompactionTaskStatus.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.compaction.constant;
+
+public enum CompactionTaskStatus {
+  WAITING,
+  RUNNING,
+  FINISHED,
+  ABORTED
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/constant/CompactionTaskType.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/constant/CompactionTaskType.java
new file mode 100644
index 0000000000..c7805b52ab
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/constant/CompactionTaskType.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.compaction.constant;
+
+public enum CompactionTaskType {
+  INNER_SEQ,
+  INNER_UNSEQ,
+  CROSS
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/task/AbstractCompactionTask.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/task/AbstractCompactionTask.java
index 7fb2d23dc8..ea389b47df 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/task/AbstractCompactionTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/task/AbstractCompactionTask.java
@@ -74,7 +74,6 @@ public abstract class AbstractCompactionTask {
   public boolean start() {
     currentTaskNum.incrementAndGet();
     boolean isSuccess = false;
-    CompactionMetricsManager.getInstance().reportTaskStartRunning(crossTask, innerSeqTask);
     try {
       summary.start();
       isSuccess = doCompaction();
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionTaskManager.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionTaskManager.java
index 04f2cda472..158cdaa34f 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionTaskManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionTaskManager.java
@@ -28,10 +28,12 @@ import org.apache.iotdb.commons.service.ServiceType;
 import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.compaction.constant.CompactionTaskStatus;
+import org.apache.iotdb.db.engine.compaction.constant.CompactionTaskType;
 import org.apache.iotdb.db.engine.compaction.execute.task.AbstractCompactionTask;
 import org.apache.iotdb.db.engine.compaction.execute.task.CompactionTaskSummary;
+import org.apache.iotdb.db.engine.compaction.execute.task.InnerSpaceCompactionTask;
 import org.apache.iotdb.db.engine.compaction.schedule.comparator.DefaultCompactionTaskComparatorImpl;
-import org.apache.iotdb.db.service.metrics.recorder.CompactionMetricsManager;
 import org.apache.iotdb.db.utils.datastructure.FixedPriorityBlockingQueue;
 
 import com.google.common.util.concurrent.RateLimiter;
@@ -39,6 +41,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
+import java.util.EnumMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Callable;
@@ -96,10 +99,6 @@ public class CompactionTaskManager implements IService {
       currentTaskNum = new AtomicInteger(0);
       candidateCompactionTaskQueue.regsitPollLastHook(
           AbstractCompactionTask::resetCompactionCandidateStatusForAllSourceFiles);
-      candidateCompactionTaskQueue.regsitPollLastHook(
-          x ->
-              CompactionMetricsManager.getInstance()
-                  .reportPollTaskFromWaitingQueue(x.isCrossTask(), x.isInnerSeqTask()));
       init = true;
     }
     logger.info("Compaction task manager started.");
@@ -225,11 +224,6 @@ public class CompactionTaskManager implements IService {
       compactionTask.setSourceFilesToCompactionCandidate();
       candidateCompactionTaskQueue.put(compactionTask);
 
-      // add metrics
-      CompactionMetricsManager.getInstance()
-          .reportAddTaskToWaitingQueue(
-              compactionTask.isCrossTask(), compactionTask.isInnerSeqTask());
-
       return true;
     }
     return false;
@@ -351,6 +345,62 @@ public class CompactionTaskManager implements IService {
         .put(task, summary);
   }
 
+  private void getWaitingTaskStatus(
+      Map<CompactionTaskType, Map<CompactionTaskStatus, Integer>> statistic) {
+    List<AbstractCompactionTask> waitingTaskList =
+        this.candidateCompactionTaskQueue.getAllElementAsList();
+    for (AbstractCompactionTask task : waitingTaskList) {
+      if (task instanceof InnerSpaceCompactionTask) {
+        statistic
+            .computeIfAbsent(
+                task.isInnerSeqTask()
+                    ? CompactionTaskType.INNER_SEQ
+                    : CompactionTaskType.INNER_UNSEQ,
+                x -> new EnumMap<>(CompactionTaskStatus.class))
+            .compute(CompactionTaskStatus.WAITING, (k, v) -> v == null ? 1 : v + 1);
+      } else {
+        statistic
+            .computeIfAbsent(
+                CompactionTaskType.CROSS, x -> new EnumMap<>(CompactionTaskStatus.class))
+            .compute(CompactionTaskStatus.WAITING, (k, v) -> v == null ? 1 : v + 1);
+      }
+    }
+  }
+
+  private void getRunningTaskStatus(
+      Map<CompactionTaskType, Map<CompactionTaskStatus, Integer>> statistic) {
+    List<AbstractCompactionTask> runningTaskList = this.getRunningCompactionTaskList();
+    for (AbstractCompactionTask task : runningTaskList) {
+      if (task instanceof InnerSpaceCompactionTask) {
+        statistic
+            .computeIfAbsent(
+                task.isInnerSeqTask()
+                    ? CompactionTaskType.INNER_SEQ
+                    : CompactionTaskType.INNER_UNSEQ,
+                x -> new EnumMap<>(CompactionTaskStatus.class))
+            .compute(CompactionTaskStatus.RUNNING, (k, v) -> v == null ? 1 : v + 1);
+      } else {
+        statistic
+            .computeIfAbsent(
+                CompactionTaskType.CROSS, x -> new EnumMap<>(CompactionTaskStatus.class))
+            .compute(CompactionTaskStatus.RUNNING, (k, v) -> v == null ? 1 : v + 1);
+      }
+    }
+  }
+
+  public Map<CompactionTaskType, Map<CompactionTaskStatus, Integer>> getCompactionTaskStatistic() {
+    Map<CompactionTaskType, Map<CompactionTaskStatus, Integer>> statistic =
+        new EnumMap<>(CompactionTaskType.class);
+
+    // update statistic of waiting tasks
+    getWaitingTaskStatus(statistic);
+
+    // update statistic of running tasks
+    getRunningTaskStatus(statistic);
+
+    return statistic;
+  }
+
   public static String getSGWithRegionId(String storageGroupName, String dataRegionId) {
     return storageGroupName + "-" + dataRegionId;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionWorker.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionWorker.java
index b38f247f01..3c1cbad6c8 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionWorker.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionWorker.java
@@ -20,7 +20,6 @@ package org.apache.iotdb.db.engine.compaction.schedule;
 
 import org.apache.iotdb.db.engine.compaction.execute.task.AbstractCompactionTask;
 import org.apache.iotdb.db.engine.compaction.execute.task.CompactionTaskSummary;
-import org.apache.iotdb.db.service.metrics.recorder.CompactionMetricsManager;
 import org.apache.iotdb.db.utils.datastructure.FixedPriorityBlockingQueue;
 
 import org.jetbrains.annotations.NotNull;
@@ -54,8 +53,6 @@ public class CompactionWorker implements Runnable {
           log.warn("CompactionThread-{} terminates because interruption", threadId);
           return;
         }
-        CompactionMetricsManager.getInstance()
-            .reportPollTaskFromWaitingQueue(task.isCrossTask(), task.isInnerSeqTask());
         if (task != null) {
           // add metrics
           if (task.checkValidAndSetMerging()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/service/metrics/recorder/CompactionMetricsManager.java b/server/src/main/java/org/apache/iotdb/db/service/metrics/recorder/CompactionMetricsManager.java
index 24a466b341..45b51b80ef 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/metrics/recorder/CompactionMetricsManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/metrics/recorder/CompactionMetricsManager.java
@@ -21,16 +21,23 @@ package org.apache.iotdb.db.service.metrics.recorder;
 import org.apache.iotdb.commons.service.metric.MetricService;
 import org.apache.iotdb.commons.service.metric.enums.Metric;
 import org.apache.iotdb.commons.service.metric.enums.Tag;
+import org.apache.iotdb.db.engine.compaction.constant.CompactionTaskStatus;
+import org.apache.iotdb.db.engine.compaction.constant.CompactionTaskType;
 import org.apache.iotdb.db.engine.compaction.execute.task.CompactionTaskSummary;
+import org.apache.iotdb.db.engine.compaction.schedule.CompactionTaskManager;
 import org.apache.iotdb.db.engine.compaction.schedule.constant.CompactionType;
 import org.apache.iotdb.db.engine.compaction.schedule.constant.ProcessChunkType;
 import org.apache.iotdb.metrics.utils.MetricLevel;
 
+import java.util.Collections;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 public class CompactionMetricsManager {
   private static final CompactionMetricsManager INSTANCE = new CompactionMetricsManager();
+  private long lastUpdateTime = 0L;
+  private static final long UPDATE_INTERVAL = 10_000L;
   private final AtomicInteger waitingSeqInnerCompactionTaskNum = new AtomicInteger(0);
   private final AtomicInteger waitingUnseqInnerCompactionTaskNum = new AtomicInteger(0);
   private final AtomicInteger waitingCrossCompactionTaskNum = new AtomicInteger(0);
@@ -120,39 +127,8 @@ public class CompactionMetricsManager {
             "compaction");
   }
 
-  public void reportAddTaskToWaitingQueue(boolean isCrossTask, boolean isSeq) {
-    if (isCrossTask) {
-      waitingCrossCompactionTaskNum.incrementAndGet();
-    } else if (isSeq) {
-      waitingSeqInnerCompactionTaskNum.incrementAndGet();
-    } else {
-      waitingUnseqInnerCompactionTaskNum.incrementAndGet();
-    }
-  }
-
-  public void reportPollTaskFromWaitingQueue(boolean isCrossTask, boolean isSeq) {
-    if (isCrossTask) {
-      waitingCrossCompactionTaskNum.decrementAndGet();
-    } else if (isSeq) {
-      waitingSeqInnerCompactionTaskNum.decrementAndGet();
-    } else {
-      waitingUnseqInnerCompactionTaskNum.decrementAndGet();
-    }
-  }
-
-  public void reportTaskStartRunning(boolean isCrossTask, boolean isSeq) {
-    if (isCrossTask) {
-      runningCrossCompactionTaskNum.incrementAndGet();
-    } else if (isSeq) {
-      runningSeqInnerCompactionTaskNum.incrementAndGet();
-    } else {
-      runningUnseqInnerCompactionTaskNum.incrementAndGet();
-    }
-  }
-
   public void reportTaskFinishOrAbort(boolean isCrossTask, boolean isSeq, long timeCost) {
     if (isCrossTask) {
-      runningCrossCompactionTaskNum.decrementAndGet();
       finishCrossCompactionTaskNum.incrementAndGet();
       MetricService.getInstance()
           .timer(
@@ -163,7 +139,6 @@ public class CompactionMetricsManager {
               Tag.NAME.toString(),
               "cross_compaction");
     } else if (isSeq) {
-      runningSeqInnerCompactionTaskNum.decrementAndGet();
       finishSeqInnerCompactionTaskNum.incrementAndGet();
       MetricService.getInstance()
           .timer(
@@ -174,7 +149,6 @@ public class CompactionMetricsManager {
               Tag.NAME.toString(),
               "inner_seq_compaction");
     } else {
-      runningUnseqInnerCompactionTaskNum.decrementAndGet();
       finishUnseqInnerCompactionTaskNum.incrementAndGet();
       MetricService.getInstance()
           .timer(
@@ -192,34 +166,75 @@ public class CompactionMetricsManager {
   }
 
   public int getWaitingUnseqInnerCompactionTaskNum() {
+    updateCompactionTaskInfo();
     return waitingUnseqInnerCompactionTaskNum.get();
   }
 
   public int getWaitingCrossCompactionTaskNum() {
+    updateCompactionTaskInfo();
     return waitingCrossCompactionTaskNum.get();
   }
 
   public int getRunningSeqInnerCompactionTaskNum() {
+    updateCompactionTaskInfo();
     return runningSeqInnerCompactionTaskNum.get();
   }
 
   public int getRunningUnseqInnerCompactionTaskNum() {
+    updateCompactionTaskInfo();
     return runningUnseqInnerCompactionTaskNum.get();
   }
 
   public int getRunningCrossCompactionTaskNum() {
+    updateCompactionTaskInfo();
     return runningCrossCompactionTaskNum.get();
   }
 
   public int getFinishSeqInnerCompactionTaskNum() {
+    updateCompactionTaskInfo();
     return finishSeqInnerCompactionTaskNum.get();
   }
 
   public int getFinishUnseqInnerCompactionTaskNum() {
+    updateCompactionTaskInfo();
     return finishUnseqInnerCompactionTaskNum.get();
   }
 
   public int getFinishCrossCompactionTaskNum() {
+    updateCompactionTaskInfo();
     return finishCrossCompactionTaskNum.get();
   }
+
+  private void updateCompactionTaskInfo() {
+    if (System.currentTimeMillis() - lastUpdateTime < UPDATE_INTERVAL) {
+      return;
+    }
+    lastUpdateTime = System.currentTimeMillis();
+    Map<CompactionTaskType, Map<CompactionTaskStatus, Integer>> compactionTaskStatisticMap =
+        CompactionTaskManager.getInstance().getCompactionTaskStatistic();
+    this.waitingSeqInnerCompactionTaskNum.set(
+        compactionTaskStatisticMap
+            .getOrDefault(CompactionTaskType.INNER_SEQ, Collections.emptyMap())
+            .getOrDefault(CompactionTaskStatus.WAITING, 0));
+    this.waitingUnseqInnerCompactionTaskNum.set(
+        compactionTaskStatisticMap
+            .getOrDefault(CompactionTaskType.INNER_UNSEQ, Collections.emptyMap())
+            .getOrDefault(CompactionTaskStatus.WAITING, 0));
+    this.waitingCrossCompactionTaskNum.set(
+        compactionTaskStatisticMap
+            .getOrDefault(CompactionTaskType.CROSS, Collections.emptyMap())
+            .getOrDefault(CompactionTaskStatus.WAITING, 0));
+    this.runningSeqInnerCompactionTaskNum.set(
+        compactionTaskStatisticMap
+            .getOrDefault(CompactionTaskType.INNER_SEQ, Collections.emptyMap())
+            .getOrDefault(CompactionTaskStatus.RUNNING, 0));
+    this.runningUnseqInnerCompactionTaskNum.set(
+        compactionTaskStatisticMap
+            .getOrDefault(CompactionTaskType.INNER_UNSEQ, Collections.emptyMap())
+            .getOrDefault(CompactionTaskStatus.RUNNING, 0));
+    this.runningCrossCompactionTaskNum.set(
+        compactionTaskStatisticMap
+            .getOrDefault(CompactionTaskType.CROSS, Collections.emptyMap())
+            .getOrDefault(CompactionTaskStatus.RUNNING, 0));
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/FixedPriorityBlockingQueue.java b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/FixedPriorityBlockingQueue.java
index b8a1b04cf4..86d659c991 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/FixedPriorityBlockingQueue.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/FixedPriorityBlockingQueue.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.utils.datastructure;
 
 import com.google.common.collect.MinMaxPriorityQueue;
 
+import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.List;
 import java.util.concurrent.CopyOnWriteArrayList;
@@ -187,4 +188,13 @@ public class FixedPriorityBlockingQueue<T> {
   public String toString() {
     return queue.toString();
   }
+
+  public List<T> getAllElementAsList() {
+    this.lock.lock();
+    try {
+      return new ArrayList<>(queue);
+    } finally {
+      this.lock.unlock();
+    }
+  }
 }