Posted to mapreduce-commits@hadoop.apache.org by vi...@apache.org on 2013/08/12 23:26:15 UTC

svn commit: r1513258 [3/4] - in /hadoop/common/branches/YARN-321/hadoop-mapreduce-project: ./ bin/ conf/ dev-support/ hadoop-mapreduce-client/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/...

Modified: hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java Mon Aug 12 21:25:49 2013
@@ -60,7 +60,7 @@ class Fetcher<K,V> extends Thread {
   /* Default read timeout (in milliseconds) */
   private final static int DEFAULT_READ_TIMEOUT = 3 * 60 * 1000;
 
-  private final Reporter reporter;
+  protected final Reporter reporter;
   private static enum ShuffleErrors{IO_ERROR, WRONG_LENGTH, BAD_ID, WRONG_MAP,
                                     CONNECTION, WRONG_REDUCE}
   
@@ -71,13 +71,13 @@ class Fetcher<K,V> extends Thread {
   private final Counters.Counter badIdErrs;
   private final Counters.Counter wrongMapErrs;
   private final Counters.Counter wrongReduceErrs;
-  private final MergeManager<K,V> merger;
-  private final ShuffleSchedulerImpl<K,V> scheduler;
-  private final ShuffleClientMetrics metrics;
-  private final ExceptionReporter exceptionReporter;
-  private final int id;
+  protected final MergeManager<K,V> merger;
+  protected final ShuffleSchedulerImpl<K,V> scheduler;
+  protected final ShuffleClientMetrics metrics;
+  protected final ExceptionReporter exceptionReporter;
+  protected final int id;
   private static int nextId = 0;
-  private final int reduce;
+  protected final int reduce;
   
   private final int connectionTimeout;
   private final int readTimeout;
@@ -407,7 +407,14 @@ class Fetcher<K,V> extends Thread {
       }
       
       // Get the location for the map output - either in-memory or on-disk
-      mapOutput = merger.reserve(mapId, decompressedLength, id);
+      try {
+        mapOutput = merger.reserve(mapId, decompressedLength, id);
+      } catch (IOException ioe) {
+        // kill this reduce attempt
+        ioErrs.increment(1);
+        scheduler.reportLocalError(ioe);
+        return EMPTY_ATTEMPT_ID_ARRAY;
+      }
       
       // Check if we can shuffle *now* ...
       if (mapOutput == null) {

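Note: the Fetcher.java hunk above makes copyFromHost() treat a failed merger.reserve() as a problem with this node rather than with the remote map host. A simplified, self-contained sketch of that shape follows; Merger and Scheduler are illustrative stand-ins for MergeManager and ShuffleSchedulerImpl, not the real Hadoop types.

import java.io.IOException;

class ReserveGuardSketch {
  interface Merger {
    // stand-in for MergeManager<K,V>.reserve(mapId, requestedSize, fetcher)
    Object reserve(String mapId, long decompressedLength, int fetcherId)
        throws IOException;
  }
  interface Scheduler {
    // stand-in for ShuffleSchedulerImpl.reportLocalError(IOException)
    void reportLocalError(IOException ioe);
  }

  private static final String[] EMPTY_ATTEMPT_ID_ARRAY = new String[0];

  static String[] tryReserve(Merger merger, Scheduler scheduler,
      String mapId, long decompressedLength, int id) {
    Object mapOutput;
    try {
      mapOutput = merger.reserve(mapId, decompressedLength, id);
    } catch (IOException ioe) {
      // Reservation failed locally (e.g. out of disk): kill this reduce
      // attempt instead of charging the failure to the map-output host.
      scheduler.reportLocalError(ioe);
      return EMPTY_ATTEMPT_ID_ARRAY;
    }
    // ... continue the shuffle with mapOutput (elided) ...
    return null;
  }
}
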
Modified: hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/InMemoryReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/InMemoryReader.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/InMemoryReader.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/InMemoryReader.java Mon Aug 12 21:25:49 2013
@@ -24,6 +24,7 @@ import java.io.IOException;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.mapred.IFile.Reader;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
@@ -39,11 +40,11 @@ public class InMemoryReader<K, V> extend
   DataInputBuffer memDataIn = new DataInputBuffer();
   private int start;
   private int length;
-
+  
   public InMemoryReader(MergeManagerImpl<K,V> merger, TaskAttemptID taskAttemptId,
-                        byte[] data, int start, int length)
+                        byte[] data, int start, int length, Configuration conf)
   throws IOException {
-    super(null, null, length - start, null, null);
+    super(conf, null, length - start, null, null);
     this.merger = merger;
     this.taskAttemptId = taskAttemptId;
 

Modified: hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java Mon Aug 12 21:25:49 2013
@@ -613,7 +613,7 @@ public class MergeManagerImpl<K, V> impl
       fullSize -= size;
       Reader<K,V> reader = new InMemoryReader<K,V>(MergeManagerImpl.this, 
                                                    mo.getMapId(),
-                                                   data, 0, (int)size);
+                                                   data, 0, (int)size, jobConf);
       inMemorySegments.add(new Segment<K,V>(reader, true, 
                                             (mo.isPrimaryMapOutput() ? 
                                             mergedMapOutputsCounter : null)));

Modified: hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java Mon Aug 12 21:25:49 2013
@@ -18,10 +18,12 @@
 package org.apache.hadoop.mapreduce.task.reduce;
 
 import java.io.IOException;
+import java.util.Map;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapOutputFile;
 import org.apache.hadoop.mapred.RawKeyValueIterator;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.Task;
@@ -56,6 +58,7 @@ public class Shuffle<K, V> implements Sh
   private Progress copyPhase;
   private TaskStatus taskStatus;
   private Task reduceTask; //Used for status updates
+  private Map<TaskAttemptID, MapOutputFile> localMapFiles;
 
   @Override
   public void init(ShuffleConsumerPlugin.Context context) {
@@ -69,6 +72,7 @@ public class Shuffle<K, V> implements Sh
     this.copyPhase = context.getCopyPhase();
     this.taskStatus = context.getStatus();
     this.reduceTask = context.getReduceTask();
+    this.localMapFiles = context.getLocalMapFiles();
     
     scheduler = new ShuffleSchedulerImpl<K, V>(jobConf, taskStatus, reduceId,
         this, copyPhase, context.getShuffledMapsCounter(),
@@ -103,13 +107,22 @@ public class Shuffle<K, V> implements Sh
     eventFetcher.start();
     
     // Start the map-output fetcher threads
-    final int numFetchers = jobConf.getInt(MRJobConfig.SHUFFLE_PARALLEL_COPIES, 5);
+    boolean isLocal = localMapFiles != null;
+    final int numFetchers = isLocal ? 1 :
+      jobConf.getInt(MRJobConfig.SHUFFLE_PARALLEL_COPIES, 5);
     Fetcher<K,V>[] fetchers = new Fetcher[numFetchers];
-    for (int i=0; i < numFetchers; ++i) {
-      fetchers[i] = new Fetcher<K,V>(jobConf, reduceId, scheduler, merger, 
-                                     reporter, metrics, this, 
-                                     reduceTask.getShuffleSecret());
-      fetchers[i].start();
+    if (isLocal) {
+      fetchers[0] = new LocalFetcher<K, V>(jobConf, reduceId, scheduler,
+          merger, reporter, metrics, this, reduceTask.getShuffleSecret(),
+          localMapFiles);
+      fetchers[0].start();
+    } else {
+      for (int i=0; i < numFetchers; ++i) {
+        fetchers[i] = new Fetcher<K,V>(jobConf, reduceId, scheduler, merger, 
+                                       reporter, metrics, this, 
+                                       reduceTask.getShuffleSecret());
+        fetchers[i].start();
+      }
     }
     
     // Wait for shuffle to complete successfully

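Note: the Shuffle.java hunk above special-cases the uberized/local case: when localMapFiles is non-null the map outputs are already on local disk, so a single LocalFetcher replaces the pool of HTTP fetchers. A hedged sketch of just that decision; the Fetcher/LocalFetcher classes and the String-keyed map below are simplified stand-ins, not the real types.

import java.util.Map;

class FetcherSelectionSketch {
  static class Fetcher extends Thread { /* pulls map output over HTTP */ }
  static class LocalFetcher extends Fetcher {
    LocalFetcher(Map<String, String> localMapFiles) { /* reads local disk */ }
  }

  static Fetcher[] startFetchers(Map<String, String> localMapFiles,
      int configuredParallelCopies) {
    boolean isLocal = localMapFiles != null;
    // One fetcher suffices locally: there is no network latency to overlap.
    int numFetchers = isLocal ? 1 : configuredParallelCopies;
    Fetcher[] fetchers = new Fetcher[numFetchers];
    if (isLocal) {
      fetchers[0] = new LocalFetcher(localMapFiles);
    } else {
      for (int i = 0; i < numFetchers; ++i) {
        fetchers[i] = new Fetcher();
      }
    }
    for (Fetcher f : fetchers) {
      f.start();
    }
    return fetchers;
  }
}
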
Modified: hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleSchedulerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleSchedulerImpl.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleSchedulerImpl.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleSchedulerImpl.java Mon Aug 12 21:25:49 2013
@@ -19,7 +19,9 @@ package org.apache.hadoop.mapreduce.task
 
 import java.io.IOException;
 
+import java.net.InetAddress;
 import java.net.URI;
+import java.net.UnknownHostException;
 import java.text.DecimalFormat;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -252,6 +254,16 @@ public class ShuffleSchedulerImpl<K,V> i
 
     failedShuffleCounter.increment(1);
   }
+  
+  public void reportLocalError(IOException ioe) {
+    try {
+      LOG.error("Shuffle failed : local error on this node: "
+          + InetAddress.getLocalHost());
+    } catch (UnknownHostException e) {
+      LOG.error("Shuffle failed : local error on this node");
+    }
+    reporter.reportException(ioe);
+  }
 
   // Notify the JobTracker
   // after every read error, if 'reportReadErrorImmediately' is true or

Modified: hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml (original)
+++ hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml Mon Aug 12 21:25:49 2013
@@ -1566,4 +1566,16 @@
   <description>Whether to use fixed ports with the minicluster</description>
 </property>
 
+<property>
+  <name>mapreduce.jobhistory.admin.address</name>
+  <value>0.0.0.0:10033</value>
+  <description>The address of the History server admin interface.</description>
+</property>
+
+<property>
+  <name>mapreduce.jobhistory.admin.acl</name>
+  <value>*</value>
+  <description>ACL of who can be admin of the History server.</description>
+</property>
+
 </configuration>

Propchange: hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
------------------------------------------------------------------------------
  Merged /hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml:r1503799-1513205
  Merged /hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml:r1380921,1507259

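Note: the two properties added to mapred-default.xml above can be read back with the ordinary Configuration API. A small illustration follows; the key strings and defaults mirror the XML, and JobConf is used because it registers mapred-default.xml as a default resource.

import org.apache.hadoop.mapred.JobConf;

class JobHistoryAdminConfSketch {
  public static void main(String[] args) {
    JobConf conf = new JobConf();
    // Bind address of the history server's admin interface.
    String adminAddress =
        conf.get("mapreduce.jobhistory.admin.address", "0.0.0.0:10033");
    // ACL deciding who may administer the history server ("*" = everyone).
    String adminAcl = conf.get("mapreduce.jobhistory.admin.acl", "*");
    System.out.println(adminAddress + " / " + adminAcl);
  }
}
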
Modified: hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestShufflePlugin.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestShufflePlugin.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestShufflePlugin.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestShufflePlugin.java Mon Aug 12 21:25:49 2013
@@ -155,7 +155,7 @@ public class TestShufflePlugin<K, V> {
                                                 mockCounter, mockCounter, mockCounter,
                                                 mockCounter, mockCounter, mockCounter,
                                                 mockTaskStatus, mockProgress, mockProgress,
-                                                mockTask, mockMapOutputFile);
+                                                mockTask, mockMapOutputFile, null);
       shuffleConsumerPlugin.init(context);
       shuffleConsumerPlugin.run();
       shuffleConsumerPlugin.close();

Modified: hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java Mon Aug 12 21:25:49 2013
@@ -24,11 +24,14 @@ import java.util.List;
 import junit.framework.Assert;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Job;
 import org.junit.Test;
@@ -77,6 +80,23 @@ public class TestFileInputFormat {
         .toString());
   }
 
+  @Test
+  public void testListLocatedStatus() throws Exception {
+    Configuration conf = getConfiguration();
+    conf.setBoolean("fs.test.impl.disable.cache", false);
+    conf.set(FileInputFormat.INPUT_DIR, "test:///a1/a2");
+    MockFileSystem mockFs =
+        (MockFileSystem) new Path("test:///").getFileSystem(conf);
+    Assert.assertEquals("listLocatedStatus already called",
+        0, mockFs.numListLocatedStatusCalls);
+    Job job = Job.getInstance(conf);
+    FileInputFormat<?, ?> fileInputFormat = new TextInputFormat();
+    List<InputSplit> splits = fileInputFormat.getSplits(job);
+    Assert.assertEquals("Input splits are not correct", 2, splits.size());
+    Assert.assertEquals("listLocatedStatuss calls",
+        1, mockFs.numListLocatedStatusCalls);
+  }
+
   private Configuration getConfiguration() {
     Configuration conf = new Configuration();
     conf.set("fs.test.impl.disable.cache", "true");
@@ -86,13 +106,14 @@ public class TestFileInputFormat {
   }
 
   static class MockFileSystem extends RawLocalFileSystem {
+    int numListLocatedStatusCalls = 0;
 
     @Override
     public FileStatus[] listStatus(Path f) throws FileNotFoundException,
         IOException {
       if (f.toString().equals("test:/a1")) {
         return new FileStatus[] {
-            new FileStatus(10, true, 1, 150, 150, new Path("test:/a1/a2")),
+            new FileStatus(0, true, 1, 150, 150, new Path("test:/a1/a2")),
             new FileStatus(10, false, 1, 150, 150, new Path("test:/a1/file1")) };
       } else if (f.toString().equals("test:/a1/a2")) {
         return new FileStatus[] {
@@ -116,5 +137,20 @@ public class TestFileInputFormat {
         throws FileNotFoundException, IOException {
       return this.listStatus(f);
     }
+
+    @Override
+    public BlockLocation[] getFileBlockLocations(Path p, long start, long len)
+        throws IOException {
+      return new BlockLocation[] {
+          new BlockLocation(new String[] { "localhost:50010" },
+              new String[] { "localhost" }, 0, len) };
+    }
+
+    @Override
+    protected RemoteIterator<LocatedFileStatus> listLocatedStatus(Path f,
+        PathFilter filter) throws FileNotFoundException, IOException {
+      ++numListLocatedStatusCalls;
+      return super.listLocatedStatus(f, filter);
+    }
   }
 }

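Note: the point of testListLocatedStatus above is that getSplits() can fetch file statuses and block locations in one listLocatedStatus() call instead of listStatus() plus one getFileBlockLocations() round-trip per file. Consuming that FileSystem API directly looks roughly like this (standard Hadoop FS interfaces; the directory path is a hypothetical example):

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;

class ListLocatedStatusSketch {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    Path dir = new Path("/user/example/input");  // hypothetical input dir
    FileSystem fs = dir.getFileSystem(conf);
    // Each status arrives together with its block locations.
    RemoteIterator<LocatedFileStatus> it = fs.listLocatedStatus(dir);
    while (it.hasNext()) {
      LocatedFileStatus stat = it.next();
      for (BlockLocation loc : stat.getBlockLocations()) {
        System.out.println(stat.getPath() + " -> " + loc);
      }
    }
  }
}
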
Modified: hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/security/TestTokenCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/security/TestTokenCache.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/security/TestTokenCache.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/security/TestTokenCache.java Mon Aug 12 21:25:49 2013
@@ -170,6 +170,7 @@ public class TestTokenCache {
     assertNull(conf.get(MRJobConfig.MAPREDUCE_JOB_CREDENTIALS_BINARY));
   }
   
+  @SuppressWarnings("deprecation")
   @Test
   public void testGetTokensForNamenodes() throws IOException,
       URISyntaxException {

Modified: hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java Mon Aug 12 21:25:49 2013
@@ -58,6 +58,7 @@ import org.apache.hadoop.mapred.Reporter
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
 import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
+import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 import org.junit.Test;
 
 import org.mockito.invocation.InvocationOnMock;
@@ -114,6 +115,36 @@ public class TestFetcher {
     LOG.info("<<<< " + name.getMethodName());
   }
   
+  @Test
+  public void testReduceOutOfDiskSpace() throws Throwable {
+    LOG.info("testReduceOutOfDiskSpace");
+    
+    Fetcher<Text,Text> underTest = new FakeFetcher<Text,Text>(job, id, ss, mm,
+        r, metrics, except, key, connection);
+
+    String replyHash = SecureShuffleUtils.generateHash(encHash.getBytes(), key);
+    ShuffleHeader header = new ShuffleHeader(map1ID.toString(), 10, 10, 1);
+    ByteArrayOutputStream bout = new ByteArrayOutputStream();
+    header.write(new DataOutputStream(bout));
+
+    ByteArrayInputStream in = new ByteArrayInputStream(bout.toByteArray());
+    
+    when(connection.getResponseCode()).thenReturn(200);
+    when(connection.getHeaderField(ShuffleHeader.HTTP_HEADER_NAME))
+    .thenReturn(ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
+    when(connection.getHeaderField(ShuffleHeader.HTTP_HEADER_VERSION))
+    .thenReturn(ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
+    when(connection.getHeaderField(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH))
+    .thenReturn(replyHash);
+    when(connection.getInputStream()).thenReturn(in);
+    
+    when(mm.reserve(any(TaskAttemptID.class), anyLong(), anyInt()))
+    .thenThrow(new DiskErrorException("No disk space available"));
+  
+    underTest.copyFromHost(host);
+    verify(ss).reportLocalError(any(IOException.class));
+  }
+  
   @Test(timeout=30000)
   public void testCopyFromHostConnectionTimeout() throws Exception {
     when(connection.getInputStream()).thenThrow(

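Note: testReduceOutOfDiskSpace above is the usual Mockito "stub a failure, verify the handler" pattern. A minimal self-contained sketch of that pattern; Reserver and ErrorSink are illustrative stand-ins for the mocked MergeManager and ShuffleSchedulerImpl, and the code under test is inlined for brevity.

import static org.mockito.Mockito.*;

import java.io.IOException;

class ThrowAndVerifySketch {
  interface Reserver {
    Object reserve(String mapId, long len, int id) throws IOException;
  }
  interface ErrorSink {
    void reportLocalError(IOException ioe);
  }

  static void demo() throws IOException {
    Reserver mm = mock(Reserver.class);
    ErrorSink ss = mock(ErrorSink.class);

    // Stub: every reserve() fails as if the local disks were full.
    when(mm.reserve(anyString(), anyLong(), anyInt()))
        .thenThrow(new IOException("No disk space available"));

    // Exercise the (inlined) logic: it should catch the IOException
    // and route it to the scheduler as a local error.
    try {
      mm.reserve("attempt_test_0001_m_000001_0", 10L, 1);
    } catch (IOException ioe) {
      ss.reportLocalError(ioe);
    }

    // Verify the failure was reported as a local error exactly once.
    verify(ss).reportLocalError(any(IOException.class));
  }
}
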
Modified: hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs-plugins/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs-plugins/pom.xml?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs-plugins/pom.xml (original)
+++ hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs-plugins/pom.xml Mon Aug 12 21:25:49 2013
@@ -19,12 +19,12 @@
   <parent>
     <artifactId>hadoop-mapreduce-client</artifactId>
     <groupId>org.apache.hadoop</groupId>
-    <version>2.2.0-SNAPSHOT</version>
+    <version>2.3.0-SNAPSHOT</version>
   </parent>
   <modelVersion>4.0.0</modelVersion>
   <groupId>org.apache.hadoop</groupId>
   <artifactId>hadoop-mapreduce-client-hs-plugins</artifactId>
-  <version>2.2.0-SNAPSHOT</version>
+  <version>2.3.0-SNAPSHOT</version>
   <name>hadoop-mapreduce-client-hs-plugins</name>
 
   <properties>

Modified: hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/pom.xml?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/pom.xml (original)
+++ hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/pom.xml Mon Aug 12 21:25:49 2013
@@ -19,12 +19,12 @@
   <parent>
     <artifactId>hadoop-mapreduce-client</artifactId>
     <groupId>org.apache.hadoop</groupId>
-    <version>2.2.0-SNAPSHOT</version>
+    <version>2.3.0-SNAPSHOT</version>
   </parent>
   <modelVersion>4.0.0</modelVersion>
   <groupId>org.apache.hadoop</groupId>
   <artifactId>hadoop-mapreduce-client-hs</artifactId>
-  <version>2.2.0-SNAPSHOT</version>
+  <version>2.3.0-SNAPSHOT</version>
   <name>hadoop-mapreduce-client-hs</name>
 
   <properties>
@@ -67,6 +67,34 @@
           </excludes>
         </configuration>
       </plugin>
+      <plugin>
+		<groupId>org.apache.hadoop</groupId>
+		<artifactId>hadoop-maven-plugins</artifactId>
+		<executions>
+			<execution>
+				<id>compile-protoc</id>
+				<phase>generate-sources</phase>
+				<goals>
+				<goal>protoc</goal>
+				</goals>
+				<configuration>
+				<imports>
+					<param>
+						${basedir}/../../../../hadoop-common-project/hadoop-common/src/main/proto
+					</param>
+					<param>${basedir}/src/main/proto</param>
+				</imports>
+				<source>
+					<directory>${basedir}/src/main/proto</directory>
+					<includes>
+						<include>HSAdminRefreshProtocol.proto</include>
+					</includes>
+				</source>
+				<output>${project.build.directory}/generated-sources/java</output>
+				</configuration>
+			</execution>
+		</executions>
+		</plugin>
     </plugins>
   </build>
 

Modified: hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CachedHistoryStorage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CachedHistoryStorage.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CachedHistoryStorage.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CachedHistoryStorage.java Mon Aug 12 21:25:49 2013
@@ -40,6 +40,8 @@ import org.apache.hadoop.mapreduce.v2.jo
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * Manages an in memory cache of parsed Job History files.
  */
@@ -58,12 +60,16 @@ public class CachedHistoryStorage extend
     this.hsManager = hsManager;
   }
 
-  @SuppressWarnings("serial")
   @Override
   public void serviceInit(Configuration conf) throws Exception {
     super.serviceInit(conf);
     LOG.info("CachedHistoryStorage Init");
 
+    createLoadedJobCache(conf);
+  }
+
+  @SuppressWarnings("serial")
+  private void createLoadedJobCache(Configuration conf) {
     loadedJobCacheSize = conf.getInt(
         JHAdminConfig.MR_HISTORY_LOADED_JOB_CACHE_SIZE,
         JHAdminConfig.DEFAULT_MR_HISTORY_LOADED_JOB_CACHE_SIZE);
@@ -76,11 +82,25 @@ public class CachedHistoryStorage extend
       }
     });
   }
-
+  
+  public void refreshLoadedJobCache() {
+    if (getServiceState() == STATE.STARTED) {
+      setConfig(createConf());
+      createLoadedJobCache(getConfig());
+    } else {
+      LOG.warn("Failed to execute refreshLoadedJobCache: CachedHistoryStorage is not started");
+    }
+  }
+  
+  @VisibleForTesting
+  Configuration createConf() {
+    return new Configuration();
+  }
+  
   public CachedHistoryStorage() {
     super(CachedHistoryStorage.class.getName());
   }
-
+  
   private Job loadJob(HistoryFileInfo fileInfo) {
     try {
       Job job = fileInfo.loadJob();
@@ -98,6 +118,11 @@ public class CachedHistoryStorage extend
     }
   }
 
+  @VisibleForTesting
+  Map<JobId, Job> getLoadedJobCache() {
+    return loadedJobCache;
+  }
+  
   @Override
   public Job getFullJob(JobId jobId) {
     if (LOG.isDebugEnabled()) {

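Note: the loadedJobCache rebuilt by createLoadedJobCache() is, conceptually, a size-bounded LRU map (the @SuppressWarnings("serial") in the hunk above hints at an anonymous LinkedHashMap subclass). A generic sketch of that construction, not a copy of the Hadoop code:

import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

class LoadedJobCacheSketch {
  static <K, V> Map<K, V> boundedLruCache(final int maxSize) {
    // accessOrder=true keeps iteration order least-recently-used first,
    // so evicting the eldest entry evicts the LRU entry.
    return Collections.synchronizedMap(
        new LinkedHashMap<K, V>(maxSize + 1, 0.75f, true) {
          @Override
          protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
            return size() > maxSize;  // drop LRU entry once over capacity
          }
        });
  }
}
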
Modified: hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java Mon Aug 12 21:25:49 2013
@@ -52,6 +52,7 @@ import org.apache.hadoop.fs.Options;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.UnsupportedFileSystemException;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.mapred.JobACLsManager;
 import org.apache.hadoop.mapreduce.jobhistory.JobSummary;
@@ -61,7 +62,9 @@ import org.apache.hadoop.mapreduce.v2.jo
 import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
 import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
 import org.apache.hadoop.mapreduce.v2.jobhistory.JobIndexInfo;
+import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.util.ShutdownThreadsHelper;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -416,7 +419,7 @@ public class HistoryFileManager extends 
       return historyFile;
     }
     
-    private synchronized void delete() throws IOException {
+    protected synchronized void delete() throws IOException {
       if (LOG.isDebugEnabled()) {
         LOG.debug("deleting " + historyFile + " and " + confFile);
       }
@@ -471,8 +474,8 @@ public class HistoryFileManager extends 
   private Path intermediateDoneDirPath = null; // Intermediate Done Dir Path
   private FileContext intermediateDoneDirFc; // Intermediate Done Dir
                                              // FileContext
-
-  private ThreadPoolExecutor moveToDoneExecutor = null;
+  @VisibleForTesting
+  protected ThreadPoolExecutor moveToDoneExecutor = null;
   private long maxHistoryAge = 0;
   
   public HistoryFileManager() {
@@ -524,10 +527,7 @@ public class HistoryFileManager extends 
     maxHistoryAge = conf.getLong(JHAdminConfig.MR_HISTORY_MAX_AGE_MS,
         JHAdminConfig.DEFAULT_MR_HISTORY_MAX_AGE);
     
-    jobListCache = new JobListCache(conf.getInt(
-        JHAdminConfig.MR_HISTORY_JOBLIST_CACHE_SIZE,
-        JHAdminConfig.DEFAULT_MR_HISTORY_JOBLIST_CACHE_SIZE),
-        maxHistoryAge);
+    jobListCache = createJobListCache();
 
     serialNumberIndex = new SerialNumberIndex(conf.getInt(
         JHAdminConfig.MR_HISTORY_DATESTRING_CACHE_SIZE,
@@ -544,6 +544,18 @@ public class HistoryFileManager extends 
     super.serviceInit(conf);
   }
 
+  @Override
+  public void serviceStop() throws Exception {
+    ShutdownThreadsHelper.shutdownExecutorService(moveToDoneExecutor);
+    super.serviceStop();
+  }
+
+  protected JobListCache createJobListCache() {
+    return new JobListCache(conf.getInt(
+        JHAdminConfig.MR_HISTORY_JOBLIST_CACHE_SIZE,
+        JHAdminConfig.DEFAULT_MR_HISTORY_JOBLIST_CACHE_SIZE), maxHistoryAge);
+  }
+
   private void mkdir(FileContext fc, Path path, FsPermission fsp)
       throws IOException {
     if (!fc.util().exists(path)) {
@@ -656,18 +668,18 @@ public class HistoryFileManager extends 
     return jhStatusList;
   }
 
-  private static List<FileStatus> scanDirectoryForHistoryFiles(Path path,
+  protected List<FileStatus> scanDirectoryForHistoryFiles(Path path,
       FileContext fc) throws IOException {
     return scanDirectory(path, fc, JobHistoryUtils.getHistoryFileFilter());
   }
-
+  
   /**
    * Finds all history directories with a timestamp component by scanning the
    * filesystem. Used when the JobHistory server is started.
    * 
-   * @return
+   * @return list of history directories
    */
-  private List<FileStatus> findTimestampedDirectories() throws IOException {
+  protected List<FileStatus> findTimestampedDirectories() throws IOException {
     List<FileStatus> fsList = JobHistoryUtils.localGlobber(doneDirFc,
         doneDirPrefixPath, DONE_BEFORE_SERIAL_TAIL);
     return fsList;
@@ -954,7 +966,7 @@ public class HistoryFileManager extends 
         }
       }
       if (!halted) {
-        doneDirFc.delete(doneDirFc.makeQualified(serialDir.getPath()), true);
+        deleteDir(serialDir);
         removeDirectoryFromSerialNumberIndex(serialDir.getPath());
         existingDoneSubdirs.remove(serialDir.getPath());
       } else {
@@ -962,6 +974,13 @@ public class HistoryFileManager extends 
       }
     }
   }
+  
+  protected boolean deleteDir(FileStatus serialDir)
+      throws AccessControlException, FileNotFoundException,
+      UnsupportedFileSystemException, IOException {
+    return doneDirFc.delete(doneDirFc.makeQualified(serialDir.getPath()), true);
+  }
+
   @VisibleForTesting
   protected void setMaxHistoryAge(long newValue){
     maxHistoryAge=newValue;

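Note: serviceStop() above drains moveToDoneExecutor through ShutdownThreadsHelper so the tests further down can assert isTerminated(). The sketch below shows the standard shutdown sequence such a helper wraps, using only the JDK (it is not the Hadoop utility itself):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

class ExecutorShutdownSketch {
  /** Graceful shutdown: stop intake, wait, then interrupt stragglers. */
  static boolean shutdownExecutor(ExecutorService service, long timeoutSecs)
      throws InterruptedException {
    if (service == null) {
      return true;
    }
    service.shutdown();  // reject new tasks, let queued ones finish
    if (!service.awaitTermination(timeoutSecs, TimeUnit.SECONDS)) {
      service.shutdownNow();  // cancel queued tasks, interrupt running ones
      return service.awaitTermination(timeoutSecs, TimeUnit.SECONDS);
    }
    return true;
  }
}
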
Modified: hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java Mon Aug 12 21:25:49 2013
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Pattern;
@@ -50,6 +51,7 @@ import org.apache.hadoop.yarn.factory.pr
 import org.apache.hadoop.yarn.security.client.ClientToAMTokenSecretManager;
 import org.apache.hadoop.yarn.util.Clock;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 /**
@@ -71,7 +73,11 @@ public class JobHistory extends Abstract
 
   private HistoryStorage storage = null;
   private HistoryFileManager hsManager = null;
-
+  ScheduledFuture<?> futureHistoryCleaner = null;
+  
+  //History job cleaner interval
+  private long cleanerInterval;
+  
   @Override
   protected void serviceInit(Configuration conf) throws Exception {
     LOG.info("JobHistory Init");
@@ -84,7 +90,7 @@ public class JobHistory extends Abstract
         JHAdminConfig.MR_HISTORY_MOVE_INTERVAL_MS,
         JHAdminConfig.DEFAULT_MR_HISTORY_MOVE_INTERVAL_MS);
 
-    hsManager = new HistoryFileManager();
+    hsManager = createHistoryFileManager();
     hsManager.init(conf);
     try {
       hsManager.initExisting();
@@ -92,9 +98,8 @@ public class JobHistory extends Abstract
       throw new YarnRuntimeException("Failed to intialize existing directories", e);
     }
 
-    storage = ReflectionUtils.newInstance(conf.getClass(
-        JHAdminConfig.MR_HISTORY_STORAGE, CachedHistoryStorage.class,
-        HistoryStorage.class), conf);
+    storage = createHistoryStorage();
+    
     if (storage instanceof Service) {
       ((Service) storage).init(conf);
     }
@@ -103,6 +108,16 @@ public class JobHistory extends Abstract
     super.serviceInit(conf);
   }
 
+  protected HistoryStorage createHistoryStorage() {
+    return ReflectionUtils.newInstance(conf.getClass(
+        JHAdminConfig.MR_HISTORY_STORAGE, CachedHistoryStorage.class,
+        HistoryStorage.class), conf);
+  }
+  
+  protected HistoryFileManager createHistoryFileManager() {
+    return new HistoryFileManager();
+  }
+
   @Override
   protected void serviceStart() throws Exception {
     hsManager.start();
@@ -118,19 +133,14 @@ public class JobHistory extends Abstract
         moveThreadInterval, moveThreadInterval, TimeUnit.MILLISECONDS);
 
     // Start historyCleaner
-    boolean startCleanerService = conf.getBoolean(
-        JHAdminConfig.MR_HISTORY_CLEANER_ENABLE, true);
-    if (startCleanerService) {
-      long runInterval = conf.getLong(
-          JHAdminConfig.MR_HISTORY_CLEANER_INTERVAL_MS,
-          JHAdminConfig.DEFAULT_MR_HISTORY_CLEANER_INTERVAL_MS);
-      scheduledExecutor
-          .scheduleAtFixedRate(new HistoryCleaner(),
-              30 * 1000l, runInterval, TimeUnit.MILLISECONDS);
-    }
+    scheduleHistoryCleaner();
     super.serviceStart();
   }
 
+  protected int getInitDelaySecs() {
+    return 30;
+  }
+  
   @Override
   protected void serviceStop() throws Exception {
     LOG.info("Stopping JobHistory");
@@ -225,6 +235,25 @@ public class JobHistory extends Abstract
     return storage.getAllPartialJobs();
   }
 
+  public void refreshLoadedJobCache() {
+    if (getServiceState() == STATE.STARTED) {
+      if (storage instanceof CachedHistoryStorage) {
+        ((CachedHistoryStorage) storage).refreshLoadedJobCache();
+      } else {
+        throw new UnsupportedOperationException(storage.getClass().getName()
+            + " is expected to be an instance of "
+            + CachedHistoryStorage.class.getName());
+      }
+    } else {
+      LOG.warn("Failed to execute refreshLoadedJobCache: JobHistory service is not started");
+    }
+  }
+
+  @VisibleForTesting
+  HistoryStorage getHistoryStorage() {
+    return storage;
+  }
+  
   /**
    * Look for a set of partial jobs.
    * 
@@ -256,6 +285,43 @@ public class JobHistory extends Abstract
         fBegin, fEnd, jobState);
   }
 
+  public void refreshJobRetentionSettings() {
+    if (getServiceState() == STATE.STARTED) {
+      conf = createConf();
+      long maxHistoryAge = conf.getLong(JHAdminConfig.MR_HISTORY_MAX_AGE_MS,
+          JHAdminConfig.DEFAULT_MR_HISTORY_MAX_AGE);
+      hsManager.setMaxHistoryAge(maxHistoryAge);
+      if (futureHistoryCleaner != null) {
+        futureHistoryCleaner.cancel(false);
+      }
+      futureHistoryCleaner = null;
+      scheduleHistoryCleaner();
+    } else {
+      LOG.warn("Failed to execute refreshJobRetentionSettings : Job History service is not started");
+    }
+  }
+
+  private void scheduleHistoryCleaner() {
+    boolean startCleanerService = conf.getBoolean(
+        JHAdminConfig.MR_HISTORY_CLEANER_ENABLE, true);
+    if (startCleanerService) {
+      cleanerInterval = conf.getLong(
+          JHAdminConfig.MR_HISTORY_CLEANER_INTERVAL_MS,
+          JHAdminConfig.DEFAULT_MR_HISTORY_CLEANER_INTERVAL_MS);
+
+      futureHistoryCleaner = scheduledExecutor.scheduleAtFixedRate(
+          new HistoryCleaner(), getInitDelaySecs() * 1000l, cleanerInterval,
+          TimeUnit.MILLISECONDS);
+    }
+  }
+
+  protected Configuration createConf() {
+    return new Configuration();
+  }
+  
+  public long getCleanerInterval() {
+    return cleanerInterval;
+  }
   // TODO AppContext - Not Required
   private ApplicationAttemptId appAttemptID;
 

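Note: refreshJobRetentionSettings() above works because the cleaner's ScheduledFuture is now kept in a field, so the running task can be cancelled and re-scheduled with a freshly read interval. A minimal JDK-only sketch of that cancel-and-reschedule pattern:

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

class RescheduleSketch {
  private final ScheduledExecutorService executor =
      Executors.newScheduledThreadPool(1);
  private ScheduledFuture<?> future;

  synchronized void schedule(Runnable task, long initialDelayMs,
      long intervalMs) {
    if (future != null) {
      future.cancel(false);  // stop future runs; let an in-flight run finish
      future = null;
    }
    future = executor.scheduleAtFixedRate(task, initialDelayMs, intervalMs,
        TimeUnit.MILLISECONDS);
  }
}
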
Modified: hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistoryServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistoryServer.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistoryServer.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistoryServer.java Mon Aug 12 21:25:49 2013
@@ -26,11 +26,13 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.mapreduce.v2.hs.server.HSAdminServer;
 import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.metrics2.source.JvmMetrics;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.util.ExitUtil;
 import org.apache.hadoop.util.ShutdownHookManager;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
@@ -59,6 +61,7 @@ public class JobHistoryServer extends Co
   private JobHistory jobHistoryService;
   private JHSDelegationTokenSecretManager jhsDTSecretManager;
   private AggregatedLogDeletionService aggLogDelService;
+  private HSAdminServer hsAdminServer;
 
   public JobHistoryServer() {
     super(JobHistoryServer.class.getName());
@@ -81,9 +84,11 @@ public class JobHistoryServer extends Co
     clientService = new HistoryClientService(historyContext, 
         this.jhsDTSecretManager);
     aggLogDelService = new AggregatedLogDeletionService();
+    hsAdminServer = new HSAdminServer(aggLogDelService, jobHistoryService);
     addService(jobHistoryService);
     addService(clientService);
     addService(aggLogDelService);
+    addService(hsAdminServer);
     super.serviceInit(config);
   }
 
@@ -135,11 +140,13 @@ public class JobHistoryServer extends Co
     return this.clientService;
   }
 
-  public static void main(String[] args) {
-    Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
+  static JobHistoryServer launchJobHistoryServer(String[] args) {
+    Thread.
+        setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
     StringUtils.startupShutdownMessage(JobHistoryServer.class, args, LOG);
+    JobHistoryServer jobHistoryServer = null;
     try {
-      JobHistoryServer jobHistoryServer = new JobHistoryServer();
+      jobHistoryServer = new JobHistoryServer();
       ShutdownHookManager.get().addShutdownHook(
           new CompositeServiceShutdownHook(jobHistoryServer),
           SHUTDOWN_HOOK_PRIORITY);
@@ -148,7 +155,12 @@ public class JobHistoryServer extends Co
       jobHistoryServer.start();
     } catch (Throwable t) {
       LOG.fatal("Error starting JobHistoryServer", t);
-      System.exit(-1);
+      ExitUtil.terminate(-1, "Error starting JobHistoryServer");
     }
+    return jobHistoryServer;
+  }
+
+  public static void main(String[] args) {
+    launchJobHistoryServer(args);
   }
 }

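Note: splitting main() into launchJobHistoryServer() plus ExitUtil.terminate() is what lets the test changes further down hold a reference to the server and survive a startup failure. A generic sketch of the same pattern; Daemon is a made-up stand-in, while ExitUtil is the real Hadoop utility.

import org.apache.hadoop.util.ExitUtil;

class LaunchSketch {
  static class Daemon {
    void init() { /* wire services */ }
    void start() { /* start services */ }
  }

  // Returns the instance so tests can inspect and stop it. A test calls
  // ExitUtil.disableSystemExit() first, so a startup failure surfaces as
  // ExitUtil.ExitException instead of killing the JVM.
  static Daemon launch(String[] args) {
    Daemon d = null;
    try {
      d = new Daemon();
      d.init();
      d.start();
    } catch (Throwable t) {
      ExitUtil.terminate(-1, "Error starting daemon");
    }
    return d;
  }

  public static void main(String[] args) {
    launch(args);
  }
}
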
Modified: hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryEvents.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryEvents.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryEvents.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryEvents.java Mon Aug 12 21:25:49 2013
@@ -70,12 +70,15 @@ public class TestJobHistoryEvents {
     ((JobHistory)context).start();
     Assert.assertTrue( context.getStartTime()>0);
     Assert.assertEquals(((JobHistory)context).getServiceState(),Service.STATE.STARTED);
-    
-    
+
+    // get job before stopping JobHistory
+    Job parsedJob = context.getJob(jobId);
+
+    // stop JobHistory
     ((JobHistory)context).stop();
     Assert.assertEquals(((JobHistory)context).getServiceState(),Service.STATE.STOPPED);
-      Job parsedJob = context.getJob(jobId);
-    
+
+
     Assert.assertEquals("CompletedMaps not correct", 2,
         parsedJob.getCompletedMaps());
     Assert.assertEquals(System.getProperty("user.name"), parsedJob.getUserName());

Modified: hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java Mon Aug 12 21:25:49 2013
@@ -535,7 +535,10 @@ public class TestJobHistoryParsing {
       Assert.assertTrue("Timeout waiting for history move", msecToSleep > 0);
 
       fileInfo = hfm.getFileInfo(jobId);
+      hfm.stop();
       Assert.assertNotNull("Unable to locate old job history", fileInfo);
+      Assert.assertTrue("HistoryFileManager not shutdown properly",
+          hfm.moveToDoneExecutor.isTerminated());
     } finally {
       LOG.info("FINISHED testScanningOldDirs");
     }
@@ -636,6 +639,9 @@ public class TestJobHistoryParsing {
       // correct live time
       hfm.setMaxHistoryAge(-1);
       hfm.clean();
+      hfm.stop();
+      Assert.assertTrue("Thread pool shutdown",
+          hfm.moveToDoneExecutor.isTerminated());
       // should be deleted !
       Assert.assertTrue("file should be deleted ", fileInfo.isDeleted());
 

Modified: hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryServer.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryServer.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryServer.java Mon Aug 12 21:25:49 2013
@@ -63,19 +63,16 @@ public class TestJobHistoryServer {
   private static RecordFactory recordFactory = RecordFactoryProvider
           .getRecordFactory(null);
 
-
-  
   JobHistoryServer historyServer=null;
+
   // simple test init/start/stop   JobHistoryServer. Status should change.
-  
   @Test (timeout= 50000 )
   public void testStartStopServer() throws Exception {
-
     historyServer = new JobHistoryServer();
     Configuration config = new Configuration();
     historyServer.init(config);
     assertEquals(STATE.INITED, historyServer.getServiceState());
-    assertEquals(3, historyServer.getServices().size());
+    assertEquals(4, historyServer.getServices().size());
     HistoryClientService historyService = historyServer.getClientService();
     assertNotNull(historyServer.getClientService());
     assertEquals(STATE.INITED, historyService.getServiceState());
@@ -86,15 +83,9 @@ public class TestJobHistoryServer {
     historyServer.stop();
     assertEquals(STATE.STOPPED, historyServer.getServiceState());
     assertNotNull(historyService.getClientHandler().getConnectAddress());
-
-    
-    
   }
 
-
-
   //Test reports of  JobHistoryServer. History server should get log files from  MRApp and read them
-  
   @Test (timeout= 50000 )
   public void testReports() throws Exception {
     Configuration config = new Configuration();
@@ -128,7 +119,6 @@ public class TestJobHistoryServer {
     assertEquals(1, jobs.size());
     assertEquals("job_0_0000",jobs.keySet().iterator().next().toString());
     
-    
     Task task = job.getTasks().values().iterator().next();
     TaskAttempt attempt = task.getAttempts().values().iterator().next();
 
@@ -188,14 +178,14 @@ public class TestJobHistoryServer {
     assertEquals("", diagnosticResponse.getDiagnostics(0));
 
   }
- // test main method
+
+  // test launch method
   @Test (timeout =60000)
-  public void testMainMethod() throws Exception {
+  public void testLaunch() throws Exception {
 
     ExitUtil.disableSystemExit();
     try {
-      JobHistoryServer.main(new String[0]);
-
+      historyServer = JobHistoryServer.launchJobHistoryServer(new String[0]);
     } catch (ExitUtil.ExitException e) {
       assertEquals(0,e.status);
       ExitUtil.resetFirstExitException();

Modified: hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/pom.xml?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/pom.xml (original)
+++ hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/pom.xml Mon Aug 12 21:25:49 2013
@@ -19,12 +19,12 @@
   <parent>
     <artifactId>hadoop-mapreduce-client</artifactId>
     <groupId>org.apache.hadoop</groupId>
-    <version>2.2.0-SNAPSHOT</version>
+    <version>2.3.0-SNAPSHOT</version>
   </parent>
   <modelVersion>4.0.0</modelVersion>
   <groupId>org.apache.hadoop</groupId>
   <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
-  <version>2.2.0-SNAPSHOT</version>
+  <version>2.3.0-SNAPSHOT</version>
   <name>hadoop-mapreduce-client-jobclient</name>
 
   <properties>

Modified: hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/NotRunningJob.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/NotRunningJob.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/NotRunningJob.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/NotRunningJob.java Mon Aug 12 21:25:49 2013
@@ -90,7 +90,7 @@ public class NotRunningJob implements MR
     return ApplicationReport.newInstance(unknownAppId, unknownAttemptId,
       "N/A", "N/A", "N/A", "N/A", 0, null, YarnApplicationState.NEW, "N/A",
       "N/A", 0, 0, FinalApplicationStatus.UNDEFINED, null, "N/A", 0.0f,
-      YarnConfiguration.DEFAULT_APPLICATION_TYPE);
+      YarnConfiguration.DEFAULT_APPLICATION_TYPE, null);
   }
 
   NotRunningJob(ApplicationReport applicationReport, JobState jobState) {

Modified: hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java Mon Aug 12 21:25:49 2013
@@ -54,6 +54,7 @@ import org.apache.hadoop.yarn.client.api
 import org.apache.hadoop.yarn.client.api.YarnClientApplication;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -75,36 +76,18 @@ public class ResourceMgrDelegate extends
    * @param conf the configuration object.
    */
   public ResourceMgrDelegate(YarnConfiguration conf) {
-    this(conf, null);
-  }
-
-  /**
-   * Delegate responsible for communicating with the Resource Manager's
-   * {@link ApplicationClientProtocol}.
-   * @param conf the configuration object.
-   * @param rmAddress the address of the Resource Manager
-   */
-  public ResourceMgrDelegate(YarnConfiguration conf,
-      InetSocketAddress rmAddress) {
     super(ResourceMgrDelegate.class.getName());
     this.conf = conf;
-    this.rmAddress = rmAddress;
-    if (rmAddress == null) {
-      client = YarnClient.createYarnClient();
-    } else {
-      client = YarnClient.createYarnClient(rmAddress);
-    }
+    this.client = YarnClient.createYarnClient();
     init(conf);
     start();
   }
 
   @Override
   protected void serviceInit(Configuration conf) throws Exception {
-    if (rmAddress == null) {
-      this.rmAddress = conf.getSocketAddr(YarnConfiguration.RM_ADDRESS,
+    this.rmAddress = conf.getSocketAddr(YarnConfiguration.RM_ADDRESS,
           YarnConfiguration.DEFAULT_RM_ADDRESS,
           YarnConfiguration.DEFAULT_RM_PORT);
-    }
     client.init(conf);
     super.serviceInit(conf);
   }
@@ -304,6 +287,12 @@ public class ResourceMgrDelegate extends
   }
 
   @Override
+  public Token<AMRMTokenIdentifier> getAMRMToken(ApplicationId appId) 
+    throws YarnException, IOException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
   public List<ApplicationReport> getApplications() throws YarnException,
       IOException {
     return client.getApplications();

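With the two-argument constructor gone, the RM address is now resolved exclusively from configuration during serviceInit. A minimal sketch of that resolution, assuming only a YarnConfiguration is at hand:

    // Sketch: read yarn.resourcemanager.address, falling back to the
    // built-in default host and port when unset.
    YarnConfiguration conf = new YarnConfiguration();
    InetSocketAddress rmAddress = conf.getSocketAddr(
        YarnConfiguration.RM_ADDRESS,
        YarnConfiguration.DEFAULT_RM_ADDRESS,
        YarnConfiguration.DEFAULT_RM_PORT);

Note also that the new getAMRMToken(ApplicationId) override is a deliberate stub: this delegate throws UnsupportedOperationException rather than handing out AM-RM tokens.
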
Modified: hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/slive/TestSlive.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/slive/TestSlive.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/slive/TestSlive.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/slive/TestSlive.java Mon Aug 12 21:25:49 2013
@@ -63,7 +63,9 @@ public class TestSlive {
   /** gets the test write location according to the coding guidelines */
   private static File getWriteLoc() {
     String writeLoc = System.getProperty(TEST_DATA_PROP, "build/test/data/");
-    return new File(writeLoc, "slive");
+    File writeDir = new File(writeLoc, "slive");
+    writeDir.mkdirs();
+    return writeDir;
   }
 
   /** gets where the MR job places its data + output + results */

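One subtlety with the mkdirs() call above, noted here as a sketch rather than a suggested change: File.mkdirs() returns false both on failure and when the directory already exists, so a test that wants to fail loudly on an uncreatable directory needs an extra check:

    // Sketch only; the committed code ignores the mkdirs() return value.
    File writeDir = new File(writeLoc, "slive");
    if (!writeDir.mkdirs() && !writeDir.isDirectory()) {
      throw new RuntimeException("could not create " + writeDir);
    }
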
Modified: hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestAuditLogger.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestAuditLogger.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestAuditLogger.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestAuditLogger.java Mon Aug 12 21:25:49 2013
@@ -137,11 +137,12 @@ public class TestAuditLogger extends Tes
   /**
    * Test {@link AuditLogger} with IP set.
    */
-  @SuppressWarnings("deprecation")
   public void testAuditLoggerWithIP() throws Exception {
     Configuration conf = new Configuration();
     // start the IPC server
-    Server server = RPC.getServer(new MyTestRPCServer(), "0.0.0.0", 0, conf);
+    Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class)
+            .setInstance(new MyTestRPCServer()).setBindAddress("0.0.0.0")
+            .setPort(0).build();
     server.start();
 
     InetSocketAddress addr = NetUtils.getConnectAddress(server);

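The same migration off the deprecated static RPC.getServer(...) overloads recurs later in this commit (TestUmbilicalProtocolWithJobToken, which additionally sets a handler count, verbose flag, and secret manager). The general shape, as a sketch:

    // Each positional argument of the old RPC.getServer(...) maps onto one
    // fluent setter; build() returns the configured Server.
    Server server = new RPC.Builder(conf)
        .setProtocol(TestProtocol.class)     // RPC protocol interface
        .setInstance(new MyTestRPCServer())  // object servicing the calls
        .setBindAddress("0.0.0.0")           // bind on all interfaces
        .setPort(0)                          // 0 = pick any free port
        .build();
    server.start();
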
Modified: hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java Mon Aug 12 21:25:49 2013
@@ -430,7 +430,7 @@ public class TestClientServiceDelegate {
     return ApplicationReport.newInstance(appId, attemptId, "user", "queue",
       "appname", "host", 124, null, YarnApplicationState.FINISHED,
       "diagnostics", "url", 0, 0, FinalApplicationStatus.SUCCEEDED, null,
-      "N/A", 0.0f, YarnConfiguration.DEFAULT_APPLICATION_TYPE);
+      "N/A", 0.0f, YarnConfiguration.DEFAULT_APPLICATION_TYPE, null);
   }
 
   private ApplicationReport getRunningApplicationReport(String host, int port) {
@@ -440,7 +440,7 @@ public class TestClientServiceDelegate {
     return ApplicationReport.newInstance(appId, attemptId, "user", "queue",
       "appname", host, port, null, YarnApplicationState.RUNNING, "diagnostics",
       "url", 0, 0, FinalApplicationStatus.UNDEFINED, null, "N/A", 0.0f,
-      YarnConfiguration.DEFAULT_APPLICATION_TYPE);
+      YarnConfiguration.DEFAULT_APPLICATION_TYPE, null);
   }
 
   private ResourceMgrDelegate getRMDelegate() throws IOException {

Modified: hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestJobCounters.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestJobCounters.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestJobCounters.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestJobCounters.java Mon Aug 12 21:25:49 2013
@@ -276,18 +276,16 @@ public class TestJobCounters {
     // there are too few spills to combine (2 < 3)
     // Each map spills 2^14 records, so maps spill 49152 records, combined.
 
-    // The reduce spill count is composed of the read from one segment and
-    // the intermediate merge of the other two. The intermediate merge
+    // The combiner has emitted 24576 records to the reducer; these are all
+    // fetched straight to memory from the map side. The intermediate merge
     // adds 8192 records per segment read; again, there are too few spills to
-    // combine, so all 16834 are written to disk (total 32768 spilled records
-    // for the intermediate merge). The merge into the reduce includes only
-    // the unmerged segment, size 8192. Total spilled records in the reduce
-    // is 32768 from the merge + 8192 unmerged segment = 40960 records
+    // combine, so each segment spills exactly once. Total spilled records
+    // in the reduce is 8192 records / map * 3 maps = 24576.
 
-    // Total: map + reduce = 49152 + 40960 = 90112
+    // Total: map + reduce = 49152 + 24576 = 73728
     // 3 files, 5120 = 5 * 1024 rec/file = 15360 input records
     // 4 records/line = 61440 output records
-    validateCounters(c1, 90112, 15360, 61440);
+    validateCounters(c1, 73728, 15360, 61440);
     validateFileCounters(c1, inputSize, 0, 0, 0);
     validateOldFileCounters(c1, inputSize, 61928, 0, 0);
   }
@@ -316,12 +314,12 @@ public class TestJobCounters {
     // 1st merge: read + write = 8192 * 4
     // 2nd merge: read + write = 8192 * 4
     // final merge: 0
-    // Total reduce: 65536
+    // Total reduce: 32768
 
-    // Total: map + reduce = 2^16 + 2^16 = 131072
+    // Total: map + reduce = 2^16 + 2^15 = 98304
     // 4 files, 5120 = 5 * 1024 rec/file = 15360 input records
     // 4 records/line = 81920 output records
-    validateCounters(c1, 131072, 20480, 81920);
+    validateCounters(c1, 98304, 20480, 81920);
     validateFileCounters(c1, inputSize, 0, 0, 0);
   }
 
@@ -349,7 +347,7 @@ public class TestJobCounters {
     // Total reduce: 45056
     // 5 files, 5120 = 5 * 1024 rec/file = 15360 input records
     // 4 records/line = 102400 output records
-    validateCounters(c1, 147456, 25600, 102400);
+    validateCounters(c1, 122880, 25600, 102400);
     validateFileCounters(c1, inputSize, 0, 0, 0);
   }
 
@@ -394,7 +392,7 @@ public class TestJobCounters {
         job, new Path(OUT_DIR, "outputN0"));
     assertTrue(job.waitForCompletion(true));
     final Counters c1 = Counters.downgrade(job.getCounters());
-    validateCounters(c1, 90112, 15360, 61440);
+    validateCounters(c1, 73728, 15360, 61440);
     validateFileCounters(c1, inputSize, 0, 0, 0);    
   }
 
@@ -416,7 +414,7 @@ public class TestJobCounters {
         job, new Path(OUT_DIR, "outputN1"));
     assertTrue(job.waitForCompletion(true));
     final Counters c1 = Counters.downgrade(job.getCounters());
-    validateCounters(c1, 131072, 20480, 81920);
+    validateCounters(c1, 98304, 20480, 81920);
     validateFileCounters(c1, inputSize, 0, 0, 0);
   }
 
@@ -439,7 +437,7 @@ public class TestJobCounters {
         job, new Path(OUT_DIR, "outputN2"));
     assertTrue(job.waitForCompletion(true));
     final Counters c1 = Counters.downgrade(job.getCounters());
-    validateCounters(c1, 147456, 25600, 102400);
+    validateCounters(c1, 122880, 25600, 102400);
     validateFileCounters(c1, inputSize, 0, 0, 0);
   }
 

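The revised totals all follow one per-file accounting rule: each input file contributes 2^14 = 16384 map-side spills plus one 8192-record reduce-side segment. A quick arithmetic check (sketch):

    // Per input file: 16384 (map) + 8192 (reduce) = 24576 spilled records.
    int perFile = 16384 + 8192;
    System.out.println(3 * perFile); // 73728  (three input files)
    System.out.println(4 * perFile); // 98304  (four input files)
    System.out.println(5 * perFile); // 122880 (five input files)
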
Modified: hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/lib/TestKeyFieldBasedComparator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/lib/TestKeyFieldBasedComparator.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/lib/TestKeyFieldBasedComparator.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/lib/TestKeyFieldBasedComparator.java Mon Aug 12 21:25:49 2013
@@ -63,7 +63,7 @@ public class TestKeyFieldBasedComparator
     conf.setOutputValueClass(LongWritable.class);
 
     conf.setNumMapTasks(1);
-    conf.setNumReduceTasks(2);
+    conf.setNumReduceTasks(1);
 
     conf.setOutputFormat(TextOutputFormat.class);
     conf.setOutputKeyComparatorClass(KeyFieldBasedComparator.class);
@@ -101,9 +101,7 @@ public class TestKeyFieldBasedComparator
       BufferedReader reader = new BufferedReader(new InputStreamReader(is));
       String line = reader.readLine();
       //make sure we get what we expect as the first line, and also
-      //that we have two lines (both the lines must end up in the same
-      //reducer since the partitioner takes the same key spec for all
-      //lines
+      //that we have two lines
       if (expect == 1) {
         assertTrue(line.startsWith(line1));
       } else if (expect == 2) {

Modified: hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestLocalRunner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestLocalRunner.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestLocalRunner.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestLocalRunner.java Mon Aug 12 21:25:49 2013
@@ -31,9 +31,9 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.*;
 import org.apache.hadoop.mapred.LocalJobRunner;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
@@ -410,6 +410,7 @@ public class TestLocalRunner extends Tes
   }
 
   /** Test case for zero mappers */
+  @Test
   public void testEmptyMaps() throws Exception {
     Job job = Job.getInstance();
     Path outputPath = getOutputPath();
@@ -428,5 +429,145 @@ public class TestLocalRunner extends Tes
     boolean success = job.waitForCompletion(true);
     assertTrue("Empty job should work", success);
   }
+
+  /** @return the directory where numberfiles are written (mapper inputs)  */
+  private Path getNumberDirPath() {
+    return new Path(getInputPath(), "numberfiles");
+  }
+
+  /**
+   * Write out an input file containing an integer.
+   *
+   * @param fileNum the file number to write to.
+   * @param value the value to write to the file
+   * @return the path of the written file.
+   */
+  private Path makeNumberFile(int fileNum, int value) throws IOException {
+    Path workDir = getNumberDirPath();
+    Path filePath = new Path(workDir, "file" + fileNum);
+
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.getLocal(conf);
+
+    OutputStream os = fs.create(filePath);
+    BufferedWriter w = new BufferedWriter(new OutputStreamWriter(os));
+    w.write("" + value);
+    w.close();
+
+    return filePath;
+  }
+
+  /**
+   * Each record received by this mapper is a number 'n'.
+   * Emit the values [0..n-1]
+   */
+  public static class SequenceMapper
+      extends Mapper<LongWritable, Text, Text, NullWritable> {
+
+    public void map(LongWritable k, Text v, Context c)
+        throws IOException, InterruptedException {
+      int max = Integer.valueOf(v.toString());
+      for (int i = 0; i < max; i++) {
+        c.write(new Text("" + i), NullWritable.get());
+      }
+    }
+  }
+
+  private final static int NUMBER_FILE_VAL = 100;
+
+  /**
+   * Tally up the values and ensure that we got as much data
+   * out as we put in.
+   * Each mapper generated 'NUMBER_FILE_VAL' values (0..NUMBER_FILE_VAL-1).
+   * Verify that across all our reducers we got exactly this much
+   * data back.
+   */
+  private void verifyNumberJob(int numMaps) throws Exception {
+    Path outputDir = getOutputPath();
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.getLocal(conf);
+
+    FileStatus [] stats = fs.listStatus(outputDir);
+    int valueSum = 0;
+    for (FileStatus f : stats) {
+      FSDataInputStream istream = fs.open(f.getPath());
+      BufferedReader r = new BufferedReader(new InputStreamReader(istream));
+      String line = null;
+      while ((line = r.readLine()) != null) {
+        valueSum += Integer.valueOf(line.trim());
+      }
+      r.close();
+    }
+
+    int maxVal = NUMBER_FILE_VAL - 1;
+    int expectedPerMapper = maxVal * (maxVal + 1) / 2;
+    int expectedSum = expectedPerMapper * numMaps;
+    LOG.info("expected sum: " + expectedSum + ", got " + valueSum);
+    assertEquals("Didn't get all our results back", expectedSum, valueSum);
+  }
+
+  /**
+   * Run a test which creates a SequenceMapper / IdentityReducer
+   * job over a set of generated number files.
+   */
+  private void doMultiReducerTest(int numMaps, int numReduces,
+      int parallelMaps, int parallelReduces) throws Exception {
+
+    Path in = getNumberDirPath();
+    Path out = getOutputPath();
+
+    // Clear data from any previous tests.
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.getLocal(conf);
+    if (fs.exists(out)) {
+      fs.delete(out, true);
+    }
+
+    if (fs.exists(in)) {
+      fs.delete(in, true);
+    }
+
+    for (int i = 0; i < numMaps; i++) {
+      makeNumberFile(i, NUMBER_FILE_VAL);
+    }
+
+    Job job = Job.getInstance();
+    job.setNumReduceTasks(numReduces);
+
+    job.setMapperClass(SequenceMapper.class);
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(NullWritable.class);
+    FileInputFormat.addInputPath(job, in);
+    FileOutputFormat.setOutputPath(job, out);
+
+    LocalJobRunner.setLocalMaxRunningMaps(job, parallelMaps);
+    LocalJobRunner.setLocalMaxRunningReduces(job, parallelReduces);
+
+    boolean result = job.waitForCompletion(true);
+    assertTrue("Job failed!!", result);
+
+    verifyNumberJob(numMaps);
+  }
+  
+  @Test
+  public void testOneMapMultiReduce() throws Exception {
+    doMultiReducerTest(1, 2, 1, 1);
+  }
+
+  @Test
+  public void testOneMapMultiParallelReduce() throws Exception {
+    doMultiReducerTest(1, 2, 1, 2);
+  }
+
+  @Test
+  public void testMultiMapOneReduce() throws Exception {
+    doMultiReducerTest(4, 1, 2, 1);
+  }
+
+  @Test
+  public void testMultiMapMultiReduce() throws Exception {
+    doMultiReducerTest(4, 4, 2, 2);
+  }
+
 }
 

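verifyNumberJob leans on the closed form 0 + 1 + ... + (n-1) = n(n-1)/2 to predict the sum each mapper contributes. Worked through for the constants above (sketch):

    // Each mapper emits 0..NUMBER_FILE_VAL-1, i.e. 0..99.
    int maxVal = 100 - 1;
    int perMapper = maxVal * (maxVal + 1) / 2; // 99 * 100 / 2 = 4950
    System.out.println(1 * perMapper);         // 4950  (single-map tests)
    System.out.println(4 * perMapper);         // 19800 (four-map tests)
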
Modified: hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java Mon Aug 12 21:25:49 2013
@@ -20,23 +20,31 @@ package org.apache.hadoop.mapreduce.lib.
 import java.io.IOException;
 import java.io.OutputStream;
 import java.net.URI;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
-import java.util.ArrayList;
+import java.util.Map;
 import java.util.Set;
-import java.util.zip.GZIPOutputStream;
+import java.util.TreeMap;
 import java.util.concurrent.TimeoutException;
+import java.util.zip.GZIPOutputStream;
 
 import junit.framework.Assert;
 import junit.framework.TestCase;
 
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.DFSTestUtil;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.HdfsBlockLocation;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Job;
@@ -715,6 +723,69 @@ public class TestCombineFileInputFormat 
     out.close();
     DFSTestUtil.waitReplication(fileSys, name, replication);
   }
+
+  public void testNodeDistribution() throws IOException, InterruptedException {
+    DummyInputFormat inFormat = new DummyInputFormat();
+    int numBlocks = 60;
+    long totLength = 0;
+    long blockSize = 100;
+    int numNodes = 10;
+
+    long minSizeNode = 50;
+    long minSizeRack = 50;
+    int maxSplitSize = 200; // 2 blocks per split.
+
+    String[] locations = new String[numNodes];
+    for (int i = 0; i < numNodes; i++) {
+      locations[i] = "h" + i;
+    }
+    String[] racks = new String[0];
+    Path path = new Path("hdfs://file");
+
+    OneBlockInfo[] blocks = new OneBlockInfo[numBlocks];
+
+    int hostCountBase = 0;
+    // Generate block list. Replication 3 per block.
+    for (int i = 0; i < numBlocks; i++) {
+      int localHostCount = hostCountBase;
+      String[] blockHosts = new String[3];
+      for (int j = 0; j < 3; j++) {
+        int hostNum = localHostCount % numNodes;
+        blockHosts[j] = "h" + hostNum;
+        localHostCount++;
+      }
+      hostCountBase++;
+      blocks[i] = new OneBlockInfo(path, i * blockSize, blockSize, blockHosts,
+          racks);
+      totLength += blockSize;
+    }
+
+    List<InputSplit> splits = new ArrayList<InputSplit>();
+    HashMap<String, Set<String>> rackToNodes = new HashMap<String, Set<String>>();
+    HashMap<String, List<OneBlockInfo>> rackToBlocks = new HashMap<String, List<OneBlockInfo>>();
+    HashMap<OneBlockInfo, String[]> blockToNodes = new HashMap<OneBlockInfo, String[]>();
+    Map<String, Set<OneBlockInfo>> nodeToBlocks = new TreeMap<String, Set<OneBlockInfo>>();
+
+    OneFileInfo.populateBlockInfo(blocks, rackToBlocks, blockToNodes,
+        nodeToBlocks, rackToNodes);
+    
+    inFormat.createSplits(nodeToBlocks, blockToNodes, rackToBlocks, totLength,
+        maxSplitSize, minSizeNode, minSizeRack, splits);
+
+    int expectedSplitCount = (int) (totLength / maxSplitSize);
+    Assert.assertEquals(expectedSplitCount, splits.size());
+
+    // Ensure 90+% of the splits have node local blocks.
+    // 100% locality may not always be achieved.
+    int numLocalSplits = 0;
+    for (InputSplit inputSplit : splits) {
+      Assert.assertEquals(maxSplitSize, inputSplit.getLength());
+      if (inputSplit.getLocations().length == 1) {
+        numLocalSplits++;
+      }
+    }
+    Assert.assertTrue(numLocalSplits >= 0.9 * splits.size());
+  }
   
   public void testNodeInputSplit() throws IOException, InterruptedException {
     // Regression test for MAPREDUCE-4892. There are 2 nodes with all blocks on 
@@ -744,8 +815,8 @@ public class TestCombineFileInputFormat 
                               new HashMap<String, List<OneBlockInfo>>();
     HashMap<OneBlockInfo, String[]> blockToNodes = 
                               new HashMap<OneBlockInfo, String[]>();
-    HashMap<String, List<OneBlockInfo>> nodeToBlocks = 
-                              new HashMap<String, List<OneBlockInfo>>();
+    HashMap<String, Set<OneBlockInfo>> nodeToBlocks = 
+                              new HashMap<String, Set<OneBlockInfo>>();
     
     OneFileInfo.populateBlockInfo(blocks, rackToBlocks, blockToNodes, 
                              nodeToBlocks, rackToNodes);

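The expected split count in testNodeDistribution falls straight out of the sizes chosen there (sketch):

    long totLength = 60 * 100L;               // 60 blocks of 100 bytes = 6000
    long expectedSplits = totLength / 200;    // maxSplitSize 200 -> 30 splits
    long localityFloor = (long) (0.9 * expectedSplits); // at least 27 local

The 90% floor is deliberate slack: with the rotating replica placement the test generates, perfect node locality is not guaranteed, so the assertion only requires that at least that fraction of splits report a single node-local location.
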
Modified: hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/partition/TestMRKeyFieldBasedComparator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/partition/TestMRKeyFieldBasedComparator.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/partition/TestMRKeyFieldBasedComparator.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/partition/TestMRKeyFieldBasedComparator.java Mon Aug 12 21:25:49 2013
@@ -56,7 +56,7 @@ public class TestMRKeyFieldBasedComparat
     conf.set("mapreduce.partition.keypartitioner.options", "-k1.1,1.1");
     conf.set(MRJobConfig.MAP_OUTPUT_KEY_FIELD_SEPERATOR, " ");
 
-    Job job = MapReduceTestUtil.createJob(conf, inDir, outDir, 1, 2,
+    Job job = MapReduceTestUtil.createJob(conf, inDir, outDir, 1, 1,
                 line1 +"\n" + line2 + "\n"); 
     job.setMapperClass(InverseMapper.class);
     job.setReducerClass(Reducer.class);

Modified: hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/security/TestUmbilicalProtocolWithJobToken.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/security/TestUmbilicalProtocolWithJobToken.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/security/TestUmbilicalProtocolWithJobToken.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/security/TestUmbilicalProtocolWithJobToken.java Mon Aug 12 21:25:49 2013
@@ -91,8 +91,10 @@ public class TestUmbilicalProtocolWithJo
       .when(mockTT).getProtocolSignature(anyString(), anyLong(), anyInt());
 
     JobTokenSecretManager sm = new JobTokenSecretManager();
-    final Server server = RPC.getServer(TaskUmbilicalProtocol.class, mockTT,
-        ADDRESS, 0, 5, true, conf, sm);
+    final Server server = new RPC.Builder(conf)
+        .setProtocol(TaskUmbilicalProtocol.class).setInstance(mockTT)
+        .setBindAddress(ADDRESS).setPort(0).setNumHandlers(5).setVerbose(true)
+        .setSecretManager(sm).build();
 
     server.start();
 

Modified: hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java Mon Aug 12 21:25:49 2013
@@ -151,11 +151,14 @@ public class MiniMRYarnCluster extends M
         if (!getConfig().getBoolean(
             JHAdminConfig.MR_HISTORY_MINICLUSTER_FIXED_PORTS,
             JHAdminConfig.DEFAULT_MR_HISTORY_MINICLUSTER_FIXED_PORTS)) {
+          String hostname = MiniYARNCluster.getHostname();
           // pick free random ports.
           getConfig().set(JHAdminConfig.MR_HISTORY_ADDRESS,
-              MiniYARNCluster.getHostname() + ":0");
+            hostname + ":0");
           getConfig().set(JHAdminConfig.MR_HISTORY_WEBAPP_ADDRESS,
-              MiniYARNCluster.getHostname() + ":0");
+            hostname + ":0");
+          getConfig().set(JHAdminConfig.JHS_ADMIN_ADDRESS,
+            hostname + ":0");
         }
         historyServer = new JobHistoryServer();
         historyServer.init(getConfig());

Modified: hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java Mon Aug 12 21:25:49 2013
@@ -422,16 +422,20 @@ public class TestMRJobs {
     @Override
     public void setup(Context context) throws IOException {
       Configuration conf = context.getConfiguration();
-      Path[] files = context.getLocalCacheFiles();
-      Path[] archives = context.getLocalCacheArchives();
+      Path[] localFiles = context.getLocalCacheFiles();
+      URI[] files = context.getCacheFiles();
+      Path[] localArchives = context.getLocalCacheArchives();
+      URI[] archives = context.getCacheArchives();
 
       // Check that 4 (2 + appjar + DistrubutedCacheChecker jar) files 
       // and 2 archives are present
+      Assert.assertEquals(4, localFiles.length);
       Assert.assertEquals(4, files.length);
+      Assert.assertEquals(2, localArchives.length);
       Assert.assertEquals(2, archives.length);
 
       // Check lengths of the files
-      Map<String, Path> filesMap = pathsToMap(files);
+      Map<String, Path> filesMap = pathsToMap(localFiles);
       Assert.assertTrue(filesMap.containsKey("distributed.first.symlink"));
       Assert.assertEquals(1, localFs.getFileStatus(
         filesMap.get("distributed.first.symlink")).getLen());
@@ -440,7 +444,7 @@ public class TestMRJobs {
         filesMap.get("distributed.second.jar")).getLen() > 1);
 
       // Check extraction of the archive
-      Map<String, Path> archivesMap = pathsToMap(archives);
+      Map<String, Path> archivesMap = pathsToMap(localArchives);
       Assert.assertTrue(archivesMap.containsKey("distributed.third.jar"));
       Assert.assertTrue(localFs.exists(new Path(
         archivesMap.get("distributed.third.jar"), "distributed.jar.inside3")));

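The renames in this hunk keep the two cache views distinct: getLocalCacheFiles()/getLocalCacheArchives() return the localized on-disk Paths, while getCacheFiles()/getCacheArchives() return the URIs originally registered with the job. Both views of the same cache should agree in length, which is exactly what the added assertions pin down; as a sketch:

    Path[] localFiles = context.getLocalCacheFiles(); // localized copies
    URI[] files = context.getCacheFiles();            // URIs as submitted
    Assert.assertEquals(files.length, localFiles.length);
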
Modified: hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestNonExistentJob.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestNonExistentJob.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestNonExistentJob.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestNonExistentJob.java Mon Aug 12 21:25:49 2013
@@ -22,22 +22,15 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.JobID;
 import org.apache.hadoop.mapred.MiniMRCluster;
 import org.apache.hadoop.mapred.RunningJob;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.ProxyUsers;
 
+import java.io.IOException;
 import java.net.InetAddress;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.OutputStream;
-import java.io.OutputStreamWriter;
-import java.io.Writer;
-import java.security.PrivilegedExceptionAction;
 
 public class TestNonExistentJob extends TestCase {
 
@@ -96,8 +89,13 @@ public class TestNonExistentJob extends 
   }
 
   public void testGetInvalidJob() throws Exception {
-    RunningJob runJob = new JobClient(getJobConf()).getJob(JobID.forName("job_0_0"));
-    assertNull(runJob);
+    try {
+      new JobClient(getJobConf()).getJob(JobID.forName("job_0_0"));
+      fail("Expected an exception to be thrown");
+    } catch (Exception e) {
+      assertTrue(e instanceof IOException);
+      assertTrue(e.getMessage().contains("ApplicationNotFoundException"));
+    }
   }
 
 }
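
Callers that used to null-check the result of JobClient.getJob(...) for unknown jobs now have to handle the failure as an exception instead. A minimal sketch of the adjusted pattern, matching what the rewritten test asserts (jobClient stands in for any configured JobClient):

    // An unknown job now surfaces as an IOException whose message carries
    // ApplicationNotFoundException, rather than as a null RunningJob.
    try {
      RunningJob job = jobClient.getJob(JobID.forName("job_0_0"));
      // ... job was found ...
    } catch (IOException e) {
      if (e.getMessage().contains("ApplicationNotFoundException")) {
        // treat as "no such job"
      } else {
        throw e;
      }
    }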