You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to yarn-commits@hadoop.apache.org by sa...@apache.org on 2014/01/31 01:14:47 UTC
svn commit: r1563021 - in /hadoop/common/trunk/hadoop-yarn-project: ./
hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/
hadoop-yarn/hadoop-yarn-server/hadoop-yarn-s...
Author: sandy
Date: Fri Jan 31 00:14:47 2014
New Revision: 1563021
URL: http://svn.apache.org/r1563021
Log:
YARN-1498. Common scheduler changes for moving apps between queues (Sandy Ryza)
Added:
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerApplicationAttempt.java
Modified:
hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java
Modified: hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt?rev=1563021&r1=1563020&r2=1563021&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt Fri Jan 31 00:14:47 2014
@@ -9,6 +9,9 @@ Trunk - Unreleased
YARN-1496. Protocol additions to allow moving apps between queues (Sandy
Ryza)
+ YARN-1498. Common scheduler changes for moving apps between queues (Sandy
+ Ryza)
+
IMPROVEMENTS
OPTIMIZATIONS
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java?rev=1563021&r1=1563020&r2=1563021&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java Fri Jan 31 00:14:47 2014
@@ -64,7 +64,7 @@ public class AppSchedulingInfo {
private Set<String> blacklist = new HashSet<String>();
//private final ApplicationStore store;
- private final ActiveUsersManager activeUsersManager;
+ private ActiveUsersManager activeUsersManager;
/* Allocated by scheduler */
boolean pending = true; // for app metrics
@@ -171,11 +171,10 @@ public class AppSchedulingInfo {
.getNumContainers() : 0;
Resource lastRequestCapability = lastRequest != null ? lastRequest
.getCapability() : Resources.none();
- metrics.incrPendingResources(user, request.getNumContainers()
- - lastRequestContainers, Resources.subtractFrom( // save a clone
- Resources.multiply(request.getCapability(), request
- .getNumContainers()), Resources.multiply(lastRequestCapability,
- lastRequestContainers)));
+ metrics.incrPendingResources(user, request.getNumContainers(),
+ request.getCapability());
+ metrics.decrPendingResources(user, lastRequestContainers,
+ lastRequestCapability);
}
}
}
@@ -262,6 +261,7 @@ public class AppSchedulingInfo {
pending = false;
metrics.runAppAttempt(applicationId, user);
}
+
if (LOG.isDebugEnabled()) {
LOG.debug("allocate: applicationId=" + applicationId
+ " container=" + container.getId()
@@ -269,7 +269,7 @@ public class AppSchedulingInfo {
+ " user=" + user
+ " resource=" + request.getCapability());
}
- metrics.allocateResources(user, 1, request.getCapability());
+ metrics.allocateResources(user, 1, request.getCapability(), true);
}
/**
@@ -359,6 +359,26 @@ public class AppSchedulingInfo {
}
}
+ synchronized public void move(Queue newQueue) {
+ QueueMetrics oldMetrics = queue.getMetrics();
+ QueueMetrics newMetrics = newQueue.getMetrics();
+ for (Map<String, ResourceRequest> asks : requests.values()) {
+ ResourceRequest request = asks.get(ResourceRequest.ANY);
+ if (request != null) {
+ oldMetrics.decrPendingResources(user, request.getNumContainers(),
+ request.getCapability());
+ newMetrics.incrPendingResources(user, request.getNumContainers(),
+ request.getCapability());
+ }
+ }
+ oldMetrics.moveAppFrom(this);
+ newMetrics.moveAppTo(this);
+ activeUsersManager.deactivateApplication(user, applicationId);
+ activeUsersManager = newQueue.getActiveUsersManager();
+ activeUsersManager.activateApplication(user, applicationId);
+ this.queue = newQueue;
+ }
+
synchronized public void stop(RMAppAttemptState rmAppAttemptFinalState) {
// clear pending resources metrics for the application
QueueMetrics metrics = queue.getMetrics();
@@ -366,8 +386,7 @@ public class AppSchedulingInfo {
ResourceRequest request = asks.get(ResourceRequest.ANY);
if (request != null) {
metrics.decrPendingResources(user, request.getNumContainers(),
- Resources.multiply(request.getCapability(), request
- .getNumContainers()));
+ request.getCapability());
}
}
metrics.finishAppAttempt(applicationId, pending, user);
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java?rev=1563021&r1=1563020&r2=1563021&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java Fri Jan 31 00:14:47 2014
@@ -58,4 +58,6 @@ public interface Queue {
List<QueueUserACLInfo> getQueueUserAclInfo(UserGroupInformation user);
boolean hasAccess(QueueACL acl, UserGroupInformation user);
+
+ public ActiveUsersManager getActiveUsersManager();
}
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java?rev=1563021&r1=1563020&r2=1563021&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java Fri Jan 31 00:14:47 2014
@@ -280,6 +280,36 @@ public class QueueMetrics implements Met
parent.finishApp(user, rmAppFinalState);
}
}
+
+ public void moveAppFrom(AppSchedulingInfo app) {
+ if (app.isPending()) {
+ appsPending.decr();
+ } else {
+ appsRunning.decr();
+ }
+ QueueMetrics userMetrics = getUserMetrics(app.getUser());
+ if (userMetrics != null) {
+ userMetrics.moveAppFrom(app);
+ }
+ if (parent != null) {
+ parent.moveAppFrom(app);
+ }
+ }
+
+ public void moveAppTo(AppSchedulingInfo app) {
+ if (app.isPending()) {
+ appsPending.incr();
+ } else {
+ appsRunning.incr();
+ }
+ QueueMetrics userMetrics = getUserMetrics(app.getUser());
+ if (userMetrics != null) {
+ userMetrics.moveAppTo(app);
+ }
+ if (parent != null) {
+ parent.moveAppTo(app);
+ }
+ }
/**
* Set available resources. To be called by scheduler periodically as
@@ -324,8 +354,8 @@ public class QueueMetrics implements Met
private void _incrPendingResources(int containers, Resource res) {
pendingContainers.incr(containers);
- pendingMB.incr(res.getMemory());
- pendingVCores.incr(res.getVirtualCores());
+ pendingMB.incr(res.getMemory() * containers);
+ pendingVCores.incr(res.getVirtualCores() * containers);
}
public void decrPendingResources(String user, int containers, Resource res) {
@@ -341,22 +371,25 @@ public class QueueMetrics implements Met
private void _decrPendingResources(int containers, Resource res) {
pendingContainers.decr(containers);
- pendingMB.decr(res.getMemory());
- pendingVCores.decr(res.getVirtualCores());
+ pendingMB.decr(res.getMemory() * containers);
+ pendingVCores.decr(res.getVirtualCores() * containers);
}
- public void allocateResources(String user, int containers, Resource res) {
+ public void allocateResources(String user, int containers, Resource res,
+ boolean decrPending) {
allocatedContainers.incr(containers);
aggregateContainersAllocated.incr(containers);
allocatedMB.incr(res.getMemory() * containers);
allocatedVCores.incr(res.getVirtualCores() * containers);
- _decrPendingResources(containers, Resources.multiply(res, containers));
+ if (decrPending) {
+ _decrPendingResources(containers, res);
+ }
QueueMetrics userMetrics = getUserMetrics(user);
if (userMetrics != null) {
- userMetrics.allocateResources(user, containers, res);
+ userMetrics.allocateResources(user, containers, res, decrPending);
}
if (parent != null) {
- parent.allocateResources(user, containers, res);
+ parent.allocateResources(user, containers, res, decrPending);
}
}
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java?rev=1563021&r1=1563020&r2=1563021&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java Fri Jan 31 00:14:47 2014
@@ -57,7 +57,7 @@ import com.google.common.collect.Multise
*/
@Private
@Unstable
-public abstract class SchedulerApplicationAttempt {
+public class SchedulerApplicationAttempt {
private static final Log LOG = LogFactory
.getLog(SchedulerApplicationAttempt.class);
@@ -91,7 +91,7 @@ public abstract class SchedulerApplicati
protected Map<Priority, Long> lastScheduledContainer =
new HashMap<Priority, Long>();
- protected final Queue queue;
+ protected Queue queue;
protected boolean isStopped = false;
protected final RMContext rmContext;
@@ -431,4 +431,25 @@ public abstract class SchedulerApplicati
this.appSchedulingInfo
.transferStateFromPreviousAppSchedulingInfo(appAttempt.appSchedulingInfo);
}
+
+ public void move(Queue newQueue) {
+ QueueMetrics oldMetrics = queue.getMetrics();
+ QueueMetrics newMetrics = newQueue.getMetrics();
+ String user = getUser();
+ for (RMContainer liveContainer : liveContainers.values()) {
+ Resource resource = liveContainer.getContainer().getResource();
+ oldMetrics.releaseResources(user, 1, resource);
+ newMetrics.allocateResources(user, 1, resource, false);
+ }
+ for (Map<NodeId, RMContainer> map : reservedContainers.values()) {
+ for (RMContainer reservedContainer : map.values()) {
+ Resource resource = reservedContainer.getReservedResource();
+ oldMetrics.unreserveResource(user, resource);
+ newMetrics.reserveResource(user, resource);
+ }
+ }
+
+ appSchedulingInfo.move(newQueue);
+ this.queue = newQueue;
+ }
}
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java?rev=1563021&r1=1563020&r2=1563021&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java Fri Jan 31 00:14:47 2014
@@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
@@ -54,11 +55,14 @@ public class FSLeafQueue extends FSQueue
private long lastTimeAtMinShare;
private long lastTimeAtHalfFairShare;
+ private final ActiveUsersManager activeUsersManager;
+
public FSLeafQueue(String name, FairScheduler scheduler,
FSParentQueue parent) {
super(name, scheduler, parent);
this.lastTimeAtMinShare = scheduler.getClock().getTime();
this.lastTimeAtHalfFairShare = scheduler.getClock().getTime();
+ activeUsersManager = new ActiveUsersManager(getMetrics());
}
public void addApp(FSSchedulerApp app, boolean runnable) {
@@ -245,4 +249,9 @@ public class FSLeafQueue extends FSQueue
public int getNumRunnableApps() {
return runnableAppScheds.size();
}
+
+ @Override
+ public ActiveUsersManager getActiveUsersManager() {
+ return activeUsersManager;
+ }
}
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java?rev=1563021&r1=1563020&r2=1563021&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java Fri Jan 31 00:14:47 2014
@@ -33,7 +33,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.util.resource.Resources;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
@Private
@Unstable
@@ -194,4 +194,10 @@ public class FSParentQueue extends FSQue
childQueue.collectSchedulerApplications(apps);
}
}
+
+ @Override
+ public ActiveUsersManager getActiveUsersManager() {
+ // Should never be called since all applications are submitted to LeafQueues
+ return null;
+ }
}
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java?rev=1563021&r1=1563020&r2=1563021&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java Fri Jan 31 00:14:47 2014
@@ -184,6 +184,11 @@ public class FifoScheduler extends Abstr
public boolean hasAccess(QueueACL acl, UserGroupInformation user) {
return getQueueAcls().get(acl).isUserAllowed(user);
}
+
+ @Override
+ public ActiveUsersManager getActiveUsersManager() {
+ return activeUsersManager;
+ }
};
@Override
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java?rev=1563021&r1=1563020&r2=1563021&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java Fri Jan 31 00:14:47 2014
@@ -73,7 +73,7 @@ public class TestQueueMetrics {
checkApps(queueSource, 1, 1, 0, 0, 0, 0, true);
metrics.setAvailableResourcesToQueue(Resources.createResource(100*GB, 100));
- metrics.incrPendingResources(user, 5, Resources.createResource(15*GB, 15));
+ metrics.incrPendingResources(user, 5, Resources.createResource(3*GB, 3));
// Available resources is set externally, as it depends on dynamic
// configurable cluster/queue resources
checkResources(queueSource, 0, 0, 0, 0, 0, 100*GB, 100, 15*GB, 15, 5, 0, 0, 0);
@@ -81,7 +81,7 @@ public class TestQueueMetrics {
metrics.runAppAttempt(app.getApplicationId(), user);
checkApps(queueSource, 1, 0, 1, 0, 0, 0, true);
- metrics.allocateResources(user, 3, Resources.createResource(2*GB, 2));
+ metrics.allocateResources(user, 3, Resources.createResource(2*GB, 2), true);
checkResources(queueSource, 6*GB, 6, 3, 3, 0, 100*GB, 100, 9*GB, 9, 2, 0, 0, 0);
metrics.releaseResources(user, 1, Resources.createResource(2*GB, 2));
@@ -171,7 +171,7 @@ public class TestQueueMetrics {
metrics.setAvailableResourcesToQueue(Resources.createResource(100*GB, 100));
metrics.setAvailableResourcesToUser(user, Resources.createResource(10*GB, 10));
- metrics.incrPendingResources(user, 5, Resources.createResource(15*GB, 15));
+ metrics.incrPendingResources(user, 5, Resources.createResource(3*GB, 3));
// Available resources is set externally, as it depends on dynamic
// configurable cluster/queue resources
checkResources(queueSource, 0, 0, 0, 0, 0, 100*GB, 100, 15*GB, 15, 5, 0, 0, 0);
@@ -181,7 +181,7 @@ public class TestQueueMetrics {
checkApps(queueSource, 1, 0, 1, 0, 0, 0, true);
checkApps(userSource, 1, 0, 1, 0, 0, 0, true);
- metrics.allocateResources(user, 3, Resources.createResource(2*GB, 2));
+ metrics.allocateResources(user, 3, Resources.createResource(2*GB, 2), true);
checkResources(queueSource, 6*GB, 6, 3, 3, 0, 100*GB, 100, 9*GB, 9, 2, 0, 0, 0);
checkResources(userSource, 6*GB, 6, 3, 3, 0, 10*GB, 10, 9*GB, 9, 2, 0, 0, 0);
@@ -232,7 +232,7 @@ public class TestQueueMetrics {
metrics.setAvailableResourcesToQueue(Resources.createResource(100*GB, 100));
parentMetrics.setAvailableResourcesToUser(user, Resources.createResource(10*GB, 10));
metrics.setAvailableResourcesToUser(user, Resources.createResource(10*GB, 10));
- metrics.incrPendingResources(user, 5, Resources.createResource(15*GB, 15));
+ metrics.incrPendingResources(user, 5, Resources.createResource(3*GB, 3));
checkResources(queueSource, 0, 0, 0, 0, 0, 100*GB, 100, 15*GB, 15, 5, 0, 0, 0);
checkResources(parentQueueSource, 0, 0, 0, 0, 0, 100*GB, 100, 15*GB, 15, 5, 0, 0, 0);
checkResources(userSource, 0, 0, 0, 0, 0, 10*GB, 10, 15*GB, 15, 5, 0, 0, 0);
@@ -242,7 +242,7 @@ public class TestQueueMetrics {
checkApps(queueSource, 1, 0, 1, 0, 0, 0, true);
checkApps(userSource, 1, 0, 1, 0, 0, 0, true);
- metrics.allocateResources(user, 3, Resources.createResource(2*GB, 2));
+ metrics.allocateResources(user, 3, Resources.createResource(2*GB, 2), true);
metrics.reserveResource(user, Resources.createResource(3*GB, 3));
// Available resources is set externally, as it depends on dynamic
// configurable cluster/queue resources
Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerApplicationAttempt.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerApplicationAttempt.java?rev=1563021&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerApplicationAttempt.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerApplicationAttempt.java Fri Jan 31 00:14:47 2014
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.*;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.junit.After;
+import org.junit.Test;
+
+public class TestSchedulerApplicationAttempt {
+
+ private static final NodeId nodeId = NodeId.newInstance("somehost", 5);
+
+ private Configuration conf = new Configuration();
+
+ @After
+ public void tearDown() {
+ QueueMetrics.clearQueueMetrics();
+ DefaultMetricsSystem.shutdown();
+ }
+
+ @Test
+ public void testMove() {
+ final String user = "user1";
+ Queue parentQueue = createQueue("parent", null);
+ Queue oldQueue = createQueue("old", parentQueue);
+ Queue newQueue = createQueue("new", parentQueue);
+ QueueMetrics parentMetrics = parentQueue.getMetrics();
+ QueueMetrics oldMetrics = oldQueue.getMetrics();
+ QueueMetrics newMetrics = newQueue.getMetrics();
+
+ ApplicationAttemptId appAttId = createAppAttemptId(0, 0);
+ SchedulerApplicationAttempt app = new SchedulerApplicationAttempt(appAttId,
+ user, oldQueue, oldQueue.getActiveUsersManager(), null);
+ oldMetrics.submitApp(user);
+
+ // Resource request
+ Resource requestedResource = Resource.newInstance(1536, 2);
+ Priority requestedPriority = Priority.newInstance(2);
+ ResourceRequest request = ResourceRequest.newInstance(requestedPriority,
+ ResourceRequest.ANY, requestedResource, 3);
+ app.updateResourceRequests(Arrays.asList(request));
+
+ // Allocated container
+ RMContainer container1 = createRMContainer(appAttId, 1, requestedResource);
+ app.liveContainers.put(container1.getContainerId(), container1);
+ SchedulerNode node = createNode();
+ app.appSchedulingInfo.allocate(NodeType.OFF_SWITCH, node, requestedPriority,
+ request, container1.getContainer());
+
+ // Reserved container
+ Priority prio1 = Priority.newInstance(1);
+ Resource reservedResource = Resource.newInstance(2048, 3);
+ RMContainer container2 = createReservedRMContainer(appAttId, 1, reservedResource,
+ node.getNodeID(), prio1);
+ Map<NodeId, RMContainer> reservations = new HashMap<NodeId, RMContainer>();
+ reservations.put(node.getNodeID(), container2);
+ app.reservedContainers.put(prio1, reservations);
+ oldMetrics.reserveResource(user, reservedResource);
+
+ checkQueueMetrics(oldMetrics, 1, 1, 1536, 2, 2048, 3, 3072, 4);
+ checkQueueMetrics(newMetrics, 0, 0, 0, 0, 0, 0, 0, 0);
+ checkQueueMetrics(parentMetrics, 1, 1, 1536, 2, 2048, 3, 3072, 4);
+
+ app.move(newQueue);
+
+ checkQueueMetrics(oldMetrics, 0, 0, 0, 0, 0, 0, 0, 0);
+ checkQueueMetrics(newMetrics, 1, 1, 1536, 2, 2048, 3, 3072, 4);
+ checkQueueMetrics(parentMetrics, 1, 1, 1536, 2, 2048, 3, 3072, 4);
+ }
+
+ private void checkQueueMetrics(QueueMetrics metrics, int activeApps,
+ int runningApps, int allocMb, int allocVcores, int reservedMb,
+ int reservedVcores, int pendingMb, int pendingVcores) {
+ assertEquals(activeApps, metrics.getActiveApps());
+ assertEquals(runningApps, metrics.getAppsRunning());
+ assertEquals(allocMb, metrics.getAllocatedMB());
+ assertEquals(allocVcores, metrics.getAllocatedVirtualCores());
+ assertEquals(reservedMb, metrics.getReservedMB());
+ assertEquals(reservedVcores, metrics.getReservedVirtualCores());
+ assertEquals(pendingMb, metrics.getPendingMB());
+ assertEquals(pendingVcores, metrics.getPendingVirtualCores());
+ }
+
+ private SchedulerNode createNode() {
+ SchedulerNode node = mock(SchedulerNode.class);
+ when(node.getNodeName()).thenReturn("somehost");
+ when(node.getRackName()).thenReturn("somerack");
+ when(node.getNodeID()).thenReturn(nodeId);
+ return node;
+ }
+
+ private RMContainer createReservedRMContainer(ApplicationAttemptId appAttId,
+ int id, Resource resource, NodeId nodeId, Priority reservedPriority) {
+ RMContainer container = createRMContainer(appAttId, id, resource);
+ when(container.getReservedResource()).thenReturn(resource);
+ when(container.getReservedPriority()).thenReturn(reservedPriority);
+ when(container.getReservedNode()).thenReturn(nodeId);
+ return container;
+ }
+
+ private RMContainer createRMContainer(ApplicationAttemptId appAttId, int id,
+ Resource resource) {
+ ContainerId containerId = ContainerId.newInstance(appAttId, id);
+ RMContainer rmContainer = mock(RMContainer.class);
+ Container container = mock(Container.class);
+ when(container.getResource()).thenReturn(resource);
+ when(container.getNodeId()).thenReturn(nodeId);
+ when(rmContainer.getContainer()).thenReturn(container);
+ when(rmContainer.getContainerId()).thenReturn(containerId);
+ return rmContainer;
+ }
+
+ private Queue createQueue(String name, Queue parent) {
+ QueueMetrics metrics = QueueMetrics.forQueue(name, parent, false, conf);
+ ActiveUsersManager activeUsersManager = new ActiveUsersManager(metrics);
+ Queue queue = mock(Queue.class);
+ when(queue.getMetrics()).thenReturn(metrics);
+ when(queue.getActiveUsersManager()).thenReturn(activeUsersManager);
+ return queue;
+ }
+
+ private ApplicationAttemptId createAppAttemptId(int appId, int attemptId) {
+ ApplicationId appIdImpl = ApplicationId.newInstance(0, appId);
+ ApplicationAttemptId attId =
+ ApplicationAttemptId.newInstance(appIdImpl, attemptId);
+ return attId;
+ }
+}