You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by cm...@apache.org on 2014/08/20 01:51:01 UTC
svn commit: r1619012 [16/26] - in
/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project: ./
hadoop-yarn/bin/ hadoop-yarn/conf/ hadoop-yarn/dev-support/
hadoop-yarn/hadoop-yarn-api/
hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api...
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java Tue Aug 19 23:49:39 2014
@@ -20,14 +20,11 @@ package org.apache.hadoop.yarn.server.re
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
-import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@@ -39,7 +36,6 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
@@ -53,9 +49,6 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
-import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
-import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
@@ -76,10 +69,8 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt.ContainersAndNMTokensAllocation;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
@@ -99,6 +90,7 @@ import org.apache.hadoop.yarn.util.resou
import org.apache.hadoop.yarn.util.resource.Resources;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
/**
* A scheduler that schedules resources between a set of queues. The scheduler
@@ -122,11 +114,10 @@ import com.google.common.annotations.Vis
@LimitedPrivate("yarn")
@Unstable
@SuppressWarnings("unchecked")
-public class FairScheduler extends AbstractYarnScheduler {
- private boolean initialized;
+public class FairScheduler extends
+ AbstractYarnScheduler<FSAppAttempt, FSSchedulerNode> {
private FairSchedulerConfiguration conf;
- private Resource minimumAllocation;
- private Resource maximumAllocation;
+
private Resource incrAllocation;
private QueueManager queueMgr;
private Clock clock;
@@ -142,25 +133,32 @@ public class FairScheduler extends Abstr
public static final Resource CONTAINER_RESERVED = Resources.createResource(-1);
// How often fair shares are re-calculated (ms)
- protected long UPDATE_INTERVAL = 500;
+ protected long updateInterval;
+ private final int UPDATE_DEBUG_FREQUENCY = 5;
+ private int updatesToSkipForDebug = UPDATE_DEBUG_FREQUENCY;
+
+ @VisibleForTesting
+ Thread updateThread;
+
+ @VisibleForTesting
+ Thread schedulingThread;
+ // timeout to join when we stop this service
+ protected final long THREAD_JOIN_TIMEOUT_MS = 1000;
// Aggregate metrics
FSQueueMetrics rootMetrics;
+ FSOpDurations fsOpDurations;
// Time when we last updated preemption vars
protected long lastPreemptionUpdateTime;
// Time we last ran preemptTasksIfNecessary
private long lastPreemptCheckTime;
- // Nodes in the cluster, indexed by NodeId
- private Map<NodeId, FSSchedulerNode> nodes =
- new ConcurrentHashMap<NodeId, FSSchedulerNode>();
-
- // Aggregate capacity of the cluster
- private Resource clusterCapacity =
- RecordFactoryProvider.getRecordFactory(null).newRecordInstance(Resource.class);
+ // Preemption related variables
+ protected boolean preemptionEnabled;
+ protected float preemptionUtilizationThreshold;
- // How often tasks are preempted
+ // How often tasks are preempted
protected long preemptionInterval;
// ms to wait before force killing stuff (must be longer than a couple
@@ -170,7 +168,6 @@ public class FairScheduler extends Abstr
// Containers whose AMs have been warned that they will be preempted soon.
private List<RMContainer> warnedContainers = new ArrayList<RMContainer>();
- protected boolean preemptionEnabled;
protected boolean sizeBasedWeight; // Give larger weights to larger jobs
protected WeightAdjuster weightAdjuster; // Can be null for no weight adjuster
protected boolean continuousSchedulingEnabled; // Continuous Scheduling enabled or not
@@ -194,6 +191,7 @@ public class FairScheduler extends Abstr
AllocationConfiguration allocConf;
public FairScheduler() {
+ super(FairScheduler.class.getName());
clock = new SystemClock();
allocsLoader = new AllocationFileLoaderService();
queueMgr = new QueueManager(this);
@@ -246,34 +244,25 @@ public class FairScheduler extends Abstr
return queueMgr;
}
- @Override
- public RMContainer getRMContainer(ContainerId containerId) {
- FSSchedulerApp attempt = getCurrentAttemptForContainer(containerId);
- return (attempt == null) ? null : attempt.getRMContainer(containerId);
- }
-
- private FSSchedulerApp getCurrentAttemptForContainer(
- ContainerId containerId) {
- SchedulerApplication app =
- applications.get(containerId.getApplicationAttemptId()
- .getApplicationId());
- if (app != null) {
- return (FSSchedulerApp) app.getCurrentAppAttempt();
- }
- return null;
- }
-
/**
- * A runnable which calls {@link FairScheduler#update()} every
- * <code>UPDATE_INTERVAL</code> milliseconds.
+ * Thread which calls {@link FairScheduler#update()} every
+ * <code>updateInterval</code> milliseconds.
*/
- private class UpdateThread implements Runnable {
+ private class UpdateThread extends Thread {
+
+ @Override
public void run() {
- while (true) {
+ while (!Thread.currentThread().isInterrupted()) {
try {
- Thread.sleep(UPDATE_INTERVAL);
+ Thread.sleep(updateInterval);
+ long start = getClock().getTime();
update();
preemptTasksIfNecessary();
+ long duration = getClock().getTime() - start;
+ fsOpDurations.addUpdateThreadRunDuration(duration);
+ } catch (InterruptedException ie) {
+ LOG.warn("Update thread interrupted. Exiting.");
+ return;
} catch (Exception e) {
LOG.error("Exception in fair scheduler UpdateThread", e);
}
@@ -282,11 +271,32 @@ public class FairScheduler extends Abstr
}
/**
+ * Thread which attempts scheduling resources continuously,
+ * asynchronous to the node heartbeats.
+ */
+ private class ContinuousSchedulingThread extends Thread {
+
+ @Override
+ public void run() {
+ while (!Thread.currentThread().isInterrupted()) {
+ try {
+ continuousSchedulingAttempt();
+ Thread.sleep(getContinuousSchedulingSleepMs());
+ } catch (InterruptedException e) {
+ LOG.warn("Continuous scheduling thread interrupted. Exiting.", e);
+ return;
+ }
+ }
+ }
+ }
+
+ /**
* Recompute the internal variables used by the scheduler - per-job weights,
* fair shares, deficits, minimum slot allocations, and amount of used and
* required resources per job.
*/
protected synchronized void update() {
+ long start = getClock().getTime();
updatePreemptionVariables(); // Determine if any queues merit preemption
FSQueue rootQueue = queueMgr.getRootQueue();
@@ -294,10 +304,25 @@ public class FairScheduler extends Abstr
// Recursively update demands for all queues
rootQueue.updateDemand();
- rootQueue.setFairShare(clusterCapacity);
+ rootQueue.setFairShare(clusterResource);
// Recursively compute fair shares for all queues
// and update metrics
rootQueue.recomputeShares();
+
+ if (LOG.isDebugEnabled()) {
+ if (--updatesToSkipForDebug < 0) {
+ updatesToSkipForDebug = UPDATE_DEBUG_FREQUENCY;
+ LOG.debug("Cluster Capacity: " + clusterResource +
+ " Allocations: " + rootMetrics.getAllocatedResources() +
+ " Availability: " + Resource.newInstance(
+ rootMetrics.getAvailableMB(),
+ rootMetrics.getAvailableVirtualCores()) +
+ " Demand: " + rootQueue.getDemand());
+ }
+ }
+
+ long duration = getClock().getTime() - start;
+ fsOpDurations.addUpdateCallDuration(duration);
}
/**
@@ -306,7 +331,7 @@ public class FairScheduler extends Abstr
* for each type of task.
*/
private void updatePreemptionVariables() {
- long now = clock.getTime();
+ long now = getClock().getTime();
lastPreemptionUpdateTime = now;
for (FSLeafQueue sched : queueMgr.getLeafQueues()) {
if (!isStarvedForMinShare(sched)) {
@@ -322,9 +347,9 @@ public class FairScheduler extends Abstr
* Is a queue below its min share for the given task type?
*/
boolean isStarvedForMinShare(FSLeafQueue sched) {
- Resource desiredShare = Resources.min(RESOURCE_CALCULATOR, clusterCapacity,
+ Resource desiredShare = Resources.min(RESOURCE_CALCULATOR, clusterResource,
sched.getMinShare(), sched.getDemand());
- return Resources.lessThan(RESOURCE_CALCULATOR, clusterCapacity,
+ return Resources.lessThan(RESOURCE_CALCULATOR, clusterResource,
sched.getResourceUsage(), desiredShare);
}
@@ -333,9 +358,10 @@ public class FairScheduler extends Abstr
* defined as being below half its fair share.
*/
boolean isStarvedForFairShare(FSLeafQueue sched) {
- Resource desiredFairShare = Resources.min(RESOURCE_CALCULATOR, clusterCapacity,
+ Resource desiredFairShare = Resources.min(RESOURCE_CALCULATOR,
+ clusterResource,
Resources.multiply(sched.getFairShare(), .5), sched.getDemand());
- return Resources.lessThan(RESOURCE_CALCULATOR, clusterCapacity,
+ return Resources.lessThan(RESOURCE_CALCULATOR, clusterResource,
sched.getResourceUsage(), desiredFairShare);
}
@@ -347,111 +373,98 @@ public class FairScheduler extends Abstr
* and then select the right ones using preemptTasks.
*/
protected synchronized void preemptTasksIfNecessary() {
- if (!preemptionEnabled) {
+ if (!shouldAttemptPreemption()) {
return;
}
- long curTime = clock.getTime();
+ long curTime = getClock().getTime();
if (curTime - lastPreemptCheckTime < preemptionInterval) {
return;
}
lastPreemptCheckTime = curTime;
- Resource resToPreempt = Resources.none();
-
+ Resource resToPreempt = Resources.clone(Resources.none());
for (FSLeafQueue sched : queueMgr.getLeafQueues()) {
- resToPreempt = Resources.add(resToPreempt, resToPreempt(sched, curTime));
+ Resources.addTo(resToPreempt, resToPreempt(sched, curTime));
}
- if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity, resToPreempt,
+ if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterResource, resToPreempt,
Resources.none())) {
- preemptResources(queueMgr.getLeafQueues(), resToPreempt);
+ preemptResources(resToPreempt);
}
}
/**
- * Preempt a quantity of resources from a list of QueueSchedulables. The
- * policy for this is to pick apps from queues that are over their fair share,
- * but make sure that no queue is placed below its fair share in the process.
- * We further prioritize preemption by choosing containers with lowest
- * priority to preempt.
+ * Preempt a quantity of resources. Each round, we start from the root queue,
+ * level-by-level, until choosing a candidate application.
+ * The policy for prioritizing preemption for each queue depends on its
+ * SchedulingPolicy: (1) fairshare/DRF, choose the ChildSchedulable that is
+ * most over its fair share; (2) FIFO, choose the childSchedulable that is
+ * latest launched.
+ * Inside each application, we further prioritize preemption by choosing
+ * containers with lowest priority to preempt.
+ * We make sure that no queue is placed below its fair share in the process.
*/
- protected void preemptResources(Collection<FSLeafQueue> scheds,
- Resource toPreempt) {
- if (scheds.isEmpty() || Resources.equals(toPreempt, Resources.none())) {
+ protected void preemptResources(Resource toPreempt) {
+ long start = getClock().getTime();
+ if (Resources.equals(toPreempt, Resources.none())) {
return;
}
- Map<RMContainer, FSSchedulerApp> apps =
- new HashMap<RMContainer, FSSchedulerApp>();
- Map<RMContainer, FSLeafQueue> queues =
- new HashMap<RMContainer, FSLeafQueue>();
-
- // Collect running containers from over-scheduled queues
- List<RMContainer> runningContainers = new ArrayList<RMContainer>();
- for (FSLeafQueue sched : scheds) {
- if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity,
- sched.getResourceUsage(), sched.getFairShare())) {
- for (AppSchedulable as : sched.getRunnableAppSchedulables()) {
- for (RMContainer c : as.getApp().getLiveContainers()) {
- runningContainers.add(c);
- apps.put(c, as.getApp());
- queues.put(c, sched);
- }
- }
- }
- }
-
- // Sort containers into reverse order of priority
- Collections.sort(runningContainers, new Comparator<RMContainer>() {
- public int compare(RMContainer c1, RMContainer c2) {
- int ret = c1.getContainer().getPriority().compareTo(
- c2.getContainer().getPriority());
- if (ret == 0) {
- return c2.getContainerId().compareTo(c1.getContainerId());
- }
- return ret;
- }
- });
-
// Scan down the list of containers we've already warned and kill them
// if we need to. Remove any containers from the list that we don't need
// or that are no longer running.
Iterator<RMContainer> warnedIter = warnedContainers.iterator();
- Set<RMContainer> preemptedThisRound = new HashSet<RMContainer>();
while (warnedIter.hasNext()) {
RMContainer container = warnedIter.next();
- if (container.getState() == RMContainerState.RUNNING &&
- Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity,
+ if ((container.getState() == RMContainerState.RUNNING ||
+ container.getState() == RMContainerState.ALLOCATED) &&
+ Resources.greaterThan(RESOURCE_CALCULATOR, clusterResource,
toPreempt, Resources.none())) {
- warnOrKillContainer(container, apps.get(container), queues.get(container));
- preemptedThisRound.add(container);
+ warnOrKillContainer(container);
Resources.subtractFrom(toPreempt, container.getContainer().getResource());
} else {
warnedIter.remove();
}
}
- // Scan down the rest of the containers until we've preempted enough, making
- // sure we don't preempt too many from any queue
- Iterator<RMContainer> runningIter = runningContainers.iterator();
- while (runningIter.hasNext() &&
- Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity,
- toPreempt, Resources.none())) {
- RMContainer container = runningIter.next();
- FSLeafQueue sched = queues.get(container);
- if (!preemptedThisRound.contains(container) &&
- Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity,
- sched.getResourceUsage(), sched.getFairShare())) {
- warnOrKillContainer(container, apps.get(container), sched);
-
- warnedContainers.add(container);
- Resources.subtractFrom(toPreempt, container.getContainer().getResource());
+ try {
+ // Reset preemptedResource for each app
+ for (FSLeafQueue queue : getQueueManager().getLeafQueues()) {
+ for (FSAppAttempt app : queue.getRunnableAppSchedulables()) {
+ app.resetPreemptedResources();
+ }
+ }
+
+ while (Resources.greaterThan(RESOURCE_CALCULATOR, clusterResource,
+ toPreempt, Resources.none())) {
+ RMContainer container =
+ getQueueManager().getRootQueue().preemptContainer();
+ if (container == null) {
+ break;
+ } else {
+ warnOrKillContainer(container);
+ warnedContainers.add(container);
+ Resources.subtractFrom(
+ toPreempt, container.getContainer().getResource());
+ }
+ }
+ } finally {
+ // Clear preemptedResources for each app
+ for (FSLeafQueue queue : getQueueManager().getLeafQueues()) {
+ for (FSAppAttempt app : queue.getRunnableAppSchedulables()) {
+ app.clearPreemptedResources();
+ }
}
}
+
+ long duration = getClock().getTime() - start;
+ fsOpDurations.addPreemptCallDuration(duration);
}
- private void warnOrKillContainer(RMContainer container, FSSchedulerApp app,
- FSLeafQueue queue) {
+ protected void warnOrKillContainer(RMContainer container) {
+ ApplicationAttemptId appAttemptId = container.getApplicationAttemptId();
+ FSAppAttempt app = getSchedulerApp(appAttemptId);
+ FSLeafQueue queue = app.getQueue();
LOG.info("Preempting container (prio=" + container.getContainer().getPriority() +
"res=" + container.getContainer().getResource() +
") from queue " + queue.getName());
@@ -461,21 +474,22 @@ public class FairScheduler extends Abstr
if (time != null) {
// if we asked for preemption more than maxWaitTimeBeforeKill ms ago,
// proceed with kill
- if (time + waitTimeBeforeKill < clock.getTime()) {
+ if (time + waitTimeBeforeKill < getClock().getTime()) {
ContainerStatus status =
SchedulerUtils.createPreemptedContainerStatus(
container.getContainerId(), SchedulerUtils.PREEMPTED_CONTAINER);
+ recoverResourceRequestForContainer(container);
// TODO: Not sure if this ever actually adds this to the list of cleanup
// containers on the RMNode (see SchedulerNode.releaseContainer()).
completedContainer(container, status, RMContainerEventType.KILL);
LOG.info("Killing container" + container +
" (after waiting for premption for " +
- (clock.getTime() - time) + "ms)");
+ (getClock().getTime() - time) + "ms)");
}
} else {
- // track the request in the FSSchedulerApp itself
- app.addPreemption(container, clock.getTime());
+ // track the request in the FSAppAttempt itself
+ app.addPreemption(container, getClock().getTime());
}
}
@@ -496,20 +510,20 @@ public class FairScheduler extends Abstr
Resource resDueToMinShare = Resources.none();
Resource resDueToFairShare = Resources.none();
if (curTime - sched.getLastTimeAtMinShare() > minShareTimeout) {
- Resource target = Resources.min(RESOURCE_CALCULATOR, clusterCapacity,
+ Resource target = Resources.min(RESOURCE_CALCULATOR, clusterResource,
sched.getMinShare(), sched.getDemand());
- resDueToMinShare = Resources.max(RESOURCE_CALCULATOR, clusterCapacity,
+ resDueToMinShare = Resources.max(RESOURCE_CALCULATOR, clusterResource,
Resources.none(), Resources.subtract(target, sched.getResourceUsage()));
}
if (curTime - sched.getLastTimeAtHalfFairShare() > fairShareTimeout) {
- Resource target = Resources.min(RESOURCE_CALCULATOR, clusterCapacity,
+ Resource target = Resources.min(RESOURCE_CALCULATOR, clusterResource,
sched.getFairShare(), sched.getDemand());
- resDueToFairShare = Resources.max(RESOURCE_CALCULATOR, clusterCapacity,
+ resDueToFairShare = Resources.max(RESOURCE_CALCULATOR, clusterResource,
Resources.none(), Resources.subtract(target, sched.getResourceUsage()));
}
- Resource resToPreempt = Resources.max(RESOURCE_CALCULATOR, clusterCapacity,
+ Resource resToPreempt = Resources.max(RESOURCE_CALCULATOR, clusterResource,
resDueToMinShare, resDueToFairShare);
- if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity,
+ if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterResource,
resToPreempt, Resources.none())) {
String message = "Should preempt " + resToPreempt + " res for queue "
+ sched.getName() + ": resDueToMinShare = " + resDueToMinShare
@@ -519,12 +533,13 @@ public class FairScheduler extends Abstr
return resToPreempt;
}
- public RMContainerTokenSecretManager getContainerTokenSecretManager() {
+ public synchronized RMContainerTokenSecretManager
+ getContainerTokenSecretManager() {
return rmContext.getContainerTokenSecretManager();
}
// synchronized for sizeBasedWeight
- public synchronized ResourceWeights getAppWeight(AppSchedulable app) {
+ public synchronized ResourceWeights getAppWeight(FSAppAttempt app) {
double weight = 1.0;
if (sizeBasedWeight) {
// Set weight based on current memory demand
@@ -540,18 +555,12 @@ public class FairScheduler extends Abstr
return resourceWeights;
}
- @Override
- public Resource getMinimumResourceCapability() {
- return minimumAllocation;
- }
-
public Resource getIncrementResourceCapability() {
return incrAllocation;
}
- @Override
- public Resource getMaximumResourceCapability() {
- return maximumAllocation;
+ private FSSchedulerNode getFSSchedulerNode(NodeId nodeId) {
+ return nodes.get(nodeId);
}
public double getNodeLocalityThreshold() {
@@ -578,10 +587,6 @@ public class FairScheduler extends Abstr
return continuousSchedulingSleepMs;
}
- public Resource getClusterCapacity() {
- return clusterCapacity;
- }
-
public synchronized Clock getClock() {
return clock;
}
@@ -600,7 +605,7 @@ public class FairScheduler extends Abstr
* configured limits, but the app will not be marked as runnable.
*/
protected synchronized void addApplication(ApplicationId applicationId,
- String queueName, String user) {
+ String queueName, String user, boolean isAppRecovering) {
if (queueName == null || queueName.isEmpty()) {
String message = "Reject application " + applicationId +
" submitted by user " + user + " with an empty queue name.";
@@ -629,16 +634,22 @@ public class FairScheduler extends Abstr
return;
}
- SchedulerApplication application =
- new SchedulerApplication(queue, user);
+ SchedulerApplication<FSAppAttempt> application =
+ new SchedulerApplication<FSAppAttempt>(queue, user);
applications.put(applicationId, application);
queue.getMetrics().submitApp(user);
LOG.info("Accepted application " + applicationId + " from user: " + user
+ ", in queue: " + queueName + ", currently num of applications: "
+ applications.size());
- rmContext.getDispatcher().getEventHandler()
+ if (isAppRecovering) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(applicationId + " is recovering. Skip notifying APP_ACCEPTED");
+ }
+ } else {
+ rmContext.getDispatcher().getEventHandler()
.handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED));
+ }
}
/**
@@ -646,19 +657,20 @@ public class FairScheduler extends Abstr
*/
protected synchronized void addApplicationAttempt(
ApplicationAttemptId applicationAttemptId,
- boolean transferStateFromPreviousAttempt) {
- SchedulerApplication application =
+ boolean transferStateFromPreviousAttempt,
+ boolean isAttemptRecovering) {
+ SchedulerApplication<FSAppAttempt> application =
applications.get(applicationAttemptId.getApplicationId());
String user = application.getUser();
FSLeafQueue queue = (FSLeafQueue) application.getQueue();
- FSSchedulerApp attempt =
- new FSSchedulerApp(applicationAttemptId, user,
+ FSAppAttempt attempt =
+ new FSAppAttempt(this, applicationAttemptId, user,
queue, new ActiveUsersManager(getRootQueueMetrics()),
rmContext);
if (transferStateFromPreviousAttempt) {
attempt.transferStateFromPreviousAttempt(application
- .getCurrentAppAttempt());
+ .getCurrentAppAttempt());
}
application.setCurrentAppAttempt(attempt);
@@ -674,9 +686,17 @@ public class FairScheduler extends Abstr
LOG.info("Added Application Attempt " + applicationAttemptId
+ " to scheduler from user: " + user);
- rmContext.getDispatcher().getEventHandler().handle(
+
+ if (isAttemptRecovering) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(applicationAttemptId
+ + " is recovering. Skipping notifying ATTEMPT_ADDED");
+ }
+ } else {
+ rmContext.getDispatcher().getEventHandler().handle(
new RMAppAttemptEvent(applicationAttemptId,
RMAppAttemptEventType.ATTEMPT_ADDED));
+ }
}
/**
@@ -720,7 +740,8 @@ public class FairScheduler extends Abstr
private synchronized void removeApplication(ApplicationId applicationId,
RMAppState finalState) {
- SchedulerApplication application = applications.get(applicationId);
+ SchedulerApplication<FSAppAttempt> application =
+ applications.get(applicationId);
if (application == null){
LOG.warn("Couldn't find application " + applicationId);
return;
@@ -734,9 +755,9 @@ public class FairScheduler extends Abstr
RMAppAttemptState rmAppAttemptFinalState, boolean keepContainers) {
LOG.info("Application " + applicationAttemptId + " is done." +
" finalState=" + rmAppAttemptFinalState);
- SchedulerApplication application =
+ SchedulerApplication<FSAppAttempt> application =
applications.get(applicationAttemptId.getApplicationId());
- FSSchedulerApp attempt = getSchedulerApp(applicationAttemptId);
+ FSAppAttempt attempt = getSchedulerApp(applicationAttemptId);
if (attempt == null || application == null) {
LOG.info("Unknown application " + applicationAttemptId + " has completed!");
@@ -787,7 +808,8 @@ public class FairScheduler extends Abstr
/**
* Clean up a completed container.
*/
- private synchronized void completedContainer(RMContainer rmContainer,
+ @Override
+ protected synchronized void completedContainer(RMContainer rmContainer,
ContainerStatus containerStatus, RMContainerEventType event) {
if (rmContainer == null) {
LOG.info("Null container completed...");
@@ -797,7 +819,7 @@ public class FairScheduler extends Abstr
Container container = rmContainer.getContainer();
// Get the application for the finished container
- FSSchedulerApp application =
+ FSAppAttempt application =
getCurrentAttemptForContainer(container.getId());
ApplicationId appId =
container.getId().getApplicationAttemptId().getApplicationId();
@@ -809,11 +831,10 @@ public class FairScheduler extends Abstr
}
// Get the node on which the container was allocated
- FSSchedulerNode node = nodes.get(container.getNodeId());
+ FSSchedulerNode node = getFSSchedulerNode(container.getNodeId());
if (rmContainer.getState() == RMContainerState.RESERVED) {
- application.unreserve(node, rmContainer.getReservedPriority());
- node.unreserveResource(application);
+ application.unreserve(rmContainer.getReservedPriority(), node);
} else {
application.containerCompleted(rmContainer, containerStatus, event);
node.releaseContainer(container);
@@ -827,20 +848,20 @@ public class FairScheduler extends Abstr
private synchronized void addNode(RMNode node) {
nodes.put(node.getNodeID(), new FSSchedulerNode(node, usePortForNodeName));
- Resources.addTo(clusterCapacity, node.getTotalCapability());
+ Resources.addTo(clusterResource, node.getTotalCapability());
updateRootQueueMetrics();
LOG.info("Added node " + node.getNodeAddress() +
- " cluster capacity: " + clusterCapacity);
+ " cluster capacity: " + clusterResource);
}
private synchronized void removeNode(RMNode rmNode) {
- FSSchedulerNode node = nodes.get(rmNode.getNodeID());
+ FSSchedulerNode node = getFSSchedulerNode(rmNode.getNodeID());
// This can occur when an UNHEALTHY node reconnects
if (node == null) {
return;
}
- Resources.subtractFrom(clusterCapacity, rmNode.getTotalCapability());
+ Resources.subtractFrom(clusterResource, rmNode.getTotalCapability());
updateRootQueueMetrics();
// Remove running containers
@@ -865,7 +886,7 @@ public class FairScheduler extends Abstr
nodes.remove(rmNode.getNodeID());
LOG.info("Removed node " + rmNode.getNodeAddress() +
- " cluster capacity: " + clusterCapacity);
+ " cluster capacity: " + clusterResource);
}
@Override
@@ -873,7 +894,7 @@ public class FairScheduler extends Abstr
List<ResourceRequest> ask, List<ContainerId> release, List<String> blacklistAdditions, List<String> blacklistRemovals) {
// Make sure this application exists
- FSSchedulerApp application = getSchedulerApp(appAttemptId);
+ FSAppAttempt application = getSchedulerApp(appAttemptId);
if (application == null) {
LOG.info("Calling allocate on removed " +
"or non existant application " + appAttemptId);
@@ -882,25 +903,17 @@ public class FairScheduler extends Abstr
// Sanity check
SchedulerUtils.normalizeRequests(ask, new DominantResourceCalculator(),
- clusterCapacity, minimumAllocation, maximumAllocation, incrAllocation);
+ clusterResource, minimumAllocation, maximumAllocation, incrAllocation);
- // Release containers
- for (ContainerId releasedContainerId : release) {
- RMContainer rmContainer = getRMContainer(releasedContainerId);
- if (rmContainer == null) {
- RMAuditLogger.logFailure(application.getUser(),
- AuditConstants.RELEASE_CONTAINER,
- "Unauthorized access or invalid container", "FairScheduler",
- "Trying to release container not owned by app or with invalid id",
- application.getApplicationId(), releasedContainerId);
- }
- completedContainer(rmContainer,
- SchedulerUtils.createAbnormalContainerStatus(
- releasedContainerId,
- SchedulerUtils.RELEASED_CONTAINER),
- RMContainerEventType.RELEASED);
+ // Set amResource for this app
+ if (!application.getUnmanagedAM() && ask.size() == 1
+ && application.getLiveContainers().isEmpty()) {
+ application.setAMResource(ask.get(0).getCapability());
}
+ // Release containers
+ releaseContainers(release, application);
+
synchronized (application) {
if (!ask.isEmpty()) {
if (LOG.isDebugEnabled()) {
@@ -913,14 +926,14 @@ public class FairScheduler extends Abstr
// Update application requests
application.updateResourceRequests(ask);
- LOG.debug("allocate: post-update");
application.showRequests();
}
if (LOG.isDebugEnabled()) {
- LOG.debug("allocate:" +
+ LOG.debug("allocate: post-update" +
" applicationAttemptId=" + appAttemptId +
- " #ask=" + ask.size());
+ " #ask=" + ask.size() +
+ " reservation= " + application.getCurrentReservation());
LOG.debug("Preempting " + application.getPreemptionContainers().size()
+ " container(s)");
@@ -941,33 +954,18 @@ public class FairScheduler extends Abstr
}
/**
- * Process a container which has launched on a node, as reported by the node.
- */
- private void containerLaunchedOnNode(ContainerId containerId, FSSchedulerNode node) {
- // Get the application for the finished container
- FSSchedulerApp application = getCurrentAttemptForContainer(containerId);
- if (application == null) {
- LOG.info("Unknown application "
- + containerId.getApplicationAttemptId().getApplicationId()
- + " launched container " + containerId + " on node: " + node);
- return;
- }
-
- application.containerLaunchedOnNode(containerId, node.getNodeID());
- }
-
- /**
* Process a heartbeat update from a node.
*/
private synchronized void nodeUpdate(RMNode nm) {
+ long start = getClock().getTime();
if (LOG.isDebugEnabled()) {
- LOG.debug("nodeUpdate: " + nm + " cluster capacity: " + clusterCapacity);
+ LOG.debug("nodeUpdate: " + nm + " cluster capacity: " + clusterResource);
}
eventLog.log("HEARTBEAT", nm.getHostName());
- FSSchedulerNode node = nodes.get(nm.getNodeID());
+ FSSchedulerNode node = getFSSchedulerNode(nm.getNodeID());
// Update resource if any change
- SchedulerUtils.updateResourceIfChanged(node, nm, clusterCapacity, LOG);
+ SchedulerUtils.updateResourceIfChanged(node, nm, clusterResource, LOG);
List<UpdatedContainerInfo> containerInfoList = nm.pullContainerUpdates();
List<ContainerStatus> newlyLaunchedContainers = new ArrayList<ContainerStatus>();
@@ -996,41 +994,38 @@ public class FairScheduler extends Abstr
} else {
attemptScheduling(node);
}
+
+ long duration = getClock().getTime() - start;
+ fsOpDurations.addNodeUpdateDuration(duration);
}
- private void continuousScheduling() {
- while (true) {
- List<NodeId> nodeIdList = new ArrayList<NodeId>(nodes.keySet());
- // Sort the nodes by space available on them, so that we offer
- // containers on emptier nodes first, facilitating an even spread. This
- // requires holding the scheduler lock, so that the space available on a
- // node doesn't change during the sort.
- synchronized (this) {
- Collections.sort(nodeIdList, nodeAvailableResourceComparator);
- }
-
- // iterate all nodes
- for (NodeId nodeId : nodeIdList) {
- if (nodes.containsKey(nodeId)) {
- FSSchedulerNode node = nodes.get(nodeId);
- try {
- if (Resources.fitsIn(minimumAllocation,
- node.getAvailableResource())) {
- attemptScheduling(node);
- }
- } catch (Throwable ex) {
- LOG.warn("Error while attempting scheduling for node " + node +
- ": " + ex.toString(), ex);
- }
- }
- }
+ void continuousSchedulingAttempt() throws InterruptedException {
+ long start = getClock().getTime();
+ List<NodeId> nodeIdList = new ArrayList<NodeId>(nodes.keySet());
+ // Sort the nodes by space available on them, so that we offer
+ // containers on emptier nodes first, facilitating an even spread. This
+ // requires holding the scheduler lock, so that the space available on a
+ // node doesn't change during the sort.
+ synchronized (this) {
+ Collections.sort(nodeIdList, nodeAvailableResourceComparator);
+ }
+
+ // iterate all nodes
+ for (NodeId nodeId : nodeIdList) {
+ FSSchedulerNode node = getFSSchedulerNode(nodeId);
try {
- Thread.sleep(getContinuousSchedulingSleepMs());
- } catch (InterruptedException e) {
- LOG.warn("Error while doing sleep in continuous scheduling: " +
- e.toString(), e);
+ if (node != null && Resources.fitsIn(minimumAllocation,
+ node.getAvailableResource())) {
+ attemptScheduling(node);
+ }
+ } catch (Throwable ex) {
+ LOG.error("Error while attempting scheduling for node " + node +
+ ": " + ex.toString(), ex);
}
}
+
+ long duration = getClock().getTime() - start;
+ fsOpDurations.addContinuousSchedulingRunDuration(duration);
}
/** Sort nodes by available resource */
@@ -1038,7 +1033,13 @@ public class FairScheduler extends Abstr
@Override
public int compare(NodeId n1, NodeId n2) {
- return RESOURCE_CALCULATOR.compare(clusterCapacity,
+ if (!nodes.containsKey(n1)) {
+ return 1;
+ }
+ if (!nodes.containsKey(n2)) {
+ return -1;
+ }
+ return RESOURCE_CALCULATOR.compare(clusterResource,
nodes.get(n2).getAvailableResource(),
nodes.get(n1).getAvailableResource());
}
@@ -1049,13 +1050,13 @@ public class FairScheduler extends Abstr
// 1. Check for reserved applications
// 2. Schedule if there are no reservations
- AppSchedulable reservedAppSchedulable = node.getReservedAppSchedulable();
+ FSAppAttempt reservedAppSchedulable = node.getReservedAppSchedulable();
if (reservedAppSchedulable != null) {
Priority reservedPriority = node.getReservedContainer().getReservedPriority();
if (!reservedAppSchedulable.hasContainerForNode(reservedPriority, node)) {
// Don't hold the reservation if app can no longer use it
LOG.info("Releasing reservation that cannot be satisfied for application "
- + reservedAppSchedulable.getApp().getApplicationAttemptId()
+ + reservedAppSchedulable.getApplicationAttemptId()
+ " on node " + node);
reservedAppSchedulable.unreserve(reservedPriority, node);
reservedAppSchedulable = null;
@@ -1063,7 +1064,7 @@ public class FairScheduler extends Abstr
// Reservation exists; try to fulfill the reservation
if (LOG.isDebugEnabled()) {
LOG.debug("Trying to fulfill reservation for application "
- + reservedAppSchedulable.getApp().getApplicationAttemptId()
+ + reservedAppSchedulable.getApplicationAttemptId()
+ " on node: " + node);
}
@@ -1075,9 +1076,8 @@ public class FairScheduler extends Abstr
int assignedContainers = 0;
while (node.getReservedContainer() == null) {
boolean assignedContainer = false;
- if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity,
- queueMgr.getRootQueue().assignContainer(node),
- Resources.none())) {
+ if (!queueMgr.getRootQueue().assignContainer(node).equals(
+ Resources.none())) {
assignedContainers++;
assignedContainer = true;
}
@@ -1089,45 +1089,8 @@ public class FairScheduler extends Abstr
updateRootQueueMetrics();
}
- @Override
- public SchedulerNodeReport getNodeReport(NodeId nodeId) {
- FSSchedulerNode node = nodes.get(nodeId);
- return node == null ? null : new SchedulerNodeReport(node);
- }
-
- public FSSchedulerApp getSchedulerApp(ApplicationAttemptId appAttemptId) {
- SchedulerApplication app =
- applications.get(appAttemptId.getApplicationId());
- if (app != null) {
- return (FSSchedulerApp) app.getCurrentAppAttempt();
- }
- return null;
- }
-
- @Override
- public SchedulerAppReport getSchedulerAppInfo(
- ApplicationAttemptId appAttemptId) {
- FSSchedulerApp attempt = getSchedulerApp(appAttemptId);
- if (attempt == null) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Request for appInfo of unknown attempt " + appAttemptId);
- }
- return null;
- }
- return new SchedulerAppReport(attempt);
- }
-
- @Override
- public ApplicationResourceUsageReport getAppResourceUsageReport(
- ApplicationAttemptId appAttemptId) {
- FSSchedulerApp attempt = getSchedulerApp(appAttemptId);
- if (attempt == null) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Request for appInfo of unknown attempt " + appAttemptId);
- }
- return null;
- }
- return attempt.getResourceUsageReport();
+ public FSAppAttempt getSchedulerApp(ApplicationAttemptId appAttemptId) {
+ return super.getApplicationAttempt(appAttemptId);
}
/**
@@ -1139,7 +1102,23 @@ public class FairScheduler extends Abstr
private void updateRootQueueMetrics() {
rootMetrics.setAvailableResourcesToQueue(
Resources.subtract(
- clusterCapacity, rootMetrics.getAllocatedResources()));
+ clusterResource, rootMetrics.getAllocatedResources()));
+ }
+
+ /**
+ * Check if preemption is enabled and the utilization threshold for
+ * preemption is met.
+ *
+ * @return true if preemption should be attempted, false otherwise.
+ */
+ private boolean shouldAttemptPreemption() {
+ if (preemptionEnabled) {
+ return (preemptionUtilizationThreshold < Math.max(
+ (float) rootMetrics.getAllocatedMB() / clusterResource.getMemory(),
+ (float) rootMetrics.getAllocatedVirtualCores() /
+ clusterResource.getVirtualCores()));
+ }
+ return false;
}
@Override
@@ -1156,6 +1135,8 @@ public class FairScheduler extends Abstr
}
NodeAddedSchedulerEvent nodeAddedEvent = (NodeAddedSchedulerEvent)event;
addNode(nodeAddedEvent.getAddedRMNode());
+ recoverContainersOnNode(nodeAddedEvent.getContainerReports(),
+ nodeAddedEvent.getAddedRMNode());
break;
case NODE_REMOVED:
if (!(event instanceof NodeRemovedSchedulerEvent)) {
@@ -1177,7 +1158,8 @@ public class FairScheduler extends Abstr
}
AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event;
addApplication(appAddedEvent.getApplicationId(),
- appAddedEvent.getQueue(), appAddedEvent.getUser());
+ appAddedEvent.getQueue(), appAddedEvent.getUser(),
+ appAddedEvent.getIsAppRecovering());
break;
case APP_REMOVED:
if (!(event instanceof AppRemovedSchedulerEvent)) {
@@ -1194,7 +1176,8 @@ public class FairScheduler extends Abstr
AppAttemptAddedSchedulerEvent appAttemptAddedEvent =
(AppAttemptAddedSchedulerEvent) event;
addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId(),
- appAttemptAddedEvent.getTransferStateFromPreviousAttempt());
+ appAttemptAddedEvent.getTransferStateFromPreviousAttempt(),
+ appAttemptAddedEvent.getIsAttemptRecovering());
break;
case APP_ATTEMPT_REMOVED:
if (!(event instanceof AppAttemptRemovedSchedulerEvent)) {
@@ -1230,85 +1213,134 @@ public class FairScheduler extends Abstr
// NOT IMPLEMENTED
}
- @Override
- public synchronized void reinitialize(Configuration conf, RMContext rmContext)
+ public synchronized void setRMContext(RMContext rmContext) {
+ this.rmContext = rmContext;
+ }
+
+ private synchronized void initScheduler(Configuration conf)
throws IOException {
- if (!initialized) {
- this.conf = new FairSchedulerConfiguration(conf);
- validateConf(this.conf);
- minimumAllocation = this.conf.getMinimumAllocation();
- maximumAllocation = this.conf.getMaximumAllocation();
- incrAllocation = this.conf.getIncrementAllocation();
- continuousSchedulingEnabled = this.conf.isContinuousSchedulingEnabled();
- continuousSchedulingSleepMs =
- this.conf.getContinuousSchedulingSleepMs();
- nodeLocalityThreshold = this.conf.getLocalityThresholdNode();
- rackLocalityThreshold = this.conf.getLocalityThresholdRack();
- nodeLocalityDelayMs = this.conf.getLocalityDelayNodeMs();
- rackLocalityDelayMs = this.conf.getLocalityDelayRackMs();
- preemptionEnabled = this.conf.getPreemptionEnabled();
- assignMultiple = this.conf.getAssignMultiple();
- maxAssign = this.conf.getMaxAssign();
- sizeBasedWeight = this.conf.getSizeBasedWeight();
- preemptionInterval = this.conf.getPreemptionInterval();
- waitTimeBeforeKill = this.conf.getWaitTimeBeforeKill();
- usePortForNodeName = this.conf.getUsePortForNodeName();
-
- rootMetrics = FSQueueMetrics.forQueue("root", null, true, conf);
- this.rmContext = rmContext;
- // This stores per-application scheduling information
- this.applications =
- new ConcurrentHashMap<ApplicationId, SchedulerApplication>();
- this.eventLog = new FairSchedulerEventLog();
- eventLog.init(this.conf);
+ this.conf = new FairSchedulerConfiguration(conf);
+ validateConf(this.conf);
+ minimumAllocation = this.conf.getMinimumAllocation();
+ maximumAllocation = this.conf.getMaximumAllocation();
+ incrAllocation = this.conf.getIncrementAllocation();
+ continuousSchedulingEnabled = this.conf.isContinuousSchedulingEnabled();
+ continuousSchedulingSleepMs =
+ this.conf.getContinuousSchedulingSleepMs();
+ nodeLocalityThreshold = this.conf.getLocalityThresholdNode();
+ rackLocalityThreshold = this.conf.getLocalityThresholdRack();
+ nodeLocalityDelayMs = this.conf.getLocalityDelayNodeMs();
+ rackLocalityDelayMs = this.conf.getLocalityDelayRackMs();
+ preemptionEnabled = this.conf.getPreemptionEnabled();
+ preemptionUtilizationThreshold =
+ this.conf.getPreemptionUtilizationThreshold();
+ assignMultiple = this.conf.getAssignMultiple();
+ maxAssign = this.conf.getMaxAssign();
+ sizeBasedWeight = this.conf.getSizeBasedWeight();
+ preemptionInterval = this.conf.getPreemptionInterval();
+ waitTimeBeforeKill = this.conf.getWaitTimeBeforeKill();
+ usePortForNodeName = this.conf.getUsePortForNodeName();
+
+ updateInterval = this.conf.getUpdateInterval();
+ if (updateInterval < 0) {
+ updateInterval = FairSchedulerConfiguration.DEFAULT_UPDATE_INTERVAL_MS;
+ LOG.warn(FairSchedulerConfiguration.UPDATE_INTERVAL_MS
+ + " is invalid, so using default value " +
+ + FairSchedulerConfiguration.DEFAULT_UPDATE_INTERVAL_MS
+ + " ms instead");
+ }
+
+ rootMetrics = FSQueueMetrics.forQueue("root", null, true, conf);
+ fsOpDurations = FSOpDurations.getInstance(true);
+
+ // This stores per-application scheduling information
+ this.applications = new ConcurrentHashMap<
+ ApplicationId, SchedulerApplication<FSAppAttempt>>();
+ this.eventLog = new FairSchedulerEventLog();
+ eventLog.init(this.conf);
- initialized = true;
+ allocConf = new AllocationConfiguration(conf);
+ try {
+ queueMgr.initialize(conf);
+ } catch (Exception e) {
+ throw new IOException("Failed to start FairScheduler", e);
+ }
- allocConf = new AllocationConfiguration(conf);
- try {
- queueMgr.initialize(conf);
- } catch (Exception e) {
- throw new IOException("Failed to start FairScheduler", e);
- }
+ updateThread = new UpdateThread();
+ updateThread.setName("FairSchedulerUpdateThread");
+ updateThread.setDaemon(true);
- Thread updateThread = new Thread(new UpdateThread());
- updateThread.setName("FairSchedulerUpdateThread");
- updateThread.setDaemon(true);
- updateThread.start();
+ if (continuousSchedulingEnabled) {
+ // create the continuous scheduling thread (started in startSchedulerThreads)
+ schedulingThread = new ContinuousSchedulingThread();
+ schedulingThread.setName("FairSchedulerContinuousScheduling");
+ schedulingThread.setDaemon(true);
+ }
- if (continuousSchedulingEnabled) {
- // start continuous scheduling thread
- Thread schedulingThread = new Thread(
- new Runnable() {
- @Override
- public void run() {
- continuousScheduling();
- }
- }
- );
- schedulingThread.setName("ContinuousScheduling");
- schedulingThread.setDaemon(true);
- schedulingThread.start();
+ allocsLoader.init(conf);
+ allocsLoader.setReloadListener(new AllocationReloadListener());
+ // If we fail to load allocations file on initialize, we want to fail
+ // immediately. After a successful load, exceptions on future reloads
+ // will just result in leaving things as they are.
+ try {
+ allocsLoader.reloadAllocations();
+ } catch (Exception e) {
+ throw new IOException("Failed to initialize FairScheduler", e);
+ }
+ }
+
+ private synchronized void startSchedulerThreads() {
+ Preconditions.checkNotNull(updateThread, "updateThread is null");
+ Preconditions.checkNotNull(allocsLoader, "allocsLoader is null");
+ updateThread.start();
+ if (continuousSchedulingEnabled) {
+ Preconditions.checkNotNull(schedulingThread, "schedulingThread is null");
+ schedulingThread.start();
+ }
+ allocsLoader.start();
+ }
+
+ @Override
+ public void serviceInit(Configuration conf) throws Exception {
+ initScheduler(conf);
+ super.serviceInit(conf);
+ }
+
+ @Override
+ public void serviceStart() throws Exception {
+ startSchedulerThreads();
+ super.serviceStart();
+ }
+
+ @Override
+ public void serviceStop() throws Exception {
+ synchronized (this) {
+ if (updateThread != null) {
+ updateThread.interrupt();
+ updateThread.join(THREAD_JOIN_TIMEOUT_MS);
}
-
- allocsLoader.init(conf);
- allocsLoader.setReloadListener(new AllocationReloadListener());
- // If we fail to load allocations file on initialize, we want to fail
- // immediately. After a successful load, exceptions on future reloads
- // will just result in leaving things as they are.
- try {
- allocsLoader.reloadAllocations();
- } catch (Exception e) {
- throw new IOException("Failed to initialize FairScheduler", e);
+ if (continuousSchedulingEnabled) {
+ if (schedulingThread != null) {
+ schedulingThread.interrupt();
+ schedulingThread.join(THREAD_JOIN_TIMEOUT_MS);
+ }
}
- allocsLoader.start();
- } else {
- try {
- allocsLoader.reloadAllocations();
- } catch (Exception e) {
- LOG.error("Failed to reload allocations file", e);
+ if (allocsLoader != null) {
+ allocsLoader.stop();
}
}
+
+ super.serviceStop();
+ }
+
+ @Override
+ public synchronized void reinitialize(Configuration conf, RMContext rmContext)
+ throws IOException {
+ try {
+ allocsLoader.reloadAllocations();
+ } catch (Exception e) {
+ LOG.error("Failed to reload allocations file", e);
+ }
}
@Override
@@ -1323,7 +1355,7 @@ public class FairScheduler extends Abstr
@Override
public List<QueueUserACLInfo> getQueueUserAclInfo() {
- UserGroupInformation user = null;
+ UserGroupInformation user;
try {
user = UserGroupInformation.getCurrentUser();
} catch (IOException ioe) {
@@ -1365,7 +1397,7 @@ public class FairScheduler extends Abstr
// if it does not already exist, so it can be displayed on the web UI.
synchronized (FairScheduler.this) {
allocConf = queueInfo;
- allocConf.getDefaultSchedulingPolicy().initialize(clusterCapacity);
+ allocConf.getDefaultSchedulingPolicy().initialize(clusterResource);
queueMgr.updateAllocationConfiguration(allocConf);
}
}
@@ -1385,11 +1417,11 @@ public class FairScheduler extends Abstr
@Override
public synchronized String moveApplication(ApplicationId appId,
String queueName) throws YarnException {
- SchedulerApplication app = applications.get(appId);
+ SchedulerApplication<FSAppAttempt> app = applications.get(appId);
if (app == null) {
throw new YarnException("App to be moved " + appId + " not found.");
}
- FSSchedulerApp attempt = (FSSchedulerApp) app.getCurrentAppAttempt();
+ FSAppAttempt attempt = (FSAppAttempt) app.getCurrentAppAttempt();
// To serialize with FairScheduler#allocate, synchronize on app attempt
synchronized (attempt) {
FSLeafQueue oldQueue = (FSLeafQueue) app.getQueue();
@@ -1402,8 +1434,7 @@ public class FairScheduler extends Abstr
return oldQueue.getQueueName();
}
- if (oldQueue.getRunnableAppSchedulables().contains(
- attempt.getAppSchedulable())) {
+ if (oldQueue.getRunnableAppSchedulables().contains(attempt)) {
verifyMoveDoesNotViolateConstraints(attempt, oldQueue, targetQueue);
}
@@ -1412,7 +1443,7 @@ public class FairScheduler extends Abstr
}
}
- private void verifyMoveDoesNotViolateConstraints(FSSchedulerApp app,
+ private void verifyMoveDoesNotViolateConstraints(FSAppAttempt app,
FSLeafQueue oldQueue, FSLeafQueue targetQueue) throws YarnException {
String queueName = targetQueue.getQueueName();
ApplicationAttemptId appAttId = app.getApplicationAttemptId();
@@ -1449,8 +1480,8 @@ public class FairScheduler extends Abstr
* Helper for moveApplication, which has appropriate synchronization, so all
* operations will be atomic.
*/
- private void executeMove(SchedulerApplication app, FSSchedulerApp attempt,
- FSLeafQueue oldQueue, FSLeafQueue newQueue) {
+ private void executeMove(SchedulerApplication<FSAppAttempt> app,
+ FSAppAttempt attempt, FSLeafQueue oldQueue, FSLeafQueue newQueue) {
boolean wasRunnable = oldQueue.removeApp(attempt);
// if app was not runnable before, it may be runnable now
boolean nowRunnable = maxRunningEnforcer.canAppBeRunnable(newQueue,
@@ -1478,8 +1509,9 @@ public class FairScheduler extends Abstr
maxRunningEnforcer.updateRunnabilityOnAppRemoval(attempt, oldQueue);
}
}
-
- private FSQueue findLowestCommonAncestorQueue(FSQueue queue1, FSQueue queue2) {
+
+ @VisibleForTesting
+ FSQueue findLowestCommonAncestorQueue(FSQueue queue1, FSQueue queue2) {
// Because queue names include ancestors, separated by periods, we can find
// the lowest common ancestors by going from the start of the names until
// there's a character that doesn't match.
@@ -1491,7 +1523,7 @@ public class FairScheduler extends Abstr
for (int i = 0; i < Math.max(name1.length(), name2.length()); i++) {
if (name1.length() <= i || name2.length() <= i ||
name1.charAt(i) != name2.charAt(i)) {
- return queueMgr.getQueue(name1.substring(lastPeriodIndex));
+ return queueMgr.getQueue(name1.substring(0, lastPeriodIndex));
} else if (name1.charAt(i) == '.') {
lastPeriodIndex = i;
}
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java Tue Aug 19 23:49:39 2014
@@ -101,6 +101,10 @@ public class FairSchedulerConfiguration
/** Whether preemption is enabled. */
protected static final String PREEMPTION = CONF_PREFIX + "preemption";
protected static final boolean DEFAULT_PREEMPTION = false;
+
+ protected static final String PREEMPTION_THRESHOLD =
+ CONF_PREFIX + "preemption.cluster-utilization-threshold";
+ protected static final float DEFAULT_PREEMPTION_THRESHOLD = 0.8f;
protected static final String PREEMPTION_INTERVAL = CONF_PREFIX + "preemptionInterval";
protected static final int DEFAULT_PREEMPTION_INTERVAL = 5000;
@@ -119,6 +123,11 @@ public class FairSchedulerConfiguration
protected static final String MAX_ASSIGN = CONF_PREFIX + "max.assign";
protected static final int DEFAULT_MAX_ASSIGN = -1;
+ /** The update interval, in milliseconds, for calculating resources in FairScheduler. */
+ public static final String UPDATE_INTERVAL_MS =
+ CONF_PREFIX + "update-interval-ms";
+ public static final int DEFAULT_UPDATE_INTERVAL_MS = 500;
+
public FairSchedulerConfiguration() {
super();
}
@@ -185,6 +194,10 @@ public class FairSchedulerConfiguration
return getBoolean(PREEMPTION, DEFAULT_PREEMPTION);
}
+ public float getPreemptionUtilizationThreshold() {
+ return getFloat(PREEMPTION_THRESHOLD, DEFAULT_PREEMPTION_THRESHOLD);
+ }
+
public boolean getAssignMultiple() {
return getBoolean(ASSIGN_MULTIPLE, DEFAULT_ASSIGN_MULTIPLE);
}
@@ -238,6 +251,10 @@ public class FairSchedulerConfiguration
"Error reading resource config", ex);
}
}
+
+ public long getUpdateInterval() {
+ return getLong(UPDATE_INTERVAL_MS, DEFAULT_UPDATE_INTERVAL_MS);
+ }
private static int findResource(String val, String units)
throws AllocationConfigurationException {
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FifoAppComparator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FifoAppComparator.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FifoAppComparator.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FifoAppComparator.java Tue Aug 19 23:49:39 2014
@@ -25,15 +25,15 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.classification.InterfaceStability.Unstable;
/**
- * Order {@link AppSchedulable} objects by priority and then by submit time, as
+ * Order {@link FSAppAttempt} objects by priority and then by submit time, as
* in the default scheduler in Hadoop.
*/
@Private
@Unstable
-public class FifoAppComparator implements Comparator<AppSchedulable>, Serializable {
+public class FifoAppComparator implements Comparator<FSAppAttempt>, Serializable {
private static final long serialVersionUID = 3428835083489547918L;
- public int compare(AppSchedulable a1, AppSchedulable a2) {
+ public int compare(FSAppAttempt a1, FSAppAttempt a2) {
int res = a1.getPriority().compareTo(a2.getPriority());
if (res == 0) {
if (a1.getStartTime() < a2.getStartTime()) {
@@ -44,7 +44,7 @@ public class FifoAppComparator implement
}
if (res == 0) {
// If there is a tie, break it by app ID to get a deterministic order
- res = a1.getApp().getApplicationId().compareTo(a2.getApp().getApplicationId());
+ res = a1.getApplicationId().compareTo(a2.getApplicationId());
}
return res;
}
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/MaxRunningAppsEnforcer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/MaxRunningAppsEnforcer.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/MaxRunningAppsEnforcer.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/MaxRunningAppsEnforcer.java Tue Aug 19 23:49:39 2014
@@ -43,7 +43,7 @@ public class MaxRunningAppsEnforcer {
// Tracks the number of running applications by user.
private final Map<String, Integer> usersNumRunnableApps;
@VisibleForTesting
- final ListMultimap<String, AppSchedulable> usersNonRunnableApps;
+ final ListMultimap<String, FSAppAttempt> usersNonRunnableApps;
public MaxRunningAppsEnforcer(FairScheduler scheduler) {
this.scheduler = scheduler;
@@ -80,7 +80,7 @@ public class MaxRunningAppsEnforcer {
* Tracks the given new runnable app for purposes of maintaining max running
* app limits.
*/
- public void trackRunnableApp(FSSchedulerApp app) {
+ public void trackRunnableApp(FSAppAttempt app) {
String user = app.getUser();
FSLeafQueue queue = app.getQueue();
// Increment running counts for all parent queues
@@ -99,9 +99,9 @@ public class MaxRunningAppsEnforcer {
* Tracks the given new non runnable app so that it can be made runnable when
* it would not violate max running app limits.
*/
- public void trackNonRunnableApp(FSSchedulerApp app) {
+ public void trackNonRunnableApp(FSAppAttempt app) {
String user = app.getUser();
- usersNonRunnableApps.put(user, app.getAppSchedulable());
+ usersNonRunnableApps.put(user, app);
}
/**
@@ -111,7 +111,7 @@ public class MaxRunningAppsEnforcer {
* Runs in O(n log(n)) where n is the number of queues that are under the
* highest queue that went from having no slack to having slack.
*/
- public void updateRunnabilityOnAppRemoval(FSSchedulerApp app, FSLeafQueue queue) {
+ public void updateRunnabilityOnAppRemoval(FSAppAttempt app, FSLeafQueue queue) {
AllocationConfiguration allocConf = scheduler.getAllocationConfiguration();
// childqueueX might have no pending apps itself, but if a queue higher up
@@ -133,8 +133,8 @@ public class MaxRunningAppsEnforcer {
parent = parent.getParent();
}
- List<List<AppSchedulable>> appsNowMaybeRunnable =
- new ArrayList<List<AppSchedulable>>();
+ List<List<FSAppAttempt>> appsNowMaybeRunnable =
+ new ArrayList<List<FSAppAttempt>>();
// Compile lists of apps which may now be runnable
// We gather lists instead of building a set of all non-runnable apps so
@@ -150,26 +150,26 @@ public class MaxRunningAppsEnforcer {
userNumRunning = 0;
}
if (userNumRunning == allocConf.getUserMaxApps(user) - 1) {
- List<AppSchedulable> userWaitingApps = usersNonRunnableApps.get(user);
+ List<FSAppAttempt> userWaitingApps = usersNonRunnableApps.get(user);
if (userWaitingApps != null) {
appsNowMaybeRunnable.add(userWaitingApps);
}
}
// Scan through and check whether this means that any apps are now runnable
- Iterator<FSSchedulerApp> iter = new MultiListStartTimeIterator(
+ Iterator<FSAppAttempt> iter = new MultiListStartTimeIterator(
appsNowMaybeRunnable);
- FSSchedulerApp prev = null;
- List<AppSchedulable> noLongerPendingApps = new ArrayList<AppSchedulable>();
+ FSAppAttempt prev = null;
+ List<FSAppAttempt> noLongerPendingApps = new ArrayList<FSAppAttempt>();
while (iter.hasNext()) {
- FSSchedulerApp next = iter.next();
+ FSAppAttempt next = iter.next();
if (next == prev) {
continue;
}
if (canAppBeRunnable(next.getQueue(), next.getUser())) {
trackRunnableApp(next);
- AppSchedulable appSched = next.getAppSchedulable();
+ FSAppAttempt appSched = next;
next.getQueue().getRunnableAppSchedulables().add(appSched);
noLongerPendingApps.add(appSched);
@@ -186,14 +186,14 @@ public class MaxRunningAppsEnforcer {
// We remove the apps from their pending lists afterwards so that we don't
// pull them out from under the iterator. If they are not in these lists
// in the first place, there is a bug.
- for (AppSchedulable appSched : noLongerPendingApps) {
- if (!appSched.getApp().getQueue().getNonRunnableAppSchedulables()
+ for (FSAppAttempt appSched : noLongerPendingApps) {
+ if (!appSched.getQueue().getNonRunnableAppSchedulables()
.remove(appSched)) {
LOG.error("Can't make app runnable that does not already exist in queue"
+ " as non-runnable: " + appSched + ". This should never happen.");
}
- if (!usersNonRunnableApps.remove(appSched.getApp().getUser(), appSched)) {
+ if (!usersNonRunnableApps.remove(appSched.getUser(), appSched)) {
LOG.error("Waiting app " + appSched + " expected to be in "
+ "usersNonRunnableApps, but was not. This should never happen.");
}
@@ -204,7 +204,7 @@ public class MaxRunningAppsEnforcer {
* Updates the relevant tracking variables after a runnable app with the given
* queue and user has been removed.
*/
- public void untrackRunnableApp(FSSchedulerApp app) {
+ public void untrackRunnableApp(FSAppAttempt app) {
// Update usersRunnableApps
String user = app.getUser();
int newUserNumRunning = usersNumRunnableApps.get(user) - 1;
@@ -226,8 +226,8 @@ public class MaxRunningAppsEnforcer {
/**
* Stops tracking the given non-runnable app
*/
- public void untrackNonRunnableApp(FSSchedulerApp app) {
- usersNonRunnableApps.remove(app.getUser(), app.getAppSchedulable());
+ public void untrackNonRunnableApp(FSAppAttempt app) {
+ usersNonRunnableApps.remove(app.getUser(), app);
}
/**
@@ -235,7 +235,7 @@ public class MaxRunningAppsEnforcer {
* of non-runnable applications.
*/
private void gatherPossiblyRunnableAppLists(FSQueue queue,
- List<List<AppSchedulable>> appLists) {
+ List<List<FSAppAttempt>> appLists) {
if (queue.getNumRunnableApps() < scheduler.getAllocationConfiguration()
.getQueueMaxApps(queue.getName())) {
if (queue instanceof FSLeafQueue) {
@@ -259,14 +259,14 @@ public class MaxRunningAppsEnforcer {
* of O(num lists) time.
*/
static class MultiListStartTimeIterator implements
- Iterator<FSSchedulerApp> {
+ Iterator<FSAppAttempt> {
- private List<AppSchedulable>[] appLists;
+ private List<FSAppAttempt>[] appLists;
private int[] curPositionsInAppLists;
private PriorityQueue<IndexAndTime> appListsByCurStartTime;
@SuppressWarnings("unchecked")
- public MultiListStartTimeIterator(List<List<AppSchedulable>> appListList) {
+ public MultiListStartTimeIterator(List<List<FSAppAttempt>> appListList) {
appLists = appListList.toArray(new List[appListList.size()]);
curPositionsInAppLists = new int[appLists.length];
appListsByCurStartTime = new PriorityQueue<IndexAndTime>();
@@ -284,10 +284,10 @@ public class MaxRunningAppsEnforcer {
}
@Override
- public FSSchedulerApp next() {
+ public FSAppAttempt next() {
IndexAndTime indexAndTime = appListsByCurStartTime.remove();
int nextListIndex = indexAndTime.index;
- AppSchedulable next = appLists[nextListIndex]
+ FSAppAttempt next = appLists[nextListIndex]
.get(curPositionsInAppLists[nextListIndex]);
curPositionsInAppLists[nextListIndex]++;
@@ -299,7 +299,7 @@ public class MaxRunningAppsEnforcer {
}
appListsByCurStartTime.add(indexAndTime);
- return next.getApp();
+ return next;
}
@Override
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/NewAppWeightBooster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/NewAppWeightBooster.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/NewAppWeightBooster.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/NewAppWeightBooster.java Tue Aug 19 23:49:39 2014
@@ -48,7 +48,7 @@ public class NewAppWeightBooster extends
super.setConf(conf);
}
- public double adjustWeight(AppSchedulable app, double curWeight) {
+ public double adjustWeight(FSAppAttempt app, double curWeight) {
long start = app.getStartTime();
long now = System.currentTimeMillis();
if (now - start < duration) {
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java Tue Aug 19 23:49:39 2014
@@ -74,7 +74,7 @@ public class QueueManager {
}
/**
- * Get a queue by name, creating it if the create param is true and is necessary.
+ * Get a leaf queue by name, creating it if the create param is true and is necessary.
* If the queue is not or can not be a leaf queue, i.e. it already exists as a
* parent queue, or one of the parents in its name is already a leaf queue,
* null is returned.
@@ -85,31 +85,53 @@ public class QueueManager {
* could be referred to as just "parent1.queue2".
*/
public FSLeafQueue getLeafQueue(String name, boolean create) {
+ FSQueue queue = getQueue(name, create, FSQueueType.LEAF);
+ if (queue instanceof FSParentQueue) {
+ return null;
+ }
+ return (FSLeafQueue) queue;
+ }
+
+ /**
+ * Get a parent queue by name, creating it if the create param is true and is necessary.
+ * If the queue is not or can not be a parent queue, i.e. it already exists as a
+ * leaf queue, or one of the parents in its name is already a leaf queue,
+ * null is returned.
+ *
+ * The root part of the name is optional, so a queue underneath the root
+ * named "queue1" could be referred to as just "queue1", and a queue named
+ * "queue2" underneath a parent named "parent1" that is underneath the root
+ * could be referred to as just "parent1.queue2".
+ */
+ public FSParentQueue getParentQueue(String name, boolean create) {
+ FSQueue queue = getQueue(name, create, FSQueueType.PARENT);
+ if (queue instanceof FSLeafQueue) {
+ return null;
+ }
+ return (FSParentQueue) queue;
+ }
+
+ private FSQueue getQueue(String name, boolean create, FSQueueType queueType) {
name = ensureRootPrefix(name);
synchronized (queues) {
FSQueue queue = queues.get(name);
if (queue == null && create) {
- FSLeafQueue leafQueue = createLeafQueue(name);
- if (leafQueue == null) {
- return null;
- }
- queue = leafQueue;
- } else if (queue instanceof FSParentQueue) {
- return null;
+ // if the queue doesn't exist, create it and return
+ queue = createQueue(name, queueType);
}
- return (FSLeafQueue)queue;
+ return queue;
}
}
/**
- * Creates a leaf queue and places it in the tree. Creates any
- * parents that don't already exist.
+ * Creates a leaf or parent queue based on what is specified in 'queueType'
+ * and places it in the tree. Creates any parents that don't already exist.
*
* @return
* the created queue, if successful. null if not allowed (one of the parent
* queues in the queue name is already a leaf queue)
*/
- private FSLeafQueue createLeafQueue(String name) {
+ private FSQueue createQueue(String name, FSQueueType queueType) {
List<String> newQueueNames = new ArrayList<String>();
newQueueNames.add(name);
int sepIndex = name.length();
@@ -143,8 +165,7 @@ public class QueueManager {
FSLeafQueue leafQueue = null;
for (int i = newQueueNames.size()-1; i >= 0; i--) {
String queueName = newQueueNames.get(i);
- if (i == 0) {
- // First name added was the leaf queue
+ if (i == 0 && queueType != FSQueueType.PARENT) {
leafQueue = new FSLeafQueue(name, scheduler, parent);
try {
leafQueue.setPolicy(queueConf.getDefaultSchedulingPolicy());
@@ -155,6 +176,7 @@ public class QueueManager {
parent.addChildQueue(leafQueue);
queues.put(leafQueue.getName(), leafQueue);
leafQueues.add(leafQueue);
+ return leafQueue;
} else {
FSParentQueue newParent = new FSParentQueue(queueName, scheduler, parent);
try {
@@ -169,53 +191,64 @@ public class QueueManager {
}
}
- return leafQueue;
+ return parent;
}
/**
- * Make way for the given leaf queue if possible, by removing incompatible
+ * Make way for the given queue if possible, by removing incompatible
* queues with no apps in them. Incompatibility could be due to
- * (1) leafToCreate being currently being a parent, or (2) an existing leaf queue in
- * the ancestry of leafToCreate.
+ * (1) queueToCreate currently being a parent but needing to become a leaf,
+ * (2) queueToCreate currently being a leaf but needing to become a parent, or
+ * (3) an existing leaf queue in the ancestry of queueToCreate.
*
* We will never remove the root queue or the default queue in this way.
*
- * @return true if we can create leafToCreate or it already exists.
+ * @return true if we can create queueToCreate or it already exists.
*/
- private boolean removeEmptyIncompatibleQueues(String leafToCreate) {
- leafToCreate = ensureRootPrefix(leafToCreate);
+ private boolean removeEmptyIncompatibleQueues(String queueToCreate,
+ FSQueueType queueType) {
+ queueToCreate = ensureRootPrefix(queueToCreate);
- // Ensure leafToCreate is not root and doesn't have the default queue in its
+ // Ensure queueToCreate is not root and doesn't have the default queue in its
// ancestry.
- if (leafToCreate.equals(ROOT_QUEUE) ||
- leafToCreate.startsWith(
+ if (queueToCreate.equals(ROOT_QUEUE) ||
+ queueToCreate.startsWith(
ROOT_QUEUE + "." + YarnConfiguration.DEFAULT_QUEUE_NAME + ".")) {
return false;
}
- FSQueue queue = queues.get(leafToCreate);
+ FSQueue queue = queues.get(queueToCreate);
// Queue exists already.
if (queue != null) {
if (queue instanceof FSLeafQueue) {
- // If it's an already existing leaf, we're ok.
- return true;
+ if (queueType == FSQueueType.LEAF) {
+ // if queue is already a leaf then return true
+ return true;
+ }
+ // The queue is currently a leaf but needs to become a parent,
+ // so remove it if it's empty.
+ return removeQueueIfEmpty(queue);
} else {
- // If it's an existing parent queue, remove it if it's empty.
+ if (queueType == FSQueueType.PARENT) {
+ return true;
+ }
+ // If it's an existing parent queue and needs to change to leaf,
+ // remove it if it's empty.
return removeQueueIfEmpty(queue);
}
}
// Queue doesn't exist already. Check if the new queue would be created
// under an existing leaf queue. If so, try removing that leaf queue.
- int sepIndex = leafToCreate.length();
- sepIndex = leafToCreate.lastIndexOf('.', sepIndex-1);
+ int sepIndex = queueToCreate.length();
+ sepIndex = queueToCreate.lastIndexOf('.', sepIndex-1);
while (sepIndex != -1) {
- String prefixString = leafToCreate.substring(0, sepIndex);
+ String prefixString = queueToCreate.substring(0, sepIndex);
FSQueue prefixQueue = queues.get(prefixString);
if (prefixQueue != null && prefixQueue instanceof FSLeafQueue) {
return removeQueueIfEmpty(prefixQueue);
}
- sepIndex = leafToCreate.lastIndexOf('.', sepIndex-1);
+ sepIndex = queueToCreate.lastIndexOf('.', sepIndex-1);
}
return true;
}
@@ -312,12 +345,21 @@ public class QueueManager {
}
public void updateAllocationConfiguration(AllocationConfiguration queueConf) {
- // Make sure all queues exist
- for (String name : queueConf.getQueueNames()) {
- if (removeEmptyIncompatibleQueues(name)) {
+ // Create leaf queues and the parent queues in a leaf's ancestry if they do not exist
+ for (String name : queueConf.getConfiguredQueues().get(FSQueueType.LEAF)) {
+ if (removeEmptyIncompatibleQueues(name, FSQueueType.LEAF)) {
getLeafQueue(name, true);
}
}
+
+ // At this point all leaves and 'parents with at least one child' would have been created.
+ // Now create parents with no configured leaf.
+ for (String name : queueConf.getConfiguredQueues().get(
+ FSQueueType.PARENT)) {
+ if (removeEmptyIncompatibleQueues(name, FSQueueType.PARENT)) {
+ getParentQueue(name, true);
+ }
+ }
for (FSQueue queue : queues.values()) {
// Update queue metrics
@@ -327,7 +369,7 @@ public class QueueManager {
// Set scheduling policies
try {
SchedulingPolicy policy = queueConf.getSchedulingPolicy(queue.getName());
- policy.initialize(scheduler.getClusterCapacity());
+ policy.initialize(scheduler.getClusterResource());
queue.setPolicy(policy);
} catch (AllocationConfigurationException ex) {
LOG.warn("Cannot apply configured scheduling policy to queue "
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueuePlacementPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueuePlacementPolicy.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueuePlacementPolicy.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueuePlacementPolicy.java Tue Aug 19 23:49:39 2014
@@ -25,6 +25,8 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.Groups;
import org.apache.hadoop.util.ReflectionUtils;
@@ -32,6 +34,8 @@ import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
+@Private
+@Unstable
public class QueuePlacementPolicy {
private static final Map<String, Class<? extends QueuePlacementRule>> ruleClasses;
static {
@@ -42,17 +46,19 @@ public class QueuePlacementPolicy {
map.put("secondaryGroupExistingQueue",
QueuePlacementRule.SecondaryGroupExistingQueue.class);
map.put("specified", QueuePlacementRule.Specified.class);
+ map.put("nestedUserQueue",
+ QueuePlacementRule.NestedUserQueue.class);
map.put("default", QueuePlacementRule.Default.class);
map.put("reject", QueuePlacementRule.Reject.class);
ruleClasses = Collections.unmodifiableMap(map);
}
private final List<QueuePlacementRule> rules;
- private final Set<String> configuredQueues;
+ private final Map<FSQueueType, Set<String>> configuredQueues;
private final Groups groups;
public QueuePlacementPolicy(List<QueuePlacementRule> rules,
- Set<String> configuredQueues, Configuration conf)
+ Map<FSQueueType, Set<String>> configuredQueues, Configuration conf)
throws AllocationConfigurationException {
for (int i = 0; i < rules.size()-1; i++) {
if (rules.get(i).isTerminal()) {
@@ -72,28 +78,15 @@ public class QueuePlacementPolicy {
/**
* Builds a QueuePlacementPolicy from an xml element.
*/
- public static QueuePlacementPolicy fromXml(Element el, Set<String> configuredQueues,
- Configuration conf) throws AllocationConfigurationException {
+ public static QueuePlacementPolicy fromXml(Element el,
+ Map<FSQueueType, Set<String>> configuredQueues, Configuration conf)
+ throws AllocationConfigurationException {
List<QueuePlacementRule> rules = new ArrayList<QueuePlacementRule>();
NodeList elements = el.getChildNodes();
for (int i = 0; i < elements.getLength(); i++) {
Node node = elements.item(i);
if (node instanceof Element) {
- Element element = (Element)node;
-
- String ruleName = element.getAttribute("name");
- if ("".equals(ruleName)) {
- throw new AllocationConfigurationException("No name provided for a " +
- "rule element");
- }
-
- Class<? extends QueuePlacementRule> clazz = ruleClasses.get(ruleName);
- if (clazz == null) {
- throw new AllocationConfigurationException("No rule class found for "
- + ruleName);
- }
- QueuePlacementRule rule = ReflectionUtils.newInstance(clazz, null);
- rule.initializeFromXml(element);
+ QueuePlacementRule rule = createAndInitializeRule(node);
rules.add(rule);
}
}
@@ -101,11 +94,37 @@ public class QueuePlacementPolicy {
}
/**
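+ * Create and initialize a rule given an XML node.
+ * @param node the XML element describing the rule, selected by its "name" attribute
+ * @return the created and initialized QueuePlacementRule
+ * @throws AllocationConfigurationException if the name is empty or matches no rule class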
+ */
+ public static QueuePlacementRule createAndInitializeRule(Node node)
+ throws AllocationConfigurationException {
+ Element element = (Element) node;
+
+ String ruleName = element.getAttribute("name");
+ if ("".equals(ruleName)) {
+ throw new AllocationConfigurationException("No name provided for a "
+ + "rule element");
+ }
+
+ Class<? extends QueuePlacementRule> clazz = ruleClasses.get(ruleName);
+ if (clazz == null) {
+ throw new AllocationConfigurationException("No rule class found for "
+ + ruleName);
+ }
+ QueuePlacementRule rule = ReflectionUtils.newInstance(clazz, null);
+ rule.initializeFromXml(element);
+ return rule;
+ }
+
+ /**
* Build a simple queue placement policy from the allow-undeclared-pools and
* user-as-default-queue configuration options.
*/
public static QueuePlacementPolicy fromConfiguration(Configuration conf,
- Set<String> configuredQueues) {
+ Map<FSQueueType, Set<String>> configuredQueues) {
boolean create = conf.getBoolean(
FairSchedulerConfiguration.ALLOW_UNDECLARED_POOLS,
FairSchedulerConfiguration.DEFAULT_ALLOW_UNDECLARED_POOLS);