Posted to mapreduce-commits@hadoop.apache.org by to...@apache.org on 2013/04/01 18:47:42 UTC

svn commit: r1463203 [3/4] - in /hadoop/common/branches/HDFS-347/hadoop-mapreduce-project: ./ bin/ conf/ hadoop-mapreduce-client/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/ hadoop-mapreduce-client/hadoop...

Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFailedEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFailedEvent.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFailedEvent.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFailedEvent.java Mon Apr  1 16:47:16 2013
@@ -18,10 +18,9 @@
 
 package org.apache.hadoop.mapreduce.jobhistory;
 
-import java.io.IOException;
-
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TaskID;
 import org.apache.hadoop.mapreduce.TaskType;
@@ -35,7 +34,17 @@ import org.apache.avro.util.Utf8;
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
 public class TaskFailedEvent implements HistoryEvent {
-  private TaskFailed datum = new TaskFailed();
+  private TaskFailed datum = null;
+
+  private TaskAttemptID failedDueToAttempt;
+  private TaskID id;
+  private TaskType taskType;
+  private long finishTime;
+  private String status;
+  private String error;
+  private Counters counters;
+
+  private static final Counters EMPTY_COUNTERS = new Counters();
 
   /**
    * Create an event to record task failure
@@ -45,45 +54,87 @@ public class TaskFailedEvent implements 
    * @param error Error String
    * @param status Status
    * @param failedDueToAttempt The attempt id due to which the task failed
+   * @param counters Counters for the task
    */
   public TaskFailedEvent(TaskID id, long finishTime, 
       TaskType taskType, String error, String status,
-      TaskAttemptID failedDueToAttempt) {
-    datum.taskid = new Utf8(id.toString());
-    datum.error = new Utf8(error);
-    datum.finishTime = finishTime;
-    datum.taskType = new Utf8(taskType.name());
-    datum.failedDueToAttempt = failedDueToAttempt == null
-      ? null
-      : new Utf8(failedDueToAttempt.toString());
-    datum.status = new Utf8(status);
+      TaskAttemptID failedDueToAttempt, Counters counters) {
+    this.id = id;
+    this.finishTime = finishTime;
+    this.taskType = taskType;
+    this.error = error;
+    this.status = status;
+    this.failedDueToAttempt = failedDueToAttempt;
+    this.counters = counters;
   }
 
+  public TaskFailedEvent(TaskID id, long finishTime, 
+	      TaskType taskType, String error, String status,
+	      TaskAttemptID failedDueToAttempt) {
+    this(id, finishTime, taskType, error, status,
+        failedDueToAttempt, EMPTY_COUNTERS);
+  }
+  
   TaskFailedEvent() {}
 
-  public Object getDatum() { return datum; }
-  public void setDatum(Object datum) { this.datum = (TaskFailed)datum; }
+  public Object getDatum() {
+    if(datum == null) {
+      datum = new TaskFailed();
+      datum.taskid = new Utf8(id.toString());
+      datum.error = new Utf8(error);
+      datum.finishTime = finishTime;
+      datum.taskType = new Utf8(taskType.name());
+      datum.failedDueToAttempt =
+          failedDueToAttempt == null
+          ? null
+          : new Utf8(failedDueToAttempt.toString());
+      datum.status = new Utf8(status);
+      datum.counters = EventWriter.toAvro(counters);
+    }
+    return datum;
+  }
+  
+  public void setDatum(Object odatum) {
+    this.datum = (TaskFailed)odatum;
+    this.id =
+        TaskID.forName(datum.taskid.toString());
+    this.taskType =
+        TaskType.valueOf(datum.taskType.toString());
+    this.finishTime = datum.finishTime;
+    this.error = datum.error.toString();
+    this.failedDueToAttempt =
+        datum.failedDueToAttempt == null
+        ? null
+        : TaskAttemptID.forName(
+            datum.failedDueToAttempt.toString());
+    this.status = datum.status.toString();
+    this.counters =
+        EventReader.fromAvro(datum.counters);
+  }
 
   /** Get the task id */
-  public TaskID getTaskId() { return TaskID.forName(datum.taskid.toString()); }
+  public TaskID getTaskId() { return id; }
   /** Get the error string */
-  public String getError() { return datum.error.toString(); }
+  public String getError() { return error; }
   /** Get the finish time of the attempt */
-  public long getFinishTime() { return datum.finishTime; }
+  public long getFinishTime() {
+    return finishTime;
+  }
   /** Get the task type */
   public TaskType getTaskType() {
-    return TaskType.valueOf(datum.taskType.toString());
+    return taskType;
   }
   /** Get the attempt id due to which the task failed */
   public TaskAttemptID getFailedAttemptID() {
-    return datum.failedDueToAttempt == null
-      ? null
-      : TaskAttemptID.forName(datum.failedDueToAttempt.toString());
+    return failedDueToAttempt;
   }
   /** Get the task status */
-  public String getTaskStatus() { return datum.status.toString(); }
+  public String getTaskStatus() { return status; }
+  /** Get task counters */
+  public Counters getCounters() { return counters; }
   /** Get the event type */
-  public EventType getEventType() { return EventType.TASK_FAILED; }
+  public EventType getEventType() {
+    return EventType.TASK_FAILED;
+  }
 
-  
 }
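
For context on the TaskFailedEvent change above: the event now carries task Counters and builds its Avro datum lazily in getDatum(). Below is a minimal, illustrative sketch of how the new seven-argument constructor might be used; the task/attempt IDs and strings are made up, and the surrounding job-history plumbing is assumed rather than shown.

import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.jobhistory.TaskFailedEvent;

public class TaskFailedEventSketch {
  public static void main(String[] args) {
    // Illustrative IDs in the standard task/attempt format.
    TaskID taskId = TaskID.forName("task_201304011647_0001_m_000005");
    TaskAttemptID failedAttempt =
        TaskAttemptID.forName("attempt_201304011647_0001_m_000005_0");

    // Counters captured at failure time; the older six-argument constructor
    // falls back to an empty Counters object.
    Counters counters = new Counters();

    TaskFailedEvent event = new TaskFailedEvent(taskId,
        System.currentTimeMillis(), TaskType.MAP,
        "simulated error", "FAILED", failedAttempt, counters);

    // The Avro TaskFailed datum is only materialized here, on first use.
    Object datum = event.getDatum();
    System.out.println(datum != null);
  }
}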

Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java Mon Apr  1 16:47:16 2013
@@ -49,6 +49,8 @@ import org.apache.hadoop.mapreduce.TaskA
 import org.apache.hadoop.net.NodeBase;
 import org.apache.hadoop.net.NetworkTopology;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * An abstract {@link InputFormat} that returns {@link CombineFileSplit}'s in 
  * {@link InputFormat#getSplits(JobContext)} method. 
@@ -76,7 +78,7 @@ import org.apache.hadoop.net.NetworkTopo
 @InterfaceStability.Stable
 public abstract class CombineFileInputFormat<K, V>
   extends FileInputFormat<K, V> {
-
+  
   public static final String SPLIT_MINSIZE_PERNODE = 
     "mapreduce.input.fileinputformat.split.minsize.per.node";
   public static final String SPLIT_MINSIZE_PERRACK = 
@@ -163,7 +165,6 @@ public abstract class CombineFileInputFo
   @Override
   public List<InputSplit> getSplits(JobContext job) 
     throws IOException {
-
     long minSizeNode = 0;
     long minSizeRack = 0;
     long maxSize = 0;
@@ -286,56 +287,100 @@ public abstract class CombineFileInputFo
                                  rackToNodes, maxSize);
       totLength += files[i].getLength();
     }
+    createSplits(nodeToBlocks, blockToNodes, rackToBlocks, totLength, 
+                 maxSize, minSizeNode, minSizeRack, splits);
+  }
 
+  @VisibleForTesting
+  void createSplits(HashMap<String, List<OneBlockInfo>> nodeToBlocks,
+                     HashMap<OneBlockInfo, String[]> blockToNodes,
+                     HashMap<String, List<OneBlockInfo>> rackToBlocks,
+                     long totLength,
+                     long maxSize,
+                     long minSizeNode,
+                     long minSizeRack,
+                     List<InputSplit> splits                     
+                    ) {
     ArrayList<OneBlockInfo> validBlocks = new ArrayList<OneBlockInfo>();
     Set<String> nodes = new HashSet<String>();
     long curSplitSize = 0;
+    
+    int numNodes = nodeToBlocks.size();
+    long totalLength = totLength;
 
-    // process all nodes and create splits that are local
-    // to a node. 
-    for (Iterator<Map.Entry<String, 
-         List<OneBlockInfo>>> iter = nodeToBlocks.entrySet().iterator(); 
-         iter.hasNext();) {
-
-      Map.Entry<String, List<OneBlockInfo>> one = iter.next();
-      nodes.add(one.getKey());
-      List<OneBlockInfo> blocksInNode = one.getValue();
-
-      // for each block, copy it into validBlocks. Delete it from 
-      // blockToNodes so that the same block does not appear in 
-      // two different splits.
-      for (OneBlockInfo oneblock : blocksInNode) {
-        if (blockToNodes.containsKey(oneblock)) {
-          validBlocks.add(oneblock);
-          blockToNodes.remove(oneblock);
-          curSplitSize += oneblock.length;
-
-          // if the accumulated split size exceeds the maximum, then 
-          // create this split.
-          if (maxSize != 0 && curSplitSize >= maxSize) {
-            // create an input split and add it to the splits array
-            addCreatedSplit(splits, nodes, validBlocks);
-            curSplitSize = 0;
-            validBlocks.clear();
+    while(true) {
+      // maxSize is allowed to be 0; disable load smoothing in that case
+      int avgSplitsPerNode = maxSize > 0 && numNodes > 0 ?
+                                        ((int) (totalLength/maxSize))/numNodes
+                                        : Integer.MAX_VALUE;
+      int maxSplitsByNodeOnly = (avgSplitsPerNode > 0) ? avgSplitsPerNode : 1;
+      numNodes = 0;
+
+      // process all nodes and create splits that are local to a node.
+      for (Iterator<Map.Entry<String, List<OneBlockInfo>>> iter = nodeToBlocks
+          .entrySet().iterator(); iter.hasNext();) {
+        Map.Entry<String, List<OneBlockInfo>> one = iter.next();
+        nodes.add(one.getKey());
+        List<OneBlockInfo> blocksInNode = one.getValue();
+
+        // for each block, copy it into validBlocks. Delete it from
+        // blockToNodes so that the same block does not appear in
+        // two different splits.
+        int splitsInNode = 0;
+        for (OneBlockInfo oneblock : blocksInNode) {
+          if (blockToNodes.containsKey(oneblock)) {
+            validBlocks.add(oneblock);
+            blockToNodes.remove(oneblock);
+            curSplitSize += oneblock.length;
+
+            // if the accumulated split size exceeds the maximum, then
+            // create this split.
+            if (maxSize != 0 && curSplitSize >= maxSize) {
+              // create an input split and add it to the splits array
+              addCreatedSplit(splits, nodes, validBlocks);
+              totalLength -= curSplitSize;
+              curSplitSize = 0;
+              validBlocks.clear();
+              splitsInNode++;
+              if (splitsInNode == maxSplitsByNodeOnly) {
+                // Stop grouping on this node so as not to create
+                // disproportionately more splits on a node just because it
+                // happens to have many blocks.
+                // Consider only these nodes in the next round of grouping,
+                // because they have leftover blocks that may need grouping.
+                numNodes++;
+                break;
+              }
+            }
           }
         }
-      }
-      // if there were any blocks left over and their combined size is
-      // larger than minSplitNode, then combine them into one split.
-      // Otherwise add them back to the unprocessed pool. It is likely 
-      // that they will be combined with other blocks from the 
-      // same rack later on.
-      if (minSizeNode != 0 && curSplitSize >= minSizeNode) {
-        // create an input split and add it to the splits array
-        addCreatedSplit(splits, nodes, validBlocks);
-      } else {
-        for (OneBlockInfo oneblock : validBlocks) {
-          blockToNodes.put(oneblock, oneblock.hosts);
+        // if there were any blocks left over and their combined size is
+        // larger than minSizeNode, then combine them into one split.
+        // Otherwise add them back to the unprocessed pool. It is likely
+        // that they will be combined with other blocks from the
+        // same rack later on.
+        if (minSizeNode != 0 && curSplitSize >= minSizeNode
+            && splitsInNode == 0) {
+          // haven't created any split on this machine, so it's ok to add
+          // a smaller one here for parallelism.
+          // Otherwise, leave these blocks to be grouped in the rack pass
+          // so that split sizes stay balanced.
+          // create an input split and add it to the splits array
+          addCreatedSplit(splits, nodes, validBlocks);
+          totalLength -= curSplitSize;
+        } else {
+          for (OneBlockInfo oneblock : validBlocks) {
+            blockToNodes.put(oneblock, oneblock.hosts);
+          }
         }
+        validBlocks.clear();
+        nodes.clear();
+        curSplitSize = 0;
+      }
+      
+      if(!(numNodes>0 && totalLength>0)) {
+        break;
       }
-      validBlocks.clear();
-      nodes.clear();
-      curSplitSize = 0;
     }
 
     // if blocks in a rack are below the specified minimum size, then keep them
@@ -458,7 +503,6 @@ public abstract class CombineFileInputFo
       offset[i] = validBlocks.get(i).offset;
       length[i] = validBlocks.get(i).length;
     }
-
      // add this split to the list that is returned
     CombineFileSplit thissplit = new CombineFileSplit(fl, offset, 
                                    length, locations.toArray(new String[0]));
@@ -474,7 +518,8 @@ public abstract class CombineFileInputFo
   /**
    * information about one file from the File System
    */
-  private static class OneFileInfo {
+  @VisibleForTesting
+  static class OneFileInfo {
     private long fileSize;               // size of the file
     private OneBlockInfo[] blocks;       // all blocks in this file
 
@@ -545,45 +590,55 @@ public abstract class CombineFileInputFo
           }
           blocks = blocksList.toArray(new OneBlockInfo[blocksList.size()]);
         }
+        
+        populateBlockInfo(blocks, rackToBlocks, blockToNodes, 
+                          nodeToBlocks, rackToNodes);
+      }
+    }
+    
+    @VisibleForTesting
+    static void populateBlockInfo(OneBlockInfo[] blocks,
+                          HashMap<String, List<OneBlockInfo>> rackToBlocks,
+                          HashMap<OneBlockInfo, String[]> blockToNodes,
+                          HashMap<String, List<OneBlockInfo>> nodeToBlocks,
+                          HashMap<String, Set<String>> rackToNodes) {
+      for (OneBlockInfo oneblock : blocks) {
+        // add this block to the block --> node locations map
+        blockToNodes.put(oneblock, oneblock.hosts);
+
+        // For blocks that do not have host/rack information,
+        // assign to default  rack.
+        String[] racks = null;
+        if (oneblock.hosts.length == 0) {
+          racks = new String[]{NetworkTopology.DEFAULT_RACK};
+        } else {
+          racks = oneblock.racks;
+        }
 
-        for (OneBlockInfo oneblock : blocks) {
-          // add this block to the block --> node locations map
-          blockToNodes.put(oneblock, oneblock.hosts);
-
-          // For blocks that do not have host/rack information,
-          // assign to default  rack.
-          String[] racks = null;
-          if (oneblock.hosts.length == 0) {
-            racks = new String[]{NetworkTopology.DEFAULT_RACK};
-          } else {
-            racks = oneblock.racks;
+        // add this block to the rack --> block map
+        for (int j = 0; j < racks.length; j++) {
+          String rack = racks[j];
+          List<OneBlockInfo> blklist = rackToBlocks.get(rack);
+          if (blklist == null) {
+            blklist = new ArrayList<OneBlockInfo>();
+            rackToBlocks.put(rack, blklist);
           }
-
-          // add this block to the rack --> block map
-          for (int j = 0; j < racks.length; j++) {
-            String rack = racks[j];
-            List<OneBlockInfo> blklist = rackToBlocks.get(rack);
-            if (blklist == null) {
-              blklist = new ArrayList<OneBlockInfo>();
-              rackToBlocks.put(rack, blklist);
-            }
-            blklist.add(oneblock);
-            if (!racks[j].equals(NetworkTopology.DEFAULT_RACK)) {
-              // Add this host to rackToNodes map
-              addHostToRack(rackToNodes, racks[j], oneblock.hosts[j]);
-            }
+          blklist.add(oneblock);
+          if (!racks[j].equals(NetworkTopology.DEFAULT_RACK)) {
+            // Add this host to rackToNodes map
+            addHostToRack(rackToNodes, racks[j], oneblock.hosts[j]);
           }
+        }
 
-          // add this block to the node --> block map
-          for (int j = 0; j < oneblock.hosts.length; j++) {
-            String node = oneblock.hosts[j];
-            List<OneBlockInfo> blklist = nodeToBlocks.get(node);
-            if (blklist == null) {
-              blklist = new ArrayList<OneBlockInfo>();
-              nodeToBlocks.put(node, blklist);
-            }
-            blklist.add(oneblock);
+        // add this block to the node --> block map
+        for (int j = 0; j < oneblock.hosts.length; j++) {
+          String node = oneblock.hosts[j];
+          List<OneBlockInfo> blklist = nodeToBlocks.get(node);
+          if (blklist == null) {
+            blklist = new ArrayList<OneBlockInfo>();
+            nodeToBlocks.put(node, blklist);
           }
+          blklist.add(oneblock);
         }
       }
     }
@@ -600,7 +655,8 @@ public abstract class CombineFileInputFo
   /**
    * information about one block from the File System
    */
-  private static class OneBlockInfo {
+  @VisibleForTesting
+  static class OneBlockInfo {
     Path onepath;                // name of this file
     long offset;                 // offset in file
     long length;                 // length of this block
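
The new createSplits() loop above smooths the split distribution: in each round it caps the number of node-local splits a single node may produce at avgSplitsPerNode, and carries leftover blocks into the next round (eventually falling through to the rack-level grouping). A small, self-contained sketch of just that cap calculation, with made-up sizes:

public class SplitSmoothingSketch {
  public static void main(String[] args) {
    long totalLength = 10L * 1024 * 1024 * 1024;  // 10 GB still unassigned
    long maxSize = 256L * 1024 * 1024;            // target split size: 256 MB
    int numNodes = 20;                            // nodes that still hold blocks

    // maxSize == 0 disables smoothing entirely, as in the code above.
    int avgSplitsPerNode = (maxSize > 0 && numNodes > 0)
        ? ((int) (totalLength / maxSize)) / numNodes
        : Integer.MAX_VALUE;
    int maxSplitsByNodeOnly = (avgSplitsPerNode > 0) ? avgSplitsPerNode : 1;

    // 10 GB / 256 MB = 40 splits spread over 20 nodes, so at most 2
    // node-local splits per node in this round; the rest wait for the
    // next round or the rack-level grouping.
    System.out.println("max node-local splits per node: " + maxSplitsByNodeOnly);
  }
}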

Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/security/TokenCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/security/TokenCache.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/security/TokenCache.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/security/TokenCache.java Mon Apr  1 16:47:16 2013
@@ -154,7 +154,8 @@ public class TokenCache {
    */
   @InterfaceAudience.Private
   public static final String JOB_TOKENS_FILENAME = "mapreduce.job.jobTokenFile";
-  private static final Text JOB_TOKEN = new Text("ShuffleAndJobToken");
+  private static final Text JOB_TOKEN = new Text("JobToken");
+  private static final Text SHUFFLE_TOKEN = new Text("MapReduceShuffleToken");
   
   /**
    * load job token from a file
@@ -194,4 +195,14 @@ public class TokenCache {
   public static Token<JobTokenIdentifier> getJobToken(Credentials credentials) {
     return (Token<JobTokenIdentifier>) credentials.getToken(JOB_TOKEN);
   }
+
+  @InterfaceAudience.Private
+  public static void setShuffleSecretKey(byte[] key, Credentials credentials) {
+    credentials.addSecretKey(SHUFFLE_TOKEN, key);
+  }
+
+  @InterfaceAudience.Private
+  public static byte[] getShuffleSecretKey(Credentials credentials) {
+    return getSecretKey(credentials, SHUFFLE_TOKEN);
+  }
 }
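
The TokenCache change above splits the shuffle secret out from the job token: the job token now lives under the "JobToken" alias and the shuffle key under "MapReduceShuffleToken". A minimal sketch of the two new helpers, with placeholder key bytes:

import java.util.Arrays;

import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.security.Credentials;

public class ShuffleSecretSketch {
  public static void main(String[] args) {
    Credentials credentials = new Credentials();
    byte[] shuffleKey = new byte[] {0, 1, 2, 3};  // placeholder secret material

    // Stored under the new, dedicated shuffle alias rather than the job token.
    TokenCache.setShuffleSecretKey(shuffleKey, credentials);

    byte[] roundTripped = TokenCache.getShuffleSecretKey(credentials);
    System.out.println(Arrays.equals(shuffleKey, roundTripped));  // true
  }
}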

Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/split/SplitMetaInfoReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/split/SplitMetaInfoReader.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/split/SplitMetaInfoReader.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/split/SplitMetaInfoReader.java Mon Apr  1 16:47:16 2013
@@ -21,6 +21,8 @@ package org.apache.hadoop.mapreduce.spli
 import java.io.IOException;
 import java.util.Arrays;
 
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
@@ -29,9 +31,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.JobSubmissionFiles;
-import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.mapreduce.MRJobConfig;
 
 /**
  * A utility that reads the split meta info and creates
@@ -44,8 +44,8 @@ public class SplitMetaInfoReader {
   public static JobSplit.TaskSplitMetaInfo[] readSplitMetaInfo(
       JobID jobId, FileSystem fs, Configuration conf, Path jobSubmitDir) 
   throws IOException {
-    long maxMetaInfoSize = conf.getLong(JTConfig.JT_MAX_JOB_SPLIT_METAINFO_SIZE, 
-        10000000L);
+    long maxMetaInfoSize = conf.getLong(MRJobConfig.SPLIT_METAINFO_MAXSIZE,
+        MRJobConfig.DEFAULT_SPLIT_METAINFO_MAXSIZE);
     Path metaSplitFile = JobSubmissionFiles.getJobSplitMetaFile(jobSubmitDir);
     String jobSplitFile = JobSubmissionFiles.getJobSplitFile(jobSubmitDir).toString();
     FileStatus fStatus = fs.getFileStatus(metaSplitFile);

Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java Mon Apr  1 16:47:16 2013
@@ -82,7 +82,7 @@ class Fetcher<K,V> extends Thread {
   private final int connectionTimeout;
   private final int readTimeout;
   
-  private final SecretKey jobTokenSecret;
+  private final SecretKey shuffleSecretKey;
 
   private volatile boolean stopped = false;
 
@@ -92,7 +92,7 @@ class Fetcher<K,V> extends Thread {
   public Fetcher(JobConf job, TaskAttemptID reduceId, 
                  ShuffleScheduler<K,V> scheduler, MergeManager<K,V> merger,
                  Reporter reporter, ShuffleClientMetrics metrics,
-                 ExceptionReporter exceptionReporter, SecretKey jobTokenSecret) {
+                 ExceptionReporter exceptionReporter, SecretKey shuffleKey) {
     this.reporter = reporter;
     this.scheduler = scheduler;
     this.merger = merger;
@@ -100,7 +100,7 @@ class Fetcher<K,V> extends Thread {
     this.exceptionReporter = exceptionReporter;
     this.id = ++nextId;
     this.reduce = reduceId.getTaskID().getId();
-    this.jobTokenSecret = jobTokenSecret;
+    this.shuffleSecretKey = shuffleKey;
     ioErrs = reporter.getCounter(SHUFFLE_ERR_GRP_NAME,
         ShuffleErrors.IO_ERROR.toString());
     wrongLengthErrs = reporter.getCounter(SHUFFLE_ERR_GRP_NAME,
@@ -221,7 +221,6 @@ class Fetcher<K,V> extends Thread {
     
     // Construct the url and connect
     DataInputStream input;
-    boolean connectSucceeded = false;
     
     try {
       URL url = getMapOutputURL(host, maps);
@@ -229,7 +228,8 @@ class Fetcher<K,V> extends Thread {
       
       // generate hash of the url
       String msgToEncode = SecureShuffleUtils.buildMsgFrom(url);
-      String encHash = SecureShuffleUtils.hashFromString(msgToEncode, jobTokenSecret);
+      String encHash = SecureShuffleUtils.hashFromString(msgToEncode,
+          shuffleSecretKey);
       
       // put url hash into http header
       connection.addRequestProperty(
@@ -237,7 +237,6 @@ class Fetcher<K,V> extends Thread {
       // set the read timeout
       connection.setReadTimeout(readTimeout);
       connect(connection, connectionTimeout);
-      connectSucceeded = true;
       input = new DataInputStream(connection.getInputStream());
 
       // Validate response code
@@ -255,7 +254,7 @@ class Fetcher<K,V> extends Thread {
       }
       LOG.debug("url="+msgToEncode+";encHash="+encHash+";replyHash="+replyHash);
       // verify that replyHash is HMac of encHash
-      SecureShuffleUtils.verifyReply(replyHash, encHash, jobTokenSecret);
+      SecureShuffleUtils.verifyReply(replyHash, encHash, shuffleSecretKey);
       LOG.info("for url="+msgToEncode+" sent hash and received reply");
     } catch (IOException ie) {
       boolean connectExcpt = ie instanceof ConnectException;
@@ -265,18 +264,10 @@ class Fetcher<K,V> extends Thread {
 
       // If connect did not succeed, just mark all the maps as failed,
       // indirectly penalizing the host
-      if (!connectSucceeded) {
-        for(TaskAttemptID left: remaining) {
-          scheduler.copyFailed(left, host, connectSucceeded, connectExcpt);
-        }
-      } else {
-        // If we got a read error at this stage, it implies there was a problem
-        // with the first map, typically lost map. So, penalize only that map
-        // and add the rest
-        TaskAttemptID firstMap = maps.get(0);
-        scheduler.copyFailed(firstMap, host, connectSucceeded, connectExcpt);
+      for(TaskAttemptID left: remaining) {
+        scheduler.copyFailed(left, host, false, connectExcpt);
       }
-      
+     
       // Add back all the remaining maps, WITHOUT marking them as failed
       for(TaskAttemptID left: remaining) {
         scheduler.putBackKnownMapOutput(host, left);
@@ -366,13 +357,20 @@ class Fetcher<K,V> extends Thread {
         return EMPTY_ATTEMPT_ID_ARRAY;
       } 
       
-      // Go!
-      LOG.info("fetcher#" + id + " about to shuffle output of map " + 
-               mapOutput.getMapId() + " decomp: " +
-               decompressedLength + " len: " + compressedLength + " to " +
-               mapOutput.getDescription());
-      mapOutput.shuffle(host, input, compressedLength, decompressedLength,
-                        metrics, reporter);
+      // The codecs for lzo, lz4, snappy, bz2, etc. throw java.lang.InternalError
+      // on decompression failures. Catch and re-throw as IOException
+      // so that the fetch failure logic can process it.
+      try {
+        // Go!
+        LOG.info("fetcher#" + id + " about to shuffle output of map "
+            + mapOutput.getMapId() + " decomp: " + decompressedLength
+            + " len: " + compressedLength + " to " + mapOutput.getDescription());
+        mapOutput.shuffle(host, input, compressedLength, decompressedLength,
+            metrics, reporter);
+      } catch (java.lang.InternalError e) {
+        LOG.warn("Failed to shuffle for fetcher#"+id, e);
+        throw new IOException(e);
+      }
       
       // Inform the shuffle scheduler
       long endTime = System.currentTimeMillis();
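
The try/catch added above exists because java.lang.InternalError is an Error, not an Exception, so a decompression failure inside a native codec would otherwise escape the fetcher's IOException-based failure handling. The pattern in isolation, with a hypothetical stand-in for MapOutput.shuffle():

import java.io.IOException;
import java.io.InputStream;

public class InternalErrorToIOExceptionSketch {

  // Hypothetical stand-in for MapOutput.shuffle(); a real codec failure
  // would surface from deep inside the decompression stack.
  static void shuffle(InputStream input) {
    throw new InternalError("simulated codec decompression failure");
  }

  static void copyMapOutput(InputStream input) throws IOException {
    try {
      shuffle(input);
    } catch (java.lang.InternalError e) {
      // Re-wrap so the caller's existing IOException path treats this as a
      // fetch failure instead of letting the Error kill the fetcher thread.
      throw new IOException(e);
    }
  }
}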

Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java Mon Apr  1 16:47:16 2013
@@ -475,9 +475,9 @@ public class MergeManagerImpl<K, V> impl
           combineCollector.setWriter(writer);
           combineAndSpill(rIter, reduceCombineInputCounter);
         }
-        compressAwarePath = new CompressAwarePath(outputPath,
-            writer.getRawLength());
         writer.close();
+        compressAwarePath = new CompressAwarePath(outputPath,
+            writer.getRawLength(), writer.getCompressedLength());
 
         LOG.info(reduceId +  
             " Merge of the " + noInMemorySegments +
@@ -500,7 +500,7 @@ public class MergeManagerImpl<K, V> impl
   private class OnDiskMerger extends MergeThread<CompressAwarePath,K,V> {
     
     public OnDiskMerger(MergeManagerImpl<K, V> manager) {
-      super(manager, Integer.MAX_VALUE, exceptionReporter);
+      super(manager, ioSortFactor, exceptionReporter);
       setName("OnDiskMerger - Thread to merge on-disk map-outputs");
       setDaemon(true);
     }
@@ -552,9 +552,9 @@ public class MergeManagerImpl<K, V> impl
                             mergedMapOutputsCounter, null);
 
         Merger.writeFile(iter, writer, reporter, jobConf);
-        compressAwarePath = new CompressAwarePath(outputPath,
-            writer.getRawLength());
         writer.close();
+        compressAwarePath = new CompressAwarePath(outputPath,
+            writer.getRawLength(), writer.getCompressedLength());
       } catch (IOException e) {
         localFS.delete(outputPath, true);
         throw e;
@@ -713,13 +713,15 @@ public class MergeManagerImpl<K, V> impl
             keyClass, valueClass, memDiskSegments, numMemDiskSegments,
             tmpDir, comparator, reporter, spilledRecordsCounter, null, 
             mergePhase);
-        final Writer<K,V> writer = new Writer<K,V>(job, fs, outputPath,
+        Writer<K,V> writer = new Writer<K,V>(job, fs, outputPath,
             keyClass, valueClass, codec, null);
         try {
           Merger.writeFile(rIter, writer, reporter, job);
-          // add to list of final disk outputs.
+          writer.close();
           onDiskMapOutputs.add(new CompressAwarePath(outputPath,
-              writer.getRawLength()));
+              writer.getRawLength(), writer.getCompressedLength()));
+          writer = null;
+          // add to list of final disk outputs.
         } catch (IOException e) {
           if (null != outputPath) {
             try {
@@ -789,7 +791,7 @@ public class MergeManagerImpl<K, V> impl
       // merges. See comment where mergePhaseFinished is being set
       Progress thisPhase = (mergePhaseFinished) ? null : mergePhase; 
       RawKeyValueIterator diskMerge = Merger.merge(
-          job, fs, keyClass, valueClass, diskSegments,
+          job, fs, keyClass, valueClass, codec, diskSegments,
           ioSortFactor, numInMemSegments, tmpDir, comparator,
           reporter, false, spilledRecordsCounter, null, thisPhase);
       diskSegments.clear();
@@ -808,24 +810,45 @@ public class MergeManagerImpl<K, V> impl
 
   static class CompressAwarePath extends Path {
     private long rawDataLength;
+    private long compressedSize;
 
-    public CompressAwarePath(Path path, long rawDataLength) {
+    public CompressAwarePath(Path path, long rawDataLength, long compressSize) {
       super(path.toUri());
       this.rawDataLength = rawDataLength;
+      this.compressedSize = compressSize;
     }
 
     public long getRawDataLength() {
       return rawDataLength;
     }
-    
+
+    public long getCompressedSize() {
+      return compressedSize;
+    }
+
     @Override
     public boolean equals(Object other) {
       return super.equals(other);
     }
-    
+
     @Override
     public int hashCode() {
       return super.hashCode();
     }
+
+    @Override
+    public int compareTo(Object obj) {
+      if(obj instanceof CompressAwarePath) {
+        CompressAwarePath compPath = (CompressAwarePath) obj;
+        if(this.compressedSize < compPath.getCompressedSize()) {
+          return -1;
+        } else if (this.getCompressedSize() > compPath.getCompressedSize()) {
+          return 1;
+        }
+        // Not returning 0 here so that objects with the same size (but
+        // different paths) are still added to the TreeSet.
+      }
+      return super.compareTo(obj);
+    }
   }
 }
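
The compareTo() override above orders CompressAwarePath entries by compressed size but deliberately avoids returning 0 for distinct paths, because a TreeSet treats compareTo() == 0 as a duplicate and silently drops the element. A standalone illustration of that pitfall using strings ordered by length (nothing Hadoop-specific is assumed here):

import java.util.Comparator;
import java.util.TreeSet;

public class TreeSetTieSketch {
  public static void main(String[] args) {
    // Ordering by length only: "bb" and "cc" tie, so one is silently dropped.
    TreeSet<String> lossy = new TreeSet<String>(new Comparator<String>() {
      public int compare(String a, String b) {
        return a.length() - b.length();
      }
    });
    lossy.add("bb");
    lossy.add("cc");
    System.out.println(lossy.size());  // 1

    // Breaking ties with a secondary key keeps both entries, which is what
    // falling back to super.compareTo(obj) achieves for CompressAwarePath.
    TreeSet<String> safe = new TreeSet<String>(new Comparator<String>() {
      public int compare(String a, String b) {
        int c = a.length() - b.length();
        return c != 0 ? c : a.compareTo(b);
      }
    });
    safe.add("bb");
    safe.add("cc");
    System.out.println(safe.size());   // 2
  }
}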

Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/OnDiskMapOutput.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/OnDiskMapOutput.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/OnDiskMapOutput.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/OnDiskMapOutput.java Mon Apr  1 16:47:16 2013
@@ -48,6 +48,7 @@ class OnDiskMapOutput<K, V> extends MapO
   private final Path outputPath;
   private final MergeManagerImpl<K, V> merger;
   private final OutputStream disk; 
+  private long compressedSize;
 
   public OnDiskMapOutput(TaskAttemptID mapId, TaskAttemptID reduceId,
                          MergeManagerImpl<K, V> merger, long size,
@@ -108,13 +109,14 @@ class OnDiskMapOutput<K, V> extends MapO
                             bytesLeft + " bytes missing of " + 
                             compressedLength + ")");
     }
+    this.compressedSize = compressedLength;
   }
 
   @Override
   public void commit() throws IOException {
     localFS.rename(tmpOutputPath, outputPath);
     CompressAwarePath compressAwarePath = new CompressAwarePath(outputPath,
-        getSize());
+        getSize(), this.compressedSize);
     merger.closeOnDiskFile(compressAwarePath);
   }
   

Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java Mon Apr  1 16:47:16 2013
@@ -108,7 +108,7 @@ public class Shuffle<K, V> implements Sh
     for (int i=0; i < numFetchers; ++i) {
       fetchers[i] = new Fetcher<K,V>(jobConf, reduceId, scheduler, merger, 
                                      reporter, metrics, this, 
-                                     reduceTask.getJobTokenSecret());
+                                     reduceTask.getShuffleSecret());
       fetchers[i].start();
     }
     

Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/ConfigUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/ConfigUtil.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/ConfigUtil.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/ConfigUtil.java Mon Apr  1 16:47:16 2013
@@ -521,6 +521,8 @@ public class ConfigUtil {
     });
     Configuration.addDeprecation("mapreduce.user.classpath.first",
       MRJobConfig.MAPREDUCE_JOB_USER_CLASSPATH_FIRST);
+    Configuration.addDeprecation(JTConfig.JT_MAX_JOB_SPLIT_METAINFO_SIZE,
+        MRJobConfig.SPLIT_METAINFO_MAXSIZE);
   }
 
   public static void main(String[] args) {

Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml Mon Apr  1 16:47:16 2013
@@ -296,6 +296,14 @@
 </property>
 
 <property>
+  <name>mapreduce.shuffle.max.connections</name>
+  <value>0</value>
+  <description>Max allowed connections for the shuffle.  Set to 0 (zero)
+               to indicate no limit on the number of connections.
+  </description>
+</property>
+
+<property>
   <name>mapreduce.reduce.markreset.buffer.percent</name>
   <value>0.0</value>
   <description>The percentage of memory -relative to the maximum heap size- to
@@ -798,6 +806,14 @@
   </description>
 </property>
 
+<property>
+  <name>mapreduce.am.max-attempts</name>
+  <value>1</value>
+  <description>The maximum number of application attempts. It is an
+  application-specific setting. It should not be larger than the global number
+  set by the resourcemanager; otherwise, it will be overridden.</description>
+</property>
+
 <!-- Job Notification Configuration -->
 <property>
  <name>mapreduce.job.end-notification.url</name>

Propchange: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml:r1446831-1462625
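
The two properties added to mapred-default.xml above are plain integers: mapreduce.shuffle.max.connections (0 means unlimited) and mapreduce.am.max-attempts (a per-job cap that must not exceed the ResourceManager's global maximum). A small sketch of reading them, assuming mapred-default.xml is on the client's classpath:

import org.apache.hadoop.conf.Configuration;

public class NewMapredDefaultsSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.addResource("mapred-default.xml");

    // 0 (the shipped default) means no limit on concurrent shuffle connections.
    int maxShuffleConnections =
        conf.getInt("mapreduce.shuffle.max.connections", 0);

    // Per-job MR AM attempt cap; values above the RM's global maximum are
    // overridden by the RM.
    int amMaxAttempts = conf.getInt("mapreduce.am.max-attempts", 1);

    System.out.println(maxShuffleConnections + " " + amMaxAttempts);
  }
}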

Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java Mon Apr  1 16:47:16 2013
@@ -25,7 +25,9 @@ import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.net.HttpURLConnection;
+import java.net.SocketTimeoutException;
 import java.net.URL;
 import java.util.ArrayList;
 
@@ -71,6 +73,54 @@ public class TestFetcher {
   }
   
   @SuppressWarnings("unchecked")
+  @Test(timeout=30000)
+  public void testCopyFromHostConnectionTimeout() throws Exception {
+    LOG.info("testCopyFromHostConnectionTimeout");
+    JobConf job = new JobConf();
+    TaskAttemptID id = TaskAttemptID.forName("attempt_0_1_r_1_1");
+    ShuffleScheduler<Text, Text> ss = mock(ShuffleScheduler.class);
+    MergeManagerImpl<Text, Text> mm = mock(MergeManagerImpl.class);
+    Reporter r = mock(Reporter.class);
+    ShuffleClientMetrics metrics = mock(ShuffleClientMetrics.class);
+    ExceptionReporter except = mock(ExceptionReporter.class);
+    SecretKey key = JobTokenSecretManager.createSecretKey(new byte[]{0,0,0,0});
+    HttpURLConnection connection = mock(HttpURLConnection.class);
+    when(connection.getInputStream()).thenThrow(
+        new SocketTimeoutException("This is a fake timeout :)"));
+    
+    Counters.Counter allErrs = mock(Counters.Counter.class);
+    when(r.getCounter(anyString(), anyString()))
+      .thenReturn(allErrs);
+    
+    Fetcher<Text,Text> underTest = new FakeFetcher<Text,Text>(job, id, ss, mm,
+        r, metrics, except, key, connection);
+
+    MapHost host = new MapHost("localhost", "http://localhost:8080/");
+    
+    ArrayList<TaskAttemptID> maps = new ArrayList<TaskAttemptID>(1);
+    TaskAttemptID map1ID = TaskAttemptID.forName("attempt_0_1_m_1_1");
+    maps.add(map1ID);
+    TaskAttemptID map2ID = TaskAttemptID.forName("attempt_0_1_m_2_1");
+    maps.add(map2ID);
+    when(ss.getMapsForHost(host)).thenReturn(maps);
+    
+    String encHash = "vFE234EIFCiBgYs2tCXY/SjT8Kg=";
+    
+    underTest.copyFromHost(host);
+    
+    verify(connection)
+      .addRequestProperty(SecureShuffleUtils.HTTP_HEADER_URL_HASH, 
+          encHash);
+    
+    verify(allErrs).increment(1);
+    verify(ss).copyFailed(map1ID, host, false, false);
+    verify(ss).copyFailed(map2ID, host, false, false);
+    
+    verify(ss).putBackKnownMapOutput(any(MapHost.class), eq(map1ID));
+    verify(ss).putBackKnownMapOutput(any(MapHost.class), eq(map2ID));
+  }
+  
+  @SuppressWarnings("unchecked")
   @Test
   public void testCopyFromHostBogusHeader() throws Exception {
     LOG.info("testCopyFromHostBogusHeader");
@@ -184,4 +234,62 @@ public class TestFetcher {
     verify(ss).putBackKnownMapOutput(any(MapHost.class), eq(map2ID));
   }
   
-}
+  @SuppressWarnings("unchecked")
+  @Test(timeout=10000) 
+  public void testCopyFromHostCompressFailure() throws Exception {
+    LOG.info("testCopyFromHostCompressFailure");
+    JobConf job = new JobConf();
+    TaskAttemptID id = TaskAttemptID.forName("attempt_0_1_r_1_1");
+    ShuffleScheduler<Text, Text> ss = mock(ShuffleScheduler.class);
+    MergeManagerImpl<Text, Text> mm = mock(MergeManagerImpl.class);
+    InMemoryMapOutput<Text, Text> immo = mock(InMemoryMapOutput.class);
+    Reporter r = mock(Reporter.class);
+    ShuffleClientMetrics metrics = mock(ShuffleClientMetrics.class);
+    ExceptionReporter except = mock(ExceptionReporter.class);
+    SecretKey key = JobTokenSecretManager.createSecretKey(new byte[]{0,0,0,0});
+    HttpURLConnection connection = mock(HttpURLConnection.class);
+    
+    Counters.Counter allErrs = mock(Counters.Counter.class);
+    when(r.getCounter(anyString(), anyString()))
+      .thenReturn(allErrs);
+    
+    Fetcher<Text,Text> underTest = new FakeFetcher<Text,Text>(job, id, ss, mm,
+        r, metrics, except, key, connection);
+    
+
+    MapHost host = new MapHost("localhost", "http://localhost:8080/");
+    
+    ArrayList<TaskAttemptID> maps = new ArrayList<TaskAttemptID>(1);
+    TaskAttemptID map1ID = TaskAttemptID.forName("attempt_0_1_m_1_1");
+    maps.add(map1ID);
+    TaskAttemptID map2ID = TaskAttemptID.forName("attempt_0_1_m_2_1");
+    maps.add(map2ID);
+    when(ss.getMapsForHost(host)).thenReturn(maps);
+    
+    String encHash = "vFE234EIFCiBgYs2tCXY/SjT8Kg=";
+    String replyHash = SecureShuffleUtils.generateHash(encHash.getBytes(), key);
+    
+    when(connection.getResponseCode()).thenReturn(200);
+    when(connection.getHeaderField(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH))
+      .thenReturn(replyHash);
+    ShuffleHeader header = new ShuffleHeader(map1ID.toString(), 10, 10, 1);
+    ByteArrayOutputStream bout = new ByteArrayOutputStream();
+    header.write(new DataOutputStream(bout));
+    ByteArrayInputStream in = new ByteArrayInputStream(bout.toByteArray());
+    when(connection.getInputStream()).thenReturn(in);
+    when(mm.reserve(any(TaskAttemptID.class), anyLong(), anyInt()))
+      .thenReturn(immo);
+    
+    doThrow(new java.lang.InternalError())
+    .when(immo)
+      .shuffle(any(MapHost.class), any(InputStream.class), anyLong(), 
+               anyLong(), any(ShuffleClientMetrics.class), any(Reporter.class));
+
+    underTest.copyFromHost(host);
+       
+    verify(connection)
+      .addRequestProperty(SecureShuffleUtils.HTTP_HEADER_URL_HASH, 
+          encHash);
+    verify(ss, times(1)).copyFailed(map1ID, host, true, false);
+  }
+}
\ No newline at end of file

Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMergeManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMergeManager.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMergeManager.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMergeManager.java Mon Apr  1 16:47:16 2013
@@ -17,28 +17,38 @@
  */
 package org.apache.hadoop.mapreduce.task.reduce;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 
 import java.io.IOException;
+import java.net.URISyntaxException;
 import java.util.ArrayList;
+import java.util.LinkedList;
 import java.util.List;
+import java.util.Random;
 import java.util.concurrent.BrokenBarrierException;
 import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.BoundedByteArrayOutputStream;
+import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MROutputFiles;
 import org.apache.hadoop.mapred.MapOutputFile;
 import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl.CompressAwarePath;
 import org.junit.Assert;
 import org.junit.Test;
+import org.mockito.internal.util.reflection.Whitebox;
 
 public class TestMergeManager {
 
   @Test(timeout=10000)
-  @SuppressWarnings("unchecked")
   public void testMemoryMerge() throws Exception {
     final int TOTAL_MEM_BYTES = 10000;
     final int OUTPUT_SIZE = 7950;
@@ -195,4 +205,59 @@ public class TestMergeManager {
       return exceptions.size();
     }
   }
+
+  @SuppressWarnings({ "unchecked", "deprecation" })
+  @Test(timeout=10000)
+  public void testOnDiskMerger() throws IOException, URISyntaxException,
+    InterruptedException {
+    JobConf jobConf = new JobConf();
+    final int SORT_FACTOR = 5;
+    jobConf.setInt(MRJobConfig.IO_SORT_FACTOR, SORT_FACTOR);
+
+    MapOutputFile mapOutputFile = new MROutputFiles();
+    FileSystem fs = FileSystem.getLocal(jobConf);
+    MergeManagerImpl<IntWritable, IntWritable> manager =
+      new MergeManagerImpl<IntWritable, IntWritable>(null, jobConf, fs, null
+        , null, null, null, null, null, null, null, null, null, mapOutputFile);
+
+    MergeThread<MapOutput<IntWritable, IntWritable>, IntWritable, IntWritable>
+      onDiskMerger = (MergeThread<MapOutput<IntWritable, IntWritable>,
+        IntWritable, IntWritable>) Whitebox.getInternalState(manager,
+          "onDiskMerger");
+    int mergeFactor = (Integer) Whitebox.getInternalState(onDiskMerger,
+      "mergeFactor");
+
+    // make sure the io.sort.factor is set properly
+    assertEquals(mergeFactor, SORT_FACTOR);
+
+    // Stop the onDiskMerger thread so that we can intercept the list of files
+    // waiting to be merged.
+    onDiskMerger.suspend();
+
+    //Send the list of fake files waiting to be merged
+    Random rand = new Random();
+    for(int i = 0; i < 2*SORT_FACTOR; ++i) {
+      Path path = new Path("somePath");
+      CompressAwarePath cap = new CompressAwarePath(path, 1l, rand.nextInt());
+      manager.closeOnDiskFile(cap);
+    }
+
+    //Check that the files pending to be merged are in sorted order.
+    LinkedList<List<CompressAwarePath>> pendingToBeMerged =
+      (LinkedList<List<CompressAwarePath>>) Whitebox.getInternalState(
+        onDiskMerger, "pendingToBeMerged");
+    assertTrue("No inputs were added to list pending to merge",
+      pendingToBeMerged.size() > 0);
+    for(int i = 0; i < pendingToBeMerged.size(); ++i) {
+      List<CompressAwarePath> inputs = pendingToBeMerged.get(i);
+      for(int j = 1; j < inputs.size(); ++j) {
+        assertTrue("Not enough / too many inputs were going to be merged",
+          inputs.size() > 0 && inputs.size() <= SORT_FACTOR);
+        assertTrue("Inputs to be merged were not sorted according to size: ",
+          inputs.get(j).getCompressedSize()
+          >= inputs.get(j-1).getCompressedSize());
+      }
+    }
+
+  }
 }

Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedTaskAttempt.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedTaskAttempt.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedTaskAttempt.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedTaskAttempt.java Mon Apr  1 16:47:16 2013
@@ -24,6 +24,7 @@ import java.util.List;
 import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo;
+import org.apache.hadoop.mapreduce.v2.api.records.Phase;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptReport;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
@@ -107,6 +108,11 @@ public class CompletedTaskAttempt implem
   }
 
   @Override
+  public Phase getPhase() {
+    return Phase.CLEANUP;
+  }
+
+  @Override
   public TaskAttemptState getState() {
     return state;
   }

Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsTasksBlock.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsTasksBlock.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsTasksBlock.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsTasksBlock.java Mon Apr  1 16:47:16 2013
@@ -65,8 +65,12 @@ public class HsTasksBlock extends HtmlBl
     if (!symbol.isEmpty()) {
       type = MRApps.taskType(symbol);
     }
-
-    THEAD<TABLE<Hamlet>> thead = html.table("#tasks").thead();
+    THEAD<TABLE<Hamlet>> thead;
+    if(type != null)
+      thead = html.table("#"+app.getJob().getID() 
+        + type).$class("dt-tasks").thead();
+    else
+      thead = html.table("#tasks").thead();
     //Create the spanning row
     int attemptColSpan = type == TaskType.REDUCE ? 8 : 3;
     thead.tr().

Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsTasksPage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsTasksPage.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsTasksPage.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsTasksPage.java Mon Apr  1 16:47:16 2013
@@ -22,7 +22,9 @@ import static org.apache.hadoop.mapreduc
 import static org.apache.hadoop.yarn.webapp.view.JQueryUI.ACCORDION;
 import static org.apache.hadoop.yarn.webapp.view.JQueryUI.DATATABLES;
 import static org.apache.hadoop.yarn.webapp.view.JQueryUI.DATATABLES_ID;
+import static org.apache.hadoop.yarn.webapp.view.JQueryUI.DATATABLES_SELECTOR;
 import static org.apache.hadoop.yarn.webapp.view.JQueryUI.initID;
+import static org.apache.hadoop.yarn.webapp.view.JQueryUI.initSelector;
 import static org.apache.hadoop.yarn.webapp.view.JQueryUI.postInitID;
 import static org.apache.hadoop.yarn.webapp.view.JQueryUI.tableInit;
 
@@ -42,6 +44,8 @@ public class HsTasksPage extends HsView 
   @Override protected void preHead(Page.HTML<_> html) {
     commonPreHead(html);
     set(DATATABLES_ID, "tasks");
+    set(DATATABLES_SELECTOR, ".dt-tasks");
+    set(initSelector(DATATABLES), tasksTableInit());
     set(initID(ACCORDION, "nav"), "{autoHeight:false, active:1}");
     set(initID(DATATABLES, "tasks"), tasksTableInit());
     set(postInitID(DATATABLES, "tasks"), jobsPostTableInit());
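
Taken together with the HsTasksBlock change above, the page now drives the
per-type task tables off a CSS class rather than a single fixed id. A rough
sketch of the assumed contract between the two pieces:

    // HsTasksBlock tags each per-type table with the "dt-tasks" class...
    thead = html.table("#" + app.getJob().getID() + type)
        .$class("dt-tasks").thead();
    // ...and HsTasksPage registers one selector-based DataTables init that
    // applies to every table matching that class.
    set(DATATABLES_SELECTOR, ".dt-tasks");
    set(initSelector(DATATABLES), tasksTableInit());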

Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/MockHistoryJobs.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/MockHistoryJobs.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/MockHistoryJobs.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/MockHistoryJobs.java Mon Apr  1 16:47:16 2013
@@ -77,13 +77,18 @@ public class MockHistoryJobs extends Moc
     for(Map.Entry<JobId, Job> entry: mocked.entrySet()) {
       JobId id = entry.getKey();
       Job j = entry.getValue();
-      ret.full.put(id, new MockCompletedJob(j));
-      JobReport report = j.getReport();
+      MockCompletedJob mockJob = new MockCompletedJob(j);
+      // use MockCompletedJob for everything below so the values stay
+      // consistent with what the history server would return
+      ret.full.put(id, mockJob);
+      JobReport report = mockJob.getReport();
       JobIndexInfo info = new JobIndexInfo(report.getStartTime(), 
-          report.getFinishTime(), j.getUserName(), j.getName(), id, 
-          j.getCompletedMaps(), j.getCompletedReduces(), String.valueOf(j.getState()));
-      info.setQueueName(j.getQueueName());
+          report.getFinishTime(), mockJob.getUserName(), mockJob.getName(), id, 
+          mockJob.getCompletedMaps(), mockJob.getCompletedReduces(),
+          String.valueOf(mockJob.getState()));
+      info.setQueueName(mockJob.getQueueName());
       ret.partial.put(id, new PartialJob(info, id));
+
     }
     return ret;
   }
@@ -99,12 +104,16 @@ public class MockHistoryJobs extends Moc
 
     @Override
     public int getCompletedMaps() {
-      return job.getCompletedMaps();
+      // always return the total since this is the history server
+      // and PartialJob also assumes completed == total
+      return job.getTotalMaps();
     }
 
     @Override
     public int getCompletedReduces() {
-      return job.getCompletedReduces();
+      // always return the total since this is the history server
+      // and PartialJob also assumes completed == total
+      return job.getTotalReduces();
     }
 
     @Override

Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryEntities.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryEntities.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryEntities.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryEntities.java Mon Apr  1 16:47:16 2013
@@ -79,7 +79,7 @@ public class TestJobHistoryEntities {
   }
 
   /* Verify some expected values based on the history file */
-  @Test
+  @Test (timeout=10000)
   public void testCompletedJob() throws Exception {
     HistoryFileInfo info = mock(HistoryFileInfo.class);
     when(info.getConfFile()).thenReturn(fullConfPath);
@@ -104,7 +104,7 @@ public class TestJobHistoryEntities {
     assertEquals(JobState.SUCCEEDED, jobReport.getJobState());
   }
   
-  @Test
+  @Test (timeout=10000)
   public void testCompletedTask() throws Exception {
     HistoryFileInfo info = mock(HistoryFileInfo.class);
     when(info.getConfFile()).thenReturn(fullConfPath);
@@ -133,7 +133,7 @@ public class TestJobHistoryEntities {
     assertEquals(rt1Id, rt1Report.getTaskId());
   }
   
-  @Test
+  @Test (timeout=10000)
   public void testCompletedTaskAttempt() throws Exception {
     HistoryFileInfo info = mock(HistoryFileInfo.class);
     when(info.getConfFile()).thenReturn(fullConfPath);

Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryEvents.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryEvents.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryEvents.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryEvents.java Mon Apr  1 16:47:16 2013
@@ -25,7 +25,6 @@ import junit.framework.Assert;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler;
@@ -67,8 +66,17 @@ public class TestJobHistoryEvents {
      * completed maps 
     */
     HistoryContext context = new JobHistory();
+    // test start and stop states
     ((JobHistory)context).init(conf);
-    Job parsedJob = context.getJob(jobId);
+    ((JobHistory)context).start();
+    Assert.assertTrue(context.getStartTime() > 0);
+    Assert.assertEquals(((JobHistory)context).getServiceState(),
+        Service.STATE.STARTED);
+
+    ((JobHistory)context).stop();
+    Assert.assertEquals(((JobHistory)context).getServiceState(),
+        Service.STATE.STOPPED);
+    Job parsedJob = context.getJob(jobId);
     Assert.assertEquals("CompletedMaps not correct", 2,
         parsedJob.getCompletedMaps());
     Assert.assertEquals(System.getProperty("user.name"), parsedJob.getUserName());
@@ -177,9 +185,8 @@ public class TestJobHistoryEvents {
     @Override
     protected EventHandler<JobHistoryEvent> createJobHistoryHandler(
         AppContext context) {
-      JobHistoryEventHandler eventHandler = new JobHistoryEventHandler(
-          context, getStartCount());
-      return eventHandler;
+      return new JobHistoryEventHandler(
+              context, getStartCount());
     }
   }
 

Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java Mon Apr  1 16:47:16 2013
@@ -18,7 +18,9 @@
 
 package org.apache.hadoop.mapreduce.v2.hs;
 
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.io.PrintStream;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
@@ -40,6 +42,7 @@ import org.apache.hadoop.mapreduce.TaskI
 import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.jobhistory.EventReader;
 import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent;
+import org.apache.hadoop.mapreduce.jobhistory.HistoryViewer;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.AMInfo;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
@@ -60,7 +63,6 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.hs.HistoryFileManager.HistoryFileInfo;
 import org.apache.hadoop.mapreduce.v2.hs.TestJobHistoryEvents.MRAppWithHistory;
 import org.apache.hadoop.mapreduce.v2.jobhistory.FileNameIndexUtils;
-import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
 import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
 import org.apache.hadoop.mapreduce.v2.jobhistory.JobIndexInfo;
 import org.apache.hadoop.net.DNSToSwitchMapping;
@@ -78,21 +80,27 @@ public class TestJobHistoryParsing {
 
   private static final String RACK_NAME = "/MyRackName";
 
+  private ByteArrayOutputStream outContent = new ByteArrayOutputStream();
+
   public static class MyResolver implements DNSToSwitchMapping {
     @Override
     public List<String> resolve(List<String> names) {
       return Arrays.asList(new String[]{RACK_NAME});
     }
+
+    @Override
+    public void reloadCachedMappings() {
+    }
   }
 
-  @Test
+  @Test (timeout=50000)
   public void testJobInfo() throws Exception {
     JobInfo info = new JobInfo();
     Assert.assertEquals("NORMAL", info.getPriority());
     info.printAll();
   }
 
-  @Test
+  @Test (timeout=50000)
   public void testHistoryParsing() throws Exception {
     LOG.info("STARTING testHistoryParsing()");
     try {
@@ -102,7 +110,7 @@ public class TestJobHistoryParsing {
     }
   }
   
-  @Test
+  @Test (timeout=50000)
   public void testHistoryParsingWithParseErrors() throws Exception {
     LOG.info("STARTING testHistoryParsingWithParseErrors()");
     try {
@@ -317,18 +325,37 @@ public class TestJobHistoryParsing {
         }
       }
     }
+
+    // test output for HistoryViewer
+    PrintStream stdps = System.out;
+    try {
+      System.setOut(new PrintStream(outContent));
+      HistoryViewer viewer = new HistoryViewer(fc.makeQualified(
+          fileInfo.getHistoryFile()).toString(), conf, true);
+      viewer.print();
+
+      for (TaskInfo taskInfo : allTasks.values()) {
+        String test = (taskInfo.getTaskStatus() == null ? ""
+            : taskInfo.getTaskStatus()) + " " + taskInfo.getTaskType()
+            + " task list for " + taskInfo.getTaskId().getJobID();
+        Assert.assertTrue(outContent.toString().indexOf(test) > 0);
+        Assert.assertTrue(outContent.toString().indexOf(taskInfo.getTaskId().toString()) > 0);
+      }
+    } finally {
+      System.setOut(stdps);
+    }
   }
-  
+
   // Computes finished maps similar to RecoveryService...
-  private long computeFinishedMaps(JobInfo jobInfo, 
-      int numMaps, int numSuccessfulMaps) {
+  private long computeFinishedMaps(JobInfo jobInfo, int numMaps,
+      int numSuccessfulMaps) {
     if (numMaps == numSuccessfulMaps) {
       return jobInfo.getFinishedMaps();
     }
-    
+
     long numFinishedMaps = 0;
-    Map<org.apache.hadoop.mapreduce.TaskID, TaskInfo> taskInfos = 
-        jobInfo.getAllTasks();
+    Map<org.apache.hadoop.mapreduce.TaskID, TaskInfo> taskInfos = jobInfo
+        .getAllTasks();
     for (TaskInfo taskInfo : taskInfos.values()) {
       if (TaskState.SUCCEEDED.toString().equals(taskInfo.getTaskStatus())) {
         ++numFinishedMaps;
@@ -337,7 +364,7 @@ public class TestJobHistoryParsing {
     return numFinishedMaps;
   }
   
-  @Test
+  @Test (timeout=50000)
   public void testHistoryParsingForFailedAttempts() throws Exception {
     LOG.info("STARTING testHistoryParsingForFailedAttempts");
     try {
@@ -404,7 +431,7 @@ public class TestJobHistoryParsing {
     }
   }
   
-  @Test
+  @Test (timeout=5000)
   public void testCountersForFailedTask() throws Exception {
     LOG.info("STARTING testCountersForFailedTask");
     try {
@@ -455,13 +482,16 @@ public class TestJobHistoryParsing {
       CompletedTask ct = new CompletedTask(yarnTaskID, entry.getValue());
       Assert.assertNotNull("completed task report has null counters",
           ct.getReport().getCounters());
+      // make sure every completed task has counters and they are not empty
+      Assert.assertTrue(ct.getReport().getCounters()
+          .getAllCounterGroups().size() > 0);
     }
     } finally {
       LOG.info("FINISHED testCountersForFailedTask");
     }
   }
 
-  @Test
+  @Test (timeout=50000)
   public void testScanningOldDirs() throws Exception {
     LOG.info("STARTING testScanningOldDirs");
     try {

Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServicesJobs.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServicesJobs.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServicesJobs.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServicesJobs.java Mon Apr  1 16:47:16 2013
@@ -117,6 +117,7 @@ public class TestHsWebServicesJobs exten
       fullJobs = jobs.full;
     }
 
+
     TestAppContext(int appid, int numJobs, int numTasks, int numAttempts) {
       this(appid, numJobs, numTasks, numAttempts, false);
     }
@@ -411,7 +412,8 @@ public class TestHsWebServicesJobs exten
       JSONObject json = response.getEntity(JSONObject.class);
       assertEquals("incorrect number of elements", 1, json.length());
       JSONObject info = json.getJSONObject("job");
-      VerifyJobsUtils.verifyHsJob(info, jobsMap.get(id));
+
+      VerifyJobsUtils.verifyHsJob(info, appContext.getJob(id));
     }
   }
 
@@ -613,7 +615,7 @@ public class TestHsWebServicesJobs exten
       JSONObject json = response.getEntity(JSONObject.class);
       assertEquals("incorrect number of elements", 1, json.length());
       JSONObject info = json.getJSONObject("jobCounters");
-      verifyHsJobCounters(info, jobsMap.get(id));
+      verifyHsJobCounters(info, appContext.getJob(id));
     }
   }
 
@@ -631,7 +633,7 @@ public class TestHsWebServicesJobs exten
       JSONObject json = response.getEntity(JSONObject.class);
       assertEquals("incorrect number of elements", 1, json.length());
       JSONObject info = json.getJSONObject("jobCounters");
-      verifyHsJobCounters(info, jobsMap.get(id));
+      verifyHsJobCounters(info, appContext.getJob(id));
     }
   }
   
@@ -689,7 +691,7 @@ public class TestHsWebServicesJobs exten
       JSONObject json = response.getEntity(JSONObject.class);
       assertEquals("incorrect number of elements", 1, json.length());
       JSONObject info = json.getJSONObject("jobCounters");
-      verifyHsJobCounters(info, jobsMap.get(id));
+      verifyHsJobCounters(info, appContext.getJob(id));
     }
   }
 
@@ -711,7 +713,7 @@ public class TestHsWebServicesJobs exten
       is.setCharacterStream(new StringReader(xml));
       Document dom = db.parse(is);
       NodeList info = dom.getElementsByTagName("jobCounters");
-      verifyHsJobCountersXML(info, jobsMap.get(id));
+      verifyHsJobCountersXML(info, appContext.getJob(id));
     }
   }
 

Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServicesJobsQuery.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServicesJobsQuery.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServicesJobsQuery.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServicesJobsQuery.java Mon Apr  1 16:47:16 2013
@@ -284,9 +284,9 @@ public class TestHsWebServicesJobsQuery 
     String type = exception.getString("exception");
     String classname = exception.getString("javaClassName");
     WebServicesTestUtils
-        .checkStringMatch(
+        .checkStringContains(
             "exception message",
-            "No enum const class org.apache.hadoop.mapreduce.v2.api.records.JobState.InvalidState",
+            "org.apache.hadoop.mapreduce.v2.api.records.JobState.InvalidState",
             message);
     WebServicesTestUtils.checkStringMatch("exception type",
         "IllegalArgumentException", type);

Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java Mon Apr  1 16:47:16 2013
@@ -106,8 +106,9 @@ public class ResourceMgrDelegate extends
 
   public QueueInfo getQueue(String queueName) throws IOException,
   InterruptedException {
-    return TypeConverter.fromYarn(
-        super.getQueueInfo(queueName), this.conf);
+    org.apache.hadoop.yarn.api.records.QueueInfo queueInfo =
+        super.getQueueInfo(queueName);
+    return (queueInfo == null) ? null : TypeConverter.fromYarn(queueInfo, conf);
   }
 
   public QueueAclsInfo[] getQueueAclsForCurrentUser() throws IOException,
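
With the null guard in place, a sketch of defensive caller code (the queue
name and surrounding variable are hypothetical):

    // getQueue() now returns null for a queue the ResourceManager does not
    // know about, instead of passing null into TypeConverter.fromYarn().
    QueueInfo queue = resourceMgrDelegate.getQueue("no-such-queue");
    if (queue == null) {
      System.err.println("Queue is not defined on the ResourceManager");
    }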

Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java Mon Apr  1 16:47:16 2013
@@ -481,6 +481,9 @@ public class YARNRunner implements Clien
     appContext.setCancelTokensWhenComplete(
         conf.getBoolean(MRJobConfig.JOB_CANCEL_DELEGATION_TOKEN, true));
     appContext.setAMContainerSpec(amContainer);         // AM Container
+    appContext.setMaxAppAttempts(
+        conf.getInt(MRJobConfig.MR_AM_MAX_ATTEMPTS,
+            MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS));
 
     return appContext;
   }
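
A short sketch of the client-side knob this propagates, assuming the standard
Configuration API (the value 2 is only an example):

    // Cap the number of MR ApplicationMaster attempts for this job; the
    // submission context built above carries the value to the RM.
    Configuration conf = new Configuration();
    conf.setInt(MRJobConfig.MR_AM_MAX_ATTEMPTS, 2);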

Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/MiniMRClientClusterFactory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/MiniMRClientClusterFactory.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/MiniMRClientClusterFactory.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/MiniMRClientClusterFactory.java Mon Apr  1 16:47:16 2013
@@ -38,6 +38,11 @@ public class MiniMRClientClusterFactory 
 
   public static MiniMRClientCluster create(Class<?> caller, int noOfNMs,
       Configuration conf) throws IOException {
+    return create(caller, caller.getSimpleName(), noOfNMs, conf);
+  }
+
+  public static MiniMRClientCluster create(Class<?> caller, String identifier,
+      int noOfNMs, Configuration conf) throws IOException {
 
     if (conf == null) {
       conf = new Configuration();
@@ -45,7 +50,7 @@ public class MiniMRClientClusterFactory 
 
     FileSystem fs = FileSystem.get(conf);
 
-    Path testRootDir = new Path("target", caller.getName() + "-tmpDir")
+    Path testRootDir = new Path("target", identifier + "-tmpDir")
         .makeQualified(fs);
     Path appJar = new Path(testRootDir, "MRAppJar.jar");
 
@@ -65,10 +70,10 @@ public class MiniMRClientClusterFactory 
     fs.setPermission(remoteCallerJar, new FsPermission("744"));
     job.addFileToClassPath(remoteCallerJar);
 
-    MiniMRYarnCluster miniMRYarnCluster = new MiniMRYarnCluster(caller
-        .getName(), noOfNMs);
+    MiniMRYarnCluster miniMRYarnCluster = new MiniMRYarnCluster(identifier,
+        noOfNMs);
     job.getConfiguration().set("minimrclientcluster.caller.name",
-        caller.getName());
+        identifier);
     job.getConfiguration().setInt("minimrclientcluster.nodemanagers.number",
         noOfNMs);
     miniMRYarnCluster.init(job.getConfiguration());
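
A hedged usage sketch of the new overload (the test class and identifier
names are made up): callers that spin up several clusters from the same test
class can now keep their working directories apart.

    // Each call gets its own "<identifier>-tmpDir" under target/.
    MiniMRClientCluster cluster = MiniMRClientClusterFactory.create(
        MyJobTest.class, "MyJobTest_firstRun", 1, new Configuration());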

Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/MiniMRCluster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/MiniMRCluster.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/MiniMRCluster.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/MiniMRCluster.java Mon Apr  1 16:47:16 2013
@@ -18,6 +18,7 @@
 package org.apache.hadoop.mapred;
 
 import java.io.IOException;
+import java.util.Random;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -177,8 +178,10 @@ public class MiniMRCluster {
       int numTrackerToExclude, Clock clock) throws IOException {
     if (conf == null) conf = new JobConf();
     FileSystem.setDefaultUri(conf, namenode);
+    String identifier = this.getClass().getSimpleName() + "_"
+        + Integer.toString(new Random().nextInt(Integer.MAX_VALUE));
     mrClientCluster = MiniMRClientClusterFactory.create(this.getClass(),
-        numTaskTrackers, conf);
+        identifier, numTaskTrackers, conf);
   }
 
   public UserGroupInformation getUgi() {

Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestIFile.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestIFile.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestIFile.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestIFile.java Mon Apr  1 16:47:16 2013
@@ -26,12 +26,12 @@ import org.apache.hadoop.io.compress.Def
 import org.apache.hadoop.io.compress.GzipCodec;
 
 import org.junit.Test;
-
+import static org.junit.Assert.*;
 public class TestIFile {
 
   @Test
   /**
-   * Create an IFile.Writer using GzipCodec since this codec does not
+   * Create an IFile.Writer using GzipCodec since this code does not
    * have a compressor when run via the tests (ie no native libraries).
    */
   public void testIFileWriterWithCodec() throws Exception {
@@ -63,5 +63,11 @@ public class TestIFile {
     IFile.Reader<Text, Text> reader =
       new IFile.Reader<Text, Text>(conf, rfs, path, codec, null);
     reader.close();
+
+    // test checksum
+    byte[] ab = new byte[100];
+    int bytesRead = reader.checksumIn.readWithChecksum(ab, 0, ab.length);
+    assertEquals(bytesRead, reader.checksumIn.getChecksum().length);
+
   }
 }

Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestJobConf.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestJobConf.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestJobConf.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestJobConf.java Mon Apr  1 16:47:16 2013
@@ -27,6 +27,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.util.ClassUtil;
 
 
 import static org.junit.Assert.*;
@@ -79,7 +80,7 @@ public class TestJobConf {
     Class clazz = Class.forName(CLASSNAME, true, cl);
     assertNotNull(clazz);
 
-    String containingJar = JobConf.findContainingJar(clazz);
+    String containingJar = ClassUtil.findContainingJar(clazz);
     assertEquals(jar.getAbsolutePath(), containingJar);
   }
 }
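
The jar lookup the test exercises now comes from hadoop-common; a one-line
sketch of the replacement call (the class passed in is just an example):

    // ClassUtil.findContainingJar() is used in place of JobConf.findContainingJar().
    String jar = ClassUtil.findContainingJar(org.apache.hadoop.mapred.JobConf.class);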

Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMapProgress.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMapProgress.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMapProgress.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMapProgress.java Mon Apr  1 16:47:16 2013
@@ -61,8 +61,12 @@ import org.apache.hadoop.util.Reflection
  */
 public class TestMapProgress extends TestCase {
   public static final Log LOG = LogFactory.getLog(TestMapProgress.class);
-  private static String TEST_ROOT_DIR = new File(System.getProperty(
-           "test.build.data", "/tmp")).getAbsolutePath() + "/mapPahseprogress";
+  private static String TEST_ROOT_DIR;
+  static {
+    String root = new File(System.getProperty("test.build.data", "/tmp"))
+      .getAbsolutePath();
+    TEST_ROOT_DIR = new Path(root, "mapPhaseprogress").toString();
+  }
 
   static class FakeUmbilical implements TaskUmbilicalProtocol {
 

Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMultiFileSplit.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMultiFileSplit.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMultiFileSplit.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMultiFileSplit.java Mon Apr  1 16:47:16 2013
@@ -21,13 +21,20 @@ import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
 import java.util.Arrays;
 
 import junit.framework.TestCase;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IOUtils;
-
+/**
+ * Tests for the {@link MultiFileSplit} class: read/write round-trip
+ * and {@code getLocations()}.
+ */
 public class TestMultiFileSplit extends TestCase{
 
     public void testReadWrite() throws Exception {
@@ -58,4 +65,26 @@ public class TestMultiFileSplit extends 
       assertTrue(Arrays.equals(split.getLengths(), readSplit.getLengths()));
       System.out.println(split.toString());
     }
+
+    /**
+     * Test the {@code getLocations()} method.
+     * @throws IOException
+     */
+    public void testgetLocations() throws IOException {
+      JobConf job = new JobConf();
+
+      File tmpFile = File.createTempFile("test", "txt");
+      tmpFile.createNewFile();
+      OutputStream out = new FileOutputStream(tmpFile);
+      out.write("tempfile".getBytes());
+      out.flush();
+      out.close();
+      Path[] path = {new Path(tmpFile.getAbsolutePath())};
+      long[] lengths = {100};
+
+      MultiFileSplit split = new MultiFileSplit(job, path, lengths);
+      String[] locations = split.getLocations();
+      assertTrue(locations.length == 1);
+      assertEquals(locations[0], "localhost");
+    }
 }