You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2021/12/20 09:55:33 UTC
[iotdb] 01/01: Revert "[IOTDB-2095] [To rel/0.12] Monitoring compaction performance (#4521)"
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch revert-4521-rel_0.12_compaction_monitor
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 8c1a7f44b50bdfff1e5415d7bf6f1fae4fd76ef4
Author: Jialin Qiao <qj...@mails.tsinghua.edu.cn>
AuthorDate: Mon Dec 20 17:55:02 2021 +0800
Revert "[IOTDB-2095] [To rel/0.12] Monitoring compaction performance (#4521)"
This reverts commit 8fdaf7934119c8969323aae8b1300caa67116a65.
---
.../resources/conf/iotdb-engine.properties | 7 -
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 22 -
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 11 -
.../compaction/CompactionMergeTaskPoolManager.java | 4 +-
.../level/LevelCompactionTsFileManagement.java | 9 -
.../compaction/monitor/CompactionMonitor.java | 519 ---------------------
.../iotdb/db/engine/merge/task/MergeTask.java | 18 -
.../java/org/apache/iotdb/db/service/IoTDB.java | 5 -
.../org/apache/iotdb/db/service/ServiceType.java | 1 -
9 files changed, 2 insertions(+), 594 deletions(-)
diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties b/server/src/assembly/resources/conf/iotdb-engine.properties
index 350b000..8e63e07 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -191,13 +191,6 @@ timestamp_precision=ms
# It may cause memTable size smaller if it is a large value
# seq_tsfile_size=1
-# Enable Compaction Monitor to monitor the status of the compaction thread
-# enable_compaction_monitor=false
-
-# Monitoring period of Compaction Monitor
-# Datatype: long, Unit: ms
-# compaction_monitor_period=60000
-
# Size of log buffer in each metadata operation plan(in byte).
# If the size of a metadata operation plan is larger than this parameter, then it will be rejected by MManager
# If it sets a value smaller than 0, use the default value 1024*1024
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 585c2bc..756b88d 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -370,12 +370,6 @@ public class IoTDBConfig {
*/
private int maxSelectUnseqFileNumInEachUnseqCompaction = 2000;
- /** Enable Compaction Monitor to monitor the status of the compaction thread */
- private boolean enableCompactionMonitor = false;
-
- /** Monitoring period of Compaction Monitor */
- private long compactionMonitorPeriod = 60000L;
-
/** whether to cache meta data(ChunkMetaData and TsFileMetaData) or not. */
private boolean metaDataCacheEnable = true;
@@ -2275,20 +2269,4 @@ public class IoTDBConfig {
public void setAdminPassword(String adminPassword) {
this.adminPassword = adminPassword;
}
-
- public boolean isEnableCompactionMonitor() {
- return enableCompactionMonitor;
- }
-
- public void setEnableCompactionMonitor(boolean enableCompactionMonitor) {
- this.enableCompactionMonitor = enableCompactionMonitor;
- }
-
- public long getCompactionMonitorPeriod() {
- return compactionMonitorPeriod;
- }
-
- public void setCompactionMonitorPeriod(long compactionMonitorPeriod) {
- this.compactionMonitorPeriod = compactionMonitorPeriod;
- }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 7e81086..8df773d 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -357,17 +357,6 @@ public class IoTDBDescriptor {
"max_select_unseq_file_num_in_each_unseq_compaction",
Integer.toString(conf.getMaxSelectUnseqFileNumInEachUnseqCompaction()))));
- conf.setEnableCompactionMonitor(
- Boolean.parseBoolean(
- properties.getProperty(
- "enable_compaction_monitor",
- Boolean.toString(conf.isEnableCompactionMonitor()))));
-
- conf.setCompactionMonitorPeriod(
- Long.parseLong(
- properties.getProperty(
- "compaction_monitor_period", Long.toString(conf.getCompactionMonitorPeriod()))));
-
conf.setUnseqFileNumInEachLevel(
Integer.parseInt(
properties.getProperty(
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java
index 1098faa..2d21074 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java
@@ -65,10 +65,10 @@ public class CompactionMergeTaskPoolManager implements IService {
@Override
public void start() {
if (pool == null) {
- int threadNum = IoTDBDescriptor.getInstance().getConfig().getCompactionThreadNum();
pool =
IoTDBThreadPoolFactory.newScheduledThreadPool(
- threadNum, ThreadName.COMPACTION_SERVICE.getName());
+ IoTDBDescriptor.getInstance().getConfig().getCompactionThreadNum(),
+ ThreadName.COMPACTION_SERVICE.getName());
}
logger.info("Compaction task manager started.");
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
index 8de5e80..0e1c12a 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
@@ -23,7 +23,6 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.cache.ChunkCache;
import org.apache.iotdb.db.engine.cache.TimeSeriesMetadataCache;
import org.apache.iotdb.db.engine.compaction.TsFileManagement;
-import org.apache.iotdb.db.engine.compaction.monitor.CompactionMonitor;
import org.apache.iotdb.db.engine.compaction.utils.CompactionFileInfo;
import org.apache.iotdb.db.engine.compaction.utils.CompactionLogAnalyzer;
import org.apache.iotdb.db.engine.compaction.utils.CompactionLogger;
@@ -602,10 +601,6 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
} finally {
compactionSelectionLock.unlock();
}
- if (IoTDBDescriptor.getInstance().getConfig().isEnableCompactionMonitor()) {
- CompactionMonitor.getInstance()
- .reportCompactionStatus(storageGroupName, i, toMergeTsFiles.size(), true);
- }
compactionLogger = new CompactionLogger(storageGroupDir, storageGroupName);
// log source file list and target file for recover
for (TsFileResource mergeResource : toMergeTsFiles) {
@@ -682,10 +677,6 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
if (logFile.exists()) {
Files.delete(logFile.toPath());
}
- if (IoTDBDescriptor.getInstance().getConfig().isEnableCompactionMonitor()) {
- CompactionMonitor.getInstance()
- .reportCompactionStatus(storageGroupName, i, toMergeTsFiles.size(), false);
- }
}
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/monitor/CompactionMonitor.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/monitor/CompactionMonitor.java
deleted file mode 100644
index 2cfdc89..0000000
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/monitor/CompactionMonitor.java
+++ /dev/null
@@ -1,519 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.engine.compaction.monitor;
-
-import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
-import org.apache.iotdb.db.conf.IoTDBConfig;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.exception.ShutdownException;
-import org.apache.iotdb.db.exception.query.QueryProcessException;
-import org.apache.iotdb.db.metadata.PartialPath;
-import org.apache.iotdb.db.qp.executor.PlanExecutor;
-import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
-import org.apache.iotdb.db.service.IService;
-import org.apache.iotdb.db.service.ServiceType;
-import org.apache.iotdb.tsfile.utils.Pair;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.lang.management.ManagementFactory;
-import java.lang.management.ThreadMXBean;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-public class CompactionMonitor implements IService {
- private final Logger LOGGER = LoggerFactory.getLogger(CompactionMonitor.class);
- private static final CompactionMonitor INSTANCE = new CompactionMonitor();
- // These are constant
- private static final String MONITOR_SG_NAME = "root.compaction_monitor";
- private static final String COMPACTION_CPU_CONSUMPTION_DEVICE =
- MONITOR_SG_NAME.concat(".compaction.cpu");
- private static final String MERGE_CPU_CONSUMPTION_DEVICE = MONITOR_SG_NAME.concat(".merge.cpu");
- private static final String COMPACTION_BEGIN_FILE_NUM_DEVICE =
- MONITOR_SG_NAME.concat("%s.compaction.begin.files");
- private static final String COMPACTION_BEGIN_TASK_NUM_DEVICE =
- MONITOR_SG_NAME.concat("%s.compaction.begin.task_num");
- private static final String COMPACTION_FINISH_FILE_NUM_DEVICE =
- MONITOR_SG_NAME.concat("%s.compaction.finish.files");
- private static final String COMPACTION_FINISH_TASK_NUM_DEVICE =
- MONITOR_SG_NAME.concat("%s.compaction.finish.task_num");
- private static final String MERGE_BEGIN_FILE_NUM_DEVICE =
- MONITOR_SG_NAME.concat("%s.merge.begin.files");
- private static final String MERGE_BEGIN_TASK_NUM_DEVICE =
- MONITOR_SG_NAME.concat("%s.merge.begin.task_num");
- private static final String MERGE_FINISH_FILE_NUM_DEVICE =
- MONITOR_SG_NAME.concat("%s.merge.finish.files");
- private static final String MERGE_FINISH_TASK_NUM_DEVICE =
- MONITOR_SG_NAME.concat("%s.merge.finish.task_num");
- private static final String COMPACTION_CPU_CONSUMPTION_SUM_MEASUREMENT = "Compaction-Total";
- private static final String MERGE_CPU_CONSUMPTION_SUM_MEASUREMENT = "Merge-Total";
- private static final long COMPACTION_CPU_CONSUMPTION_TOTAL_MAP_KEY = -1;
- private static final long MERGE_CPU_CONSUMPTION_TOTAL_MAP_KEY = -2;
-
- private ScheduledExecutorService threadPool =
- IoTDBThreadPoolFactory.newScheduledThreadPool(1, "CompactionMonitor");
- private long lastUpdateTime = 0L;
- // storage group name -> file level -> compacted file num
- private Map<String, Map<Integer, Integer>> compactionBeginFileCountMap = new HashMap<>();
- private Map<String, Integer> compactionBeginCountForEachSg = new HashMap<>();
- private Map<String, Map<Integer, Integer>> compactionFinishFileCountMap = new HashMap<>();
- private Map<String, Integer> compactionFinishCountForEachSg = new HashMap<>();
- // it records the total cpu time for all threads
- private long lastTotalCpuTime = 0L;
- private Map<Long, Long> cpuTimeForCompactionThread = new HashMap<>();
- // threadId -> total cpu time
- private Set<Long> compactionThreadIdSet = new HashSet<>();
- private Set<Long> mergeThreadIdSet = new HashSet<>();
- // threadId -> total cpu time
- private Map<Long, Long> cpuTimeForMergeThread = new HashMap<>();
- private Map<String, Pair<Integer, Integer>> mergeStartFileNumForEachSg = new HashMap<>();
- private Map<String, Integer> mergeStartCountForEachSg = new HashMap<>();
- private Map<String, Pair<Integer, Integer>> mergeFinishFileNumForEachSg = new HashMap<>();
- private Map<String, Integer> mergeFinishCountForEachSg = new HashMap<>();
- private PlanExecutor planExecutor;
-
- private CompactionMonitor() {
- try {
- this.planExecutor = new PlanExecutor();
- } catch (QueryProcessException e) {
- LOGGER.error("Failed to initialize CompactionMonitor", e);
- }
- }
-
- public static CompactionMonitor getInstance() {
- return INSTANCE;
- }
-
- public synchronized void start() {
- if (IoTDBDescriptor.getInstance().getConfig().isEnableCompactionMonitor()) {
- init();
- threadPool.scheduleWithFixedDelay(
- this::saveMonitorStatusPeriodically,
- 10_000L,
- IoTDBDescriptor.getInstance().getConfig().getCompactionMonitorPeriod(),
- TimeUnit.MILLISECONDS);
- LOGGER.info(
- "Start to monitor compaction, period is {} ms",
- IoTDBDescriptor.getInstance().getConfig().getCompactionMonitorPeriod());
- }
- }
-
- public synchronized void stop() {
- if (IoTDBDescriptor.getInstance().getConfig().isEnableCompactionMonitor()) {
- threadPool.shutdownNow();
- try {
- threadPool.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
- } catch (InterruptedException e) {
-
- }
- }
- }
-
- @Override
- public void waitAndStop(long milliseconds) {
- IService.super.waitAndStop(milliseconds);
- }
-
- @Override
- public void shutdown(long milliseconds) throws ShutdownException {
- IService.super.shutdown(milliseconds);
- }
-
- @Override
- public ServiceType getID() {
- return ServiceType.COMPACTION_MONITOR_SERVICE;
- }
-
- private void init() {
- // calculate the cpu consumption when initializing, but do not record them
- // it is for ensuring the correctness of the first record point collected
- calculateCpuConsumptionForCompactionAndMergeThreads();
- }
-
- /** Register compaction thread id to id set */
- public synchronized void registerCompactionThread(long threadId) {
- compactionThreadIdSet.add(threadId);
- }
-
- /** Register merge thread id to id set */
- public synchronized void registerMergeThread(long threadId) {
- mergeThreadIdSet.add(threadId);
- }
-
- /**
- * This function should be executed periodically. The interval of calling it is the monitor
- * period. This function will save the monitor data to IoTDB.
- */
- public synchronized void saveMonitorStatusPeriodically() {
- lastUpdateTime = System.currentTimeMillis();
- Map<Long, Double> cpuConsumptionForCompactionThread =
- calculateCpuConsumptionForCompactionAndMergeThreads();
- saveCpuConsumption(cpuConsumptionForCompactionThread);
- saveCompactionInfo(compactionBeginCountForEachSg, compactionBeginFileCountMap, true);
- saveCompactionInfo(compactionFinishCountForEachSg, compactionFinishFileCountMap, false);
- saveMergeInfo(mergeStartCountForEachSg, mergeStartFileNumForEachSg, true);
- saveMergeInfo(mergeFinishCountForEachSg, mergeFinishFileNumForEachSg, false);
- long costTime = System.currentTimeMillis() - lastUpdateTime;
- LOGGER.info("The CompactionMonitor took {} ms to record the data", costTime);
- }
-
- /**
- * Report the beginning or ending of a compaction task. The CompactionMonitor records the
- * beginning and ending of task separately, and records task in different level separately.
- */
- public synchronized void reportCompactionStatus(
- String storageGroupName, int compactionLevel, int fileNum, boolean begin) {
- if (!compactionThreadIdSet.contains(Thread.currentThread().getId())) {
- registerCompactionThread(Thread.currentThread().getId());
- }
- Map<String, Map<Integer, Integer>> compactionFileCountMap =
- begin ? compactionBeginFileCountMap : compactionFinishFileCountMap;
- Map<String, Integer> compactionCountMap =
- begin ? compactionBeginCountForEachSg : compactionFinishCountForEachSg;
- Map<Integer, Integer> levelFileCountMap =
- compactionFileCountMap.computeIfAbsent(storageGroupName, x -> new HashMap<>());
- int newCompactedFileCount = levelFileCountMap.getOrDefault(compactionLevel, 0) + fileNum;
- levelFileCountMap.put(compactionLevel, newCompactedFileCount);
- int newCompactionCount = compactionCountMap.getOrDefault(storageGroupName, 0) + 1;
- compactionCountMap.put(storageGroupName, newCompactionCount);
- }
-
- /**
- * Report the beginning or ending of a merge task. The CompactionMonitor records the beginning and
- * ending of task separately.
- */
- public synchronized void reportMergeStatus(
- String storageGroupName, int seqFileNum, int unseqFileNum, boolean start) {
- if (!mergeThreadIdSet.contains(Thread.currentThread().getId())) {
- registerMergeThread(Thread.currentThread().getId());
- }
- // records the beginning and ending of task separately.
- Map<String, Integer> mergeCountForSg =
- start ? mergeStartCountForEachSg : mergeFinishCountForEachSg;
- Map<String, Pair<Integer, Integer>> mergeFileNumForSg =
- start ? mergeStartFileNumForEachSg : mergeFinishFileNumForEachSg;
- mergeCountForSg.put(storageGroupName, mergeCountForSg.getOrDefault(storageGroupName, 0) + 1);
- Pair<Integer, Integer> mergeFileNumForCurrSg =
- mergeFileNumForSg.getOrDefault(storageGroupName, new Pair<>(0, 0));
- mergeFileNumForSg.put(
- storageGroupName,
- new Pair<>(
- mergeFileNumForCurrSg.left + seqFileNum, mergeFileNumForCurrSg.right + unseqFileNum));
- }
-
- /**
- * Calculate the percentage of cpu consumption during last period for each compaction and merge
- * thread. Meanwhile, call of this function will also update the total cpu time for all threads
- * and total cpu time for each compaction and merge thread.
- *
- * @return A map from threadId to percentage of cpu consumption for each compaction and merge
- * thread in last monitor period.
- */
- public synchronized Map<Long, Double> calculateCpuConsumptionForCompactionAndMergeThreads() {
- ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
- Map<Long, Long> cpuTimeForCompactionThreadInThisPeriod = new HashMap<>();
- Map<Long, Long> cpuTimeForMergeThreadInThisPeriod = new HashMap<>();
- long[] allThreadIds = threadMXBean.getAllThreadIds();
- long totalCpuTime = 0L;
-
- // calculate the cpu time for all threads
- // and store the total cpu time for each thread
- for (long threadId : allThreadIds) {
- long cpuTimeForCurrThread = threadMXBean.getThreadCpuTime(threadId);
- if (cpuTimeForCurrThread > 0) {
- totalCpuTime += cpuTimeForCurrThread;
- } else {
- cpuTimeForCurrThread = 0;
- }
- // if the thread is a compaction or merge thread, updates its total cpu time
- if (compactionThreadIdSet.contains(threadId)) {
- cpuTimeForCompactionThreadInThisPeriod.put(threadId, cpuTimeForCurrThread);
- }
- if (mergeThreadIdSet.contains(threadId)) {
- cpuTimeForMergeThreadInThisPeriod.put(threadId, cpuTimeForCurrThread);
- }
- }
-
- // the total cpu time in this monitor period
- long cpuTimeInThisPeriod = totalCpuTime - lastTotalCpuTime;
-
- if (cpuTimeInThisPeriod < 0) {
- LOGGER.error(
- "[CompactionMonitor] cpuTimeInThisPeriod is less than 0, total cpu time is {}, prev total cpu time is {}",
- totalCpuTime,
- lastTotalCpuTime);
- return new HashMap<>();
- }
-
- LOGGER.info(
- "[CompactionMonitor] Total CPU time is {} ns, cpu time in last period is {} ns",
- totalCpuTime,
- lastTotalCpuTime);
- lastTotalCpuTime = totalCpuTime;
-
- double compactionThreadsTotalCpuConsumption = 0.0;
- // we use this map to store the cpu consumption percentage of each compaction or merge thread
- // thread id -> percentage of cpu consumption in this period
- Map<Long, Double> cpuConsumptionForCompactionAndMergeThread = new HashMap<>();
- // calculate the cpu consumption of each compaction thread in this period
- // and update the total cpu time for each compaction thread
- for (long threadId : compactionThreadIdSet) {
- // percentage of cpu consumption =
- // cpu time in this period of curr thread / cpu time in this period of all threads
- double cpuConsumptionForCurrentThread =
- (double)
- (cpuTimeForCompactionThreadInThisPeriod.get(threadId)
- - cpuTimeForCompactionThread.getOrDefault(threadId, 0L))
- / (double) (cpuTimeInThisPeriod);
- LOGGER.info(
- "[CompactionMonitor] Cpu consumption for thread {} is {}%",
- threadId, cpuConsumptionForCurrentThread * 100);
- cpuConsumptionForCompactionAndMergeThread.put(threadId, cpuConsumptionForCurrentThread);
- compactionThreadsTotalCpuConsumption += cpuConsumptionForCurrentThread;
- // update the total cpu time for each compaction thread
- cpuTimeForCompactionThread.put(
- threadId, cpuTimeForCompactionThreadInThisPeriod.get(threadId));
- }
- LOGGER.info(
- "[CompactionMonitor] cpu for compaction threads in last period is {}%",
- compactionThreadsTotalCpuConsumption * 100);
-
- // record the cpu consumption percentage for all compaction threads
- cpuConsumptionForCompactionAndMergeThread.put(
- COMPACTION_CPU_CONSUMPTION_TOTAL_MAP_KEY, compactionThreadsTotalCpuConsumption);
-
- if (compactionThreadsTotalCpuConsumption > 1.0) {
- // abnormal data, abort it
- cpuConsumptionForCompactionAndMergeThread.clear();
- }
-
- // calculate the cpu consumption for merge threads as above
- double mergeThreadsTotalCpuConsumption = 0.0;
-
- for (long threadId : mergeThreadIdSet) {
- double cpuConsumptionForCurrentThread =
- (double)
- (cpuTimeForMergeThreadInThisPeriod.get(threadId)
- - cpuTimeForMergeThread.getOrDefault(threadId, 0L))
- / (double) (cpuTimeInThisPeriod);
- cpuConsumptionForCompactionAndMergeThread.put(threadId, compactionThreadsTotalCpuConsumption);
- mergeThreadsTotalCpuConsumption += cpuConsumptionForCurrentThread;
- // update the total cpu time for each merge thread
- cpuTimeForMergeThread.put(threadId, cpuTimeForMergeThreadInThisPeriod.get(threadId));
- }
-
- cpuConsumptionForCompactionAndMergeThread.put(
- MERGE_CPU_CONSUMPTION_TOTAL_MAP_KEY, mergeThreadsTotalCpuConsumption);
-
- if (mergeThreadsTotalCpuConsumption > 1.0) {
- // abnormal data, abort it
- for (long threadId : mergeThreadIdSet) {
- cpuConsumptionForCompactionAndMergeThread.remove(threadId);
- }
- cpuConsumptionForCompactionAndMergeThread.remove(MERGE_CPU_CONSUMPTION_TOTAL_MAP_KEY);
- }
-
- return cpuConsumptionForCompactionAndMergeThread;
- }
-
- /** save the cpu consumption info to local storage group `root.compaction_monitor` */
- private void saveCpuConsumption(Map<Long, Double> consumptionMap) {
- ThreadMXBean mxBean = ManagementFactory.getThreadMXBean();
- try {
- // save the cpu consumption of compaction threads
- if (compactionThreadIdSet.size() > 0) {
- PartialPath compactionPath = new PartialPath(COMPACTION_CPU_CONSUMPTION_DEVICE);
- List<String> compactionCpuConsumptionMeasurements = new ArrayList<>();
- List<String> compactionCpuConsumptionValues = new ArrayList<>();
-
- for (long threadId : compactionThreadIdSet) {
- String threadName = mxBean.getThreadInfo(threadId).getThreadName();
- String[] splittedThreadName = threadName.split("-");
- int length = splittedThreadName.length;
- // measurement name is like Compaction-1, Compaction-2, etc.
- String measurementName =
- splittedThreadName[length - 2] + "-" + splittedThreadName[length - 1];
- compactionCpuConsumptionMeasurements.add(measurementName);
- compactionCpuConsumptionValues.add(
- Double.toString(consumptionMap.getOrDefault(threadId, 0.0)));
- }
-
- // write the total cpu consumption for all compaction thread
- compactionCpuConsumptionMeasurements.add(COMPACTION_CPU_CONSUMPTION_SUM_MEASUREMENT);
- compactionCpuConsumptionValues.add(
- Double.toString(
- consumptionMap.getOrDefault(COMPACTION_CPU_CONSUMPTION_TOTAL_MAP_KEY, 0.0)));
- InsertRowPlan insertPlanForCompaction =
- new InsertRowPlan(
- compactionPath,
- lastUpdateTime,
- compactionCpuConsumptionMeasurements.toArray(new String[0]),
- compactionCpuConsumptionValues.toArray(new String[0]));
- planExecutor.processNonQuery(insertPlanForCompaction);
- }
-
- // save the cpu consumption of merge threads as above
- if (mergeThreadIdSet.size() > 0) {
- PartialPath mergePath = new PartialPath(MERGE_CPU_CONSUMPTION_DEVICE);
- List<String> mergeCpuConsumptionMeasurements = new ArrayList<>();
- List<String> mergeCpuConsumptionValues = new ArrayList<>();
- for (long threadId : mergeThreadIdSet) {
- String threadName = mxBean.getThreadInfo(threadId).getThreadName();
- String[] splittedThreadName = threadName.split("-");
- int length = splittedThreadName.length;
- String measurementName =
- splittedThreadName[length - 2] + "-" + splittedThreadName[length - 1];
- mergeCpuConsumptionMeasurements.add(measurementName);
- mergeCpuConsumptionValues.add(
- Double.toString(consumptionMap.getOrDefault(threadId, 0.0)));
- }
- mergeCpuConsumptionMeasurements.add(MERGE_CPU_CONSUMPTION_SUM_MEASUREMENT);
- mergeCpuConsumptionValues.add(
- Double.toString(consumptionMap.getOrDefault(MERGE_CPU_CONSUMPTION_TOTAL_MAP_KEY, 0.0)));
-
- InsertRowPlan insertPlanForMerge =
- new InsertRowPlan(
- mergePath,
- lastUpdateTime,
- mergeCpuConsumptionMeasurements.toArray(new String[0]),
- mergeCpuConsumptionValues.toArray(new String[0]));
- planExecutor.processNonQuery(insertPlanForMerge);
- }
- } catch (Throwable e) {
- LOGGER.error("[CompactionMonitor] Exception occurs while saving cpu consumption", e);
- }
- }
-
- /** Save the compaction task num info and num of files participated in compaction */
- private void saveCompactionInfo(
- Map<String, Integer> compactionCountForSg,
- Map<String, Map<Integer, Integer>> compactionFileCountMap,
- boolean begin) {
- try {
- // save the task num info
- String[] measurementName = {"task_num"};
- for (String sgName : compactionCountForSg.keySet()) {
- PartialPath deviceName =
- new PartialPath(
- String.format(
- begin ? COMPACTION_BEGIN_TASK_NUM_DEVICE : COMPACTION_FINISH_TASK_NUM_DEVICE,
- sgName.replaceAll("root", "")));
- InsertRowPlan insertRowPlanForCompactionCount =
- new InsertRowPlan(
- deviceName,
- lastUpdateTime,
- measurementName,
- new String[] {Integer.toString(compactionCountForSg.get(sgName))});
- planExecutor.processNonQuery(insertRowPlanForCompactionCount);
- compactionCountForSg.put(sgName, 0);
- }
-
- // save the information of number of files participated in compaction
- IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
- int maxCompactionCount = Math.max(config.getSeqLevelNum(), config.getUnseqLevelNum());
- for (String sgName : compactionFileCountMap.keySet()) {
- PartialPath device =
- new PartialPath(
- String.format(
- begin ? COMPACTION_BEGIN_FILE_NUM_DEVICE : COMPACTION_FINISH_FILE_NUM_DEVICE,
- sgName.replaceAll("root", "")));
- Map<Integer, Integer> countMap = compactionFileCountMap.get(sgName);
- // the measurement is like sg_name.compaction.files.level-x
- String measurementPattern = "level-%d";
- List<String> measurements = new ArrayList<>();
- List<String> values = new ArrayList<>();
- for (int i = 0; i < maxCompactionCount; ++i) {
- measurements.add(String.format(measurementPattern, i));
- values.add(Integer.toString(countMap.getOrDefault(i, 0)));
- countMap.put(i, 0);
- }
- InsertRowPlan plan =
- new InsertRowPlan(
- device,
- lastUpdateTime,
- measurements.toArray(new String[0]),
- values.toArray(new String[0]));
- planExecutor.processNonQuery(plan);
- }
-
- } catch (Throwable e) {
- LOGGER.error("[CompactionMonitor] Exception occurs while saving compaction info", e);
- }
- }
-
- /** save merge task num info and info of number of files participated in merge */
- private void saveMergeInfo(
- Map<String, Integer> mergeCountForEachSg,
- Map<String, Pair<Integer, Integer>> mergeFileNumForEachSg,
- boolean begin) {
- try {
- // save task num info
- String[] measurementName = {"task_num"};
- for (String sgName : mergeCountForEachSg.keySet()) {
- PartialPath device =
- new PartialPath(
- String.format(
- begin ? MERGE_BEGIN_TASK_NUM_DEVICE : MERGE_FINISH_TASK_NUM_DEVICE,
- sgName.replaceAll("root", "")));
- InsertRowPlan insertRowPlan =
- new InsertRowPlan(
- device,
- lastUpdateTime,
- measurementName,
- new String[] {Integer.toString(mergeCountForEachSg.get(sgName))});
- planExecutor.processNonQuery(insertRowPlan);
- mergeCountForEachSg.put(sgName, 0);
- }
-
- // save info of files participated in merge
- for (String sgName : mergeFileNumForEachSg.keySet()) {
- PartialPath device =
- new PartialPath(
- String.format(
- begin ? MERGE_BEGIN_FILE_NUM_DEVICE : MERGE_FINISH_FILE_NUM_DEVICE,
- sgName.replaceAll("root", "")));
- List<String> measurements = new ArrayList<>();
- List<String> values = new ArrayList<>();
- // the measurement is like root.compaction_monitor.sg_name.merge.files.(un)seq
- measurements.add("seq");
- measurements.add("unseq");
- Pair<Integer, Integer> fileNum = mergeFileNumForEachSg.get(sgName);
- values.add(Integer.toString(fileNum.left));
- values.add(Integer.toString(fileNum.right));
- InsertRowPlan plan =
- new InsertRowPlan(
- device,
- lastUpdateTime,
- measurements.toArray(new String[0]),
- values.toArray(new String[0]));
- planExecutor.processNonQuery(plan);
- mergeFileNumForEachSg.put(sgName, new Pair<>(0, 0));
- }
- } catch (Throwable e) {
- LOGGER.error("[CompactionMonitor] Exception occurs while saving merge info", e);
- }
- }
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeTask.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeTask.java
index cc59220..c8d13aa 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeTask.java
@@ -19,8 +19,6 @@
package org.apache.iotdb.db.engine.merge.task;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.engine.compaction.monitor.CompactionMonitor;
import org.apache.iotdb.db.engine.merge.manage.MergeContext;
import org.apache.iotdb.db.engine.merge.manage.MergeResource;
import org.apache.iotdb.db.engine.merge.recover.MergeLogger;
@@ -143,14 +141,6 @@ public class MergeTask implements Callable<Void> {
resource.getSeqFiles(),
resource.getUnseqFiles());
}
- if (IoTDBDescriptor.getInstance().getConfig().isEnableCompactionMonitor()) {
- CompactionMonitor.getInstance()
- .reportMergeStatus(
- storageGroupName,
- resource.getSeqFiles().size(),
- resource.getUnseqFiles().size(),
- true);
- }
long startTime = System.currentTimeMillis();
long totalFileSize =
MergeUtils.collectFileSizes(resource.getSeqFiles(), resource.getUnseqFiles());
@@ -204,14 +194,6 @@ public class MergeTask implements Callable<Void> {
states = States.CLEAN_UP;
fileTask = null;
- if (IoTDBDescriptor.getInstance().getConfig().isEnableCompactionMonitor()) {
- CompactionMonitor.getInstance()
- .reportMergeStatus(
- storageGroupName,
- resource.getSeqFiles().size(),
- resource.getUnseqFiles().size(),
- false);
- }
cleanUp(true);
if (logger.isInfoEnabled()) {
double elapsedTime = (double) (System.currentTimeMillis() - startTime) / 1000.0;
diff --git a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
index 4e9d9c8..a35f70b 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
@@ -26,7 +26,6 @@ import org.apache.iotdb.db.cost.statistic.Measurement;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.engine.cache.CacheHitRatioMonitor;
import org.apache.iotdb.db.engine.compaction.CompactionMergeTaskPoolManager;
-import org.apache.iotdb.db.engine.compaction.monitor.CompactionMonitor;
import org.apache.iotdb.db.engine.flush.FlushManager;
import org.apache.iotdb.db.engine.merge.manage.MergeManager;
import org.apache.iotdb.db.exception.StartupException;
@@ -127,10 +126,6 @@ public class IoTDB implements IoTDBMBean {
registerManager.register(RPCService.getInstance());
}
- if (IoTDBDescriptor.getInstance().getConfig().isEnableCompactionMonitor()) {
- registerManager.register(CompactionMonitor.getInstance());
- }
-
if (IoTDBDescriptor.getInstance().getConfig().isEnableMetricService()) {
registerManager.register(MetricsService.getInstance());
}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java b/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java
index 0bf9796f..b474b2c 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java
@@ -38,7 +38,6 @@ public enum ServiceType {
UPGRADE_SERVICE("UPGRADE DataService", ""),
MERGE_SERVICE("Merge Manager", "Merge Manager"),
COMPACTION_SERVICE("Compaction Manager", "Compaction Manager"),
- COMPACTION_MONITOR_SERVICE("Compaction Monintor", "Compaction Monitor"),
PERFORMANCE_STATISTIC_SERVICE("PERFORMANCE_STATISTIC_SERVICE", "PERFORMANCE_STATISTIC_SERVICE"),
TVLIST_ALLOCATOR_SERVICE("TVList Allocator", ""),
UDF_CLASSLOADER_MANAGER_SERVICE("UDF Classloader Manager Service", ""),