You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by ac...@apache.org on 2011/03/17 21:21:54 UTC
svn commit: r1082677 [35/38] - in /hadoop/mapreduce/branches/MR-279: ./
assembly/ ivy/ mr-client/ mr-client/hadoop-mapreduce-client-app/
mr-client/hadoop-mapreduce-client-app/src/
mr-client/hadoop-mapreduce-client-app/src/main/ mr-client/hadoop-mapredu...
Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,681 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
+import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Application;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ClusterTracker;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
+import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
+import org.apache.hadoop.yarn.Container;
+import org.apache.hadoop.yarn.ContainerToken;
+import org.apache.hadoop.yarn.Priority;
+import org.apache.hadoop.yarn.Resource;
+import org.apache.hadoop.yarn.ResourceRequest;
+
+@Private
+@Unstable
+public class LeafQueue implements Queue {
+ private static final Log LOG = LogFactory.getLog(LeafQueue.class);
+
+ private final String queueName;
+ private final Queue parent;
+ private final float capacity;
+ private final float absoluteCapacity;
+ private final float maxCapacity;
+ private final float absoluteMaxCapacity;
+ private final int userLimit;
+ private final float userLimitFactor;
+
+ private final int maxApplications;
+ private final int maxApplicationsPerUser;
+
+ private Resource usedResources =
+ org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.createResource(0);
+ private float utilization = 0.0f;
+ private float usedCapacity = 0.0f;
+ private volatile int numContainers;
+
+ Set<Application> applications;
+
+ public final Resource minimumAllocation;
+
+ private ContainerTokenSecretManager containerTokenSecretManager;
+
+ private Map<String, User> users = new HashMap<String, User>();
+
+ public LeafQueue(CapacitySchedulerContext cs,
+ String queueName, Queue parent,
+ Comparator<Application> applicationComparator) {
+ this.queueName = queueName;
+ this.parent = parent;
+
+ this.minimumAllocation = cs.getMinimumAllocation();
+ this.containerTokenSecretManager = cs.getContainerTokenSecretManager();
+
+ this.capacity =
+ (float)cs.getConfiguration().getCapacity(getQueuePath()) / 100;
+ this.absoluteCapacity = parent.getAbsoluteCapacity() * capacity;
+
+ this.maxCapacity = cs.getConfiguration().getMaximumCapacity(getQueuePath());
+ this.absoluteMaxCapacity =
+ (maxCapacity == CapacitySchedulerConfiguration.UNDEFINED) ?
+ Float.MAX_VALUE : (parent.getAbsoluteCapacity() * maxCapacity) / 100;
+
+ this.userLimit = cs.getConfiguration().getUserLimit(getQueuePath());
+
+ this.userLimitFactor =
+ cs.getConfiguration().getUserLimitFactor(getQueuePath());
+
+ int maxSystemJobs = cs.getConfiguration().getMaximumSystemApplications();
+ this.maxApplications = (int)(maxSystemJobs * absoluteCapacity);
+ this.maxApplicationsPerUser =
+ (int)(maxApplications * (userLimit / 100.0f) * userLimitFactor);
+
+ LOG.info("DEBUG --- LeafQueue:" +
+ " name=" + queueName +
+ ", fullname=" + getQueuePath() +
+ ", capacity=" + capacity +
+ ", asboluteCapacity=" + absoluteCapacity +
+ ", maxCapacity=" + maxCapacity +
+ ", asboluteMaxCapacity=" + absoluteMaxCapacity +
+ ", userLimit=" + userLimit + ", userLimitFactor=" + userLimitFactor +
+ ", maxApplications=" + maxApplications +
+ ", maxApplicationsPerUser=" + maxApplicationsPerUser);
+
+ this.applications = new TreeSet<Application>(applicationComparator);
+ }
+
+ @Override
+ public float getCapacity() {
+ return capacity;
+ }
+
+ @Override
+ public float getAbsoluteCapacity() {
+ return absoluteCapacity;
+ }
+
+ @Override
+ public float getMaximumCapacity() {
+ return maxCapacity;
+ }
+
+ @Override
+ public float getAbsoluteMaximumCapacity() {
+ return absoluteMaxCapacity;
+ }
+
+ @Override
+ public Queue getParent() {
+ return parent;
+ }
+
+ @Override
+ public String getQueueName() {
+ return queueName;
+ }
+
+ @Override
+ public String getQueuePath() {
+ return parent.getQueuePath() + "." + getQueueName();
+ }
+
+ @Override
+ public float getUsedCapacity() {
+ return usedCapacity;
+ }
+
+ @Override
+ public synchronized Resource getUsedResources() {
+ return usedResources;
+ }
+
+ @Override
+ public synchronized float getUtilization() {
+ return utilization;
+ }
+
+ @Override
+ public synchronized List<Application> getApplications() {
+ return new ArrayList<Application>(applications);
+ }
+
+ @Override
+ public List<Queue> getChildQueues() {
+ return null;
+ }
+
+ synchronized void setUtilization(float utilization) {
+ this.utilization = utilization;
+ }
+
+ synchronized void setUsedCapacity(float usedCapacity) {
+ this.usedCapacity = usedCapacity;
+ }
+
+ public synchronized int getNumApplications() {
+ return applications.size();
+ }
+
+ public int getNumContainers() {
+ return numContainers;
+ }
+
+ public String toString() {
+ return queueName + ":" + capacity + ":" + absoluteCapacity + ":" +
+ getUsedCapacity() + ":" + getUtilization() + ":" +
+ getNumApplications() + ":" + getNumContainers();
+ }
+
+ private synchronized User getUser(String userName) {
+ User user = users.get(userName);
+ if (user == null) {
+ user = new User();
+ users.put(userName, user);
+ }
+ return user;
+ }
+
+ @Override
+ public void submitApplication(Application application, String userName,
+ String queue, Priority priority)
+ throws AccessControlException {
+ // Careful! Locking order is important!
+ synchronized (this) {
+
+ // Check submission limits for queues
+ if (getNumApplications() >= maxApplications) {
+ throw new AccessControlException("Queue " + getQueuePath() +
+ " already has " + getNumApplications() + " applications," +
+ " cannot accept submission of application: " +
+ application.getApplicationId());
+ }
+
+ // Check submission limits for the user on this queue
+ User user = getUser(userName);
+ if (user.getApplications() >= maxApplicationsPerUser) {
+ throw new AccessControlException("Queue " + getQueuePath() +
+ " already has " + user.getApplications() +
+ " applications from user " + userName +
+ " cannot accept submission of application: " +
+ application.getApplicationId());
+ }
+
+ // Accept
+ user.submitApplication();
+ applications.add(application);
+
+ LOG.info("Application submission -" +
+ " appId: " + application.getApplicationId() +
+ " user: " + user + "," + " leaf-queue: " + getQueueName() +
+ " #user-applications: " + user.getApplications() +
+ " #queue-applications: " + getNumApplications());
+ }
+
+ // Inform the parent queue
+ parent.submitApplication(application, userName, queue, priority);
+ }
+
+ @Override
+ public void finishApplication(Application application, String queue)
+ throws AccessControlException {
+ // Careful! Locking order is important!
+ synchronized (this) {
+ applications.remove(application);
+
+ User user = getUser(application.getUser());
+ user.finishApplication();
+ if (user.getApplications() == 0) {
+ users.remove(application.getUser());
+ }
+
+ LOG.info("Application completion -" +
+ " appId: " + application.getApplicationId() +
+ " user: " + application.getUser() +
+ " queue: " + getQueueName() +
+ " #user-applications: " + user.getApplications() +
+ " #queue-applications: " + getNumApplications());
+ }
+
+ // Inform the parent queue
+ parent.finishApplication(application, queue);
+ }
+
+ @Override
+ public synchronized Resource
+ assignContainers(ClusterTracker cluster, NodeInfo node) {
+
+ LOG.info("DEBUG --- assignContainers:" +
+ " node=" + node.getHostName() +
+ " #applications=" + applications.size());
+
+ // Try to assign containers to applications in fifo order
+ for (Application application : applications) {
+
+ LOG.info("DEBUG --- pre-assignContainers");
+ application.showRequests();
+
+ synchronized (application) {
+ for (Priority priority : application.getPriorities()) {
+
+ // Do we need containers at this 'priority'?
+ if (!needContainers(application, priority)) {
+ continue;
+ }
+
+ // Are we going over limits by allocating to this application?
+ ResourceRequest required =
+ application.getResourceRequest(priority, NodeManager.ANY);
+ if (required != null && required.numContainers > 0) {
+
+ // Maximum Capacity of the queue
+ if (!assignToQueue(cluster, required.capability)) {
+ return org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.NONE;
+ }
+
+ // User limits
+ if (!assignToUser(application.getUser(), cluster, required.capability)) {
+ return org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.NONE;
+ }
+
+ }
+
+ Resource assigned =
+ assignContainersOnNode(cluster, node, application, priority);
+
+ if (org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.greaterThan(
+ assigned,
+ org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.NONE)) {
+ Resource assignedResource =
+ application.getResourceRequest(priority, NodeManager.ANY).capability;
+
+ // Book-keeping
+ allocateResource(cluster.getClusterResource(),
+ application.getUser(), assignedResource);
+
+ // Done
+ return assignedResource;
+ } else {
+ // Do not assign out of order w.r.t priorities
+ break;
+ }
+ }
+ }
+
+ LOG.info("DEBUG --- post-assignContainers");
+ application.showRequests();
+ }
+
+ return org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.NONE;
+ }
+
+ private synchronized boolean assignToQueue(ClusterTracker cluster,
+ Resource required) {
+ float newUtilization =
+ (float)(usedResources.memory + required.memory) /
+ (cluster.getClusterResource().memory * absoluteCapacity);
+ if (newUtilization > absoluteMaxCapacity) {
+ LOG.info(getQueueName() +
+ " current-capacity (" + getUtilization() + ") +" +
+ " required (" + required.memory + ")" +
+ " > max-capacity (" + absoluteMaxCapacity + ")");
+ return false;
+ }
+ return true;
+ }
+
+ private synchronized boolean assignToUser(String userName, ClusterTracker cluster,
+ Resource required) {
+ // What is our current capacity?
+ // * It is equal to the max(required, queue-capacity) if
+ // we're running below capacity. The 'max' ensures that jobs in queues
+ // with miniscule capacity (< 1 slot) make progress
+ // * If we're running over capacity, then its
+ // (usedResources + required) (which extra resources we are allocating)
+
+ // Allow progress for queues with miniscule capacity
+ final int queueCapacity =
+ Math.max(
+ divideAndCeil((int)(absoluteCapacity * cluster.getClusterResource().memory),
+ minimumAllocation.memory),
+ required.memory);
+
+ final int consumed = usedResources.memory;
+ final int currentCapacity =
+ (consumed < queueCapacity) ? queueCapacity : (consumed + required.memory);
+
+ // Never allow a single user to take more than the
+ // queue's configured capacity * user-limit-factor.
+ // Also, the queue's configured capacity should be higher than
+ // queue-hard-limit * ulMin
+
+ final int activeUsers = users.size();
+ User user = getUser(userName);
+
+ int limit =
+ Math.min(
+ Math.max(divideAndCeil(currentCapacity, activeUsers),
+ divideAndCeil((int)userLimit*currentCapacity, 100)),
+ (int)(queueCapacity * userLimitFactor)
+ );
+
+ // Note: We aren't considering the current request since there is a fixed
+ // overhead of the AM, so...
+ if ((user.getConsumedResources().memory) > limit) {
+ LOG.info("User " + userName + " in queue " + getQueueName() +
+ " will exceed limit, required: " + required +
+ " consumed: " + user.getConsumedResources() + " limit: " + limit +
+ " queueCapacity: " + queueCapacity +
+ " qconsumed: " + consumed +
+ " currentCapacity: " + currentCapacity +
+ " activeUsers: " + activeUsers
+ );
+ return false;
+ }
+
+ return true;
+ }
+
+ private static int divideAndCeil(int a, int b) {
+ if (b == 0) {
+ LOG.info("divideAndCeil called with a=" + a + " b=" + b);
+ return 0;
+ }
+ return (a + (b - 1)) / b;
+ }
+
+ boolean needContainers(Application application, Priority priority) {
+ ResourceRequest offSwitchRequest =
+ application.getResourceRequest(priority, NodeManager.ANY);
+
+ return (offSwitchRequest.numContainers > 0);
+ }
+
+ Resource assignContainersOnNode(ClusterTracker cluster, NodeInfo node,
+ Application application, Priority priority) {
+
+ Resource assigned =
+ org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.NONE;
+
+ // Data-local
+ assigned = assignNodeLocalContainers(cluster, node, application, priority);
+ if (org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.greaterThan(
+ assigned,
+ org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.NONE)) {
+ return assigned;
+ }
+
+ // Rack-local
+ assigned = assignRackLocalContainers(cluster, node, application, priority);
+ if (org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.greaterThan(
+ assigned,
+ org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.NONE)) {
+ return assigned;
+ }
+
+ // Off-switch
+ return assignOffSwitchContainers(cluster, node, application, priority);
+ }
+
+ Resource assignNodeLocalContainers(ClusterTracker cluster, NodeInfo node,
+ Application application, Priority priority) {
+ ResourceRequest request =
+ application.getResourceRequest(priority, node.getHostName());
+ if (request != null) {
+ if (canAssign(application, priority, node, NodeType.DATA_LOCAL)) {
+ return assignContainer(cluster, node, application, priority, request,
+ NodeType.DATA_LOCAL);
+ }
+ }
+
+ return org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.NONE;
+ }
+
+ Resource assignRackLocalContainers(ClusterTracker cluster, NodeInfo node,
+ Application application, Priority priority) {
+ ResourceRequest request =
+ application.getResourceRequest(priority, node.getRackName());
+ if (request != null) {
+ if (canAssign(application, priority, node, NodeType.RACK_LOCAL)) {
+ return assignContainer(cluster, node, application, priority, request,
+ NodeType.RACK_LOCAL);
+ }
+ }
+
+ return org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.NONE;
+ }
+
+ Resource assignOffSwitchContainers(ClusterTracker cluster, NodeInfo node,
+ Application application, Priority priority) {
+ ResourceRequest request =
+ application.getResourceRequest(priority, NodeManager.ANY);
+ if (request != null) {
+ if (canAssign(application, priority, node, NodeType.OFF_SWITCH)) {
+ return assignContainer(cluster, node, application, priority, request,
+ NodeType.OFF_SWITCH);
+ }
+ }
+
+ return org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.NONE;
+ }
+
+ boolean canAssign(Application application, Priority priority,
+ NodeInfo node, NodeType type) {
+
+ ResourceRequest offSwitchRequest =
+ application.getResourceRequest(priority, NodeManager.ANY);
+
+ if (offSwitchRequest.numContainers == 0) {
+ return false;
+ }
+
+ if (type == NodeType.OFF_SWITCH) {
+ return offSwitchRequest.numContainers > 0;
+ }
+
+ if (type == NodeType.RACK_LOCAL) {
+ ResourceRequest rackLocalRequest =
+ application.getResourceRequest(priority, node.getRackName());
+ if (rackLocalRequest == null) {
+ // No point waiting for rack-locality if we don't need this rack
+ return offSwitchRequest.numContainers > 0;
+ } else {
+ return rackLocalRequest.numContainers > 0;
+ }
+ }
+
+ if (type == NodeType.DATA_LOCAL) {
+ ResourceRequest nodeLocalRequest =
+ application.getResourceRequest(priority, node.getHostName());
+ if (nodeLocalRequest != null) {
+ return nodeLocalRequest.numContainers > 0;
+ }
+ }
+
+ return false;
+ }
+
+ private Resource assignContainer(ClusterTracker cluster, NodeInfo node,
+ Application application,
+ Priority priority, ResourceRequest request, NodeType type) {
+ LOG.info("DEBUG --- assignContainers:" +
+ " node=" + node.getHostName() +
+ " application=" + application.getApplicationId().id +
+ " priority=" + priority.priority +
+ " request=" + request + " type=" + type);
+ Resource capability = request.capability;
+
+ int availableContainers =
+ node.getAvailableResource().memory / capability.memory; // TODO: A buggy
+ // application
+ // with this
+ // zero would
+ // crash the
+ // scheduler.
+
+ if (availableContainers > 0) {
+ List<Container> containers =
+ new ArrayList<Container>();
+ Container container =
+ org.apache.hadoop.yarn.server.resourcemanager.resource.Container
+ .create(application.getApplicationId(),
+ application.getNewContainerId(),
+ node.getHostName(), capability);
+
+ // If security is enabled, send the container-tokens too.
+ if (UserGroupInformation.isSecurityEnabled()) {
+ ContainerToken containerToken = new ContainerToken();
+ ContainerTokenIdentifier tokenidentifier =
+ new ContainerTokenIdentifier(container.id,
+ container.hostName.toString(), container.resource);
+ containerToken.identifier =
+ ByteBuffer.wrap(tokenidentifier.getBytes());
+ containerToken.kind = ContainerTokenIdentifier.KIND.toString();
+ containerToken.password =
+ ByteBuffer.wrap(containerTokenSecretManager
+ .createPassword(tokenidentifier));
+ containerToken.service = container.hostName; // TODO: port
+ container.containerToken = containerToken;
+ }
+
+ containers.add(container);
+
+ // Allocate container to the application
+ application.allocate(type, node, priority, request, containers);
+
+ // Update resource usage on the node
+ cluster.addAllocatedContainers(node, application.getApplicationId(),
+ containers);
+
+ LOG.info("allocatedContainer" +
+ " container=" + container +
+ " queue=" + this.toString() +
+ " util=" + getUtilization() +
+ " used=" + usedResources +
+ " cluster=" + cluster.getClusterResource());
+
+ return container.resource;
+ }
+
+ return org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.NONE;
+ }
+
+ @Override
+ public void completedContainer(ClusterTracker cluster,
+ Container container, Application application) {
+ if (application != null) {
+ // Careful! Locking order is important!
+ synchronized (this) {
+ // Inform the application
+ application.completedContainer(container);
+
+ // Book-keeping
+ releaseResource(cluster.getClusterResource(),
+ application.getUser(), container.resource);
+
+ LOG.info("completedContainer" +
+ " container=" + container +
+ " queue=" + this +
+ " util=" + getUtilization() +
+ " used=" + usedResources +
+ " cluster=" + cluster.getClusterResource());
+ }
+
+ // Inform the parent queue
+ parent.completedContainer(cluster, container, application);
+ }
+ }
+
+ private synchronized void allocateResource(Resource clusterResource,
+ String userName, Resource resource) {
+ org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.
+ addResource(usedResources, resource);
+ update(clusterResource);
+ ++numContainers;
+
+ User user = getUser(userName);
+ user.assignContainer(resource);
+ }
+
+ private synchronized void releaseResource(Resource clusterResource,
+ String userName, Resource resource) {
+ org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.
+ subtractResource(usedResources, resource);
+ update(clusterResource);
+ --numContainers;
+
+ User user = getUser(userName);
+ user.releaseContainer(resource);
+ }
+
+ private synchronized void update(Resource clusterResource) {
+ setUtilization(usedResources.memory / (clusterResource.memory * absoluteCapacity));
+ setUsedCapacity(usedResources.memory / (clusterResource.memory * capacity));
+ }
+
+ static class User {
+ Resource consumed =
+ org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.createResource(0);
+ int applications = 0;
+
+ public Resource getConsumedResources() {
+ return consumed;
+ }
+
+ public int getApplications() {
+ return applications;
+ }
+
+ public synchronized void submitApplication() {
+ ++applications;
+ }
+
+ public synchronized void finishApplication() {
+ --applications;
+ }
+
+ public synchronized void assignContainer(Resource resource) {
+ org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.addResource(
+ consumed, resource);
+ }
+
+ public synchronized void releaseContainer(Resource resource) {
+ org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.subtractResource(
+ consumed, resource);
+ }
+ }
+}
Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,408 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Application;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ClusterTracker;
+import org.apache.hadoop.yarn.Container;
+import org.apache.hadoop.yarn.Priority;
+import org.apache.hadoop.yarn.Resource;
+
+@Private
+@Evolving
+public class ParentQueue implements Queue {
+
+ private static final Log LOG = LogFactory.getLog(ParentQueue.class);
+
+ private final Queue parent;
+ private final String queueName;
+ private final float capacity;
+ private final float maximumCapacity;
+ private final float absoluteCapacity;
+ private final float absoluteMaxCapacity;
+
+ private float usedCapacity = 0.0f;
+ private float utilization = 0.0f;
+
+ private final Set<Queue> childQueues;
+
+ private Resource usedResources =
+ org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.createResource(0);
+
+ private final boolean rootQueue;
+
+ private final Resource minimumAllocation;
+
+ private volatile int numApplications;
+ private volatile int numContainers;
+
+ public ParentQueue(CapacitySchedulerContext cs,
+ String queueName, Comparator<Queue> comparator, Queue parent) {
+ minimumAllocation = cs.getMinimumAllocation();
+
+ this.parent = parent;
+ this.queueName = queueName;
+ this.rootQueue = (parent == null);
+
+ LOG.info("PQ: parent=" + parent + ", qName=" + queueName +
+ " qPath=" + getQueuePath() + ", root=" + rootQueue);
+ this.capacity =
+ (float)cs.getConfiguration().getCapacity(getQueuePath()) / 100;
+
+ float parentAbsoluteCapacity =
+ (parent == null) ? 1.0f : parent.getAbsoluteCapacity();
+ this.absoluteCapacity = parentAbsoluteCapacity * capacity;
+
+ this.maximumCapacity =
+ cs.getConfiguration().getMaximumCapacity(getQueuePath());
+ this.absoluteMaxCapacity =
+ (maximumCapacity == CapacitySchedulerConfiguration.UNDEFINED) ?
+ Float.MAX_VALUE : (parentAbsoluteCapacity * maximumCapacity) / 100;
+
+ this.childQueues = new TreeSet<Queue>(comparator);
+
+ LOG.info("Initialized parent-queue " + queueName +
+ " name=" + queueName +
+ ", fullname=" + getQueuePath() +
+ ", capacity=" + capacity +
+ ", asboluteCapacity=" + absoluteCapacity +
+ ", maxCapacity=" + maximumCapacity +
+ ", asboluteMaxCapacity=" + absoluteMaxCapacity);
+ }
+
+ public void setChildQueues(Collection<Queue> childQueues) {
+
+ // Validate
+ float childCapacities = 0;
+ for (Queue queue : childQueues) {
+ childCapacities += queue.getCapacity();
+ }
+ if (childCapacities != 1.0f) {
+ throw new IllegalArgumentException("Illegal" +
+ " capacity of " + childCapacities +
+ " for children of queue " + queueName);
+ }
+
+ this.childQueues.addAll(childQueues);
+ LOG.info("DEBUG --- setChildQueues: " + getChildQueuesToPrint());
+ }
+
+ @Override
+ public Queue getParent() {
+ return parent;
+ }
+
+ @Override
+ public String getQueueName() {
+ return queueName;
+ }
+
+ @Override
+ public String getQueuePath() {
+ String parentPath = ((parent == null) ? "" : (parent.getQueuePath() + "."));
+ return parentPath + getQueueName();
+ }
+
+ @Override
+ public float getCapacity() {
+ return capacity;
+ }
+
+ @Override
+ public float getAbsoluteCapacity() {
+ return absoluteCapacity;
+ }
+
+ @Override
+ public float getAbsoluteMaximumCapacity() {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+
+ @Override
+ public float getMaximumCapacity() {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+
+ @Override
+ public float getUsedCapacity() {
+ return usedCapacity;
+ }
+
+ @Override
+ public synchronized Resource getUsedResources() {
+ return usedResources;
+ }
+
+ @Override
+ public synchronized float getUtilization() {
+ return utilization;
+ }
+
+ @Override
+ public List<Application> getApplications() {
+ return null;
+ }
+
+ @Override
+ public synchronized List<Queue> getChildQueues() {
+ return new ArrayList<Queue>(childQueues);
+ }
+
+ public int getNumContainers() {
+ return numContainers;
+ }
+
+ public int getNumApplications() {
+ return numApplications;
+ }
+
+ public String toString() {
+ return queueName + ":" + capacity + ":" + absoluteCapacity + ":" +
+ getUsedCapacity() + ":" + getUtilization() + ":" +
+ getNumApplications() + ":" + getNumContainers() + ":" +
+ childQueues.size() + " child-queues";
+ }
+
+ @Override
+ public void submitApplication(Application application, String user,
+ String queue, Priority priority)
+ throws AccessControlException {
+ // Sanity check
+ if (queue.equals(queueName)) {
+ throw new AccessControlException("Cannot submit application " +
+ "to non-leaf queue: " + queueName);
+ }
+
+ ++numApplications;
+
+ LOG.info("Application submission -" +
+ " appId: " + application.getApplicationId() +
+ " user: " + user +
+ " leaf-queue of parent: " + getQueueName() +
+ " #applications: " + getNumApplications());
+
+ // Inform the parent queue
+ if (parent != null) {
+ parent.submitApplication(application, user, queue, priority);
+ }
+ }
+
+ @Override
+ public void finishApplication(Application application, String queue)
+ throws AccessControlException {
+ // Sanity check
+ if (queue.equals(queueName)) {
+ throw new AccessControlException("Cannot finish application " +
+ "from non-leaf queue: " + queueName);
+ }
+
+ --numApplications;
+
+ LOG.info("Application completion -" +
+ " appId: " + application.getApplicationId() +
+ " user: " + application.getUser() +
+ " leaf-queue of parent: " + getQueueName() +
+ " #applications: " + getNumApplications());
+
+ // Inform the parent queue
+ if (parent != null) {
+ parent.finishApplication(application, queue);
+ }
+ }
+
+ synchronized void setUsedCapacity(float usedCapacity) {
+ this.usedCapacity = usedCapacity;
+ }
+
+ synchronized void setUtilization(float utilization) {
+ this.utilization = utilization;
+ }
+
+ @Override
+ public synchronized Resource assignContainers(ClusterTracker cluster,
+ NodeInfo node) {
+ Resource assigned =
+ org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.createResource(0);
+
+ while (canAssign(node)) {
+ LOG.info("DEBUG --- Trying to assign containers to child-queue of " +
+ getQueueName());
+
+ // Are we over maximum-capacity for this queue?
+ if (!assignToQueue()) {
+ LOG.info(getQueueName() +
+ " current-capacity (" + getUtilization() + ") > max-capacity (" +
+ absoluteMaxCapacity + ")");
+ break;
+ }
+
+ // Schedule
+ Resource assignedToChild = assignContainersToChildQueues(cluster, node);
+
+ // Done if no child-queue assigned anything
+ if (org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.greaterThan(
+ assignedToChild,
+ org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.NONE)) {
+ // Track resource utilization for the parent-queue
+ allocateResource(cluster.getClusterResource(), assignedToChild);
+
+ // Track resource utilization in this pass of the scheduler
+ org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.addResource(
+ assigned, assignedToChild);
+
+ LOG.info("completedContainer" +
+ " queue=" + getQueueName() +
+ " util=" + getUtilization() +
+ " used=" + usedResources +
+ " cluster=" + cluster.getClusterResource());
+
+ } else {
+ break;
+ }
+
+ LOG.info("DEBUG ---" +
+ " parentQ=" + getQueueName() +
+ " assigned=" + assigned +
+ " utilization=" + getUtilization());
+
+ // Do not assign more than one container if this isn't the root queue
+ if (!rootQueue) {
+ break;
+ }
+ }
+
+ return assigned;
+ }
+
+ private synchronized boolean assignToQueue() {
+ return (getUtilization() < absoluteMaxCapacity);
+ }
+
+ private boolean canAssign(NodeInfo node) {
+ return
+ org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.greaterThanOrEqual(
+ node.getAvailableResource(),
+ minimumAllocation);
+ }
+
+ synchronized Resource assignContainersToChildQueues(ClusterTracker cluster,
+ NodeInfo node) {
+ Resource assigned =
+ org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.createResource(0);
+
+ printChildQueues();
+
+ // Try to assign to most 'under-served' sub-queue
+ for (Iterator<Queue> iter=childQueues.iterator(); iter.hasNext();) {
+ Queue childQueue = iter.next();
+ LOG.info("DEBUG --- Trying to assign to" +
+ " queue: " + childQueue.getQueuePath() +
+ " stats: " + childQueue);
+ assigned = childQueue.assignContainers(cluster, node);
+
+ // If we do assign, remove the queue and re-insert in-order to re-sort
+ if (org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.greaterThan(
+ assigned,
+ org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.NONE)) {
+ // Remove and re-insert to sort
+ iter.remove();
+ LOG.info("Re-sorting queues since queue: " + childQueue.getQueuePath() +
+ " stats: " + childQueue);
+ childQueues.add(childQueue);
+ printChildQueues();
+ break;
+ }
+ }
+
+ return assigned;
+ }
+
+ String getChildQueuesToPrint() {
+ StringBuilder sb = new StringBuilder();
+ for (Queue q : childQueues) {
+ sb.append(q.getQueuePath() + "(" + q.getUtilization() + "), ");
+ }
+ return sb.toString();
+ }
+ void printChildQueues() {
+ LOG.info("DEBUG --- printChildQueues - queue: " + getQueuePath() +
+ " child-queues: " + getChildQueuesToPrint());
+ }
+
+ @Override
+ public void completedContainer(ClusterTracker cluster,
+ Container container, Application application) {
+ if (application != null) {
+ // Careful! Locking order is important!
+ // Book keeping
+ synchronized (this) {
+ releaseResource(cluster.getClusterResource(), container.resource);
+
+ LOG.info("completedContainer" +
+ " queue=" + getQueueName() +
+ " util=" + getUtilization() +
+ " used=" + usedResources +
+ " cluster=" + cluster.getClusterResource());
+ }
+
+ // Inform the parent
+ if (parent != null) {
+ parent.completedContainer(cluster, container, application);
+ }
+ }
+ }
+
+ private synchronized void allocateResource(Resource clusterResource,
+ Resource resource) {
+ org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.
+ addResource(usedResources, resource);
+ update(clusterResource);
+ ++numContainers;
+ }
+
+ private synchronized void releaseResource(Resource clusterResource,
+ Resource resource) {
+ org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.
+ subtractResource(usedResources, resource);
+ update(clusterResource);
+ --numContainers;
+ }
+
+ private synchronized void update(Resource clusterResource) {
+ setUtilization(usedResources.memory / (clusterResource.memory * absoluteCapacity));
+ setUsedCapacity(usedResources.memory / (clusterResource.memory * capacity));
+ }
+
+}
Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/Queue.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/Queue.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/Queue.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/Queue.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,164 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import java.util.List;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Stable;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Application;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ClusterTracker;
+import org.apache.hadoop.yarn.Container;
+import org.apache.hadoop.yarn.Priority;
+import org.apache.hadoop.yarn.Resource;
+
+/**
+ * Queue represents a node in the tree of
+ * hierarchical queues in the {@link CapacityScheduler}.
+ */
+@Stable
+@Private
+public interface Queue
+extends org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue {
+ /**
+ * Get the parent <code>Queue</code>.
+ * @return the parent queue
+ */
+ public Queue getParent();
+
+ /**
+ * Get the queue name.
+ * @return the queue name
+ */
+ public String getQueueName();
+
+ /**
+ * Get the full name of the queue, including the heirarchy.
+ * @return the full name of the queue
+ */
+ public String getQueuePath();
+
+ /**
+ * Get the configured <em>capacity</em> of the queue.
+ * @return queue capacity
+ */
+ public float getCapacity();
+
+ /**
+ * Get capacity of the parent of the queue as a function of the
+ * cumulative capacity in the cluster.
+ * @return capacity of the parent of the queue as a function of the
+ * cumulative capacity in the cluster
+ */
+ public float getAbsoluteCapacity();
+
+ /**
+ * Get the configured maximum-capacity of the queue.
+ * @return the configured maximum-capacity of the queue
+ */
+ public float getMaximumCapacity();
+
+ /**
+ * Get maximum-capacity of the queue as a funciton of the cumulative capacity
+ * of the cluster.
+ * @return maximum-capacity of the queue as a funciton of the cumulative capacity
+ * of the cluster
+ */
+ public float getAbsoluteMaximumCapacity();
+
+ /**
+ * Get the currently utilized capacity of the queue
+ * relative to it's parent queue.
+ * @return the currently utilized capacity of the queue
+ * relative to it's parent queue
+ */
+ public float getUsedCapacity();
+
+ /**
+ * Get the currently utilized resources in the cluster
+ * by the queue and children (if any).
+ * @return used resources by the queue and it's children
+ */
+ public Resource getUsedResources();
+
+ /**
+ * Get the current <em>utilization</em> of the queue
+ * and it's children (if any).
+ * Utilization is defined as the ratio of
+ * <em>used-capacity over configured-capacity</em> of the queue.
+ * @return queue utilization
+ */
+ public float getUtilization();
+
+ /**
+ * Get child queues
+ * @return child queues
+ */
+ public List<Queue> getChildQueues();
+
+ /**
+ * Get applications in this queue
+ * @return applications in the queue
+ */
+ public List<Application> getApplications();
+
+ /**
+ * Submit a new application to the queue.
+ * @param application application being submitted
+ * @param user user who submitted the application
+ * @param queue queue to which the application is submitted
+ * @param priority application priority
+ */
+ public void submitApplication(Application application, String user,
+ String queue, Priority priority)
+ throws AccessControlException;
+
+ /**
+ * An application submitted to this queue has finished.
+ * @param application
+ * @param queue application queue
+ */
+ public void finishApplication(Application application, String queue)
+ throws AccessControlException;
+
+ /**
+ * Assign containers to applications in the queue or it's children (if any).
+ * @param cluster cluster resources
+ * @param node node on which resources are available
+ * @return
+ */
+ public Resource assignContainers(ClusterTracker cluster, NodeInfo node);
+
+ /**
+ * A container assigned to the queue has completed.
+ * @param cluster cluster resources
+ * @param container completed container
+ * @param application application to which the container was assigned
+ */
+ public void completedContainer(ClusterTracker cluster,
+ Container container, Application application);
+
+ /**
+ * Get the number of applications in the queue.
+ * @return number of applications
+ */
+ public int getNumApplications();
+}
Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,440 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.net.Node;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
+import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Application;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ClusterTracker;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ClusterTrackerImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ClusterTracker.NodeResponse;
+import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
+import org.apache.hadoop.yarn.ApplicationID;
+import org.apache.hadoop.yarn.Container;
+import org.apache.hadoop.yarn.ContainerToken;
+import org.apache.hadoop.yarn.NodeID;
+import org.apache.hadoop.yarn.Priority;
+import org.apache.hadoop.yarn.Resource;
+import org.apache.hadoop.yarn.ResourceRequest;
+
+@LimitedPrivate("yarn")
+@Evolving
+public class FifoScheduler implements ResourceScheduler {
+
+ private static final Log LOG = LogFactory.getLog(FifoScheduler.class);
+
+ Configuration conf;
+ private ContainerTokenSecretManager containerTokenSecretManager;
+ private final ClusterTracker clusterTracker;
+
+ // TODO: The memory-block size should be site-configurable?
+ public static final int MINIMUM_MEMORY = 1024;
+ private final static Container[] EMPTY_CONTAINER_ARRAY = new Container[] {};
+ private final static List<Container> EMPTY_CONTAINER_LIST = Arrays.asList(EMPTY_CONTAINER_ARRAY);
+
+ public static final Resource MINIMUM_ALLOCATION =
+ org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.createResource(
+ MINIMUM_MEMORY);
+
+ Map<ApplicationID, Application> applications =
+ new TreeMap<ApplicationID, Application>(
+ new org.apache.hadoop.yarn.server.resourcemanager.resource.ApplicationID.Comparator());
+
+ private static final Queue DEFAULT_QUEUE = new Queue() {
+ @Override
+ public String getQueueName() {
+ return "default";
+ }
+ };
+
+ public FifoScheduler() {
+ this.clusterTracker = createClusterTracker();
+ }
+
+ public FifoScheduler(Configuration conf,
+ ContainerTokenSecretManager containerTokenSecretManager)
+ {
+ this();
+ reinitialize(conf, containerTokenSecretManager);
+ }
+
+ protected ClusterTracker createClusterTracker() {
+ return new ClusterTrackerImpl();
+ }
+
+ @Override
+ public void reinitialize(Configuration conf,
+ ContainerTokenSecretManager containerTokenSecretManager)
+ {
+ this.conf = conf;
+ this.containerTokenSecretManager = containerTokenSecretManager;
+ }
+
+ @Override
+ public synchronized List<Container> allocate(ApplicationID applicationId,
+ List<ResourceRequest> ask, List<Container> release)
+ throws IOException {
+ Application application = getApplication(applicationId);
+ if (application == null) {
+ LOG.error("Calling allocate on removed " +
+ "or non existant application " + applicationId);
+ return EMPTY_CONTAINER_LIST;
+ }
+ normalizeRequests(ask);
+
+ LOG.debug("allocate: pre-update" +
+ " applicationId=" + applicationId +
+ " application=" + application);
+ application.showRequests();
+
+ // Update application requests
+ application.updateResourceRequests(ask);
+
+ // Release containers
+ releaseContainers(application, release);
+
+ application.showRequests();
+
+ List<Container> allContainers = application.acquire();
+ LOG.debug("allocate:" +
+ " applicationId=" + applicationId +
+ " #ask=" + ask.size() +
+ " #release=" + release.size() +
+ " #allContainers=" + allContainers.size());
+ return allContainers;
+ }
+
+ private void releaseContainers(Application application, List<Container> release) {
+ application.releaseContainers(release);
+ for (Container container : release) {
+ clusterTracker.releaseContainer(application.getApplicationId(), container);
+ }
+ }
+
+ private void normalizeRequests(List<ResourceRequest> asks) {
+ for (ResourceRequest ask : asks) {
+ normalizeRequest(ask);
+ }
+ }
+
+ private void normalizeRequest(ResourceRequest ask) {
+ int memory = ask.capability.memory;
+ memory =
+ MINIMUM_MEMORY * ((memory/MINIMUM_MEMORY) + (memory%MINIMUM_MEMORY));
+ }
+
+ private synchronized Application getApplication(ApplicationID applicationId) {
+ return applications.get(applicationId);
+ }
+
+ @Override
+ public synchronized void addApplication(ApplicationID applicationId,
+ String user, String unusedQueue, Priority unusedPriority)
+ throws IOException {
+ applications.put(applicationId,
+ new Application(applicationId, DEFAULT_QUEUE, user));
+ LOG.info("Application Submission: " + applicationId.id + " from " + user +
+ ", currently active: " + applications.size());
+ }
+
+ @Override
+ public synchronized void removeApplication(ApplicationID applicationId)
+ throws IOException {
+ Application application = getApplication(applicationId);
+ if (application == null) {
+ throw new IOException("Unknown application " + applicationId +
+ " has completed!");
+ }
+
+ // Release current containers
+ releaseContainers(application, application.getCurrentContainers());
+
+ // Let the cluster know that the applications are done
+ clusterTracker.finishedApplication(applicationId,
+ application.getAllNodesForApplication());
+
+ // Remove the application
+ applications.remove(applicationId);
+ }
+
+ /**
+ * Heart of the scheduler...
+ *
+ * @param node node on which resources are available to be allocated
+ */
+ private synchronized void assignContainers(NodeInfo node) {
+ LOG.debug("assignContainers:" +
+ " node=" + node.getHostName() +
+ " #applications=" + applications.size());
+
+ // Try to assign containers to applications in fifo order
+ for (Map.Entry<ApplicationID, Application> e : applications.entrySet()) {
+ Application application = e.getValue();
+ LOG.debug("pre-assignContainers");
+ application.showRequests();
+ synchronized (application) {
+ for (Priority priority : application.getPriorities()) {
+ int maxContainers =
+ getMaxAllocatableContainers(application, priority, node,
+ NodeType.OFF_SWITCH);
+ // Ensure the application needs containers of this priority
+ if (maxContainers > 0) {
+ int assignedContainers =
+ assignContainersOnNode(node, application, priority);
+ // Do not assign out of order w.r.t priorities
+ if (assignedContainers == 0) {
+ break;
+ }
+ }
+ }
+ }
+ LOG.debug("post-assignContainers");
+ application.showRequests();
+
+ // Done
+ if (org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.lessThan(
+ node.getAvailableResource(), MINIMUM_ALLOCATION)) {
+ return;
+ }
+ }
+ }
+
+ private int getMaxAllocatableContainers(Application application,
+ Priority priority, NodeInfo node, NodeType type) {
+ ResourceRequest offSwitchRequest =
+ application.getResourceRequest(priority, NodeManager.ANY);
+ int maxContainers = offSwitchRequest.numContainers;
+
+ if (type == NodeType.OFF_SWITCH) {
+ return maxContainers;
+ }
+
+ if (type == NodeType.RACK_LOCAL) {
+ ResourceRequest rackLocalRequest =
+ application.getResourceRequest(priority, node.getRackName());
+ if (rackLocalRequest == null) {
+ return maxContainers;
+ }
+
+ maxContainers = Math.min(maxContainers, rackLocalRequest.numContainers);
+ }
+
+ if (type == NodeType.DATA_LOCAL) {
+ ResourceRequest nodeLocalRequest =
+ application.getResourceRequest(priority, node.getHostName());
+ if (nodeLocalRequest != null) {
+ maxContainers = Math.min(maxContainers, nodeLocalRequest.numContainers);
+ }
+ }
+
+ return maxContainers;
+ }
+
+
+ private int assignContainersOnNode(NodeInfo node,
+ Application application, Priority priority
+ ) {
+ // Data-local
+ int nodeLocalContainers =
+ assignNodeLocalContainers(node, application, priority);
+
+ // Rack-local
+ int rackLocalContainers =
+ assignRackLocalContainers(node, application, priority);
+
+ // Off-switch
+ int offSwitchContainers =
+ assignOffSwitchContainers(node, application, priority);
+
+
+ LOG.debug("assignContainersOnNode:" +
+ " node=" + node.getHostName() +
+ " application=" + application.getApplicationId().id +
+ " priority=" + priority.priority +
+ " #assigned=" +
+ (nodeLocalContainers + rackLocalContainers + offSwitchContainers));
+
+
+ return (nodeLocalContainers + rackLocalContainers + offSwitchContainers);
+ }
+
+ private int assignNodeLocalContainers(NodeInfo node,
+ Application application, Priority priority) {
+ int assignedContainers = 0;
+ ResourceRequest request =
+ application.getResourceRequest(priority, node.getHostName());
+ if (request != null) {
+ int assignableContainers =
+ Math.min(
+ getMaxAllocatableContainers(application, priority, node,
+ NodeType.DATA_LOCAL),
+ request.numContainers);
+ assignedContainers =
+ assignContainers(node, application, priority,
+ assignableContainers, request, NodeType.DATA_LOCAL);
+ }
+ return assignedContainers;
+ }
+
+ private int assignRackLocalContainers(NodeInfo node,
+ Application application, Priority priority) {
+ int assignedContainers = 0;
+ ResourceRequest request =
+ application.getResourceRequest(priority, node.getRackName());
+ if (request != null) {
+ int assignableContainers =
+ Math.min(
+ getMaxAllocatableContainers(application, priority, node,
+ NodeType.RACK_LOCAL),
+ request.numContainers);
+ assignedContainers =
+ assignContainers(node, application, priority,
+ assignableContainers, request, NodeType.RACK_LOCAL);
+ }
+ return assignedContainers;
+ }
+
+ private int assignOffSwitchContainers(NodeInfo node,
+ Application application, Priority priority) {
+ int assignedContainers = 0;
+ ResourceRequest request =
+ application.getResourceRequest(priority, NodeManager.ANY);
+ if (request != null) {
+ assignedContainers =
+ assignContainers(node, application, priority,
+ request.numContainers, request, NodeType.OFF_SWITCH);
+ }
+ return assignedContainers;
+ }
+
+ private int assignContainers(NodeInfo node, Application application,
+ Priority priority, int assignableContainers,
+ ResourceRequest request, NodeType type) {
+ LOG.debug("assignContainers:" +
+ " node=" + node.getHostName() +
+ " application=" + application.getApplicationId().id +
+ " priority=" + priority.priority +
+ " assignableContainers=" + assignableContainers +
+ " request=" + request + " type=" + type);
+ Resource capability = request.capability;
+
+ int availableContainers =
+ node.getAvailableResource().memory / capability.memory; // TODO: A buggy
+ // application
+ // with this
+ // zero would
+ // crash the
+ // scheduler.
+ int assignedContainers =
+ Math.min(assignableContainers, availableContainers);
+
+ if (assignedContainers > 0) {
+ List<Container> containers =
+ new ArrayList<Container>(assignedContainers);
+ for (int i=0; i < assignedContainers; ++i) {
+ Container container =
+ org.apache.hadoop.yarn.server.resourcemanager.resource.Container
+ .create(application.getApplicationId(),
+ application.getNewContainerId(),
+ node.getHostName(), capability);
+ // If security is enabled, send the container-tokens too.
+ if (UserGroupInformation.isSecurityEnabled()) {
+ ContainerToken containerToken = new ContainerToken();
+ ContainerTokenIdentifier tokenidentifier =
+ new ContainerTokenIdentifier(container.id,
+ container.hostName.toString(), container.resource);
+ containerToken.identifier =
+ ByteBuffer.wrap(tokenidentifier.getBytes());
+ containerToken.kind = ContainerTokenIdentifier.KIND.toString();
+ containerToken.password =
+ ByteBuffer.wrap(containerTokenSecretManager
+ .createPassword(tokenidentifier));
+ containerToken.service = container.hostName; // TODO: port
+ container.containerToken = containerToken;
+ }
+ containers.add(container);
+ }
+ application.allocate(type, node, priority, request, containers);
+ clusterTracker.addAllocatedContainers(node, application.getApplicationId(), containers);
+ }
+
+ return assignedContainers;
+ }
+
+ private synchronized void applicationCompletedContainers(
+ List<Container> completedContainers) {
+ for (Container c: completedContainers) {
+ Application app = applications.get(c.id.appID);
+ /** this is possible, since an application can be removed from scheduler but
+ * the nodemanger is just updating about a completed container.
+ */
+ if (app != null) {
+ app.completedContainer(c);
+ }
+ }
+ }
+
+ @Override
+ public synchronized NodeResponse nodeUpdate(NodeInfo node,
+ Map<CharSequence,List<Container>> containers ) {
+
+ NodeResponse nodeResponse = clusterTracker.nodeUpdate(node, containers);
+ applicationCompletedContainers(nodeResponse.getCompletedContainers());
+ LOG.info("Node heartbeat " + node.getNodeID() + " resource = " + node.getAvailableResource());
+ if (org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.
+ greaterThanOrEqual(node.getAvailableResource(), MINIMUM_ALLOCATION)) {
+ assignContainers(node);
+ }
+ LOG.info("Node after allocation " + node.getNodeID() + " resource = "
+ + node.getAvailableResource());
+
+ // TODO: Add the list of containers to be preempted when we support
+ // preemption.
+ return nodeResponse;
+ }
+
+ @Override
+ public NodeInfo addNode(NodeID nodeId,String hostName,
+ Node node, Resource capability) {
+ return clusterTracker.addNode(nodeId, hostName, node, capability);
+ }
+
+ @Override
+ public void removeNode(NodeInfo node) {
+ clusterTracker.removeNode(node);
+ }
+}
Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/AppsBlock.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/AppsBlock.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/AppsBlock.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/AppsBlock.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,88 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.resourcemanager.webapp;
+
+import com.google.inject.Inject;
+
+import org.apache.hadoop.yarn.Application;
+import org.apache.hadoop.yarn.util.Apps;
+import org.apache.hadoop.yarn.webapp.hamlet.Hamlet;
+import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.*;
+import org.apache.hadoop.yarn.webapp.view.HtmlBlock;
+
+import static org.apache.hadoop.yarn.util.StringHelper.*;
+import static org.apache.hadoop.yarn.webapp.view.JQueryUI.*;
+
+class AppsBlock extends HtmlBlock {
+ final AppsList list;
+
+ @Inject AppsBlock(AppsList list, ViewContext ctx) {
+ super(ctx);
+ this.list = list;
+ }
+
+ @Override public void render(Block html) {
+ TBODY<TABLE<Hamlet>> tbody = html.
+ table("#apps").
+ thead().
+ tr().
+ th(".id", "ID").
+ th(".user", "User").
+ th(".name", "Name").
+ th(".queue", "Queue").
+ th(".state", "State").
+ th(".progress", "Progress").
+ th(".master", "Master UI")._()._().
+ tbody();
+ int i = 0;
+ for (Application app : list.apps) {
+ String appId = Apps.toString(app.id());
+ CharSequence master = app.master();
+ String am = master == null ? "UNASSIGNED"
+ : join(master, ':', app.httpPort());
+ String percent = String.format("%.1f", app.status().progress * 100);
+ tbody.
+ tr().
+ td().
+ br().$title(String.valueOf(app.id().id))._(). // for sorting
+ a(url("app", appId), appId)._().
+ td(app.user().toString()).
+ td(app.name().toString()).
+ td(app.queue().toString()).
+ td(app.state().toString()).
+ td().
+ br().$title(percent)._(). // for sorting
+ div(_PROGRESSBAR).
+ $title(join(percent, '%')). // tooltip
+ div(_PROGRESSBAR_VALUE).
+ $style(join("width:", percent, '%'))._()._()._().
+ td().
+ a(master == null ? "#" : join("http://", am), am)._()._();
+ if (list.rendering != Render.HTML && ++i >= 20) break;
+ }
+ tbody._()._();
+
+ if (list.rendering == Render.JS_ARRAY) {
+ echo("<script type='text/javascript'>\n",
+ "var appsData=");
+ list.toDataTableArrays(writer());
+ echo("\n</script>\n");
+ }
+ }
+}
Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/AppsList.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/AppsList.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/AppsList.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/AppsList.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,82 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.resourcemanager.webapp;
+
+import com.google.inject.Inject;
+import com.google.inject.servlet.RequestScoped;
+
+import java.io.PrintWriter;
+import java.util.List;
+
+import static org.apache.commons.lang.StringEscapeUtils.*;
+import static org.apache.hadoop.yarn.util.StringHelper.*;
+import static org.apache.hadoop.yarn.webapp.view.JQueryUI.*;
+import static org.apache.hadoop.yarn.webapp.view.Jsons.*;
+
+import org.apache.hadoop.yarn.Application;
+import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.ApplicationsManager;
+import org.apache.hadoop.yarn.util.Apps;
+import org.apache.hadoop.yarn.webapp.ToJSON;
+import org.apache.hadoop.yarn.webapp.Controller.RequestContext;
+
+// So we only need to do asm.getApplications once in a request
+@RequestScoped
+class AppsList implements ToJSON {
+ final RequestContext rc;
+ final List<Application> apps;
+ Render rendering;
+
+ @Inject AppsList(RequestContext ctx, ApplicationsManager asm) {
+ rc = ctx;
+ apps = asm.getApplications();
+ }
+
+ void toDataTableArrays(PrintWriter out) {
+ out.append('[');
+ boolean first = true;
+ for (Application app : apps) {
+ if (first) {
+ first = false;
+ } else {
+ out.append(",\n");
+ }
+ String appID = Apps.toString(app.id());
+ CharSequence master = app.master();
+ String ui = master == null ? "UNASSIGNED"
+ : join(master, ':', app.httpPort());
+ out.append("[\"");
+ appendSortable(out, app.id().id);
+ appendLink(out, appID, rc.prefix(), "app", appID).append(_SEP).
+ append(escapeHtml(app.user().toString())).append(_SEP).
+ append(escapeHtml(app.name().toString())).append(_SEP).
+ append(app.state().toString()).append(_SEP);
+ appendProgressBar(out, app.status().progress).append(_SEP);
+ appendLink(out, ui, rc.prefix(), master == null ? "#" : "http://", ui).
+ append("\"]");
+ }
+ out.append(']');
+ }
+
+ @Override
+ public void toJSON(PrintWriter out) {
+ out.print("{\"aaData\":");
+ toDataTableArrays(out);
+ out.print("}\n");
+ }
+}
Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/CapacitySchedulerPage.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/CapacitySchedulerPage.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/CapacitySchedulerPage.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/CapacitySchedulerPage.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,179 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.resourcemanager.webapp;
+
+import com.google.inject.Inject;
+import com.google.inject.servlet.RequestScoped;
+
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.Queue;
+import org.apache.hadoop.yarn.webapp.SubView;
+import org.apache.hadoop.yarn.webapp.hamlet.Hamlet;
+import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.*;
+import org.apache.hadoop.yarn.webapp.view.HtmlBlock;
+
+import static org.apache.hadoop.yarn.util.StringHelper.*;
+import static org.apache.hadoop.yarn.webapp.view.JQueryUI.*;
+
+class CapacitySchedulerPage extends RmView {
+ static final String _Q = ".ui-state-default.ui-corner-all";
+ static final float WIDTH_F = 0.8f;
+ static final String Q_END = "left:101%";
+ static final String OVER = "font-size:1px;background:rgba(255, 140, 0, 0.8)";
+ static final String UNDER = "font-size:1px;background:rgba(50, 205, 50, 0.8)";
+ static final float EPSILON = 1e-8f;
+
+ @RequestScoped
+ static class Parent {
+ Queue queue;
+ }
+
+ public static class QueueBlock extends HtmlBlock {
+ final Parent parent;
+
+ @Inject QueueBlock(Parent parent) {
+ this.parent = parent;
+ }
+
+ @Override
+ public void render(Block html) {
+ UL<Hamlet> ul = html.ul();
+ Queue parentQueue = parent.queue;
+ for (Queue queue : parentQueue.getChildQueues()) {
+ float used = queue.getUsedCapacity();
+ float set = queue.getCapacity();
+ float delta = Math.abs(set - used) + 0.001f;
+ float max = queue.getMaximumCapacity();
+ if (max < EPSILON) max = 1f;
+ String absMaxPct = percent(queue.getAbsoluteMaximumCapacity());
+ LI<UL<Hamlet>> li = ul.
+ li().
+ a(_Q).$style(width(max * WIDTH_F)).
+ $title(join("used:", percent(used), " set:", percent(set),
+ " max:", percent(max))).
+ //span().$style(Q_END)._(absMaxPct)._().
+ span().$style(join(width(delta/max), ';',
+ used > set ? OVER : UNDER, ';',
+ used > set ? left(set/max) : left(used/max)))._('.')._().
+ span(".q", queue.getQueuePath().substring(5))._();
+ if (queue instanceof ParentQueue) {
+ parent.queue = queue;
+ li.
+ _(QueueBlock.class);
+ }
+ li._();
+ }
+ ul._();
+ }
+ }
+
+ static class QueuesBlock extends HtmlBlock {
+ final CapacityScheduler cs;
+ final Parent parent;
+
+ @Inject QueuesBlock(ResourceManager rm, Parent parent) {
+ cs = (CapacityScheduler) rm.getResourceScheduler();
+ this.parent = parent;
+ }
+
+ @Override
+ public void render(Block html) {
+ UL<DIV<DIV<Hamlet>>> ul = html.
+ div("#cs-wrapper.ui-widget").
+ div(".ui-widget-header.ui-corner-top").
+ _("Application Queues")._().
+ div("#cs.ui-widget-content.ui-corner-bottom").
+ ul();
+ if (cs == null) {
+ ul.
+ li().
+ a(_Q).$style(width(WIDTH_F)).
+ span().$style(Q_END)._("100% ")._().
+ span(".q", "default")._()._();
+ } else {
+ Queue root = cs.getRootQueue();
+ parent.queue = root;
+ float used = root.getUsedCapacity();
+ float set = root.getCapacity();
+ float delta = Math.abs(set - used) + 0.001f;
+ ul.
+ li().
+ a(_Q).$style(width(WIDTH_F)).
+ $title(join("used:", percent(used))).
+ span().$style(Q_END)._("100%")._().
+ span().$style(join(width(delta), ';', used > set ? OVER : UNDER,
+ ';', used > set ? left(set) : left(used)))._(".")._().
+ span(".q", "root")._().
+ _(QueueBlock.class)._();
+ }
+ ul._()._().
+ script().$type("text/javascript").
+ _("$('#cs').hide();")._()._().
+ _(AppsBlock.class);
+ }
+ }
+
+ @Override protected void postHead(Page.HTML<_> html) {
+ html.
+ style().$type("text/css").
+ _("#cs { padding: 0.5em 0 1em 0; margin-bottom: 1em; position: relative }",
+ "#cs ul { list-style: none }",
+ "#cs a { font-weight: normal; margin: 2px; position: relative }",
+ "#cs a span { font-weight: normal; font-size: 80% }",
+ "#cs-wrapper .ui-widget-header { padding: 0.2em 0.5em }")._().
+ script("/static/jt/jquery.jstree.js").
+ script().$type("text/javascript").
+ _("$(function() {",
+ " $('#cs a span').addClass('ui-corner-all').css('position', 'absolute');",
+ " $('#cs').bind('loaded.jstree', function (e, data) {",
+ " data.inst.open_all(); }).",
+ " jstree({",
+ " core: { animation: 188, html_titles: true },",
+ " plugins: ['themeroller', 'html_data', 'ui'],",
+ " themeroller: { item_open: 'ui-icon-minus',",
+ " item_clsd: 'ui-icon-plus', item_leaf: 'ui-icon-gear'",
+ " }",
+ " });",
+ " $('#cs').bind('select_node.jstree', function(e, data) {",
+ " var q = $('.q', data.rslt.obj).first().text();",
+ " if (q == 'root') q = '';",
+ " $('#apps').dataTable().fnFilter(q, 3);",
+ " });",
+ " $('#cs').show();",
+ "});")._();
+ }
+
+ @Override protected Class<? extends SubView> content() {
+ return QueuesBlock.class;
+ }
+
+ static String percent(float f) {
+ return String.format("%.1f%%", f * 100);
+ }
+
+ static String width(float f) {
+ return String.format("width:%.1f%%", f * 100);
+ }
+
+ static String left(float f) {
+ return String.format("left:%.1f%%", f * 100);
+ }
+}
Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/DefaultSchedulerPage.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/DefaultSchedulerPage.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/DefaultSchedulerPage.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/DefaultSchedulerPage.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,37 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.resourcemanager.webapp;
+
+import org.apache.hadoop.yarn.webapp.SubView;
+import org.apache.hadoop.yarn.webapp.view.HtmlBlock;
+
+import static org.apache.hadoop.yarn.webapp.view.JQueryUI.*;
+
+class DefaultSchedulerPage extends RmView {
+
+ static class QueueBlock extends HtmlBlock {
+ @Override public void render(Block html) {
+ html.h2("Under construction");
+ }
+ }
+
+ @Override protected Class<? extends SubView> content() {
+ return QueueBlock.class;
+ }
+}
Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/InfoPage.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/InfoPage.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/InfoPage.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/InfoPage.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,33 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.resourcemanager.webapp;
+
+import org.apache.hadoop.yarn.webapp.SubView;
+import org.apache.hadoop.yarn.webapp.view.InfoBlock;
+
+public class InfoPage extends RmView {
+
+ @Override protected void preHead(Page.HTML<_> html) {
+ commonPreHead(html);
+ }
+
+ @Override protected Class<? extends SubView> content() {
+ return InfoBlock.class;
+ }
+}
Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/NavBlock.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/NavBlock.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/NavBlock.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/NavBlock.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,42 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.resourcemanager.webapp;
+
+import org.apache.hadoop.yarn.webapp.view.HtmlBlock;
+
+public class NavBlock extends HtmlBlock {
+
+ @Override public void render(Block html) {
+ html.
+ div("#nav").
+ h3("Cluster").
+ ul().
+ li().a(url("cluster"), "About")._().
+ li().a(url("nodes"), "Nodes")._().
+ li().a(url("apps"), "Applications")._().
+ li().a(url("scheduler"), "Scheduler")._()._().
+ h3("Tools").
+ ul().
+ li().a("/conf", "Configuration")._().
+ li().a("/logs", "Local logs")._().
+ li().a("/stacks", "Server stacks")._().
+ li().a("/metrics", "Server metrics")._()._()._().
+ div("#themeswitcher")._();
+ }
+}
Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/NodesPage.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/NodesPage.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/NodesPage.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/NodesPage.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,86 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.resourcemanager.webapp;
+
+import com.google.inject.Inject;
+
+import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.ResourceContext;
+import org.apache.hadoop.yarn.webapp.SubView;
+import org.apache.hadoop.yarn.webapp.hamlet.Hamlet;
+import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.*;
+import org.apache.hadoop.yarn.webapp.view.HtmlBlock;
+
+import static org.apache.hadoop.yarn.webapp.view.JQueryUI.*;
+
+class NodesPage extends RmView {
+
+ static class NodesBlock extends HtmlBlock {
+ final ResourceContext resource;
+
+ @Inject
+ NodesBlock(ResourceContext rc, ViewContext ctx) {
+ super(ctx);
+ resource = rc;
+ }
+
+ @Override
+ protected void render(Block html) {
+ TBODY<TABLE<Hamlet>> tbody = html.table("#nodes").
+ thead().
+ tr().
+ th(".rack", "Rack").
+ th(".nodeid", "Node ID").
+ th(".host", "Host").
+ th(".containers", "Containers").
+ th(".mem", "Mem Used (MB)").
+ th(".mem", "Mem Avail (MB)")._()._().
+ tbody();
+ for (NodeInfo ni : resource.getAllNodeInfo()) {
+ tbody.tr().
+ td(ni.getRackName()).
+ td(String.valueOf(ni.getNodeID().id)).
+ td(ni.getHostName()).
+ td(String.valueOf(ni.getNumContainers())).
+ td(String.valueOf(ni.getUsedResource().memory)).
+ td(String.valueOf(ni.getAvailableResource().memory))._();
+ }
+ tbody._()._();
+ }
+ }
+
+ @Override protected void preHead(Page.HTML<_> html) {
+ commonPreHead(html);
+ setTitle("Nodes of the cluster");
+ set(DATATABLES_ID, "nodes");
+ set(initID(DATATABLES, "nodes"), nodesTableInit());
+ setTableStyles(html, "nodes");
+ }
+
+ @Override protected Class<? extends SubView> content() {
+ return NodesBlock.class;
+ }
+
+ private String nodesTableInit() {
+ return tableInit().
+ // rack, nodeid, host, containers, memused, memavail
+ append(", aoColumns:[null, null, null, {bSearchable:false}, ").
+ append("{bSearchable:false}, {bSearchable:false}]}").toString();
+ }
+}