You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by cd...@apache.org on 2014/05/14 01:15:27 UTC
svn commit: r1594414 - in /hadoop/common/trunk/hadoop-yarn-project: ./
hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/
hadoop-yarn/hadoop-yarn-server/hadoop...
Author: cdouglas
Date: Tue May 13 23:15:27 2014
New Revision: 1594414
URL: http://svn.apache.org/r1594414
Log:
YARN-1957. Consider the max capacity of the queue when computing the ideal
capacity for preemption. Contributed by Carlo Curino
Modified:
hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
Modified: hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt?rev=1594414&r1=1594413&r2=1594414&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt Tue May 13 23:15:27 2014
@@ -216,6 +216,9 @@ Release 2.4.1 - UNRELEASED
causing both RMs to be stuck in standby mode when automatic failover is
enabled. (Karthik Kambatla and Xuan Gong via vinodkv)
+ YARN-1957. Consider the max capacity of the queue when computing the ideal
+ capacity for preemption. (Carlo Curino via cdouglas)
+
Release 2.4.0 - 2014-04-07
INCOMPATIBLE CHANGES
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java?rev=1594414&r1=1594413&r2=1594414&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java Tue May 13 23:15:27 2014
@@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
@@ -293,34 +294,31 @@ public class ProportionalCapacityPreempt
// with the total capacity for this set of queues
Resource unassigned = Resources.clone(tot_guarant);
- //assign all cluster resources until no more demand, or no resources are left
- while (!qAlloc.isEmpty() && Resources.greaterThan(rc, tot_guarant,
- unassigned, Resources.none())) {
- Resource wQassigned = Resource.newInstance(0, 0);
-
- // we compute normalizedGuarantees capacity based on currently active
- // queues
- resetCapacity(rc, unassigned, qAlloc);
-
- // offer for each queue their capacity first and in following invocations
- // their share of over-capacity
- for (Iterator<TempQueue> i = qAlloc.iterator(); i.hasNext();) {
- TempQueue sub = i.next();
- Resource wQavail =
- Resources.multiply(unassigned, sub.normalizedGuarantee);
- Resource wQidle = sub.offer(wQavail, rc, tot_guarant);
- Resource wQdone = Resources.subtract(wQavail, wQidle);
- // if the queue returned a value > 0 it means it is fully satisfied
- // and it is removed from the list of active queues qAlloc
- if (!Resources.greaterThan(rc, tot_guarant,
- wQdone, Resources.none())) {
- i.remove();
- }
- Resources.addTo(wQassigned, wQdone);
+ // group queues based on whether they have non-zero guaranteed capacity
+ Set<TempQueue> nonZeroGuarQueues = new HashSet<TempQueue>();
+ Set<TempQueue> zeroGuarQueues = new HashSet<TempQueue>();
+
+ for (TempQueue q : qAlloc) {
+ if (Resources
+ .greaterThan(rc, tot_guarant, q.guaranteed, Resources.none())) {
+ nonZeroGuarQueues.add(q);
+ } else {
+ zeroGuarQueues.add(q);
}
- Resources.subtractFrom(unassigned, wQassigned);
}
+ // first compute the allocation as a fixpoint based on guaranteed capacity
+ computeFixpointAllocation(rc, tot_guarant, nonZeroGuarQueues, unassigned,
+ false);
+
+ // if any capacity is left unassigned, distribute it among zero-guarantee
+ // queues uniformly (i.e., not based on guaranteed capacity, as this is zero)
+ if (!zeroGuarQueues.isEmpty()
+ && Resources.greaterThan(rc, tot_guarant, unassigned, Resources.none())) {
+ computeFixpointAllocation(rc, tot_guarant, zeroGuarQueues, unassigned,
+ true);
+ }
+
// based on ideal assignment computed above and current assignment we derive
// how much preemption is required overall
Resource totPreemptionNeeded = Resource.newInstance(0, 0);
@@ -353,6 +351,46 @@ public class ProportionalCapacityPreempt
}
}
+
+ /**
+ * Given a set of queues, compute the fix-point distribution of unassigned
+ * resources among them. As the pending requests of a queue are exhausted, the
+ * queue is removed from the set and the remaining capacity is redistributed
+ * among the remaining queues. The distribution is weighted based on guaranteed
+ * capacity, unless asked to ignoreGuarantee, in which case resources are
+ * distributed uniformly.
+ */
+ private void computeFixpointAllocation(ResourceCalculator rc,
+ Resource tot_guarant, Collection<TempQueue> qAlloc, Resource unassigned,
+ boolean ignoreGuarantee) {
+ //assign all cluster resources until no more demand, or no resources are left
+ while (!qAlloc.isEmpty() && Resources.greaterThan(rc, tot_guarant,
+ unassigned, Resources.none())) {
+ Resource wQassigned = Resource.newInstance(0, 0);
+
+ // we compute normalizedGuarantees capacity based on currently active
+ // queues
+ resetCapacity(rc, unassigned, qAlloc, ignoreGuarantee);
+
+ // offer for each queue their capacity first and in following invocations
+ // their share of over-capacity
+ for (Iterator<TempQueue> i = qAlloc.iterator(); i.hasNext();) {
+ TempQueue sub = i.next();
+ Resource wQavail =
+ Resources.multiply(unassigned, sub.normalizedGuarantee);
+ Resource wQidle = sub.offer(wQavail, rc, tot_guarant);
+ Resource wQdone = Resources.subtract(wQavail, wQidle);
+ // if the queue accepted nothing from this offer (wQdone is not > 0) it is
+ // treated as satisfied and removed from the list of active queues qAlloc
+ if (!Resources.greaterThan(rc, tot_guarant,
+ wQdone, Resources.none())) {
+ i.remove();
+ }
+ Resources.addTo(wQassigned, wQdone);
+ }
+ Resources.subtractFrom(unassigned, wQassigned);
+ }
+ }
/**
* Computes a normalizedGuaranteed capacity based on active queues
@@ -361,14 +399,21 @@ public class ProportionalCapacityPreempt
* @param queues the list of queues to consider
*/
private void resetCapacity(ResourceCalculator rc, Resource clusterResource,
- List<TempQueue> queues) {
+ Collection<TempQueue> queues, boolean ignoreGuar) {
Resource activeCap = Resource.newInstance(0, 0);
- for (TempQueue q : queues) {
- Resources.addTo(activeCap, q.guaranteed);
- }
- for (TempQueue q : queues) {
- q.normalizedGuarantee = Resources.divide(rc, clusterResource,
- q.guaranteed, activeCap);
+
+ if (ignoreGuar) {
+ for (TempQueue q : queues) {
+ q.normalizedGuarantee = (float) 1.0f / ((float) queues.size());
+ }
+ } else {
+ for (TempQueue q : queues) {
+ Resources.addTo(activeCap, q.guaranteed);
+ }
+ for (TempQueue q : queues) {
+ q.normalizedGuarantee = Resources.divide(rc, clusterResource,
+ q.guaranteed, activeCap);
+ }
}
}
@@ -515,18 +560,25 @@ public class ProportionalCapacityPreempt
private TempQueue cloneQueues(CSQueue root, Resource clusterResources) {
TempQueue ret;
synchronized (root) {
- float absUsed = root.getAbsoluteUsedCapacity();
+ String queueName = root.getQueueName();
+ float absUsed = root.getAbsoluteUsedCapacity();
+ float absCap = root.getAbsoluteCapacity();
+ float absMaxCap = root.getAbsoluteMaximumCapacity();
+
Resource current = Resources.multiply(clusterResources, absUsed);
- Resource guaranteed =
- Resources.multiply(clusterResources, root.getAbsoluteCapacity());
+ Resource guaranteed = Resources.multiply(clusterResources, absCap);
+ Resource maxCapacity = Resources.multiply(clusterResources, absMaxCap);
if (root instanceof LeafQueue) {
LeafQueue l = (LeafQueue) root;
Resource pending = l.getTotalResourcePending();
- ret = new TempQueue(root.getQueueName(), current, pending, guaranteed);
+ ret = new TempQueue(queueName, current, pending, guaranteed,
+ maxCapacity);
+
ret.setLeafQueue(l);
} else {
Resource pending = Resource.newInstance(0, 0);
- ret = new TempQueue(root.getQueueName(), current, pending, guaranteed);
+ ret = new TempQueue(root.getQueueName(), current, pending, guaranteed,
+ maxCapacity);
for (CSQueue c : root.getChildQueues()) {
ret.addChild(cloneQueues(c, clusterResources));
}
@@ -563,6 +615,7 @@ public class ProportionalCapacityPreempt
final Resource current;
final Resource pending;
final Resource guaranteed;
+ final Resource maxCapacity;
Resource idealAssigned;
Resource toBePreempted;
Resource actuallyPreempted;
@@ -573,11 +626,12 @@ public class ProportionalCapacityPreempt
LeafQueue leafQueue;
TempQueue(String queueName, Resource current, Resource pending,
- Resource guaranteed) {
+ Resource guaranteed, Resource maxCapacity) {
this.queueName = queueName;
this.current = current;
this.pending = pending;
this.guaranteed = guaranteed;
+ this.maxCapacity = maxCapacity;
this.idealAssigned = Resource.newInstance(0, 0);
this.actuallyPreempted = Resource.newInstance(0, 0);
this.toBePreempted = Resource.newInstance(0, 0);
@@ -614,12 +668,12 @@ public class ProportionalCapacityPreempt
// the unused ones
Resource offer(Resource avail, ResourceCalculator rc,
Resource clusterResource) {
- // remain = avail - min(avail, current + pending - assigned)
- Resource accepted = Resources.min(rc, clusterResource,
- avail,
- Resources.subtract(
- Resources.add(current, pending),
- idealAssigned));
+ // remain = avail - min(avail, (max - assigned), (current + pending - assigned))
+ Resource accepted =
+ Resources.min(rc, clusterResource,
+ Resources.subtract(maxCapacity, idealAssigned),
+ Resources.min(rc, clusterResource, avail, Resources.subtract(
+ Resources.add(current, pending), idealAssigned)));
Resource remain = Resources.subtract(avail, accepted);
Resources.addTo(idealAssigned, accepted);
return remain;
@@ -628,13 +682,15 @@ public class ProportionalCapacityPreempt
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
- sb.append("CUR: ").append(current)
+ sb.append(" NAME: " + queueName)
+ .append(" CUR: ").append(current)
.append(" PEN: ").append(pending)
.append(" GAR: ").append(guaranteed)
.append(" NORM: ").append(normalizedGuarantee)
.append(" IDEAL_ASSIGNED: ").append(idealAssigned)
.append(" IDEAL_PREEMPT: ").append(toBePreempted)
- .append(" ACTUAL_PREEMPT: ").append(actuallyPreempted);
+ .append(" ACTUAL_PREEMPT: ").append(actuallyPreempted)
+ .append("\n");
return sb.toString();
}
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java?rev=1594414&r1=1594413&r2=1594414&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java Tue May 13 23:15:27 2014
@@ -115,6 +115,7 @@ public class TestProportionalCapacityPre
int[][] qData = new int[][]{
// / A B C
{ 100, 40, 40, 20 }, // abs
+ { 100, 100, 100, 100 }, // maxCap
{ 100, 0, 60, 40 }, // used
{ 0, 0, 0, 0 }, // pending
{ 0, 0, 0, 0 }, // reserved
@@ -133,6 +134,7 @@ public class TestProportionalCapacityPre
int[][] qData = new int[][]{
// / A B C D
{ 100, 10, 40, 20, 30 }, // abs
+ { 100, 100, 100, 100, 100 }, // maxCap
{ 100, 30, 60, 10, 0 }, // used
{ 45, 20, 5, 20, 0 }, // pending
{ 0, 0, 0, 0, 0 }, // reserved
@@ -144,12 +146,33 @@ public class TestProportionalCapacityPre
policy.editSchedule();
verify(mDisp, times(16)).handle(argThat(new IsPreemptionRequestFor(appA)));
}
+
+ @Test
+ public void testMaxCap() {
+ int[][] qData = new int[][]{
+ // / A B C
+ { 100, 40, 40, 20 }, // abs
+ { 100, 100, 45, 100 }, // maxCap
+ { 100, 55, 45, 0 }, // used
+ { 20, 10, 10, 0 }, // pending
+ { 0, 0, 0, 0 }, // reserved
+ { 2, 1, 1, 0 }, // apps
+ { -1, 1, 1, 0 }, // req granularity
+ { 3, 0, 0, 0 }, // subqueues
+ };
+ ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
+ policy.editSchedule();
+ // despite the imbalance, since B is at maxCap, do not correct
+ verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appA)));
+ }
+
@Test
public void testPreemptCycle() {
int[][] qData = new int[][]{
// / A B C
{ 100, 40, 40, 20 }, // abs
+ { 100, 100, 100, 100 }, // maxCap
{ 100, 0, 60, 40 }, // used
{ 10, 10, 0, 0 }, // pending
{ 0, 0, 0, 0 }, // reserved
@@ -169,6 +192,7 @@ public class TestProportionalCapacityPre
int[][] qData = new int[][]{
// / A B C
{ 100, 40, 40, 20 }, // abs
+ { 100, 100, 100, 100 }, // maxCap
{ 100, 0, 60, 40 }, // used
{ 10, 10, 0, 0 }, // pending
{ 0, 0, 0, 0 }, // reserved
@@ -205,6 +229,7 @@ public class TestProportionalCapacityPre
int[][] qData = new int[][]{
// / A B C
{ 100, 40, 40, 20 }, // abs
+ { 100, 100, 100, 100 }, // maxCap
{ 100, 39, 43, 21 }, // used
{ 10, 10, 0, 0 }, // pending
{ 0, 0, 0, 0 }, // reserved
@@ -224,6 +249,7 @@ public class TestProportionalCapacityPre
int[][] qData = new int[][]{
// / A B C
{ 100, 40, 40, 20 }, // abs
+ { 100, 100, 100, 100 }, // maxCap
{ 100, 55, 45, 0 }, // used
{ 20, 10, 10, 0 }, // pending
{ 0, 0, 0, 0 }, // reserved
@@ -242,6 +268,7 @@ public class TestProportionalCapacityPre
int[][] qData = new int[][]{
// / A B C
{ 100, 40, 40, 20 }, // abs
+ { 100, 100, 100, 100 }, // maxCap
{ 100, 55, 45, 0 }, // used
{ 20, 10, 10, 0 }, // pending
{ 0, 0, 0, 0 }, // reserved
@@ -261,6 +288,7 @@ public class TestProportionalCapacityPre
int[][] qData = new int[][]{
// / A B C
{ 100, 40, 40, 20 }, // abs
+ { 100, 100, 100, 100 }, // maxCap
{ 100, 90, 10, 0 }, // used
{ 80, 10, 20, 50 }, // pending
{ 0, 0, 0, 0 }, // reserved
@@ -280,6 +308,7 @@ public class TestProportionalCapacityPre
int[][] qData = new int[][] {
// / A B C D E F
{ 200, 100, 50, 50, 100, 10, 90 }, // abs
+ { 200, 200, 200, 200, 200, 200, 200 }, // maxCap
{ 200, 110, 60, 50, 90, 90, 0 }, // used
{ 10, 0, 0, 0, 10, 0, 10 }, // pending
{ 0, 0, 0, 0, 0, 0, 0 }, // reserved
@@ -295,10 +324,54 @@ public class TestProportionalCapacityPre
}
@Test
+ public void testZeroGuar() {
+ int[][] qData = new int[][] {
+ // / A B C D E F
+ { 200, 100, 0, 99, 100, 10, 90 }, // abs
+ { 200, 200, 200, 200, 200, 200, 200 }, // maxCap
+ { 170, 80, 60, 20, 90, 90, 0 }, // used
+ { 10, 0, 0, 0, 10, 0, 10 }, // pending
+ { 0, 0, 0, 0, 0, 0, 0 }, // reserved
+ { 4, 2, 1, 1, 2, 1, 1 }, // apps
+ { -1, -1, 1, 1, -1, 1, 1 }, // req granularity
+ { 2, 2, 0, 0, 2, 0, 0 }, // subqueues
+ };
+ ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
+ policy.editSchedule();
+ // verify capacity taken from A1, not B1 despite B1 being far over
+ // its absolute guaranteed capacity
+ verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appA)));
+ }
+
+ @Test
+ public void testZeroGuarOverCap() {
+ int[][] qData = new int[][] {
+ // / A B C D E F
+ { 200, 100, 0, 99, 0, 100, 100 }, // abs
+ { 200, 200, 200, 200, 200, 200, 200 }, // maxCap
+ { 170, 170, 60, 20, 90, 0, 0 }, // used
+ { 85, 50, 30, 10, 10, 20, 20 }, // pending
+ { 0, 0, 0, 0, 0, 0, 0 }, // reserved
+ { 4, 3, 1, 1, 1, 1, 1 }, // apps
+ { -1, -1, 1, 1, 1, -1, 1 }, // req granularity
+ { 2, 3, 0, 0, 0, 1, 0 }, // subqueues
+ };
+ ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
+ policy.editSchedule();
+ // we verify both that C has priority on B and D (as it has >0 guarantees)
+ // and that B and D are forced to share their over capacity fairly (as they
+ // are both zero-guarantees) hence D sees some of its containers preempted
+ verify(mDisp, times(14)).handle(argThat(new IsPreemptionRequestFor(appC)));
+ }
+
+
+
+ @Test
public void testHierarchicalLarge() {
int[][] qData = new int[][] {
// / A B C D E F G H I
- { 400, 200, 60,140, 100, 70, 30, 100, 10, 90 }, // abs
+ { 400, 200, 60, 140, 100, 70, 30, 100, 10, 90 }, // abs
+ { 400, 400, 400, 400, 400, 400, 400, 400, 400, 400, }, // maxCap
{ 400, 210, 70,140, 100, 50, 50, 90, 90, 0 }, // used
{ 10, 0, 0, 0, 0, 0, 0, 0, 0, 15 }, // pending
{ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 }, // reserved
@@ -382,24 +455,25 @@ public class TestProportionalCapacityPre
when(mCS.getRootQueue()).thenReturn(mRoot);
Resource clusterResources =
- Resource.newInstance(leafAbsCapacities(qData[0], qData[6]), 0);
+ Resource.newInstance(leafAbsCapacities(qData[0], qData[7]), 0);
when(mCS.getClusterResources()).thenReturn(clusterResources);
return policy;
}
ParentQueue buildMockRootQueue(Random r, int[]... queueData) {
int[] abs = queueData[0];
- int[] used = queueData[1];
- int[] pending = queueData[2];
- int[] reserved = queueData[3];
- int[] apps = queueData[4];
- int[] gran = queueData[5];
- int[] queues = queueData[6];
+ int[] maxCap = queueData[1];
+ int[] used = queueData[2];
+ int[] pending = queueData[3];
+ int[] reserved = queueData[4];
+ int[] apps = queueData[5];
+ int[] gran = queueData[6];
+ int[] queues = queueData[7];
- return mockNested(abs, used, pending, reserved, apps, gran, queues);
+ return mockNested(abs, maxCap, used, pending, reserved, apps, gran, queues);
}
- ParentQueue mockNested(int[] abs, int[] used,
+ ParentQueue mockNested(int[] abs, int[] maxCap, int[] used,
int[] pending, int[] reserved, int[] apps, int[] gran, int[] queues) {
float tot = leafAbsCapacities(abs, queues);
Deque<ParentQueue> pqs = new LinkedList<ParentQueue>();
@@ -407,6 +481,8 @@ public class TestProportionalCapacityPre
when(root.getQueueName()).thenReturn("/");
when(root.getAbsoluteUsedCapacity()).thenReturn(used[0] / tot);
when(root.getAbsoluteCapacity()).thenReturn(abs[0] / tot);
+ when(root.getAbsoluteMaximumCapacity()).thenReturn(maxCap[0] / tot);
+
for (int i = 1; i < queues.length; ++i) {
final CSQueue q;
final ParentQueue p = pqs.removeLast();
@@ -420,6 +496,7 @@ public class TestProportionalCapacityPre
when(q.getQueueName()).thenReturn(queueName);
when(q.getAbsoluteUsedCapacity()).thenReturn(used[i] / tot);
when(q.getAbsoluteCapacity()).thenReturn(abs[i] / tot);
+ when(q.getAbsoluteMaximumCapacity()).thenReturn(maxCap[i] / tot);
}
assert 0 == pqs.size();
return root;
@@ -439,7 +516,7 @@ public class TestProportionalCapacityPre
return pq;
}
- LeafQueue mockLeafQueue(ParentQueue p, float tot, int i, int[] abs,
+ LeafQueue mockLeafQueue(ParentQueue p, float tot, int i, int[] abs,
int[] used, int[] pending, int[] reserved, int[] apps, int[] gran) {
LeafQueue lq = mock(LeafQueue.class);
when(lq.getTotalResourcePending()).thenReturn(