You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink itself was not preserved in this plain-text copy).
Posted to mapreduce-commits@hadoop.apache.org by ac...@apache.org on 2011/09/05 21:49:47 UTC
svn commit: r1165403 - in /hadoop/common/trunk/hadoop-mapreduce-project: ./
hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/
hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache...
Author: acmurthy
Date: Mon Sep 5 19:49:47 2011
New Revision: 1165403
URL: http://svn.apache.org/viewvc?rev=1165403&view=rev
Log:
MAPREDUCE-2697. Enhance CapacityScheduler to cap concurrently running applications per-queue & per-user.
Added:
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
Modified:
hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerIdPBImpl.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/Queue.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/resources/capacity-scheduler.xml
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1165403&r1=1165402&r2=1165403&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Mon Sep 5 19:49:47 2011
@@ -1,6 +1,7 @@
Hadoop MapReduce Change Log
Trunk (unreleased changes)
+
IMPROVEMENTS
MAPREDUCE-2887 due to HADOOP-7524 Change RPC to allow multiple protocols including multuple versions of the same protocol (sanjay Radia)
@@ -236,6 +237,11 @@ Release 0.23.0 - Unreleased
MAPREDUCE-2735. Add an applications summary log to ResourceManager.
(Thomas Graves via acmurthy)
+ MAPREDUCE-2697. Enhance CapacityScheduler to cap concurrently running
+ applications per-queue & per-user. (acmurthy)
+ Configuration changes:
+ add yarn.capacity-scheduler.maximum-am-resource-percent
+
OPTIMIZATIONS
MAPREDUCE-2026. Make JobTracker.getJobCounters() and
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerIdPBImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerIdPBImpl.java?rev=1165403&r1=1165402&r2=1165403&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerIdPBImpl.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerIdPBImpl.java Mon Sep 5 19:49:47 2011
@@ -29,8 +29,6 @@ import org.apache.hadoop.yarn.proto.Yarn
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProtoOrBuilder;
-import org.mortbay.log.Log;
-
public class ContainerIdPBImpl extends ProtoBase<ContainerIdProto> implements ContainerId {
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java?rev=1165403&r1=1165402&r2=1165403&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java Mon Sep 5 19:49:47 2011
@@ -167,6 +167,11 @@ implements ResourceScheduler, CapacitySc
}
@Override
+ public Resource getClusterResources() {
+ return clusterResource;
+ }
+
+ @Override
public synchronized void reinitialize(Configuration conf,
ContainerTokenSecretManager containerTokenSecretManager, RMContext rmContext)
throws IOException {
@@ -621,6 +626,7 @@ implements ResourceScheduler, CapacitySc
private synchronized void addNode(RMNode nodeManager) {
this.nodes.put(nodeManager.getNodeID(), new SchedulerNode(nodeManager));
Resources.addTo(clusterResource, nodeManager.getTotalCapability());
+ root.updateClusterResource(clusterResource);
++numNodeManagers;
LOG.info("Added node " + nodeManager.getNodeAddress() +
" clusterResource: " + clusterResource);
@@ -629,6 +635,7 @@ implements ResourceScheduler, CapacitySc
private synchronized void removeNode(RMNode nodeInfo) {
SchedulerNode node = this.nodes.get(nodeInfo.getNodeID());
Resources.subtractFrom(clusterResource, nodeInfo.getTotalCapability());
+ root.updateClusterResource(clusterResource);
--numNodeManagers;
// Remove running containers
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java?rev=1165403&r1=1165402&r2=1165403&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java Mon Sep 5 19:49:47 2011
@@ -50,6 +50,10 @@ public class CapacitySchedulerConfigurat
PREFIX + "maximum-applications";
@Private
+ public static final String MAXIMUM_APPLICATION_MASTERS_RESOURCE_PERCENT =
+ PREFIX + "maximum-am-resource-percent";
+
+ @Private
public static final String QUEUES = "queues";
@Private
@@ -83,6 +87,10 @@ public class CapacitySchedulerConfigurat
public static final int DEFAULT_MAXIMUM_SYSTEM_APPLICATIIONS = 10000;
@Private
+ public static final float
+ DEFAULT_MAXIMUM_APPLICATIONMASTERS_RESOURCE_PERCENT = 0.1f;
+
+ @Private
public static final int UNDEFINED = -1;
@Private
@@ -124,6 +132,11 @@ public class CapacitySchedulerConfigurat
return maxApplications;
}
+ public float getMaximumApplicationMasterResourcePercent() {
+ return getFloat(MAXIMUM_APPLICATION_MASTERS_RESOURCE_PERCENT,
+ DEFAULT_MAXIMUM_APPLICATIONMASTERS_RESOURCE_PERCENT);
+ }
+
public int getCapacity(String queue) {
int capacity = getInt(getQueuePrefix(queue) + CAPACITY, UNDEFINED);
if (capacity < MINIMUM_CAPACITY_VALUE || capacity > MAXIMUM_CAPACITY_VALUE) {
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java?rev=1165403&r1=1165402&r2=1165403&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java Mon Sep 5 19:49:47 2011
@@ -37,4 +37,6 @@ public interface CapacitySchedulerContex
int getNumClusterNodes();
RMContext getRMContext();
+
+ Resource getClusterResources();
}
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java?rev=1165403&r1=1165402&r2=1165403&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java Mon Sep 5 19:49:47 2011
@@ -24,6 +24,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -77,15 +78,22 @@ public class LeafQueue implements Queue
private int maxApplications;
private int maxApplicationsPerUser;
+
+ private float maxAMResourcePercent;
+ private int maxActiveApplications;
+ private int maxActiveApplicationsPerUser;
+
private Resource usedResources = Resources.createResource(0);
private float utilization = 0.0f;
private float usedCapacity = 0.0f;
private volatile int numContainers;
- Set<SchedulerApp> applications;
+ Set<SchedulerApp> activeApplications;
Map<ApplicationAttemptId, SchedulerApp> applicationsMap =
new HashMap<ApplicationAttemptId, SchedulerApp>();
+ Set<SchedulerApp> pendingApplications;
+
private final Resource minimumAllocation;
private final Resource maximumAllocation;
private final float minimumAllocationFactor;
@@ -108,6 +116,8 @@ public class LeafQueue implements Queue
private CapacitySchedulerContext scheduler;
+ final static int DEFAULT_AM_RESOURCE = 2 * 1024;
+
public LeafQueue(CapacitySchedulerContext cs,
String queueName, Queue parent,
Comparator<SchedulerApp> applicationComparator, Queue old) {
@@ -144,6 +154,15 @@ public class LeafQueue implements Queue
int maxApplicationsPerUser =
(int)(maxApplications * (userLimit / 100.0f) * userLimitFactor);
+ this.maxAMResourcePercent =
+ cs.getConfiguration().getMaximumApplicationMasterResourcePercent();
+ int maxActiveApplications =
+ computeMaxActiveApplications(cs.getClusterResources(),
+ maxAMResourcePercent, absoluteCapacity);
+ int maxActiveApplicationsPerUser =
+ computeMaxActiveApplicationsPerUser(maxActiveApplications, userLimit,
+ userLimitFactor);
+
this.queueInfo = recordFactory.newRecordInstance(QueueInfo.class);
this.queueInfo.setQueueName(queueName);
this.queueInfo.setChildQueues(new ArrayList<QueueInfo>());
@@ -157,20 +176,38 @@ public class LeafQueue implements Queue
maximumCapacity, absoluteMaxCapacity,
userLimit, userLimitFactor,
maxApplications, maxApplicationsPerUser,
+ maxActiveApplications, maxActiveApplicationsPerUser,
state, acls);
LOG.info("DEBUG --- LeafQueue:" +
" name=" + queueName +
", fullname=" + getQueuePath());
- this.applications = new TreeSet<SchedulerApp>(applicationComparator);
+ this.pendingApplications =
+ new TreeSet<SchedulerApp>(applicationComparator);
+ this.activeApplications = new TreeSet<SchedulerApp>(applicationComparator);
}
+ private int computeMaxActiveApplications(Resource clusterResource,
+ float maxAMResourcePercent, float absoluteCapacity) {
+ return
+ Math.max(
+ (int)((clusterResource.getMemory() / DEFAULT_AM_RESOURCE) *
+ maxAMResourcePercent * absoluteCapacity),
+ 1);
+ }
+
+ private int computeMaxActiveApplicationsPerUser(int maxActiveApplications,
+ int userLimit, float userLimitFactor) {
+ return (int)(maxActiveApplications * (userLimit / 100.0f) * userLimitFactor);
+ }
+
private synchronized void setupQueueConfigs(
float capacity, float absoluteCapacity,
float maxCapacity, float absoluteMaxCapacity,
int userLimit, float userLimitFactor,
int maxApplications, int maxApplicationsPerUser,
+ int maxActiveApplications, int maxActiveApplicationsPerUser,
QueueState state, Map<QueueACL, AccessControlList> acls)
{
this.capacity = capacity;
@@ -185,6 +222,9 @@ public class LeafQueue implements Queue
this.maxApplications = maxApplications;
this.maxApplicationsPerUser = maxApplicationsPerUser;
+ this.maxActiveApplications = maxActiveApplications;
+ this.maxActiveApplicationsPerUser = maxActiveApplicationsPerUser;
+
this.state = state;
this.acls = acls;
@@ -269,6 +309,22 @@ public class LeafQueue implements Queue
return minimumAllocationFactor;
}
+ public int getMaxApplications() {
+ return maxApplications;
+ }
+
+ public int getMaxApplicationsPerUser() {
+ return maxApplicationsPerUser;
+ }
+
+ public int getMaximumActiveApplications() {
+ return maxActiveApplications;
+ }
+
+ public int getMaximumActiveApplicationsPerUser() {
+ return maxActiveApplicationsPerUser;
+ }
+
@Override
public synchronized float getUsedCapacity() {
return usedCapacity;
@@ -329,10 +385,34 @@ public class LeafQueue implements Queue
this.parent = parent;
}
+ @Override
public synchronized int getNumApplications() {
- return applications.size();
+ return getNumPendingApplications() + getNumActiveApplications();
+ }
+
+ public synchronized int getNumPendingApplications() {
+ return pendingApplications.size();
+ }
+
+ public synchronized int getNumActiveApplications() {
+ return activeApplications.size();
+ }
+
+ @Private
+ public synchronized int getNumApplications(String user) {
+ return getUser(user).getTotalApplications();
+ }
+
+ @Private
+ public synchronized int getNumPendingApplications(String user) {
+ return getUser(user).getPendingApplications();
}
+ @Private
+ public synchronized int getNumActiveApplications(String user) {
+ return getUser(user).getActiveApplications();
+ }
+
public synchronized int getNumContainers() {
return numContainers;
}
@@ -342,6 +422,16 @@ public class LeafQueue implements Queue
return state;
}
+ @Private
+ public int getUserLimit() {
+ return userLimit;
+ }
+
+ @Private
+ public float getUserLimitFactor() {
+ return userLimitFactor;
+ }
+
@Override
public synchronized Map<QueueACL, AccessControlList> getQueueAcls() {
return new HashMap<QueueACL, AccessControlList>(acls);
@@ -404,6 +494,8 @@ public class LeafQueue implements Queue
leafQueue.maximumCapacity, leafQueue.absoluteMaxCapacity,
leafQueue.userLimit, leafQueue.userLimitFactor,
leafQueue.maxApplications, leafQueue.maxApplicationsPerUser,
+ leafQueue.maxActiveApplications,
+ leafQueue.maxActiveApplicationsPerUser,
leafQueue.state, leafQueue.acls);
updateResource(clusterResource);
@@ -443,7 +535,7 @@ public class LeafQueue implements Queue
synchronized (this) {
// Check if the queue is accepting jobs
- if (state != QueueState.RUNNING) {
+ if (getState() != QueueState.RUNNING) {
String msg = "Queue " + getQueuePath() +
" is STOPPED. Cannot accept submission of application: " +
application.getApplicationId();
@@ -452,7 +544,7 @@ public class LeafQueue implements Queue
}
// Check submission limits for queues
- if (getNumApplications() >= maxApplications) {
+ if (getNumApplications() >= getMaxApplications()) {
String msg = "Queue " + getQueuePath() +
" already has " + getNumApplications() + " applications," +
" cannot accept submission of application: " +
@@ -463,9 +555,9 @@ public class LeafQueue implements Queue
// Check submission limits for the user on this queue
user = getUser(userName);
- if (user.getApplications() >= maxApplicationsPerUser) {
+ if (user.getTotalApplications() >= getMaxApplicationsPerUser()) {
String msg = "Queue " + getQueuePath() +
- " already has " + user.getApplications() +
+ " already has " + user.getTotalApplications() +
" applications from user " + userName +
" cannot accept submission of application: " +
application.getApplicationId();
@@ -490,17 +582,46 @@ public class LeafQueue implements Queue
}
}
+ private synchronized void activateApplications() {
+ for (Iterator<SchedulerApp> i=pendingApplications.iterator();
+ i.hasNext(); ) {
+ SchedulerApp application = i.next();
+
+ // Check queue limit
+ if (getNumActiveApplications() >= getMaximumActiveApplications()) {
+ break;
+ }
+
+ // Check user limit
+ User user = getUser(application.getUser());
+ if (user.getActiveApplications() < getMaximumActiveApplicationsPerUser()) {
+ user.activateApplication();
+ activeApplications.add(application);
+ i.remove();
+ LOG.info("Application " + application.getApplicationId().getId() +
+ " from user: " + application.getUser() +
+ " activated in queue: " + getQueueName());
+ }
+ }
+ }
+
private synchronized void addApplication(SchedulerApp application, User user) {
// Accept
user.submitApplication();
- applications.add(application);
+ pendingApplications.add(application);
applicationsMap.put(application.getApplicationAttemptId(), application);
+ // Activate applications
+ activateApplications();
+
LOG.info("Application added -" +
" appId: " + application.getApplicationId() +
" user: " + user + "," + " leaf-queue: " + getQueueName() +
- " #user-applications: " + user.getApplications() +
- " #queue-applications: " + getNumApplications());
+ " #user-pending-applications: " + user.getPendingApplications() +
+ " #user-active-applications: " + user.getActiveApplications() +
+ " #queue-pending-applications: " + getNumPendingApplications() +
+ " #queue-active-applications: " + getNumActiveApplications()
+ );
}
@Override
@@ -515,20 +636,26 @@ public class LeafQueue implements Queue
}
public synchronized void removeApplication(SchedulerApp application, User user) {
- applications.remove(application);
+ activeApplications.remove(application);
applicationsMap.remove(application.getApplicationAttemptId());
user.finishApplication();
- if (user.getApplications() == 0) {
+ if (user.getTotalApplications() == 0) {
users.remove(application.getUser());
}
+ // Check if we can activate more applications
+ activateApplications();
+
LOG.info("Application removed -" +
" appId: " + application.getApplicationId() +
" user: " + application.getUser() +
" queue: " + getQueueName() +
- " #user-applications: " + user.getApplications() +
- " #queue-applications: " + getNumApplications());
+ " #user-pending-applications: " + user.getPendingApplications() +
+ " #user-active-applications: " + user.getActiveApplications() +
+ " #queue-pending-applications: " + getNumPendingApplications() +
+ " #queue-active-applications: " + getNumActiveApplications()
+ );
}
private synchronized SchedulerApp getApplication(
@@ -542,7 +669,7 @@ public class LeafQueue implements Queue
LOG.info("DEBUG --- assignContainers:" +
" node=" + node.getHostName() +
- " #applications=" + applications.size());
+ " #applications=" + activeApplications.size());
// Check for reserved resources
RMContainer reservedContainer = node.getReservedContainer();
@@ -554,7 +681,7 @@ public class LeafQueue implements Queue
}
// Try to assign containers to applications in order
- for (SchedulerApp application : applications) {
+ for (SchedulerApp application : activeApplications) {
LOG.info("DEBUG --- pre-assignContainers for application "
+ application.getApplicationId());
@@ -1119,7 +1246,16 @@ public class LeafQueue implements Queue
}
@Override
- public synchronized void updateResource(Resource clusterResource) {
+ public synchronized void updateClusterResource(Resource clusterResource) {
+ maxActiveApplications =
+ computeMaxActiveApplications(clusterResource, maxAMResourcePercent,
+ absoluteCapacity);
+ maxActiveApplicationsPerUser =
+ computeMaxActiveApplicationsPerUser(maxActiveApplications, userLimit,
+ userLimitFactor);
+ }
+
+ private synchronized void updateResource(Resource clusterResource) {
float queueLimit = clusterResource.getMemory() * absoluteCapacity;
setUtilization(usedResources.getMemory() / queueLimit);
setUsedCapacity(
@@ -1138,22 +1274,36 @@ public class LeafQueue implements Queue
static class User {
Resource consumed = Resources.createResource(0);
- int applications = 0;
+ int pendingApplications = 0;
+ int activeApplications = 0;
public Resource getConsumedResources() {
return consumed;
}
- public int getApplications() {
- return applications;
+ public int getPendingApplications() {
+ return pendingApplications;
}
+ public int getActiveApplications() {
+ return activeApplications;
+ }
+
+ public int getTotalApplications() {
+ return getPendingApplications() + getActiveApplications();
+ }
+
public synchronized void submitApplication() {
- ++applications;
+ ++pendingApplications;
+ }
+
+ public synchronized void activateApplication() {
+ --pendingApplications;
+ ++activeApplications;
}
public synchronized void finishApplication() {
- --applications;
+ --activeApplications;
}
public synchronized void assignContainer(Resource resource) {
@@ -1175,4 +1325,5 @@ public class LeafQueue implements Queue
parent.recoverContainer(clusterResource, application, container);
}
+
}
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java?rev=1165403&r1=1165402&r2=1165403&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java Mon Sep 5 19:49:47 2011
@@ -646,7 +646,14 @@ public class ParentQueue implements Queu
}
@Override
- public synchronized void updateResource(Resource clusterResource) {
+ public synchronized void updateClusterResource(Resource clusterResource) {
+ // Update all children
+ for (Queue childQueue : childQueues) {
+ childQueue.updateClusterResource(clusterResource);
+ }
+ }
+
+ private synchronized void updateResource(Resource clusterResource) {
float queueLimit = clusterResource.getMemory() * absoluteCapacity;
setUtilization(usedResources.getMemory() / queueLimit);
setUsedCapacity(
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/Queue.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/Queue.java?rev=1165403&r1=1165402&r2=1165403&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/Queue.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/Queue.java Mon Sep 5 19:49:47 2011
@@ -190,7 +190,7 @@ extends org.apache.hadoop.yarn.server.re
* Update the cluster resource for queues as we add/remove nodes
* @param clusterResource the current cluster resource
*/
- public void updateResource(Resource clusterResource);
+ public void updateClusterResource(Resource clusterResource);
/**
* Recover the state of the queue
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/resources/capacity-scheduler.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/resources/capacity-scheduler.xml?rev=1165403&r1=1165402&r2=1165403&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/resources/capacity-scheduler.xml (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/resources/capacity-scheduler.xml Mon Sep 5 19:49:47 2011
@@ -6,6 +6,11 @@
</property>
<property>
+ <name>yarn.capacity-scheduler.maximum-am-resource-percent</name>
+ <value>0.1</value>
+ </property>
+
+ <property>
<name>yarn.capacity-scheduler.root.queues</name>
<value>default</value>
</property>
Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java?rev=1165403&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java Mon Sep 5 19:49:47 2011
@@ -0,0 +1,270 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.QueueACL;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests the per-queue and per-user caps on concurrently active applications
+ * enforced by {@link LeafQueue}.
+ */
+public class TestApplicationLimits {
+
+  private static final Log LOG = LogFactory.getLog(TestApplicationLimits.class);
+  static final int GB = 1024;
+
+  private static final String A = "a";
+  private static final String B = "b";
+
+  /** Spied leaf queue "a" with ACL checks and limits stubbed in setUp. */
+  LeafQueue queue;
+
+  @Before
+  public void setUp() {
+    CapacitySchedulerConfiguration csConf =
+        new CapacitySchedulerConfiguration();
+    setupQueueConfiguration(csConf);
+
+    // Cluster of 10 nodes with 16GB each
+    CapacitySchedulerContext csContext =
+        mockContext(csConf, Resources.createResource(10 * 16 * GB));
+
+    Map<String, Queue> queues = new HashMap<String, Queue>();
+    Queue root =
+        CapacityScheduler.parseQueue(csContext, csConf, null, "root",
+            queues, queues,
+            CapacityScheduler.queueComparator,
+            CapacityScheduler.applicationComparator,
+            TestUtils.spyHook);
+
+    queue = spy(
+        new LeafQueue(csContext, A, root,
+            CapacityScheduler.applicationComparator, null)
+        );
+
+    // Stub out ACL checks
+    doReturn(true).
+        when(queue).hasAccess(any(QueueACL.class),
+            any(UserGroupInformation.class));
+
+    // Fixed limits overriding the computed ones; the per-user cap of 2
+    // drives the activation scenarios in testActiveApplicationLimits
+    doReturn(100).when(queue).getMaxApplications();
+    doReturn(25).when(queue).getMaxApplicationsPerUser();
+    doReturn(10).when(queue).getMaximumActiveApplications();
+    doReturn(2).when(queue).getMaximumActiveApplicationsPerUser();
+  }
+
+  /** Configures root with two child queues: a (10% capacity), b (90%). */
+  private void setupQueueConfiguration(CapacitySchedulerConfiguration conf) {
+
+    // Define top-level queues
+    conf.setQueues(CapacityScheduler.ROOT, new String[] {A, B});
+    conf.setCapacity(CapacityScheduler.ROOT, 100);
+
+    final String Q_A = CapacityScheduler.ROOT + "." + A;
+    conf.setCapacity(Q_A, 10);
+
+    final String Q_B = CapacityScheduler.ROOT + "." + B;
+    conf.setCapacity(Q_B, 90);
+
+    LOG.info("Setup top-level queues a and b");
+  }
+
+  /**
+   * Builds a mocked scheduler context returning {@code csConf}, a 1GB
+   * minimum and 16GB maximum resource capability, and
+   * {@code clusterResource} as the cluster size.
+   */
+  private static CapacitySchedulerContext mockContext(
+      CapacitySchedulerConfiguration csConf, Resource clusterResource) {
+    CapacitySchedulerContext csContext = mock(CapacitySchedulerContext.class);
+    when(csContext.getConfiguration()).thenReturn(csConf);
+    when(csContext.getMinimumResourceCapability()).
+        thenReturn(Resources.createResource(GB));
+    when(csContext.getMaximumResourceCapability()).
+        thenReturn(Resources.createResource(16 * GB));
+    when(csContext.getClusterResources()).thenReturn(clusterResource);
+    return csContext;
+  }
+
+  /** Returns a mocked application (attempt 0 of appId) owned by user. */
+  private SchedulerApp getMockApplication(int appId, String user) {
+    SchedulerApp application = mock(SchedulerApp.class);
+    ApplicationAttemptId applicationAttemptId =
+        TestUtils.getMockApplicationAttemptId(appId, 0);
+    doReturn(applicationAttemptId.getApplicationId()).
+        when(application).getApplicationId();
+    doReturn(applicationAttemptId).when(application).getApplicationAttemptId();
+    doReturn(user).when(application).getUser();
+    return application;
+  }
+
+  /**
+   * Asserts that the active-application limits of {@code leafQueue} match
+   * the values derived from cluster memory, LeafQueue.DEFAULT_AM_RESOURCE,
+   * the configured maximum AM resource percent and the queue's absolute
+   * capacity (never below 1); the per-user cap is further scaled by the
+   * queue's user-limit percent and user-limit-factor.
+   */
+  private static void assertActiveApplicationLimits(LeafQueue leafQueue,
+      CapacitySchedulerConfiguration csConf, Resource clusterResource) {
+    int expectedMaxActiveApps =
+        Math.max(1,
+            (int)((clusterResource.getMemory() / LeafQueue.DEFAULT_AM_RESOURCE) *
+                csConf.getMaximumApplicationMasterResourcePercent() *
+                leafQueue.getAbsoluteCapacity()));
+    assertEquals(expectedMaxActiveApps,
+        leafQueue.getMaximumActiveApplications());
+    assertEquals(
+        (int)(expectedMaxActiveApps * (leafQueue.getUserLimit() / 100.0f) *
+            leafQueue.getUserLimitFactor()),
+        leafQueue.getMaximumActiveApplicationsPerUser());
+  }
+
+  @Test
+  public void testLimitsComputation() throws Exception {
+    CapacitySchedulerConfiguration csConf =
+        new CapacitySchedulerConfiguration();
+    setupQueueConfiguration(csConf);
+
+    // Say cluster has 100 nodes of 16G each
+    Resource clusterResource = Resources.createResource(100 * 16 * GB);
+    CapacitySchedulerContext csContext = mockContext(csConf, clusterResource);
+
+    Map<String, Queue> queues = new HashMap<String, Queue>();
+    Queue root =
+        CapacityScheduler.parseQueue(csContext, csConf, null, "root",
+            queues, queues,
+            CapacityScheduler.queueComparator,
+            CapacityScheduler.applicationComparator,
+            TestUtils.spyHook);
+
+    // Check the queue built by parseQueue, not the stubbed fixture field
+    LeafQueue queueA = (LeafQueue)queues.get(A);
+
+    LOG.info("Queue 'A' -" +
+        " maxActiveApplications=" + queueA.getMaximumActiveApplications() +
+        " maxActiveApplicationsPerUser=" +
+        queueA.getMaximumActiveApplicationsPerUser());
+    assertActiveApplicationLimits(queueA, csConf, clusterResource);
+
+    // Add some nodes to the cluster & test new limits
+    clusterResource = Resources.createResource(120 * 16 * GB);
+    root.updateClusterResource(clusterResource);
+    assertActiveApplicationLimits(queueA, csConf, clusterResource);
+  }
+
+  /**
+   * Submits and finishes applications on the stubbed queue (at most 10
+   * active per queue, lowered to 3 mid-test, and 2 active per user) and
+   * checks which are activated and which stay pending.
+   */
+  @Test
+  public void testActiveApplicationLimits() throws Exception {
+    final String user_0 = "user_0";
+    final String user_1 = "user_1";
+
+    int appId = 0;
+    // Submit first application
+    SchedulerApp app_0 = getMockApplication(appId++, user_0);
+    queue.submitApplication(app_0, user_0, A);
+    assertEquals(1, queue.getNumActiveApplications());
+    assertEquals(0, queue.getNumPendingApplications());
+    assertEquals(1, queue.getNumActiveApplications(user_0));
+    assertEquals(0, queue.getNumPendingApplications(user_0));
+
+    // Submit second application
+    SchedulerApp app_1 = getMockApplication(appId++, user_0);
+    queue.submitApplication(app_1, user_0, A);
+    assertEquals(2, queue.getNumActiveApplications());
+    assertEquals(0, queue.getNumPendingApplications());
+    assertEquals(2, queue.getNumActiveApplications(user_0));
+    assertEquals(0, queue.getNumPendingApplications(user_0));
+
+    // Submit third application, should remain pending (per-user cap is 2)
+    SchedulerApp app_2 = getMockApplication(appId++, user_0);
+    queue.submitApplication(app_2, user_0, A);
+    assertEquals(2, queue.getNumActiveApplications());
+    assertEquals(1, queue.getNumPendingApplications());
+    assertEquals(2, queue.getNumActiveApplications(user_0));
+    assertEquals(1, queue.getNumPendingApplications(user_0));
+
+    // Finish one application, app_2 should be activated
+    queue.finishApplication(app_0, A);
+    assertEquals(2, queue.getNumActiveApplications());
+    assertEquals(0, queue.getNumPendingApplications());
+    assertEquals(2, queue.getNumActiveApplications(user_0));
+    assertEquals(0, queue.getNumPendingApplications(user_0));
+
+    // Submit another one for user_0
+    SchedulerApp app_3 = getMockApplication(appId++, user_0);
+    queue.submitApplication(app_3, user_0, A);
+    assertEquals(2, queue.getNumActiveApplications());
+    assertEquals(1, queue.getNumPendingApplications());
+    assertEquals(2, queue.getNumActiveApplications(user_0));
+    assertEquals(1, queue.getNumPendingApplications(user_0));
+
+    // Change queue limit to be smaller so 2 users can fill it up
+    doReturn(3).when(queue).getMaximumActiveApplications();
+
+    // Submit first app for user_1
+    SchedulerApp app_4 = getMockApplication(appId++, user_1);
+    queue.submitApplication(app_4, user_1, A);
+    assertEquals(3, queue.getNumActiveApplications());
+    assertEquals(1, queue.getNumPendingApplications());
+    assertEquals(2, queue.getNumActiveApplications(user_0));
+    assertEquals(1, queue.getNumPendingApplications(user_0));
+    assertEquals(1, queue.getNumActiveApplications(user_1));
+    assertEquals(0, queue.getNumPendingApplications(user_1));
+
+    // Submit second app for user_1, should block due to queue-limit
+    SchedulerApp app_5 = getMockApplication(appId++, user_1);
+    queue.submitApplication(app_5, user_1, A);
+    assertEquals(3, queue.getNumActiveApplications());
+    assertEquals(2, queue.getNumPendingApplications());
+    assertEquals(2, queue.getNumActiveApplications(user_0));
+    assertEquals(1, queue.getNumPendingApplications(user_0));
+    assertEquals(1, queue.getNumActiveApplications(user_1));
+    assertEquals(1, queue.getNumPendingApplications(user_1));
+
+    // Now finish one app of user_1 so app_5 should be activated
+    queue.finishApplication(app_4, A);
+    assertEquals(3, queue.getNumActiveApplications());
+    assertEquals(1, queue.getNumPendingApplications());
+    assertEquals(2, queue.getNumActiveApplications(user_0));
+    assertEquals(1, queue.getNumPendingApplications(user_0));
+    assertEquals(1, queue.getNumActiveApplications(user_1));
+    assertEquals(0, queue.getNumPendingApplications(user_1));
+  }
+}
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java?rev=1165403&r1=1165402&r2=1165403&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java Mon Sep 5 19:49:47 2011
@@ -83,8 +83,12 @@ public class TestLeafQueue {
csContext = mock(CapacitySchedulerContext.class);
when(csContext.getConfiguration()).thenReturn(csConf);
- when(csContext.getMinimumResourceCapability()).thenReturn(Resources.createResource(GB));
- when(csContext.getMaximumResourceCapability()).thenReturn(Resources.createResource(16*GB));
+ when(csContext.getMinimumResourceCapability()).
+ thenReturn(Resources.createResource(GB));
+ when(csContext.getMaximumResourceCapability()).
+ thenReturn(Resources.createResource(16*GB));
+ when(csContext.getClusterResources()).
+ thenReturn(Resources.createResource(100 * 16 * GB));
root =
CapacityScheduler.parseQueue(csContext, csConf, null, "root",
queues, queues,
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java?rev=1165403&r1=1165402&r2=1165403&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java Mon Sep 5 19:49:47 2011
@@ -60,6 +60,8 @@ public class TestParentQueue {
Resources.createResource(GB));
when(csContext.getMaximumResourceCapability()).thenReturn(
Resources.createResource(16*GB));
+ when(csContext.getClusterResources()).
+ thenReturn(Resources.createResource(100 * 16 * GB));
}
private static final String A = "a";
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java?rev=1165403&r1=1165402&r2=1165403&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java Mon Sep 5 19:49:47 2011
@@ -116,6 +116,13 @@ public class TestUtils {
return request;
}
+  public static ApplicationId getMockApplicationId(int appId) {
+    ApplicationId mockAppId = mock(ApplicationId.class);
+    when(mockAppId.getId()).thenReturn(appId);
+    when(mockAppId.getClusterTimestamp()).thenReturn(0L);
+    return mockAppId;
+  }
+
public static ApplicationAttemptId
getMockApplicationAttemptId(int appId, int attemptId) {
ApplicationId applicationId = mock(ApplicationId.class);