You are viewing a plain-text version of this content. The canonical link to it was a hyperlink ("here") that is not preserved in this plain-text rendering.
Posted to mapreduce-commits@hadoop.apache.org by sh...@apache.org on 2011/05/20 10:09:31 UTC

svn commit: r1125273 - in /hadoop/mapreduce/branches/MR-279: ./ mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ ...

Author: sharad
Date: Fri May 20 08:09:30 2011
New Revision: 1125273

URL: http://svn.apache.org/viewvc?rev=1125273&view=rev
Log:
Refactored RMContainerAllocator to release unused containers.

Added:
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java
Modified:
    hadoop/mapreduce/branches/MR-279/CHANGES.txt
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherEvent.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerRequestEvent.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java

Modified: hadoop/mapreduce/branches/MR-279/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/CHANGES.txt?rev=1125273&r1=1125272&r2=1125273&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/CHANGES.txt (original)
+++ hadoop/mapreduce/branches/MR-279/CHANGES.txt Fri May 20 08:09:30 2011
@@ -3,6 +3,7 @@ Hadoop MapReduce Change Log
 Trunk (unreleased changes)
 
   MAPREDUCE-279
+    Refactored RMContainerAllocator to release unused containers. (sharad) 
 
     Fix null pointer exception in kill task attempt (mahadev)
 

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherEvent.java?rev=1125273&r1=1125272&r2=1125273&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherEvent.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherEvent.java Fri May 20 08:09:30 2011
@@ -63,4 +63,52 @@ public class ContainerLauncherEvent 
   public String toString() {
     return super.toString() + " for taskAttempt " + taskAttemptID;
   }
+  // Hashes exactly the four fields compared by equals(); the event type is not included.
+  @Override
+  public int hashCode() {
+    final int prime = 31;
+    int result = 1;
+    result = prime * result
+        + ((containerID == null) ? 0 : containerID.hashCode());
+    result = prime * result
+        + ((containerMgrAddress == null) ? 0 : containerMgrAddress.hashCode());
+    result = prime * result
+        + ((containerToken == null) ? 0 : containerToken.hashCode());
+    result = prime * result
+        + ((taskAttemptID == null) ? 0 : taskAttemptID.hashCode());
+    return result;
+  }
+  // Ignores the event type; the getClass() check means a subclass never equals the base class.
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj)
+      return true;
+    if (obj == null)
+      return false;
+    if (getClass() != obj.getClass())
+      return false;
+    ContainerLauncherEvent other = (ContainerLauncherEvent) obj;
+    if (containerID == null) {
+      if (other.containerID != null)
+        return false;
+    } else if (!containerID.equals(other.containerID))
+      return false;
+    if (containerMgrAddress == null) {
+      if (other.containerMgrAddress != null)
+        return false;
+    } else if (!containerMgrAddress.equals(other.containerMgrAddress))
+      return false;
+    if (containerToken == null) {
+      if (other.containerToken != null)
+        return false;
+    } else if (!containerToken.equals(other.containerToken))
+      return false;
+    if (taskAttemptID == null) {
+      if (other.taskAttemptID != null)
+        return false;
+    } else if (!taskAttemptID.equals(other.taskAttemptID))
+      return false;
+    return true;
+  }
+
 }

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java?rev=1125273&r1=1125272&r2=1125273&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java Fri May 20 08:09:30 2011
@@ -36,6 +36,8 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.MRAppMasterConstants;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
+import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
+import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.SecurityInfo;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -208,7 +210,10 @@ public class ContainerLauncherImpl exten
         // and not yet processed
         if (eventQueue.contains(event)) {
           eventQueue.remove(event); // TODO: Any synchro needed?
-          // k: raise any event?
+          //deallocate the container
+          context.getEventHandler().handle(
+              new ContainerAllocatorEvent(event.getTaskAttemptID(),
+              ContainerAllocator.EventType.CONTAINER_DEALLOCATE));
         } else {
           try {
             ContainerManager proxy = 

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerRequestEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerRequestEvent.java?rev=1125273&r1=1125272&r2=1125273&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerRequestEvent.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerRequestEvent.java Fri May 20 08:09:30 2011
@@ -45,7 +45,7 @@ public class ContainerRequestEvent exten
   
   ContainerRequestEvent(TaskAttemptId attemptID, Resource capability, 
       int priority) {
-    this(attemptID, capability, priority, null, null);
+    this(attemptID, capability, priority, new String[0], new String[0]);
     this.earlierAttemptFailed = true;
   }
   

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java?rev=1125273&r1=1125272&r2=1125273&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java Fri May 20 08:09:30 2011
@@ -19,225 +19,107 @@
 package org.apache.hadoop.mapreduce.v2.app.rm;
 
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.TreeSet;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
-import org.apache.hadoop.yarn.api.records.AMResponse;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ApplicationMaster;
-import org.apache.hadoop.yarn.api.records.ApplicationState;
-import org.apache.hadoop.yarn.api.records.ApplicationStatus;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerState;
-import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.ResourceRequest;
-import org.apache.hadoop.yarn.factories.RecordFactory;
-import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 
 /**
  * Allocates the container from the ResourceManager scheduler.
  */
-public class RMContainerAllocator extends RMCommunicator
+public class RMContainerAllocator extends RMContainerRequestor
     implements ContainerAllocator {
 
   private static final Log LOG = LogFactory.getLog(RMContainerAllocator.class);
-  private static final String ANY = "*";
-  private int lastResponseID;
 
-  private final RecordFactory recordFactory =
-      RecordFactoryProvider.getRecordFactory(null);
-
-  //mapping for assigned containers
-  private final Map<ContainerId, TaskAttemptId> assignedMap =
-      new HashMap<ContainerId, TaskAttemptId>();
-
-  private final Map<Priority,
-  Map<Resource,LinkedList<ContainerRequestEvent>>> localRequestsQueue =
-      new TreeMap<Priority, Map<Resource,LinkedList<ContainerRequestEvent>>>();
-
-  //Key -> Priority
-  //Value -> Map
-  //Key->ResourceName (e.g., hostname, rackname, *)
-  //Value->Map
-  //Key->Resource Capability
-  //Value->ResourceReqeust
-  private final Map<Priority, Map<String, Map<Resource, ResourceRequest>>>
-  remoteRequestsTable =
-      new TreeMap<Priority, Map<String, Map<Resource, ResourceRequest>>>();
-
-  private final Set<ResourceRequest> ask = new TreeSet<ResourceRequest>();
-  private final Set<Container> release = new TreeSet<Container>();
+  //holds information about the assigned containers to task attempts
+  private final AssignedRequests assignedRequests = new AssignedRequests();
+  
+  //holds pending requests to be fulfilled by RM
+  private final PendingRequests pendingRequests = new PendingRequests();
+  
+  private int containersAllocated = 0;
+  private int mapsAssigned = 0;
+  private int reducesAssigned = 0;
+  private int containersReleased = 0;
+  private int hostLocalAssigned = 0;
+  private int rackLocalAssigned = 0;
 
   public RMContainerAllocator(ClientService clientService, AppContext context) {
     super(clientService, context);
   }
 
-  // TODO: Need finer synchronization.
   @Override
   protected synchronized void heartbeat() throws Exception {
-    assign(getResources());
+    List<Container> allocatedContainers = getResources();
+    if (allocatedContainers.size() > 0) {
+      LOG.info("Before Assign: " + getStat());
+      pendingRequests.assign(allocatedContainers);
+      LOG.info("After Assign: " + getStat());
+    }
+  }
+
+  @Override
+  public void stop() {
+    super.stop();
+    LOG.info("Final Stats: " + getStat());
   }
 
   @Override
   public synchronized void handle(ContainerAllocatorEvent event) {
     LOG.info("Processing the event " + event.toString());
-    //TODO: can be replaced by switch instead of if-else
     if (event.getType() == ContainerAllocator.EventType.CONTAINER_REQ) {
-      requestContainer((ContainerRequestEvent) event);
+      pendingRequests.add((ContainerRequestEvent) event);
     } else if (
         event.getType() == ContainerAllocator.EventType.CONTAINER_DEALLOCATE) {
-      //TODO: handle deallocation
-    }
-  }
-
-  protected synchronized void requestContainer(ContainerRequestEvent event) {
-    //add to the localRequestsQueue
-    //localRequests Queue is hashed by Resource and Priority for easy lookups
-    Map<Resource, LinkedList<ContainerRequestEvent>> eventMap =
-      this.localRequestsQueue.get(event.getPriority());
-    if (eventMap == null) {
-      eventMap = new HashMap<Resource, LinkedList<ContainerRequestEvent>>();
-      this.localRequestsQueue.put(event.getPriority(), eventMap);
-    }
-
-    LinkedList<ContainerRequestEvent> eventList =
-      eventMap.get(event.getCapability());
-    if (eventList == null) {
-      eventList = new LinkedList<ContainerRequestEvent>();
-      eventMap.put(event.getCapability(), eventList);
-    }
-    eventList.add(event);
-
-    if (event.getEarlierAttemptFailed()) {
-      addResourceRequest(event.getPriority(), ANY, event.getCapability());  
-    } else {
-
-      // Create resource requests
-      for (String host : event.getHosts()) {
-        // Data-local
-        addResourceRequest(event.getPriority(), host, event.getCapability());
+      TaskAttemptId aId = event.getAttemptID();
+      
+      boolean removed = pendingRequests.remove(aId);
+      if (!removed) {
+        Container container = assignedRequests.get(aId);
+        if (container != null) {
+          removed = true;
+          assignedRequests.remove(aId);
+          containersReleased++;
+          release(container);
+        }
       }
-
-      // Nothing Rack-local for now
-      for (String rack : event.getRacks()) {
-        addResourceRequest(event.getPriority(), rack, event.getCapability());
+      if (!removed) {
+        LOG.error("Could not deallocate container for task attemptId " + 
+            aId);
       }
-
-      // Off-switch
-      addResourceRequest(event.getPriority(), ANY, event.getCapability());
     }
   }
 
-  private void addResourceRequest(Priority priority, String resourceName,
-      Resource capability) {
-    Map<String, Map<Resource, ResourceRequest>> remoteRequests =
-      this.remoteRequestsTable.get(priority);
-    if (remoteRequests == null) {
-      remoteRequests = new HashMap<String, Map<Resource, ResourceRequest>>();
-      this.remoteRequestsTable.put(priority, remoteRequests);
-      LOG.info("Added priority=" + priority);
-    }
-    Map<Resource, ResourceRequest> reqMap = remoteRequests.get(resourceName);
-    if (reqMap == null) {
-      reqMap = new HashMap<Resource, ResourceRequest>();
-      remoteRequests.put(resourceName, reqMap);
-    }
-    ResourceRequest remoteRequest = reqMap.get(capability);
-    if (remoteRequest == null) {
-      remoteRequest = recordFactory.newRecordInstance(ResourceRequest.class);
-      remoteRequest.setPriority(priority);
-      remoteRequest.setHostName(resourceName);
-      remoteRequest.setCapability(capability);
-      remoteRequest.setNumContainers(0);
-      reqMap.put(capability, remoteRequest);
-    }
-    remoteRequest.setNumContainers(remoteRequest.getNumContainers() + 1);
-
-    // Note this down for next interaction with ResourceManager
-    ask.add(remoteRequest);
-    LOG.info("addResourceRequest:" + " applicationId=" + applicationId.getId()
-        + " priority=" + priority.getPriority() + " resourceName=" + resourceName
-        + " numContainers=" + remoteRequest.getNumContainers() + " #asks="
-        + ask.size());
+  private String getStat() {
+    return "PendingMaps:" + pendingRequests.maps.size() +
+        " PendingReduces:" + pendingRequests.reduces.size() +
+        " containersAllocated:" + containersAllocated +
+        " mapsAssigned:" + mapsAssigned + 
+        " reducesAssigned:" + reducesAssigned + 
+        " containersReleased:" + containersReleased +
+        " hostLocalAssigned:" + hostLocalAssigned + 
+        " rackLocalAssigned:" + rackLocalAssigned;
   }
-
-  private void decResourceRequest(Priority priority, String resourceName,
-      Resource capability) {
-    Map<String, Map<Resource, ResourceRequest>> remoteRequests =
-      this.remoteRequestsTable.get(priority);
-    Map<Resource, ResourceRequest> reqMap = remoteRequests.get(resourceName);
-    ResourceRequest remoteRequest = reqMap.get(capability);
-
-    LOG.info("BEFORE decResourceRequest:" + " applicationId=" + applicationId.getId()
-        + " priority=" + priority.getPriority() + " resourceName=" + resourceName
-        + " numContainers=" + remoteRequest.getNumContainers() + " #asks="
-        + ask.size());
-
-    remoteRequest.setNumContainers(remoteRequest.getNumContainers() -1);
-    if (remoteRequest.getNumContainers() == 0) {
-      reqMap.remove(capability);
-      if (reqMap.size() == 0) {
-        remoteRequests.remove(resourceName);
-      }
-      if (remoteRequests.size() == 0) {
-        remoteRequestsTable.remove(priority);
-      }
-      //remove from ask if it may have
-      ask.remove(remoteRequest);
-    } else {
-      ask.add(remoteRequest);//this will override the request if ask doesn't
-      //already have it.
-    }
-
-    LOG.info("AFTER decResourceRequest:" + " applicationId="
-             + applicationId.getId() + " priority=" + priority.getPriority()
-             + " resourceName=" + resourceName + " numContainers="
-             + remoteRequest.getNumContainers() + " #asks=" + ask.size());
-  }
-
+  
   private List<Container> getResources() throws Exception {
-    ApplicationStatus status =
-        recordFactory.newRecordInstance(ApplicationStatus.class);
-    status.setApplicationId(applicationId);
-    status.setResponseId(lastResponseID);
-
-    AllocateRequest allocateRequest =
-        recordFactory.newRecordInstance(AllocateRequest.class);
-    allocateRequest.setApplicationStatus(status);
-    allocateRequest.addAllAsks(new ArrayList<ResourceRequest>(ask));
-    allocateRequest.addAllReleases(new ArrayList<Container>(release));
-    AllocateResponse allocateResponse = scheduler.allocate(allocateRequest);
-    AMResponse response = allocateResponse.getAMResponse();
-    lastResponseID = response.getResponseId();
-    List<Container> allContainers = response.getContainerList();
-    ask.clear();
-    release.clear();
-
-    LOG.info("getResources() for " + applicationId + ":" +
-        " ask=" + ask.size() +
-        " release= "+ release.size() +
-        " recieved=" + allContainers.size());
+    List<Container> allContainers = makeRemoteRequest();
     List<Container> allocatedContainers = new ArrayList<Container>();
     for (Container cont : allContainers) {
       if (cont.getState() != ContainerState.COMPLETE) {
@@ -245,11 +127,12 @@ public class RMContainerAllocator extend
         LOG.debug("Received Container :" + cont);
       } else {
         LOG.info("Received completed container " + cont);
-        TaskAttemptId attemptID = assignedMap.remove(cont.getId());
+        TaskAttemptId attemptID = assignedRequests.get(cont.getId());
         if (attemptID == null) {
           LOG.error("Container complete event for unknown container id " +
               cont.getId());
         } else {
+          assignedRequests.remove(attemptID);
           //send the container completed event to Task attempt
           eventHandler.handle(new TaskAttemptEvent(attemptID,
               TaskAttemptEventType.TA_CONTAINER_COMPLETED));
@@ -260,90 +143,220 @@ public class RMContainerAllocator extend
     return allocatedContainers;
   }
 
-  private void assign(List<Container> allocatedContainers) {
-    // Schedule in priority order
-    for (Priority priority : localRequestsQueue.keySet()) {
-      LOG.info("Assigning for priority " + priority);
-      assign(priority, allocatedContainers);
-      if (allocatedContainers.isEmpty()) {
-        break;
+  private class PendingRequests {
+    
+    private Resource mapResourceReqt;
+    private Resource reduceResourceReqt;
+    
+    private final LinkedList<TaskAttemptId> earlierFailedMaps = 
+      new LinkedList<TaskAttemptId>();
+    private final LinkedList<TaskAttemptId> earlierFailedReduces = 
+      new LinkedList<TaskAttemptId>();
+    
+    private final Map<String, LinkedList<TaskAttemptId>> mapsHostMapping = 
+      new HashMap<String, LinkedList<TaskAttemptId>>();
+    private final Map<String, LinkedList<TaskAttemptId>> mapsRackMapping = 
+      new HashMap<String, LinkedList<TaskAttemptId>>();
+    private final Map<TaskAttemptId, ContainerRequestEvent> maps = 
+      new LinkedHashMap<TaskAttemptId, ContainerRequestEvent>();
+    
+    private final Map<TaskAttemptId, ContainerRequestEvent> reduces = 
+      new LinkedHashMap<TaskAttemptId, ContainerRequestEvent>();
+    
+    boolean remove(TaskAttemptId tId) {
+      ContainerRequestEvent req = maps.remove(tId);
+      if (req == null) {
+        req = reduces.remove(tId);
+      }
+      if (req == null) {
+        return false;
+      } else {
+        decContainerReq(req);
+        return true;
       }
     }
-
-    if (!allocatedContainers.isEmpty()) {
-      //TODO
-      //after the assigment, still containers are left
-      //This can happen if container requests are cancelled by AM, currently
-      //not there. release the unassigned containers??
-
-      //LOG.info("Releasing container " + allocatedContainer);
-      //release.add(allocatedContainer);
-    }
-  }
-
-  private void assign(Priority priority, List<Container> allocatedContainers) {
-    for (Iterator<Container> i=allocatedContainers.iterator(); i.hasNext();) {
-      Container allocatedContainer = i.next();
-      String host = allocatedContainer.getContainerManagerAddress();
-      Resource capability = allocatedContainer.getResource();
-
-      LinkedList<ContainerRequestEvent> requestList =
-        localRequestsQueue.get(priority).get(capability);
-
-      if (requestList == null) {
-        LOG.info("No request match at priority " + priority);
-        return;
+    
+    void add(ContainerRequestEvent event) {
+      
+      if (event.getAttemptID().getTaskId().getTaskType() == TaskType.MAP) {
+        if (mapResourceReqt == null) {
+          mapResourceReqt = event.getCapability();
+        }
+        maps.put(event.getAttemptID(), event);
+        
+        if (event.getEarlierAttemptFailed()) {
+          earlierFailedMaps.add(event.getAttemptID());
+        } else {
+          for (String host : event.getHosts()) {
+            LinkedList<TaskAttemptId> list = mapsHostMapping.get(host);
+            if (list == null) {
+              list = new LinkedList<TaskAttemptId>();
+              mapsHostMapping.put(host, list);
+            }
+            list.add(event.getAttemptID());
+            LOG.info("Added attempt req to host " + host);
+         }
+         for (String rack: event.getRacks()) {
+           LinkedList<TaskAttemptId> list = mapsRackMapping.get(rack);
+           if (list == null) {
+             list = new LinkedList<TaskAttemptId>();
+             mapsRackMapping.put(rack, list);
+           }
+           list.add(event.getAttemptID());
+           LOG.info("Added attempt req to rack " + rack);
+         }
+        }
+      } else {//reduce
+        if (reduceResourceReqt == null) {
+          reduceResourceReqt = event.getCapability();
+        }
+        
+        if (event.getEarlierAttemptFailed()) {
+          earlierFailedReduces.add(event.getAttemptID());
+        }
+        reduces.put(event.getAttemptID(), event);
       }
-
-      ContainerRequestEvent assigned = null;
-      //walk thru the requestList to see if in any host matches
-      Iterator<ContainerRequestEvent> it = requestList.iterator();
+      
+      addContainerReq(event);
+    }
+    
+    private void assign(List<Container> allocatedContainers) {
+      Iterator<Container> it = allocatedContainers.iterator();
+      LOG.info("Got allocated containers " + allocatedContainers.size());
+      containersAllocated += allocatedContainers.size();
       while (it.hasNext()) {
-        ContainerRequestEvent event = it.next();
-        if (event.getEarlierAttemptFailed()) {
-          // we want to fail fast. Ignore locality for rescheduling
-          // failed attempts.
-          assigned = event;
-          it.remove();
-          break;
-        }
-        if (Arrays.asList(event.getHosts()).contains(host)) { // TODO: Fix
-          assigned = event;
-          it.remove();
-          // Update resource requests
-          for (String hostName : event.getHosts()) {
-            decResourceRequest(priority, hostName, capability);
+        Container allocated = it.next();
+        ContainerRequestEvent assigned = null;
+        LOG.info("Assiging container " + allocated);
+        //Match this container to at most one request; if none matches it is released.
+        //try to assign to earlierFailedMaps if present
+        while (assigned == null && earlierFailedMaps.size() > 0 && 
+            allocated.getResource().getMemory() >= mapResourceReqt.getMemory()) {
+          TaskAttemptId tId = earlierFailedMaps.removeFirst();
+          if (maps.containsKey(tId)) {
+            assigned = maps.remove(tId);
+            mapsAssigned++;
+            LOG.info("Assigned from earlierFailedMaps");
           }
-          break;
         }
-      }
-      if (assigned == null) {//host didn't match
-        if (requestList.size() > 0) {
-          //choose the first one in queue
-          assigned = requestList.remove();
+        
+        //try to assign to earlierFailedReduces if present
+        while (assigned == null && earlierFailedReduces.size() > 0 && 
+            allocated.getResource().getMemory() >= reduceResourceReqt.getMemory()) {
+          TaskAttemptId tId = earlierFailedReduces.removeFirst();
+          if (reduces.containsKey(tId)) {
+            assigned = reduces.remove(tId);
+            reducesAssigned++;
+            LOG.info("Assigned from earlierFailedReduces");
+          }
         }
-      }
+        //try to assign to maps if present,
+        //first by host, then by rack, followed by *.
+        //Each locality loop stops at its first match, otherwise matched maps would be lost.
+        while (assigned == null && maps.size() > 0
+            && allocated.getResource().getMemory() >= mapResourceReqt.getMemory()) {
+          String host = allocated.getContainerManagerAddress();
+          String[] hostport = host.split(":");
+          if (hostport.length == 2) {
+            host = hostport[0];
+          }
+          LinkedList<TaskAttemptId> list = mapsHostMapping.get(host);
+          while (assigned == null && list != null && list.size() > 0) {
+            LOG.info("Host matched to the request list " + host);
+            TaskAttemptId tId = list.removeFirst();
+            if (maps.containsKey(tId)) {
+              assigned = maps.remove(tId);
+              mapsAssigned++;
+              hostLocalAssigned++;
+              LOG.info("Assigned based on host match " + host);
+            }
+          }
+          if (assigned == null) {
+            // TODO: get rack
+            String rack = "";
+            list = mapsRackMapping.get(rack);
+            while (assigned == null && list != null && list.size() > 0) {
+              TaskAttemptId tId = list.removeFirst();
+              if (maps.containsKey(tId)) {
+                assigned = maps.remove(tId);
+                mapsAssigned++;
+                rackLocalAssigned++;
+                LOG.info("Assigned based on rack match " + rack);
+              }
+            }
+            if (assigned == null && maps.size() > 0) {
+              TaskAttemptId tId = maps.keySet().iterator().next();
+              assigned = maps.remove(tId);
+              mapsAssigned++;
+              LOG.info("Assigned based on * match");
+            }
+          }
+        }
+        
+        //try to assign to reduces if present
+        if (assigned == null && reduces.size() > 0
+            && allocated.getResource().getMemory() >= reduceResourceReqt.getMemory()) {
+          TaskAttemptId tId = reduces.keySet().iterator().next();
+          assigned = reduces.remove(tId);
+          reducesAssigned++;
+          LOG.info("Assigned to reduce");
+        }
+        
+        if (assigned != null) {
+          
+          // Update resource requests
+          decContainerReq(assigned);
 
-      if (assigned != null) {
-        i.remove(); // Remove from allocated Containers list also.
+          // send the container-assigned event to task attempt
+          eventHandler.handle(new TaskAttemptContainerAssignedEvent(
+              assigned.getAttemptID(), allocated.getId(),
+              allocated.getContainerManagerAddress(),
+              allocated.getNodeHttpAddress(),
+              allocated.getContainerToken()));
+
+          assignedRequests.add(allocated, assigned.getAttemptID());
+          
+          LOG.info("Assigned container (" + allocated + ") " +
+              " to task " + assigned.getAttemptID() +
+              " on node " + allocated.getContainerManagerAddress());
+        } else {
+          //not assigned to any request, release the container
+          LOG.info("Releasing unassigned container " + allocated);
+          containersReleased++;
+          release(allocated);
+        }
+      }
+    }
+  }
 
-        // Update resource requests
-        decResourceRequest(priority, ANY, capability);
+  private static class AssignedRequests {
+    private final Map<ContainerId, TaskAttemptId> containerToAttemptMap =
+      new HashMap<ContainerId, TaskAttemptId>();
+    private final Map<TaskAttemptId, Container> attemptToContainerMap = 
+      new HashMap<TaskAttemptId, Container>();
+    
+    void add(Container container, TaskAttemptId tId) {
+      LOG.info("Assigned container " + container.getContainerManagerAddress() 
+          + " to " + tId);
+      containerToAttemptMap.put(container.getId(), tId);
+      attemptToContainerMap.put(tId, container);
+    }
 
-        // send the container-assigned event to task attempt
-        eventHandler.handle(new TaskAttemptContainerAssignedEvent(
-            assigned.getAttemptID(), allocatedContainer.getId(),
-            allocatedContainer.getContainerManagerAddress(),
-            allocatedContainer.getNodeHttpAddress(),
-            allocatedContainer.getContainerToken()));
-
-        assignedMap.put(allocatedContainer.getId(), assigned.getAttemptID());
-
-        LOG.info("Assigned container (" + allocatedContainer + ") " +
-            " to task " + assigned.getAttemptID() + " at priority " + priority +
-            " on node " + allocatedContainer.getContainerManagerAddress());
+    boolean remove(TaskAttemptId tId) {
+      Container container = attemptToContainerMap.remove(tId);
+      if (container != null) {
+        containerToAttemptMap.remove(container.getId());
+        return true;
       }
+      return false;
+    }
+    
+    TaskAttemptId get(ContainerId cId) {
+      return containerToAttemptMap.get(cId);
     }
-  }
 
+    Container get(TaskAttemptId tId) {
+      return attemptToContainerMap.get(tId);
+    }
+  }
 }

Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java?rev=1125273&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java Fri May 20 08:09:30 2011
@@ -0,0 +1,200 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app.rm;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapreduce.v2.app.AppContext;
+import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.records.AMResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationStatus;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+
+/**
+ * Keeps the data structures to send container requests to RM.
+ *
+ * <p>Outstanding requests are kept in a table keyed by priority, resource
+ * name and capability. Every request whose count changes is queued in
+ * {@code ask}, and containers to hand back are queued in {@code release}.
+ * {@link #makeRemoteRequest()} sends both queues to the RM and then clears them.
+ */
+public abstract class RMContainerRequestor extends RMCommunicator {
+
+  private static final Log LOG = LogFactory.getLog(RMContainerRequestor.class);
+  /** Resource name used for off-switch (any host) requests. */
+  static final String ANY = "*";
+
+  /** Response id of the last allocate response; sent with the next request. */
+  private int lastResponseID;
+
+  private final RecordFactory recordFactory =
+      RecordFactoryProvider.getRecordFactory(null);
+  //Key -> Priority
+  //Value -> Map
+  //Key->ResourceName (e.g., hostname, rackname, *)
+  //Value->Map
+  //Key->Resource Capability
+  //Value->ResourceRequest
+  private final Map<Priority, Map<String, Map<Resource, ResourceRequest>>>
+  remoteRequestsTable =
+      new TreeMap<Priority, Map<String, Map<Resource, ResourceRequest>>>();
+
+  /** Requests whose count changed since the last allocate call. */
+  private final Set<ResourceRequest> ask = new TreeSet<ResourceRequest>();
+  /** Containers to hand back to the RM on the next allocate call. */
+  private final Set<Container> release = new TreeSet<Container>();
+
+  public RMContainerRequestor(ClientService clientService, AppContext context) {
+    super(clientService, context);
+  }
+
+  /** Subclass hook for one heartbeat exchange with the RM. */
+  protected abstract void heartbeat() throws Exception;
+
+  /**
+   * Sends all pending asks and releases to the RM in one allocate call and
+   * returns the containers the RM handed out in response.
+   *
+   * <p>The pending sets are cleared only after {@code allocate} returns, so if
+   * the call throws they are kept and resent on the next invocation.
+   *
+   * @return the containers allocated by the RM in this round
+   * @throws YarnRemoteException if the allocate call fails
+   */
+  protected List<Container> makeRemoteRequest() throws YarnRemoteException {
+    ApplicationStatus status = recordFactory
+        .newRecordInstance(ApplicationStatus.class);
+    status.setApplicationId(applicationId);
+    status.setResponseId(lastResponseID);
+
+    AllocateRequest allocateRequest = recordFactory
+        .newRecordInstance(AllocateRequest.class);
+    allocateRequest.setApplicationStatus(status);
+    allocateRequest.addAllAsks(new ArrayList<ResourceRequest>(ask));
+    allocateRequest.addAllReleases(new ArrayList<Container>(release));
+    AllocateResponse allocateResponse = scheduler.allocate(allocateRequest);
+    AMResponse response = allocateResponse.getAMResponse();
+    lastResponseID = response.getResponseId();
+    List<Container> allContainers = response.getContainerList();
+
+    // Log what was actually sent *before* clearing the pending sets;
+    // logging afterwards would always report ask=0 and release=0.
+    LOG.info("getResources() for " + applicationId + ":" + " ask="
+        + ask.size() + " release= " + release.size() + " recieved="
+        + allContainers.size());
+
+    ask.clear();
+    release.clear();
+    return allContainers;
+  }
+
+  /**
+   * Asks for one more container for {@code req}. The request is recorded
+   * once for each of its hosts, once for each of its racks, and once
+   * off-switch ({@link #ANY}), all at the request's priority and capability.
+   */
+  protected void addContainerReq(ContainerRequestEvent req) {
+    // Create resource requests
+    for (String host : req.getHosts()) {
+      // Data-local
+      addResourceRequest(req.getPriority(), host, req.getCapability());
+    }
+
+    // Rack-local
+    for (String rack : req.getRacks()) {
+      addResourceRequest(req.getPriority(), rack, req.getCapability());
+    }
+
+    // Off-switch
+    addResourceRequest(req.getPriority(), ANY, req.getCapability());
+  }
+
+  /**
+   * Undoes one {@link #addContainerReq} for {@code req}. This decrements the
+   * host, rack and off-switch entries that the add incremented.
+   */
+  protected void decContainerReq(ContainerRequestEvent req) {
+    // Update resource requests
+    for (String hostName : req.getHosts()) {
+      decResourceRequest(req.getPriority(), hostName, req.getCapability());
+    }
+
+    for (String rack : req.getRacks()) {
+      decResourceRequest(req.getPriority(), rack, req.getCapability());
+    }
+
+    decResourceRequest(req.getPriority(), ANY, req.getCapability());
+  }
+
+  /**
+   * Increments the container count for (priority, resourceName, capability).
+   * The entry is created on first use, and the updated request is queued
+   * for the next heartbeat.
+   */
+  private void addResourceRequest(Priority priority, String resourceName,
+      Resource capability) {
+    Map<String, Map<Resource, ResourceRequest>> remoteRequests =
+      this.remoteRequestsTable.get(priority);
+    if (remoteRequests == null) {
+      remoteRequests = new HashMap<String, Map<Resource, ResourceRequest>>();
+      this.remoteRequestsTable.put(priority, remoteRequests);
+      LOG.info("Added priority=" + priority);
+    }
+    Map<Resource, ResourceRequest> reqMap = remoteRequests.get(resourceName);
+    if (reqMap == null) {
+      reqMap = new HashMap<Resource, ResourceRequest>();
+      remoteRequests.put(resourceName, reqMap);
+    }
+    ResourceRequest remoteRequest = reqMap.get(capability);
+    if (remoteRequest == null) {
+      remoteRequest = recordFactory.newRecordInstance(ResourceRequest.class);
+      remoteRequest.setPriority(priority);
+      remoteRequest.setHostName(resourceName);
+      remoteRequest.setCapability(capability);
+      remoteRequest.setNumContainers(0);
+      reqMap.put(capability, remoteRequest);
+    }
+    remoteRequest.setNumContainers(remoteRequest.getNumContainers() + 1);
+
+    // Note this down for next interaction with ResourceManager
+    addResourceRequestToAsk(remoteRequest);
+    LOG.info("addResourceRequest:" + " applicationId=" + applicationId.getId()
+        + " priority=" + priority.getPriority() + " resourceName=" + resourceName
+        + " numContainers=" + remoteRequest.getNumContainers() + " #asks="
+        + ask.size());
+  }
+
+  /**
+   * Decrements the container count for (priority, resourceName, capability)
+   * and queues the updated count for the next heartbeat. Entries that reach
+   * zero are pruned from the table. If no outstanding request exists for
+   * the key, a warning is logged and nothing changes.
+   */
+  private void decResourceRequest(Priority priority, String resourceName,
+      Resource capability) {
+    Map<String, Map<Resource, ResourceRequest>> remoteRequests =
+      this.remoteRequestsTable.get(priority);
+    Map<Resource, ResourceRequest> reqMap =
+        remoteRequests == null ? null : remoteRequests.get(resourceName);
+    ResourceRequest remoteRequest =
+        reqMap == null ? null : reqMap.get(capability);
+    if (remoteRequest == null) {
+      // A decrement without a matching outstanding request; report it
+      // instead of failing with a NullPointerException.
+      LOG.warn("Not decrementing resource request:" + " applicationId="
+          + applicationId.getId() + " priority=" + priority.getPriority()
+          + " resourceName=" + resourceName
+          + " as no outstanding request was found");
+      return;
+    }
+
+    LOG.info("BEFORE decResourceRequest:" + " applicationId=" + applicationId.getId()
+        + " priority=" + priority.getPriority() + " resourceName=" + resourceName
+        + " numContainers=" + remoteRequest.getNumContainers() + " #asks="
+        + ask.size());
+
+    remoteRequest.setNumContainers(remoteRequest.getNumContainers() -1);
+    if (remoteRequest.getNumContainers() == 0) {
+      reqMap.remove(capability);
+      if (reqMap.size() == 0) {
+        remoteRequests.remove(resourceName);
+      }
+      if (remoteRequests.size() == 0) {
+        remoteRequestsTable.remove(priority);
+      }
+    }
+    // Always send the updated count, zero included. Decrements to a non-zero
+    // count are sent, so the final decrement must be sent as well; otherwise
+    // a count the RM already received is never cancelled. This assumes the RM
+    // replaces its stored request for a key with each one it receives.
+    // NOTE(review): confirm against the RM scheduler.
+    addResourceRequestToAsk(remoteRequest);
+
+    LOG.info("AFTER decResourceRequest:" + " applicationId="
+             + applicationId.getId() + " priority=" + priority.getPriority()
+             + " resourceName=" + resourceName + " numContainers="
+             + remoteRequest.getNumContainers() + " #asks=" + ask.size());
+  }
+
+  /**
+   * Queues {@code remoteRequest} for the next heartbeat. Any queued entry
+   * that compares equal to it in {@code ask} is replaced.
+   */
+  private void addResourceRequestToAsk(ResourceRequest remoteRequest) {
+    // A pruned zero-count request object may still be queued when a new object
+    // for the same key is created. Remove first so the newest object wins;
+    // TreeSet.add would otherwise keep the stale one.
+    ask.remove(remoteRequest);
+    ask.add(remoteRequest);
+  }
+
+  /** Queues {@code container} to be handed back to the RM on the next heartbeat. */
+  protected void release(Container container) {
+    release.add(container);
+  }
+
+}

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java?rev=1125273&r1=1125272&r2=1125273&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java Fri May 20 08:09:30 2011
@@ -33,12 +33,15 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent;
 import org.apache.hadoop.mapreduce.v2.app.rm.ContainerRequestEvent;
 import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.yarn.Clock;
+import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.api.AMRMProtocol;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
@@ -175,7 +178,7 @@ public class TestRMContainerAllocator {
     allocator.sendRequest(event1);
 
     //send 1 more request with different priority
-    ContainerRequestEvent event2 = createReq(2, 2048, 2, new String[]{"h1"});
+    ContainerRequestEvent event2 = createReq(2, 3000, 2, new String[]{"h1"});
     allocator.sendRequest(event2);
 
     //send 1 more request with different priority
@@ -256,8 +259,19 @@ public class TestRMContainerAllocator {
 
   private ContainerRequestEvent createReq(
       int attemptid, int memory, int priority, String[] hosts) {
+    ApplicationId appId = recordFactory.newRecordInstance(ApplicationId.class);
+    appId.setClusterTimestamp(0);
+    appId.setId(0);
+    JobId jobId = recordFactory.newRecordInstance(JobId.class);
+    jobId.setAppId(appId);
+    jobId.setId(0);
+    TaskId taskId = recordFactory.newRecordInstance(TaskId.class);
+    taskId.setId(0);
+    taskId.setJobId(jobId);
+    taskId.setTaskType(TaskType.MAP);
     TaskAttemptId attemptId = recordFactory.newRecordInstance(TaskAttemptId.class);
     attemptId.setId(attemptid);
+    attemptId.setTaskId(taskId);
     Resource containerNeed = recordFactory.newRecordInstance(Resource.class);
     containerNeed.setMemory(memory);
     return new ContainerRequestEvent(attemptId, 
@@ -384,7 +398,8 @@ public class TestRMContainerAllocator {
       try {
         heartbeat();
       } catch (Exception e) {
-
+        LOG.error("error in heartbeat ", e);
+        throw new YarnException(e);
       }
 
       List<TaskAttemptContainerAssignedEvent> result = new ArrayList(events);