You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by wa...@apache.org on 2014/05/16 20:50:59 UTC
svn commit: r1595303 - in
/hadoop/common/branches/fs-encryption/hadoop-yarn-project: ./
hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/
hadoop-yarn/hadoop-yarn-server/hadoop-yarn-se...
Author: wang
Date: Fri May 16 18:50:52 2014
New Revision: 1595303
URL: http://svn.apache.org/r1595303
Log:
Merge trunk r1595301 to branch
Removed:
hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/HistoryServerRest.apt.vm
hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/MapredAppMasterRest.apt.vm
Modified:
hadoop/common/branches/fs-encryption/hadoop-yarn-project/CHANGES.txt
hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueuePlacementRule.java
hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java
hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/CHANGES.txt?rev=1595303&r1=1595302&r2=1595303&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/CHANGES.txt Fri May 16 18:50:52 2014
@@ -26,6 +26,9 @@ Release 2.5.0 - UNRELEASED
YARN-1864. Fair Scheduler Dynamic Hierarchical User Queues (Ashwin Shankar
via Sandy Ryza)
+ YARN-1362. Distinguish between nodemanager shutdown for decommission vs shutdown
+ for restart. (Jason Lowe via junping_du)
+
IMPROVEMENTS
YARN-1479. Invalid NaN values in Hadoop REST API JSON response (Chen He via
@@ -108,11 +111,8 @@ Release 2.5.0 - UNRELEASED
YARN-2011. Fix typo and warning in TestLeafQueue (Chen He via junping_du)
- YARN-1976. Fix yarn application CLI to print the scheme of the tracking url
- of failed/killed applications. (Junping Du via jianhe)
-
- YARN-2016. Fix a bug in GetApplicationsRequestPBImpl to add the missed fields
- to proto. (Junping Du via jianhe)
+ YARN-2042. String shouldn't be compared using == in
+ QueuePlacementRule#NestedUserQueue#getQueueForApp (Chen He via Sandy Ryza)
Release 2.4.1 - UNRELEASED
@@ -216,6 +216,21 @@ Release 2.4.1 - UNRELEASED
causing both RMs to be stuck in standby mode when automatic failover is
enabled. (Karthik Kambatla and Xuan Gong via vinodkv)
+ YARN-1957. Consider the max capacity of the queue when computing the ideal
+ capacity for preemption. (Carlo Curino via cdouglas)
+
+ YARN-1986. In Fifo Scheduler, node heartbeat in between creating app and
+ attempt causes NPE (Hong Zhiguo via Sandy Ryza)
+
+ YARN-1976. Fix yarn application CLI to print the scheme of the tracking url
+ of failed/killed applications. (Junping Du via jianhe)
+
+ YARN-2016. Fix a bug in GetApplicationsRequestPBImpl to add the missed fields
+ to proto. (Junping Du via jianhe)
+
+ YARN-2053. Fixed a bug in AMS to not add null NMToken into NMTokens list from
+ previous attempts for work-preserving AM restart. (Wangda Tan via jianhe)
+
Release 2.4.0 - 2014-04-07
INCOMPATIBLE CHANGES
Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java?rev=1595303&r1=1595302&r2=1595303&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java Fri May 16 18:50:52 2014
@@ -66,4 +66,8 @@ public interface Context {
LocalDirsHandlerService getLocalDirsHandler();
ApplicationACLsManager getApplicationACLsManager();
+
+ boolean getDecommissioned();
+
+ void setDecommissioned(boolean isDecommissioned);
}
Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java?rev=1595303&r1=1595302&r2=1595303&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java Fri May 16 18:50:52 2014
@@ -272,7 +272,8 @@ public class NodeManager extends Composi
private WebServer webServer;
private final NodeHealthStatus nodeHealthStatus = RecordFactoryProvider
.getRecordFactory(null).newRecordInstance(NodeHealthStatus.class);
-
+ private boolean isDecommissioned = false;
+
public NMContext(NMContainerTokenSecretManager containerTokenSecretManager,
NMTokenSecretManagerInNM nmTokenSecretManager,
LocalDirsHandlerService dirsHandler, ApplicationACLsManager aclsManager) {
@@ -349,6 +350,16 @@ public class NodeManager extends Composi
public ApplicationACLsManager getApplicationACLsManager() {
return aclsManager;
}
+
+ @Override
+ public boolean getDecommissioned() {
+ return isDecommissioned;
+ }
+
+ @Override
+ public void setDecommissioned(boolean isDecommissioned) {
+ this.isDecommissioned = isDecommissioned;
+ }
}
Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java?rev=1595303&r1=1595302&r2=1595303&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java Fri May 16 18:50:52 2014
@@ -493,6 +493,7 @@ public class NodeStatusUpdaterImpl exten
+ " hence shutting down.");
LOG.warn("Message from ResourceManager: "
+ response.getDiagnosticsMessage());
+ context.setDecommissioned(true);
dispatcher.getEventHandler().handle(
new NodeManagerEvent(NodeManagerEventType.SHUTDOWN));
break;
Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java?rev=1595303&r1=1595302&r2=1595303&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java Fri May 16 18:50:52 2014
@@ -930,6 +930,7 @@ public class TestNodeStatusUpdater {
Thread.sleep(500);
}
Assert.assertFalse(heartBeatID < 1);
+ Assert.assertTrue(nm.getNMContext().getDecommissioned());
// NM takes a while to reach the STOPPED state.
waitCount = 0;
Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java?rev=1595303&r1=1595302&r2=1595303&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java Fri May 16 18:50:52 2014
@@ -298,9 +298,12 @@ public class ApplicationMasterService ex
List<NMToken> nmTokens = new ArrayList<NMToken>();
for (Container container : transferredContainers) {
try {
- nmTokens.add(rmContext.getNMTokenSecretManager()
- .createAndGetNMToken(app.getUser(), applicationAttemptId,
- container));
+ NMToken token = rmContext.getNMTokenSecretManager()
+ .createAndGetNMToken(app.getUser(), applicationAttemptId,
+ container);
+ if (null != token) {
+ nmTokens.add(token);
+ }
} catch (IllegalArgumentException e) {
// if it's a DNS issue, throw UnknownHostException directly and that
// will be automatically retried by RMProxy in RPC layer.
Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java?rev=1595303&r1=1595302&r2=1595303&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java Fri May 16 18:50:52 2014
@@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
@@ -293,34 +294,31 @@ public class ProportionalCapacityPreempt
// with the total capacity for this set of queues
Resource unassigned = Resources.clone(tot_guarant);
- //assign all cluster resources until no more demand, or no resources are left
- while (!qAlloc.isEmpty() && Resources.greaterThan(rc, tot_guarant,
- unassigned, Resources.none())) {
- Resource wQassigned = Resource.newInstance(0, 0);
-
- // we compute normalizedGuarantees capacity based on currently active
- // queues
- resetCapacity(rc, unassigned, qAlloc);
-
- // offer for each queue their capacity first and in following invocations
- // their share of over-capacity
- for (Iterator<TempQueue> i = qAlloc.iterator(); i.hasNext();) {
- TempQueue sub = i.next();
- Resource wQavail =
- Resources.multiply(unassigned, sub.normalizedGuarantee);
- Resource wQidle = sub.offer(wQavail, rc, tot_guarant);
- Resource wQdone = Resources.subtract(wQavail, wQidle);
- // if the queue returned a value > 0 it means it is fully satisfied
- // and it is removed from the list of active queues qAlloc
- if (!Resources.greaterThan(rc, tot_guarant,
- wQdone, Resources.none())) {
- i.remove();
- }
- Resources.addTo(wQassigned, wQdone);
+ // group queues based on whether they have non-zero guaranteed capacity
+ Set<TempQueue> nonZeroGuarQueues = new HashSet<TempQueue>();
+ Set<TempQueue> zeroGuarQueues = new HashSet<TempQueue>();
+
+ for (TempQueue q : qAlloc) {
+ if (Resources
+ .greaterThan(rc, tot_guarant, q.guaranteed, Resources.none())) {
+ nonZeroGuarQueues.add(q);
+ } else {
+ zeroGuarQueues.add(q);
}
- Resources.subtractFrom(unassigned, wQassigned);
}
+ // first compute the allocation as a fixpoint based on guaranteed capacity
+ computeFixpointAllocation(rc, tot_guarant, nonZeroGuarQueues, unassigned,
+ false);
+
+ // if any capacity is left unassigned, distribute it among zero-guarantee
+ // queues uniformly (i.e., not based on guaranteed capacity, as this is zero)
+ if (!zeroGuarQueues.isEmpty()
+ && Resources.greaterThan(rc, tot_guarant, unassigned, Resources.none())) {
+ computeFixpointAllocation(rc, tot_guarant, zeroGuarQueues, unassigned,
+ true);
+ }
+
// based on ideal assignment computed above and current assignment we derive
// how much preemption is required overall
Resource totPreemptionNeeded = Resource.newInstance(0, 0);
@@ -353,6 +351,46 @@ public class ProportionalCapacityPreempt
}
}
+
+ /**
+ * Given a set of queues compute the fix-point distribution of unassigned
+ * resources among them. As pending requests of a queue are exhausted, the
+ * queue is removed from the set and remaining capacity redistributed among
+ * remaining queues. The distribution is weighted based on guaranteed
+ * capacity, unless asked to ignoreGuarantee, in which case resources are
+ * distributed uniformly.
+ */
+ private void computeFixpointAllocation(ResourceCalculator rc,
+ Resource tot_guarant, Collection<TempQueue> qAlloc, Resource unassigned,
+ boolean ignoreGuarantee) {
+ //assign all cluster resources until no more demand, or no resources are left
+ while (!qAlloc.isEmpty() && Resources.greaterThan(rc, tot_guarant,
+ unassigned, Resources.none())) {
+ Resource wQassigned = Resource.newInstance(0, 0);
+
+ // we compute normalizedGuarantees capacity based on currently active
+ // queues
+ resetCapacity(rc, unassigned, qAlloc, ignoreGuarantee);
+
+ // offer for each queue their capacity first and in following invocations
+ // their share of over-capacity
+ for (Iterator<TempQueue> i = qAlloc.iterator(); i.hasNext();) {
+ TempQueue sub = i.next();
+ Resource wQavail =
+ Resources.multiply(unassigned, sub.normalizedGuarantee);
+ Resource wQidle = sub.offer(wQavail, rc, tot_guarant);
+ Resource wQdone = Resources.subtract(wQavail, wQidle);
+ // if the queue returned a value > 0 it means it is fully satisfied
+ // and it is removed from the list of active queues qAlloc
+ if (!Resources.greaterThan(rc, tot_guarant,
+ wQdone, Resources.none())) {
+ i.remove();
+ }
+ Resources.addTo(wQassigned, wQdone);
+ }
+ Resources.subtractFrom(unassigned, wQassigned);
+ }
+ }
/**
* Computes a normalizedGuaranteed capacity based on active queues
@@ -361,14 +399,21 @@ public class ProportionalCapacityPreempt
* @param queues the list of queues to consider
*/
private void resetCapacity(ResourceCalculator rc, Resource clusterResource,
- List<TempQueue> queues) {
+ Collection<TempQueue> queues, boolean ignoreGuar) {
Resource activeCap = Resource.newInstance(0, 0);
- for (TempQueue q : queues) {
- Resources.addTo(activeCap, q.guaranteed);
- }
- for (TempQueue q : queues) {
- q.normalizedGuarantee = Resources.divide(rc, clusterResource,
- q.guaranteed, activeCap);
+
+ if (ignoreGuar) {
+ for (TempQueue q : queues) {
+ q.normalizedGuarantee = (float) 1.0f / ((float) queues.size());
+ }
+ } else {
+ for (TempQueue q : queues) {
+ Resources.addTo(activeCap, q.guaranteed);
+ }
+ for (TempQueue q : queues) {
+ q.normalizedGuarantee = Resources.divide(rc, clusterResource,
+ q.guaranteed, activeCap);
+ }
}
}
@@ -515,18 +560,25 @@ public class ProportionalCapacityPreempt
private TempQueue cloneQueues(CSQueue root, Resource clusterResources) {
TempQueue ret;
synchronized (root) {
- float absUsed = root.getAbsoluteUsedCapacity();
+ String queueName = root.getQueueName();
+ float absUsed = root.getAbsoluteUsedCapacity();
+ float absCap = root.getAbsoluteCapacity();
+ float absMaxCap = root.getAbsoluteMaximumCapacity();
+
Resource current = Resources.multiply(clusterResources, absUsed);
- Resource guaranteed =
- Resources.multiply(clusterResources, root.getAbsoluteCapacity());
+ Resource guaranteed = Resources.multiply(clusterResources, absCap);
+ Resource maxCapacity = Resources.multiply(clusterResources, absMaxCap);
if (root instanceof LeafQueue) {
LeafQueue l = (LeafQueue) root;
Resource pending = l.getTotalResourcePending();
- ret = new TempQueue(root.getQueueName(), current, pending, guaranteed);
+ ret = new TempQueue(queueName, current, pending, guaranteed,
+ maxCapacity);
+
ret.setLeafQueue(l);
} else {
Resource pending = Resource.newInstance(0, 0);
- ret = new TempQueue(root.getQueueName(), current, pending, guaranteed);
+ ret = new TempQueue(root.getQueueName(), current, pending, guaranteed,
+ maxCapacity);
for (CSQueue c : root.getChildQueues()) {
ret.addChild(cloneQueues(c, clusterResources));
}
@@ -563,6 +615,7 @@ public class ProportionalCapacityPreempt
final Resource current;
final Resource pending;
final Resource guaranteed;
+ final Resource maxCapacity;
Resource idealAssigned;
Resource toBePreempted;
Resource actuallyPreempted;
@@ -573,11 +626,12 @@ public class ProportionalCapacityPreempt
LeafQueue leafQueue;
TempQueue(String queueName, Resource current, Resource pending,
- Resource guaranteed) {
+ Resource guaranteed, Resource maxCapacity) {
this.queueName = queueName;
this.current = current;
this.pending = pending;
this.guaranteed = guaranteed;
+ this.maxCapacity = maxCapacity;
this.idealAssigned = Resource.newInstance(0, 0);
this.actuallyPreempted = Resource.newInstance(0, 0);
this.toBePreempted = Resource.newInstance(0, 0);
@@ -614,12 +668,12 @@ public class ProportionalCapacityPreempt
// the unused ones
Resource offer(Resource avail, ResourceCalculator rc,
Resource clusterResource) {
- // remain = avail - min(avail, current + pending - assigned)
- Resource accepted = Resources.min(rc, clusterResource,
- avail,
- Resources.subtract(
- Resources.add(current, pending),
- idealAssigned));
+ // remain = avail - min(avail, (max - assigned), (current + pending - assigned))
+ Resource accepted =
+ Resources.min(rc, clusterResource,
+ Resources.subtract(maxCapacity, idealAssigned),
+ Resources.min(rc, clusterResource, avail, Resources.subtract(
+ Resources.add(current, pending), idealAssigned)));
Resource remain = Resources.subtract(avail, accepted);
Resources.addTo(idealAssigned, accepted);
return remain;
@@ -628,13 +682,15 @@ public class ProportionalCapacityPreempt
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
- sb.append("CUR: ").append(current)
+ sb.append(" NAME: " + queueName)
+ .append(" CUR: ").append(current)
.append(" PEN: ").append(pending)
.append(" GAR: ").append(guaranteed)
.append(" NORM: ").append(normalizedGuarantee)
.append(" IDEAL_ASSIGNED: ").append(idealAssigned)
.append(" IDEAL_PREEMPT: ").append(toBePreempted)
- .append(" ACTUAL_PREEMPT: ").append(actuallyPreempted);
+ .append(" ACTUAL_PREEMPT: ").append(actuallyPreempted)
+ .append("\n");
return sb.toString();
}
Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueuePlacementRule.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueuePlacementRule.java?rev=1595303&r1=1595302&r2=1595303&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueuePlacementRule.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueuePlacementRule.java Fri May 16 18:50:52 2014
@@ -227,7 +227,7 @@ public abstract class QueuePlacementRule
String queueName = nestedRule.assignAppToQueue(requestedQueue, user,
groups, configuredQueues);
- if (queueName != null && queueName != "") {
+ if (queueName != null && queueName.length() != 0) {
if (!queueName.startsWith("root.")) {
queueName = "root." + queueName;
}
Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java?rev=1595303&r1=1595302&r2=1595303&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java Fri May 16 18:50:52 2014
@@ -360,7 +360,8 @@ public class FifoScheduler extends Abstr
return nodes.get(nodeId);
}
- private synchronized void addApplication(ApplicationId applicationId,
+ @VisibleForTesting
+ public synchronized void addApplication(ApplicationId applicationId,
String queue, String user) {
SchedulerApplication application =
new SchedulerApplication(DEFAULT_QUEUE, user);
@@ -372,7 +373,8 @@ public class FifoScheduler extends Abstr
.handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED));
}
- private synchronized void
+ @VisibleForTesting
+ public synchronized void
addApplicationAttempt(ApplicationAttemptId appAttemptId,
boolean transferStateFromPreviousAttempt) {
SchedulerApplication application =
@@ -458,6 +460,9 @@ public class FifoScheduler extends Abstr
.entrySet()) {
FiCaSchedulerApp application =
(FiCaSchedulerApp) e.getValue().getCurrentAppAttempt();
+ if (application == null) {
+ continue;
+ }
LOG.debug("pre-assignContainers");
application.showRequests();
synchronized (application) {
@@ -497,6 +502,9 @@ public class FifoScheduler extends Abstr
for (SchedulerApplication application : applications.values()) {
FiCaSchedulerApp attempt =
(FiCaSchedulerApp) application.getCurrentAppAttempt();
+ if (attempt == null) {
+ continue;
+ }
attempt.setHeadroom(Resources.subtract(clusterResource, usedResource));
}
}
Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java?rev=1595303&r1=1595302&r2=1595303&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java Fri May 16 18:50:52 2014
@@ -52,6 +52,7 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.apache.hadoop.yarn.util.resource.Resources;
import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -66,7 +67,7 @@ public class TestFifoScheduler {
private final int GB = 1024;
private static YarnConfiguration conf;
-
+
@BeforeClass
public static void setup() {
conf = new YarnConfiguration();
@@ -213,6 +214,32 @@ public class TestFifoScheduler {
rm.stop();
}
+ @Test
+ public void testNodeUpdateBeforeAppAttemptInit() throws Exception {
+ FifoScheduler scheduler = new FifoScheduler();
+ MockRM rm = new MockRM(conf);
+ scheduler.reinitialize(conf, rm.getRMContext());
+
+ RMNode node = MockNodes.newNodeInfo(1,
+ Resources.createResource(1024, 4), 1, "127.0.0.1");
+ scheduler.handle(new NodeAddedSchedulerEvent(node));
+
+ ApplicationId appId = ApplicationId.newInstance(0, 1);
+ scheduler.addApplication(appId, "queue1", "user1");
+
+ NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node);
+ try {
+ scheduler.handle(updateEvent);
+ } catch (NullPointerException e) {
+ Assert.fail();
+ }
+
+ ApplicationAttemptId attId = ApplicationAttemptId.newInstance(appId, 1);
+ scheduler.addApplicationAttempt(attId, false);
+
+ rm.stop();
+ }
+
private void testMinimumAllocation(YarnConfiguration conf, int testAlloc)
throws Exception {
MockRM rm = new MockRM(conf);
Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java?rev=1595303&r1=1595302&r2=1595303&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java Fri May 16 18:50:52 2014
@@ -264,31 +264,36 @@ public class TestAMRestart {
nm2.registerNode();
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
- int NUM_CONTAINERS = 1;
List<Container> containers = new ArrayList<Container>();
// nmTokens keeps track of all the nmTokens issued in the allocate call.
List<NMToken> expectedNMTokens = new ArrayList<NMToken>();
- // am1 allocate 1 container on nm1.
+ // am1 allocates 2 containers on nm1.
+ // first container
while (true) {
AllocateResponse response =
- am1.allocate("127.0.0.1", 2000, NUM_CONTAINERS,
+ am1.allocate("127.0.0.1", 2000, 2,
new ArrayList<ContainerId>());
nm1.nodeHeartbeat(true);
containers.addAll(response.getAllocatedContainers());
expectedNMTokens.addAll(response.getNMTokens());
- if (containers.size() == NUM_CONTAINERS) {
+ if (containers.size() == 2) {
break;
}
Thread.sleep(200);
System.out.println("Waiting for container to be allocated.");
}
- // launch the container
+ // launch the container-2
nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 2, ContainerState.RUNNING);
ContainerId containerId2 =
ContainerId.newInstance(am1.getApplicationAttemptId(), 2);
rm1.waitForState(nm1, containerId2, RMContainerState.RUNNING);
-
+ // launch the container-3
+ nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 3, ContainerState.RUNNING);
+ ContainerId containerId3 =
+ ContainerId.newInstance(am1.getApplicationAttemptId(), 3);
+ rm1.waitForState(nm1, containerId3, RMContainerState.RUNNING);
+
// fail am1
nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
am1.waitForState(RMAppAttemptState.FAILED);
@@ -308,12 +313,12 @@ public class TestAMRestart {
containers = new ArrayList<Container>();
while (true) {
AllocateResponse allocateResponse =
- am2.allocate("127.1.1.1", 4000, NUM_CONTAINERS,
+ am2.allocate("127.1.1.1", 4000, 1,
new ArrayList<ContainerId>());
nm2.nodeHeartbeat(true);
containers.addAll(allocateResponse.getAllocatedContainers());
expectedNMTokens.addAll(allocateResponse.getNMTokens());
- if (containers.size() == NUM_CONTAINERS) {
+ if (containers.size() == 1) {
break;
}
Thread.sleep(200);
Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java?rev=1595303&r1=1595302&r2=1595303&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java Fri May 16 18:50:52 2014
@@ -115,6 +115,7 @@ public class TestProportionalCapacityPre
int[][] qData = new int[][]{
// / A B C
{ 100, 40, 40, 20 }, // abs
+ { 100, 100, 100, 100 }, // maxCap
{ 100, 0, 60, 40 }, // used
{ 0, 0, 0, 0 }, // pending
{ 0, 0, 0, 0 }, // reserved
@@ -133,6 +134,7 @@ public class TestProportionalCapacityPre
int[][] qData = new int[][]{
// / A B C D
{ 100, 10, 40, 20, 30 }, // abs
+ { 100, 100, 100, 100, 100 }, // maxCap
{ 100, 30, 60, 10, 0 }, // used
{ 45, 20, 5, 20, 0 }, // pending
{ 0, 0, 0, 0, 0 }, // reserved
@@ -144,12 +146,33 @@ public class TestProportionalCapacityPre
policy.editSchedule();
verify(mDisp, times(16)).handle(argThat(new IsPreemptionRequestFor(appA)));
}
+
+ @Test
+ public void testMaxCap() {
+ int[][] qData = new int[][]{
+ // / A B C
+ { 100, 40, 40, 20 }, // abs
+ { 100, 100, 45, 100 }, // maxCap
+ { 100, 55, 45, 0 }, // used
+ { 20, 10, 10, 0 }, // pending
+ { 0, 0, 0, 0 }, // reserved
+ { 2, 1, 1, 0 }, // apps
+ { -1, 1, 1, 0 }, // req granularity
+ { 3, 0, 0, 0 }, // subqueues
+ };
+ ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
+ policy.editSchedule();
+ // despite the imbalance, since B is at maxCap, do not correct
+ verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appA)));
+ }
+
@Test
public void testPreemptCycle() {
int[][] qData = new int[][]{
// / A B C
{ 100, 40, 40, 20 }, // abs
+ { 100, 100, 100, 100 }, // maxCap
{ 100, 0, 60, 40 }, // used
{ 10, 10, 0, 0 }, // pending
{ 0, 0, 0, 0 }, // reserved
@@ -169,6 +192,7 @@ public class TestProportionalCapacityPre
int[][] qData = new int[][]{
// / A B C
{ 100, 40, 40, 20 }, // abs
+ { 100, 100, 100, 100 }, // maxCap
{ 100, 0, 60, 40 }, // used
{ 10, 10, 0, 0 }, // pending
{ 0, 0, 0, 0 }, // reserved
@@ -205,6 +229,7 @@ public class TestProportionalCapacityPre
int[][] qData = new int[][]{
// / A B C
{ 100, 40, 40, 20 }, // abs
+ { 100, 100, 100, 100 }, // maxCap
{ 100, 39, 43, 21 }, // used
{ 10, 10, 0, 0 }, // pending
{ 0, 0, 0, 0 }, // reserved
@@ -224,6 +249,7 @@ public class TestProportionalCapacityPre
int[][] qData = new int[][]{
// / A B C
{ 100, 40, 40, 20 }, // abs
+ { 100, 100, 100, 100 }, // maxCap
{ 100, 55, 45, 0 }, // used
{ 20, 10, 10, 0 }, // pending
{ 0, 0, 0, 0 }, // reserved
@@ -242,6 +268,7 @@ public class TestProportionalCapacityPre
int[][] qData = new int[][]{
// / A B C
{ 100, 40, 40, 20 }, // abs
+ { 100, 100, 100, 100 }, // maxCap
{ 100, 55, 45, 0 }, // used
{ 20, 10, 10, 0 }, // pending
{ 0, 0, 0, 0 }, // reserved
@@ -261,6 +288,7 @@ public class TestProportionalCapacityPre
int[][] qData = new int[][]{
// / A B C
{ 100, 40, 40, 20 }, // abs
+ { 100, 100, 100, 100 }, // maxCap
{ 100, 90, 10, 0 }, // used
{ 80, 10, 20, 50 }, // pending
{ 0, 0, 0, 0 }, // reserved
@@ -280,6 +308,7 @@ public class TestProportionalCapacityPre
int[][] qData = new int[][] {
// / A B C D E F
{ 200, 100, 50, 50, 100, 10, 90 }, // abs
+ { 200, 200, 200, 200, 200, 200, 200 }, // maxCap
{ 200, 110, 60, 50, 90, 90, 0 }, // used
{ 10, 0, 0, 0, 10, 0, 10 }, // pending
{ 0, 0, 0, 0, 0, 0, 0 }, // reserved
@@ -295,10 +324,54 @@ public class TestProportionalCapacityPre
}
@Test
+ public void testZeroGuar() {
+ int[][] qData = new int[][] {
+ // / A B C D E F
+ { 200, 100, 0, 99, 100, 10, 90 }, // abs
+ { 200, 200, 200, 200, 200, 200, 200 }, // maxCap
+ { 170, 80, 60, 20, 90, 90, 0 }, // used
+ { 10, 0, 0, 0, 10, 0, 10 }, // pending
+ { 0, 0, 0, 0, 0, 0, 0 }, // reserved
+ { 4, 2, 1, 1, 2, 1, 1 }, // apps
+ { -1, -1, 1, 1, -1, 1, 1 }, // req granularity
+ { 2, 2, 0, 0, 2, 0, 0 }, // subqueues
+ };
+ ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
+ policy.editSchedule();
+ // verify capacity taken from A1, not B1 despite B1 being far over
+ // its absolute guaranteed capacity
+ verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appA)));
+ }
+
+ @Test
+ public void testZeroGuarOverCap() {
+ int[][] qData = new int[][] {
+ // / A B C D E F
+ { 200, 100, 0, 99, 0, 100, 100 }, // abs
+ { 200, 200, 200, 200, 200, 200, 200 }, // maxCap
+ { 170, 170, 60, 20, 90, 0, 0 }, // used
+ { 85, 50, 30, 10, 10, 20, 20 }, // pending
+ { 0, 0, 0, 0, 0, 0, 0 }, // reserved
+ { 4, 3, 1, 1, 1, 1, 1 }, // apps
+ { -1, -1, 1, 1, 1, -1, 1 }, // req granularity
+ { 2, 3, 0, 0, 0, 1, 0 }, // subqueues
+ };
+ ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
+ policy.editSchedule();
+ // we verify both that C has priority over B and D (as it has >0 guarantees)
+ // and that B and D are forced to share their over-capacity fairly (as they
+ // both have zero guarantees), hence D sees some of its containers preempted
+ verify(mDisp, times(14)).handle(argThat(new IsPreemptionRequestFor(appC)));
+ }
+
+
+
+ @Test
public void testHierarchicalLarge() {
int[][] qData = new int[][] {
// / A B C D E F G H I
- { 400, 200, 60,140, 100, 70, 30, 100, 10, 90 }, // abs
+ { 400, 200, 60, 140, 100, 70, 30, 100, 10, 90 }, // abs
+ { 400, 400, 400, 400, 400, 400, 400, 400, 400, 400, }, // maxCap
{ 400, 210, 70,140, 100, 50, 50, 90, 90, 0 }, // used
{ 10, 0, 0, 0, 0, 0, 0, 0, 0, 15 }, // pending
{ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 }, // reserved
@@ -382,24 +455,25 @@ public class TestProportionalCapacityPre
when(mCS.getRootQueue()).thenReturn(mRoot);
Resource clusterResources =
- Resource.newInstance(leafAbsCapacities(qData[0], qData[6]), 0);
+ Resource.newInstance(leafAbsCapacities(qData[0], qData[7]), 0);
when(mCS.getClusterResources()).thenReturn(clusterResources);
return policy;
}
ParentQueue buildMockRootQueue(Random r, int[]... queueData) {
int[] abs = queueData[0];
- int[] used = queueData[1];
- int[] pending = queueData[2];
- int[] reserved = queueData[3];
- int[] apps = queueData[4];
- int[] gran = queueData[5];
- int[] queues = queueData[6];
+ int[] maxCap = queueData[1];
+ int[] used = queueData[2];
+ int[] pending = queueData[3];
+ int[] reserved = queueData[4];
+ int[] apps = queueData[5];
+ int[] gran = queueData[6];
+ int[] queues = queueData[7];
- return mockNested(abs, used, pending, reserved, apps, gran, queues);
+ return mockNested(abs, maxCap, used, pending, reserved, apps, gran, queues);
}
- ParentQueue mockNested(int[] abs, int[] used,
+ ParentQueue mockNested(int[] abs, int[] maxCap, int[] used,
int[] pending, int[] reserved, int[] apps, int[] gran, int[] queues) {
float tot = leafAbsCapacities(abs, queues);
Deque<ParentQueue> pqs = new LinkedList<ParentQueue>();
@@ -407,6 +481,8 @@ public class TestProportionalCapacityPre
when(root.getQueueName()).thenReturn("/");
when(root.getAbsoluteUsedCapacity()).thenReturn(used[0] / tot);
when(root.getAbsoluteCapacity()).thenReturn(abs[0] / tot);
+ when(root.getAbsoluteMaximumCapacity()).thenReturn(maxCap[0] / tot);
+
for (int i = 1; i < queues.length; ++i) {
final CSQueue q;
final ParentQueue p = pqs.removeLast();
@@ -420,6 +496,7 @@ public class TestProportionalCapacityPre
when(q.getQueueName()).thenReturn(queueName);
when(q.getAbsoluteUsedCapacity()).thenReturn(used[i] / tot);
when(q.getAbsoluteCapacity()).thenReturn(abs[i] / tot);
+ when(q.getAbsoluteMaximumCapacity()).thenReturn(maxCap[i] / tot);
}
assert 0 == pqs.size();
return root;
@@ -439,7 +516,7 @@ public class TestProportionalCapacityPre
return pq;
}
- LeafQueue mockLeafQueue(ParentQueue p, float tot, int i, int[] abs,
+ LeafQueue mockLeafQueue(ParentQueue p, float tot, int i, int[] abs,
int[] used, int[] pending, int[] reserved, int[] apps, int[] gran) {
LeafQueue lq = mock(LeafQueue.class);
when(lq.getTotalResourcePending()).thenReturn(