Posted to common-issues@hadoop.apache.org by GitBox <gi...@apache.org> on 2022/02/09 06:26:40 UTC

[GitHub] [hadoop] jojochuang commented on a change in pull request #3960: HDFS-16446. Consider ioutils of disk when choosing volume

jojochuang commented on a change in pull request #3960:
URL: https://github.com/apache/hadoop/pull/3960#discussion_r802294570



##########
File path: hadoop-hdfs-project/hadoop-hdfs/pom.xml
##########
@@ -225,6 +225,11 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
       <artifactId>lz4-java</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>

Review comment:
       Please update LICENSE-binary to include this new dependency. Also, please manage the dependency version in hadoop-project/pom.xml.
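
       For reference, the usual pattern looks roughly like the sketch below. The coordinates are placeholders, because the new dependency is truncated in this excerpt; only the structure matters here:

           <!-- hadoop-project/pom.xml: declare the version once under dependencyManagement. -->
           <dependencyManagement>
             <dependencies>
               <dependency>
                 <groupId>com.example</groupId>          <!-- placeholder -->
                 <artifactId>new-dependency</artifactId> <!-- placeholder -->
                 <version>1.0.0</version>                <!-- placeholder -->
               </dependency>
             </dependencies>
           </dependencyManagement>

           <!-- hadoop-hdfs-project/hadoop-hdfs/pom.xml: reference it without a version element. -->
           <dependency>
             <groupId>com.example</groupId>
             <artifactId>new-dependency</artifactId>
             <scope>test</scope>
           </dependency>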

##########
File path: hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/NativeIO.c
##########
@@ -36,17 +43,91 @@
 #include <sys/resource.h>
 #include <sys/stat.h>
 #include <sys/syscall.h>
-#ifdef HADOOP_PMDK_LIBRARY

Review comment:
       Most of the changes in this file are redundant. We want to keep the existing pmem feature (HDFS-13762).

##########
File path: hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskIOUtilManager.java
##########
@@ -0,0 +1,253 @@
+package org.apache.hadoop.hdfs.server.datanode;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
+import org.apache.hadoop.io.nativeio.NativeIO;
+import org.apache.hadoop.util.NativeCodeLoader;
+import org.apache.hadoop.util.Shell;
+
+import java.io.BufferedReader;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.charset.Charset;
+import java.nio.file.FileStore;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DISK_STAT_INTERVAL_SECONDS_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DISK_STAT_INTERVAL_SECONDS_KEY;
+
+public class DiskIOUtilManager implements Runnable {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(DiskIOUtilManager.class);
+  private final DataNode dataNode;
+  private volatile long intervalMs;
+  private volatile int ioUtil;
+  private volatile boolean shouldStop = false;
+  private String diskName;
+  private Thread diskStatThread;
+  private Map<StorageLocation, DiskLocation> locationToDisks = new HashMap<>();
+  private Map<DiskLocation, IOStat> ioStats = new HashMap<>();
+
+  private class DiskLocation {
+    private final String diskName;
+    private final StorageLocation location;
+    DiskLocation(StorageLocation location) throws IOException {
+      this.location = location;
+      FileStore fs = Files.getFileStore(Paths.get(location.getUri().getPath()));
+      String diskNamePlace = null;
+      if (NativeCodeLoader.isNativeCodeLoaded() && Shell.LINUX) {
+        try {
+          diskNamePlace = NativeIO.getDiskName(location.getUri().getPath());
+          LOG.info("location is  {}, diskname is {}", location.getUri().getPath(), diskNamePlace);
+        } catch (IOException e) {
+          LOG.error(e.toString());
+          diskNamePlace = Paths.get(fs.name()).getFileName().toString();
+        } finally {
+          this.diskName = diskNamePlace;
+        }
+      } else {
+        this.diskName = Paths.get(fs.name()).getFileName().toString();
+      }
+    }
+
+    @Override
+    public String toString() {
+      return location.toString() + " disk: " + diskName;
+    }
+    @Override
+    public int hashCode() {
+      return location.hashCode();
+    }
+  }
+
+//  private String getDiskName(String path) {
+//    SystemInfo systemInfo = new SystemInfo();
+//    systemInfo.getOperatingSystem().getFileSystem().getFileStores();
+//    return "";
+//  }
+
+  private class IOStat {
+    private String diskName;
+    private long lastTotalTicks;
+    private int util;
+    public IOStat(String diskName, long lastTotalTicks) {
+      this.diskName = diskName;
+      this.lastTotalTicks = lastTotalTicks;
+    }
+
+    public int getUtil() {
+      return util;
+    }
+
+    public void setUtil(int util) {
+      if (util <= 100 && util >= 0) {
+        this.util = util;
+      } else if (util < 0) {
+        this.util = 0;
+      } else {
+        this.util = 100;
+      }
+    }
+
+    public long getLastTotalTicks() {
+      return lastTotalTicks;
+    }
+
+    public void setLastTotalTicks(long lastTotalTicks) {
+      this.lastTotalTicks = lastTotalTicks;
+    }
+  }
+
+  DiskIOUtilManager(DataNode dataNode, Configuration conf) {
+    this.dataNode = dataNode;
+    this.intervalMs = TimeUnit.SECONDS.toMillis(conf.getLong(
+        DFS_DATANODE_DISK_STAT_INTERVAL_SECONDS_KEY,
+        DFS_DATANODE_DISK_STAT_INTERVAL_SECONDS_DEFAULT));
+    if (this.intervalMs < DFS_DATANODE_DISK_STAT_INTERVAL_SECONDS_DEFAULT) {
+      this.intervalMs = 1;
+    }
+  }
+
+  @Override
+  public void run() {
+    FsVolumeImpl.LOG.info(this + " starting disk util stat");
+    while (true) {
+      if (shouldStop) {
+        FsVolumeImpl.LOG.info(this + " stopping disk util stat");
+        break;
+      }
+      if (!Shell.LINUX) {
+        FsVolumeImpl.LOG.debug("Not support disk util stat on this os release");
+        continue;
+      }
+      Map<String, IOStat> allIOStats = getDiskIoUtils();
+      synchronized (this) {
+        for (Map.Entry<DiskLocation, IOStat> entry : ioStats.entrySet()) {
+          String diskName = entry.getKey().diskName;
+          IOStat oldStat = entry.getValue();
+          int util = 0;
+          if (allIOStats.containsKey(diskName)) {
+            long oldTotalTicks = oldStat.getLastTotalTicks();
+            long newTotalTicks = allIOStats.get(diskName).getLastTotalTicks();
+            if (oldTotalTicks != 0) {
+              util = (int) ((double) (newTotalTicks - oldTotalTicks) * 100 / intervalMs);
+            }
+            oldStat.setLastTotalTicks(newTotalTicks);
+            oldStat.setUtil(util);
+            LOG.debug(diskName + " disk io util:" + util);
+          } else {
+            //Maybe this disk has been umounted.
+            oldStat.setUtil(100);
+          }
+        }
+      }
+      try {
+        Thread.sleep(intervalMs);
+      } catch (InterruptedException e) {
+      }
+    }
+  }
+
+  void start() {
+    if (diskStatThread != null) {
+      return;
+    }
+    shouldStop = false;
+    diskStatThread = new Thread(this, threadName());
+    diskStatThread.setDaemon(true);
+    diskStatThread.start();
+  }
+
+  void stop() {
+    shouldStop = true;
+    if (diskStatThread != null) {
+      diskStatThread.interrupt();
+      try {
+        diskStatThread.join();
+      } catch (InterruptedException e) {
+      }
+    }
+  }
+
+  private String threadName() {
+    return "DataNode disk io util manager";
+  }
+
+  private static final String PROC_DISKSSTATS = "/proc/diskstats";
+  private static final Pattern DISK_STAT_FORMAT =
+      Pattern.compile("[ \t]*[0-9]*[ \t]*[0-9]*[ \t]*(\\S*)" +
+          "[ \t]*[0-9]*[ \t]*[0-9]*[ \t]*[0-9]*" +
+          "[ \t]*[0-9]*[ \t]*[0-9]*[ \t]*[0-9]*" +
+          "[ \t]*[0-9]*[ \t]*[0-9]*[ \t]*[0-9]*" +
+          "[ \t]*([0-9]*)[ \t].*");
+
+  private Map<String, IOStat> getDiskIoUtils() {
+    Map<String, IOStat> rets = new HashMap<>();
+    InputStreamReader fReader = null;
+    BufferedReader in = null;
+    try {
+      fReader = new InputStreamReader(
+          new FileInputStream(PROC_DISKSSTATS), Charset.forName("UTF-8"));
+      in = new BufferedReader(fReader);
+    } catch (FileNotFoundException f) {
+      // shouldn't happen....
+      return rets;
+    }
+    try {
+      Matcher mat = null;
+      String str = in.readLine();
+      while (str != null) {
+        mat = DISK_STAT_FORMAT.matcher(str);
+        if (mat.find()) {
+          String diskName = mat.group(1);
+          long totalTicks = Long.parseLong(mat.group(2));
+          LOG.debug(str + " totalTicks:" + totalTicks);
+          IOStat stat = new IOStat(diskName, totalTicks);
+          rets.put(diskName, stat);
+        }
+        str = in.readLine();
+      }
+    } catch (IOException e) {
+      e.printStackTrace();

Review comment:
       Please log the message using slf4j. Otherwise it only goes to stderr instead of the DataNode log.
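
       For example, one possible replacement for the catch block (the log level is just a suggestion):

           } catch (IOException e) {
             // Route the failure through slf4j so it ends up in the DataNode log
             // instead of being written to stderr.
             LOG.error("Failed to read disk stats from " + PROC_DISKSSTATS, e);
           }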

##########
File path: hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskIOUtilManager.java
##########
@@ -0,0 +1,253 @@
+package org.apache.hadoop.hdfs.server.datanode;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
+import org.apache.hadoop.io.nativeio.NativeIO;
+import org.apache.hadoop.util.NativeCodeLoader;
+import org.apache.hadoop.util.Shell;
+
+import java.io.BufferedReader;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.charset.Charset;
+import java.nio.file.FileStore;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DISK_STAT_INTERVAL_SECONDS_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DISK_STAT_INTERVAL_SECONDS_KEY;
+
+public class DiskIOUtilManager implements Runnable {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(DiskIOUtilManager.class);
+  private final DataNode dataNode;
+  private volatile long intervalMs;
+  private volatile int ioUtil;
+  private volatile boolean shouldStop = false;
+  private String diskName;
+  private Thread diskStatThread;
+  private Map<StorageLocation, DiskLocation> locationToDisks = new HashMap<>();
+  private Map<DiskLocation, IOStat> ioStats = new HashMap<>();
+
+  private class DiskLocation {
+    private final String diskName;
+    private final StorageLocation location;
+    DiskLocation(StorageLocation location) throws IOException {
+      this.location = location;
+      FileStore fs = Files.getFileStore(Paths.get(location.getUri().getPath()));
+      String diskNamePlace = null;
+      if (NativeCodeLoader.isNativeCodeLoaded() && Shell.LINUX) {
+        try {
+          diskNamePlace = NativeIO.getDiskName(location.getUri().getPath());
+          LOG.info("location is  {}, diskname is {}", location.getUri().getPath(), diskNamePlace);
+        } catch (IOException e) {
+          LOG.error(e.toString());
+          diskNamePlace = Paths.get(fs.name()).getFileName().toString();
+        } finally {
+          this.diskName = diskNamePlace;
+        }
+      } else {
+        this.diskName = Paths.get(fs.name()).getFileName().toString();
+      }
+    }
+
+    @Override
+    public String toString() {
+      return location.toString() + " disk: " + diskName;
+    }
+    @Override
+    public int hashCode() {
+      return location.hashCode();
+    }
+  }
+
+//  private String getDiskName(String path) {
+//    SystemInfo systemInfo = new SystemInfo();
+//    systemInfo.getOperatingSystem().getFileSystem().getFileStores();
+//    return "";
+//  }
+
+  private class IOStat {
+    private String diskName;
+    private long lastTotalTicks;
+    private int util;
+    public IOStat(String diskName, long lastTotalTicks) {
+      this.diskName = diskName;
+      this.lastTotalTicks = lastTotalTicks;
+    }
+
+    public int getUtil() {
+      return util;
+    }
+
+    public void setUtil(int util) {
+      if (util <= 100 && util >= 0) {
+        this.util = util;
+      } else if (util < 0) {
+        this.util = 0;
+      } else {
+        this.util = 100;
+      }
+    }
+
+    public long getLastTotalTicks() {
+      return lastTotalTicks;
+    }
+
+    public void setLastTotalTicks(long lastTotalTicks) {
+      this.lastTotalTicks = lastTotalTicks;
+    }
+  }
+
+  DiskIOUtilManager(DataNode dataNode, Configuration conf) {
+    this.dataNode = dataNode;
+    this.intervalMs = TimeUnit.SECONDS.toMillis(conf.getLong(
+        DFS_DATANODE_DISK_STAT_INTERVAL_SECONDS_KEY,
+        DFS_DATANODE_DISK_STAT_INTERVAL_SECONDS_DEFAULT));
+    if (this.intervalMs < DFS_DATANODE_DISK_STAT_INTERVAL_SECONDS_DEFAULT) {
+      this.intervalMs = 1;
+    }
+  }
+
+  @Override
+  public void run() {
+    FsVolumeImpl.LOG.info(this + " starting disk util stat");
+    while (true) {
+      if (shouldStop) {
+        FsVolumeImpl.LOG.info(this + " stopping disk util stat");
+        break;
+      }
+      if (!Shell.LINUX) {
+        FsVolumeImpl.LOG.debug("Not support disk util stat on this os release");
+        continue;
+      }
+      Map<String, IOStat> allIOStats = getDiskIoUtils();
+      synchronized (this) {
+        for (Map.Entry<DiskLocation, IOStat> entry : ioStats.entrySet()) {
+          String diskName = entry.getKey().diskName;
+          IOStat oldStat = entry.getValue();
+          int util = 0;
+          if (allIOStats.containsKey(diskName)) {
+            long oldTotalTicks = oldStat.getLastTotalTicks();
+            long newTotalTicks = allIOStats.get(diskName).getLastTotalTicks();
+            if (oldTotalTicks != 0) {
+              util = (int) ((double) (newTotalTicks - oldTotalTicks) * 100 / intervalMs);
+            }
+            oldStat.setLastTotalTicks(newTotalTicks);
+            oldStat.setUtil(util);
+            LOG.debug(diskName + " disk io util:" + util);
+          } else {
+            //Maybe this disk has been umounted.
+            oldStat.setUtil(100);
+          }
+        }
+      }
+      try {
+        Thread.sleep(intervalMs);
+      } catch (InterruptedException e) {
+      }
+    }

Review comment:
       Can you add a try..catch block to wrap this method, and log an ERROR message if any Throwable is thrown from this thread? That'll make troubleshooting easier. Otherwise, this thread could exit without any messages.
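
       A minimal sketch of what I have in mind, keeping the existing loop body as-is (this is only an illustration, not the exact patch):

           @Override
           public void run() {
             try {
               FsVolumeImpl.LOG.info(this + " starting disk util stat");
               while (true) {
                 if (shouldStop) {
                   FsVolumeImpl.LOG.info(this + " stopping disk util stat");
                   break;
                 }
                 // ... existing per-interval stat collection ...
               }
             } catch (Throwable t) {
               // Without this guard an unexpected exception would silently kill
               // the daemon thread; logging at ERROR makes the exit visible.
               LOG.error(this + " disk util stat thread exiting unexpectedly", t);
             }
           }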

##########
File path: hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
##########
@@ -2001,5 +2001,13 @@
   public static final long DFS_LEASE_HARDLIMIT_DEFAULT =
       HdfsClientConfigKeys.DFS_LEASE_HARDLIMIT_DEFAULT;
 
+  public static final String DFS_DATANODE_DISK_STAT_INTERVAL_SECONDS_KEY =
+      "dfs.datanode.disk.stat.interval.seconds";
+  public static final long DFS_DATANODE_DISK_STAT_INTERVAL_SECONDS_DEFAULT = 1L;
 
+  public static final String
+      DFS_DATANODE_AVAILABLE_SPACE_VOLUME_CHOOSING_POLICY_IO_UTIL_PREFERENCE_ENABLE_KEY =
+      "dfs.datanode.available-space-volume-choosing-policy.io.util.preference.enable";
+  public static final boolean
+      DFS_DATANODE_AVAILABLE_SPACE_VOLUME_CHOOSING_POLICY_IO_UTIL_PREFERENCE_ENABLE_DEFAULT = true;

Review comment:
       Please add these two config keys to hdfs-default.xml.
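
       For example (the descriptions are only suggested wording; the values match the defaults in this patch):

           <property>
             <name>dfs.datanode.disk.stat.interval.seconds</name>
             <value>1</value>
             <description>Interval, in seconds, at which the DataNode samples
               per-disk I/O utilization.</description>
           </property>

           <property>
             <name>dfs.datanode.available-space-volume-choosing-policy.io.util.preference.enable</name>
             <value>true</value>
             <description>Whether the available-space volume choosing policy also
               takes disk I/O utilization into account.</description>
           </property>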

##########
File path: hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskIOUtilManager.java
##########
@@ -0,0 +1,253 @@
+package org.apache.hadoop.hdfs.server.datanode;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
+import org.apache.hadoop.io.nativeio.NativeIO;
+import org.apache.hadoop.util.NativeCodeLoader;
+import org.apache.hadoop.util.Shell;
+
+import java.io.BufferedReader;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.charset.Charset;
+import java.nio.file.FileStore;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DISK_STAT_INTERVAL_SECONDS_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DISK_STAT_INTERVAL_SECONDS_KEY;
+
+public class DiskIOUtilManager implements Runnable {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(DiskIOUtilManager.class);
+  private final DataNode dataNode;
+  private volatile long intervalMs;
+  private volatile int ioUtil;
+  private volatile boolean shouldStop = false;
+  private String diskName;
+  private Thread diskStatThread;
+  private Map<StorageLocation, DiskLocation> locationToDisks = new HashMap<>();
+  private Map<DiskLocation, IOStat> ioStats = new HashMap<>();
+
+  private class DiskLocation {
+    private final String diskName;
+    private final StorageLocation location;
+    DiskLocation(StorageLocation location) throws IOException {
+      this.location = location;
+      FileStore fs = Files.getFileStore(Paths.get(location.getUri().getPath()));
+      String diskNamePlace = null;
+      if (NativeCodeLoader.isNativeCodeLoaded() && Shell.LINUX) {
+        try {
+          diskNamePlace = NativeIO.getDiskName(location.getUri().getPath());
+          LOG.info("location is  {}, diskname is {}", location.getUri().getPath(), diskNamePlace);
+        } catch (IOException e) {
+          LOG.error(e.toString());
+          diskNamePlace = Paths.get(fs.name()).getFileName().toString();
+        } finally {
+          this.diskName = diskNamePlace;
+        }
+      } else {
+        this.diskName = Paths.get(fs.name()).getFileName().toString();
+      }
+    }
+
+    @Override
+    public String toString() {
+      return location.toString() + " disk: " + diskName;
+    }
+    @Override
+    public int hashCode() {
+      return location.hashCode();
+    }
+  }
+
+//  private String getDiskName(String path) {

Review comment:
       Remove it if not used.

##########
File path: hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskIOUtilManager.java
##########
@@ -0,0 +1,253 @@
+package org.apache.hadoop.hdfs.server.datanode;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
+import org.apache.hadoop.io.nativeio.NativeIO;
+import org.apache.hadoop.util.NativeCodeLoader;
+import org.apache.hadoop.util.Shell;
+
+import java.io.BufferedReader;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.charset.Charset;
+import java.nio.file.FileStore;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DISK_STAT_INTERVAL_SECONDS_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DISK_STAT_INTERVAL_SECONDS_KEY;
+
+public class DiskIOUtilManager implements Runnable {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(DiskIOUtilManager.class);
+  private final DataNode dataNode;
+  private volatile long intervalMs;
+  private volatile int ioUtil;
+  private volatile boolean shouldStop = false;
+  private String diskName;
+  private Thread diskStatThread;
+  private Map<StorageLocation, DiskLocation> locationToDisks = new HashMap<>();
+  private Map<DiskLocation, IOStat> ioStats = new HashMap<>();
+
+  private class DiskLocation {
+    private final String diskName;
+    private final StorageLocation location;
+    DiskLocation(StorageLocation location) throws IOException {
+      this.location = location;
+      FileStore fs = Files.getFileStore(Paths.get(location.getUri().getPath()));
+      String diskNamePlace = null;
+      if (NativeCodeLoader.isNativeCodeLoaded() && Shell.LINUX) {
+        try {
+          diskNamePlace = NativeIO.getDiskName(location.getUri().getPath());
+          LOG.info("location is  {}, diskname is {}", location.getUri().getPath(), diskNamePlace);
+        } catch (IOException e) {
+          LOG.error(e.toString());
+          diskNamePlace = Paths.get(fs.name()).getFileName().toString();
+        } finally {
+          this.diskName = diskNamePlace;
+        }
+      } else {
+        this.diskName = Paths.get(fs.name()).getFileName().toString();
+      }
+    }
+
+    @Override
+    public String toString() {
+      return location.toString() + " disk: " + diskName;
+    }
+    @Override
+    public int hashCode() {
+      return location.hashCode();
+    }
+  }
+
+//  private String getDiskName(String path) {
+//    SystemInfo systemInfo = new SystemInfo();
+//    systemInfo.getOperatingSystem().getFileSystem().getFileStores();
+//    return "";
+//  }
+
+  private class IOStat {
+    private String diskName;
+    private long lastTotalTicks;
+    private int util;
+    public IOStat(String diskName, long lastTotalTicks) {
+      this.diskName = diskName;
+      this.lastTotalTicks = lastTotalTicks;
+    }
+
+    public int getUtil() {
+      return util;
+    }
+
+    public void setUtil(int util) {
+      if (util <= 100 && util >= 0) {
+        this.util = util;
+      } else if (util < 0) {
+        this.util = 0;
+      } else {
+        this.util = 100;
+      }
+    }
+
+    public long getLastTotalTicks() {
+      return lastTotalTicks;
+    }
+
+    public void setLastTotalTicks(long lastTotalTicks) {
+      this.lastTotalTicks = lastTotalTicks;
+    }
+  }
+
+  DiskIOUtilManager(DataNode dataNode, Configuration conf) {
+    this.dataNode = dataNode;
+    this.intervalMs = TimeUnit.SECONDS.toMillis(conf.getLong(
+        DFS_DATANODE_DISK_STAT_INTERVAL_SECONDS_KEY,
+        DFS_DATANODE_DISK_STAT_INTERVAL_SECONDS_DEFAULT));
+    if (this.intervalMs < DFS_DATANODE_DISK_STAT_INTERVAL_SECONDS_DEFAULT) {
+      this.intervalMs = 1;
+    }
+  }
+
+  @Override
+  public void run() {
+    FsVolumeImpl.LOG.info(this + " starting disk util stat");
+    while (true) {
+      if (shouldStop) {
+        FsVolumeImpl.LOG.info(this + " stopping disk util stat");
+        break;
+      }
+      if (!Shell.LINUX) {
+        FsVolumeImpl.LOG.debug("Not support disk util stat on this os release");
+        continue;
+      }
+      Map<String, IOStat> allIOStats = getDiskIoUtils();
+      synchronized (this) {
+        for (Map.Entry<DiskLocation, IOStat> entry : ioStats.entrySet()) {
+          String diskName = entry.getKey().diskName;
+          IOStat oldStat = entry.getValue();
+          int util = 0;
+          if (allIOStats.containsKey(diskName)) {
+            long oldTotalTicks = oldStat.getLastTotalTicks();
+            long newTotalTicks = allIOStats.get(diskName).getLastTotalTicks();
+            if (oldTotalTicks != 0) {
+              util = (int) ((double) (newTotalTicks - oldTotalTicks) * 100 / intervalMs);
+            }
+            oldStat.setLastTotalTicks(newTotalTicks);
+            oldStat.setUtil(util);
+            LOG.debug(diskName + " disk io util:" + util);
+          } else {
+            //Maybe this disk has been umounted.
+            oldStat.setUtil(100);
+          }
+        }
+      }
+      try {
+        Thread.sleep(intervalMs);
+      } catch (InterruptedException e) {
+      }
+    }
+  }
+
+  void start() {

Review comment:
       Where does this method get called?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: common-issues-unsubscribe@hadoop.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


