You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by vi...@apache.org on 2014/06/20 07:04:02 UTC

svn commit: r1604086 - in /hadoop/common/branches/HDFS-5442/hadoop-mapreduce-project: ./ dev-support/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ hadoop-mapreduce-client/hadoop-mapreduce-clie...

Author: vinayakumarb
Date: Fri Jun 20 05:03:56 2014
New Revision: 1604086

URL: http://svn.apache.org/r1604086
Log:
Merged latest changes from trunk

Added:
    hadoop/common/branches/HDFS-5442/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/
      - copied from r1604085, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/
    hadoop/common/branches/HDFS-5442/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/InputSplitWithLocationInfo.java
      - copied unchanged from r1604085, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/InputSplitWithLocationInfo.java
    hadoop/common/branches/HDFS-5442/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/SplitLocationInfo.java
      - copied unchanged from r1604085, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/SplitLocationInfo.java
Removed:
    hadoop/common/branches/HDFS-5442/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java
Modified:
    hadoop/common/branches/HDFS-5442/hadoop-mapreduce-project/   (props changed)
    hadoop/common/branches/HDFS-5442/hadoop-mapreduce-project/CHANGES.txt   (contents, props changed)
    hadoop/common/branches/HDFS-5442/hadoop-mapreduce-project/dev-support/findbugs-exclude.xml
    hadoop/common/branches/HDFS-5442/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
    hadoop/common/branches/HDFS-5442/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java
    hadoop/common/branches/HDFS-5442/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java
    hadoop/common/branches/HDFS-5442/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileSplit.java
    hadoop/common/branches/HDFS-5442/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/InputSplit.java
    hadoop/common/branches/HDFS-5442/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
    hadoop/common/branches/HDFS-5442/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java
    hadoop/common/branches/HDFS-5442/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileSplit.java
    hadoop/common/branches/HDFS-5442/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml   (contents, props changed)
    hadoop/common/branches/HDFS-5442/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileInputFormat.java
    hadoop/common/branches/HDFS-5442/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java

Propchange: hadoop/common/branches/HDFS-5442/hadoop-mapreduce-project/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-mapreduce-project:r1603355-1604085

Modified: hadoop/common/branches/HDFS-5442/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5442/hadoop-mapreduce-project/CHANGES.txt?rev=1604086&r1=1604085&r2=1604086&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5442/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/HDFS-5442/hadoop-mapreduce-project/CHANGES.txt Fri Jun 20 05:03:56 2014
@@ -213,6 +213,12 @@ Release 2.5.0 - UNRELEASED
     MAPREDUCE-5834. Increased test-timeouts in TestGridMixClasses to avoid
     occassional failures. (Mit Desai via vinodkv)
 
+    MAPREDUCE-5896. InputSplits should indicate which locations have the block 
+    cached in memory. (Sandy Ryza via kasha)
+
+    MAPREDUCE-5844. Add a configurable delay to reducer-preemption. 
+    (Maysam Yabandeh via kasha)
+
   OPTIMIZATIONS
 
   BUG FIXES 

Propchange: hadoop/common/branches/HDFS-5442/hadoop-mapreduce-project/CHANGES.txt
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt:r1603355-1604085

Modified: hadoop/common/branches/HDFS-5442/hadoop-mapreduce-project/dev-support/findbugs-exclude.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5442/hadoop-mapreduce-project/dev-support/findbugs-exclude.xml?rev=1604086&r1=1604085&r2=1604086&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5442/hadoop-mapreduce-project/dev-support/findbugs-exclude.xml (original)
+++ hadoop/common/branches/HDFS-5442/hadoop-mapreduce-project/dev-support/findbugs-exclude.xml Fri Jun 20 05:03:56 2014
@@ -475,8 +475,8 @@
    <Match>
      <Class name="org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator" />
      <Or>
-      <Field name="mapResourceReqt" />
-      <Field name="reduceResourceReqt" />
+      <Field name="mapResourceRequest" />
+      <Field name="reduceResourceRequest" />
       <Field name="maxReduceRampupLimit" />
       <Field name="reduceSlowStart" />
      </Or>

Modified: hadoop/common/branches/HDFS-5442/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5442/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java?rev=1604086&r1=1604085&r2=1604086&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5442/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java (original)
+++ hadoop/common/branches/HDFS-5442/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java Fri Jun 20 05:03:56 2014
@@ -73,6 +73,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.client.api.NMTokenCache;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.RackResolver;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -143,15 +144,21 @@ public class RMContainerAllocator extend
   private int lastCompletedTasks = 0;
   
   private boolean recalculateReduceSchedule = false;
-  private int mapResourceReqt;//memory
-  private int reduceResourceReqt;//memory
+  private int mapResourceRequest;//memory
+  private int reduceResourceRequest;//memory
   
   private boolean reduceStarted = false;
   private float maxReduceRampupLimit = 0;
   private float maxReducePreemptionLimit = 0;
+  /**
+   * after this threshold, if the container request is not allocated, it is
+   * considered delayed.
+   */
+  private long allocationDelayThresholdMs = 0;
   private float reduceSlowStart = 0;
   private long retryInterval;
   private long retrystartTime;
+  private Clock clock;
 
   private final AMPreemptionPolicy preemptionPolicy;
 
@@ -166,6 +173,7 @@ public class RMContainerAllocator extend
     super(clientService, context);
     this.preemptionPolicy = preemptionPolicy;
     this.stopped = new AtomicBoolean(false);
+    this.clock = context.getClock();
   }
 
   @Override
@@ -180,6 +188,9 @@ public class RMContainerAllocator extend
     maxReducePreemptionLimit = conf.getFloat(
         MRJobConfig.MR_AM_JOB_REDUCE_PREEMPTION_LIMIT,
         MRJobConfig.DEFAULT_MR_AM_JOB_REDUCE_PREEMPTION_LIMIT);
+    allocationDelayThresholdMs = conf.getInt(
+        MRJobConfig.MR_JOB_REDUCER_PREEMPT_DELAY_SEC,
+        MRJobConfig.DEFAULT_MR_JOB_REDUCER_PREEMPT_DELAY_SEC) * 1000;//sec -> ms
     RackResolver.init(conf);
     retryInterval = getConfig().getLong(MRJobConfig.MR_AM_TO_RM_WAIT_INTERVAL_MS,
                                 MRJobConfig.DEFAULT_MR_AM_TO_RM_WAIT_INTERVAL_MS);
@@ -246,7 +257,7 @@ public class RMContainerAllocator extend
           getJob().getTotalMaps(), completedMaps,
           scheduledRequests.maps.size(), scheduledRequests.reduces.size(), 
           assignedRequests.maps.size(), assignedRequests.reduces.size(),
-          mapResourceReqt, reduceResourceReqt,
+          mapResourceRequest, reduceResourceRequest,
           pendingReduces.size(), 
           maxReduceRampupLimit, reduceSlowStart);
       recalculateReduceSchedule = false;
@@ -268,6 +279,18 @@ public class RMContainerAllocator extend
     scheduleStats.log("Final Stats: ");
   }
 
+  @Private
+  @VisibleForTesting
+  AssignedRequests getAssignedRequests() {
+    return assignedRequests;
+  }
+
+  @Private
+  @VisibleForTesting
+  ScheduledRequests getScheduledRequests() {
+    return scheduledRequests;
+  }
+
   public boolean getIsReduceStarted() {
     return reduceStarted;
   }
@@ -303,16 +326,16 @@ public class RMContainerAllocator extend
       int supportedMaxContainerCapability =
           getMaxContainerCapability().getMemory();
       if (reqEvent.getAttemptID().getTaskId().getTaskType().equals(TaskType.MAP)) {
-        if (mapResourceReqt == 0) {
-          mapResourceReqt = reqEvent.getCapability().getMemory();
+        if (mapResourceRequest == 0) {
+          mapResourceRequest = reqEvent.getCapability().getMemory();
           eventHandler.handle(new JobHistoryEvent(jobId, 
               new NormalizedResourceEvent(org.apache.hadoop.mapreduce.TaskType.MAP,
-              mapResourceReqt)));
-          LOG.info("mapResourceReqt:"+mapResourceReqt);
-          if (mapResourceReqt > supportedMaxContainerCapability) {
+                  mapResourceRequest)));
+          LOG.info("mapResourceRequest:"+ mapResourceRequest);
+          if (mapResourceRequest > supportedMaxContainerCapability) {
             String diagMsg = "MAP capability required is more than the supported " +
-            "max container capability in the cluster. Killing the Job. mapResourceReqt: " + 
-            mapResourceReqt + " maxContainerCapability:" + supportedMaxContainerCapability;
+            "max container capability in the cluster. Killing the Job. mapResourceRequest: " +
+                mapResourceRequest + " maxContainerCapability:" + supportedMaxContainerCapability;
             LOG.info(diagMsg);
             eventHandler.handle(new JobDiagnosticsUpdateEvent(
                 jobId, diagMsg));
@@ -320,20 +343,20 @@ public class RMContainerAllocator extend
           }
         }
         //set the rounded off memory
-        reqEvent.getCapability().setMemory(mapResourceReqt);
+        reqEvent.getCapability().setMemory(mapResourceRequest);
         scheduledRequests.addMap(reqEvent);//maps are immediately scheduled
       } else {
-        if (reduceResourceReqt == 0) {
-          reduceResourceReqt = reqEvent.getCapability().getMemory();
+        if (reduceResourceRequest == 0) {
+          reduceResourceRequest = reqEvent.getCapability().getMemory();
           eventHandler.handle(new JobHistoryEvent(jobId, 
               new NormalizedResourceEvent(
                   org.apache.hadoop.mapreduce.TaskType.REDUCE,
-              reduceResourceReqt)));
-          LOG.info("reduceResourceReqt:"+reduceResourceReqt);
-          if (reduceResourceReqt > supportedMaxContainerCapability) {
+                  reduceResourceRequest)));
+          LOG.info("reduceResourceRequest:"+ reduceResourceRequest);
+          if (reduceResourceRequest > supportedMaxContainerCapability) {
             String diagMsg = "REDUCE capability required is more than the " +
             		"supported max container capability in the cluster. Killing the " +
-            		"Job. reduceResourceReqt: " + reduceResourceReqt +
+            		"Job. reduceResourceRequest: " + reduceResourceRequest +
             		" maxContainerCapability:" + supportedMaxContainerCapability;
             LOG.info(diagMsg);
             eventHandler.handle(new JobDiagnosticsUpdateEvent(
@@ -342,7 +365,7 @@ public class RMContainerAllocator extend
           }
         }
         //set the rounded off memory
-        reqEvent.getCapability().setMemory(reduceResourceReqt);
+        reqEvent.getCapability().setMemory(reduceResourceRequest);
         if (reqEvent.getEarlierAttemptFailed()) {
           //add to the front of queue for fail fast
           pendingReduces.addFirst(new ContainerRequest(reqEvent, PRIORITY_REDUCE));
@@ -394,8 +417,22 @@ public class RMContainerAllocator extend
     return host;
   }
 
-  private void preemptReducesIfNeeded() {
-    if (reduceResourceReqt == 0) {
+  @Private
+  @VisibleForTesting
+  synchronized void setReduceResourceRequest(int mem) {
+    this.reduceResourceRequest = mem;
+  }
+
+  @Private
+  @VisibleForTesting
+  synchronized void setMapResourceRequest(int mem) {
+    this.mapResourceRequest = mem;
+  }
+
+  @Private
+  @VisibleForTesting
+  void preemptReducesIfNeeded() {
+    if (reduceResourceRequest == 0) {
       return; //no reduces
     }
     //check if reduces have taken over the whole cluster and there are 
@@ -403,9 +440,9 @@ public class RMContainerAllocator extend
     if (scheduledRequests.maps.size() > 0) {
       int memLimit = getMemLimit();
       int availableMemForMap = memLimit - ((assignedRequests.reduces.size() -
-          assignedRequests.preemptionWaitingReduces.size()) * reduceResourceReqt);
+          assignedRequests.preemptionWaitingReduces.size()) * reduceResourceRequest);
       //availableMemForMap must be sufficient to run atleast 1 map
-      if (availableMemForMap < mapResourceReqt) {
+      if (availableMemForMap < mapResourceRequest) {
         //to make sure new containers are given to maps and not reduces
         //ramp down all scheduled reduces if any
         //(since reduces are scheduled at higher priority than maps)
@@ -414,22 +451,40 @@ public class RMContainerAllocator extend
           pendingReduces.add(req);
         }
         scheduledRequests.reduces.clear();
-        
-        //preempt for making space for at least one map
-        int premeptionLimit = Math.max(mapResourceReqt, 
-            (int) (maxReducePreemptionLimit * memLimit));
-        
-        int preemptMem = Math.min(scheduledRequests.maps.size() * mapResourceReqt, 
-            premeptionLimit);
-        
-        int toPreempt = (int) Math.ceil((float) preemptMem/reduceResourceReqt);
-        toPreempt = Math.min(toPreempt, assignedRequests.reduces.size());
-        
-        LOG.info("Going to preempt " + toPreempt + " due to lack of space for maps");
-        assignedRequests.preemptReduce(toPreempt);
+
+        //do further checking to find the number of map requests that were
+        //hanging around for a while
+        int hangingMapRequests = getNumOfHangingRequests(scheduledRequests.maps);
+        if (hangingMapRequests > 0) {
+          //preempt for making space for at least one map
+          int premeptionLimit = Math.max(mapResourceRequest,
+              (int) (maxReducePreemptionLimit * memLimit));
+
+          int preemptMem = Math.min(hangingMapRequests * mapResourceRequest,
+              premeptionLimit);
+
+          int toPreempt = (int) Math.ceil((float) preemptMem / reduceResourceRequest);
+          toPreempt = Math.min(toPreempt, assignedRequests.reduces.size());
+
+          LOG.info("Going to preempt " + toPreempt + " due to lack of space for maps");
+          assignedRequests.preemptReduce(toPreempt);
+        }
       }
     }
   }
+
+  private int getNumOfHangingRequests(Map<TaskAttemptId, ContainerRequest> requestMap) {
+    if (allocationDelayThresholdMs <= 0)
+      return requestMap.size();
+    int hangingRequests = 0;
+    long currTime = clock.getTime();
+    for (ContainerRequest request: requestMap.values()) {
+      long delay = currTime - request.requestTimeMs;
+      if (delay > allocationDelayThresholdMs)
+        hangingRequests++;
+    }
+    return hangingRequests;
+  }
   
   @Private
   public void scheduleReduces(
@@ -715,11 +770,13 @@ public class RMContainerAllocator extend
   @Private
   public int getMemLimit() {
     int headRoom = getAvailableResources() != null ? getAvailableResources().getMemory() : 0;
-    return headRoom + assignedRequests.maps.size() * mapResourceReqt + 
-       assignedRequests.reduces.size() * reduceResourceReqt;
+    return headRoom + assignedRequests.maps.size() * mapResourceRequest +
+       assignedRequests.reduces.size() * reduceResourceRequest;
   }
-  
-  private class ScheduledRequests {
+
+  @Private
+  @VisibleForTesting
+  class ScheduledRequests {
     
     private final LinkedList<TaskAttemptId> earlierFailedMaps = 
       new LinkedList<TaskAttemptId>();
@@ -729,7 +786,8 @@ public class RMContainerAllocator extend
       new HashMap<String, LinkedList<TaskAttemptId>>();
     private final Map<String, LinkedList<TaskAttemptId>> mapsRackMapping = 
       new HashMap<String, LinkedList<TaskAttemptId>>();
-    private final Map<TaskAttemptId, ContainerRequest> maps = 
+    @VisibleForTesting
+    final Map<TaskAttemptId, ContainerRequest> maps =
       new LinkedHashMap<TaskAttemptId, ContainerRequest>();
     
     private final LinkedHashMap<TaskAttemptId, ContainerRequest> reduces = 
@@ -825,22 +883,22 @@ public class RMContainerAllocator extend
         int allocatedMemory = allocated.getResource().getMemory();
         if (PRIORITY_FAST_FAIL_MAP.equals(priority) 
             || PRIORITY_MAP.equals(priority)) {
-          if (allocatedMemory < mapResourceReqt
+          if (allocatedMemory < mapResourceRequest
               || maps.isEmpty()) {
             LOG.info("Cannot assign container " + allocated 
                 + " for a map as either "
-                + " container memory less than required " + mapResourceReqt
+                + " container memory less than required " + mapResourceRequest
                 + " or no pending map tasks - maps.isEmpty=" 
                 + maps.isEmpty()); 
             isAssignable = false; 
           }
         } 
         else if (PRIORITY_REDUCE.equals(priority)) {
-          if (allocatedMemory < reduceResourceReqt
+          if (allocatedMemory < reduceResourceRequest
               || reduces.isEmpty()) {
             LOG.info("Cannot assign container " + allocated 
                 + " for a reduce as either "
-                + " container memory less than required " + reduceResourceReqt
+                + " container memory less than required " + reduceResourceRequest
                 + " or no pending reduce tasks - reduces.isEmpty=" 
                 + reduces.isEmpty()); 
             isAssignable = false;
@@ -1119,14 +1177,18 @@ public class RMContainerAllocator extend
     }
   }
 
-  private class AssignedRequests {
+  @Private
+  @VisibleForTesting
+  class AssignedRequests {
     private final Map<ContainerId, TaskAttemptId> containerToAttemptMap =
       new HashMap<ContainerId, TaskAttemptId>();
     private final LinkedHashMap<TaskAttemptId, Container> maps = 
       new LinkedHashMap<TaskAttemptId, Container>();
-    private final LinkedHashMap<TaskAttemptId, Container> reduces = 
+    @VisibleForTesting
+    final LinkedHashMap<TaskAttemptId, Container> reduces =
       new LinkedHashMap<TaskAttemptId, Container>();
-    private final Set<TaskAttemptId> preemptionWaitingReduces = 
+    @VisibleForTesting
+    final Set<TaskAttemptId> preemptionWaitingReduces =
       new HashSet<TaskAttemptId>();
     
     void add(Container container, TaskAttemptId tId) {

Modified: hadoop/common/branches/HDFS-5442/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5442/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java?rev=1604086&r1=1604085&r2=1604086&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5442/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java (original)
+++ hadoop/common/branches/HDFS-5442/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java Fri Jun 20 05:03:56 2014
@@ -29,8 +29,10 @@ import java.util.TreeSet;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
@@ -96,6 +98,8 @@ public abstract class RMContainerRequest
     super(clientService, context);
   }
 
+  @Private
+  @VisibleForTesting
   static class ContainerRequest {
     final TaskAttemptId attemptID;
     final Resource capability;
@@ -103,20 +107,39 @@ public abstract class RMContainerRequest
     final String[] racks;
     //final boolean earlierAttemptFailed;
     final Priority priority;
-    
+    /**
+     * the time when this request object was formed; can be used to avoid
+     * aggressive preemption for recently placed requests
+     */
+    final long requestTimeMs;
+
     public ContainerRequest(ContainerRequestEvent event, Priority priority) {
       this(event.getAttemptID(), event.getCapability(), event.getHosts(),
           event.getRacks(), priority);
     }
-    
+
+    public ContainerRequest(ContainerRequestEvent event, Priority priority,
+                            long requestTimeMs) {
+      this(event.getAttemptID(), event.getCapability(), event.getHosts(),
+          event.getRacks(), priority, requestTimeMs);
+    }
+
+    public ContainerRequest(TaskAttemptId attemptID,
+                            Resource capability, String[] hosts, String[] racks,
+                            Priority priority) {
+      this(attemptID, capability, hosts, racks, priority,
+          System.currentTimeMillis());
+    }
+
     public ContainerRequest(TaskAttemptId attemptID,
-        Resource capability, String[] hosts, String[] racks, 
-        Priority priority) {
+        Resource capability, String[] hosts, String[] racks,
+        Priority priority, long requestTimeMs) {
       this.attemptID = attemptID;
       this.capability = capability;
       this.hosts = hosts;
       this.racks = racks;
       this.priority = priority;
+      this.requestTimeMs = requestTimeMs;
     }
     
     public String toString() {

Modified: hadoop/common/branches/HDFS-5442/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5442/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java?rev=1604086&r1=1604085&r2=1604086&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5442/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java (original)
+++ hadoop/common/branches/HDFS-5442/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java Fri Jun 20 05:03:56 2014
@@ -295,6 +295,15 @@ public abstract class FileInputFormat<K,
                                 String[] hosts) {
     return new FileSplit(file, start, length, hosts);
   }
+  
+  /**
+   * A factory that makes the split for this class. It can be overridden
+   * by sub-classes to make sub-types
+   */
+  protected FileSplit makeSplit(Path file, long start, long length, 
+                                String[] hosts, String[] inMemoryHosts) {
+    return new FileSplit(file, start, length, hosts, inMemoryHosts);
+  }
 
   /** Splits files returned by {@link #listStatus(JobConf)} when
    * they're too big.*/ 
@@ -337,22 +346,22 @@ public abstract class FileInputFormat<K,
 
           long bytesRemaining = length;
           while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
-            String[] splitHosts = getSplitHosts(blkLocations,
+            String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations,
                 length-bytesRemaining, splitSize, clusterMap);
             splits.add(makeSplit(path, length-bytesRemaining, splitSize,
-                splitHosts));
+                splitHosts[0], splitHosts[1]));
             bytesRemaining -= splitSize;
           }
 
           if (bytesRemaining != 0) {
-            String[] splitHosts = getSplitHosts(blkLocations, length
+            String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations, length
                 - bytesRemaining, bytesRemaining, clusterMap);
             splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining,
-                splitHosts));
+                splitHosts[0], splitHosts[1]));
           }
         } else {
-          String[] splitHosts = getSplitHosts(blkLocations,0,length,clusterMap);
-          splits.add(makeSplit(path, 0, length, splitHosts));
+          String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations,0,length,clusterMap);
+          splits.add(makeSplit(path, 0, length, splitHosts[0], splitHosts[1]));
         }
       } else { 
         //Create empty hosts array for zero length files
@@ -538,10 +547,30 @@ public abstract class FileInputFormat<K,
    * @param blkLocations The list of block locations
    * @param offset 
    * @param splitSize 
-   * @return array of hosts that contribute most to this split
+   * @return an array of hosts that contribute most to this split
    * @throws IOException
    */
   protected String[] getSplitHosts(BlockLocation[] blkLocations, 
+      long offset, long splitSize, NetworkTopology clusterMap) throws IOException {
+    return getSplitHostsAndCachedHosts(blkLocations, offset, splitSize,
+        clusterMap)[0];
+  }
+  
+  /** 
+   * This function identifies and returns the hosts that contribute 
+   * most for a given split. For calculating the contribution, rack
+   * locality is treated on par with host locality, so hosts from racks
+   * that contribute the most are preferred over hosts on racks that 
+   * contribute less
+   * @param blkLocations The list of block locations
+   * @param offset 
+   * @param splitSize 
+   * @return two arrays - one of hosts that contribute most to this split, and
+   *    one of hosts that contribute most to this split that have the data
+   *    cached on them
+   * @throws IOException
+   */
+  private String[][] getSplitHostsAndCachedHosts(BlockLocation[] blkLocations, 
       long offset, long splitSize, NetworkTopology clusterMap)
   throws IOException {
 
@@ -552,7 +581,8 @@ public abstract class FileInputFormat<K,
 
     //If this is the only block, just return
     if (bytesInThisBlock >= splitSize) {
-      return blkLocations[startIndex].getHosts();
+      return new String[][] { blkLocations[startIndex].getHosts(),
+          blkLocations[startIndex].getCachedHosts() };
     }
 
     long bytesInFirstBlock = bytesInThisBlock;
@@ -639,7 +669,9 @@ public abstract class FileInputFormat<K,
     
     } // for all indices
 
-    return identifyHosts(allTopos.length, racksMap);
+    // We don't yet support cached hosts when bytesInThisBlock > splitSize
+    return new String[][] { identifyHosts(allTopos.length, racksMap),
+        new String[0]};
   }
   
   private String[] identifyHosts(int replicationFactor, 

Modified: hadoop/common/branches/HDFS-5442/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileSplit.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5442/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileSplit.java?rev=1604086&r1=1604085&r2=1604086&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5442/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileSplit.java (original)
+++ hadoop/common/branches/HDFS-5442/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileSplit.java Fri Jun 20 05:03:56 2014
@@ -24,6 +24,7 @@ import java.io.DataOutput;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
 import org.apache.hadoop.fs.Path;
 
 /** A section of an input file.  Returned by {@link
@@ -33,7 +34,7 @@ import org.apache.hadoop.fs.Path;
 @InterfaceAudience.Public
 @InterfaceStability.Stable
 public class FileSplit extends org.apache.hadoop.mapreduce.InputSplit 
-                       implements InputSplit {
+                       implements InputSplitWithLocationInfo {
   org.apache.hadoop.mapreduce.lib.input.FileSplit fs; 
   protected FileSplit() {
     fs = new org.apache.hadoop.mapreduce.lib.input.FileSplit();
@@ -62,6 +63,20 @@ public class FileSplit extends org.apach
            length, hosts);
   }
   
+  /** Constructs a split with host information
+  *
+  * @param file the file name
+  * @param start the position of the first byte in the file to process
+  * @param length the number of bytes in the file to process
+  * @param hosts the list of hosts containing the block, possibly null
+  * @param inMemoryHosts the list of hosts containing the block in memory
+  */
+ public FileSplit(Path file, long start, long length, String[] hosts,
+     String[] inMemoryHosts) {
+   fs = new org.apache.hadoop.mapreduce.lib.input.FileSplit(file, start,
+          length, hosts, inMemoryHosts);
+ }
+  
   public FileSplit(org.apache.hadoop.mapreduce.lib.input.FileSplit fs) {
     this.fs = fs;
   }
@@ -92,4 +107,9 @@ public class FileSplit extends org.apach
     return fs.getLocations();
   }
   
+  @Override
+  @Evolving
+  public SplitLocationInfo[] getLocationInfo() throws IOException {
+    return fs.getLocationInfo();
+  }
 }

Modified: hadoop/common/branches/HDFS-5442/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/InputSplit.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5442/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/InputSplit.java?rev=1604086&r1=1604085&r2=1604086&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5442/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/InputSplit.java (original)
+++ hadoop/common/branches/HDFS-5442/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/InputSplit.java Fri Jun 20 05:03:56 2014
@@ -22,6 +22,8 @@ import java.io.IOException;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+import org.apache.hadoop.mapred.SplitLocationInfo;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.RecordReader;
@@ -51,10 +53,25 @@ public abstract class InputSplit {
   /**
    * Get the list of nodes by name where the data for the split would be local.
    * The locations do not need to be serialized.
+   * 
    * @return a new array of the node nodes.
    * @throws IOException
    * @throws InterruptedException
    */
   public abstract 
     String[] getLocations() throws IOException, InterruptedException;
+  
+  /**
+   * Gets info about which nodes the input split is stored on and how it is
+   * stored at each location.
+   * 
+   * @return list of <code>SplitLocationInfo</code>s describing how the split
+   *    data is stored at each location. A null value indicates that all the
+   *    locations have the data stored on disk.
+   * @throws IOException
+   */
+  @Evolving
+  public SplitLocationInfo[] getLocationInfo() throws IOException {
+    return null;
+  }
 }
\ No newline at end of file

Modified: hadoop/common/branches/HDFS-5442/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5442/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java?rev=1604086&r1=1604085&r2=1604086&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5442/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java (original)
+++ hadoop/common/branches/HDFS-5442/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java Fri Jun 20 05:03:56 2014
@@ -579,7 +579,17 @@ public interface MRJobConfig {
       MR_AM_PREFIX + "history.use-batched-flush.queue-size.threshold";
   public static final int DEFAULT_MR_AM_HISTORY_USE_BATCHED_FLUSH_QUEUE_SIZE_THRESHOLD =
       50;
-  
+
+  /**
+   * The threshold in terms of seconds after which an unsatisfied mapper request
+   * triggers reducer preemption to free space. Default 0 implies that the reduces
+   * should be preempted immediately after allocation if there is currently no
+   * room for newly allocated mappers.
+   */
+  public static final String MR_JOB_REDUCER_PREEMPT_DELAY_SEC =
+      "mapreduce.job.reducer.preempt.delay.sec";
+  public static final int DEFAULT_MR_JOB_REDUCER_PREEMPT_DELAY_SEC = 0;
+
   public static final String MR_AM_ENV =
       MR_AM_PREFIX + "env";
   

Modified: hadoop/common/branches/HDFS-5442/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5442/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java?rev=1604086&r1=1604085&r2=1604086&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5442/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java (original)
+++ hadoop/common/branches/HDFS-5442/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java Fri Jun 20 05:03:56 2014
@@ -35,6 +35,7 @@ import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.mapred.LocatedFileStatusFetcher;
+import org.apache.hadoop.mapred.SplitLocationInfo;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Job;
@@ -359,6 +360,15 @@ public abstract class FileInputFormat<K,
                                 String[] hosts) {
     return new FileSplit(file, start, length, hosts);
   }
+  
+  /**
+   * A factory that makes the split for this class. It can be overridden
+   * by sub-classes to make sub-types
+   */
+  protected FileSplit makeSplit(Path file, long start, long length, 
+                                String[] hosts, String[] inMemoryHosts) {
+    return new FileSplit(file, start, length, hosts, inMemoryHosts);
+  }
 
   /** 
    * Generate the list of files and make them into FileSplits.
@@ -392,17 +402,20 @@ public abstract class FileInputFormat<K,
           while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
             int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
             splits.add(makeSplit(path, length-bytesRemaining, splitSize,
-                                     blkLocations[blkIndex].getHosts()));
+                        blkLocations[blkIndex].getHosts(),
+                        blkLocations[blkIndex].getCachedHosts()));
             bytesRemaining -= splitSize;
           }
 
           if (bytesRemaining != 0) {
             int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
             splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
-                       blkLocations[blkIndex].getHosts()));
+                       blkLocations[blkIndex].getHosts(),
+                       blkLocations[blkIndex].getCachedHosts()));
           }
         } else { // not splitable
-          splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts()));
+          splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
+                      blkLocations[0].getCachedHosts()));
         }
       } else { 
         //Create empty hosts array for zero length files

Modified: hadoop/common/branches/HDFS-5442/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileSplit.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5442/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileSplit.java?rev=1604086&r1=1604085&r2=1604086&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5442/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileSplit.java (original)
+++ hadoop/common/branches/HDFS-5442/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileSplit.java Fri Jun 20 05:03:56 2014
@@ -22,11 +22,13 @@ import java.io.IOException;
 import java.io.DataInput;
 import java.io.DataOutput;
 
+import org.apache.hadoop.mapred.SplitLocationInfo;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
@@ -41,6 +43,7 @@ public class FileSplit extends InputSpli
   private long start;
   private long length;
   private String[] hosts;
+  private SplitLocationInfo[] hostInfos;
 
   public FileSplit() {}
 
@@ -57,6 +60,31 @@ public class FileSplit extends InputSpli
     this.length = length;
     this.hosts = hosts;
   }
+  
+  /** Constructs a split with host and cached-blocks information
+  *
+  * @param file the file name
+  * @param start the position of the first byte in the file to process
+  * @param length the number of bytes in the file to process
+  * @param hosts the list of hosts containing the block
+  * @param inMemoryHosts the list of hosts containing the block in memory
+  */
+ public FileSplit(Path file, long start, long length, String[] hosts,
+     String[] inMemoryHosts) {
+   this(file, start, length, hosts);
+   hostInfos = new SplitLocationInfo[hosts.length];
+   for (int i = 0; i < hosts.length; i++) {
+     // because N will be tiny, scanning is probably faster than a HashSet
+     boolean inMemory = false;
+     for (String inMemoryHost : inMemoryHosts) {
+       if (inMemoryHost.equals(hosts[i])) {
+         inMemory = true;
+         break;
+       }
+     }
+     hostInfos[i] = new SplitLocationInfo(hosts[i], inMemory);
+   }
+ }
  
   /** The file containing this split's data. */
   public Path getPath() { return file; }
@@ -98,4 +126,10 @@ public class FileSplit extends InputSpli
       return this.hosts;
     }
   }
+  
+  @Override
+  @Evolving
+  public SplitLocationInfo[] getLocationInfo() throws IOException {
+    return hostInfos;
+  }
 }

Modified: hadoop/common/branches/HDFS-5442/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5442/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml?rev=1604086&r1=1604085&r2=1604086&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5442/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml (original)
+++ hadoop/common/branches/HDFS-5442/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml Fri Jun 20 05:03:56 2014
@@ -83,6 +83,16 @@
 </property>
 
 <property>
+  <name>mapreduce.job.reducer.preempt.delay.sec</name>
+  <value>0</value>
+  <description>The threshold in terms of seconds after which an unsatisfied mapper 
+  request triggers reducer preemption to free space. Default 0 implies that the 
+  reduces should be preempted immediately after allocation if there is currently no
+  room for newly allocated mappers.
+  </description>
+</property>
+
+<property>
     <name>mapreduce.job.max.split.locations</name>
     <value>10</value>
     <description>The max number of block locations to store for each split for 

Propchange: hadoop/common/branches/HDFS-5442/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml:r1598456-1604085

Modified: hadoop/common/branches/HDFS-5442/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5442/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileInputFormat.java?rev=1604086&r1=1604085&r2=1604086&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5442/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileInputFormat.java (original)
+++ hadoop/common/branches/HDFS-5442/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileInputFormat.java Fri Jun 20 05:03:56 2014
@@ -103,6 +103,29 @@ public class TestFileInputFormat {
   }
   
   @Test
+  public void testSplitLocationInfo() throws Exception {
+    Configuration conf = getConfiguration();
+    conf.set(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR,
+        "test:///a1/a2");
+    JobConf job = new JobConf(conf);
+    TextInputFormat fileInputFormat = new TextInputFormat();
+    fileInputFormat.configure(job);
+    FileSplit[] splits = (FileSplit[]) fileInputFormat.getSplits(job, 1);
+    String[] locations = splits[0].getLocations();
+    Assert.assertEquals(2, locations.length);
+    SplitLocationInfo[] locationInfo = splits[0].getLocationInfo();
+    Assert.assertEquals(2, locationInfo.length);
+    SplitLocationInfo localhostInfo = locations[0].equals("localhost") ?
+        locationInfo[0] : locationInfo[1];
+    SplitLocationInfo otherhostInfo = locations[0].equals("otherhost") ?
+        locationInfo[0] : locationInfo[1];
+    Assert.assertTrue(localhostInfo.isOnDisk());
+    Assert.assertTrue(localhostInfo.isInMemory());
+    Assert.assertTrue(otherhostInfo.isOnDisk());
+    Assert.assertFalse(otherhostInfo.isInMemory());
+  }
+  
+  @Test
   public void testListStatusSimple() throws IOException {
     Configuration conf = new Configuration();
     conf.setInt(FileInputFormat.LIST_STATUS_NUM_THREADS, numThreads);
@@ -223,8 +246,9 @@ public class TestFileInputFormat {
     public BlockLocation[] getFileBlockLocations(Path p, long start, long len)
         throws IOException {
       return new BlockLocation[] {
-          new BlockLocation(new String[] { "localhost:50010" },
-              new String[] { "localhost" }, 0, len) };
+          new BlockLocation(new String[] { "localhost:50010", "otherhost:50010" },
+              new String[] { "localhost", "otherhost" }, new String[] { "localhost" },
+              new String[0], 0, len, false) };
     }
 
     @Override

Modified: hadoop/common/branches/HDFS-5442/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5442/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java?rev=1604086&r1=1604085&r2=1604086&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5442/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java (original)
+++ hadoop/common/branches/HDFS-5442/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java Fri Jun 20 05:03:56 2014
@@ -39,6 +39,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.RawLocalFileSystem;
 import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.mapred.SplitLocationInfo;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Job;
 import org.junit.After;
@@ -139,6 +140,28 @@ public class TestFileInputFormat {
         1, mockFs.numListLocatedStatusCalls);
     FileSystem.closeAll();
   }
+  
+  @Test
+  public void testSplitLocationInfo() throws Exception {
+    Configuration conf = getConfiguration();
+    conf.set(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR,
+        "test:///a1/a2");
+    Job job = Job.getInstance(conf);
+    TextInputFormat fileInputFormat = new TextInputFormat();
+    List<InputSplit> splits = fileInputFormat.getSplits(job);
+    String[] locations = splits.get(0).getLocations();
+    Assert.assertEquals(2, locations.length);
+    SplitLocationInfo[] locationInfo = splits.get(0).getLocationInfo();
+    Assert.assertEquals(2, locationInfo.length);
+    SplitLocationInfo localhostInfo = locations[0].equals("localhost") ?
+        locationInfo[0] : locationInfo[1];
+    SplitLocationInfo otherhostInfo = locations[0].equals("otherhost") ?
+        locationInfo[0] : locationInfo[1];
+    Assert.assertTrue(localhostInfo.isOnDisk());
+    Assert.assertTrue(localhostInfo.isInMemory());
+    Assert.assertTrue(otherhostInfo.isOnDisk());
+    Assert.assertFalse(otherhostInfo.isInMemory());
+  }
 
   @Test
   public void testListStatusSimple() throws IOException {
@@ -402,9 +425,9 @@ public class TestFileInputFormat {
     public BlockLocation[] getFileBlockLocations(Path p, long start, long len)
         throws IOException {
       return new BlockLocation[] {
-          new BlockLocation(new String[] { "localhost:50010" },
-              new String[] { "localhost" }, 0, len) };
-    }
+          new BlockLocation(new String[] { "localhost:50010", "otherhost:50010" },
+              new String[] { "localhost", "otherhost" }, new String[] { "localhost" },
+              new String[0], 0, len, false) };    }
 
     @Override
     protected RemoteIterator<LocatedFileStatus> listLocatedStatus(Path f,