Posted to commits@iotdb.apache.org by ma...@apache.org on 2023/02/13 09:07:02 UTC

[iotdb] 03/04: finish the process io metrics part

This is an automated email from the ASF dual-hosted git repository.

marklau99 pushed a commit to branch IOTDB-5517
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit ec6b14936f741c24fb516f843c0869c080d4f29e
Author: liuxuxin <li...@outlook.com>
AuthorDate: Mon Feb 13 17:06:10 2023 +0800

    finish the process io metrics part
---
 .../iotdb/commons/service/metric/enums/Metric.java |   1 -
 .../iotdb/db/service/metrics/DiskMetrics.java      | 145 +++++++++---------
 .../metrics/io/AbstractDiskMetricsManager.java     |  56 +++++--
 .../metrics/io/LinuxDiskMetricsManager.java        | 166 +++++++++++++++++----
 .../service/metrics/io/MacDiskMetricsManager.java  |  24 +--
 .../metrics/io/WindowsDiskMetricsManager.java      |  36 +++--
 6 files changed, 288 insertions(+), 140 deletions(-)
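
The heart of the patch is the switch from placeholder "datanode" metrics to
process-level IO metrics read from the Linux /proc/<pid>/io pseudo-file:
rchar/wchar count bytes the process attempted to read or write through any IO
syscall (the "attempt" sizes), read_bytes/write_bytes count bytes that
actually reached the block layer (the "actual" sizes), and syscr/syscw count
the read/write syscalls themselves. As a minimal standalone sketch of that
parsing, not part of the patch and Linux-only (the class name ProcIoSketch is
hypothetical):

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Paths;
    import java.util.HashMap;
    import java.util.Map;

    public class ProcIoSketch {
      // Parse /proc/<pid>/io into a key -> value map; each line looks like
      // "rchar: 323934931", so split on ":" plus whitespace as the patch does.
      public static Map<String, Long> readProcessIo(String pid) throws IOException {
        Map<String, Long> stats = new HashMap<>();
        for (String line : Files.readAllLines(Paths.get("/proc/" + pid + "/io"))) {
          String[] parts = line.split(":\\s");
          if (parts.length == 2) {
            stats.put(parts[0], Long.parseLong(parts[1].trim()));
          }
        }
        return stats;
      }

      public static void main(String[] args) throws IOException {
        // "self" resolves to the calling process; the patch uses the pid
        // configured in MetricConfig instead.
        readProcessIo("self").forEach((k, v) -> System.out.println(k + " = " + v));
      }
    }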

diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java b/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
index 18cad095b0..92ea5cb987 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
@@ -33,7 +33,6 @@ public enum Metric {
   DISK_IO_SECTOR_NUM,
   PROCESS_IO_SIZE,
   PROCESS_IO_OPS,
-  PROCESS_IO_TIME,
   MEM,
   CACHE,
   CACHE_HIT,
diff --git a/server/src/main/java/org/apache/iotdb/db/service/metrics/DiskMetrics.java b/server/src/main/java/org/apache/iotdb/db/service/metrics/DiskMetrics.java
index c09b9b8542..a152c834ae 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/metrics/DiskMetrics.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/metrics/DiskMetrics.java
@@ -23,6 +23,8 @@ import org.apache.iotdb.commons.service.metric.enums.Metric;
 import org.apache.iotdb.commons.service.metric.enums.Tag;
 import org.apache.iotdb.db.service.metrics.io.AbstractDiskMetricsManager;
 import org.apache.iotdb.metrics.AbstractMetricService;
+import org.apache.iotdb.metrics.config.MetricConfig;
+import org.apache.iotdb.metrics.config.MetricConfigDescriptor;
 import org.apache.iotdb.metrics.metricsets.IMetricSet;
 import org.apache.iotdb.metrics.utils.MetricLevel;
 import org.apache.iotdb.metrics.utils.MetricType;
@@ -32,6 +34,7 @@ import java.util.Set;
 public class DiskMetrics implements IMetricSet {
   private final AbstractDiskMetricsManager diskMetricsManager =
       AbstractDiskMetricsManager.getDiskMetricsManager();
+  private final MetricConfig metricConfig = MetricConfigDescriptor.getInstance().getMetricConfig();
 
   @Override
   public void bindTo(AbstractMetricService metricService) {
@@ -74,6 +77,24 @@ public class DiskMetrics implements IMetricSet {
           "write",
           Tag.NAME.toString(),
           diskID);
+      metricService.createAutoGauge(
+          Metric.DISK_IO_OPS.toString(),
+          MetricLevel.IMPORTANT,
+          diskMetricsManager,
+          x -> x.getMergedReadOperationForDisk().getOrDefault(diskID, 0L),
+          Tag.TYPE.toString(),
+          "merged_read",
+          Tag.NAME.toString(),
+          diskID);
+      metricService.createAutoGauge(
+          Metric.DISK_IO_OPS.toString(),
+          MetricLevel.IMPORTANT,
+          diskMetricsManager,
+          x -> x.getMergedWriteOperationForDisk().getOrDefault(diskID, 0L),
+          Tag.TYPE.toString(),
+          "merged_write",
+          Tag.NAME.toString(),
+          diskID);
       metricService.createAutoGauge(
           Metric.DISK_IO_TIME.toString(),
           MetricLevel.IMPORTANT,
@@ -131,78 +152,60 @@ public class DiskMetrics implements IMetricSet {
     }
 
     // metrics for datanode and config node
-    metricService.createAutoGauge(
-        Metric.PROCESS_IO_SIZE.toString(),
-        MetricLevel.IMPORTANT,
-        diskMetricsManager,
-        AbstractDiskMetricsManager::getReadDataSizeForDataNode,
-        Tag.NAME.toString(),
-        "datanode",
-        Tag.NAME.toString(),
-        "read");
-    metricService.createAutoGauge(
-        Metric.PROCESS_IO_SIZE.toString(),
-        MetricLevel.IMPORTANT,
-        diskMetricsManager,
-        AbstractDiskMetricsManager::getWriteDataSizeForDataNode,
-        Tag.NAME.toString(),
-        "datanode",
-        Tag.NAME.toString(),
-        "write");
     metricService.createAutoGauge(
         Metric.PROCESS_IO_OPS.toString(),
         MetricLevel.IMPORTANT,
         diskMetricsManager,
-        AbstractDiskMetricsManager::getReadOpsCountForDataNode,
-        Tag.NAME.toString(),
-        "datanode",
+        AbstractDiskMetricsManager::getReadOpsCountForProcess,
+        Tag.FROM.toString(),
+        diskMetricsManager.getProcessName(),
         Tag.NAME.toString(),
         "read");
     metricService.createAutoGauge(
         Metric.PROCESS_IO_OPS.toString(),
         MetricLevel.IMPORTANT,
         diskMetricsManager,
-        AbstractDiskMetricsManager::getWriteOpsCountForDataNode,
-        Tag.NAME.toString(),
-        "datanode",
+        AbstractDiskMetricsManager::getWriteOpsCountForProcess,
+        Tag.FROM.toString(),
+        diskMetricsManager.getProcessName(),
         Tag.NAME.toString(),
         "write");
     metricService.createAutoGauge(
-        Metric.PROCESS_IO_TIME.toString(),
+        Metric.PROCESS_IO_SIZE.toString(),
         MetricLevel.IMPORTANT,
         diskMetricsManager,
-        AbstractDiskMetricsManager::getReadCostTimeForDataNode,
-        Tag.NAME.toString(),
-        "datanode",
+        AbstractDiskMetricsManager::getActualReadDataSizeForProcess,
+        Tag.FROM.toString(),
+        diskMetricsManager.getProcessName(),
         Tag.NAME.toString(),
-        "read");
+        "actual_read");
     metricService.createAutoGauge(
-        Metric.PROCESS_IO_TIME.toString(),
+        Metric.PROCESS_IO_SIZE.toString(),
         MetricLevel.IMPORTANT,
         diskMetricsManager,
-        AbstractDiskMetricsManager::getWriteCostTimeForDataNode,
+        AbstractDiskMetricsManager::getActualWriteDataSizeForProcess,
+        Tag.FROM.toString(),
+        diskMetricsManager.getProcessName(),
         Tag.NAME.toString(),
-        "datanode",
-        Tag.NAME.toString(),
-        "write");
+        "actual_write");
     metricService.createAutoGauge(
-        Metric.PROCESS_IO_TIME.toString(),
+        Metric.PROCESS_IO_SIZE.toString(),
         MetricLevel.IMPORTANT,
         diskMetricsManager,
-        AbstractDiskMetricsManager::getAvgReadCostTimeOfEachOpsForDataNode,
-        Tag.NAME.toString(),
-        "datanode",
+        AbstractDiskMetricsManager::getAttemptReadSizeForProcess,
+        Tag.FROM.toString(),
+        diskMetricsManager.getProcessName(),
         Tag.NAME.toString(),
-        "avg_read");
+        "attempt_read");
     metricService.createAutoGauge(
-        Metric.PROCESS_IO_TIME.toString(),
+        Metric.PROCESS_IO_SIZE.toString(),
         MetricLevel.IMPORTANT,
         diskMetricsManager,
-        AbstractDiskMetricsManager::getAvgWriteCostTimeOfEachOpsForDataNode,
-        Tag.NAME.toString(),
-        "datanode",
+        AbstractDiskMetricsManager::getAttemptWriteSizeForProcess,
+        Tag.FROM.toString(),
+        diskMetricsManager.getProcessName(),
         Tag.NAME.toString(),
-        "avg_write");
+        "attempt_write");
   }
 
   @Override
@@ -286,58 +289,44 @@ public class DiskMetrics implements IMetricSet {
     metricService.remove(
         MetricType.AUTO_GAUGE,
         Metric.PROCESS_IO_SIZE.toString(),
+        Tag.FROM.toString(),
+        diskMetricsManager.getProcessName(),
         Tag.NAME.toString(),
-        "datanode",
-        Tag.NAME.toString(),
-        "read");
+        "actual_read");
     metricService.remove(
         MetricType.AUTO_GAUGE,
         Metric.PROCESS_IO_SIZE.toString(),
+        Tag.FROM.toString(),
+        diskMetricsManager.getProcessName(),
         Tag.NAME.toString(),
-        "datanode",
-        Tag.NAME.toString(),
-        "write");
+        "actual_write");
     metricService.remove(
         MetricType.AUTO_GAUGE,
-        Metric.PROCESS_IO_OPS.toString(),
-        Tag.NAME.toString(),
-        "datanode",
+        Metric.PROCESS_IO_SIZE.toString(),
+        Tag.FROM.toString(),
+        diskMetricsManager.getProcessName(),
         Tag.NAME.toString(),
-        "read");
+        "attempt_read");
     metricService.remove(
         MetricType.AUTO_GAUGE,
-        Metric.PROCESS_IO_OPS.toString(),
-        Tag.NAME.toString(),
-        "datanode",
+        Metric.PROCESS_IO_SIZE.toString(),
+        Tag.FROM.toString(),
+        diskMetricsManager.getProcessName(),
         Tag.NAME.toString(),
-        "write");
+        "attempt_write");
     metricService.remove(
         MetricType.AUTO_GAUGE,
-        Metric.PROCESS_IO_TIME.toString(),
-        Tag.NAME.toString(),
-        "datanode",
+        Metric.PROCESS_IO_OPS.toString(),
+        Tag.FROM.toString(),
+        diskMetricsManager.getProcessName(),
         Tag.NAME.toString(),
         "read");
     metricService.remove(
         MetricType.AUTO_GAUGE,
-        Metric.PROCESS_IO_TIME.toString(),
-        Tag.NAME.toString(),
-        "datanode",
+        Metric.PROCESS_IO_OPS.toString(),
+        Tag.FROM.toString(),
+        diskMetricsManager.getProcessName(),
         Tag.NAME.toString(),
         "write");
-    metricService.remove(
-        MetricType.AUTO_GAUGE,
-        Metric.PROCESS_IO_TIME.toString(),
-        Tag.NAME.toString(),
-        "datanode",
-        Tag.NAME.toString(),
-        "avg_read");
-    metricService.remove(
-        MetricType.AUTO_GAUGE,
-        Metric.PROCESS_IO_TIME.toString(),
-        Tag.NAME.toString(),
-        "datanode",
-        Tag.NAME.toString(),
-        "avg_write");
   }
 }
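
Note the symmetry the patch maintains between bindTo() and unbind():
createAutoGauge() takes the metric name, the level, the object to sample, a
getter, and then alternating tag key/value strings, and remove() identifies
the gauge by the same name and tag sequence. A sketch of one matched pair,
using the calls exactly as they appear above:

    // Register PROCESS_IO_OPS{from=<process name>, name=read},
    // sampled lazily from the disk metrics manager.
    metricService.createAutoGauge(
        Metric.PROCESS_IO_OPS.toString(),
        MetricLevel.IMPORTANT,
        diskMetricsManager,
        AbstractDiskMetricsManager::getReadOpsCountForProcess,
        Tag.FROM.toString(), diskMetricsManager.getProcessName(),
        Tag.NAME.toString(), "read");

    // Unregister: the name and tag key/value pairs must repeat exactly,
    // otherwise the auto gauge stays registered after unbind().
    metricService.remove(
        MetricType.AUTO_GAUGE,
        Metric.PROCESS_IO_OPS.toString(),
        Tag.FROM.toString(), diskMetricsManager.getProcessName(),
        Tag.NAME.toString(), "read");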
diff --git a/server/src/main/java/org/apache/iotdb/db/service/metrics/io/AbstractDiskMetricsManager.java b/server/src/main/java/org/apache/iotdb/db/service/metrics/io/AbstractDiskMetricsManager.java
index d15ca7f881..b8e270e501 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/metrics/io/AbstractDiskMetricsManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/metrics/io/AbstractDiskMetricsManager.java
@@ -19,10 +19,46 @@
 
 package org.apache.iotdb.db.service.metrics.io;
 
+import org.apache.iotdb.metrics.config.MetricConfigDescriptor;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
 import java.util.Map;
 import java.util.Set;
 
 public abstract class AbstractDiskMetricsManager {
+  private final Logger log = LoggerFactory.getLogger(AbstractDiskMetricsManager.class);
+  String processName;
+
+  protected AbstractDiskMetricsManager() {
+    try {
+      Process process = Runtime.getRuntime().exec("jps");
+      String pid = MetricConfigDescriptor.getInstance().getMetricConfig().getPid();
+      // If we cannot resolve the process name,
+      // fall back to using the pid as the name
+      processName = pid;
+      try (BufferedReader input =
+          new BufferedReader(new InputStreamReader(process.getInputStream()))) {
+        String line;
+        while ((line = input.readLine()) != null) {
+          if (line.startsWith(pid + " ")) {
+            processName = line.split("\\s+")[1];
+            break;
+          }
+        }
+      }
+    } catch (IOException e) {
+      log.warn("Failed to get the process name", e);
+    }
+  }
+
+  public String getProcessName() {
+    return processName;
+  }
 
   public abstract Map<String, Long> getReadDataSizeForDisk();
 
@@ -32,6 +68,10 @@ public abstract class AbstractDiskMetricsManager {
 
   public abstract Map<String, Integer> getWriteOperationCountForDisk();
 
+  public abstract Map<String, Long> getMergedWriteOperationForDisk();
+
+  public abstract Map<String, Long> getMergedReadOperationForDisk();
+
   public abstract Map<String, Long> getReadCostTimeForDisk();
 
   public abstract Map<String, Long> getWriteCostTimeForDisk();
@@ -44,21 +84,17 @@ public abstract class AbstractDiskMetricsManager {
 
   public abstract Map<String, Double> getAvgSectorCountOfEachWriteForDisk();
 
-  public abstract long getReadDataSizeForDataNode();
-
-  public abstract long getWriteDataSizeForDataNode();
-
-  public abstract long getReadOpsCountForDataNode();
+  public abstract long getActualReadDataSizeForProcess();
 
-  public abstract long getWriteOpsCountForDataNode();
+  public abstract long getActualWriteDataSizeForProcess();
 
-  public abstract long getReadCostTimeForDataNode();
+  public abstract long getReadOpsCountForProcess();
 
-  public abstract long getWriteCostTimeForDataNode();
+  public abstract long getWriteOpsCountForProcess();
 
-  public abstract long getAvgReadCostTimeOfEachOpsForDataNode();
+  public abstract long getAttemptReadSizeForProcess();
 
-  public abstract long getAvgWriteCostTimeOfEachOpsForDataNode();
+  public abstract long getAttemptWriteSizeForProcess();
 
   public abstract Set<String> getDiskIDs();
 
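
The constructor above resolves a human-readable process name by shelling out
to the JDK's jps tool, whose output is one "<pid> <MainClass>" line per local
JVM, and falls back to the bare pid when no line matches. A self-contained
sketch of the same lookup (the class name ProcessNameSketch is hypothetical;
the patch takes the pid from MetricConfig rather than as a parameter):

    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.lang.management.ManagementFactory;

    public class ProcessNameSketch {
      // Ask jps for all local JVMs and pick out the main-class name of the
      // given pid; keep the pid itself if jps knows nothing about it.
      public static String resolveProcessName(String pid) throws IOException {
        String processName = pid;
        Process jps = Runtime.getRuntime().exec("jps");
        try (BufferedReader input =
            new BufferedReader(new InputStreamReader(jps.getInputStream()))) {
          String line;
          while ((line = input.readLine()) != null) {
            String[] parts = line.split("\\s+");
            if (parts.length > 1 && parts[0].equals(pid)) {
              processName = parts[1];
              break;
            }
          }
        }
        return processName;
      }

      public static void main(String[] args) throws IOException {
        // Our own pid, Java 8 style: the runtime MX bean name is "<pid>@<host>".
        String pid = ManagementFactory.getRuntimeMXBean().getName().split("@")[0];
        System.out.println(resolveProcessName(pid));
      }
    }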
diff --git a/server/src/main/java/org/apache/iotdb/db/service/metrics/io/LinuxDiskMetricsManager.java b/server/src/main/java/org/apache/iotdb/db/service/metrics/io/LinuxDiskMetricsManager.java
index f724f801dc..8c919ec968 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/metrics/io/LinuxDiskMetricsManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/metrics/io/LinuxDiskMetricsManager.java
@@ -19,6 +19,11 @@
 
 package org.apache.iotdb.db.service.metrics.io;
 
+import org.apache.iotdb.metrics.config.MetricConfigDescriptor;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.File;
 import java.io.IOException;
 import java.nio.file.Files;
@@ -33,8 +38,10 @@ import java.util.Set;
 import java.util.stream.Collectors;
 
 public class LinuxDiskMetricsManager extends AbstractDiskMetricsManager {
+  private final Logger log = LoggerFactory.getLogger(LinuxDiskMetricsManager.class);
   private final String DISK_STATS_FILE_PATH = "/proc/diskstats";
   private final String DISK_ID_PATH = "/sys/block";
+  private final String PROCESS_IO_STAT_PATH;
   private final int DISK_ID_OFFSET = 3;
   private final int DISK_READ_COUNT_OFFSET = 4;
   private final int DISK_MERGED_READ_COUNT_OFFSET = 5;
@@ -49,12 +56,14 @@ public class LinuxDiskMetricsManager extends AbstractDiskMetricsManager {
   private Set<String> diskIDSet;
   private long lastUpdateTime = 0L;
   private long updateInterval = 1L;
-  private String[] dataNodeProcessId;
-  private String[] configNodeProcessId;
+
+  // Disk IO status structure
   private final Map<String, Integer> lastReadOperationCountForDisk = new HashMap<>();
   private final Map<String, Integer> lastWriteOperationCountForDisk = new HashMap<>();
   private final Map<String, Long> lastReadTimeCostForDisk = new HashMap<>();
   private final Map<String, Long> lastWriteTimeCostForDisk = new HashMap<>();
+  private final Map<String, Long> lastMergedReadCountForDisk = new HashMap<>();
+  private final Map<String, Long> lastMergedWriteCountForDisk = new HashMap<>();
   private final Map<String, Long> lastReadSectorCountForDisk = new HashMap<>();
   private final Map<String, Long> lastWriteSectorCountForDisk = new HashMap<>();
   private final Map<String, Integer> incrementReadOperationCountForDisk = new HashMap<>();
@@ -63,8 +72,29 @@ public class LinuxDiskMetricsManager extends AbstractDiskMetricsManager {
   private final Map<String, Long> incrementWriteTimeCostForDisk = new HashMap<>();
   private final Map<String, Long> incrementReadSectorCountForDisk = new HashMap<>();
   private final Map<String, Long> incrementWriteSectorCountForDisk = new HashMap<>();
-
-  public LinuxDiskMetricsManager() {}
+  private final Map<String, Long> incrementMergedReadCountForDisk = new HashMap<>();
+  private final Map<String, Long> incrementMergedWriteCountForDisk = new HashMap<>();
+
+  // Process IO status structure
+  private long lastReallyReadSizeForProcess = 0L;
+  private long lastReallyWriteSizeForProcess = 0L;
+  private long lastAttemptReadSizeForProcess = 0L;
+  private long lastAttemptWriteSizeForProcess = 0L;
+  private long lastReadOpsCountForProcess = 0L;
+  private long lastWriteOpsCountForProcess = 0L;
+  private long incrementReallyReadSizeForProcess = 0L;
+  private long incrementReallyWriteSizeForProcess = 0L;
+  private long incrementAttemptReadSizeForProcess = 0L;
+  private long incrementAttemptWriteSizeForProcess = 0L;
+  private long incrementReadOpsCountForProcess = 0L;
+  private long incrementWriteOpsCountForProcess = 0L;
+
+  public LinuxDiskMetricsManager() {
+    super();
+    PROCESS_IO_STAT_PATH =
+        String.format(
+            "/proc/%s/io", MetricConfigDescriptor.getInstance().getMetricConfig().getPid());
+  }
 
   @Override
   public Map<String, Long> getReadDataSizeForDisk() {
@@ -175,42 +205,50 @@ public class LinuxDiskMetricsManager extends AbstractDiskMetricsManager {
   }
 
   @Override
-  public long getReadDataSizeForDataNode() {
-    return 0;
+  public Map<String, Long> getMergedWriteOperationForDisk() {
+    Map<String, Long> incrementMapPerSecond = new HashMap<>();
+    for (Map.Entry<String, Long> entry : incrementMergedWriteCountForDisk.entrySet()) {
+      incrementMapPerSecond.put(entry.getKey(), entry.getValue() * 1000L / updateInterval);
+    }
+    return incrementMapPerSecond;
   }
 
   @Override
-  public long getWriteDataSizeForDataNode() {
-    return 0;
+  public Map<String, Long> getMergedReadOperationForDisk() {
+    Map<String, Long> incrementMapPerSecond = new HashMap<>();
+    for (Map.Entry<String, Long> entry : incrementMergedReadCountForDisk.entrySet()) {
+      incrementMapPerSecond.put(entry.getKey(), entry.getValue() * 1000L / updateInterval);
+    }
+    return incrementMapPerSecond;
   }
 
   @Override
-  public long getReadOpsCountForDataNode() {
-    return 0;
+  public long getActualReadDataSizeForProcess() {
+    return incrementReallyReadSizeForProcess * 1000L / updateInterval / 1024L;
   }
 
   @Override
-  public long getWriteOpsCountForDataNode() {
-    return 0;
+  public long getActualWriteDataSizeForProcess() {
+    return incrementReallyWriteSizeForProcess * 1000L / updateInterval / 1024L;
   }
 
   @Override
-  public long getReadCostTimeForDataNode() {
-    return 0;
+  public long getReadOpsCountForProcess() {
+    return incrementReadOpsCountForProcess * 1000L / updateInterval;
   }
 
   @Override
-  public long getWriteCostTimeForDataNode() {
-    return 0;
+  public long getWriteOpsCountForProcess() {
+    return incrementWriteOpsCountForProcess * 1000L / updateInterval;
   }
 
   @Override
-  public long getAvgReadCostTimeOfEachOpsForDataNode() {
-    return 0;
+  public long getAttemptReadSizeForProcess() {
+    // assumed unit: KB/s, mirroring the actual-size getters above
+    return incrementAttemptReadSizeForProcess * 1000L / updateInterval / 1024L;
   }
 
   @Override
-  public long getAvgWriteCostTimeOfEachOpsForDataNode() {
-    return 0;
+  public long getAttemptWriteSizeForProcess() {
+    // assumed unit: KB/s, mirroring the actual-size getters above
+    return incrementAttemptWriteSizeForProcess * 1000L / updateInterval / 1024L;
   }
 
@@ -229,14 +267,21 @@ public class LinuxDiskMetricsManager extends AbstractDiskMetricsManager {
     return diskIDSet;
   }
 
-  private void updateDiskInfo() {
+  private void updateInfo() {
     long currentTime = System.currentTimeMillis();
     updateInterval = currentTime - lastUpdateTime;
     lastUpdateTime = currentTime;
+    updateDiskInfo();
+    updateProcessInfo();
+  }
+
+  private void updateDiskInfo() {
     File diskStatsFile = new File(DISK_STATS_FILE_PATH);
     if (!diskStatsFile.exists()) {
+      log.warn("Cannot find disk io status file {}", DISK_STATS_FILE_PATH);
       return;
     }
+
     try (Scanner diskStatsScanner = new Scanner(Files.newInputStream(diskStatsFile.toPath()))) {
       while (diskStatsScanner.hasNextLine()) {
         String[] diskInfo = diskStatsScanner.nextLine().split("\\s+");
@@ -246,16 +291,16 @@ public class LinuxDiskMetricsManager extends AbstractDiskMetricsManager {
         }
         int readOperationCount = Integer.parseInt(diskInfo[DISK_READ_COUNT_OFFSET]);
         int writeOperationCount = Integer.parseInt(diskInfo[DISK_WRITE_COUNT_OFFSET]);
-        int mergedReadOperationCount = Integer.parseInt(diskInfo[DISK_MERGED_READ_COUNT_OFFSET]);
-        int mergedWriteOperationCount = Integer.parseInt(diskInfo[DISK_MERGED_WRITE_COUNT_OFFSET]);
+        long mergedReadOperationCount = Long.parseLong(diskInfo[DISK_MERGED_READ_COUNT_OFFSET]);
+        long mergedWriteOperationCount = Long.parseLong(diskInfo[DISK_MERGED_WRITE_COUNT_OFFSET]);
         long sectorReadCount = Long.parseLong(diskInfo[DISK_SECTOR_READ_COUNT_OFFSET]);
         long sectorWriteCount = Long.parseLong(diskInfo[DISK_SECTOR_WRITE_COUNT_OFFSET]);
         long readTimeCost = Long.parseLong(diskInfo[DISK_READ_TIME_COST_OFFSET]);
         long writeTimeCost = Long.parseLong(diskInfo[DISK_WRITE_TIME_COST_OFFSET]);
-
+        long lastMergedReadCount = lastMergedReadCountForDisk.getOrDefault(diskId, 0L);
+        long lastMergedWriteCount = lastMergedWriteCountForDisk.getOrDefault(diskId, 0L);
         int lastReadOperationCount = lastReadOperationCountForDisk.getOrDefault(diskId, 0);
         int lastWriteOperationCount = lastWriteOperationCountForDisk.getOrDefault(diskId, 0);
-        //        int lastMergedReadOperationCount = lastM
         long lastSectorReadCount = lastReadSectorCountForDisk.getOrDefault(diskId, 0L);
         long lastSectorWriteCount = lastWriteSectorCountForDisk.getOrDefault(diskId, 0L);
         long lastReadTime = lastReadTimeCostForDisk.getOrDefault(diskId, 0L);
@@ -299,21 +344,92 @@ public class LinuxDiskMetricsManager extends AbstractDiskMetricsManager {
           incrementWriteTimeCostForDisk.put(diskId, 0L);
         }
 
+        if (lastMergedReadCount != 0) {
+          incrementMergedReadCountForDisk.put(
+              diskId, mergedReadOperationCount - lastMergedReadCount);
+        } else {
+          incrementMergedReadCountForDisk.put(diskId, 0L);
+        }
+
+        if (lastMergedWriteCount != 0) {
+          incrementMergedWriteCountForDisk.put(
+              diskId, mergedWriteOperationCount - lastMergedWriteCount);
+        } else {
+          incrementMergedWriteCountForDisk.put(diskId, 0L);
+        }
+
         lastReadOperationCountForDisk.put(diskId, readOperationCount);
         lastWriteOperationCountForDisk.put(diskId, writeOperationCount);
         lastReadSectorCountForDisk.put(diskId, sectorReadCount);
         lastWriteSectorCountForDisk.put(diskId, sectorWriteCount);
         lastReadTimeCostForDisk.put(diskId, readTimeCost);
         lastWriteTimeCostForDisk.put(diskId, writeTimeCost);
+        lastMergedReadCountForDisk.put(diskId, mergedReadOperationCount);
+        lastMergedWriteCountForDisk.put(diskId, mergedWriteOperationCount);
+      }
+    } catch (IOException e) {
+      log.error("Error occurred while updating disk io info", e);
+    }
+  }
+
+  private void updateProcessInfo() {
+    File processStatInfoFile = new File(PROCESS_IO_STAT_PATH);
+    if (!processStatInfoFile.exists()) {
+      log.warn("Cannot find process io status file {}", PROCESS_IO_STAT_PATH);
+      return;
+    }
+
+    try (Scanner processStatsScanner =
+        new Scanner(Files.newInputStream(processStatInfoFile.toPath()))) {
+      while (processStatsScanner.hasNextLine()) {
+        String infoLine = processStatsScanner.nextLine();
+        if (infoLine.startsWith("syscr")) {
+          long currentReadOpsCount = Long.parseLong(infoLine.split(":\\s")[1]);
+          if (lastReadOpsCountForProcess != 0) {
+            incrementReadOpsCountForProcess = currentReadOpsCount - lastReadOpsCountForProcess;
+          }
+          lastReadOpsCountForProcess = currentReadOpsCount;
+        } else if (infoLine.startsWith("syscw")) {
+          long currentWriteOpsCount = Long.parseLong(infoLine.split(":\\s")[1]);
+          if (lastWriteOpsCountForProcess != 0) {
+            incrementWriteOpsCountForProcess = currentWriteOpsCount - lastWriteOpsCountForProcess;
+          }
+          lastWriteOpsCountForProcess = currentWriteOpsCount;
+        } else if (infoLine.startsWith("read_bytes")) {
+          long currentReadSize = Long.parseLong(infoLine.split(":\\s")[1]);
+          if (lastReallyReadSizeForProcess != 0) {
+            incrementReallyReadSizeForProcess = currentReadSize - lastReallyReadSizeForProcess;
+          }
+          lastReallyReadSizeForProcess = currentReadSize;
+        } else if (infoLine.startsWith("write_bytes")) {
+          long currentWriteSize = Long.parseLong(infoLine.split(":\\s")[1]);
+          if (lastReallyWriteSizeForProcess != 0) {
+            incrementReallyWriteSizeForProcess = currentWriteSize - lastReallyWriteSizeForProcess;
+          }
+          lastReallyWriteSizeForProcess = currentWriteSize;
+        } else if (infoLine.startsWith("rchar")) {
+          long currentAttemptReadSize = Long.parseLong(infoLine.split(":\\s")[1]);
+          if (lastAttemptReadSizeForProcess != 0) {
+            incrementAttemptReadSizeForProcess =
+                currentAttemptReadSize - lastAttemptReadSizeForProcess;
+          }
+          lastAttemptReadSizeForProcess = currentAttemptReadSize;
+        } else if (infoLine.startsWith("wchar")) {
+          long currentAttemptWriteSize = Long.parseLong(infoLine.split(":\\s")[1]);
+          if (lastAttemptWriteSizeForProcess != 0) {
+            incrementAttemptWriteSizeForProcess =
+                currentAttemptWriteSize - lastAttemptWriteSizeForProcess;
+          }
+          lastAttemptWriteSizeForProcess = currentAttemptWriteSize;
+        }
       }
     } catch (IOException e) {
-      throw new RuntimeException(e);
+      log.error("Error occurred while updating process io info", e);
     }
   }
 
   private void checkUpdate() {
     if (System.currentTimeMillis() - lastUpdateTime > UPDATE_SMALLEST_INTERVAL) {
-      updateDiskInfo();
+      updateInfo();
     }
   }
 }
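
Two details of the Linux implementation are easy to miss. First, the column
offsets match the documented /proc/diskstats layout once a line is split on
whitespace (the leading spaces produce an empty token at index 0): index 3 is
the device name, 4/8 are completed reads/writes, 5/9 are merged reads/writes,
6/10 are sectors read/written, and 7/11 are milliseconds spent reading and
writing. Second, every getter normalizes a counter delta to a per-second rate
using the millisecond updateInterval, and with long arithmetic the
multiplication must happen before the division, as this worked example shows:

    // Worked example of the per-second normalization used by the getters above.
    long updateInterval = 10_000L;   // ms elapsed between two /proc scans
    long increment = 4_096L;         // counter delta observed in that window

    // Multiply first: 4096 * 1000 / 10000 = 409 per second (409.6 truncated).
    long perSecond = increment * 1000L / updateInterval;

    // Divide first and any delta smaller than the interval in milliseconds
    // collapses to zero: 4096 / 10000 * 1000 = 0.
    long truncatedToZero = increment / updateInterval * 1000L;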
diff --git a/server/src/main/java/org/apache/iotdb/db/service/metrics/io/MacDiskMetricsManager.java b/server/src/main/java/org/apache/iotdb/db/service/metrics/io/MacDiskMetricsManager.java
index a1ebb3444a..18c4ea95fc 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/metrics/io/MacDiskMetricsManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/metrics/io/MacDiskMetricsManager.java
@@ -26,6 +26,10 @@ import java.util.Set;
 /** Disk Metrics Manager for macOS, not implemented yet. */
 public class MacDiskMetricsManager extends AbstractDiskMetricsManager {
 
+  public MacDiskMetricsManager() {
+    super();
+  }
+
   @Override
   public Map<String, Long> getReadDataSizeForDisk() {
     return Collections.emptyMap();
@@ -77,42 +81,42 @@ public class MacDiskMetricsManager extends AbstractDiskMetricsManager {
   }
 
   @Override
-  public long getReadDataSizeForDataNode() {
-    return 0;
+  public Map<String, Long> getMergedWriteOperationForDisk() {
+    return Collections.emptyMap();
   }
 
   @Override
-  public long getWriteDataSizeForDataNode() {
-    return 0;
+  public Map<String, Long> getMergedReadOperationForDisk() {
+    return Collections.emptyMap();
   }
 
   @Override
-  public long getReadOpsCountForDataNode() {
+  public long getActualReadDataSizeForProcess() {
     return 0;
   }
 
   @Override
-  public long getWriteOpsCountForDataNode() {
+  public long getActualWriteDataSizeForProcess() {
     return 0;
   }
 
   @Override
-  public long getReadCostTimeForDataNode() {
+  public long getReadOpsCountForProcess() {
     return 0;
   }
 
   @Override
-  public long getWriteCostTimeForDataNode() {
+  public long getWriteOpsCountForProcess() {
     return 0;
   }
 
   @Override
-  public long getAvgReadCostTimeOfEachOpsForDataNode() {
+  public long getAttemptReadSizeForProcess() {
     return 0;
   }
 
   @Override
-  public long getAvgWriteCostTimeOfEachOpsForDataNode() {
+  public long getAttemptWriteSizeForProcess() {
     return 0;
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/service/metrics/io/WindowsDiskMetricsManager.java b/server/src/main/java/org/apache/iotdb/db/service/metrics/io/WindowsDiskMetricsManager.java
index aeb4104094..48c9c2b603 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/metrics/io/WindowsDiskMetricsManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/metrics/io/WindowsDiskMetricsManager.java
@@ -26,6 +26,10 @@ import java.util.Set;
 /** Disk Metrics Manager for Windows system, not implemented yet. */
 public class WindowsDiskMetricsManager extends AbstractDiskMetricsManager {
 
+  public WindowsDiskMetricsManager() {
+    super();
+  }
+
   @Override
   public Map<String, Long> getReadDataSizeForDisk() {
     return Collections.emptyMap();
@@ -47,72 +51,72 @@ public class WindowsDiskMetricsManager extends AbstractDiskMetricsManager {
   }
 
   @Override
-  public Map<String, Long> getReadCostTimeForDisk() {
+  public Map<String, Long> getMergedWriteOperationForDisk() {
     return Collections.emptyMap();
   }
 
   @Override
-  public Map<String, Long> getWriteCostTimeForDisk() {
+  public Map<String, Long> getMergedReadOperationForDisk() {
     return Collections.emptyMap();
   }
 
   @Override
-  public Map<String, Double> getAvgReadCostTimeOfEachOpsForDisk() {
+  public Map<String, Long> getReadCostTimeForDisk() {
     return Collections.emptyMap();
   }
 
   @Override
-  public Map<String, Double> getAvgWriteCostTimeOfEachOpsForDisk() {
+  public Map<String, Long> getWriteCostTimeForDisk() {
     return Collections.emptyMap();
   }
 
   @Override
-  public Map<String, Double> getAvgSectorCountOfEachReadForDisk() {
+  public Map<String, Double> getAvgReadCostTimeOfEachOpsForDisk() {
     return Collections.emptyMap();
   }
 
   @Override
-  public Map<String, Double> getAvgSectorCountOfEachWriteForDisk() {
+  public Map<String, Double> getAvgWriteCostTimeOfEachOpsForDisk() {
     return Collections.emptyMap();
   }
 
   @Override
-  public long getReadDataSizeForDataNode() {
-    return 0;
+  public Map<String, Double> getAvgSectorCountOfEachReadForDisk() {
+    return Collections.emptyMap();
   }
 
   @Override
-  public long getWriteDataSizeForDataNode() {
-    return 0;
+  public Map<String, Double> getAvgSectorCountOfEachWriteForDisk() {
+    return Collections.emptyMap();
   }
 
   @Override
-  public long getReadOpsCountForDataNode() {
+  public long getActualReadDataSizeForProcess() {
     return 0;
   }
 
   @Override
-  public long getWriteOpsCountForDataNode() {
+  public long getActualWriteDataSizeForProcess() {
     return 0;
   }
 
   @Override
-  public long getReadCostTimeForDataNode() {
+  public long getReadOpsCountForProcess() {
     return 0;
   }
 
   @Override
-  public long getWriteCostTimeForDataNode() {
+  public long getWriteOpsCountForProcess() {
     return 0;
   }
 
   @Override
-  public long getAvgReadCostTimeOfEachOpsForDataNode() {
+  public long getAttemptReadSizeForProcess() {
     return 0;
   }
 
   @Override
-  public long getAvgWriteCostTimeOfEachOpsForDataNode() {
+  public long getAttemptWriteSizeForProcess() {
     return 0;
   }