You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2014/10/13 23:16:21 UTC
git commit: YARN-2308. Changed CapacityScheduler to explicitly throw
exception if the queue to which the apps were submitted is changed across RM
restart. Contributed by Craig Welch & Chang Li (cherry picked from commit
f9680d9a160ee527c8f2c1494584abf1a1f70f82)
Repository: hadoop
Updated Branches:
refs/heads/branch-2 6b5741d42 -> 536254f5e
YARN-2308. Changed CapacityScheduler to explicitly throw exception if the queue
to which the apps were submitted is changed across RM restart. Contributed by Craig Welch & Chang Li
(cherry picked from commit f9680d9a160ee527c8f2c1494584abf1a1f70f82)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/536254f5
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/536254f5
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/536254f5
Branch: refs/heads/branch-2
Commit: 536254f5e746eeb2238d2859f8533dc9336ff11c
Parents: 6b5741d
Author: Jian He <ji...@apache.org>
Authored: Mon Oct 13 14:08:38 2014 -0700
Committer: Jian He <ji...@apache.org>
Committed: Mon Oct 13 14:16:12 2014 -0700
----------------------------------------------------------------------
.../scheduler/capacity/CapacityScheduler.java | 13 ++++
.../yarn/server/resourcemanager/MockRM.java | 9 +++
.../TestWorkPreservingRMRestart.java | 78 ++++++++++++++++++++
3 files changed, 100 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/536254f5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index 02f27b8..ed5518c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -648,6 +648,19 @@ public class CapacityScheduler extends
// sanity checks.
CSQueue queue = getQueue(queueName);
if (queue == null) {
+ //During a restart, this indicates a queue was removed, which is
+ //not presently supported
+ if (isAppRecovering) {
+ //throwing RuntimeException because some other exceptions are caught
+ //(including YarnRuntimeException) and we want this to force an exit
+ String queueErrorMsg = "Queue named " + queueName
+ + " missing during application recovery."
+ + " Queue removal during recovery is not presently supported by the"
+ + " capacity scheduler, please restart with all queues configured"
+ + " which were present before shutdown/restart.";
+ LOG.fatal(queueErrorMsg);
+ throw new RuntimeException(queueErrorMsg);
+ }
String message = "Application " + applicationId +
" submitted by user " + user + " to unknown queue: " + queueName;
this.rmContext.getDispatcher().getEventHandler()
http://git-wip-us.apache.org/repos/asf/hadoop/blob/536254f5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
index 5d37d48..cfac585 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
@@ -249,6 +249,15 @@ public class MockRM extends ResourceManager {
super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null);
}
+
+ public RMApp submitApp(int masterMemory, String name, String user,
+ Map<ApplicationAccessType, String> acls, String queue,
+ boolean waitForAccepted) throws Exception {
+ return submitApp(masterMemory, name, user, acls, false, queue,
+ super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
+ YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null,
+ waitForAccepted);
+ }
public RMApp submitApp(int masterMemory, String name, String user,
Map<ApplicationAccessType, String> acls, boolean unmanaged, String queue,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/536254f5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java
index 5f00f31..aadfbba 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java
@@ -336,6 +336,8 @@ public class TestWorkPreservingRMRestart {
private static final String R = "Default";
private static final String A = "QueueA";
private static final String B = "QueueB";
+ //don't ever create the below queue ;-)
+ private static final String QUEUE_DOESNT_EXIST = "NoSuchQueue";
private static final String USER_1 = "user1";
private static final String USER_2 = "user2";
@@ -351,6 +353,18 @@ public class TestWorkPreservingRMRestart {
conf.setDouble(CapacitySchedulerConfiguration
.MAXIMUM_APPLICATION_MASTERS_RESOURCE_PERCENT, 0.5f);
}
+
+ private void setupQueueConfigurationOnlyA(
+ CapacitySchedulerConfiguration conf) {
+ conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] { R });
+ final String Q_R = CapacitySchedulerConfiguration.ROOT + "." + R;
+ conf.setCapacity(Q_R, 100);
+ final String Q_A = Q_R + "." + A;
+ conf.setQueues(Q_R, new String[] {A});
+ conf.setCapacity(Q_A, 100);
+ conf.setDouble(CapacitySchedulerConfiguration
+ .MAXIMUM_APPLICATION_MASTERS_RESOURCE_PERCENT, 1.0f);
+ }
// Test CS recovery with multi-level queues and multi-users:
// 1. setup 2 NMs each with 8GB memory;
@@ -470,6 +484,70 @@ public class TestWorkPreservingRMRestart {
totalAvailableResource.getVirtualCores(), totalUsedResource.getMemory(),
totalUsedResource.getVirtualCores());
}
+
+ //Test that we receive a meaningful exit-causing exception if a queue
+ //is removed during recovery
+ //1. Add some apps to two queues, attempt to add an app to a non-existent
+ // queue to verify that the new logic is not in effect during normal app
+ // submission
+ //2. Remove one of the queues, restart the RM
+ //3. Verify that the expected exception was thrown
+ @Test (timeout = 30000)
+ public void testCapacitySchedulerQueueRemovedRecovery() throws Exception {
+ if (!schedulerClass.equals(CapacityScheduler.class)) {
+ return;
+ }
+ conf.setBoolean(CapacitySchedulerConfiguration.ENABLE_USER_METRICS, true);
+ conf.set(CapacitySchedulerConfiguration.RESOURCE_CALCULATOR_CLASS,
+ DominantResourceCalculator.class.getName());
+ CapacitySchedulerConfiguration csConf =
+ new CapacitySchedulerConfiguration(conf);
+ setupQueueConfiguration(csConf);
+ MemoryRMStateStore memStore = new MemoryRMStateStore();
+ memStore.init(csConf);
+ rm1 = new MockRM(csConf, memStore);
+ rm1.start();
+ MockNM nm1 =
+ new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService());
+ MockNM nm2 =
+ new MockNM("127.1.1.1:4321", 8192, rm1.getResourceTrackerService());
+ nm1.registerNode();
+ nm2.registerNode();
+ RMApp app1_1 = rm1.submitApp(1024, "app1_1", USER_1, null, A);
+ MockAM am1_1 = MockRM.launchAndRegisterAM(app1_1, rm1, nm1);
+ RMApp app1_2 = rm1.submitApp(1024, "app1_2", USER_1, null, A);
+ MockAM am1_2 = MockRM.launchAndRegisterAM(app1_2, rm1, nm2);
+
+ RMApp app2 = rm1.submitApp(1024, "app2", USER_2, null, B);
+ MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2);
+
+ //Submit an app with a non-existent queue to make sure it does not
+ //cause a fatal failure in the non-recovery case
+ RMApp appNA = rm1.submitApp(1024, "app1_2", USER_1, null,
+ QUEUE_DOESNT_EXIST, false);
+
+ // clear queue metrics
+ rm1.clearQueueMetrics(app1_1);
+ rm1.clearQueueMetrics(app1_2);
+ rm1.clearQueueMetrics(app2);
+
+ // Re-start RM
+ csConf =
+ new CapacitySchedulerConfiguration(conf);
+ setupQueueConfigurationOnlyA(csConf);
+ rm2 = new MockRM(csConf, memStore);
+ boolean runtimeThrown = false;
+ try {
+ rm2.start();
+ } catch (RuntimeException e) {
+ //we're catching it because we want to verify the message
+ //and we don't want to set it as an expected exception for the
+ //test because we only want it to happen here
+ assertTrue(e.getMessage().contains(B + " missing"));
+ runtimeThrown = true;
+ }
+ assertTrue(runtimeThrown);
+ }
private void checkParentQueue(ParentQueue parentQueue, int numContainers,
Resource usedResource, float UsedCapacity, float absoluteUsedCapacity) {