You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by ac...@apache.org on 2011/03/17 21:21:54 UTC
svn commit: r1082677 [34/38] - in /hadoop/mapreduce/branches/MR-279: ./
assembly/ ivy/ mr-client/ mr-client/hadoop-mapreduce-client-app/
mr-client/hadoop-mapreduce-client-app/src/
mr-client/hadoop-mapreduce-client-app/src/main/ mr-client/hadoop-mapredu...
Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/RMResourceTrackerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/RMResourceTrackerImpl.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/RMResourceTrackerImpl.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/RMResourceTrackerImpl.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,339 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.resourcemanager.resourcetracker;
+
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.crypto.SecretKey;
+import javax.xml.crypto.NodeSetData;
+
+import org.apache.avro.ipc.AvroRemoteException;
+import org.apache.avro.ipc.Server;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.net.Node;
+import org.apache.hadoop.net.NodeBase;
+import org.apache.hadoop.security.SecurityInfo;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.server.RMNMSecurityInfoClass;
+import org.apache.hadoop.yarn.server.YarnServerConfig;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceListener;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ClusterTracker.NodeResponse;
+import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
+import org.apache.hadoop.yarn.service.AbstractService;
+import org.apache.hadoop.yarn.HeartbeatResponse;
+import org.apache.hadoop.yarn.NodeID;
+import org.apache.hadoop.yarn.NodeStatus;
+import org.apache.hadoop.yarn.RegistrationResponse;
+import org.apache.hadoop.yarn.Resource;
+import org.apache.hadoop.yarn.ResourceTracker;
+import org.apache.hadoop.yarn.YarnClusterMetrics;
+
+/**
+ * This class is responsible for the interaction with the NodeManagers.
+ * All the interactions with the NodeManagers (registration and heartbeats)
+ * happen via this class. It also runs a background thread that looks for
+ * NodeManagers that have not been heard from within the expiry interval.
+ */
+public class RMResourceTrackerImpl extends AbstractService implements
+    ResourceTracker, RMResourceTracker, ResourceContext {
+  private static final Log LOG = LogFactory.getLog(RMResourceTrackerImpl.class);
+  /* We don't garbage collect nodes. A node can come back up again and
+   * re-register, so there is no use garbage collecting. Though an admin can
+   * break the RM by bouncing nodemanagers on different ports again and again.
+   */
+  private Map<String, NodeID> nodes = new ConcurrentHashMap<String, NodeID>();
+  private final Map<NodeID, NodeInfoTracker> nodeManagers =
+      new ConcurrentHashMap<NodeID, NodeInfoTracker>();
+  private final HeartBeatThread heartbeatThread;
+  /* Ordered by lastSeen (oldest first), ties broken by node id.
+   * All access must hold the nmExpiryQueue monitor, and an element's
+   * lastSeen must not be changed while the element is in the set.
+   */
+  private final TreeSet<NodeStatus> nmExpiryQueue =
+      new TreeSet<NodeStatus>(
+          new Comparator<NodeStatus>() {
+            public int compare(NodeStatus p1, NodeStatus p2) {
+              if (p1.lastSeen < p2.lastSeen) {
+                return -1;
+              } else if (p1.lastSeen > p2.lastSeen) {
+                return 1;
+              }
+              // Compare explicitly instead of subtracting: a subtraction
+              // can overflow and report the wrong ordering.
+              if (p1.nodeId.id < p2.nodeId.id) {
+                return -1;
+              } else if (p1.nodeId.id > p2.nodeId.id) {
+                return 1;
+              }
+              return 0;
+            }
+          }
+      );
+
+  private ResourceListener resourceListener;
+  private InetSocketAddress resourceTrackerAddress;
+  private Server server;
+  private final ContainerTokenSecretManager containerTokenSecretManager;
+  private final AtomicInteger nodeCounter = new AtomicInteger(0);
+  /* Shared response telling a NodeManager to reboot. */
+  private static final HeartbeatResponse reboot = new HeartbeatResponse();
+  static {
+    // Initialise once here rather than on every constructor call.
+    reboot.reboot = true;
+  }
+  private long nmExpiryInterval;
+
+  /**
+   * @param containerTokenSecretManager used to create the secret key that is
+   *        handed to a NodeManager when it registers.
+   */
+  public RMResourceTrackerImpl(ContainerTokenSecretManager containerTokenSecretManager) {
+    super(RMResourceTrackerImpl.class.getName());
+    this.containerTokenSecretManager = containerTokenSecretManager;
+    this.heartbeatThread = new HeartBeatThread();
+  }
+
+  /**
+   * Reads the resource tracker bind address and the NodeManager expiry
+   * interval from the configuration.
+   * @param conf the configuration to read from.
+   */
+  @Override
+  public void init(Configuration conf) {
+    String resourceTrackerBindAddress =
+        conf.get(YarnServerConfig.RESOURCETRACKER_ADDRESS,
+            YarnServerConfig.DEFAULT_RESOURCETRACKER_BIND_ADDRESS);
+    resourceTrackerAddress = NetUtils.createSocketAddr(resourceTrackerBindAddress);
+    this.nmExpiryInterval = conf.getLong(YarnConfiguration.NM_EXPIRY_INTERVAL,
+        YarnConfiguration.DEFAULT_NM_EXPIRY_INTERVAL);
+    super.init(conf);
+  }
+
+  /**
+   * Starts the RPC server that NodeManagers talk to and the thread that
+   * looks for expired NodeManagers.
+   */
+  @Override
+  public void start() {
+    // ResourceTrackerServer authenticates NodeManager via Kerberos if
+    // security is enabled, so no secretManager.
+    YarnRPC rpc = YarnRPC.create(getConfig());
+    Configuration rtServerConf = new Configuration(getConfig());
+    rtServerConf.setClass(
+        CommonConfigurationKeys.HADOOP_SECURITY_INFO_CLASS_NAME,
+        RMNMSecurityInfoClass.class, SecurityInfo.class);
+    this.server =
+        rpc.getServer(ResourceTracker.class, this, resourceTrackerAddress,
+            rtServerConf, null);
+    this.server.start();
+    this.heartbeatThread.start();
+    LOG.info("Expiry interval of NodeManagers set to " + nmExpiryInterval);
+    super.start();
+  }
+
+  /**
+   * resolving the network topology.
+   * Currently every host is placed on {@link NetworkTopology#DEFAULT_RACK}.
+   * @param hostName the hostname of this node.
+   * @return the resolved {@link Node} for this nodemanager.
+   */
+  public static Node resolve(String hostName) {
+    return new NodeBase(hostName, NetworkTopology.DEFAULT_RACK);
+  }
+
+  /**
+   * Registers a NodeManager, or refreshes it if it is already known.
+   * @param node the name the NodeManager registers under.
+   * @param capability the resources offered by the node.
+   * @return the response carrying the assigned node id and the secret key.
+   */
+  @Override
+  public RegistrationResponse registerNodeManager(CharSequence node,
+      Resource capability) throws AvroRemoteException {
+    NodeID nodeId = getNodeId(node);
+    NodeInfoTracker nTracker;
+    boolean alreadyRegistered;
+
+    synchronized (nodeManagers) {
+      nTracker = nodeManagers.get(nodeId);
+      alreadyRegistered = (nTracker != null);
+      if (!alreadyRegistered) {
+        /* we do the resolving here, so that scheduler does not have to do it */
+        NodeInfo nodeManager = resourceListener.addNode(nodeId, node.toString(),
+            resolve(node.toString()),
+            capability);
+        HeartbeatResponse response = new HeartbeatResponse();
+        response.responseId = 0;
+        nTracker = new NodeInfoTracker(nodeManager, response);
+        nodeManagers.put(nodeId, nTracker);
+      }
+    }
+
+    if (alreadyRegistered) {
+      /* The status may already sit in the expiry queue, which is sorted by
+       * lastSeen. Take it out before touching lastSeen so that the sorted
+       * set is never left with a mis-ordered element. This is done outside
+       * the nodeManagers monitor because the expiry thread acquires
+       * nmExpiryQueue first and nodeManagers second.
+       */
+      synchronized (nmExpiryQueue) {
+        NodeStatus status = nTracker.getNodeStatus();
+        nmExpiryQueue.remove(status);
+        status.lastSeen = System.currentTimeMillis();
+        nTracker.updateNodeStatus(status);
+        addForTracking(status);
+      }
+    } else {
+      addForTracking(nTracker.getNodeStatus());
+    }
+    LOG.info("NodeManager from node " + node + " registered with capability: " +
+        capability.memory + ", assigned nodeId " + nodeId.id);
+
+    RegistrationResponse regResponse = new RegistrationResponse();
+    regResponse.nodeID = nodeId;
+    SecretKey secretKey =
+        this.containerTokenSecretManager.createAndGetSecretKey(node);
+    regResponse.secretKey = ByteBuffer.wrap(secretKey.getEncoded());
+    return regResponse;
+  }
+
+  /**
+   * Handles a heartbeat from a NodeManager.
+   * @param nodeStatus the status reported by the NodeManager.
+   * @return the previous response if the heartbeat is a duplicate, the
+   *         reboot response if the node is unknown or too far behind,
+   *         otherwise a new response with the next response id.
+   */
+  @Override
+  public HeartbeatResponse nodeHeartbeat(NodeStatus nodeStatus)
+      throws AvroRemoteException {
+    nodeStatus.lastSeen = System.currentTimeMillis();
+    NodeInfoTracker nTracker = null;
+    synchronized (nodeManagers) {
+      nTracker = nodeManagers.get(nodeStatus.nodeId);
+    }
+    if (nTracker == null) {
+      /* node does not exist */
+      LOG.info("Node not found rebooting " + nodeStatus.nodeId);
+      return reboot;
+    }
+
+    NodeInfo nodeInfo = nTracker.getNodeInfo();
+    /* Read the last response once so that all checks below see the same one. */
+    HeartbeatResponse lastResponse = nTracker.getLastHeartBeatResponse();
+    /* check to see if its an old heartbeat */
+    if (nodeStatus.responseId + 1 == lastResponse.responseId) {
+      return lastResponse;
+    } else if (nodeStatus.responseId + 1 < lastResponse.responseId) {
+      LOG.info("Too far behind rm response id:" +
+          lastResponse.responseId + " nm response id:" + nodeStatus.responseId);
+      return reboot;
+    }
+
+    /* inform any listeners of node heartbeats */
+    NodeResponse nodeResponse = resourceListener.nodeUpdate(
+        nodeInfo, nodeStatus.containers);
+
+    HeartbeatResponse response = new HeartbeatResponse();
+    response.containersToCleanup = nodeResponse.getContainersToCleanUp();
+    response.appplicationsToCleanup = nodeResponse.getFinishedApplications();
+    response.responseId = lastResponse.responseId + 1;
+
+    nTracker.refreshHeartBeatResponse(response);
+    nTracker.updateNodeStatus(nodeStatus);
+    return response;
+  }
+
+  /**
+   * @param nodeId the id of the node to look up.
+   * @return the {@link NodeInfo} tracked for the id, or null if unknown.
+   */
+  @Private
+  public synchronized NodeInfo getNodeManager(NodeID nodeId) {
+    NodeInfoTracker ntracker = nodeManagers.get(nodeId);
+    return (ntracker == null ? null: ntracker.getNodeInfo());
+  }
+
+  /**
+   * Returns the id assigned to a node name, assigning a new one on first use.
+   * @param node the node name as received from the NodeManager.
+   * @return the id for this node name.
+   */
+  private synchronized NodeID getNodeId(CharSequence node) {
+    // The map is keyed by String. Looking it up with another CharSequence
+    // implementation would never match, so convert before the lookup.
+    String nodeName = node.toString();
+    NodeID nodeId = nodes.get(nodeName);
+    if (nodeId == null) {
+      nodeId = new NodeID();
+      nodeId.id = nodeCounter.getAndIncrement();
+      nodes.put(nodeName, nodeId);
+    }
+    return nodeId;
+  }
+
+  /**
+   * @return metrics holding the number of NodeManagers currently tracked.
+   */
+  @Override
+  public synchronized YarnClusterMetrics getClusterMetrics() {
+    YarnClusterMetrics ymetrics = new YarnClusterMetrics();
+    ymetrics.numNodeManagers = nodeManagers.size();
+    return ymetrics;
+  }
+
+  /**
+   * Stops the expiry thread and closes the RPC server.
+   */
+  @Override
+  public void stop() {
+    // Ask the expiry thread to finish and wake it up if it is sleeping.
+    this.heartbeatThread.stop = true;
+    this.heartbeatThread.interrupt();
+    if (this.server != null) {
+      this.server.close();
+    }
+    super.stop();
+  }
+
+  @Override
+  public synchronized void register(ResourceListener listener) {
+    //for now there is only one resource listener, so we dont
+    //really add it to a list.
+    this.resourceListener = listener;
+  }
+
+  @Override
+  public synchronized void unregister(ResourceListener listener) {
+    //TODO make the listener so that it dumps to a void listener
+    //rather than nullifying it.
+  }
+
+  /**
+   * @return a new list with the {@link NodeInfo} of every tracked NodeManager.
+   */
+  @Override
+  public List<NodeInfo> getAllNodeInfo() {
+    List<NodeInfo> infoList = new ArrayList<NodeInfo>();
+    synchronized (nodeManagers) {
+      for (NodeInfoTracker t : nodeManagers.values()) {
+        infoList.add(t.getNodeInfo());
+      }
+    }
+    return infoList;
+  }
+
+  /**
+   * Adds a node status to the expiry queue.
+   * @param status the status to track for expiry.
+   */
+  protected void addForTracking(NodeStatus status) {
+    synchronized (nmExpiryQueue) {
+      nmExpiryQueue.add(status);
+    }
+  }
+
+  /**
+   * Called by the expiry thread with the nodes found to be expired.
+   * @param nodes the ids of the expired nodes, possibly empty.
+   */
+  protected void expireNMs(List<NodeID> nodes) {
+    /* for now do nothing */
+  }
+
+  /*
+   * This class runs continuously to track the nodemanagers
+   * that might be dead.
+   */
+  private class HeartBeatThread extends Thread {
+    private volatile boolean stop = false;
+
+    public HeartBeatThread() {
+      super("RMResourceTrackerImpl:" + HeartBeatThread.class.getName());
+    }
+
+    @Override
+    public void run() {
+      /* The expiry queue does not need to be in sync with nodeManagers;
+       * if a nodemanager in the expiry queue cannot be found in nodeManagers
+       * it is alright. We do not want to hold a lock on nodeManagers while
+       * going through the expiry queue.
+       */
+
+      List<NodeID> expired = new ArrayList<NodeID>();
+      LOG.info("Starting expiring thread with interval " + nmExpiryInterval);
+      // Pause between scans instead of spinning; never sleep for less
+      // than a millisecond even if the interval is tiny or misconfigured.
+      final long scanInterval = Math.max(1L, nmExpiryInterval / 3);
+
+      while (!stop) {
+        long now = System.currentTimeMillis();
+        expired.clear();
+        synchronized (nmExpiryQueue) {
+          while (!nmExpiryQueue.isEmpty() &&
+              ((now - nmExpiryQueue.first().lastSeen) > nmExpiryInterval)) {
+            // pollFirst() always removes the head. A remove(element) relies
+            // on the comparator and could miss the element, which would
+            // make this loop spin forever while holding the lock.
+            NodeStatus leastRecent = nmExpiryQueue.pollFirst();
+            NodeInfoTracker info;
+            synchronized (nodeManagers) {
+              info = nodeManagers.get(leastRecent.nodeId);
+            }
+            if (info == null) {
+              continue;
+            }
+            // The queued status may be stale; decide on the latest one.
+            NodeStatus status = info.getNodeStatus();
+            if ((now - status.lastSeen) > nmExpiryInterval) {
+              expired.add(status.nodeId);
+            } else {
+              nmExpiryQueue.add(status);
+            }
+          }
+        }
+        expireNMs(expired);
+        try {
+          Thread.sleep(scanInterval);
+        } catch (InterruptedException e) {
+          // stop() interrupts this thread; treat any interrupt as a
+          // request to exit and keep the interrupt status set.
+          Thread.currentThread().interrupt();
+          break;
+        }
+      }
+    }
+  }
+}
Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/ResourceContext.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/ResourceContext.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/ResourceContext.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/ResourceContext.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,32 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.resourcemanager.resourcetracker;
+
+import java.util.List;
+
+/**
+ * The read-only interface for cluster resource information.
+ * It exposes the nodes of the cluster without offering any way to
+ * modify them.
+ */
+public interface ResourceContext {
+ /**
+ * Get the node info of all nodes.
+ * @return a list with one {@link NodeInfo} per node
+ */
+ List<NodeInfo> getAllNodeInfo();
+}
Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Application.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Application.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Application.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Application.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,315 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeInfo;
+import org.apache.hadoop.yarn.ApplicationID;
+import org.apache.hadoop.yarn.Container;
+import org.apache.hadoop.yarn.Priority;
+import org.apache.hadoop.yarn.Resource;
+import org.apache.hadoop.yarn.ResourceRequest;
+
+/**
+ * This class keeps track of all the consumption of an application.
+ * This also keeps track of current running/completed
+ * containers for the application.
+ */
+@LimitedPrivate("yarn")
+@Evolving
+public class Application {
+  private static final Log LOG = LogFactory.getLog(Application.class);
+
+  private final AtomicInteger containerCtr = new AtomicInteger(0);
+
+  final ApplicationID applicationId;
+  final Queue queue;
+  final String user;
+
+  final Set<Priority> priorities =
+      new TreeSet<Priority>(
+          new org.apache.hadoop.yarn.server.resourcemanager.resource.Priority.Comparator());
+  /* Outstanding requests: priority -> (host/rack/ANY name -> request). */
+  final Map<Priority, Map<String, ResourceRequest>> requests =
+      new HashMap<Priority, Map<String, ResourceRequest>>();
+  final Resource currentConsumption = new Resource();
+  final Resource overallConsumption = new Resource();
+
+  /* Current consumption */
+  List<Container> acquired = new ArrayList<Container>();
+  List<Container> completedContainers = new ArrayList<Container>();
+  /* Allocated by scheduler */
+  List<Container> allocated = new ArrayList<Container>();
+  Set<NodeInfo> applicationOnNodes = new HashSet<NodeInfo>();
+
+  /**
+   * @param applicationId the id of the application.
+   * @param queue the queue the application belongs to.
+   * @param user the user of the application.
+   */
+  public Application(ApplicationID applicationId, Queue queue, String user) {
+    this.applicationId = applicationId;
+    this.queue = queue;
+    this.user = user;
+  }
+
+  /** @return the id of this application. */
+  public ApplicationID getApplicationId() {
+    return applicationId;
+  }
+
+  /** @return the queue of this application. */
+  public Queue getQueue() {
+    return queue;
+  }
+
+  /** @return the user of this application. */
+  public String getUser() {
+    return user;
+  }
+
+  /**
+   * @return the live map of outstanding requests (not a copy).
+   */
+  public synchronized Map<Priority, Map<String, ResourceRequest>> getRequests() {
+    return requests;
+  }
+
+  /** @return the next container id for this application, starting at 1. */
+  public int getNewContainerId() {
+    return containerCtr.incrementAndGet();
+  }
+
+  /**
+   * the currently acquired/allocated containers by the application masters.
+   * @return the current containers being used by the application masters.
+   */
+  public synchronized List<Container> getCurrentContainers() {
+    List<Container> currentContainers = new ArrayList<Container>(acquired);
+    currentContainers.addAll(allocated);
+    return currentContainers;
+  }
+
+  /**
+   * The ApplicationMaster is acquiring the allocated/completed resources.
+   * Allocated containers move to the acquired list and the list of
+   * completed containers is emptied.
+   * @return the newly allocated containers followed by the completed ones
+   */
+  public synchronized List<Container> acquire() {
+    // Take ownership of the allocated list and start a fresh one.
+    List<Container> heartbeatContainers = allocated;
+    allocated = new ArrayList<Container>();
+    acquired.addAll(heartbeatContainers);
+
+    // Metrics
+    for (Container container : heartbeatContainers) {
+      org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.addResource(
+          overallConsumption, container.resource);
+    }
+
+    LOG.debug("acquire:" +
+        " application=" + applicationId +
+        " #acquired=" + heartbeatContainers.size());
+
+    // heartbeatContainers is never null here, so no null check is needed.
+    heartbeatContainers.addAll(completedContainers);
+    completedContainers.clear();
+    return heartbeatContainers;
+  }
+
+  /**
+   * The ApplicationMaster is updating resource requirements for the
+   * application. Each request replaces any earlier request with the same
+   * priority and host name.
+   * @param requests resources to be acquired
+   */
+  public synchronized void updateResourceRequests(List<ResourceRequest> requests) {
+    // Update resource requests
+    for (ResourceRequest request : requests) {
+      Priority priority = request.priority;
+      String hostName = request.hostName.toString();
+
+      Map<String, ResourceRequest> asks = this.requests.get(priority);
+
+      if (asks == null) {
+        asks = new HashMap<String, ResourceRequest>();
+        this.requests.put(priority, asks);
+        this.priorities.add(priority);
+      }
+
+      asks.put(hostName, request);
+
+      if (hostName.equals(NodeManager.ANY)) {
+        LOG.debug("update:" +
+            " application=" + applicationId +
+            " request=" + request);
+      }
+    }
+  }
+
+  /**
+   * The ApplicationMaster is releasing containers. Their resources are
+   * subtracted from the current consumption and they are dropped from the
+   * acquired list.
+   * @param release the containers being released
+   */
+  public synchronized void releaseContainers(List<Container> release) {
+    // Release containers and update consumption
+    for (Container container : release) {
+      LOG.debug("update: " +
+          "application=" + applicationId + " released=" + container);
+      org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.subtractResource(
+          currentConsumption, container.resource);
+      for (Iterator<Container> i = acquired.iterator(); i.hasNext();) {
+        Container c = i.next();
+        if (c.id.equals(container.id)) {
+          i.remove();
+          LOG.info("Removed acquired container: " + container.id);
+        }
+      }
+    }
+  }
+
+  /** @return the live collection of priorities with requests (not a copy). */
+  public synchronized Collection<Priority> getPriorities() {
+    return priorities;
+  }
+
+  /**
+   * @param priority the priority to look up.
+   * @return the requests at that priority keyed by name, or null if none.
+   */
+  public synchronized Map<String, ResourceRequest>
+      getResourceRequests(Priority priority) {
+    return requests.get(priority);
+  }
+
+  /**
+   * @param priority the priority to look up.
+   * @param node the host, rack or ANY name of the request.
+   * @return the matching request, or null if there is none.
+   */
+  public synchronized ResourceRequest getResourceRequest(Priority priority,
+      String node) {
+    Map<String, ResourceRequest> nodeRequests = requests.get(priority);
+    return (nodeRequests == null) ? null : nodeRequests.get(node);
+  }
+
+  /**
+   * Records a completed container so that the next {@link #acquire()}
+   * reports it.
+   * @param container the container that completed.
+   */
+  public synchronized void completedContainer(Container container) {
+    LOG.info("Completed container: " + container);
+    completedContainers.add(container);
+  }
+
+  /**
+   * Records several completed containers at once.
+   * @param containers the containers that completed.
+   */
+  public synchronized void completedContainers(List<Container> containers) {
+    completedContainers.addAll(containers);
+  }
+
+  /**
+   * Resources have been allocated to this application by the resource scheduler.
+   * Track them.
+   * @param type the type of the node
+   * @param node the nodeinfo of the node
+   * @param priority the priority of the request.
+   * @param request the request
+   * @param containers the containers allocated.
+   */
+  public synchronized void allocate(NodeType type, NodeInfo node,
+      Priority priority, ResourceRequest request, List<Container> containers) {
+    applicationOnNodes.add(node);
+    if (type == NodeType.DATA_LOCAL) {
+      allocateNodeLocal(node, priority, request, containers);
+    } else if (type == NodeType.RACK_LOCAL) {
+      allocateRackLocal(node, priority, request, containers);
+    } else {
+      allocateOffSwitch(node, priority, request, containers);
+    }
+  }
+
+  /**
+   * The {@link ResourceScheduler} is allocating data-local resources
+   * to the application. The node-local, rack-local and off-switch requests
+   * at this priority are all reduced by the number of containers.
+   * @param node the node the containers are on
+   * @param priority the priority of the request
+   * @param nodeLocalRequest the node-local request being satisfied
+   * @param containers resources allocated to the application
+   */
+  private synchronized void allocateNodeLocal(NodeInfo node,
+      Priority priority, ResourceRequest nodeLocalRequest,
+      List<Container> containers) {
+    // Update consumption and track allocations
+    allocate(containers);
+
+    // Update future requirements
+    nodeLocalRequest.numContainers -= containers.size();
+    ResourceRequest rackLocalRequest =
+        requests.get(priority).get(node.getRackName());
+    rackLocalRequest.numContainers -= containers.size();
+    ResourceRequest offSwitchRequest =
+        requests.get(priority).get(NodeManager.ANY);
+    offSwitchRequest.numContainers -= containers.size();
+  }
+
+  /**
+   * The {@link ResourceScheduler} is allocating rack-local resources
+   * to the application. The rack-local and off-switch requests at this
+   * priority are reduced by the number of containers.
+   * @param node the node the containers are on
+   * @param priority the priority of the request
+   * @param rackLocalRequest the rack-local request being satisfied
+   * @param containers resources allocated to the application
+   */
+  private synchronized void allocateRackLocal(NodeInfo node,
+      Priority priority, ResourceRequest rackLocalRequest,
+      List<Container> containers) {
+
+    // Update consumption and track allocations
+    allocate(containers);
+
+    // Update future requirements
+    rackLocalRequest.numContainers -= containers.size();
+    ResourceRequest offSwitchRequest =
+        requests.get(priority).get(NodeManager.ANY);
+    offSwitchRequest.numContainers -= containers.size();
+  }
+
+  /**
+   * The {@link ResourceScheduler} is allocating off-switch resources
+   * to the application. Only the off-switch request is reduced.
+   * @param node the node the containers are on
+   * @param priority the priority of the request
+   * @param offSwitchRequest the off-switch request being satisfied
+   * @param containers resources allocated to the application
+   */
+  private synchronized void allocateOffSwitch(NodeInfo node,
+      Priority priority, ResourceRequest offSwitchRequest,
+      List<Container> containers) {
+
+    // Update consumption and track allocations
+    allocate(containers);
+
+    // Update future requirements
+    offSwitchRequest.numContainers -= containers.size();
+  }
+
+  /**
+   * Adds the containers to the current consumption and to the list of
+   * containers allocated but not yet acquired.
+   * @param containers the containers allocated.
+   */
+  private synchronized void allocate(List<Container> containers) {
+    // Update consumption and track allocations
+    for (Container container : containers) {
+      org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.addResource(
+          currentConsumption, container.resource);
+
+      allocated.add(container);
+
+      LOG.debug("allocate: applicationId=" + applicationId +
+          " container=" + container.id + " host=" + container.hostName);
+    }
+  }
+
+  /**
+   * Logs every outstanding request at debug level.
+   */
+  public synchronized void showRequests() {
+    // Everything below only produces debug output; skip the work otherwise.
+    if (!LOG.isDebugEnabled()) {
+      return;
+    }
+    for (Priority priority : getPriorities()) {
+      Map<String, ResourceRequest> priorityRequests =
+          getResourceRequests(priority);
+      if (priorityRequests != null) {
+        for (ResourceRequest request : priorityRequests.values()) {
+          LOG.debug("showRequests:" +
+              " application=" + applicationId +
+              " request=" + request);
+        }
+      }
+    }
+  }
+
+  /**
+   * @return a new list of all nodes this application was allocated
+   *         containers on.
+   */
+  public synchronized List<NodeInfo> getAllNodesForApplication() {
+    return new ArrayList<NodeInfo>(applicationOnNodes);
+  }
+}
Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterTracker.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterTracker.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterTracker.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,132 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+import org.apache.hadoop.net.Node;
+import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeInfo;
+import org.apache.hadoop.yarn.ApplicationID;
+import org.apache.hadoop.yarn.Container;
+import org.apache.hadoop.yarn.NodeID;
+import org.apache.hadoop.yarn.Resource;
+
+/**
+ * This interface defines tracking for all the node managers for the scheduler
+ * and all the containers running/allocated
+ * on the nodemanagers.
+ */
+@Evolving
+@Private
+public interface ClusterTracker {
+
+ /**
+ * The class that encapsulates the response from the cluster tracker for
+ * updates from the node managers. It is a plain holder for the three
+ * lists passed to its constructor; the lists are stored as given, not
+ * copied.
+ */
+ public class NodeResponse {
+ private final List<Container> completed;
+ private final List<Container> toCleanUp;
+ private final List<ApplicationID> finishedApplications;
+
+ /**
+ * @param finishedApplications the applications that have finished
+ * @param completed the containers that have completed
+ * @param toKill the containers that need to be cleaned up
+ */
+ public NodeResponse(List<ApplicationID> finishedApplications,
+ List<Container> completed, List<Container> toKill) {
+ this.finishedApplications = finishedApplications;
+ this.completed = completed;
+ this.toCleanUp = toKill;
+ }
+ /** @return the finished applications given at construction. */
+ public List<ApplicationID> getFinishedApplications() {
+ return this.finishedApplications;
+ }
+ /** @return the completed containers given at construction. */
+ public List<Container> getCompletedContainers() {
+ return this.completed;
+ }
+ /** @return the containers to clean up given at construction. */
+ public List<Container> getContainersToCleanUp() {
+ return this.toCleanUp;
+ }
+ }
+
+ /**
+ * Get the current cluster resource.
+ * @return the current cluster resource.
+ */
+ public Resource getClusterResource();
+
+ /**
+ * Remove the node from this cluster.
+ * @param nodeInfo the nodemanager to be removed from tracking.
+ */
+ public void removeNode(NodeInfo nodeInfo);
+
+
+ /**
+ * Add a node for tracking
+ * @param nodeId the nodeid of the node
+ * @param hostName the hostname of the node
+ * @param node the node info of the node
+ * @param capability the total capability of the node.
+ * @return the {@link NodeInfo} that tracks this node.
+ */
+ public NodeInfo addNode(NodeID nodeId, String hostName, Node node,
+ Resource capability);
+
+ /**
+ * An application has released a container
+ * @param applicationId the application that is releasing the container
+ * @param container the container that is being released
+ * @return NOTE(review): presumably true if the container was released;
+ * the meaning of the result is not defined here - confirm against the
+ * implementation.
+ */
+ public boolean releaseContainer(ApplicationID applicationId,
+ Container container);
+
+ /**
+ * Update the cluster with the node update.
+ * @param nodeInfo the node for which the update is
+ * @param containers the containers update for the node.
+ * @return the containers that are completed or need to be preempted.
+ */
+ public NodeResponse nodeUpdate(NodeInfo nodeInfo,
+ Map<CharSequence, List<Container>> containers);
+
+ /**
+ * check to see if this node is being tracked for resources and allocations.
+ * @param node the node to check for.
+ * @return true if this node is being tracked, false otherwise.
+ */
+ public boolean isTracked(NodeInfo node);
+
+ /**
+ * Update the cluster with allocated containers on a node.
+ * @param nodeInfo the nodeinfo for the node that containers are allocated on.
+ * @param applicationId the application id of the application that containers
+ * are allocated to
+ * @param containers the list of containers allocated.
+ */
+ public void addAllocatedContainers(NodeInfo nodeInfo, ApplicationID applicationId,
+ List<Container> containers);
+
+ /**
+ * Notify each of the node data structures that the application has finished.
+ * @param applicationId the application id of the application that finished.
+ * @param nodes the list of nodes that need to be notified of application completion.
+ */
+ public void finishedApplication(ApplicationID applicationId, List<NodeInfo> nodes);
+}
\ No newline at end of file
Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterTrackerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterTrackerImpl.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterTrackerImpl.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterTrackerImpl.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,117 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.net.Node;
+import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeInfo;
+import org.apache.hadoop.yarn.service.AbstractService;
+import org.apache.hadoop.yarn.ApplicationID;
+import org.apache.hadoop.yarn.Container;
+import org.apache.hadoop.yarn.NodeID;
+import org.apache.hadoop.yarn.Resource;
+
+@Evolving
+@Private
+public class ClusterTrackerImpl extends AbstractService implements ClusterTracker {
+
+ public ClusterTrackerImpl() {
+ super("ClusterTrackerImpl");
+ }
+
+ private static final Log LOG = LogFactory.getLog(ClusterTrackerImpl.class);
+ private Map<String, NodeManager> nodes = new HashMap<String, NodeManager>();
+ private Resource clusterResource = new Resource();
+ private Configuration conf;
+
+ public void init(Configuration conf) {
+ this.conf = conf;
+ }
+
+ @Override
+ public synchronized Resource getClusterResource() {
+ return clusterResource;
+ }
+
+ @Override
+ public synchronized void removeNode(NodeInfo nodeInfo) {
+ org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.subtractResource(
+ clusterResource, nodeInfo.getTotalCapability());
+ nodes.remove(nodeInfo.getHostName());
+ }
+
+ @Override
+ public synchronized boolean isTracked(NodeInfo nodeInfo) {
+ NodeManager node = nodes.get(nodeInfo.getHostName());
+ return (node == null? false: true);
+ }
+
+ @Override
+ public synchronized NodeInfo addNode(NodeID nodeId,
+ String hostName, Node node, Resource capability) {
+ NodeManager nodeManager = new NodeManager(nodeId, hostName, node, capability);
+ nodes.put(nodeManager.getHostName(), nodeManager);
+ org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.addResource(
+ clusterResource, nodeManager.getTotalCapability());
+ return nodeManager;
+ }
+
+ @Override
+ public synchronized boolean releaseContainer(ApplicationID applicationId,
+ Container container) {
+ // Reap containers
+ LOG.info("Application " + applicationId + " released container " + container);
+ NodeManager nodeManager = nodes.get(container.hostName.toString());
+ return nodeManager.releaseContainer(container);
+ }
+
+ @Override
+ public synchronized NodeResponse nodeUpdate(NodeInfo nodeInfo,
+ Map<CharSequence,List<Container>> containers) {
+ NodeManager node = nodes.get(nodeInfo.getHostName());
+ LOG.debug("nodeUpdate: node=" + nodeInfo.getHostName() +
+ " available=" + nodeInfo.getAvailableResource().memory);
+ return node.statusUpdate(containers);
+
+ }
+
+ @Override
+ public synchronized void addAllocatedContainers(NodeInfo nodeInfo,
+ ApplicationID applicationId, List<Container> containers) {
+ NodeManager node = nodes.get(nodeInfo.getHostName());
+ node.allocateContainer(applicationId, containers);
+ }
+
+ @Override
+ public synchronized void finishedApplication(ApplicationID applicationId,
+ List<NodeInfo> nodesToNotify) {
+ for (NodeInfo node: nodesToNotify) {
+ NodeManager nodeManager = nodes.get(node.getHostName());
+ nodeManager.notifyFinishedApplication(applicationId);
+ }
+ }
+}
\ No newline at end of file
Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/NodeManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/NodeManager.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/NodeManager.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/NodeManager.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,318 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.net.Node;
+import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ClusterTracker.NodeResponse;
+import org.apache.hadoop.yarn.ApplicationID;
+import org.apache.hadoop.yarn.Container;
+import org.apache.hadoop.yarn.ContainerID;
+import org.apache.hadoop.yarn.ContainerState;
+import org.apache.hadoop.yarn.NodeID;
+import org.apache.hadoop.yarn.Resource;
+
+/**
+ * This class is used by ClusterInfo to keep track of all the applications/containers
+ * running on a node.
+ *
+ */
+@Private
+@Unstable
+public class NodeManager implements NodeInfo {
+ private static final Log LOG = LogFactory.getLog(NodeManager.class);
+ private final NodeID nodeId;
+ private final String hostName;
+ private Resource totalCapability;
+ private Resource availableResource = new Resource();
+ private Resource usedResource = new Resource();
+ private final Node node;
+
+ private static final Container[] EMPTY_CONTAINER_ARRAY = new Container[] {};
+ private static final List<Container> EMPTY_CONTAINER_LIST = Arrays.asList(EMPTY_CONTAINER_ARRAY);
+ private static final ApplicationID[] EMPTY_APPLICATION_ARRAY = new ApplicationID[]{};
+ private static final List<ApplicationID> EMPTY_APPLICATION_LIST = Arrays.asList(EMPTY_APPLICATION_ARRAY);
+
+ public static final String ANY = "*";
+ /* set of containers that are allocated containers */
+ private final Map<ContainerID, Container> allocatedContainers =
+ new TreeMap<ContainerID, Container>();
+
+ /* set of containers that are currently active on a node manager */
+ private final Map<ContainerID, Container> activeContainers =
+ new TreeMap<ContainerID, Container>();
+
+ /* set of containers that need to be cleaned */
+ private final Set<Container> containersToClean =
+ new TreeSet<Container>(new org.apache.hadoop.yarn.server.resourcemanager.resource.Container.Comparator());
+
+
+ /* the list of applications that have finished and need to be purged */
+ private final List<ApplicationID> finishedApplications = new ArrayList<ApplicationID>();
+
+ private volatile int numContainers;
+
+ public NodeManager(NodeID nodeId, String hostname,
+ Node node, Resource capability) {
+ this.nodeId = nodeId;
+ this.totalCapability = capability;
+ this.hostName = hostname;
+ org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.addResource(
+ availableResource, capability);
+ this.node = node;
+ }
+
+ /**
+ * NodeInfo for this node.
+ * @return the {@link NodeInfo} for this node.
+ */
+ public NodeInfo getNodeInfo() {
+ return this;
+ }
+
+ /**
+ * The Scheduler has allocated containers on this node to the
+ * given application.
+ *
+ * @param applicationId application
+ * @param containers allocated containers
+ */
+ public synchronized void allocateContainer(ApplicationID applicationId,
+ List<Container> containers) {
+ if (containers == null) {
+ LOG.error("Adding null containers for application " + applicationId);
+ return;
+ }
+ for (Container container : containers) {
+ allocateContainer(container);
+ }
+
+ LOG.info("addContainers:" +
+ " node=" + getHostName() +
+ " #containers=" + containers.size() +
+ " available=" + getAvailableResource().memory +
+ " used=" + getUsedResource().memory);
+ }
+
+ /**
+ * Status update from the NodeManager.
+ * Collects every reported container into one list and reconciles that
+ * list against the containers tracked for this node.
+ * @param allContainers the containers reported by the node, grouped by
+ * key; may be <code>null</code>, in which case an empty response is
+ * returned
+ * @return the response for the node: finished applications, completed
+ * containers and containers that should be cleaned up
+ */
+ public synchronized NodeResponse
+ statusUpdate(Map<CharSequence,List<Container>> allContainers) {
+
+ if (allContainers == null) {
+ return new NodeResponse(EMPTY_APPLICATION_LIST, EMPTY_CONTAINER_LIST,
+ EMPTY_CONTAINER_LIST);
+ }
+
+ List<Container> listContainers = new ArrayList<Container>();
+ // only the values matter here; a key mapped to null contributes nothing
+ for (List<Container> reported : allContainers.values()) {
+ if (reported != null) {
+ listContainers.addAll(reported);
+ }
+ }
+ return update(listContainers);
+ }
+
+ /**
+ * Reconcile the containers reported by the node with the containers
+ * tracked for it.
+ * A reported container that was allocated becomes active. A container
+ * reported as complete is dropped from the active set, giving its
+ * resources back if it was active. A container that is not complete and
+ * not tracked is returned for clean up.
+ * The pending clean-up containers and finished applications are handed
+ * over in the response and then cleared, so each is reported once.
+ * @param containers the containers reported by the node
+ * @return finished applications, completed containers and containers to
+ * clean up
+ */
+ private synchronized NodeResponse update(List<Container> containers) {
+ List<Container> completedContainers = new ArrayList<Container>();
+ List<Container> containersToCleanUp = new ArrayList<Container>();
+
+ for (Container container : containers) {
+ if (allocatedContainers.remove(container.id) != null) {
+ activeContainers.put(container.id, container);
+ LOG.info("Activated container " + container.id + " on node " +
+ getHostName());
+ }
+
+ if (container.state == ContainerState.COMPLETE) {
+ if (activeContainers.remove(container.id) != null) {
+ updateResource(container);
+ LOG.info("Completed container " + container);
+ }
+ completedContainers.add(container);
+ LOG.info("Removed completed container " + container.id + " on node " +
+ getHostName());
+ }
+ // The id was removed from allocatedContainers above and the state is
+ // known not to be COMPLETE here, so only the active set is left to check.
+ else if (!activeContainers.containsKey(container.id)) {
+ containersToCleanUp.add(container);
+ }
+ }
+ containersToCleanUp.addAll(containersToClean);
+ /* clear out containers to clean */
+ containersToClean.clear();
+ List<ApplicationID> lastfinishedApplications =
+ new ArrayList<ApplicationID>(finishedApplications);
+ /* clear out the finished applications, they are reported only once */
+ finishedApplications.clear();
+ return new NodeResponse(lastfinishedApplications, completedContainers,
+ containersToCleanUp);
+ }
+
+ private synchronized void allocateContainer(Container container) {
+ deductAvailableResource(container.resource);
+ ++numContainers;
+
+ allocatedContainers.put(container.id, container);
+ LOG.info("Allocated container " + container.id +
+ " to node " + getHostName());
+
+ LOG.info("Assigned container " + container.id +
+ " of capacity " + container.resource + " on host " + getHostName() +
+ ", which currently has " + numContainers + " containers, " +
+ getUsedResource() + " used and " +
+ getAvailableResource() + " available");
+ }
+
+ /* a container is valid for this node while it is allocated or active here */
+ private synchronized boolean isValidContainer(Container c) {
+ return allocatedContainers.containsKey(c.id)
+ || activeContainers.containsKey(c.id);
+ }
+
+ private synchronized void updateResource(Container container) {
+ addAvailableResource(container.resource);
+ --numContainers;
+ }
+
+ /**
+ * Release an allocated container on this node.
+ * @param container container to be released
+ * @return <code>true</code> iff the container was unused,
+ * <code>false</code> otherwise
+ */
+ public synchronized boolean releaseContainer(Container container) {
+ if (!isValidContainer(container)) {
+ LOG.error("Invalid container released " + container);
+ return false;
+ }
+
+ /* remove the containers from the nodemanger */
+
+ // Was this container launched?
+ activeContainers.remove(container.id);
+ allocatedContainers.remove(container.id);
+ containersToClean.add(container);
+ updateResource(container);
+
+ LOG.info("Released container " + container.id +
+ " of capacity " + container.resource + " on host " + getHostName() +
+ ", which currently has " + numContainers + " containers, " +
+ getUsedResource() + " used and " + getAvailableResource()
+ + " available" + ", release resources=" + true);
+ return true;
+ }
+
+ @Override
+ public NodeID getNodeID() {
+ return this.nodeId;
+ }
+
+ @Override
+ public String getHostName() {
+ return this.hostName;
+ }
+
+ @Override
+ public Resource getTotalCapability() {
+ return this.totalCapability;
+ }
+
+ @Override
+ public String getRackName() {
+ return node.getNetworkLocation();
+ }
+
+ @Override
+ public Node getNode() {
+ return this.node;
+ }
+
+ @Override
+ public synchronized Resource getAvailableResource() {
+ return this.availableResource;
+ }
+
+ @Override
+ public synchronized Resource getUsedResource() {
+ return this.usedResource;
+ }
+
+ /**
+ * Move a resource from the used to the available side of this node.
+ * A <code>null</code> resource is logged and ignored.
+ * @param resource the resource that becomes available
+ */
+ public synchronized void addAvailableResource(Resource resource) {
+ if (resource != null) {
+ org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.addResource(
+ availableResource, resource);
+ org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.subtractResource(
+ usedResource, resource);
+ } else {
+ LOG.error("Invalid resource addition of null resource for " + this.hostName);
+ }
+ }
+
+ /**
+ * Move a resource from the available to the used side of this node.
+ * A <code>null</code> resource is logged and ignored, matching
+ * {@link #addAvailableResource(Resource)}.
+ * @param resource the resource that becomes used
+ */
+ public synchronized void deductAvailableResource(Resource resource) {
+ if (resource == null) {
+ LOG.error("Invalid deduction of null resource for "+ this.hostName);
+ // stop here: the accounting below must not run with a null resource
+ return;
+ }
+ org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.subtractResource(
+ availableResource, resource);
+ org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.addResource(
+ usedResource, resource);
+ }
+
+ /**
+ * Record that an application has finished so that it is included in the
+ * next response built for this node.
+ * @param applicationId the application that finished
+ */
+ public synchronized void notifyFinishedApplication(ApplicationID applicationId) {
+ finishedApplications.add(applicationId);
+ /* TODO: iterate through the containers and remove all the ones that
+ * belong to this application; this method does not do that yet.
+ */
+ }
+
+ @Override
+ public int getNumContainers() {
+ return numContainers;
+ }
+
+ @Override
+ public String toString() {
+ return "host: " + getHostName() + " #containers=" + getNumContainers() +
+ " available=" + getAvailableResource().memory +
+ " used=" + getUsedResource().memory;
+ }
+ }
Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/NodeType.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/NodeType.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/NodeType.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/NodeType.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,28 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
+
+/**
+ * Resource classification.
+ * NOTE(review): the constant names suggest how close a resource is to a
+ * requested location (same node, same rack, anywhere) -- confirm against
+ * the schedulers that use this enum.
+ */
+public enum NodeType {
+ DATA_LOCAL,
+ RACK_LOCAL,
+ OFF_SWITCH
+}
Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,28 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
+
+import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+/**
+ * A queue in the scheduler package, identified by its name.
+ */
+@Evolving
+@LimitedPrivate("yarn")
+public interface Queue {
+ /**
+ * @return the name of this queue
+ */
+ String getQueueName();
+}
Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceListener.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceListener.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceListener.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceListener.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,67 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+import org.apache.hadoop.net.Node;
+import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ClusterTracker.NodeResponse;
+import org.apache.hadoop.yarn.Container;
+import org.apache.hadoop.yarn.NodeID;
+import org.apache.hadoop.yarn.Resource;
+
+/**
+ * This interface is implemented by services that want to be notified
+ * by the resource tracker about nodes being added, removed and updated.
+ */
+@Evolving
+@Private
+public interface ResourceListener {
+
+ /**
+ * Add a node to the resource listener.
+ * @param nodeId the node id of the node.
+ * @param hostName the host name of the node.
+ * @param node the topology information.
+ * @param capability the resource capability of the node.
+ * @return the {@link NodeInfo} object that tracks this node.
+ */
+ public NodeInfo addNode(NodeID nodeId,String hostName,
+ Node node, Resource capability);
+
+ /**
+ * A node has been removed from the cluster.
+ * @param node the node to remove.
+ */
+ public void removeNode(NodeInfo node);
+
+ /**
+ * A status update from a node.
+ * @param nodeInfo the node sending the update.
+ * @param containers the containers completed/running/failed on this node.
+ * @return response information for the node: which containers to kill and
+ * which applications to clean.
+ */
+ public NodeResponse nodeUpdate(NodeInfo nodeInfo,
+ Map<CharSequence,List<Container>> containers);
+}
\ No newline at end of file
Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceScheduler.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceScheduler.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceScheduler.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,44 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
+
+/**
+ * This interface is the one implemented by the schedulers. It combines
+ * {@link ResourceListener} and {@link YarnScheduler} and adds
+ * re-initialization.
+ *
+ */
+@LimitedPrivate("yarn")
+@Evolving
+public interface ResourceScheduler extends ResourceListener, YarnScheduler {
+ /**
+ * Re-initialize the <code>ResourceScheduler</code>.
+ * The method declares no checked exception.
+ * @param conf configuration
+ * @param secretManager token-secret manager
+ */
+ void reinitialize(Configuration conf,
+ ContainerTokenSecretManager secretManager);
+}
Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,63 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.yarn.ApplicationID;
+import org.apache.hadoop.yarn.Container;
+import org.apache.hadoop.yarn.Priority;
+import org.apache.hadoop.yarn.ResourceRequest;
+
+/**
+ * This interface is used by the components to talk to the
+ * scheduler for allocating resources and cleaning up resources.
+ *
+ */
+public interface YarnScheduler {
+ /**
+ * Allocates and returns resources.
+ * @param applicationId the application the call is made for
+ * @param ask the resource requests of the application
+ * @param release containers passed in for release (NOTE(review): inferred
+ * from the parameter name -- confirm against the implementations)
+ * @return the containers returned to the application by this call
+ * @throws IOException
+ */
+ List<Container> allocate(ApplicationID applicationId,
+ List<ResourceRequest> ask, List<Container> release)
+ throws IOException;
+ /**
+ * A new application has been submitted to the ResourceManager.
+ * @param applicationId application which has been submitted
+ * @param user application user
+ * @param queue queue to which the application is being submitted
+ * @param priority application priority
+ * @throws IOException
+ */
+ public void addApplication(ApplicationID applicationId, String user,
+ String queue, Priority priority)
+ throws IOException;
+
+ /**
+ * A submitted application has completed.
+ * @param applicationId completed application
+ * @throws IOException
+ */
+ public void removeApplication(ApplicationID applicationId)
+ throws IOException;
+}
Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,353 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.net.Node;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Application;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ClusterTracker;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ClusterTrackerImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ClusterTracker.NodeResponse;
+import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
+import org.apache.hadoop.yarn.ApplicationID;
+import org.apache.hadoop.yarn.Container;
+import org.apache.hadoop.yarn.NodeID;
+import org.apache.hadoop.yarn.Priority;
+import org.apache.hadoop.yarn.Resource;
+import org.apache.hadoop.yarn.ResourceRequest;
+
+@LimitedPrivate("yarn")
+@Evolving
+public class CapacityScheduler
+implements ResourceScheduler, CapacitySchedulerContext {
+
+ private static final Log LOG = LogFactory.getLog(CapacityScheduler.class);
+
+ private Queue root;
+
+ private final static List<Container> EMPTY_CONTAINER_LIST =
+ new ArrayList<Container>();
+
+ /**
+  * Orders queues by ascending utilization; when neither queue has a
+  * strictly lower utilization the queue paths are compared instead.
+  */
+ private final Comparator<Queue> queueComparator = new Comparator<Queue>() {
+   @Override
+   public int compare(Queue left, Queue right) {
+     // The less utilized queue sorts first.
+     if (left.getUtilization() < right.getUtilization()) {
+       return -1;
+     }
+     if (left.getUtilization() > right.getUtilization()) {
+       return 1;
+     }
+
+     // Tie-break on the queue path.
+     return left.getQueuePath().compareTo(right.getQueuePath());
+   }
+ };
+
+ private final Comparator<Application> applicationComparator =
+ new Comparator<Application>() {
+ @Override
+ public int compare(Application a1, Application a2) {
+ return a1.getApplicationId().id - a2.getApplicationId().id;
+ }
+ };
+
+ private CapacitySchedulerConfiguration conf;
+ private ContainerTokenSecretManager containerTokenSecretManager;
+
+ private Map<String, Queue> queues = new ConcurrentHashMap<String, Queue>();
+
+ private final ClusterTracker clusterTracker;
+
+ private Resource minimumAllocation;
+
+ private Map<ApplicationID, Application> applications =
+ new TreeMap<ApplicationID, Application>(
+ new org.apache.hadoop.yarn.server.resourcemanager.resource.ApplicationID.Comparator());
+
+ /**
+  * Creates the scheduler and obtains its cluster tracker from
+  * {@link #createClusterTracker()}.
+  */
+ public CapacityScheduler() {
+   clusterTracker = createClusterTracker();
+ }
+
+ /**
+  * Factory for the cluster tracker used by this scheduler; invoked from the
+  * constructor.
+  *
+  * @return a new {@link ClusterTrackerImpl}
+  */
+ protected ClusterTracker createClusterTracker() {
+   ClusterTracker tracker = new ClusterTrackerImpl();
+   return tracker;
+ }
+
+ /**
+  * @return the root of the queue hierarchy, as built by the last call to
+  *         {@code reinitialize}
+  */
+ public Queue getRootQueue() {
+   return this.root;
+ }
+
+ /**
+  * @return the configuration set by the last call to {@code reinitialize}
+  */
+ @Override
+ public CapacitySchedulerConfiguration getConfiguration() {
+   return this.conf;
+ }
+
+ /**
+  * @return the secret manager handed to the last call to
+  *         {@code reinitialize}
+  */
+ @Override
+ public ContainerTokenSecretManager getContainerTokenSecretManager() {
+   return this.containerTokenSecretManager;
+ }
+
+ /**
+  * @return the minimum allocation read from the configuration by the last
+  *         call to {@code reinitialize}
+  */
+ @Override
+ public Resource getMinimumAllocation() {
+   return this.minimumAllocation;
+ }
+
+ @Override
+ public void reinitialize(Configuration conf,
+ ContainerTokenSecretManager containerTokenSecretManager) {
+ this.conf = new CapacitySchedulerConfiguration(conf);
+ this.minimumAllocation = this.conf.getMinimumAllocation();
+ this.containerTokenSecretManager = containerTokenSecretManager;
+
+ initializeQueues(this.conf);
+ }
+
+ @Private
+ public static final String ROOT = "root";
+
+ @Private
+ public static final String ROOT_QUEUE =
+ CapacitySchedulerConfiguration.PREFIX + ROOT;
+
+ /**
+  * Builds the queue hierarchy starting at {@link #ROOT} and remembers its
+  * root.
+  *
+  * @param conf configuration describing the queues
+  */
+ private void initializeQueues(CapacitySchedulerConfiguration conf) {
+   // The root queue has no parent.
+   this.root = parseQueue(conf, null, ROOT);
+   LOG.info("Initialized root queue " + this.root);
+ }
+
+ private Queue parseQueue(CapacitySchedulerConfiguration conf,
+ Queue parent, String queueName) {
+ Queue queue;
+ String[] childQueueNames =
+ conf.getQueues((parent == null) ?
+ queueName : (parent.getQueuePath()+"."+queueName));
+ if (childQueueNames == null || childQueueNames.length == 0) {
+ queue = new LeafQueue(this, queueName, parent, applicationComparator);
+ } else {
+ ParentQueue parentQueue =
+ new ParentQueue(this, queueName, queueComparator, parent);
+ List<Queue> childQueues = new ArrayList<Queue>();
+ for (String childQueueName : childQueueNames) {
+ Queue childQueue =
+ parseQueue(
+ conf,
+ parentQueue,
+ childQueueName);
+ childQueues.add(childQueue);
+
+ queues.put(childQueueName, childQueue);
+ }
+ parentQueue.setChildQueues(childQueues);
+
+ queue = parentQueue;
+ }
+
+ LOG.info("Initialized queue: " + queue);
+ return queue;
+ }
+
+ /**
+  * Registers a newly submitted application with its target queue and with
+  * this scheduler.
+  *
+  * @param applicationId application which has been submitted
+  * @param user application user
+  * @param queueName short name of the queue to submit to
+  * @param priority application priority
+  * @throws IOException if the queue is unknown, is not a leaf queue, or the
+  *         queue rejects the submission with an
+  *         {@link AccessControlException}
+  */
+ @Override
+ public void addApplication(ApplicationID applicationId,
+     String user, String queueName, Priority priority)
+ throws IOException {
+   final Queue target = queues.get(queueName);
+
+   // Only known leaf queues accept applications.
+   if (target == null) {
+     throw new IOException("Application " + applicationId +
+         " submitted by user " + user + " to unknown queue: " + queueName);
+   }
+   if (!(target instanceof LeafQueue)) {
+     throw new IOException("Application " + applicationId +
+         " submitted by user " + user + " to non-leaf queue: " + queueName);
+   }
+
+   final Application submitted =
+       new Application(applicationId, target, user);
+   try {
+     target.submitApplication(submitted, user, queueName, priority);
+   } catch (AccessControlException ace) {
+     // Surface the access failure through the declared exception type.
+     throw new IOException(ace);
+   }
+
+   // Track the application only after the queue has accepted it.
+   applications.put(applicationId, submitted);
+
+   LOG.info("Application Submission: " + applicationId.id +
+       ", user: " + user +
+       " queue: " + target +
+       ", currently active: " + applications.size());
+ }
+
+ @Override
+ public void removeApplication(ApplicationID applicationId)
+ throws IOException {
+ Application application = getApplication(applicationId);
+
+ if (application == null) {
+// throw new IOException("Unknown application " + applicationId +
+// " has completed!");
+ LOG.info("Unknown application " + applicationId + " has completed!");
+ return;
+ }
+
+ // Inform the queue
+ Queue queue = queues.get(application.getQueue().getQueueName());
+ LOG.info("DEBUG --- removeApplication - appId: " + applicationId + " queue: " + queue);
+ queue.finishApplication(application, queue.getQueueName());
+
+ // Release containers and update queue capacities
+ processReleasedContainers(application, application.getCurrentContainers());
+
+ // Inform all NodeManagers about completion of application
+ clusterTracker.finishedApplication(applicationId,
+ application.getAllNodesForApplication());
+
+ // Remove from our data-structure
+ applications.remove(applicationId);
+ }
+
+ @Override
+ public NodeInfo addNode(NodeID nodeId,String hostName,
+ Node node, Resource capability) {
+ return clusterTracker.addNode(nodeId, hostName, node, capability);
+ }
+
+ /**
+  * Removes a node by delegating to the cluster tracker.
+  *
+  * @param node node to remove
+  */
+ @Override
+ public void removeNode(NodeInfo node) {
+   this.clusterTracker.removeNode(node);
+ }
+
+ /**
+  * Processes an allocation call of an application: normalizes and records
+  * its resource requests, releases the containers it hands back and returns
+  * the containers the application acquires.
+  *
+  * @param applicationId application making the call
+  * @param ask resource requests of the application
+  * @param release containers the application no longer needs
+  * @return the containers acquired by the application; a new empty list if
+  *         the application is not known to the scheduler
+  * @throws IOException declared by the interface; not thrown directly here
+  */
+ @Override
+ public List<Container> allocate(ApplicationID applicationId,
+     List<ResourceRequest> ask, List<Container> release)
+ throws IOException {
+
+   Application application = getApplication(applicationId);
+   if (application == null) {
+     LOG.info("Calling allocate on removed " +
+         "or non existant application " + applicationId);
+     // Return a fresh list rather than a shared static instance: a caller
+     // that mutated a shared list would corrupt every later empty response.
+     return new ArrayList<Container>();
+   }
+   normalizeRequests(ask);
+
+   LOG.info("DEBUG --- allocate: pre-update" +
+       " applicationId=" + applicationId +
+       " application=" + application);
+   application.showRequests();
+
+   // Update application requests
+   application.updateResourceRequests(ask);
+
+   // Release unused containers and update queue capacities
+   processReleasedContainers(application, release);
+
+   LOG.info("DEBUG --- allocate: post-update");
+   application.showRequests();
+
+   List<Container> allContainers = application.acquire();
+   LOG.info("DEBUG --- allocate:" +
+       " applicationId=" + applicationId +
+       " #ask=" + ask.size() +
+       " #release=" + release.size() +
+       " #allContainers=" + allContainers.size());
+   return allContainers;
+ }
+
+ private void normalizeRequests(List<ResourceRequest> asks) {
+ for (ResourceRequest ask : asks) {
+ normalizeRequest(ask);
+ }
+ }
+
+ /**
+  * Normalizes the memory of a request in place: the requested memory is
+  * raised to at least the minimum allocation and then rounded up to the
+  * next multiple of the minimum allocation.
+  *
+  * The minimum allocation must be positive, otherwise the division below
+  * fails with an {@link ArithmeticException}.
+  *
+  * @param ask request whose {@code capability.memory} is updated
+  */
+ private void normalizeRequest(ResourceRequest ask) {
+   int minMemory = minimumAllocation.memory;
+   // Never hand out less than one minimum-sized unit.
+   int memory = Math.max(ask.capability.memory, minMemory);
+   // Whole units, plus one more unit if there is any remainder.
+   int units = (memory / minMemory) + ((memory % minMemory) > 0 ? 1 : 0);
+   // Store the result on the request; computing it into a local only would
+   // leave the request unchanged.
+   ask.capability.memory = minMemory * units;
+ }
+
+
+ /**
+  * Handles an update from a node: forwards it to the cluster tracker,
+  * processes the containers reported as completed and then lets the root
+  * queue assign new containers on the node.
+  *
+  * @param node node sending the update
+  * @param containers containers reported by the node, as passed through to
+  *        the cluster tracker
+  * @return the response produced by the cluster tracker
+  */
+ @Override
+ public synchronized NodeResponse nodeUpdate(NodeInfo node,
+     Map<CharSequence,List<Container>> containers ) {
+
+   LOG.info("nodeUpdate: " + node);
+
+   final NodeResponse response = clusterTracker.nodeUpdate(node, containers);
+
+   // Completed containers are accounted for before anything new is assigned.
+   List<Container> completed = response.getCompletedContainers();
+   processCompletedContainers(completed);
+
+   // Assign new containers
+   root.assignContainers(clusterTracker, node);
+
+   return response;
+ }
+
+ /**
+  * Informs the owning leaf queue about each completed container whose
+  * application is still known to the scheduler.
+  *
+  * @param completedContainers containers that have completed
+  */
+ private synchronized void processCompletedContainers(
+     List<Container> completedContainers) {
+   for (Container completed : completedContainers) {
+     Application owner = getApplication(completed.id.appID);
+
+     // The application may already have been removed from the scheduler
+     // while the node manager is still reporting its containers.
+     if (owner == null) {
+       continue;
+     }
+
+     // Inform the queue
+     LeafQueue leaf = (LeafQueue)owner.getQueue();
+     leaf.completedContainer(clusterTracker, completed, owner);
+   }
+ }
+
+ private synchronized void processReleasedContainers(Application application,
+ List<Container> releasedContainers) {
+ // Inform the application
+ application.releaseContainers(releasedContainers);
+
+ // Inform clusterTracker
+ List<Container> unusedContainers = new ArrayList<Container>();
+ for (Container container : releasedContainers) {
+ if (clusterTracker.releaseContainer(
+ application.getApplicationId(),
+ container)) {
+ unusedContainers.add(container);
+ }
+ }
+
+ // Update queue capacities
+ processCompletedContainers(unusedContainers);
+ }
+
+ /**
+  * Looks up a tracked application.
+  *
+  * @param applicationId id to look up
+  * @return the application, or {@code null} if it is not tracked
+  */
+ private synchronized Application getApplication(ApplicationID applicationId) {
+   Application found = applications.get(applicationId);
+   return found;
+ }
+
+}
Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,163 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.Resource;
+
+public class CapacitySchedulerConfiguration extends Configuration {
+
+ private static final Log LOG =
+ LogFactory.getLog(CapacitySchedulerConfiguration.class);
+
+ private static final String CS_CONFIGURATION_FILE = "capacity-scheduler.xml";
+
+ @Private
+ public static final String PREFIX = "yarn.capacity-scheduler.";
+
+ @Private
+ public static final String DOT = ".";
+
+ @Private
+ public static final String MAXIMUM_SYSTEM_APPLICATIONS =
+ PREFIX + "maximum-applications";
+
+ @Private
+ public static final String QUEUES = "queues";
+
+ @Private
+ public static final String CAPACITY = "capacity";
+
+ @Private
+ public static final String MAXIMUM_CAPACITY = "maximum-capacity";
+
+ @Private
+ public static final String USER_LIMIT = "minimum-user-limit";
+
+ @Private
+ public static final String USER_LIMIT_FACTOR = "user-limit-factor";
+
+ private static final int MINIMUM_MEMORY = 1024;
+
+ @Private
+ public static final String MINIMUM_ALLOCATION =
+ PREFIX + "minimum-allocation-mb";
+
+ @Private
+ public static int DEFAULT_MAXIMUM_SYSTEM_APPLICATIIONS = 10000;
+
+ @Private
+ public static int UNDEFINED = -1;
+
+ @Private
+ public static int MINIMUM_CAPACITY_VALUE = 1;
+
+ @Private
+ public static int MAXIMUM_CAPACITY_VALUE = 100;
+
+ @Private
+ public static int DEFAULT_USER_LIMIT = 100;
+
+ @Private
+ public static float DEFAULT_USER_LIMIT_FACTOR = 1.0f;
+
+ /**
+  * Creates a capacity-scheduler configuration on top of a new, default
+  * {@link Configuration}.
+  */
+ public CapacitySchedulerConfiguration() {
+   this(new Configuration());
+ }
+
+ /**
+  * Creates a capacity-scheduler configuration from the given configuration
+  * and adds the {@code capacity-scheduler.xml} resource to it.
+  *
+  * @param configuration configuration passed to the superclass constructor
+  */
+ public CapacitySchedulerConfiguration(Configuration configuration) {
+   super(configuration);
+   this.addResource(CS_CONFIGURATION_FILE);
+ }
+
+ /**
+  * Builds the configuration-key prefix of a queue:
+  * {@code PREFIX + queue + DOT}.
+  *
+  * @param queue queue name or path
+  * @return the key prefix, ending in a dot
+  */
+ private String getQueuePrefix(String queue) {
+   return PREFIX + queue + DOT;
+ }
+
+ /**
+  * @return the value configured under {@link #MAXIMUM_SYSTEM_APPLICATIONS},
+  *         or {@link #DEFAULT_MAXIMUM_SYSTEM_APPLICATIIONS} if unset
+  */
+ public int getMaximumSystemApplications() {
+   return getInt(MAXIMUM_SYSTEM_APPLICATIONS,
+       DEFAULT_MAXIMUM_SYSTEM_APPLICATIIONS);
+ }
+
+ public int getCapacity(String queue) {
+ int capacity = getInt(getQueuePrefix(queue) + CAPACITY, UNDEFINED);
+ if (capacity < MINIMUM_CAPACITY_VALUE || capacity > MAXIMUM_CAPACITY_VALUE) {
+ throw new IllegalArgumentException("Illegal " +
+ "capacity of " + capacity + " for queue " + queue);
+ }
+ LOG.info("CSConf - setCapacity: queuePrefix=" + getQueuePrefix(queue) +
+ ", capacity=" + capacity);
+ return capacity;
+ }
+
+ public int getMaximumCapacity(String queue) {
+ int maxCapacity =
+ getInt(getQueuePrefix(queue) + MAXIMUM_CAPACITY, UNDEFINED);
+ return maxCapacity;
+ }
+
+ /**
+  * @param queue queue name or path
+  * @return the queue's configured user limit, or
+  *         {@link #DEFAULT_USER_LIMIT} if unset
+  */
+ public int getUserLimit(String queue) {
+   final String key = getQueuePrefix(queue) + USER_LIMIT;
+   return getInt(key, DEFAULT_USER_LIMIT);
+ }
+
+ /**
+  * @param queue queue name or path
+  * @return the queue's configured user-limit factor, or
+  *         {@link #DEFAULT_USER_LIMIT_FACTOR} if unset
+  */
+ public float getUserLimitFactor(String queue) {
+   final String key = getQueuePrefix(queue) + USER_LIMIT_FACTOR;
+   return getFloat(key, DEFAULT_USER_LIMIT_FACTOR);
+ }
+
+ /**
+  * Sets the user-limit factor of a queue.
+  *
+  * @param queue queue name or path
+  * @param userLimitFactor value to store
+  */
+ public void setUserLimitFactor(String queue, float userLimitFactor) {
+   final String key = getQueuePrefix(queue) + USER_LIMIT_FACTOR;
+   setFloat(key, userLimitFactor);
+ }
+
+ /**
+  * Sets the capacity of a queue and logs the change.
+  *
+  * @param queue queue name or path
+  * @param capacity value to store; not validated here
+  */
+ public void setCapacity(String queue, int capacity) {
+   final String queuePrefix = getQueuePrefix(queue);
+   setInt(queuePrefix + CAPACITY, capacity);
+   LOG.info("CSConf - setCapacity: queuePrefix=" + queuePrefix +
+       ", capacity=" + capacity);
+ }
+
+ /**
+  * Reads the names of the child queues configured for a queue.
+  *
+  * @param queue queue name or path
+  * @return the configured child queue names, exactly as returned by
+  *         {@code getStrings} (may be {@code null})
+  */
+ public String[] getQueues(String queue) {
+   final String queuePrefix = getQueuePrefix(queue);
+   LOG.info("CSConf - getQueues called for: queuePrefix=" + queuePrefix);
+
+   final String[] children = getStrings(queuePrefix + QUEUES);
+
+   // A missing value is logged as an empty string.
+   final String rendered;
+   if (children == null) {
+     rendered = "";
+   } else {
+     rendered = StringUtils.arrayToString(children);
+   }
+   LOG.info("CSConf - getQueues: queuePrefix=" + queuePrefix +
+       ", queues=" + rendered);
+   return children;
+ }
+
+ /**
+  * Sets the child queues of a queue and logs the change. The names are
+  * stored as the single string produced by
+  * {@code StringUtils.arrayToString}.
+  *
+  * @param queue queue name or path
+  * @param subQueues names of the child queues
+  */
+ public void setQueues(String queue, String[] subQueues) {
+   final String queuePrefix = getQueuePrefix(queue);
+   set(queuePrefix + QUEUES, StringUtils.arrayToString(subQueues));
+   LOG.info("CSConf - setQueues: qPrefix=" + queuePrefix +
+       ", queues=" + StringUtils.arrayToString(subQueues));
+ }
+
+ /**
+  * @return a resource created from the memory configured under
+  *         {@link #MINIMUM_ALLOCATION}, falling back to the built-in
+  *         {@code MINIMUM_MEMORY} if unset
+  */
+ public Resource getMinimumAllocation() {
+   return org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.
+       createResource(getInt(MINIMUM_ALLOCATION, MINIMUM_MEMORY));
+ }
+}
Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,33 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
+import org.apache.hadoop.yarn.Resource;
+
+/**
+ * Read-only interface to {@link CapacityScheduler} context.
+ */
+public interface CapacitySchedulerContext {
+
+  /**
+   * @return the capacity-scheduler configuration
+   */
+  CapacitySchedulerConfiguration getConfiguration();
+
+  /**
+   * @return the minimum allocation of the scheduler
+   */
+  Resource getMinimumAllocation();
+
+  /**
+   * @return the secret manager for container tokens
+   */
+  ContainerTokenSecretManager getContainerTokenSecretManager();
+}