Posted to yarn-commits@hadoop.apache.org by cm...@apache.org on 2014/08/20 01:51:01 UTC

svn commit: r1619012 [13/26] - in /hadoop/common/branches/HADOOP-10388/hadoop-yarn-project: ./ hadoop-yarn/bin/ hadoop-yarn/conf/ hadoop-yarn/dev-support/ hadoop-yarn/hadoop-yarn-api/ hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api...

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java Tue Aug 19 23:49:39 2014
@@ -18,34 +18,101 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ConcurrentHashMap;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
+import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
+import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMoveEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerFinishedEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerRecoverEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
-public abstract class AbstractYarnScheduler implements ResourceScheduler {
+import com.google.common.util.concurrent.SettableFuture;
+
+
+@SuppressWarnings("unchecked")
+public abstract class AbstractYarnScheduler
+    <T extends SchedulerApplicationAttempt, N extends SchedulerNode>
+    extends AbstractService implements ResourceScheduler {
+
+  private static final Log LOG = LogFactory.getLog(AbstractYarnScheduler.class);
+
+  // Nodes in the cluster, indexed by NodeId
+  protected Map<NodeId, N> nodes = new ConcurrentHashMap<NodeId, N>();
+
+  // Whole capacity of the cluster
+  protected Resource clusterResource = Resource.newInstance(0, 0);
+
+  protected Resource minimumAllocation;
+  protected Resource maximumAllocation;
 
   protected RMContext rmContext;
-  protected Map<ApplicationId, SchedulerApplication> applications;
+  protected Map<ApplicationId, SchedulerApplication<T>> applications;
+  protected int nmExpireInterval;
+
   protected final static List<Container> EMPTY_CONTAINER_LIST =
       new ArrayList<Container>();
   protected static final Allocation EMPTY_ALLOCATION = new Allocation(
     EMPTY_CONTAINER_LIST, Resources.createResource(0), null, null, null);
 
+  /**
+   * Construct the service.
+   *
+   * @param name service name
+   */
+  public AbstractYarnScheduler(String name) {
+    super(name);
+  }
+
+  @Override
+  public void serviceInit(Configuration conf) throws Exception {
+    nmExpireInterval =
+        conf.getInt(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS,
+          YarnConfiguration.DEFAULT_RM_NM_EXPIRY_INTERVAL_MS);
+    createReleaseCache();
+    super.serviceInit(conf);
+  }
+
   public synchronized List<Container> getTransferredContainers(
       ApplicationAttemptId currentAttempt) {
     ApplicationId appId = currentAttempt.getApplicationId();
-    SchedulerApplication app = applications.get(appId);
+    SchedulerApplication<T> app = applications.get(appId);
     List<Container> containerList = new ArrayList<Container>();
     RMApp appImpl = this.rmContext.getRMApps().get(appId);
     if (appImpl.getApplicationSubmissionContext().getUnmanagedAM()) {
@@ -64,14 +131,333 @@ public abstract class AbstractYarnSchedu
     return containerList;
   }
 
-  public Map<ApplicationId, SchedulerApplication> getSchedulerApplications() {
+  public Map<ApplicationId, SchedulerApplication<T>>
+      getSchedulerApplications() {
     return applications;
   }
-  
+
+  @Override
+  public Resource getClusterResource() {
+    return clusterResource;
+  }
+
+  @Override
+  public Resource getMinimumResourceCapability() {
+    return minimumAllocation;
+  }
+
+  @Override
+  public Resource getMaximumResourceCapability() {
+    return maximumAllocation;
+  }
+
+  protected void containerLaunchedOnNode(ContainerId containerId,
+                                         SchedulerNode node) {
+    // Get the application for the finished container
+    SchedulerApplicationAttempt application = getCurrentAttemptForContainer
+        (containerId);
+    if (application == null) {
+      LOG.info("Unknown application "
+          + containerId.getApplicationAttemptId().getApplicationId()
+          + " launched container " + containerId + " on node: " + node);
+      this.rmContext.getDispatcher().getEventHandler()
+        .handle(new RMNodeCleanContainerEvent(node.getNodeID(), containerId));
+      return;
+    }
+
+    application.containerLaunchedOnNode(containerId, node.getNodeID());
+  }
+
+  public T getApplicationAttempt(ApplicationAttemptId applicationAttemptId) {
+    SchedulerApplication<T> app =
+        applications.get(applicationAttemptId.getApplicationId());
+    return app == null ? null : app.getCurrentAppAttempt();
+  }
+
+  @Override
+  public SchedulerAppReport getSchedulerAppInfo(
+      ApplicationAttemptId appAttemptId) {
+    SchedulerApplicationAttempt attempt = getApplicationAttempt(appAttemptId);
+    if (attempt == null) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Request for appInfo of unknown attempt " + appAttemptId);
+      }
+      return null;
+    }
+    return new SchedulerAppReport(attempt);
+  }
+
+  @Override
+  public ApplicationResourceUsageReport getAppResourceUsageReport(
+      ApplicationAttemptId appAttemptId) {
+    SchedulerApplicationAttempt attempt = getApplicationAttempt(appAttemptId);
+    if (attempt == null) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Request for appInfo of unknown attempt " + appAttemptId);
+      }
+      return null;
+    }
+    return attempt.getResourceUsageReport();
+  }
+
+  public T getCurrentAttemptForContainer(ContainerId containerId) {
+    return getApplicationAttempt(containerId.getApplicationAttemptId());
+  }
+
+  @Override
+  public RMContainer getRMContainer(ContainerId containerId) {
+    SchedulerApplicationAttempt attempt =
+        getCurrentAttemptForContainer(containerId);
+    return (attempt == null) ? null : attempt.getRMContainer(containerId);
+  }
+
+  @Override
+  public SchedulerNodeReport getNodeReport(NodeId nodeId) {
+    N node = nodes.get(nodeId);
+    return node == null ? null : new SchedulerNodeReport(node);
+  }
+
   @Override
   public String moveApplication(ApplicationId appId, String newQueue)
       throws YarnException {
     throw new YarnException(getClass().getSimpleName()
         + " does not support moving apps between queues");
   }
+
+  private void killOrphanContainerOnNode(RMNode node,
+      NMContainerStatus container) {
+    if (!container.getContainerState().equals(ContainerState.COMPLETE)) {
+      this.rmContext.getDispatcher().getEventHandler().handle(
+        new RMNodeCleanContainerEvent(node.getNodeID(),
+          container.getContainerId()));
+    }
+  }
+
+  public synchronized void recoverContainersOnNode(
+      List<NMContainerStatus> containerReports, RMNode nm) {
+    if (!rmContext.isWorkPreservingRecoveryEnabled()
+        || containerReports == null
+        || containerReports.isEmpty()) {
+      return;
+    }
+
+    for (NMContainerStatus container : containerReports) {
+      ApplicationId appId =
+          container.getContainerId().getApplicationAttemptId().getApplicationId();
+      RMApp rmApp = rmContext.getRMApps().get(appId);
+      if (rmApp == null) {
+        LOG.error("Skip recovering container " + container
+            + " for unknown application.");
+        killOrphanContainerOnNode(nm, container);
+        continue;
+      }
+
+      // Unmanaged AM recovery is addressed in YARN-1815
+      if (rmApp.getApplicationSubmissionContext().getUnmanagedAM()) {
+        LOG.info("Skip recovering container " + container + " for unmanaged AM."
+            + rmApp.getApplicationId());
+        killOrphanContainerOnNode(nm, container);
+        continue;
+      }
+
+      SchedulerApplication<T> schedulerApp = applications.get(appId);
+      if (schedulerApp == null) {
+        LOG.info("Skip recovering container  " + container
+            + " for unknown SchedulerApplication. Application current state is "
+            + rmApp.getState());
+        killOrphanContainerOnNode(nm, container);
+        continue;
+      }
+
+      LOG.info("Recovering container " + container);
+      SchedulerApplicationAttempt schedulerAttempt =
+          schedulerApp.getCurrentAppAttempt();
+
+      // create container
+      RMContainer rmContainer = recoverAndCreateContainer(container, nm);
+
+      // recover RMContainer
+      rmContainer.handle(new RMContainerRecoverEvent(container.getContainerId(),
+        container));
+
+      // recover scheduler node
+      nodes.get(nm.getNodeID()).recoverContainer(rmContainer);
+
+      // recover queue: update headroom etc.
+      Queue queue = schedulerAttempt.getQueue();
+      queue.recoverContainer(clusterResource, schedulerAttempt, rmContainer);
+
+      // recover scheduler attempt
+      schedulerAttempt.recoverContainer(rmContainer);
+            
+      // set master container for the current running AMContainer for this
+      // attempt.
+      RMAppAttempt appAttempt = rmApp.getCurrentAppAttempt();
+      if (appAttempt != null) {
+        Container masterContainer = appAttempt.getMasterContainer();
+
+        // Mark current running AMContainer's RMContainer based on the master
+        // container ID stored in AppAttempt.
+        if (masterContainer != null
+            && masterContainer.getId().equals(rmContainer.getContainerId())) {
+          ((RMContainerImpl)rmContainer).setAMContainer(true);
+        }
+      }
+
+      synchronized (schedulerAttempt) {
+        Set<ContainerId> releases = schedulerAttempt.getPendingRelease();
+        if (releases.contains(container.getContainerId())) {
+          // release the container
+          rmContainer.handle(new RMContainerFinishedEvent(container
+            .getContainerId(), SchedulerUtils.createAbnormalContainerStatus(
+            container.getContainerId(), SchedulerUtils.RELEASED_CONTAINER),
+            RMContainerEventType.RELEASED));
+          releases.remove(container.getContainerId());
+          LOG.info(container.getContainerId() + " is released by application.");
+        }
+      }
+    }
+  }
+
+  private RMContainer recoverAndCreateContainer(NMContainerStatus status,
+      RMNode node) {
+    Container container =
+        Container.newInstance(status.getContainerId(), node.getNodeID(),
+          node.getHttpAddress(), status.getAllocatedResource(),
+          status.getPriority(), null);
+    ApplicationAttemptId attemptId =
+        container.getId().getApplicationAttemptId();
+    RMContainer rmContainer =
+        new RMContainerImpl(container, attemptId, node.getNodeID(),
+          applications.get(attemptId.getApplicationId()).getUser(), rmContext,
+          status.getCreationTime());
+    return rmContainer;
+  }
+
+  /**
+   * Recover the resource requests from an RMContainer when the container is
+   * preempted before the AM has pulled it. If the container has already been
+   * pulled by the AM, the RMContainer will have no resource requests to recover.
+   * @param rmContainer the preempted container whose requests are recovered
+   */
+  protected void recoverResourceRequestForContainer(RMContainer rmContainer) {
+    List<ResourceRequest> requests = rmContainer.getResourceRequests();
+
+    // If the container has moved to the ACQUIRED state, the request list is null.
+    if (requests == null) {
+      return;
+    }
+    // Add resource request back to Scheduler.
+    SchedulerApplicationAttempt schedulerAttempt 
+        = getCurrentAttemptForContainer(rmContainer.getContainerId());
+    if (schedulerAttempt != null) {
+      schedulerAttempt.recoverResourceRequests(requests);
+    }
+  }
+
+  protected void createReleaseCache() {
+    // Clean up the cache after the NM expiry interval.
+    new Timer().schedule(new TimerTask() {
+      @Override
+      public void run() {
+        for (SchedulerApplication<T> app : applications.values()) {
+
+          T attempt = app.getCurrentAppAttempt();
+          synchronized (attempt) {
+            for (ContainerId containerId : attempt.getPendingRelease()) {
+              RMAuditLogger.logFailure(
+                app.getUser(),
+                AuditConstants.RELEASE_CONTAINER,
+                "Unauthorized access or invalid container",
+                "Scheduler",
+                "Trying to release container not owned by app or with invalid id.",
+                attempt.getApplicationId(), containerId);
+            }
+            attempt.getPendingRelease().clear();
+          }
+        }
+        LOG.info("Release request cache is cleaned up");
+      }
+    }, nmExpireInterval);
+  }
+
+  // clean up a completed container
+  protected abstract void completedContainer(RMContainer rmContainer,
+      ContainerStatus containerStatus, RMContainerEventType event);
+
+  protected void releaseContainers(List<ContainerId> containers,
+      SchedulerApplicationAttempt attempt) {
+    for (ContainerId containerId : containers) {
+      RMContainer rmContainer = getRMContainer(containerId);
+      if (rmContainer == null) {
+        if (System.currentTimeMillis() - ResourceManager.getClusterTimeStamp()
+            < nmExpireInterval) {
+          LOG.info(containerId + " doesn't exist. Add the container"
+              + " to the release request cache as it maybe on recovery.");
+          synchronized (attempt) {
+            attempt.getPendingRelease().add(containerId);
+          }
+        } else {
+          RMAuditLogger.logFailure(attempt.getUser(),
+            AuditConstants.RELEASE_CONTAINER,
+            "Unauthorized access or invalid container", "Scheduler",
+            "Trying to release container not owned by app or with invalid id.",
+            attempt.getApplicationId(), containerId);
+        }
+      }
+      completedContainer(rmContainer,
+        SchedulerUtils.createAbnormalContainerStatus(containerId,
+          SchedulerUtils.RELEASED_CONTAINER), RMContainerEventType.RELEASED);
+    }
+  }
+
+  public SchedulerNode getSchedulerNode(NodeId nodeId) {
+    return nodes.get(nodeId);
+  }
+
+  @Override
+  public synchronized void moveAllApps(String sourceQueue, String destQueue)
+      throws YarnException {
+    // check if destination queue is a valid leaf queue
+    try {
+      getQueueInfo(destQueue, false, false);
+    } catch (IOException e) {
+      LOG.warn(e);
+      throw new YarnException(e);
+    }
+    // check if the source queue is valid
+    List<ApplicationAttemptId> apps = getAppsInQueue(sourceQueue);
+    if (apps == null) {
+      String errMsg = "The specified Queue: " + sourceQueue + " doesn't exist";
+      LOG.warn(errMsg);
+      throw new YarnException(errMsg);
+    }
+    // generate move events for each pending/running app
+    for (ApplicationAttemptId app : apps) {
+      SettableFuture<Object> future = SettableFuture.create();
+      this.rmContext
+          .getDispatcher()
+          .getEventHandler()
+          .handle(new RMAppMoveEvent(app.getApplicationId(), destQueue, future));
+    }
+  }
+
+  @Override
+  public synchronized void killAllAppsInQueue(String queueName)
+      throws YarnException {
+    // check if the queue is valid
+    List<ApplicationAttemptId> apps = getAppsInQueue(queueName);
+    if (apps == null) {
+      String errMsg = "The specified Queue: " + queueName + " doesn't exist";
+      LOG.warn(errMsg);
+      throw new YarnException(errMsg);
+    }
+    // generate kill events for each pending/running app
+    for (ApplicationAttemptId app : apps) {
+      this.rmContext
+          .getDispatcher()
+          .getEventHandler()
+          .handle(new RMAppEvent(app.getApplicationId(), RMAppEventType.KILL));
+    }
+  }
 }
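
For illustration, here is a small standalone sketch (not part of this commit) of the one-shot cleanup pattern that createReleaseCache() above uses: a java.util.Timer fires once after the NM expiry interval and clears whatever release requests are still parked. The container id and the 1000 ms interval are made-up stand-ins for real values such as YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS.

    import java.util.HashSet;
    import java.util.Set;
    import java.util.Timer;
    import java.util.TimerTask;

    public class ReleaseCacheSketch {
      public static void main(String[] args) {
        final Set<String> pendingRelease = new HashSet<String>();
        pendingRelease.add("container_0000000000001_0001_01_000002"); // made-up id

        final long nmExpireIntervalMs = 1000; // stand-in for the configured interval
        final Timer timer = new Timer();
        timer.schedule(new TimerTask() {
          @Override
          public void run() {
            synchronized (pendingRelease) {
              // anything still unmatched after the interval is dropped, mirroring
              // the audit-log-and-clear loop in createReleaseCache()
              pendingRelease.clear();
            }
            System.out.println("Release request cache is cleaned up");
            timer.cancel(); // one-shot task, allow the JVM to exit
          }
        }, nmExpireIntervalMs);
      }
    }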

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java Tue Aug 19 23:49:39 2014
@@ -39,6 +39,8 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
 /**
@@ -52,10 +54,13 @@ public class AppSchedulingInfo {
   private static final Log LOG = LogFactory.getLog(AppSchedulingInfo.class);
   private final ApplicationAttemptId applicationAttemptId;
   final ApplicationId applicationId;
-  private final String queueName;
+  private String queueName;
   Queue queue;
   final String user;
-  private final AtomicInteger containerIdCounter = new AtomicInteger(0);
+  // TODO: make containerIdCounter a long
+  private final AtomicInteger containerIdCounter;
+  private final int EPOCH_BIT_MASK = 0x3ff;
+  private final int EPOCH_BIT_SHIFT = 22;
 
   final Set<Priority> priorities = new TreeSet<Priority>(
       new org.apache.hadoop.yarn.server.resourcemanager.resource.Priority.Comparator());
@@ -68,15 +73,19 @@ public class AppSchedulingInfo {
   
   /* Allocated by scheduler */
   boolean pending = true; // for app metrics
-
+  
+ 
   public AppSchedulingInfo(ApplicationAttemptId appAttemptId,
-      String user, Queue queue, ActiveUsersManager activeUsersManager) {
+      String user, Queue queue, ActiveUsersManager activeUsersManager,
+      int epoch) {
     this.applicationAttemptId = appAttemptId;
     this.applicationId = appAttemptId.getApplicationId();
     this.queue = queue;
     this.queueName = queue.getQueueName();
     this.user = user;
     this.activeUsersManager = activeUsersManager;
+    this.containerIdCounter = new AtomicInteger(
+        (epoch & EPOCH_BIT_MASK) << EPOCH_BIT_SHIFT);
   }
 
   public ApplicationId getApplicationId() {
@@ -118,9 +127,10 @@ public class AppSchedulingInfo {
    * by the application.
    *
    * @param requests resources to be acquired
+   * @param recoverPreemptedRequest whether to recover the ResourceRequest of a preempted container
    */
   synchronized public void updateResourceRequests(
-      List<ResourceRequest> requests) {
+      List<ResourceRequest> requests, boolean recoverPreemptedRequest) {
     QueueMetrics metrics = queue.getMetrics();
     
     // Update resource requests
@@ -154,8 +164,13 @@ public class AppSchedulingInfo {
         asks = new HashMap<String, ResourceRequest>();
         this.requests.put(priority, asks);
         this.priorities.add(priority);
-      } else if (updatePendingResources) {
-        lastRequest = asks.get(resourceName);
+      }
+      lastRequest = asks.get(resourceName);
+
+      if (recoverPreemptedRequest && lastRequest != null) {
+        // Increment the number of containers by 1, as a single container is
+        // being recovered.
+        request.setNumContainers(lastRequest.getNumContainers() + 1);
       }
 
       asks.put(resourceName, request);
@@ -245,14 +260,16 @@ public class AppSchedulingInfo {
    * @param container
    *          the containers allocated.
    */
-  synchronized public void allocate(NodeType type, SchedulerNode node,
-      Priority priority, ResourceRequest request, Container container) {
+  synchronized public List<ResourceRequest> allocate(NodeType type,
+      SchedulerNode node, Priority priority, ResourceRequest request,
+      Container container) {
+    List<ResourceRequest> resourceRequests = new ArrayList<ResourceRequest>();
     if (type == NodeType.NODE_LOCAL) {
-      allocateNodeLocal(node, priority, request, container);
+      allocateNodeLocal(node, priority, request, container, resourceRequests);
     } else if (type == NodeType.RACK_LOCAL) {
-      allocateRackLocal(node, priority, request, container);
+      allocateRackLocal(node, priority, request, container, resourceRequests);
     } else {
-      allocateOffSwitch(node, priority, request, container);
+      allocateOffSwitch(node, priority, request, container, resourceRequests);
     }
     QueueMetrics metrics = queue.getMetrics();
     if (pending) {
@@ -270,6 +287,7 @@ public class AppSchedulingInfo {
           + " resource=" + request.getCapability());
     }
     metrics.allocateResources(user, 1, request.getCapability(), true);
+    return resourceRequests;
   }
 
   /**
@@ -279,9 +297,9 @@ public class AppSchedulingInfo {
    * @param allocatedContainers
    *          resources allocated to the application
    */
-  synchronized private void allocateNodeLocal( 
-      SchedulerNode node, Priority priority, 
-      ResourceRequest nodeLocalRequest, Container container) {
+  synchronized private void allocateNodeLocal(SchedulerNode node,
+      Priority priority, ResourceRequest nodeLocalRequest, Container container,
+      List<ResourceRequest> resourceRequests) {
     // Update future requirements
     nodeLocalRequest.setNumContainers(nodeLocalRequest.getNumContainers() - 1);
     if (nodeLocalRequest.getNumContainers() == 0) {
@@ -295,7 +313,14 @@ public class AppSchedulingInfo {
       this.requests.get(priority).remove(node.getRackName());
     }
 
-    decrementOutstanding(requests.get(priority).get(ResourceRequest.ANY));
+    ResourceRequest offRackRequest = requests.get(priority).get(
+        ResourceRequest.ANY);
+    decrementOutstanding(offRackRequest);
+
+    // Update cloned NodeLocal, RackLocal and OffRack requests for recovery
+    resourceRequests.add(cloneResourceRequest(nodeLocalRequest));
+    resourceRequests.add(cloneResourceRequest(rackLocalRequest));
+    resourceRequests.add(cloneResourceRequest(offRackRequest));
   }
 
   /**
@@ -305,16 +330,22 @@ public class AppSchedulingInfo {
    * @param allocatedContainers
    *          resources allocated to the application
    */
-  synchronized private void allocateRackLocal(
-      SchedulerNode node, Priority priority,
-      ResourceRequest rackLocalRequest, Container container) {
+  synchronized private void allocateRackLocal(SchedulerNode node,
+      Priority priority, ResourceRequest rackLocalRequest, Container container,
+      List<ResourceRequest> resourceRequests) {
     // Update future requirements
     rackLocalRequest.setNumContainers(rackLocalRequest.getNumContainers() - 1);
     if (rackLocalRequest.getNumContainers() == 0) {
       this.requests.get(priority).remove(node.getRackName());
     }
 
-    decrementOutstanding(requests.get(priority).get(ResourceRequest.ANY));
+    ResourceRequest offRackRequest = requests.get(priority).get(
+        ResourceRequest.ANY);
+    decrementOutstanding(offRackRequest);
+
+    // Update cloned RackLocal and OffRack requests for recovery
+    resourceRequests.add(cloneResourceRequest(rackLocalRequest));
+    resourceRequests.add(cloneResourceRequest(offRackRequest));
   }
 
   /**
@@ -324,11 +355,13 @@ public class AppSchedulingInfo {
    * @param allocatedContainers
    *          resources allocated to the application
    */
-  synchronized private void allocateOffSwitch(
-      SchedulerNode node, Priority priority,
-      ResourceRequest offSwitchRequest, Container container) {
+  synchronized private void allocateOffSwitch(SchedulerNode node,
+      Priority priority, ResourceRequest offSwitchRequest, Container container,
+      List<ResourceRequest> resourceRequests) {
     // Update future requirements
     decrementOutstanding(offSwitchRequest);
+    // Update cloned OffRack requests for recovery
+    resourceRequests.add(cloneResourceRequest(offSwitchRequest));
   }
 
   synchronized private void decrementOutstanding(
@@ -377,6 +410,7 @@ public class AppSchedulingInfo {
     activeUsersManager = newQueue.getActiveUsersManager();
     activeUsersManager.activateApplication(user, applicationId);
     this.queue = newQueue;
+    this.queueName = newQueue.getQueueName();
   }
 
   synchronized public void stop(RMAppAttemptState rmAppAttemptFinalState) {
@@ -409,4 +443,29 @@ public class AppSchedulingInfo {
     //    this.requests = appInfo.getRequests();
     this.blacklist = appInfo.getBlackList();
   }
+
+  public synchronized void recoverContainer(RMContainer rmContainer) {
+    QueueMetrics metrics = queue.getMetrics();
+    if (pending) {
+      // If there was any container to recover, the application was
+      // running from the scheduler's point of view.
+      pending = false;
+      metrics.runAppAttempt(applicationId, user);
+    }
+
+    // Container is completed. Skip recovering resources.
+    if (rmContainer.getState().equals(RMContainerState.COMPLETED)) {
+      return;
+    }
+
+    metrics.allocateResources(user, 1, rmContainer.getAllocatedResource(),
+      false);
+  }
+  
+  public ResourceRequest cloneResourceRequest(ResourceRequest request) {
+    ResourceRequest newRequest = ResourceRequest.newInstance(
+        request.getPriority(), request.getResourceName(),
+        request.getCapability(), 1, request.getRelaxLocality());
+    return newRequest;
+  }
 }
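
To make the new counter initialization concrete, a small standalone sketch (not part of this commit) of the arithmetic: the low 10 bits of the RM epoch (EPOCH_BIT_MASK = 0x3ff) are shifted above the id space (EPOCH_BIT_SHIFT = 22), so each attempt gets 2^22 container ids per epoch and ids from different RM incarnations cannot collide.

    public class EpochContainerIdSketch {
      private static final int EPOCH_BIT_MASK = 0x3ff; // keep only the low 10 bits
      private static final int EPOCH_BIT_SHIFT = 22;   // place them above the id space

      public static void main(String[] args) {
        int epoch = 2; // e.g. the RM has restarted twice
        int counterStart = (epoch & EPOCH_BIT_MASK) << EPOCH_BIT_SHIFT;
        System.out.println(counterStart);     // 8388608, i.e. 2 * 2^22
        System.out.println(counterStart + 1); // 8388609, the first id handed out
      }
    }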

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java Tue Aug 19 23:49:39 2014
@@ -26,6 +26,8 @@ import org.apache.hadoop.security.UserGr
 import org.apache.hadoop.yarn.api.records.QueueACL;
 import org.apache.hadoop.yarn.api.records.QueueInfo;
 import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 
 @Evolving
 @LimitedPrivate("yarn")
@@ -60,4 +62,13 @@ public interface Queue {
   boolean hasAccess(QueueACL acl, UserGroupInformation user);
   
   public ActiveUsersManager getActiveUsersManager();
+
+  /**
+   * Recover the state of the queue for a given container.
+   * @param clusterResource the resource of the cluster
+   * @param schedulerAttempt the application for which the container was allocated
+   * @param rmContainer the container that was recovered.
+   */
+  public void recoverContainer(Resource clusterResource,
+      SchedulerApplicationAttempt schedulerAttempt, RMContainer rmContainer);
 }
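
As a hypothetical illustration (not from this commit) of what an implementation of the new contract might look like: a queue mainly has to re-account the recovered container's resources against its own usage. The usedResources field and updateMetrics() helper below are invented names for illustration only, not members of any queue in this patch.

    @Override
    public void recoverContainer(Resource clusterResource,
        SchedulerApplicationAttempt schedulerAttempt, RMContainer rmContainer) {
      if (rmContainer.getState().equals(RMContainerState.COMPLETED)) {
        return; // nothing to account for a container that has already finished
      }
      synchronized (this) {
        // usedResources and updateMetrics() are hypothetical members; a real
        // queue updates its own usage, headroom and metrics here
        Resources.addTo(usedResources, rmContainer.getContainer().getResource());
        updateMetrics(clusterResource);
      }
    }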

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceScheduler.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceScheduler.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceScheduler.java Tue Aug 19 23:49:39 2014
@@ -34,6 +34,15 @@ import org.apache.hadoop.yarn.server.res
 @LimitedPrivate("yarn")
 @Evolving
 public interface ResourceScheduler extends YarnScheduler, Recoverable {
+
+  /**
+   * Set RMContext for <code>ResourceScheduler</code>.
+   * This method should be called once, immediately after the scheduler
+   * is instantiated.
+   * @param rmContext created by ResourceManager
+   */
+  void setRMContext(RMContext rmContext);
+
   /**
    * Re-initialize the <code>ResourceScheduler</code>.
    * @param conf configuration
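
A minimal sketch (not part of this commit) of the call order this javadoc implies, assuming the concrete scheduler extends the service-based AbstractYarnScheduler so that init() and start() are inherited from AbstractService; FifoScheduler is used here only as an example of such a scheduler.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.yarn.conf.YarnConfiguration;
    import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
    import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;

    public class SchedulerWiringSketch {
      public static FifoScheduler createAndStart(RMContext rmContext) {
        Configuration conf = new YarnConfiguration();
        FifoScheduler scheduler = new FifoScheduler();
        scheduler.setRMContext(rmContext); // once, right after instantiation
        scheduler.init(conf);              // serviceInit(): reads the NM expiry
                                           // interval and arms the release cache
        scheduler.start();
        return scheduler;
      }
    }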

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java Tue Aug 19 23:49:39 2014
@@ -23,11 +23,11 @@ import org.apache.hadoop.yarn.server.res
 
 @Private
 @Unstable
-public class SchedulerApplication {
+public class SchedulerApplication<T extends SchedulerApplicationAttempt> {
 
   private Queue queue;
   private final String user;
-  private SchedulerApplicationAttempt currentAttempt;
+  private T currentAttempt;
 
   public SchedulerApplication(Queue queue, String user) {
     this.queue = queue;
@@ -46,11 +46,11 @@ public class SchedulerApplication {
     return user;
   }
 
-  public SchedulerApplicationAttempt getCurrentAppAttempt() {
+  public T getCurrentAppAttempt() {
     return currentAttempt;
   }
 
-  public void setCurrentAppAttempt(SchedulerApplicationAttempt currentAttempt) {
+  public void setCurrentAppAttempt(T currentAttempt) {
     this.currentAttempt = currentAttempt;
   }
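
A small fragment (not from this commit) showing what the new type parameter buys a scheduler: the current attempt can be stored and read back without a cast. FiCaSchedulerApp is the attempt type used by the capacity and FIFO schedulers; queue, user and attempt are assumed to be in scope.

    SchedulerApplication<FiCaSchedulerApp> app =
        new SchedulerApplication<FiCaSchedulerApp>(queue, user);
    app.setCurrentAppAttempt(attempt);                     // a FiCaSchedulerApp
    FiCaSchedulerApp current = app.getCurrentAppAttempt(); // no cast needed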
 

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java Tue Aug 19 23:49:39 2014
@@ -20,9 +20,11 @@ package org.apache.hadoop.yarn.server.re
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -32,6 +34,7 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NMToken;
@@ -46,9 +49,11 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerReservedEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.HashMultiset;
 import com.google.common.collect.Multiset;
 
@@ -76,10 +81,20 @@ public class SchedulerApplicationAttempt
   protected final Resource currentReservation = Resource.newInstance(0, 0);
   private Resource resourceLimit = Resource.newInstance(0, 0);
   protected Resource currentConsumption = Resource.newInstance(0, 0);
+  private Resource amResource;
+  private boolean unmanagedAM = true;
+  private boolean amRunning = false;
 
   protected List<RMContainer> newlyAllocatedContainers = 
       new ArrayList<RMContainer>();
 
+  // pendingRelease is used in the work-preserving recovery scenario to keep
+  // track of the AM's outstanding release requests. On recovery, the RM could
+  // receive a release request from the AM before it receives the container
+  // status from the NM. In this case, the corresponding containers reported
+  // by the NM should not be recovered.
+  private Set<ContainerId> pendingRelease = null;
+
   /**
    * Count how many times the application has been given an opportunity
    * to schedule a task at each priority. Each time the scheduler
@@ -101,11 +116,23 @@ public class SchedulerApplicationAttempt
   public SchedulerApplicationAttempt(ApplicationAttemptId applicationAttemptId, 
       String user, Queue queue, ActiveUsersManager activeUsersManager,
       RMContext rmContext) {
+    Preconditions.checkNotNull("RMContext should not be null", rmContext);
     this.rmContext = rmContext;
     this.appSchedulingInfo = 
         new AppSchedulingInfo(applicationAttemptId, user, queue,  
-            activeUsersManager);
+            activeUsersManager, rmContext.getEpoch());
     this.queue = queue;
+    this.pendingRelease = new HashSet<ContainerId>();
+    if (rmContext.getRMApps() != null &&
+        rmContext.getRMApps()
+            .containsKey(applicationAttemptId.getApplicationId())) {
+      ApplicationSubmissionContext appSubmissionContext =
+          rmContext.getRMApps().get(applicationAttemptId.getApplicationId())
+              .getApplicationSubmissionContext();
+      if (appSubmissionContext != null) {
+        unmanagedAM = appSubmissionContext.getUnmanagedAM();
+      }
+    }
   }
   
   /**
@@ -144,6 +171,10 @@ public class SchedulerApplicationAttempt
     return appSchedulingInfo.getResourceRequests(priority);
   }
 
+  public Set<ContainerId> getPendingRelease() {
+    return this.pendingRelease;
+  }
+
   public int getNewContainerId() {
     return appSchedulingInfo.getNewContainerId();
   }
@@ -168,6 +199,26 @@ public class SchedulerApplicationAttempt
     return appSchedulingInfo.getQueueName();
   }
   
+  public Resource getAMResource() {
+    return amResource;
+  }
+
+  public void setAMResource(Resource amResource) {
+    this.amResource = amResource;
+  }
+
+  public boolean isAmRunning() {
+    return amRunning;
+  }
+
+  public void setAmRunning(boolean bool) {
+    amRunning = bool;
+  }
+
+  public boolean getUnmanagedAM() {
+    return unmanagedAM;
+  }
+
   public synchronized RMContainer getRMContainer(ContainerId id) {
     return liveContainers.get(id);
   }
@@ -202,7 +253,14 @@ public class SchedulerApplicationAttempt
   public synchronized void updateResourceRequests(
       List<ResourceRequest> requests) {
     if (!isStopped) {
-      appSchedulingInfo.updateResourceRequests(requests);
+      appSchedulingInfo.updateResourceRequests(requests, false);
+    }
+  }
+  
+  public synchronized void recoverResourceRequests(
+      List<ResourceRequest> requests) {
+    if (!isStopped) {
+      appSchedulingInfo.updateResourceRequests(requests, true);
     }
   }
   
@@ -378,7 +436,8 @@ public class SchedulerApplicationAttempt
         // create container token and NMToken altogether.
         container.setContainerToken(rmContext.getContainerTokenSecretManager()
           .createContainerToken(container.getId(), container.getNodeId(),
-            getUser(), container.getResource()));
+            getUser(), container.getResource(), container.getPriority(),
+            rmContainer.getCreationTime()));
         NMToken nmToken =
             rmContext.getNMTokenSecretManager().createAndGetNMToken(getUser(),
               getApplicationAttemptId(), container);
@@ -499,5 +558,24 @@ public class SchedulerApplicationAttempt
 
     appSchedulingInfo.move(newQueue);
     this.queue = newQueue;
-  }  
+  }
+
+  public synchronized void recoverContainer(RMContainer rmContainer) {
+    // recover app scheduling info
+    appSchedulingInfo.recoverContainer(rmContainer);
+
+    if (rmContainer.getState().equals(RMContainerState.COMPLETED)) {
+      return;
+    }
+    LOG.info("SchedulerAttempt " + getApplicationAttemptId()
+      + " is recovering container " + rmContainer.getContainerId());
+    liveContainers.put(rmContainer.getContainerId(), rmContainer);
+    Resources.addTo(currentConsumption, rmContainer.getContainer()
+      .getResource());
+    // resourceLimit: updated when LeafQueue#recoverContainer#allocateResource
+    // is called.
+    // newlyAllocatedContainers.add(rmContainer);
+    // schedulingOpportunities
+    // lastScheduledContainer
+  }
 }
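
Because the constructor above guards its RMContext argument with Guava's Preconditions, here is a quick standalone check (not part of this commit) of the argument order that class expects: the reference being checked comes first and the error message second.

    import com.google.common.base.Preconditions;

    public class CheckNotNullSketch {
      public static void main(String[] args) {
        Object rmContext = new Object();
        Preconditions.checkNotNull(rmContext, "RMContext should not be null"); // passes

        try {
          Preconditions.checkNotNull(null, "RMContext should not be null");
        } catch (NullPointerException expected) {
          System.out.println(expected.getMessage()); // prints the message above
        }
      }
    }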

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java Tue Aug 19 23:49:39 2014
@@ -18,11 +18,26 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
 
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.util.resource.Resources;
+
 
 /**
  * Represents a YARN Cluster Node from the viewpoint of the scheduler.
@@ -31,59 +46,236 @@ import org.apache.hadoop.yarn.conf.YarnC
 @Unstable
 public abstract class SchedulerNode {
 
+  private static final Log LOG = LogFactory.getLog(SchedulerNode.class);
+
+  private Resource availableResource = Resource.newInstance(0, 0);
+  private Resource usedResource = Resource.newInstance(0, 0);
+  private Resource totalResourceCapability;
+  private RMContainer reservedContainer;
+  private volatile int numContainers;
+
+
+  /* containers allocated on this node, keyed by ContainerId */
+  private final Map<ContainerId, RMContainer> launchedContainers =
+      new HashMap<ContainerId, RMContainer>();
+
+  private final RMNode rmNode;
+  private final String nodeName;
+
+  public SchedulerNode(RMNode node, boolean usePortForNodeName) {
+    this.rmNode = node;
+    this.availableResource = Resources.clone(node.getTotalCapability());
+    this.totalResourceCapability = Resources.clone(node.getTotalCapability());
+    if (usePortForNodeName) {
+      nodeName = rmNode.getHostName() + ":" + node.getNodeID().getPort();
+    } else {
+      nodeName = rmNode.getHostName();
+    }
+  }
+
+  public RMNode getRMNode() {
+    return this.rmNode;
+  }
+
+  /**
+   * Get the ID of the node which contains both its hostname and port.
+   * 
+   * @return the ID of the node
+   */
+  public NodeId getNodeID() {
+    return this.rmNode.getNodeID();
+  }
+
+  public String getHttpAddress() {
+    return this.rmNode.getHttpAddress();
+  }
+
   /**
    * Get the name of the node for scheduling matching decisions.
    * <p/>
-   * Typically this is the 'hostname' reported by the node, but it could be 
-   * configured to be 'hostname:port' reported by the node via the 
+   * Typically this is the 'hostname' reported by the node, but it could be
+   * configured to be 'hostname:port' reported by the node via the
    * {@link YarnConfiguration#RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME} constant.
    * The main usecase of this is Yarn minicluster to be able to differentiate
    * node manager instances by their port number.
    * 
    * @return name of the node for scheduling matching decisions.
    */
-  public abstract String getNodeName();
-  
+  public String getNodeName() {
+    return nodeName;
+  }
+
   /**
    * Get rackname.
+   * 
    * @return rackname
    */
-  public abstract String getRackName();
-  
+  public String getRackName() {
+    return this.rmNode.getRackName();
+  }
+
   /**
-   * Get used resources on the node.
-   * @return used resources on the node
+   * The Scheduler has allocated containers on this node to the given
+   * application.
+   * 
+   * @param rmContainer
+   *          allocated container
    */
-  public abstract Resource getUsedResource();
+  public synchronized void allocateContainer(RMContainer rmContainer) {
+    Container container = rmContainer.getContainer();
+    deductAvailableResource(container.getResource());
+    ++numContainers;
+
+    launchedContainers.put(container.getId(), rmContainer);
+
+    LOG.info("Assigned container " + container.getId() + " of capacity "
+        + container.getResource() + " on host " + rmNode.getNodeAddress()
+        + ", which has " + numContainers + " containers, "
+        + getUsedResource() + " used and " + getAvailableResource()
+        + " available after allocation");
+  }
 
   /**
    * Get available resources on the node.
+   * 
    * @return available resources on the node
    */
-  public abstract Resource getAvailableResource();
+  public synchronized Resource getAvailableResource() {
+    return this.availableResource;
+  }
 
   /**
-   * Get number of active containers on the node.
-   * @return number of active containers on the node
-   */
-  public abstract int getNumContainers();
-  
-  /**
-   * Apply delta resource on node's available resource.
-   * @param deltaResource the delta of resource need to apply to node
+   * Get used resources on the node.
+   * 
+   * @return used resources on the node
    */
-  public abstract void applyDeltaOnAvailableResource(Resource deltaResource);
+  public synchronized Resource getUsedResource() {
+    return this.usedResource;
+  }
 
   /**
    * Get total resources on the node.
+   * 
    * @return total resources on the node.
    */
-  public abstract Resource getTotalResource();
-  
+  public Resource getTotalResource() {
+    return this.totalResourceCapability;
+  }
+
+  public synchronized boolean isValidContainer(ContainerId containerId) {
+    if (launchedContainers.containsKey(containerId)) {
+      return true;
+    }
+    return false;
+  }
+
+  private synchronized void updateResource(Container container) {
+    addAvailableResource(container.getResource());
+    --numContainers;
+  }
+
   /**
-   * Get the ID of the node which contains both its hostname and port.
-   * @return the ID of the node
+   * Release an allocated container on this node.
+   * 
+   * @param container
+   *          container to be released
+   */
+  public synchronized void releaseContainer(Container container) {
+    if (!isValidContainer(container.getId())) {
+      LOG.error("Invalid container released " + container);
+      return;
+    }
+
+    /* remove the container from the node manager */
+    if (null != launchedContainers.remove(container.getId())) {
+      updateResource(container);
+    }
+
+    LOG.info("Released container " + container.getId() + " of capacity "
+        + container.getResource() + " on host " + rmNode.getNodeAddress()
+        + ", which currently has " + numContainers + " containers, "
+        + getUsedResource() + " used and " + getAvailableResource()
+        + " available" + ", release resources=" + true);
+  }
+
+  private synchronized void addAvailableResource(Resource resource) {
+    if (resource == null) {
+      LOG.error("Invalid resource addition of null resource for "
+          + rmNode.getNodeAddress());
+      return;
+    }
+    Resources.addTo(availableResource, resource);
+    Resources.subtractFrom(usedResource, resource);
+  }
+
+  private synchronized void deductAvailableResource(Resource resource) {
+    if (resource == null) {
+      LOG.error("Invalid deduction of null resource for "
+          + rmNode.getNodeAddress());
+      return;
+    }
+    Resources.subtractFrom(availableResource, resource);
+    Resources.addTo(usedResource, resource);
+  }
+
+  /**
+   * Reserve container for the attempt on this node.
+   */
+  public abstract void reserveResource(SchedulerApplicationAttempt attempt,
+      Priority priority, RMContainer container);
+
+  /**
+   * Unreserve resources on this node.
    */
-  public abstract NodeId getNodeID();
+  public abstract void unreserveResource(SchedulerApplicationAttempt attempt);
+
+  @Override
+  public String toString() {
+    return "host: " + rmNode.getNodeAddress() + " #containers="
+        + getNumContainers() + " available="
+        + getAvailableResource().getMemory() + " used="
+        + getUsedResource().getMemory();
+  }
+
+  /**
+   * Get number of active containers on the node.
+   * 
+   * @return number of active containers on the node
+   */
+  public int getNumContainers() {
+    return numContainers;
+  }
+
+  public synchronized List<RMContainer> getRunningContainers() {
+    return new ArrayList<RMContainer>(launchedContainers.values());
+  }
+
+  public synchronized RMContainer getReservedContainer() {
+    return reservedContainer;
+  }
+
+  protected synchronized void
+      setReservedContainer(RMContainer reservedContainer) {
+    this.reservedContainer = reservedContainer;
+  }
+
+  /**
+   * Apply delta resource on node's available resource.
+   * 
+   * @param deltaResource
+   *          the delta of resource need to apply to node
+   */
+  public synchronized void
+      applyDeltaOnAvailableResource(Resource deltaResource) {
+    // we can only adjust available resource if total resource is changed.
+    Resources.addTo(this.availableResource, deltaResource);
+  }
+
 
+  public synchronized void recoverContainer(RMContainer rmContainer) {
+    if (rmContainer.getState().equals(RMContainerState.COMPLETED)) {
+      return;
+    }
+    allocateContainer(rmContainer);
+  }
 }
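
A standalone sketch (not part of this commit) of the invariant the pulled-up allocateContainer() and releaseContainer() bookkeeping maintains: capacity moves between the available and used pools, and their sum always equals the node's total capability.

    import org.apache.hadoop.yarn.api.records.Resource;
    import org.apache.hadoop.yarn.util.resource.Resources;

    public class NodeBookkeepingSketch {
      public static void main(String[] args) {
        Resource total = Resource.newInstance(8192, 8); // 8 GB, 8 vcores
        Resource available = Resources.clone(total);
        Resource used = Resource.newInstance(0, 0);

        Resource container = Resource.newInstance(1024, 1);
        Resources.subtractFrom(available, container); // deductAvailableResource()
        Resources.addTo(used, container);

        Resources.addTo(available, container);        // addAvailableResource()
        Resources.subtractFrom(used, container);

        System.out.println(Resources.add(available, used).equals(total)); // true
      }
    }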

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java Tue Aug 19 23:49:39 2014
@@ -153,14 +153,17 @@ public class SchedulerUtils {
    * @param rmNode RMNode with new resource view
    * @param clusterResource the cluster's resource that need to update
    * @param log Scheduler's log for resource change
+   * @return true if the resources have changed
    */
-  public static void updateResourceIfChanged(SchedulerNode node, 
+  public static boolean updateResourceIfChanged(SchedulerNode node,
       RMNode rmNode, Resource clusterResource, Log log) {
+    boolean result = false;
     Resource oldAvailableResource = node.getAvailableResource();
     Resource newAvailableResource = Resources.subtract(
         rmNode.getTotalCapability(), node.getUsedResource());
     
     if (!newAvailableResource.equals(oldAvailableResource)) {
+      result = true;
       Resource deltaResource = Resources.subtract(newAvailableResource,
           oldAvailableResource);
       // Reflect resource change to scheduler node.
@@ -176,6 +179,8 @@ public class SchedulerUtils {
           + " with delta: CPU: " + deltaResource.getMemory() + "core, Memory: "
           + deltaResource.getMemory() +"MB");
     }
+
+    return result;
   }
 
   /**
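
A hypothetical caller fragment (not from this commit) showing how the new return value can be used; node, rmNode, clusterResource and LOG are assumed to be in scope, and updateQueueMetricsForResize() is an invented helper, not a method added by this patch.

    boolean changed = SchedulerUtils.updateResourceIfChanged(node, rmNode,
        clusterResource, LOG);
    if (changed) {
      // react only when the node's resource view actually moved, for example
      // by refreshing queue metrics or re-evaluating reservations
      updateQueueMetricsForResize(); // hypothetical helper, illustration only
    }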

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java Tue Aug 19 23:49:39 2014
@@ -69,7 +69,15 @@ public interface YarnScheduler extends E
   @Public
   @Stable
   public List<QueueUserACLInfo> getQueueUserAclInfo();
-  
+
+  /**
+   * Get the whole resource capacity of the cluster.
+   * @return the whole resource capacity of the cluster.
+   */
+  @LimitedPrivate("yarn")
+  @Unstable
+  public Resource getClusterResource();
+
   /**
    * Get minimum allocatable {@link Resource}.
    * @return minimum allocatable resource
@@ -182,7 +190,7 @@ public interface YarnScheduler extends E
   @LimitedPrivate("yarn")
   @Unstable
   public RMContainer getRMContainer(ContainerId containerId);
-  
+
   /**
    * Moves the given application to the given queue
    * @param appId
@@ -194,4 +202,22 @@ public interface YarnScheduler extends E
   @Evolving
   public String moveApplication(ApplicationId appId, String newQueue)
       throws YarnException;
+
+  /**
+   * Completely drain sourceQueue of applications by moving all of them to
+   * destQueue.
+   *
+   * @param sourceQueue the queue to drain
+   * @param destQueue the queue that receives the moved applications
+   * @throws YarnException
+   */
+  void moveAllApps(String sourceQueue, String destQueue) throws YarnException;
+
+  /**
+   * Terminate all applications in the specified queue.
+   *
+   * @param queueName the name of the queue whose applications will be killed
+   * @throws YarnException
+   */
+  void killAllAppsInQueue(String queueName) throws YarnException;
 }

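Taken together, the new YarnScheduler methods expose the cluster capacity and
allow a queue to be emptied in a single call. A hypothetical admin-side sketch
that drains one queue into another before the first is retired, falling back to
killing its applications if the bulk move is refused; the class, the method
retireQueue, and the fallback policy are assumptions for illustration only:

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;

// Hypothetical helper (not part of this commit).
class QueueDrainSketch {
  private static final Log LOG = LogFactory.getLog(QueueDrainSketch.class);

  void retireQueue(YarnScheduler scheduler, String oldQueue, String fallbackQueue)
      throws YarnException {
    // New accessor: the total capacity of the cluster as the scheduler sees it.
    Resource total = scheduler.getClusterResource();
    LOG.info("Draining " + oldQueue + " (cluster capacity: " + total + ")");
    try {
      // Move every application out of the queue being retired in one call.
      scheduler.moveAllApps(oldQueue, fallbackQueue);
    } catch (YarnException e) {
      // Illustrative fallback only: if the bulk move is refused, terminate
      // whatever is still sitting in the old queue.
      LOG.warn("Bulk move from " + oldQueue + " failed; killing its apps", e);
      scheduler.killAllAppsInQueue(oldQueue);
    }
  }
}

Whether killing the remaining applications is an acceptable fallback is a
policy decision; the sketch only shows that both bulk operations are now part
of the scheduler interface.
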
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java Tue Aug 19 23:49:39 2014
@@ -28,7 +28,6 @@ import org.apache.hadoop.security.Access
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.QueueACL;
 import org.apache.hadoop.yarn.api.records.QueueState;
@@ -235,17 +234,26 @@ extends org.apache.hadoop.yarn.server.re
   public ActiveUsersManager getActiveUsersManager();
   
   /**
-   * Recover the state of the queue
-   * @param clusterResource the resource of the cluster
-   * @param application the application for which the container was allocated
-   * @param container the container that was recovered.
-   */
-  public void recoverContainer(Resource clusterResource, FiCaSchedulerApp application, 
-      Container container);
-  
-  /**
    * Adds all applications in the queue and its subqueues to the given collection.
    * @param apps the collection to add the applications to
    */
   public void collectSchedulerApplications(Collection<ApplicationAttemptId> apps);
+
+  /**
+   * Detach a container from this queue
+   * @param clusterResource the current cluster resource
+   * @param application application to which the container was assigned
+   * @param container the container to detach
+   */
+  public void detachContainer(Resource clusterResource,
+               FiCaSchedulerApp application, RMContainer container);
+
+  /**
+   * Attach a container to this queue
+   * @param clusterResource the current cluster resource
+   * @param application application to which the container was assigned
+   * @param container the container to attach
+   */
+  public void attachContainer(Resource clusterResource,
+               FiCaSchedulerApp application, RMContainer container);
 }

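The new detachContainer/attachContainer pair lets a running container's
resource accounting be moved from one capacity-scheduler queue to another. A
sketch of the intended pairing, assuming the caller has already resolved the
source and target CSQueue instances; this is not the scheduler's actual
application-move path:

import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;

// Hypothetical helper (not part of this commit).
class ContainerMoveSketch {
  void moveContainer(Resource clusterResource, FiCaSchedulerApp application,
      RMContainer container, CSQueue source, CSQueue target) {
    // Remove the container's usage from the source queue (and, through it,
    // from that queue's ancestors).
    source.detachContainer(clusterResource, application, container);
    // Charge the same usage to the target queue hierarchy.
    target.attachContainer(clusterResource, application, container);
  }
}
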
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java Tue Aug 19 23:49:39 2014
@@ -17,6 +17,9 @@
 */
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.utils.Lock;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
@@ -24,6 +27,8 @@ import org.apache.hadoop.yarn.util.resou
 
 class CSQueueUtils {
   
+  private static final Log LOG = LogFactory.getLog(CSQueueUtils.class);
+
   final static float EPSILON = 0.0001f;
   
   public static void checkMaxCapacity(String queueName, 
@@ -113,4 +118,52 @@ class CSQueueUtils {
             )
         );
    }
+
+   public static float getAbsoluteMaxAvailCapacity(
+      ResourceCalculator resourceCalculator, Resource clusterResource, CSQueue queue) {
+      CSQueue parent = queue.getParent();
+      if (parent == null) {
+        return queue.getAbsoluteMaximumCapacity();
+      }
+
+      //Get my parent's max avail, needed to determine my own
+      float parentMaxAvail = getAbsoluteMaxAvailCapacity(
+        resourceCalculator, clusterResource, parent);
+      //...and as a resource
+      Resource parentResource = Resources.multiply(clusterResource, parentMaxAvail);
+
+      //if the parent has no resources, avoid dividing by zero: max avail is none
+      if (Resources.isInvalidDivisor(resourceCalculator, parentResource)) {
+        return 0.0f;
+      }
+      //sibling used is parent used - my used...
+      float siblingUsedCapacity = Resources.ratio(
+                 resourceCalculator,
+                 Resources.subtract(parent.getUsedResources(), queue.getUsedResources()),
+                 parentResource);
+      //my max avail is the lesser of my max capacity and what is unused from my parent
+      //by my siblings (if they are beyond their base capacity)
+      float maxAvail = Math.min(
+        queue.getMaximumCapacity(),
+        1.0f - siblingUsedCapacity);
+      //and multiply by the parent's value to get the absolute (cluster-relative) value
+      float absoluteMaxAvail = maxAvail * parentMaxAvail;
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("qpath " + queue.getQueuePath());
+        LOG.debug("parentMaxAvail " + parentMaxAvail);
+        LOG.debug("siblingUsedCapacity " + siblingUsedCapacity);
+        LOG.debug("getAbsoluteMaximumCapacity " + queue.getAbsoluteMaximumCapacity());
+        LOG.debug("maxAvail " + maxAvail);
+        LOG.debug("absoluteMaxAvail " + absoluteMaxAvail);
+      }
+
+      if (absoluteMaxAvail < 0.0f) {
+        absoluteMaxAvail = 0.0f;
+      } else if (absoluteMaxAvail > 1.0f) {
+        absoluteMaxAvail = 1.0f;
+      }
+
+      return absoluteMaxAvail;
+   }
 }
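
The recursion in getAbsoluteMaxAvailCapacity says: a queue's absolute maximum
available capacity is the lesser of its own maximum capacity and whatever its
siblings have left unused of the parent's available share, scaled by the
parent's own absolute availability and clamped to [0, 1]. A self-contained
numeric sketch of that arithmetic with made-up figures; it mirrors the formula
above but does not touch the Hadoop classes:

// Standalone illustration (not part of this commit).
class MaxAvailSketch {
  static float absoluteMaxAvail(float parentMaxAvail, float siblingUsedOfCluster,
      float myMaxCapacity) {
    if (parentMaxAvail <= 0.0f) {
      return 0.0f; // the parent has nothing available, so neither do we
    }
    // Sibling usage expressed as a fraction of the parent's available share.
    float siblingUsedOfParent = siblingUsedOfCluster / parentMaxAvail;
    // At most our own max capacity, and at most what the siblings left unused.
    float maxAvail = Math.min(myMaxCapacity, 1.0f - siblingUsedOfParent);
    // Scale back to a cluster-relative value and clamp to [0, 1].
    return Math.max(0.0f, Math.min(1.0f, maxAvail * parentMaxAvail));
  }

  public static void main(String[] args) {
    // Parent may use at most 50% of the cluster, its other children currently
    // use 30% of the cluster, and this queue's max-capacity is 80% of the parent:
    // min(0.8, 1 - 0.3/0.5) * 0.5 = min(0.8, 0.4) * 0.5 = 0.2 of the cluster.
    System.out.println(absoluteMaxAvail(0.5f, 0.3f, 0.8f)); // roughly 0.2
  }
}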