You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by xk...@apache.org on 2018/08/01 17:06:18 UTC
[02/50] hadoop git commit: YARN-4606. CapacityScheduler: applications
could get starved because computation of #activeUsers considers pending apps.
Contributed by Manikandan R
YARN-4606. CapacityScheduler: applications could get starved because computation of #activeUsers considers pending apps. Contributed by Manikandan R
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/9485c9ae
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/9485c9ae
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/9485c9ae
Branch: refs/heads/HDFS-12943
Commit: 9485c9aee6e9bb935c3e6ae4da81d70b621781de
Parents: 5f0b924
Author: Eric E Payne <er...@oath.com>
Authored: Wed Jul 25 16:22:04 2018 +0000
Committer: Eric E Payne <er...@oath.com>
Committed: Wed Jul 25 16:22:04 2018 +0000
----------------------------------------------------------------------
.../scheduler/capacity/UsersManager.java | 27 +++-
.../capacity/TestCapacityScheduler.java | 128 +++++++++++++++++++
.../capacity/TestContainerAllocation.java | 43 +++++++
3 files changed, 197 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9485c9ae/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java
index 747a488..83ee6c0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java
@@ -85,6 +85,7 @@ public class UsersManager implements AbstractUsersManager {
private final QueueMetrics metrics;
private AtomicInteger activeUsers = new AtomicInteger(0);
+ private final AtomicInteger activeUsersWithOnlyPendingApps = new AtomicInteger(0);
private Map<String, Set<ApplicationId>> usersApplications =
new HashMap<String, Set<ApplicationId>>();
@@ -671,9 +672,23 @@ public class UsersManager implements AbstractUsersManager {
// update in local storage
userLimitPerSchedulingMode.put(schedulingMode, computedUserLimit);
+ computeNumActiveUsersWithOnlyPendingApps();
+
return userLimitPerSchedulingMode;
}
+ // Called within the lock; updates the counter in place for lock-free readers.
+ private void computeNumActiveUsersWithOnlyPendingApps() {
+ int numPendingUsers = 0;
+ for (User user : users.values()) {
+ if ((user.getPendingApplications() > 0)
+ && (user.getActiveApplications() <= 0)) {
+ numPendingUsers++;
+ }
+ }
+ activeUsersWithOnlyPendingApps.set(numPendingUsers);
+ }
+
private Resource computeUserLimit(String userName, Resource clusterResource,
String nodePartition, SchedulingMode schedulingMode, boolean activeUser) {
Resource partitionResource = labelManager.getResourceByLabel(nodePartition,
@@ -839,6 +854,11 @@ public class UsersManager implements AbstractUsersManager {
try {
this.writeLock.lock();
+ User userDesc = getUser(user);
+ if (userDesc != null && userDesc.getActiveApplications() <= 0) {
+ return;
+ }
+
Set<ApplicationId> userApps = usersApplications.get(user);
if (userApps == null) {
userApps = new HashSet<ApplicationId>();
@@ -893,7 +913,7 @@ public class UsersManager implements AbstractUsersManager {
@Override
public int getNumActiveUsers() {
- return activeUsers.get();
+ return activeUsers.get() + activeUsersWithOnlyPendingApps.get();
}
float sumActiveUsersTimesWeights() {
@@ -1090,4 +1110,9 @@ public class UsersManager implements AbstractUsersManager {
this.writeLock.unlock();
}
}
+
+ @VisibleForTesting
+ public int getNumActiveUsersWithOnlyPendingApps() {
+ return activeUsersWithOnlyPendingApps.get();
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9485c9ae/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
index 79cdcfe..8d948b5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
@@ -4978,4 +4978,132 @@ public class TestCapacityScheduler extends CapacitySchedulerTestBase {
Assert.assertEquals(AllocationState.QUEUE_SKIPPED,
ContainerAllocation.QUEUE_SKIPPED.getAllocationState());
}
+
+ @Test
+ public void testMoveAppWithActiveUsersWithOnlyPendingApps() throws Exception {
+
+ YarnConfiguration conf = new YarnConfiguration();
+ conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+ ResourceScheduler.class);
+
+ CapacitySchedulerConfiguration newConf =
+ new CapacitySchedulerConfiguration(conf);
+
+ // Define top-level queues
+ newConf.setQueues(CapacitySchedulerConfiguration.ROOT,
+ new String[] { "a", "b" });
+
+ newConf.setCapacity(A, 50);
+ newConf.setCapacity(B, 50);
+
+ // Define 2nd-level queues
+ newConf.setQueues(A, new String[] { "a1" });
+ newConf.setCapacity(A1, 100);
+ newConf.setUserLimitFactor(A1, 2.0f);
+ newConf.setMaximumAMResourcePercentPerPartition(A1, "", 0.1f);
+
+ newConf.setQueues(B, new String[] { "b1" });
+ newConf.setCapacity(B1, 100);
+ newConf.setUserLimitFactor(B1, 2.0f);
+
+ LOG.info("Setup top-level queues a and b");
+
+ MockRM rm = new MockRM(newConf);
+ rm.start();
+
+ CapacityScheduler scheduler =
+ (CapacityScheduler) rm.getResourceScheduler();
+
+ MockNM nm1 = rm.registerNode("h1:1234", 16 * GB);
+
+ // submit an app for user u1 and launch its AM
+ RMApp app = rm.submitApp(GB, "test-move-1", "u1", null, "a1");
+ MockAM am1 = MockRM.launchAndRegisterAM(app, rm, nm1);
+
+ ApplicationAttemptId appAttemptId =
+ rm.getApplicationReport(app.getApplicationId())
+ .getCurrentApplicationAttemptId();
+
+ RMApp app2 = rm.submitApp(1 * GB, "app", "u2", null, "a1");
+ MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, nm1);
+ // u3 and u4 submit apps, but no AM is launched for them
+ RMApp app3 = rm.submitApp(1 * GB, "app", "u3", null, "a1");
+
+ RMApp app4 = rm.submitApp(1 * GB, "app", "u4", null, "a1");
+
+ // The two running AMs (u1, u2) each ask for 50 * 1GB containers
+ am1.allocate("*", 1 * GB, 50, null);
+ am2.allocate("*", 1 * GB, 50, null);
+ // NOTE(review): cs is the same scheduler instance as `scheduler` above
+ CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+ RMNode rmNode1 = rm.getRMContext().getRMNodes().get(nm1.getNodeId());
+
+ // check preconditions
+ List<ApplicationAttemptId> appsInA1 = scheduler.getAppsInQueue("a1");
+ assertEquals(4, appsInA1.size());
+ String queue =
+ scheduler.getApplicationAttempt(appsInA1.get(0)).getQueue()
+ .getQueueName();
+ Assert.assertEquals("a1", queue);
+
+ List<ApplicationAttemptId> appsInA = scheduler.getAppsInQueue("a");
+ assertTrue(appsInA.contains(appAttemptId));
+ assertEquals(4, appsInA.size());
+
+ List<ApplicationAttemptId> appsInRoot = scheduler.getAppsInQueue("root");
+ assertTrue(appsInRoot.contains(appAttemptId));
+ assertEquals(4, appsInRoot.size());
+
+ List<ApplicationAttemptId> appsInB1 = scheduler.getAppsInQueue("b1");
+ assertTrue(appsInB1.isEmpty());
+
+ List<ApplicationAttemptId> appsInB = scheduler.getAppsInQueue("b");
+ assertTrue(appsInB.isEmpty());
+
+ UsersManager um =
+ (UsersManager) scheduler.getQueue("a1").getAbstractUsersManager();
+ // getNumActiveUsers() also counts users that have only pending apps
+ assertEquals(4, um.getNumActiveUsers());
+ assertEquals(2, um.getNumActiveUsersWithOnlyPendingApps());
+
+ // now move all apps from a1 to b1
+ scheduler.moveAllApps("a1", "b1");
+ // NOTE(review): ~5s of sleeps below; consider polling for the state instead
+ // Send node updates so that the user-limit computation
+ // runs again for the moved apps
+ for (int i = 0; i < 10; i++) {
+ cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+ Thread.sleep(500);
+ }
+
+ // check postconditions
+ appsInB1 = scheduler.getAppsInQueue("b1");
+
+ assertEquals(4, appsInB1.size());
+ queue =
+ scheduler.getApplicationAttempt(appsInB1.get(0)).getQueue()
+ .getQueueName();
+ Assert.assertEquals("b1", queue);
+
+ appsInB = scheduler.getAppsInQueue("b");
+ assertTrue(appsInB.contains(appAttemptId));
+ assertEquals(4, appsInB.size());
+
+ appsInRoot = scheduler.getAppsInQueue("root");
+ assertTrue(appsInRoot.contains(appAttemptId));
+ assertEquals(4, appsInRoot.size());
+
+ List<ApplicationAttemptId> oldAppsInA1 = scheduler.getAppsInQueue("a1");
+ assertEquals(0, oldAppsInA1.size());
+
+ UsersManager um_b1 =
+ (UsersManager) scheduler.getQueue("b1").getAbstractUsersManager();
+
+ assertEquals(2, um_b1.getNumActiveUsers());
+ assertEquals(2, um_b1.getNumActiveUsersWithOnlyPendingApps());
+
+ appsInB1 = scheduler.getAppsInQueue("b1");
+ assertEquals(4, appsInB1.size());
+ rm.close();
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9485c9ae/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
index 25e535a..b9bfc2a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
@@ -941,4 +941,47 @@ public class TestContainerAllocation {
rm1.close();
}
+
+ @Test
+ public void testActiveUsersWithOnlyPendingApps() throws Exception {
+ // Users whose apps are all still pending must be counted as active users.
+ CapacitySchedulerConfiguration newConf =
+ new CapacitySchedulerConfiguration(conf);
+ newConf.setMaximumAMResourcePercentPerPartition(
+ CapacitySchedulerConfiguration.ROOT + ".default", "", 0.2f);
+ MockRM rm1 = new MockRM(newConf);
+
+ rm1.getRMContext().setNodeLabelManager(mgr);
+ rm1.start();
+ MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB);
+
+ RMApp app1 = rm1.submitApp(1 * GB, "app", "u1", null, "default");
+ MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+ RMApp app2 = rm1.submitApp(1 * GB, "app", "u2", null, "default");
+ MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1);
+ // u3 and u4 submit apps, but no AM is launched for them
+ RMApp app3 = rm1.submitApp(1 * GB, "app", "u3", null, "default");
+
+ RMApp app4 = rm1.submitApp(1 * GB, "app", "u4", null, "default");
+
+ // The two running AMs (u1, u2) each ask for 50 * 1GB containers
+ am1.allocate("*", 1 * GB, 50, null);
+ am2.allocate("*", 1 * GB, 50, null);
+
+ CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+ RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+ // NOTE(review): ~10s of sleeps below; consider polling for the state instead
+ for (int i = 0; i < 10; i++) {
+ cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+ Thread.sleep(1000);
+ }
+ LeafQueue lq = (LeafQueue) cs.getQueue("default");
+ UsersManager um = (UsersManager) lq.getAbstractUsersManager();
+ // getNumActiveUsers() also counts users that have only pending apps
+ Assert.assertEquals(4, um.getNumActiveUsers());
+ Assert.assertEquals(2, um.getNumActiveUsersWithOnlyPendingApps());
+ Assert.assertEquals(2, lq.getMetrics().getAppsPending());
+ rm1.close();
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org