You are viewing a plain-text version of this content; the link to its canonical version was not preserved in this copy.
Posted to mapreduce-commits@hadoop.apache.org by vi...@apache.org on 2012/02/02 01:42:19 UTC
svn commit: r1239423 - in
/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project: ./
hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/
hadoop-yarn/hadoop-yarn-s...
Author: vinodkv
Date: Thu Feb 2 00:42:18 2012
New Revision: 1239423
URL: http://svn.apache.org/viewvc?rev=1239423&view=rev
Log:
MAPREDUCE-3752. Modified application limits to include queue max-capacities besides the usual user limits. Contributed by Arun C Murthy.
svn merge --ignore-ancestry -c 1239422 ../../trunk/
Modified:
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/Resources.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt?rev=1239423&r1=1239422&r2=1239423&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt Thu Feb 2 00:42:18 2012
@@ -606,6 +606,9 @@ Release 0.23.1 - Unreleased
MAPREDUCE-3640. Allow AMRecovery to work with partial JobHistory files.
(Arun C Murthy via sseth)
+ MAPREDUCE-3752. Modified application limits to include queue max-capacities
+ besides the usual user limits. (Arun C Murthy via vinodkv)
+
Release 0.23.0 - 2011-11-01
INCOMPATIBLE CHANGES
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/Resources.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/Resources.java?rev=1239423&r1=1239422&r2=1239423&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/Resources.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/Resources.java Thu Feb 2 00:42:18 2012
@@ -111,4 +111,12 @@ public class Resources {
public static boolean greaterThanOrEqual(Resource lhs, Resource rhs) {
return lhs.getMemory() >= rhs.getMemory();
}
+ /** Smaller of lhs/rhs by memory only (rhs on ties); no copy is made. */
+ public static Resource min(Resource lhs, Resource rhs) {
+ return (lhs.getMemory() < rhs.getMemory()) ? lhs : rhs;
+ }
+ /** Larger of lhs/rhs by memory only (rhs on ties); no copy is made. */
+ public static Resource max(Resource lhs, Resource rhs) {
+ return (lhs.getMemory() > rhs.getMemory()) ? lhs : rhs;
+ }
}
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java?rev=1239423&r1=1239422&r2=1239423&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java Thu Feb 2 00:42:18 2012
@@ -162,6 +162,13 @@ public class AppSchedulingInfo {
asks.put(hostName, request);
if (updatePendingResources) {
+
+ // Deactivate the application if no outstanding requests remain.
+ if (request.getNumContainers() <= 0) {
+ LOG.debug("checking for deactivate... ");
+ checkForDeactivation();
+ }
+
int lastRequestContainers = lastRequest != null ? lastRequest
.getNumContainers() : 0;
Resource lastRequestCapability = lastRequest != null ? lastRequest
@@ -308,19 +315,29 @@ public class AppSchedulingInfo {
// Do we have any outstanding requests?
// If there is nothing, we need to deactivate this application
if (numOffSwitchContainers == 0) {
- boolean deactivate = true;
- for (Priority priority : getPriorities()) {
- ResourceRequest request = getResourceRequest(priority, RMNodeImpl.ANY);
- if (request.getNumContainers() > 0) {
- deactivate = false;
- break;
- }
- }
- if (deactivate) {
- activeUsersManager.deactivateApplication(user, applicationId);
+ checkForDeactivation();
+ }
+ }
+
+ /**
+ * Deactivates this application with the active-users manager when no
+ * priority still has an outstanding ANY-level request for containers.
+ */
+ synchronized private void checkForDeactivation() {
+ boolean deactivate = true;
+ for (Priority priority : getPriorities()) {
+ ResourceRequest request = getResourceRequest(priority, RMNodeImpl.ANY);
+ // A priority may lack an ANY-level request; treat that as nothing outstanding.
+ if (request != null && request.getNumContainers() > 0) {
+ deactivate = false;
+ break;
}
}
+ if (deactivate) {
+ activeUsersManager.deactivateApplication(user, applicationId);
+ }
}
+
synchronized private void allocate(Container container) {
// Update consumption and track allocations
//TODO: fixme sharad
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java?rev=1239423&r1=1239422&r2=1239423&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java Thu Feb 2 00:42:18 2012
@@ -751,15 +751,15 @@ public class LeafQueue implements CSQueu
continue;
}
- // Compute & set headroom
- // Note: We set the headroom with the highest priority request
- // as the target.
+ // Compute user-limit & set headroom
+ // Note: We compute both user-limit & headroom with the highest
+ // priority request as the target.
// This works since we never assign lower priority requests
// before all higher priority ones are serviced.
Resource userLimit =
- computeAndSetUserResourceLimit(application, clusterResource,
- required);
-
+ computeUserLimitAndSetHeadroom(application, clusterResource,
+ required);
+
// Check queue max-capacity limit
if (!assignToQueue(clusterResource, required)) {
return NULL_ASSIGNMENT;
@@ -777,13 +777,13 @@ public class LeafQueue implements CSQueu
CSAssignment assignment =
assignContainersOnNode(clusterResource, node, application, priority,
null);
-
- Resource assigned = assignment.getResource();
-
+
// Did we schedule or reserve a container?
+ Resource assigned = assignment.getResource();
if (Resources.greaterThan(assigned, Resources.none())) {
- // Book-keeping
+ // Book-keeping
+ // Note: allocateResource() also deducts this allocation from headroom.
allocateResource(clusterResource, application, assigned);
// Reset scheduling opportunities
@@ -854,20 +854,53 @@ public class LeafQueue implements CSQueu
}
@Lock({LeafQueue.class, SchedulerApp.class})
- private Resource computeAndSetUserResourceLimit(SchedulerApp application,
- Resource clusterResource, Resource required) {
+ private Resource computeUserLimitAndSetHeadroom(
+ SchedulerApp application, Resource clusterResource, Resource required) {
+
String user = application.getUser();
- Resource limit = computeUserLimit(application, clusterResource, required);
+
+ // Headroom = min(userLimit, queueMaxCap) - userConsumed, where queueMaxCap
+ // is the queue's absolute max-capacity share of the cluster, rounded down
+ // to a multiple of the minimum allocation.
+ // NOTE(review): only this user's consumption is subtracted, so a queue
+ // already at/over its max-capacity because of other users can still
+ // report positive headroom here -- confirm this is intended.
+
+ Resource userLimit = // User limit
+ computeUserLimit(application, clusterResource, required);
+
+
+ Resource queueMaxCap = // Queue Max-Capacity
+ Resources.createResource(
+ roundDown((int)(absoluteMaxCapacity * clusterResource.getMemory()))
+ );
+
+ Resource userConsumed = getUser(user).getConsumedResources();
Resource headroom =
- Resources.subtract(limit, getUser(user).getConsumedResources());
+ Resources.subtract(Resources.min(userLimit, queueMaxCap), userConsumed);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Headroom calculation for user " + user + ": " +
+ " userLimit=" + userLimit +
+ " queueMaxCap=" + queueMaxCap +
+ " consumed=" + userConsumed +
+ " headroom=" + headroom);
+ }
+
application.setHeadroom(headroom);
metrics.setAvailableResourcesToUser(user, headroom);
- return limit;
+
+ return userLimit;
}
private int roundUp(int memory) {
- return divideAndCeil(memory, minimumAllocation.getMemory()) *
- minimumAllocation.getMemory();
+ int minMemory = minimumAllocation.getMemory();
+ return divideAndCeil(memory, minMemory) * minMemory;
+ }
+
+ private int roundDown(int memory) {
+ int minMemory = minimumAllocation.getMemory();
+ return (memory / minMemory) * minMemory;
}
@Lock(NoLock.class)
@@ -1288,10 +1321,17 @@ public class LeafQueue implements CSQueu
String userName = application.getUser();
User user = getUser(userName);
user.assignContainer(resource);
+ Resources.subtractFrom(application.getHeadroom(), resource); // keep headroom in step with this allocation
metrics.setAvailableResourcesToUser(userName, application.getHeadroom());
- LOG.info(getQueueName() +
- " used=" + usedResources + " numContainers=" + numContainers +
- " user=" + userName + " user-resources=" + user.getConsumedResources());
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(getQueueName() +
+ " user=" + userName +
+ " used=" + usedResources + " numContainers=" + numContainers +
+ " headroom = " + application.getHeadroom() +
+ " user-resources=" + user.getConsumedResources()
+ );
+ }
}
synchronized void releaseResource(Resource clusterResource,
@@ -1325,8 +1365,8 @@ public class LeafQueue implements CSQueu
// Update application properties
for (SchedulerApp application : activeApplications) {
synchronized (application) {
- computeAndSetUserResourceLimit(
- application, clusterResource, Resources.none());
+ computeUserLimitAndSetHeadroom(application, clusterResource,
+ Resources.none());
}
}
}
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java?rev=1239423&r1=1239422&r2=1239423&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java Thu Feb 2 00:42:18 2012
@@ -38,6 +38,8 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
@@ -67,6 +69,8 @@ import org.mockito.stubbing.Answer;
public class TestLeafQueue {
+ private static final Log LOG = LogFactory.getLog(TestLeafQueue.class);
+
private final RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null);
@@ -473,6 +477,115 @@ public class TestLeafQueue {
}
@Test
+ public void testHeadroomWithMaxCap() throws Exception {
+ // Mock the queue
+ LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));
+ //unset maxCapacity
+ a.setMaxCapacity(1.0f);
+
+ // Users
+ final String user_0 = "user_0";
+ final String user_1 = "user_1";
+
+ // Submit applications
+ final ApplicationAttemptId appAttemptId_0 =
+ TestUtils.getMockApplicationAttemptId(0, 0);
+ SchedulerApp app_0 =
+ new SchedulerApp(appAttemptId_0, user_0, a,
+ a.getActiveUsersManager(), rmContext, null);
+ a.submitApplication(app_0, user_0, A);
+
+ final ApplicationAttemptId appAttemptId_1 =
+ TestUtils.getMockApplicationAttemptId(1, 0);
+ SchedulerApp app_1 =
+ new SchedulerApp(appAttemptId_1, user_0, a,
+ a.getActiveUsersManager(), rmContext, null);
+ a.submitApplication(app_1, user_0, A); // same user
+
+ final ApplicationAttemptId appAttemptId_2 =
+ TestUtils.getMockApplicationAttemptId(2, 0);
+ SchedulerApp app_2 =
+ new SchedulerApp(appAttemptId_2, user_1, a,
+ a.getActiveUsersManager(), rmContext, null);
+ a.submitApplication(app_2, user_1, A);
+
+ // Setup some nodes
+ String host_0 = "host_0";
+ SchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 8*GB);
+ String host_1 = "host_1";
+ SchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0, 8*GB);
+
+ final int numNodes = 2;
+ Resource clusterResource = Resources.createResource(numNodes * (8*GB));
+ when(csContext.getNumClusterNodes()).thenReturn(numNodes);
+
+ // Setup resource-requests
+ Priority priority = TestUtils.createMockPriority(1);
+ app_0.updateResourceRequests(Collections.singletonList(
+ TestUtils.createResourceRequest(RMNodeImpl.ANY, 2*GB, 1, priority,
+ recordFactory)));
+
+ app_1.updateResourceRequests(Collections.singletonList(
+ TestUtils.createResourceRequest(RMNodeImpl.ANY, 1*GB, 2, priority,
+ recordFactory)));
+
+ /**
+ * Start testing...
+ */
+
+ // Set user-limit
+ a.setUserLimit(50);
+ a.setUserLimitFactor(2);
+
+ // Now, only user_0 should be active since he is the only one with
+ // outstanding requests
+ assertEquals("There should only be 1 active user!",
+ 1, a.getActiveUsersManager().getNumActiveUsers());
+
+ // 1 container to user_0
+ a.assignContainers(clusterResource, node_0);
+ assertEquals(2*GB, a.getUsedResources().getMemory());
+ assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
+ assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
+ assertEquals(0*GB, app_0.getHeadroom().getMemory()); // User limit = 2G
+ assertEquals(0*GB, app_1.getHeadroom().getMemory()); // User limit = 2G
+
+ // Again one to user_0 since he hasn't exceeded user limit yet
+ a.assignContainers(clusterResource, node_0);
+ assertEquals(3*GB, a.getUsedResources().getMemory());
+ assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
+ assertEquals(1*GB, app_1.getCurrentConsumption().getMemory());
+ assertEquals(0*GB, app_0.getHeadroom().getMemory()); // 3G - 2G
+ assertEquals(0*GB, app_1.getHeadroom().getMemory()); // 3G - 2G
+
+ // Submit requests for app_2 and set max-cap
+ a.setMaxCapacity(.1f);
+ app_2.updateResourceRequests(Collections.singletonList(
+ TestUtils.createResourceRequest(RMNodeImpl.ANY, 1*GB, 1, priority,
+ recordFactory)));
+ assertEquals(2, a.getActiveUsersManager().getNumActiveUsers());
+
+ // No more to user_0 since he is already over user-limit
+ // and no more containers to queue since it's already at max-cap
+ a.assignContainers(clusterResource, node_1);
+ assertEquals(3*GB, a.getUsedResources().getMemory());
+ assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
+ assertEquals(1*GB, app_1.getCurrentConsumption().getMemory());
+ assertEquals(0*GB, app_2.getCurrentConsumption().getMemory());
+ assertEquals(0*GB, app_0.getHeadroom().getMemory());
+ assertEquals(0*GB, app_1.getHeadroom().getMemory());
+
+ // Check headroom for app_2
+ LOG.info("Cancelling app_1's request; only user_1 should remain active");
+ app_1.updateResourceRequests(Collections.singletonList( // unset
+ TestUtils.createResourceRequest(RMNodeImpl.ANY, 1*GB, 0, priority,
+ recordFactory)));
+ assertEquals(1, a.getActiveUsersManager().getNumActiveUsers());
+ a.assignContainers(clusterResource, node_1);
+ assertEquals(1*GB, app_2.getHeadroom().getMemory()); // hit queue max-cap
+ }
+
+ @Test
public void testSingleQueueWithMultipleUsers() throws Exception {
// Mock the queue
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java?rev=1239423&r1=1239422&r2=1239423&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java Thu Feb 2 00:42:18 2012
@@ -86,7 +86,7 @@ public class TestParentQueue {
private SchedulerApp getMockApplication(int appId, String user) {
SchedulerApp application = mock(SchedulerApp.class);
doReturn(user).when(application).getUser();
- doReturn(null).when(application).getHeadroom();
+ doReturn(Resources.createResource(0)).when(application).getHeadroom(); // non-null; presumably since LeafQueue now calls Resources.subtractFrom on it
return application;
}