You are viewing a plain text version of this content. The link to the canonical version was not preserved in this copy.
Posted to common-commits@hadoop.apache.org by jl...@apache.org on 2016/01/21 19:32:19 UTC
hadoop git commit: YARN-4610. Reservations continue looking for one app causes other apps to starve. Contributed by Jason Lowe
Repository: hadoop
Updated Branches:
refs/heads/trunk 1bb31fb22 -> 468a53b22
YARN-4610. Reservations continue looking for one app causes other apps to starve. Contributed by Jason Lowe
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/468a53b2
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/468a53b2
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/468a53b2
Branch: refs/heads/trunk
Commit: 468a53b22f4ac5bb079dff986ba849a687d709fe
Parents: 1bb31fb
Author: Jason Lowe <jl...@apache.org>
Authored: Thu Jan 21 18:30:42 2016 +0000
Committer: Jason Lowe <jl...@apache.org>
Committed: Thu Jan 21 18:31:29 2016 +0000
----------------------------------------------------------------------
hadoop-yarn-project/CHANGES.txt | 3 +
.../scheduler/capacity/LeafQueue.java | 6 +-
.../scheduler/capacity/TestReservations.java | 147 ++++++++++++++++++-
3 files changed, 152 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/468a53b2/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index ca89b32..f789bcb 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -1368,6 +1368,9 @@ Release 2.7.3 - UNRELEASED
YARN-4581. AHS writer thread leak makes RM crash while RM is recovering.
(sandflee via junping_du)
+ YARN-4610. Reservations continue looking for one app causes other apps to
+ starve (jlowe)
+
Release 2.7.2 - UNRELEASED
INCOMPATIBLE CHANGES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/468a53b2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index 9c6d8ee..9e64b42 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -1152,9 +1152,11 @@ public class LeafQueue extends AbstractCSQueue {
@Private
protected synchronized boolean canAssignToUser(Resource clusterResource,
String userName, Resource limit, FiCaSchedulerApp application,
- String nodePartition, ResourceLimits currentResoureLimits) {
+ String nodePartition, ResourceLimits currentResourceLimits) {
User user = getUser(userName);
+ currentResourceLimits.setAmountNeededUnreserve(Resources.none());
+
// Note: We aren't considering the current request since there is a fixed
// overhead of the AM, but it's a > check, not a >= check, so...
if (Resources
@@ -1181,7 +1183,7 @@ public class LeafQueue extends AbstractCSQueue {
Resources.subtract(user.getUsed(nodePartition), limit);
// we can only acquire a new container if we unreserve first to
// respect user-limit
- currentResoureLimits.setAmountNeededUnreserve(amountNeededToUnreserve);
+ currentResourceLimits.setAmountNeededUnreserve(amountNeededToUnreserve);
return true;
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/468a53b2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
index 49de478..9b920d0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
@@ -100,12 +100,17 @@ public class TestReservations {
}
private void setup(CapacitySchedulerConfiguration csConf) throws Exception {
+ // Same as setup(csConf, false): queue 'a' gets no explicit user-limit settings.
+ setup(csConf, false);
+ }
+
+ private void setup(CapacitySchedulerConfiguration csConf,
+ boolean addUserLimits) throws Exception {
csConf.setBoolean("yarn.scheduler.capacity.user-metrics.enable", true);
final String newRoot = "root" + System.currentTimeMillis();
// final String newRoot = "root";
- setupQueueConfiguration(csConf, newRoot);
+ setupQueueConfiguration(csConf, newRoot, addUserLimits);
YarnConfiguration conf = new YarnConfiguration();
cs.setConf(conf);
@@ -146,7 +151,7 @@ public class TestReservations {
private static final String A = "a";
private void setupQueueConfiguration(CapacitySchedulerConfiguration conf,
- final String newRoot) {
+ final String newRoot, boolean addUserLimits) {
// Define top-level queues
conf.setQueues(CapacitySchedulerConfiguration.ROOT,
@@ -167,6 +172,10 @@ public class TestReservations {
conf.setMaximumCapacity(Q_A, 100);
conf.setAcl(Q_A, QueueACL.SUBMIT_APPLICATIONS, "*");
+ if (addUserLimits) {
+ conf.setUserLimit(Q_A, 25);
+ conf.setUserLimitFactor(Q_A, 0.25f);
+ }
}
static LeafQueue stubLeafQueue(LeafQueue queue) {
@@ -334,6 +343,140 @@ public class TestReservations {
assertEquals(0, app_0.getTotalRequiredResources(priorityReduce));
}
+ // Regression test for YARN-4610: once user_0 is over its user limit and
+ // would have to unreserve before getting more resources, the queue must
+ // still assign containers to other users (here user_1).
+ @Test
+ public void testReservationLimitOtherUsers() throws Exception {
+ CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration();
+ setup(csConf, true);
+
+ // Queue 'a'; setup(csConf, true) gave it user-limit 25 and
+ // user-limit-factor 0.25.
+ LeafQueue a = stubLeafQueue((LeafQueue) queues.get(A));
+
+ // Two distinct users, each submitting one application
+ final String user_0 = "user_0";
+ final String user_1 = "user_1";
+
+ // Submit app_0 (user_0) and app_1 (user_1). Each is spied so that
+ // updateAMContainerDiagnostics is a no-op, and each app id is registered
+ // in rmContext with a mock RMApp.
+ final ApplicationAttemptId appAttemptId_0 = TestUtils
+ .getMockApplicationAttemptId(0, 0);
+ FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a,
+ mock(ActiveUsersManager.class), spyRMContext);
+ app_0 = spy(app_0);
+ Mockito.doNothing().when(app_0).updateAMContainerDiagnostics(any(AMState.class),
+ any(String.class));
+ rmContext.getRMApps().put(app_0.getApplicationId(), mock(RMApp.class));
+
+ a.submitApplicationAttempt(app_0, user_0);
+
+ final ApplicationAttemptId appAttemptId_1 = TestUtils
+ .getMockApplicationAttemptId(1, 0);
+ FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_1, a,
+ mock(ActiveUsersManager.class), spyRMContext);
+ app_1 = spy(app_1);
+ Mockito.doNothing().when(app_1).updateAMContainerDiagnostics(any(AMState.class),
+ any(String.class));
+ rmContext.getRMApps().put(app_1.getApplicationId(), mock(RMApp.class));
+
+ a.submitApplicationAttempt(app_1, user_1);
+
+ // Three 8GB nodes on DEFAULT_RACK, each registered with csContext and
+ // with the scheduler's node map
+ String host_0 = "host_0";
+ FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0,
+ 8 * GB);
+ String host_1 = "host_1";
+ FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0,
+ 8 * GB);
+ String host_2 = "host_2";
+ FiCaSchedulerNode node_2 = TestUtils.getMockNode(host_2, DEFAULT_RACK, 0,
+ 8 * GB);
+
+ when(csContext.getNode(node_0.getNodeID())).thenReturn(node_0);
+ when(csContext.getNode(node_1.getNodeID())).thenReturn(node_1);
+ when(csContext.getNode(node_2.getNodeID())).thenReturn(node_2);
+
+ cs.getAllNodes().put(node_0.getNodeID(), node_0);
+ cs.getAllNodes().put(node_1.getNodeID(), node_1);
+ cs.getAllNodes().put(node_2.getNodeID(), node_2);
+
+ // Cluster total: 3 nodes * 8GB = 24GB
+ final int numNodes = 3;
+ Resource clusterResource = Resources.createResource(numNodes * (8 * GB));
+ when(csContext.getNumClusterNodes()).thenReturn(numNodes);
+
+ // Request priorities (priorityReduce is declared but not used in this test)
+ Priority priorityAM = TestUtils.createMockPriority(1);
+ Priority priorityMap = TestUtils.createMockPriority(5);
+ Priority priorityReduce = TestUtils.createMockPriority(10);
+
+ // Each app first asks for a single 2GB AM container at priorityAM
+ app_0.updateResourceRequests(Collections.singletonList(TestUtils
+ .createResourceRequest(ResourceRequest.ANY, 2 * GB, 1, true,
+ priorityAM, recordFactory)));
+ app_1.updateResourceRequests(Collections.singletonList(TestUtils
+ .createResourceRequest(ResourceRequest.ANY, 2 * GB, 1, true,
+ priorityAM, recordFactory)));
+
+ // Start testing: each assignContainers call offers one node to queue 'a'.
+ // node_0: app_0's 2GB AM is allocated; available drops from 24GB to 22GB.
+ a.assignContainers(clusterResource, node_0,
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+ assertEquals(2 * GB, a.getUsedResources().getMemory());
+ assertEquals(2 * GB, app_0.getCurrentConsumption().getMemory());
+ assertEquals(0 * GB, app_1.getCurrentConsumption().getMemory());
+ assertEquals(0 * GB, a.getMetrics().getReservedMB());
+ assertEquals(2 * GB, a.getMetrics().getAllocatedMB());
+ assertEquals(22 * GB, a.getMetrics().getAvailableMB());
+ assertEquals(2 * GB, node_0.getUsedResource().getMemory());
+ assertEquals(0 * GB, node_1.getUsedResource().getMemory());
+ assertEquals(0 * GB, node_2.getUsedResource().getMemory());
+
+ // node_1: app_1's 2GB AM is allocated; queue usage is 4GB, 20GB available.
+ a.assignContainers(clusterResource, node_1,
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+ assertEquals(4 * GB, a.getUsedResources().getMemory());
+ assertEquals(2 * GB, app_0.getCurrentConsumption().getMemory());
+ assertEquals(2 * GB, app_1.getCurrentConsumption().getMemory());
+ assertEquals(0 * GB, a.getMetrics().getReservedMB());
+ assertEquals(4 * GB, a.getMetrics().getAllocatedMB());
+ assertEquals(20 * GB, a.getMetrics().getAvailableMB());
+ assertEquals(2 * GB, node_0.getUsedResource().getMemory());
+ assertEquals(2 * GB, node_1.getUsedResource().getMemory());
+ assertEquals(0 * GB, node_2.getUsedResource().getMemory());
+
+ // app_0 now asks for two 8GB maps; app_1 asks for two 2GB maps
+ app_0.updateResourceRequests(Collections.singletonList(TestUtils
+ .createResourceRequest(ResourceRequest.ANY, 8 * GB, 2, true,
+ priorityMap, recordFactory)));
+ app_1.updateResourceRequests(Collections.singletonList(TestUtils
+ .createResourceRequest(ResourceRequest.ANY, 2 * GB, 2, true,
+ priorityMap, recordFactory)));
+
+ // node_0 has only 6GB free, so app_0's 8GB map is reserved there. The
+ // reservation counts toward queue usage (4GB allocated + 8GB reserved = 12GB).
+ a.assignContainers(clusterResource, node_0,
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+ assertEquals(12 * GB, a.getUsedResources().getMemory());
+ assertEquals(2 * GB, app_0.getCurrentConsumption().getMemory());
+ assertEquals(2 * GB, app_1.getCurrentConsumption().getMemory());
+ assertEquals(8 * GB, a.getMetrics().getReservedMB());
+ assertEquals(4 * GB, a.getMetrics().getAllocatedMB());
+ assertEquals(12 * GB, a.getMetrics().getAvailableMB());
+ assertEquals(2 * GB, node_0.getUsedResource().getMemory());
+ assertEquals(2 * GB, node_1.getUsedResource().getMemory());
+ assertEquals(0 * GB, node_2.getUsedResource().getMemory());
+
+ // node_1: user_0 is now beyond its user limit (it would have to unreserve
+ // first), but that must not stop the queue from giving app_1 (user_1) a 2GB
+ // map container on node_1. app_0's 8GB reservation stays in place.
+ a.assignContainers(clusterResource, node_1,
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+ assertEquals(14 * GB, a.getUsedResources().getMemory());
+ assertEquals(2 * GB, app_0.getCurrentConsumption().getMemory());
+ assertEquals(4 * GB, app_1.getCurrentConsumption().getMemory());
+ assertEquals(8 * GB, a.getMetrics().getReservedMB());
+ assertEquals(6 * GB, a.getMetrics().getAllocatedMB());
+ assertEquals(10 * GB, a.getMetrics().getAvailableMB());
+ assertEquals(2 * GB, node_0.getUsedResource().getMemory());
+ assertEquals(4 * GB, node_1.getUsedResource().getMemory());
+ assertEquals(0 * GB, node_2.getUsedResource().getMemory());
+ }
+
@Test
public void testReservationNoContinueLook() throws Exception {
// Test that with reservations-continue-look-all-nodes feature off