You are viewing a plain-text version of this content. The canonical link to it (a hyperlink in the original archive page) is not preserved in this rendering.
Posted to mapreduce-commits@hadoop.apache.org by sh...@apache.org on 2011/05/20 10:09:31 UTC
svn commit: r1125273 - in /hadoop/mapreduce/branches/MR-279: ./
mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/
mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/
...
Author: sharad
Date: Fri May 20 08:09:30 2011
New Revision: 1125273
URL: http://svn.apache.org/viewvc?rev=1125273&view=rev
Log:
Refactored RMContainerAllocator to release unused containers.
Added:
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java
Modified:
hadoop/mapreduce/branches/MR-279/CHANGES.txt
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherEvent.java
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerRequestEvent.java
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java
Modified: hadoop/mapreduce/branches/MR-279/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/CHANGES.txt?rev=1125273&r1=1125272&r2=1125273&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/CHANGES.txt (original)
+++ hadoop/mapreduce/branches/MR-279/CHANGES.txt Fri May 20 08:09:30 2011
@@ -3,6 +3,7 @@ Hadoop MapReduce Change Log
Trunk (unreleased changes)
MAPREDUCE-279
+ Refactored RMContainerAllocator to release unused containers. (sharad)
Fix null pointer exception in kill task attempt (mahadev)
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherEvent.java?rev=1125273&r1=1125272&r2=1125273&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherEvent.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherEvent.java Fri May 20 08:09:30 2011
@@ -63,4 +63,52 @@ public class ContainerLauncherEvent
public String toString() {
return super.toString() + " for taskAttempt " + taskAttemptID;
}
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result
+ + ((containerID == null) ? 0 : containerID.hashCode());
+ result = prime * result
+ + ((containerMgrAddress == null) ? 0 : containerMgrAddress.hashCode());
+ result = prime * result
+ + ((containerToken == null) ? 0 : containerToken.hashCode());
+ result = prime * result
+ + ((taskAttemptID == null) ? 0 : taskAttemptID.hashCode());
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ ContainerLauncherEvent other = (ContainerLauncherEvent) obj;
+ if (containerID == null) {
+ if (other.containerID != null)
+ return false;
+ } else if (!containerID.equals(other.containerID))
+ return false;
+ if (containerMgrAddress == null) {
+ if (other.containerMgrAddress != null)
+ return false;
+ } else if (!containerMgrAddress.equals(other.containerMgrAddress))
+ return false;
+ if (containerToken == null) {
+ if (other.containerToken != null)
+ return false;
+ } else if (!containerToken.equals(other.containerToken))
+ return false;
+ if (taskAttemptID == null) {
+ if (other.taskAttemptID != null)
+ return false;
+ } else if (!taskAttemptID.equals(other.taskAttemptID))
+ return false;
+ return true;
+ }
+
}
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java?rev=1125273&r1=1125272&r2=1125273&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java Fri May 20 08:09:30 2011
@@ -36,6 +36,8 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.app.MRAppMasterConstants;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
+import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
+import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SecurityInfo;
import org.apache.hadoop.security.UserGroupInformation;
@@ -208,7 +210,10 @@ public class ContainerLauncherImpl exten
// and not yet processed
if (eventQueue.contains(event)) {
eventQueue.remove(event); // TODO: Any synchro needed?
- // k: raise any event?
+ //deallocate the container
+ context.getEventHandler().handle(
+ new ContainerAllocatorEvent(event.getTaskAttemptID(),
+ ContainerAllocator.EventType.CONTAINER_DEALLOCATE));
} else {
try {
ContainerManager proxy =
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerRequestEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerRequestEvent.java?rev=1125273&r1=1125272&r2=1125273&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerRequestEvent.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerRequestEvent.java Fri May 20 08:09:30 2011
@@ -45,7 +45,7 @@ public class ContainerRequestEvent exten
ContainerRequestEvent(TaskAttemptId attemptID, Resource capability,
int priority) {
- this(attemptID, capability, priority, null, null);
+ this(attemptID, capability, priority, new String[0], new String[0]);
this.earlierAttemptFailed = true;
}
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java?rev=1125273&r1=1125272&r2=1125273&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java Fri May 20 08:09:30 2011
@@ -19,225 +19,107 @@
package org.apache.hadoop.mapreduce.v2.app.rm;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
+import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.TreeSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
-import org.apache.hadoop.yarn.api.records.AMResponse;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ApplicationMaster;
-import org.apache.hadoop.yarn.api.records.ApplicationState;
-import org.apache.hadoop.yarn.api.records.ApplicationStatus;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
-import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.ResourceRequest;
-import org.apache.hadoop.yarn.factories.RecordFactory;
-import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
/**
* Allocates the container from the ResourceManager scheduler.
*/
-public class RMContainerAllocator extends RMCommunicator
+public class RMContainerAllocator extends RMContainerRequestor
implements ContainerAllocator {
private static final Log LOG = LogFactory.getLog(RMContainerAllocator.class);
- private static final String ANY = "*";
- private int lastResponseID;
- private final RecordFactory recordFactory =
- RecordFactoryProvider.getRecordFactory(null);
-
- //mapping for assigned containers
- private final Map<ContainerId, TaskAttemptId> assignedMap =
- new HashMap<ContainerId, TaskAttemptId>();
-
- private final Map<Priority,
- Map<Resource,LinkedList<ContainerRequestEvent>>> localRequestsQueue =
- new TreeMap<Priority, Map<Resource,LinkedList<ContainerRequestEvent>>>();
-
- //Key -> Priority
- //Value -> Map
- //Key->ResourceName (e.g., hostname, rackname, *)
- //Value->Map
- //Key->Resource Capability
- //Value->ResourceReqeust
- private final Map<Priority, Map<String, Map<Resource, ResourceRequest>>>
- remoteRequestsTable =
- new TreeMap<Priority, Map<String, Map<Resource, ResourceRequest>>>();
-
- private final Set<ResourceRequest> ask = new TreeSet<ResourceRequest>();
- private final Set<Container> release = new TreeSet<Container>();
+ //holds information about the assigned containers to task attempts
+ private final AssignedRequests assignedRequests = new AssignedRequests();
+
+ //holds pending requests to be fulfilled by RM
+ private final PendingRequests pendingRequests = new PendingRequests();
+
+ private int containersAllocated = 0;
+ private int mapsAssigned = 0;
+ private int reducesAssigned = 0;
+ private int containersReleased = 0;
+ private int hostLocalAssigned = 0;
+ private int rackLocalAssigned = 0;
public RMContainerAllocator(ClientService clientService, AppContext context) {
super(clientService, context);
}
- // TODO: Need finer synchronization.
@Override
protected synchronized void heartbeat() throws Exception {
- assign(getResources());
+ List<Container> allocatedContainers = getResources();
+ if (allocatedContainers.size() > 0) {
+ LOG.info("Before Assign: " + getStat());
+ pendingRequests.assign(allocatedContainers);
+ LOG.info("After Assign: " + getStat());
+ }
+ }
+
+ @Override
+ public void stop() {
+ super.stop();
+ LOG.info("Final Stats: " + getStat());
}
@Override
public synchronized void handle(ContainerAllocatorEvent event) {
LOG.info("Processing the event " + event.toString());
- //TODO: can be replaced by switch instead of if-else
if (event.getType() == ContainerAllocator.EventType.CONTAINER_REQ) {
- requestContainer((ContainerRequestEvent) event);
+ pendingRequests.add((ContainerRequestEvent) event);
} else if (
event.getType() == ContainerAllocator.EventType.CONTAINER_DEALLOCATE) {
- //TODO: handle deallocation
- }
- }
-
- protected synchronized void requestContainer(ContainerRequestEvent event) {
- //add to the localRequestsQueue
- //localRequests Queue is hashed by Resource and Priority for easy lookups
- Map<Resource, LinkedList<ContainerRequestEvent>> eventMap =
- this.localRequestsQueue.get(event.getPriority());
- if (eventMap == null) {
- eventMap = new HashMap<Resource, LinkedList<ContainerRequestEvent>>();
- this.localRequestsQueue.put(event.getPriority(), eventMap);
- }
-
- LinkedList<ContainerRequestEvent> eventList =
- eventMap.get(event.getCapability());
- if (eventList == null) {
- eventList = new LinkedList<ContainerRequestEvent>();
- eventMap.put(event.getCapability(), eventList);
- }
- eventList.add(event);
-
- if (event.getEarlierAttemptFailed()) {
- addResourceRequest(event.getPriority(), ANY, event.getCapability());
- } else {
-
- // Create resource requests
- for (String host : event.getHosts()) {
- // Data-local
- addResourceRequest(event.getPriority(), host, event.getCapability());
+ TaskAttemptId aId = event.getAttemptID();
+
+ boolean removed = pendingRequests.remove(aId);
+ if (!removed) {
+ Container container = assignedRequests.get(aId);
+ if (container != null) {
+ removed = true;
+ assignedRequests.remove(aId);
+ containersReleased++;
+ release(container);
+ }
}
-
- // Nothing Rack-local for now
- for (String rack : event.getRacks()) {
- addResourceRequest(event.getPriority(), rack, event.getCapability());
+ if (!removed) {
+ LOG.error("Could not deallocate container for task attemptId " +
+ aId);
}
-
- // Off-switch
- addResourceRequest(event.getPriority(), ANY, event.getCapability());
}
}
- private void addResourceRequest(Priority priority, String resourceName,
- Resource capability) {
- Map<String, Map<Resource, ResourceRequest>> remoteRequests =
- this.remoteRequestsTable.get(priority);
- if (remoteRequests == null) {
- remoteRequests = new HashMap<String, Map<Resource, ResourceRequest>>();
- this.remoteRequestsTable.put(priority, remoteRequests);
- LOG.info("Added priority=" + priority);
- }
- Map<Resource, ResourceRequest> reqMap = remoteRequests.get(resourceName);
- if (reqMap == null) {
- reqMap = new HashMap<Resource, ResourceRequest>();
- remoteRequests.put(resourceName, reqMap);
- }
- ResourceRequest remoteRequest = reqMap.get(capability);
- if (remoteRequest == null) {
- remoteRequest = recordFactory.newRecordInstance(ResourceRequest.class);
- remoteRequest.setPriority(priority);
- remoteRequest.setHostName(resourceName);
- remoteRequest.setCapability(capability);
- remoteRequest.setNumContainers(0);
- reqMap.put(capability, remoteRequest);
- }
- remoteRequest.setNumContainers(remoteRequest.getNumContainers() + 1);
-
- // Note this down for next interaction with ResourceManager
- ask.add(remoteRequest);
- LOG.info("addResourceRequest:" + " applicationId=" + applicationId.getId()
- + " priority=" + priority.getPriority() + " resourceName=" + resourceName
- + " numContainers=" + remoteRequest.getNumContainers() + " #asks="
- + ask.size());
+ private String getStat() {
+ return "PendingMaps:" + pendingRequests.maps.size() +
+ " PendingReduces:" + pendingRequests.reduces.size() +
+ " containersAllocated:" + containersAllocated +
+ " mapsAssigned:" + mapsAssigned +
+ " reducesAssigned:" + reducesAssigned +
+ " containersReleased:" + containersReleased +
+ " hostLocalAssigned:" + hostLocalAssigned +
+ " rackLocalAssigned:" + rackLocalAssigned;
}
-
- private void decResourceRequest(Priority priority, String resourceName,
- Resource capability) {
- Map<String, Map<Resource, ResourceRequest>> remoteRequests =
- this.remoteRequestsTable.get(priority);
- Map<Resource, ResourceRequest> reqMap = remoteRequests.get(resourceName);
- ResourceRequest remoteRequest = reqMap.get(capability);
-
- LOG.info("BEFORE decResourceRequest:" + " applicationId=" + applicationId.getId()
- + " priority=" + priority.getPriority() + " resourceName=" + resourceName
- + " numContainers=" + remoteRequest.getNumContainers() + " #asks="
- + ask.size());
-
- remoteRequest.setNumContainers(remoteRequest.getNumContainers() -1);
- if (remoteRequest.getNumContainers() == 0) {
- reqMap.remove(capability);
- if (reqMap.size() == 0) {
- remoteRequests.remove(resourceName);
- }
- if (remoteRequests.size() == 0) {
- remoteRequestsTable.remove(priority);
- }
- //remove from ask if it may have
- ask.remove(remoteRequest);
- } else {
- ask.add(remoteRequest);//this will override the request if ask doesn't
- //already have it.
- }
-
- LOG.info("AFTER decResourceRequest:" + " applicationId="
- + applicationId.getId() + " priority=" + priority.getPriority()
- + " resourceName=" + resourceName + " numContainers="
- + remoteRequest.getNumContainers() + " #asks=" + ask.size());
- }
-
+
private List<Container> getResources() throws Exception {
- ApplicationStatus status =
- recordFactory.newRecordInstance(ApplicationStatus.class);
- status.setApplicationId(applicationId);
- status.setResponseId(lastResponseID);
-
- AllocateRequest allocateRequest =
- recordFactory.newRecordInstance(AllocateRequest.class);
- allocateRequest.setApplicationStatus(status);
- allocateRequest.addAllAsks(new ArrayList<ResourceRequest>(ask));
- allocateRequest.addAllReleases(new ArrayList<Container>(release));
- AllocateResponse allocateResponse = scheduler.allocate(allocateRequest);
- AMResponse response = allocateResponse.getAMResponse();
- lastResponseID = response.getResponseId();
- List<Container> allContainers = response.getContainerList();
- ask.clear();
- release.clear();
-
- LOG.info("getResources() for " + applicationId + ":" +
- " ask=" + ask.size() +
- " release= "+ release.size() +
- " recieved=" + allContainers.size());
+ List<Container> allContainers = makeRemoteRequest();
List<Container> allocatedContainers = new ArrayList<Container>();
for (Container cont : allContainers) {
if (cont.getState() != ContainerState.COMPLETE) {
@@ -245,11 +127,12 @@ public class RMContainerAllocator extend
LOG.debug("Received Container :" + cont);
} else {
LOG.info("Received completed container " + cont);
- TaskAttemptId attemptID = assignedMap.remove(cont.getId());
+ TaskAttemptId attemptID = assignedRequests.get(cont.getId());
if (attemptID == null) {
LOG.error("Container complete event for unknown container id " +
cont.getId());
} else {
+ assignedRequests.remove(attemptID);
//send the container completed event to Task attempt
eventHandler.handle(new TaskAttemptEvent(attemptID,
TaskAttemptEventType.TA_CONTAINER_COMPLETED));
@@ -260,90 +143,220 @@ public class RMContainerAllocator extend
return allocatedContainers;
}
- private void assign(List<Container> allocatedContainers) {
- // Schedule in priority order
- for (Priority priority : localRequestsQueue.keySet()) {
- LOG.info("Assigning for priority " + priority);
- assign(priority, allocatedContainers);
- if (allocatedContainers.isEmpty()) {
- break;
+ private class PendingRequests {
+
+ private Resource mapResourceReqt;
+ private Resource reduceResourceReqt;
+
+ private final LinkedList<TaskAttemptId> earlierFailedMaps =
+ new LinkedList<TaskAttemptId>();
+ private final LinkedList<TaskAttemptId> earlierFailedReduces =
+ new LinkedList<TaskAttemptId>();
+
+ private final Map<String, LinkedList<TaskAttemptId>> mapsHostMapping =
+ new HashMap<String, LinkedList<TaskAttemptId>>();
+ private final Map<String, LinkedList<TaskAttemptId>> mapsRackMapping =
+ new HashMap<String, LinkedList<TaskAttemptId>>();
+ private final Map<TaskAttemptId, ContainerRequestEvent> maps =
+ new LinkedHashMap<TaskAttemptId, ContainerRequestEvent>();
+
+ private final Map<TaskAttemptId, ContainerRequestEvent> reduces =
+ new LinkedHashMap<TaskAttemptId, ContainerRequestEvent>();
+
+ boolean remove(TaskAttemptId tId) {
+ ContainerRequestEvent req = maps.remove(tId);
+ if (req == null) {
+ req = reduces.remove(tId);
+ }
+ if (req == null) {
+ return false;
+ } else {
+ decContainerReq(req);
+ return true;
}
}
-
- if (!allocatedContainers.isEmpty()) {
- //TODO
- //after the assigment, still containers are left
- //This can happen if container requests are cancelled by AM, currently
- //not there. release the unassigned containers??
-
- //LOG.info("Releasing container " + allocatedContainer);
- //release.add(allocatedContainer);
- }
- }
-
- private void assign(Priority priority, List<Container> allocatedContainers) {
- for (Iterator<Container> i=allocatedContainers.iterator(); i.hasNext();) {
- Container allocatedContainer = i.next();
- String host = allocatedContainer.getContainerManagerAddress();
- Resource capability = allocatedContainer.getResource();
-
- LinkedList<ContainerRequestEvent> requestList =
- localRequestsQueue.get(priority).get(capability);
-
- if (requestList == null) {
- LOG.info("No request match at priority " + priority);
- return;
+
+ void add(ContainerRequestEvent event) {
+
+ if (event.getAttemptID().getTaskId().getTaskType() == TaskType.MAP) {
+ if (mapResourceReqt == null) {
+ mapResourceReqt = event.getCapability();
+ }
+ maps.put(event.getAttemptID(), event);
+
+ if (event.getEarlierAttemptFailed()) {
+ earlierFailedMaps.add(event.getAttemptID());
+ } else {
+ for (String host : event.getHosts()) {
+ LinkedList<TaskAttemptId> list = mapsHostMapping.get(host);
+ if (list == null) {
+ list = new LinkedList<TaskAttemptId>();
+ mapsHostMapping.put(host, list);
+ }
+ list.add(event.getAttemptID());
+ LOG.info("Added attempt req to host " + host);
+ }
+ for (String rack: event.getRacks()) {
+ LinkedList<TaskAttemptId> list = mapsRackMapping.get(rack);
+ if (list == null) {
+ list = new LinkedList<TaskAttemptId>();
+ mapsRackMapping.put(rack, list);
+ }
+ list.add(event.getAttemptID());
+ LOG.info("Added attempt req to rack " + rack);
+ }
+ }
+ } else {//reduce
+ if (reduceResourceReqt == null) {
+ reduceResourceReqt = event.getCapability();
+ }
+
+ if (event.getEarlierAttemptFailed()) {
+ earlierFailedReduces.add(event.getAttemptID());
+ }
+ reduces.put(event.getAttemptID(), event);
}
-
- ContainerRequestEvent assigned = null;
- //walk thru the requestList to see if in any host matches
- Iterator<ContainerRequestEvent> it = requestList.iterator();
+
+ addContainerReq(event);
+ }
+
+ private void assign(List<Container> allocatedContainers) {
+ Iterator<Container> it = allocatedContainers.iterator();
+ LOG.info("Got allocated containers " + allocatedContainers.size());
+ containersAllocated += allocatedContainers.size();
while (it.hasNext()) {
- ContainerRequestEvent event = it.next();
- if (event.getEarlierAttemptFailed()) {
- // we want to fail fast. Ignore locality for rescheduling
- // failed attempts.
- assigned = event;
- it.remove();
- break;
- }
- if (Arrays.asList(event.getHosts()).contains(host)) { // TODO: Fix
- assigned = event;
- it.remove();
- // Update resource requests
- for (String hostName : event.getHosts()) {
- decResourceRequest(priority, hostName, capability);
+ Container allocated = it.next();
+ ContainerRequestEvent assigned = null;
+ LOG.info("Assiging container " + allocated);
+
+ //try to assign to earlierFailedMaps if present
+ while (assigned == null && earlierFailedMaps.size() > 0 &&
+ allocated.getResource().getMemory() >= mapResourceReqt.getMemory()) {
+ TaskAttemptId tId = earlierFailedMaps.removeFirst();
+ if (maps.containsKey(tId)) {
+ assigned = maps.remove(tId);
+ mapsAssigned++;
+ LOG.info("Assigned from earlierFailedMaps");
}
- break;
}
- }
- if (assigned == null) {//host didn't match
- if (requestList.size() > 0) {
- //choose the first one in queue
- assigned = requestList.remove();
+
+ //try to assign to earlierFailedReduces if present
+ while (assigned == null && earlierFailedReduces.size() > 0 &&
+ allocated.getResource().getMemory() >= reduceResourceReqt.getMemory()) {
+ TaskAttemptId tId = earlierFailedReduces.removeFirst();
+ if (reduces.containsKey(tId)) {
+ assigned = reduces.remove(tId);
+ reducesAssigned++;
+ LOG.info("Assigned from earlierFailedReduces");
+ }
}
- }
+
+ //try to assign to maps if present
+ //first by host, then by rack, followed by *
+ while (assigned == null && maps.size() > 0
+ && allocated.getResource().getMemory() >= mapResourceReqt.getMemory()) {
+ String host = allocated.getContainerManagerAddress();
+ String[] hostport = host.split(":");
+ if (hostport.length == 2) {
+ host = hostport[0];
+ }
+ LinkedList<TaskAttemptId> list = mapsHostMapping.get(host);
+ while (list != null && list.size() > 0) {
+ LOG.info("Host matched to the request list " + host);
+ TaskAttemptId tId = list.removeFirst();
+ if (maps.containsKey(tId)) {
+ assigned = maps.remove(tId);
+ mapsAssigned++;
+ hostLocalAssigned++;
+ LOG.info("Assigned based on host match " + host);
+ }
+ }
+ if (assigned == null) {
+ // TODO: get rack
+ String rack = "";
+ list = mapsRackMapping.get(rack);
+ while (list != null && list.size() > 0) {
+ TaskAttemptId tId = list.removeFirst();
+ if (maps.containsKey(tId)) {
+ assigned = maps.remove(tId);
+ mapsAssigned++;
+ rackLocalAssigned++;
+ LOG.info("Assigned based on rack match " + rack);
+ }
+ }
+ if (assigned == null && maps.size() > 0) {
+ TaskAttemptId tId = maps.keySet().iterator().next();
+ assigned = maps.remove(tId);
+ mapsAssigned++;
+ LOG.info("Assigned based on * match");
+ }
+ }
+ }
+
+ //try to assign to reduces if present
+ if (assigned == null && reduces.size() > 0
+ && allocated.getResource().getMemory() >= reduceResourceReqt.getMemory()) {
+ TaskAttemptId tId = reduces.keySet().iterator().next();
+ assigned = reduces.remove(tId);
+ reducesAssigned++;
+ LOG.info("Assigned to reduce");
+ }
+
+ if (assigned != null) {
+
+ // Update resource requests
+ decContainerReq(assigned);
- if (assigned != null) {
- i.remove(); // Remove from allocated Containers list also.
+ // send the container-assigned event to task attempt
+ eventHandler.handle(new TaskAttemptContainerAssignedEvent(
+ assigned.getAttemptID(), allocated.getId(),
+ allocated.getContainerManagerAddress(),
+ allocated.getNodeHttpAddress(),
+ allocated.getContainerToken()));
+
+ assignedRequests.add(allocated, assigned.getAttemptID());
+
+ LOG.info("Assigned container (" + allocated + ") " +
+ " to task " + assigned.getAttemptID() +
+ " on node " + allocated.getContainerManagerAddress());
+ } else {
+ //not assigned to any request, release the container
+ LOG.info("Releasing unassigned container " + allocated);
+ containersReleased++;
+ release(allocated);
+ }
+ }
+ }
+ }
- // Update resource requests
- decResourceRequest(priority, ANY, capability);
+ private static class AssignedRequests {
+ private final Map<ContainerId, TaskAttemptId> containerToAttemptMap =
+ new HashMap<ContainerId, TaskAttemptId>();
+ private final Map<TaskAttemptId, Container> attemptToContainerMap =
+ new HashMap<TaskAttemptId, Container>();
+
+ void add(Container container, TaskAttemptId tId) {
+ LOG.info("Assigned container " + container.getContainerManagerAddress()
+ + " to " + tId);
+ containerToAttemptMap.put(container.getId(), tId);
+ attemptToContainerMap.put(tId, container);
+ }
- // send the container-assigned event to task attempt
- eventHandler.handle(new TaskAttemptContainerAssignedEvent(
- assigned.getAttemptID(), allocatedContainer.getId(),
- allocatedContainer.getContainerManagerAddress(),
- allocatedContainer.getNodeHttpAddress(),
- allocatedContainer.getContainerToken()));
-
- assignedMap.put(allocatedContainer.getId(), assigned.getAttemptID());
-
- LOG.info("Assigned container (" + allocatedContainer + ") " +
- " to task " + assigned.getAttemptID() + " at priority " + priority +
- " on node " + allocatedContainer.getContainerManagerAddress());
+ boolean remove(TaskAttemptId tId) {
+ Container container = attemptToContainerMap.remove(tId);
+ if (container != null) {
+ containerToAttemptMap.remove(container.getId());
+ return true;
}
+ return false;
+ }
+
+ TaskAttemptId get(ContainerId cId) {
+ return containerToAttemptMap.get(cId);
}
- }
+ Container get(TaskAttemptId tId) {
+ return attemptToContainerMap.get(tId);
+ }
+ }
}
Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java?rev=1125273&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java Fri May 20 08:09:30 2011
@@ -0,0 +1,200 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app.rm;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapreduce.v2.app.AppContext;
+import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.records.AMResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationStatus;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+
+/**
+ * Keeps the data structures used to send container requests and container
+ * releases to the ResourceManager.
+ *
+ * <p>Outstanding requests are tracked in {@code remoteRequestsTable}, keyed by
+ * priority, then resource name (host, rack or {@link #ANY}), then capability.
+ * Requests whose counts changed since the last heartbeat are queued in
+ * {@code ask}, and containers to hand back are queued in {@code release}. Both
+ * are sent, and then cleared, by {@link #makeRemoteRequest()}.
+ */
+public abstract class RMContainerRequestor extends RMCommunicator {
+
+  private static final Log LOG = LogFactory.getLog(RMContainerRequestor.class);
+  static final String ANY = "*";
+
+  /** Response id returned by the last allocate call; echoed on the next one. */
+  private int lastResponseID;
+
+  private final RecordFactory recordFactory =
+      RecordFactoryProvider.getRecordFactory(null);
+
+  // Key   -> Priority
+  // Value -> Map
+  //          Key   -> ResourceName (e.g., hostname, rackname, *)
+  //          Value -> Map
+  //                   Key   -> Resource Capability
+  //                   Value -> ResourceRequest
+  private final Map<Priority, Map<String, Map<Resource, ResourceRequest>>>
+      remoteRequestsTable =
+      new TreeMap<Priority, Map<String, Map<Resource, ResourceRequest>>>();
+
+  /** Requests whose counts changed since the last heartbeat. */
+  private final Set<ResourceRequest> ask = new TreeSet<ResourceRequest>();
+  /** Containers to give back to the RM on the next heartbeat. */
+  private final Set<Container> release = new TreeSet<Container>();
+
+  public RMContainerRequestor(ClientService clientService, AppContext context) {
+    super(clientService, context);
+  }
+
+  protected abstract void heartbeat() throws Exception;
+
+  /**
+   * Sends every pending ask and release to the RM in one allocate call.
+   *
+   * <p>The pending sets are cleared only after {@code allocate} returns. If the
+   * call throws, they are kept and resent on the next heartbeat.
+   *
+   * @return the containers contained in the RM's response
+   * @throws YarnRemoteException if the allocate call fails
+   */
+  protected List<Container> makeRemoteRequest() throws YarnRemoteException {
+    ApplicationStatus status = recordFactory
+        .newRecordInstance(ApplicationStatus.class);
+    status.setApplicationId(applicationId);
+    status.setResponseId(lastResponseID);
+
+    AllocateRequest allocateRequest = recordFactory
+        .newRecordInstance(AllocateRequest.class);
+    allocateRequest.setApplicationStatus(status);
+    allocateRequest.addAllAsks(new ArrayList<ResourceRequest>(ask));
+    allocateRequest.addAllReleases(new ArrayList<Container>(release));
+    AllocateResponse allocateResponse = scheduler.allocate(allocateRequest);
+    AMResponse response = allocateResponse.getAMResponse();
+    lastResponseID = response.getResponseId();
+    List<Container> allContainers = response.getContainerList();
+
+    // Log before clearing, otherwise ask/release sizes would always read 0.
+    LOG.info("getResources() for " + applicationId + ":" + " ask="
+        + ask.size() + " release= " + release.size() + " recieved="
+        + allContainers.size());
+    ask.clear();
+    release.clear();
+    return allContainers;
+  }
+
+  /**
+   * Adds one container to the outstanding requests for every location named
+   * by {@code req}: each host, each rack, and {@link #ANY}.
+   */
+  protected void addContainerReq(ContainerRequestEvent req) {
+    // Data-local
+    for (String host : req.getHosts()) {
+      addResourceRequest(req.getPriority(), host, req.getCapability());
+    }
+
+    // Rack-local
+    for (String rack : req.getRacks()) {
+      addResourceRequest(req.getPriority(), rack, req.getCapability());
+    }
+
+    // Off-switch
+    addResourceRequest(req.getPriority(), ANY, req.getCapability());
+  }
+
+  /**
+   * Removes one container from the outstanding requests for every location
+   * named by {@code req}. This is the inverse of {@link #addContainerReq}.
+   */
+  protected void decContainerReq(ContainerRequestEvent req) {
+    for (String hostName : req.getHosts()) {
+      decResourceRequest(req.getPriority(), hostName, req.getCapability());
+    }
+
+    for (String rack : req.getRacks()) {
+      decResourceRequest(req.getPriority(), rack, req.getCapability());
+    }
+
+    decResourceRequest(req.getPriority(), ANY, req.getCapability());
+  }
+
+  /**
+   * Increments the container count for (priority, resourceName, capability),
+   * creating the table entries on first use, and queues the request in ask.
+   */
+  private void addResourceRequest(Priority priority, String resourceName,
+      Resource capability) {
+    Map<String, Map<Resource, ResourceRequest>> remoteRequests =
+        this.remoteRequestsTable.get(priority);
+    if (remoteRequests == null) {
+      remoteRequests = new HashMap<String, Map<Resource, ResourceRequest>>();
+      this.remoteRequestsTable.put(priority, remoteRequests);
+      LOG.info("Added priority=" + priority);
+    }
+    Map<Resource, ResourceRequest> reqMap = remoteRequests.get(resourceName);
+    if (reqMap == null) {
+      reqMap = new HashMap<Resource, ResourceRequest>();
+      remoteRequests.put(resourceName, reqMap);
+    }
+    ResourceRequest remoteRequest = reqMap.get(capability);
+    if (remoteRequest == null) {
+      remoteRequest = recordFactory.newRecordInstance(ResourceRequest.class);
+      remoteRequest.setPriority(priority);
+      remoteRequest.setHostName(resourceName);
+      remoteRequest.setCapability(capability);
+      remoteRequest.setNumContainers(0);
+      reqMap.put(capability, remoteRequest);
+    }
+    // Note this down for next interaction with ResourceManager
+    updateAsk(remoteRequest, remoteRequest.getNumContainers() + 1);
+
+    LOG.info("addResourceRequest:" + " applicationId=" + applicationId.getId()
+        + " priority=" + priority.getPriority() + " resourceName=" + resourceName
+        + " numContainers=" + remoteRequest.getNumContainers() + " #asks="
+        + ask.size());
+  }
+
+  /**
+   * Decrements the container count for (priority, resourceName, capability).
+   * When the count reaches zero, the entry is dropped from the local table.
+   * The updated request (including a zero count) is always queued in ask, so
+   * the next heartbeat overwrites whatever count was sent for this key before.
+   * Calls for a key that is not being tracked are logged and ignored.
+   */
+  private void decResourceRequest(Priority priority, String resourceName,
+      Resource capability) {
+    Map<String, Map<Resource, ResourceRequest>> remoteRequests =
+        this.remoteRequestsTable.get(priority);
+    Map<Resource, ResourceRequest> reqMap =
+        remoteRequests == null ? null : remoteRequests.get(resourceName);
+    ResourceRequest remoteRequest =
+        reqMap == null ? null : reqMap.get(capability);
+    if (remoteRequest == null) {
+      // Nothing outstanding for this key (never added, or already at zero).
+      LOG.warn("Ignoring decResourceRequest for unknown request:"
+          + " applicationId=" + applicationId.getId() + " priority="
+          + priority.getPriority() + " resourceName=" + resourceName);
+      return;
+    }
+
+    LOG.info("BEFORE decResourceRequest:" + " applicationId=" + applicationId.getId()
+        + " priority=" + priority.getPriority() + " resourceName=" + resourceName
+        + " numContainers=" + remoteRequest.getNumContainers() + " #asks="
+        + ask.size());
+
+    updateAsk(remoteRequest, remoteRequest.getNumContainers() - 1);
+    if (remoteRequest.getNumContainers() == 0) {
+      // Stop tracking the key locally. The zero count stays queued in ask so
+      // that any count requested in an earlier heartbeat is withdrawn.
+      reqMap.remove(capability);
+      if (reqMap.isEmpty()) {
+        remoteRequests.remove(resourceName);
+      }
+      if (remoteRequests.isEmpty()) {
+        remoteRequestsTable.remove(priority);
+      }
+    }
+
+    LOG.info("AFTER decResourceRequest:" + " applicationId="
+        + applicationId.getId() + " priority=" + priority.getPriority()
+        + " resourceName=" + resourceName + " numContainers="
+        + remoteRequest.getNumContainers() + " #asks=" + ask.size());
+  }
+
+  /**
+   * Sets a new container count on {@code remoteRequest} and (re)queues it in
+   * ask.
+   *
+   * <p>The request is taken out of the sorted set before its count changes
+   * and put back afterwards. This keeps the set consistent if the
+   * ResourceRequest ordering takes the count into account. It also displaces
+   * any queued entry that compares equal, for example a stale zero-count
+   * request for the same key, which TreeSet.add alone would keep.
+   */
+  private void updateAsk(ResourceRequest remoteRequest, int numContainers) {
+    ask.remove(remoteRequest);
+    remoteRequest.setNumContainers(numContainers);
+    ask.add(remoteRequest);
+  }
+
+  /** Queues {@code container} to be released to the RM on the next heartbeat. */
+  protected void release(Container container) {
+    release.add(container);
+  }
+
+}
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java?rev=1125273&r1=1125272&r2=1125273&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java Fri May 20 08:09:30 2011
@@ -33,12 +33,15 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerRequestEvent;
import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.yarn.Clock;
+import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.AMRMProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
@@ -175,7 +178,7 @@ public class TestRMContainerAllocator {
allocator.sendRequest(event1);
//send 1 more request with different priority
- ContainerRequestEvent event2 = createReq(2, 2048, 2, new String[]{"h1"});
+ ContainerRequestEvent event2 = createReq(2, 3000, 2, new String[]{"h1"});
allocator.sendRequest(event2);
//send 1 more request with different priority
@@ -256,8 +259,19 @@ public class TestRMContainerAllocator {
private ContainerRequestEvent createReq(
int attemptid, int memory, int priority, String[] hosts) {
+ ApplicationId appId = recordFactory.newRecordInstance(ApplicationId.class);
+ appId.setClusterTimestamp(0);
+ appId.setId(0);
+ JobId jobId = recordFactory.newRecordInstance(JobId.class);
+ jobId.setAppId(appId);
+ jobId.setId(0);
+ TaskId taskId = recordFactory.newRecordInstance(TaskId.class);
+ taskId.setId(0);
+ taskId.setJobId(jobId);
+ taskId.setTaskType(TaskType.MAP);
TaskAttemptId attemptId = recordFactory.newRecordInstance(TaskAttemptId.class);
attemptId.setId(attemptid);
+ attemptId.setTaskId(taskId);
Resource containerNeed = recordFactory.newRecordInstance(Resource.class);
containerNeed.setMemory(memory);
return new ContainerRequestEvent(attemptId,
@@ -384,7 +398,8 @@ public class TestRMContainerAllocator {
try {
heartbeat();
} catch (Exception e) {
-
+ LOG.error("error in heartbeat ", e);
+ throw new YarnException(e);
}
List<TaskAttemptContainerAssignedEvent> result = new ArrayList(events);