You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by ac...@apache.org on 2011/03/17 21:21:54 UTC

svn commit: r1082677 [34/38] - in /hadoop/mapreduce/branches/MR-279: ./ assembly/ ivy/ mr-client/ mr-client/hadoop-mapreduce-client-app/ mr-client/hadoop-mapreduce-client-app/src/ mr-client/hadoop-mapreduce-client-app/src/main/ mr-client/hadoop-mapredu...

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/RMResourceTrackerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/RMResourceTrackerImpl.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/RMResourceTrackerImpl.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/RMResourceTrackerImpl.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,339 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.resourcemanager.resourcetracker;
+
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.crypto.SecretKey;
+import javax.xml.crypto.NodeSetData;
+
+import org.apache.avro.ipc.AvroRemoteException;
+import org.apache.avro.ipc.Server;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.net.Node;
+import org.apache.hadoop.net.NodeBase;
+import org.apache.hadoop.security.SecurityInfo;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.server.RMNMSecurityInfoClass;
+import org.apache.hadoop.yarn.server.YarnServerConfig;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceListener;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ClusterTracker.NodeResponse;
+import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
+import org.apache.hadoop.yarn.service.AbstractService;
+import org.apache.hadoop.yarn.HeartbeatResponse;
+import org.apache.hadoop.yarn.NodeID;
+import org.apache.hadoop.yarn.NodeStatus;
+import org.apache.hadoop.yarn.RegistrationResponse;
+import org.apache.hadoop.yarn.Resource;
+import org.apache.hadoop.yarn.ResourceTracker;
+import org.apache.hadoop.yarn.YarnClusterMetrics;
+
+/**
+ * This class is responsible for the interaction with the NodeManagers.
+ * All the interactions with the NodeManagers happens via this interface.
+ *`
+ */
+public class RMResourceTrackerImpl extends AbstractService implements 
+ResourceTracker, RMResourceTracker, ResourceContext {
+  private static final Log LOG = LogFactory.getLog(RMResourceTrackerImpl.class);
+  /* we dont garbage collect on nodes. A node can come back up again and re register,
+   * so no use garbage collecting. Though admin can break the RM by bouncing 
+   * nodemanagers on different ports again and again.
+   */
+  private Map<String, NodeID> nodes = new ConcurrentHashMap<String, NodeID>();
+  private final Map<NodeID, NodeInfoTracker> nodeManagers = 
+    new ConcurrentHashMap<NodeID, NodeInfoTracker>();
+  private final HeartBeatThread heartbeatThread;
+  private final TreeSet<NodeStatus> nmExpiryQueue =
+      new TreeSet<NodeStatus>(
+          new Comparator<NodeStatus>() {
+            public int compare(NodeStatus p1, NodeStatus p2) {
+              if (p1.lastSeen < p2.lastSeen) {
+                return -1;
+              } else if (p1.lastSeen > p2.lastSeen) {
+                return 1;
+              } else {
+                return (p1.nodeId.id -
+                    p2.nodeId.id);
+              }
+            }
+          }
+      );
+  
+  private ResourceListener resourceListener;
+  private InetSocketAddress resourceTrackerAddress;
+  private Server server;
+  private final ContainerTokenSecretManager containerTokenSecretManager;
+  private final AtomicInteger nodeCounter = new AtomicInteger(0);
+  private static final HeartbeatResponse reboot = new HeartbeatResponse();
+  private long nmExpiryInterval;
+
+  public RMResourceTrackerImpl(ContainerTokenSecretManager containerTokenSecretManager) {
+    super(RMResourceTrackerImpl.class.getName());
+    reboot.reboot = true;
+    this.containerTokenSecretManager = containerTokenSecretManager;
+    this.heartbeatThread = new HeartBeatThread();
+  }
+
+  @Override
+  public void init(Configuration conf) {
+    String resourceTrackerBindAddress =
+      conf.get(YarnServerConfig.RESOURCETRACKER_ADDRESS,
+          YarnServerConfig.DEFAULT_RESOURCETRACKER_BIND_ADDRESS);
+    resourceTrackerAddress = NetUtils.createSocketAddr(resourceTrackerBindAddress);
+    this.nmExpiryInterval =  conf.getLong(YarnConfiguration.NM_EXPIRY_INTERVAL, 
+        YarnConfiguration.DEFAULT_NM_EXPIRY_INTERVAL);
+    super.init(conf);
+  }
+
+  @Override
+  public void start() {
+    // ResourceTrackerServer authenticates NodeManager via Kerberos if
+    // security is enabled, so no secretManager.
+    YarnRPC rpc = YarnRPC.create(getConfig());
+    Configuration rtServerConf = new Configuration(getConfig());
+    rtServerConf.setClass(
+        CommonConfigurationKeys.HADOOP_SECURITY_INFO_CLASS_NAME,
+        RMNMSecurityInfoClass.class, SecurityInfo.class);
+    this.server =
+      rpc.getServer(ResourceTracker.class, this, resourceTrackerAddress,
+          rtServerConf, null);
+    this.server.start();
+    this.heartbeatThread.start();
+    LOG.info("Expiry interval of NodeManagers set to " + nmExpiryInterval);
+    super.start();
+  }
+
+  /**
+   * resolving the network topology.
+   * @param hostName the hostname of this node.
+   * @return the resolved {@link Node} for this nodemanager.
+   */
+  public static Node resolve(String hostName) {
+    return new NodeBase(hostName, NetworkTopology.DEFAULT_RACK);
+  }
+
+  @Override
+  public RegistrationResponse registerNodeManager(CharSequence node,
+      Resource capability) throws AvroRemoteException {
+    NodeID nodeId = getNodeId(node);
+    NodeInfoTracker nTracker = null;
+    
+    synchronized(nodeManagers) {
+      if (!nodeManagers.containsKey(nodeId)) {
+        /* we do the resolving here, so that scheduler does not have to do it */
+        NodeInfo nodeManager = resourceListener.addNode(nodeId, node.toString(),
+            resolve(node.toString()),
+            capability);
+        HeartbeatResponse response = new HeartbeatResponse();
+        response.responseId = 0;
+        nTracker = new NodeInfoTracker(nodeManager, response);
+        nodeManagers.put(nodeId, nTracker);
+      } else {
+        nTracker = nodeManagers.get(nodeId);
+        NodeStatus status = nTracker.getNodeStatus();
+        status.lastSeen = System.currentTimeMillis();
+        nTracker.updateNodeStatus(status);
+      }
+    }
+    addForTracking(nTracker.getNodeStatus());
+    LOG.info("NodeManager from node " + node + " registered with capability: " + 
+        capability.memory + ", assigned nodeId " + nodeId.id);
+
+    RegistrationResponse regResponse = new RegistrationResponse();
+    regResponse.nodeID = nodeId;
+    SecretKey secretKey =
+      this.containerTokenSecretManager.createAndGetSecretKey(node);
+    regResponse.secretKey = ByteBuffer.wrap(secretKey.getEncoded());
+    return regResponse;
+  }
+
+  @Override
+  public HeartbeatResponse nodeHeartbeat(NodeStatus nodeStatus)
+  throws AvroRemoteException {
+    nodeStatus.lastSeen = System.currentTimeMillis();
+    NodeInfoTracker nTracker = null;
+    synchronized(nodeManagers) {
+      nTracker = nodeManagers.get(nodeStatus.nodeId);
+    }
+    if (nTracker == null) {
+      /* node does not exist */
+      LOG.info("Node not found rebooting " + nodeStatus.nodeId);
+      return reboot;
+    }
+
+    NodeInfo nodeInfo = nTracker.getNodeInfo();
+    /* check to see if its an old heartbeat */    
+    if (nodeStatus.responseId + 1 == nTracker.getLastHeartBeatResponse().responseId) {
+      return nTracker.getLastHeartBeatResponse();
+    } else if (nodeStatus.responseId + 1 < nTracker.getLastHeartBeatResponse().responseId) {
+      LOG.info("Too far behind rm response id:" + 
+          nTracker.lastHeartBeatResponse.responseId + " nm response id:" + nodeStatus.responseId);
+      return reboot;
+    }
+
+    /* inform any listeners of node heartbeats */
+    NodeResponse nodeResponse = resourceListener.nodeUpdate(
+        nodeInfo, nodeStatus.containers);
+
+    
+    HeartbeatResponse response = new HeartbeatResponse();
+    response.containersToCleanup = nodeResponse.getContainersToCleanUp();
+
+    
+    response.appplicationsToCleanup = nodeResponse.getFinishedApplications();
+    response.responseId = nTracker.getLastHeartBeatResponse().responseId + 1;
+
+    nTracker.refreshHeartBeatResponse(response);
+    nTracker.updateNodeStatus(nodeStatus);
+    return response;
+  }
+
+  @Private
+  public synchronized NodeInfo getNodeManager(NodeID nodeId) {
+    NodeInfoTracker ntracker = nodeManagers.get(nodeId);
+    return (ntracker == null ? null: ntracker.getNodeInfo());
+  }
+
+  private synchronized NodeID getNodeId(CharSequence node) {
+    NodeID nodeId;
+    nodeId = nodes.get(node);
+    if (nodeId == null) {
+      nodeId = new NodeID();
+      nodeId.id = nodeCounter.getAndIncrement();
+      nodes.put(node.toString(), nodeId);
+    }
+    return nodeId;
+  }
+
+  @Override
+  public synchronized YarnClusterMetrics getClusterMetrics() {
+    YarnClusterMetrics ymetrics = new YarnClusterMetrics();
+    ymetrics.numNodeManagers = nodeManagers.size();
+    return ymetrics;
+  }
+
+  @Override
+  public void stop() {
+    if (this.server != null) {
+      this.server.close();
+    }
+    super.stop();
+  }
+
+  @Override
+  public synchronized void register(ResourceListener listener) {
+    //for now there is only one resource listener, so we dont
+    //really add it to a list.
+    this.resourceListener = listener;
+  }
+
+  @Override
+  public synchronized void unregister(ResourceListener listener) {
+    //TODO make the listener so that it dumps to a void listener
+    //rather than nullifying it.
+  }
+
+  @Override
+  public List<NodeInfo> getAllNodeInfo() {
+    List<NodeInfo> infoList = new ArrayList<NodeInfo>();
+    synchronized (nodeManagers) {
+      for (NodeInfoTracker t : nodeManagers.values()) {
+        infoList.add(t.getNodeInfo());
+      }
+    }
+    return infoList;
+  }
+
+  protected void addForTracking(NodeStatus status) {
+    synchronized(nmExpiryQueue) {
+      nmExpiryQueue.add(status);
+    }
+  }
+  protected void expireNMs(List<NodeID> nodes) {
+    /* for now do nothing */
+  }
+
+  /*
+   * This class runs continuosly to track the nodemanagers
+   * that might be dead.
+   */
+  private class HeartBeatThread extends Thread {
+    private volatile boolean stop = false;
+
+    public HeartBeatThread() {
+      super("RMResourceTrackerImpl:" + HeartBeatThread.class.getName());
+    }
+
+    @Override
+    public void run() {
+      /* the expiry queue does not need to be in sync with nodeManagers,
+       * if a nodemanager in the expiry queue cannot be found in nodemanagers
+       * its alright. We do not want to hold a hold on nodeManagers while going
+       * through the expiry queue.
+       */
+      
+      List<NodeID> expired = new ArrayList<NodeID>();
+      LOG.info("Starting expiring thread with interval " + nmExpiryInterval);
+      
+      while (!stop) {
+        NodeStatus leastRecent;
+        long now = System.currentTimeMillis();
+        expired.clear();
+        synchronized(nmExpiryQueue) {
+          while ((nmExpiryQueue.size() > 0) &&
+              (leastRecent = nmExpiryQueue.first()) != null &&
+              ((now - leastRecent.lastSeen) > 
+              nmExpiryInterval)) {
+            nmExpiryQueue.remove(leastRecent);
+            NodeInfoTracker info;
+            synchronized(nodeManagers) {
+              info = nodeManagers.get(leastRecent.nodeId);
+            }
+            if (info == null) {
+              continue;
+            }
+            NodeStatus status = info.getNodeStatus();
+            if ((now - status.lastSeen) > nmExpiryInterval) {
+              expired.add(status.nodeId);
+            } else {
+              nmExpiryQueue.add(status);
+            }
+          }
+        }
+        expireNMs(expired);
+      }
+    }
+  }
+}

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/ResourceContext.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/ResourceContext.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/ResourceContext.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/ResourceContext.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,32 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.resourcemanager.resourcetracker;
+
+import java.util.List;
+
+/**
+ * The read-only interface for cluster resource
+ */
+public interface ResourceContext {
+  /**
+   * Get all node info
+   * @return a list of node info
+   */
+  List<NodeInfo> getAllNodeInfo();
+}

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Application.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Application.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Application.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Application.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,315 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeInfo;
+import org.apache.hadoop.yarn.ApplicationID;
+import org.apache.hadoop.yarn.Container;
+import org.apache.hadoop.yarn.Priority;
+import org.apache.hadoop.yarn.Resource;
+import org.apache.hadoop.yarn.ResourceRequest;
+
+/**
+ * This class keeps track of all the consumption of an application.
+ * This also keeps track of current running/completed
+ *  containers for the application.
+ */
+@LimitedPrivate("yarn")
+@Evolving
+public class Application {
+  private static final Log LOG = LogFactory.getLog(Application.class);
+
+  private AtomicInteger containerCtr = new AtomicInteger(0);
+
+  final ApplicationID applicationId;
+  final Queue queue;
+  final String user;
+
+  final Set<Priority> priorities = 
+    new TreeSet<Priority>(
+        new org.apache.hadoop.yarn.server.resourcemanager.resource.Priority.Comparator());
+  final Map<Priority, Map<String, ResourceRequest>> requests = 
+    new HashMap<Priority, Map<String, ResourceRequest>>();
+  final Resource currentConsumption = new Resource();
+  final Resource overallConsumption = new Resource();
+
+  /* Current consumption */
+  List<Container> acquired = new ArrayList<Container>();
+  List<Container> completedContainers = new ArrayList<Container>();
+  /* Allocated by scheduler */
+  List<Container> allocated = new ArrayList<Container>(); 
+  Set<NodeInfo> applicationOnNodes = new HashSet<NodeInfo>();
+  
+  public Application(ApplicationID applicationId, Queue queue, String user) {
+    this.applicationId = applicationId;
+    this.queue = queue;
+    this.user = user; 
+  }
+
+  public ApplicationID getApplicationId() {
+    return applicationId;
+  }
+
+  public Queue getQueue() {
+    return queue;
+  }
+  
+  public String getUser() {
+    return user;
+  }
+
+  public synchronized Map<Priority, Map<String, ResourceRequest>> getRequests() {
+    return requests;
+  }
+
+  public int getNewContainerId() {
+    return containerCtr.incrementAndGet();
+  }
+
+  /**
+   * the currently acquired/allocated containers  by the application masters.
+   * @return the current containers being used by the application masters.
+   */
+  public synchronized List<Container> getCurrentContainers() {
+    List<Container> currentContainers =  new ArrayList<Container>(acquired);
+    currentContainers.addAll(allocated);
+    return currentContainers;
+  }
+
+  /**
+   * The ApplicationMaster is acquiring the allocated/completed resources.
+   * @return allocated resources
+   */
+  synchronized public List<Container> acquire() {
+    // Return allocated containers
+    acquired.addAll(allocated);
+    List<Container> heartbeatContainers = allocated;
+    allocated = new ArrayList<Container>();
+
+    // Metrics
+    for (Container container : heartbeatContainers) {
+      org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.addResource(
+          overallConsumption, container.resource);
+    }
+
+    LOG.debug("acquire:" +
+        " application=" + applicationId + 
+        " #acquired=" + heartbeatContainers.size());
+    heartbeatContainers =  (heartbeatContainers == null) ? 
+        new ArrayList<Container>() : heartbeatContainers;
+
+        heartbeatContainers.addAll(completedContainers);
+        completedContainers.clear();
+        return heartbeatContainers;
+  }
+
+  /**
+   * The ApplicationMaster is updating resource requirements for the 
+   * application, by asking for more resources and releasing resources 
+   * acquired by the application.
+   * @param requests resources to be acquired
+   * @param release resources being released
+   */
+  synchronized public void updateResourceRequests(List<ResourceRequest> requests) {
+    // Update resource requests
+    for (ResourceRequest request : requests) {
+      Priority priority = request.priority;
+      String hostName = request.hostName.toString();
+
+      Map<String, ResourceRequest> asks = this.requests.get(priority);
+
+      if (asks == null) {
+        asks = new HashMap<String, ResourceRequest>();
+        this.requests.put(priority, asks);
+        this.priorities.add(priority);
+      }
+
+      asks.put(hostName, request);
+
+      if (hostName.equals(NodeManager.ANY)) {
+        LOG.debug("update:" +
+            " application=" + applicationId + 
+            " request=" + request);
+      }
+    }
+  }
+
+  public synchronized void releaseContainers(List<Container> release) {
+    // Release containers and update consumption 
+    for (Container container : release) {
+      LOG.debug("update: " +
+          "application=" + applicationId + " released=" + container);
+      org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.subtractResource(
+          currentConsumption, container.resource);
+      for (Iterator<Container> i=acquired.iterator(); i.hasNext();) {
+        Container c = i.next();
+        if (c.id.equals(container.id)) {
+          i.remove();
+          LOG.info("Removed acquired container: " + container.id);
+        }
+      }
+    }
+  }
+
+  synchronized public Collection<Priority> getPriorities() {
+    return priorities;
+  }
+
+  synchronized public Map<String, ResourceRequest> 
+  getResourceRequests(Priority priority) {
+    return requests.get(priority);
+  }
+
+  synchronized public ResourceRequest getResourceRequest(Priority priority, 
+      String node) {
+    Map<String, ResourceRequest> nodeRequests = requests.get(priority);
+    return (nodeRequests == null) ? null : nodeRequests.get(node);
+  }
+
+  synchronized public void completedContainer(Container container) {
+    LOG.info("Completed container: " + container);
+    completedContainers.add(container);
+  }
+
+  synchronized public void completedContainers(List<Container> containers) {
+    completedContainers.addAll(containers);
+  }
+
+  /**
+   * Resources have been allocated to this application by the resource scheduler.
+   * Track them.
+   * @param type the type of the node
+   * @param node the nodeinfo of the node
+   * @param priority the priority of the request.
+   * @param request the request
+   * @param containers the containers allocated.
+   */
+  synchronized public void allocate(NodeType type, NodeInfo node, 
+      Priority priority, ResourceRequest request, List<Container> containers) {
+    applicationOnNodes.add(node);
+    if (type == NodeType.DATA_LOCAL) {
+      allocateNodeLocal(node, priority, request, containers);
+    } else if (type == NodeType.RACK_LOCAL) {
+      allocateRackLocal(node, priority, request, containers);
+    } else {
+      allocateOffSwitch(node, priority, request, containers);
+    }
+  }
+
+  /**
+   * The {@link ResourceScheduler} is allocating data-local resources 
+   * to the application.
+   * @param allocatedContainers resources allocated to the application
+   */
+  synchronized private void allocateNodeLocal(NodeInfo node, 
+      Priority priority, ResourceRequest nodeLocalRequest, 
+      List<Container> containers) {
+    // Update consumption and track allocations
+    allocate(containers);
+
+    // Update future requirements
+    nodeLocalRequest.numContainers -= containers.size();
+    ResourceRequest rackLocalRequest = 
+      requests.get(priority).get(node.getRackName());
+    rackLocalRequest.numContainers -= containers.size();
+    ResourceRequest offSwitchRequest = 
+      requests.get(priority).get(NodeManager.ANY);
+    offSwitchRequest.numContainers -= containers.size();
+  }
+
+  /**
+   * The {@link ResourceScheduler} is allocating data-local resources 
+   * to the application.
+   * @param allocatedContainers resources allocated to the application
+   */
+  synchronized private void allocateRackLocal(NodeInfo node, 
+      Priority priority, ResourceRequest rackLocalRequest, 
+      List<Container> containers) {
+
+    // Update consumption and track allocations
+    allocate(containers);
+
+    // Update future requirements
+    rackLocalRequest.numContainers -= containers.size();
+    ResourceRequest offSwitchRequest = 
+      requests.get(priority).get(NodeManager.ANY);
+    offSwitchRequest.numContainers -= containers.size();
+  } 
+
+  /**
+   * The {@link ResourceScheduler} is allocating data-local resources 
+   * to the application.
+   * @param allocatedContainers resources allocated to the application
+   */
+  synchronized private void allocateOffSwitch(NodeInfo node, 
+      Priority priority, ResourceRequest offSwitchRequest, 
+      List<Container> containers) {
+
+    // Update consumption and track allocations
+    allocate(containers);
+
+    // Update future requirements
+    offSwitchRequest.numContainers -= containers.size();
+  }
+
+  synchronized private void allocate(List<Container> containers) {
+    // Update consumption and track allocations
+    for (Container container : containers) {
+      org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.addResource(
+          currentConsumption, container.resource);
+
+      allocated.add(container);
+
+      LOG.debug("allocate: applicationId=" + applicationId + 
+          " container=" + container.id + " host=" + container.hostName);
+    }
+  }
+
+  synchronized public void showRequests() {
+    for (Priority priority : getPriorities()) {
+      Map<String, ResourceRequest> requests = getResourceRequests(priority);
+      if (requests != null) {
+        for (ResourceRequest request : requests.values()) {
+          LOG.debug("showRequests:" +
+              " application=" + applicationId + 
+              " request=" + request);
+        }
+      }
+    }
+  }
+  
+  synchronized public List<NodeInfo> getAllNodesForApplication() {
+    return new ArrayList<NodeInfo>(applicationOnNodes);
+  }
+}

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterTracker.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterTracker.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterTracker.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,132 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+import org.apache.hadoop.net.Node;
+import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeInfo;
+import org.apache.hadoop.yarn.ApplicationID;
+import org.apache.hadoop.yarn.Container;
+import org.apache.hadoop.yarn.NodeID;
+import org.apache.hadoop.yarn.Resource;
+
+/**
+ * This interface defines tracking for all the node managers for the scheduler 
+ * and all the containers running/allocated 
+ * on the nodemanagers.
+ */
+@Evolving
+@Private
+public interface ClusterTracker {
+
+  /**
+   * The class that encapsulates response from clusterinfo for 
+   * updates from the node managers.
+   */
+  public class NodeResponse {
+    private final List<Container> completed;
+    private final List<Container> toCleanUp;
+    private final List<ApplicationID> finishedApplications;
+    
+    public NodeResponse(List<ApplicationID> finishedApplications,
+        List<Container> completed, List<Container> toKill) {
+      this.finishedApplications = finishedApplications;
+      this.completed = completed;
+      this.toCleanUp = toKill;
+    }
+    public List<ApplicationID> getFinishedApplications() {
+      return this.finishedApplications;
+    }
+    public List<Container> getCompletedContainers() {
+      return this.completed;
+    }
+    public List<Container> getContainersToCleanUp() {
+      return this.toCleanUp;
+    }
+  }
+  
+  /** 
+   * the current cluster resource.
+   * @return the current cluster resource.
+   */
+  public Resource getClusterResource();
+  
+  /**
+   * Remove the node from this cluster.
+   * @param nodeInfo the nodemanager to be removed from tracking.
+   */
+  public void removeNode(NodeInfo nodeInfo);
+  
+
+  /**
+   * Add a node for tracking
+   * @param nodeId the nodeid of the node
+   * @param hostName the hostname of the node
+   * @param node the node info of the node
+   * @param capability the total capability of the node.
+   * @return the {@link NodeInfo} that tracks this node.
+   */
+  public NodeInfo addNode(NodeID nodeId, String hostName, Node node,
+      Resource capability);
+  
+  /**
+   * An application has released a container
+   * @param applicationId the application that is releasing the container
+   * @param container the container that is being released 
+   */
+  public boolean releaseContainer(ApplicationID applicationId, 
+      Container container);
+  
+  /**
+   * Update the cluster with the node update.
+   * @param nodeInfo the node for which the update is
+   * @param containers the containers update for the node.
+   * @return the containers that are completed or need to be prempted.
+   */
+  public NodeResponse nodeUpdate(NodeInfo nodeInfo, 
+      Map<CharSequence, List<Container>> containers);
+
+  /**
+   * check to see if this node is being tracked for resources and allocations.
+   * @param node the node to check for.
+   * @return true if this node is being tracked, false else.
+   */
+  public boolean isTracked(NodeInfo node);
+  
+  /**
+   * Update the cluster with allocated containers on a node.
+   * @param nodeInfo the nodeinfo for the node that containers are allocated on.
+   * @param applicationId the application id of the application that containers
+   * are allocated to
+   * @param containers the list of containers allocated.
+   */
+  public void addAllocatedContainers(NodeInfo nodeInfo, ApplicationID applicationId,
+      List<Container> containers);
+  
+  /**
+   * Notify each of the node data structures that the application has finished.
+   * @param applicationId the application id of the application that finished.
+   * @param nodes the list of nodes that need to be notified of application completion.
+   */
+  public void finishedApplication(ApplicationID applicationId, List<NodeInfo> nodes);
+}
\ No newline at end of file

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterTrackerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterTrackerImpl.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterTrackerImpl.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterTrackerImpl.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,117 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.net.Node;
+import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeInfo;
+import org.apache.hadoop.yarn.service.AbstractService;
+import org.apache.hadoop.yarn.ApplicationID;
+import org.apache.hadoop.yarn.Container;
+import org.apache.hadoop.yarn.NodeID;
+import org.apache.hadoop.yarn.Resource;
+
+@Evolving
+@Private
+public class ClusterTrackerImpl extends AbstractService implements ClusterTracker {
+  
+  public ClusterTrackerImpl() {
+    super("ClusterTrackerImpl");
+  }
+
+  private static final Log LOG = LogFactory.getLog(ClusterTrackerImpl.class);
+  private Map<String, NodeManager> nodes = new HashMap<String, NodeManager>();
+  private Resource clusterResource = new Resource();
+  private Configuration conf;
+  
+  public void init(Configuration conf) {
+    this.conf = conf;
+  }
+
+  @Override
+  public synchronized Resource getClusterResource() {
+    return clusterResource;
+  }
+
+  @Override
+  public synchronized void removeNode(NodeInfo nodeInfo) {
+    org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.subtractResource(
+        clusterResource, nodeInfo.getTotalCapability());
+    nodes.remove(nodeInfo.getHostName());
+  }
+  
+  @Override
+  public synchronized boolean isTracked(NodeInfo nodeInfo) {
+    NodeManager node = nodes.get(nodeInfo.getHostName());
+    return (node == null? false: true);
+  }
+ 
+  @Override
+  public synchronized NodeInfo addNode(NodeID nodeId, 
+      String hostName, Node node, Resource capability) {
+    NodeManager nodeManager = new NodeManager(nodeId, hostName, node, capability);
+    nodes.put(nodeManager.getHostName(), nodeManager);
+    org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.addResource(
+        clusterResource, nodeManager.getTotalCapability());
+    return nodeManager;
+  }
+
+  @Override
+  public synchronized boolean releaseContainer(ApplicationID applicationId, 
+      Container container) {
+    // Reap containers
+    LOG.info("Application " + applicationId + " released container " + container);
+    NodeManager nodeManager = nodes.get(container.hostName.toString());
+    return nodeManager.releaseContainer(container);
+  }
+  
+  @Override
+  public synchronized NodeResponse nodeUpdate(NodeInfo nodeInfo, 
+      Map<CharSequence,List<Container>> containers) {
+    NodeManager node = nodes.get(nodeInfo.getHostName());
+    LOG.debug("nodeUpdate: node=" + nodeInfo.getHostName() + 
+        " available=" + nodeInfo.getAvailableResource().memory);
+    return node.statusUpdate(containers);
+    
+  }
+
+  @Override
+  public synchronized void addAllocatedContainers(NodeInfo nodeInfo, 
+      ApplicationID applicationId, List<Container> containers) {
+    NodeManager node = nodes.get(nodeInfo.getHostName());
+    node.allocateContainer(applicationId, containers);
+  }
+
+  @Override
+  public synchronized void finishedApplication(ApplicationID applicationId,
+      List<NodeInfo> nodesToNotify) {
+    for (NodeInfo node: nodesToNotify) {
+      NodeManager nodeManager = nodes.get(node.getHostName());
+      nodeManager.notifyFinishedApplication(applicationId);
+    }
+  }
+}
\ No newline at end of file

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/NodeManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/NodeManager.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/NodeManager.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/NodeManager.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,318 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.net.Node;
+import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ClusterTracker.NodeResponse;
+import org.apache.hadoop.yarn.ApplicationID;
+import org.apache.hadoop.yarn.Container;
+import org.apache.hadoop.yarn.ContainerID;
+import org.apache.hadoop.yarn.ContainerState;
+import org.apache.hadoop.yarn.NodeID;
+import org.apache.hadoop.yarn.Resource;
+
+/**
+ * This class is used by ClusterInfo to keep track of all the applications/containers
+ * running on a node.
+ *
+ */
+@Private
+@Unstable
+public class NodeManager implements NodeInfo {
+  private static final Log LOG = LogFactory.getLog(NodeManager.class);
+  private final NodeID nodeId;
+  private final String hostName;
+  private Resource totalCapability;
+  private Resource availableResource = new Resource();
+  private Resource usedResource = new Resource();
+  private final Node node;
+  
+  private static final Container[] EMPTY_CONTAINER_ARRAY = new Container[] {};
+  private static final List<Container> EMPTY_CONTAINER_LIST = Arrays.asList(EMPTY_CONTAINER_ARRAY);
+  private static final ApplicationID[] EMPTY_APPLICATION_ARRAY = new ApplicationID[]{};
+  private static final List<ApplicationID> EMPTY_APPLICATION_LIST = Arrays.asList(EMPTY_APPLICATION_ARRAY);
+  
+  public static final String ANY = "*";  
+  /* set of containers that are allocated containers */
+  private final Map<ContainerID, Container> allocatedContainers = 
+    new TreeMap<ContainerID, Container>();
+    
+  /* set of containers that are currently active on a node manager */
+  private final Map<ContainerID, Container> activeContainers =
+    new TreeMap<ContainerID, Container>();
+  
+  /* set of containers that need to be cleaned */
+  private final Set<Container> containersToClean = 
+    new TreeSet<Container>(new org.apache.hadoop.yarn.server.resourcemanager.resource.Container.Comparator());
+
+  
+  /* the list of applications that have finished and need to be purged */
+  private final List<ApplicationID> finishedApplications = new ArrayList<ApplicationID>();
+  
+  private volatile int numContainers;
+  
+  public NodeManager(NodeID nodeId, String hostname, 
+      Node node, Resource capability) {
+    this.nodeId = nodeId;   
+    this.totalCapability = capability; 
+    this.hostName = hostname;
+    org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.addResource(
+        availableResource, capability);
+    this.node = node;
+  }
+
+  /**
+   * NodeInfo for this node.
+   * @return the {@link NodeInfo} for this node.
+   */
+  public NodeInfo getNodeInfo() {
+    return this;
+  }
+  
+  /**
+   * The Scheduler has allocated containers on this node to the 
+   * given application.
+   * 
+   * @param applicationId application
+   * @param containers allocated containers
+   */
+  public synchronized void allocateContainer(ApplicationID applicationId, 
+      List<Container> containers) {
+    if (containers == null) {
+      LOG.error("Adding null containers for application " + applicationId);
+      return;
+    }   
+    for (Container container : containers) {
+      allocateContainer(container);
+    }
+
+    LOG.info("addContainers:" +
+        " node=" + getHostName() + 
+        " #containers=" + containers.size() + 
+        " available=" + getAvailableResource().memory + 
+        " used=" + getUsedResource().memory);
+  }
+
+  /**
+   * Status update from the NodeManager
+   * @param nodeStatus node status
+   * @return the set of containers no longer should be used by the
+   * node manager.
+   */
+  public synchronized NodeResponse 
+    statusUpdate(Map<CharSequence,List<Container>> allContainers) {
+
+    if (allContainers == null) {
+      return new NodeResponse(EMPTY_APPLICATION_LIST, EMPTY_CONTAINER_LIST,
+          EMPTY_CONTAINER_LIST);
+    }
+       
+    List<Container> listContainers = new ArrayList<Container>();
+    // Iterate through the running containers and update their status
+    for (Map.Entry<CharSequence, List<Container>> e : 
+      allContainers.entrySet()) {
+      listContainers.addAll(e.getValue());
+    }
+    NodeResponse statusCheck = update(listContainers);
+    return statusCheck;
+  }
+  
+  /**
+   * Status update for an application running on a given node
+   * @param node node
+   * @param containers containers update.
+   * @return containers that are completed or need to be preempted.
+   */
+  private synchronized NodeResponse update(List<Container> containers) {
+    List<Container> completedContainers = new ArrayList<Container>();
+    List<Container> containersToCleanUp = new ArrayList<Container>();
+    List<ApplicationID> lastfinishedApplications = new ArrayList<ApplicationID>();
+    
+    for (Container container : containers) {
+      if (allocatedContainers.remove(container.id) != null) {
+        activeContainers.put(container.id, container);
+        LOG.info("Activated container " + container.id + " on node " + 
+         getHostName());
+      }
+
+      if (container.state == ContainerState.COMPLETE) {
+        if (activeContainers.remove(container.id) != null) {
+          updateResource(container);
+          LOG.info("Completed container " + container);
+        }
+        completedContainers.add(container);
+        LOG.info("Removed completed container " + container.id + " on node " + 
+            getHostName());
+      }
+      else if (container.state != ContainerState.COMPLETE && 
+          (!allocatedContainers.containsKey(container.id)) && 
+          !activeContainers.containsKey(container.id)) {
+        containersToCleanUp.add(container);
+      }
+    }
+    containersToCleanUp.addAll(containersToClean);
+    /* clear out containers to clean */
+    containersToClean.clear();
+    lastfinishedApplications.addAll(finishedApplications);
+    return new NodeResponse(lastfinishedApplications, completedContainers, 
+        containersToCleanUp);
+  }
+  
+  private synchronized void allocateContainer(Container container) {
+    deductAvailableResource(container.resource);
+    ++numContainers;
+    
+    allocatedContainers.put(container.id, container);
+    LOG.info("Allocated container " + container.id + 
+        " to node " + getHostName());
+    
+    LOG.info("Assigned container " + container.id + 
+        " of capacity " + container.resource + " on host " + getHostName() + 
+        ", which currently has " + numContainers + " containers, " + 
+        getUsedResource() + " used and " + 
+        getAvailableResource() + " available");
+  }
+
+  private synchronized boolean isValidContainer(Container c) {    
+    if (activeContainers.containsKey(c.id) || allocatedContainers.containsKey(c.id))
+      return true;
+    return false;
+  }
+
+  private synchronized void updateResource(Container container) {
+    addAvailableResource(container.resource);
+    --numContainers;
+  }
+  
+  /**
+   * Release an allocated container on this node.
+   * @param container container to be released
+   * @return <code>true</code> iff the container was unused, 
+   *         <code>false</code> otherwise
+   */
+  public synchronized boolean releaseContainer(Container container) {
+    if (!isValidContainer(container)) {
+      LOG.error("Invalid container released " + container);
+      return false;
+    }
+    
+    /* remove the containers from the nodemanger */
+    
+    // Was this container launched?
+    activeContainers.remove(container.id);
+    allocatedContainers.remove(container.id);
+    containersToClean.add(container);
+    updateResource(container);
+
+    LOG.info("Released container " + container.id + 
+        " of capacity " + container.resource + " on host " + getHostName() + 
+        ", which currently has " + numContainers + " containers, " + 
+        getUsedResource() + " used and " + getAvailableResource()
+        + " available" + ", release resources=" + true);
+    return true;
+  }
+
+  @Override
+  public NodeID getNodeID() {
+    return this.nodeId;
+  }
+
+  @Override
+  public String getHostName() {
+    return this.hostName;
+  }
+
+  @Override
+  public Resource getTotalCapability() {
+   return this.totalCapability;
+  }
+
+  @Override
+  public String getRackName() {
+    return node.getNetworkLocation();
+  }
+
+  @Override
+  public Node getNode() {
+    return this.node;
+  }
+
+  @Override
+  public synchronized Resource getAvailableResource() {
+    return this.availableResource;
+  }
+
+  @Override
+  public synchronized Resource getUsedResource() {
+    return this.usedResource;
+  }
+
+  public synchronized void addAvailableResource(Resource resource) {
+    if (resource == null) {
+      LOG.error("Invalid resource addition of null resource for " + this.hostName);
+      return;
+    }
+    org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.addResource(
+        availableResource, resource);
+    org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.subtractResource(
+        usedResource, resource);
+  }
+
+  public synchronized void deductAvailableResource(Resource resource) {
+    if (resource == null) {
+      LOG.error("Invalid deduction of null resource for "+ this.hostName);
+    }
+    org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.subtractResource(
+        availableResource, resource);
+    org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.addResource(
+        usedResource, resource);
+  }
+
+  public synchronized void notifyFinishedApplication(ApplicationID applicationId) {  
+    finishedApplications.add(applicationId);
+    /* make sure to iterate through the list and remove all the containers that 
+     * belong to this application.
+     */
+  }
+
+  @Override
+  public int getNumContainers() {
+    return numContainers;
+  }
+  
+  @Override
+  public String toString() {
+    return "host: " + getHostName() + " #containers=" + getNumContainers() +  
+      " available=" + getAvailableResource().memory + 
+      " used=" + getUsedResource().memory;
+  }
+ }

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/NodeType.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/NodeType.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/NodeType.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/NodeType.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,28 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
+
+/**
+ * Resource classification.
+ */
+public enum NodeType {
+  DATA_LOCAL,
+  RACK_LOCAL,
+  OFF_SWITCH
+}

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,28 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
+
+import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+@Evolving
+@LimitedPrivate("yarn")
+public interface Queue {
+  String getQueueName();
+}

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceListener.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceListener.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceListener.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceListener.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,67 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+import org.apache.hadoop.net.Node;
+import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ClusterTracker.NodeResponse;
+import org.apache.hadoop.yarn.Container;
+import org.apache.hadoop.yarn.NodeID;
+import org.apache.hadoop.yarn.Resource;
+
+/**
+ * This interface is implemented by services which want to get notified
+ * by the resource tracker with resource tracking information.
+ */
+@Evolving
+@Private
+public interface ResourceListener {
+ 
+  /**
+   * add a node to the resource listener.
+   * @param nodeId the nodeid of the node
+   * @param hostName the hostname of this node.
+   * @param node the topology information.
+   * @param capability the resource  capability of the node.
+   * @return the {@link NodeInfo} object that tracks this nodemanager.
+   */
+  public NodeInfo addNode(NodeID nodeId,String hostName,
+      Node node, Resource capability);
+  
+  /**
+   * A node has been removed from the cluster.
+   * @param node the node to remove.
+   */
+  public void removeNode(NodeInfo node);
+  
+  /**
+   * A status update from a NodeManager
+   * @param nodeInfo NodeManager info
+   * @param containers the containers completed/running/failed on this node.
+   * @return response information for the node, which containers to kill and 
+   * applications to clean.
+   */
+  public NodeResponse nodeUpdate(NodeInfo nodeInfo, 
+      Map<CharSequence,List<Container>> containers);
+}
\ No newline at end of file

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceScheduler.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceScheduler.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceScheduler.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,44 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
+
+/**
+ * This interface is the one implemented by the schedulers. It mainly extends 
+ * {@link ResourceListener} and {@link YarnScheduler}. 
+ *
+ */
+@LimitedPrivate("yarn")
+@Evolving
+public interface ResourceScheduler extends ResourceListener, YarnScheduler {
+  /**
+   * Re-initialize the <code>ResourceScheduler</code>.
+   * @param conf configuration
+   * @param secretManager token-secret manager
+   * @throws IOException
+   */
+  void reinitialize(Configuration conf, 
+      ContainerTokenSecretManager secretManager);    
+}

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,63 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.yarn.ApplicationID;
+import org.apache.hadoop.yarn.Container;
+import org.apache.hadoop.yarn.Priority;
+import org.apache.hadoop.yarn.ResourceRequest;
+
+/**
+ * This interface is used by the components to talk to the
+ * scheduler for allocating of resources, cleaning up resources.
+ *
+ */
+public interface YarnScheduler {
+  /**
+   * Allocates and returns resources.
+   * @param applicationId
+   * @param ask
+   * @param release
+   * @return
+   * @throws IOException
+   */
+  List<Container> allocate(ApplicationID applicationId,
+      List<ResourceRequest> ask, List<Container> release)
+      throws IOException;
+  /**
+   * A new application has been submitted to the ResourceManager
+   * @param applicationId application which has been submitted
+   * @param user application user
+   * @param queue queue to which the applications is being submitted
+   * @param priority application priority
+   */
+  public void addApplication(ApplicationID applicationId, String user, 
+      String queue, Priority priority) 
+  throws IOException;
+  
+  /**
+   * A submitted application has completed.
+   * @param applicationId completed application
+   */
+  public void removeApplication(ApplicationID applicationId)
+  throws IOException;
+}

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,353 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.net.Node;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Application;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ClusterTracker;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ClusterTrackerImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ClusterTracker.NodeResponse;
+import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
+import org.apache.hadoop.yarn.ApplicationID;
+import org.apache.hadoop.yarn.Container;
+import org.apache.hadoop.yarn.NodeID;
+import org.apache.hadoop.yarn.Priority;
+import org.apache.hadoop.yarn.Resource;
+import org.apache.hadoop.yarn.ResourceRequest;
+
+@LimitedPrivate("yarn")
+@Evolving
+public class CapacityScheduler 
+implements ResourceScheduler, CapacitySchedulerContext {
+
+  private static final Log LOG = LogFactory.getLog(CapacityScheduler.class);
+  
+  private Queue root;
+  
+  private final static List<Container> EMPTY_CONTAINER_LIST = 
+    new ArrayList<Container>();
+
+  private final Comparator<Queue> queueComparator = new Comparator<Queue>() {
+    @Override
+    public int compare(Queue q1, Queue q2) {
+      if (q1.getUtilization() < q2.getUtilization()) {
+        return -1;
+      } else if (q1.getUtilization() > q2.getUtilization()) {
+        return 1;
+      }
+      
+      return q1.getQueuePath().compareTo(q2.getQueuePath());
+    }
+  };
+
+  private final Comparator<Application> applicationComparator = 
+    new Comparator<Application>() {
+      @Override
+      public int compare(Application a1, Application a2) {
+        return a1.getApplicationId().id - a2.getApplicationId().id;
+      }
+  };
+  
+  private CapacitySchedulerConfiguration conf;
+  private ContainerTokenSecretManager containerTokenSecretManager;
+  
+  private Map<String, Queue> queues = new ConcurrentHashMap<String, Queue>();
+  
+  private final ClusterTracker clusterTracker;
+  
+  private Resource minimumAllocation;
+  
+  private Map<ApplicationID, Application> applications = 
+    new TreeMap<ApplicationID, Application>(
+        new org.apache.hadoop.yarn.server.resourcemanager.resource.ApplicationID.Comparator());
+
+  public CapacityScheduler() {
+    this.clusterTracker = createClusterTracker();
+  }
+  
+  protected ClusterTracker createClusterTracker() {
+    return new ClusterTrackerImpl();
+  }
+
+  public Queue getRootQueue() {
+    return root;
+  }
+
+  @Override
+  public CapacitySchedulerConfiguration getConfiguration() {
+    return conf;
+  }
+  
+  @Override
+  public ContainerTokenSecretManager getContainerTokenSecretManager() {
+    return containerTokenSecretManager;
+  }
+
+  @Override
+  public Resource getMinimumAllocation() {
+    return minimumAllocation;
+  }
+
+  @Override
+  public void reinitialize(Configuration conf,
+      ContainerTokenSecretManager containerTokenSecretManager) {
+    this.conf = new CapacitySchedulerConfiguration(conf);
+    this.minimumAllocation = this.conf.getMinimumAllocation();
+    this.containerTokenSecretManager = containerTokenSecretManager;
+    
+    initializeQueues(this.conf);
+  }
+
+  @Private
+  public static final String ROOT = "root";
+  
+  @Private
+  public static final String ROOT_QUEUE = 
+    CapacitySchedulerConfiguration.PREFIX + ROOT;
+  
+  private void initializeQueues(CapacitySchedulerConfiguration conf) {
+    root = parseQueue(conf, null, ROOT);
+    LOG.info("Initialized root queue " + root);
+  }
+  
+  private Queue parseQueue(CapacitySchedulerConfiguration conf, 
+      Queue parent, String queueName) {
+    Queue queue;
+    String[] childQueueNames = 
+      conf.getQueues((parent == null) ? 
+          queueName : (parent.getQueuePath()+"."+queueName));
+    if (childQueueNames == null || childQueueNames.length == 0) {
+      queue = new LeafQueue(this, queueName, parent, applicationComparator);
+    } else {
+      ParentQueue parentQueue = 
+        new ParentQueue(this, queueName, queueComparator, parent);
+      List<Queue> childQueues = new ArrayList<Queue>();
+      for (String childQueueName : childQueueNames) {
+        Queue childQueue = 
+          parseQueue(
+              conf, 
+              parentQueue, 
+              childQueueName);
+        childQueues.add(childQueue);
+        
+        queues.put(childQueueName, childQueue);
+      }
+      parentQueue.setChildQueues(childQueues);
+      
+      queue = parentQueue;
+    }
+    
+    LOG.info("Initialized queue: " + queue);
+    return queue;
+  }
+  
+  @Override
+  public void addApplication(ApplicationID applicationId, 
+      String user, String queueName, Priority priority)
+  throws IOException {
+    Queue queue = queues.get(queueName);
+    
+    if (queue == null) {
+      throw new IOException("Application " + applicationId + 
+          " submitted by user " + user + " to unknown queue: " + queueName);
+    }
+    
+    if (!(queue instanceof LeafQueue)) {
+      throw new IOException("Application " + applicationId + 
+          " submitted by user " + user + " to non-leaf queue: " + queueName);
+    }
+
+    Application application = new Application(applicationId, queue, user); 
+    try {
+      queue.submitApplication(application, user, queueName, priority);
+    } catch (AccessControlException ace) {
+      throw new IOException(ace);
+    }
+    
+    applications.put(applicationId, application);
+
+    LOG.info("Application Submission: " + applicationId.id + 
+        ", user: " + user +
+        " queue: " + queue +
+        ", currently active: " + applications.size());
+  }
+
+  @Override
+  public void removeApplication(ApplicationID applicationId)
+      throws IOException {
+    Application application = getApplication(applicationId);
+    
+    if (application == null) {
+//      throw new IOException("Unknown application " + applicationId + 
+//          " has completed!");
+      LOG.info("Unknown application " + applicationId + " has completed!");
+      return;
+    }
+
+    // Inform the queue
+    Queue queue = queues.get(application.getQueue().getQueueName());
+    LOG.info("DEBUG --- removeApplication - appId: " + applicationId + " queue: " + queue);
+    queue.finishApplication(application, queue.getQueueName());
+    
+    // Release containers and update queue capacities
+    processReleasedContainers(application, application.getCurrentContainers());
+    
+    // Inform all NodeManagers about completion of application
+    clusterTracker.finishedApplication(applicationId, 
+        application.getAllNodesForApplication());
+    
+    // Remove from our data-structure
+    applications.remove(applicationId);
+  }
+
+  @Override
+  public NodeInfo addNode(NodeID nodeId,String hostName,
+      Node node, Resource capability) {
+    return clusterTracker.addNode(nodeId, hostName, node, capability);
+  }
+
+  @Override
+  public void removeNode(NodeInfo node) {
+    clusterTracker.removeNode(node);
+  }
+
+  @Override
+  public List<Container> allocate(ApplicationID applicationId,
+      List<ResourceRequest> ask, List<Container> release)
+      throws IOException {
+
+    Application application = getApplication(applicationId);
+    if (application == null) {
+      LOG.info("Calling allocate on removed " +
+          "or non existant application " + applicationId);
+      return EMPTY_CONTAINER_LIST; 
+    }
+    normalizeRequests(ask);
+
+    LOG.info("DEBUG --- allocate: pre-update" +
+        " applicationId=" + applicationId + 
+        " application=" + application);
+    application.showRequests();
+
+    // Update application requests
+    application.updateResourceRequests(ask);
+    
+    // Release ununsed containers and update queue capacities
+    processReleasedContainers(application, release);
+    
+    LOG.info("DEBUG --- allocate: post-update");
+    application.showRequests();
+
+    List<Container> allContainers = application.acquire();
+    LOG.info("DEBUG --- allocate:" +
+        " applicationId=" + applicationId + 
+        " #ask=" + ask.size() + 
+        " #release=" + release.size() +
+        " #allContainers=" + allContainers.size());
+    return allContainers;
+  }
+
+  private void normalizeRequests(List<ResourceRequest> asks) {
+    for (ResourceRequest ask : asks) {
+      normalizeRequest(ask);
+    }
+  }
+  
+  private void normalizeRequest(ResourceRequest ask) {
+    int memory = ask.capability.memory;
+    int minMemory = minimumAllocation.memory;
+    memory = 
+      minMemory * ((memory/minMemory) + (memory%minMemory)); 
+  }
+  
+
+  @Override
+  public synchronized NodeResponse nodeUpdate(NodeInfo node, 
+      Map<CharSequence,List<Container>> containers ) {
+    
+    LOG.info("nodeUpdate: " + node);
+    
+    NodeResponse nodeResponse = clusterTracker.nodeUpdate(node, containers);
+
+    // Completed containers
+    processCompletedContainers(nodeResponse.getCompletedContainers());
+    
+    // Assign new containers
+    root.assignContainers(clusterTracker, node);
+    
+    return nodeResponse;
+  }
+
+  private synchronized void processCompletedContainers(
+      List<Container> completedContainers) {
+    for (Container container: completedContainers) {
+      Application application = getApplication(container.id.appID);
+
+      // this is possible, since an application can be removed from scheduler 
+      // but the nodemanger is just updating about a completed container.
+      if (application != null) {
+
+        // Inform the queue
+        LeafQueue queue = (LeafQueue)application.getQueue();
+        queue.completedContainer(clusterTracker, container, application);
+      }
+    }
+  }
+   
+  private synchronized void processReleasedContainers(Application application,
+      List<Container> releasedContainers) {
+    // Inform the application
+    application.releaseContainers(releasedContainers);
+
+    // Inform clusterTracker
+    List<Container> unusedContainers = new ArrayList<Container>();
+    for (Container container : releasedContainers) {
+      if (clusterTracker.releaseContainer(
+          application.getApplicationId(), 
+          container)) {
+        unusedContainers.add(container);
+      }
+    }
+
+    // Update queue capacities
+    processCompletedContainers(unusedContainers);
+  }
+  
+  private synchronized Application getApplication(ApplicationID applicationId) {
+    return applications.get(applicationId);
+  }
+
+}

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,163 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.Resource;
+
+public class CapacitySchedulerConfiguration extends Configuration {
+
+  private static final Log LOG = 
+    LogFactory.getLog(CapacitySchedulerConfiguration.class);
+  
+  private static final String CS_CONFIGURATION_FILE = "capacity-scheduler.xml";
+  
+  @Private
+  public static final String PREFIX = "yarn.capacity-scheduler.";
+  
+  @Private
+  public static final String DOT = ".";
+  
+  @Private
+  public static final String MAXIMUM_SYSTEM_APPLICATIONS =
+    PREFIX + "maximum-applications";
+  
+  @Private
+  public static final String QUEUES = "queues";
+  
+  @Private
+  public static final String CAPACITY = "capacity";
+  
+  @Private
+  public static final String MAXIMUM_CAPACITY = "maximum-capacity";
+  
+  @Private
+  public static final String USER_LIMIT = "minimum-user-limit";
+  
+  @Private
+  public static final String USER_LIMIT_FACTOR = "user-limit-factor";
+  
+  private static final int MINIMUM_MEMORY = 1024;
+
+  @Private
+  public static final String MINIMUM_ALLOCATION = 
+    PREFIX + "minimum-allocation-mb";
+  
+  @Private
+  public static int DEFAULT_MAXIMUM_SYSTEM_APPLICATIIONS = 10000;
+  
+  @Private
+  public static int UNDEFINED = -1;
+  
+  @Private
+  public static int MINIMUM_CAPACITY_VALUE = 1;
+  
+  @Private
+  public static int MAXIMUM_CAPACITY_VALUE = 100;
+  
+  @Private
+  public static int DEFAULT_USER_LIMIT = 100;
+  
+  @Private
+  public static float DEFAULT_USER_LIMIT_FACTOR = 1.0f;
+  
+  public CapacitySchedulerConfiguration() {
+    this(new Configuration());
+  }
+  
+  public CapacitySchedulerConfiguration(Configuration configuration) {
+    super(configuration);
+    addResource(CS_CONFIGURATION_FILE);
+  }
+  
+  private String getQueuePrefix(String queue) {
+    String queueName = PREFIX + queue + DOT;
+    return queueName;
+  }
+  
+  public int getMaximumSystemApplications() {
+    int maxApplications = 
+      getInt(MAXIMUM_SYSTEM_APPLICATIONS, DEFAULT_MAXIMUM_SYSTEM_APPLICATIIONS);
+    return maxApplications;
+  }
+  
+  public int getCapacity(String queue) {
+    int capacity = getInt(getQueuePrefix(queue) + CAPACITY, UNDEFINED);
+    if (capacity < MINIMUM_CAPACITY_VALUE || capacity > MAXIMUM_CAPACITY_VALUE) {
+      throw new IllegalArgumentException("Illegal " +
+      		"capacity of " + capacity + " for queue " + queue);
+    }
+    LOG.info("CSConf - setCapacity: queuePrefix=" + getQueuePrefix(queue) + 
+        ", capacity=" + capacity);
+    return capacity;
+  }
+  
+  public int getMaximumCapacity(String queue) {
+    int maxCapacity = 
+      getInt(getQueuePrefix(queue) + MAXIMUM_CAPACITY, UNDEFINED);
+    return maxCapacity;
+  }
+  
+  public int getUserLimit(String queue) {
+    int userLimit = 
+      getInt(getQueuePrefix(queue) + USER_LIMIT, DEFAULT_USER_LIMIT);
+    return userLimit;
+  }
+  
+  public float getUserLimitFactor(String queue) {
+    float userLimitFactor = 
+      getFloat(getQueuePrefix(queue) + USER_LIMIT_FACTOR, 
+          DEFAULT_USER_LIMIT_FACTOR);
+    return userLimitFactor;
+  }
+
+  public void setUserLimitFactor(String queue, float userLimitFactor) {
+    setFloat(getQueuePrefix(queue) + USER_LIMIT_FACTOR, userLimitFactor); 
+  }
+  
+  public void setCapacity(String queue, int capacity) {
+    setInt(getQueuePrefix(queue) + CAPACITY, capacity);
+    LOG.info("CSConf - setCapacity: queuePrefix=" + getQueuePrefix(queue) + 
+        ", capacity=" + capacity);
+  }
+  
+  public String[] getQueues(String queue) {
+    LOG.info("CSConf - getQueues called for: queuePrefix=" + getQueuePrefix(queue));
+    String[] queues = getStrings(getQueuePrefix(queue) + QUEUES);
+    LOG.info("CSConf - getQueues: queuePrefix=" + getQueuePrefix(queue) + 
+        ", queues=" + ((queues == null) ? "" : StringUtils.arrayToString(queues)));
+    return queues;
+  }
+  
+  public void setQueues(String queue, String[] subQueues) {
+    set(getQueuePrefix(queue) + QUEUES, StringUtils.arrayToString(subQueues));
+    LOG.info("CSConf - setQueues: qPrefix=" + getQueuePrefix(queue) + 
+        ", queues=" + StringUtils.arrayToString(subQueues));
+  }
+  
+  public Resource getMinimumAllocation() {
+    int minimumMemory = getInt(MINIMUM_ALLOCATION, MINIMUM_MEMORY);
+    return org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.
+             createResource(minimumMemory);
+  }
+}

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,33 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
+import org.apache.hadoop.yarn.Resource;
+
+/**
+ * Read-only interface to {@link CapacityScheduler} context.
+ */
+public interface CapacitySchedulerContext {
+  CapacitySchedulerConfiguration getConfiguration();
+  
+  Resource getMinimumAllocation();
+  
+  ContainerTokenSecretManager getContainerTokenSecretManager();
+}