You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by jl...@apache.org on 2014/12/05 22:09:21 UTC

hadoop git commit: YARN-2056. Disable preemption at Queue level. Contributed by Eric Payne (cherry picked from commit 4b130821995a3cfe20c71e38e0f63294085c0491)

Repository: hadoop
Updated Branches:
  refs/heads/branch-2 2ebd1bc65 -> b72fb6c77


YARN-2056. Disable preemption at Queue level. Contributed by Eric Payne
(cherry picked from commit 4b130821995a3cfe20c71e38e0f63294085c0491)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/b72fb6c7
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/b72fb6c7
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/b72fb6c7

Branch: refs/heads/branch-2
Commit: b72fb6c774d86f8cc04b7f7dc1a202d3d0c0720e
Parents: 2ebd1bc
Author: Jason Lowe <jl...@apache.org>
Authored: Fri Dec 5 21:06:48 2014 +0000
Committer: Jason Lowe <jl...@apache.org>
Committed: Fri Dec 5 21:08:14 2014 +0000

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |   2 +
 .../ProportionalCapacityPreemptionPolicy.java   | 170 +++++++++--
 ...estProportionalCapacityPreemptionPolicy.java | 283 ++++++++++++++++++-
 3 files changed, 424 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/b72fb6c7/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 3a39a42..a837d54 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -96,6 +96,8 @@ Release 2.7.0 - UNRELEASED
 
     YARN-2301. Improved yarn container command. (Naganarasimha G R via jianhe)
 
+    YARN-2056. Disable preemption at Queue level (Eric Payne via jlowe)
+
   OPTIMIZATIONS
 
   BUG FIXES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b72fb6c7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
index 0f48b0c..1a3f804 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
@@ -27,6 +27,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableSet;
+import java.util.PriorityQueue;
 import java.util.Set;
 
 import org.apache.commons.logging.Log;
@@ -111,6 +112,9 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
   public static final String NATURAL_TERMINATION_FACTOR =
       "yarn.resourcemanager.monitor.capacity.preemption.natural_termination_factor";
 
+  public static final String BASE_YARN_RM_PREEMPTION = "yarn.scheduler.capacity.";
+  public static final String SUFFIX_DISABLE_PREEMPTION = ".disable_preemption";
+
   // the dispatcher to send preempt and kill events
   public EventHandler<ContainerPreemptEvent> dispatcher;
 
@@ -192,7 +196,7 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
     // extract a summary of the queues from scheduler
     TempQueue tRoot;
     synchronized (scheduler) {
-      tRoot = cloneQueues(root, clusterResources);
+      tRoot = cloneQueues(root, clusterResources, false);
     }
 
     // compute the ideal distribution of resources among queues
@@ -370,28 +374,60 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
   private void computeFixpointAllocation(ResourceCalculator rc,
       Resource tot_guarant, Collection<TempQueue> qAlloc, Resource unassigned, 
       boolean ignoreGuarantee) {
+    // Prior to assigning the unused resources, process each queue as follows:
+    // If current > guaranteed, idealAssigned = guaranteed + untouchable extra
+    // Else idealAssigned = current;
+    // Subtract idealAssigned resources from unassigned.
+    // If the queue has all of its needs met (that is, if 
+    // idealAssigned >= current + pending), remove it from consideration.
+    // Sort queues from most under-guaranteed to most over-guaranteed.
+    TQComparator tqComparator = new TQComparator(rc, tot_guarant);
+    PriorityQueue<TempQueue> orderedByNeed =
+                                 new PriorityQueue<TempQueue>(10,tqComparator);
+    for (Iterator<TempQueue> i = qAlloc.iterator(); i.hasNext();) {
+      TempQueue q = i.next();
+      if (Resources.greaterThan(rc, tot_guarant, q.current, q.guaranteed)) {
+        q.idealAssigned = Resources.add(q.guaranteed, q.untouchableExtra);
+      } else {
+        q.idealAssigned = Resources.clone(q.current);
+      }
+      Resources.subtractFrom(unassigned, q.idealAssigned);
+      // If idealAssigned < (current + pending), q needs more resources, so
+      // add it to the list of underserved queues, ordered by need.
+      Resource curPlusPend = Resources.add(q.current, q.pending);
+      if (Resources.lessThan(rc, tot_guarant, q.idealAssigned, curPlusPend)) {
+        orderedByNeed.add(q);
+      }
+    }
+
     //assign all cluster resources until no more demand, or no resources are left
-    while (!qAlloc.isEmpty() && Resources.greaterThan(rc, tot_guarant,
-          unassigned, Resources.none())) {
+    while (!orderedByNeed.isEmpty()
+       && Resources.greaterThan(rc,tot_guarant, unassigned,Resources.none())) {
       Resource wQassigned = Resource.newInstance(0, 0);
-
       // we compute normalizedGuarantees capacity based on currently active
       // queues
-      resetCapacity(rc, unassigned, qAlloc, ignoreGuarantee);
-      
-      // offer for each queue their capacity first and in following invocations
-      // their share of over-capacity
-      for (Iterator<TempQueue> i = qAlloc.iterator(); i.hasNext();) {
+      resetCapacity(rc, unassigned, orderedByNeed, ignoreGuarantee);
+
+      // For each underserved queue (or set of queues if multiple are equally
+      // underserved), offer its share of the unassigned resources based on its
+      // normalized guarantee. After the offer, if the queue is not satisfied,
+      // place it back in the ordered list of queues, recalculating its place
+      // in the order of most under-guaranteed to most over-guaranteed. In this
+      // way, the most underserved queue(s) are always given resources first.
+      Collection<TempQueue> underserved =
+          getMostUnderservedQueues(orderedByNeed, tqComparator);
+      for (Iterator<TempQueue> i = underserved.iterator(); i.hasNext();) {
         TempQueue sub = i.next();
-        Resource wQavail =
-          Resources.multiply(unassigned, sub.normalizedGuarantee);
+        Resource wQavail = Resources.multiplyAndNormalizeUp(rc,
+            unassigned, sub.normalizedGuarantee, Resource.newInstance(1, 1));
         Resource wQidle = sub.offer(wQavail, rc, tot_guarant);
         Resource wQdone = Resources.subtract(wQavail, wQidle);
-        // if the queue returned a value > 0 it means it is fully satisfied
-        // and it is removed from the list of active queues qAlloc
-        if (!Resources.greaterThan(rc, tot_guarant,
+
+        if (Resources.greaterThan(rc, tot_guarant,
               wQdone, Resources.none())) {
-          i.remove();
+          // The queue is still asking for more. Put it back in the priority
+          // queue, recalculating its order based on need.
+          orderedByNeed.add(sub);
         }
         Resources.addTo(wQassigned, wQdone);
       }
@@ -399,6 +435,27 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
     }
   }
 
+  // Take the most underserved TempQueue (the one on the head). Collect and
+  // return the list of all queues that have the same idealAssigned
+  // percentage of guaranteed.
+  protected Collection<TempQueue> getMostUnderservedQueues(
+      PriorityQueue<TempQueue> orderedByNeed, TQComparator tqComparator) {
+    ArrayList<TempQueue> underserved = new ArrayList<TempQueue>();
+    while (!orderedByNeed.isEmpty()) {
+      TempQueue q1 = orderedByNeed.remove();
+      underserved.add(q1);
+      TempQueue q2 = orderedByNeed.peek();
+      // q1's pct of guaranteed won't be larger than q2's. If it's less, then
+      // return what has already been collected. Otherwise, q1's pct of
+      // guaranteed == that of q2, so add q2 to underserved list during the
+      // next pass.
+      if (q2 == null || tqComparator.compare(q1,q2) < 0) {
+        return underserved;
+      }
+    }
+    return underserved;
+  }
+
   /**
    * Computes a normalizedGuaranteed capacity based on active queues
    * @param rc resource calculator
@@ -626,9 +683,11 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
    *
    * @param root the root of the CapacityScheduler queue hierarchy
    * @param clusterResources the total amount of resources in the cluster
+   * @param parentDisablePreempt true if disable preemption is set for parent
    * @return the root of the cloned queue hierarchy
    */
-  private TempQueue cloneQueues(CSQueue root, Resource clusterResources) {
+  private TempQueue cloneQueues(CSQueue root, Resource clusterResources,
+      boolean parentDisablePreempt) {
     TempQueue ret;
     synchronized (root) {
       String queueName = root.getQueueName();
@@ -639,19 +698,46 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
       Resource current = Resources.multiply(clusterResources, absUsed);
       Resource guaranteed = Resources.multiply(clusterResources, absCap);
       Resource maxCapacity = Resources.multiply(clusterResources, absMaxCap);
+
+      boolean queueDisablePreemption = false;
+      String queuePropName = BASE_YARN_RM_PREEMPTION + root.getQueuePath()
+                               + SUFFIX_DISABLE_PREEMPTION;
+      queueDisablePreemption = scheduler.getConfiguration()
+                              .getBoolean(queuePropName, parentDisablePreempt);
+
+      Resource extra = Resource.newInstance(0, 0);
+      if (Resources.greaterThan(rc, clusterResources, current, guaranteed)) {
+        extra = Resources.subtract(current, guaranteed);
+      }
       if (root instanceof LeafQueue) {
         LeafQueue l = (LeafQueue) root;
         Resource pending = l.getTotalResourcePending();
         ret = new TempQueue(queueName, current, pending, guaranteed,
             maxCapacity);
-
+        if (queueDisablePreemption) {
+          ret.untouchableExtra = extra;
+        } else {
+          ret.preemptableExtra = extra;
+        }
         ret.setLeafQueue(l);
       } else {
         Resource pending = Resource.newInstance(0, 0);
         ret = new TempQueue(root.getQueueName(), current, pending, guaranteed,
             maxCapacity);
+        Resource childrensPreemptable = Resource.newInstance(0, 0);
         for (CSQueue c : root.getChildQueues()) {
-          ret.addChild(cloneQueues(c, clusterResources));
+          TempQueue subq =
+                cloneQueues(c, clusterResources, queueDisablePreemption);
+          Resources.addTo(childrensPreemptable, subq.preemptableExtra);
+          ret.addChild(subq);
+        }
+        // untouchableExtra = max(extra - childrenPreemptable, 0)
+        if (Resources.greaterThanOrEqual(
+              rc, clusterResources, childrensPreemptable, extra)) {
+          ret.untouchableExtra = Resource.newInstance(0, 0);
+        } else {
+          ret.untouchableExtra =
+                Resources.subtractFrom(extra, childrensPreemptable);
         }
       }
     }
@@ -690,6 +776,8 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
     Resource idealAssigned;
     Resource toBePreempted;
     Resource actuallyPreempted;
+    Resource untouchableExtra;
+    Resource preemptableExtra;
 
     double normalizedGuarantee;
 
@@ -708,6 +796,8 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
       this.toBePreempted = Resource.newInstance(0, 0);
       this.normalizedGuarantee = Float.NaN;
       this.children = new ArrayList<TempQueue>();
+      this.untouchableExtra = Resource.newInstance(0, 0);
+      this.preemptableExtra = Resource.newInstance(0, 0);
     }
 
     public void setLeafQueue(LeafQueue l){
@@ -761,10 +851,20 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
         .append(" IDEAL_ASSIGNED: ").append(idealAssigned)
         .append(" IDEAL_PREEMPT: ").append(toBePreempted)
         .append(" ACTUAL_PREEMPT: ").append(actuallyPreempted)
+        .append(" UNTOUCHABLE: ").append(untouchableExtra)
+        .append(" PREEMPTABLE: ").append(preemptableExtra)
         .append("\n");
 
       return sb.toString();
     }
+
+    public void printAll() {
+      LOG.info(this.toString());
+      for (TempQueue sub : this.getChildren()) {
+        sub.printAll();
+      }
+    }
+
     public void assignPreemption(float scalingFactor,
         ResourceCalculator rc, Resource clusterResource) {
       if (Resources.greaterThan(rc, clusterResource, current, idealAssigned)) {
@@ -793,4 +893,38 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
 
   }
 
+  static class TQComparator implements Comparator<TempQueue> {
+    private ResourceCalculator rc;
+    private Resource clusterRes;
+
+    TQComparator(ResourceCalculator rc, Resource clusterRes) {
+      this.rc = rc;
+      this.clusterRes = clusterRes;
+    }
+
+    @Override
+    public int compare(TempQueue tq1, TempQueue tq2) {
+      if (getIdealPctOfGuaranteed(tq1) < getIdealPctOfGuaranteed(tq2)) {
+        return -1;
+      }
+      if (getIdealPctOfGuaranteed(tq1) > getIdealPctOfGuaranteed(tq2)) {
+        return 1;
+      }
+      return 0;
+    }
+
+    // Calculates idealAssigned / guaranteed
+    // TempQueues with 0 guarantees are always considered the most over
+    // capacity and therefore considered last for resources.
+    private double getIdealPctOfGuaranteed(TempQueue q) {
+      double pctOver = Integer.MAX_VALUE;
+      if (q != null && Resources.greaterThan(
+          rc, clusterRes, q.guaranteed, Resources.none())) {
+        pctOver =
+            Resources.divide(rc, clusterRes, q.idealAssigned, q.guaranteed);
+      }
+      return (pctOver);
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b72fb6c7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
index 24e70bb..ca67ef0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
@@ -17,16 +17,19 @@
  */
 package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
 
+import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.BASE_YARN_RM_PREEMPTION;
 import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.MAX_IGNORED_OVER_CAPACITY;
 import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.MONITORING_INTERVAL;
 import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.NATURAL_TERMINATION_FACTOR;
 import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.OBSERVE_ONLY;
+import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.SUFFIX_DISABLE_PREEMPTION;
 import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.TOTAL_PREEMPTION_PER_ROUND;
 import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.WAIT_TIME_BEFORE_KILL;
 import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType.KILL_CONTAINER;
 import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType.PREEMPT_CONTAINER;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.argThat;
 import static org.mockito.Matchers.isA;
@@ -62,6 +65,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptE
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
@@ -86,6 +90,7 @@ public class TestProportionalCapacityPreemptionPolicy {
   Clock mClock = null;
   Configuration conf = null;
   CapacityScheduler mCS = null;
+  CapacitySchedulerConfiguration schedConf = null;
   EventHandler<ContainerPreemptEvent> mDisp = null;
   ResourceCalculator rc = new DefaultResourceCalculator();
   final ApplicationAttemptId appA = ApplicationAttemptId.newInstance(
@@ -98,6 +103,8 @@ public class TestProportionalCapacityPreemptionPolicy {
       ApplicationId.newInstance(TS, 3), 0);
   final ApplicationAttemptId appE = ApplicationAttemptId.newInstance(
       ApplicationId.newInstance(TS, 4), 0);
+  final ApplicationAttemptId appF = ApplicationAttemptId.newInstance(
+      ApplicationId.newInstance(TS, 4), 0);
   final ArgumentCaptor<ContainerPreemptEvent> evtCaptor =
     ArgumentCaptor.forClass(ContainerPreemptEvent.class);
 
@@ -123,6 +130,8 @@ public class TestProportionalCapacityPreemptionPolicy {
     mClock = mock(Clock.class);
     mCS = mock(CapacityScheduler.class);
     when(mCS.getResourceCalculator()).thenReturn(rc);
+    schedConf = new CapacitySchedulerConfiguration();
+    when(mCS.getConfiguration()).thenReturn(schedConf);
     mDisp = mock(EventHandler.class);
     rand = new Random();
     long seed = rand.nextLong();
@@ -266,6 +275,240 @@ public class TestProportionalCapacityPreemptionPolicy {
   }
 
   @Test
+  public void testPerQueueDisablePreemption() {
+    int[][] qData = new int[][]{
+        //  /    A    B    C
+        { 100,  55,  25,  20 },  // abs
+        { 100, 100, 100, 100 },  // maxCap
+        { 100,   0,  54,  46 },  // used
+        {  10,  10,   0,   0 },  // pending
+        {   0,   0,   0,   0 },  // reserved
+       //     appA appB appC
+        {   3,   1,   1,   1 },  // apps
+        {  -1,   1,   1,   1 },  // req granularity
+        {   3,   0,   0,  0 },  // subqueues
+      };
+
+    schedConf.setBoolean(BASE_YARN_RM_PREEMPTION
+        + "root.queueB" + SUFFIX_DISABLE_PREEMPTION, true);
+
+    ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
+    policy.editSchedule();
+    // With PREEMPTION_DISABLED set for queueB, get resources from queueC
+    verify(mDisp, times(10)).handle(argThat(new IsPreemptionRequestFor(appC)));
+    verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appB)));
+
+    // With no PREEMPTION_DISABLED set for queueB, resources will be preempted
+    // from both queueB and queueC. Test must be reset for so that the mDisp
+    // event handler will count only events from the following test and not the
+    // previous one.
+    setup();
+    ProportionalCapacityPreemptionPolicy policy2 = buildPolicy(qData);
+    
+    schedConf.setBoolean(BASE_YARN_RM_PREEMPTION
+        + "root.queueB" + SUFFIX_DISABLE_PREEMPTION, false);
+    policy2.editSchedule();
+
+    verify(mDisp, times(4)).handle(argThat(new IsPreemptionRequestFor(appB)));
+    verify(mDisp, times(6)).handle(argThat(new IsPreemptionRequestFor(appC)));
+  }
+
+  @Test
+  public void testPerQueueDisablePreemptionHierarchical() {
+    int[][] qData = new int[][] {
+      //  /    A              D
+      //            B    C         E    F
+      { 200, 100,  50,  50, 100,  10,  90 },  // abs
+      { 200, 200, 200, 200, 200, 200, 200 },  // maxCap
+      { 200, 110,  60,  50,  90,  90,   0 },  // used
+      {  10,   0,   0,   0,  10,   0,  10 },  // pending
+      {   0,   0,   0,   0,   0,   0,   0 },  // reserved
+      //          appA appB      appC appD
+      {   4,   2,   1,   1,   2,   1,   1 },  // apps
+      {  -1,  -1,   1,   1,  -1,   1,   1 },  // req granularity
+      {   2,   2,   0,   0,   2,   0,   0 },  // subqueues
+    };
+    ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
+    policy.editSchedule();
+    // verify capacity taken from queueB (appA), not queueE (appC) despite 
+    // queueE being far over its absolute capacity because queueA (queueB's
+    // parent) is over capacity and queueD (queueE's parent) is not.
+    ApplicationAttemptId expectedAttemptOnQueueB = 
+        ApplicationAttemptId.newInstance(
+            appA.getApplicationId(), appA.getAttemptId());
+    assertTrue("appA should be running on queueB",
+        mCS.getAppsInQueue("queueB").contains(expectedAttemptOnQueueB));
+    verify(mDisp, times(9)).handle(argThat(new IsPreemptionRequestFor(appA)));
+
+    // Need to call setup() again to reset mDisp
+    setup();
+    // Disable preemption for queueB and it's children
+    schedConf.setBoolean(BASE_YARN_RM_PREEMPTION
+        + "root.queueA.queueB" + SUFFIX_DISABLE_PREEMPTION, true);
+    ProportionalCapacityPreemptionPolicy policy2 = buildPolicy(qData);
+    policy2.editSchedule();
+    ApplicationAttemptId expectedAttemptOnQueueC = 
+        ApplicationAttemptId.newInstance(
+            appB.getApplicationId(), appB.getAttemptId());
+    ApplicationAttemptId expectedAttemptOnQueueE = 
+        ApplicationAttemptId.newInstance(
+            appC.getApplicationId(), appC.getAttemptId());
+    // Now, all of queueB's (appA) over capacity is not preemptable, so neither
+    // is queueA's. Verify that capacity is taken from queueE (appC).
+    assertTrue("appB should be running on queueC",
+        mCS.getAppsInQueue("queueC").contains(expectedAttemptOnQueueC));
+    assertTrue("appC should be running on queueE",
+        mCS.getAppsInQueue("queueE").contains(expectedAttemptOnQueueE));
+    // Resources should have come from queueE (appC) and neither of queueA's
+    // children.
+    verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appA)));
+    verify(mDisp, times(9)).handle(argThat(new IsPreemptionRequestFor(appC)));
+  }
+
+  @Test
+  public void testPerQueueDisablePreemptionBroadHierarchical() {
+    int[][] qData = new int[][] {
+        //  /    A              D              G    
+        //            B    C         E    F         H    I
+        {1000, 350, 150, 200, 400, 200, 200, 250, 100, 150 },  // abs
+        {1000,1000,1000,1000,1000,1000,1000,1000,1000,1000 },  // maxCap
+        {1000, 400, 200, 200, 400, 250, 150, 200, 150,  50 },  // used
+        {  50,   0,   0,   0,  50,   0,  50,   0,   0,   0 },  // pending
+        {   0,   0,   0,   0,   0,   0,   0,   0,   0,   0 },  // reserved
+        //          appA appB      appC appD      appE appF
+        {   6,   2,   1,   1,   2,   1,   1,   2,   1,   1 },  // apps
+        {  -1,  -1,   1,   1,  -1,   1,   1,  -1,   1,   1 },  // req granulrity
+        {   3,   2,   0,   0,   2,   0,   0,   2,   0,   0 },  // subqueues
+      };
+
+    ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
+    policy.editSchedule();
+    // queueF(appD) wants resources, Verify that resources come from queueE(appC)
+    // because it's a sibling and queueB(appA) because queueA is over capacity.
+    verify(mDisp, times(28)).handle(argThat(new IsPreemptionRequestFor(appA)));
+    verify(mDisp, times(22)).handle(argThat(new IsPreemptionRequestFor(appC)));
+
+    // Need to call setup() again to reset mDisp
+    setup();
+    // Disable preemption for queueB(appA)
+    schedConf.setBoolean(BASE_YARN_RM_PREEMPTION
+        + "root.queueA.queueB" + SUFFIX_DISABLE_PREEMPTION, true);
+    ProportionalCapacityPreemptionPolicy policy2 = buildPolicy(qData);
+    policy2.editSchedule();
+    // Now that queueB(appA) is not preemptable, verify that resources come
+    // from queueE(appC)
+    verify(mDisp, times(50)).handle(argThat(new IsPreemptionRequestFor(appC)));
+    verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appA)));
+
+    setup();
+    // Disable preemption for two of the 3 queues with over-capacity.
+    schedConf.setBoolean(BASE_YARN_RM_PREEMPTION
+        + "root.queueD.queueE" + SUFFIX_DISABLE_PREEMPTION, true);
+    schedConf.setBoolean(BASE_YARN_RM_PREEMPTION
+        + "root.queueA.queueB" + SUFFIX_DISABLE_PREEMPTION, true);
+    ProportionalCapacityPreemptionPolicy policy3 = buildPolicy(qData);
+    policy3.editSchedule();
+
+    // Verify that the request was starved out even though queueH(appE) is
+    // over capacity. This is because queueG (queueH's parent) is NOT
+    // overcapacity.
+    verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appA))); // queueB
+    verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appB))); // queueC
+    verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appC))); // queueE
+    verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appE))); // queueH
+    verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appF))); // queueI
+  }
+
+  @Test
+  public void testPerQueueDisablePreemptionInheritParent() {
+    int[][] qData = new int[][] {
+        //  /    A                   E          
+        //            B    C    D         F    G    H
+        {1000, 500, 200, 200, 100, 500, 200, 200, 100 },  // abs (guar)
+        {1000,1000,1000,1000,1000,1000,1000,1000,1000 },  // maxCap
+        {1000, 700,   0, 350, 350, 300,   0, 200, 100 },  // used 
+        { 200,   0,   0,   0,   0, 200, 200,   0,   0 },  // pending
+        {   0,   0,   0,   0,   0,   0,   0,   0,   0 },  // reserved
+        //               appA appB      appC appD appE 
+        {   5,   2,   0,   1,   1,   3,   1,   1,   1 },  // apps 
+        {  -1,  -1,   1,   1,   1,  -1,   1,   1,   1 },  // req granulrity
+        {   2,   3,   0,   0,   0,   3,   0,   0,   0 },  // subqueues
+      };
+
+    ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
+    policy.editSchedule();
+    // With all queues preemptable, resources should be taken from queueC(appA)
+    // and queueD(appB). Resources taken more from queueD(appB) than
+    // queueC(appA) because it's over its capacity by a larger percentage.
+    verify(mDisp, times(16)).handle(argThat(new IsPreemptionRequestFor(appA)));
+    verify(mDisp, times(182)).handle(argThat(new IsPreemptionRequestFor(appB)));
+
+    // Disable preemption for queueA and it's children. queueF(appC)'s request
+    // should starve.
+    setup(); // Call setup() to reset mDisp
+    schedConf.setBoolean(BASE_YARN_RM_PREEMPTION
+        + "root.queueA" + SUFFIX_DISABLE_PREEMPTION, true);
+    ProportionalCapacityPreemptionPolicy policy2 = buildPolicy(qData);
+    policy2.editSchedule();
+    verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appA))); // queueC
+    verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appB))); // queueD
+    verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appD))); // queueG
+    verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appE))); // queueH
+  }
+
+  @Test
+  public void testPerQueuePreemptionNotAllUntouchable() {
+    int[][] qData = new int[][] {
+      //  /      A                       E
+      //               B     C     D           F     G     H
+      { 2000, 1000,  800,  100,  100, 1000,  500,  300,  200 },  // abs
+      { 2000, 2000, 2000, 2000, 2000, 2000, 2000, 2000, 2000 },  // maxCap
+      { 2000, 1300,  300,  800,  200,  700,  500,    0,  200 },  // used
+      {  300,    0,    0,    0,    0,  300,    0,  300,    0 },  // pending
+      {    0,    0,    0,    0,    0,    0,    0,    0,    0 },  // reserved
+      //             appA  appB  appC        appD  appE  appF
+      {    6,    3,    1,    1,    1,    3,    1,    1,    1 },  // apps
+      {   -1,   -1,    1,    1,    1,   -1,    1,    1,    1 },  // req granularity
+      {    2,    3,    0,    0,    0,    3,    0,    0,    0 },  // subqueues
+    };
+    schedConf.setBoolean(BASE_YARN_RM_PREEMPTION
+        + "root.queueA.queueC" + SUFFIX_DISABLE_PREEMPTION, true);
+    ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
+    policy.editSchedule();
+    // Although queueC(appB) is way over capacity and is untouchable,
+    // queueD(appC) is preemptable. Request should be filled from queueD(appC).
+    verify(mDisp, times(100)).handle(argThat(new IsPreemptionRequestFor(appC)));
+  }
+
+  @Test
+  public void testPerQueueDisablePreemptionRootDisablesAll() {
+    int[][] qData = new int[][] {
+        //  /    A              D              G    
+        //            B    C         E    F         H    I
+        {1000, 500, 250, 250, 250, 100, 150, 250, 100, 150 },  // abs
+        {1000,1000,1000,1000,1000,1000,1000,1000,1000,1000 },  // maxCap
+        {1000,  20,   0,  20, 490, 240, 250, 490, 240, 250 },  // used
+        { 200, 200, 200,   0,   0,   0,   0,   0,   0,   0 },  // pending
+        {   0,   0,   0,   0,   0,   0,   0,   0,   0,   0 },  // reserved
+        //          appA appB      appC appD      appE appF
+        {   6,   2,   1,   1,   2,   1,   1,   2,   1,   1 },  // apps
+        {  -1,  -1,   1,   1,  -1,   1,   1,  -1,   1,   1 },  // req granulrity
+        {   3,   2,   0,   0,   2,   0,   0,   2,   0,   0 },  // subqueues
+   };
+
+    ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
+    schedConf.setBoolean(BASE_YARN_RM_PREEMPTION
+        + "root" + SUFFIX_DISABLE_PREEMPTION, true);
+    policy.editSchedule();
+    // All queues should be non-preemptable, so request should starve.
+    verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appB))); // queueC
+    verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appC))); // queueE
+    verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appD))); // queueB
+    verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appE))); // queueH
+    verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appF))); // queueI
+  }
+
+  @Test
   public void testOverCapacityImbalance() {
     int[][] qData = new int[][]{
       //  /   A   B   C
@@ -341,7 +584,7 @@ public class TestProportionalCapacityPreemptionPolicy {
     policy.editSchedule();
     // verify capacity taken from A1, not B1 despite B1 being far over
     // its absolute guaranteed capacity
-    verify(mDisp, times(10)).handle(argThat(new IsPreemptionRequestFor(appA)));
+    verify(mDisp, times(9)).handle(argThat(new IsPreemptionRequestFor(appA)));
   }
 
   @Test
@@ -390,15 +633,17 @@ public class TestProportionalCapacityPreemptionPolicy {
   @Test
   public void testHierarchicalLarge() {
     int[][] qData = new int[][] {
-      //  /    A   B   C    D   E   F    G   H   I
-      { 400, 200, 60, 140, 100, 70, 30, 100, 10, 90  },  // abs
-      { 400, 400, 400, 400, 400, 400, 400, 400, 400, 400, },  // maxCap
-      { 400, 210, 70,140, 100, 50, 50,  90, 90,  0  },  // used
-      {  10,   0,  0,  0,   0,  0,  0,   0,  0, 15  },  // pending
-      {   0,   0,  0,  0,   0,  0,  0,   0,  0,  0  },  // reserved
-      {   6,   2,  1,  1,   2,  1,  1,   2,  1,  1  },  // apps
-      {  -1,  -1,  1,  1,  -1,  1,  1,  -1,  1,  1  },  // req granularity
-      {   3,   2,  0,  0,   2,  0,  0,   2,  0,  0  },  // subqueues
+      //  /    A              D              G        
+      //            B    C         E    F         H    I
+      { 400, 200,  60, 140, 100,  70,  30, 100,  10,  90 },  // abs
+      { 400, 400, 400, 400, 400, 400, 400, 400, 400, 400 },  // maxCap
+      { 400, 210,  70, 140, 100,  50,  50,  90,  90,   0 },  // used
+      {  15,   0,   0,   0,   0,   0,   0,   0,   0,  15 },  // pending
+      {   0,   0,   0,   0,   0,   0,   0,   0,   0,   0 },  // reserved
+      //          appA appB      appC appD      appE appF
+      {   6,   2,   1,   1,   2,   1,   1,   2,   1,   1 },  // apps
+      {  -1,  -1,   1,   1,  -1,   1,   1,  -1,   1,   1 },  // req granularity
+      {   3,   2,   0,   0,   2,   0,   0,   2,   0,   0 },  // subqueues
     };
     ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
     policy.editSchedule();
@@ -407,8 +652,8 @@ public class TestProportionalCapacityPreemptionPolicy {
 
     // XXX note: compensating for rounding error in Resources.multiplyTo
     // which is likely triggered since we use small numbers for readability
-    verify(mDisp, times(9)).handle(argThat(new IsPreemptionRequestFor(appA)));
-    verify(mDisp, times(4)).handle(argThat(new IsPreemptionRequestFor(appE)));
+    verify(mDisp, times(7)).handle(argThat(new IsPreemptionRequestFor(appA)));
+    verify(mDisp, times(5)).handle(argThat(new IsPreemptionRequestFor(appE)));
   }
 
   @Test
@@ -629,6 +874,7 @@ public class TestProportionalCapacityPreemptionPolicy {
     when(root.getAbsoluteUsedCapacity()).thenReturn(used[0] / tot);
     when(root.getAbsoluteCapacity()).thenReturn(abs[0] / tot);
     when(root.getAbsoluteMaximumCapacity()).thenReturn(maxCap[0] / tot);
+    when(root.getQueuePath()).thenReturn("root");
 
     for (int i = 1; i < queues.length; ++i) {
       final CSQueue q;
@@ -644,6 +890,10 @@ public class TestProportionalCapacityPreemptionPolicy {
       when(q.getAbsoluteUsedCapacity()).thenReturn(used[i] / tot);
       when(q.getAbsoluteCapacity()).thenReturn(abs[i] / tot);
       when(q.getAbsoluteMaximumCapacity()).thenReturn(maxCap[i] / tot);
+      String parentPathName = p.getQueuePath();
+      parentPathName = (parentPathName == null) ? "root" : parentPathName;
+      String queuePathName = (parentPathName+"."+queueName).replace("/","root");
+      when(q.getQueuePath()).thenReturn(queuePathName);
     }
     assert 0 == pqs.size();
     return root;
@@ -666,6 +916,8 @@ public class TestProportionalCapacityPreemptionPolicy {
   LeafQueue mockLeafQueue(ParentQueue p, float tot, int i, int[] abs, 
       int[] used, int[] pending, int[] reserved, int[] apps, int[] gran) {
     LeafQueue lq = mock(LeafQueue.class);
+    List<ApplicationAttemptId> appAttemptIdList = 
+        new ArrayList<ApplicationAttemptId>();
     when(lq.getTotalResourcePending()).thenReturn(
         Resource.newInstance(pending[i], 0));
     // consider moving where CapacityScheduler::comparator accessible
@@ -683,9 +935,14 @@ public class TestProportionalCapacityPreemptionPolicy {
       int aPending = pending[i] / apps[i];
       int aReserve = reserved[i] / apps[i];
       for (int a = 0; a < apps[i]; ++a) {
-        qApps.add(mockApp(i, appAlloc, aUsed, aPending, aReserve, gran[i]));
+        FiCaSchedulerApp mockFiCaApp =
+            mockApp(i, appAlloc, aUsed, aPending, aReserve, gran[i]);
+        qApps.add(mockFiCaApp);
         ++appAlloc;
+        appAttemptIdList.add(mockFiCaApp.getApplicationAttemptId());
       }
+      when(mCS.getAppsInQueue("queue" + (char)('A' + i - 1)))
+              .thenReturn(appAttemptIdList);
     }
     when(lq.getApplications()).thenReturn(qApps);
     if(setAMResourcePercent != 0.0f){