You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by su...@apache.org on 2017/12/13 17:20:08 UTC
hadoop git commit: YARN-7643. Handle recovery of applications in case
of auto-created leaf queue mapping. Contributed by Suma Shivaprasad.
Repository: hadoop
Updated Branches:
refs/heads/trunk 10fc8d2a7 -> cb87e4dc9
YARN-7643. Handle recovery of applications in case of auto-created leaf queue mapping. Contributed by Suma Shivaprasad.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/cb87e4dc
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/cb87e4dc
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/cb87e4dc
Branch: refs/heads/trunk
Commit: cb87e4dc927731e32b0bbcf678bb5600835ff28d
Parents: 10fc8d2
Author: Sunil G <su...@apache.org>
Authored: Wed Dec 13 22:49:58 2017 +0530
Committer: Sunil G <su...@apache.org>
Committed: Wed Dec 13 22:49:58 2017 +0530
----------------------------------------------------------------------
.../server/resourcemanager/RMAppManager.java | 26 ++--
.../UserGroupMappingPlacementRule.java | 2 +
.../scheduler/capacity/AbstractCSQueue.java | 2 +-
.../scheduler/capacity/CapacityScheduler.java | 95 +++++++----
.../TestWorkPreservingRMRestart.java | 156 +++++++++++++++++++
...stCapacitySchedulerAutoCreatedQueueBase.java | 46 +++++-
6 files changed, 275 insertions(+), 52 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cb87e4dc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
index b21fb73..5ea1152 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
@@ -366,22 +366,20 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
String user, boolean isRecovery, long startTime) throws YarnException {
ApplicationPlacementContext placementContext = null;
+ try {
+ placementContext = placeApplication(rmContext, submissionContext, user);
+ } catch (YarnException e) {
+ String msg =
+ "Failed to place application " + submissionContext.getApplicationId()
+ + " to queue and specified " + "queue is invalid : "
+ + submissionContext.getQueue();
+ LOG.error(msg, e);
+ throw e;
+ }
- // We only do queue mapping when it's a new application
+ // We only replace the queue when it's a new application
if (!isRecovery) {
- try {
- // Do queue mapping
- placementContext = placeApplication(rmContext,
- submissionContext, user);
- replaceQueueFromPlacementContext(placementContext,
- submissionContext);
- } catch (YarnException e) {
- String msg = "Failed to place application " +
- submissionContext.getApplicationId() + " to queue and specified "
- + "queue is invalid : " + submissionContext.getQueue();
- LOG.error(msg, e);
- throw e;
- }
+ replaceQueueFromPlacementContext(placementContext, submissionContext);
// fail the submission if configured application timeout value is invalid
RMServerUtils.validateApplicationTimeouts(
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cb87e4dc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/UserGroupMappingPlacementRule.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/UserGroupMappingPlacementRule.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/UserGroupMappingPlacementRule.java
index 9901f4a..d03b832 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/UserGroupMappingPlacementRule.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/UserGroupMappingPlacementRule.java
@@ -184,6 +184,8 @@ public class UserGroupMappingPlacementRule extends PlacementRule {
if (mappedQueue != null) {
// We have a mapping, should we use it?
if (queueName.equals(YarnConfiguration.DEFAULT_QUEUE_NAME)
+ //queueName will be same as mapped queue name in case of recovery
+ || queueName.equals(mappedQueue.getQueue())
|| overrideWithQueueMappings) {
LOG.info("Application " + applicationId + " user " + user
+ " mapping [" + queueName + "] to [" + mappedQueue
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cb87e4dc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
index 4df4cf2..9afbdd5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
@@ -147,7 +147,7 @@ public abstract class AbstractCSQueue implements CSQueue {
this.metrics = old != null ?
(CSQueueMetrics) old.getMetrics() :
CSQueueMetrics.forQueue(getQueuePath(), parent,
- configuration.getEnableUserMetrics(), cs.getConf());
+ cs.getConfiguration().getEnableUserMetrics(), cs.getConf());
this.csContext = cs;
this.minimumAllocation = csContext.getMinimumResourceCapability();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cb87e4dc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index 8de3631..000f59c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -650,24 +650,28 @@ public class CapacityScheduler extends
return this.queueManager.getQueue(queueName);
}
- private void addApplicationOnRecovery(
- ApplicationId applicationId, String queueName, String user,
- Priority priority) {
+ private void addApplicationOnRecovery(ApplicationId applicationId,
+ String queueName, String user,
+ Priority priority, ApplicationPlacementContext placementContext) {
try {
writeLock.lock();
- CSQueue queue = getQueue(queueName);
+ //check if the queue needs to be auto-created during recovery
+ CSQueue queue = getOrCreateQueueFromPlacementContext(applicationId, user,
+ queueName, placementContext, true);
+
if (queue == null) {
//During a restart, this indicates a queue was removed, which is
//not presently supported
if (!YarnConfiguration.shouldRMFailFast(getConfig())) {
this.rmContext.getDispatcher().getEventHandler().handle(
new RMAppEvent(applicationId, RMAppEventType.KILL,
- "Application killed on recovery as it was submitted to queue "
- + queueName + " which no longer exists after restart."));
+ "Application killed on recovery as it"
+ + " was submitted to queue " + queueName
+ + " which no longer exists after restart."));
return;
} else{
- String queueErrorMsg = "Queue named " + queueName
- + " missing during application recovery."
+ String queueErrorMsg = "Queue named " + queueName + " missing "
+ + "during application recovery."
+ " Queue removal during recovery is not presently "
+ "supported by the capacity scheduler, please "
+ "restart with all queues configured"
@@ -682,8 +686,8 @@ public class CapacityScheduler extends
if (!YarnConfiguration.shouldRMFailFast(getConfig())) {
this.rmContext.getDispatcher().getEventHandler().handle(
new RMAppEvent(applicationId, RMAppEventType.KILL,
- "Application killed on recovery as it was submitted to queue "
- + queueName
+ "Application killed on recovery as it was "
+ + "submitted to queue " + queueName
+ " which is no longer a leaf queue after restart."));
return;
} else{
@@ -719,6 +723,51 @@ public class CapacityScheduler extends
}
}
+ private CSQueue getOrCreateQueueFromPlacementContext(ApplicationId
+ applicationId, String user, String queueName,
+ ApplicationPlacementContext placementContext,
+ boolean isRecovery) {
+
+ CSQueue queue = getQueue(queueName);
+
+ if (queue == null) {
+ if (placementContext != null && placementContext.hasParentQueue()) {
+ try {
+ return autoCreateLeafQueue(placementContext);
+ } catch (YarnException | IOException e) {
+ if (isRecovery) {
+ if (!YarnConfiguration.shouldRMFailFast(getConfig())) {
+ LOG.error("Could not auto-create leaf queue " + queueName +
+ " due to : ", e);
+ this.rmContext.getDispatcher().getEventHandler().handle(
+ new RMAppEvent(applicationId, RMAppEventType.KILL,
+ "Application killed on recovery"
+ + " as it was submitted to queue " + queueName
+ + " which could not be auto-created"));
+ } else{
+ String queueErrorMsg =
+ "Queue named " + queueName + " could not be "
+ + "auto-created during application recovery.";
+ LOG.fatal(queueErrorMsg, e);
+ throw new QueueInvalidException(queueErrorMsg);
+ }
+ } else{
+ LOG.error("Could not auto-create leaf queue due to : ", e);
+ final String message =
+ "Application " + applicationId + " submission by user : "
+ + user
+ + " to queue : " + queueName + " failed : " + e
+ .getMessage();
+ this.rmContext.getDispatcher().getEventHandler().handle(
+ new RMAppEvent(applicationId, RMAppEventType.APP_REJECTED,
+ message));
+ }
+ }
+ }
+ }
+ return queue;
+ }
+
private void addApplication(ApplicationId applicationId, String queueName,
String user, Priority priority,
ApplicationPlacementContext placementContext) {
@@ -732,23 +781,10 @@ public class CapacityScheduler extends
message));
return;
}
- // Sanity checks.
- CSQueue queue = getQueue(queueName);
- if (queue == null && placementContext != null) {
- //Could be a potential auto-created leaf queue
- try {
- queue = autoCreateLeafQueue(placementContext);
- } catch (YarnException | IOException e) {
- LOG.error("Could not auto-create leaf queue due to : ", e);
- final String message =
- "Application " + applicationId + " submission by user : " + user
- + " to queue : " + queueName + " failed : " + e.getMessage();
- this.rmContext.getDispatcher().getEventHandler().handle(
- new RMAppEvent(applicationId, RMAppEventType.APP_REJECTED,
- message));
- }
- }
+ //Could be a potential auto-created leaf queue
+ CSQueue queue = getOrCreateQueueFromPlacementContext(applicationId, user,
+ queueName, placementContext, false);
if (queue == null) {
final String message =
@@ -1534,7 +1570,8 @@ public class CapacityScheduler extends
appAddedEvent.getPlacementContext());
} else {
addApplicationOnRecovery(appAddedEvent.getApplicationId(), queueName,
- appAddedEvent.getUser(), appAddedEvent.getApplicatonPriority());
+ appAddedEvent.getUser(), appAddedEvent.getApplicatonPriority(),
+ appAddedEvent.getPlacementContext());
}
}
}
@@ -2058,10 +2095,10 @@ public class CapacityScheduler extends
+ " (should be set and be a PlanQueue or ManagedParentQueue)");
}
- AbstractManagedParentQueue parentPlan =
+ AbstractManagedParentQueue parent =
(AbstractManagedParentQueue) newQueue.getParent();
String queuename = newQueue.getQueueName();
- parentPlan.addChildQueue(newQueue);
+ parent.addChildQueue(newQueue);
this.queueManager.addQueue(queuename, newQueue);
LOG.info("Creation of AutoCreatedLeafQueue " + newQueue + " succeeded");
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cb87e4dc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java
index 064e217..efde781 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java
@@ -64,6 +64,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.Capacity
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue;
+
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity
+ .TestCapacitySchedulerAutoCreatedQueueBase;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSParentQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueueMetrics;
@@ -97,6 +100,10 @@ import java.util.Set;
import java.util.concurrent.TimeUnit;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.PREFIX;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
+ .capacity.TestCapacitySchedulerAutoCreatedQueueBase.USER1;
+import static org.apache.hadoop.yarn.server.resourcemanager.webapp
+ .RMWebServices.DEFAULT_QUEUE;
import static org.junit.Assert.*;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -281,6 +288,18 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
}
}
+ private CapacitySchedulerConfiguration
+ getSchedulerAutoCreatedQueueConfiguration(
+ boolean overrideWithQueueMappings) throws IOException {
+ CapacitySchedulerConfiguration schedulerConf =
+ new CapacitySchedulerConfiguration(conf);
+ TestCapacitySchedulerAutoCreatedQueueBase
+ .setupQueueConfigurationForSingleAutoCreatedLeafQueue(schedulerConf);
+ TestCapacitySchedulerAutoCreatedQueueBase.setupQueueMappings(schedulerConf,
+ "c", overrideWithQueueMappings, new int[] {0, 1});
+ return schedulerConf;
+ }
+
// Test work preserving recovery of apps running under reservation.
// This involves:
// 1. Setting up a dynamic reservable queue,
@@ -1532,4 +1551,141 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
Assert.assertEquals(RMAppState.FINISHED, recoveredApp.getState());
}
+
+ @Test(timeout = 30000)
+ public void testDynamicAutoCreatedQueueRecoveryWithDefaultQueue()
+ throws Exception {
+ //if queue name is not specified, it should submit to 'default' queue
+ testDynamicAutoCreatedQueueRecovery(USER1, null);
+ }
+
+ @Test(timeout = 30000)
+ public void testDynamicAutoCreatedQueueRecoveryWithOverrideQueueMappingFlag()
+ throws Exception {
+ testDynamicAutoCreatedQueueRecovery(USER1, USER1);
+ }
+
+ // Test work preserving recovery of apps running on auto-created queues.
+ // This involves:
+ // 1. Setting up a dynamic auto-created queue,
+ // 2. Submitting an app to it,
+ // 3. Failing over RM,
+ // 4. Validating that the app is recovered post failover,
+ // 5. Check if all running containers are recovered,
+ // 6. Verify the scheduler state like attempt info,
+ // 7. Verify the queue/user metrics for the dynamic auto-created queue.
+
+ public void testDynamicAutoCreatedQueueRecovery(String user, String queueName)
+ throws Exception {
+ conf.setBoolean(CapacitySchedulerConfiguration.ENABLE_USER_METRICS, true);
+ conf.set(CapacitySchedulerConfiguration.RESOURCE_CALCULATOR_CLASS,
+ DominantResourceCalculator.class.getName());
+ conf.setBoolean(YarnConfiguration.RM_FAIL_FAST, true);
+
+ // 1. Set up dynamic auto-created queue.
+ CapacitySchedulerConfiguration schedulerConf = null;
+ if (queueName == null || queueName.equals(DEFAULT_QUEUE)) {
+ schedulerConf = getSchedulerAutoCreatedQueueConfiguration(false);
+ } else{
+ schedulerConf = getSchedulerAutoCreatedQueueConfiguration(true);
+ }
+ int containerMemory = 1024;
+ Resource containerResource = Resource.newInstance(containerMemory, 1);
+
+ rm1 = new MockRM(schedulerConf);
+ rm1.start();
+ MockNM nm1 = new MockNM("127.0.0.1:1234", 8192,
+ rm1.getResourceTrackerService());
+ nm1.registerNode();
+ // 2. submit app to queue which is auto-created.
+ RMApp app1 = rm1.submitApp(200, "autoCreatedQApp", user, null, queueName);
+ Resource amResources = app1.getAMResourceRequests().get(0).getCapability();
+ MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+ // clear queue metrics
+ rm1.clearQueueMetrics(app1);
+
+ // 3. Fail over (restart) RM.
+ rm2 = new MockRM(schedulerConf, rm1.getRMStateStore());
+ rm2.start();
+ nm1.setResourceTrackerService(rm2.getResourceTrackerService());
+ // 4. Validate app is recovered post failover.
+ RMApp recoveredApp1 = rm2.getRMContext().getRMApps().get(
+ app1.getApplicationId());
+ RMAppAttempt loadedAttempt1 = recoveredApp1.getCurrentAppAttempt();
+ NMContainerStatus amContainer = TestRMRestart.createNMContainerStatus(
+ am1.getApplicationAttemptId(), 1, ContainerState.RUNNING);
+ NMContainerStatus runningContainer = TestRMRestart.createNMContainerStatus(
+ am1.getApplicationAttemptId(), 2, ContainerState.RUNNING);
+ NMContainerStatus completedContainer =
+ TestRMRestart.createNMContainerStatus(am1.getApplicationAttemptId(), 3,
+ ContainerState.COMPLETE);
+
+ nm1.registerNode(
+ Arrays.asList(amContainer, runningContainer, completedContainer), null);
+
+ // Wait for RM to settle down on recovering containers.
+ waitForNumContainersToRecover(2, rm2, am1.getApplicationAttemptId());
+ Set<ContainerId> launchedContainers =
+ ((RMNodeImpl) rm2.getRMContext().getRMNodes().get(nm1.getNodeId()))
+ .getLaunchedContainers();
+ assertTrue(launchedContainers.contains(amContainer.getContainerId()));
+ assertTrue(launchedContainers.contains(runningContainer.getContainerId()));
+
+ // 5. Check RMContainers are re-created and the container state is
+ // correct.
+ rm2.waitForState(nm1, amContainer.getContainerId(),
+ RMContainerState.RUNNING);
+ rm2.waitForState(nm1, runningContainer.getContainerId(),
+ RMContainerState.RUNNING);
+ rm2.waitForContainerToComplete(loadedAttempt1, completedContainer);
+
+ AbstractYarnScheduler scheduler =
+ (AbstractYarnScheduler) rm2.getResourceScheduler();
+ SchedulerNode schedulerNode1 = scheduler.getSchedulerNode(nm1.getNodeId());
+
+ // ********* check scheduler node state.*******
+ // 2 running containers.
+ Resource usedResources = Resources.multiply(containerResource, 2);
+ Resource nmResource = Resource.newInstance(nm1.getMemory(),
+ nm1.getvCores());
+
+ assertTrue(schedulerNode1.isValidContainer(amContainer.getContainerId()));
+ assertTrue(
+ schedulerNode1.isValidContainer(runningContainer.getContainerId()));
+ assertFalse(
+ schedulerNode1.isValidContainer(completedContainer.getContainerId()));
+ // 2 launched containers, 1 completed container
+ assertEquals(2, schedulerNode1.getNumContainers());
+
+ assertEquals(Resources.subtract(nmResource, usedResources),
+ schedulerNode1.getUnallocatedResource());
+ assertEquals(usedResources, schedulerNode1.getAllocatedResource());
+ // Resource availableResources = Resources.subtract(nmResource,
+ // usedResources);
+
+ // 6. Verify the scheduler state like attempt info.
+ Map<ApplicationId, SchedulerApplication<SchedulerApplicationAttempt>> sa =
+ ((AbstractYarnScheduler) rm2.getResourceScheduler())
+ .getSchedulerApplications();
+ SchedulerApplication<SchedulerApplicationAttempt> schedulerApp = sa.get(
+ recoveredApp1.getApplicationId());
+
+ // 7. Verify the queue/user metrics for the dynamic auto-created queue.
+ if (getSchedulerType() == SchedulerType.CAPACITY) {
+ checkCSQueue(rm2, schedulerApp, nmResource, nmResource, usedResources, 2);
+ }
+
+ // *********** check scheduler attempt state.********
+ SchedulerApplicationAttempt schedulerAttempt =
+ schedulerApp.getCurrentAppAttempt();
+ assertTrue(schedulerAttempt.getLiveContainers()
+ .contains(scheduler.getRMContainer(amContainer.getContainerId())));
+ assertTrue(schedulerAttempt.getLiveContainers()
+ .contains(scheduler.getRMContainer(runningContainer.getContainerId())));
+ assertEquals(schedulerAttempt.getCurrentConsumption(), usedResources);
+
+ // *********** check appSchedulingInfo state ***********
+ assertEquals((1L << 40) + 1L, schedulerAttempt.getNewContainerId());
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cb87e4dc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoCreatedQueueBase.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoCreatedQueueBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoCreatedQueueBase.java
index d6282a1..035c460 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoCreatedQueueBase.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoCreatedQueueBase.java
@@ -100,6 +100,7 @@ public class TestCapacitySchedulerAutoCreatedQueueBase {
public static final String B = CapacitySchedulerConfiguration.ROOT + ".b";
public static final String C = CapacitySchedulerConfiguration.ROOT + ".c";
public static final String D = CapacitySchedulerConfiguration.ROOT + ".d";
+ public static final String E = CapacitySchedulerConfiguration.ROOT + ".e";
public static final String A1 = A + ".a1";
public static final String A2 = A + ".a2";
public static final String B1 = B + ".b1";
@@ -129,8 +130,8 @@ public class TestCapacitySchedulerAutoCreatedQueueBase {
public static final String USER = "user_";
public static final String USER0 = USER + 0;
public static final String USER1 = USER + 1;
- public static final String USER3 = USER + 3;
public static final String USER2 = USER + 2;
+ public static final String USER3 = USER + 3;
public static final String PARENT_QUEUE = "c";
public static final Set<String> accessibleNodeLabelsOnC = new HashSet<>();
@@ -183,7 +184,7 @@ public class TestCapacitySchedulerAutoCreatedQueueBase {
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
ResourceScheduler.class);
- setupQueueMappings(conf);
+ setupQueueMappings(conf, PARENT_QUEUE, true, new int[] {0, 1, 2, 3});
dispatcher = new SpyDispatcher();
rmAppEventEventHandler = new SpyDispatcher.SpyRMAppEventHandler();
@@ -225,27 +226,33 @@ public class TestCapacitySchedulerAutoCreatedQueueBase {
}
public static CapacitySchedulerConfiguration setupQueueMappings(
- CapacitySchedulerConfiguration conf) {
+ CapacitySchedulerConfiguration conf, String parentQueue, boolean
+ overrideWithQueueMappings, int[] userIds) {
List<String> queuePlacementRules = new ArrayList<>();
queuePlacementRules.add(YarnConfiguration.USER_GROUP_PLACEMENT_RULE);
conf.setQueuePlacementRules(queuePlacementRules);
+ List<UserGroupMappingPlacementRule.QueueMapping> existingMappings = conf
+ .getQueueMappings();
+
//set queue mapping
List<UserGroupMappingPlacementRule.QueueMapping> queueMappings =
new ArrayList<>();
- for (int i = 0; i <= 3; i++) {
+ for (int i = 0; i < userIds.length; i++) {
//Set C as parent queue name for auto queue creation
UserGroupMappingPlacementRule.QueueMapping userQueueMapping =
new UserGroupMappingPlacementRule.QueueMapping(
UserGroupMappingPlacementRule.QueueMapping.MappingType.USER,
- USER + i, getQueueMapping(PARENT_QUEUE, USER + i));
+ USER + userIds[i], getQueueMapping(parentQueue, USER +
+ userIds[i]));
queueMappings.add(userQueueMapping);
}
- conf.setQueueMappings(queueMappings);
+ existingMappings.addAll(queueMappings);
+ conf.setQueueMappings(existingMappings);
//override with queue mappings
- conf.setOverrideWithQueueMappings(true);
+ conf.setOverrideWithQueueMappings(overrideWithQueueMappings);
return conf;
}
@@ -327,6 +334,29 @@ public class TestCapacitySchedulerAutoCreatedQueueBase {
return conf;
}
+ public static CapacitySchedulerConfiguration
+ setupQueueConfigurationForSingleAutoCreatedLeafQueue(
+ CapacitySchedulerConfiguration conf) {
+
+ //setup a single top-level queue with auto-creation of leaf queues enabled
+ // Define top-level queues
+ // Set childQueue for root
+ conf.setQueues(CapacitySchedulerConfiguration.ROOT,
+ new String[] {"c"});
+ conf.setCapacity(C, 100f);
+
+ conf.setUserLimitFactor(C, 1.0f);
+ conf.setAutoCreateChildQueueEnabled(C, true);
+
+ //Setup leaf queue template configs
+ conf.setAutoCreatedLeafQueueConfigCapacity(C, 100f);
+ conf.setAutoCreatedLeafQueueConfigMaxCapacity(C, 100.0f);
+ conf.setAutoCreatedLeafQueueConfigUserLimit(C, 100);
+ conf.setAutoCreatedLeafQueueConfigUserLimitFactor(C, 3.0f);
+
+ return conf;
+ }
+
@After
public void tearDown() throws Exception {
if (mockRM != null) {
@@ -395,7 +425,7 @@ public class TestCapacitySchedulerAutoCreatedQueueBase {
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
ResourceScheduler.class);
- setupQueueMappings(conf);
+ setupQueueMappings(conf, PARENT_QUEUE, true, new int[] {0, 1, 2, 3});
RMNodeLabelsManager mgr = setupNodeLabelManager(conf);
MockRM newMockRM = new MockRM(conf) {
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org