You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by ac...@apache.org on 2011/05/23 19:02:20 UTC

svn commit: r1126587 - in /hadoop/mapreduce/branches/MR-279: ./ mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/ yarn/yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ yarn/yarn-api/src/main/java/org/apac...

Author: acmurthy
Date: Mon May 23 17:02:19 2011
New Revision: 1126587

URL: http://svn.apache.org/viewvc?rev=1126587&view=rev
Log:
Changed Scheduler to return available limit to AM in the allocate api.

Added:
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Allocation.java
Modified:
    hadoop/mapreduce/branches/MR-279/CHANGES.txt
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/AMResponse.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/AMResponsePBImpl.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-api/src/main/proto/yarn_protos.proto
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/SchedulerNegotiator.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Application.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/Queue.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMLaunchFailure.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCResponseId.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestSchedulerNegotiator.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java

Modified: hadoop/mapreduce/branches/MR-279/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/CHANGES.txt?rev=1126587&r1=1126586&r2=1126587&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/CHANGES.txt (original)
+++ hadoop/mapreduce/branches/MR-279/CHANGES.txt Mon May 23 17:02:19 2011
@@ -3,6 +3,10 @@ Hadoop MapReduce Change Log
 Trunk (unreleased changes)
 
   MAPREDUCE-279
+
+    Changed Scheduler to return available limit to AM in the allocate api.
+    (acmurthy)
+
     Refactored RMContainerAllocator to release unused containers. (sharad) 
 
     Fix null pointer exception in kill task attempt (mahadev)

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java?rev=1126587&r1=1126586&r2=1126587&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java Mon May 23 17:02:19 2011
@@ -68,6 +68,7 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.ClusterTracker;
 import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.RMResourceTrackerImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeManagerImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
@@ -229,7 +230,7 @@ public class TestRMContainerAllocator {
       //RMContainerAllocator
       
       @Override
-      public synchronized List<Container> allocate(ApplicationId applicationId,
+      public synchronized Allocation allocate(ApplicationId applicationId,
           List<ResourceRequest> ask, List<Container> release) 
           throws IOException {
         List<ResourceRequest> askCopy = new ArrayList<ResourceRequest>();
@@ -347,7 +348,9 @@ public class TestRMContainerAllocator {
         List<Container> release = request.getReleaseList();
         try {
           AMResponse response = recordFactory.newRecordInstance(AMResponse.class);
-          response.addAllContainers(resourceScheduler.allocate(status.getApplicationId(), ask, release));
+          Allocation allocation = resourceScheduler.allocate(status.getApplicationId(), ask, release);
+          response.addAllContainers(allocation.getContainers());
+          response.setAvailableResources(allocation.getResourceLimit());
           AllocateResponse allocateResponse = recordFactory.newRecordInstance(AllocateResponse.class);
           allocateResponse.setAMResponse(response);
           return allocateResponse;

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/AMResponse.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/AMResponse.java?rev=1126587&r1=1126586&r2=1126587&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/AMResponse.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/AMResponse.java Mon May 23 17:02:19 2011
@@ -1,21 +1,23 @@
 package org.apache.hadoop.yarn.api.records;
 
 import java.util.List;
-//TODO Check if this can replace AMRMProtocolResponse
 
 public interface AMResponse {
-  public abstract boolean getReboot();
-  public abstract int getResponseId();
+  public boolean getReboot();
+  public int getResponseId();
   
-  public abstract List<Container> getContainerList();
-  public abstract Container getContainer(int index);
-  public abstract int getContainerCount();
+  public List<Container> getContainerList();
+  public Container getContainer(int index);
+  public int getContainerCount();
 
-  public abstract void setReboot(boolean reboot);
-  public abstract void setResponseId(int responseId);
+  public void setReboot(boolean reboot);
+  public void setResponseId(int responseId);
   
-  public abstract void addAllContainers(List<Container> containers);
-  public abstract void addContainer(Container container);
-  public abstract void removeContainer(int index);
-  public abstract void clearContainers();
+  public void addAllContainers(List<Container> containers);
+  public void addContainer(Container container);
+  public void removeContainer(int index);
+  public void clearContainers();
+  
+  public void setAvailableResources(Resource limit);
+  public Resource getAvailableResources();
 }
\ No newline at end of file

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/AMResponsePBImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/AMResponsePBImpl.java?rev=1126587&r1=1126586&r2=1126587&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/AMResponsePBImpl.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/AMResponsePBImpl.java Mon May 23 17:02:19 2011
@@ -8,9 +8,12 @@ import java.util.List;
 import org.apache.hadoop.yarn.api.records.AMResponse;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ProtoBase;
+import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.proto.YarnProtos.AMResponseProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.AMResponseProtoOrBuilder;
 import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.NodeManagerInfoProtoOrBuilder;
+import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
 
 
     
@@ -19,6 +22,8 @@ public class AMResponsePBImpl extends Pr
   AMResponseProto.Builder builder = null;
   boolean viaProto = false;
   
+  Resource limit;
+
   private List<Container> containerList = null;
 //  private boolean hasLocalContainerList = false;
   
@@ -84,6 +89,28 @@ public class AMResponsePBImpl extends Pr
     builder.setResponseId((responseId));
   }
   @Override
+  public Resource getAvailableResources() {
+    if (this.limit != null) {
+      return this.limit;
+    }
+
+    AMResponseProtoOrBuilder p = viaProto ? proto : builder;
+    if (!p.hasLimit()) {
+      return null;
+    }
+    this.limit = convertFromProtoFormat(p.getLimit());
+    return this.limit;
+  }
+
+  @Override
+  public void setAvailableResources(Resource limit) {
+    maybeInitBuilder();
+    if (limit == null)
+      builder.clearLimit();
+    this.limit = limit;
+  }
+
+  @Override
   public List<Container> getContainerList() {
     initLocalContainerList();
     return this.containerList;
@@ -183,6 +210,12 @@ public class AMResponsePBImpl extends Pr
     return ((ContainerPBImpl)t).getProto();
   }
 
+  private ResourcePBImpl convertFromProtoFormat(ResourceProto p) {
+    return new ResourcePBImpl(p);
+  }
 
+  private ResourceProto convertToProtoFormat(Resource r) {
+    return ((ResourcePBImpl) r).getProto();
+  }
 
 }  

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-api/src/main/proto/yarn_protos.proto
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-api/src/main/proto/yarn_protos.proto?rev=1126587&r1=1126586&r2=1126587&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-api/src/main/proto/yarn_protos.proto (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-api/src/main/proto/yarn_protos.proto Mon May 23 17:02:19 2011
@@ -146,6 +146,7 @@ message AMResponseProto {
   optional bool reboot = 1;
   optional int32 response_id = 2;
   repeated ContainerProto containers = 3;
+  optional ResourceProto limit = 4;
 }
 
 

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java?rev=1126587&r1=1126586&r2=1126587&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java Mon May 23 17:02:19 2011
@@ -58,6 +58,7 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.ApplicationMasterHandler;
 import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ASMEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.ApplicationTrackerEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
 import org.apache.hadoop.yarn.service.AbstractService;
 
@@ -160,10 +161,12 @@ AMRMProtocol, EventHandler<ASMEvent<Appl
           return allocateResponse;
         }
         applicationsManager.applicationHeartbeat(status);
-        List<Container> containers = rScheduler.allocate(status.getApplicationId(), ask, release);
+        Allocation allocation = 
+          rScheduler.allocate(status.getApplicationId(), ask, release); 
         AMResponse  response = recordFactory.newRecordInstance(AMResponse.class);
-        response.addAllContainers(containers);
+        response.addAllContainers(allocation.getContainers());
         response.setResponseId(lastResponse.getResponseId() + 1);
+        response.setAvailableResources(allocation.getResourceLimit());
         responseMap.put(status.getApplicationId(), response);
         allocateResponse.setAMResponse(response);
         return allocateResponse;

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/SchedulerNegotiator.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/SchedulerNegotiator.java?rev=1126587&r1=1126586&r2=1126587&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/SchedulerNegotiator.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/SchedulerNegotiator.java Mon May 23 17:02:19 2011
@@ -146,7 +146,7 @@ class SchedulerNegotiator extends Abstra
             AppContext masterInfo = it.next();
             ApplicationId appId = masterInfo.getMaster().getApplicationId();
             containers = scheduler.allocate(appId, 
-                EMPTY_ASK, EMPTY_RELEASE);
+                EMPTY_ASK, EMPTY_RELEASE).getContainers();
             if (!containers.isEmpty()) {
               // there should be only one container for an application
               assert(containers.size() == 1);

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Allocation.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Allocation.java?rev=1126587&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Allocation.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Allocation.java Mon May 23 17:02:19 2011
@@ -0,0 +1,43 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
+
+import java.util.List;
+
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.Resource;
+
+public class Allocation {
+  final List<Container> containers;
+  final Resource resourceLimit;
+  
+  public Allocation(List<Container> containers, Resource resourceLimit) {
+    this.containers = containers;
+    this.resourceLimit = resourceLimit;
+  }
+
+  public List<Container> getContainers() {
+    return containers;
+  }
+
+  public Resource getResourceLimit() {
+    return resourceLimit;
+  }
+  
+}

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Application.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Application.java?rev=1126587&r1=1126586&r2=1126587&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Application.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Application.java Mon May 23 17:02:19 2011
@@ -1,20 +1,20 @@
 /**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
 
@@ -49,9 +49,8 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeInfo;
 
 /**
- * This class keeps track of all the consumption of an application.
- * This also keeps track of current running/completed
- *  containers for the application.
+ * This class keeps track of all the consumption of an application. This also
+ * keeps track of current running/completed containers for the application.
  */
 @LimitedPrivate("yarn")
 @Evolving
@@ -60,46 +59,45 @@ public class Application {
   final ApplicationId applicationId;
   final Queue queue;
   final String user;
-  private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
-
-  final Set<Priority> priorities = 
-    new TreeSet<Priority>(
-        new org.apache.hadoop.yarn.server.resourcemanager.resource.Priority.Comparator());
-  final Map<Priority, Map<String, ResourceRequest>> requests = 
-    new HashMap<Priority, Map<String, ResourceRequest>>();
-  final Resource currentConsumption = recordFactory.newRecordInstance(Resource.class);
-  final Resource overallConsumption = recordFactory.newRecordInstance(Resource.class);
+  private final RecordFactory recordFactory = RecordFactoryProvider
+      .getRecordFactory(null);
 
-  Map<Priority, Integer> schedulingOpportunities = 
-    new HashMap<Priority, Integer>();
+  final Set<Priority> priorities = new TreeSet<Priority>(
+      new org.apache.hadoop.yarn.server.resourcemanager.resource.Priority.Comparator());
+  final Map<Priority, Map<String, ResourceRequest>> requests = new HashMap<Priority, Map<String, ResourceRequest>>();
+  final Resource currentConsumption = recordFactory
+      .newRecordInstance(Resource.class);
+  final Resource overallConsumption = recordFactory
+      .newRecordInstance(Resource.class);
+  Resource resourceLimit = recordFactory.newRecordInstance(Resource.class);
   
+  Map<Priority, Integer> schedulingOpportunities = new HashMap<Priority, Integer>();
+
   private final ApplicationStore store;
-  
+
   /* Current consumption */
   List<Container> acquired = new ArrayList<Container>();
   List<Container> completedContainers = new ArrayList<Container>();
   /* Allocated by scheduler */
-  List<Container> allocated = new ArrayList<Container>(); 
+  List<Container> allocated = new ArrayList<Container>();
   Set<NodeInfo> applicationOnNodes = new HashSet<NodeInfo>();
   ApplicationMaster master;
   boolean pending = true; // for app metrics
-  
+
   /* Reserved containers */
-  private final Comparator<NodeInfo> nodeComparator = 
-    new Comparator<NodeInfo>() {
+  private final Comparator<NodeInfo> nodeComparator = new Comparator<NodeInfo>() {
     @Override
     public int compare(NodeInfo o1, NodeInfo o2) {
       return o1.getNodeID().getId() - o2.getNodeID().getId();
     }
   };
-  final Map<Priority, Set<NodeInfo>> reservedContainers =
-    new HashMap<Priority, Set<NodeInfo>>();
+  final Map<Priority, Set<NodeInfo>> reservedContainers = new HashMap<Priority, Set<NodeInfo>>();
 
   public Application(ApplicationId applicationId, ApplicationMaster master,
       Queue queue, String user, ApplicationStore store) {
     this.applicationId = applicationId;
     this.queue = queue;
-    this.user = user; 
+    this.user = user;
     this.master = master;
     this.store = store;
   }
@@ -111,7 +109,7 @@ public class Application {
   public Queue getQueue() {
     return queue;
   }
-  
+
   public String getUser() {
     return user;
   }
@@ -140,19 +138,21 @@ public class Application {
   public synchronized void clearRequests() {
     requests.clear();
   }
-  
+
   /**
-   * the currently acquired/allocated containers  by the application masters.
+   * the currently acquired/allocated containers by the application masters.
+   * 
    * @return the current containers being used by the application masters.
    */
   public synchronized List<Container> getCurrentContainers() {
-    List<Container> currentContainers =  new ArrayList<Container>(acquired);
+    List<Container> currentContainers = new ArrayList<Container>(acquired);
     currentContainers.addAll(allocated);
     return currentContainers;
   }
 
   /**
    * The ApplicationMaster is acquiring the allocated/completed resources.
+   * 
    * @return allocated resources
    */
   synchronized public List<Container> acquire() {
@@ -165,22 +165,23 @@ public class Application {
       Resources.addTo(overallConsumption, container.getResource());
     }
 
-    LOG.debug("acquire:" +
-        " application=" + applicationId + 
-        " #acquired=" + heartbeatContainers.size());
-    heartbeatContainers =  (heartbeatContainers == null) ? 
-        new ArrayList<Container>() : heartbeatContainers;
-
-        heartbeatContainers.addAll(completedContainers);
-        completedContainers.clear();
-        return heartbeatContainers;
+    LOG.debug("acquire:" + " application=" + applicationId + " #acquired="
+        + heartbeatContainers.size());
+    heartbeatContainers = (heartbeatContainers == null) ? new ArrayList<Container>()
+        : heartbeatContainers;
+
+    heartbeatContainers.addAll(completedContainers);
+    completedContainers.clear();
+    return heartbeatContainers;
   }
 
   /**
-   * The ApplicationMaster is updating resource requirements for the 
-   * application, by asking for more resources and releasing resources 
-   * acquired by the application.
-   * @param requests resources to be acquired
+   * The ApplicationMaster is updating resource requirements for the
+   * application, by asking for more resources and releasing resources acquired
+   * by the application.
+   * 
+   * @param requests
+   *          resources to be acquired
    */
   synchronized public void updateResourceRequests(List<ResourceRequest> requests) {
     QueueMetrics metrics = queue.getMetrics();
@@ -192,9 +193,8 @@ public class Application {
       ResourceRequest lastRequest = null;
 
       if (hostName.equals(NodeManager.ANY)) {
-        LOG.debug("update:" +
-            " application=" + applicationId +
-            " request=" + request);
+        LOG.debug("update:" + " application=" + applicationId + " request="
+            + request);
         updatePendingResources = true;
       }
 
@@ -211,28 +211,26 @@ public class Application {
       asks.put(hostName, request);
 
       if (updatePendingResources) {
-        int lastRequestContainers = lastRequest != null ?
-            lastRequest.getNumContainers() : 0;
-        Resource lastRequestCapability = lastRequest != null ?
-            lastRequest.getCapability() : Resources.none();
-        metrics.incrPendingResources(user,
-            request.getNumContainers() - lastRequestContainers,
-            Resources.subtractFrom( // save a clone
-                Resources.multiply(request.getCapability(),
-                                   request.getNumContainers()),
-                Resources.multiply(lastRequestCapability,
-                                   lastRequestContainers)));
+        int lastRequestContainers = lastRequest != null ? lastRequest
+            .getNumContainers() : 0;
+        Resource lastRequestCapability = lastRequest != null ? lastRequest
+            .getCapability() : Resources.none();
+        metrics.incrPendingResources(user, request.getNumContainers()
+            - lastRequestContainers, Resources.subtractFrom( // save a clone
+            Resources.multiply(request.getCapability(), request
+                .getNumContainers()), Resources.multiply(lastRequestCapability,
+                lastRequestContainers)));
       }
     }
   }
 
   public synchronized void releaseContainers(List<Container> release) {
-    // Release containers and update consumption 
+    // Release containers and update consumption
     for (Container container : release) {
-      LOG.debug("update: " +
-          "application=" + applicationId + " released=" + container);
+      LOG.debug("update: " + "application=" + applicationId + " released="
+          + container);
       Resources.subtractFrom(currentConsumption, container.getResource());
-      for (Iterator<Container> i=acquired.iterator(); i.hasNext();) {
+      for (Iterator<Container> i = acquired.iterator(); i.hasNext();) {
         Container c = i.next();
         if (c.getId().equals(container.getId())) {
           i.remove();
@@ -246,21 +244,25 @@ public class Application {
     return priorities;
   }
 
-  synchronized public Map<String, ResourceRequest> 
-  getResourceRequests(Priority priority) {
+  synchronized public Map<String, ResourceRequest> getResourceRequests(
+      Priority priority) {
     return requests.get(priority);
   }
 
-  synchronized public ResourceRequest getResourceRequest(Priority priority, 
+  synchronized public ResourceRequest getResourceRequest(Priority priority,
       String nodeAddress) {
     Map<String, ResourceRequest> nodeRequests = requests.get(priority);
     return (nodeRequests == null) ? null : nodeRequests.get(nodeAddress);
   }
 
-  synchronized public void completedContainer(Container container) {
-    LOG.info("Completed container: " + container);
-    completedContainers.add(container);
-    queue.getMetrics().releaseResources(user, 1, container.getResource());
+  synchronized public void completedContainer(Container container, 
+      Resource containerResource) {
+    if (container != null) {
+      LOG.info("Completed container: " + container);
+      completedContainers.add(container);
+    }
+    queue.getMetrics().releaseResources(user, 1, containerResource);
+    Resources.subtractFrom(currentConsumption, containerResource);
   }
 
   synchronized public void completedContainers(List<Container> containers) {
@@ -268,15 +270,21 @@ public class Application {
   }
 
   /**
-   * Resources have been allocated to this application by the resource scheduler.
-   * Track them.
-   * @param type the type of the node
-   * @param node the nodeinfo of the node
-   * @param priority the priority of the request.
-   * @param request the request
-   * @param containers the containers allocated.
+   * Resources have been allocated to this application by the resource
+   * scheduler. Track them.
+   * 
+   * @param type
+   *          the type of the node
+   * @param node
+   *          the nodeinfo of the node
+   * @param priority
+   *          the priority of the request.
+   * @param request
+   *          the request
+   * @param containers
+   *          the containers allocated.
    */
-  synchronized public void allocate(NodeType type, NodeInfo node, 
+  synchronized public void allocate(NodeType type, NodeInfo node,
       Priority priority, ResourceRequest request, List<Container> containers) {
     applicationOnNodes.add(node);
     if (type == NodeType.DATA_LOCAL) {
@@ -293,81 +301,90 @@ public class Application {
       pending = false;
       metrics.incrAppsRunning(user);
     }
-    LOG.debug("allocate: user: "+ user +", memory: "+ request.getCapability());
+    LOG.debug("allocate: user: " + user + ", memory: "
+        + request.getCapability());
     metrics.allocateResources(user, containers.size(), request.getCapability());
   }
 
   /**
-   * The {@link ResourceScheduler} is allocating data-local resources 
-   * to the application.
-   * @param allocatedContainers resources allocated to the application
+   * The {@link ResourceScheduler} is allocating data-local resources to the
+   * application.
+   * 
+   * @param allocatedContainers
+   *          resources allocated to the application
    */
-  synchronized private void allocateNodeLocal(NodeInfo node, 
-      Priority priority, ResourceRequest nodeLocalRequest, 
-      List<Container> containers) {
+  synchronized private void allocateNodeLocal(NodeInfo node, Priority priority,
+      ResourceRequest nodeLocalRequest, List<Container> containers) {
     // Update consumption and track allocations
     allocate(containers);
 
     // Update future requirements
-    nodeLocalRequest.setNumContainers(nodeLocalRequest.getNumContainers() - containers.size());
+    nodeLocalRequest.setNumContainers(nodeLocalRequest.getNumContainers()
+        - containers.size());
     if (nodeLocalRequest.getNumContainers() == 0) {
       this.requests.get(priority).remove(node.getNodeAddress());
     }
-    
-    ResourceRequest rackLocalRequest = 
-      requests.get(priority).get(node.getRackName());
-    rackLocalRequest.setNumContainers(rackLocalRequest.getNumContainers() - containers.size());
+
+    ResourceRequest rackLocalRequest = requests.get(priority).get(
+        node.getRackName());
+    rackLocalRequest.setNumContainers(rackLocalRequest.getNumContainers()
+        - containers.size());
     if (rackLocalRequest.getNumContainers() == 0) {
       this.requests.get(priority).remove(node.getRackName());
     }
-    
+
     // Do not remove ANY
-    ResourceRequest offSwitchRequest = 
-      requests.get(priority).get(NodeManager.ANY);
-    offSwitchRequest.setNumContainers(offSwitchRequest.getNumContainers() - containers.size());
+    ResourceRequest offSwitchRequest = requests.get(priority).get(
+        NodeManager.ANY);
+    offSwitchRequest.setNumContainers(offSwitchRequest.getNumContainers()
+        - containers.size());
   }
 
   /**
-   * The {@link ResourceScheduler} is allocating data-local resources 
-   * to the application.
-   * @param allocatedContainers resources allocated to the application
+   * The {@link ResourceScheduler} is allocating data-local resources to the
+   * application.
+   * 
+   * @param allocatedContainers
+   *          resources allocated to the application
    */
-  synchronized private void allocateRackLocal(NodeInfo node, 
-      Priority priority, ResourceRequest rackLocalRequest, 
-      List<Container> containers) {
+  synchronized private void allocateRackLocal(NodeInfo node, Priority priority,
+      ResourceRequest rackLocalRequest, List<Container> containers) {
 
     // Update consumption and track allocations
     allocate(containers);
 
     // Update future requirements
-    rackLocalRequest.setNumContainers(rackLocalRequest.getNumContainers() - containers.size());
+    rackLocalRequest.setNumContainers(rackLocalRequest.getNumContainers()
+        - containers.size());
     if (rackLocalRequest.getNumContainers() == 0) {
       this.requests.get(priority).remove(node.getRackName());
     }
-    
+
     // Do not remove ANY
-    ResourceRequest offSwitchRequest = 
-      requests.get(priority).get(NodeManager.ANY);
-    offSwitchRequest.setNumContainers(offSwitchRequest.getNumContainers() - containers.size());
-} 
+    ResourceRequest offSwitchRequest = requests.get(priority).get(
+        NodeManager.ANY);
+    offSwitchRequest.setNumContainers(offSwitchRequest.getNumContainers()
+        - containers.size());
+  }
 
   /**
-   * The {@link ResourceScheduler} is allocating data-local resources 
-   * to the application.
-   * @param allocatedContainers resources allocated to the application
+   * The {@link ResourceScheduler} is allocating data-local resources to the
+   * application.
+   * 
+   * @param allocatedContainers
+   *          resources allocated to the application
    */
-  synchronized private void allocateOffSwitch(NodeInfo node, 
-      Priority priority, ResourceRequest offSwitchRequest, 
-      List<Container> containers) {
+  synchronized private void allocateOffSwitch(NodeInfo node, Priority priority,
+      ResourceRequest offSwitchRequest, List<Container> containers) {
 
     // Update consumption and track allocations
     allocate(containers);
 
     // Update future requirements
-    
+
     // Do not remove ANY
-    offSwitchRequest.setNumContainers(
-        offSwitchRequest.getNumContainers() - containers.size());
+    offSwitchRequest.setNumContainers(offSwitchRequest.getNumContainers()
+        - containers.size());
   }
 
   synchronized public void allocate(List<Container> containers) {
@@ -378,61 +395,63 @@ public class Application {
       allocated.add(container);
       try {
         store.storeContainer(container);
-      } catch(IOException ie) {
-        //TODO fix this. we shouldnt ignore
+      } catch (IOException ie) {
+        // TODO fix this. we shouldnt ignore
       }
-      LOG.debug("allocate: applicationId=" + applicationId + 
-          " container=" + container.getId() + " host=" + container.getContainerManagerAddress());
+      LOG.debug("allocate: applicationId=" + applicationId + " container="
+          + container.getId() + " host="
+          + container.getContainerManagerAddress());
     }
   }
 
   synchronized public void resetSchedulingOpportunities(Priority priority) {
-    Integer schedulingOpportunities = this.schedulingOpportunities.get(priority);
+    Integer schedulingOpportunities = this.schedulingOpportunities
+        .get(priority);
     schedulingOpportunities = 0;
     this.schedulingOpportunities.put(priority, schedulingOpportunities);
   }
-  
+
   synchronized public void addSchedulingOpportunity(Priority priority) {
-    Integer schedulingOpportunities = this.schedulingOpportunities.get(priority);
+    Integer schedulingOpportunities = this.schedulingOpportunities
+        .get(priority);
     if (schedulingOpportunities == null) {
       schedulingOpportunities = 0;
     }
     ++schedulingOpportunities;
     this.schedulingOpportunities.put(priority, schedulingOpportunities);
   }
-  
+
   synchronized public int getSchedulingOpportunities(Priority priority) {
-    Integer schedulingOpportunities = this.schedulingOpportunities.get(priority);
+    Integer schedulingOpportunities = this.schedulingOpportunities
+        .get(priority);
     if (schedulingOpportunities == null) {
       schedulingOpportunities = 0;
       this.schedulingOpportunities.put(priority, schedulingOpportunities);
     }
     return schedulingOpportunities;
   }
-  
+
   synchronized public void showRequests() {
     for (Priority priority : getPriorities()) {
       Map<String, ResourceRequest> requests = getResourceRequests(priority);
       if (requests != null) {
+        LOG.debug("showRequests:" + " application=" + applicationId + 
+            " available=" + getResourceLimit() + " current=" + currentConsumption);
         for (ResourceRequest request : requests.values()) {
-          LOG.debug("showRequests:" +
-              " application=" + applicationId + 
-              " request=" + request);
+          LOG.debug("showRequests:" + " application=" + applicationId
+              + " request=" + request);
         }
       }
     }
   }
-  
+
   synchronized public List<NodeInfo> getAllNodesForApplication() {
     return new ArrayList<NodeInfo>(applicationOnNodes);
   }
 
-
-  synchronized public org.apache.hadoop.yarn.api.records.Application 
-  getApplicationInfo() {
-    org.apache.hadoop.yarn.api.records.Application application = 
-      recordFactory.newRecordInstance(
-          org.apache.hadoop.yarn.api.records.Application.class);
+  synchronized public org.apache.hadoop.yarn.api.records.Application getApplicationInfo() {
+    org.apache.hadoop.yarn.api.records.Application application = recordFactory
+        .newRecordInstance(org.apache.hadoop.yarn.api.records.Application.class);
     application.setApplicationId(applicationId);
     application.setMasterHost("");
     application.setName("");
@@ -440,8 +459,8 @@ public class Application {
     application.setState(ApplicationState.RUNNING);
     application.setUser(user);
 
-    ApplicationStatus status = 
-      recordFactory.newRecordInstance(ApplicationStatus.class);
+    ApplicationStatus status = recordFactory
+        .newRecordInstance(ApplicationStatus.class);
     status.setApplicationId(applicationId);
     application.setStatus(status);
 
@@ -461,9 +480,9 @@ public class Application {
       reservedContainers.put(priority, reservedNodes);
     }
     reservedNodes.add(node);
-    LOG.info("Application " + applicationId + " reserved " + resource + 
-        " on node " + node + ", currently has " + reservedNodes.size() + 
-        " at priority " + priority);
+    LOG.info("Application " + applicationId + " reserved " + resource
+        + " on node " + node + ", currently has " + reservedNodes.size()
+        + " at priority " + priority);
   }
 
   public synchronized void unreserveResource(NodeInfo node, Priority priority) {
@@ -473,14 +492,14 @@ public class Application {
       this.reservedContainers.remove(priority);
     }
 
-    LOG.info("Application " + applicationId + " unreserved " + 
-        " on node " + node + ", currently has " + reservedNodes.size() + 
-        " at priority " + priority);
+    LOG.info("Application " + applicationId + " unreserved " + " on node "
+        + node + ", currently has " + reservedNodes.size() + " at priority "
+        + priority);
   }
 
   public synchronized boolean isReserved(NodeInfo node, Priority priority) {
     Set<NodeInfo> reservedNodes = reservedContainers.get(priority);
-    if (reservedNodes != null) { 
+    if (reservedNodes != null) {
       return reservedNodes.contains(node);
     }
     return false;
@@ -488,10 +507,10 @@ public class Application {
 
   public float getLocalityWaitFactor(Priority priority, int clusterNodes) {
     // Estimate: Required unique resources (i.e. hosts + racks)
-    int requiredResources = Math.max(this.requests.get(priority).size()-1, 1);
-    return ((float)requiredResources / clusterNodes); 
+    int requiredResources = Math.max(this.requests.get(priority).size() - 1, 1);
+    return ((float) requiredResources / clusterNodes);
   }
-  
+
   synchronized public void finish() {
     // GC pending resources metrics
     QueueMetrics metrics = queue.getMetrics();
@@ -499,8 +518,8 @@ public class Application {
       ResourceRequest request = asks.get(NodeManager.ANY);
       if (request != null) {
         metrics.decrPendingResources(user, request.getNumContainers(),
-            Resources.multiply(request.getCapability(),
-                               request.getNumContainers()));
+            Resources.multiply(request.getCapability(), request
+                .getNumContainers()));
       }
     }
   }
@@ -508,4 +527,21 @@ public class Application {
   public Map<Priority, Set<NodeInfo>> getAllReservations() {
     return new HashMap<Priority, Set<NodeInfo>>(reservedContainers);
   }
+
+  public synchronized void setAvailableResourceLimit(Resource globalLimit) {
+    resourceLimit = Resources.subtract(globalLimit, currentConsumption);
+    
+    // Corner case to deal with applications being slightly over-limit
+    if (resourceLimit.getMemory() < 0) {
+      resourceLimit.setMemory(0);
+    }
+  }
+
+  /**
+   * Get available headroom in terms of resources for the application's user.
+   * @return available resource headroom
+   */
+  public synchronized Resource getResourceLimit() {
+    return resourceLimit;
+  }
 }

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java?rev=1126587&r1=1126586&r2=1126587&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java Mon May 23 17:02:19 2011
@@ -171,22 +171,22 @@ public class QueueMetrics {
   /**
    * Set available resources. To be called by scheduler periodically as
    * resources become available.
-   * @param mb memory in MB
+   * @param limit resource limit
    */
-  public void setAvailableQueueMemory(int mb) {
-    availableGB.set(mb/GB);
+  public void setAvailableResourcesToQueue(Resource limit) {
+    availableGB.set(limit.getMemory()/GB);
   }
 
   /**
    * Set available resources. To be called by scheduler periodically as
    * resources become available.
    * @param user
-   * @param mb memory in MB
+   * @param limit resource limit
    */
-  public void setAvailableUserMemory(String user, int mb) {
+  public void setAvailableResourcesToUser(String user, Resource limit) {
     QueueMetrics userMetrics = getUserMetrics(user);
     if (userMetrics != null) {
-      userMetrics.setAvailableQueueMemory(mb);
+      userMetrics.setAvailableResourcesToQueue(limit);
     }
   }
 

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java?rev=1126587&r1=1126586&r2=1126587&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java Mon May 23 17:02:19 2011
@@ -42,10 +42,10 @@ public interface YarnScheduler {
    * @param applicationId
    * @param ask
    * @param release
-   * @return
+   * @return the scheduler's {@link Allocation} response 
    * @throws IOException
    */
-  List<Container> allocate(ApplicationId applicationId,
+  Allocation allocate(ApplicationId applicationId,
       List<ResourceRequest> ask, List<Container> release)
   throws IOException;
   

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java?rev=1126587&r1=1126586&r2=1126587&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java Mon May 23 17:02:19 2011
@@ -54,6 +54,7 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store.RMState;
 import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.ClusterTracker;
 import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Application;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeManagerImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
@@ -300,7 +301,7 @@ implements ResourceScheduler, CapacitySc
   }
 
   @Override
-  public List<Container> allocate(ApplicationId applicationId,
+  public Allocation allocate(ApplicationId applicationId,
       List<ResourceRequest> ask, List<Container> release)
       throws IOException {
 
@@ -308,8 +309,9 @@ implements ResourceScheduler, CapacitySc
     if (application == null) {
       LOG.info("Calling allocate on removed " +
           "or non existant application " + applicationId);
-      return EMPTY_CONTAINER_LIST; 
+      return new Allocation(EMPTY_CONTAINER_LIST, Resources.none()); 
     }
+    
     normalizeRequests(ask);
 
     LOG.info("DEBUG --- allocate: pre-update" +
@@ -332,7 +334,7 @@ implements ResourceScheduler, CapacitySc
         " #ask=" + ask.size() + 
         " #release=" + release.size() +
         " #allContainers=" + allContainers.size());
-    return allContainers;
+    return new Allocation(allContainers, application.getResourceLimit());
   }
 
   @Override

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java?rev=1126587&r1=1126586&r2=1126587&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java Mon May 23 17:02:19 2011
@@ -528,7 +528,7 @@ public class LeafQueue implements Queue 
             }
 
             // User limits
-            if (!assignToUser(application.getUser(), clusterResource, required.getCapability())) {
+            if (!assignToUser(application, clusterResource, required.getCapability())) {
               continue; 
             }
 
@@ -615,8 +615,8 @@ public class LeafQueue implements Queue 
     return true;
   }
 
-  private synchronized boolean assignToUser(String userName, Resource clusterResource,
-      Resource required) {
+  private synchronized boolean assignToUser(Application application, 
+      Resource clusterResource, Resource required) {
     // What is our current capacity? 
     // * It is equal to the max(required, queue-capacity) if
     //   we're running below capacity. The 'max' ensures that jobs in queues
@@ -642,6 +642,8 @@ public class LeafQueue implements Queue 
     // Also, the queue's configured capacity should be higher than 
     // queue-hard-limit * ulMin
 
+    String userName = application.getUser();
+    
     final int activeUsers = users.size();  
     User user = getUser(userName);
 
@@ -652,8 +654,8 @@ public class LeafQueue implements Queue 
               (int)(queueCapacity * userLimitFactor)
       );
 
-    metrics.setAvailableUserMemory(userName,
-        limit - user.getConsumedResources().getMemory());
+    application.setAvailableResourceLimit(Resources.createResource(limit));
+    metrics.setAvailableResourcesToUser(userName, application.getResourceLimit());
 
     // Note: We aren't considering the current request since there is a fixed
     // overhead of the AM, so... 
@@ -935,16 +937,14 @@ public class LeafQueue implements Queue 
 
   @Override
   public void completedContainer(Resource clusterResource, 
-      Container container, Resource allocatedResource, Application application) {
+      Container container, Resource containerResource, Application application) {
     if (application != null) {
       // Careful! Locking order is important!
       synchronized (this) {
         
-        // Inform the application iff this was an allocated container, 
-        // as opposed to an unfulfilled reservation
-        if (container != null) {
-          application.completedContainer(container);
-        }
+        // Inform the application - this might be an allocated container or
+        // an unfulfilled reservation
+        application.completedContainer(container, containerResource);
         
         // Book-keeping
         releaseResource(clusterResource, 
@@ -952,7 +952,7 @@ public class LeafQueue implements Queue 
 
         LOG.info("completedContainer" +
             " container=" + container +
-            " resource=" + allocatedResource +
+            " resource=" + containerResource +
         		" queue=" + this + 
             " util=" + getUtilization() + 
             " used=" + usedResources + 
@@ -961,7 +961,7 @@ public class LeafQueue implements Queue 
 
       // Inform the parent queue
       parent.completedContainer(clusterResource, container, 
-          allocatedResource, application);
+          containerResource, application);
     }
   }
 
@@ -987,10 +987,15 @@ public class LeafQueue implements Queue 
 
   @Override
   public synchronized void updateResource(Resource clusterResource) {
-    float memLimit = clusterResource.getMemory() * absoluteCapacity;
-    setUtilization(usedResources.getMemory() / memLimit);
-    setUsedCapacity(usedResources.getMemory() / (clusterResource.getMemory() * capacity));
-    metrics.setAvailableQueueMemory((int) memLimit - usedResources.getMemory());
+    float queueLimit = clusterResource.getMemory() * absoluteCapacity; 
+    setUtilization(usedResources.getMemory() / queueLimit);
+    setUsedCapacity(
+        usedResources.getMemory() / (clusterResource.getMemory() * capacity));
+    
+    Resource resourceLimit = 
+      Resources.createResource((int)queueLimit);
+    metrics.setAvailableResourcesToQueue(
+        Resources.subtractFrom(resourceLimit, usedResources));
   }
 
   @Override

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java?rev=1126587&r1=1126586&r2=1126587&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java Mon May 23 17:02:19 2011
@@ -619,13 +619,13 @@ public class ParentQueue implements Queu
   
   @Override
   public void completedContainer(Resource clusterResource,
-      Container container, Resource allocatedResource, 
+      Container container, Resource containerResource, 
       Application application) {
     if (application != null) {
       // Careful! Locking order is important!
       // Book keeping
       synchronized (this) {
-        releaseResource(clusterResource, allocatedResource);
+        releaseResource(clusterResource, containerResource);
 
         LOG.info("completedContainer" +
             " queue=" + getQueueName() + 
@@ -637,7 +637,7 @@ public class ParentQueue implements Queu
       // Inform the parent
       if (parent != null) {
         parent.completedContainer(clusterResource, container, 
-            allocatedResource, application);
+            containerResource, application);
       }    
     }
   }
@@ -658,10 +658,15 @@ public class ParentQueue implements Queu
 
   @Override
   public synchronized void updateResource(Resource clusterResource) {
-    float memLimit = clusterResource.getMemory() * absoluteCapacity;
-    setUtilization(usedResources.getMemory() / memLimit);
-    setUsedCapacity(usedResources.getMemory() / (clusterResource.getMemory() * capacity));
-    metrics.setAvailableQueueMemory((int) memLimit - usedResources.getMemory());
+    float queueLimit = clusterResource.getMemory() * absoluteCapacity; 
+    setUtilization(usedResources.getMemory() / queueLimit);
+    setUsedCapacity(
+        usedResources.getMemory() / (clusterResource.getMemory() * capacity));
+    
+    Resource resourceLimit = 
+      Resources.createResource((int)queueLimit);
+    metrics.setAvailableResourcesToQueue(
+        Resources.subtractFrom(resourceLimit, usedResources));
   }
 
   @Override

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/Queue.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/Queue.java?rev=1126587&r1=1126586&r2=1126587&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/Queue.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/Queue.java Mon May 23 17:02:19 2011
@@ -170,11 +170,11 @@ extends org.apache.hadoop.yarn.server.re
    * @param clusterResource the resource of the cluster
    * @param container completed container, 
    *                  <code>null</code> if it was just a reservation
-   * @param allocatedResource allocated resource
+   * @param containerResource allocated resource
    * @param application application to which the container was assigned
    */
   public void completedContainer(Resource clusterResource,
-      Container container, Resource allocatedResource, 
+      Container container, Resource containerResource, 
       Application application);
 
   /**

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java?rev=1126587&r1=1126586&r2=1126587&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java Mon May 23 17:02:19 2011
@@ -57,6 +57,7 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
 import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.ClusterTracker;
 import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Application;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
@@ -167,14 +168,14 @@ public class FifoScheduler implements Re
   }
 
   @Override
-  public synchronized List<Container> allocate(ApplicationId applicationId,
+  public synchronized Allocation allocate(ApplicationId applicationId,
       List<ResourceRequest> ask, List<Container> release) 
       throws IOException {
     Application application = getApplication(applicationId);
     if (application == null) {
       LOG.error("Calling allocate on removed " +
           "or non existant application " + applicationId);
-      return EMPTY_CONTAINER_LIST; 
+      return new Allocation(EMPTY_CONTAINER_LIST, Resources.none()); 
     }
     normalizeRequests(ask);
 
@@ -200,7 +201,7 @@ public class FifoScheduler implements Re
         " #ask=" + ask.size() + 
         " #release=" + release.size() +
         " #allContainers=" + allContainers.size());
-    return allContainers;
+    return new Allocation(allContainers, application.getResourceLimit());
   }
 
   private void releaseContainers(Application application, List<Container> release) {
@@ -294,6 +295,9 @@ public class FifoScheduler implements Re
           }
         }
       }
+      
+      application.setAvailableResourceLimit(clusterResource);
+      
       LOG.debug("post-assignContainers");
       application.showRequests();
 
@@ -475,7 +479,7 @@ public class FifoScheduler implements Re
        * the nodemanger is just updating about a completed container.
        */
       if (app != null) {
-        app.completedContainer(c);
+        app.completedContainer(c, c.getResource());
       }
     }
   }
@@ -507,8 +511,8 @@ public class FifoScheduler implements Re
         MINIMUM_ALLOCATION)) {
       assignContainers(node);
     }
-    metrics.setAvailableQueueMemory(
-        clusterResource.getMemory() - usedResource.getMemory());
+    metrics.setAvailableResourcesToQueue(
+        Resources.subtract(clusterResource, usedResource));
     LOG.info("Node after allocation " + node.getNodeID() + " resource = "
         + node.getAvailableResource());
 
@@ -541,7 +545,6 @@ public class FifoScheduler implements Re
   }
 
 
-  private Map<String, NodeManager> nodes = new HashMap<String, NodeManager>();
   private Resource clusterResource = recordFactory.newRecordInstance(Resource.class);
   private Resource usedResource = recordFactory.newRecordInstance(Resource.class);
 

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java?rev=1126587&r1=1126586&r2=1126587&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java Mon May 23 17:02:19 2011
@@ -268,7 +268,7 @@ public class Application {
     List<Container> response = 
       resourceManager.getResourceScheduler().allocate(applicationId, 
           new ArrayList<ResourceRequest>(ask), 
-          new ArrayList<Container>(release));
+          new ArrayList<Container>(release)).getContainers();
     
     List<Container> containers = new ArrayList<Container>(response.size());
     for (Container container : response) {

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMLaunchFailure.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMLaunchFailure.java?rev=1126587&r1=1126586&r2=1126587&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMLaunchFailure.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMLaunchFailure.java Mon May 23 17:02:19 2011
@@ -51,6 +51,8 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.ApplicationTrackerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.ApplicationsStore.ApplicationStore;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemStore;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
 import org.junit.After;
 import org.junit.Before;
@@ -71,9 +73,9 @@ public class TestAMLaunchFailure extends
     private Container container = recordFactory.newRecordInstance(Container.class);
 
     @Override
-    public List<Container> allocate(ApplicationId applicationId,
+    public Allocation allocate(ApplicationId applicationId,
         List<ResourceRequest> ask, List<Container> release) throws IOException {
-      return Arrays.asList(container);
+      return new Allocation(Arrays.asList(container), Resources.none());
     }
 
     @Override

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCResponseId.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCResponseId.java?rev=1126587&r1=1126586&r2=1126587&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCResponseId.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCResponseId.java Mon May 23 17:02:19 2011
@@ -46,6 +46,7 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.ApplicationsStore;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.ApplicationsStore.ApplicationStore;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemStore;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
 import org.junit.After;
 import org.junit.Before;
@@ -81,7 +82,7 @@ public class TestAMRMRPCResponseId exten
   
   private class DummyScheduler implements YarnScheduler {
     @Override
-    public List<Container> allocate(ApplicationId applicationId,
+    public Allocation allocate(ApplicationId applicationId,
         List<ResourceRequest> ask, List<Container> release) throws IOException {
       return null;
     }

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java?rev=1126587&r1=1126586&r2=1126587&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java Mon May 23 17:02:19 2011
@@ -41,8 +41,10 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemStore;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFactory;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store.RMState;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
 import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.ClusterTracker;
 import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
 import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
@@ -140,7 +142,7 @@ public class TestAMRestart extends TestC
     }
     
     @Override
-    public List<Container> allocate(ApplicationId applicationId,
+    public Allocation allocate(ApplicationId applicationId,
         List<ResourceRequest> ask, List<Container> release) throws IOException {
       Container container = recordFactory.newRecordInstance(Container.class);
       container.setContainerToken(recordFactory.newRecordInstance(ContainerToken.class));
@@ -150,7 +152,7 @@ public class TestAMRestart extends TestC
       container.getId().setAppId(appID);
       container.getId().setId(count);
       count++;
-      return Arrays.asList(container);
+      return new Allocation(Arrays.asList(container), Resources.none());
     }
 
     @Override

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestSchedulerNegotiator.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestSchedulerNegotiator.java?rev=1126587&r1=1126586&r2=1126587&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestSchedulerNegotiator.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestSchedulerNegotiator.java Mon May 23 17:02:19 2011
@@ -50,8 +50,10 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store.RMState;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFactory;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
 import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.ClusterTracker;
 import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeResponse;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
@@ -72,7 +74,7 @@ public class TestSchedulerNegotiator ext
   
   private class DummyScheduler implements ResourceScheduler {
     @Override
-    public List<Container> allocate(ApplicationId applicationId,
+    public Allocation allocate(ApplicationId applicationId,
         List<ResourceRequest> ask, List<Container> release) throws IOException {
       ArrayList<Container> containers = new ArrayList<Container>();
       Container container = recordFactory.newRecordInstance(Container.class);
@@ -80,7 +82,7 @@ public class TestSchedulerNegotiator ext
       container.getId().setAppId(applicationId);
       container.getId().setId(testNum);
       containers.add(container);
-      return containers;
+      return new Allocation(containers, Resources.none());
     }
   
   

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java?rev=1126587&r1=1126586&r2=1126587&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java Mon May 23 17:02:19 2011
@@ -8,6 +8,7 @@ import org.apache.hadoop.metrics2.lib.De
 import static org.apache.hadoop.test.MetricsAsserts.*;
 import static org.apache.hadoop.test.MockitoMaker.*;
 import org.apache.hadoop.yarn.api.records.ApplicationState;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.Resource;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
 
 import org.junit.Before;
@@ -37,7 +38,7 @@ public class TestQueueMetrics {
     MetricsSource userSource = userSource(ms, queueName, user);
     checkApps(queueSource, 1, 1, 0, 0, 0, 0);
 
-    metrics.setAvailableQueueMemory(100*GB);
+    metrics.setAvailableResourcesToQueue(Resource.createResource(100*GB));
     metrics.incrPendingResources(user, 5, Resources.createResource(15*GB));
     // Available resources is set externally, as it depends on dynamic
     // configurable cluster/queue resources
@@ -71,8 +72,8 @@ public class TestQueueMetrics {
     checkApps(queueSource, 1, 1, 0, 0, 0, 0);
     checkApps(userSource, 1, 1, 0, 0, 0, 0);
 
-    metrics.setAvailableQueueMemory(100*GB);
-    metrics.setAvailableUserMemory(user, 10*GB);
+    metrics.setAvailableResourcesToQueue(Resources.createResource(100*GB));
+    metrics.setAvailableResourcesToUser(user, Resources.createResource(10*GB));
     metrics.incrPendingResources(user, 5, Resources.createResource(15*GB));
     // Available resources is set externally, as it depends on dynamic
     // configurable cluster/queue resources
@@ -120,10 +121,10 @@ public class TestQueueMetrics {
     checkApps(userSource, 1, 1, 0, 0, 0, 0);
     checkApps(parentUserSource, 1, 1, 0, 0, 0, 0);
 
-    parentMetrics.setAvailableQueueMemory(100*GB);
-    metrics.setAvailableQueueMemory(100*GB);
-    parentMetrics.setAvailableUserMemory(user, 10*GB);
-    metrics.setAvailableUserMemory(user, 10*GB);
+    parentMetrics.setAvailableResourcesToQueue(Resources.createResource(100*GB));
+    metrics.setAvailableResourcesToQueue(Resources.createResource(100*GB));
+    parentMetrics.setAvailableResourcesToUser(user, Resources.createResource(10*GB));
+    metrics.setAvailableResourcesToUser(user, Resources.createResource(10*GB));
     metrics.incrPendingResources(user, 5, Resources.createResource(15*GB));
     checkResources(queueSource, 0, 0, 100, 15, 5);
     checkResources(parentQueueSource, 0, 0, 100, 15, 5);