You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by sz...@apache.org on 2012/11/30 20:52:57 UTC
svn commit: r1415809 [2/3] - in
/hadoop/common/branches/HDFS-2802/hadoop-yarn-project: ./
hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/
hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/
hadoop-yarn/...
Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java?rev=1415809&r1=1415808&r2=1415809&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java Fri Nov 30 19:52:49 2012
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.re
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
@@ -50,7 +51,7 @@ import org.apache.hadoop.yarn.factory.pr
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store.RMState;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
@@ -75,6 +76,25 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
+/**
+ * A scheduler that schedules resources between a set of queues. The scheduler
+ * keeps track of the resources used by each queue, and attempts to maintain
+ * fairness by scheduling tasks at queues whose allocations are farthest below
+ * an ideal fair distribution.
+ *
+ * The fair scheduler supports hierarchical queues. All queues descend from a
+ * queue named "root". Available resources are distributed among the children
+ * of the root queue in the typical fair scheduling fashion. Then, the children
+ * distribute the resources assigned to them to their children in the same
+ * fashion. Applications may only be scheduled on leaf queues. Queues can be
+ * specified as children of other queues by placing them as sub-elements of their
+ * parents in the fair scheduler configuration file.
+ *
+ * A queue's name starts with the names of its parents, with periods as
+ * separators. So a queue named "queue1" under the root named, would be
+ * referred to as "root.queue1", and a queue named "queue2" under a queue
+ * named "parent1" would be referred to as "root.parent1.queue2".
+ */
@LimitedPrivate("yarn")
@Unstable
@SuppressWarnings("unchecked")
@@ -105,23 +125,22 @@ public class FairScheduler implements Re
// Aggregate metrics
QueueMetrics rootMetrics;
- //Time when we last updated preemption vars
+ // Time when we last updated preemption vars
protected long lastPreemptionUpdateTime;
- //Time we last ran preemptTasksIfNecessary
+ // Time we last ran preemptTasksIfNecessary
private long lastPreemptCheckTime;
-
// This stores per-application scheduling information, indexed by
// attempt ID's for fast lookup.
- protected Map<ApplicationAttemptId, FSSchedulerApp> applications
- = new HashMap<ApplicationAttemptId, FSSchedulerApp>();
+ protected Map<ApplicationAttemptId, FSSchedulerApp> applications =
+ new HashMap<ApplicationAttemptId, FSSchedulerApp>();
// Nodes in the cluster, indexed by NodeId
- private Map<NodeId, FSSchedulerNode> nodes =
+ private Map<NodeId, FSSchedulerNode> nodes =
new ConcurrentHashMap<NodeId, FSSchedulerNode>();
// Aggregate capacity of the cluster
- private Resource clusterCapacity =
+ private Resource clusterCapacity =
RecordFactoryProvider.getRecordFactory(null).newRecordInstance(Resource.class);
// How often tasks are preempted (must be longer than a couple
@@ -131,10 +150,11 @@ public class FairScheduler implements Re
protected boolean preemptionEnabled;
protected boolean sizeBasedWeight; // Give larger weights to larger jobs
protected WeightAdjuster weightAdjuster; // Can be null for no weight adjuster
- protected double nodeLocalityThreshold; // Cluster threshold for node locality
- protected double rackLocalityThreshold; // Cluster threshold for rack locality
- private FairSchedulerEventLog eventLog; // Machine-readable event log
- protected boolean assignMultiple; // Allocate multiple containers per heartbeat
+ protected double nodeLocalityThreshold; // Cluster threshold for node locality
+ protected double rackLocalityThreshold; // Cluster threshold for rack locality
+ private FairSchedulerEventLog eventLog; // Machine-readable event log
+ protected boolean assignMultiple; // Allocate multiple containers per
+ // heartbeat
protected int maxAssign; // Max containers to assign per heartbeat
public FairScheduler() {
@@ -150,16 +170,8 @@ public class FairScheduler implements Re
return queueMgr;
}
- public List<FSQueueSchedulable> getQueueSchedulables() {
- List<FSQueueSchedulable> scheds = new ArrayList<FSQueueSchedulable>();
- for (FSQueue queue: queueMgr.getQueues()) {
- scheds.add(queue.getQueueSchedulable());
- }
- return scheds;
- }
-
private RMContainer getRMContainer(ContainerId containerId) {
- FSSchedulerApp application =
+ FSSchedulerApp application =
applications.get(containerId.getApplicationAttemptId());
return (application == null) ? null : application.getRMContainer(containerId);
}
@@ -183,34 +195,24 @@ public class FairScheduler implements Re
}
/**
- * Recompute the internal variables used by the scheduler - per-job weights,
- * fair shares, deficits, minimum slot allocations, and amount of used and
- * required resources per job.
- */
+ * Recompute the internal variables used by the scheduler - per-job weights,
+ * fair shares, deficits, minimum slot allocations, and amount of used and
+ * required resources per job.
+ */
protected synchronized void update() {
queueMgr.reloadAllocsIfNecessary(); // Relaod alloc file
updateRunnability(); // Set job runnability based on user/queue limits
updatePreemptionVariables(); // Determine if any queues merit preemption
- // Update demands of apps and queues
- for (FSQueue queue: queueMgr.getQueues()) {
- queue.getQueueSchedulable().updateDemand();
- }
-
- // Compute fair shares based on updated demands
- List<FSQueueSchedulable> queueScheds = getQueueSchedulables();
- SchedulingAlgorithms.computeFairShares(
- queueScheds, clusterCapacity);
+ FSQueue rootQueue = queueMgr.getRootQueue();
- // Update queue metrics for this queue
- for (FSQueueSchedulable sched : queueScheds) {
- sched.getMetrics().setAvailableResourcesToQueue(sched.getFairShare());
- }
+ // Recursively update demands for all queues
+ rootQueue.updateDemand();
- // Use the computed shares to assign shares within each queue
- for (FSQueue queue: queueMgr.getQueues()) {
- queue.getQueueSchedulable().redistributeShare();
- }
+ rootQueue.setFairShare(clusterCapacity);
+ // Recursively compute fair shares for all queues
+ // and update metrics
+ rootQueue.recomputeFairShares();
// Update recorded capacity of root queue (child queues are updated
// when fair share is calculated).
@@ -225,7 +227,7 @@ public class FairScheduler implements Re
private void updatePreemptionVariables() {
long now = clock.getTime();
lastPreemptionUpdateTime = now;
- for (FSQueueSchedulable sched: getQueueSchedulables()) {
+ for (FSLeafQueue sched : queueMgr.getLeafQueues()) {
if (!isStarvedForMinShare(sched)) {
sched.setLastTimeAtMinShare(now);
}
@@ -238,16 +240,16 @@ public class FairScheduler implements Re
/**
* Is a queue below its min share for the given task type?
*/
- boolean isStarvedForMinShare(FSQueueSchedulable sched) {
+ boolean isStarvedForMinShare(FSLeafQueue sched) {
Resource desiredShare = Resources.min(sched.getMinShare(), sched.getDemand());
return Resources.lessThan(sched.getResourceUsage(), desiredShare);
}
/**
- * Is a queue being starved for fair share for the given task type?
- * This is defined as being below half its fair share.
+ * Is a queue being starved for fair share for the given task type? This is
+ * defined as being below half its fair share.
*/
- boolean isStarvedForFairShare(FSQueueSchedulable sched) {
+ boolean isStarvedForFairShare(FSLeafQueue sched) {
Resource desiredFairShare = Resources.max(
Resources.multiply(sched.getFairShare(), .5), sched.getDemand());
return Resources.lessThan(sched.getResourceUsage(), desiredFairShare);
@@ -255,10 +257,10 @@ public class FairScheduler implements Re
/**
* Check for queues that need tasks preempted, either because they have been
- * below their guaranteed share for minSharePreemptionTimeout or they
- * have been below half their fair share for the fairSharePreemptionTimeout.
- * If such queues exist, compute how many tasks of each type need to be
- * preempted and then select the right ones using preemptTasks.
+ * below their guaranteed share for minSharePreemptionTimeout or they have
+ * been below half their fair share for the fairSharePreemptionTimeout. If
+ * such queues exist, compute how many tasks of each type need to be preempted
+ * and then select the right ones using preemptTasks.
*/
protected synchronized void preemptTasksIfNecessary() {
if (!preemptionEnabled) {
@@ -273,35 +275,37 @@ public class FairScheduler implements Re
Resource resToPreempt = Resources.none();
- for (FSQueueSchedulable sched: getQueueSchedulables()) {
+ for (FSLeafQueue sched : queueMgr.getLeafQueues()) {
resToPreempt = Resources.add(resToPreempt, resToPreempt(sched, curTime));
}
if (Resources.greaterThan(resToPreempt, Resources.none())) {
- preemptResources(getQueueSchedulables(), resToPreempt);
+ preemptResources(queueMgr.getLeafQueues(), resToPreempt);
}
}
/**
- * Preempt a quantity of resources from a list of QueueSchedulables.
- * The policy for this is to pick apps from queues that are over their fair
- * share, but make sure that no queue is placed below its fair share in the
- * process. We further prioritize preemption by choosing containers with
- * lowest priority to preempt.
+ * Preempt a quantity of resources from a list of QueueSchedulables. The
+ * policy for this is to pick apps from queues that are over their fair share,
+ * but make sure that no queue is placed below its fair share in the process.
+ * We further prioritize preemption by choosing containers with lowest
+ * priority to preempt.
*/
- protected void preemptResources(List<FSQueueSchedulable> scheds, Resource toPreempt) {
+ protected void preemptResources(Collection<FSLeafQueue> scheds,
+ Resource toPreempt) {
if (scheds.isEmpty() || Resources.equals(toPreempt, Resources.none())) {
return;
}
Map<RMContainer, FSSchedulerApp> apps =
new HashMap<RMContainer, FSSchedulerApp>();
- Map<RMContainer, FSQueueSchedulable> queues = new HashMap<RMContainer, FSQueueSchedulable>();
+ Map<RMContainer, FSLeafQueue> queues =
+ new HashMap<RMContainer, FSLeafQueue>();
// Collect running containers from over-scheduled queues
List<RMContainer> runningContainers = new ArrayList<RMContainer>();
- for (FSQueueSchedulable sched: scheds) {
+ for (FSLeafQueue sched : scheds) {
if (Resources.greaterThan(sched.getResourceUsage(), sched.getFairShare())) {
- for (AppSchedulable as: sched.getAppSchedulables()) {
+ for (AppSchedulable as : sched.getAppSchedulables()) {
for (RMContainer c : as.getApp().getLiveContainers()) {
runningContainers.add(c);
apps.put(c, as.getApp());
@@ -321,12 +325,12 @@ public class FairScheduler implements Re
// Scan down the sorted list of task statuses until we've killed enough
// tasks, making sure we don't kill too many from any queue
- for (RMContainer container: runningContainers) {
- FSQueueSchedulable sched = queues.get(container);
+ for (RMContainer container : runningContainers) {
+ FSLeafQueue sched = queues.get(container);
if (Resources.greaterThan(sched.getResourceUsage(), sched.getFairShare())) {
LOG.info("Preempting container (prio=" + container.getContainer().getPriority() +
"res=" + container.getContainer().getResource() +
- ") from queue " + sched.getQueue().getName());
+ ") from queue " + sched.getName());
ContainerStatus status = SchedulerUtils.createAbnormalContainerStatus(
container.getContainerId(), SchedulerUtils.PREEMPTED_CONTAINER);
@@ -348,12 +352,12 @@ public class FairScheduler implements Re
* If the queue has been below its min share for at least its preemption
* timeout, it should preempt the difference between its current share and
* this min share. If it has been below half its fair share for at least the
- * fairSharePreemptionTimeout, it should preempt enough tasks to get up to
- * its full fair share. If both conditions hold, we preempt the max of the
- * two amounts (this shouldn't happen unless someone sets the timeouts to
- * be identical for some reason).
+ * fairSharePreemptionTimeout, it should preempt enough tasks to get up to its
+ * full fair share. If both conditions hold, we preempt the max of the two
+ * amounts (this shouldn't happen unless someone sets the timeouts to be
+ * identical for some reason).
*/
- protected Resource resToPreempt(FSQueueSchedulable sched, long curTime) {
+ protected Resource resToPreempt(FSLeafQueue sched, long curTime) {
String queue = sched.getName();
long minShareTimeout = queueMgr.getMinSharePreemptionTimeout(queue);
long fairShareTimeout = queueMgr.getFairSharePreemptionTimeout();
@@ -362,7 +366,7 @@ public class FairScheduler implements Re
if (curTime - sched.getLastTimeAtMinShare() > minShareTimeout) {
Resource target = Resources.min(sched.getMinShare(), sched.getDemand());
resDueToMinShare = Resources.max(Resources.none(),
- Resources.subtract(target, sched.getResourceUsage()));
+ Resources.subtract(target, sched.getResourceUsage()));
}
if (curTime - sched.getLastTimeAtHalfFairShare() > fairShareTimeout) {
Resource target = Resources.min(sched.getFairShare(), sched.getDemand());
@@ -380,15 +384,15 @@ public class FairScheduler implements Re
}
/**
- * This updates the runnability of all apps based on whether or not
- * any users/queues have exceeded their capacity.
+ * This updates the runnability of all apps based on whether or not any
+ * users/queues have exceeded their capacity.
*/
private void updateRunnability() {
List<AppSchedulable> apps = new ArrayList<AppSchedulable>();
// Start by marking everything as not runnable
- for (FSQueue p: queueMgr.getQueues()) {
- for (AppSchedulable a: p.getQueueSchedulable().getAppSchedulables()) {
+ for (FSLeafQueue leafQueue : queueMgr.getLeafQueues()) {
+ for (AppSchedulable a : leafQueue.getAppSchedulables()) {
a.setRunnable(false);
apps.add(a);
}
@@ -400,7 +404,7 @@ public class FairScheduler implements Re
Map<String, Integer> userApps = new HashMap<String, Integer>();
Map<String, Integer> queueApps = new HashMap<String, Integer>();
- for (AppSchedulable app: apps) {
+ for (AppSchedulable app : apps) {
String user = app.getApp().getUser();
String queue = app.getApp().getQueueName();
int userCount = userApps.containsKey(user) ? userApps.get(user) : 0;
@@ -473,22 +477,25 @@ public class FairScheduler implements Re
}
/**
- * Add a new application to the scheduler, with a given id, queue name,
- * and user. This will accept a new app even if the user or queue is above
+ * Add a new application to the scheduler, with a given id, queue name, and
+ * user. This will accept a new app even if the user or queue is above
* configured limits, but the app will not be marked as runnable.
*/
- protected synchronized void
- addApplication(ApplicationAttemptId applicationAttemptId,
- String queueName, String user) {
+ protected synchronized void addApplication(
+ ApplicationAttemptId applicationAttemptId, String queueName, String user) {
- FSQueue queue = queueMgr.getQueue(queueName);
+ FSLeafQueue queue = queueMgr.getLeafQueue(queueName);
+ if (queue == null) {
+ // queue is not an existing or createable leaf queue
+ queue = queueMgr.getLeafQueue(YarnConfiguration.DEFAULT_QUEUE_NAME);
+ }
FSSchedulerApp schedulerApp =
new FSSchedulerApp(applicationAttemptId, user,
- queue.getQueueSchedulable(), new ActiveUsersManager(getRootQueueMetrics()),
- rmContext, null);
-
- // Inforce ACLs
+ queue, new ActiveUsersManager(getRootQueueMetrics()),
+ rmContext);
+
+ // Enforce ACLs
UserGroupInformation userUgi;
try {
userUgi = UserGroupInformation.getCurrentUser();
@@ -497,8 +504,8 @@ public class FairScheduler implements Re
return;
}
- List<QueueUserACLInfo> info = queue.getQueueSchedulable().getQueueUserAclInfo(
- userUgi); // Always a signleton list
+ // Always a singleton list
+ List<QueueUserACLInfo> info = queue.getQueueUserAclInfo(userUgi);
if (!info.get(0).getUserAcls().contains(QueueACL.SUBMIT_APPLICATIONS)) {
LOG.info("User " + userUgi.getUserName() +
" cannot submit" + " applications to queue " + queue.getName());
@@ -506,14 +513,13 @@ public class FairScheduler implements Re
}
queue.addApp(schedulerApp);
- queue.getQueueSchedulable().getMetrics().submitApp(user,
- applicationAttemptId.getAttemptId());
+ queue.getMetrics().submitApp(user, applicationAttemptId.getAttemptId());
rootMetrics.submitApp(user, applicationAttemptId.getAttemptId());
applications.put(applicationAttemptId, schedulerApp);
LOG.info("Application Submission: " + applicationAttemptId +
- ", user: " + user +
+ ", user: "+ user +
", currently active: " + applications.size());
rmContext.getDispatcher().getEventHandler().handle(
@@ -540,10 +546,10 @@ public class FairScheduler implements Re
SchedulerUtils.createAbnormalContainerStatus(
rmContainer.getContainerId(),
SchedulerUtils.COMPLETED_APPLICATION),
- RMContainerEventType.KILL);
+ RMContainerEventType.KILL);
}
- // Release all reserved containers
+ // Release all reserved containers
for (RMContainer rmContainer : application.getReservedContainers()) {
completedContainer(rmContainer,
SchedulerUtils.createAbnormalContainerStatus(
@@ -556,7 +562,8 @@ public class FairScheduler implements Re
application.stop(rmAppAttemptFinalState);
// Inform the queue
- FSQueue queue = queueMgr.getQueue(application.getQueue().getQueueName());
+ FSLeafQueue queue = queueMgr.getLeafQueue(application.getQueue()
+ .getQueueName());
queue.removeApp(application);
// Remove from our data-structure
@@ -658,11 +665,11 @@ public class FairScheduler implements Re
for (ContainerId releasedContainerId : release) {
RMContainer rmContainer = getRMContainer(releasedContainerId);
if (rmContainer == null) {
- RMAuditLogger.logFailure(application.getUser(),
- AuditConstants.RELEASE_CONTAINER,
- "Unauthorized access or invalid container", "FairScheduler",
- "Trying to release container not owned by app or with invalid id",
- application.getApplicationId(), releasedContainerId);
+ RMAuditLogger.logFailure(application.getUser(),
+ AuditConstants.RELEASE_CONTAINER,
+ "Unauthorized access or invalid container", "FairScheduler",
+ "Trying to release container not owned by app or with invalid id",
+ application.getApplicationId(), releasedContainerId);
}
completedContainer(rmContainer,
SchedulerUtils.createAbnormalContainerStatus(
@@ -675,8 +682,8 @@ public class FairScheduler implements Re
if (!ask.isEmpty()) {
if (LOG.isDebugEnabled()) {
LOG.debug("allocate: pre-update" +
- " applicationAttemptId=" + appAttemptId +
- " application=" + application.getApplicationId());
+ " applicationAttemptId=" + appAttemptId +
+ " application=" + application.getApplicationId());
}
application.showRequests();
@@ -689,19 +696,17 @@ public class FairScheduler implements Re
if (LOG.isDebugEnabled()) {
LOG.debug("allocate:" +
- " applicationAttemptId=" + appAttemptId +
- " #ask=" + ask.size());
+ " applicationAttemptId=" + appAttemptId +
+ " #ask=" + ask.size());
}
- return new Allocation(
- application.pullNewlyAllocatedContainers(),
+ return new Allocation(application.pullNewlyAllocatedContainers(),
application.getHeadroom());
}
}
/**
- * Process a container which has launched on a node, as reported by the
- * node.
+ * Process a container which has launched on a node, as reported by the node.
*/
private void containerLaunchedOnNode(ContainerId containerId, FSSchedulerNode node) {
// Get the application for the finished container
@@ -723,7 +728,9 @@ public class FairScheduler implements Re
private synchronized void nodeUpdate(RMNode nm,
List<ContainerStatus> newlyLaunchedContainers,
List<ContainerStatus> completedContainers) {
- LOG.info("nodeUpdate: " + nm + " cluster capacity: " + clusterCapacity);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("nodeUpdate: " + nm + " cluster capacity: " + clusterCapacity);
+ }
eventLog.log("HEARTBEAT", nm.getHostName());
FSSchedulerNode node = nodes.get(nm.getNodeID());
@@ -755,20 +762,20 @@ public class FairScheduler implements Re
LOG.info("Trying to fulfill reservation for application " +
reservedApplication.getApplicationId() + " on node: " + nm);
- FSQueue queue = queueMgr.getQueue(reservedApplication.getQueueName());
- queue.getQueueSchedulable().assignContainer(node, true);
+ FSLeafQueue queue = queueMgr.getLeafQueue(reservedApplication.getQueueName());
+ queue.assignContainer(node, true);
}
-
// Otherwise, schedule at queue which is furthest below fair share
else {
int assignedContainers = 0;
while (true) {
// At most one task is scheduled each iteration of this loop
- List<FSQueueSchedulable> scheds = getQueueSchedulables();
+ List<FSLeafQueue> scheds = new ArrayList<FSLeafQueue>(
+ queueMgr.getLeafQueues());
Collections.sort(scheds, new SchedulingAlgorithms.FairShareComparator());
boolean assignedContainer = false;
- for (FSQueueSchedulable sched : scheds) {
+ for (FSLeafQueue sched : scheds) {
Resource assigned = sched.assignContainer(node, false);
if (Resources.greaterThan(assigned, Resources.none())) {
eventLog.log("ASSIGN", nm.getHostName(), assigned);
@@ -811,7 +818,7 @@ public class FairScheduler implements Re
@Override
public void handle(SchedulerEvent event) {
- switch(event.getType()) {
+ switch (event.getType()) {
case NODE_ADDED:
if (!(event instanceof NodeAddedSchedulerEvent)) {
throw new RuntimeException("Unexpected event type: " + event);
@@ -830,8 +837,7 @@ public class FairScheduler implements Re
if (!(event instanceof NodeUpdateSchedulerEvent)) {
throw new RuntimeException("Unexpected event type: " + event);
}
- NodeUpdateSchedulerEvent nodeUpdatedEvent =
- (NodeUpdateSchedulerEvent)event;
+ NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent)event;
nodeUpdate(nodeUpdatedEvent.getRMNode(),
nodeUpdatedEvent.getNewlyLaunchedContainers(),
nodeUpdatedEvent.getCompletedContainers());
@@ -840,7 +846,7 @@ public class FairScheduler implements Re
if (!(event instanceof AppAddedSchedulerEvent)) {
throw new RuntimeException("Unexpected event type: " + event);
}
- AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event;
+ AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent)event;
String queue = appAddedEvent.getQueue();
// Potentially set queue to username if configured to do so
@@ -865,7 +871,7 @@ public class FairScheduler implements Re
throw new RuntimeException("Unexpected event type: " + event);
}
ContainerExpiredSchedulerEvent containerExpiredEvent =
- (ContainerExpiredSchedulerEvent) event;
+ (ContainerExpiredSchedulerEvent)event;
ContainerId containerId = containerExpiredEvent.getContainerId();
completedContainer(getRMContainer(containerId),
SchedulerUtils.createAbnormalContainerStatus(
@@ -884,8 +890,8 @@ public class FairScheduler implements Re
}
@Override
- public synchronized void
- reinitialize(Configuration conf, RMContext rmContext) throws IOException {
+ public synchronized void reinitialize(Configuration conf, RMContext rmContext)
+ throws IOException {
if (!initialized) {
this.conf = new FairSchedulerConfiguration(conf);
rootMetrics = QueueMetrics.forQueue("root", null, true, conf);
@@ -907,11 +913,10 @@ public class FairScheduler implements Re
try {
queueMgr.initialize();
- }
- catch (Exception e) {
+ } catch (Exception e) {
throw new IOException("Failed to start FairScheduler", e);
}
-
+
Thread updateThread = new Thread(new UpdateThread());
updateThread.setName("FairSchedulerUpdateThread");
updateThread.setDaemon(true);
@@ -923,10 +928,9 @@ public class FairScheduler implements Re
rackLocalityThreshold = this.conf.getLocalityThresholdRack();
preemptionEnabled = this.conf.getPreemptionEnabled();
try {
- queueMgr.reloadAllocs();
+ queueMgr.reloadAllocs();
- }
- catch (Exception e) {
+ } catch (Exception e) {
throw new IOException("Failed to initialize FairScheduler", e);
}
}
@@ -938,8 +942,8 @@ public class FairScheduler implements Re
if (!queueMgr.exists(queueName)) {
return null;
}
- return queueMgr.getQueue(queueName).getQueueSchedulable().getQueueInfo(
- includeChildQueues, recursive);
+ return queueMgr.getQueue(queueName).getQueueInfo(includeChildQueues,
+ recursive);
}
@Override
@@ -951,12 +955,7 @@ public class FairScheduler implements Re
return new ArrayList<QueueUserACLInfo>();
}
- List<QueueUserACLInfo> userAcls = new ArrayList<QueueUserACLInfo>();
-
- for (FSQueue queue : queueMgr.getQueues()) {
- userAcls.addAll(queue.getQueueSchedulable().getQueueUserAclInfo(user));
- }
- return userAcls;
+ return queueMgr.getRootQueue().getQueueUserAclInfo(user);
}
@Override
Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java?rev=1415809&r1=1415808&r2=1415809&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java Fri Nov 30 19:52:49 2012
@@ -18,7 +18,7 @@ public class FairSchedulerConfiguration
/** Whether to use the user name as the queue name (instead of "default") if
* the request does not specify a queue. */
protected static final String USER_AS_DEFAULT_QUEUE = CONF_PREFIX + "user-as-default-queue";
- protected static final boolean DEFAULT_USER_AS_DEFAULT_QUEUE = false;
+ protected static final boolean DEFAULT_USER_AS_DEFAULT_QUEUE = true;
protected static final String LOCALITY_THRESHOLD = CONF_PREFIX + "locality.threshold";
protected static final float DEFAULT_LOCALITY_THRESHOLD = -1.0f;
Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerEventLog.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerEventLog.java?rev=1415809&r1=1415808&r2=1415809&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerEventLog.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerEventLog.java Fri Nov 30 19:52:49 2012
@@ -77,12 +77,11 @@ class FairSchedulerEventLog {
boolean init(FairSchedulerConfiguration conf) {
try {
logDir = conf.getEventlogDir();
- Path logDirPath = new Path(logDir);
- FileSystem fs = logDirPath.getFileSystem(conf);
- if (!fs.exists(logDirPath)) {
- if (!fs.mkdirs(logDirPath)) {
+ File logDirFile = new File(logDir);
+ if (!logDirFile.exists()) {
+ if (!logDirFile.mkdirs()) {
throw new IOException(
- "Mkdirs failed to create " + logDirPath.toString());
+ "Mkdirs failed to create " + logDirFile.toString());
}
}
String username = System.getProperty("user.name");
@@ -142,4 +141,8 @@ class FairSchedulerEventLog {
synchronized boolean isEnabled() {
return !logDisabled;
}
+
+ public String getLogFile() {
+ return logFile;
+ }
}
Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java?rev=1415809&r1=1415808&r2=1415809&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java Fri Nov 30 19:52:49 2012
@@ -27,6 +27,7 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.CopyOnWriteArrayList;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
@@ -52,6 +53,7 @@ import org.xml.sax.SAXException;
/**
* Maintains a list of queues as well as scheduling parameters for each queue,
* such as guaranteed share allocations, from the fair scheduler config file.
+ *
*/
@Private
@Unstable
@@ -59,6 +61,8 @@ public class QueueManager {
public static final Log LOG = LogFactory.getLog(
QueueManager.class.getName());
+ public static final String ROOT_QUEUE = "root";
+
/** Time to wait between checks of the allocation file */
public static final long ALLOC_RELOAD_INTERVAL = 10 * 1000;
@@ -76,7 +80,10 @@ public class QueueManager {
// used) or a String to specify an absolute path (if
// mapred.fairscheduler.allocation.file is used).
- private Map<String, FSQueue> queues = new HashMap<String, FSQueue>();
+ private final Collection<FSLeafQueue> leafQueues =
+ new CopyOnWriteArrayList<FSLeafQueue>();
+ private final Map<String, FSQueue> queues = new HashMap<String, FSQueue>();
+ private FSParentQueue rootQueue;
private volatile QueueManagerInfo info = new QueueManagerInfo();
@@ -87,10 +94,17 @@ public class QueueManager {
public QueueManager(FairScheduler scheduler) {
this.scheduler = scheduler;
}
+
+ public FSParentQueue getRootQueue() {
+ return rootQueue;
+ }
public void initialize() throws IOException, SAXException,
AllocationConfigurationException, ParserConfigurationException {
FairSchedulerConfiguration conf = scheduler.getConf();
+ rootQueue = new FSParentQueue("root", this, scheduler, null);
+ queues.put(rootQueue.getName(), rootQueue);
+
this.allocFile = conf.getAllocationFile();
if (allocFile == null) {
// No allocation file specified in jobconf. Use the default allocation
@@ -106,21 +120,106 @@ public class QueueManager {
lastSuccessfulReload = scheduler.getClock().getTime();
lastReloadAttempt = scheduler.getClock().getTime();
// Create the default queue
- getQueue(YarnConfiguration.DEFAULT_QUEUE_NAME);
+ getLeafQueue(YarnConfiguration.DEFAULT_QUEUE_NAME);
}
-
+
/**
- * Get a queue by name, creating it if necessary
- */
- public FSQueue getQueue(String name) {
+ * Get a queue by name, creating it if necessary. If the queue
+ * is not or can not be a leaf queue, i.e. it already exists as a parent queue,
+ * or one of the parents in its name is already a leaf queue, null is returned.
+ *
+ * The root part of the name is optional, so a queue underneath the root
+ * named "queue1" could be referred to as just "queue1", and a queue named
+ * "queue2" underneath a parent named "parent1" that is underneath the root
+ * could be referred to as just "parent1.queue2".
+ */
+ public FSLeafQueue getLeafQueue(String name) {
+ if (!name.startsWith(ROOT_QUEUE + ".")) {
+ name = ROOT_QUEUE + "." + name;
+ }
synchronized (queues) {
FSQueue queue = queues.get(name);
if (queue == null) {
- queue = new FSQueue(scheduler, name);
- queue.setSchedulingMode(info.defaultSchedulingMode);
- queues.put(name, queue);
+ FSLeafQueue leafQueue = createLeafQueue(name);
+ if (leafQueue == null) {
+ return null;
+ }
+ leafQueue.setSchedulingMode(info.defaultSchedulingMode);
+ queue = leafQueue;
+ } else if (queue instanceof FSParentQueue) {
+ return null;
+ }
+ return (FSLeafQueue)queue;
+ }
+ }
+
+ /**
+ * Creates a leaf queue and places it in the tree. Creates any
+ * parents that don't already exist.
+ *
+ * @return
+ * the created queue, if successful. null if not allowed (one of the parent
+ * queues in the queue name is already a leaf queue)
+ */
+ private FSLeafQueue createLeafQueue(String name) {
+ List<String> newQueueNames = new ArrayList<String>();
+ newQueueNames.add(name);
+ int sepIndex = name.length();
+ FSParentQueue parent = null;
+
+ // Move up the queue tree until we reach one that exists.
+ while (sepIndex != -1) {
+ sepIndex = name.lastIndexOf('.', sepIndex-1);
+ FSQueue queue;
+ String curName = null;
+ curName = name.substring(0, sepIndex);
+ queue = queues.get(curName);
+
+ if (queue == null) {
+ newQueueNames.add(curName);
+ } else {
+ if (queue instanceof FSParentQueue) {
+ parent = (FSParentQueue)queue;
+ break;
+ } else {
+ return null;
+ }
}
- return queue;
+ }
+
+ // At this point, parent refers to the deepest existing parent of the
+ // queue to create.
+ // Now that we know everything worked out, make all the queues
+ // and add them to the map.
+ FSLeafQueue leafQueue = null;
+ for (int i = newQueueNames.size()-1; i >= 0; i--) {
+ String queueName = newQueueNames.get(i);
+ if (i == 0) {
+ // First name added was the leaf queue
+ leafQueue = new FSLeafQueue(name, this, scheduler, parent);
+ parent.addChildQueue(leafQueue);
+ queues.put(leafQueue.getName(), leafQueue);
+ leafQueues.add(leafQueue);
+ } else {
+ FSParentQueue newParent = new FSParentQueue(queueName, this, scheduler, parent);
+ parent.addChildQueue(newParent);
+ queues.put(newParent.getName(), newParent);
+ parent = newParent;
+ }
+ }
+
+ return leafQueue;
+ }
+
+ /**
+ * Gets a queue by name.
+ */
+ public FSQueue getQueue(String name) {
+ if (!name.startsWith(ROOT_QUEUE + ".") && !name.equals(ROOT_QUEUE)) {
+ name = ROOT_QUEUE + "." + name;
+ }
+ synchronized (queues) {
+ return queues.get(name);
}
}
@@ -136,8 +235,8 @@ public class QueueManager {
/**
* Get the queue for a given AppSchedulable.
*/
- public FSQueue getQueueForApp(AppSchedulable app) {
- return getQueue(app.getApp().getQueueName());
+ public FSLeafQueue getQueueForApp(AppSchedulable app) {
+ return getLeafQueue(app.getApp().getQueueName());
}
/**
@@ -237,54 +336,9 @@ public class QueueManager {
Element element = (Element)node;
if ("queue".equals(element.getTagName()) ||
"pool".equals(element.getTagName())) {
- String queueName = element.getAttribute("name");
- Map<QueueACL, AccessControlList> acls =
- new HashMap<QueueACL, AccessControlList>();
- queueNamesInAllocFile.add(queueName);
- NodeList fields = element.getChildNodes();
- for (int j = 0; j < fields.getLength(); j++) {
- Node fieldNode = fields.item(j);
- if (!(fieldNode instanceof Element))
- continue;
- Element field = (Element) fieldNode;
- if ("minResources".equals(field.getTagName())) {
- String text = ((Text)field.getFirstChild()).getData().trim();
- int val = Integer.parseInt(text);
- minQueueResources.put(queueName, Resources.createResource(val));
- } else if ("maxResources".equals(field.getTagName())) {
- String text = ((Text)field.getFirstChild()).getData().trim();
- int val = Integer.parseInt(text);
- maxQueueResources.put(queueName, Resources.createResource(val));
- } else if ("maxRunningApps".equals(field.getTagName())) {
- String text = ((Text)field.getFirstChild()).getData().trim();
- int val = Integer.parseInt(text);
- queueMaxApps.put(queueName, val);
- } else if ("weight".equals(field.getTagName())) {
- String text = ((Text)field.getFirstChild()).getData().trim();
- double val = Double.parseDouble(text);
- queueWeights.put(queueName, val);
- } else if ("minSharePreemptionTimeout".equals(field.getTagName())) {
- String text = ((Text)field.getFirstChild()).getData().trim();
- long val = Long.parseLong(text) * 1000L;
- minSharePreemptionTimeouts.put(queueName, val);
- } else if ("schedulingMode".equals(field.getTagName())) {
- String text = ((Text)field.getFirstChild()).getData().trim();
- queueModes.put(queueName, parseSchedulingMode(text));
- } else if ("aclSubmitApps".equals(field.getTagName())) {
- String text = ((Text)field.getFirstChild()).getData().trim();
- acls.put(QueueACL.SUBMIT_APPLICATIONS, new AccessControlList(text));
- } else if ("aclAdministerApps".equals(field.getTagName())) {
- String text = ((Text)field.getFirstChild()).getData().trim();
- acls.put(QueueACL.ADMINISTER_QUEUE, new AccessControlList(text));
- }
- }
- queueAcls.put(queueName, acls);
- if (maxQueueResources.containsKey(queueName) && minQueueResources.containsKey(queueName)
- && Resources.lessThan(maxQueueResources.get(queueName),
- minQueueResources.get(queueName))) {
- LOG.warn(String.format("Queue %s has max resources %d less than min resources %d",
- queueName, maxQueueResources.get(queueName), minQueueResources.get(queueName)));
- }
+ loadQueue("root", element, minQueueResources, maxQueueResources, queueMaxApps,
+ userMaxApps, queueWeights, queueModes, minSharePreemptionTimeouts,
+ queueAcls, queueNamesInAllocFile);
} else if ("user".equals(element.getTagName())) {
String userName = element.getAttribute("name");
NodeList fields = element.getChildNodes();
@@ -331,7 +385,7 @@ public class QueueManager {
queueMaxAppsDefault, defaultSchedulingMode, minSharePreemptionTimeouts,
queueAcls, fairSharePreemptionTimeout, defaultMinSharePreemptionTimeout);
for (String name: queueNamesInAllocFile) {
- FSQueue queue = getQueue(name);
+ FSLeafQueue queue = getLeafQueue(name);
if (queueModes.containsKey(name)) {
queue.setSchedulingMode(queueModes.get(name));
} else {
@@ -340,6 +394,75 @@ public class QueueManager {
}
}
}
+
+ /**
+ * Loads a queue from a queue element in the configuration file
+ */
+ private void loadQueue(String parentName, Element element, Map<String, Resource> minQueueResources,
+ Map<String, Resource> maxQueueResources, Map<String, Integer> queueMaxApps,
+ Map<String, Integer> userMaxApps, Map<String, Double> queueWeights,
+ Map<String, SchedulingMode> queueModes, Map<String, Long> minSharePreemptionTimeouts,
+ Map<String, Map<QueueACL, AccessControlList>> queueAcls, List<String> queueNamesInAllocFile)
+ throws AllocationConfigurationException {
+ String queueName = parentName + "." + element.getAttribute("name");
+ Map<QueueACL, AccessControlList> acls =
+ new HashMap<QueueACL, AccessControlList>();
+ NodeList fields = element.getChildNodes();
+ boolean isLeaf = true;
+
+ for (int j = 0; j < fields.getLength(); j++) {
+ Node fieldNode = fields.item(j);
+ if (!(fieldNode instanceof Element))
+ continue;
+ Element field = (Element) fieldNode;
+ if ("minResources".equals(field.getTagName())) {
+ String text = ((Text)field.getFirstChild()).getData().trim();
+ int val = Integer.parseInt(text);
+ minQueueResources.put(queueName, Resources.createResource(val));
+ } else if ("maxResources".equals(field.getTagName())) {
+ String text = ((Text)field.getFirstChild()).getData().trim();
+ int val = Integer.parseInt(text);
+ maxQueueResources.put(queueName, Resources.createResource(val));
+ } else if ("maxRunningApps".equals(field.getTagName())) {
+ String text = ((Text)field.getFirstChild()).getData().trim();
+ int val = Integer.parseInt(text);
+ queueMaxApps.put(queueName, val);
+ } else if ("weight".equals(field.getTagName())) {
+ String text = ((Text)field.getFirstChild()).getData().trim();
+ double val = Double.parseDouble(text);
+ queueWeights.put(queueName, val);
+ } else if ("minSharePreemptionTimeout".equals(field.getTagName())) {
+ String text = ((Text)field.getFirstChild()).getData().trim();
+ long val = Long.parseLong(text) * 1000L;
+ minSharePreemptionTimeouts.put(queueName, val);
+ } else if ("schedulingMode".equals(field.getTagName())) {
+ String text = ((Text)field.getFirstChild()).getData().trim();
+ queueModes.put(queueName, parseSchedulingMode(text));
+ } else if ("aclSubmitApps".equals(field.getTagName())) {
+ String text = ((Text)field.getFirstChild()).getData().trim();
+ acls.put(QueueACL.SUBMIT_APPLICATIONS, new AccessControlList(text));
+ } else if ("aclAdministerApps".equals(field.getTagName())) {
+ String text = ((Text)field.getFirstChild()).getData().trim();
+ acls.put(QueueACL.ADMINISTER_QUEUE, new AccessControlList(text));
+ } else if ("queue".endsWith(field.getTagName()) ||
+ "pool".equals(field.getTagName())) {
+ loadQueue(queueName, field, minQueueResources, maxQueueResources, queueMaxApps,
+ userMaxApps, queueWeights, queueModes, minSharePreemptionTimeouts,
+ queueAcls, queueNamesInAllocFile);
+ isLeaf = false;
+ }
+ }
+ if (isLeaf) {
+ queueNamesInAllocFile.add(queueName);
+ }
+ queueAcls.put(queueName, acls);
+ if (maxQueueResources.containsKey(queueName) && minQueueResources.containsKey(queueName)
+ && Resources.lessThan(maxQueueResources.get(queueName),
+ minQueueResources.get(queueName))) {
+ LOG.warn(String.format("Queue %s has max resources %d less than min resources %d",
+ queueName, maxQueueResources.get(queueName), minQueueResources.get(queueName)));
+ }
+ }
private SchedulingMode parseSchedulingMode(String text)
throws AllocationConfigurationException {
@@ -384,9 +507,9 @@ public class QueueManager {
/**
* Get a collection of all queues
*/
- public Collection<FSQueue> getQueues() {
+ public Collection<FSLeafQueue> getLeafQueues() {
synchronized (queues) {
- return new ArrayList<FSQueue>(queues.values());
+ return leafQueues;
}
}
Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java?rev=1415809&r1=1415808&r2=1415809&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java Fri Nov 30 19:52:49 2012
@@ -92,12 +92,6 @@ abstract class Schedulable {
public abstract void updateDemand();
/**
- * Distribute the fair share assigned to this Schedulable among its
- * children (used in queues where the internal scheduler is fair sharing).
- */
- public abstract void redistributeShare();
-
- /**
* Assign a container on this node if possible, and return the amount of
* resources assigned. If {@code reserved} is true, it means a reservation
* already exists on this node, and the schedulable should fulfill that
Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java?rev=1415809&r1=1415808&r2=1415809&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java Fri Nov 30 19:52:49 2012
@@ -56,7 +56,7 @@ import org.apache.hadoop.yarn.factory.pr
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store.RMState;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
@@ -292,7 +292,7 @@ public class FifoScheduler implements Re
// TODO: Fix store
FiCaSchedulerApp schedulerApp =
new FiCaSchedulerApp(appAttemptId, user, DEFAULT_QUEUE, activeUsersManager,
- this.rmContext, null);
+ this.rmContext);
applications.put(appAttemptId, schedulerApp);
metrics.submitApp(user, appAttemptId.getAttemptId());
LOG.info("Application Submission: " + appAttemptId.getApplicationId() +
@@ -763,13 +763,7 @@ public class FifoScheduler implements Re
@Override
public void recover(RMState state) {
- // TODO fix recovery
-// for (Map.Entry<ApplicationId, ApplicationInfo> entry: state.getStoredApplications().entrySet()) {
-// ApplicationId appId = entry.getKey();
-// ApplicationInfo appInfo = entry.getValue();
-// SchedulerApp app = applications.get(appId);
-// app.allocate(appInfo.getContainers());
-// }
+ // NOT IMPLEMENTED
}
@Override
Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerInfo.java?rev=1415809&r1=1415808&r2=1415809&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerInfo.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerInfo.java Fri Nov 30 19:52:49 2012
@@ -23,7 +23,7 @@ import java.util.Collection;
import java.util.List;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSLeafQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
public class FairSchedulerInfo {
@@ -32,9 +32,9 @@ public class FairSchedulerInfo {
public FairSchedulerInfo(FairScheduler fs) {
scheduler = fs;
- Collection<FSQueue> queues = fs.getQueueManager().getQueues();
+ Collection<FSLeafQueue> queues = fs.getQueueManager().getLeafQueues();
queueInfos = new ArrayList<FairSchedulerQueueInfo>();
- for (FSQueue queue : queues) {
+ for (FSLeafQueue queue : queues) {
queueInfos.add(new FairSchedulerQueueInfo(queue, fs));
}
}
Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerQueueInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerQueueInfo.java?rev=1415809&r1=1415808&r2=1415809&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerQueueInfo.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerQueueInfo.java Fri Nov 30 19:52:49 2012
@@ -22,9 +22,8 @@ import java.util.Collection;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueueSchedulable;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSSchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.AppSchedulable;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSLeafQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.QueueManager;
@@ -49,17 +48,16 @@ public class FairSchedulerQueueInfo {
private String queueName;
- public FairSchedulerQueueInfo(FSQueue queue, FairScheduler scheduler) {
- Collection<FSSchedulerApp> apps = queue.getApplications();
- for (FSSchedulerApp app : apps) {
- if (app.isPending()) {
+ public FairSchedulerQueueInfo(FSLeafQueue queue, FairScheduler scheduler) {
+ Collection<AppSchedulable> apps = queue.getAppSchedulables();
+ for (AppSchedulable app : apps) {
+ if (app.getApp().isPending()) {
numPendingApps++;
} else {
numActiveApps++;
}
}
- FSQueueSchedulable schedulable = queue.getQueueSchedulable();
QueueManager manager = scheduler.getQueueManager();
queueName = queue.getName();
@@ -67,11 +65,11 @@ public class FairSchedulerQueueInfo {
Resource clusterMax = scheduler.getClusterCapacity();
clusterMaxMem = clusterMax.getMemory();
- usedResources = schedulable.getResourceUsage();
+ usedResources = queue.getResourceUsage();
fractionUsed = (float)usedResources.getMemory() / clusterMaxMem;
- fairShare = schedulable.getFairShare().getMemory();
- minResources = schedulable.getMinShare();
+ fairShare = queue.getFairShare().getMemory();
+ minResources = queue.getMinShare();
minShare = minResources.getMemory();
maxResources = scheduler.getQueueManager().getMaxResources(queueName);
if (maxResources.getMemory() > clusterMaxMem) {
Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java?rev=1415809&r1=1415808&r2=1415809&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java Fri Nov 30 19:52:49 2012
@@ -37,7 +37,6 @@ import org.apache.hadoop.yarn.event.Disp
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemStore;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
@@ -93,7 +92,7 @@ public class TestAppManager{
rmDispatcher);
AMLivelinessMonitor amFinishingMonitor = new AMLivelinessMonitor(
rmDispatcher);
- return new RMContextImpl(new MemStore(), rmDispatcher,
+ return new RMContextImpl(rmDispatcher,
containerAllocationExpirer, amLivelinessMonitor, amFinishingMonitor,
null, null, null, null) {
@Override
@@ -366,7 +365,6 @@ public class TestAppManager{
YarnConfiguration.DEFAULT_QUEUE_NAME,
app.getQueue());
Assert.assertEquals("app state doesn't match", RMAppState.NEW, app.getState());
- Assert.assertNotNull("app store is null", app.getApplicationStore());
// wait for event to be processed
int timeoutSecs = 0;
@@ -413,7 +411,6 @@ public class TestAppManager{
Assert.assertEquals("app name doesn't match", "testApp1", app.getName());
Assert.assertEquals("app queue doesn't match", "testQueue", app.getQueue());
Assert.assertEquals("app state doesn't match", RMAppState.NEW, app.getState());
- Assert.assertNotNull("app store is null", app.getApplicationStore());
// wait for event to be processed
int timeoutSecs = 0;
Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationACLs.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationACLs.java?rev=1415809&r1=1415808&r2=1415809&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationACLs.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationACLs.java Fri Nov 30 19:52:49 2012
@@ -50,7 +50,7 @@ import org.apache.hadoop.yarn.exceptions
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.YarnRPC;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFactory;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.service.Service.STATE;
@@ -85,7 +85,7 @@ public class TestApplicationACLs {
@BeforeClass
public static void setup() throws InterruptedException, IOException {
- Store store = StoreFactory.getStore(conf);
+ RMStateStore store = StoreFactory.getStore(conf);
conf.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, true);
AccessControlList adminACL = new AccessControlList("");
adminACL.addGroup(SUPER_GROUP);
Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java?rev=1415809&r1=1415808&r2=1415809&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java Fri Nov 30 19:52:49 2012
@@ -182,7 +182,7 @@ public class TestClientRMService {
private RMAppImpl getRMApp(RMContext rmContext, YarnScheduler yarnScheduler,
ApplicationId applicationId3, YarnConfiguration config, String queueName) {
return new RMAppImpl(applicationId3, rmContext, config, null, null,
- queueName, null, null, null, yarnScheduler, null, System
+ queueName, null, null, yarnScheduler, null, System
.currentTimeMillis());
}
}
Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java?rev=1415809&r1=1415808&r2=1415809&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java Fri Nov 30 19:52:49 2012
@@ -36,7 +36,6 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemStore;
import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.InlineDispatcher;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
@@ -79,7 +78,7 @@ public class TestRMNodeTransitions {
InlineDispatcher rmDispatcher = new InlineDispatcher();
rmContext =
- new RMContextImpl(new MemStore(), rmDispatcher, null, null, null,
+ new RMContextImpl(rmDispatcher, null, null, null,
mock(DelegationTokenRenewer.class), null, null, null);
scheduler = mock(YarnScheduler.class);
doAnswer(
Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java?rev=1415809&r1=1415808&r2=1415809&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java Fri Nov 30 19:52:49 2012
@@ -31,7 +31,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFactory;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
@@ -47,7 +47,7 @@ public class TestResourceManager {
@Before
public void setUp() throws Exception {
Configuration conf = new YarnConfiguration();
- Store store = StoreFactory.getStore(conf);
+ RMStateStore store = StoreFactory.getStore(conf);
resourceManager = new ResourceManager(store);
resourceManager.init(conf);
}
Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java?rev=1415809&r1=1415808&r2=1415809&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java Fri Nov 30 19:52:49 2012
@@ -32,7 +32,6 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.ApplicationsStore.ApplicationStore;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
@@ -194,10 +193,6 @@ public abstract class MockAsm extends Mo
throw new UnsupportedOperationException("Not supported yet.");
}
@Override
- public ApplicationStore getApplicationStore() {
- throw new UnsupportedOperationException("Not supported yet.");
- }
- @Override
public float getProgress() {
throw new UnsupportedOperationException("Not supported yet.");
}
Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMLaunchFailure.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMLaunchFailure.java?rev=1415809&r1=1415808&r2=1415809&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMLaunchFailure.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMLaunchFailure.java Fri Nov 30 19:52:49 2012
@@ -49,8 +49,6 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.ApplicationsStore.ApplicationStore;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemStore;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java?rev=1415809&r1=1415808&r2=1415809&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java Fri Nov 30 19:52:49 2012
@@ -53,9 +53,7 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.ApplicationsStore.ApplicationStore;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemStore;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store.RMState;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestASMStateMachine.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestASMStateMachine.java?rev=1415809&r1=1415808&r2=1415809&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestASMStateMachine.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestASMStateMachine.java Fri Nov 30 19:52:49 2012
@@ -35,7 +35,6 @@ import org.apache.hadoop.yarn.factory.pr
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFactory;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
import org.junit.After;
Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestApplicationMasterExpiry.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestApplicationMasterExpiry.java?rev=1415809&r1=1415808&r2=1415809&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestApplicationMasterExpiry.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestApplicationMasterExpiry.java Fri Nov 30 19:52:49 2012
@@ -36,8 +36,6 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemStore;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.ApplicationsStore.ApplicationStore;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
import org.junit.Before;
import org.junit.Test;
Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestSchedulerNegotiator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestSchedulerNegotiator.java?rev=1415809&r1=1415808&r2=1415809&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestSchedulerNegotiator.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestSchedulerNegotiator.java Fri Nov 30 19:52:49 2012
@@ -43,9 +43,7 @@ import org.apache.hadoop.yarn.factory.pr
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.ApplicationsStore.ApplicationStore;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemStore;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store.RMState;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFactory;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMExpiry.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMExpiry.java?rev=1415809&r1=1415808&r2=1415809&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMExpiry.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMExpiry.java Fri Nov 30 19:52:49 2012
@@ -39,7 +39,6 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.NodeEventDispatcher;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemStore;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
@@ -70,7 +69,7 @@ public class TestNMExpiry {
Configuration conf = new Configuration();
// Dispatcher that processes events inline
Dispatcher dispatcher = new InlineDispatcher();
- RMContext context = new RMContextImpl(new MemStore(), dispatcher, null,
+ RMContext context = new RMContextImpl(dispatcher, null,
null, null, null, null, null, null);
dispatcher.register(SchedulerEventType.class,
new InlineDispatcher.EmptyEventHandler());
Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestRMNMRPCResponseId.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestRMNMRPCResponseId.java?rev=1415809&r1=1415808&r2=1415809&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestRMNMRPCResponseId.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestRMNMRPCResponseId.java Fri Nov 30 19:52:49 2012
@@ -39,7 +39,6 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemStore;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
@@ -66,7 +65,7 @@ public class TestRMNMRPCResponseId {
}
});
RMContext context =
- new RMContextImpl(new MemStore(), dispatcher, null, null, null, null,
+ new RMContextImpl(dispatcher, null, null, null, null,
null, null, null);
dispatcher.register(RMNodeEventType.class,
new ResourceManager.NodeEventDispatcher(context));
Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java?rev=1415809&r1=1415808&r2=1415809&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java Fri Nov 30 19:52:49 2012
@@ -28,7 +28,6 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.MockApps;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.ApplicationsStore.ApplicationStore;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
@@ -138,11 +137,6 @@ public class MockRMApp implements RMApp
}
@Override
- public ApplicationStore getApplicationStore() {
- throw new UnsupportedOperationException("Not supported yet.");
- }
-
- @Override
public long getFinishTime() {
return finish;
}
Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java?rev=1415809&r1=1415808&r2=1415809&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java Fri Nov 30 19:52:49 2012
@@ -40,8 +40,6 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.ApplicationsStore.ApplicationStore;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemStore;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
@@ -140,7 +138,7 @@ public class TestRMAppTransitions {
AMLivelinessMonitor amLivelinessMonitor = mock(AMLivelinessMonitor.class);
AMLivelinessMonitor amFinishingMonitor = mock(AMLivelinessMonitor.class);
this.rmContext =
- new RMContextImpl(new MemStore(), rmDispatcher,
+ new RMContextImpl(rmDispatcher,
containerAllocationExpirer, amLivelinessMonitor, amFinishingMonitor,
null, new ApplicationTokenSecretManager(conf),
new RMContainerTokenSecretManager(conf),
@@ -171,7 +169,6 @@ public class TestRMAppTransitions {
// ensure max retries set to known value
conf.setInt(YarnConfiguration.RM_AM_MAX_RETRIES, maxRetries);
String clientTokenStr = "bogusstring";
- ApplicationStore appStore = mock(ApplicationStore.class);
YarnScheduler scheduler = mock(YarnScheduler.class);
ApplicationMasterService masterService =
new ApplicationMasterService(rmContext, scheduler);
@@ -183,7 +180,7 @@ public class TestRMAppTransitions {
RMApp application = new RMAppImpl(applicationId, rmContext,
conf, name, user,
queue, submissionContext, clientTokenStr,
- appStore, scheduler,
+ scheduler,
masterService, System.currentTimeMillis());
testAppStartState(applicationId, user, name, queue, application);