You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2023/05/10 08:27:32 UTC
[iotdb] branch rel/1.1 updated: [To rel/1.1] Refactor compaction task metrics (#9783)
This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch rel/1.1
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/1.1 by this push:
new e917076637 [To rel/1.1] Refactor compaction task metrics (#9783)
e917076637 is described below
commit e917076637fddc26ba0450607d0c33b82efb2ae3
Author: Liu Xuxin <37...@users.noreply.github.com>
AuthorDate: Wed May 10 16:27:24 2023 +0800
[To rel/1.1] Refactor compaction task metrics (#9783)
---
.../compaction/constant/CompactionTaskStatus.java | 27 ++++++++
.../compaction/constant/CompactionTaskType.java | 26 +++++++
.../execute/task/AbstractCompactionTask.java | 1 -
.../compaction/schedule/CompactionTaskManager.java | 70 ++++++++++++++++---
.../compaction/schedule/CompactionWorker.java | 3 -
.../metrics/recorder/CompactionMetricsManager.java | 81 +++++++++++++---------
.../datastructure/FixedPriorityBlockingQueue.java | 10 +++
7 files changed, 171 insertions(+), 47 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/constant/CompactionTaskStatus.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/constant/CompactionTaskStatus.java
new file mode 100644
index 0000000000..c6b3d3323f
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/constant/CompactionTaskStatus.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.compaction.constant;
+
+/**
+ * Status buckets used when counting compaction tasks per {@code CompactionTaskType}.
+ *
+ * <p>{@code CompactionTaskManager#getCompactionTaskStatistic()} populates only {@link #WAITING}
+ * and {@link #RUNNING}.
+ */
+public enum CompactionTaskStatus {
+  /** The task is held in the candidate compaction task queue. */
+  WAITING,
+
+  /** The task is in the manager's list of running compaction tasks. */
+  RUNNING,
+
+  /** NOTE(review): not referenced by the statistic code; presumably a completed task. */
+  FINISHED,
+
+  /** NOTE(review): not referenced by the statistic code; presumably a task that did not complete. */
+  ABORTED
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/constant/CompactionTaskType.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/constant/CompactionTaskType.java
new file mode 100644
index 0000000000..c7805b52ab
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/constant/CompactionTaskType.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.compaction.constant;
+
+/**
+ * Categories under which compaction tasks are counted for metrics.
+ *
+ * <p>{@code CompactionTaskManager} derives the category from the task object: an
+ * {@code InnerSpaceCompactionTask} is {@link #INNER_SEQ} or {@link #INNER_UNSEQ} depending on
+ * {@code isInnerSeqTask()}, and any other task is counted as {@link #CROSS}.
+ */
+public enum CompactionTaskType {
+  /** Inner-space compaction task for which {@code isInnerSeqTask()} returns true. */
+  INNER_SEQ,
+
+  /** Inner-space compaction task for which {@code isInnerSeqTask()} returns false. */
+  INNER_UNSEQ,
+
+  /** Any compaction task that is not an {@code InnerSpaceCompactionTask}. */
+  CROSS
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/task/AbstractCompactionTask.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/task/AbstractCompactionTask.java
index 7fb2d23dc8..ea389b47df 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/task/AbstractCompactionTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/task/AbstractCompactionTask.java
@@ -74,7 +74,6 @@ public abstract class AbstractCompactionTask {
public boolean start() {
currentTaskNum.incrementAndGet();
boolean isSuccess = false;
- CompactionMetricsManager.getInstance().reportTaskStartRunning(crossTask, innerSeqTask);
try {
summary.start();
isSuccess = doCompaction();
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionTaskManager.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionTaskManager.java
index 04f2cda472..158cdaa34f 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionTaskManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionTaskManager.java
@@ -28,10 +28,12 @@ import org.apache.iotdb.commons.service.ServiceType;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.compaction.constant.CompactionTaskStatus;
+import org.apache.iotdb.db.engine.compaction.constant.CompactionTaskType;
import org.apache.iotdb.db.engine.compaction.execute.task.AbstractCompactionTask;
import org.apache.iotdb.db.engine.compaction.execute.task.CompactionTaskSummary;
+import org.apache.iotdb.db.engine.compaction.execute.task.InnerSpaceCompactionTask;
import org.apache.iotdb.db.engine.compaction.schedule.comparator.DefaultCompactionTaskComparatorImpl;
-import org.apache.iotdb.db.service.metrics.recorder.CompactionMetricsManager;
import org.apache.iotdb.db.utils.datastructure.FixedPriorityBlockingQueue;
import com.google.common.util.concurrent.RateLimiter;
@@ -39,6 +41,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
+import java.util.EnumMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
@@ -96,10 +99,6 @@ public class CompactionTaskManager implements IService {
currentTaskNum = new AtomicInteger(0);
candidateCompactionTaskQueue.regsitPollLastHook(
AbstractCompactionTask::resetCompactionCandidateStatusForAllSourceFiles);
- candidateCompactionTaskQueue.regsitPollLastHook(
- x ->
- CompactionMetricsManager.getInstance()
- .reportPollTaskFromWaitingQueue(x.isCrossTask(), x.isInnerSeqTask()));
init = true;
}
logger.info("Compaction task manager started.");
@@ -225,11 +224,6 @@ public class CompactionTaskManager implements IService {
compactionTask.setSourceFilesToCompactionCandidate();
candidateCompactionTaskQueue.put(compactionTask);
- // add metrics
- CompactionMetricsManager.getInstance()
- .reportAddTaskToWaitingQueue(
- compactionTask.isCrossTask(), compactionTask.isInnerSeqTask());
-
return true;
}
return false;
@@ -351,6 +345,62 @@ public class CompactionTaskManager implements IService {
.put(task, summary);
}
+  /**
+   * Counts every task currently held in the candidate queue as {@link CompactionTaskStatus#WAITING}.
+   *
+   * @param statistic accumulator (task type -> status -> count) that is updated in place
+   */
+  private void getWaitingTaskStatus(
+      Map<CompactionTaskType, Map<CompactionTaskStatus, Integer>> statistic) {
+    countTasks(
+        this.candidateCompactionTaskQueue.getAllElementAsList(),
+        CompactionTaskStatus.WAITING,
+        statistic);
+  }
+
+  /**
+   * Counts every task returned by {@code getRunningCompactionTaskList()} as {@link
+   * CompactionTaskStatus#RUNNING}.
+   *
+   * @param statistic accumulator (task type -> status -> count) that is updated in place
+   */
+  private void getRunningTaskStatus(
+      Map<CompactionTaskType, Map<CompactionTaskStatus, Integer>> statistic) {
+    countTasks(this.getRunningCompactionTaskList(), CompactionTaskStatus.RUNNING, statistic);
+  }
+
+  /**
+   * Adds one to the {@code status} counter of the matching task type for each task in {@code
+   * tasks}.
+   *
+   * <p>An {@link InnerSpaceCompactionTask} is classified as INNER_SEQ or INNER_UNSEQ according to
+   * {@code isInnerSeqTask()}; every other task is counted as CROSS.
+   *
+   * @param tasks tasks to count
+   * @param status status bucket each task is added to
+   * @param statistic accumulator (task type -> status -> count) that is updated in place
+   */
+  private static void countTasks(
+      List<AbstractCompactionTask> tasks,
+      CompactionTaskStatus status,
+      Map<CompactionTaskType, Map<CompactionTaskStatus, Integer>> statistic) {
+    for (AbstractCompactionTask task : tasks) {
+      CompactionTaskType type = CompactionTaskType.CROSS;
+      if (task instanceof InnerSpaceCompactionTask) {
+        type =
+            task.isInnerSeqTask() ? CompactionTaskType.INNER_SEQ : CompactionTaskType.INNER_UNSEQ;
+      }
+      // merge() starts the counter at 1 when absent and adds 1 otherwise
+      statistic
+          .computeIfAbsent(type, x -> new EnumMap<>(CompactionTaskStatus.class))
+          .merge(status, 1, Integer::sum);
+    }
+  }
+
+  /**
+   * Builds a fresh count of waiting and running compaction tasks grouped by task type.
+   *
+   * <p>Only the WAITING and RUNNING statuses are populated. A task type (or a status within a
+   * type) that has no tasks is absent from the result rather than mapped to 0, so callers should
+   * read it with a default. The waiting and running lists are read one after the other, not as a
+   * single atomic snapshot.
+   *
+   * @return a new map: task type -> status -> number of tasks
+   */
+  public Map<CompactionTaskType, Map<CompactionTaskStatus, Integer>> getCompactionTaskStatistic() {
+    Map<CompactionTaskType, Map<CompactionTaskStatus, Integer>> statistic =
+        new EnumMap<>(CompactionTaskType.class);
+
+    // update statistic of waiting tasks
+    getWaitingTaskStatus(statistic);
+
+    // update statistic of running tasks
+    getRunningTaskStatus(statistic);
+
+    return statistic;
+  }
+
public static String getSGWithRegionId(String storageGroupName, String dataRegionId) {
return storageGroupName + "-" + dataRegionId;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionWorker.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionWorker.java
index b38f247f01..3c1cbad6c8 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionWorker.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionWorker.java
@@ -20,7 +20,6 @@ package org.apache.iotdb.db.engine.compaction.schedule;
import org.apache.iotdb.db.engine.compaction.execute.task.AbstractCompactionTask;
import org.apache.iotdb.db.engine.compaction.execute.task.CompactionTaskSummary;
-import org.apache.iotdb.db.service.metrics.recorder.CompactionMetricsManager;
import org.apache.iotdb.db.utils.datastructure.FixedPriorityBlockingQueue;
import org.jetbrains.annotations.NotNull;
@@ -54,8 +53,6 @@ public class CompactionWorker implements Runnable {
log.warn("CompactionThread-{} terminates because interruption", threadId);
return;
}
- CompactionMetricsManager.getInstance()
- .reportPollTaskFromWaitingQueue(task.isCrossTask(), task.isInnerSeqTask());
if (task != null) {
// add metrics
if (task.checkValidAndSetMerging()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/service/metrics/recorder/CompactionMetricsManager.java b/server/src/main/java/org/apache/iotdb/db/service/metrics/recorder/CompactionMetricsManager.java
index 24a466b341..45b51b80ef 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/metrics/recorder/CompactionMetricsManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/metrics/recorder/CompactionMetricsManager.java
@@ -21,16 +21,23 @@ package org.apache.iotdb.db.service.metrics.recorder;
import org.apache.iotdb.commons.service.metric.MetricService;
import org.apache.iotdb.commons.service.metric.enums.Metric;
import org.apache.iotdb.commons.service.metric.enums.Tag;
+import org.apache.iotdb.db.engine.compaction.constant.CompactionTaskStatus;
+import org.apache.iotdb.db.engine.compaction.constant.CompactionTaskType;
import org.apache.iotdb.db.engine.compaction.execute.task.CompactionTaskSummary;
+import org.apache.iotdb.db.engine.compaction.schedule.CompactionTaskManager;
import org.apache.iotdb.db.engine.compaction.schedule.constant.CompactionType;
import org.apache.iotdb.db.engine.compaction.schedule.constant.ProcessChunkType;
import org.apache.iotdb.metrics.utils.MetricLevel;
+import java.util.Collections;
+import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public class CompactionMetricsManager {
private static final CompactionMetricsManager INSTANCE = new CompactionMetricsManager();
+ private long lastUpdateTime = 0L;
+ private static final long UPDATE_INTERVAL = 10_000L;
private final AtomicInteger waitingSeqInnerCompactionTaskNum = new AtomicInteger(0);
private final AtomicInteger waitingUnseqInnerCompactionTaskNum = new AtomicInteger(0);
private final AtomicInteger waitingCrossCompactionTaskNum = new AtomicInteger(0);
@@ -120,39 +127,8 @@ public class CompactionMetricsManager {
"compaction");
}
- public void reportAddTaskToWaitingQueue(boolean isCrossTask, boolean isSeq) {
- if (isCrossTask) {
- waitingCrossCompactionTaskNum.incrementAndGet();
- } else if (isSeq) {
- waitingSeqInnerCompactionTaskNum.incrementAndGet();
- } else {
- waitingUnseqInnerCompactionTaskNum.incrementAndGet();
- }
- }
-
- public void reportPollTaskFromWaitingQueue(boolean isCrossTask, boolean isSeq) {
- if (isCrossTask) {
- waitingCrossCompactionTaskNum.decrementAndGet();
- } else if (isSeq) {
- waitingSeqInnerCompactionTaskNum.decrementAndGet();
- } else {
- waitingUnseqInnerCompactionTaskNum.decrementAndGet();
- }
- }
-
- public void reportTaskStartRunning(boolean isCrossTask, boolean isSeq) {
- if (isCrossTask) {
- runningCrossCompactionTaskNum.incrementAndGet();
- } else if (isSeq) {
- runningSeqInnerCompactionTaskNum.incrementAndGet();
- } else {
- runningUnseqInnerCompactionTaskNum.incrementAndGet();
- }
- }
-
public void reportTaskFinishOrAbort(boolean isCrossTask, boolean isSeq, long timeCost) {
if (isCrossTask) {
- runningCrossCompactionTaskNum.decrementAndGet();
finishCrossCompactionTaskNum.incrementAndGet();
MetricService.getInstance()
.timer(
@@ -163,7 +139,6 @@ public class CompactionMetricsManager {
Tag.NAME.toString(),
"cross_compaction");
} else if (isSeq) {
- runningSeqInnerCompactionTaskNum.decrementAndGet();
finishSeqInnerCompactionTaskNum.incrementAndGet();
MetricService.getInstance()
.timer(
@@ -174,7 +149,6 @@ public class CompactionMetricsManager {
Tag.NAME.toString(),
"inner_seq_compaction");
} else {
- runningUnseqInnerCompactionTaskNum.decrementAndGet();
finishUnseqInnerCompactionTaskNum.incrementAndGet();
MetricService.getInstance()
.timer(
@@ -192,34 +166,75 @@ public class CompactionMetricsManager {
}
/** Returns the INNER_UNSEQ/WAITING task count as of the most recent counter refresh. */
public int getWaitingUnseqInnerCompactionTaskNum() {
+ updateCompactionTaskInfo(); // throttled: re-reads the task manager at most once per UPDATE_INTERVAL
return waitingUnseqInnerCompactionTaskNum.get();
}
/** Returns the CROSS/WAITING task count as of the most recent counter refresh. */
public int getWaitingCrossCompactionTaskNum() {
+ updateCompactionTaskInfo(); // throttled: re-reads the task manager at most once per UPDATE_INTERVAL
return waitingCrossCompactionTaskNum.get();
}
/** Returns the INNER_SEQ/RUNNING task count as of the most recent counter refresh. */
public int getRunningSeqInnerCompactionTaskNum() {
+ updateCompactionTaskInfo(); // throttled: re-reads the task manager at most once per UPDATE_INTERVAL
return runningSeqInnerCompactionTaskNum.get();
}
/** Returns the INNER_UNSEQ/RUNNING task count as of the most recent counter refresh. */
public int getRunningUnseqInnerCompactionTaskNum() {
+ updateCompactionTaskInfo(); // throttled: re-reads the task manager at most once per UPDATE_INTERVAL
return runningUnseqInnerCompactionTaskNum.get();
}
/** Returns the CROSS/RUNNING task count as of the most recent counter refresh. */
public int getRunningCrossCompactionTaskNum() {
+ updateCompactionTaskInfo(); // throttled: re-reads the task manager at most once per UPDATE_INTERVAL
return runningCrossCompactionTaskNum.get();
}
/** Returns the counter that reportTaskFinishOrAbort increments for non-cross tasks with isSeq set. */
public int getFinishSeqInnerCompactionTaskNum() {
+ updateCompactionTaskInfo(); // refreshes only the waiting/running counters; the value returned below is not affected
return finishSeqInnerCompactionTaskNum.get();
}
/** Returns the counter that reportTaskFinishOrAbort increments for non-cross tasks with isSeq unset. */
public int getFinishUnseqInnerCompactionTaskNum() {
+ updateCompactionTaskInfo(); // refreshes only the waiting/running counters; the value returned below is not affected
return finishUnseqInnerCompactionTaskNum.get();
}
/** Returns the counter that reportTaskFinishOrAbort increments for cross tasks. */
public int getFinishCrossCompactionTaskNum() {
+ updateCompactionTaskInfo(); // refreshes only the waiting/running counters; the value returned below is not affected
return finishCrossCompactionTaskNum.get();
}
+
+  /**
+   * Refreshes the six waiting/running task counters from {@link
+   * CompactionTaskManager#getCompactionTaskStatistic()}, at most once per {@code UPDATE_INTERVAL}
+   * milliseconds. The finish counters are not touched here.
+   *
+   * <p>The method is synchronized because {@code lastUpdateTime} is a plain {@code long} that is
+   * tested and then written: without mutual exclusion, concurrent callers could all pass the
+   * interval check and each rescan the task manager, and the write would not be guaranteed
+   * visible to other threads.
+   */
+  private synchronized void updateCompactionTaskInfo() {
+    // read the clock once so the value that is tested is the value that is stored
+    long now = System.currentTimeMillis();
+    if (now - lastUpdateTime < UPDATE_INTERVAL) {
+      return;
+    }
+    lastUpdateTime = now;
+    Map<CompactionTaskType, Map<CompactionTaskStatus, Integer>> statistic =
+        CompactionTaskManager.getInstance().getCompactionTaskStatistic();
+    this.waitingSeqInnerCompactionTaskNum.set(
+        countOf(statistic, CompactionTaskType.INNER_SEQ, CompactionTaskStatus.WAITING));
+    this.waitingUnseqInnerCompactionTaskNum.set(
+        countOf(statistic, CompactionTaskType.INNER_UNSEQ, CompactionTaskStatus.WAITING));
+    this.waitingCrossCompactionTaskNum.set(
+        countOf(statistic, CompactionTaskType.CROSS, CompactionTaskStatus.WAITING));
+    this.runningSeqInnerCompactionTaskNum.set(
+        countOf(statistic, CompactionTaskType.INNER_SEQ, CompactionTaskStatus.RUNNING));
+    this.runningUnseqInnerCompactionTaskNum.set(
+        countOf(statistic, CompactionTaskType.INNER_UNSEQ, CompactionTaskStatus.RUNNING));
+    this.runningCrossCompactionTaskNum.set(
+        countOf(statistic, CompactionTaskType.CROSS, CompactionTaskStatus.RUNNING));
+  }
+
+  /**
+   * Looks up one cell of the task statistic.
+   *
+   * @param statistic map of task type -> status -> count; entries may be absent
+   * @param type task type to look up
+   * @param status status to look up within that type
+   * @return the recorded count, or 0 when the type or the status has no entry
+   */
+  private static int countOf(
+      Map<CompactionTaskType, Map<CompactionTaskStatus, Integer>> statistic,
+      CompactionTaskType type,
+      CompactionTaskStatus status) {
+    return statistic.getOrDefault(type, Collections.emptyMap()).getOrDefault(status, 0);
+  }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/FixedPriorityBlockingQueue.java b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/FixedPriorityBlockingQueue.java
index b8a1b04cf4..86d659c991 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/FixedPriorityBlockingQueue.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/FixedPriorityBlockingQueue.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.utils.datastructure;
import com.google.common.collect.MinMaxPriorityQueue;
+import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
@@ -187,4 +188,13 @@ public class FixedPriorityBlockingQueue<T> {
public String toString() {
return queue.toString();
}
+
+  /**
+   * Copies the current content of the queue into a new list.
+   *
+   * <p>The copy is made while holding {@code lock}. The result is an independent {@link
+   * ArrayList}, so later changes to the queue are not reflected in it; elements appear in the
+   * iteration order of the underlying queue.
+   *
+   * @return a new list containing every element held at the time of the call
+   */
+  public List<T> getAllElementAsList() {
+    List<T> snapshot;
+    this.lock.lock();
+    try {
+      snapshot = new ArrayList<>(queue);
+    } finally {
+      this.lock.unlock();
+    }
+    return snapshot;
+  }
}