You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ar...@apache.org on 2016/01/12 05:32:04 UTC
[05/23] hadoop git commit: YARN-4479. Change CS LeafQueue
pendingOrderingPolicy to hornor recovered apps. Contributed by Rohith Sharma
K S
YARN-4479. Change CS LeafQueue pendingOrderingPolicy to hornor recovered apps. Contributed by Rohith Sharma K S
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/109e528e
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/109e528e
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/109e528e
Branch: refs/heads/HDFS-1312
Commit: 109e528ef5d8df07443373751266b4417acc981a
Parents: fd8065a
Author: Jian He <ji...@apache.org>
Authored: Fri Jan 8 15:51:10 2016 -0800
Committer: Jian He <ji...@apache.org>
Committed: Fri Jan 8 15:51:10 2016 -0800
----------------------------------------------------------------------
hadoop-yarn-project/CHANGES.txt | 3 +
.../dev-support/findbugs-exclude.xml | 2 +
.../scheduler/SchedulerApplicationAttempt.java | 9 +
.../scheduler/capacity/CapacityScheduler.java | 2 +-
.../scheduler/capacity/LeafQueue.java | 63 ++++++-
.../scheduler/common/fica/FiCaSchedulerApp.java | 5 +-
.../capacity/TestApplicationPriority.java | 164 +++++++++++++++++++
.../scheduler/capacity/TestLeafQueue.java | 6 +-
8 files changed, 241 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/109e528e/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 00d31d8..b896b06 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -1229,6 +1229,9 @@ Release 2.8.0 - UNRELEASED
YARN-4546. ResourceManager crash due to scheduling opportunity overflow.
(Jason Lowe via junping_du)
+ YARN-4479. Change CS LeafQueue pendingOrderingPolicy to hornor recovered apps.
+ (Rohith Sharma K S via jianhe)
+
Release 2.7.3 - UNRELEASED
INCOMPATIBLE CHANGES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/109e528e/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
index 2d0d5d6..c79a35e 100644
--- a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
+++ b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
@@ -205,6 +205,8 @@
<Field name="userLimitFactor" />
<Field name="maxAMResourcePerQueuePercent" />
<Field name="lastClusterResource" />
+ <Field name="pendingOrderingPolicy" />
+ <Field name="pendingOPForRecoveredApps" />
</Or>
<Bug pattern="IS2_INCONSISTENT_SYNC" />
</Match>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/109e528e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
index c1f1c3d..b43c106 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
@@ -109,6 +109,7 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
private LogAggregationContext logAggregationContext;
private volatile Priority appPriority = null;
+ private boolean isAttemptRecovering;
protected ResourceUsage attemptResourceUsage = new ResourceUsage();
private AtomicLong firstAllocationRequestSentTime = new AtomicLong(0);
@@ -967,6 +968,14 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
// queue's resource usage for specific partition
}
+ public boolean isAttemptRecovering() {
+ return isAttemptRecovering;
+ }
+
+ protected void setAttemptRecovering(boolean isRecovering) {
+ this.isAttemptRecovering = isRecovering;
+ }
+
public static enum AMState {
UNMANAGED("User launched the Application Master, since it's unmanaged. "),
INACTIVATED("Application is added to the scheduler and is not yet activated. "),
http://git-wip-us.apache.org/repos/asf/hadoop/blob/109e528e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index c7b73fb..b3b9713 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -783,7 +783,7 @@ public class CapacityScheduler extends
FiCaSchedulerApp attempt = new FiCaSchedulerApp(applicationAttemptId,
application.getUser(), queue, queue.getActiveUsersManager(), rmContext,
- application.getPriority());
+ application.getPriority(), isAttemptRecovering);
if (transferStateFromPreviousAttempt) {
attempt.transferStateFromPreviousAttempt(
application.getCurrentAppAttempt());
http://git-wip-us.apache.org/repos/asf/hadoop/blob/109e528e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index 5c3f4b9..ff7d04f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -95,6 +95,9 @@ public class LeafQueue extends AbstractCSQueue {
private OrderingPolicy<FiCaSchedulerApp> pendingOrderingPolicy = null;
+ // Always give preference to this while activating the application attempts.
+ private OrderingPolicy<FiCaSchedulerApp> pendingOPForRecoveredApps = null;
+
private volatile float minimumAllocationFactor;
private Map<String, User> users = new HashMap<String, User>();
@@ -156,6 +159,8 @@ public class LeafQueue extends AbstractCSQueue {
setOrderingPolicy(conf.<FiCaSchedulerApp>getOrderingPolicy(getQueuePath()));
setPendingAppsOrderingPolicy(conf
.<FiCaSchedulerApp> getOrderingPolicy(getQueuePath()));
+ setPendingAppsOrderingPolicyRecovery(conf
+ .<FiCaSchedulerApp> getOrderingPolicy(getQueuePath()));
userLimit = conf.getUserLimit(getQueuePath());
userLimitFactor = conf.getUserLimitFactor(getQueuePath());
@@ -320,7 +325,8 @@ public class LeafQueue extends AbstractCSQueue {
}
public synchronized int getNumPendingApplications() {
- return pendingOrderingPolicy.getNumSchedulableEntities();
+ return pendingOrderingPolicy.getNumSchedulableEntities()
+ + pendingOPForRecoveredApps.getNumSchedulableEntities();
}
public synchronized int getNumActiveApplications() {
@@ -599,9 +605,19 @@ public class LeafQueue extends AbstractCSQueue {
Map<String, Resource> userAmPartitionLimit =
new HashMap<String, Resource>();
- for (Iterator<FiCaSchedulerApp> i = getPendingAppsOrderingPolicy()
- .getAssignmentIterator(); i.hasNext();) {
- FiCaSchedulerApp application = i.next();
+ activateApplications(getPendingAppsOrderingPolicyRecovery()
+ .getAssignmentIterator(), amPartitionLimit, userAmPartitionLimit);
+
+ activateApplications(
+ getPendingAppsOrderingPolicy().getAssignmentIterator(),
+ amPartitionLimit, userAmPartitionLimit);
+ }
+
+ private synchronized void activateApplications(
+ Iterator<FiCaSchedulerApp> fsApp, Map<String, Resource> amPartitionLimit,
+ Map<String, Resource> userAmPartitionLimit) {
+ while (fsApp.hasNext()) {
+ FiCaSchedulerApp application = fsApp.next();
ApplicationId applicationId = application.getApplicationId();
// Get the am-node-partition associated with each application
@@ -692,7 +708,7 @@ public class LeafQueue extends AbstractCSQueue {
metrics.incAMUsed(application.getUser(),
application.getAMResource(partitionName));
metrics.setAMResouceLimitForUser(application.getUser(), userAMLimit);
- i.remove();
+ fsApp.remove();
LOG.info("Application " + applicationId + " from user: "
+ application.getUser() + " activated in queue: " + getQueueName());
}
@@ -702,7 +718,11 @@ public class LeafQueue extends AbstractCSQueue {
User user) {
// Accept
user.submitApplication();
- getPendingAppsOrderingPolicy().addSchedulableEntity(application);
+ if (application.isAttemptRecovering()) {
+ getPendingAppsOrderingPolicyRecovery().addSchedulableEntity(application);
+ } else {
+ getPendingAppsOrderingPolicy().addSchedulableEntity(application);
+ }
applicationAttemptMap.put(application.getApplicationAttemptId(), application);
// Activate applications
@@ -742,7 +762,11 @@ public class LeafQueue extends AbstractCSQueue {
boolean wasActive =
orderingPolicy.removeSchedulableEntity(application);
if (!wasActive) {
- pendingOrderingPolicy.removeSchedulableEntity(application);
+ if (application.isAttemptRecovering()) {
+ pendingOPForRecoveredApps.removeSchedulableEntity(application);
+ } else {
+ pendingOrderingPolicy.removeSchedulableEntity(application);
+ }
} else {
queueUsage.decAMUsed(partitionName,
application.getAMResource(partitionName));
@@ -1491,7 +1515,11 @@ public class LeafQueue extends AbstractCSQueue {
* Obtain (read-only) collection of pending applications.
*/
public Collection<FiCaSchedulerApp> getPendingApplications() {
- return pendingOrderingPolicy.getSchedulableEntities();
+ Collection<FiCaSchedulerApp> pendingApps =
+ new ArrayList<FiCaSchedulerApp>();
+ pendingApps.addAll(pendingOPForRecoveredApps.getSchedulableEntities());
+ pendingApps.addAll(pendingOrderingPolicy.getSchedulableEntities());
+ return pendingApps;
}
/**
@@ -1535,6 +1563,10 @@ public class LeafQueue extends AbstractCSQueue {
@Override
public synchronized void collectSchedulerApplications(
Collection<ApplicationAttemptId> apps) {
+ for (FiCaSchedulerApp pendingApp : pendingOPForRecoveredApps
+ .getSchedulableEntities()) {
+ apps.add(pendingApp.getApplicationAttemptId());
+ }
for (FiCaSchedulerApp pendingApp : pendingOrderingPolicy
.getSchedulableEntities()) {
apps.add(pendingApp.getApplicationAttemptId());
@@ -1670,6 +1702,21 @@ public class LeafQueue extends AbstractCSQueue {
this.pendingOrderingPolicy = pendingOrderingPolicy;
}
+ public synchronized OrderingPolicy<FiCaSchedulerApp>
+ getPendingAppsOrderingPolicyRecovery() {
+ return pendingOPForRecoveredApps;
+ }
+
+ public synchronized void setPendingAppsOrderingPolicyRecovery(
+ OrderingPolicy<FiCaSchedulerApp> pendingOrderingPolicyRecovery) {
+ if (null != this.pendingOPForRecoveredApps) {
+ pendingOrderingPolicyRecovery
+ .addAllSchedulableEntities(this.pendingOPForRecoveredApps
+ .getSchedulableEntities());
+ }
+ this.pendingOPForRecoveredApps = pendingOrderingPolicyRecovery;
+ }
+
/*
* Holds shared values used by all applications in
* the queue to calculate headroom on demand
http://git-wip-us.apache.org/repos/asf/hadoop/blob/109e528e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
index c9c792e..4b88415 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
@@ -99,12 +99,12 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
String user, Queue queue, ActiveUsersManager activeUsersManager,
RMContext rmContext) {
this(applicationAttemptId, user, queue, activeUsersManager, rmContext,
- Priority.newInstance(0));
+ Priority.newInstance(0), false);
}
public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId,
String user, Queue queue, ActiveUsersManager activeUsersManager,
- RMContext rmContext, Priority appPriority) {
+ RMContext rmContext, Priority appPriority, boolean isAttemptRecovering) {
super(applicationAttemptId, user, queue, activeUsersManager, rmContext);
RMApp rmApp = rmContext.getRMApps().get(getApplicationId());
@@ -129,6 +129,7 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
setAppAMNodePartitionName(partition);
setAMResource(partition, amResource);
setPriority(appPriority);
+ setAttemptRecovering(isAttemptRecovering);
scheduler = rmContext.getScheduler();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/109e528e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java
index 169e9f6..2ad805a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java
@@ -34,6 +34,8 @@ import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
@@ -567,4 +569,166 @@ public class TestApplicationPriority {
Assert.assertEquals(6, schedulerAppAttemptApp1.getLiveContainers().size());
rm.stop();
}
+
+ /**
+ * <p>
+ * Test case verifies the order of applications activated after RM Restart.
+ * </p>
+ * <li>App-1 and app-2 submitted and scheduled and running with a priority
+ * 5 and 6 Respectively</li>
+ * <li>App-3 submitted and scheduled with a priority 7. This
+ * is not activated since AMResourceLimit is reached</li>
+ * <li>RM restarted</li>
+ * <li>App-1 get activated nevertheless of AMResourceLimit</li>
+ * <li>App-2 and app-3 put in pendingOrderingPolicy</li>
+ * <li>After NM registration, app-3 is activated</li>
+ * <p>
+ * Expected Output : App-2 must get activated since app-2 was running earlier
+ * </p>
+ * @throws Exception
+ */
+ @Test
+ public void testOrderOfActivatingThePriorityApplicationOnRMRestart()
+ throws Exception {
+ conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
+ conf.setBoolean(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, true);
+ conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
+ conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
+ YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
+ conf.setInt(YarnConfiguration.MAX_CLUSTER_LEVEL_APPLICATION_PRIORITY, 10);
+ final DrainDispatcher dispatcher = new DrainDispatcher();
+
+ MemoryRMStateStore memStore = new MemoryRMStateStore();
+ memStore.init(conf);
+
+ MockRM rm1 = new MockRM(conf, memStore) {
+ @Override
+ protected Dispatcher createDispatcher() {
+ return dispatcher;
+ }
+ };
+ rm1.start();
+
+ MockNM nm1 =
+ new MockNM("127.0.0.1:1234", 16384, rm1.getResourceTrackerService());
+ nm1.registerNode();
+
+ dispatcher.await();
+
+ ResourceScheduler scheduler = rm1.getRMContext().getScheduler();
+ LeafQueue defaultQueue =
+ (LeafQueue) ((CapacityScheduler) scheduler).getQueue("default");
+ int memory = defaultQueue.getAMResourceLimit().getMemory() / 2;
+
+ // App-1 with priority 5 submitted and running
+ Priority appPriority1 = Priority.newInstance(5);
+ RMApp app1 = rm1.submitApp(memory, appPriority1);
+ MockAM am1 = MockRM.launchAM(app1, rm1, nm1);
+ am1.registerAppAttempt();
+
+ // App-2 with priority 6 submitted and running
+ Priority appPriority2 = Priority.newInstance(6);
+ RMApp app2 = rm1.submitApp(memory, appPriority2);
+ MockAM am2 = MockRM.launchAM(app2, rm1, nm1);
+ am2.registerAppAttempt();
+
+ dispatcher.await();
+ Assert.assertEquals(2, defaultQueue.getNumActiveApplications());
+ Assert.assertEquals(0, defaultQueue.getNumPendingApplications());
+
+ // App-3 with priority 7 submitted and scheduled. But not activated since
+ // AMResourceLimit threshold
+ Priority appPriority3 = Priority.newInstance(7);
+ RMApp app3 = rm1.submitApp(memory, appPriority3);
+
+ dispatcher.await();
+ Assert.assertEquals(2, defaultQueue.getNumActiveApplications());
+ Assert.assertEquals(1, defaultQueue.getNumPendingApplications());
+
+ Iterator<FiCaSchedulerApp> iterator =
+ defaultQueue.getOrderingPolicy().getSchedulableEntities().iterator();
+ FiCaSchedulerApp fcApp2 = iterator.next();
+ Assert.assertEquals(app2.getCurrentAppAttempt().getAppAttemptId(),
+ fcApp2.getApplicationAttemptId());
+
+ FiCaSchedulerApp fcApp1 = iterator.next();
+ Assert.assertEquals(app1.getCurrentAppAttempt().getAppAttemptId(),
+ fcApp1.getApplicationAttemptId());
+
+ iterator = defaultQueue.getPendingApplications().iterator();
+ FiCaSchedulerApp fcApp3 = iterator.next();
+ Assert.assertEquals(app3.getCurrentAppAttempt().getAppAttemptId(),
+ fcApp3.getApplicationAttemptId());
+
+ final DrainDispatcher dispatcher1 = new DrainDispatcher();
+ // create new RM to represent restart and recover state
+ MockRM rm2 = new MockRM(conf, memStore) {
+ @Override
+ protected Dispatcher createDispatcher() {
+ return dispatcher1;
+ }
+ };
+
+ // start new RM
+ rm2.start();
+ // change NM to point to new RM
+ nm1.setResourceTrackerService(rm2.getResourceTrackerService());
+
+ // Verify RM Apps after this restart
+ Assert.assertEquals(3, rm2.getRMContext().getRMApps().size());
+
+ dispatcher1.await();
+ scheduler = rm2.getRMContext().getScheduler();
+ defaultQueue =
+ (LeafQueue) ((CapacityScheduler) scheduler).getQueue("default");
+
+ // wait for all applications to get added to scheduler
+ int count = 5;
+ while (count-- > 0) {
+ if ((defaultQueue.getNumActiveApplications() + defaultQueue
+ .getNumPendingApplications()) == 3) {
+ break;
+ }
+ Thread.sleep(500);
+ }
+
+ // Before NM registration, AMResourceLimit threshold is 0. So 1st
+ // applications get activated nevertheless of AMResourceLimit threshold
+ // Two applications are in pending
+ Assert.assertEquals(1, defaultQueue.getNumActiveApplications());
+ Assert.assertEquals(2, defaultQueue.getNumPendingApplications());
+
+ // NM resync to new RM
+ nm1.registerNode();
+ dispatcher1.await();
+
+ // wait for activating one applications
+ count = 5;
+ while (count-- > 0) {
+ if (defaultQueue.getOrderingPolicy().getSchedulableEntities().size() == 2) {
+ break;
+ }
+ Thread.sleep(500);
+ }
+
+ // verify for order of activated applications iterator
+ iterator =
+ defaultQueue.getOrderingPolicy().getSchedulableEntities().iterator();
+ fcApp2 = iterator.next();
+ Assert.assertEquals(app2.getCurrentAppAttempt().getAppAttemptId(),
+ fcApp2.getApplicationAttemptId());
+
+ fcApp1 = iterator.next();
+ Assert.assertEquals(app1.getCurrentAppAttempt().getAppAttemptId(),
+ fcApp1.getApplicationAttemptId());
+
+ // verify for pending application iterator. It should be app-3 attempt
+ iterator = defaultQueue.getPendingApplications().iterator();
+ fcApp3 = iterator.next();
+ Assert.assertEquals(app3.getCurrentAppAttempt().getAppAttemptId(),
+ fcApp3.getApplicationAttemptId());
+
+ rm2.stop();
+ rm1.stop();
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/109e528e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
index 479e25a..d4b8dcd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
@@ -2413,14 +2413,16 @@ public class TestLeafQueue {
TestUtils.getMockApplicationAttemptId(0, 0);
FiCaSchedulerApp app_0 =
spy(new FiCaSchedulerApp(appAttemptId_0, user_0, a,
- mock(ActiveUsersManager.class), spyRMContext, Priority.newInstance(3)));
+ mock(ActiveUsersManager.class), spyRMContext,
+ Priority.newInstance(3), false));
a.submitApplicationAttempt(app_0, user_0);
final ApplicationAttemptId appAttemptId_1 =
TestUtils.getMockApplicationAttemptId(1, 0);
FiCaSchedulerApp app_1 =
spy(new FiCaSchedulerApp(appAttemptId_1, user_0, a,
- mock(ActiveUsersManager.class), spyRMContext, Priority.newInstance(5)));
+ mock(ActiveUsersManager.class), spyRMContext,
+ Priority.newInstance(5), false));
a.submitApplicationAttempt(app_1, user_0);
Priority priority = TestUtils.createMockPriority(1);