You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by cm...@apache.org on 2014/08/20 01:51:01 UTC

svn commit: r1619012 [16/26] - in /hadoop/common/branches/HADOOP-10388/hadoop-yarn-project: ./ hadoop-yarn/bin/ hadoop-yarn/conf/ hadoop-yarn/dev-support/ hadoop-yarn/hadoop-yarn-api/ hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api...

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java Tue Aug 19 23:49:39 2014
@@ -20,14 +20,11 @@ package org.apache.hadoop.yarn.server.re
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
@@ -39,7 +36,6 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
@@ -53,9 +49,6 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
-import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
-import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
@@ -76,10 +69,8 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt.ContainersAndNMTokensAllocation;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
@@ -99,6 +90,7 @@ import org.apache.hadoop.yarn.util.resou
 import org.apache.hadoop.yarn.util.resource.Resources;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 
 /**
  * A scheduler that schedules resources between a set of queues. The scheduler
@@ -122,11 +114,10 @@ import com.google.common.annotations.Vis
 @LimitedPrivate("yarn")
 @Unstable
 @SuppressWarnings("unchecked")
-public class FairScheduler extends AbstractYarnScheduler {
-  private boolean initialized;
+public class FairScheduler extends
+    AbstractYarnScheduler<FSAppAttempt, FSSchedulerNode> {
   private FairSchedulerConfiguration conf;
-  private Resource minimumAllocation;
-  private Resource maximumAllocation;
+
   private Resource incrAllocation;
   private QueueManager queueMgr;
   private Clock clock;
@@ -142,25 +133,32 @@ public class FairScheduler extends Abstr
   public static final Resource CONTAINER_RESERVED = Resources.createResource(-1);
 
   // How often fair shares are re-calculated (ms)
-  protected long UPDATE_INTERVAL = 500;
+  protected long updateInterval;
+  private final int UPDATE_DEBUG_FREQUENCY = 5;
+  private int updatesToSkipForDebug = UPDATE_DEBUG_FREQUENCY;
+
+  @VisibleForTesting
+  Thread updateThread;
+
+  @VisibleForTesting
+  Thread schedulingThread;
+  // timeout to join when we stop this service
+  protected final long THREAD_JOIN_TIMEOUT_MS = 1000;
 
   // Aggregate metrics
   FSQueueMetrics rootMetrics;
+  FSOpDurations fsOpDurations;
 
   // Time when we last updated preemption vars
   protected long lastPreemptionUpdateTime;
   // Time we last ran preemptTasksIfNecessary
   private long lastPreemptCheckTime;
 
-  // Nodes in the cluster, indexed by NodeId
-  private Map<NodeId, FSSchedulerNode> nodes = 
-      new ConcurrentHashMap<NodeId, FSSchedulerNode>();
-
-  // Aggregate capacity of the cluster
-  private Resource clusterCapacity = 
-      RecordFactoryProvider.getRecordFactory(null).newRecordInstance(Resource.class);
+  // Preemption related variables
+  protected boolean preemptionEnabled;
+  protected float preemptionUtilizationThreshold;
 
-  // How often tasks are preempted 
+  // How often tasks are preempted
   protected long preemptionInterval; 
   
   // ms to wait before force killing stuff (must be longer than a couple
@@ -170,7 +168,6 @@ public class FairScheduler extends Abstr
   // Containers whose AMs have been warned that they will be preempted soon.
   private List<RMContainer> warnedContainers = new ArrayList<RMContainer>();
   
-  protected boolean preemptionEnabled;
   protected boolean sizeBasedWeight; // Give larger weights to larger jobs
   protected WeightAdjuster weightAdjuster; // Can be null for no weight adjuster
   protected boolean continuousSchedulingEnabled; // Continuous Scheduling enabled or not
@@ -194,6 +191,7 @@ public class FairScheduler extends Abstr
   AllocationConfiguration allocConf;
   
   public FairScheduler() {
+    super(FairScheduler.class.getName());
     clock = new SystemClock();
     allocsLoader = new AllocationFileLoaderService();
     queueMgr = new QueueManager(this);
@@ -246,34 +244,25 @@ public class FairScheduler extends Abstr
     return queueMgr;
   }
 
-  @Override
-  public RMContainer getRMContainer(ContainerId containerId) {
-    FSSchedulerApp attempt = getCurrentAttemptForContainer(containerId);
-    return (attempt == null) ? null : attempt.getRMContainer(containerId);
-  }
-
-  private FSSchedulerApp getCurrentAttemptForContainer(
-      ContainerId containerId) {
-    SchedulerApplication app =
-        applications.get(containerId.getApplicationAttemptId()
-          .getApplicationId());
-    if (app != null) {
-      return (FSSchedulerApp) app.getCurrentAppAttempt();
-    }
-    return null;
-  }
-
   /**
-   * A runnable which calls {@link FairScheduler#update()} every
-   * <code>UPDATE_INTERVAL</code> milliseconds.
+   * Thread which calls {@link FairScheduler#update()} every
+   * <code>updateInterval</code> milliseconds.
    */
-  private class UpdateThread implements Runnable {
+  private class UpdateThread extends Thread {
+
+    @Override
     public void run() {
-      while (true) {
+      while (!Thread.currentThread().isInterrupted()) {
         try {
-          Thread.sleep(UPDATE_INTERVAL);
+          Thread.sleep(updateInterval);
+          long start = getClock().getTime();
           update();
           preemptTasksIfNecessary();
+          long duration = getClock().getTime() - start;
+          fsOpDurations.addUpdateThreadRunDuration(duration);
+        } catch (InterruptedException ie) {
+          LOG.warn("Update thread interrupted. Exiting.");
+          return;
         } catch (Exception e) {
           LOG.error("Exception in fair scheduler UpdateThread", e);
         }
@@ -282,11 +271,32 @@ public class FairScheduler extends Abstr
   }
 
   /**
+   * Thread which attempts scheduling resources continuously,
+   * asynchronous to the node heartbeats.
+   */
+  private class ContinuousSchedulingThread extends Thread {
+
+    @Override
+    public void run() {
+      while (!Thread.currentThread().isInterrupted()) {
+        try {
+          continuousSchedulingAttempt();
+          Thread.sleep(getContinuousSchedulingSleepMs());
+        } catch (InterruptedException e) {
+          LOG.warn("Continuous scheduling thread interrupted. Exiting.", e);
+          return;
+        }
+      }
+    }
+  }
+
+  /**
    * Recompute the internal variables used by the scheduler - per-job weights,
    * fair shares, deficits, minimum slot allocations, and amount of used and
    * required resources per job.
    */
   protected synchronized void update() {
+    long start = getClock().getTime();
     updatePreemptionVariables(); // Determine if any queues merit preemption
 
     FSQueue rootQueue = queueMgr.getRootQueue();
@@ -294,10 +304,25 @@ public class FairScheduler extends Abstr
     // Recursively update demands for all queues
     rootQueue.updateDemand();
 
-    rootQueue.setFairShare(clusterCapacity);
+    rootQueue.setFairShare(clusterResource);
     // Recursively compute fair shares for all queues
     // and update metrics
     rootQueue.recomputeShares();
+
+    if (LOG.isDebugEnabled()) {
+      if (--updatesToSkipForDebug < 0) {
+        updatesToSkipForDebug = UPDATE_DEBUG_FREQUENCY;
+        LOG.debug("Cluster Capacity: " + clusterResource +
+            "  Allocations: " + rootMetrics.getAllocatedResources() +
+            "  Availability: " + Resource.newInstance(
+            rootMetrics.getAvailableMB(),
+            rootMetrics.getAvailableVirtualCores()) +
+            "  Demand: " + rootQueue.getDemand());
+      }
+    }
+
+    long duration = getClock().getTime() - start;
+    fsOpDurations.addUpdateCallDuration(duration);
   }
 
   /**
@@ -306,7 +331,7 @@ public class FairScheduler extends Abstr
    * for each type of task.
    */
   private void updatePreemptionVariables() {
-    long now = clock.getTime();
+    long now = getClock().getTime();
     lastPreemptionUpdateTime = now;
     for (FSLeafQueue sched : queueMgr.getLeafQueues()) {
       if (!isStarvedForMinShare(sched)) {
@@ -322,9 +347,9 @@ public class FairScheduler extends Abstr
    * Is a queue below its min share for the given task type?
    */
   boolean isStarvedForMinShare(FSLeafQueue sched) {
-    Resource desiredShare = Resources.min(RESOURCE_CALCULATOR, clusterCapacity,
+    Resource desiredShare = Resources.min(RESOURCE_CALCULATOR, clusterResource,
       sched.getMinShare(), sched.getDemand());
-    return Resources.lessThan(RESOURCE_CALCULATOR, clusterCapacity,
+    return Resources.lessThan(RESOURCE_CALCULATOR, clusterResource,
         sched.getResourceUsage(), desiredShare);
   }
 
@@ -333,9 +358,10 @@ public class FairScheduler extends Abstr
    * defined as being below half its fair share.
    */
   boolean isStarvedForFairShare(FSLeafQueue sched) {
-    Resource desiredFairShare = Resources.min(RESOURCE_CALCULATOR, clusterCapacity,
+    Resource desiredFairShare = Resources.min(RESOURCE_CALCULATOR,
+        clusterResource,
         Resources.multiply(sched.getFairShare(), .5), sched.getDemand());
-    return Resources.lessThan(RESOURCE_CALCULATOR, clusterCapacity,
+    return Resources.lessThan(RESOURCE_CALCULATOR, clusterResource,
         sched.getResourceUsage(), desiredFairShare);
   }
 
@@ -347,111 +373,98 @@ public class FairScheduler extends Abstr
    * and then select the right ones using preemptTasks.
    */
   protected synchronized void preemptTasksIfNecessary() {
-    if (!preemptionEnabled) {
+    if (!shouldAttemptPreemption()) {
       return;
     }
 
-    long curTime = clock.getTime();
+    long curTime = getClock().getTime();
     if (curTime - lastPreemptCheckTime < preemptionInterval) {
       return;
     }
     lastPreemptCheckTime = curTime;
 
-    Resource resToPreempt = Resources.none();
-
+    Resource resToPreempt = Resources.clone(Resources.none());
     for (FSLeafQueue sched : queueMgr.getLeafQueues()) {
-      resToPreempt = Resources.add(resToPreempt, resToPreempt(sched, curTime));
+      Resources.addTo(resToPreempt, resToPreempt(sched, curTime));
     }
-    if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity, resToPreempt,
+    if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterResource, resToPreempt,
         Resources.none())) {
-      preemptResources(queueMgr.getLeafQueues(), resToPreempt);
+      preemptResources(resToPreempt);
     }
   }
 
   /**
-   * Preempt a quantity of resources from a list of QueueSchedulables. The
-   * policy for this is to pick apps from queues that are over their fair share,
-   * but make sure that no queue is placed below its fair share in the process.
-   * We further prioritize preemption by choosing containers with lowest
-   * priority to preempt.
+   * Preempt a quantity of resources. Each round, we start from the root queue,
+   * level-by-level, until choosing a candidate application.
+   * The policy for prioritizing preemption for each queue depends on its
+   * SchedulingPolicy: (1) fairshare/DRF, choose the ChildSchedulable that is
+   * most over its fair share; (2) FIFO, choose the childSchedulable that is
+   * latest launched.
+   * Inside each application, we further prioritize preemption by choosing
+   * containers with lowest priority to preempt.
+   * We make sure that no queue is placed below its fair share in the process.
    */
-  protected void preemptResources(Collection<FSLeafQueue> scheds,
-      Resource toPreempt) {
-    if (scheds.isEmpty() || Resources.equals(toPreempt, Resources.none())) {
+  protected void preemptResources(Resource toPreempt) {
+    long start = getClock().getTime();
+    if (Resources.equals(toPreempt, Resources.none())) {
       return;
     }
 
-    Map<RMContainer, FSSchedulerApp> apps = 
-        new HashMap<RMContainer, FSSchedulerApp>();
-    Map<RMContainer, FSLeafQueue> queues = 
-        new HashMap<RMContainer, FSLeafQueue>();
-
-    // Collect running containers from over-scheduled queues
-    List<RMContainer> runningContainers = new ArrayList<RMContainer>();
-    for (FSLeafQueue sched : scheds) {
-      if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity,
-          sched.getResourceUsage(), sched.getFairShare())) {
-        for (AppSchedulable as : sched.getRunnableAppSchedulables()) {
-          for (RMContainer c : as.getApp().getLiveContainers()) {
-            runningContainers.add(c);
-            apps.put(c, as.getApp());
-            queues.put(c, sched);
-          }
-        }
-      }
-    }
-
-    // Sort containers into reverse order of priority
-    Collections.sort(runningContainers, new Comparator<RMContainer>() {
-      public int compare(RMContainer c1, RMContainer c2) {
-        int ret = c1.getContainer().getPriority().compareTo(
-            c2.getContainer().getPriority());
-        if (ret == 0) {
-          return c2.getContainerId().compareTo(c1.getContainerId());
-        }
-        return ret;
-      }
-    });
-    
     // Scan down the list of containers we've already warned and kill them
     // if we need to.  Remove any containers from the list that we don't need
     // or that are no longer running.
     Iterator<RMContainer> warnedIter = warnedContainers.iterator();
-    Set<RMContainer> preemptedThisRound = new HashSet<RMContainer>();
     while (warnedIter.hasNext()) {
       RMContainer container = warnedIter.next();
-      if (container.getState() == RMContainerState.RUNNING &&
-          Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity,
+      if ((container.getState() == RMContainerState.RUNNING ||
+              container.getState() == RMContainerState.ALLOCATED) &&
+          Resources.greaterThan(RESOURCE_CALCULATOR, clusterResource,
               toPreempt, Resources.none())) {
-        warnOrKillContainer(container, apps.get(container), queues.get(container));
-        preemptedThisRound.add(container);
+        warnOrKillContainer(container);
         Resources.subtractFrom(toPreempt, container.getContainer().getResource());
       } else {
         warnedIter.remove();
       }
     }
 
-    // Scan down the rest of the containers until we've preempted enough, making
-    // sure we don't preempt too many from any queue
-    Iterator<RMContainer> runningIter = runningContainers.iterator();
-    while (runningIter.hasNext() &&
-        Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity,
-            toPreempt, Resources.none())) {
-      RMContainer container = runningIter.next();
-      FSLeafQueue sched = queues.get(container);
-      if (!preemptedThisRound.contains(container) &&
-          Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity,
-              sched.getResourceUsage(), sched.getFairShare())) {
-        warnOrKillContainer(container, apps.get(container), sched);
-        
-        warnedContainers.add(container);
-        Resources.subtractFrom(toPreempt, container.getContainer().getResource());
+    try {
+      // Reset preemptedResource for each app
+      for (FSLeafQueue queue : getQueueManager().getLeafQueues()) {
+        for (FSAppAttempt app : queue.getRunnableAppSchedulables()) {
+          app.resetPreemptedResources();
+        }
+      }
+
+      while (Resources.greaterThan(RESOURCE_CALCULATOR, clusterResource,
+          toPreempt, Resources.none())) {
+        RMContainer container =
+            getQueueManager().getRootQueue().preemptContainer();
+        if (container == null) {
+          break;
+        } else {
+          warnOrKillContainer(container);
+          warnedContainers.add(container);
+          Resources.subtractFrom(
+              toPreempt, container.getContainer().getResource());
+        }
+      }
+    } finally {
+      // Clear preemptedResources for each app
+      for (FSLeafQueue queue : getQueueManager().getLeafQueues()) {
+        for (FSAppAttempt app : queue.getRunnableAppSchedulables()) {
+          app.clearPreemptedResources();
+        }
       }
     }
+
+    long duration = getClock().getTime() - start;
+    fsOpDurations.addPreemptCallDuration(duration);
   }
   
-  private void warnOrKillContainer(RMContainer container, FSSchedulerApp app,
-      FSLeafQueue queue) {
+  protected void warnOrKillContainer(RMContainer container) {
+    ApplicationAttemptId appAttemptId = container.getApplicationAttemptId();
+    FSAppAttempt app = getSchedulerApp(appAttemptId);
+    FSLeafQueue queue = app.getQueue();
     LOG.info("Preempting container (prio=" + container.getContainer().getPriority() +
         "res=" + container.getContainer().getResource() +
         ") from queue " + queue.getName());
@@ -461,21 +474,22 @@ public class FairScheduler extends Abstr
     if (time != null) {
       // if we asked for preemption more than maxWaitTimeBeforeKill ms ago,
       // proceed with kill
-      if (time + waitTimeBeforeKill < clock.getTime()) {
+      if (time + waitTimeBeforeKill < getClock().getTime()) {
         ContainerStatus status =
           SchedulerUtils.createPreemptedContainerStatus(
             container.getContainerId(), SchedulerUtils.PREEMPTED_CONTAINER);
 
+        recoverResourceRequestForContainer(container);
         // TODO: Not sure if this ever actually adds this to the list of cleanup
         // containers on the RMNode (see SchedulerNode.releaseContainer()).
         completedContainer(container, status, RMContainerEventType.KILL);
         LOG.info("Killing container" + container +
             " (after waiting for premption for " +
-            (clock.getTime() - time) + "ms)");
+            (getClock().getTime() - time) + "ms)");
       }
     } else {
-      // track the request in the FSSchedulerApp itself
-      app.addPreemption(container, clock.getTime());
+      // track the request in the FSAppAttempt itself
+      app.addPreemption(container, getClock().getTime());
     }
   }
 
@@ -496,20 +510,20 @@ public class FairScheduler extends Abstr
     Resource resDueToMinShare = Resources.none();
     Resource resDueToFairShare = Resources.none();
     if (curTime - sched.getLastTimeAtMinShare() > minShareTimeout) {
-      Resource target = Resources.min(RESOURCE_CALCULATOR, clusterCapacity,
+      Resource target = Resources.min(RESOURCE_CALCULATOR, clusterResource,
           sched.getMinShare(), sched.getDemand());
-      resDueToMinShare = Resources.max(RESOURCE_CALCULATOR, clusterCapacity,
+      resDueToMinShare = Resources.max(RESOURCE_CALCULATOR, clusterResource,
           Resources.none(), Resources.subtract(target, sched.getResourceUsage()));
     }
     if (curTime - sched.getLastTimeAtHalfFairShare() > fairShareTimeout) {
-      Resource target = Resources.min(RESOURCE_CALCULATOR, clusterCapacity,
+      Resource target = Resources.min(RESOURCE_CALCULATOR, clusterResource,
           sched.getFairShare(), sched.getDemand());
-      resDueToFairShare = Resources.max(RESOURCE_CALCULATOR, clusterCapacity,
+      resDueToFairShare = Resources.max(RESOURCE_CALCULATOR, clusterResource,
           Resources.none(), Resources.subtract(target, sched.getResourceUsage()));
     }
-    Resource resToPreempt = Resources.max(RESOURCE_CALCULATOR, clusterCapacity,
+    Resource resToPreempt = Resources.max(RESOURCE_CALCULATOR, clusterResource,
         resDueToMinShare, resDueToFairShare);
-    if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity,
+    if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterResource,
         resToPreempt, Resources.none())) {
       String message = "Should preempt " + resToPreempt + " res for queue "
           + sched.getName() + ": resDueToMinShare = " + resDueToMinShare
@@ -519,12 +533,13 @@ public class FairScheduler extends Abstr
     return resToPreempt;
   }
 
-  public RMContainerTokenSecretManager getContainerTokenSecretManager() {
+  public synchronized RMContainerTokenSecretManager
+      getContainerTokenSecretManager() {
     return rmContext.getContainerTokenSecretManager();
   }
 
   // synchronized for sizeBasedWeight
-  public synchronized ResourceWeights getAppWeight(AppSchedulable app) {
+  public synchronized ResourceWeights getAppWeight(FSAppAttempt app) {
     double weight = 1.0;
     if (sizeBasedWeight) {
       // Set weight based on current memory demand
@@ -540,18 +555,12 @@ public class FairScheduler extends Abstr
     return resourceWeights;
   }
 
-  @Override
-  public Resource getMinimumResourceCapability() {
-    return minimumAllocation;
-  }
-
   public Resource getIncrementResourceCapability() {
     return incrAllocation;
   }
 
-  @Override
-  public Resource getMaximumResourceCapability() {
-    return maximumAllocation;
+  private FSSchedulerNode getFSSchedulerNode(NodeId nodeId) {
+    return nodes.get(nodeId);
   }
 
   public double getNodeLocalityThreshold() {
@@ -578,10 +587,6 @@ public class FairScheduler extends Abstr
     return continuousSchedulingSleepMs;
   }
 
-  public Resource getClusterCapacity() {
-    return clusterCapacity;
-  }
-
   public synchronized Clock getClock() {
     return clock;
   }
@@ -600,7 +605,7 @@ public class FairScheduler extends Abstr
    * configured limits, but the app will not be marked as runnable.
    */
   protected synchronized void addApplication(ApplicationId applicationId,
-      String queueName, String user) {
+      String queueName, String user, boolean isAppRecovering) {
     if (queueName == null || queueName.isEmpty()) {
       String message = "Reject application " + applicationId +
               " submitted by user " + user + " with an empty queue name.";
@@ -629,16 +634,22 @@ public class FairScheduler extends Abstr
       return;
     }
   
-    SchedulerApplication application =
-        new SchedulerApplication(queue, user);
+    SchedulerApplication<FSAppAttempt> application =
+        new SchedulerApplication<FSAppAttempt>(queue, user);
     applications.put(applicationId, application);
     queue.getMetrics().submitApp(user);
 
     LOG.info("Accepted application " + applicationId + " from user: " + user
         + ", in queue: " + queueName + ", currently num of applications: "
         + applications.size());
-    rmContext.getDispatcher().getEventHandler()
+    if (isAppRecovering) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(applicationId + " is recovering. Skip notifying APP_ACCEPTED");
+      }
+    } else {
+      rmContext.getDispatcher().getEventHandler()
         .handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED));
+    }
   }
 
   /**
@@ -646,19 +657,20 @@ public class FairScheduler extends Abstr
    */
   protected synchronized void addApplicationAttempt(
       ApplicationAttemptId applicationAttemptId,
-      boolean transferStateFromPreviousAttempt) {
-    SchedulerApplication application =
+      boolean transferStateFromPreviousAttempt,
+      boolean isAttemptRecovering) {
+    SchedulerApplication<FSAppAttempt> application =
         applications.get(applicationAttemptId.getApplicationId());
     String user = application.getUser();
     FSLeafQueue queue = (FSLeafQueue) application.getQueue();
 
-    FSSchedulerApp attempt =
-        new FSSchedulerApp(applicationAttemptId, user,
+    FSAppAttempt attempt =
+        new FSAppAttempt(this, applicationAttemptId, user,
             queue, new ActiveUsersManager(getRootQueueMetrics()),
             rmContext);
     if (transferStateFromPreviousAttempt) {
       attempt.transferStateFromPreviousAttempt(application
-        .getCurrentAppAttempt());
+          .getCurrentAppAttempt());
     }
     application.setCurrentAppAttempt(attempt);
 
@@ -674,9 +686,17 @@ public class FairScheduler extends Abstr
 
     LOG.info("Added Application Attempt " + applicationAttemptId
         + " to scheduler from user: " + user);
-    rmContext.getDispatcher().getEventHandler().handle(
+
+    if (isAttemptRecovering) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(applicationAttemptId
+            + " is recovering. Skipping notifying ATTEMPT_ADDED");
+      }
+    } else {
+      rmContext.getDispatcher().getEventHandler().handle(
         new RMAppAttemptEvent(applicationAttemptId,
             RMAppAttemptEventType.ATTEMPT_ADDED));
+    }
   }
 
   /**
@@ -720,7 +740,8 @@ public class FairScheduler extends Abstr
 
   private synchronized void removeApplication(ApplicationId applicationId,
       RMAppState finalState) {
-    SchedulerApplication application = applications.get(applicationId);
+    SchedulerApplication<FSAppAttempt> application =
+        applications.get(applicationId);
     if (application == null){
       LOG.warn("Couldn't find application " + applicationId);
       return;
@@ -734,9 +755,9 @@ public class FairScheduler extends Abstr
       RMAppAttemptState rmAppAttemptFinalState, boolean keepContainers) {
     LOG.info("Application " + applicationAttemptId + " is done." +
         " finalState=" + rmAppAttemptFinalState);
-    SchedulerApplication application =
+    SchedulerApplication<FSAppAttempt> application =
         applications.get(applicationAttemptId.getApplicationId());
-    FSSchedulerApp attempt = getSchedulerApp(applicationAttemptId);
+    FSAppAttempt attempt = getSchedulerApp(applicationAttemptId);
 
     if (attempt == null || application == null) {
       LOG.info("Unknown application " + applicationAttemptId + " has completed!");
@@ -787,7 +808,8 @@ public class FairScheduler extends Abstr
   /**
    * Clean up a completed container.
    */
-  private synchronized void completedContainer(RMContainer rmContainer,
+  @Override
+  protected synchronized void completedContainer(RMContainer rmContainer,
       ContainerStatus containerStatus, RMContainerEventType event) {
     if (rmContainer == null) {
       LOG.info("Null container completed...");
@@ -797,7 +819,7 @@ public class FairScheduler extends Abstr
     Container container = rmContainer.getContainer();
 
     // Get the application for the finished container
-    FSSchedulerApp application =
+    FSAppAttempt application =
         getCurrentAttemptForContainer(container.getId());
     ApplicationId appId =
         container.getId().getApplicationAttemptId().getApplicationId();
@@ -809,11 +831,10 @@ public class FairScheduler extends Abstr
     }
 
     // Get the node on which the container was allocated
-    FSSchedulerNode node = nodes.get(container.getNodeId());
+    FSSchedulerNode node = getFSSchedulerNode(container.getNodeId());
 
     if (rmContainer.getState() == RMContainerState.RESERVED) {
-      application.unreserve(node, rmContainer.getReservedPriority());
-      node.unreserveResource(application);
+      application.unreserve(rmContainer.getReservedPriority(), node);
     } else {
       application.containerCompleted(rmContainer, containerStatus, event);
       node.releaseContainer(container);
@@ -827,20 +848,20 @@ public class FairScheduler extends Abstr
 
   private synchronized void addNode(RMNode node) {
     nodes.put(node.getNodeID(), new FSSchedulerNode(node, usePortForNodeName));
-    Resources.addTo(clusterCapacity, node.getTotalCapability());
+    Resources.addTo(clusterResource, node.getTotalCapability());
     updateRootQueueMetrics();
 
     LOG.info("Added node " + node.getNodeAddress() +
-        " cluster capacity: " + clusterCapacity);
+        " cluster capacity: " + clusterResource);
   }
 
   private synchronized void removeNode(RMNode rmNode) {
-    FSSchedulerNode node = nodes.get(rmNode.getNodeID());
+    FSSchedulerNode node = getFSSchedulerNode(rmNode.getNodeID());
     // This can occur when an UNHEALTHY node reconnects
     if (node == null) {
       return;
     }
-    Resources.subtractFrom(clusterCapacity, rmNode.getTotalCapability());
+    Resources.subtractFrom(clusterResource, rmNode.getTotalCapability());
     updateRootQueueMetrics();
 
     // Remove running containers
@@ -865,7 +886,7 @@ public class FairScheduler extends Abstr
 
     nodes.remove(rmNode.getNodeID());
     LOG.info("Removed node " + rmNode.getNodeAddress() +
-        " cluster capacity: " + clusterCapacity);
+        " cluster capacity: " + clusterResource);
   }
 
   @Override
@@ -873,7 +894,7 @@ public class FairScheduler extends Abstr
       List<ResourceRequest> ask, List<ContainerId> release, List<String> blacklistAdditions, List<String> blacklistRemovals) {
 
     // Make sure this application exists
-    FSSchedulerApp application = getSchedulerApp(appAttemptId);
+    FSAppAttempt application = getSchedulerApp(appAttemptId);
     if (application == null) {
       LOG.info("Calling allocate on removed " +
           "or non existant application " + appAttemptId);
@@ -882,25 +903,17 @@ public class FairScheduler extends Abstr
 
     // Sanity check
     SchedulerUtils.normalizeRequests(ask, new DominantResourceCalculator(),
-        clusterCapacity, minimumAllocation, maximumAllocation, incrAllocation);
+        clusterResource, minimumAllocation, maximumAllocation, incrAllocation);
 
-    // Release containers
-    for (ContainerId releasedContainerId : release) {
-      RMContainer rmContainer = getRMContainer(releasedContainerId);
-      if (rmContainer == null) {
-        RMAuditLogger.logFailure(application.getUser(),
-            AuditConstants.RELEASE_CONTAINER,
-            "Unauthorized access or invalid container", "FairScheduler",
-            "Trying to release container not owned by app or with invalid id",
-            application.getApplicationId(), releasedContainerId);
-      }
-      completedContainer(rmContainer,
-          SchedulerUtils.createAbnormalContainerStatus(
-              releasedContainerId,
-              SchedulerUtils.RELEASED_CONTAINER),
-          RMContainerEventType.RELEASED);
+    // Set amResource for this app
+    if (!application.getUnmanagedAM() && ask.size() == 1
+        && application.getLiveContainers().isEmpty()) {
+      application.setAMResource(ask.get(0).getCapability());
     }
 
+    // Release containers
+    releaseContainers(release, application);
+
     synchronized (application) {
       if (!ask.isEmpty()) {
         if (LOG.isDebugEnabled()) {
@@ -913,14 +926,14 @@ public class FairScheduler extends Abstr
         // Update application requests
         application.updateResourceRequests(ask);
 
-        LOG.debug("allocate: post-update");
         application.showRequests();
       }
 
       if (LOG.isDebugEnabled()) {
-        LOG.debug("allocate:" +
+        LOG.debug("allocate: post-update" +
             " applicationAttemptId=" + appAttemptId +
-            " #ask=" + ask.size());
+            " #ask=" + ask.size() +
+            " reservation= " + application.getCurrentReservation());
 
         LOG.debug("Preempting " + application.getPreemptionContainers().size()
             + " container(s)");
@@ -941,33 +954,18 @@ public class FairScheduler extends Abstr
   }
 
   /**
-   * Process a container which has launched on a node, as reported by the node.
-   */
-  private void containerLaunchedOnNode(ContainerId containerId, FSSchedulerNode node) {
-    // Get the application for the finished container
-    FSSchedulerApp application = getCurrentAttemptForContainer(containerId);
-    if (application == null) {
-      LOG.info("Unknown application "
-          + containerId.getApplicationAttemptId().getApplicationId()
-          + " launched container " + containerId + " on node: " + node);
-      return;
-    }
-
-    application.containerLaunchedOnNode(containerId, node.getNodeID());
-  }
-
-  /**
    * Process a heartbeat update from a node.
    */
   private synchronized void nodeUpdate(RMNode nm) {
+    long start = getClock().getTime();
     if (LOG.isDebugEnabled()) {
-      LOG.debug("nodeUpdate: " + nm + " cluster capacity: " + clusterCapacity);
+      LOG.debug("nodeUpdate: " + nm + " cluster capacity: " + clusterResource);
     }
     eventLog.log("HEARTBEAT", nm.getHostName());
-    FSSchedulerNode node = nodes.get(nm.getNodeID());
+    FSSchedulerNode node = getFSSchedulerNode(nm.getNodeID());
 
     // Update resource if any change
-    SchedulerUtils.updateResourceIfChanged(node, nm, clusterCapacity, LOG);
+    SchedulerUtils.updateResourceIfChanged(node, nm, clusterResource, LOG);
     
     List<UpdatedContainerInfo> containerInfoList = nm.pullContainerUpdates();
     List<ContainerStatus> newlyLaunchedContainers = new ArrayList<ContainerStatus>();
@@ -996,41 +994,38 @@ public class FairScheduler extends Abstr
     } else {
       attemptScheduling(node);
     }
+
+    long duration = getClock().getTime() - start;
+    fsOpDurations.addNodeUpdateDuration(duration);
   }
 
-  private void continuousScheduling() {
-    while (true) {
-      List<NodeId> nodeIdList = new ArrayList<NodeId>(nodes.keySet());
-      // Sort the nodes by space available on them, so that we offer
-      // containers on emptier nodes first, facilitating an even spread. This
-      // requires holding the scheduler lock, so that the space available on a
-      // node doesn't change during the sort.
-      synchronized (this) {
-        Collections.sort(nodeIdList, nodeAvailableResourceComparator);
-      }
-
-      // iterate all nodes
-      for (NodeId nodeId : nodeIdList) {
-        if (nodes.containsKey(nodeId)) {
-          FSSchedulerNode node = nodes.get(nodeId);
-          try {
-            if (Resources.fitsIn(minimumAllocation,
-                    node.getAvailableResource())) {
-              attemptScheduling(node);
-            }
-          } catch (Throwable ex) {
-            LOG.warn("Error while attempting scheduling for node " + node +
-                    ": " + ex.toString(), ex);
-          }
-        }
-      }
+  void continuousSchedulingAttempt() throws InterruptedException {
+    long start = getClock().getTime();
+    List<NodeId> nodeIdList = new ArrayList<NodeId>(nodes.keySet());
+    // Sort the nodes by space available on them, so that we offer
+    // containers on emptier nodes first, facilitating an even spread. This
+    // requires holding the scheduler lock, so that the space available on a
+    // node doesn't change during the sort.
+    synchronized (this) {
+      Collections.sort(nodeIdList, nodeAvailableResourceComparator);
+    }
+
+    // iterate all nodes
+    for (NodeId nodeId : nodeIdList) {
+      FSSchedulerNode node = getFSSchedulerNode(nodeId);
       try {
-        Thread.sleep(getContinuousSchedulingSleepMs());
-      } catch (InterruptedException e) {
-        LOG.warn("Error while doing sleep in continuous scheduling: " +
-                e.toString(), e);
+        if (node != null && Resources.fitsIn(minimumAllocation,
+            node.getAvailableResource())) {
+          attemptScheduling(node);
+        }
+      } catch (Throwable ex) {
+        LOG.error("Error while attempting scheduling for node " + node +
+            ": " + ex.toString(), ex);
       }
     }
+
+    long duration = getClock().getTime() - start;
+    fsOpDurations.addContinuousSchedulingRunDuration(duration);
   }
 
   /** Sort nodes by available resource */
@@ -1038,7 +1033,13 @@ public class FairScheduler extends Abstr
 
     @Override
     public int compare(NodeId n1, NodeId n2) {
-      return RESOURCE_CALCULATOR.compare(clusterCapacity,
+      if (!nodes.containsKey(n1)) {
+        return 1;
+      }
+      if (!nodes.containsKey(n2)) {
+        return -1;
+      }
+      return RESOURCE_CALCULATOR.compare(clusterResource,
               nodes.get(n2).getAvailableResource(),
               nodes.get(n1).getAvailableResource());
     }
@@ -1049,13 +1050,13 @@ public class FairScheduler extends Abstr
     // 1. Check for reserved applications
     // 2. Schedule if there are no reservations
 
-    AppSchedulable reservedAppSchedulable = node.getReservedAppSchedulable();
+    FSAppAttempt reservedAppSchedulable = node.getReservedAppSchedulable();
     if (reservedAppSchedulable != null) {
       Priority reservedPriority = node.getReservedContainer().getReservedPriority();
       if (!reservedAppSchedulable.hasContainerForNode(reservedPriority, node)) {
         // Don't hold the reservation if app can no longer use it
         LOG.info("Releasing reservation that cannot be satisfied for application "
-            + reservedAppSchedulable.getApp().getApplicationAttemptId()
+            + reservedAppSchedulable.getApplicationAttemptId()
             + " on node " + node);
         reservedAppSchedulable.unreserve(reservedPriority, node);
         reservedAppSchedulable = null;
@@ -1063,7 +1064,7 @@ public class FairScheduler extends Abstr
         // Reservation exists; try to fulfill the reservation
         if (LOG.isDebugEnabled()) {
           LOG.debug("Trying to fulfill reservation for application "
-              + reservedAppSchedulable.getApp().getApplicationAttemptId()
+              + reservedAppSchedulable.getApplicationAttemptId()
               + " on node: " + node);
         }
         
@@ -1075,9 +1076,8 @@ public class FairScheduler extends Abstr
       int assignedContainers = 0;
       while (node.getReservedContainer() == null) {
         boolean assignedContainer = false;
-        if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity,
-              queueMgr.getRootQueue().assignContainer(node),
-              Resources.none())) {
+        if (!queueMgr.getRootQueue().assignContainer(node).equals(
+            Resources.none())) {
           assignedContainers++;
           assignedContainer = true;
         }
@@ -1089,45 +1089,8 @@ public class FairScheduler extends Abstr
     updateRootQueueMetrics();
   }
 
-  @Override
-  public SchedulerNodeReport getNodeReport(NodeId nodeId) {
-    FSSchedulerNode node = nodes.get(nodeId);
-    return node == null ? null : new SchedulerNodeReport(node);
-  }
-  
-  public FSSchedulerApp getSchedulerApp(ApplicationAttemptId appAttemptId) {
-    SchedulerApplication app =
-        applications.get(appAttemptId.getApplicationId());
-    if (app != null) {
-      return (FSSchedulerApp) app.getCurrentAppAttempt();
-    }
-    return null;
-  }
-  
-  @Override
-  public SchedulerAppReport getSchedulerAppInfo(
-      ApplicationAttemptId appAttemptId) {
-    FSSchedulerApp attempt = getSchedulerApp(appAttemptId);
-    if (attempt == null) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Request for appInfo of unknown attempt " + appAttemptId);
-      }
-      return null;
-    }
-    return new SchedulerAppReport(attempt);
-  }
-  
-  @Override
-  public ApplicationResourceUsageReport getAppResourceUsageReport(
-      ApplicationAttemptId appAttemptId) {
-    FSSchedulerApp attempt = getSchedulerApp(appAttemptId);
-    if (attempt == null) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Request for appInfo of unknown attempt " + appAttemptId);
-      }
-      return null;
-    }
-    return attempt.getResourceUsageReport();
+  public FSAppAttempt getSchedulerApp(ApplicationAttemptId appAttemptId) {
+    return super.getApplicationAttempt(appAttemptId);
   }
   
   /**
@@ -1139,7 +1102,23 @@ public class FairScheduler extends Abstr
   private void updateRootQueueMetrics() {
     rootMetrics.setAvailableResourcesToQueue(
         Resources.subtract(
-            clusterCapacity, rootMetrics.getAllocatedResources()));
+            clusterResource, rootMetrics.getAllocatedResources()));
+  }
+
+  /**
+   * Check if preemption is enabled and the utilization threshold for
+   * preemption is met.
+   *
+   * @return true if preemption should be attempted, false otherwise.
+   */
+  private boolean shouldAttemptPreemption() {
+    if (preemptionEnabled) {
+      return (preemptionUtilizationThreshold < Math.max(
+          (float) rootMetrics.getAllocatedMB() / clusterResource.getMemory(),
+          (float) rootMetrics.getAllocatedVirtualCores() /
+              clusterResource.getVirtualCores()));
+    }
+    return false;
   }
 
   @Override
@@ -1156,6 +1135,8 @@ public class FairScheduler extends Abstr
       }
       NodeAddedSchedulerEvent nodeAddedEvent = (NodeAddedSchedulerEvent)event;
       addNode(nodeAddedEvent.getAddedRMNode());
+      recoverContainersOnNode(nodeAddedEvent.getContainerReports(),
+          nodeAddedEvent.getAddedRMNode());
       break;
     case NODE_REMOVED:
       if (!(event instanceof NodeRemovedSchedulerEvent)) {
@@ -1177,7 +1158,8 @@ public class FairScheduler extends Abstr
       }
       AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event;
       addApplication(appAddedEvent.getApplicationId(),
-        appAddedEvent.getQueue(), appAddedEvent.getUser());
+        appAddedEvent.getQueue(), appAddedEvent.getUser(),
+        appAddedEvent.getIsAppRecovering());
       break;
     case APP_REMOVED:
       if (!(event instanceof AppRemovedSchedulerEvent)) {
@@ -1194,7 +1176,8 @@ public class FairScheduler extends Abstr
       AppAttemptAddedSchedulerEvent appAttemptAddedEvent =
           (AppAttemptAddedSchedulerEvent) event;
       addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId(),
-        appAttemptAddedEvent.getTransferStateFromPreviousAttempt());
+        appAttemptAddedEvent.getTransferStateFromPreviousAttempt(),
+        appAttemptAddedEvent.getIsAttemptRecovering());
       break;
     case APP_ATTEMPT_REMOVED:
       if (!(event instanceof AppAttemptRemovedSchedulerEvent)) {
@@ -1230,85 +1213,134 @@ public class FairScheduler extends Abstr
     // NOT IMPLEMENTED
   }
 
-  @Override
-  public synchronized void reinitialize(Configuration conf, RMContext rmContext)
+  public synchronized void setRMContext(RMContext rmContext) {
+    this.rmContext = rmContext;
+  }
+
+  private synchronized void initScheduler(Configuration conf)
       throws IOException {
-    if (!initialized) {
-      this.conf = new FairSchedulerConfiguration(conf);
-      validateConf(this.conf);
-      minimumAllocation = this.conf.getMinimumAllocation();
-      maximumAllocation = this.conf.getMaximumAllocation();
-      incrAllocation = this.conf.getIncrementAllocation();
-      continuousSchedulingEnabled = this.conf.isContinuousSchedulingEnabled();
-      continuousSchedulingSleepMs =
-              this.conf.getContinuousSchedulingSleepMs();
-      nodeLocalityThreshold = this.conf.getLocalityThresholdNode();
-      rackLocalityThreshold = this.conf.getLocalityThresholdRack();
-      nodeLocalityDelayMs = this.conf.getLocalityDelayNodeMs();
-      rackLocalityDelayMs = this.conf.getLocalityDelayRackMs();
-      preemptionEnabled = this.conf.getPreemptionEnabled();
-      assignMultiple = this.conf.getAssignMultiple();
-      maxAssign = this.conf.getMaxAssign();
-      sizeBasedWeight = this.conf.getSizeBasedWeight();
-      preemptionInterval = this.conf.getPreemptionInterval();
-      waitTimeBeforeKill = this.conf.getWaitTimeBeforeKill();
-      usePortForNodeName = this.conf.getUsePortForNodeName();
-      
-      rootMetrics = FSQueueMetrics.forQueue("root", null, true, conf);
-      this.rmContext = rmContext;
-      // This stores per-application scheduling information
-      this.applications =
-          new ConcurrentHashMap<ApplicationId, SchedulerApplication>();
-      this.eventLog = new FairSchedulerEventLog();
-      eventLog.init(this.conf);
+    this.conf = new FairSchedulerConfiguration(conf);
+    validateConf(this.conf);
+    minimumAllocation = this.conf.getMinimumAllocation();
+    maximumAllocation = this.conf.getMaximumAllocation();
+    incrAllocation = this.conf.getIncrementAllocation();
+    continuousSchedulingEnabled = this.conf.isContinuousSchedulingEnabled();
+    continuousSchedulingSleepMs =
+        this.conf.getContinuousSchedulingSleepMs();
+    nodeLocalityThreshold = this.conf.getLocalityThresholdNode();
+    rackLocalityThreshold = this.conf.getLocalityThresholdRack();
+    nodeLocalityDelayMs = this.conf.getLocalityDelayNodeMs();
+    rackLocalityDelayMs = this.conf.getLocalityDelayRackMs();
+    preemptionEnabled = this.conf.getPreemptionEnabled();
+    preemptionUtilizationThreshold =
+        this.conf.getPreemptionUtilizationThreshold();
+    assignMultiple = this.conf.getAssignMultiple();
+    maxAssign = this.conf.getMaxAssign();
+    sizeBasedWeight = this.conf.getSizeBasedWeight();
+    preemptionInterval = this.conf.getPreemptionInterval();
+    waitTimeBeforeKill = this.conf.getWaitTimeBeforeKill();
+    usePortForNodeName = this.conf.getUsePortForNodeName();
+
+    updateInterval = this.conf.getUpdateInterval();
+    if (updateInterval < 0) {
+      updateInterval = FairSchedulerConfiguration.DEFAULT_UPDATE_INTERVAL_MS;
+      LOG.warn(FairSchedulerConfiguration.UPDATE_INTERVAL_MS
+              + " is invalid, so using default value " +
+              + FairSchedulerConfiguration.DEFAULT_UPDATE_INTERVAL_MS
+              + " ms instead");
+    }
+
+    rootMetrics = FSQueueMetrics.forQueue("root", null, true, conf);
+    fsOpDurations = FSOpDurations.getInstance(true);
+
+    // This stores per-application scheduling information
+    this.applications = new ConcurrentHashMap<
+        ApplicationId, SchedulerApplication<FSAppAttempt>>();
+    this.eventLog = new FairSchedulerEventLog();
+    eventLog.init(this.conf);
 
-      initialized = true;
+    allocConf = new AllocationConfiguration(conf);
+    try {
+      queueMgr.initialize(conf);
+    } catch (Exception e) {
+      throw new IOException("Failed to start FairScheduler", e);
+    }
 
-      allocConf = new AllocationConfiguration(conf);
-      try {
-        queueMgr.initialize(conf);
-      } catch (Exception e) {
-        throw new IOException("Failed to start FairScheduler", e);
-      }
+    updateThread = new UpdateThread();
+    updateThread.setName("FairSchedulerUpdateThread");
+    updateThread.setDaemon(true);
 
-      Thread updateThread = new Thread(new UpdateThread());
-      updateThread.setName("FairSchedulerUpdateThread");
-      updateThread.setDaemon(true);
-      updateThread.start();
+    if (continuousSchedulingEnabled) {
+      // start continuous scheduling thread
+      schedulingThread = new ContinuousSchedulingThread();
+      schedulingThread.setName("FairSchedulerContinuousScheduling");
+      schedulingThread.setDaemon(true);
+    }
 
-      if (continuousSchedulingEnabled) {
-        // start continuous scheduling thread
-        Thread schedulingThread = new Thread(
-          new Runnable() {
-            @Override
-            public void run() {
-              continuousScheduling();
-            }
-          }
-        );
-        schedulingThread.setName("ContinuousScheduling");
-        schedulingThread.setDaemon(true);
-        schedulingThread.start();
+    allocsLoader.init(conf);
+    allocsLoader.setReloadListener(new AllocationReloadListener());
+    // If we fail to load allocations file on initialize, we want to fail
+    // immediately.  After a successful load, exceptions on future reloads
+    // will just result in leaving things as they are.
+    try {
+      allocsLoader.reloadAllocations();
+    } catch (Exception e) {
+      throw new IOException("Failed to initialize FairScheduler", e);
+    }
+  }
+
+  private synchronized void startSchedulerThreads() {
+    Preconditions.checkNotNull(updateThread, "updateThread is null");
+    Preconditions.checkNotNull(allocsLoader, "allocsLoader is null");
+    updateThread.start();
+    if (continuousSchedulingEnabled) {
+      Preconditions.checkNotNull(schedulingThread, "schedulingThread is null");
+      schedulingThread.start();
+    }
+    allocsLoader.start();
+  }
+
+  @Override
+  public void serviceInit(Configuration conf) throws Exception {
+    initScheduler(conf);
+    super.serviceInit(conf);
+  }
+
+  @Override
+  public void serviceStart() throws Exception {
+    startSchedulerThreads();
+    super.serviceStart();
+  }
+
+  @Override
+  public void serviceStop() throws Exception {
+    synchronized (this) {
+      if (updateThread != null) {
+        updateThread.interrupt();
+        updateThread.join(THREAD_JOIN_TIMEOUT_MS);
       }
-      
-      allocsLoader.init(conf);
-      allocsLoader.setReloadListener(new AllocationReloadListener());
-      // If we fail to load allocations file on initialize, we want to fail
-      // immediately.  After a successful load, exceptions on future reloads
-      // will just result in leaving things as they are.
-      try {
-        allocsLoader.reloadAllocations();
-      } catch (Exception e) {
-        throw new IOException("Failed to initialize FairScheduler", e);
+      if (continuousSchedulingEnabled) {
+        if (schedulingThread != null) {
+          schedulingThread.interrupt();
+          schedulingThread.join(THREAD_JOIN_TIMEOUT_MS);
+        }
       }
-      allocsLoader.start();
-    } else {
-      try {
-        allocsLoader.reloadAllocations();
-      } catch (Exception e) {
-        LOG.error("Failed to reload allocations file", e);
+      if (allocsLoader != null) {
+        allocsLoader.stop();
       }
     }
+
+    super.serviceStop();
+  }
+
+  @Override
+  public synchronized void reinitialize(Configuration conf, RMContext rmContext)
+      throws IOException {
+    try {
+      allocsLoader.reloadAllocations();
+    } catch (Exception e) {
+      LOG.error("Failed to reload allocations file", e);
+    }
   }
 
   @Override
@@ -1323,7 +1355,7 @@ public class FairScheduler extends Abstr
 
   @Override
   public List<QueueUserACLInfo> getQueueUserAclInfo() {
-    UserGroupInformation user = null;
+    UserGroupInformation user;
     try {
       user = UserGroupInformation.getCurrentUser();
     } catch (IOException ioe) {
@@ -1365,7 +1397,7 @@ public class FairScheduler extends Abstr
       // if it does not already exist, so it can be displayed on the web UI.
       synchronized (FairScheduler.this) {
         allocConf = queueInfo;
-        allocConf.getDefaultSchedulingPolicy().initialize(clusterCapacity);
+        allocConf.getDefaultSchedulingPolicy().initialize(clusterResource);
         queueMgr.updateAllocationConfiguration(allocConf);
       }
     }
@@ -1385,11 +1417,11 @@ public class FairScheduler extends Abstr
   @Override
   public synchronized String moveApplication(ApplicationId appId,
       String queueName) throws YarnException {
-    SchedulerApplication app = applications.get(appId);
+    SchedulerApplication<FSAppAttempt> app = applications.get(appId);
     if (app == null) {
       throw new YarnException("App to be moved " + appId + " not found.");
     }
-    FSSchedulerApp attempt = (FSSchedulerApp) app.getCurrentAppAttempt();
+    FSAppAttempt attempt = (FSAppAttempt) app.getCurrentAppAttempt();
     // To serialize with FairScheduler#allocate, synchronize on app attempt
     synchronized (attempt) {
       FSLeafQueue oldQueue = (FSLeafQueue) app.getQueue();
@@ -1402,8 +1434,7 @@ public class FairScheduler extends Abstr
         return oldQueue.getQueueName();
       }
       
-      if (oldQueue.getRunnableAppSchedulables().contains(
-          attempt.getAppSchedulable())) {
+      if (oldQueue.getRunnableAppSchedulables().contains(attempt)) {
         verifyMoveDoesNotViolateConstraints(attempt, oldQueue, targetQueue);
       }
       
@@ -1412,7 +1443,7 @@ public class FairScheduler extends Abstr
     }
   }
   
-  private void verifyMoveDoesNotViolateConstraints(FSSchedulerApp app,
+  private void verifyMoveDoesNotViolateConstraints(FSAppAttempt app,
       FSLeafQueue oldQueue, FSLeafQueue targetQueue) throws YarnException {
     String queueName = targetQueue.getQueueName();
     ApplicationAttemptId appAttId = app.getApplicationAttemptId();
@@ -1449,8 +1480,8 @@ public class FairScheduler extends Abstr
    * Helper for moveApplication, which has appropriate synchronization, so all
    * operations will be atomic.
    */
-  private void executeMove(SchedulerApplication app, FSSchedulerApp attempt,
-      FSLeafQueue oldQueue, FSLeafQueue newQueue) {
+  private void executeMove(SchedulerApplication<FSAppAttempt> app,
+      FSAppAttempt attempt, FSLeafQueue oldQueue, FSLeafQueue newQueue) {
     boolean wasRunnable = oldQueue.removeApp(attempt);
     // if app was not runnable before, it may be runnable now
     boolean nowRunnable = maxRunningEnforcer.canAppBeRunnable(newQueue,
@@ -1478,8 +1509,9 @@ public class FairScheduler extends Abstr
       maxRunningEnforcer.updateRunnabilityOnAppRemoval(attempt, oldQueue);
     }
   }
-  
-  private FSQueue findLowestCommonAncestorQueue(FSQueue queue1, FSQueue queue2) {
+
+  @VisibleForTesting
+  FSQueue findLowestCommonAncestorQueue(FSQueue queue1, FSQueue queue2) {
     // Because queue names include ancestors, separated by periods, we can find
     // the lowest common ancestors by going from the start of the names until
     // there's a character that doesn't match.
@@ -1491,7 +1523,7 @@ public class FairScheduler extends Abstr
     for (int i = 0; i < Math.max(name1.length(), name2.length()); i++) {
       if (name1.length() <= i || name2.length() <= i ||
           name1.charAt(i) != name2.charAt(i)) {
-        return queueMgr.getQueue(name1.substring(lastPeriodIndex));
+        return queueMgr.getQueue(name1.substring(0, lastPeriodIndex));
       } else if (name1.charAt(i) == '.') {
         lastPeriodIndex = i;
       }

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java Tue Aug 19 23:49:39 2014
@@ -101,6 +101,10 @@ public class FairSchedulerConfiguration 
   /** Whether preemption is enabled. */
   protected static final String  PREEMPTION = CONF_PREFIX + "preemption";
   protected static final boolean DEFAULT_PREEMPTION = false;
+
+  protected static final String PREEMPTION_THRESHOLD =
+      CONF_PREFIX + "preemption.cluster-utilization-threshold";
+  protected static final float DEFAULT_PREEMPTION_THRESHOLD = 0.8f;
   
   protected static final String PREEMPTION_INTERVAL = CONF_PREFIX + "preemptionInterval";
   protected static final int DEFAULT_PREEMPTION_INTERVAL = 5000;
@@ -119,6 +123,11 @@ public class FairSchedulerConfiguration 
   protected static final String MAX_ASSIGN = CONF_PREFIX + "max.assign";
   protected static final int DEFAULT_MAX_ASSIGN = -1;
 
+  /** The update interval for calculating resources in FairScheduler .*/
+  public static final String UPDATE_INTERVAL_MS =
+      CONF_PREFIX + "update-interval-ms";
+  public static final int DEFAULT_UPDATE_INTERVAL_MS = 500;
+
   public FairSchedulerConfiguration() {
     super();
   }
@@ -185,6 +194,10 @@ public class FairSchedulerConfiguration 
     return getBoolean(PREEMPTION, DEFAULT_PREEMPTION);
   }
 
+  public float getPreemptionUtilizationThreshold() {
+    return getFloat(PREEMPTION_THRESHOLD, DEFAULT_PREEMPTION_THRESHOLD);
+  }
+
   public boolean getAssignMultiple() {
     return getBoolean(ASSIGN_MULTIPLE, DEFAULT_ASSIGN_MULTIPLE);
   }
@@ -238,6 +251,10 @@ public class FairSchedulerConfiguration 
           "Error reading resource config", ex);
     }
   }
+
+  public long getUpdateInterval() {
+    return getLong(UPDATE_INTERVAL_MS, DEFAULT_UPDATE_INTERVAL_MS);
+  }
   
   private static int findResource(String val, String units)
     throws AllocationConfigurationException {

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FifoAppComparator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FifoAppComparator.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FifoAppComparator.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FifoAppComparator.java Tue Aug 19 23:49:39 2014
@@ -25,15 +25,15 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 
 /**
- * Order {@link AppSchedulable} objects by priority and then by submit time, as
+ * Order {@link FSAppAttempt} objects by priority and then by submit time, as
  * in the default scheduler in Hadoop.
  */
 @Private
 @Unstable
-public class FifoAppComparator implements Comparator<AppSchedulable>, Serializable {
+public class FifoAppComparator implements Comparator<FSAppAttempt>, Serializable {
   private static final long serialVersionUID = 3428835083489547918L;
 
-  public int compare(AppSchedulable a1, AppSchedulable a2) {
+  public int compare(FSAppAttempt a1, FSAppAttempt a2) {
     int res = a1.getPriority().compareTo(a2.getPriority());
     if (res == 0) {
       if (a1.getStartTime() < a2.getStartTime()) {
@@ -44,7 +44,7 @@ public class FifoAppComparator implement
     }
     if (res == 0) {
       // If there is a tie, break it by app ID to get a deterministic order
-      res = a1.getApp().getApplicationId().compareTo(a2.getApp().getApplicationId());
+      res = a1.getApplicationId().compareTo(a2.getApplicationId());
     }
     return res;
   }

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/MaxRunningAppsEnforcer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/MaxRunningAppsEnforcer.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/MaxRunningAppsEnforcer.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/MaxRunningAppsEnforcer.java Tue Aug 19 23:49:39 2014
@@ -43,7 +43,7 @@ public class MaxRunningAppsEnforcer {
   // Tracks the number of running applications by user.
   private final Map<String, Integer> usersNumRunnableApps;
   @VisibleForTesting
-  final ListMultimap<String, AppSchedulable> usersNonRunnableApps;
+  final ListMultimap<String, FSAppAttempt> usersNonRunnableApps;
 
   public MaxRunningAppsEnforcer(FairScheduler scheduler) {
     this.scheduler = scheduler;
@@ -80,7 +80,7 @@ public class MaxRunningAppsEnforcer {
    * Tracks the given new runnable app for purposes of maintaining max running
    * app limits.
    */
-  public void trackRunnableApp(FSSchedulerApp app) {
+  public void trackRunnableApp(FSAppAttempt app) {
     String user = app.getUser();
     FSLeafQueue queue = app.getQueue();
     // Increment running counts for all parent queues
@@ -99,9 +99,9 @@ public class MaxRunningAppsEnforcer {
    * Tracks the given new non runnable app so that it can be made runnable when
    * it would not violate max running app limits.
    */
-  public void trackNonRunnableApp(FSSchedulerApp app) {
+  public void trackNonRunnableApp(FSAppAttempt app) {
     String user = app.getUser();
-    usersNonRunnableApps.put(user, app.getAppSchedulable());
+    usersNonRunnableApps.put(user, app);
   }
 
   /**
@@ -111,7 +111,7 @@ public class MaxRunningAppsEnforcer {
    * Runs in O(n log(n)) where n is the number of queues that are under the
    * highest queue that went from having no slack to having slack.
    */
-  public void updateRunnabilityOnAppRemoval(FSSchedulerApp app, FSLeafQueue queue) {
+  public void updateRunnabilityOnAppRemoval(FSAppAttempt app, FSLeafQueue queue) {
     AllocationConfiguration allocConf = scheduler.getAllocationConfiguration();
     
     // childqueueX might have no pending apps itself, but if a queue higher up
@@ -133,8 +133,8 @@ public class MaxRunningAppsEnforcer {
       parent = parent.getParent();
     }
 
-    List<List<AppSchedulable>> appsNowMaybeRunnable =
-        new ArrayList<List<AppSchedulable>>();
+    List<List<FSAppAttempt>> appsNowMaybeRunnable =
+        new ArrayList<List<FSAppAttempt>>();
 
     // Compile lists of apps which may now be runnable
     // We gather lists instead of building a set of all non-runnable apps so
@@ -150,26 +150,26 @@ public class MaxRunningAppsEnforcer {
       userNumRunning = 0;
     }
     if (userNumRunning == allocConf.getUserMaxApps(user) - 1) {
-      List<AppSchedulable> userWaitingApps = usersNonRunnableApps.get(user);
+      List<FSAppAttempt> userWaitingApps = usersNonRunnableApps.get(user);
       if (userWaitingApps != null) {
         appsNowMaybeRunnable.add(userWaitingApps);
       }
     }
 
     // Scan through and check whether this means that any apps are now runnable
-    Iterator<FSSchedulerApp> iter = new MultiListStartTimeIterator(
+    Iterator<FSAppAttempt> iter = new MultiListStartTimeIterator(
         appsNowMaybeRunnable);
-    FSSchedulerApp prev = null;
-    List<AppSchedulable> noLongerPendingApps = new ArrayList<AppSchedulable>();
+    FSAppAttempt prev = null;
+    List<FSAppAttempt> noLongerPendingApps = new ArrayList<FSAppAttempt>();
     while (iter.hasNext()) {
-      FSSchedulerApp next = iter.next();
+      FSAppAttempt next = iter.next();
       if (next == prev) {
         continue;
       }
 
       if (canAppBeRunnable(next.getQueue(), next.getUser())) {
         trackRunnableApp(next);
-        AppSchedulable appSched = next.getAppSchedulable();
+        FSAppAttempt appSched = next;
         next.getQueue().getRunnableAppSchedulables().add(appSched);
         noLongerPendingApps.add(appSched);
 
@@ -186,14 +186,14 @@ public class MaxRunningAppsEnforcer {
     // We remove the apps from their pending lists afterwards so that we don't
     // pull them out from under the iterator.  If they are not in these lists
     // in the first place, there is a bug.
-    for (AppSchedulable appSched : noLongerPendingApps) {
-      if (!appSched.getApp().getQueue().getNonRunnableAppSchedulables()
+    for (FSAppAttempt appSched : noLongerPendingApps) {
+      if (!appSched.getQueue().getNonRunnableAppSchedulables()
           .remove(appSched)) {
         LOG.error("Can't make app runnable that does not already exist in queue"
             + " as non-runnable: " + appSched + ". This should never happen.");
       }
       
-      if (!usersNonRunnableApps.remove(appSched.getApp().getUser(), appSched)) {
+      if (!usersNonRunnableApps.remove(appSched.getUser(), appSched)) {
         LOG.error("Waiting app " + appSched + " expected to be in "
         		+ "usersNonRunnableApps, but was not. This should never happen.");
       }
@@ -204,7 +204,7 @@ public class MaxRunningAppsEnforcer {
    * Updates the relevant tracking variables after a runnable app with the given
    * queue and user has been removed.
    */
-  public void untrackRunnableApp(FSSchedulerApp app) {
+  public void untrackRunnableApp(FSAppAttempt app) {
     // Update usersRunnableApps
     String user = app.getUser();
     int newUserNumRunning = usersNumRunnableApps.get(user) - 1;
@@ -226,8 +226,8 @@ public class MaxRunningAppsEnforcer {
   /**
    * Stops tracking the given non-runnable app
    */
-  public void untrackNonRunnableApp(FSSchedulerApp app) {
-    usersNonRunnableApps.remove(app.getUser(), app.getAppSchedulable());
+  public void untrackNonRunnableApp(FSAppAttempt app) {
+    usersNonRunnableApps.remove(app.getUser(), app);
   }
 
   /**
@@ -235,7 +235,7 @@ public class MaxRunningAppsEnforcer {
    * of non-runnable applications.
    */
   private void gatherPossiblyRunnableAppLists(FSQueue queue,
-      List<List<AppSchedulable>> appLists) {
+      List<List<FSAppAttempt>> appLists) {
     if (queue.getNumRunnableApps() < scheduler.getAllocationConfiguration()
         .getQueueMaxApps(queue.getName())) {
       if (queue instanceof FSLeafQueue) {
@@ -259,14 +259,14 @@ public class MaxRunningAppsEnforcer {
    * of O(num lists) time.
    */
   static class MultiListStartTimeIterator implements
-      Iterator<FSSchedulerApp> {
+      Iterator<FSAppAttempt> {
 
-    private List<AppSchedulable>[] appLists;
+    private List<FSAppAttempt>[] appLists;
     private int[] curPositionsInAppLists;
     private PriorityQueue<IndexAndTime> appListsByCurStartTime;
 
     @SuppressWarnings("unchecked")
-    public MultiListStartTimeIterator(List<List<AppSchedulable>> appListList) {
+    public MultiListStartTimeIterator(List<List<FSAppAttempt>> appListList) {
       appLists = appListList.toArray(new List[appListList.size()]);
       curPositionsInAppLists = new int[appLists.length];
       appListsByCurStartTime = new PriorityQueue<IndexAndTime>();
@@ -284,10 +284,10 @@ public class MaxRunningAppsEnforcer {
     }
 
     @Override
-    public FSSchedulerApp next() {
+    public FSAppAttempt next() {
       IndexAndTime indexAndTime = appListsByCurStartTime.remove();
       int nextListIndex = indexAndTime.index;
-      AppSchedulable next = appLists[nextListIndex]
+      FSAppAttempt next = appLists[nextListIndex]
           .get(curPositionsInAppLists[nextListIndex]);
       curPositionsInAppLists[nextListIndex]++;
 
@@ -299,7 +299,7 @@ public class MaxRunningAppsEnforcer {
       }
       appListsByCurStartTime.add(indexAndTime);
 
-      return next.getApp();
+      return next;
     }
 
     @Override

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/NewAppWeightBooster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/NewAppWeightBooster.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/NewAppWeightBooster.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/NewAppWeightBooster.java Tue Aug 19 23:49:39 2014
@@ -48,7 +48,7 @@ public class NewAppWeightBooster extends
     super.setConf(conf);
   }
 
-  public double adjustWeight(AppSchedulable app, double curWeight) {
+  public double adjustWeight(FSAppAttempt app, double curWeight) {
     long start = app.getStartTime();
     long now = System.currentTimeMillis();
     if (now - start < duration) {

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java Tue Aug 19 23:49:39 2014
@@ -74,7 +74,7 @@ public class QueueManager {
   }
   
   /**
-   * Get a queue by name, creating it if the create param is true and is necessary.
+   * Get a leaf queue by name, creating it if the create param is true and is necessary.
    * If the queue is not or can not be a leaf queue, i.e. it already exists as a
    * parent queue, or one of the parents in its name is already a leaf queue,
    * null is returned.
@@ -85,31 +85,53 @@ public class QueueManager {
    * could be referred to as just "parent1.queue2".
    */
   public FSLeafQueue getLeafQueue(String name, boolean create) {
+    FSQueue queue = getQueue(name, create, FSQueueType.LEAF);
+    if (queue instanceof FSParentQueue) {
+      return null;
+    }
+    return (FSLeafQueue) queue;
+  }
+  
+  /**
+   * Get a parent queue by name, creating it if the create param is true and is necessary.
+   * If the queue is not or can not be a parent queue, i.e. it already exists as a
+   * leaf queue, or one of the parents in its name is already a leaf queue,
+   * null is returned.
+   * 
+   * The root part of the name is optional, so a queue underneath the root 
+   * named "queue1" could be referred to  as just "queue1", and a queue named
+   * "queue2" underneath a parent named "parent1" that is underneath the root 
+   * could be referred to as just "parent1.queue2".
+   */
+  public FSParentQueue getParentQueue(String name, boolean create) {
+    FSQueue queue = getQueue(name, create, FSQueueType.PARENT);
+    if (queue instanceof FSLeafQueue) {
+      return null;
+    }
+    return (FSParentQueue) queue;
+  }
+  
+  private FSQueue getQueue(String name, boolean create, FSQueueType queueType) {
     name = ensureRootPrefix(name);
     synchronized (queues) {
       FSQueue queue = queues.get(name);
       if (queue == null && create) {
-        FSLeafQueue leafQueue = createLeafQueue(name);
-        if (leafQueue == null) {
-          return null;
-        }
-        queue = leafQueue;
-      } else if (queue instanceof FSParentQueue) {
-        return null;
+        // if the queue doesn't exist,create it and return
+        queue = createQueue(name, queueType);
       }
-      return (FSLeafQueue)queue;
+      return queue;
     }
   }
   
   /**
-   * Creates a leaf queue and places it in the tree. Creates any
-   * parents that don't already exist.
+   * Creates a leaf or parent queue based on what is specified in 'queueType' 
+   * and places it in the tree. Creates any parents that don't already exist.
    * 
    * @return
    *    the created queue, if successful. null if not allowed (one of the parent
    *    queues in the queue name is already a leaf queue)
    */
-  private FSLeafQueue createLeafQueue(String name) {
+  private FSQueue createQueue(String name, FSQueueType queueType) {
     List<String> newQueueNames = new ArrayList<String>();
     newQueueNames.add(name);
     int sepIndex = name.length();
@@ -143,8 +165,7 @@ public class QueueManager {
     FSLeafQueue leafQueue = null;
     for (int i = newQueueNames.size()-1; i >= 0; i--) {
       String queueName = newQueueNames.get(i);
-      if (i == 0) {
-        // First name added was the leaf queue
+      if (i == 0 && queueType != FSQueueType.PARENT) {
         leafQueue = new FSLeafQueue(name, scheduler, parent);
         try {
           leafQueue.setPolicy(queueConf.getDefaultSchedulingPolicy());
@@ -155,6 +176,7 @@ public class QueueManager {
         parent.addChildQueue(leafQueue);
         queues.put(leafQueue.getName(), leafQueue);
         leafQueues.add(leafQueue);
+        return leafQueue;
       } else {
         FSParentQueue newParent = new FSParentQueue(queueName, scheduler, parent);
         try {
@@ -169,53 +191,64 @@ public class QueueManager {
       }
     }
     
-    return leafQueue;
+    return parent;
   }
 
   /**
-   * Make way for the given leaf queue if possible, by removing incompatible
+   * Make way for the given queue if possible, by removing incompatible
    * queues with no apps in them. Incompatibility could be due to
-   * (1) leafToCreate being currently being a parent, or (2) an existing leaf queue in
-   * the ancestry of leafToCreate.
+   * (1) queueToCreate being currently a parent but needs to change to leaf
+   * (2) queueToCreate being currently a leaf but needs to change to parent
+   * (3) an existing leaf queue in the ancestry of queueToCreate.
    * 
    * We will never remove the root queue or the default queue in this way.
    *
-   * @return true if we can create leafToCreate or it already exists.
+   * @return true if we can create queueToCreate or it already exists.
    */
-  private boolean removeEmptyIncompatibleQueues(String leafToCreate) {
-    leafToCreate = ensureRootPrefix(leafToCreate);
+  private boolean removeEmptyIncompatibleQueues(String queueToCreate,
+      FSQueueType queueType) {
+    queueToCreate = ensureRootPrefix(queueToCreate);
 
-    // Ensure leafToCreate is not root and doesn't have the default queue in its
+    // Ensure queueToCreate is not root and doesn't have the default queue in its
     // ancestry.
-    if (leafToCreate.equals(ROOT_QUEUE) ||
-        leafToCreate.startsWith(
+    if (queueToCreate.equals(ROOT_QUEUE) ||
+        queueToCreate.startsWith(
             ROOT_QUEUE + "." + YarnConfiguration.DEFAULT_QUEUE_NAME + ".")) {
       return false;
     }
 
-    FSQueue queue = queues.get(leafToCreate);
+    FSQueue queue = queues.get(queueToCreate);
     // Queue exists already.
     if (queue != null) {
       if (queue instanceof FSLeafQueue) {
-        // If it's an already existing leaf, we're ok.
-        return true;
+        if (queueType == FSQueueType.LEAF) {
+          // if queue is already a leaf then return true
+          return true;
+        }
+        // remove incompatibility since queue is a leaf currently
+        // needs to change to a parent.
+        return removeQueueIfEmpty(queue);
       } else {
-        // If it's an existing parent queue, remove it if it's empty.
+        if (queueType == FSQueueType.PARENT) {
+          return true;
+        }
+        // If it's an existing parent queue and needs to change to leaf, 
+        // remove it if it's empty.
         return removeQueueIfEmpty(queue);
       }
     }
 
     // Queue doesn't exist already. Check if the new queue would be created
     // under an existing leaf queue. If so, try removing that leaf queue.
-    int sepIndex = leafToCreate.length();
-    sepIndex = leafToCreate.lastIndexOf('.', sepIndex-1);
+    int sepIndex = queueToCreate.length();
+    sepIndex = queueToCreate.lastIndexOf('.', sepIndex-1);
     while (sepIndex != -1) {
-      String prefixString = leafToCreate.substring(0, sepIndex);
+      String prefixString = queueToCreate.substring(0, sepIndex);
       FSQueue prefixQueue = queues.get(prefixString);
       if (prefixQueue != null && prefixQueue instanceof FSLeafQueue) {
         return removeQueueIfEmpty(prefixQueue);
       }
-      sepIndex = leafToCreate.lastIndexOf('.', sepIndex-1);
+      sepIndex = queueToCreate.lastIndexOf('.', sepIndex-1);
     }
     return true;
   }
@@ -312,12 +345,21 @@ public class QueueManager {
   }
   
   public void updateAllocationConfiguration(AllocationConfiguration queueConf) {
-    // Make sure all queues exist
-    for (String name : queueConf.getQueueNames()) {
-      if (removeEmptyIncompatibleQueues(name)) {
+    // Create leaf queues and the parent queues in a leaf's ancestry if they do not exist
+    for (String name : queueConf.getConfiguredQueues().get(FSQueueType.LEAF)) {
+      if (removeEmptyIncompatibleQueues(name, FSQueueType.LEAF)) {
         getLeafQueue(name, true);
       }
     }
+
+    // At this point all leaves and 'parents with at least one child' would have been created.
+    // Now create parents with no configured leaf.
+    for (String name : queueConf.getConfiguredQueues().get(
+        FSQueueType.PARENT)) {
+      if (removeEmptyIncompatibleQueues(name, FSQueueType.PARENT)) {
+        getParentQueue(name, true);
+      }
+    }
     
     for (FSQueue queue : queues.values()) {
       // Update queue metrics
@@ -327,7 +369,7 @@ public class QueueManager {
       // Set scheduling policies
       try {
         SchedulingPolicy policy = queueConf.getSchedulingPolicy(queue.getName());
-        policy.initialize(scheduler.getClusterCapacity());
+        policy.initialize(scheduler.getClusterResource());
         queue.setPolicy(policy);
       } catch (AllocationConfigurationException ex) {
         LOG.warn("Cannot apply configured scheduling policy to queue "

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueuePlacementPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueuePlacementPolicy.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueuePlacementPolicy.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueuePlacementPolicy.java Tue Aug 19 23:49:39 2014
@@ -25,6 +25,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.Groups;
 import org.apache.hadoop.util.ReflectionUtils;
@@ -32,6 +34,8 @@ import org.w3c.dom.Element;
 import org.w3c.dom.Node;
 import org.w3c.dom.NodeList;
 
+@Private
+@Unstable
 public class QueuePlacementPolicy {
   private static final Map<String, Class<? extends QueuePlacementRule>> ruleClasses;
   static {
@@ -42,17 +46,19 @@ public class QueuePlacementPolicy {
     map.put("secondaryGroupExistingQueue",
         QueuePlacementRule.SecondaryGroupExistingQueue.class);
     map.put("specified", QueuePlacementRule.Specified.class);
+    map.put("nestedUserQueue",
+        QueuePlacementRule.NestedUserQueue.class);
     map.put("default", QueuePlacementRule.Default.class);
     map.put("reject", QueuePlacementRule.Reject.class);
     ruleClasses = Collections.unmodifiableMap(map);
   }
   
   private final List<QueuePlacementRule> rules;
-  private final Set<String> configuredQueues;
+  private final Map<FSQueueType, Set<String>> configuredQueues;
   private final Groups groups;
   
   public QueuePlacementPolicy(List<QueuePlacementRule> rules,
-      Set<String> configuredQueues, Configuration conf)
+      Map<FSQueueType, Set<String>> configuredQueues, Configuration conf)
       throws AllocationConfigurationException {
     for (int i = 0; i < rules.size()-1; i++) {
       if (rules.get(i).isTerminal()) {
@@ -72,28 +78,15 @@ public class QueuePlacementPolicy {
   /**
    * Builds a QueuePlacementPolicy from an xml element.
    */
-  public static QueuePlacementPolicy fromXml(Element el, Set<String> configuredQueues,
-      Configuration conf) throws AllocationConfigurationException {
+  public static QueuePlacementPolicy fromXml(Element el,
+      Map<FSQueueType, Set<String>> configuredQueues, Configuration conf)
+      throws AllocationConfigurationException {
     List<QueuePlacementRule> rules = new ArrayList<QueuePlacementRule>();
     NodeList elements = el.getChildNodes();
     for (int i = 0; i < elements.getLength(); i++) {
       Node node = elements.item(i);
       if (node instanceof Element) {
-        Element element = (Element)node;
-
-        String ruleName = element.getAttribute("name");
-        if ("".equals(ruleName)) {
-          throw new AllocationConfigurationException("No name provided for a " +
-            "rule element");
-        }
-
-        Class<? extends QueuePlacementRule> clazz = ruleClasses.get(ruleName);
-        if (clazz == null) {
-          throw new AllocationConfigurationException("No rule class found for "
-              + ruleName);
-        }
-        QueuePlacementRule rule = ReflectionUtils.newInstance(clazz, null);
-        rule.initializeFromXml(element);
+        QueuePlacementRule rule = createAndInitializeRule(node);
         rules.add(rule);
       }
     }
@@ -101,11 +94,37 @@ public class QueuePlacementPolicy {
   }
   
   /**
+   * Create and initialize a rule given a xml node
+   * @param node
+   * @return QueuePlacementPolicy
+   * @throws AllocationConfigurationException
+   */
+  public static QueuePlacementRule createAndInitializeRule(Node node)
+      throws AllocationConfigurationException {
+    Element element = (Element) node;
+
+    String ruleName = element.getAttribute("name");
+    if ("".equals(ruleName)) {
+      throw new AllocationConfigurationException("No name provided for a "
+          + "rule element");
+    }
+
+    Class<? extends QueuePlacementRule> clazz = ruleClasses.get(ruleName);
+    if (clazz == null) {
+      throw new AllocationConfigurationException("No rule class found for "
+          + ruleName);
+    }
+    QueuePlacementRule rule = ReflectionUtils.newInstance(clazz, null);
+    rule.initializeFromXml(element);
+    return rule;
+  }
+    
+  /**
    * Build a simple queue placement policy from the allow-undeclared-pools and
    * user-as-default-queue configuration options.
    */
   public static QueuePlacementPolicy fromConfiguration(Configuration conf,
-      Set<String> configuredQueues) {
+      Map<FSQueueType, Set<String>> configuredQueues) {
     boolean create = conf.getBoolean(
         FairSchedulerConfiguration.ALLOW_UNDECLARED_POOLS,
         FairSchedulerConfiguration.DEFAULT_ALLOW_UNDECLARED_POOLS);