You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ka...@apache.org on 2014/12/15 20:50:17 UTC

[2/2] hadoop git commit: YARN-2168. [YARN-2139] Avoid over-allocation of disk resources. (Wei Yan via kasha)

YARN-2168. [YARN-2139] Avoid over-allocation of disk resources. (Wei Yan via kasha)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/bad01fe3
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/bad01fe3
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/bad01fe3

Branch: refs/heads/YARN-2139
Commit: bad01fe30516ec27ea232083af9a408c1b2dee68
Parents: 6e13fc6
Author: Karthik Kambatla <ka...@apache.org>
Authored: Mon Dec 15 11:50:21 2014 -0800
Committer: Karthik Kambatla <ka...@apache.org>
Committed: Mon Dec 15 11:50:21 2014 -0800

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |   5 +
 .../hadoop/yarn/api/records/Resource.java       |  33 ++++-
 .../hadoop/yarn/conf/YarnConfiguration.java     |  20 ++-
 .../src/main/proto/yarn_protos.proto            |   1 +
 .../apache/hadoop/yarn/client/cli/NodeCLI.java  |   5 +
 .../yarn/client/TestResourceTrackerOnHA.java    |   2 +-
 .../hadoop/yarn/client/cli/TestYarnCLI.java     |   4 +
 .../api/records/impl/pb/ResourcePBImpl.java     |  15 ++
 .../resource/DominantResourceCalculator.java    | 115 ++++++++------
 .../hadoop/yarn/util/resource/Resources.java    |  48 +++++-
 .../hadoop/yarn/TestYarnServerApiClasses.java   |   3 +-
 .../nodemanager/NodeStatusUpdaterImpl.java      |   5 +-
 .../resourcemanager/ResourceTrackerService.java |   7 +-
 .../resourcemanager/resource/ResourceType.java  |   2 +-
 .../resource/ResourceWeights.java               |   5 +
 .../scheduler/SchedulerUtils.java               |  14 +-
 .../fair/policies/ComputeFairShares.java        |   5 +
 .../DominantResourceFairnessPolicy.java         |  38 ++++-
 .../fair/policies/FairSharePolicy.java          |   2 +-
 .../yarn/server/resourcemanager/MockNM.java     |  28 +++-
 .../server/resourcemanager/TestAppManager.java  |   5 +-
 .../TestResourceTrackerService.java             |  26 +++-
 .../TestWorkPreservingRMRestart.java            |   6 +-
 .../resource/TestResourceWeights.java           |  11 +-
 .../resourcemanager/resource/TestResources.java |  22 +--
 .../resourcetracker/TestNMExpiry.java           |   2 +-
 .../resourcetracker/TestNMReconnect.java        |   4 +-
 .../resourcetracker/TestRMNMRPCResponseId.java  |   2 +-
 .../scheduler/TestSchedulerUtils.java           | 113 +++++++++++---
 .../capacity/TestCapacityScheduler.java         |  68 ++++++++-
 .../scheduler/fair/FairSchedulerTestBase.java   |  20 ++-
 .../fair/TestAllocationFileLoaderService.java   |   8 +-
 .../scheduler/fair/TestComputeFairShares.java   |  14 +-
 .../scheduler/fair/TestFairScheduler.java       |  46 +++++-
 .../TestDominantResourceFairnessPolicy.java     | 148 ++++++++++---------
 .../webapp/TestRMWebServicesCapacitySched.java  |   7 +-
 36 files changed, 646 insertions(+), 213 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/bad01fe3/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 6e74d14..d0fedc9 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -32,6 +32,11 @@ Trunk - Unreleased
 
     YARN-2525. yarn logs command gives error on trunk (Akira AJISAKA via aw)
 
+YARN-2139 - Not merged
+
+    YARN-2168. [YARN-2139] Avoid over-allocation of disk resources. 
+    (Wei Yan via kasha)
+
 Release 2.7.0 - UNRELEASED
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bad01fe3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Resource.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Resource.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Resource.java
index 88b57f1..5d506a8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Resource.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Resource.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.yarn.api.records;
 
+import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability.Evolving;
 import org.apache.hadoop.classification.InterfaceStability.Stable;
@@ -54,9 +55,16 @@ public abstract class Resource implements Comparable<Resource> {
   @Public
   @Stable
   public static Resource newInstance(int memory, int vCores) {
+    return newInstance(memory, vCores, 0);
+  }
+
+  @Private
+  @Evolving
+  public static Resource newInstance(int memory, int vCores, int vDisks) {
     Resource resource = Records.newRecord(Resource.class);
     resource.setMemory(memory);
     resource.setVirtualCores(vCores);
+    resource.setVirtualDisks(vDisks);
     return resource;
   }
 
@@ -105,12 +113,31 @@ public abstract class Resource implements Comparable<Resource> {
   @Evolving
   public abstract void setVirtualCores(int vCores);
 
+  /**
+   * Get <em>number of virtual disk I/O</em> of the resource.
+   *
+   * @return <em>number of virtual disk I/O</em> of the resource
+   */
+  @Private
+  @Evolving
+  public abstract int getVirtualDisks();
+
+  /**
+   * Set <em>number of virtual disk I/O</em> of the resource.
+   *
+   * @param vDisks <em>number of virtual disk I/O</em> of the resource
+   */
+  @Private
+  @Evolving
+  public abstract void setVirtualDisks(int vDisks);
+
   @Override
   public int hashCode() {
     final int prime = 263167;
     int result = 3571;
     result = 939769357 + getMemory(); // prime * result = 939769357 initially
     result = prime * result + getVirtualCores();
+    result = prime * result + getVirtualDisks();
     return result;
   }
 
@@ -124,7 +151,8 @@ public abstract class Resource implements Comparable<Resource> {
       return false;
     Resource other = (Resource) obj;
     if (getMemory() != other.getMemory() || 
-        getVirtualCores() != other.getVirtualCores()) {
+        getVirtualCores() != other.getVirtualCores() ||
+        getVirtualDisks() != other.getVirtualDisks()) {
       return false;
     }
     return true;
@@ -132,6 +160,7 @@ public abstract class Resource implements Comparable<Resource> {
 
   @Override
   public String toString() {
-    return "<memory:" + getMemory() + ", vCores:" + getVirtualCores() + ">";
+    return "<memory:" + getMemory() + ", vCores:" + getVirtualCores()
+        + ", vDisks:" + getVirtualDisks() + ">";
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bad01fe3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 55073c5..7e15560 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -160,7 +160,14 @@ public class YarnConfiguration extends Configuration {
   public static final int DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB = 1024;
   public static final String RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES =
       YARN_PREFIX + "scheduler.minimum-allocation-vcores";
-    public static final int DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES = 1;
+  public static final int DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES = 1;
+  /** Now we limit the vdisks for each container as 1. This will be updated
+   *  once we support the various vdisks requests. */
+  @Private
+  public static final String RM_SCHEDULER_MINIMUM_ALLOCATION_DISK_VDISKS =
+      YARN_PREFIX + "scheduler.minimum-allocation-disk-vdisks";
+  @Private
+  public static final int DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_DISK_VDISKS = 1;
 
   /** Maximum request grant-able by the RM scheduler. */
   public static final String RM_SCHEDULER_MAXIMUM_ALLOCATION_MB =
@@ -169,6 +176,11 @@ public class YarnConfiguration extends Configuration {
   public static final String RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES =
       YARN_PREFIX + "scheduler.maximum-allocation-vcores";
   public static final int DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES = 4;
+  @Private
+  public static final String RM_SCHEDULER_MAXIMUM_ALLOCATION_DISK_VDISKS =
+      YARN_PREFIX + "scheduler.maximum-allocation-disk-vdisks";
+  @Private
+  public static final int DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_DISK_VDISKS = 1;
 
   /** Number of threads to handle scheduler interface.*/
   public static final String RM_SCHEDULER_CLIENT_THREAD_COUNT =
@@ -794,6 +806,12 @@ public class YarnConfiguration extends Configuration {
   public static final int DEFAULT_NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT =
       100;
   
+  /** Number of virtual disk I/O resources which can be allocated for containers. */
+  @Private
+  public static final String NM_DISK_VDISKS = NM_PREFIX + "resource.disk-vdisks";
+  @Private
+  public static final int DEFAULT_NM_DISK_VDISKS = 20;
+
   /** NM Webapp address.**/
   public static final String NM_WEBAPP_ADDRESS = NM_PREFIX + "webapp.address";
   public static final int DEFAULT_NM_WEBAPP_PORT = 8042;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bad01fe3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
index c4e756d..65a36b8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
@@ -56,6 +56,7 @@ message ContainerIdProto {
 message ResourceProto {
   optional int32 memory = 1;
   optional int32 virtual_cores = 2;
+  optional int32 virtual_disks = 3;
 }
 
 message ResourceOptionProto {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bad01fe3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/NodeCLI.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/NodeCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/NodeCLI.java
index fa2779e..a793c8f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/NodeCLI.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/NodeCLI.java
@@ -202,6 +202,11 @@ public class NodeCLI extends YarnCLI {
           : (nodeReport.getUsed().getVirtualCores() + " vcores"));
       nodeReportStr.print("\tCPU-Capacity : ");
       nodeReportStr.println(nodeReport.getCapability().getVirtualCores() + " vcores");
+      nodeReportStr.print("\tDisk-I/O-Used : ");
+      nodeReportStr.println((nodeReport.getUsed() == null) ?
+          "0 vdisks" : (nodeReport.getUsed().getVirtualDisks() + " vdisks"));
+      nodeReportStr.print("\tDisk-I/O-Capacity : ");
+      nodeReportStr.println(nodeReport.getCapability().getVirtualDisks() + " vdisks");
       nodeReportStr.print("\tNode-Labels : ");
       
       // Create a List for node labels since we need it get sorted

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bad01fe3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceTrackerOnHA.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceTrackerOnHA.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceTrackerOnHA.java
index 8885769..6a7f8da 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceTrackerOnHA.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceTrackerOnHA.java
@@ -55,7 +55,7 @@ public class TestResourceTrackerOnHA extends ProtocolHATestBase{
   @Test(timeout = 15000)
   public void testResourceTrackerOnHA() throws Exception {
     NodeId nodeId = NodeId.newInstance("localhost", 0);
-    Resource resource = Resource.newInstance(2048, 4);
+    Resource resource = Resource.newInstance(2048, 4, 4);
 
     // make sure registerNodeManager works when failover happens
     RegisterNodeManagerRequest request =

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bad01fe3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java
index fa81f14..7aab6b8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java
@@ -1152,6 +1152,8 @@ public class TestYarnCLI {
     pw.println("\tMemory-Capacity : 0MB");
     pw.println("\tCPU-Used : 0 vcores");
     pw.println("\tCPU-Capacity : 0 vcores");
+    pw.println("\tDisk-I/O-Used : 0 vdisks");
+    pw.println("\tDisk-I/O-Capacity : 0 vdisks");
     pw.println("\tNode-Labels : a,b,c,x,y,z");
     pw.close();
     String nodeStatusStr = baos.toString("UTF-8");
@@ -1186,6 +1188,8 @@ public class TestYarnCLI {
     pw.println("\tMemory-Capacity : 0MB");
     pw.println("\tCPU-Used : 0 vcores");
     pw.println("\tCPU-Capacity : 0 vcores");
+    pw.println("\tDisk-I/O-Used : 0 vdisks");
+    pw.println("\tDisk-I/O-Capacity : 0 vdisks");
     pw.println("\tNode-Labels : ");
     pw.close();
     String nodeStatusStr = baos.toString("UTF-8");

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bad01fe3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourcePBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourcePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourcePBImpl.java
index a28c6ed..6f4cd86 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourcePBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourcePBImpl.java
@@ -80,10 +80,25 @@ public class ResourcePBImpl extends Resource {
   }
 
   @Override
+  public int getVirtualDisks() {
+    ResourceProtoOrBuilder p = viaProto ? proto : builder;
+    return (p.getVirtualDisks());
+  }
+
+  @Override
+  public void setVirtualDisks(int vDisks) {
+    maybeInitBuilder();
+    builder.setVirtualDisks((vDisks));
+  }
+
+  @Override
   public int compareTo(Resource other) {
     int diff = this.getMemory() - other.getMemory();
     if (diff == 0) {
       diff = this.getVirtualCores() - other.getVirtualCores();
+      if (diff == 0) {
+        diff = this.getVirtualDisks() - other.getVirtualDisks();
+      }
     }
     return diff;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bad01fe3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java
index 6f5b40e..0a52c46 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java
@@ -21,6 +21,8 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.yarn.api.records.Resource;
 
+import java.util.Arrays;
+
 /**
  * A {@link ResourceCalculator} which uses the concept of  
  * <em>dominant resource</em> to compare multi-dimensional resources.
@@ -52,62 +54,61 @@ public class DominantResourceCalculator extends ResourceCalculator {
     if (lhs.equals(rhs)) {
       return 0;
     }
-    
-    float l = getResourceAsValue(clusterResource, lhs, true);
-    float r = getResourceAsValue(clusterResource, rhs, true);
-    
-    if (l < r) {
-      return -1;
-    } else if (l > r) {
-      return 1;
-    } else {
-      l = getResourceAsValue(clusterResource, lhs, false);
-      r = getResourceAsValue(clusterResource, rhs, false);
+
+    for (int i = 1; i <= 3; i ++) {
+      float l = getResourceAsValue(clusterResource, lhs, i);
+      float r = getResourceAsValue(clusterResource, rhs, i);
+
       if (l < r) {
         return -1;
       } else if (l > r) {
         return 1;
       }
     }
-    
     return 0;
   }
 
   /**
-   * Use 'dominant' for now since we only have 2 resources - gives us a slight
-   * performance boost.
-   * 
-   * Once we add more resources, we'll need a more complicated (and slightly
-   * less performant algorithm).
+   * Get the resource value according to the rank. Lower the rank, more
+   * dominant the resource. Rank = 1 means the most dominant resource value.
    */
   protected float getResourceAsValue(
-      Resource clusterResource, Resource resource, boolean dominant) {
-    // Just use 'dominant' resource
-    return (dominant) ?
-        Math.max(
-            (float)resource.getMemory() / clusterResource.getMemory(), 
-            (float)resource.getVirtualCores() / clusterResource.getVirtualCores()
-            ) 
-        :
-          Math.min(
-              (float)resource.getMemory() / clusterResource.getMemory(), 
-              (float)resource.getVirtualCores() / clusterResource.getVirtualCores()
-              ); 
+      Resource clusterResource, Resource resource, int rank) {
+    if (rank < 1 || rank > 3) {
+      throw new IllegalArgumentException("The rank " + rank
+          + " is not in range [1,3].");
+    }
+    float[] values = new float[] {
+        (float) resource.getMemory() / clusterResource.getMemory(),
+        (float) resource.getVirtualCores() / clusterResource.getVirtualCores(),
+        (float) resource.getVirtualDisks() / clusterResource.getVirtualDisks()};
+    Arrays.sort(values);
+    return values[values.length - rank];
   }
   
   @Override
   public int computeAvailableContainers(Resource available, Resource required) {
-    return Math.min(
-        available.getMemory() / required.getMemory(), 
-        available.getVirtualCores() / required.getVirtualCores());
+    int min = Integer.MAX_VALUE;
+    if (required.getMemory() != 0) {
+      min = Math.min(min,
+          available.getMemory() / required.getMemory());
+    }
+    if (required.getVirtualCores() != 0) {
+      min = Math.min(min,
+          available.getVirtualCores() / required.getVirtualCores());
+    }
+    if (required.getVirtualDisks() != 0) {
+      min = Math.min(min,
+          available.getVirtualDisks() / required.getVirtualDisks());
+    }
+    return min;
   }
 
   @Override
   public float divide(Resource clusterResource, 
       Resource numerator, Resource denominator) {
-    return 
-        getResourceAsValue(clusterResource, numerator, true) / 
-        getResourceAsValue(clusterResource, denominator, true);
+    return getResourceAsValue(clusterResource, numerator, 1) /
+        getResourceAsValue(clusterResource, denominator, 1);
   }
   
   @Override
@@ -120,17 +121,28 @@ public class DominantResourceCalculator extends ResourceCalculator {
 
   @Override
   public float ratio(Resource a, Resource b) {
-    return Math.max(
-        (float)a.getMemory()/b.getMemory(), 
-        (float)a.getVirtualCores()/b.getVirtualCores()
-        );
+    float max = 0.0f;
+    if (b.getMemory() != 0) {
+      max = Math.max(max,
+          (float) a.getMemory() / b.getMemory());
+    }
+    if (b.getVirtualCores() != 0) {
+      max = Math.max(max,
+          (float) a.getVirtualCores() / b.getVirtualCores());
+    }
+    if (b.getVirtualDisks() != 0) {
+      max = Math.max(max,
+          (float) a.getVirtualDisks() / b.getVirtualDisks());
+      }
+    return max;
   }
 
   @Override
   public Resource divideAndCeil(Resource numerator, int denominator) {
     return Resources.createResource(
         divideAndCeil(numerator.getMemory(), denominator),
-        divideAndCeil(numerator.getVirtualCores(), denominator)
+        divideAndCeil(numerator.getVirtualCores(), denominator),
+        divideAndCeil(numerator.getVirtualDisks(), denominator)
         );
   }
 
@@ -147,15 +159,22 @@ public class DominantResourceCalculator extends ResourceCalculator {
         Math.max(r.getVirtualCores(), minimumResource.getVirtualCores()),
         stepFactor.getVirtualCores()),
       maximumResource.getVirtualCores());
+    int normalizedVdisks = Math.min(
+      roundUp(
+        Math.max(r.getVirtualDisks(), minimumResource.getVirtualDisks()),
+        stepFactor.getVirtualDisks()),
+      maximumResource.getVirtualDisks());
+
     return Resources.createResource(normalizedMemory,
-      normalizedCores);
+        normalizedCores, normalizedVdisks);
   }
 
   @Override
   public Resource roundUp(Resource r, Resource stepFactor) {
     return Resources.createResource(
         roundUp(r.getMemory(), stepFactor.getMemory()), 
-        roundUp(r.getVirtualCores(), stepFactor.getVirtualCores())
+        roundUp(r.getVirtualCores(), stepFactor.getVirtualCores()),
+        roundUp(r.getVirtualDisks(), stepFactor.getVirtualDisks())
         );
   }
 
@@ -163,7 +182,8 @@ public class DominantResourceCalculator extends ResourceCalculator {
   public Resource roundDown(Resource r, Resource stepFactor) {
     return Resources.createResource(
         roundDown(r.getMemory(), stepFactor.getMemory()),
-        roundDown(r.getVirtualCores(), stepFactor.getVirtualCores())
+        roundDown(r.getVirtualCores(), stepFactor.getVirtualCores()),
+        roundDown(r.getVirtualDisks(), stepFactor.getVirtualDisks())
         );
   }
 
@@ -175,7 +195,10 @@ public class DominantResourceCalculator extends ResourceCalculator {
             (int)Math.ceil(r.getMemory() * by), stepFactor.getMemory()),
         roundUp(
             (int)Math.ceil(r.getVirtualCores() * by), 
-            stepFactor.getVirtualCores())
+            stepFactor.getVirtualCores()),
+        roundDown(
+            (int)Math.ceil(r.getVirtualDisks() * by),
+            stepFactor.getVirtualDisks())
         );
   }
 
@@ -190,6 +213,10 @@ public class DominantResourceCalculator extends ResourceCalculator {
         roundDown(
             (int)(r.getVirtualCores() * by), 
             stepFactor.getVirtualCores()
+            ),
+        roundDown(
+            (int)(r.getVirtualDisks() * by),
+            stepFactor.getVirtualDisks()
             )
         );
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bad01fe3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java
index a205bd1..bc6fd0e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java
@@ -51,10 +51,23 @@ public class Resources {
     }
 
     @Override
+    public int getVirtualDisks() {
+      return 0;
+    }
+
+    @Override
+    public void setVirtualDisks(int disks) {
+      throw new RuntimeException("NONE cannot be modified!");
+    }
+
+    @Override
     public int compareTo(Resource o) {
       int diff = 0 - o.getMemory();
       if (diff == 0) {
         diff = 0 - o.getVirtualCores();
+        if (diff == 0) {
+          diff = 0 - o.getVirtualDisks();
+        }
       }
       return diff;
     }
@@ -70,7 +83,7 @@ public class Resources {
 
     @Override
     public void setMemory(int memory) {
-      throw new RuntimeException("NONE cannot be modified!");
+      throw new RuntimeException("UNBOUNDED cannot be modified!");
     }
 
     @Override
@@ -80,7 +93,17 @@ public class Resources {
 
     @Override
     public void setVirtualCores(int cores) {
-      throw new RuntimeException("NONE cannot be modified!");
+      throw new RuntimeException("UNBOUNDED cannot be modified!");
+    }
+
+    @Override
+    public int getVirtualDisks() {
+      return Integer.MAX_VALUE;
+    }
+
+    @Override
+    public void setVirtualDisks(int disks) {
+      throw new RuntimeException("UNBOUNDED cannot be modified!");
     }
 
     @Override
@@ -88,6 +111,9 @@ public class Resources {
       int diff = 0 - o.getMemory();
       if (diff == 0) {
         diff = 0 - o.getVirtualCores();
+        if (diff == 0) {
+          diff = 0 - o.getVirtualDisks();
+        }
       }
       return diff;
     }
@@ -99,9 +125,14 @@ public class Resources {
   }
 
   public static Resource createResource(int memory, int cores) {
+    return createResource(memory, cores, (memory > 0) ? 1 : 0);
+  }
+
+  public static Resource createResource(int memory, int cores, int vdisks) {
     Resource resource = Records.newRecord(Resource.class);
     resource.setMemory(memory);
     resource.setVirtualCores(cores);
+    resource.setVirtualDisks(vdisks);
     return resource;
   }
 
@@ -114,12 +145,14 @@ public class Resources {
   }
 
   public static Resource clone(Resource res) {
-    return createResource(res.getMemory(), res.getVirtualCores());
+    return createResource(
+        res.getMemory(), res.getVirtualCores(), res.getVirtualDisks());
   }
 
   public static Resource addTo(Resource lhs, Resource rhs) {
     lhs.setMemory(lhs.getMemory() + rhs.getMemory());
     lhs.setVirtualCores(lhs.getVirtualCores() + rhs.getVirtualCores());
+    lhs.setVirtualDisks(lhs.getVirtualDisks() + rhs.getVirtualDisks());
     return lhs;
   }
 
@@ -130,6 +163,7 @@ public class Resources {
   public static Resource subtractFrom(Resource lhs, Resource rhs) {
     lhs.setMemory(lhs.getMemory() - rhs.getMemory());
     lhs.setVirtualCores(lhs.getVirtualCores() - rhs.getVirtualCores());
+    lhs.setVirtualDisks(lhs.getVirtualDisks() - rhs.getVirtualDisks());
     return lhs;
   }
 
@@ -144,6 +178,7 @@ public class Resources {
   public static Resource multiplyTo(Resource lhs, double by) {
     lhs.setMemory((int)(lhs.getMemory() * by));
     lhs.setVirtualCores((int)(lhs.getVirtualCores() * by));
+    lhs.setVirtualDisks((int)(lhs.getVirtualDisks() * by));
     return lhs;
   }
 
@@ -165,6 +200,7 @@ public class Resources {
     Resource out = clone(lhs);
     out.setMemory((int)(lhs.getMemory() * by));
     out.setVirtualCores((int)(lhs.getVirtualCores() * by));
+    out.setVirtualDisks((int)(lhs.getVirtualDisks() * by));
     return out;
   }
   
@@ -253,11 +289,13 @@ public class Resources {
   
   public static boolean fitsIn(Resource smaller, Resource bigger) {
     return smaller.getMemory() <= bigger.getMemory() &&
-        smaller.getVirtualCores() <= bigger.getVirtualCores();
+        smaller.getVirtualCores() <= bigger.getVirtualCores() &&
+        smaller.getVirtualDisks() <= bigger.getVirtualDisks();
   }
   
   public static Resource componentwiseMin(Resource lhs, Resource rhs) {
     return createResource(Math.min(lhs.getMemory(), rhs.getMemory()),
-        Math.min(lhs.getVirtualCores(), rhs.getVirtualCores()));
+        Math.min(lhs.getVirtualCores(), rhs.getVirtualCores()),
+        Math.min(lhs.getVirtualDisks(), rhs.getVirtualDisks()));
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bad01fe3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java
index 20983b6..542951d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java
@@ -133,6 +133,7 @@ public class TestYarnServerApiClasses {
     Resource resource = recordFactory.newRecordInstance(Resource.class);
     resource.setMemory(10000);
     resource.setVirtualCores(2);
+    resource.setVirtualDisks(3);
     original.setResource(resource);
     RegisterNodeManagerRequestPBImpl copy = new RegisterNodeManagerRequestPBImpl(
         original.getProto());
@@ -141,7 +142,7 @@ public class TestYarnServerApiClasses {
     assertEquals(9090, copy.getNodeId().getPort());
     assertEquals(10000, copy.getResource().getMemory());
     assertEquals(2, copy.getResource().getVirtualCores());
-
+    assertEquals(3, copy.getResource().getVirtualDisks());
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bad01fe3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
index f561dbb..4b2e6c0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
@@ -142,8 +142,11 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
     int virtualCores =
         conf.getInt(
             YarnConfiguration.NM_VCORES, YarnConfiguration.DEFAULT_NM_VCORES);
+    int virtualDisks =
+        conf.getInt(
+            YarnConfiguration.NM_DISK_VDISKS, YarnConfiguration.DEFAULT_NM_DISK_VDISKS);
 
-    this.totalResource = Resource.newInstance(memoryMb, virtualCores);
+    this.totalResource = Resource.newInstance(memoryMb, virtualCores, virtualDisks);
     metrics.addResource(totalResource);
     this.tokenKeepAliveEnabled = isTokenKeepAliveEnabled(conf);
     this.tokenRemovalDelayMs =

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bad01fe3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
index 4beb895..dc505b1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
@@ -99,6 +99,7 @@ public class ResourceTrackerService extends AbstractService implements
   
   private int minAllocMb;
   private int minAllocVcores;
+  private int minAllocVdisks;
 
   static {
     resync.setNodeAction(NodeAction.RESYNC);
@@ -144,6 +145,9 @@ public class ResourceTrackerService extends AbstractService implements
     minAllocVcores = conf.getInt(
     	YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
     	YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES);
+    minAllocVdisks = conf.getInt(
+        YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_DISK_VDISKS,
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_DISK_VDISKS);
 
     minimumNodeManagerVersion = conf.get(
         YarnConfiguration.RM_NODEMANAGER_MINIMUM_VERSION,
@@ -285,7 +289,8 @@ public class ResourceTrackerService extends AbstractService implements
 
     // Check if this node has minimum allocations
     if (capability.getMemory() < minAllocMb
-        || capability.getVirtualCores() < minAllocVcores) {
+        || capability.getVirtualCores() < minAllocVcores
+        || capability.getVirtualDisks() < minAllocVdisks) {
       String message =
           "NodeManager from  " + host
               + " doesn't satisfy minimum allocations, Sending SHUTDOWN"

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bad01fe3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/ResourceType.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/ResourceType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/ResourceType.java
index 9dd245b..a6ebcf2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/ResourceType.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/ResourceType.java
@@ -24,5 +24,5 @@ import org.apache.hadoop.classification.InterfaceStability.Evolving;
 @Private
 @Evolving
 public enum ResourceType {
-  MEMORY, CPU
+  MEMORY, CPU, DISKIO
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bad01fe3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/ResourceWeights.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/ResourceWeights.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/ResourceWeights.java
index 230f9a9..3ce7c0f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/ResourceWeights.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/ResourceWeights.java
@@ -29,8 +29,13 @@ public class ResourceWeights {
   private float[] weights = new float[ResourceType.values().length];
 
   public ResourceWeights(float memoryWeight, float cpuWeight) {
+    this(memoryWeight, cpuWeight, 0);
+  }
+
+  public ResourceWeights(float memoryWeight, float cpuWeight, float vdiskWeight) {
     weights[ResourceType.MEMORY.ordinal()] = memoryWeight;
     weights[ResourceType.CPU.ordinal()] = cpuWeight;
+    weights[ResourceType.DISKIO.ordinal()] = vdiskWeight;
   }
 
   public ResourceWeights(float weight) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bad01fe3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java
index 5d00009..de8382a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java
@@ -190,7 +190,7 @@ public class SchedulerUtils {
 
   /**
    * Utility method to validate a resource request, by insuring that the
-   * requested memory/vcore is non-negative and not greater than max
+   * requested memory/vcore/vdisks is non-negative and not greater than max
    * 
    * @throws <code>InvalidResourceRequestException</code> when there is invalid
    *         request
@@ -216,7 +216,17 @@ public class SchedulerUtils {
           + resReq.getCapability().getVirtualCores()
           + ", maxVirtualCores=" + maximumResource.getVirtualCores());
     }
-    
+    if (resReq.getCapability().getVirtualDisks() < 0 ||
+        resReq.getCapability().getVirtualDisks() >
+        maximumResource.getVirtualDisks()) {
+      throw new InvalidResourceRequestException("Invalid resource request"
+          + ", requested virtual disks < 0"
+          + ", or requested virtual disks > max configured"
+          + ", requestedVirtualDisks="
+          + resReq.getCapability().getVirtualDisks()
+          + ", maxVirtualDisks=" + maximumResource.getVirtualDisks());
+    }
+
     // Get queue from scheduler
     QueueInfo queueInfo = null;
     try {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bad01fe3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/ComputeFairShares.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/ComputeFairShares.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/ComputeFairShares.java
index 3bea985..c981e1b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/ComputeFairShares.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/ComputeFairShares.java
@@ -257,6 +257,8 @@ public class ComputeFairShares {
       return resource.getMemory();
     case CPU:
       return resource.getVirtualCores();
+    case DISKIO:
+      return resource.getVirtualDisks();
     default:
       throw new IllegalArgumentException("Invalid resource");
     }
@@ -270,6 +272,9 @@ public class ComputeFairShares {
     case CPU:
       resource.setVirtualCores(val);
       break;
+    case DISKIO:
+      resource.setVirtualDisks(val);
+      break;
     default:
       throw new IllegalArgumentException("Invalid resource");
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bad01fe3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java
index 3f6cbd1..30826b0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java
@@ -96,10 +96,15 @@ public class DominantResourceFairnessPolicy extends SchedulingPolicy {
     int queueAvailableCPU =
         Math.max(queueFairShare.getVirtualCores() - queueUsage
             .getVirtualCores(), 0);
+    int queueAvailableVDisks =
+        Math.max(queueFairShare.getVirtualDisks() - queueUsage
+            .getVirtualDisks(), 0);
     Resource headroom = Resources.createResource(
         Math.min(clusterAvailable.getMemory(), queueAvailableMemory),
         Math.min(clusterAvailable.getVirtualCores(),
-            queueAvailableCPU));
+            queueAvailableCPU),
+        Math.min(clusterAvailable.getVirtualDisks(),
+            queueAvailableVDisks));
     return headroom;
   }
 
@@ -174,14 +179,37 @@ public class DominantResourceFairnessPolicy extends SchedulingPolicy {
           (pool.getMemory() * weights.getWeight(MEMORY)));
       shares.setWeight(CPU, (float)resource.getVirtualCores() /
           (pool.getVirtualCores() * weights.getWeight(CPU)));
+      shares.setWeight(DISKIO, (float)resource.getVirtualDisks() /
+          (pool.getVirtualDisks() * weights.getWeight(DISKIO)));
       // sort order vector by resource share
       if (resourceOrder != null) {
         if (shares.getWeight(MEMORY) > shares.getWeight(CPU)) {
-          resourceOrder[0] = MEMORY;
-          resourceOrder[1] = CPU;
+          if (shares.getWeight(MEMORY) > shares.getWeight(DISKIO)) {
+            resourceOrder[0] = MEMORY;
+            if (shares.getWeight(CPU) > shares.getWeight(DISKIO)) {
+              resourceOrder[1] = CPU;
+              resourceOrder[2] = DISKIO;
+            }
+          } else {
+            resourceOrder[0] = DISKIO;
+            resourceOrder[1] = MEMORY;
+            resourceOrder[2] = CPU;
+          }
         } else  {
-          resourceOrder[0] = CPU;
-          resourceOrder[1] = MEMORY;
+          if (shares.getWeight(CPU) > shares.getWeight(DISKIO)) {
+            resourceOrder[0] = CPU;
+            if (shares.getWeight(MEMORY) > shares.getWeight(DISKIO)) {
+              resourceOrder[1] = MEMORY;
+              resourceOrder[2] = DISKIO;
+            } else {
+              resourceOrder[1] = DISKIO;
+              resourceOrder[2] = MEMORY;
+            }
+          } else {
+            resourceOrder[0] = DISKIO;
+            resourceOrder[1] = CPU;
+            resourceOrder[2] = MEMORY;
+          }
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bad01fe3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java
index 97669cb..50bcdb4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java
@@ -121,7 +121,7 @@ public class FairSharePolicy extends SchedulingPolicy {
         queueFairShare.getMemory() - queueUsage.getMemory(), 0);
     Resource headroom = Resources.createResource(
         Math.min(clusterAvailable.getMemory(), queueAvailableMemory),
-        clusterAvailable.getVirtualCores());
+        clusterAvailable.getVirtualCores(), clusterAvailable.getVirtualDisks());
     return headroom;
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bad01fe3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java
index 5f53805..6bfc0d2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java
@@ -50,6 +50,7 @@ public class MockNM {
   private NodeId nodeId;
   private final int memory;
   private final int vCores;
+  private final int vDisks;
   private ResourceTrackerService resourceTracker;
   private final int httpPort = 2;
   private MasterKey currentContainerTokenMasterKey;
@@ -65,14 +66,33 @@ public class MockNM {
   }
 
   public MockNM(String nodeIdStr, int memory, int vcores,
+                ResourceTrackerService resourceTracker) {
+    // scale vdisks based on the requested memory
+    this(nodeIdStr, memory, vcores,
+        Math.max(1, (memory * YarnConfiguration.DEFAULT_NM_DISK_VDISKS) /
+            YarnConfiguration.DEFAULT_NM_PMEM_MB),
+        resourceTracker);
+  }
+
+  public MockNM(String nodeIdStr, int memory, int vcores, int vdisks,
       ResourceTrackerService resourceTracker) {
-    this(nodeIdStr, memory, vcores, resourceTracker, YarnVersionInfo.getVersion());
+    this(nodeIdStr, memory, vcores, vdisks, resourceTracker,
+        YarnVersionInfo.getVersion());
   }
 
   public MockNM(String nodeIdStr, int memory, int vcores,
       ResourceTrackerService resourceTracker, String version) {
+    this(nodeIdStr, memory, vcores,
+        Math.max(1, (memory * YarnConfiguration.DEFAULT_NM_DISK_VDISKS) /
+            YarnConfiguration.DEFAULT_NM_PMEM_MB),
+        resourceTracker, version);
+  }
+
+  public MockNM(String nodeIdStr, int memory, int vcores, int vDisks,
+      ResourceTrackerService resourceTracker, String version) {
     this.memory = memory;
     this.vCores = vcores;
+    this.vDisks = vDisks;
     this.resourceTracker = resourceTracker;
     this.version = version;
     String[] splits = nodeIdStr.split(":");
@@ -115,7 +135,7 @@ public class MockNM {
         RegisterNodeManagerRequest.class);
     req.setNodeId(nodeId);
     req.setHttpPort(httpPort);
-    Resource resource = BuilderUtils.newResource(memory, vCores);
+    Resource resource = Resource.newInstance(memory, vCores, vDisks);
     req.setResource(resource);
     req.setContainerStatuses(containerReports);
     req.setNMVersion(version);
@@ -198,4 +218,8 @@ public class MockNM {
   public int getvCores() {
     return vCores;
   }
+
+  public int getVDisks() {
+    return vDisks;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bad01fe3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java
index 6b3eea2..da7494c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java
@@ -574,7 +574,7 @@ public class TestAppManager{
     when(app.getState()).thenReturn(RMAppState.RUNNING);
 
     RMAppMetrics metrics =
-        new RMAppMetrics(Resource.newInstance(1234, 56), 10, 1, 16384, 64);
+        new RMAppMetrics(Resource.newInstance(1234, 56, 1), 10, 1, 16384, 64);
     when(app.getRMAppMetrics()).thenReturn(metrics);
 
     RMAppManager.ApplicationSummary.SummaryBuilder summary =
@@ -592,7 +592,8 @@ public class TestAppManager{
     Assert.assertTrue(msg.contains("vcoreSeconds=64"));
     Assert.assertTrue(msg.contains("preemptedAMContainers=1"));
     Assert.assertTrue(msg.contains("preemptedNonAMContainers=10"));
-    Assert.assertTrue(msg.contains("preemptedResources=<memory:1234\\, vCores:56>"));
+    Assert.assertTrue(
+        msg.contains("preemptedResources=<memory:1234\\, vCores:56\\, vDisks:1>"));
  }
 
   private static ResourceScheduler mockResourceScheduler() {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bad01fe3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
index 7c12848..6d45742 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
@@ -298,7 +298,7 @@ public class TestResourceTrackerService {
     RegisterNodeManagerRequest req = Records.newRecord(
         RegisterNodeManagerRequest.class);
     NodeId nodeId = NodeId.newInstance("host2", 1234);
-    Resource capability = BuilderUtils.newResource(1024, 1);
+    Resource capability = Resource.newInstance(1024, 1, 1);
     req.setResource(capability);
     req.setNodeId(nodeId);
     req.setHttpPort(1234);
@@ -382,6 +382,7 @@ public class TestResourceTrackerService {
     Configuration conf = new Configuration();
     conf.set(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, "2048");
     conf.set(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES, "4");
+    conf.set(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_DISK_VDISKS, "1");
     rm = new MockRM(conf);
     rm.start();
 
@@ -392,7 +393,7 @@ public class TestResourceTrackerService {
     NodeId nodeId = BuilderUtils.newNodeId("host", 1234);
     req.setNodeId(nodeId);
 
-    Resource capability = BuilderUtils.newResource(1024, 1);
+    Resource capability = Resource.newInstance(1024, 1, 1);
     req.setResource(capability);
     RegisterNodeManagerResponse response1 =
         resourceTrackerService.registerNodeManager(req);
@@ -400,6 +401,7 @@ public class TestResourceTrackerService {
     
     capability.setMemory(2048);
     capability.setVirtualCores(1);
+    capability.setVirtualDisks(1);
     req.setResource(capability);
     RegisterNodeManagerResponse response2 =
         resourceTrackerService.registerNodeManager(req);
@@ -407,6 +409,7 @@ public class TestResourceTrackerService {
     
     capability.setMemory(1024);
     capability.setVirtualCores(4);
+    capability.setVirtualDisks(1);
     req.setResource(capability);
     RegisterNodeManagerResponse response3 =
         resourceTrackerService.registerNodeManager(req);
@@ -414,10 +417,19 @@ public class TestResourceTrackerService {
     
     capability.setMemory(2048);
     capability.setVirtualCores(4);
+    capability.setVirtualDisks(0);
     req.setResource(capability);
     RegisterNodeManagerResponse response4 =
         resourceTrackerService.registerNodeManager(req);
-    Assert.assertEquals(NodeAction.NORMAL,response4.getNodeAction());
+    Assert.assertEquals(NodeAction.SHUTDOWN,response4.getNodeAction());
+
+    capability.setMemory(2048);
+    capability.setVirtualCores(4);
+    capability.setVirtualDisks(1);
+    req.setResource(capability);
+    RegisterNodeManagerResponse response5 =
+        resourceTrackerService.registerNodeManager(req);
+    Assert.assertEquals(NodeAction.NORMAL,response5.getNodeAction());
   }
 
   @Test
@@ -512,7 +524,7 @@ public class TestResourceTrackerService {
         NMContainerStatus.newInstance(
           ContainerId.newContainerId(
             ApplicationAttemptId.newInstance(app.getApplicationId(), 2), 1),
-          ContainerState.COMPLETE, Resource.newInstance(1024, 1),
+          ContainerState.COMPLETE, Resource.newInstance(1024, 1, 1),
           "Dummy Completed", 0, Priority.newInstance(10), 1234);
     rm.getResourceTrackerService().handleNMContainerStatus(report, null);
     verify(handler, never()).handle((Event) any());
@@ -523,7 +535,7 @@ public class TestResourceTrackerService {
     currentAttempt.setMasterContainer(null);
     report = NMContainerStatus.newInstance(
           ContainerId.newContainerId(currentAttempt.getAppAttemptId(), 0),
-          ContainerState.COMPLETE, Resource.newInstance(1024, 1),
+          ContainerState.COMPLETE, Resource.newInstance(1024, 1, 1),
           "Dummy Completed", 0, Priority.newInstance(10), 1234);
     rm.getResourceTrackerService().handleNMContainerStatus(report, null);
     verify(handler, never()).handle((Event)any());
@@ -535,7 +547,7 @@ public class TestResourceTrackerService {
     report = NMContainerStatus.newInstance(
           ContainerId.newContainerId(
             ApplicationAttemptId.newInstance(app.getApplicationId(), 2), 1),
-          ContainerState.COMPLETE, Resource.newInstance(1024, 1),
+          ContainerState.COMPLETE, Resource.newInstance(1024, 1, 1),
           "Dummy Completed", 0, Priority.newInstance(10), 1234);
     try {
       rm.getResourceTrackerService().handleNMContainerStatus(report, null);
@@ -550,7 +562,7 @@ public class TestResourceTrackerService {
     currentAttempt.setMasterContainer(null);
     report = NMContainerStatus.newInstance(
       ContainerId.newContainerId(currentAttempt.getAppAttemptId(), 0),
-      ContainerState.COMPLETE, Resource.newInstance(1024, 1),
+      ContainerState.COMPLETE, Resource.newInstance(1024, 1, 1),
       "Dummy Completed", 0, Priority.newInstance(10), 1234);
     try {
       rm.getResourceTrackerService().handleNMContainerStatus(report, null);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bad01fe3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java
index 853e0a5..0746ef8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java
@@ -227,7 +227,7 @@ public class TestWorkPreservingRMRestart {
     // 2 running containers.
     Resource usedResources = Resources.multiply(containerResource, 2);
     Resource nmResource =
-        Resource.newInstance(nm1.getMemory(), nm1.getvCores());
+        Resource.newInstance(nm1.getMemory(), nm1.getvCores(), nm1.getVDisks());
 
     assertTrue(schedulerNode1.isValidContainer(amContainer.getContainerId()));
     assertTrue(schedulerNode1.isValidContainer(runningContainer
@@ -348,7 +348,7 @@ public class TestWorkPreservingRMRestart {
       Resource availableResources) throws Exception {
     // waiting for RM's scheduling apps
     int retry = 0;
-    Resource assumedFairShare = Resource.newInstance(8192, 8);
+    Resource assumedFairShare = Resource.newInstance(8192, 8, 20);
     while (true) {
       Thread.sleep(100);
       if (assumedFairShare.equals(((FairScheduler)rm.getResourceScheduler())
@@ -525,7 +525,7 @@ public class TestWorkPreservingRMRestart {
     // Calculate each queue's resource usage.
     Resource containerResource = Resource.newInstance(1024, 1);
     Resource nmResource =
-        Resource.newInstance(nm1.getMemory(), nm1.getvCores());
+        Resource.newInstance(nm1.getMemory(), nm1.getvCores(), nm1.getVDisks());
     Resource clusterResource = Resources.multiply(nmResource, 2);
     Resource q1Resource = Resources.multiply(clusterResource, 0.5);
     Resource q2Resource = Resources.multiply(clusterResource, 0.5);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bad01fe3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resource/TestResourceWeights.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resource/TestResourceWeights.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resource/TestResourceWeights.java
index f420b9e..f14372c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resource/TestResourceWeights.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resource/TestResourceWeights.java
@@ -30,19 +30,25 @@ public class TestResourceWeights {
         rw1.getWeight(ResourceType.CPU), 0.00001f);
     Assert.assertEquals("Default memory weight should be 0.0f", 0.0f, 
         rw1.getWeight(ResourceType.MEMORY), 0.00001f);
+    Assert.assertEquals("Default Disk I/O weight should be 0.0f", 0.0f,
+        rw1.getWeight(ResourceType.DISKIO), 0.00001f);
 
     ResourceWeights rw2 = new ResourceWeights(2.0f);
     Assert.assertEquals("The CPU weight should be 2.0f.", 2.0f, 
         rw2.getWeight(ResourceType.CPU), 0.00001f);
     Assert.assertEquals("The memory weight should be 2.0f", 2.0f, 
         rw2.getWeight(ResourceType.MEMORY), 0.00001f);
+    Assert.assertEquals("Default Disk I/O weight should be 2.0f", 2.0f,
+        rw2.getWeight(ResourceType.DISKIO), 0.00001f);
 
     // set each individually
-    ResourceWeights rw3 = new ResourceWeights(1.5f, 2.0f);
+    ResourceWeights rw3 = new ResourceWeights(1.5f, 2.0f, 2.5f);
     Assert.assertEquals("The CPU weight should be 2.0f", 2.0f, 
         rw3.getWeight(ResourceType.CPU), 0.00001f);
     Assert.assertEquals("The memory weight should be 1.5f", 1.5f, 
         rw3.getWeight(ResourceType.MEMORY), 0.00001f);
+    Assert.assertEquals("Default Disk I/O weight should be 2.5f", 2.5f,
+        rw3.getWeight(ResourceType.DISKIO), 0.00001f);
 
     // reset weights
     rw3.setWeight(ResourceType.CPU, 2.5f);
@@ -51,5 +57,8 @@ public class TestResourceWeights {
     rw3.setWeight(ResourceType.MEMORY, 4.0f);
     Assert.assertEquals("The memory weight should be set to 4.0f.", 4.0f, 
         rw3.getWeight(ResourceType.MEMORY), 0.00001f);
+    rw3.setWeight(ResourceType.DISKIO, 5.0f);
+    Assert.assertEquals("Default Disk I/O weight should be 5.0f", 5.0f,
+        rw3.getWeight(ResourceType.DISKIO), 0.00001f);
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bad01fe3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resource/TestResources.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resource/TestResources.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resource/TestResources.java
index ae98660..9fc9ccc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resource/TestResources.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resource/TestResources.java
@@ -24,20 +24,20 @@ import org.junit.Test;
 public class TestResources {
   @Test(timeout=1000)
   public void testFitsIn() {
-    assertTrue(fitsIn(createResource(1, 1), createResource(2, 2)));
-    assertTrue(fitsIn(createResource(2, 2), createResource(2, 2)));
-    assertFalse(fitsIn(createResource(2, 2), createResource(1, 1)));
-    assertFalse(fitsIn(createResource(1, 2), createResource(2, 1)));
-    assertFalse(fitsIn(createResource(2, 1), createResource(1, 2)));
+    assertTrue(fitsIn(createResource(1, 1, 1), createResource(2, 2, 2)));
+    assertTrue(fitsIn(createResource(2, 2, 2), createResource(2, 2, 2)));
+    assertFalse(fitsIn(createResource(2, 2, 2), createResource(1, 1, 1)));
+    assertFalse(fitsIn(createResource(1, 2, 3), createResource(3, 2, 1)));
+    assertFalse(fitsIn(createResource(2, 1, 3), createResource(1, 2, 3)));
   }
   
   @Test(timeout=1000)
   public void testComponentwiseMin() {
-    assertEquals(createResource(1, 1),
-        componentwiseMin(createResource(1, 1), createResource(2, 2)));
-    assertEquals(createResource(1, 1),
-        componentwiseMin(createResource(2, 2), createResource(1, 1)));
-    assertEquals(createResource(1, 1),
-        componentwiseMin(createResource(1, 2), createResource(2, 1)));
+    assertEquals(createResource(1, 1, 1),
+        componentwiseMin(createResource(1, 1, 1), createResource(2, 2, 2)));
+    assertEquals(createResource(1, 1, 1),
+        componentwiseMin(createResource(2, 2, 2), createResource(1, 1, 1)));
+    assertEquals(createResource(1, 1, 1),
+        componentwiseMin(createResource(1, 2, 1), createResource(2, 1, 2)));
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bad01fe3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMExpiry.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMExpiry.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMExpiry.java
index c837450..822dd14 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMExpiry.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMExpiry.java
@@ -132,7 +132,7 @@ public class TestNMExpiry {
     String hostname1 = "localhost1";
     String hostname2 = "localhost2";
     String hostname3 = "localhost3";
-    Resource capability = BuilderUtils.newResource(1024, 1);
+    Resource capability = Resource.newInstance(1024, 1, 1);
 
     RegisterNodeManagerRequest request1 = recordFactory
         .newRecordInstance(RegisterNodeManagerRequest.class);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bad01fe3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMReconnect.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMReconnect.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMReconnect.java
index d16d551..dd9d99a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMReconnect.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMReconnect.java
@@ -102,7 +102,7 @@ public class TestNMReconnect {
   @Test
   public void testReconnect() throws Exception {
     String hostname1 = "localhost1";
-    Resource capability = BuilderUtils.newResource(1024, 1);
+    Resource capability = Resource.newInstance(1024, 1, 1);
 
     RegisterNodeManagerRequest request1 = recordFactory
         .newRecordInstance(RegisterNodeManagerRequest.class);
@@ -121,7 +121,7 @@ public class TestNMReconnect {
 
     rmNodeEvents.clear();
     resourceTrackerService.registerNodeManager(request1);
-    capability = BuilderUtils.newResource(1024, 2);
+    capability = Resource.newInstance(1024, 2, 2);
     request1.setResource(capability);
     Assert.assertEquals(RMNodeEventType.RECONNECTED,
         rmNodeEvents.get(0).getType());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bad01fe3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestRMNMRPCResponseId.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestRMNMRPCResponseId.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestRMNMRPCResponseId.java
index 4f94695..5e1ec78 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestRMNMRPCResponseId.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestRMNMRPCResponseId.java
@@ -94,7 +94,7 @@ public class TestRMNMRPCResponseId {
   @Test
   public void testRPCResponseId() throws IOException, YarnException {
     String node = "localhost";
-    Resource capability = BuilderUtils.newResource(1024, 1);
+    Resource capability = Resource.newInstance(1024, 1, 1);
     RegisterNodeManagerRequest request = recordFactory.newRecordInstance(RegisterNodeManagerRequest.class);
     nodeId = NodeId.newInstance(node, 1234);
     request.setNodeId(nodeId);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bad01fe3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerUtils.java
index c9e81ee..08322f2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerUtils.java
@@ -94,9 +94,9 @@ public class TestSchedulerUtils {
     
     final int minMemory = 1024;
     final int maxMemory = 8192;
-    Resource minResource = Resources.createResource(minMemory, 0);
-    Resource maxResource = Resources.createResource(maxMemory, 0);
-    
+    Resource minResource = Resources.createResource(minMemory, 0, 0);
+    Resource maxResource = Resources.createResource(maxMemory, 0, 0);
+
     ResourceRequest ask = new ResourceRequestPBImpl();
 
     // case negative memory
@@ -155,33 +155,35 @@ public class TestSchedulerUtils {
   public void testNormalizeRequestWithDominantResourceCalculator() {
     ResourceCalculator resourceCalculator = new DominantResourceCalculator();
     
-    Resource minResource = Resources.createResource(1024, 1);
-    Resource maxResource = Resources.createResource(10240, 10);
-    Resource clusterResource = Resources.createResource(10 * 1024, 10);
+    Resource minResource = Resources.createResource(1024, 1, 1);
+    Resource maxResource = Resources.createResource(10240, 10, 8);
+    Resource clusterResource = Resources.createResource(10 * 1024, 10, 8);
     
     ResourceRequest ask = new ResourceRequestPBImpl();
 
-    // case negative memory/vcores
-    ask.setCapability(Resources.createResource(-1024, -1));
+    // case negative memory/vcores/vdisks
+    ask.setCapability(Resources.createResource(-1024, -1, -1));
     SchedulerUtils.normalizeRequest(
         ask, resourceCalculator, clusterResource, minResource, maxResource);
     assertEquals(minResource, ask.getCapability());
 
-    // case zero memory/vcores
-    ask.setCapability(Resources.createResource(0, 0));
+    // case zero memory/vcores/vdisks
+    ask.setCapability(Resources.createResource(0, 0, 0));
     SchedulerUtils.normalizeRequest(
         ask, resourceCalculator, clusterResource, minResource, maxResource);
     assertEquals(minResource, ask.getCapability());
     assertEquals(1, ask.getCapability().getVirtualCores());
     assertEquals(1024, ask.getCapability().getMemory());
+    assertEquals(1, ask.getCapability().getVirtualDisks());
 
-    // case non-zero memory & zero cores
-    ask.setCapability(Resources.createResource(1536, 0));
+    // case non-zero memory & zero cores & zero vdisks
+    ask.setCapability(Resources.createResource(1536, 0, 0));
     SchedulerUtils.normalizeRequest(
         ask, resourceCalculator, clusterResource, minResource, maxResource);
-    assertEquals(Resources.createResource(2048, 1), ask.getCapability());
+    assertEquals(Resources.createResource(2048, 1, 1), ask.getCapability());
     assertEquals(1, ask.getCapability().getVirtualCores());
     assertEquals(2048, ask.getCapability().getMemory());
+    assertEquals(1, ask.getCapability().getVirtualDisks());
   }
   
   @Test (timeout = 30000)
@@ -347,13 +349,15 @@ public class TestSchedulerUtils {
     Resource maxResource =
         Resources.createResource(
             YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
-            YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES);
+            YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
+            YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_DISK_VDISKS);
 
     // zero memory
     try {
       Resource resource =
           Resources.createResource(0,
-              YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES);
+              YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
+              YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_DISK_VDISKS);
       ResourceRequest resReq =
           BuilderUtils.newResourceRequest(mock(Priority.class),
               ResourceRequest.ANY, resource, 1);
@@ -367,7 +371,8 @@ public class TestSchedulerUtils {
     try {
       Resource resource =
           Resources.createResource(
-              YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 0);
+              YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 0,
+              YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_DISK_VDISKS);
       ResourceRequest resReq =
           BuilderUtils.newResourceRequest(mock(Priority.class),
               ResourceRequest.ANY, resource, 1);
@@ -377,12 +382,27 @@ public class TestSchedulerUtils {
       fail("Zero vcores should be accepted");
     }
 
+    // zero vdisks
+    try {
+      Resource resource = Resources.createResource(
+          YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+          YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
+          0);
+      ResourceRequest resReq = BuilderUtils.newResourceRequest(
+          mock(Priority.class), ResourceRequest.ANY, resource, 1);
+      SchedulerUtils.validateResourceRequest(resReq, maxResource, null,
+          mockScheduler);
+    } catch (InvalidResourceRequestException e) {
+      fail("Zero vdisks should be accepted");
+    }
+
     // max memory
     try {
       Resource resource =
           Resources.createResource(
               YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
-              YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES);
+              YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
+              YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_DISK_VDISKS);
       ResourceRequest resReq =
           BuilderUtils.newResourceRequest(mock(Priority.class),
               ResourceRequest.ANY, resource, 1);
@@ -397,7 +417,8 @@ public class TestSchedulerUtils {
       Resource resource =
           Resources.createResource(
               YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
-              YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES);
+              YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
+              YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_DISK_VDISKS);
       ResourceRequest resReq =
           BuilderUtils.newResourceRequest(mock(Priority.class),
               ResourceRequest.ANY, resource, 1);
@@ -407,11 +428,26 @@ public class TestSchedulerUtils {
       fail("Max vcores should not be accepted");
     }
 
+    // max vdisks
+    try {
+      Resource resource = Resources.createResource(
+          YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+          YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
+          YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_DISK_VDISKS);
+      ResourceRequest resReq = BuilderUtils.newResourceRequest(
+          mock(Priority.class), ResourceRequest.ANY, resource, 1);
+      SchedulerUtils.validateResourceRequest(resReq, maxResource, null,
+          mockScheduler);
+    } catch (InvalidResourceRequestException e) {
+      fail("Max vdisks should not be accepted");
+    }
+
     // negative memory
     try {
       Resource resource =
           Resources.createResource(-1,
-              YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES);
+              YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
+              YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_DISK_VDISKS);
       ResourceRequest resReq =
           BuilderUtils.newResourceRequest(mock(Priority.class),
               ResourceRequest.ANY, resource, 1);
@@ -426,7 +462,8 @@ public class TestSchedulerUtils {
     try {
       Resource resource =
           Resources.createResource(
-              YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB, -1);
+              YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB, -1,
+              YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_DISK_VDISKS);
       ResourceRequest resReq =
           BuilderUtils.newResourceRequest(mock(Priority.class),
               ResourceRequest.ANY, resource, 1);
@@ -437,12 +474,28 @@ public class TestSchedulerUtils {
       // expected
     }
 
+    // negative vdisks
+    try {
+      Resource resource = Resources.createResource(
+          YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+          YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
+          -1);
+      ResourceRequest resReq = BuilderUtils.newResourceRequest(
+          mock(Priority.class), ResourceRequest.ANY, resource, 1);
+      SchedulerUtils.validateResourceRequest(resReq, maxResource, null,
+          mockScheduler);
+      fail("Negative vdisks should not be accepted");
+    } catch (InvalidResourceRequestException e) {
+      // expected
+    }
+
     // more than max memory
     try {
       Resource resource =
           Resources.createResource(
               YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB + 1,
-              YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES);
+              YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
+              YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_DISK_VDISKS);
       ResourceRequest resReq =
           BuilderUtils.newResourceRequest(mock(Priority.class),
               ResourceRequest.ANY, resource, 1);
@@ -459,7 +512,8 @@ public class TestSchedulerUtils {
           Resources
               .createResource(
                   YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
-                  YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES + 1);
+                  YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES + 1,
+                  YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_DISK_VDISKS);
       ResourceRequest resReq =
           BuilderUtils.newResourceRequest(mock(Priority.class),
               ResourceRequest.ANY, resource, 1);
@@ -469,6 +523,21 @@ public class TestSchedulerUtils {
     } catch (InvalidResourceRequestException e) {
       // expected
     }
+
+    // more than max vcores
+    try {
+      Resource resource = Resources.createResource(
+          YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+          YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
+          YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_DISK_VDISKS + 1);
+      ResourceRequest resReq = BuilderUtils.newResourceRequest(
+          mock(Priority.class), ResourceRequest.ANY, resource, 1);
+      SchedulerUtils.validateResourceRequest(resReq, maxResource, null,
+          mockScheduler);
+      fail("More than max vdisks should not be accepted");
+    } catch (InvalidResourceRequestException e) {
+      // expected
+    }
   }
   
   @Test

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bad01fe3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
index 2aa57a0..81d4025 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
@@ -394,6 +394,64 @@ public class TestCapacityScheduler {
     conf.setMaximumCapacity(A, -1);
     assertEquals(CapacitySchedulerConfiguration.MAXIMUM_CAPACITY_VALUE,conf.getMaximumCapacity(A),delta);
   }
+
+  @Test
+  public void testNoMoreVDisksOnNode() throws Exception {
+    Configuration conf = new Configuration();
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+        ResourceScheduler.class);
+    conf.set(CapacitySchedulerConfiguration.RESOURCE_CALCULATOR_CLASS,
+        DominantResourceCalculator.class.getName());
+    MockRM rm = new MockRM(conf);
+    rm.start();
+    CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+
+    // Create two nodes, one with 2 vDisks, and one with 1 vDisks
+    RMNode node1 =
+        MockNodes.newNodeInfo(0, Resource.newInstance(4 * GB, 2, 2), 1, "host1");
+    RMNode node2 =
+        MockNodes.newNodeInfo(0, Resource.newInstance(4 * GB, 2, 1), 1, "host2");
+    cs.handle(new NodeAddedSchedulerEvent(node1));
+    cs.handle(new NodeAddedSchedulerEvent(node2));
+
+    // Create one application
+    ApplicationId appId1 = BuilderUtils.newApplicationId(100, 1);
+    ApplicationAttemptId appAttemptId1 = BuilderUtils.newApplicationAttemptId(
+        appId1, 1);
+    SchedulerEvent addAppEvent1 =
+        new AppAddedSchedulerEvent(appId1, "default", "user");
+    cs.handle(addAppEvent1);
+    SchedulerEvent addAttemptEvent1 =
+        new AppAttemptAddedSchedulerEvent(appAttemptId1, false);
+    cs.handle(addAttemptEvent1);
+
+    // Request four containers
+    List<ResourceRequest> ask1 = new ArrayList<ResourceRequest>();
+    ResourceRequest request1 = ResourceRequest.newInstance(
+        Priority.newInstance(1), ResourceRequest.ANY,
+        Resource.newInstance(1024, 1, 1), 4, true);
+    ask1.add(request1);
+    cs.allocate(appAttemptId1, ask1, Collections.<ContainerId>emptyList(),
+        null, null);
+
+    // For node1, it has 2 vdisks, thus it can assign two containers
+    FiCaSchedulerApp app1 = cs.getApplicationAttempt(appAttemptId1);
+    Assert.assertEquals(0, app1.getLiveContainers().size());
+    NodeUpdateSchedulerEvent nodeUpdateEvent = new NodeUpdateSchedulerEvent(node1);
+    cs.handle(nodeUpdateEvent);
+    Assert.assertEquals(1, app1.getLiveContainers().size());
+    cs.handle(nodeUpdateEvent);
+    Assert.assertEquals(2, app1.getLiveContainers().size());
+
+    // For node2, it only has 1 vdisks, thus it can only assign one container
+    NodeUpdateSchedulerEvent nodeUpdateEvent2 = new NodeUpdateSchedulerEvent(node2);
+    cs.handle(nodeUpdateEvent2);
+    Assert.assertEquals(3, app1.getLiveContainers().size());
+    cs.handle(nodeUpdateEvent2);
+    Assert.assertEquals(3, app1.getLiveContainers().size());
+
+    rm.stop();
+  }
   
   
   @Test
@@ -1035,8 +1093,8 @@ public class TestCapacityScheduler {
 
     // check values
     waitForAppPreemptionInfo(app0,
-        Resource.newInstance(CONTAINER_MEMORY * 3, 3), 0, 3,
-        Resource.newInstance(CONTAINER_MEMORY * 3, 3), false, 3);
+        Resource.newInstance(CONTAINER_MEMORY * 3, 3, 3), 0, 3,
+        Resource.newInstance(CONTAINER_MEMORY * 3, 3, 3), false, 3);
 
     // kill app0-attempt0 AM container
     cs.killContainer(schedulerAppAttempt.getRMContainer(app0
@@ -1047,7 +1105,7 @@ public class TestCapacityScheduler {
 
     // check values
     waitForAppPreemptionInfo(app0,
-        Resource.newInstance(CONTAINER_MEMORY * 4, 4), 1, 3,
+        Resource.newInstance(CONTAINER_MEMORY * 4, 4, 4), 1, 3,
         Resource.newInstance(0, 0), false, 0);
 
     // launch app0-attempt1
@@ -1065,8 +1123,8 @@ public class TestCapacityScheduler {
 
     // check values
     waitForAppPreemptionInfo(app0,
-        Resource.newInstance(CONTAINER_MEMORY * 7, 7), 1, 6,
-        Resource.newInstance(CONTAINER_MEMORY * 3, 3), false, 3);
+        Resource.newInstance(CONTAINER_MEMORY * 7, 7, 7), 1, 6,
+        Resource.newInstance(CONTAINER_MEMORY * 3, 3, 3), false, 3);
 
     rm1.stop();
   }