You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by vi...@apache.org on 2015/09/01 23:04:58 UTC

hadoop git commit: YARN-3231. FairScheduler: Changing queueMaxRunningApps interferes with pending jobs. (Siqi Li via kasha)

Repository: hadoop
Updated Branches:
  refs/heads/branch-2.6.1 2692401df -> dbc5bab9f


YARN-3231. FairScheduler: Changing queueMaxRunningApps interferes with pending jobs. (Siqi Li via kasha)

(cherry picked from commit 22426a1c9f4bd616558089b6862fd34ab42d19a7)
(cherry picked from commit 721d7b574126c4070322f70ec5b49a7b8558a4c7)
(cherry picked from commit 5dfa25f22a989222e8b3d1013117b3350a48b2c5)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/dbc5bab9
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/dbc5bab9
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/dbc5bab9

Branch: refs/heads/branch-2.6.1
Commit: dbc5bab9fd19fe76713174f257457b4359784127
Parents: 2692401
Author: Karthik Kambatla <ka...@apache.org>
Authored: Wed Mar 4 18:06:36 2015 -0800
Committer: Vinod Kumar Vavilapalli <vi...@apache.org>
Committed: Tue Sep 1 13:54:04 2015 -0700

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |   3 +
 .../scheduler/fair/FairScheduler.java           |   1 +
 .../scheduler/fair/MaxRunningAppsEnforcer.java  |  40 ++-
 .../scheduler/fair/TestFairScheduler.java       | 310 ++++++++++++++++++-
 4 files changed, 348 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/dbc5bab9/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 78c191f..334a672 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -91,6 +91,9 @@ Release 2.6.1 - UNRELEASED
     YARN-3222. Fixed RMNode to send scheduler events in sequential order when a
     node reconnects. (Rohith Sharma K S via jianhe)
 
+    YARN-3231. FairScheduler: Changing queueMaxRunningApps interferes with pending 
+    jobs. (Siqi Li via kasha)
+
 Release 2.6.0 - 2014-11-18
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dbc5bab9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
index d633981..6c50e6f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
@@ -1388,6 +1388,7 @@ public class FairScheduler extends
         allocConf = queueInfo;
         allocConf.getDefaultSchedulingPolicy().initialize(clusterResource);
         queueMgr.updateAllocationConfiguration(allocConf);
+        maxRunningEnforcer.updateRunnabilityOnReload();
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dbc5bab9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/MaxRunningAppsEnforcer.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/MaxRunningAppsEnforcer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/MaxRunningAppsEnforcer.java
index feeda1e..45a8907 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/MaxRunningAppsEnforcer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/MaxRunningAppsEnforcer.java
@@ -105,6 +105,26 @@ public class MaxRunningAppsEnforcer {
   }
 
   /**
+   * This is called after reloading the allocation configuration when the
+   * scheduler is reinitialized.
+   *
+   * Checks to see whether any non-runnable applications become runnable
+   * now that the max running apps of the given queue has been changed.
+   *
+   * Runs in O(n) where n is the number of apps that are non-runnable and in
+   * the queues that went from having no slack to having slack.
+   */
+  public void updateRunnabilityOnReload() {
+    FSParentQueue rootQueue = scheduler.getQueueManager().getRootQueue();
+    List<List<FSAppAttempt>> appsNowMaybeRunnable =
+        new ArrayList<List<FSAppAttempt>>();
+
+    gatherPossiblyRunnableAppLists(rootQueue, appsNowMaybeRunnable);
+
+    updateAppsRunnability(appsNowMaybeRunnable, Integer.MAX_VALUE);
+  }
+
+  /**
    * Checks to see whether any other applications runnable now that the given
    * application has been removed from the given queue.  And makes them so.
    * 
@@ -156,6 +176,19 @@ public class MaxRunningAppsEnforcer {
       }
     }
 
+    updateAppsRunnability(appsNowMaybeRunnable,
+        appsNowMaybeRunnable.size());
+  }
+
+  /**
+   * Checks to see whether applications are runnable now by iterating
+   * through each one of them and checking if the queue and user have slack.
+   *
+   * If we know how many apps can be runnable, there is no need to iterate
+   * through all apps; maxRunnableApps is used to break out of the iteration.
+   */
+  private void updateAppsRunnability(List<List<FSAppAttempt>>
+      appsNowMaybeRunnable, int maxRunnableApps) {
     // Scan through and check whether this means that any apps are now runnable
     Iterator<FSAppAttempt> iter = new MultiListStartTimeIterator(
         appsNowMaybeRunnable);
@@ -173,9 +206,7 @@ public class MaxRunningAppsEnforcer {
         next.getQueue().getRunnableAppSchedulables().add(appSched);
         noLongerPendingApps.add(appSched);
 
-        // No more than one app per list will be able to be made runnable, so
-        // we can stop looking after we've found that many
-        if (noLongerPendingApps.size() >= appsNowMaybeRunnable.size()) {
+        if (noLongerPendingApps.size() >= maxRunnableApps) {
           break;
         }
       }
@@ -195,11 +226,10 @@ public class MaxRunningAppsEnforcer {
       
       if (!usersNonRunnableApps.remove(appSched.getUser(), appSched)) {
         LOG.error("Waiting app " + appSched + " expected to be in "
-        		+ "usersNonRunnableApps, but was not. This should never happen.");
+            + "usersNonRunnableApps, but was not. This should never happen.");
       }
     }
   }
-  
   /**
    * Updates the relevant tracking variables after a runnable app with the given
    * queue and user has been removed.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dbc5bab9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
index ca0e954..2c59637 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
@@ -2035,7 +2035,315 @@ public class TestFairScheduler extends FairSchedulerTestBase {
     // Request should be fulfilled
     assertEquals(2, scheduler.getSchedulerApp(attId1).getLiveContainers().size());
   }
-  
+
+  @Test (timeout = 5000)
+  public void testIncreaseQueueMaxRunningAppsOnTheFly() throws Exception {
+  String allocBefore = "<?xml version=\"1.0\"?>" +
+        "<allocations>" +
+        "<queue name=\"root\">" +
+        "<queue name=\"queue1\">" +
+        "<maxRunningApps>1</maxRunningApps>" +
+        "</queue>" +
+        "</queue>" +
+        "</allocations>";
+
+    String allocAfter = "<?xml version=\"1.0\"?>" +
+        "<allocations>" +
+        "<queue name=\"root\">" +
+        "<queue name=\"queue1\">" +
+        "<maxRunningApps>3</maxRunningApps>" +
+        "</queue>" +
+        "</queue>" +
+        "</allocations>";
+
+    testIncreaseQueueSettingOnTheFlyInternal(allocBefore, allocAfter);
+  }
+
+  @Test (timeout = 5000)
+  public void testIncreaseUserMaxRunningAppsOnTheFly() throws Exception {
+    String allocBefore = "<?xml version=\"1.0\"?>"+
+        "<allocations>"+
+        "<queue name=\"root\">"+
+        "<queue name=\"queue1\">"+
+        "<maxRunningApps>10</maxRunningApps>"+
+        "</queue>"+
+        "</queue>"+
+        "<user name=\"user1\">"+
+        "<maxRunningApps>1</maxRunningApps>"+
+        "</user>"+
+        "</allocations>";
+
+    String allocAfter = "<?xml version=\"1.0\"?>"+
+        "<allocations>"+
+        "<queue name=\"root\">"+
+        "<queue name=\"queue1\">"+
+        "<maxRunningApps>10</maxRunningApps>"+
+        "</queue>"+
+        "</queue>"+
+        "<user name=\"user1\">"+
+        "<maxRunningApps>3</maxRunningApps>"+
+        "</user>"+
+        "</allocations>";
+
+    testIncreaseQueueSettingOnTheFlyInternal(allocBefore, allocAfter);
+  }
+
+  private void testIncreaseQueueSettingOnTheFlyInternal(String allocBefore,
+      String allocAfter) throws Exception {
+    // Set max running apps
+    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
+
+    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+    out.println(allocBefore);
+    out.close();
+
+    scheduler.init(conf);
+    scheduler.start();
+    scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+    // Add a node
+    RMNode node1 =
+        MockNodes
+            .newNodeInfo(1, Resources.createResource(8192, 8), 1, "127.0.0.1");
+    NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
+    scheduler.handle(nodeEvent1);
+
+    // Request for app 1
+    ApplicationAttemptId attId1 = createSchedulingRequest(1024, "queue1",
+        "user1", 1);
+
+    scheduler.update();
+    NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1);
+    scheduler.handle(updateEvent);
+
+    // App 1 should be running
+    assertEquals(1, scheduler.getSchedulerApp(attId1).getLiveContainers().size());
+
+    ApplicationAttemptId attId2 = createSchedulingRequest(1024, "queue1",
+        "user1", 1);
+
+    scheduler.update();
+    scheduler.handle(updateEvent);
+
+    ApplicationAttemptId attId3 = createSchedulingRequest(1024, "queue1",
+        "user1", 1);
+
+    scheduler.update();
+    scheduler.handle(updateEvent);
+
+    ApplicationAttemptId attId4 = createSchedulingRequest(1024, "queue1",
+        "user1", 1);
+
+    scheduler.update();
+    scheduler.handle(updateEvent);
+
+    // App 2 should not be running
+    assertEquals(0, scheduler.getSchedulerApp(attId2).getLiveContainers().size());
+    // App 3 should not be running
+    assertEquals(0, scheduler.getSchedulerApp(attId3).getLiveContainers().size());
+    // App 4 should not be running
+    assertEquals(0, scheduler.getSchedulerApp(attId4).getLiveContainers().size());
+
+    out = new PrintWriter(new FileWriter(ALLOC_FILE));
+    out.println(allocAfter);
+    out.close();
+    scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+    scheduler.update();
+    scheduler.handle(updateEvent);
+
+    // App 2 should be running
+    assertEquals(1, scheduler.getSchedulerApp(attId2).getLiveContainers().size());
+
+    scheduler.update();
+    scheduler.handle(updateEvent);
+
+    // App 3 should be running
+    assertEquals(1, scheduler.getSchedulerApp(attId3).getLiveContainers().size());
+
+    scheduler.update();
+    scheduler.handle(updateEvent);
+
+    // App 4 should not be running
+    assertEquals(0, scheduler.getSchedulerApp(attId4).getLiveContainers().size());
+
+    // Now remove app 1
+    AppAttemptRemovedSchedulerEvent appRemovedEvent1 = new AppAttemptRemovedSchedulerEvent(
+        attId1, RMAppAttemptState.FINISHED, false);
+
+    scheduler.handle(appRemovedEvent1);
+    scheduler.update();
+    scheduler.handle(updateEvent);
+
+    // App 4 should be running
+    assertEquals(1, scheduler.getSchedulerApp(attId4).getLiveContainers().size());
+  }
+
+  @Test (timeout = 5000)
+  public void testDecreaseQueueMaxRunningAppsOnTheFly() throws Exception {
+  String allocBefore = "<?xml version=\"1.0\"?>" +
+        "<allocations>" +
+        "<queue name=\"root\">" +
+        "<queue name=\"queue1\">" +
+        "<maxRunningApps>3</maxRunningApps>" +
+        "</queue>" +
+        "</queue>" +
+        "</allocations>";
+
+    String allocAfter = "<?xml version=\"1.0\"?>" +
+        "<allocations>" +
+        "<queue name=\"root\">" +
+        "<queue name=\"queue1\">" +
+        "<maxRunningApps>1</maxRunningApps>" +
+        "</queue>" +
+        "</queue>" +
+        "</allocations>";
+
+    testDecreaseQueueSettingOnTheFlyInternal(allocBefore, allocAfter);
+  }
+
+  @Test (timeout = 5000)
+  public void testDecreaseUserMaxRunningAppsOnTheFly() throws Exception {
+    String allocBefore = "<?xml version=\"1.0\"?>"+
+        "<allocations>"+
+        "<queue name=\"root\">"+
+        "<queue name=\"queue1\">"+
+        "<maxRunningApps>10</maxRunningApps>"+
+        "</queue>"+
+        "</queue>"+
+        "<user name=\"user1\">"+
+        "<maxRunningApps>3</maxRunningApps>"+
+        "</user>"+
+        "</allocations>";
+
+    String allocAfter = "<?xml version=\"1.0\"?>"+
+        "<allocations>"+
+        "<queue name=\"root\">"+
+        "<queue name=\"queue1\">"+
+        "<maxRunningApps>10</maxRunningApps>"+
+        "</queue>"+
+        "</queue>"+
+        "<user name=\"user1\">"+
+        "<maxRunningApps>1</maxRunningApps>"+
+        "</user>"+
+        "</allocations>";
+
+    testDecreaseQueueSettingOnTheFlyInternal(allocBefore, allocAfter);
+  }
+
+  private void testDecreaseQueueSettingOnTheFlyInternal(String allocBefore,
+      String allocAfter) throws Exception {
+    // Set max running apps
+    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
+
+    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+    out.println(allocBefore);
+    out.close();
+
+    scheduler.init(conf);
+    scheduler.start();
+    scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+    // Add a node
+    RMNode node1 =
+        MockNodes
+            .newNodeInfo(1, Resources.createResource(8192, 8), 1, "127.0.0.1");
+    NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
+    scheduler.handle(nodeEvent1);
+
+    // Request for app 1
+    ApplicationAttemptId attId1 = createSchedulingRequest(1024, "queue1",
+        "user1", 1);
+
+    scheduler.update();
+    NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1);
+    scheduler.handle(updateEvent);
+
+    // App 1 should be running
+    assertEquals(1, scheduler.getSchedulerApp(attId1).getLiveContainers().size());
+
+    ApplicationAttemptId attId2 = createSchedulingRequest(1024, "queue1",
+        "user1", 1);
+
+    scheduler.update();
+    scheduler.handle(updateEvent);
+
+    ApplicationAttemptId attId3 = createSchedulingRequest(1024, "queue1",
+        "user1", 1);
+
+    scheduler.update();
+    scheduler.handle(updateEvent);
+
+    ApplicationAttemptId attId4 = createSchedulingRequest(1024, "queue1",
+        "user1", 1);
+
+    scheduler.update();
+    scheduler.handle(updateEvent);
+
+    // App 2 should be running
+    assertEquals(1, scheduler.getSchedulerApp(attId2).getLiveContainers().size());
+    // App 3 should be running
+    assertEquals(1, scheduler.getSchedulerApp(attId3).getLiveContainers().size());
+    // App 4 should not be running
+    assertEquals(0, scheduler.getSchedulerApp(attId4).getLiveContainers().size());
+
+    out = new PrintWriter(new FileWriter(ALLOC_FILE));
+    out.println(allocAfter);
+    out.close();
+    scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+    scheduler.update();
+    scheduler.handle(updateEvent);
+
+    // App 2 should still be running
+    assertEquals(1, scheduler.getSchedulerApp(attId2).getLiveContainers().size());
+
+    scheduler.update();
+    scheduler.handle(updateEvent);
+
+    // App 3 should still be running
+    assertEquals(1, scheduler.getSchedulerApp(attId3).getLiveContainers().size());
+
+    scheduler.update();
+    scheduler.handle(updateEvent);
+
+    // App 4 should not be running
+    assertEquals(0, scheduler.getSchedulerApp(attId4).getLiveContainers().size());
+
+    // Now remove app 1
+    AppAttemptRemovedSchedulerEvent appRemovedEvent1 = new AppAttemptRemovedSchedulerEvent(
+        attId1, RMAppAttemptState.FINISHED, false);
+
+    scheduler.handle(appRemovedEvent1);
+    scheduler.update();
+    scheduler.handle(updateEvent);
+
+    // App 4 should not be running
+    assertEquals(0, scheduler.getSchedulerApp(attId4).getLiveContainers().size());
+
+    // Now remove app 2
+    appRemovedEvent1 = new AppAttemptRemovedSchedulerEvent(
+        attId2, RMAppAttemptState.FINISHED, false);
+
+    scheduler.handle(appRemovedEvent1);
+    scheduler.update();
+    scheduler.handle(updateEvent);
+
+    // App 4 should not be running
+    assertEquals(0, scheduler.getSchedulerApp(attId4).getLiveContainers().size());
+
+    // Now remove app 3
+    appRemovedEvent1 = new AppAttemptRemovedSchedulerEvent(
+        attId3, RMAppAttemptState.FINISHED, false);
+
+    scheduler.handle(appRemovedEvent1);
+    scheduler.update();
+    scheduler.handle(updateEvent);
+
+    // App 4 should be running now
+    assertEquals(1, scheduler.getSchedulerApp(attId4).getLiveContainers().size());
+  }
+
   @Test (timeout = 5000)
   public void testReservationWhileMultiplePriorities() throws IOException {
     scheduler.init(conf);