You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by ac...@apache.org on 2011/05/02 10:26:07 UTC
svn commit: r1098508 - in /hadoop/mapreduce/branches/MR-279: ./
yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/
yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/ha...
Author: acmurthy
Date: Mon May 2 08:26:06 2011
New Revision: 1098508
URL: http://svn.apache.org/viewvc?rev=1098508&view=rev
Log:
Implement 'delay scheduling' for better locality in CapacityScheduler and improve support for high-RAM applications.
Modified:
hadoop/mapreduce/branches/MR-279/CHANGES.txt
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Application.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/Queue.java
Modified: hadoop/mapreduce/branches/MR-279/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/CHANGES.txt?rev=1098508&r1=1098507&r2=1098508&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/CHANGES.txt (original)
+++ hadoop/mapreduce/branches/MR-279/CHANGES.txt Mon May 2 08:26:06 2011
@@ -3,6 +3,10 @@ Hadoop MapReduce Change Log
Trunk (unreleased changes)
MAPREDUCE-279
+
+ Implement 'delay scheduling' for better locality in CapacityScheduler and
+ improve support for high-RAM applications. (acmurthy)
+
MAPREDUCE-2462. Write job conf along with JobHistory, other minor improvements.
(Siddharth Seth via sharad)
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Application.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Application.java?rev=1098508&r1=1098507&r2=1098508&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Application.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Application.java Mon May 2 08:26:06 2011
@@ -68,6 +68,9 @@ public class Application {
final Resource currentConsumption = recordFactory.newRecordInstance(Resource.class);
final Resource overallConsumption = recordFactory.newRecordInstance(Resource.class);
+ Map<Priority, Integer> schedulingOpportunities =
+ new HashMap<Priority, Integer>();
+
/* Current consumption */
List<Container> acquired = new ArrayList<Container>();
List<Container> completedContainers = new ArrayList<Container>();
@@ -295,9 +298,18 @@ public class Application {
// Update future requirements
nodeLocalRequest.setNumContainers(nodeLocalRequest.getNumContainers() - containers.size());
+ if (nodeLocalRequest.getNumContainers() == 0) {
+ this.requests.get(priority).remove(node.getNodeAddress());
+ }
+
ResourceRequest rackLocalRequest =
requests.get(priority).get(node.getRackName());
rackLocalRequest.setNumContainers(rackLocalRequest.getNumContainers() - containers.size());
+ if (rackLocalRequest.getNumContainers() == 0) {
+ this.requests.get(priority).remove(node.getRackName());
+ }
+
+ // Do not remove ANY
ResourceRequest offSwitchRequest =
requests.get(priority).get(NodeManager.ANY);
offSwitchRequest.setNumContainers(offSwitchRequest.getNumContainers() - containers.size());
@@ -317,10 +329,15 @@ public class Application {
// Update future requirements
rackLocalRequest.setNumContainers(rackLocalRequest.getNumContainers() - containers.size());
+ if (rackLocalRequest.getNumContainers() == 0) {
+ this.requests.get(priority).remove(node.getRackName());
+ }
+
+ // Do not remove ANY
ResourceRequest offSwitchRequest =
requests.get(priority).get(NodeManager.ANY);
offSwitchRequest.setNumContainers(offSwitchRequest.getNumContainers() - containers.size());
- }
+}
/**
* The {@link ResourceScheduler} is allocating data-local resources
@@ -335,7 +352,10 @@ public class Application {
allocate(containers);
// Update future requirements
- offSwitchRequest.setNumContainers(offSwitchRequest.getNumContainers() - containers.size());
+
+ // Do not remove ANY
+ offSwitchRequest.setNumContainers(
+ offSwitchRequest.getNumContainers() - containers.size());
}
synchronized private void allocate(List<Container> containers) {
@@ -350,6 +370,37 @@ public class Application {
}
}
+ /**
+ * Resets the number of scheduling opportunities recorded at
+ * <code>priority</code> to zero (LeafQueue does this after it schedules
+ * or reserves a container at that priority).
+ */
+ synchronized public void resetSchedulingOpportunities(Priority priority) {
+ // The previous count is irrelevant, so there is no need to read it first.
+ this.schedulingOpportunities.put(priority, 0);
+ }
+
+ /**
+ * Records one more scheduling opportunity (a node offered to this
+ * application) at <code>priority</code>.
+ */
+ synchronized public void addSchedulingOpportunity(Priority priority) {
+ this.schedulingOpportunities.put(priority,
+ getSchedulingOpportunities(priority) + 1);
+ }
+
+ /**
+ * @return the number of scheduling opportunities recorded at
+ * <code>priority</code> since the last reset, or 0 if none were recorded
+ */
+ synchronized public int getSchedulingOpportunities(Priority priority) {
+ // Read-only: an absent entry simply means zero, so a getter should not
+ // insert one into the shared map.
+ Integer schedulingOpportunities = this.schedulingOpportunities.get(priority);
+ return (schedulingOpportunities == null) ? 0 : schedulingOpportunities;
+ }
+
+ /** Count installed by {@link #overrideSchedulingOpportunities(Priority)}. */
+ private static final int OVERRIDE = 1000000;
+
+ /**
+ * Sets the scheduling-opportunity count at <code>priority</code> to
+ * {@link #OVERRIDE}.
+ * NOTE(review): presumably meant to make the application immediately
+ * eligible for off-switch assignment; no caller is visible here - confirm.
+ */
+ synchronized public void overrideSchedulingOpportunities(Priority priority) {
+ this.schedulingOpportunities.put(priority, OVERRIDE);
+ }
+
synchronized public void showRequests() {
for (Priority priority : getPriorities()) {
Map<String, ResourceRequest> requests = getResourceRequests(priority);
@@ -426,6 +477,12 @@ public class Application {
return false;
}
+ /**
+ * Estimates how long, relative to cluster size, this application should
+ * wait at <code>priority</code> before accepting an off-switch container.
+ * The estimate is the number of distinct hosts and racks requested (every
+ * request name except ANY, floored at 1) divided by
+ * <code>clusterNodes</code>.
+ * NOTE(review): returns Infinity when clusterNodes is 0 - confirm callers
+ * tolerate that.
+ */
+ synchronized public float getLocalityWaitFactor(Priority priority,
+ int clusterNodes) {
+ // Synchronized like every other accessor of the shared 'requests' map.
+ Map<String, ResourceRequest> requestsAtPriority =
+ this.requests.get(priority);
+ // No outstanding requests at this priority: fall back to the floor of 1
+ // instead of throwing a NullPointerException.
+ int requestedNames =
+ (requestsAtPriority == null) ? 0 : requestsAtPriority.size();
+ // Subtract 1 for the ANY request, which is never removed.
+ int requiredResources = Math.max(requestedNames - 1, 1);
+ return ((float) requiredResources / clusterNodes);
+ }
+
synchronized public void finish() {
// GC pending resources metrics
QueueMetrics metrics = queue.getMetrics();
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java?rev=1098508&r1=1098507&r2=1098508&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java Mon May 2 08:26:06 2011
@@ -95,7 +95,10 @@ implements ResourceScheduler, CapacitySc
private Map<String, Queue> queues = new ConcurrentHashMap<String, Queue>();
-
+ private Resource clusterResource =
+ RecordFactoryProvider.getRecordFactory(null).newRecordInstance(Resource.class);
+ private int numNodeManagers = 0;
+
private Resource minimumAllocation;
private Map<ApplicationId, Application> applications =
@@ -123,6 +126,10 @@ implements ResourceScheduler, CapacitySc
return minimumAllocation;
}
+ /**
+ * @return the current node count, incremented by addNode and
+ * decremented by removeNode
+ */
+ synchronized public int getNumClusterNodes() {
+ int nodes = this.numNodeManagers;
+ return nodes;
+ }
+
@Override
public synchronized void reinitialize(Configuration conf,
ContainerTokenSecretManager containerTokenSecretManager, ClusterTracker clusterTracker)
@@ -393,7 +400,13 @@ implements ResourceScheduler, CapacitySc
LOG.info("Trying to fulfill reservation for application " +
reservedApplication.getApplicationId() + " on node: " + nm);
LeafQueue queue = ((LeafQueue)reservedApplication.getQueue());
- queue.assignContainers(clusterResource, nm);
+ Resource released = queue.assignContainers(clusterResource, nm);
+
+ // Is the reservation necessary? If not, release the reservation
+ if (org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.greaterThan(
+ released, org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.NONE)) {
+ queue.completedContainer(clusterResource, null, released, reservedApplication);
+ }
}
// Try to schedule more if there are no reservations to fulfill
@@ -418,7 +431,8 @@ implements ResourceScheduler, CapacitySc
// Inform the queue
LeafQueue queue = (LeafQueue)application.getQueue();
- queue.completedContainer(clusterResource, container, application);
+ queue.completedContainer(clusterResource, container,
+ container.getResource(), application);
}
}
}
@@ -480,21 +494,21 @@ implements ResourceScheduler, CapacitySc
break;
}
}
- private Resource clusterResource = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(Resource.class);
-
public synchronized Resource getClusterResource() {
return clusterResource;
}
@Override
- public synchronized void removeNode(NodeInfo nodeInfo) {
- Resources.subtractFrom(clusterResource, nodeInfo.getTotalCapability());
+ public synchronized void addNode(NodeInfo nodeManager) { // add node capacity and count the node
+ Resources.addTo(clusterResource, nodeManager.getTotalCapability());
+ ++numNodeManagers; // value reported by getNumClusterNodes()
}
@Override
- public synchronized void addNode(NodeInfo nodeManager) {
- Resources.addTo(clusterResource, nodeManager.getTotalCapability());
+ public synchronized void removeNode(NodeInfo nodeInfo) { // subtract node capacity and uncount the node
+ Resources.subtractFrom(clusterResource, nodeInfo.getTotalCapability());
+ --numNodeManagers; // value reported by getNumClusterNodes()
}
public synchronized boolean releaseContainer(ApplicationId applicationId,
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java?rev=1098508&r1=1098507&r2=1098508&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java Mon May 2 08:26:06 2011
@@ -30,4 +30,6 @@ public interface CapacitySchedulerContex
Resource getMinimumAllocation();
ContainerTokenSecretManager getContainerTokenSecretManager();
+
+ int getNumClusterNodes();
}
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java?rev=1098508&r1=1098507&r2=1098508&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java Mon May 2 08:26:06 2011
@@ -100,9 +100,12 @@ public class LeafQueue implements Queue
private final RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null);
+ private CapacitySchedulerContext scheduler;
+
public LeafQueue(CapacitySchedulerContext cs,
String queueName, Queue parent,
Comparator<Application> applicationComparator, Queue old) {
+ this.scheduler = cs;
this.queueName = queueName;
this.parent = parent;
// must be after parent and queueName are initialized
@@ -491,7 +494,7 @@ public class LeafQueue implements Queue
LOG.info("DEBUG --- assignContainers:" +
" node=" + node.getNodeAddress() +
" #applications=" + applications.size());
-
+
// Check for reserved resources
Application reservedApplication = node.getReservedApplication();
if (reservedApplication != null) {
@@ -529,10 +532,14 @@ public class LeafQueue implements Queue
}
}
-
+ // Inform the application it is about to get a scheduling opportunity
+ application.addSchedulingOpportunity(priority);
+
+ // Try to schedule
Resource assigned =
assignContainersOnNode(clusterResource, node, application, priority);
+ // Did we schedule or reserve a container?
if (Resources.greaterThan(assigned, Resources.none())) {
Resource assignedResource =
application.getResourceRequest(priority, NodeManager.ANY).getCapability();
@@ -540,7 +547,10 @@ public class LeafQueue implements Queue
// Book-keeping
allocateResource(clusterResource,
application.getUser(), assignedResource);
-
+
+ // Reset scheduling opportunities
+ application.resetSchedulingOpportunities(priority);
+
// Done
return assignedResource;
} else {
@@ -565,6 +575,17 @@ public class LeafQueue implements Queue
// Do we reserve containers at this 'priority'?
if (application.isReserved(node, priority)) {
+
+ // Do we really need this reservation still?
+ ResourceRequest offSwitchRequest =
+ application.getResourceRequest(priority, NodeManager.ANY);
+ if (offSwitchRequest.getNumContainers() == 0) {
+ // Release
+ unreserve(application, priority, node);
+ return offSwitchRequest.getCapability();
+ }
+
+ // Try to assign if we have sufficient resources
assignContainersOnNode(clusterResource, node, application, priority);
}
}
@@ -572,7 +593,7 @@ public class LeafQueue implements Queue
// Doesn't matter... since it's already charged for at time of reservation
// "re-reservation" is *free*
- return Resources.none();
+ return org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.NONE;
}
private synchronized boolean assignToQueue(Resource clusterResource,
@@ -680,7 +701,8 @@ public class LeafQueue implements Queue
assigned = assignRackLocalContainers(clusterResource, node, application, priority);
if (Resources.greaterThan(assigned, Resources.none())) {
return assigned;
- }
+ }
+
// Off-switch
return assignOffSwitchContainers(clusterResource, node, application, priority);
}
@@ -737,7 +759,28 @@ public class LeafQueue implements Queue
}
if (type == NodeType.OFF_SWITCH) {
- return offSwitchRequest.getNumContainers() > 0;
+ // 'Delay' off-switch
+ long missedNodes = application.getSchedulingOpportunities(priority);
+ long requiredContainers = offSwitchRequest.getNumContainers();
+
+ float localityWaitFactor =
+ application.getLocalityWaitFactor(priority,
+ scheduler.getNumClusterNodes());
+
+ if (requiredContainers > 0) {
+ if ((requiredContainers * localityWaitFactor) < missedNodes) {
+ LOG.info("Application " + application.getApplicationId() +
+ " has missed " + missedNodes + " opportunities," +
+ " waitFactor= " + localityWaitFactor +
+ " for cluster of size " + scheduler.getNumClusterNodes());
+ return false;
+ }
+ return true;
+ }
+ return false;
+
+// return ((requiredContainers > 0) &&
+// (requiredContainers * localityWaitFactor) < missedNodes));
}
if (type == NodeType.RACK_LOCAL) {
@@ -761,7 +804,6 @@ public class LeafQueue implements Queue
return false;
}
-
private Resource assignContainer(Resource clusterResource, NodeInfo node,
Application application,
Priority priority, ResourceRequest request, NodeType type) {
@@ -771,22 +813,21 @@ public class LeafQueue implements Queue
" priority=" + priority.getPriority() +
" request=" + request + " type=" + type);
Resource capability = request.getCapability();
-
+
Resource available = node.getAvailableResource();
if (available.getMemory() > 0) {
-
+
int availableContainers =
available.getMemory() / capability.getMemory(); // TODO: A buggy
- // application
- // with this
- // zero would
- // crash the
- // scheduler.
-
+ // application
+ // with this
+ // zero would
+ // crash the
+ // scheduler.
+
if (availableContainers > 0) {
-
List<Container> containers =
new ArrayList<Container>();
Container container =
@@ -818,7 +859,7 @@ public class LeafQueue implements Queue
if (application.isReserved(node, priority)){
unreserve(application, priority, node);
}
-
+
LOG.info("allocatedContainer" +
" application=" + application.getApplicationId() +
" container=" + container +
@@ -831,7 +872,7 @@ public class LeafQueue implements Queue
} else {
// Reserve by 'charging' in advance...
reserve(application, priority, node, request.getCapability());
-
+
LOG.info("Reserved container " +
" application=" + application.getApplicationId() +
" resource=" + request.getCapability() +
@@ -873,29 +914,36 @@ public class LeafQueue implements Queue
}
}
+
+ /**
+ * Releases <code>allocatedResource</code> via releaseResource (queue/user
+ * book-keeping), informs the application only when <code>container</code>
+ * is a real allocation, and propagates the release to the parent queue.
+ */
@Override
public void completedContainer(Resource clusterResource,
- Container container, Application application) {
+ Container container, Resource allocatedResource, Application application) {
if (application != null) {
// Careful! Locking order is important!
synchronized (this) {
- // Inform the application
- application.completedContainer(container);
- 
+
+ // Inform the application iff this was an allocated container,
+ // as opposed to an unfulfilled reservation
+ if (container != null) {
+ application.completedContainer(container);
+ }
+
// Book-keeping
- releaseResource(clusterResource,
- application.getUser(), container.getResource());
+ // Release allocatedResource, not container.getResource(): container is
+ // null when an unfulfilled reservation is being released.
+ releaseResource(clusterResource,
+ application.getUser(), allocatedResource);
LOG.info("completedContainer" +
" container=" + container +
- " queue=" + this +
+ " resource=" + allocatedResource +
+ " queue=" + this +
" util=" + getUtilization() +
" used=" + usedResources +
" cluster=" + clusterResource);
}
// Inform the parent queue
- parent.completedContainer(clusterResource, container, application);
+ parent.completedContainer(clusterResource, container,
+ allocatedResource, application);
}
}
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java?rev=1098508&r1=1098507&r2=1098508&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java Mon May 2 08:26:06 2011
@@ -606,12 +606,13 @@ public class ParentQueue implements Queu
@Override
public void completedContainer(Resource clusterResource,
- Container container, Application application) {
+ Container container, Resource allocatedResource,
+ Application application) {
if (application != null) {
// Careful! Locking order is important!
// Book keeping
synchronized (this) {
- releaseResource(clusterResource, container.getResource());
+ releaseResource(clusterResource, allocatedResource);
LOG.info("completedContainer" +
" queue=" + getQueueName() +
@@ -622,7 +623,8 @@ public class ParentQueue implements Queu
// Inform the parent
if (parent != null) {
- parent.completedContainer(clusterResource, container, application);
+ parent.completedContainer(clusterResource, container,
+ allocatedResource, application);
}
}
}
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/Queue.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/Queue.java?rev=1098508&r1=1098507&r2=1098508&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/Queue.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/Queue.java Mon May 2 08:26:06 2011
@@ -168,11 +168,14 @@ extends org.apache.hadoop.yarn.server.re
/**
* A container assigned to the queue has completed.
+ * Also invoked, with a <code>null</code> container, to release an
+ * unfulfilled reservation.
* @param clusterResource the resource of the cluster
- * @param container completed container
+ * @param container completed container,
+ * <code>null</code> if it was just a reservation
+ * @param allocatedResource resource to release from the queue's accounting:
+ * the container's resource, or the reserved amount when
+ * <code>container</code> is <code>null</code>
* @param application application to which the container was assigned
*/
public void completedContainer(Resource clusterResource,
- Container container, Application application);
+ Container container, Resource allocatedResource,
+ Application application);
/**
* Get the number of applications in the queue.
@@ -201,5 +204,6 @@ extends org.apache.hadoop.yarn.server.re
* @param application the application for which the container was allocated
* @param container the container that was recovered.
*/
- public void recoverContainer(Resource clusterResource, Application application, Container container);
+ public void recoverContainer(Resource clusterResource, Application application,
+ Container container);
}