You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ka...@apache.org on 2015/03/05 03:07:03 UTC
hadoop git commit: YARN-3231. FairScheduler: Changing
queueMaxRunningApps interferes with pending jobs. (Siqi Li via kasha)
Repository: hadoop
Updated Branches:
refs/heads/trunk 430b53718 -> 22426a1c9
YARN-3231. FairScheduler: Changing queueMaxRunningApps interferes with pending jobs. (Siqi Li via kasha)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/22426a1c
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/22426a1c
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/22426a1c
Branch: refs/heads/trunk
Commit: 22426a1c9f4bd616558089b6862fd34ab42d19a7
Parents: 430b537
Author: Karthik Kambatla <ka...@apache.org>
Authored: Wed Mar 4 18:06:36 2015 -0800
Committer: Karthik Kambatla <ka...@apache.org>
Committed: Wed Mar 4 18:06:58 2015 -0800
----------------------------------------------------------------------
hadoop-yarn-project/CHANGES.txt | 3 +
.../scheduler/fair/FairScheduler.java | 1 +
.../scheduler/fair/MaxRunningAppsEnforcer.java | 40 ++-
.../scheduler/fair/TestFairScheduler.java | 310 ++++++++++++++++++-
4 files changed, 348 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/22426a1c/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 0b71bee..9a52325 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -697,6 +697,9 @@ Release 2.7.0 - UNRELEASED
YARN-3131. YarnClientImpl should check FAILED and KILLED state in
submitApplication (Chang Li via jlowe)
+
+ YARN-3231. FairScheduler: Changing queueMaxRunningApps interferes with pending
+ jobs. (Siqi Li via kasha)
Release 2.6.0 - 2014-11-18
http://git-wip-us.apache.org/repos/asf/hadoop/blob/22426a1c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
index 2b59716..e8a9555 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
@@ -1477,6 +1477,7 @@ public class FairScheduler extends
allocConf = queueInfo;
allocConf.getDefaultSchedulingPolicy().initialize(clusterResource);
queueMgr.updateAllocationConfiguration(allocConf);
+ maxRunningEnforcer.updateRunnabilityOnReload();
}
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/22426a1c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/MaxRunningAppsEnforcer.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/MaxRunningAppsEnforcer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/MaxRunningAppsEnforcer.java
index 2c90edd..f750438 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/MaxRunningAppsEnforcer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/MaxRunningAppsEnforcer.java
@@ -105,6 +105,26 @@ public class MaxRunningAppsEnforcer {
}
/**
+ * This is called after reloading the allocation configuration when the
+ * scheduler is reinitialized
+ *
+ * Checks to see whether any non-runnable applications become runnable
+ * now that the max running apps of given queue has been changed
+ *
+ * Runs in O(n) where n is the number of apps that are non-runnable and in
+ * the queues that went from having no slack to having slack.
+ */
+ public void updateRunnabilityOnReload() {
+ FSParentQueue rootQueue = scheduler.getQueueManager().getRootQueue();
+ List<List<FSAppAttempt>> appsNowMaybeRunnable =
+ new ArrayList<List<FSAppAttempt>>();
+
+ gatherPossiblyRunnableAppLists(rootQueue, appsNowMaybeRunnable);
+
+ updateAppsRunnability(appsNowMaybeRunnable, Integer.MAX_VALUE);
+ }
+
+ /**
* Checks to see whether any other applications are runnable now that the given
* application has been removed from the given queue. And makes them so.
*
@@ -156,6 +176,19 @@ public class MaxRunningAppsEnforcer {
}
}
+ updateAppsRunnability(appsNowMaybeRunnable,
+ appsNowMaybeRunnable.size());
+ }
+
+ /**
+ * Checks to see whether applications are runnable now by iterating
+ * through each one of them and checking whether the queue and user have slack.
+ *
+ * If we know how many apps can become runnable, there is no need to iterate
+ * through all apps; maxRunnableApps is used to break out of the iteration early.
+ */
+ private void updateAppsRunnability(List<List<FSAppAttempt>>
+ appsNowMaybeRunnable, int maxRunnableApps) {
// Scan through and check whether this means that any apps are now runnable
Iterator<FSAppAttempt> iter = new MultiListStartTimeIterator(
appsNowMaybeRunnable);
@@ -173,9 +206,7 @@ public class MaxRunningAppsEnforcer {
next.getQueue().addApp(appSched, true);
noLongerPendingApps.add(appSched);
- // No more than one app per list will be able to be made runnable, so
- // we can stop looking after we've found that many
- if (noLongerPendingApps.size() >= appsNowMaybeRunnable.size()) {
+ if (noLongerPendingApps.size() >= maxRunnableApps) {
break;
}
}
@@ -194,11 +225,10 @@ public class MaxRunningAppsEnforcer {
if (!usersNonRunnableApps.remove(appSched.getUser(), appSched)) {
LOG.error("Waiting app " + appSched + " expected to be in "
- + "usersNonRunnableApps, but was not. This should never happen.");
+ + "usersNonRunnableApps, but was not. This should never happen.");
}
}
}
-
/**
* Updates the relevant tracking variables after a runnable app with the given
* queue and user has been removed.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/22426a1c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
index c29dbfc..9fadba9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
@@ -2288,7 +2288,315 @@ public class TestFairScheduler extends FairSchedulerTestBase {
// Request should be fulfilled
assertEquals(2, scheduler.getSchedulerApp(attId1).getLiveContainers().size());
}
-
+
+ @Test (timeout = 5000)
+ public void testIncreaseQueueMaxRunningAppsOnTheFly() throws Exception {
+ String allocBefore = "<?xml version=\"1.0\"?>" +
+ "<allocations>" +
+ "<queue name=\"root\">" +
+ "<queue name=\"queue1\">" +
+ "<maxRunningApps>1</maxRunningApps>" +
+ "</queue>" +
+ "</queue>" +
+ "</allocations>";
+
+ String allocAfter = "<?xml version=\"1.0\"?>" +
+ "<allocations>" +
+ "<queue name=\"root\">" +
+ "<queue name=\"queue1\">" +
+ "<maxRunningApps>3</maxRunningApps>" +
+ "</queue>" +
+ "</queue>" +
+ "</allocations>";
+
+ testIncreaseQueueSettingOnTheFlyInternal(allocBefore, allocAfter);
+ }
+
+ @Test (timeout = 5000)
+ public void testIncreaseUserMaxRunningAppsOnTheFly() throws Exception {
+ String allocBefore = "<?xml version=\"1.0\"?>"+
+ "<allocations>"+
+ "<queue name=\"root\">"+
+ "<queue name=\"queue1\">"+
+ "<maxRunningApps>10</maxRunningApps>"+
+ "</queue>"+
+ "</queue>"+
+ "<user name=\"user1\">"+
+ "<maxRunningApps>1</maxRunningApps>"+
+ "</user>"+
+ "</allocations>";
+
+ String allocAfter = "<?xml version=\"1.0\"?>"+
+ "<allocations>"+
+ "<queue name=\"root\">"+
+ "<queue name=\"queue1\">"+
+ "<maxRunningApps>10</maxRunningApps>"+
+ "</queue>"+
+ "</queue>"+
+ "<user name=\"user1\">"+
+ "<maxRunningApps>3</maxRunningApps>"+
+ "</user>"+
+ "</allocations>";
+
+ testIncreaseQueueSettingOnTheFlyInternal(allocBefore, allocAfter);
+ }
+
+ private void testIncreaseQueueSettingOnTheFlyInternal(String allocBefore,
+ String allocAfter) throws Exception {
+ // Set max running apps
+ conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
+
+ PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+ out.println(allocBefore);
+ out.close();
+
+ scheduler.init(conf);
+ scheduler.start();
+ scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+ // Add a node
+ RMNode node1 =
+ MockNodes
+ .newNodeInfo(1, Resources.createResource(8192, 8), 1, "127.0.0.1");
+ NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
+ scheduler.handle(nodeEvent1);
+
+ // Request for app 1
+ ApplicationAttemptId attId1 = createSchedulingRequest(1024, "queue1",
+ "user1", 1);
+
+ scheduler.update();
+ NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1);
+ scheduler.handle(updateEvent);
+
+ // App 1 should be running
+ assertEquals(1, scheduler.getSchedulerApp(attId1).getLiveContainers().size());
+
+ ApplicationAttemptId attId2 = createSchedulingRequest(1024, "queue1",
+ "user1", 1);
+
+ scheduler.update();
+ scheduler.handle(updateEvent);
+
+ ApplicationAttemptId attId3 = createSchedulingRequest(1024, "queue1",
+ "user1", 1);
+
+ scheduler.update();
+ scheduler.handle(updateEvent);
+
+ ApplicationAttemptId attId4 = createSchedulingRequest(1024, "queue1",
+ "user1", 1);
+
+ scheduler.update();
+ scheduler.handle(updateEvent);
+
+ // App 2 should not be running
+ assertEquals(0, scheduler.getSchedulerApp(attId2).getLiveContainers().size());
+ // App 3 should not be running
+ assertEquals(0, scheduler.getSchedulerApp(attId3).getLiveContainers().size());
+ // App 4 should not be running
+ assertEquals(0, scheduler.getSchedulerApp(attId4).getLiveContainers().size());
+
+ out = new PrintWriter(new FileWriter(ALLOC_FILE));
+ out.println(allocAfter);
+ out.close();
+ scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+ scheduler.update();
+ scheduler.handle(updateEvent);
+
+ // App 2 should be running
+ assertEquals(1, scheduler.getSchedulerApp(attId2).getLiveContainers().size());
+
+ scheduler.update();
+ scheduler.handle(updateEvent);
+
+ // App 3 should be running
+ assertEquals(1, scheduler.getSchedulerApp(attId3).getLiveContainers().size());
+
+ scheduler.update();
+ scheduler.handle(updateEvent);
+
+ // App 4 should not be running
+ assertEquals(0, scheduler.getSchedulerApp(attId4).getLiveContainers().size());
+
+ // Now remove app 1
+ AppAttemptRemovedSchedulerEvent appRemovedEvent1 = new AppAttemptRemovedSchedulerEvent(
+ attId1, RMAppAttemptState.FINISHED, false);
+
+ scheduler.handle(appRemovedEvent1);
+ scheduler.update();
+ scheduler.handle(updateEvent);
+
+ // App 4 should be running
+ assertEquals(1, scheduler.getSchedulerApp(attId4).getLiveContainers().size());
+ }
+
+ @Test (timeout = 5000)
+ public void testDecreaseQueueMaxRunningAppsOnTheFly() throws Exception {
+ String allocBefore = "<?xml version=\"1.0\"?>" +
+ "<allocations>" +
+ "<queue name=\"root\">" +
+ "<queue name=\"queue1\">" +
+ "<maxRunningApps>3</maxRunningApps>" +
+ "</queue>" +
+ "</queue>" +
+ "</allocations>";
+
+ String allocAfter = "<?xml version=\"1.0\"?>" +
+ "<allocations>" +
+ "<queue name=\"root\">" +
+ "<queue name=\"queue1\">" +
+ "<maxRunningApps>1</maxRunningApps>" +
+ "</queue>" +
+ "</queue>" +
+ "</allocations>";
+
+ testDecreaseQueueSettingOnTheFlyInternal(allocBefore, allocAfter);
+ }
+
+ @Test (timeout = 5000)
+ public void testDecreaseUserMaxRunningAppsOnTheFly() throws Exception {
+ String allocBefore = "<?xml version=\"1.0\"?>"+
+ "<allocations>"+
+ "<queue name=\"root\">"+
+ "<queue name=\"queue1\">"+
+ "<maxRunningApps>10</maxRunningApps>"+
+ "</queue>"+
+ "</queue>"+
+ "<user name=\"user1\">"+
+ "<maxRunningApps>3</maxRunningApps>"+
+ "</user>"+
+ "</allocations>";
+
+ String allocAfter = "<?xml version=\"1.0\"?>"+
+ "<allocations>"+
+ "<queue name=\"root\">"+
+ "<queue name=\"queue1\">"+
+ "<maxRunningApps>10</maxRunningApps>"+
+ "</queue>"+
+ "</queue>"+
+ "<user name=\"user1\">"+
+ "<maxRunningApps>1</maxRunningApps>"+
+ "</user>"+
+ "</allocations>";
+
+ testDecreaseQueueSettingOnTheFlyInternal(allocBefore, allocAfter);
+ }
+
+ private void testDecreaseQueueSettingOnTheFlyInternal(String allocBefore,
+ String allocAfter) throws Exception {
+ // Set max running apps
+ conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
+
+ PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+ out.println(allocBefore);
+ out.close();
+
+ scheduler.init(conf);
+ scheduler.start();
+ scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+ // Add a node
+ RMNode node1 =
+ MockNodes
+ .newNodeInfo(1, Resources.createResource(8192, 8), 1, "127.0.0.1");
+ NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
+ scheduler.handle(nodeEvent1);
+
+ // Request for app 1
+ ApplicationAttemptId attId1 = createSchedulingRequest(1024, "queue1",
+ "user1", 1);
+
+ scheduler.update();
+ NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1);
+ scheduler.handle(updateEvent);
+
+ // App 1 should be running
+ assertEquals(1, scheduler.getSchedulerApp(attId1).getLiveContainers().size());
+
+ ApplicationAttemptId attId2 = createSchedulingRequest(1024, "queue1",
+ "user1", 1);
+
+ scheduler.update();
+ scheduler.handle(updateEvent);
+
+ ApplicationAttemptId attId3 = createSchedulingRequest(1024, "queue1",
+ "user1", 1);
+
+ scheduler.update();
+ scheduler.handle(updateEvent);
+
+ ApplicationAttemptId attId4 = createSchedulingRequest(1024, "queue1",
+ "user1", 1);
+
+ scheduler.update();
+ scheduler.handle(updateEvent);
+
+ // App 2 should be running
+ assertEquals(1, scheduler.getSchedulerApp(attId2).getLiveContainers().size());
+ // App 3 should be running
+ assertEquals(1, scheduler.getSchedulerApp(attId3).getLiveContainers().size());
+ // App 4 should not be running
+ assertEquals(0, scheduler.getSchedulerApp(attId4).getLiveContainers().size());
+
+ out = new PrintWriter(new FileWriter(ALLOC_FILE));
+ out.println(allocAfter);
+ out.close();
+ scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+ scheduler.update();
+ scheduler.handle(updateEvent);
+
+ // App 2 should still be running
+ assertEquals(1, scheduler.getSchedulerApp(attId2).getLiveContainers().size());
+
+ scheduler.update();
+ scheduler.handle(updateEvent);
+
+ // App 3 should still be running
+ assertEquals(1, scheduler.getSchedulerApp(attId3).getLiveContainers().size());
+
+ scheduler.update();
+ scheduler.handle(updateEvent);
+
+ // App 4 should not be running
+ assertEquals(0, scheduler.getSchedulerApp(attId4).getLiveContainers().size());
+
+ // Now remove app 1
+ AppAttemptRemovedSchedulerEvent appRemovedEvent1 = new AppAttemptRemovedSchedulerEvent(
+ attId1, RMAppAttemptState.FINISHED, false);
+
+ scheduler.handle(appRemovedEvent1);
+ scheduler.update();
+ scheduler.handle(updateEvent);
+
+ // App 4 should not be running
+ assertEquals(0, scheduler.getSchedulerApp(attId4).getLiveContainers().size());
+
+ // Now remove app 2
+ appRemovedEvent1 = new AppAttemptRemovedSchedulerEvent(
+ attId2, RMAppAttemptState.FINISHED, false);
+
+ scheduler.handle(appRemovedEvent1);
+ scheduler.update();
+ scheduler.handle(updateEvent);
+
+ // App 4 should not be running
+ assertEquals(0, scheduler.getSchedulerApp(attId4).getLiveContainers().size());
+
+ // Now remove app 3
+ appRemovedEvent1 = new AppAttemptRemovedSchedulerEvent(
+ attId3, RMAppAttemptState.FINISHED, false);
+
+ scheduler.handle(appRemovedEvent1);
+ scheduler.update();
+ scheduler.handle(updateEvent);
+
+ // App 4 should be running now
+ assertEquals(1, scheduler.getSchedulerApp(attId4).getLiveContainers().size());
+ }
+
@Test (timeout = 5000)
public void testReservationWhileMultiplePriorities() throws IOException {
scheduler.init(conf);