Posted to common-commits@hadoop.apache.org by zj...@apache.org on 2015/07/13 20:59:54 UTC

[42/48] hadoop git commit: YARN-3849. Too much preemption activity causing continuous killing of containers across queues. (Sunil G via wangda)

YARN-3849. Too much preemption activity causing continuous killing of containers across queues. (Sunil G via wangda)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a7e14587
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a7e14587
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a7e14587

Branch: refs/heads/YARN-2928
Commit: a7e145873b0b9f80cd23b6d1cf0173173ed4c4d3
Parents: 18e891b
Author: Wangda Tan <wa...@apache.org>
Authored: Sat Jul 11 10:26:46 2015 -0700
Committer: Zhijie Shen <zj...@apache.org>
Committed: Mon Jul 13 11:51:16 2015 -0700

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |   3 +
 .../ProportionalCapacityPreemptionPolicy.java   |   4 +-
 ...estProportionalCapacityPreemptionPolicy.java | 253 ++++++++++++++-----
 ...pacityPreemptionPolicyForNodePartitions.java | 114 +++++++--
 4 files changed, 289 insertions(+), 85 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/a7e14587/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index ae5e9d6..db24ad2 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -725,6 +725,9 @@ Release 2.8.0 - UNRELEASED
     YARN-3888. ApplicationMaster link is broken in RM WebUI when appstate is NEW.
     (Bibin A Chundatt via xgong)
 
+    YARN-3849. Too much preemption activity causing continuous killing of
+    containers across queues. (Sunil G via wangda)
+
 Release 2.7.2 - UNRELEASED
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a7e14587/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
index 5a20a6f..6e661d4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
@@ -840,12 +840,12 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
     synchronized (curQueue) {
       String queueName = curQueue.getQueueName();
       QueueCapacities qc = curQueue.getQueueCapacities();
-      float absUsed = qc.getAbsoluteUsedCapacity(partitionToLookAt);
       float absCap = qc.getAbsoluteCapacity(partitionToLookAt);
       float absMaxCap = qc.getAbsoluteMaximumCapacity(partitionToLookAt);
       boolean preemptionDisabled = curQueue.getPreemptionDisabled();
 
-      Resource current = Resources.multiply(partitionResource, absUsed);
+      Resource current = curQueue.getQueueResourceUsage().getUsed(
+          partitionToLookAt);
       Resource guaranteed = Resources.multiply(partitionResource, absCap);
       Resource maxCapacity = Resources.multiply(partitionResource, absMaxCap);
 

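The two-line change above is the heart of the fix: instead of reconstructing the queue's current usage by multiplying the partition resource by a float absolute-used-capacity, the policy now reads the exact used Resource from the queue. A minimal sketch of why the float round trip matters (class name and numbers are illustrative, not from the patch):

    // Illustrative only: shows how usage -> float ratio -> usage can drift.
    public class RoundingDemo {
      public static void main(String[] args) {
        long partitionMemory = 100_000_000L; // partition size in MB
        long usedMemory = 33_333_333L;       // what the queue really uses

        // Old path: derive a float capacity, then multiply back.
        float absUsed = (float) usedMemory / partitionMemory;
        long roundTripped = (long) (partitionMemory * absUsed);

        // Drift of even a container's worth can push "current" past
        // "guaranteed" on every editSchedule() pass, so the policy keeps
        // picking containers to preempt round after round.
        System.out.println("drift (MB) = " + (roundTripped - usedMemory));
      }
    }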
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a7e14587/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
index 2c0c6d7..3057360 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
@@ -81,7 +81,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEv
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy;
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.Resources;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -372,7 +374,7 @@ public class TestProportionalCapacityPreemptionPolicy {
             appA.getApplicationId(), appA.getAttemptId());
     assertTrue("appA should be running on queueB",
         mCS.getAppsInQueue("queueB").contains(expectedAttemptOnQueueB));
-    verify(mDisp, times(9)).handle(argThat(new IsPreemptionRequestFor(appA)));
+    verify(mDisp, times(10)).handle(argThat(new IsPreemptionRequestFor(appA)));
 
     // Need to call setup() again to reset mDisp
     setup();
@@ -395,7 +397,7 @@ public class TestProportionalCapacityPreemptionPolicy {
     // Resources should have come from queueE (appC) and neither of queueA's
     // children.
     verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appA)));
-    verify(mDisp, times(9)).handle(argThat(new IsPreemptionRequestFor(appC)));
+    verify(mDisp, times(10)).handle(argThat(new IsPreemptionRequestFor(appC)));
   }
 
   @Test
@@ -470,8 +472,8 @@ public class TestProportionalCapacityPreemptionPolicy {
     // With all queues preemptable, resources should be taken from queueC(appA)
     // and queueD(appB). Resources taken more from queueD(appB) than
     // queueC(appA) because it's over its capacity by a larger percentage.
-    verify(mDisp, times(16)).handle(argThat(new IsPreemptionRequestFor(appA)));
-    verify(mDisp, times(182)).handle(argThat(new IsPreemptionRequestFor(appB)));
+    verify(mDisp, times(17)).handle(argThat(new IsPreemptionRequestFor(appA)));
+    verify(mDisp, times(183)).handle(argThat(new IsPreemptionRequestFor(appB)));
 
     // Turn off preemption for queueA and it's children. queueF(appC)'s request
     // should starve.
@@ -635,7 +637,7 @@ public class TestProportionalCapacityPreemptionPolicy {
     policy.editSchedule();
     // verify capacity taken from A1, not B1 despite B1 being far over
     // its absolute guaranteed capacity
-    verify(mDisp, times(9)).handle(argThat(new IsPreemptionRequestFor(appA)));
+    verify(mDisp, times(10)).handle(argThat(new IsPreemptionRequestFor(appA)));
   }
 
   @Test
@@ -676,7 +678,7 @@ public class TestProportionalCapacityPreemptionPolicy {
     // we verify both that C has priority on B and D (has it has >0 guarantees)
     // and that B and D are force to share their over capacity fairly (as they
     // are both zero-guarantees) hence D sees some of its containers preempted
-    verify(mDisp, times(14)).handle(argThat(new IsPreemptionRequestFor(appC)));
+    verify(mDisp, times(15)).handle(argThat(new IsPreemptionRequestFor(appC)));
   }
   
   
@@ -703,8 +705,8 @@ public class TestProportionalCapacityPreemptionPolicy {
 
     // XXX note: compensating for rounding error in Resources.multiplyTo
     // which is likely triggered since we use small numbers for readability
-    verify(mDisp, times(7)).handle(argThat(new IsPreemptionRequestFor(appA)));
-    verify(mDisp, times(5)).handle(argThat(new IsPreemptionRequestFor(appE)));
+    verify(mDisp, times(9)).handle(argThat(new IsPreemptionRequestFor(appA)));
+    verify(mDisp, times(6)).handle(argThat(new IsPreemptionRequestFor(appE)));
   }
 
   @Test
@@ -868,6 +870,34 @@ public class TestProportionalCapacityPreemptionPolicy {
     setAMContainer = false;
   }
 
+  @Test
+  public void testPreemptionWithVCoreResource() {
+    int[][] qData = new int[][]{
+        // / A B
+        {100, 100, 100}, // maxcap
+        {5, 1, 1}, // apps
+        {2, 0, 0}, // subqueues
+    };
+
+    // Resources are specified as memory:vcores
+    String[][] resData = new String[][]{
+        // / A B
+        {"100:100", "50:50", "50:50"}, // abs
+        {"10:100", "10:100", "0"}, // used
+        {"70:20", "70:20", "10:100"}, // pending
+        {"0", "0", "0"}, // reserved
+        {"-1", "1:10", "1:10"}, // req granularity
+    };
+
+    // Pass the last param as true to use DominantResourceCalculator
+    ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData, resData,
+        true);
+    policy.editSchedule();
+
+    // 5 containers will be preempted here
+    verify(mDisp, times(5)).handle(argThat(new IsPreemptionRequestFor(appA)));
+  }
+
   static class IsPreemptionRequestFor
       extends ArgumentMatcher<ContainerPreemptEvent> {
     private final ApplicationAttemptId appAttId;
@@ -892,13 +922,40 @@ public class TestProportionalCapacityPreemptionPolicy {
   }
 
   ProportionalCapacityPreemptionPolicy buildPolicy(int[][] qData) {
-    ProportionalCapacityPreemptionPolicy policy =
-      new ProportionalCapacityPreemptionPolicy(conf, rmContext, mCS, mClock);
+    ProportionalCapacityPreemptionPolicy policy = new ProportionalCapacityPreemptionPolicy(
+        conf, rmContext, mCS, mClock);
+    clusterResources = Resource.newInstance(
+        leafAbsCapacities(qData[0], qData[7]), 0);
     ParentQueue mRoot = buildMockRootQueue(rand, qData);
     when(mCS.getRootQueue()).thenReturn(mRoot);
 
-    clusterResources =
-      Resource.newInstance(leafAbsCapacities(qData[0], qData[7]), 0);
+    setResourceAndNodeDetails();
+    return policy;
+  }
+
+  ProportionalCapacityPreemptionPolicy buildPolicy(int[][] qData,
+      String[][] resData) {
+    return buildPolicy(qData, resData, false);
+  }
+
+  ProportionalCapacityPreemptionPolicy buildPolicy(int[][] qData,
+      String[][] resData, boolean useDominantResourceCalculator) {
+    if (useDominantResourceCalculator) {
+      when(mCS.getResourceCalculator()).thenReturn(
+          new DominantResourceCalculator());
+    }
+    ProportionalCapacityPreemptionPolicy policy =
+        new ProportionalCapacityPreemptionPolicy(conf, rmContext, mCS, mClock);
+    clusterResources = leafAbsCapacities(parseResourceDetails(resData[0]),
+        qData[2]);
+    ParentQueue mRoot = buildMockRootQueue(rand, resData, qData);
+    when(mCS.getRootQueue()).thenReturn(mRoot);
+
+    setResourceAndNodeDetails();
+    return policy;
+  }
+
+  private void setResourceAndNodeDetails() {
     when(mCS.getClusterResource()).thenReturn(clusterResources);
     when(lm.getResourceByLabel(anyString(), any(Resource.class))).thenReturn(
         clusterResources);
@@ -906,35 +963,78 @@ public class TestProportionalCapacityPreemptionPolicy {
     SchedulerNode mNode = mock(SchedulerNode.class);
     when(mNode.getPartition()).thenReturn(RMNodeLabelsManager.NO_LABEL);
     when(mCS.getSchedulerNode(any(NodeId.class))).thenReturn(mNode);
-    return policy;
   }
 
   ParentQueue buildMockRootQueue(Random r, int[]... queueData) {
-    int[] abs      = queueData[0];
-    int[] maxCap   = queueData[1];
-    int[] used     = queueData[2];
-    int[] pending  = queueData[3];
-    int[] reserved = queueData[4];
-    int[] apps     = queueData[5];
-    int[] gran     = queueData[6];
-    int[] queues   = queueData[7];
-
-    return mockNested(abs, maxCap, used, pending,  reserved, apps, gran, queues);
+    Resource[] abs = generateResourceList(queueData[0]);
+    Resource[] used = generateResourceList(queueData[2]);
+    Resource[] pending = generateResourceList(queueData[3]);
+    Resource[] reserved = generateResourceList(queueData[4]);
+    Resource[] gran = generateResourceList(queueData[6]);
+    int[] maxCap = queueData[1];
+    int[] apps = queueData[5];
+    int[] queues = queueData[7];
+
+    return mockNested(abs, maxCap, used, pending, reserved, apps, gran, queues);
   }
 
-  ParentQueue mockNested(int[] abs, int[] maxCap, int[] used,
-      int[] pending, int[] reserved, int[] apps, int[] gran, int[] queues) {
-    float tot = leafAbsCapacities(abs, queues);
+  ParentQueue buildMockRootQueue(Random r, String[][] resData,
+      int[]... queueData) {
+    Resource[] abs = parseResourceDetails(resData[0]);
+    Resource[] used = parseResourceDetails(resData[1]);
+    Resource[] pending = parseResourceDetails(resData[2]);
+    Resource[] reserved = parseResourceDetails(resData[3]);
+    Resource[] gran = parseResourceDetails(resData[4]);
+    int[] maxCap = queueData[0];
+    int[] apps = queueData[1];
+    int[] queues = queueData[2];
+
+    return mockNested(abs, maxCap, used, pending, reserved, apps, gran, queues);
+  }
+
+  Resource[] parseResourceDetails(String[] resData) {
+    List<Resource> resourceList = new ArrayList<Resource>();
+    for (int i = 0; i < resData.length; i++) {
+      String[] resource = resData[i].split(":");
+      if (resource.length == 1) {
+        resourceList.add(Resource.newInstance(Integer.valueOf(resource[0]), 0));
+      } else {
+        resourceList.add(Resource.newInstance(Integer.valueOf(resource[0]),
+            Integer.valueOf(resource[1])));
+      }
+    }
+    return resourceList.toArray(new Resource[resourceList.size()]);
+  }
+
+  Resource[] generateResourceList(int[] qData) {
+    List<Resource> resourceList = new ArrayList<Resource>();
+    for (int i = 0; i < qData.length; i++) {
+      resourceList.add(Resource.newInstance(qData[i], 0));
+    }
+    return resourceList.toArray(new Resource[resourceList.size()]);
+  }
+
+  ParentQueue mockNested(Resource[] abs, int[] maxCap, Resource[] used,
+      Resource[] pending, Resource[] reserved, int[] apps, Resource[] gran,
+      int[] queues) {
+    ResourceCalculator rc = mCS.getResourceCalculator();
+    Resource tot = leafAbsCapacities(abs, queues);
     Deque<ParentQueue> pqs = new LinkedList<ParentQueue>();
     ParentQueue root = mockParentQueue(null, queues[0], pqs);
+    ResourceUsage resUsage = new ResourceUsage();
+    resUsage.setUsed(used[0]);
     when(root.getQueueName()).thenReturn(CapacitySchedulerConfiguration.ROOT);
-    when(root.getAbsoluteUsedCapacity()).thenReturn(used[0] / tot);
-    when(root.getAbsoluteCapacity()).thenReturn(abs[0] / tot);
-    when(root.getAbsoluteMaximumCapacity()).thenReturn(maxCap[0] / tot);
+    when(root.getAbsoluteUsedCapacity()).thenReturn(
+        Resources.divide(rc, tot, used[0], tot));
+    when(root.getAbsoluteCapacity()).thenReturn(
+        Resources.divide(rc, tot, abs[0], tot));
+    when(root.getAbsoluteMaximumCapacity()).thenReturn(
+        maxCap[0] / (float) tot.getMemory());
+    when(root.getQueueResourceUsage()).thenReturn(resUsage);
     QueueCapacities rootQc = new QueueCapacities(true);
-    rootQc.setAbsoluteUsedCapacity(used[0] / tot);
-    rootQc.setAbsoluteCapacity(abs[0] / tot);
-    rootQc.setAbsoluteMaximumCapacity(maxCap[0] / tot);
+    rootQc.setAbsoluteUsedCapacity(Resources.divide(rc, tot, used[0], tot));
+    rootQc.setAbsoluteCapacity(Resources.divide(rc, tot, abs[0], tot));
+    rootQc.setAbsoluteMaximumCapacity(maxCap[0] / (float) tot.getMemory());
     when(root.getQueueCapacities()).thenReturn(rootQc);
     when(root.getQueuePath()).thenReturn(CapacitySchedulerConfiguration.ROOT);
     boolean preemptionDisabled = mockPreemptionStatus("root");
@@ -943,28 +1043,35 @@ public class TestProportionalCapacityPreemptionPolicy {
     for (int i = 1; i < queues.length; ++i) {
       final CSQueue q;
       final ParentQueue p = pqs.removeLast();
-      final String queueName = "queue" + ((char)('A' + i - 1));
+      final String queueName = "queue" + ((char) ('A' + i - 1));
       if (queues[i] > 0) {
         q = mockParentQueue(p, queues[i], pqs);
+        ResourceUsage resUsagePerQueue = new ResourceUsage();
+        resUsagePerQueue.setUsed(used[i]);
+        when(q.getQueueResourceUsage()).thenReturn(resUsagePerQueue);
       } else {
         q = mockLeafQueue(p, tot, i, abs, used, pending, reserved, apps, gran);
       }
       when(q.getParent()).thenReturn(p);
       when(q.getQueueName()).thenReturn(queueName);
-      when(q.getAbsoluteUsedCapacity()).thenReturn(used[i] / tot);
-      when(q.getAbsoluteCapacity()).thenReturn(abs[i] / tot);
-      when(q.getAbsoluteMaximumCapacity()).thenReturn(maxCap[i] / tot);
+      when(q.getAbsoluteUsedCapacity()).thenReturn(
+          Resources.divide(rc, tot, used[i], tot));
+      when(q.getAbsoluteCapacity()).thenReturn(
+          Resources.divide(rc, tot, abs[i], tot));
+      when(q.getAbsoluteMaximumCapacity()).thenReturn(
+          maxCap[i] / (float) tot.getMemory());
 
       // We need to make these fields to QueueCapacities
       QueueCapacities qc = new QueueCapacities(false);
-      qc.setAbsoluteUsedCapacity(used[i] / tot);
-      qc.setAbsoluteCapacity(abs[i] / tot);
-      qc.setAbsoluteMaximumCapacity(maxCap[i] / tot);
+      qc.setAbsoluteUsedCapacity(Resources.divide(rc, tot, used[i], tot));
+      qc.setAbsoluteCapacity(Resources.divide(rc, tot, abs[i], tot));
+      qc.setAbsoluteMaximumCapacity(maxCap[i] / (float) tot.getMemory());
       when(q.getQueueCapacities()).thenReturn(qc);
 
       String parentPathName = p.getQueuePath();
       parentPathName = (parentPathName == null) ? "root" : parentPathName;
-      String queuePathName = (parentPathName+"."+queueName).replace("/","root");
+      String queuePathName = (parentPathName + "." + queueName).replace("/",
+          "root");
       when(q.getQueuePath()).thenReturn(queuePathName);
       preemptionDisabled = mockPreemptionStatus(queuePathName);
       when(q.getPreemptionDisabled()).thenReturn(preemptionDisabled);
@@ -1004,16 +1111,18 @@ public class TestProportionalCapacityPreemptionPolicy {
   }
 
   @SuppressWarnings("rawtypes")
-  LeafQueue mockLeafQueue(ParentQueue p, float tot, int i, int[] abs, 
-      int[] used, int[] pending, int[] reserved, int[] apps, int[] gran) {
+  LeafQueue mockLeafQueue(ParentQueue p, Resource tot, int i, Resource[] abs,
+      Resource[] used, Resource[] pending, Resource[] reserved, int[] apps,
+      Resource[] gran) {
     LeafQueue lq = mock(LeafQueue.class);
+    ResourceCalculator rc = mCS.getResourceCalculator();
     List<ApplicationAttemptId> appAttemptIdList = 
         new ArrayList<ApplicationAttemptId>();
-    when(lq.getTotalResourcePending()).thenReturn(
-        Resource.newInstance(pending[i], 0));
+    when(lq.getTotalResourcePending()).thenReturn(pending[i]);
     // need to set pending resource in resource usage as well
     ResourceUsage ru = new ResourceUsage();
-    ru.setPending(Resource.newInstance(pending[i], 0));
+    ru.setPending(pending[i]);
+    ru.setUsed(used[i]);
     when(lq.getQueueResourceUsage()).thenReturn(ru);
     // consider moving where CapacityScheduler::comparator accessible
     final NavigableSet<FiCaSchedulerApp> qApps = new TreeSet<FiCaSchedulerApp>(
@@ -1026,9 +1135,9 @@ public class TestProportionalCapacityPreemptionPolicy {
       });
     // applications are added in global L->R order in queues
     if (apps[i] != 0) {
-      int aUsed    = used[i] / apps[i];
-      int aPending = pending[i] / apps[i];
-      int aReserve = reserved[i] / apps[i];
+      Resource aUsed = Resources.divideAndCeil(rc, used[i], apps[i]);
+      Resource aPending = Resources.divideAndCeil(rc, pending[i], apps[i]);
+      Resource aReserve = Resources.divideAndCeil(rc, reserved[i], apps[i]);
       for (int a = 0; a < apps[i]; ++a) {
         FiCaSchedulerApp mockFiCaApp =
             mockApp(i, appAlloc, aUsed, aPending, aReserve, gran[i]);
@@ -1055,9 +1164,10 @@ public class TestProportionalCapacityPreemptionPolicy {
     return lq;
   }
 
-  FiCaSchedulerApp mockApp(int qid, int id, int used, int pending, int reserved,
-      int gran) {
+  FiCaSchedulerApp mockApp(int qid, int id, Resource used, Resource pending,
+      Resource reserved, Resource gran) {
     FiCaSchedulerApp app = mock(FiCaSchedulerApp.class);
+    ResourceCalculator rc = mCS.getResourceCalculator();
 
     ApplicationId appId = ApplicationId.newInstance(TS, id);
     ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance(appId, 0);
@@ -1065,30 +1175,35 @@ public class TestProportionalCapacityPreemptionPolicy {
     when(app.getApplicationAttemptId()).thenReturn(appAttId);
 
     int cAlloc = 0;
-    Resource unit = Resource.newInstance(gran, 0);
+    Resource unit = gran;
     List<RMContainer> cReserved = new ArrayList<RMContainer>();
-    for (int i = 0; i < reserved; i += gran) {
-      cReserved.add(mockContainer(appAttId, cAlloc, unit, priority.CONTAINER
-          .getValue()));
+    Resource resIter = Resource.newInstance(0, 0);
+    for (; Resources.lessThan(rc, clusterResources, resIter, reserved); Resources
+        .addTo(resIter, gran)) {
+      cReserved.add(mockContainer(appAttId, cAlloc, unit,
+          priority.CONTAINER.getValue()));
       ++cAlloc;
     }
     when(app.getReservedContainers()).thenReturn(cReserved);
 
     List<RMContainer> cLive = new ArrayList<RMContainer>();
-    for (int i = 0; i < used; i += gran) {
-      if(setAMContainer && i == 0){
-        cLive.add(mockContainer(appAttId, cAlloc, unit, priority.AMCONTAINER
-            .getValue()));
-      }else if(setLabeledContainer && i ==1){
+    Resource usedIter = Resource.newInstance(0, 0);
+    int i = 0;
+    for (; Resources.lessThan(rc, clusterResources, usedIter, used); Resources
+        .addTo(usedIter, gran)) {
+      if (setAMContainer && i == 0) {
+        cLive.add(mockContainer(appAttId, cAlloc, unit,
+            priority.AMCONTAINER.getValue()));
+      } else if (setLabeledContainer && i == 1) {
         cLive.add(mockContainer(appAttId, cAlloc, unit,
             priority.LABELEDCONTAINER.getValue()));
-        ++used;
-      }
-      else{
-        cLive.add(mockContainer(appAttId, cAlloc, unit, priority.CONTAINER
-            .getValue()));
+        Resources.addTo(used, Resource.newInstance(1, 1));
+      } else {
+        cLive.add(mockContainer(appAttId, cAlloc, unit,
+            priority.CONTAINER.getValue()));
       }
       ++cAlloc;
+      ++i;
     }
     when(app.getLiveContainers()).thenReturn(cLive);
     return app;
@@ -1124,6 +1239,16 @@ public class TestProportionalCapacityPreemptionPolicy {
     return ret;
   }
 
+  static Resource leafAbsCapacities(Resource[] abs, int[] subqueues) {
+    Resource ret = Resource.newInstance(0, 0);
+    for (int i = 0; i < abs.length; ++i) {
+      if (0 == subqueues[i]) {
+        Resources.addTo(ret, abs[i]);
+      }
+    }
+    return ret;
+  }
+
   void printString(CSQueue nq, String indent) {
     if (nq instanceof ParentQueue) {
       System.out.println(indent + nq.getQueueName()

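The vcore-aware tests above describe resources as "memory" or "memory:vcores" strings. A standalone sketch of that convention, mirroring parseResourceDetails() from the patch (the class and parse() helper names here are ours):

    import org.apache.hadoop.yarn.api.records.Resource;

    public class ResourceSpecDemo {
      // Mirrors parseResourceDetails() in the patch: "50" -> <memory:50,
      // vCores:0>, "50:50" -> <memory:50, vCores:50>.
      static Resource parse(String spec) {
        String[] parts = spec.split(":");
        int memory = Integer.parseInt(parts[0]);
        int vcores = (parts.length > 1) ? Integer.parseInt(parts[1]) : 0;
        return Resource.newInstance(memory, vcores);
      }

      public static void main(String[] args) {
        System.out.println(parse("70:20")); // queueA's pending in the test
        System.out.println(parse("0"));     // bare numbers mean vcores = 0
      }
    }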
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a7e14587/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java
index 114769c..b3ac79b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java
@@ -70,6 +70,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEv
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy;
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
 import org.junit.Assert;
@@ -771,6 +772,60 @@ public class TestProportionalCapacityPreemptionPolicyForNodePartitions {
         argThat(new IsPreemptionRequestFor(getAppAttemptId(2))));
   }
 
+  @Test
+  public void testNodePartitionPreemptionWithVCoreResource() throws IOException {
+    /**
+     * Queue structure is:
+     *
+     * <pre>
+     *       root
+     *       /  \
+     *      a    b
+     * </pre>
+     *
+     * Both a and b can access x, and their guaranteed capacity is 50:50.
+     * Two nodes: n1 has 100 of partition x, n2 has 100 of NO_LABEL. There are
+     * 4 applications in the cluster: app1/app2 in a, and app3/app4 in b.
+     * app1 uses 80 x, app2 uses 20 NO_LABEL, app3 uses 20 x, app4 uses 80
+     * NO_LABEL. Both a and b have 50 pending resource for x and NO_LABEL.
+     *
+     * The policy should preempt 30 containers from app1 and 30 from app4.
+     */
+    String labelsConfig = "=100:200,true;" + // default partition
+        "x=100:200,true"; // partition=x
+    String nodesConfig = "n1=x;" + // n1 has partition=x
+        "n2="; // n2 is default partition
+    String queuesConfig =
+    // guaranteed,max,used,pending
+    "root(=[100:200 100:200 100:200 100:200],x=[100:200 100:200 100:200 100:200]);"
+        + // root
+        "-a(=[50:100 100:200 20:40 50:100],x=[50:100 100:200 80:160 50:100]);" + // a
+        "-b(=[50:100 100:200 80:160 50:100],x=[50:100 100:200 20:40 50:100])"; // b
+    String appsConfig =
+    // queueName\t(priority,resource,host,expression,#repeat,reserved)
+    "a\t" // app1 in a
+        + "(1,1:2,n1,x,80,false);" + // 80 * x in n1
+        "a\t" // app2 in a
+        + "(1,1:2,n2,,20,false);" + // 20 default in n2
+        "b\t" // app3 in b
+        + "(1,1:2,n1,x,20,false);" + // 20 * x in n1
+        "b\t" // app4 in b
+        + "(1,1:2,n2,,80,false)"; // 80 default in n2
+
+    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig, true);
+    policy.editSchedule();
+
+    // 30 preempted from app1, 30 preempted from app4, and nothing preempted
+    // from app2/app3
+    verify(mDisp, times(30)).handle(
+        argThat(new IsPreemptionRequestFor(getAppAttemptId(1))));
+    verify(mDisp, times(30)).handle(
+        argThat(new IsPreemptionRequestFor(getAppAttemptId(4))));
+    verify(mDisp, never()).handle(
+        argThat(new IsPreemptionRequestFor(getAppAttemptId(2))));
+    verify(mDisp, never()).handle(
+        argThat(new IsPreemptionRequestFor(getAppAttemptId(3))));
+  }
 
   private ApplicationAttemptId getAppAttemptId(int id) {
     ApplicationId appId = ApplicationId.newInstance(0L, id);
@@ -821,6 +876,16 @@ public class TestProportionalCapacityPreemptionPolicyForNodePartitions {
 
   private void buildEnv(String labelsConfig, String nodesConfig,
       String queuesConfig, String appsConfig) throws IOException {
+    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig, false);
+  }
+
+  private void buildEnv(String labelsConfig, String nodesConfig,
+      String queuesConfig, String appsConfig,
+      boolean useDominantResourceCalculator) throws IOException {
+    if (useDominantResourceCalculator) {
+      when(cs.getResourceCalculator()).thenReturn(
+          new DominantResourceCalculator());
+    }
     mockNodeLabelsManager(labelsConfig);
     mockSchedulerNodes(nodesConfig);
     for (NodeId nodeId : nodeIdToSchedulerNodes.keySet()) {
@@ -832,7 +897,8 @@ public class TestProportionalCapacityPreemptionPolicyForNodePartitions {
     when(cs.getClusterResource()).thenReturn(clusterResource);
     mockApplications(appsConfig);
 
-    policy = new ProportionalCapacityPreemptionPolicy(conf, rmContext, cs, mClock);
+    policy = new ProportionalCapacityPreemptionPolicy(conf, rmContext, cs,
+        mClock);
   }
 
   private void mockContainers(String containersConfig, ApplicationAttemptId attemptId,
@@ -868,7 +934,7 @@ public class TestProportionalCapacityPreemptionPolicyForNodePartitions {
             + "(priority,resource,host,expression,repeat,reserved)");
       }
       Priority pri = Priority.newInstance(Integer.valueOf(values[0]));
-      Resource res = Resources.createResource(Integer.valueOf(values[1]));
+      Resource res = parseResourceFromString(values[1]);
       NodeId host = NodeId.newInstance(values[2], 1);
       String exp = values[3];
       int repeat = Integer.valueOf(values[4]);
@@ -1002,11 +1068,10 @@ public class TestProportionalCapacityPreemptionPolicyForNodePartitions {
     clusterResource = Resources.createResource(0);
     for (String p : partitionConfigArr) {
       String partitionName = p.substring(0, p.indexOf("="));
-      int totalResource =
-          Integer.valueOf(p.substring(p.indexOf("=") + 1, p.indexOf(",")));
-      boolean exclusivity =
+      Resource res = parseResourceFromString(p.substring(p.indexOf("=") + 1,
+          p.indexOf(",")));
+      boolean exclusivity =
           Boolean.valueOf(p.substring(p.indexOf(",") + 1, p.length()));
-      Resource res = Resources.createResource(totalResource);
       when(nlm.getResourceByLabel(eq(partitionName), any(Resource.class)))
           .thenReturn(res);
       when(nlm.isExclusiveNodeLabel(eq(partitionName))).thenReturn(exclusivity);
@@ -1022,6 +1087,18 @@ public class TestProportionalCapacityPreemptionPolicyForNodePartitions {
         partitionToResource.keySet());
   }
 
+  private Resource parseResourceFromString(String p) {
+    String[] resource = p.split(":");
+    Resource res = Resources.createResource(0);
+    if (resource.length == 1) {
+      res = Resources.createResource(Integer.valueOf(resource[0]));
+    } else {
+      res = Resources.createResource(Integer.valueOf(resource[0]),
+          Integer.valueOf(resource[1]));
+    }
+    return res;
+  }
+
   /**
    * Format is:
    * <pre>
@@ -1120,23 +1197,22 @@ public class TestProportionalCapacityPreemptionPolicyForNodePartitions {
       // Add a small epsilon to capacities to avoid truncate when doing
       // Resources.multiply
       float epsilon = 1e-6f;
-      float absGuaranteed =
-          Integer.valueOf(values[0].trim())
-              / (float) (partitionToResource.get(partitionName).getMemory())
-              + epsilon;
-      float absMax =
-          Integer.valueOf(values[1].trim())
-              / (float) (partitionToResource.get(partitionName).getMemory())
-              + epsilon;
-      float absUsed =
-          Integer.valueOf(values[2].trim())
-              / (float) (partitionToResource.get(partitionName).getMemory())
-              + epsilon;
-      Resource pending = Resources.createResource(Integer.valueOf(values[3].trim()));
+      Resource totResoucePerPartition = partitionToResource.get(partitionName);
+      float absGuaranteed = Resources.divide(rc, totResoucePerPartition,
+          parseResourceFromString(values[0].trim()), totResoucePerPartition)
+          + epsilon;
+      float absMax = Resources.divide(rc, totResoucePerPartition,
+          parseResourceFromString(values[1].trim()), totResoucePerPartition)
+          + epsilon;
+      float absUsed = Resources.divide(rc, totResoucePerPartition,
+          parseResourceFromString(values[2].trim()), totResoucePerPartition)
+          + epsilon;
+      Resource pending = parseResourceFromString(values[3].trim());
       qc.setAbsoluteCapacity(partitionName, absGuaranteed);
       qc.setAbsoluteMaximumCapacity(partitionName, absMax);
       qc.setAbsoluteUsedCapacity(partitionName, absUsed);
       ru.setPending(partitionName, pending);
+      ru.setUsed(partitionName, parseResourceFromString(values[2].trim()));
       LOG.debug("Setup queue=" + queueName + " partition=" + partitionName
           + " [abs_guaranteed=" + absGuaranteed + ",abs_max=" + absMax
           + ",abs_used" + absUsed + ",pending_resource=" + pending + "]");