You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ka...@apache.org on 2015/03/05 03:07:03 UTC

hadoop git commit: YARN-3231. FairScheduler: Changing queueMaxRunningApps interferes with pending jobs. (Siqi Li via kasha)

Repository: hadoop
Updated Branches:
  refs/heads/trunk 430b53718 -> 22426a1c9


YARN-3231. FairScheduler: Changing queueMaxRunningApps interferes with pending jobs. (Siqi Li via kasha)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/22426a1c
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/22426a1c
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/22426a1c

Branch: refs/heads/trunk
Commit: 22426a1c9f4bd616558089b6862fd34ab42d19a7
Parents: 430b537
Author: Karthik Kambatla <ka...@apache.org>
Authored: Wed Mar 4 18:06:36 2015 -0800
Committer: Karthik Kambatla <ka...@apache.org>
Committed: Wed Mar 4 18:06:58 2015 -0800

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |   3 +
 .../scheduler/fair/FairScheduler.java           |   1 +
 .../scheduler/fair/MaxRunningAppsEnforcer.java  |  40 ++-
 .../scheduler/fair/TestFairScheduler.java       | 310 ++++++++++++++++++-
 4 files changed, 348 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/22426a1c/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 0b71bee..9a52325 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -697,6 +697,9 @@ Release 2.7.0 - UNRELEASED
 
     YARN-3131. YarnClientImpl should check FAILED and KILLED state in
     submitApplication (Chang Li via jlowe)
+    
+    YARN-3231. FairScheduler: Changing queueMaxRunningApps interferes with pending 
+    jobs. (Siqi Li via kasha)
 
 Release 2.6.0 - 2014-11-18
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/22426a1c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
index 2b59716..e8a9555 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
@@ -1477,6 +1477,7 @@ public class FairScheduler extends
         allocConf = queueInfo;
         allocConf.getDefaultSchedulingPolicy().initialize(clusterResource);
         queueMgr.updateAllocationConfiguration(allocConf);
+        maxRunningEnforcer.updateRunnabilityOnReload();
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/22426a1c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/MaxRunningAppsEnforcer.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/MaxRunningAppsEnforcer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/MaxRunningAppsEnforcer.java
index 2c90edd..f750438 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/MaxRunningAppsEnforcer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/MaxRunningAppsEnforcer.java
@@ -105,6 +105,26 @@ public class MaxRunningAppsEnforcer {
   }
 
   /**
+   * This is called after reloading the allocation configuration when the
+   * scheduler is reinitilized
+   *
+   * Checks to see whether any non-runnable applications become runnable
+   * now that the max running apps of given queue has been changed
+   *
+   * Runs in O(n) where n is the number of apps that are non-runnable and in
+   * the queues that went from having no slack to having slack.
+   */
+  public void updateRunnabilityOnReload() {
+    FSParentQueue rootQueue = scheduler.getQueueManager().getRootQueue();
+    List<List<FSAppAttempt>> appsNowMaybeRunnable =
+        new ArrayList<List<FSAppAttempt>>();
+
+    gatherPossiblyRunnableAppLists(rootQueue, appsNowMaybeRunnable);
+
+    updateAppsRunnability(appsNowMaybeRunnable, Integer.MAX_VALUE);
+  }
+
+  /**
    * Checks to see whether any other applications runnable now that the given
    * application has been removed from the given queue.  And makes them so.
    * 
@@ -156,6 +176,19 @@ public class MaxRunningAppsEnforcer {
       }
     }
 
+    updateAppsRunnability(appsNowMaybeRunnable,
+        appsNowMaybeRunnable.size());
+  }
+
+  /**
+   * Checks to see whether applications are runnable now by iterating
+   * through each one of them and check if the queue and user have slack
+   *
+   * if we know how many apps can be runnable, there is no need to iterate
+   * through all apps, maxRunnableApps is used to break out of the iteration
+   */
+  private void updateAppsRunnability(List<List<FSAppAttempt>>
+      appsNowMaybeRunnable, int maxRunnableApps) {
     // Scan through and check whether this means that any apps are now runnable
     Iterator<FSAppAttempt> iter = new MultiListStartTimeIterator(
         appsNowMaybeRunnable);
@@ -173,9 +206,7 @@ public class MaxRunningAppsEnforcer {
         next.getQueue().addApp(appSched, true);
         noLongerPendingApps.add(appSched);
 
-        // No more than one app per list will be able to be made runnable, so
-        // we can stop looking after we've found that many
-        if (noLongerPendingApps.size() >= appsNowMaybeRunnable.size()) {
+        if (noLongerPendingApps.size() >= maxRunnableApps) {
           break;
         }
       }
@@ -194,11 +225,10 @@ public class MaxRunningAppsEnforcer {
       
       if (!usersNonRunnableApps.remove(appSched.getUser(), appSched)) {
         LOG.error("Waiting app " + appSched + " expected to be in "
-        		+ "usersNonRunnableApps, but was not. This should never happen.");
+            + "usersNonRunnableApps, but was not. This should never happen.");
       }
     }
   }
-  
   /**
    * Updates the relevant tracking variables after a runnable app with the given
    * queue and user has been removed.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/22426a1c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
index c29dbfc..9fadba9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
@@ -2288,7 +2288,315 @@ public class TestFairScheduler extends FairSchedulerTestBase {
     // Request should be fulfilled
     assertEquals(2, scheduler.getSchedulerApp(attId1).getLiveContainers().size());
   }
-  
+
+  @Test (timeout = 5000)
+  public void testIncreaseQueueMaxRunningAppsOnTheFly() throws Exception {
+  String allocBefore = "<?xml version=\"1.0\"?>" +
+        "<allocations>" +
+        "<queue name=\"root\">" +
+        "<queue name=\"queue1\">" +
+        "<maxRunningApps>1</maxRunningApps>" +
+        "</queue>" +
+        "</queue>" +
+        "</allocations>";
+
+    String allocAfter = "<?xml version=\"1.0\"?>" +
+        "<allocations>" +
+        "<queue name=\"root\">" +
+        "<queue name=\"queue1\">" +
+        "<maxRunningApps>3</maxRunningApps>" +
+        "</queue>" +
+        "</queue>" +
+        "</allocations>";
+
+    testIncreaseQueueSettingOnTheFlyInternal(allocBefore, allocAfter);
+  }
+
+  @Test (timeout = 5000)
+  public void testIncreaseUserMaxRunningAppsOnTheFly() throws Exception {
+    String allocBefore = "<?xml version=\"1.0\"?>"+
+        "<allocations>"+
+        "<queue name=\"root\">"+
+        "<queue name=\"queue1\">"+
+        "<maxRunningApps>10</maxRunningApps>"+
+        "</queue>"+
+        "</queue>"+
+        "<user name=\"user1\">"+
+        "<maxRunningApps>1</maxRunningApps>"+
+        "</user>"+
+        "</allocations>";
+
+    String allocAfter = "<?xml version=\"1.0\"?>"+
+        "<allocations>"+
+        "<queue name=\"root\">"+
+        "<queue name=\"queue1\">"+
+        "<maxRunningApps>10</maxRunningApps>"+
+        "</queue>"+
+        "</queue>"+
+        "<user name=\"user1\">"+
+        "<maxRunningApps>3</maxRunningApps>"+
+        "</user>"+
+        "</allocations>";
+
+    testIncreaseQueueSettingOnTheFlyInternal(allocBefore, allocAfter);
+  }
+
+  private void testIncreaseQueueSettingOnTheFlyInternal(String allocBefore,
+      String allocAfter) throws Exception {
+    // Set max running apps
+    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
+
+    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+    out.println(allocBefore);
+    out.close();
+
+    scheduler.init(conf);
+    scheduler.start();
+    scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+    // Add a node
+    RMNode node1 =
+        MockNodes
+            .newNodeInfo(1, Resources.createResource(8192, 8), 1, "127.0.0.1");
+    NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
+    scheduler.handle(nodeEvent1);
+
+    // Request for app 1
+    ApplicationAttemptId attId1 = createSchedulingRequest(1024, "queue1",
+        "user1", 1);
+
+    scheduler.update();
+    NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1);
+    scheduler.handle(updateEvent);
+
+    // App 1 should be running
+    assertEquals(1, scheduler.getSchedulerApp(attId1).getLiveContainers().size());
+
+    ApplicationAttemptId attId2 = createSchedulingRequest(1024, "queue1",
+        "user1", 1);
+
+    scheduler.update();
+    scheduler.handle(updateEvent);
+
+    ApplicationAttemptId attId3 = createSchedulingRequest(1024, "queue1",
+        "user1", 1);
+
+    scheduler.update();
+    scheduler.handle(updateEvent);
+
+    ApplicationAttemptId attId4 = createSchedulingRequest(1024, "queue1",
+        "user1", 1);
+
+    scheduler.update();
+    scheduler.handle(updateEvent);
+
+    // App 2 should not be running
+    assertEquals(0, scheduler.getSchedulerApp(attId2).getLiveContainers().size());
+    // App 3 should not be running
+    assertEquals(0, scheduler.getSchedulerApp(attId3).getLiveContainers().size());
+    // App 4 should not be running
+    assertEquals(0, scheduler.getSchedulerApp(attId4).getLiveContainers().size());
+
+    out = new PrintWriter(new FileWriter(ALLOC_FILE));
+    out.println(allocAfter);
+    out.close();
+    scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+    scheduler.update();
+    scheduler.handle(updateEvent);
+
+    // App 2 should be running
+    assertEquals(1, scheduler.getSchedulerApp(attId2).getLiveContainers().size());
+
+    scheduler.update();
+    scheduler.handle(updateEvent);
+
+    // App 3 should be running
+    assertEquals(1, scheduler.getSchedulerApp(attId3).getLiveContainers().size());
+
+    scheduler.update();
+    scheduler.handle(updateEvent);
+
+    // App 4 should not be running
+    assertEquals(0, scheduler.getSchedulerApp(attId4).getLiveContainers().size());
+
+    // Now remove app 1
+    AppAttemptRemovedSchedulerEvent appRemovedEvent1 = new AppAttemptRemovedSchedulerEvent(
+        attId1, RMAppAttemptState.FINISHED, false);
+
+    scheduler.handle(appRemovedEvent1);
+    scheduler.update();
+    scheduler.handle(updateEvent);
+
+    // App 4 should be running
+    assertEquals(1, scheduler.getSchedulerApp(attId4).getLiveContainers().size());
+  }
+
+  @Test (timeout = 5000)
+  public void testDecreaseQueueMaxRunningAppsOnTheFly() throws Exception {
+  String allocBefore = "<?xml version=\"1.0\"?>" +
+        "<allocations>" +
+        "<queue name=\"root\">" +
+        "<queue name=\"queue1\">" +
+        "<maxRunningApps>3</maxRunningApps>" +
+        "</queue>" +
+        "</queue>" +
+        "</allocations>";
+
+    String allocAfter = "<?xml version=\"1.0\"?>" +
+        "<allocations>" +
+        "<queue name=\"root\">" +
+        "<queue name=\"queue1\">" +
+        "<maxRunningApps>1</maxRunningApps>" +
+        "</queue>" +
+        "</queue>" +
+        "</allocations>";
+
+    testDecreaseQueueSettingOnTheFlyInternal(allocBefore, allocAfter);
+  }
+
+  @Test (timeout = 5000)
+  public void testDecreaseUserMaxRunningAppsOnTheFly() throws Exception {
+    String allocBefore = "<?xml version=\"1.0\"?>"+
+        "<allocations>"+
+        "<queue name=\"root\">"+
+        "<queue name=\"queue1\">"+
+        "<maxRunningApps>10</maxRunningApps>"+
+        "</queue>"+
+        "</queue>"+
+        "<user name=\"user1\">"+
+        "<maxRunningApps>3</maxRunningApps>"+
+        "</user>"+
+        "</allocations>";
+
+    String allocAfter = "<?xml version=\"1.0\"?>"+
+        "<allocations>"+
+        "<queue name=\"root\">"+
+        "<queue name=\"queue1\">"+
+        "<maxRunningApps>10</maxRunningApps>"+
+        "</queue>"+
+        "</queue>"+
+        "<user name=\"user1\">"+
+        "<maxRunningApps>1</maxRunningApps>"+
+        "</user>"+
+        "</allocations>";
+
+    testDecreaseQueueSettingOnTheFlyInternal(allocBefore, allocAfter);
+  }
+
+  private void testDecreaseQueueSettingOnTheFlyInternal(String allocBefore,
+      String allocAfter) throws Exception {
+    // Set max running apps
+    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
+
+    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+    out.println(allocBefore);
+    out.close();
+
+    scheduler.init(conf);
+    scheduler.start();
+    scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+    // Add a node
+    RMNode node1 =
+        MockNodes
+            .newNodeInfo(1, Resources.createResource(8192, 8), 1, "127.0.0.1");
+    NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
+    scheduler.handle(nodeEvent1);
+
+    // Request for app 1
+    ApplicationAttemptId attId1 = createSchedulingRequest(1024, "queue1",
+        "user1", 1);
+
+    scheduler.update();
+    NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1);
+    scheduler.handle(updateEvent);
+
+    // App 1 should be running
+    assertEquals(1, scheduler.getSchedulerApp(attId1).getLiveContainers().size());
+
+    ApplicationAttemptId attId2 = createSchedulingRequest(1024, "queue1",
+        "user1", 1);
+
+    scheduler.update();
+    scheduler.handle(updateEvent);
+
+    ApplicationAttemptId attId3 = createSchedulingRequest(1024, "queue1",
+        "user1", 1);
+
+    scheduler.update();
+    scheduler.handle(updateEvent);
+
+    ApplicationAttemptId attId4 = createSchedulingRequest(1024, "queue1",
+        "user1", 1);
+
+    scheduler.update();
+    scheduler.handle(updateEvent);
+
+    // App 2 should be running
+    assertEquals(1, scheduler.getSchedulerApp(attId2).getLiveContainers().size());
+    // App 3 should be running
+    assertEquals(1, scheduler.getSchedulerApp(attId3).getLiveContainers().size());
+    // App 4 should not be running
+    assertEquals(0, scheduler.getSchedulerApp(attId4).getLiveContainers().size());
+
+    out = new PrintWriter(new FileWriter(ALLOC_FILE));
+    out.println(allocAfter);
+    out.close();
+    scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+    scheduler.update();
+    scheduler.handle(updateEvent);
+
+    // App 2 should still be running
+    assertEquals(1, scheduler.getSchedulerApp(attId2).getLiveContainers().size());
+
+    scheduler.update();
+    scheduler.handle(updateEvent);
+
+    // App 3 should still be running
+    assertEquals(1, scheduler.getSchedulerApp(attId3).getLiveContainers().size());
+
+    scheduler.update();
+    scheduler.handle(updateEvent);
+
+    // App 4 should not be running
+    assertEquals(0, scheduler.getSchedulerApp(attId4).getLiveContainers().size());
+
+    // Now remove app 1
+    AppAttemptRemovedSchedulerEvent appRemovedEvent1 = new AppAttemptRemovedSchedulerEvent(
+        attId1, RMAppAttemptState.FINISHED, false);
+
+    scheduler.handle(appRemovedEvent1);
+    scheduler.update();
+    scheduler.handle(updateEvent);
+
+    // App 4 should not be running
+    assertEquals(0, scheduler.getSchedulerApp(attId4).getLiveContainers().size());
+
+    // Now remove app 2
+    appRemovedEvent1 = new AppAttemptRemovedSchedulerEvent(
+        attId2, RMAppAttemptState.FINISHED, false);
+
+    scheduler.handle(appRemovedEvent1);
+    scheduler.update();
+    scheduler.handle(updateEvent);
+
+    // App 4 should not be running
+    assertEquals(0, scheduler.getSchedulerApp(attId4).getLiveContainers().size());
+
+    // Now remove app 3
+    appRemovedEvent1 = new AppAttemptRemovedSchedulerEvent(
+        attId3, RMAppAttemptState.FINISHED, false);
+
+    scheduler.handle(appRemovedEvent1);
+    scheduler.update();
+    scheduler.handle(updateEvent);
+
+    // App 4 should be running now
+    assertEquals(1, scheduler.getSchedulerApp(attId4).getLiveContainers().size());
+  }
+
   @Test (timeout = 5000)
   public void testReservationWhileMultiplePriorities() throws IOException {
     scheduler.init(conf);