You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2015/05/01 09:02:07 UTC

[50/50] [abbrv] hadoop git commit: YARN-2619. Added NodeManager support for disk io isolation through cgroups. Contributed by Varun Vasudev and Wei Yan.

YARN-2619. Added NodeManager support for disk io isolation through cgroups. Contributed by Varun Vasudev and Wei Yan.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/1b3b9e5c
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/1b3b9e5c
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/1b3b9e5c

Branch: refs/heads/HDFS-7240
Commit: 1b3b9e5c31c38388c1ce4208c65e8dd5f956da82
Parents: 98a6176
Author: Vinod Kumar Vavilapalli <vi...@apache.org>
Authored: Thu Apr 30 21:41:07 2015 -0700
Committer: Vinod Kumar Vavilapalli <vi...@apache.org>
Committed: Thu Apr 30 21:41:07 2015 -0700

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |   3 +
 .../hadoop/yarn/conf/YarnConfiguration.java     |  54 ++++--
 .../CGroupsBlkioResourceHandlerImpl.java        | 170 +++++++++++++++++++
 .../linux/resources/CGroupsHandler.java         |   4 +-
 .../linux/resources/CGroupsHandlerImpl.java     |  67 ++++----
 .../linux/resources/DiskResourceHandler.java    |  30 ++++
 .../linux/resources/ResourceHandlerModule.java  |  45 ++++-
 .../util/CgroupsLCEResourcesHandler.java        |   6 +
 .../TestCGroupsBlkioResourceHandlerImpl.java    | 116 +++++++++++++
 .../linux/resources/TestCGroupsHandlerImpl.java | 101 +++++++++--
 .../resources/TestResourceHandlerModule.java    |  26 ++-
 .../util/TestCgroupsLCEResourcesHandler.java    | 112 +++++-------
 12 files changed, 600 insertions(+), 134 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/1b3b9e5c/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 28fcae4..dcf3538 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -105,6 +105,9 @@ Release 2.8.0 - UNRELEASED
     YARN-2498. Respect labels in preemption policy of capacity scheduler for
     inter-queue preemption. (Wangda Tan via jianhe)
 
+    YARN-2619. Added NodeManager support for disk io isolation through cgroups.
+    (Varun Vasudev and Wei Yan via vinodkv)
+
   IMPROVEMENTS
 
     YARN-1880. Cleanup TestApplicationClientProtocolOnHA

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1b3b9e5c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index eb568b9..70b87f3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -823,38 +823,68 @@ public class YarnConfiguration extends Configuration {
   public static final int DEFAULT_NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT =
       100;
 
+  /**
+   * Prefix for disk configurations. Work in progress: This configuration
+   * parameter may be changed/removed in the future.
+   */
+  @Private
+  public static final String NM_DISK_RESOURCE_PREFIX = NM_PREFIX
+      + "resource.disk.";
+  /**
+   * This setting controls if resource handling for disk operations is enabled.
+   * Work in progress: This configuration parameter may be changed/removed in
+   * the future
+   */
+  @Private
+  public static final String NM_DISK_RESOURCE_ENABLED = NM_DISK_RESOURCE_PREFIX
+      + "enabled";
+  /** Disk as a resource is disabled by default. **/
+  @Private
+  public static final boolean DEFAULT_NM_DISK_RESOURCE_ENABLED = false;
 
-  public static final String NM_NETWORK_RESOURCE_PREFIX = NM_PREFIX + "resource.network.";
+  public static final String NM_NETWORK_RESOURCE_PREFIX = NM_PREFIX
+      + "resource.network.";
 
-  /** This setting controls if resource handling for network bandwidth is enabled **/
-  /* Work in progress: This configuration parameter may be changed/removed in the future */
+  /**
+   * This setting controls if resource handling for network bandwidth is
+   * enabled. Work in progress: This configuration parameter may be
+   * changed/removed in the future
+   */
   @Private
   public static final String NM_NETWORK_RESOURCE_ENABLED =
       NM_NETWORK_RESOURCE_PREFIX + "enabled";
-  /** Network as a resource is disabled by default **/
+  /** Network as a resource is disabled by default. **/
   @Private
   public static final boolean DEFAULT_NM_NETWORK_RESOURCE_ENABLED = false;
 
-  /** Specifies the interface to be used for applying network throttling rules **/
-  /* Work in progress: This configuration parameter may be changed/removed in the future */
+  /**
+   * Specifies the interface to be used for applying network throttling rules.
+   * Work in progress: This configuration parameter may be changed/removed in
+   * the future
+   */
   @Private
   public static final String NM_NETWORK_RESOURCE_INTERFACE =
       NM_NETWORK_RESOURCE_PREFIX + "interface";
   @Private
   public static final String DEFAULT_NM_NETWORK_RESOURCE_INTERFACE = "eth0";
 
-  /** Specifies the total available outbound bandwidth on the node **/
-  /* Work in progress: This configuration parameter may be changed/removed in the future */
+  /**
+   * Specifies the total available outbound bandwidth on the node. Work in
+   * progress: This configuration parameter may be changed/removed in the future
+   */
   @Private
   public static final String NM_NETWORK_RESOURCE_OUTBOUND_BANDWIDTH_MBIT =
       NM_NETWORK_RESOURCE_PREFIX + "outbound-bandwidth-mbit";
   @Private
-  public static final int DEFAULT_NM_NETWORK_RESOURCE_OUTBOUND_BANDWIDTH_MBIT = 1000;
+  public static final int DEFAULT_NM_NETWORK_RESOURCE_OUTBOUND_BANDWIDTH_MBIT =
+      1000;
 
-  /** Specifies the total outbound bandwidth available to YARN containers. defaults to
-   * NM_NETWORK_RESOURCE_OUTBOUND_BANDWIDTH_MBIT if not specified.
+  /**
+   * Specifies the total outbound bandwidth available to YARN containers.
+   * defaults to NM_NETWORK_RESOURCE_OUTBOUND_BANDWIDTH_MBIT if not specified.
+   * Work in progress: This configuration parameter may be changed/removed in
+   * the future
    */
-  /* Work in progress: This configuration parameter may be changed/removed in the future */
   @Private
   public static final String NM_NETWORK_RESOURCE_OUTBOUND_BANDWIDTH_YARN_MBIT =
       NM_NETWORK_RESOURCE_PREFIX + "outbound-bandwidth-yarn-mbit";

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1b3b9e5c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsBlkioResourceHandlerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsBlkioResourceHandlerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsBlkioResourceHandlerImpl.java
new file mode 100644
index 0000000..e7eea1f
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsBlkioResourceHandlerImpl.java
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.Shell;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperation;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Handler class to handle the blkio controller. Currently it splits resources
+ * evenly across all containers. Once we have scheduling sorted out, we can
+ * modify the function to represent the disk resources allocated.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class CGroupsBlkioResourceHandlerImpl implements DiskResourceHandler {
+
+  static final Log LOG = LogFactory
+      .getLog(CGroupsBlkioResourceHandlerImpl.class);
+
+  private CGroupsHandler cGroupsHandler;
+  // Arbitrarily choose a weight - all that matters is that all containers
+  // get the same weight assigned to them. Once we have scheduling support
+  // this number will be determined dynamically for each container.
+  @VisibleForTesting
+  static final String DEFAULT_WEIGHT = "500";
+  private static final String PARTITIONS_FILE = "/proc/partitions";
+
+  CGroupsBlkioResourceHandlerImpl(CGroupsHandler cGroupsHandler) {
+    this.cGroupsHandler = cGroupsHandler;
+    // check for linux so that we don't print messages for tests running on
+    // other platforms
+    if(Shell.LINUX) {
+      checkDiskScheduler();
+    }
+  }
+
+
+  private void checkDiskScheduler() {
+    String data;
+
+    // read /proc/partitions and check to make sure that sd* and hd*
+    // are using the CFQ scheduler. If they aren't print a warning
+    try {
+      byte[] contents = Files.readAllBytes(Paths.get(PARTITIONS_FILE));
+      data = new String(contents, "UTF-8").trim();
+    } catch (IOException e) {
+      String msg = "Couldn't read " + PARTITIONS_FILE +
+          "; can't determine disk scheduler type";
+      LOG.warn(msg, e);
+      return;
+    }
+    String[] lines = data.split(System.lineSeparator());
+    if (lines.length > 0) {
+      for (String line : lines) {
+        String[] columns = line.split("\\s+");
+        if (columns.length > 4) {
+          String partition = columns[4];
+          // check some known partitions to make sure  the disk scheduler
+          // is cfq - not meant to be comprehensive, more a sanity check
+          if (partition.startsWith("sd") || partition.startsWith("hd")
+              || partition.startsWith("vd") || partition.startsWith("xvd")) {
+            String schedulerPath =
+                "/sys/block/" + partition + "/queue/scheduler";
+            File schedulerFile = new File(schedulerPath);
+            if (schedulerFile.exists()) {
+              try {
+                byte[] contents = Files.readAllBytes(Paths.get(schedulerPath));
+                String schedulerString = new String(contents, "UTF-8").trim();
+                if (!schedulerString.contains("[cfq]")) {
+                  LOG.warn("Device " + partition + " does not use the CFQ"
+                      + " scheduler; disk isolation using "
+                      + "CGroups will not work on this partition.");
+                }
+              } catch (IOException ie) {
+                LOG.warn(
+                    "Unable to determine disk scheduler type for partition "
+                      + partition, ie);
+              }
+            }
+          }
+        }
+      }
+    }
+  }
+
+  @Override
+  public List<PrivilegedOperation> bootstrap(Configuration configuration)
+      throws ResourceHandlerException {
+    // if bootstrap is called on this class, disk is already enabled
+    // so no need to check again
+    this.cGroupsHandler
+      .mountCGroupController(CGroupsHandler.CGroupController.BLKIO);
+    return null;
+  }
+
+  @Override
+  public List<PrivilegedOperation> preStart(Container container)
+      throws ResourceHandlerException {
+
+    String cgroupId = container.getContainerId().toString();
+    cGroupsHandler
+      .createCGroup(CGroupsHandler.CGroupController.BLKIO, cgroupId);
+    try {
+      cGroupsHandler.updateCGroupParam(CGroupsHandler.CGroupController.BLKIO,
+          cgroupId, CGroupsHandler.CGROUP_PARAM_BLKIO_WEIGHT, DEFAULT_WEIGHT);
+    } catch (ResourceHandlerException re) {
+      cGroupsHandler.deleteCGroup(CGroupsHandler.CGroupController.BLKIO,
+          cgroupId);
+      LOG.warn("Could not update cgroup for container", re);
+      throw re;
+    }
+    List<PrivilegedOperation> ret = new ArrayList<>();
+    ret.add(new PrivilegedOperation(
+      PrivilegedOperation.OperationType.ADD_PID_TO_CGROUP,
+      PrivilegedOperation.CGROUP_ARG_PREFIX
+          + cGroupsHandler.getPathForCGroupTasks(
+            CGroupsHandler.CGroupController.BLKIO, cgroupId)));
+    return ret;
+  }
+
+  @Override
+  public List<PrivilegedOperation> reacquireContainer(ContainerId containerId)
+      throws ResourceHandlerException {
+    return null;
+  }
+
+  @Override
+  public List<PrivilegedOperation> postComplete(ContainerId containerId)
+      throws ResourceHandlerException {
+    cGroupsHandler.deleteCGroup(CGroupsHandler.CGroupController.BLKIO,
+        containerId.toString());
+    return null;
+  }
+
+  @Override
+  public List<PrivilegedOperation> teardown() throws ResourceHandlerException {
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1b3b9e5c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsHandler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsHandler.java
index 34429d3..70dc818 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsHandler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsHandler.java
@@ -33,7 +33,8 @@ import org.apache.hadoop.classification.InterfaceStability;
 public interface CGroupsHandler {
   public enum CGroupController {
     CPU("cpu"),
-    NET_CLS("net_cls");
+    NET_CLS("net_cls"),
+    BLKIO("blkio");
 
     private final String name;
 
@@ -48,6 +49,7 @@ public interface CGroupsHandler {
 
   public static final String CGROUP_FILE_TASKS = "tasks";
   public static final String CGROUP_PARAM_CLASSID = "classid";
+  public static final String CGROUP_PARAM_BLKIO_WEIGHT = "weight";
 
   /**
    * Mounts a cgroup controller

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1b3b9e5c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsHandlerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsHandlerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsHandlerImpl.java
index 9a4230f..ff56121 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsHandlerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsHandlerImpl.java
@@ -20,6 +20,7 @@
 
 package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -38,6 +39,7 @@ import java.io.*;
 import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -63,7 +65,7 @@ class CGroupsHandlerImpl implements CGroupsHandler {
   private final String cGroupMountPath;
   private final long deleteCGroupTimeout;
   private final long deleteCGroupDelay;
-  private final Map<CGroupController, String> controllerPaths;
+  private Map<CGroupController, String> controllerPaths;
   private final ReadWriteLock rwLock;
   private final PrivilegedOperationExecutor privilegedOperationExecutor;
   private final Clock clock;
@@ -106,55 +108,61 @@ class CGroupsHandlerImpl implements CGroupsHandler {
 
   private void initializeControllerPaths() throws ResourceHandlerException {
     if (enableCGroupMount) {
-      //nothing to do here - we support 'deferred' mounting of specific
-      //controllers - we'll populate the path for a given controller when an
-      //explicit mountCGroupController request is issued.
+      // nothing to do here - we support 'deferred' mounting of specific
+      // controllers - we'll populate the path for a given controller when an
+      // explicit mountCGroupController request is issued.
       LOG.info("CGroup controller mounting enabled.");
     } else {
-      //cluster admins are expected to have mounted controllers in specific
-      //locations - we'll attempt to figure out mount points
-      initializeControllerPathsFromMtab();
+      // cluster admins are expected to have mounted controllers in specific
+      // locations - we'll attempt to figure out mount points
+
+      Map<CGroupController, String> cPaths =
+          initializeControllerPathsFromMtab(MTAB_FILE, this.cGroupPrefix);
+      // we want to do a bulk update without the paths changing concurrently
+      try {
+        rwLock.writeLock().lock();
+        controllerPaths = cPaths;
+      } finally {
+        rwLock.writeLock().unlock();
+      }
     }
   }
 
-  private void initializeControllerPathsFromMtab()
-      throws ResourceHandlerException {
+  @VisibleForTesting
+  static Map<CGroupController, String> initializeControllerPathsFromMtab(
+      String mtab, String cGroupPrefix) throws ResourceHandlerException {
     try {
-      Map<String, List<String>> parsedMtab = parseMtab();
-
-      //we want to do a bulk update without the paths changing concurrently
-      rwLock.writeLock().lock();
+      Map<String, List<String>> parsedMtab = parseMtab(mtab);
+      Map<CGroupController, String> ret = new HashMap<>();
 
       for (CGroupController controller : CGroupController.values()) {
         String name = controller.getName();
         String controllerPath = findControllerInMtab(name, parsedMtab);
 
         if (controllerPath != null) {
-          File f = new File(controllerPath + "/" + this.cGroupPrefix);
+          File f = new File(controllerPath + "/" + cGroupPrefix);
 
           if (FileUtil.canWrite(f)) {
-            controllerPaths.put(controller, controllerPath);
+            ret.put(controller, controllerPath);
           } else {
             String error =
                 new StringBuffer("Mount point Based on mtab file: ")
-                    .append(MTAB_FILE).append(
-                    ". Controller mount point not writable for: ")
-                    .append(name).toString();
+                  .append(mtab)
+                  .append(". Controller mount point not writable for: ")
+                  .append(name).toString();
 
             LOG.error(error);
             throw new ResourceHandlerException(error);
           }
         } else {
-
-            LOG.warn("Controller not mounted but automount disabled: " + name);
+          LOG.warn("Controller not mounted but automount disabled: " + name);
         }
       }
+      return ret;
     } catch (IOException e) {
       LOG.warn("Failed to initialize controller paths! Exception: " + e);
       throw new ResourceHandlerException(
-          "Failed to initialize controller paths!");
-    } finally {
-      rwLock.writeLock().unlock();
+        "Failed to initialize controller paths!");
     }
   }
 
@@ -173,12 +181,13 @@ class CGroupsHandlerImpl implements CGroupsHandler {
    * for mounts with type "cgroup". Cgroup controllers will
    * appear in the list of options for a path.
    */
-  private Map<String, List<String>> parseMtab() throws IOException {
+  private static Map<String, List<String>> parseMtab(String mtab)
+      throws IOException {
     Map<String, List<String>> ret = new HashMap<String, List<String>>();
     BufferedReader in = null;
 
     try {
-      FileInputStream fis = new FileInputStream(new File(getMtabFileName()));
+      FileInputStream fis = new FileInputStream(new File(mtab));
       in = new BufferedReader(new InputStreamReader(fis, "UTF-8"));
 
       for (String str = in.readLine(); str != null;
@@ -197,7 +206,7 @@ class CGroupsHandlerImpl implements CGroupsHandler {
         }
       }
     } catch (IOException e) {
-      throw new IOException("Error while reading " + getMtabFileName(), e);
+      throw new IOException("Error while reading " + mtab, e);
     } finally {
       IOUtils.cleanup(LOG, in);
     }
@@ -205,7 +214,7 @@ class CGroupsHandlerImpl implements CGroupsHandler {
     return ret;
   }
 
-  private String findControllerInMtab(String controller,
+  private static String findControllerInMtab(String controller,
       Map<String, List<String>> entries) {
     for (Map.Entry<String, List<String>> e : entries.entrySet()) {
       if (e.getValue().contains(controller))
@@ -215,10 +224,6 @@ class CGroupsHandlerImpl implements CGroupsHandler {
     return null;
   }
 
-  String getMtabFileName() {
-    return MTAB_FILE;
-  }
-
   @Override
   public void mountCGroupController(CGroupController controller)
       throws ResourceHandlerException {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1b3b9e5c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/DiskResourceHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/DiskResourceHandler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/DiskResourceHandler.java
new file mode 100644
index 0000000..ca08d89
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/DiskResourceHandler.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Resource handler for disk resources.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public interface DiskResourceHandler extends ResourceHandler {
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1b3b9e5c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/ResourceHandlerModule.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/ResourceHandlerModule.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/ResourceHandlerModule.java
index 30fc951..5dfd78c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/ResourceHandlerModule.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/ResourceHandlerModule.java
@@ -20,6 +20,7 @@
 
 package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
@@ -31,25 +32,27 @@ import java.util.List;
 
 /**
  * Provides mechanisms to get various resource handlers - cpu, memory, network,
- * disk etc., - based on configuration
+ * disk etc., - based on configuration.
  */
 
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
 public class ResourceHandlerModule {
-  private volatile static ResourceHandlerChain resourceHandlerChain;
+  private static volatile ResourceHandlerChain resourceHandlerChain;
 
   /**
    * This specific implementation might provide resource management as well
    * as resource metrics functionality. We need to ensure that the same
    * instance is used for both.
    */
-  private volatile static TrafficControlBandwidthHandlerImpl
+  private static volatile TrafficControlBandwidthHandlerImpl
       trafficControlBandwidthHandler;
-  private volatile static CGroupsHandler cGroupsHandler;
+  private static volatile CGroupsHandler cGroupsHandler;
+  private static volatile CGroupsBlkioResourceHandlerImpl
+      cGroupsBlkioResourceHandler;
 
   /**
-   * Returns an initialized, thread-safe CGroupsHandler instance
+   * Returns an initialized, thread-safe CGroupsHandler instance.
    */
   public static CGroupsHandler getCGroupsHandler(Configuration conf)
       throws ResourceHandlerException {
@@ -94,6 +97,28 @@ public class ResourceHandlerModule {
     return getTrafficControlBandwidthHandler(conf);
   }
 
+  public static DiskResourceHandler getDiskResourceHandler(Configuration conf)
+      throws ResourceHandlerException {
+    if (conf.getBoolean(YarnConfiguration.NM_DISK_RESOURCE_ENABLED,
+        YarnConfiguration.DEFAULT_NM_DISK_RESOURCE_ENABLED)) {
+      return getCgroupsBlkioResourceHandler(conf);
+    }
+    return null;
+  }
+
+  private static CGroupsBlkioResourceHandlerImpl getCgroupsBlkioResourceHandler(
+      Configuration conf) throws ResourceHandlerException {
+    if (cGroupsBlkioResourceHandler == null) {
+      synchronized (DiskResourceHandler.class) {
+        if (cGroupsBlkioResourceHandler == null) {
+          cGroupsBlkioResourceHandler =
+              new CGroupsBlkioResourceHandlerImpl(getCGroupsHandler(conf));
+        }
+      }
+    }
+    return cGroupsBlkioResourceHandler;
+  }
+
   private static void addHandlerIfNotNull(List<ResourceHandler> handlerList,
       ResourceHandler handler) {
     if (handler != null) {
@@ -106,11 +131,12 @@ public class ResourceHandlerModule {
     ArrayList<ResourceHandler> handlerList = new ArrayList<>();
 
     addHandlerIfNotNull(handlerList, getOutboundBandwidthResourceHandler(conf));
+    addHandlerIfNotNull(handlerList, getDiskResourceHandler(conf));
     resourceHandlerChain = new ResourceHandlerChain(handlerList);
   }
 
-  public static ResourceHandlerChain getConfiguredResourceHandlerChain
-      (Configuration conf) throws ResourceHandlerException {
+  public static ResourceHandlerChain getConfiguredResourceHandlerChain(
+      Configuration conf) throws ResourceHandlerException {
     if (resourceHandlerChain == null) {
       synchronized (ResourceHandlerModule.class) {
         if (resourceHandlerChain == null) {
@@ -125,4 +151,9 @@ public class ResourceHandlerModule {
       return null;
     }
   }
+
+  @VisibleForTesting
+  static void nullifyResourceHandlerChain() throws ResourceHandlerException {
+    resourceHandlerChain = null;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1b3b9e5c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/util/CgroupsLCEResourcesHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/util/CgroupsLCEResourcesHandler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/util/CgroupsLCEResourcesHandler.java
index ffa17ac..176b63c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/util/CgroupsLCEResourcesHandler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/util/CgroupsLCEResourcesHandler.java
@@ -30,6 +30,7 @@ import java.io.PrintWriter;
 import java.io.Writer;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -503,4 +504,9 @@ public class CgroupsLCEResourcesHandler implements LCEResourcesHandler {
   String getMtabFileName() {
     return MTAB_FILE;
   }
+
+  @VisibleForTesting
+  Map<String, String> getControllerPaths() {
+    return Collections.unmodifiableMap(controllerPaths);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1b3b9e5c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestCGroupsBlkioResourceHandlerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestCGroupsBlkioResourceHandlerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestCGroupsBlkioResourceHandlerImpl.java
new file mode 100644
index 0000000..20aab69
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestCGroupsBlkioResourceHandlerImpl.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperation;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.List;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.times;
+
+/**
+ * Tests for the cgroups disk handler implementation.
+ */
+public class TestCGroupsBlkioResourceHandlerImpl {
+
+  private CGroupsHandler mockCGroupsHandler;
+  private CGroupsBlkioResourceHandlerImpl cGroupsBlkioResourceHandlerImpl;
+
+  @Before
+  public void setup() {
+    mockCGroupsHandler = mock(CGroupsHandler.class);
+    cGroupsBlkioResourceHandlerImpl =
+        new CGroupsBlkioResourceHandlerImpl(mockCGroupsHandler);
+  }
+
+  @Test
+  public void testBootstrap() throws Exception {
+    Configuration conf = new YarnConfiguration();
+    List<PrivilegedOperation> ret =
+        cGroupsBlkioResourceHandlerImpl.bootstrap(conf);
+    verify(mockCGroupsHandler, times(1)).mountCGroupController(
+        CGroupsHandler.CGroupController.BLKIO);
+    Assert.assertNull(ret);
+  }
+
+  @Test
+  public void testPreStart() throws Exception {
+    String id = "container_01_01";
+    String path = "test-path/" + id;
+    ContainerId mockContainerId = mock(ContainerId.class);
+    when(mockContainerId.toString()).thenReturn(id);
+    Container mockContainer = mock(Container.class);
+    when(mockContainer.getContainerId()).thenReturn(mockContainerId);
+    when(
+      mockCGroupsHandler.getPathForCGroupTasks(
+        CGroupsHandler.CGroupController.BLKIO, id)).thenReturn(path);
+
+    List<PrivilegedOperation> ret =
+        cGroupsBlkioResourceHandlerImpl.preStart(mockContainer);
+    verify(mockCGroupsHandler, times(1)).createCGroup(
+        CGroupsHandler.CGroupController.BLKIO, id);
+    verify(mockCGroupsHandler, times(1)).updateCGroupParam(
+        CGroupsHandler.CGroupController.BLKIO, id,
+        CGroupsHandler.CGROUP_PARAM_BLKIO_WEIGHT,
+        CGroupsBlkioResourceHandlerImpl.DEFAULT_WEIGHT);
+    Assert.assertNotNull(ret);
+    Assert.assertEquals(1, ret.size());
+    PrivilegedOperation op = ret.get(0);
+    Assert.assertEquals(PrivilegedOperation.OperationType.ADD_PID_TO_CGROUP,
+        op.getOperationType());
+    List<String> args = op.getArguments();
+    Assert.assertEquals(1, args.size());
+    Assert.assertEquals(PrivilegedOperation.CGROUP_ARG_PREFIX + path,
+        args.get(0));
+  }
+
+  @Test
+  public void testReacquireContainer() throws Exception {
+    ContainerId containerIdMock = mock(ContainerId.class);
+    Assert.assertNull(cGroupsBlkioResourceHandlerImpl
+        .reacquireContainer(containerIdMock));
+  }
+
+  @Test
+  public void testPostComplete() throws Exception {
+    String id = "container_01_01";
+    ContainerId mockContainerId = mock(ContainerId.class);
+    when(mockContainerId.toString()).thenReturn(id);
+    Assert.assertNull(cGroupsBlkioResourceHandlerImpl
+        .postComplete(mockContainerId));
+    verify(mockCGroupsHandler, times(1)).deleteCGroup(
+        CGroupsHandler.CGroupController.BLKIO, id);
+  }
+
+  @Test
+  public void testTeardown() throws Exception {
+    Assert.assertNull(cGroupsBlkioResourceHandlerImpl.teardown());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1b3b9e5c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestCGroupsHandlerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestCGroupsHandlerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestCGroupsHandlerImpl.java
index 0717447..50f8da6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestCGroupsHandlerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestCGroupsHandlerImpl.java
@@ -20,6 +20,7 @@
 
 package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources;
 
+import org.apache.commons.io.FileUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -35,18 +36,21 @@ import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 
 import java.io.File;
+import java.io.FileWriter;
 import java.io.IOException;
 import java.nio.file.Files;
-import java.util.List;
 import java.util.Map;
+import java.util.UUID;
 
 import static org.mockito.Matchers.eq;
-import static org.mockito.Matchers.isNull;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.mockito.Mockito.verifyZeroInteractions;
 
+/**
+ * Tests for the CGroups handler implementation.
+ */
 public class TestCGroupsHandlerImpl {
   private static final Log LOG =
       LogFactory.getLog(TestCGroupsHandlerImpl.class);
@@ -84,8 +88,8 @@ public class TestCGroupsHandlerImpl {
     try {
       cGroupsHandler = new CGroupsHandlerImpl(conf,
           privilegedOperationExecutorMock);
-      PrivilegedOperation expectedOp = new PrivilegedOperation
-          (PrivilegedOperation.OperationType.MOUNT_CGROUPS, (String) null);
+      PrivilegedOperation expectedOp = new PrivilegedOperation(
+          PrivilegedOperation.OperationType.MOUNT_CGROUPS, (String) null);
       //This is expected to be of the form :
       //net_cls=<mount_path>/net_cls
       StringBuffer controllerKV = new StringBuffer(controller.getName())
@@ -94,8 +98,8 @@ public class TestCGroupsHandlerImpl {
 
       cGroupsHandler.mountCGroupController(controller);
       try {
-        ArgumentCaptor<PrivilegedOperation> opCaptor = ArgumentCaptor.forClass
-            (PrivilegedOperation.class);
+        ArgumentCaptor<PrivilegedOperation> opCaptor = ArgumentCaptor.forClass(
+            PrivilegedOperation.class);
         verify(privilegedOperationExecutorMock)
             .executePrivilegedOperation(opCaptor.capture(), eq(false));
 
@@ -200,17 +204,15 @@ public class TestCGroupsHandlerImpl {
 
       Assert.assertTrue(paramFile.exists());
       try {
-        Assert.assertEquals(paramValue, new String(Files.readAllBytes
-            (paramFile
-                .toPath())));
+        Assert.assertEquals(paramValue, new String(Files.readAllBytes(
+            paramFile.toPath())));
       } catch (IOException e) {
         LOG.error("Caught exception: " + e);
-        Assert.assertTrue("Unexpected IOException trying to read cgroup param!",
-            false);
+        Assert.fail("Unexpected IOException trying to read cgroup param!");
       }
 
-      Assert.assertEquals(paramValue, cGroupsHandler.getCGroupParam
-          (controller, testCGroup, param));
+      Assert.assertEquals(paramValue,
+          cGroupsHandler.getCGroupParam(controller, testCGroup, param));
 
       //We can't really do a delete test here. Linux cgroups
       //implementation provides additional semantics - the cgroup cannot be
@@ -222,10 +224,77 @@ public class TestCGroupsHandlerImpl {
       //delete is not possible with a regular non-empty directory.
     } catch (ResourceHandlerException e) {
       LOG.error("Caught exception: " + e);
-      Assert.assertTrue(
-          "Unexpected ResourceHandlerException during cgroup operations!",
-          false);
+      Assert
+        .fail("Unexpected ResourceHandlerException during cgroup operations!");
+    }
+  }
+
+  public static File createMockCgroupMount(File parentDir, String type)
+      throws IOException {
+    return createMockCgroupMount(parentDir, type, "hadoop-yarn");
+  }
+
+  public static File createMockCgroupMount(File parentDir, String type,
+      String hierarchy) throws IOException {
+    File cgroupMountDir =
+        new File(parentDir.getAbsolutePath(), type + "/" + hierarchy);
+    FileUtils.deleteQuietly(cgroupMountDir);
+    if (!cgroupMountDir.mkdirs()) {
+      String message =
+          "Could not create dir " + cgroupMountDir.getAbsolutePath();
+      throw new IOException(message);
     }
+    return cgroupMountDir;
+  }
+
+  public static File createMockMTab(File parentDir) throws IOException {
+    String cpuMtabContent =
+        "none " + parentDir.getAbsolutePath()
+            + "/cpu cgroup rw,relatime,cpu 0 0\n";
+    String blkioMtabContent =
+        "none " + parentDir.getAbsolutePath()
+            + "/blkio cgroup rw,relatime,blkio 0 0\n";
+
+    File mockMtab = new File(parentDir, UUID.randomUUID().toString());
+    if (!mockMtab.exists()) {
+      if (!mockMtab.createNewFile()) {
+        String message = "Could not create file " + mockMtab.getAbsolutePath();
+        throw new IOException(message);
+      }
+    }
+    FileWriter mtabWriter = new FileWriter(mockMtab.getAbsoluteFile());
+    mtabWriter.write(cpuMtabContent);
+    mtabWriter.write(blkioMtabContent);
+    mtabWriter.close();
+    mockMtab.deleteOnExit();
+    return mockMtab;
+  }
+
+
+  @Test
+  public void testMtabParsing() throws Exception {
+    File parentDir = new File(tmpPath);
+    // create mock cgroup
+    File cpuCgroupMountDir = createMockCgroupMount(parentDir, "cpu",
+        hierarchy);
+    Assert.assertTrue(cpuCgroupMountDir.exists());
+    File blkioCgroupMountDir = createMockCgroupMount(parentDir,
+        "blkio", hierarchy);
+    Assert.assertTrue(blkioCgroupMountDir.exists());
+    File mockMtabFile = createMockMTab(parentDir);
+    Map<CGroupsHandler.CGroupController, String> controllerPaths =
+        CGroupsHandlerImpl.initializeControllerPathsFromMtab(
+          mockMtabFile.getAbsolutePath(), hierarchy);
+    Assert.assertEquals(2, controllerPaths.size());
+    Assert.assertTrue(controllerPaths
+        .containsKey(CGroupsHandler.CGroupController.CPU));
+    Assert.assertTrue(controllerPaths
+        .containsKey(CGroupsHandler.CGroupController.BLKIO));
+    String cpuDir = controllerPaths.get(CGroupsHandler.CGroupController.CPU);
+    String blkioDir =
+        controllerPaths.get(CGroupsHandler.CGroupController.BLKIO);
+    Assert.assertEquals(parentDir.getAbsolutePath() + "/cpu", cpuDir);
+    Assert.assertEquals(parentDir.getAbsolutePath() + "/blkio", blkioDir);
   }
 
   @After

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1b3b9e5c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestResourceHandlerModule.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestResourceHandlerModule.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestResourceHandlerModule.java
index 939dfe7..69479d1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestResourceHandlerModule.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestResourceHandlerModule.java
@@ -37,7 +37,7 @@ public class TestResourceHandlerModule {
   Configuration networkEnabledConf;
 
   @Before
-  public void setup() {
+  public void setup() throws Exception {
     emptyConf = new YarnConfiguration();
     networkEnabledConf = new YarnConfiguration();
 
@@ -46,6 +46,7 @@ public class TestResourceHandlerModule {
     //We need to bypass mtab parsing for figuring out cgroups mount locations
     networkEnabledConf.setBoolean(YarnConfiguration
         .NM_LINUX_CONTAINER_CGROUPS_MOUNT, true);
+    ResourceHandlerModule.nullifyResourceHandlerChain();
   }
 
   @Test
@@ -75,4 +76,27 @@ public class TestResourceHandlerModule {
       Assert.fail("Unexpected ResourceHandlerException: " + e);
     }
   }
+
+  @Test
+  public void testDiskResourceHandler() throws Exception {
+
+    DiskResourceHandler handler =
+        ResourceHandlerModule.getDiskResourceHandler(emptyConf);
+    Assert.assertNull(handler);
+
+    Configuration diskConf = new YarnConfiguration();
+    diskConf.setBoolean(YarnConfiguration.NM_DISK_RESOURCE_ENABLED, true);
+
+    handler = ResourceHandlerModule.getDiskResourceHandler(diskConf);
+    Assert.assertNotNull(handler);
+
+    ResourceHandlerChain resourceHandlerChain =
+        ResourceHandlerModule.getConfiguredResourceHandlerChain(diskConf);
+    List<ResourceHandler> resourceHandlers =
+        resourceHandlerChain.getResourceHandlerList();
+    // Exactly one resource handler in chain
+    Assert.assertEquals(resourceHandlers.size(), 1);
+    // Same instance is expected to be in the chain.
+    Assert.assertTrue(resourceHandlers.get(0) == handler);
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1b3b9e5c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/util/TestCgroupsLCEResourcesHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/util/TestCgroupsLCEResourcesHandler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/util/TestCgroupsLCEResourcesHandler.java
index 4e35169..8e9d787 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/util/TestCgroupsLCEResourcesHandler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/util/TestCgroupsLCEResourcesHandler.java
@@ -21,6 +21,7 @@ import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.TestCGroupsHandlerImpl;
 import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin;
 import org.junit.Assert;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -33,7 +34,6 @@ import org.mockito.Mockito;
 import java.io.*;
 import java.util.List;
 import java.util.Scanner;
-import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 
 public class TestCgroupsLCEResourcesHandler {
@@ -142,7 +142,7 @@ public class TestCgroupsLCEResourcesHandler {
 
     @Override
     int[] getOverallLimits(float x) {
-      if (generateLimitsMode == true) {
+      if (generateLimitsMode) {
         return super.getOverallLimits(x);
       }
       return limits;
@@ -172,10 +172,11 @@ public class TestCgroupsLCEResourcesHandler {
     handler.initConfig();
 
     // create mock cgroup
-    File cgroupMountDir = createMockCgroupMount(cgroupDir);
+    File cpuCgroupMountDir = TestCGroupsHandlerImpl.createMockCgroupMount(
+        cgroupDir, "cpu");
 
     // create mock mtab
-    File mockMtab = createMockMTab(cgroupDir);
+    File mockMtab = TestCGroupsHandlerImpl.createMockMTab(cgroupDir);
 
     // setup our handler and call init()
     handler.setMtabFile(mockMtab.getAbsolutePath());
@@ -184,8 +185,8 @@ public class TestCgroupsLCEResourcesHandler {
     // in this case, we're using all cpu so the files
     // shouldn't exist(because init won't create them
     handler.init(mockLCE, plugin);
-    File periodFile = new File(cgroupMountDir, "cpu.cfs_period_us");
-    File quotaFile = new File(cgroupMountDir, "cpu.cfs_quota_us");
+    File periodFile = new File(cpuCgroupMountDir, "cpu.cfs_period_us");
+    File quotaFile = new File(cpuCgroupMountDir, "cpu.cfs_quota_us");
     Assert.assertFalse(periodFile.exists());
     Assert.assertFalse(quotaFile.exists());
 
@@ -202,7 +203,7 @@ public class TestCgroupsLCEResourcesHandler {
 
     // set cpu back to 100, quota should be -1
     conf.setInt(YarnConfiguration.NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT,
-      100);
+        100);
     handler.limits[0] = 100 * 1000;
     handler.limits[1] = 1000 * 1000;
     handler.init(mockLCE, plugin);
@@ -235,7 +236,7 @@ public class TestCgroupsLCEResourcesHandler {
     Assert.assertEquals(expectedQuota, ret[0]);
     Assert.assertEquals(-1, ret[1]);
 
-    int[] params = { 0, -1 };
+    int[] params = {0, -1};
     for (int cores : params) {
       try {
         handler.getOverallLimits(cores);
@@ -251,34 +252,6 @@ public class TestCgroupsLCEResourcesHandler {
     Assert.assertEquals(-1, ret[1]);
   }
 
-  private File createMockCgroupMount(File cgroupDir) throws IOException {
-    File cgroupMountDir = new File(cgroupDir.getAbsolutePath(), "hadoop-yarn");
-    FileUtils.deleteQuietly(cgroupDir);
-    if (!cgroupMountDir.mkdirs()) {
-      String message =
-          "Could not create dir " + cgroupMountDir.getAbsolutePath();
-      throw new IOException(message);
-    }
-    return cgroupMountDir;
-  }
-
-  private File createMockMTab(File cgroupDir) throws IOException {
-    String mtabContent =
-        "none " + cgroupDir.getAbsolutePath() + " cgroup rw,relatime,cpu 0 0";
-    File mockMtab = new File("target", UUID.randomUUID().toString());
-    if (!mockMtab.exists()) {
-      if (!mockMtab.createNewFile()) {
-        String message = "Could not create file " + mockMtab.getAbsolutePath();
-        throw new IOException(message);
-      }
-    }
-    FileWriter mtabWriter = new FileWriter(mockMtab.getAbsoluteFile());
-    mtabWriter.write(mtabContent);
-    mtabWriter.close();
-    mockMtab.deleteOnExit();
-    return mockMtab;
-  }
-
   @Test
   public void testContainerLimits() throws IOException {
     LinuxContainerExecutor mockLCE = new MockLinuxContainerExecutor();
@@ -286,6 +259,7 @@ public class TestCgroupsLCEResourcesHandler {
         new CustomCgroupsLCEResourceHandler();
     handler.generateLimitsMode = true;
     YarnConfiguration conf = new YarnConfiguration();
+    conf.setBoolean(YarnConfiguration.NM_DISK_RESOURCE_ENABLED, true);
     final int numProcessors = 4;
     ResourceCalculatorPlugin plugin =
         Mockito.mock(ResourceCalculatorPlugin.class);
@@ -294,71 +268,77 @@ public class TestCgroupsLCEResourcesHandler {
     handler.initConfig();
 
     // create mock cgroup
-    File cgroupMountDir = createMockCgroupMount(cgroupDir);
+    File cpuCgroupMountDir = TestCGroupsHandlerImpl.createMockCgroupMount(
+        cgroupDir, "cpu");
 
     // create mock mtab
-    File mockMtab = createMockMTab(cgroupDir);
+    File mockMtab = TestCGroupsHandlerImpl.createMockMTab(cgroupDir);
 
     // setup our handler and call init()
     handler.setMtabFile(mockMtab.getAbsolutePath());
     handler.init(mockLCE, plugin);
 
-    // check values
-    // default case - files shouldn't exist, strict mode off by default
+    // check the controller paths map isn't empty
     ContainerId id = ContainerId.fromString("container_1_1_1_1");
     handler.preExecute(id, Resource.newInstance(1024, 1));
-    File containerDir = new File(cgroupMountDir, id.toString());
-    Assert.assertTrue(containerDir.exists());
-    Assert.assertTrue(containerDir.isDirectory());
-    File periodFile = new File(containerDir, "cpu.cfs_period_us");
-    File quotaFile = new File(containerDir, "cpu.cfs_quota_us");
+    Assert.assertNotNull(handler.getControllerPaths());
+    // check values
+    // default case - files shouldn't exist, strict mode off by default
+    File containerCpuDir = new File(cpuCgroupMountDir, id.toString());
+    Assert.assertTrue(containerCpuDir.exists());
+    Assert.assertTrue(containerCpuDir.isDirectory());
+    File periodFile = new File(containerCpuDir, "cpu.cfs_period_us");
+    File quotaFile = new File(containerCpuDir, "cpu.cfs_quota_us");
     Assert.assertFalse(periodFile.exists());
     Assert.assertFalse(quotaFile.exists());
 
     // no files created because we're using all cpu
-    FileUtils.deleteQuietly(containerDir);
+    FileUtils.deleteQuietly(containerCpuDir);
     conf.setBoolean(
-      YarnConfiguration.NM_LINUX_CONTAINER_CGROUPS_STRICT_RESOURCE_USAGE, true);
+        YarnConfiguration.NM_LINUX_CONTAINER_CGROUPS_STRICT_RESOURCE_USAGE,
+        true);
     handler.initConfig();
     handler.preExecute(id,
-      Resource.newInstance(1024, YarnConfiguration.DEFAULT_NM_VCORES));
-    Assert.assertTrue(containerDir.exists());
-    Assert.assertTrue(containerDir.isDirectory());
-    periodFile = new File(containerDir, "cpu.cfs_period_us");
-    quotaFile = new File(containerDir, "cpu.cfs_quota_us");
+        Resource.newInstance(1024, YarnConfiguration.DEFAULT_NM_VCORES));
+    Assert.assertTrue(containerCpuDir.exists());
+    Assert.assertTrue(containerCpuDir.isDirectory());
+    periodFile = new File(containerCpuDir, "cpu.cfs_period_us");
+    quotaFile = new File(containerCpuDir, "cpu.cfs_quota_us");
     Assert.assertFalse(periodFile.exists());
     Assert.assertFalse(quotaFile.exists());
 
     // 50% of CPU
-    FileUtils.deleteQuietly(containerDir);
+    FileUtils.deleteQuietly(containerCpuDir);
     conf.setBoolean(
-      YarnConfiguration.NM_LINUX_CONTAINER_CGROUPS_STRICT_RESOURCE_USAGE, true);
+        YarnConfiguration.NM_LINUX_CONTAINER_CGROUPS_STRICT_RESOURCE_USAGE,
+        true);
     handler.initConfig();
     handler.preExecute(id,
-      Resource.newInstance(1024, YarnConfiguration.DEFAULT_NM_VCORES / 2));
-    Assert.assertTrue(containerDir.exists());
-    Assert.assertTrue(containerDir.isDirectory());
-    periodFile = new File(containerDir, "cpu.cfs_period_us");
-    quotaFile = new File(containerDir, "cpu.cfs_quota_us");
+        Resource.newInstance(1024, YarnConfiguration.DEFAULT_NM_VCORES / 2));
+    Assert.assertTrue(containerCpuDir.exists());
+    Assert.assertTrue(containerCpuDir.isDirectory());
+    periodFile = new File(containerCpuDir, "cpu.cfs_period_us");
+    quotaFile = new File(containerCpuDir, "cpu.cfs_quota_us");
     Assert.assertTrue(periodFile.exists());
     Assert.assertTrue(quotaFile.exists());
     Assert.assertEquals(500 * 1000, readIntFromFile(periodFile));
     Assert.assertEquals(1000 * 1000, readIntFromFile(quotaFile));
 
     // CGroups set to 50% of CPU, container set to 50% of YARN CPU
-    FileUtils.deleteQuietly(containerDir);
+    FileUtils.deleteQuietly(containerCpuDir);
     conf.setBoolean(
-      YarnConfiguration.NM_LINUX_CONTAINER_CGROUPS_STRICT_RESOURCE_USAGE, true);
+        YarnConfiguration.NM_LINUX_CONTAINER_CGROUPS_STRICT_RESOURCE_USAGE,
+        true);
     conf
       .setInt(YarnConfiguration.NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT, 50);
     handler.initConfig();
     handler.init(mockLCE, plugin);
     handler.preExecute(id,
-      Resource.newInstance(1024, YarnConfiguration.DEFAULT_NM_VCORES / 2));
-    Assert.assertTrue(containerDir.exists());
-    Assert.assertTrue(containerDir.isDirectory());
-    periodFile = new File(containerDir, "cpu.cfs_period_us");
-    quotaFile = new File(containerDir, "cpu.cfs_quota_us");
+        Resource.newInstance(1024, YarnConfiguration.DEFAULT_NM_VCORES / 2));
+    Assert.assertTrue(containerCpuDir.exists());
+    Assert.assertTrue(containerCpuDir.isDirectory());
+    periodFile = new File(containerCpuDir, "cpu.cfs_period_us");
+    quotaFile = new File(containerCpuDir, "cpu.cfs_quota_us");
     Assert.assertTrue(periodFile.exists());
     Assert.assertTrue(quotaFile.exists());
     Assert.assertEquals(1000 * 1000, readIntFromFile(periodFile));