You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by ac...@apache.org on 2011/05/23 19:02:20 UTC
svn commit: r1126587 - in /hadoop/mapreduce/branches/MR-279: ./
mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/
yarn/yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/
yarn/yarn-api/src/main/java/org/apac...
Author: acmurthy
Date: Mon May 23 17:02:19 2011
New Revision: 1126587
URL: http://svn.apache.org/viewvc?rev=1126587&view=rev
Log:
Changed Scheduler to return available limit to AM in the allocate api.
Added:
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Allocation.java
Modified:
hadoop/mapreduce/branches/MR-279/CHANGES.txt
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/AMResponse.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/AMResponsePBImpl.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-api/src/main/proto/yarn_protos.proto
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/SchedulerNegotiator.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Application.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/Queue.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMLaunchFailure.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCResponseId.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestSchedulerNegotiator.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java
Modified: hadoop/mapreduce/branches/MR-279/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/CHANGES.txt?rev=1126587&r1=1126586&r2=1126587&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/CHANGES.txt (original)
+++ hadoop/mapreduce/branches/MR-279/CHANGES.txt Mon May 23 17:02:19 2011
@@ -3,6 +3,10 @@ Hadoop MapReduce Change Log
Trunk (unreleased changes)
MAPREDUCE-279
+
+ Changed Scheduler to return available limit to AM in the allocate api.
+ (acmurthy)
+
Refactored RMContainerAllocator to release unused containers. (sharad)
Fix null pointer exception in kill task attempt (mahadev)
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java?rev=1126587&r1=1126586&r2=1126587&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java Mon May 23 17:02:19 2011
@@ -68,6 +68,7 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.ClusterTracker;
import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeInfo;
import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.RMResourceTrackerImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeManagerImpl;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
@@ -229,7 +230,7 @@ public class TestRMContainerAllocator {
//RMContainerAllocator
@Override
- public synchronized List<Container> allocate(ApplicationId applicationId,
+ public synchronized Allocation allocate(ApplicationId applicationId,
List<ResourceRequest> ask, List<Container> release)
throws IOException {
List<ResourceRequest> askCopy = new ArrayList<ResourceRequest>();
@@ -347,7 +348,9 @@ public class TestRMContainerAllocator {
List<Container> release = request.getReleaseList();
try {
AMResponse response = recordFactory.newRecordInstance(AMResponse.class);
- response.addAllContainers(resourceScheduler.allocate(status.getApplicationId(), ask, release));
+ Allocation allocation = resourceScheduler.allocate(status.getApplicationId(), ask, release);
+ response.addAllContainers(allocation.getContainers());
+ response.setAvailableResources(allocation.getResourceLimit());
AllocateResponse allocateResponse = recordFactory.newRecordInstance(AllocateResponse.class);
allocateResponse.setAMResponse(response);
return allocateResponse;
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/AMResponse.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/AMResponse.java?rev=1126587&r1=1126586&r2=1126587&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/AMResponse.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/AMResponse.java Mon May 23 17:02:19 2011
@@ -1,21 +1,23 @@
package org.apache.hadoop.yarn.api.records;
import java.util.List;
-//TODO Check if this can replace AMRMProtocolResponse
public interface AMResponse {
- public abstract boolean getReboot();
- public abstract int getResponseId();
+ public boolean getReboot();
+ public int getResponseId();
- public abstract List<Container> getContainerList();
- public abstract Container getContainer(int index);
- public abstract int getContainerCount();
+ public List<Container> getContainerList();
+ public Container getContainer(int index);
+ public int getContainerCount();
- public abstract void setReboot(boolean reboot);
- public abstract void setResponseId(int responseId);
+ public void setReboot(boolean reboot);
+ public void setResponseId(int responseId);
- public abstract void addAllContainers(List<Container> containers);
- public abstract void addContainer(Container container);
- public abstract void removeContainer(int index);
- public abstract void clearContainers();
+ public void addAllContainers(List<Container> containers);
+ public void addContainer(Container container);
+ public void removeContainer(int index);
+ public void clearContainers();
+
+ public void setAvailableResources(Resource limit);
+ public Resource getAvailableResources();
}
\ No newline at end of file
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/AMResponsePBImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/AMResponsePBImpl.java?rev=1126587&r1=1126586&r2=1126587&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/AMResponsePBImpl.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/AMResponsePBImpl.java Mon May 23 17:02:19 2011
@@ -8,9 +8,12 @@ import java.util.List;
import org.apache.hadoop.yarn.api.records.AMResponse;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ProtoBase;
+import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.proto.YarnProtos.AMResponseProto;
import org.apache.hadoop.yarn.proto.YarnProtos.AMResponseProtoOrBuilder;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.NodeManagerInfoProtoOrBuilder;
+import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
@@ -19,6 +22,8 @@ public class AMResponsePBImpl extends Pr
AMResponseProto.Builder builder = null;
boolean viaProto = false;
+ Resource limit;
+
private List<Container> containerList = null;
// private boolean hasLocalContainerList = false;
@@ -84,6 +89,28 @@ public class AMResponsePBImpl extends Pr
builder.setResponseId((responseId));
}
@Override
+ public Resource getAvailableResources() {
+ if (this.limit != null) {
+ return this.limit;
+ }
+
+ AMResponseProtoOrBuilder p = viaProto ? proto : builder;
+ if (!p.hasLimit()) {
+ return null;
+ }
+ this.limit = convertFromProtoFormat(p.getLimit());
+ return this.limit;
+ }
+
+ @Override
+ public void setAvailableResources(Resource limit) {
+ maybeInitBuilder();
+ if (limit == null)
+ builder.clearLimit();
+ this.limit = limit;
+ }
+
+ @Override
public List<Container> getContainerList() {
initLocalContainerList();
return this.containerList;
@@ -183,6 +210,12 @@ public class AMResponsePBImpl extends Pr
return ((ContainerPBImpl)t).getProto();
}
+ private ResourcePBImpl convertFromProtoFormat(ResourceProto p) {
+ return new ResourcePBImpl(p);
+ }
+ private ResourceProto convertToProtoFormat(Resource r) {
+ return ((ResourcePBImpl) r).getProto();
+ }
}
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-api/src/main/proto/yarn_protos.proto
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-api/src/main/proto/yarn_protos.proto?rev=1126587&r1=1126586&r2=1126587&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-api/src/main/proto/yarn_protos.proto (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-api/src/main/proto/yarn_protos.proto Mon May 23 17:02:19 2011
@@ -146,6 +146,7 @@ message AMResponseProto {
optional bool reboot = 1;
optional int32 response_id = 2;
repeated ContainerProto containers = 3;
+ optional ResourceProto limit = 4;
}
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java?rev=1126587&r1=1126586&r2=1126587&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java Mon May 23 17:02:19 2011
@@ -58,6 +58,7 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.ApplicationMasterHandler;
import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ASMEvent;
import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.ApplicationTrackerEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.service.AbstractService;
@@ -160,10 +161,12 @@ AMRMProtocol, EventHandler<ASMEvent<Appl
return allocateResponse;
}
applicationsManager.applicationHeartbeat(status);
- List<Container> containers = rScheduler.allocate(status.getApplicationId(), ask, release);
+ Allocation allocation =
+ rScheduler.allocate(status.getApplicationId(), ask, release);
AMResponse response = recordFactory.newRecordInstance(AMResponse.class);
- response.addAllContainers(containers);
+ response.addAllContainers(allocation.getContainers());
response.setResponseId(lastResponse.getResponseId() + 1);
+ response.setAvailableResources(allocation.getResourceLimit());
responseMap.put(status.getApplicationId(), response);
allocateResponse.setAMResponse(response);
return allocateResponse;
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/SchedulerNegotiator.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/SchedulerNegotiator.java?rev=1126587&r1=1126586&r2=1126587&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/SchedulerNegotiator.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/SchedulerNegotiator.java Mon May 23 17:02:19 2011
@@ -146,7 +146,7 @@ class SchedulerNegotiator extends Abstra
AppContext masterInfo = it.next();
ApplicationId appId = masterInfo.getMaster().getApplicationId();
containers = scheduler.allocate(appId,
- EMPTY_ASK, EMPTY_RELEASE);
+ EMPTY_ASK, EMPTY_RELEASE).getContainers();
if (!containers.isEmpty()) {
// there should be only one container for an application
assert(containers.size() == 1);
Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Allocation.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Allocation.java?rev=1126587&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Allocation.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Allocation.java Mon May 23 17:02:19 2011
@@ -0,0 +1,43 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
+
+import java.util.List;
+
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.Resource;
+
+public class Allocation {
+ final List<Container> containers;
+ final Resource resourceLimit;
+
+ public Allocation(List<Container> containers, Resource resourceLimit) {
+ this.containers = containers;
+ this.resourceLimit = resourceLimit;
+ }
+
+ public List<Container> getContainers() {
+ return containers;
+ }
+
+ public Resource getResourceLimit() {
+ return resourceLimit;
+ }
+
+}
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Application.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Application.java?rev=1126587&r1=1126586&r2=1126587&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Application.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Application.java Mon May 23 17:02:19 2011
@@ -1,20 +1,20 @@
/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
@@ -49,9 +49,8 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeInfo;
/**
- * This class keeps track of all the consumption of an application.
- * This also keeps track of current running/completed
- * containers for the application.
+ * This class keeps track of all the consumption of an application. This also
+ * keeps track of current running/completed containers for the application.
*/
@LimitedPrivate("yarn")
@Evolving
@@ -60,46 +59,45 @@ public class Application {
final ApplicationId applicationId;
final Queue queue;
final String user;
- private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
-
- final Set<Priority> priorities =
- new TreeSet<Priority>(
- new org.apache.hadoop.yarn.server.resourcemanager.resource.Priority.Comparator());
- final Map<Priority, Map<String, ResourceRequest>> requests =
- new HashMap<Priority, Map<String, ResourceRequest>>();
- final Resource currentConsumption = recordFactory.newRecordInstance(Resource.class);
- final Resource overallConsumption = recordFactory.newRecordInstance(Resource.class);
+ private final RecordFactory recordFactory = RecordFactoryProvider
+ .getRecordFactory(null);
- Map<Priority, Integer> schedulingOpportunities =
- new HashMap<Priority, Integer>();
+ final Set<Priority> priorities = new TreeSet<Priority>(
+ new org.apache.hadoop.yarn.server.resourcemanager.resource.Priority.Comparator());
+ final Map<Priority, Map<String, ResourceRequest>> requests = new HashMap<Priority, Map<String, ResourceRequest>>();
+ final Resource currentConsumption = recordFactory
+ .newRecordInstance(Resource.class);
+ final Resource overallConsumption = recordFactory
+ .newRecordInstance(Resource.class);
+ Resource resourceLimit = recordFactory.newRecordInstance(Resource.class);
+ Map<Priority, Integer> schedulingOpportunities = new HashMap<Priority, Integer>();
+
private final ApplicationStore store;
-
+
/* Current consumption */
List<Container> acquired = new ArrayList<Container>();
List<Container> completedContainers = new ArrayList<Container>();
/* Allocated by scheduler */
- List<Container> allocated = new ArrayList<Container>();
+ List<Container> allocated = new ArrayList<Container>();
Set<NodeInfo> applicationOnNodes = new HashSet<NodeInfo>();
ApplicationMaster master;
boolean pending = true; // for app metrics
-
+
/* Reserved containers */
- private final Comparator<NodeInfo> nodeComparator =
- new Comparator<NodeInfo>() {
+ private final Comparator<NodeInfo> nodeComparator = new Comparator<NodeInfo>() {
@Override
public int compare(NodeInfo o1, NodeInfo o2) {
return o1.getNodeID().getId() - o2.getNodeID().getId();
}
};
- final Map<Priority, Set<NodeInfo>> reservedContainers =
- new HashMap<Priority, Set<NodeInfo>>();
+ final Map<Priority, Set<NodeInfo>> reservedContainers = new HashMap<Priority, Set<NodeInfo>>();
public Application(ApplicationId applicationId, ApplicationMaster master,
Queue queue, String user, ApplicationStore store) {
this.applicationId = applicationId;
this.queue = queue;
- this.user = user;
+ this.user = user;
this.master = master;
this.store = store;
}
@@ -111,7 +109,7 @@ public class Application {
public Queue getQueue() {
return queue;
}
-
+
public String getUser() {
return user;
}
@@ -140,19 +138,21 @@ public class Application {
public synchronized void clearRequests() {
requests.clear();
}
-
+
/**
- * the currently acquired/allocated containers by the application masters.
+ * the currently acquired/allocated containers by the application masters.
+ *
* @return the current containers being used by the application masters.
*/
public synchronized List<Container> getCurrentContainers() {
- List<Container> currentContainers = new ArrayList<Container>(acquired);
+ List<Container> currentContainers = new ArrayList<Container>(acquired);
currentContainers.addAll(allocated);
return currentContainers;
}
/**
* The ApplicationMaster is acquiring the allocated/completed resources.
+ *
* @return allocated resources
*/
synchronized public List<Container> acquire() {
@@ -165,22 +165,23 @@ public class Application {
Resources.addTo(overallConsumption, container.getResource());
}
- LOG.debug("acquire:" +
- " application=" + applicationId +
- " #acquired=" + heartbeatContainers.size());
- heartbeatContainers = (heartbeatContainers == null) ?
- new ArrayList<Container>() : heartbeatContainers;
-
- heartbeatContainers.addAll(completedContainers);
- completedContainers.clear();
- return heartbeatContainers;
+ LOG.debug("acquire:" + " application=" + applicationId + " #acquired="
+ + heartbeatContainers.size());
+ heartbeatContainers = (heartbeatContainers == null) ? new ArrayList<Container>()
+ : heartbeatContainers;
+
+ heartbeatContainers.addAll(completedContainers);
+ completedContainers.clear();
+ return heartbeatContainers;
}
/**
- * The ApplicationMaster is updating resource requirements for the
- * application, by asking for more resources and releasing resources
- * acquired by the application.
- * @param requests resources to be acquired
+ * The ApplicationMaster is updating resource requirements for the
+ * application, by asking for more resources and releasing resources acquired
+ * by the application.
+ *
+ * @param requests
+ * resources to be acquired
*/
synchronized public void updateResourceRequests(List<ResourceRequest> requests) {
QueueMetrics metrics = queue.getMetrics();
@@ -192,9 +193,8 @@ public class Application {
ResourceRequest lastRequest = null;
if (hostName.equals(NodeManager.ANY)) {
- LOG.debug("update:" +
- " application=" + applicationId +
- " request=" + request);
+ LOG.debug("update:" + " application=" + applicationId + " request="
+ + request);
updatePendingResources = true;
}
@@ -211,28 +211,26 @@ public class Application {
asks.put(hostName, request);
if (updatePendingResources) {
- int lastRequestContainers = lastRequest != null ?
- lastRequest.getNumContainers() : 0;
- Resource lastRequestCapability = lastRequest != null ?
- lastRequest.getCapability() : Resources.none();
- metrics.incrPendingResources(user,
- request.getNumContainers() - lastRequestContainers,
- Resources.subtractFrom( // save a clone
- Resources.multiply(request.getCapability(),
- request.getNumContainers()),
- Resources.multiply(lastRequestCapability,
- lastRequestContainers)));
+ int lastRequestContainers = lastRequest != null ? lastRequest
+ .getNumContainers() : 0;
+ Resource lastRequestCapability = lastRequest != null ? lastRequest
+ .getCapability() : Resources.none();
+ metrics.incrPendingResources(user, request.getNumContainers()
+ - lastRequestContainers, Resources.subtractFrom( // save a clone
+ Resources.multiply(request.getCapability(), request
+ .getNumContainers()), Resources.multiply(lastRequestCapability,
+ lastRequestContainers)));
}
}
}
public synchronized void releaseContainers(List<Container> release) {
- // Release containers and update consumption
+ // Release containers and update consumption
for (Container container : release) {
- LOG.debug("update: " +
- "application=" + applicationId + " released=" + container);
+ LOG.debug("update: " + "application=" + applicationId + " released="
+ + container);
Resources.subtractFrom(currentConsumption, container.getResource());
- for (Iterator<Container> i=acquired.iterator(); i.hasNext();) {
+ for (Iterator<Container> i = acquired.iterator(); i.hasNext();) {
Container c = i.next();
if (c.getId().equals(container.getId())) {
i.remove();
@@ -246,21 +244,25 @@ public class Application {
return priorities;
}
- synchronized public Map<String, ResourceRequest>
- getResourceRequests(Priority priority) {
+ synchronized public Map<String, ResourceRequest> getResourceRequests(
+ Priority priority) {
return requests.get(priority);
}
- synchronized public ResourceRequest getResourceRequest(Priority priority,
+ synchronized public ResourceRequest getResourceRequest(Priority priority,
String nodeAddress) {
Map<String, ResourceRequest> nodeRequests = requests.get(priority);
return (nodeRequests == null) ? null : nodeRequests.get(nodeAddress);
}
- synchronized public void completedContainer(Container container) {
- LOG.info("Completed container: " + container);
- completedContainers.add(container);
- queue.getMetrics().releaseResources(user, 1, container.getResource());
+ synchronized public void completedContainer(Container container,
+ Resource containerResource) {
+ if (container != null) {
+ LOG.info("Completed container: " + container);
+ completedContainers.add(container);
+ }
+ queue.getMetrics().releaseResources(user, 1, containerResource);
+ Resources.subtractFrom(currentConsumption, containerResource);
}
synchronized public void completedContainers(List<Container> containers) {
@@ -268,15 +270,21 @@ public class Application {
}
/**
- * Resources have been allocated to this application by the resource scheduler.
- * Track them.
- * @param type the type of the node
- * @param node the nodeinfo of the node
- * @param priority the priority of the request.
- * @param request the request
- * @param containers the containers allocated.
+ * Resources have been allocated to this application by the resource
+ * scheduler. Track them.
+ *
+ * @param type
+ * the type of the node
+ * @param node
+ * the nodeinfo of the node
+ * @param priority
+ * the priority of the request.
+ * @param request
+ * the request
+ * @param containers
+ * the containers allocated.
*/
- synchronized public void allocate(NodeType type, NodeInfo node,
+ synchronized public void allocate(NodeType type, NodeInfo node,
Priority priority, ResourceRequest request, List<Container> containers) {
applicationOnNodes.add(node);
if (type == NodeType.DATA_LOCAL) {
@@ -293,81 +301,90 @@ public class Application {
pending = false;
metrics.incrAppsRunning(user);
}
- LOG.debug("allocate: user: "+ user +", memory: "+ request.getCapability());
+ LOG.debug("allocate: user: " + user + ", memory: "
+ + request.getCapability());
metrics.allocateResources(user, containers.size(), request.getCapability());
}
/**
- * The {@link ResourceScheduler} is allocating data-local resources
- * to the application.
- * @param allocatedContainers resources allocated to the application
+ * The {@link ResourceScheduler} is allocating data-local resources to the
+ * application.
+ *
+ * @param allocatedContainers
+ * resources allocated to the application
*/
- synchronized private void allocateNodeLocal(NodeInfo node,
- Priority priority, ResourceRequest nodeLocalRequest,
- List<Container> containers) {
+ synchronized private void allocateNodeLocal(NodeInfo node, Priority priority,
+ ResourceRequest nodeLocalRequest, List<Container> containers) {
// Update consumption and track allocations
allocate(containers);
// Update future requirements
- nodeLocalRequest.setNumContainers(nodeLocalRequest.getNumContainers() - containers.size());
+ nodeLocalRequest.setNumContainers(nodeLocalRequest.getNumContainers()
+ - containers.size());
if (nodeLocalRequest.getNumContainers() == 0) {
this.requests.get(priority).remove(node.getNodeAddress());
}
-
- ResourceRequest rackLocalRequest =
- requests.get(priority).get(node.getRackName());
- rackLocalRequest.setNumContainers(rackLocalRequest.getNumContainers() - containers.size());
+
+ ResourceRequest rackLocalRequest = requests.get(priority).get(
+ node.getRackName());
+ rackLocalRequest.setNumContainers(rackLocalRequest.getNumContainers()
+ - containers.size());
if (rackLocalRequest.getNumContainers() == 0) {
this.requests.get(priority).remove(node.getRackName());
}
-
+
// Do not remove ANY
- ResourceRequest offSwitchRequest =
- requests.get(priority).get(NodeManager.ANY);
- offSwitchRequest.setNumContainers(offSwitchRequest.getNumContainers() - containers.size());
+ ResourceRequest offSwitchRequest = requests.get(priority).get(
+ NodeManager.ANY);
+ offSwitchRequest.setNumContainers(offSwitchRequest.getNumContainers()
+ - containers.size());
}
/**
- * The {@link ResourceScheduler} is allocating data-local resources
- * to the application.
- * @param allocatedContainers resources allocated to the application
+ * The {@link ResourceScheduler} is allocating data-local resources to the
+ * application.
+ *
+ * @param allocatedContainers
+ * resources allocated to the application
*/
- synchronized private void allocateRackLocal(NodeInfo node,
- Priority priority, ResourceRequest rackLocalRequest,
- List<Container> containers) {
+ synchronized private void allocateRackLocal(NodeInfo node, Priority priority,
+ ResourceRequest rackLocalRequest, List<Container> containers) {
// Update consumption and track allocations
allocate(containers);
// Update future requirements
- rackLocalRequest.setNumContainers(rackLocalRequest.getNumContainers() - containers.size());
+ rackLocalRequest.setNumContainers(rackLocalRequest.getNumContainers()
+ - containers.size());
if (rackLocalRequest.getNumContainers() == 0) {
this.requests.get(priority).remove(node.getRackName());
}
-
+
// Do not remove ANY
- ResourceRequest offSwitchRequest =
- requests.get(priority).get(NodeManager.ANY);
- offSwitchRequest.setNumContainers(offSwitchRequest.getNumContainers() - containers.size());
-}
+ ResourceRequest offSwitchRequest = requests.get(priority).get(
+ NodeManager.ANY);
+ offSwitchRequest.setNumContainers(offSwitchRequest.getNumContainers()
+ - containers.size());
+ }
/**
- * The {@link ResourceScheduler} is allocating data-local resources
- * to the application.
- * @param allocatedContainers resources allocated to the application
+ * The {@link ResourceScheduler} is allocating data-local resources to the
+ * application.
+ *
+ * @param allocatedContainers
+ * resources allocated to the application
*/
- synchronized private void allocateOffSwitch(NodeInfo node,
- Priority priority, ResourceRequest offSwitchRequest,
- List<Container> containers) {
+ synchronized private void allocateOffSwitch(NodeInfo node, Priority priority,
+ ResourceRequest offSwitchRequest, List<Container> containers) {
// Update consumption and track allocations
allocate(containers);
// Update future requirements
-
+
// Do not remove ANY
- offSwitchRequest.setNumContainers(
- offSwitchRequest.getNumContainers() - containers.size());
+ offSwitchRequest.setNumContainers(offSwitchRequest.getNumContainers()
+ - containers.size());
}
synchronized public void allocate(List<Container> containers) {
@@ -378,61 +395,63 @@ public class Application {
allocated.add(container);
try {
store.storeContainer(container);
- } catch(IOException ie) {
- //TODO fix this. we shouldnt ignore
+ } catch (IOException ie) {
+ // TODO fix this. we shouldnt ignore
}
- LOG.debug("allocate: applicationId=" + applicationId +
- " container=" + container.getId() + " host=" + container.getContainerManagerAddress());
+ LOG.debug("allocate: applicationId=" + applicationId + " container="
+ + container.getId() + " host="
+ + container.getContainerManagerAddress());
}
}
synchronized public void resetSchedulingOpportunities(Priority priority) {
- Integer schedulingOpportunities = this.schedulingOpportunities.get(priority);
+ Integer schedulingOpportunities = this.schedulingOpportunities
+ .get(priority);
schedulingOpportunities = 0;
this.schedulingOpportunities.put(priority, schedulingOpportunities);
}
-
+
synchronized public void addSchedulingOpportunity(Priority priority) {
- Integer schedulingOpportunities = this.schedulingOpportunities.get(priority);
+ Integer schedulingOpportunities = this.schedulingOpportunities
+ .get(priority);
if (schedulingOpportunities == null) {
schedulingOpportunities = 0;
}
++schedulingOpportunities;
this.schedulingOpportunities.put(priority, schedulingOpportunities);
}
-
+
synchronized public int getSchedulingOpportunities(Priority priority) {
- Integer schedulingOpportunities = this.schedulingOpportunities.get(priority);
+ Integer schedulingOpportunities = this.schedulingOpportunities
+ .get(priority);
if (schedulingOpportunities == null) {
schedulingOpportunities = 0;
this.schedulingOpportunities.put(priority, schedulingOpportunities);
}
return schedulingOpportunities;
}
-
+
synchronized public void showRequests() {
for (Priority priority : getPriorities()) {
Map<String, ResourceRequest> requests = getResourceRequests(priority);
if (requests != null) {
+ LOG.debug("showRequests:" + " application=" + applicationId +
+ " available=" + getResourceLimit() + " current=" + currentConsumption);
for (ResourceRequest request : requests.values()) {
- LOG.debug("showRequests:" +
- " application=" + applicationId +
- " request=" + request);
+ LOG.debug("showRequests:" + " application=" + applicationId
+ + " request=" + request);
}
}
}
}
-
+
synchronized public List<NodeInfo> getAllNodesForApplication() {
return new ArrayList<NodeInfo>(applicationOnNodes);
}
-
- synchronized public org.apache.hadoop.yarn.api.records.Application
- getApplicationInfo() {
- org.apache.hadoop.yarn.api.records.Application application =
- recordFactory.newRecordInstance(
- org.apache.hadoop.yarn.api.records.Application.class);
+ synchronized public org.apache.hadoop.yarn.api.records.Application getApplicationInfo() {
+ org.apache.hadoop.yarn.api.records.Application application = recordFactory
+ .newRecordInstance(org.apache.hadoop.yarn.api.records.Application.class);
application.setApplicationId(applicationId);
application.setMasterHost("");
application.setName("");
@@ -440,8 +459,8 @@ public class Application {
application.setState(ApplicationState.RUNNING);
application.setUser(user);
- ApplicationStatus status =
- recordFactory.newRecordInstance(ApplicationStatus.class);
+ ApplicationStatus status = recordFactory
+ .newRecordInstance(ApplicationStatus.class);
status.setApplicationId(applicationId);
application.setStatus(status);
@@ -461,9 +480,9 @@ public class Application {
reservedContainers.put(priority, reservedNodes);
}
reservedNodes.add(node);
- LOG.info("Application " + applicationId + " reserved " + resource +
- " on node " + node + ", currently has " + reservedNodes.size() +
- " at priority " + priority);
+ LOG.info("Application " + applicationId + " reserved " + resource
+ + " on node " + node + ", currently has " + reservedNodes.size()
+ + " at priority " + priority);
}
public synchronized void unreserveResource(NodeInfo node, Priority priority) {
@@ -473,14 +492,14 @@ public class Application {
this.reservedContainers.remove(priority);
}
- LOG.info("Application " + applicationId + " unreserved " +
- " on node " + node + ", currently has " + reservedNodes.size() +
- " at priority " + priority);
+ LOG.info("Application " + applicationId + " unreserved " + " on node "
+ + node + ", currently has " + reservedNodes.size() + " at priority "
+ + priority);
}
public synchronized boolean isReserved(NodeInfo node, Priority priority) {
Set<NodeInfo> reservedNodes = reservedContainers.get(priority);
- if (reservedNodes != null) {
+ if (reservedNodes != null) {
return reservedNodes.contains(node);
}
return false;
@@ -488,10 +507,10 @@ public class Application {
public float getLocalityWaitFactor(Priority priority, int clusterNodes) {
// Estimate: Required unique resources (i.e. hosts + racks)
- int requiredResources = Math.max(this.requests.get(priority).size()-1, 1);
- return ((float)requiredResources / clusterNodes);
+ int requiredResources = Math.max(this.requests.get(priority).size() - 1, 1);
+ return ((float) requiredResources / clusterNodes);
}
-
+
synchronized public void finish() {
// GC pending resources metrics
QueueMetrics metrics = queue.getMetrics();
@@ -499,8 +518,8 @@ public class Application {
ResourceRequest request = asks.get(NodeManager.ANY);
if (request != null) {
metrics.decrPendingResources(user, request.getNumContainers(),
- Resources.multiply(request.getCapability(),
- request.getNumContainers()));
+ Resources.multiply(request.getCapability(), request
+ .getNumContainers()));
}
}
}
@@ -508,4 +527,21 @@ public class Application {
public Map<Priority, Set<NodeInfo>> getAllReservations() {
return new HashMap<Priority, Set<NodeInfo>>(reservedContainers);
}
+
+ public synchronized void setAvailableResourceLimit(Resource globalLimit) {
+ resourceLimit = Resources.subtract(globalLimit, currentConsumption);
+
+ // Corner case to deal with applications being slightly over-limit
+ if (resourceLimit.getMemory() < 0) {
+ resourceLimit.setMemory(0);
+ }
+ }
+
+ /**
+ * Get available headroom in terms of resources for the application's user.
+ * @return available resource headroom
+ */
+ public synchronized Resource getResourceLimit() {
+ return resourceLimit;
+ }
}
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java?rev=1126587&r1=1126586&r2=1126587&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java Mon May 23 17:02:19 2011
@@ -171,22 +171,22 @@ public class QueueMetrics {
/**
* Set available resources. To be called by scheduler periodically as
* resources become available.
- * @param mb memory in MB
+ * @param limit resource limit
*/
- public void setAvailableQueueMemory(int mb) {
- availableGB.set(mb/GB);
+ public void setAvailableResourcesToQueue(Resource limit) {
+ availableGB.set(limit.getMemory()/GB);
}
/**
* Set available resources. To be called by scheduler periodically as
* resources become available.
* @param user
- * @param mb memory in MB
+ * @param limit resource limit
*/
- public void setAvailableUserMemory(String user, int mb) {
+ public void setAvailableResourcesToUser(String user, Resource limit) {
QueueMetrics userMetrics = getUserMetrics(user);
if (userMetrics != null) {
- userMetrics.setAvailableQueueMemory(mb);
+ userMetrics.setAvailableResourcesToQueue(limit);
}
}
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java?rev=1126587&r1=1126586&r2=1126587&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java Mon May 23 17:02:19 2011
@@ -42,10 +42,10 @@ public interface YarnScheduler {
* @param applicationId
* @param ask
* @param release
- * @return
+ * @return the scheduler's {@link Allocation} response
* @throws IOException
*/
- List<Container> allocate(ApplicationId applicationId,
+ Allocation allocate(ApplicationId applicationId,
List<ResourceRequest> ask, List<Container> release)
throws IOException;
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java?rev=1126587&r1=1126586&r2=1126587&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java Mon May 23 17:02:19 2011
@@ -54,6 +54,7 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.ClusterTracker;
import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Application;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeManagerImpl;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
@@ -300,7 +301,7 @@ implements ResourceScheduler, CapacitySc
}
@Override
- public List<Container> allocate(ApplicationId applicationId,
+ public Allocation allocate(ApplicationId applicationId,
List<ResourceRequest> ask, List<Container> release)
throws IOException {
@@ -308,8 +309,9 @@ implements ResourceScheduler, CapacitySc
if (application == null) {
LOG.info("Calling allocate on removed " +
"or non existant application " + applicationId);
- return EMPTY_CONTAINER_LIST;
+ return new Allocation(EMPTY_CONTAINER_LIST, Resources.none());
}
+
normalizeRequests(ask);
LOG.info("DEBUG --- allocate: pre-update" +
@@ -332,7 +334,7 @@ implements ResourceScheduler, CapacitySc
" #ask=" + ask.size() +
" #release=" + release.size() +
" #allContainers=" + allContainers.size());
- return allContainers;
+ return new Allocation(allContainers, application.getResourceLimit());
}
@Override
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java?rev=1126587&r1=1126586&r2=1126587&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java Mon May 23 17:02:19 2011
@@ -528,7 +528,7 @@ public class LeafQueue implements Queue
}
// User limits
- if (!assignToUser(application.getUser(), clusterResource, required.getCapability())) {
+ if (!assignToUser(application, clusterResource, required.getCapability())) {
continue;
}
@@ -615,8 +615,8 @@ public class LeafQueue implements Queue
return true;
}
- private synchronized boolean assignToUser(String userName, Resource clusterResource,
- Resource required) {
+ private synchronized boolean assignToUser(Application application,
+ Resource clusterResource, Resource required) {
// What is our current capacity?
// * It is equal to the max(required, queue-capacity) if
// we're running below capacity. The 'max' ensures that jobs in queues
@@ -642,6 +642,8 @@ public class LeafQueue implements Queue
// Also, the queue's configured capacity should be higher than
// queue-hard-limit * ulMin
+ String userName = application.getUser();
+
final int activeUsers = users.size();
User user = getUser(userName);
@@ -652,8 +654,8 @@ public class LeafQueue implements Queue
(int)(queueCapacity * userLimitFactor)
);
- metrics.setAvailableUserMemory(userName,
- limit - user.getConsumedResources().getMemory());
+ application.setAvailableResourceLimit(Resources.createResource(limit));
+ metrics.setAvailableResourcesToUser(userName, application.getResourceLimit());
// Note: We aren't considering the current request since there is a fixed
// overhead of the AM, so...
@@ -935,16 +937,14 @@ public class LeafQueue implements Queue
@Override
public void completedContainer(Resource clusterResource,
- Container container, Resource allocatedResource, Application application) {
+ Container container, Resource containerResource, Application application) {
if (application != null) {
// Careful! Locking order is important!
synchronized (this) {
- // Inform the application iff this was an allocated container,
- // as opposed to an unfulfilled reservation
- if (container != null) {
- application.completedContainer(container);
- }
+ // Inform the application - this might be an allocated container or
+ // an unfulfilled reservation
+ application.completedContainer(container, containerResource);
// Book-keeping
releaseResource(clusterResource,
@@ -952,7 +952,7 @@ public class LeafQueue implements Queue
LOG.info("completedContainer" +
" container=" + container +
- " resource=" + allocatedResource +
+ " resource=" + containerResource +
" queue=" + this +
" util=" + getUtilization() +
" used=" + usedResources +
@@ -961,7 +961,7 @@ public class LeafQueue implements Queue
// Inform the parent queue
parent.completedContainer(clusterResource, container,
- allocatedResource, application);
+ containerResource, application);
}
}
@@ -987,10 +987,15 @@ public class LeafQueue implements Queue
@Override
public synchronized void updateResource(Resource clusterResource) {
- float memLimit = clusterResource.getMemory() * absoluteCapacity;
- setUtilization(usedResources.getMemory() / memLimit);
- setUsedCapacity(usedResources.getMemory() / (clusterResource.getMemory() * capacity));
- metrics.setAvailableQueueMemory((int) memLimit - usedResources.getMemory());
+ float queueLimit = clusterResource.getMemory() * absoluteCapacity;
+ setUtilization(usedResources.getMemory() / queueLimit);
+ setUsedCapacity(
+ usedResources.getMemory() / (clusterResource.getMemory() * capacity));
+
+ Resource resourceLimit =
+ Resources.createResource((int)queueLimit);
+ metrics.setAvailableResourcesToQueue(
+ Resources.subtractFrom(resourceLimit, usedResources));
}
@Override
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java?rev=1126587&r1=1126586&r2=1126587&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java Mon May 23 17:02:19 2011
@@ -619,13 +619,13 @@ public class ParentQueue implements Queu
@Override
public void completedContainer(Resource clusterResource,
- Container container, Resource allocatedResource,
+ Container container, Resource containerResource,
Application application) {
if (application != null) {
// Careful! Locking order is important!
// Book keeping
synchronized (this) {
- releaseResource(clusterResource, allocatedResource);
+ releaseResource(clusterResource, containerResource);
LOG.info("completedContainer" +
" queue=" + getQueueName() +
@@ -637,7 +637,7 @@ public class ParentQueue implements Queu
// Inform the parent
if (parent != null) {
parent.completedContainer(clusterResource, container,
- allocatedResource, application);
+ containerResource, application);
}
}
}
@@ -658,10 +658,15 @@ public class ParentQueue implements Queu
@Override
public synchronized void updateResource(Resource clusterResource) {
- float memLimit = clusterResource.getMemory() * absoluteCapacity;
- setUtilization(usedResources.getMemory() / memLimit);
- setUsedCapacity(usedResources.getMemory() / (clusterResource.getMemory() * capacity));
- metrics.setAvailableQueueMemory((int) memLimit - usedResources.getMemory());
+ float queueLimit = clusterResource.getMemory() * absoluteCapacity;
+ setUtilization(usedResources.getMemory() / queueLimit);
+ setUsedCapacity(
+ usedResources.getMemory() / (clusterResource.getMemory() * capacity));
+
+ Resource resourceLimit =
+ Resources.createResource((int)queueLimit);
+ metrics.setAvailableResourcesToQueue(
+ Resources.subtractFrom(resourceLimit, usedResources));
}
@Override
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/Queue.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/Queue.java?rev=1126587&r1=1126586&r2=1126587&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/Queue.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/Queue.java Mon May 23 17:02:19 2011
@@ -170,11 +170,11 @@ extends org.apache.hadoop.yarn.server.re
* @param clusterResource the resource of the cluster
* @param container completed container,
* <code>null</code> if it was just a reservation
- * @param allocatedResource allocated resource
+ * @param containerResource allocated resource
* @param application application to which the container was assigned
*/
public void completedContainer(Resource clusterResource,
- Container container, Resource allocatedResource,
+ Container container, Resource containerResource,
Application application);
/**
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java?rev=1126587&r1=1126586&r2=1126587&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java Mon May 23 17:02:19 2011
@@ -57,6 +57,7 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.ClusterTracker;
import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Application;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
@@ -167,14 +168,14 @@ public class FifoScheduler implements Re
}
@Override
- public synchronized List<Container> allocate(ApplicationId applicationId,
+ public synchronized Allocation allocate(ApplicationId applicationId,
List<ResourceRequest> ask, List<Container> release)
throws IOException {
Application application = getApplication(applicationId);
if (application == null) {
LOG.error("Calling allocate on removed " +
"or non existant application " + applicationId);
- return EMPTY_CONTAINER_LIST;
+ return new Allocation(EMPTY_CONTAINER_LIST, Resources.none());
}
normalizeRequests(ask);
@@ -200,7 +201,7 @@ public class FifoScheduler implements Re
" #ask=" + ask.size() +
" #release=" + release.size() +
" #allContainers=" + allContainers.size());
- return allContainers;
+ return new Allocation(allContainers, application.getResourceLimit());
}
private void releaseContainers(Application application, List<Container> release) {
@@ -294,6 +295,9 @@ public class FifoScheduler implements Re
}
}
}
+
+ application.setAvailableResourceLimit(clusterResource);
+
LOG.debug("post-assignContainers");
application.showRequests();
@@ -475,7 +479,7 @@ public class FifoScheduler implements Re
* the nodemanger is just updating about a completed container.
*/
if (app != null) {
- app.completedContainer(c);
+ app.completedContainer(c, c.getResource());
}
}
}
@@ -507,8 +511,8 @@ public class FifoScheduler implements Re
MINIMUM_ALLOCATION)) {
assignContainers(node);
}
- metrics.setAvailableQueueMemory(
- clusterResource.getMemory() - usedResource.getMemory());
+ metrics.setAvailableResourcesToQueue(
+ Resources.subtract(clusterResource, usedResource));
LOG.info("Node after allocation " + node.getNodeID() + " resource = "
+ node.getAvailableResource());
@@ -541,7 +545,6 @@ public class FifoScheduler implements Re
}
- private Map<String, NodeManager> nodes = new HashMap<String, NodeManager>();
private Resource clusterResource = recordFactory.newRecordInstance(Resource.class);
private Resource usedResource = recordFactory.newRecordInstance(Resource.class);
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java?rev=1126587&r1=1126586&r2=1126587&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java Mon May 23 17:02:19 2011
@@ -268,7 +268,7 @@ public class Application {
List<Container> response =
resourceManager.getResourceScheduler().allocate(applicationId,
new ArrayList<ResourceRequest>(ask),
- new ArrayList<Container>(release));
+ new ArrayList<Container>(release)).getContainers();
List<Container> containers = new ArrayList<Container>(response.size());
for (Container container : response) {
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMLaunchFailure.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMLaunchFailure.java?rev=1126587&r1=1126586&r2=1126587&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMLaunchFailure.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMLaunchFailure.java Mon May 23 17:02:19 2011
@@ -51,6 +51,8 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.ApplicationTrackerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.ApplicationsStore.ApplicationStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemStore;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.junit.After;
import org.junit.Before;
@@ -71,9 +73,9 @@ public class TestAMLaunchFailure extends
private Container container = recordFactory.newRecordInstance(Container.class);
@Override
- public List<Container> allocate(ApplicationId applicationId,
+ public Allocation allocate(ApplicationId applicationId,
List<ResourceRequest> ask, List<Container> release) throws IOException {
- return Arrays.asList(container);
+ return new Allocation(Arrays.asList(container), Resources.none());
}
@Override
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCResponseId.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCResponseId.java?rev=1126587&r1=1126586&r2=1126587&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCResponseId.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCResponseId.java Mon May 23 17:02:19 2011
@@ -46,6 +46,7 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.recovery.ApplicationsStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.ApplicationsStore.ApplicationStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemStore;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.junit.After;
import org.junit.Before;
@@ -81,7 +82,7 @@ public class TestAMRMRPCResponseId exten
private class DummyScheduler implements YarnScheduler {
@Override
- public List<Container> allocate(ApplicationId applicationId,
+ public Allocation allocate(ApplicationId applicationId,
List<ResourceRequest> ask, List<Container> release) throws IOException {
return null;
}
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java?rev=1126587&r1=1126586&r2=1126587&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java Mon May 23 17:02:19 2011
@@ -41,8 +41,10 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFactory;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store.RMState;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.ClusterTracker;
import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
@@ -140,7 +142,7 @@ public class TestAMRestart extends TestC
}
@Override
- public List<Container> allocate(ApplicationId applicationId,
+ public Allocation allocate(ApplicationId applicationId,
List<ResourceRequest> ask, List<Container> release) throws IOException {
Container container = recordFactory.newRecordInstance(Container.class);
container.setContainerToken(recordFactory.newRecordInstance(ContainerToken.class));
@@ -150,7 +152,7 @@ public class TestAMRestart extends TestC
container.getId().setAppId(appID);
container.getId().setId(count);
count++;
- return Arrays.asList(container);
+ return new Allocation(Arrays.asList(container), Resources.none());
}
@Override
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestSchedulerNegotiator.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestSchedulerNegotiator.java?rev=1126587&r1=1126586&r2=1126587&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestSchedulerNegotiator.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestSchedulerNegotiator.java Mon May 23 17:02:19 2011
@@ -50,8 +50,10 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFactory;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.ClusterTracker;
import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeResponse;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
@@ -72,7 +74,7 @@ public class TestSchedulerNegotiator ext
private class DummyScheduler implements ResourceScheduler {
@Override
- public List<Container> allocate(ApplicationId applicationId,
+ public Allocation allocate(ApplicationId applicationId,
List<ResourceRequest> ask, List<Container> release) throws IOException {
ArrayList<Container> containers = new ArrayList<Container>();
Container container = recordFactory.newRecordInstance(Container.class);
@@ -80,7 +82,7 @@ public class TestSchedulerNegotiator ext
container.getId().setAppId(applicationId);
container.getId().setId(testNum);
containers.add(container);
- return containers;
+ return new Allocation(containers, Resources.none());
}
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java?rev=1126587&r1=1126586&r2=1126587&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java Mon May 23 17:02:19 2011
@@ -8,6 +8,7 @@ import org.apache.hadoop.metrics2.lib.De
import static org.apache.hadoop.test.MetricsAsserts.*;
import static org.apache.hadoop.test.MockitoMaker.*;
import org.apache.hadoop.yarn.api.records.ApplicationState;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.junit.Before;
@@ -37,7 +38,7 @@ public class TestQueueMetrics {
MetricsSource userSource = userSource(ms, queueName, user);
checkApps(queueSource, 1, 1, 0, 0, 0, 0);
- metrics.setAvailableQueueMemory(100*GB);
+ metrics.setAvailableResourcesToQueue(Resource.createResource(100*GB));
metrics.incrPendingResources(user, 5, Resources.createResource(15*GB));
// Available resources is set externally, as it depends on dynamic
// configurable cluster/queue resources
@@ -71,8 +72,8 @@ public class TestQueueMetrics {
checkApps(queueSource, 1, 1, 0, 0, 0, 0);
checkApps(userSource, 1, 1, 0, 0, 0, 0);
- metrics.setAvailableQueueMemory(100*GB);
- metrics.setAvailableUserMemory(user, 10*GB);
+ metrics.setAvailableResourcesToQueue(Resources.createResource(100*GB));
+ metrics.setAvailableResourcesToUser(user, Resources.createResource(10*GB));
metrics.incrPendingResources(user, 5, Resources.createResource(15*GB));
// Available resources is set externally, as it depends on dynamic
// configurable cluster/queue resources
@@ -120,10 +121,10 @@ public class TestQueueMetrics {
checkApps(userSource, 1, 1, 0, 0, 0, 0);
checkApps(parentUserSource, 1, 1, 0, 0, 0, 0);
- parentMetrics.setAvailableQueueMemory(100*GB);
- metrics.setAvailableQueueMemory(100*GB);
- parentMetrics.setAvailableUserMemory(user, 10*GB);
- metrics.setAvailableUserMemory(user, 10*GB);
+ parentMetrics.setAvailableResourcesToQueue(Resources.createResource(100*GB));
+ metrics.setAvailableResourcesToQueue(Resources.createResource(100*GB));
+ parentMetrics.setAvailableResourcesToUser(user, Resources.createResource(10*GB));
+ metrics.setAvailableResourcesToUser(user, Resources.createResource(10*GB));
metrics.incrPendingResources(user, 5, Resources.createResource(15*GB));
checkResources(queueSource, 0, 0, 100, 15, 5);
checkResources(parentQueueSource, 0, 0, 100, 15, 5);