You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2014/10/13 23:16:21 UTC

git commit: YARN-2308. Changed CapacityScheduler to explicitly throw exception if the queue to which the apps were submitted is changed across RM restart. Contributed by Craig Welch & Chang Li (cherry picked from commit f9680d9a160ee527c8f2c1494584abf1a1f70f82)

Repository: hadoop
Updated Branches:
  refs/heads/branch-2 6b5741d42 -> 536254f5e


YARN-2308. Changed CapacityScheduler to explicitly throw exception if the queue
to which the apps were submitted is changed across RM restart. Contributed by Craig Welch & Chang Li
(cherry picked from commit f9680d9a160ee527c8f2c1494584abf1a1f70f82)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/536254f5
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/536254f5
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/536254f5

Branch: refs/heads/branch-2
Commit: 536254f5e746eeb2238d2859f8533dc9336ff11c
Parents: 6b5741d
Author: Jian He <ji...@apache.org>
Authored: Mon Oct 13 14:08:38 2014 -0700
Committer: Jian He <ji...@apache.org>
Committed: Mon Oct 13 14:16:12 2014 -0700

----------------------------------------------------------------------
 .../scheduler/capacity/CapacityScheduler.java   | 13 ++++
 .../yarn/server/resourcemanager/MockRM.java     |  9 +++
 .../TestWorkPreservingRMRestart.java            | 78 ++++++++++++++++++++
 3 files changed, 100 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/536254f5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index 02f27b8..ed5518c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -648,6 +648,19 @@ public class CapacityScheduler extends
     // sanity checks.
     CSQueue queue = getQueue(queueName);
     if (queue == null) {
+      //During a restart, this indicates a queue was removed, which is
+      //not presently supported
+      if (isAppRecovering) {
+        //throwing RuntimeException because some other exceptions are caught
+        //(including YarnRuntimeException) and we want this to force an exit
+        String queueErrorMsg = "Queue named " + queueName 
+           + " missing during application recovery."
+           + " Queue removal during recovery is not presently supported by the"
+           + " capacity scheduler, please restart with all queues configured"
+           + " which were present before shutdown/restart.";
+        LOG.fatal(queueErrorMsg);
+        throw new RuntimeException(queueErrorMsg);
+      }
       String message = "Application " + applicationId + 
       " submitted by user " + user + " to unknown queue: " + queueName;
       this.rmContext.getDispatcher().getEventHandler()

http://git-wip-us.apache.org/repos/asf/hadoop/blob/536254f5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
index 5d37d48..cfac585 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
@@ -249,6 +249,15 @@ public class MockRM extends ResourceManager {
       super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
         YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null);
   }
+  
+  public RMApp submitApp(int masterMemory, String name, String user,
+      Map<ApplicationAccessType, String> acls, String queue, 
+      boolean waitForAccepted) throws Exception {
+    return submitApp(masterMemory, name, user, acls, false, queue,
+      super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
+        YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null,
+        waitForAccepted);
+  }
 
   public RMApp submitApp(int masterMemory, String name, String user,
       Map<ApplicationAccessType, String> acls, boolean unmanaged, String queue,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/536254f5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java
index 5f00f31..aadfbba 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java
@@ -336,6 +336,8 @@ public class TestWorkPreservingRMRestart {
   private static final String R = "Default";
   private static final String A = "QueueA";
   private static final String B = "QueueB";
+  //don't ever create the below queue ;-)
+  private static final String QUEUE_DOESNT_EXIST = "NoSuchQueue";
   private static final String USER_1 = "user1";
   private static final String USER_2 = "user2";
 
@@ -351,6 +353,18 @@ public class TestWorkPreservingRMRestart {
     conf.setDouble(CapacitySchedulerConfiguration
       .MAXIMUM_APPLICATION_MASTERS_RESOURCE_PERCENT, 0.5f);
   }
+  
+  private void setupQueueConfigurationOnlyA(
+      CapacitySchedulerConfiguration conf) {
+    conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] { R });
+    final String Q_R = CapacitySchedulerConfiguration.ROOT + "." + R;
+    conf.setCapacity(Q_R, 100);
+    final String Q_A = Q_R + "." + A;
+    conf.setQueues(Q_R, new String[] {A});
+    conf.setCapacity(Q_A, 100);
+    conf.setDouble(CapacitySchedulerConfiguration
+      .MAXIMUM_APPLICATION_MASTERS_RESOURCE_PERCENT, 1.0f);
+  }
 
   // Test CS recovery with multi-level queues and multi-users:
   // 1. setup 2 NMs each with 8GB memory;
@@ -470,6 +484,70 @@ public class TestWorkPreservingRMRestart {
       totalAvailableResource.getVirtualCores(), totalUsedResource.getMemory(),
       totalUsedResource.getVirtualCores());
   }
+  
+  //Test that we receive a meaningful exit-causing exception if a queue
+  //is removed during recovery
+  //1. Add some apps to two queues, attempt to add an app to a non-existent
+  //   queue to verify that the new logic is not in effect during normal app
+  //   submission
+  //2. Remove one of the queues, restart the RM
+  //3. Verify that the expected exception was thrown
+  @Test (timeout = 30000)
+  public void testCapacitySchedulerQueueRemovedRecovery() throws Exception {
+    if (!schedulerClass.equals(CapacityScheduler.class)) {
+      return;
+    }
+    conf.setBoolean(CapacitySchedulerConfiguration.ENABLE_USER_METRICS, true);
+    conf.set(CapacitySchedulerConfiguration.RESOURCE_CALCULATOR_CLASS,
+      DominantResourceCalculator.class.getName());
+    CapacitySchedulerConfiguration csConf =
+        new CapacitySchedulerConfiguration(conf);
+    setupQueueConfiguration(csConf);
+    MemoryRMStateStore memStore = new MemoryRMStateStore();
+    memStore.init(csConf);
+    rm1 = new MockRM(csConf, memStore);
+    rm1.start();
+    MockNM nm1 =
+        new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService());
+    MockNM nm2 =
+        new MockNM("127.1.1.1:4321", 8192, rm1.getResourceTrackerService());
+    nm1.registerNode();
+    nm2.registerNode();
+    RMApp app1_1 = rm1.submitApp(1024, "app1_1", USER_1, null, A);
+    MockAM am1_1 = MockRM.launchAndRegisterAM(app1_1, rm1, nm1);
+    RMApp app1_2 = rm1.submitApp(1024, "app1_2", USER_1, null, A);
+    MockAM am1_2 = MockRM.launchAndRegisterAM(app1_2, rm1, nm2);
+
+    RMApp app2 = rm1.submitApp(1024, "app2", USER_2, null, B);
+    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2);
+    
+    //Submit an app with a non-existent queue to make sure it does not
+    //cause a fatal failure in the non-recovery case
+    RMApp appNA = rm1.submitApp(1024, "app1_2", USER_1, null,
+       QUEUE_DOESNT_EXIST, false);
+
+    // clear queue metrics
+    rm1.clearQueueMetrics(app1_1);
+    rm1.clearQueueMetrics(app1_2);
+    rm1.clearQueueMetrics(app2);
+
+    // Re-start RM
+    csConf =
+        new CapacitySchedulerConfiguration(conf);
+    setupQueueConfigurationOnlyA(csConf);
+    rm2 = new MockRM(csConf, memStore);
+    boolean runtimeThrown = false;
+    try {
+      rm2.start();
+    } catch (RuntimeException e) {
+      //we're catching it because we want to verify the message
+      //and we don't want to set it as an expected exception for the 
+      //test because we only want it to happen here
+      assertTrue(e.getMessage().contains(B + " missing"));
+      runtimeThrown = true;
+    }
+    assertTrue(runtimeThrown);
+  }
 
   private void checkParentQueue(ParentQueue parentQueue, int numContainers,
       Resource usedResource, float UsedCapacity, float absoluteUsedCapacity) {