You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by ac...@apache.org on 2011/04/29 04:16:26 UTC
svn commit: r1097670 - in /hadoop/mapreduce/branches/MR-279: ./
mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/
yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/re...
Author: acmurthy
Date: Fri Apr 29 02:16:26 2011
New Revision: 1097670
URL: http://svn.apache.org/viewvc?rev=1097670&view=rev
Log:
Added support for High-RAM applications in CapacityScheduler.
Modified:
hadoop/mapreduce/branches/MR-279/CHANGES.txt
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Application.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/NodeManager.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/NodeManagerImpl.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
Modified: hadoop/mapreduce/branches/MR-279/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/CHANGES.txt?rev=1097670&r1=1097669&r2=1097670&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/CHANGES.txt (original)
+++ hadoop/mapreduce/branches/MR-279/CHANGES.txt Fri Apr 29 02:16:26 2011
@@ -3,6 +3,9 @@ Hadoop MapReduce Change Log
Trunk (unreleased changes)
MAPREDUCE-279
+
+ Added support for High-RAM applications in CapacityScheduler. (acmurthy)
+
Recovery of MR Application Master from failures. (sharad)
Add ACLs for queues and command-line utilities for viewing them.
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java?rev=1097670&r1=1097669&r2=1097670&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java Fri Apr 29 02:16:26 2011
@@ -425,7 +425,7 @@ public abstract class TaskAttemptImpl im
memory = conf.getInt(MRJobConfig.REDUCE_MEMORY_MB, 1024);
}
- return 1024; //FIXME: why not "return memory;" ?
+ return memory;
}
private static LocalResource getLocalResource(FileContext fc, Path file,
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Application.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Application.java?rev=1097670&r1=1097669&r2=1097670&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Application.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Application.java Fri Apr 29 02:16:26 2011
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.re
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -75,6 +76,17 @@ public class Application {
Set<NodeInfo> applicationOnNodes = new HashSet<NodeInfo>();
ApplicationMaster master;
+ /* Reserved containers */
+ private final Comparator<NodeManager> nodeComparator =
+ new Comparator<NodeManager>() {
+ @Override
+ public int compare(NodeManager o1, NodeManager o2) {
+ return o1.getNodeID().getId() - o2.getNodeID().getId();
+ }
+ };
+ final Map<Priority, Set<NodeManager>> reservedContainers =
+ new HashMap<Priority, Set<NodeManager>>();
+
public Application(ApplicationId applicationId, ApplicationMaster master,
Queue queue, String user) {
this.applicationId = applicationId;
@@ -341,4 +353,42 @@ public class Application {
return application;
}
+
+ /**
+  * Counts the nodes on which this application holds a reservation at the
+  * given priority.
+  *
+  * @param priority the priority to look up
+  * @return the number of reserved nodes, or 0 if none
+  */
+ public synchronized int getReservedContainers(Priority priority) {
+   Set<NodeManager> nodes = this.reservedContainers.get(priority);
+   if (nodes == null) {
+     return 0;
+   }
+   return nodes.size();
+ }
+
+ /**
+  * Records that this application holds a reservation on the given node at
+  * the given priority, creating the per-priority node set on first use.
+  *
+  * @param node the node being reserved
+  * @param priority the priority of the reservation
+  * @param resource the reserved capability (used only for logging here)
+  */
+ public synchronized void reserveResource(NodeManager node, Priority priority,
+     Resource resource) {
+   if (!this.reservedContainers.containsKey(priority)) {
+     this.reservedContainers.put(priority,
+         new TreeSet<NodeManager>(nodeComparator));
+   }
+   Set<NodeManager> nodesAtPriority = this.reservedContainers.get(priority);
+   nodesAtPriority.add(node);
+
+   LOG.info("Application " + applicationId + " reserved " + resource +
+       " on node " + node + ", currently has " + nodesAtPriority.size() +
+       " at priority " + priority);
+ }
+
+ /**
+  * Releases this application's reservation on the given node at the given
+  * priority, dropping the priority's entry once no nodes remain reserved.
+  *
+  * @param node the node whose reservation is released
+  * @param priority the priority of the reservation
+  * @throws IllegalStateException if nothing is reserved at this priority
+  */
+ public synchronized void unreserveResource(NodeManager node, Priority priority) {
+   Set<NodeManager> reservedNodes = reservedContainers.get(priority);
+   if (reservedNodes == null) {
+     // Fail with context instead of an anonymous NullPointerException.
+     throw new IllegalStateException("Application " + applicationId +
+         " has no reservations at priority " + priority +
+         " to unreserve on node " + node);
+   }
+
+   reservedNodes.remove(node);
+   if (reservedNodes.isEmpty()) {
+     this.reservedContainers.remove(priority);
+   }
+
+   LOG.info("Application " + applicationId + " unreserved " +
+       " on node " + node + ", currently has " + reservedNodes.size() +
+       " at priority " + priority);
+ }
+
+ /**
+  * Tells whether this application holds a reservation on the given node at
+  * the given priority.
+  *
+  * @return true only if the node is in the reserved set for that priority
+  */
+ public synchronized boolean isReserved(NodeManager node, Priority priority) {
+   Set<NodeManager> nodes = reservedContainers.get(priority);
+   return nodes != null && nodes.contains(node);
+ }
}
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/NodeManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/NodeManager.java?rev=1097670&r1=1097669&r2=1097670&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/NodeManager.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/NodeManager.java Fri Apr 29 02:16:26 2011
@@ -5,6 +5,8 @@ import java.util.Map;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeInfo;
@@ -23,4 +25,10 @@ public interface NodeManager extends Nod
void notifyFinishedApplication(ApplicationId applicationId);
+ /**
+  * @return the application currently holding a reservation on this node,
+  *         or {@code null} if the node is not reserved
+  */
+ Application getReservedApplication();
+
+ /**
+  * Reserves {@code resource} on this node for {@code application} at
+  * {@code priority}.
+  */
+ void reserveResource(Application application, Priority priority,
+ Resource resource);
+
+ /**
+  * Releases the reservation held on this node by {@code application} at
+  * {@code priority}.
+  */
+ void unreserveResource(Application application, Priority priority);
}
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/NodeManagerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/NodeManagerImpl.java?rev=1097670&r1=1097669&r2=1097670&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/NodeManagerImpl.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/NodeManagerImpl.java Fri Apr 29 02:16:26 2011
@@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
@@ -341,10 +342,57 @@ public class NodeManagerImpl implements
}
}
+ /** Application holding the (single) reservation on this node, or null. */
+ private Application reservedApplication = null;
+
+ /**
+  * Resource most recently reserved by reservedApplication, or null; kept so
+  * that conflict errors can report what is actually reserved.
+  */
+ private Resource reservedResource = null;
+
+ /**
+  * Reserves resource on this node for application. A node can be reserved
+  * by at most one application; a repeated reservation by the same
+  * application replaces the recorded resource.
+  *
+  * NOTE(review): priority is not recorded here, so reservations by the same
+  * application at different priorities share one node-level slot and the
+  * first unreserve clears it. Confirm callers never rely on that.
+  *
+  * @throws IllegalStateException if another application already holds the
+  *         reservation on this node (checked before any state is changed)
+  */
+ @Override
+ public synchronized void reserveResource(
+     Application application, Priority priority, Resource resource) {
+   // Check if it's already reserved
+   if (reservedApplication != null) {
+
+     // Cannot reserve more than one application on a given node!
+     if (!reservedApplication.applicationId.equals(application.applicationId)) {
+       // Report the resource actually held, not the one being requested.
+       throw new IllegalStateException("Trying to reserve resource " + resource +
+           " for application " + application.getApplicationId() +
+           " when currently reserved resource " + reservedResource +
+           " for application " + reservedApplication.getApplicationId() +
+           " on node " + this);
+     }
+
+     LOG.info("Updated reserved resource " + resource + " on node " +
+         this + " for application " + application);
+   } else {
+     this.reservedApplication = application;
+     LOG.info("Reserved resource " + resource + " on node " + this +
+         " for application " + application);
+   }
+   this.reservedResource = resource;
+ }
+
+ /**
+  * Releases the reservation held on this node by application.
+  *
+  * @throws IllegalStateException if the node is not reserved, or is
+  *         reserved by a different application
+  */
+ @Override
+ public synchronized void unreserveResource(Application application,
+     Priority priority) {
+   // Nothing reserved: fail with context instead of a NullPointerException.
+   if (reservedApplication == null) {
+     throw new IllegalStateException("Trying to unreserve " +
+         " for application " + application.getApplicationId() +
+         " when no application has reserved" +
+         " on node " + this);
+   }
+
+   // Cannot unreserve for wrong application...
+   if (!reservedApplication.applicationId.equals(application.applicationId)) {
+     throw new IllegalStateException("Trying to unreserve " +
+         " for application " + application.getApplicationId() +
+         " when currently reserved " +
+         " for application " + reservedApplication.getApplicationId() +
+         " on node " + this);
+   }
+
+   this.reservedApplication = null;
+   this.reservedResource = null;
+ }
+
+ /** @return the application holding this node's reservation, or null if none */
+ @Override
+ public synchronized Application getReservedApplication() {
+   return reservedApplication;
+ }
+
@Override
public String toString() {
return "host: " + getNodeAddress() + " #containers=" + getNumContainers() +
" available=" + getAvailableResource().getMemory() +
" used=" + getUsedResource().getMemory();
}
+
}
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java?rev=1097670&r1=1097669&r2=1097670&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java Fri Apr 29 02:16:26 2011
@@ -361,8 +361,28 @@ implements ResourceScheduler, CapacitySc
// Completed containers
processCompletedContainers(nodeResponse.getCompletedContainers());
NodeManager nm = nodes.get(node.getNodeAddress());
+
// Assign new containers
- root.assignContainers(clusterResource, nm);
+ // 1. Check for reserved applications
+ // 2. Schedule if there are no reservations
+
+ Application reservedApplication = nm.getReservedApplication();
+ if (reservedApplication != null) {
+ // Try to fulfill the reservation
+ LOG.info("Trying to fulfill reservation for application " +
+ reservedApplication.getApplicationId() + " on node: " + node);
+ LeafQueue queue = ((LeafQueue)reservedApplication.getQueue());
+ queue.assignContainers(clusterResource, nm);
+ }
+
+ // Try to schedule more if there are no reservations to fulfill
+ if (nm.getReservedApplication() == null) {
+ root.assignContainers(clusterResource, nm);
+ } else {
+ LOG.info("Skipping scheduling since node " + node +
+ " is reserved by application " +
+ nm.getReservedApplication().getApplicationId());
+ }
return nodeResponse;
}
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java?rev=1097670&r1=1097669&r2=1097670&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java Fri Apr 29 02:16:26 2011
@@ -480,6 +480,13 @@ public class LeafQueue implements Queue
" node=" + node.getNodeAddress() +
" #applications=" + applications.size());
+ // Check for reserved resources
+ Application reservedApplication = node.getReservedApplication();
+ if (reservedApplication != null) {
+ return assignReservedContainers(reservedApplication, node,
+ clusterResource);
+ }
+
// Try to assign containers to applications in fifo order
for (Application application : applications) {
@@ -540,6 +547,23 @@ public class LeafQueue implements Queue
return org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.NONE;
}
+ /**
+  * Tries to fulfil the reservations that application holds on node, one
+  * priority at a time.
+  *
+  * @return always Resource.NONE: the reserved capability was already
+  *         charged when the reservation was made
+  */
+ private synchronized Resource assignReservedContainers(Application application,
+     NodeManager node, Resource clusterResource) {
+   synchronized (application) {
+     for (Priority priority : application.getPriorities()) {
+       if (!application.isReserved(node, priority)) {
+         continue; // no reservation on this node at this priority
+       }
+       assignContainersOnNode(clusterResource, node, application, priority);
+     }
+   }
+
+   // Re-reservation is free, so nothing new is charged to the queue here.
+   return org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.NONE;
+ }
+
private synchronized boolean assignToQueue(Resource clusterResource,
Resource required) {
float newUtilization =
@@ -622,7 +646,9 @@ public class LeafQueue implements Queue
ResourceRequest offSwitchRequest =
application.getResourceRequest(priority, NodeManager.ANY);
- return (offSwitchRequest.getNumContainers() > 0);
+ int requiredContainers = offSwitchRequest.getNumContainers();
+ int reservedContainers = application.getReservedContainers(priority);
+ return ((requiredContainers - reservedContainers) > 0);
}
Resource assignContainersOnNode(Resource clusterResource, NodeManager node,
@@ -739,57 +765,106 @@ public class LeafQueue implements Queue
" request=" + request + " type=" + type);
Resource capability = request.getCapability();
- int availableContainers =
- node.getAvailableResource().getMemory() / capability.getMemory(); // TODO: A buggy
+ Resource available = node.getAvailableResource();
+
+ if (available.getMemory() > 0) {
+
+ int availableContainers =
+ available.getMemory() / capability.getMemory(); // TODO: A buggy
// application
// with this
// zero would
// crash the
// scheduler.
- if (availableContainers > 0) {
- List<Container> containers =
- new ArrayList<Container>();
- Container container =
+ if (availableContainers > 0) {
+
+
+ List<Container> containers =
+ new ArrayList<Container>();
+ Container container =
org.apache.hadoop.yarn.server.resourcemanager.resource.Container
- .create(recordFactory, application.getApplicationId(),
- application.getNewContainerId(), node.getNodeAddress(),
- node.getHttpAddress(), capability);
-
- // If security is enabled, send the container-tokens too.
- if (UserGroupInformation.isSecurityEnabled()) {
- ContainerToken containerToken = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(ContainerToken.class);
- ContainerTokenIdentifier tokenidentifier =
- new ContainerTokenIdentifier(container.getId(),
- container.getContainerManagerAddress(), container.getResource());
- containerToken.setIdentifier(ByteBuffer.wrap(tokenidentifier.getBytes()));
- containerToken.setKind(ContainerTokenIdentifier.KIND.toString());
- containerToken.setPassword(ByteBuffer.wrap(containerTokenSecretManager
+ .create(recordFactory, application.getApplicationId(),
+ application.getNewContainerId(), node.getNodeAddress(),
+ node.getHttpAddress(), capability);
+
+ // If security is enabled, send the container-tokens too.
+ if (UserGroupInformation.isSecurityEnabled()) {
+ ContainerToken containerToken = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(ContainerToken.class);
+ ContainerTokenIdentifier tokenidentifier =
+ new ContainerTokenIdentifier(container.getId(),
+ container.getContainerManagerAddress(), container.getResource());
+ containerToken.setIdentifier(ByteBuffer.wrap(tokenidentifier.getBytes()));
+ containerToken.setKind(ContainerTokenIdentifier.KIND.toString());
+ containerToken.setPassword(ByteBuffer.wrap(containerTokenSecretManager
.createPassword(tokenidentifier)));
- containerToken.setService(container.getContainerManagerAddress());
- container.setContainerToken(containerToken);
- }
-
- containers.add(container);
+ containerToken.setService(container.getContainerManagerAddress());
+ container.setContainerToken(containerToken);
+ }
- // Allocate container to the application
- application.allocate(type, node, priority, request, containers);
-
- node.allocateContainer(application.getApplicationId(), containers);
-
- LOG.info("allocatedContainer" +
- " container=" + container +
- " queue=" + this.toString() +
- " util=" + getUtilization() +
- " used=" + usedResources +
- " cluster=" + clusterResource);
+ containers.add(container);
- return container.getResource();
+ // Allocate
+ allocate(application, type, priority, request, node, containers);
+
+ // Did we previously reserve containers at this 'priority'?
+ if (application.isReserved(node, priority)){
+ unreserve(application, priority, node);
+ }
+
+ LOG.info("allocatedContainer" +
+ " application=" + application.getApplicationId() +
+ " container=" + container +
+ " queue=" + this.toString() +
+ " util=" + getUtilization() +
+ " used=" + usedResources +
+ " cluster=" + clusterResource);
+
+ return container.getResource();
+ } else {
+ // Reserve by 'charging' in advance...
+ reserve(application, priority, node, request.getCapability());
+
+ LOG.info("Reserved container " +
+ " application=" + application.getApplicationId() +
+ " resource=" + request.getCapability() +
+ " queue=" + this.toString() +
+ " util=" + getUtilization() +
+ " used=" + usedResources +
+ " cluster=" + clusterResource);
+
+ return request.getCapability();
+
+ }
}
-
return org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.NONE;
}
+ /**
+  * Hands the freshly created containers to the application and then
+  * registers them with the node they were placed on.
+  */
+ private void allocate(Application application, NodeType type,
+     Priority priority, ResourceRequest request, NodeManager node,
+     List<Container> containers) {
+   // First the application's bookkeeping for this request...
+   application.allocate(type, node, priority, request, containers);
+   // ...then the NodeManager learns which containers now run on it.
+   node.allocateContainer(application.getApplicationId(), containers);
+ }
+
+ /**
+  * Reserves resource for application at priority on node, updating both
+  * the node's and the application's reservation bookkeeping.
+  */
+ private void reserve(Application application, Priority priority,
+     NodeManager node, Resource resource) {
+   // Update the node first. NodeManagerImpl#reserveResource throws
+   // IllegalStateException when another application already holds the node,
+   // and it checks before mutating anything. Going first means a rejected
+   // reservation leaves no phantom entry in the application's map.
+   node.reserveResource(application, priority, resource);
+   application.reserveResource(node, priority, resource);
+ }
+
+ /**
+  * Releases application's reservation on node at priority, if it has one,
+  * on both the node and the application side.
+  */
+ private void unreserve(Application application, Priority priority,
+     NodeManager node) {
+   // Done with the reservation?
+   if (application.isReserved(node, priority)) {
+     // Release on the node first: NodeManagerImpl#unreserveResource throws
+     // if the node is held by a different application, and in that case the
+     // application's own record should stay intact rather than half-cleared.
+     node.unreserveResource(application, priority);
+     application.unreserveResource(node, priority);
+   }
+ }
+
@Override
public void completedContainer(Resource clusterResource,
Container container, Application application) {
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java?rev=1097670&r1=1097669&r2=1097670&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java Fri Apr 29 02:16:26 2011
@@ -525,7 +525,7 @@ public class ParentQueue implements Queu
org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.addResource(
assigned, assignedToChild);
- LOG.info("completedContainer" +
+ LOG.info("assignedContainer" +
" queue=" + getQueueName() +
" util=" + getUtilization() +
" used=" + usedResources +
@@ -553,8 +553,8 @@ public class ParentQueue implements Queu
return (getUtilization() < absoluteMaxCapacity);
}
- private boolean canAssign(NodeInfo node) {
- return
+ private boolean canAssign(NodeManager node) {
+ return (node.getReservedApplication() == null) &&
org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.greaterThanOrEqual(
node.getAvailableResource(),
minimumAllocation);