Posted to mapreduce-commits@hadoop.apache.org by to...@apache.org on 2012/04/11 07:47:51 UTC

svn commit: r1324567 [1/2] - in /hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project: ./ conf/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/ hadoop-mapreduce-client/hadoop-mapreduce-clie...

Author: todd
Date: Wed Apr 11 05:47:40 2012
New Revision: 1324567

URL: http://svn.apache.org/viewvc?rev=1324567&view=rev
Log:
Merge trunk into auto-failover branch.

Needs a few tweaks to fix compilation - will do in a follow-up commit. This is just a straight merge.

Added:
    hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestJob.java
      - copied unchanged from r1324566, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestJob.java
    hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CachedHistoryStorage.java
      - copied unchanged from r1324566, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CachedHistoryStorage.java
    hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java
      - copied unchanged from r1324566, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java
    hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryStorage.java
      - copied unchanged from r1324566, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryStorage.java
    hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/util/Hello.java
      - copied unchanged from r1324566, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/util/Hello.java
    hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestRackResolverScriptBasedMapping.java
      - copied unchanged from r1324566, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestRackResolverScriptBasedMapping.java
Removed:
    hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/testjar/Hello.java
    hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/TestTTResourceReporting.java
Modified:
    hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/   (props changed)
    hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/CHANGES.txt   (contents, props changed)
    hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/conf/   (props changed)
    hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
    hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobSummary.java
    hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
    hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java
    hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JHAdminConfig.java
    hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobHistoryUtils.java
    hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java
    hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml   (props changed)
    hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryContext.java
    hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java
    hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/PartialJob.java
    hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsWebApp.java
    hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsWebServices.java
    hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java
    hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServices.java
    hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServicesAttempts.java
    hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServicesJobConf.java
    hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServicesJobs.java
    hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServicesJobsQuery.java
    hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServicesTasks.java
    hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/util/TestRunJar.java
    hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-yarn/bin/yarn
    hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
    hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/RackResolver.java
    hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestRackResolver.java
    hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/src/c++/   (props changed)
    hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/src/contrib/   (props changed)
    hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/src/contrib/block_forensics/   (props changed)
    hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/src/contrib/build-contrib.xml   (props changed)
    hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/src/contrib/build.xml   (props changed)
    hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/src/contrib/data_join/   (props changed)
    hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/src/contrib/eclipse-plugin/   (props changed)
    hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/src/contrib/index/   (props changed)
    hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/src/contrib/raid/src/java/org/apache/hadoop/hdfs/server/datanode/RaidBlockSender.java
    hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/src/contrib/raid/src/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRaidUtil.java
    hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/src/contrib/raid/src/java/org/apache/hadoop/raid/BlockFixer.java
    hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/src/contrib/vaidya/   (props changed)
    hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/src/examples/   (props changed)
    hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/src/java/   (props changed)
    hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/src/test/mapred/   (props changed)
    hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/fs/   (props changed)
    hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/hdfs/   (props changed)
    hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/ipc/   (props changed)
    hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/src/webapps/job/   (props changed)

Propchange: hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-mapreduce-project:r1310902-1324566

Modified: hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/CHANGES.txt?rev=1324567&r1=1324566&r2=1324567&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/CHANGES.txt Wed Apr 11 05:47:40 2012
@@ -218,6 +218,21 @@ Release 2.0.0 - UNRELEASED
     org.apache.hadoop.mapred.TestMiniMRWithDFSWithDistinctUsers (Devaraj K via
     bobby)
 
+    MAPREDUCE-4105. Yarn RackResolver ignores rack configurations. 
+    (Ahmed Radwan via tomwhite)
+
+    MAPREDUCE-3869. Fix classpath for DistributedShell application. (Devaraj K
+    via sseth)
+
+    MAPREDUCE-4057. Update RAID for the HA and fsdataset changes.  (Devaraj K
+    via szetszwo)
+
+    MAPREDUCE-4076. Stream job fails with ZipException when use yarn jar
+    command (Devaraj K via bobby)
+ 
+    MAPREDUCE-4108. Fix tests in org.apache.hadoop.util.TestRunJar
+    (Devaraj K via tgraves)
+
 Release 0.23.3 - UNRELEASED
 
   INCOMPATIBLE CHANGES
@@ -229,6 +244,9 @@ Release 0.23.3 - UNRELEASED
 
   IMPROVEMENTS
 
+    MAPREDUCE-4059. The history server should have a separate pluggable 
+    storage/query interface. (Robert Evans via tgraves)
+
   OPTIMIZATIONS
 
   BUG FIXES
@@ -280,6 +298,18 @@ Release 0.23.3 - UNRELEASED
     MAPREDUCE-4051. Remove the empty hadoop-mapreduce-project/assembly/all.xml
     file (Ravi Prakash via bobby)
 
+    MAPREDUCE-4117. mapred job -status throws NullPointerException (Devaraj K
+    via bobby)
+
+    MAPREDUCE-4099. ApplicationMaster may fail to remove staging directory
+    (Jason Lowe via bobby)
+
+    MAPREDUCE-4017. Add jobname to jobsummary log (tgraves and Koji Noguchi
+    via bobby)
+
+    MAPREDUCE-4040. History links should use hostname rather than IP address.
+    (Bhallamudi Venkata Siva Kamesh via sseth)
+
 Release 0.23.2 - UNRELEASED
 
   INCOMPATIBLE CHANGES

Propchange: hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/CHANGES.txt
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt:r1310902-1324566

Propchange: hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/conf/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-mapreduce-project/conf:r1310902-1324566

Modified: hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java?rev=1324567&r1=1324566&r2=1324567&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java (original)
+++ hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java Wed Apr 11 05:47:40 2012
@@ -551,6 +551,7 @@ public class JobHistoryEventHandler exte
       summary.setUser(jse.getUserName());
       summary.setQueue(jse.getJobQueueName());
       summary.setJobSubmitTime(jse.getSubmitTime());
+      summary.setJobName(jse.getJobName());
       break;
     case NORMALIZED_RESOURCE:
       NormalizedResourceEvent normalizedResourceEvent = 

Modified: hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobSummary.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobSummary.java?rev=1324567&r1=1324566&r2=1324567&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobSummary.java (original)
+++ hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobSummary.java Wed Apr 11 05:47:40 2012
@@ -45,6 +45,7 @@ public class JobSummary {
   private long mapSlotSeconds; // TODO Not generated yet in MRV2
   private long reduceSlotSeconds; // TODO Not generated yet MRV2
   // private int clusterSlotCapacity;
+  private String jobName;
 
   JobSummary() {
   }
@@ -185,6 +186,14 @@ public class JobSummary {
     this.reduceSlotSeconds = reduceSlotSeconds;
   }
 
+  public String getJobName() {
+    return jobName;
+  }
+
+  public void setJobName(String jobName) {
+    this.jobName = jobName;
+  }
+
   public String getJobSummaryString() {
     SummaryBuilder summary = new SummaryBuilder()
       .add("jobId", jobId)
@@ -201,7 +210,8 @@ public class JobSummary {
       .add("queue", queue)
       .add("status", jobStatus)
       .add("mapSlotSeconds", mapSlotSeconds)
-      .add("reduceSlotSeconds", reduceSlotSeconds);
+      .add("reduceSlotSeconds", reduceSlotSeconds)
+      .add("jobName", jobName);
     return summary.toString();
   }
 

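Together with the JobHistoryEventHandler hunk above, this is MAPREDUCE-4017: the JOB_SUBMITTED event already carries the job name, and JobSummary now stores it and appends it to the summary line. A minimal sketch of the effect, assuming package access to JobSummary (its constructor is package-private above); the job name is hypothetical:

    // SummaryBuilder emits comma-separated key=value pairs, so the summary
    // line simply gains a trailing field.
    JobSummary s = new JobSummary();
    s.setJobName("word count");   // new: copied from the JOB_SUBMITTED event
    String line = s.getJobSummaryString();
    // line now ends roughly "...,reduceSlotSeconds=0,jobName=word count"
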
Modified: hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java?rev=1324567&r1=1324566&r2=1324567&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java (original)
+++ hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java Wed Apr 11 05:47:40 2012
@@ -405,6 +405,14 @@ public class MRAppMaster extends Composi
       } catch (InterruptedException e) {
         e.printStackTrace();
       }
+
+      // Cleanup staging directory
+      try {
+        cleanupStagingDir();
+      } catch(IOException io) {
+        LOG.warn("Failed to delete staging dir", io);
+      }
+
       try {
         // Stop all services
         // This will also send the final report to the ResourceManager
@@ -415,13 +423,6 @@ public class MRAppMaster extends Composi
         LOG.warn("Graceful stop failed ", t);
       }
 
-      // Cleanup staging directory
-      try {
-        cleanupStagingDir();
-      } catch(IOException io) {
-        LOG.warn("Failed to delete staging dir");
-      }
-
       //Bring the process down by force.
       //Not needed after HADOOP-7140
       LOG.info("Exiting MR AppMaster..GoodBye!");

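This reordering is the substance of MAPREDUCE-4099: stop() sends the final report to the ResourceManager, and once the RM has been told the application is finished it is free to tear the AM container down, so cleanup scheduled after stop() could be killed before it ever ran. Condensed, the shutdown path now reads:

    try {
      cleanupStagingDir();   // 1) remove .staging while the AM is still alive
    } catch (IOException io) {
      LOG.warn("Failed to delete staging dir", io);  // now logs the cause too
    }
    try {
      stop();                // 2) stop services and send the final RM report
    } catch (Throwable t) {
      LOG.warn("Graceful stop failed ", t);
    }
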
Modified: hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java?rev=1324567&r1=1324566&r2=1324567&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java (original)
+++ hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java Wed Apr 11 05:47:40 2012
@@ -22,6 +22,7 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.spy;
 
+import java.io.IOException;
 import java.util.Iterator;
 
 import junit.framework.Assert;
@@ -35,11 +36,14 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
 import org.apache.hadoop.mapreduce.v2.app.job.Task;
 import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobFinishEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.YarnException;
 import org.junit.Test;
 
 /**
@@ -233,6 +237,71 @@ public class TestMRApp {
     }
   }
 
+  private final class MRAppTestCleanup extends MRApp {
+    boolean hasStopped;
+    boolean cleanedBeforeStopped;
+
+    public MRAppTestCleanup(int maps, int reduces, boolean autoComplete,
+        String testName, boolean cleanOnStart) {
+      super(maps, reduces, autoComplete, testName, cleanOnStart);
+      hasStopped = false;
+      cleanedBeforeStopped = false;
+    }
+
+    @Override
+    protected Job createJob(Configuration conf) {
+      UserGroupInformation currentUser = null;
+      try {
+        currentUser = UserGroupInformation.getCurrentUser();
+      } catch (IOException e) {
+        throw new YarnException(e);
+      }
+      Job newJob = new TestJob(getJobId(), getAttemptID(), conf,
+          getDispatcher().getEventHandler(),
+          getTaskAttemptListener(), getContext().getClock(),
+          getCommitter(), isNewApiCommitter(),
+          currentUser.getUserName(), getContext());
+      ((AppContext) getContext()).getAllJobs().put(newJob.getID(), newJob);
+
+      getDispatcher().register(JobFinishEvent.Type.class,
+          createJobFinishEventHandler());
+
+      return newJob;
+    }
+
+    @Override
+    public void cleanupStagingDir() throws IOException {
+      cleanedBeforeStopped = !hasStopped;
+    }
+
+    @Override
+    public synchronized void stop() {
+      hasStopped = true;
+      super.stop();
+    }
+
+    @Override
+    protected void sysexit() {
+    }
+  }
+
+  @Test
+  public void testStagingCleanupOrder() throws Exception {
+    MRAppTestCleanup app = new MRAppTestCleanup(1, 1, true,
+        this.getClass().getName(), true);
+    JobImpl job = (JobImpl)app.submit(new Configuration());
+    app.waitForState(job, JobState.SUCCEEDED);
+    app.verifyCompleted();
+
+    int waitTime = 20 * 1000;
+    while (waitTime > 0 && !app.cleanedBeforeStopped) {
+      Thread.sleep(100);
+      waitTime -= 100;
+    }
+    Assert.assertTrue("Staging directory not cleaned before notifying RM",
+        app.cleanedBeforeStopped);
+  }
+
   public static void main(String[] args) throws Exception {
     TestMRApp t = new TestMRApp();
     t.testMapReduce();
@@ -241,5 +310,6 @@ public class TestMRApp {
     t.testCompletedMapsForReduceSlowstart();
     t.testJobError();
     t.testCountersOnJobFinish();
+    t.testStagingCleanupOrder();
   }
 }

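The test checks ordering, not mere occurrence: cleanupStagingDir() is overridden to record whether stop() had already run, sysexit() is stubbed out so the AM's forced exit cannot kill the test JVM, and the poll loop gives the asynchronous shutdown up to 20 seconds to complete. The probe, reduced to a sketch (the real fields above are plain booleans; volatile is added here for cross-thread visibility):

    class CleanupOrderProbe {
      volatile boolean hasStopped = false;
      volatile boolean cleanedBeforeStopped = false;
      void cleanupStagingDir() { cleanedBeforeStopped = !hasStopped; }
      void stop()              { hasStopped = true; }
    }
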
Modified: hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JHAdminConfig.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JHAdminConfig.java?rev=1324567&r1=1324566&r2=1324567&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JHAdminConfig.java (original)
+++ hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JHAdminConfig.java Wed Apr 11 05:47:40 2012
@@ -44,6 +44,9 @@ public class JHAdminConfig {
   /** Run the History Cleaner every X ms.*/
   public static final String MR_HISTORY_CLEANER_INTERVAL_MS = 
     MR_HISTORY_PREFIX + "cleaner.interval-ms";
+  public static final long DEFAULT_MR_HISTORY_CLEANER_INTERVAL_MS = 
+    1 * 24 * 60 * 60 * 1000l; //1 day
+  
   
   /** The number of threads to handle client API requests.*/
   public static final String MR_HISTORY_CLIENT_THREAD_COUNT = 
@@ -56,7 +59,9 @@ public class JHAdminConfig {
    */
   public static final String MR_HISTORY_DATESTRING_CACHE_SIZE = 
     MR_HISTORY_PREFIX + "datestring.cache.size";
+  public static final int DEFAULT_MR_HISTORY_DATESTRING_CACHE_SIZE = 200000;
   
+  //TODO REMOVE debug-mode
   /** Equivalent to 0.20 mapreduce.jobhistory.debug.mode */
   public static final String MR_HISTORY_DEBUG_MODE = 
     MR_HISTORY_PREFIX + "debug-mode";
@@ -75,6 +80,7 @@ public class JHAdminConfig {
   /** Size of the job list cache.*/
   public static final String MR_HISTORY_JOBLIST_CACHE_SIZE =
     MR_HISTORY_PREFIX + "joblist.cache.size";
+  public static final int DEFAULT_MR_HISTORY_JOBLIST_CACHE_SIZE = 20000;
   
   /** The location of the Kerberos keytab file.*/
   public static final String MR_HISTORY_KEYTAB = MR_HISTORY_PREFIX + "keytab";
@@ -82,6 +88,7 @@ public class JHAdminConfig {
   /** Size of the loaded job cache.*/
   public static final String MR_HISTORY_LOADED_JOB_CACHE_SIZE = 
     MR_HISTORY_PREFIX + "loadedjobs.cache.size";
+  public static final int DEFAULT_MR_HISTORY_LOADED_JOB_CACHE_SIZE = 5;
   
   /**
    * The maximum age of a job history file before it is deleted from the history
@@ -89,6 +96,8 @@ public class JHAdminConfig {
    */
   public static final String MR_HISTORY_MAX_AGE_MS =
     MR_HISTORY_PREFIX + "max-age-ms";
+  public static final long DEFAULT_MR_HISTORY_MAX_AGE = 
+    7 * 24 * 60 * 60 * 1000L; //1 week
   
   /**
    * Scan for history files to more from intermediate done dir to done dir
@@ -96,10 +105,13 @@ public class JHAdminConfig {
    */
   public static final String MR_HISTORY_MOVE_INTERVAL_MS = 
     MR_HISTORY_PREFIX + "move.interval-ms";
+  public static final long DEFAULT_MR_HISTORY_MOVE_INTERVAL_MS = 
+    3 * 60 * 1000l; //3 minutes
   
   /** The number of threads used to move files.*/
   public static final String MR_HISTORY_MOVE_THREAD_COUNT = 
     MR_HISTORY_PREFIX + "move.thread-count";
+  public static final int DEFAULT_MR_HISTORY_MOVE_THREAD_COUNT = 3;
   
   /** The Kerberos principal for the history server.*/
   public static final String MR_HISTORY_PRINCIPAL = 
@@ -116,4 +128,10 @@ public class JHAdminConfig {
    */
   public static final String MR_HS_SECURITY_SERVICE_AUTHORIZATION =
       "security.mrhs.client.protocol.acl";
+  
+  /**
+   * The HistoryStorage class to use to cache history data.
+   */
+  public static final String MR_HISTORY_STORAGE =
+    MR_HISTORY_PREFIX + ".store.class";
 }

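The new DEFAULT_* constants hoist defaults that were previously private to JobHistory (deleted further below) into the shared config class, and MR_HISTORY_STORAGE introduces the pluggable storage class from MAPREDUCE-4059. One wrinkle: every other key here concatenates MR_HISTORY_PREFIX with a bare suffix, while this one prepends a dot to "store.class", which presumably resolves to a doubled dot in the final key name. How JobHistory.init() consumes the key, mirroring the hunk further below:

    // CachedHistoryStorage is the fallback when no storage class is configured.
    HistoryStorage storage = ReflectionUtils.newInstance(
        conf.getClass(JHAdminConfig.MR_HISTORY_STORAGE,
            CachedHistoryStorage.class, HistoryStorage.class),
        conf);
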
Modified: hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobHistoryUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobHistoryUtils.java?rev=1324567&r1=1324566&r2=1324567&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobHistoryUtils.java (original)
+++ hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobHistoryUtils.java Wed Apr 11 05:47:40 2012
@@ -31,6 +31,8 @@ import java.util.concurrent.atomic.Atomi
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.FileStatus;
@@ -50,6 +52,8 @@ import org.apache.hadoop.yarn.api.record
 import com.google.common.base.Joiner;
 import com.google.common.base.Splitter;
 
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
 public class JobHistoryUtils {
   
   /**
@@ -503,7 +507,7 @@ public class JobHistoryUtils {
     StringBuffer sb = new StringBuffer();
     if (address.getAddress().isAnyLocalAddress() || 
         address.getAddress().isLoopbackAddress()) {
-      sb.append(InetAddress.getLocalHost().getHostAddress());
+      sb.append(InetAddress.getLocalHost().getCanonicalHostName());
     } else {
       sb.append(address.getHostName());
     }

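This one-line change is MAPREDUCE-4040: when the configured address is a wildcard or loopback bind, generated history links previously embedded the local IP address, which goes stale when addresses change and bypasses DNS; the canonical hostname keeps links stable. The difference between the two calls (standard java.net, values hypothetical):

    InetAddress local = InetAddress.getLocalHost();
    local.getHostAddress();        // e.g. "10.1.2.3"          (old: IP in links)
    local.getCanonicalHostName();  // e.g. "node1.example.com" (new: FQDN)
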
Modified: hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java?rev=1324567&r1=1324566&r2=1324567&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java (original)
+++ hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java Wed Apr 11 05:47:40 2012
@@ -509,6 +509,11 @@ public class Job extends JobContextImpl 
         lastEvent = event;
       }
     }
+    if (lastEvent == null) {
+      return "There are no failed tasks for the job. "
+          + "Job is failed due to some other reason and reason "
+          + "can be found in the logs.";
+    }
     String[] taskAttemptID = lastEvent.getTaskAttemptId().toString().split("_", 2);
     String taskID = taskAttemptID[1].substring(0, taskAttemptID[1].length()-2);
     return (" task " + taskID + " failed " +

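The new guard is MAPREDUCE-4117: a job can fail without producing a single failed-task event (the new message points the user at the logs instead), leaving lastEvent null, and "mapred job -status" then died on the dereference below the guard. The ID surgery that follows is terse; a worked example with a hypothetical attempt id:

    // split("_", 2) peels off the "attempt" prefix; dropping the last two
    // characters strips the trailing "_<attempt number>" to leave the task id.
    String attemptId = "attempt_1334122060000_0001_m_000003_0";  // hypothetical
    String tail = attemptId.split("_", 2)[1];    // "1334122060000_0001_m_000003_0"
    String taskId = tail.substring(0, tail.length() - 2);
                                                 // "1334122060000_0001_m_000003"
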
Propchange: hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml:r1310902-1324566

Modified: hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryContext.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryContext.java?rev=1324567&r1=1324566&r2=1324567&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryContext.java (original)
+++ hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryContext.java Wed Apr 11 05:47:40 2012
@@ -24,8 +24,13 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.JobState;
+import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.JobsInfo;
 
 public interface HistoryContext extends AppContext {
 
   Map<JobId, Job> getAllJobs(ApplicationId appID);
+
+  JobsInfo getPartialJobs(Long offset, Long count, String user,
+      String queue, Long sBegin, Long sEnd, Long fBegin, Long fEnd, JobState jobState);
 }

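getPartialJobs is the query half of the pluggable history storage (MAPREDUCE-4059): it lets the web services layer page and filter the job list from lightweight index data without loading full job histories. A hedged usage sketch, given some HistoryContext ctx; treating null as "no filter" is an assumption about the callers, not something this diff shows:

    // Up to 50 partial jobs submitted by "alice", any queue, state, or time window.
    JobsInfo page = ctx.getPartialJobs(
        0L, 50L,        // offset, count
        "alice", null,  // user, queue
        null, null,     // submit-time window  [sBegin, sEnd]
        null, null,     // finish-time window  [fBegin, fEnd]
        null);          // jobState
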
Modified: hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java?rev=1324567&r1=1324566&r2=1324567&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java (original)
+++ hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java Wed Apr 11 05:47:40 2012
@@ -1,36 +1,26 @@
 /**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 
 package org.apache.hadoop.mapreduce.v2.hs;
 
-import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.List;
 import java.util.Map;
-import java.util.Set;
-import java.util.SortedMap;
-import java.util.TreeMap;
-import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.ThreadFactory;
@@ -41,26 +31,16 @@ import java.util.regex.Pattern;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileAlreadyExistsException;
-import org.apache.hadoop.fs.FileContext;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Options;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.fs.RemoteIterator;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.mapred.JobACLsManager;
 import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TypeConverter;
-import org.apache.hadoop.mapreduce.jobhistory.JobSummary;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.JobState;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
-import org.apache.hadoop.mapreduce.v2.jobhistory.FileNameIndexUtils;
+import org.apache.hadoop.mapreduce.v2.hs.HistoryFileManager.MetaInfo;
+import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.JobsInfo;
 import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
-import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
-import org.apache.hadoop.mapreduce.v2.jobhistory.JobIndexInfo;
+import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.yarn.Clock;
 import org.apache.hadoop.yarn.ClusterInfo;
 import org.apache.hadoop.yarn.YarnException;
@@ -69,106 +49,36 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.service.AbstractService;
+import org.apache.hadoop.yarn.service.Service;
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
-
-/*
+/**
  * Loads and manages the Job history cache.
  */
-public class JobHistory extends AbstractService implements HistoryContext   {
-
-  private static final int DEFAULT_JOBLIST_CACHE_SIZE = 20000;
-  private static final int DEFAULT_LOADEDJOB_CACHE_SIZE = 5;
-  private static final int DEFAULT_DATESTRING_CACHE_SIZE = 200000;
-  private static final long DEFAULT_MOVE_THREAD_INTERVAL = 3 * 60 * 1000l; //3 minutes
-  private static final int DEFAULT_MOVE_THREAD_COUNT = 3;
-  
-  static final long DEFAULT_HISTORY_MAX_AGE = 7 * 24 * 60 * 60 * 1000L; //1 week
-  static final long DEFAULT_RUN_INTERVAL = 1 * 24 * 60 * 60 * 1000l; //1 day
-  
+public class JobHistory extends AbstractService implements HistoryContext {
   private static final Log LOG = LogFactory.getLog(JobHistory.class);
 
-  private static final Log SUMMARY_LOG = LogFactory.getLog(JobSummary.class);
-  public static final Pattern CONF_FILENAME_REGEX =
-    Pattern.compile("(" + JobID.JOBID_REGEX + ")_conf.xml(?:\\.[0-9]+\\.old)?");
+  public static final Pattern CONF_FILENAME_REGEX = Pattern.compile("("
+      + JobID.JOBID_REGEX + ")_conf.xml(?:\\.[0-9]+\\.old)?");
   public static final String OLD_SUFFIX = ".old";
 
-  private static String DONE_BEFORE_SERIAL_TAIL = 
-    JobHistoryUtils.doneSubdirsBeforeSerialTail();
-  
-  /**
-   * Maps between a serial number (generated based on jobId) and the timestamp
-   * component(s) to which it belongs.
-   * Facilitates jobId based searches.
-   * If a jobId is not found in this list - it will not be found.
-   */
-  private final SortedMap<String, Set<String>> idToDateString = 
-    new ConcurrentSkipListMap<String, Set<String>>();
-
-  //Maintains minimal details for recent jobs (parsed from history file name).
-  //Sorted on Job Completion Time.
-  private final SortedMap<JobId, MetaInfo> jobListCache = 
-    new ConcurrentSkipListMap<JobId, MetaInfo>();
-  
-  
-  // Re-use exisiting MetaInfo objects if they exist for the specific JobId. (synchronization on MetaInfo)
-  // Check for existance of the object when using iterators.
-  private final SortedMap<JobId, MetaInfo> intermediateListCache = 
-    new ConcurrentSkipListMap<JobId, JobHistory.MetaInfo>();
-  
-  //Maintains a list of known done subdirectories. Not currently used.
-  private final Set<Path> existingDoneSubdirs = new HashSet<Path>();
-
-  private Map<JobId, Job> loadedJobCache = null;
-
-  /**
-   * Maintains a mapping between intermediate user directories and the last 
-   * known modification time.
-   */
-  private Map<String, Long> userDirModificationTimeMap = 
-    new HashMap<String, Long>();
-  
-  //The number of jobs to maintain in the job list cache.
-  private int jobListCacheSize;
-  
-  private JobACLsManager aclsMgr;
-  
-  //The number of loaded jobs.
-  private int loadedJobCacheSize;
-  
-  //The number of entries in idToDateString
-  private int dateStringCacheSize;
-
-  //Time interval for the move thread.
+  // Time interval for the move thread.
   private long moveThreadInterval;
-  
-  //Number of move threads.
+
+  // Number of move threads.
   private int numMoveThreads;
-  
-  private Configuration conf;
 
-  private boolean debugMode;
-  private int serialNumberLowDigits;
-  private String serialNumberFormat;
-  
-
-  private Path doneDirPrefixPath = null; // folder for completed jobs
-  private FileContext doneDirFc; // done Dir FileContext
-  
-  private Path intermediateDoneDirPath = null; //Intermediate Done Dir Path
-  private FileContext intermediateDoneDirFc; //Intermediate Done Dir FileContext
+  private Configuration conf;
 
   private Thread moveIntermediateToDoneThread = null;
   private MoveIntermediateToDoneRunnable moveIntermediateToDoneRunnable = null;
+
   private ScheduledThreadPoolExecutor cleanerScheduledExecutor = null;
-  
-  /**
-   * Writes out files to the path
-   * .....${DONE_DIR}/VERSION_STRING/YYYY/MM/DD/HH/SERIAL_NUM/jh{index_entries}.jhist
-   */
 
-  @SuppressWarnings("serial")
+  private HistoryStorage storage = null;
+  private HistoryFileManager hsManager = null;
+
   @Override
   public void init(Configuration conf) throws YarnException {
     LOG.info("JobHistory Init");
@@ -176,121 +86,66 @@ public class JobHistory extends Abstract
     this.appID = RecordFactoryProvider.getRecordFactory(conf)
         .newRecordInstance(ApplicationId.class);
     this.appAttemptID = RecordFactoryProvider.getRecordFactory(conf)
-    .newRecordInstance(ApplicationAttemptId.class);
-
-    debugMode = conf.getBoolean(JHAdminConfig.MR_HISTORY_DEBUG_MODE, false);
-    serialNumberLowDigits = debugMode ? 1 : 3;
-    serialNumberFormat = ("%0"
-        + (JobHistoryUtils.SERIAL_NUMBER_DIRECTORY_DIGITS 
-            + serialNumberLowDigits) + "d");
+        .newRecordInstance(ApplicationAttemptId.class);
 
-    String doneDirPrefix = null;
-    doneDirPrefix = JobHistoryUtils.getConfiguredHistoryServerDoneDirPrefix(conf);
-    try {
-      doneDirPrefixPath = FileContext.getFileContext(conf).makeQualified(
-          new Path(doneDirPrefix));
-      doneDirFc = FileContext.getFileContext(doneDirPrefixPath.toUri(), conf);
-      mkdir(doneDirFc, doneDirPrefixPath, new FsPermission(
-          JobHistoryUtils.HISTORY_DONE_DIR_PERMISSION));
-    } catch (IOException e) {
-      throw new YarnException("Error creating done directory: [" +
-          doneDirPrefixPath + "]", e);
-    }
-
-    String intermediateDoneDirPrefix = null;
-    intermediateDoneDirPrefix = JobHistoryUtils
-        .getConfiguredHistoryIntermediateDoneDirPrefix(conf);
-    try {
-      intermediateDoneDirPath = FileContext.getFileContext(conf)
-          .makeQualified(new Path(intermediateDoneDirPrefix));
-      intermediateDoneDirFc = FileContext.getFileContext(
-          intermediateDoneDirPath.toUri(), conf);
-      mkdir(intermediateDoneDirFc, intermediateDoneDirPath, new FsPermission(
-          JobHistoryUtils.HISTORY_INTERMEDIATE_DONE_DIR_PERMISSIONS.toShort()));
-    } catch (IOException e) {
-      LOG.info("error creating done directory on dfs " + e);
-      throw new YarnException("Error creating intermediate done directory: [" 
-          + intermediateDoneDirPath + "]", e);
-    }
-    
-    this.aclsMgr = new JobACLsManager(conf);
-    
-    jobListCacheSize = conf.getInt(JHAdminConfig.MR_HISTORY_JOBLIST_CACHE_SIZE,
-        DEFAULT_JOBLIST_CACHE_SIZE);
-    loadedJobCacheSize = conf.getInt(JHAdminConfig.MR_HISTORY_LOADED_JOB_CACHE_SIZE,
-        DEFAULT_LOADEDJOB_CACHE_SIZE);
-    dateStringCacheSize = conf.getInt(JHAdminConfig.MR_HISTORY_DATESTRING_CACHE_SIZE,
-        DEFAULT_DATESTRING_CACHE_SIZE);
-    moveThreadInterval =
-        conf.getLong(JHAdminConfig.MR_HISTORY_MOVE_INTERVAL_MS,
-            DEFAULT_MOVE_THREAD_INTERVAL);
+    moveThreadInterval = conf.getLong(
+        JHAdminConfig.MR_HISTORY_MOVE_INTERVAL_MS,
+        JHAdminConfig.DEFAULT_MR_HISTORY_MOVE_INTERVAL_MS);
     numMoveThreads = conf.getInt(JHAdminConfig.MR_HISTORY_MOVE_THREAD_COUNT,
-        DEFAULT_MOVE_THREAD_COUNT);
-    
-    loadedJobCache =
-        Collections.synchronizedMap(new LinkedHashMap<JobId, Job>(
-            loadedJobCacheSize + 1, 0.75f, true) {
-          @Override
-          public boolean removeEldestEntry(final Map.Entry<JobId, Job> eldest) {
-            return super.size() > loadedJobCacheSize;
-          }
-        });
-    
+        JHAdminConfig.DEFAULT_MR_HISTORY_MOVE_THREAD_COUNT);
+
+    hsManager = new HistoryFileManager();
+    hsManager.init(conf);
     try {
-      initExisting();
+      hsManager.initExisting();
     } catch (IOException e) {
       throw new YarnException("Failed to intialize existing directories", e);
     }
-    super.init(conf);
-  }
-  
-  private void mkdir(FileContext fc, Path path, FsPermission fsp)
-      throws IOException {
-    if (!fc.util().exists(path)) {
-      try {
-        fc.mkdir(path, fsp, true);
 
-        FileStatus fsStatus = fc.getFileStatus(path);
-        LOG.info("Perms after creating " + fsStatus.getPermission().toShort()
-            + ", Expected: " + fsp.toShort());
-        if (fsStatus.getPermission().toShort() != fsp.toShort()) {
-          LOG.info("Explicitly setting permissions to : " + fsp.toShort()
-              + ", " + fsp);
-          fc.setPermission(path, fsp);
-        }
-      } catch (FileAlreadyExistsException e) {
-        LOG.info("Directory: [" + path + "] already exists.");
-      }
+    storage = ReflectionUtils.newInstance(conf.getClass(
+        JHAdminConfig.MR_HISTORY_STORAGE, CachedHistoryStorage.class,
+        HistoryStorage.class), conf);
+    if (storage instanceof Service) {
+      ((Service) storage).init(conf);
     }
+    storage.setHistoryFileManager(hsManager);
+
+    super.init(conf);
   }
 
   @Override
   public void start() {
-    //Start moveIntermediatToDoneThread
-    moveIntermediateToDoneRunnable = 
-      new MoveIntermediateToDoneRunnable(moveThreadInterval, numMoveThreads);
+    hsManager.start();
+    if (storage instanceof Service) {
+      ((Service) storage).start();
+    }
+
+    // Start moveIntermediatToDoneThread
+    moveIntermediateToDoneRunnable = new MoveIntermediateToDoneRunnable(
+        moveThreadInterval, numMoveThreads);
     moveIntermediateToDoneThread = new Thread(moveIntermediateToDoneRunnable);
     moveIntermediateToDoneThread.setName("MoveIntermediateToDoneScanner");
     moveIntermediateToDoneThread.start();
-    
-    //Start historyCleaner
+
+    // Start historyCleaner
     boolean startCleanerService = conf.getBoolean(
         JHAdminConfig.MR_HISTORY_CLEANER_ENABLE, true);
     if (startCleanerService) {
       long maxAgeOfHistoryFiles = conf.getLong(
-          JHAdminConfig.MR_HISTORY_MAX_AGE_MS, DEFAULT_HISTORY_MAX_AGE);
+          JHAdminConfig.MR_HISTORY_MAX_AGE_MS,
+          JHAdminConfig.DEFAULT_MR_HISTORY_MAX_AGE);
       cleanerScheduledExecutor = new ScheduledThreadPoolExecutor(1,
-          new ThreadFactoryBuilder().setNameFormat("LogCleaner").build()
-      );
+          new ThreadFactoryBuilder().setNameFormat("LogCleaner").build());
       long runInterval = conf.getLong(
-          JHAdminConfig.MR_HISTORY_CLEANER_INTERVAL_MS, DEFAULT_RUN_INTERVAL);
+          JHAdminConfig.MR_HISTORY_CLEANER_INTERVAL_MS,
+          JHAdminConfig.DEFAULT_MR_HISTORY_CLEANER_INTERVAL_MS);
       cleanerScheduledExecutor
           .scheduleAtFixedRate(new HistoryCleaner(maxAgeOfHistoryFiles),
               30 * 1000l, runInterval, TimeUnit.MILLISECONDS);
     }
     super.start();
   }
-  
+
   @Override
   public void stop() {
     LOG.info("Stopping JobHistory");
@@ -323,281 +178,16 @@ public class JobHistory extends Abstract
         LOG.warn("HistoryCleanerService shutdown may not have succeeded");
       }
     }
+    if (storage instanceof Service) {
+      ((Service) storage).stop();
+    }
+    hsManager.stop();
     super.stop();
   }
-  
+
   public JobHistory() {
     super(JobHistory.class.getName());
   }
-  
-  /**
-   * Populates index data structures.
-   * Should only be called at initialization times.
-   */
-  @SuppressWarnings("unchecked")
-  private void initExisting() throws IOException {
-    LOG.info("Initializing Existing Jobs...");
-    List<FileStatus> timestampedDirList = findTimestampedDirectories();
-    Collections.sort(timestampedDirList);
-    for (FileStatus fs : timestampedDirList) {
-      //TODO Could verify the correct format for these directories.
-      addDirectoryToSerialNumberIndex(fs.getPath());
-      addDirectoryToJobListCache(fs.getPath());
-    }
-  }
-  
-  private void removeDirectoryFromSerialNumberIndex(Path serialDirPath) {
-    String serialPart = serialDirPath.getName();
-    String timeStampPart = 
-      JobHistoryUtils.getTimestampPartFromPath(serialDirPath.toString());
-    if (timeStampPart == null) {
-      LOG.warn("Could not find timestamp portion from path: " + 
-          serialDirPath.toString() +". Continuing with next");
-      return;
-    }
-    if (serialPart == null) {
-      LOG.warn("Could not find serial portion from path: " + 
-          serialDirPath.toString() + ". Continuing with next");
-      return;
-    }
-    if (idToDateString.containsKey(serialPart)) {
-      Set<String> set = idToDateString.get(serialPart);
-      set.remove(timeStampPart);
-      if (set.isEmpty()) {
-        idToDateString.remove(serialPart);
-      }
-    }
-
-  }
-  
-  private void addDirectoryToSerialNumberIndex(Path serialDirPath) {
-    if(LOG.isDebugEnabled()) {
-      LOG.debug("Adding "+serialDirPath+" to serial index");
-    }
-    String serialPart = serialDirPath.getName();
-    String timestampPart = 
-      JobHistoryUtils.getTimestampPartFromPath(serialDirPath.toString());
-    if (timestampPart == null) {
-      LOG.warn("Could not find timestamp portion from path: " + 
-          serialDirPath.toString() +". Continuing with next");
-      return;
-    }
-    if (serialPart == null) {
-      LOG.warn("Could not find serial portion from path: " + 
-          serialDirPath.toString() + ". Continuing with next");
-    }
-    addToSerialNumberIndex(serialPart, timestampPart);
-  }
-
-  private void addToSerialNumberIndex(String serialPart, String timestampPart) {
-      if (!idToDateString.containsKey(serialPart)) {
-        idToDateString.put(serialPart, new HashSet<String>());
-        if (idToDateString.size() > dateStringCacheSize) {
-          idToDateString.remove(idToDateString.firstKey());
-        }
-      Set<String> datePartSet = idToDateString.get(serialPart);
-      datePartSet.add(timestampPart);
-    }
-  }
-  
-  private void addDirectoryToJobListCache(Path path) throws IOException {
-    if(LOG.isDebugEnabled()) {
-      LOG.debug("Adding "+path+" to job list cache.");
-    }
-    List<FileStatus> historyFileList = scanDirectoryForHistoryFiles(path,
-        doneDirFc);
-    for (FileStatus fs : historyFileList) {
-      if(LOG.isDebugEnabled()) {
-        LOG.debug("Adding in history for "+fs.getPath());
-      }
-      JobIndexInfo jobIndexInfo = FileNameIndexUtils.getIndexInfo(fs.getPath()
-          .getName());
-      String confFileName = JobHistoryUtils
-          .getIntermediateConfFileName(jobIndexInfo.getJobId());
-      String summaryFileName = JobHistoryUtils
-          .getIntermediateSummaryFileName(jobIndexInfo.getJobId());
-      MetaInfo metaInfo = new MetaInfo(fs.getPath(), new Path(fs.getPath()
-          .getParent(), confFileName), new Path(fs.getPath().getParent(),
-          summaryFileName), jobIndexInfo);
-      addToJobListCache(jobIndexInfo.getJobId(), metaInfo);
-    }
-  }
-  
-  private static List<FileStatus> scanDirectory(Path path, FileContext fc,
-      PathFilter pathFilter) throws IOException {
-    path = fc.makeQualified(path);
-    List<FileStatus> jhStatusList = new ArrayList<FileStatus>();
-      RemoteIterator<FileStatus> fileStatusIter = fc.listStatus(path);
-      while (fileStatusIter.hasNext()) {
-        FileStatus fileStatus = fileStatusIter.next();
-        Path filePath = fileStatus.getPath();
-        if (fileStatus.isFile() && pathFilter.accept(filePath)) {
-          jhStatusList.add(fileStatus);
-        }
-      }    
-    return jhStatusList;
-  }
-  
-  private static List<FileStatus> scanDirectoryForHistoryFiles(Path path,
-      FileContext fc) throws IOException {
-    return scanDirectory(path, fc, JobHistoryUtils.getHistoryFileFilter());
-  }
-  
-  /**
-   * Finds all history directories with a timestamp component by scanning 
-   * the filesystem.
-   * Used when the JobHistory server is started.
-   * @return
-   */
-  private List<FileStatus> findTimestampedDirectories() throws IOException {
-    List<FileStatus> fsList = JobHistoryUtils.localGlobber(doneDirFc, 
-        doneDirPrefixPath, DONE_BEFORE_SERIAL_TAIL);
-    return fsList;
-  }
-    
-  /**
-   * Adds an entry to the job list cache. Maintains the size.
-   */
-  private void addToJobListCache(JobId jobId, MetaInfo metaInfo) {
-    if(LOG.isDebugEnabled()) {
-      LOG.debug("Adding "+jobId+" to job list cache with "
-          +metaInfo.getJobIndexInfo());
-    }
-    jobListCache.put(jobId, metaInfo);
-    if (jobListCache.size() > jobListCacheSize) {
-      jobListCache.remove(jobListCache.firstKey());
-    }
-  }
-
-  /**
-   * Adds an entry to the loaded job cache. Maintains the size.
-   */
-  private void addToLoadedJobCache(Job job) {
-    if(LOG.isDebugEnabled()) {
-      LOG.debug("Adding "+job.getID()+" to loaded job cache");
-    }
-    loadedJobCache.put(job.getID(), job);
-  }
-  
-  
-  /**
-   * Scans the intermediate directory to find user directories. Scans these
-   * for history files if the modification time for the directory has changed.
-   * @throws IOException
-   */
-  private void scanIntermediateDirectory() throws IOException {
-    List<FileStatus> userDirList = 
-      JobHistoryUtils.localGlobber(intermediateDoneDirFc, intermediateDoneDirPath, "");
-    
-    for (FileStatus userDir : userDirList) {
-      String name = userDir.getPath().getName();
-      long newModificationTime = userDir.getModificationTime();
-      boolean shouldScan = false;
-      synchronized (userDirModificationTimeMap) {
-        if (!userDirModificationTimeMap.containsKey(name) || newModificationTime
-            > userDirModificationTimeMap.get(name)) {
-            shouldScan = true;
-            userDirModificationTimeMap.put(name, newModificationTime);
-        }  
-        }  
-      if (shouldScan) {
-        scanIntermediateDirectory(userDir.getPath());
-      }
-    }
-  }
-
-  /**
-   * Scans the specified path and populates the intermediate cache.
-   * @param absPath
-   * @throws IOException
-   */
-  private void scanIntermediateDirectory(final Path absPath)
-      throws IOException {
-    List<FileStatus> fileStatusList = scanDirectoryForHistoryFiles(absPath,
-        intermediateDoneDirFc);
-    for (FileStatus fs : fileStatusList) {
-      JobIndexInfo jobIndexInfo = FileNameIndexUtils.getIndexInfo(fs.getPath()
-          .getName());
-      String confFileName = JobHistoryUtils
-          .getIntermediateConfFileName(jobIndexInfo.getJobId());
-      String summaryFileName = JobHistoryUtils
-          .getIntermediateSummaryFileName(jobIndexInfo.getJobId());
-      MetaInfo metaInfo = new MetaInfo(fs.getPath(), new Path(fs.getPath()
-          .getParent(), confFileName), new Path(fs.getPath().getParent(),
-          summaryFileName), jobIndexInfo);
-      if (!intermediateListCache.containsKey(jobIndexInfo.getJobId())) {
-        intermediateListCache.put(jobIndexInfo.getJobId(), metaInfo);
-      }
-    }
-  }
-  
-  /**
-   * Searches the job history file FileStatus list for the specified JobId.
-   * 
-   * @param fileStatusList fileStatus list of Job History Files.
-   * @param jobId The JobId to find.
-   * @param checkForDoneFile whether to check for the existance of a done file.
-   * @return A MetaInfo object for the jobId, null if not found.
-   * @throws IOException
-   */
-  private MetaInfo getJobMetaInfo(List<FileStatus> fileStatusList, JobId jobId) 
-  throws IOException {
-    for (FileStatus fs : fileStatusList) {
-      JobIndexInfo jobIndexInfo = 
-        FileNameIndexUtils.getIndexInfo(fs.getPath().getName());
-      if (jobIndexInfo.getJobId().equals(jobId)) {
-        String confFileName = JobHistoryUtils
-            .getIntermediateConfFileName(jobIndexInfo.getJobId());
-        String summaryFileName = JobHistoryUtils
-            .getIntermediateSummaryFileName(jobIndexInfo.getJobId());
-        MetaInfo metaInfo = new MetaInfo(fs.getPath(), new Path(fs.getPath()
-            .getParent(), confFileName), new Path(fs.getPath().getParent(),
-            summaryFileName), jobIndexInfo);
-        return metaInfo;
-      }
-    }
-    return null;
-  }
-  
-  /**
-   * Scans old directories known by the idToDateString map for the specified 
-   * jobId.
-   * If the number of directories is higher than the supported size of the
-   * idToDateString cache, the jobId will not be found.
-   * @param jobId the jobId.
-   * @return
-   * @throws IOException
-   */
-  private MetaInfo scanOldDirsForJob(JobId jobId) throws IOException {
-    int jobSerialNumber = JobHistoryUtils.jobSerialNumber(jobId);
-    String boxedSerialNumber = String.valueOf(jobSerialNumber);
-    Set<String> dateStringSet = idToDateString.get(boxedSerialNumber);
-    if (dateStringSet == null) {
-      return null;
-    }
-    for (String timestampPart : dateStringSet) {
-      Path logDir = canonicalHistoryLogPath(jobId, timestampPart);
-      List<FileStatus> fileStatusList = scanDirectoryForHistoryFiles(logDir,
-          doneDirFc);
-      MetaInfo metaInfo = getJobMetaInfo(fileStatusList, jobId);
-      if (metaInfo != null) {
-        return metaInfo;
-      }
-    }
-    return null;
-  }
-  
-  /**
-   * Checks for the existence of the job history file in the intermediate
-   * directory.
-   * @param jobId the jobId to look for.
-   * @return the cached MetaInfo, or null if the job is not present.
-   * @throws IOException
-   */
-  private MetaInfo scanIntermediateForJob(JobId jobId) throws IOException {
-    scanIntermediateDirectory();
-    return intermediateListCache.get(jobId);
-  }
 
   @Override
   public String getApplicationName() {
@@ -609,486 +199,167 @@ public class JobHistory extends Abstract
     private long sleepTime;
     private ThreadPoolExecutor moveToDoneExecutor = null;
     private boolean running = false;
-    
-    public void stop() {
+
+    public synchronized void stop() {
       running = false;
+      notify();
     }
-    
+
     MoveIntermediateToDoneRunnable(long sleepTime, int numMoveThreads) {
       this.sleepTime = sleepTime;
-      ThreadFactory tf = new ThreadFactoryBuilder()
-        .setNameFormat("MoveIntermediateToDone Thread #%d")
-        .build();
-      moveToDoneExecutor = new ThreadPoolExecutor(1, numMoveThreads, 1, 
+      ThreadFactory tf = new ThreadFactoryBuilder().setNameFormat(
+          "MoveIntermediateToDone Thread #%d").build();
+      moveToDoneExecutor = new ThreadPoolExecutor(1, numMoveThreads, 1,
           TimeUnit.HOURS, new LinkedBlockingQueue<Runnable>(), tf);
       running = true;
     }
-  
-  @Override
+
+    @Override
     public void run() {
       Thread.currentThread().setName("IntermediateHistoryScanner");
       try {
-        while (running) {
+        while (true) {
           LOG.info("Starting scan to move intermediate done files");
-          scanIntermediateDirectory();
-          for (final MetaInfo metaInfo : intermediateListCache.values()) {
+          for (final MetaInfo metaInfo : hsManager.getIntermediateMetaInfos()) {
             moveToDoneExecutor.execute(new Runnable() {
               @Override
               public void run() {
                 try {
-                moveToDone(metaInfo);
+                  hsManager.moveToDone(metaInfo);
                 } catch (IOException e) {
-                  LOG.info("Failed to process metaInfo for job: " + 
-                      metaInfo.jobIndexInfo.getJobId(), e);
+                  LOG.info(
+                      "Failed to process metaInfo for job: "
+                          + metaInfo.getJobId(), e);
                 }
               }
             });
-
           }
-          synchronized (this) { // TODO Is this really required.
+          synchronized (this) {
             try {
               this.wait(sleepTime);
             } catch (InterruptedException e) {
               LOG.info("IntermediateHistoryScannerThread interrupted");
             }
+            if (!running) {
+              break;
+            }
           }
         }
       } catch (IOException e) {
-        LOG.warn("Unable to get a list of intermediate files to be moved from: "
-            + intermediateDoneDirPath);
+        LOG.warn("Unable to get a list of intermediate files to be moved");
+        // TODO Shut down the entire process!!!!
       }
     }
   }
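
The reworked loop above pairs a timed wait(sleepTime) with a synchronized stop() that calls notify(), so shutdown wakes the scanner immediately instead of letting it sleep out the rest of the interval, and the running flag is written and read under the same lock. A condensed sketch of that handshake, with hypothetical names (PeriodicWorker, doWork()) standing in for the scanner:

    // Periodic worker that a caller can stop without waiting a full cycle.
    class PeriodicWorker implements Runnable {
      private final long sleepTime;
      private boolean running = true;

      PeriodicWorker(long sleepTime) { this.sleepTime = sleepTime; }

      public synchronized void stop() {
        running = false;
        notify(); // wake the worker out of its timed wait immediately
      }

      @Override
      public void run() {
        while (true) {
          doWork();
          synchronized (this) {
            try {
              wait(sleepTime); // returns early if stop() calls notify()
            } catch (InterruptedException e) {
              Thread.currentThread().interrupt();
            }
            if (!running) {
              break; // checked under the same lock stop() writes under
            }
          }
        }
      }

      void doWork() { /* placeholder for the scan-and-move work */ }
    }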
-  
-  private Job loadJob(MetaInfo metaInfo) {
-    synchronized(metaInfo) {
-      try {
-        Job job = new CompletedJob(conf, metaInfo.getJobIndexInfo().getJobId(), 
-            metaInfo.getHistoryFile(), false, metaInfo.getJobIndexInfo().getUser(),
-            metaInfo.getConfFile(), this.aclsMgr);
-        addToLoadedJobCache(job);
-        return job;
-      } catch (IOException e) {
-        throw new YarnException("Could not find/load job: " + 
-            metaInfo.getJobIndexInfo().getJobId(), e);
-      }
-    }
-  }
-  
-  private Map<JobId, Job> getAllJobsInternal() {
-    //TODO This should ideally be using getAllJobsMetaInfo
-    // or get rid of that method once Job has APIs for user, finishTime etc.
-    SortedMap<JobId, Job> result = new TreeMap<JobId, Job>();
-    try {
-      scanIntermediateDirectory();
-    } catch (IOException e) {
-      LOG.warn("Failed to scan intermediate directory", e);
-      throw new YarnException(e);
-    }
-    for (JobId jobId : intermediateListCache.keySet()) {
-      MetaInfo mi = intermediateListCache.get(jobId);
-      if (mi != null) {
-        result.put(jobId, new PartialJob(mi.getJobIndexInfo(), mi
-            .getJobIndexInfo().getJobId()));
-      }
-    }
-    for (JobId jobId : jobListCache.keySet()) {
-      MetaInfo mi = jobListCache.get(jobId);
-      if (mi != null) {
-        result.put(jobId, new PartialJob(mi.getJobIndexInfo(), mi
-            .getJobIndexInfo().getJobId()));
-      }
-    }
-    return result;
-  }
 
   /**
    * Helper method for test cases.
    */
   MetaInfo getJobMetaInfo(JobId jobId) throws IOException {
-    //MetaInfo available in cache.
-    MetaInfo metaInfo = null;
-    if (jobListCache.containsKey(jobId)) {
-      metaInfo = jobListCache.get(jobId);
-    }
-
-    if (metaInfo != null) {
-      return metaInfo;
-    }
-    
-    //MetaInfo not available. Check intermediate directory for meta info.
-    metaInfo = scanIntermediateForJob(jobId);
-    if (metaInfo != null) {
-      return metaInfo;
-    }
-    
-    //Intermediate directory does not contain job. Search through older ones.
-    metaInfo = scanOldDirsForJob(jobId);
-    if (metaInfo != null) {
-      return metaInfo;
-    }
-    return null;
-  }
-  
-  private Job findJob(JobId jobId) throws IOException {
-    //Job already loaded.
-    if (loadedJobCache.containsKey(jobId)) {
-      return loadedJobCache.get(jobId);        
-    }
-    
-    //MetaInfo available in cache.
-    MetaInfo metaInfo = null;
-    if (jobListCache.containsKey(jobId)) {
-      metaInfo = jobListCache.get(jobId);
-    }
-
-    if (metaInfo != null) {
-      return loadJob(metaInfo);
-    }
-    
-    //MetaInfo not available. Check intermediate directory for meta info.
-    metaInfo = scanIntermediateForJob(jobId);
-    if (metaInfo != null) {
-      return loadJob(metaInfo);
-    }
-    
-    //Intermediate directory does not contain job. Search through older ones.
-    metaInfo = scanOldDirsForJob(jobId);
-    if (metaInfo != null) {
-      return loadJob(metaInfo);
-    }
-    return null;
-  }
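
The removed findJob() works through progressively more expensive tiers: the loaded-job cache, the done-job listing cache, a rescan of the intermediate directory, and finally the older done directories. Condensed into one method, as a restatement of the removed logic against the same fields and helpers (not the new HistoryFileManager API):

    // First hit wins; each tier is costlier than the one before it.
    private Job findJobSketch(JobId jobId) throws IOException {
      if (loadedJobCache.containsKey(jobId)) {
        return loadedJobCache.get(jobId);     // tier 1: fully loaded jobs
      }
      MetaInfo mi = jobListCache.get(jobId);  // tier 2: known done jobs
      if (mi == null) {
        mi = scanIntermediateForJob(jobId);   // tier 3: rescan intermediate
      }
      if (mi == null) {
        mi = scanOldDirsForJob(jobId);        // tier 4: older done dirs
      }
      return mi == null ? null : loadJob(mi); // load and cache on first use
    }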
-  
-  private void moveToDone(MetaInfo metaInfo) throws IOException {
-    long completeTime = metaInfo.getJobIndexInfo().getFinishTime();
-    if (completeTime == 0) completeTime = System.currentTimeMillis();
-    JobId jobId = metaInfo.getJobIndexInfo().getJobId();
-    
-    List<Path> paths = new ArrayList<Path>();
-    Path historyFile = metaInfo.getHistoryFile();
-    if (historyFile == null) {
-      LOG.info("No file for job-history with " + jobId + " found in cache!");
-    } else {
-      paths.add(historyFile);
-    }
-    
-    Path confFile = metaInfo.getConfFile();
-    if (confFile == null) {
-      LOG.info("No file for jobConf with " + jobId + " found in cache!");
-    } else {
-      paths.add(confFile);
-    }
-    
-    //TODO Check all mi getters and setters for the conf path
-    Path summaryFile = metaInfo.getSummaryFile();
-    if (summaryFile == null) {
-      LOG.info("No summary file for job: " + jobId);
-    } else {
-      try {
-        String jobSummaryString = getJobSummary(intermediateDoneDirFc, summaryFile);
-        SUMMARY_LOG.info(jobSummaryString);
-        LOG.info("Deleting JobSummary file: [" + summaryFile + "]");
-        intermediateDoneDirFc.delete(summaryFile, false);
-        metaInfo.setSummaryFile(null);
-      } catch (IOException e) {
-        LOG.warn("Failed to process summary file: [" + summaryFile + "]");
-        throw e;
-      }
-    }
-
-    Path targetDir = canonicalHistoryLogPath(jobId, completeTime);
-    addDirectoryToSerialNumberIndex(targetDir);
-    try {
-      maybeMakeSubdirectory(targetDir);
-    } catch (IOException e) {
-      LOG.warn("Failed creating subdirectory: " + targetDir + 
-          " while attempting to move files for jobId: " + jobId);
-      throw e;
-    }
-    synchronized (metaInfo) {
-      if (historyFile != null) {
-        Path toPath = doneDirFc.makeQualified(new Path(targetDir, 
-            historyFile.getName()));
-        try {
-          moveToDoneNow(historyFile, toPath);
-        } catch (IOException e) {
-          LOG.warn("Failed to move file: " + historyFile + " for jobId: "
-              + jobId);
-          throw e;
-        }
-        metaInfo.setHistoryFile(toPath);
-      }
-      if (confFile != null) {
-        Path toPath = doneDirFc.makeQualified(new Path(targetDir, 
-            confFile.getName()));
-        try {
-          moveToDoneNow(confFile, toPath);
-        } catch (IOException e) {
-          LOG.warn("Failed to move file: " + historyFile + " for jobId: " 
-              + jobId);
-          throw e;
-        }
-        metaInfo.setConfFile(toPath);
-      }
-    }
-    addToJobListCache(jobId, metaInfo);
-    intermediateListCache.remove(jobId);
+    return hsManager.getMetaInfo(jobId);
   }
-  
-  private void moveToDoneNow(final Path src, final Path target)
-      throws IOException {
-    LOG.info("Moving " + src.toString() + " to " + target.toString());
-    intermediateDoneDirFc.rename(src, target, Options.Rename.NONE);
-    // fc.util().copy(src, target);
-    //fc.delete(src, false);
-    //intermediateDoneDirFc.setPermission(target, new FsPermission(
-    //JobHistoryUtils.HISTORY_DONE_FILE_PERMISSION));
-  }
-  
-  String getJobSummary(FileContext fc, Path path) throws IOException {
-    Path qPath = fc.makeQualified(path);
-    FSDataInputStream in = fc.open(qPath);
-    try {
-      return in.readUTF();
-    } finally {
-      in.close(); // close the stream even if readUTF() throws
-    }
-  }
-  
-  private void maybeMakeSubdirectory(Path path) throws IOException {
-    boolean existsInExistingCache = false;
-    synchronized(existingDoneSubdirs) {
-      if (existingDoneSubdirs.contains(path)) existsInExistingCache = true;
-    }
-    try {
-      doneDirFc.getFileStatus(path);
-      if (!existsInExistingCache) {
-        existingDoneSubdirs.add(path);
-        if (debugMode) {
-          LOG.info("JobHistory.maybeMakeSubdirectory -- We believed "
-                             + path + " already existed, but it didn't.");
-        }
-      }
-    } catch (FileNotFoundException fnfE) {
-      try {
-        FsPermission fsp = 
-          new FsPermission(JobHistoryUtils.HISTORY_DONE_DIR_PERMISSION);
-        doneDirFc.mkdir(path, fsp, true);
-        FileStatus fsStatus = doneDirFc.getFileStatus(path);
-        LOG.info("Perms after creating " + fsStatus.getPermission().toShort()
-            + ", Expected: " + fsp.toShort());
-        if (fsStatus.getPermission().toShort() != fsp.toShort()) {
-          LOG.info("Explicitly setting permissions to : " + fsp.toShort()
-              + ", " + fsp);
-          doneDirFc.setPermission(path, fsp);
-        }
-        synchronized(existingDoneSubdirs) {
-          existingDoneSubdirs.add(path);
-        }
-      } catch (FileAlreadyExistsException faeE) { //Nothing to do.
-      }
-    }
-  }
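
maybeMakeSubdirectory() above creates a done subdirectory on demand, verifies that the resulting permissions match the requested ones (the process umask can narrow them at create time), fixes them up when they differ, and treats a concurrent-creation race as success. A standalone sketch of the same create-verify-fix-up pattern using java.nio.file in place of FileContext; DoneDirs and ensureDir are made-up names:

    import java.io.IOException;
    import java.nio.file.FileAlreadyExistsException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.attribute.PosixFilePermission;
    import java.nio.file.attribute.PosixFilePermissions;
    import java.util.Set;

    final class DoneDirs {
      // Ensure dir exists with exactly the requested POSIX permissions.
      static void ensureDir(Path dir, String perms /* e.g. "rwxrwx---" */)
          throws IOException {
        Set<PosixFilePermission> want = PosixFilePermissions.fromString(perms);
        try {
          Files.createDirectories(dir); // no-op if the directory exists
        } catch (FileAlreadyExistsException e) {
          if (!Files.isDirectory(dir)) {
            throw e; // a non-directory occupies the path
          }
          // otherwise we lost a creation race: nothing to do
        }
        // the umask may have narrowed the mode at create time, so verify
        if (!Files.getPosixFilePermissions(dir).equals(want)) {
          Files.setPosixFilePermissions(dir, want);
        }
      }
    }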
-  
-  private Path canonicalHistoryLogPath(JobId id, String timestampComponent) {
-    return new Path(doneDirPrefixPath, 
-        JobHistoryUtils.historyLogSubdirectory(id, timestampComponent, serialNumberFormat));
-  }
-  
-  private Path canonicalHistoryLogPath(JobId id, long millisecondTime) {
-    String timestampComponent = 
-      JobHistoryUtils.timestampDirectoryComponent(millisecondTime, debugMode);
-    return new Path(doneDirPrefixPath, 
-        JobHistoryUtils.historyLogSubdirectory(id, timestampComponent, serialNumberFormat));
-  }  
-  
 
   @Override
-  public synchronized Job getJob(JobId jobId) {
-    if(LOG.isDebugEnabled()) {
-      LOG.debug("Looking for Job "+jobId);
-    }
-    Job job = null;
-    try {
-      job = findJob(jobId);
-      //This could return a null job.
-    } catch (IOException e) {
-      throw new YarnException(e);
-    }
-    return job;
+  public Job getJob(JobId jobId) {
+    return storage.getFullJob(jobId);
   }
 
   @Override
   public Map<JobId, Job> getAllJobs(ApplicationId appID) {
-    if(LOG.isDebugEnabled()) {
+    if (LOG.isDebugEnabled()) {
       LOG.debug("Called getAllJobs(AppId): " + appID);
     }
-//    currently there is 1 to 1 mapping between app and job id
+    // currently there is 1 to 1 mapping between app and job id
     org.apache.hadoop.mapreduce.JobID oldJobID = TypeConverter.fromYarn(appID);
     Map<JobId, Job> jobs = new HashMap<JobId, Job>();
     JobId jobID = TypeConverter.toYarn(oldJobID);
     jobs.put(jobID, getJob(jobID));
     return jobs;
-//    return getAllJobs();
   }
-  
-  /* (non-Javadoc)
-   * @see org.apache.hadoop.mapreduce.v2.hs.HistoryContext#getAllJobs()
-   * 
-   * Returns a recent list of jobs. This may not be the complete set.
-   * If a previous jobId is known - it can be queried via the getJob(JobId)
-   * method.
-   * The size of this list is determined by the size of the job list cache.
-   * This can be fixed when pagination is implemented - return the first set
-   * of jobs via the cache, and go to DFS only when an attempt is made to
-   * navigate past the cached list.
-   * This does involve a DFS operation of scanning the intermediate directory.
-   */
+
+  @Override
   public Map<JobId, Job> getAllJobs() {
-    LOG.debug("Called getAllJobs()");
-    return getAllJobsInternal();
+    return storage.getAllPartialJobs();
   }
 
-  static class MetaInfo {
-    private Path historyFile;
-    private Path confFile; 
-    private Path summaryFile;
-    JobIndexInfo jobIndexInfo;
-
-    MetaInfo(Path historyFile, Path confFile, Path summaryFile, 
-        JobIndexInfo jobIndexInfo) {
-      this.historyFile = historyFile;
-      this.confFile = confFile;
-      this.summaryFile = summaryFile;
-      this.jobIndexInfo = jobIndexInfo;
-    }
-
-    Path getHistoryFile() { return historyFile; }
-    Path getConfFile() { return confFile; }
-    Path getSummaryFile() { return summaryFile; }
-    JobIndexInfo getJobIndexInfo() { return jobIndexInfo; }
-    
-    void setHistoryFile(Path historyFile) { this.historyFile = historyFile; }
-    void setConfFile(Path confFile) {this.confFile = confFile; }
-    void setSummaryFile(Path summaryFile) { this.summaryFile = summaryFile; }
+  /**
+   * Look for a set of partial jobs.
+   * 
+   * @param offset
+   *          the offset into the list of jobs.
+   * @param count
+   *          the maximum number of jobs to return.
+   * @param user
+   *          only return jobs for the given user.
+   * @param queue
+   *          only return jobs in the given queue.
+   * @param sBegin
+   *          only return jobs that started on or after the given time.
+   * @param sEnd
+   *          only return jobs that started on or before the given time.
+   * @param fBegin
+   *          only return jobs that ended on or after the given time.
+   * @param fEnd
+   *          only return jobs that ended on or before the given time.
+   * @param jobState
+   *          only return jobs that are in the given job state.
+   * @return The list of filtered jobs.
+   */
+  @Override
+  public JobsInfo getPartialJobs(Long offset, Long count, String user,
+      String queue, Long sBegin, Long sEnd, Long fBegin, Long fEnd,
+      JobState jobState) {
+    return storage.getPartialJobs(offset, count, user, queue, sBegin, sEnd,
+        fBegin, fEnd, jobState);
   }
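
A hypothetical call site for the filter API documented above, assuming, as the history-server web layer does when query parameters are unset, that a null filter places no constraint on that dimension (history is a made-up variable holding this JobHistory instance):

    // First 50 SUCCEEDED jobs for user "alice"; the nulls leave the queue
    // and time-window filters unconstrained. Values are illustrative only.
    JobsInfo page = history.getPartialJobs(0L, 50L, "alice", null,
        null, null, null, null, JobState.SUCCEEDED);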
-  
 
   public class HistoryCleaner implements Runnable {
-    private long currentTime;
-    
     long maxAgeMillis;
-    long filesDeleted = 0;
-    long dirsDeleted = 0;
-    
+
     public HistoryCleaner(long maxAge) {
       this.maxAgeMillis = maxAge;
     }
-    
-    @SuppressWarnings("unchecked")
+
     public void run() {
       LOG.info("History Cleaner started");
-      currentTime = System.currentTimeMillis();
-      boolean halted = false;
-      //TODO Delete YYYY/MM/DD directories.
+      long cutoff = System.currentTimeMillis() - maxAgeMillis;
       try {
-        List<FileStatus> serialDirList = findTimestampedDirectories();
-        //Sort in ascending order. Relies on YYYY/MM/DD/Serial
-        Collections.sort(serialDirList);
-        for (FileStatus serialDir : serialDirList) {
-          List<FileStatus> historyFileList = 
-            scanDirectoryForHistoryFiles(serialDir.getPath(), doneDirFc);
-          for (FileStatus historyFile : historyFileList) {
-            JobIndexInfo jobIndexInfo = 
-              FileNameIndexUtils.getIndexInfo(historyFile.getPath().getName());
-            long effectiveTimestamp = 
-              getEffectiveTimestamp(jobIndexInfo.getFinishTime(), historyFile);
-            if (shouldDelete(effectiveTimestamp)) {
-              String confFileName = 
-                JobHistoryUtils.getIntermediateConfFileName(jobIndexInfo.getJobId());
-              MetaInfo metaInfo = new MetaInfo(historyFile.getPath(),
-                  new Path(historyFile.getPath().getParent(), confFileName), 
-                  null, jobIndexInfo);
-              delete(metaInfo);
-            } else {
-              halted = true;
-              break;
-            }
-          }
-          if (!halted) {
-            deleteDir(serialDir.getPath());
-            removeDirectoryFromSerialNumberIndex(serialDir.getPath());
-            synchronized (existingDoneSubdirs) {
-              existingDoneSubdirs.remove(serialDir.getPath());  
-            }
-            
-          } else {
-            break; // Don't scan any more directories.
-          }
-        }
+        hsManager.clean(cutoff, storage);
       } catch (IOException e) {
-        LOG.warn("Error in History cleaner run", e);
+        LOG.warn("Error trying to clean up ", e);
       }
       LOG.info("History Cleaner complete");
-      LOG.info("FilesDeleted: " + filesDeleted);
-      LOG.info("Directories Deleted: " + dirsDeleted);
-    }
-    
-    private boolean shouldDelete(long ts) {
-      return ((ts + maxAgeMillis) <= currentTime);
-    }
-    
-    private long getEffectiveTimestamp(long finishTime, FileStatus fileStatus) {
-      if (finishTime == 0) {
-        return fileStatus.getModificationTime();
-      }
-      return finishTime;
-    }
-    
-    private void delete(MetaInfo metaInfo) throws IOException {
-      deleteFile(metaInfo.getHistoryFile());
-      deleteFile(metaInfo.getConfFile());
-      jobListCache.remove(metaInfo.getJobIndexInfo().getJobId());
-      loadedJobCache.remove(metaInfo.getJobIndexInfo().getJobId());
-    }
-    
-    private void deleteFile(final Path path) throws IOException {
-      doneDirFc.delete(doneDirFc.makeQualified(path), false);
-      filesDeleted++;
-    }
-    
-    private void deleteDir(Path path) throws IOException {
-      doneDirFc.delete(doneDirFc.makeQualified(path), true);
-      dirsDeleted++;
-    }
     }
-    
-  
-  
-  //TODO AppContext - Not Required
-  private  ApplicationAttemptId appAttemptID;
+  }
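
The cleaner above now just computes the cutoff and delegates to hsManager.clean(cutoff, storage), which lives in HistoryFileManager and is not shown in this diff. The age test itself is the one the removed shouldDelete()/getEffectiveTimestamp() helpers implemented, restated here as a single function:

    // A history file is eligible for cleanup once its effective timestamp
    // (finish time, or the file's modification time when the finish time
    // is unset) falls at or before now - maxAgeMillis.
    static boolean eligibleForCleanup(long finishTime, long fileModTime,
        long maxAgeMillis, long now) {
      long effective = (finishTime == 0) ? fileModTime : finishTime;
      return effective <= now - maxAgeMillis;
    }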
+
+  // TODO AppContext - Not Required
+  private ApplicationAttemptId appAttemptID;
+
   @Override
   public ApplicationAttemptId getApplicationAttemptId() {
-  //TODO fixme - bogus appAttemptID for now
+    // TODO fixme - bogus appAttemptID for now
     return appAttemptID;
-  }  
-  
-  //TODO AppContext - Not Required
+  }
+
+  // TODO AppContext - Not Required
   private ApplicationId appID;
+
   @Override
   public ApplicationId getApplicationID() {
-  //TODO fixme - bogus appID for now
+    // TODO fixme - bogus appID for now
     return appID;
   }
-  
-  //TODO AppContext - Not Required
+
+  // TODO AppContext - Not Required
   @Override
   public EventHandler getEventHandler() {
     // TODO Auto-generated method stub
     return null;
   }
-  
-  //TODO AppContext - Not Required
+
+  // TODO AppContext - Not Required
   private String userName;
+
   @Override
   public CharSequence getUser() {
     if (userName != null) {

Modified: hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/PartialJob.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/PartialJob.java?rev=1324567&r1=1324566&r2=1324567&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/PartialJob.java (original)
+++ hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/PartialJob.java Wed Apr 11 05:47:40 2012
@@ -51,6 +51,7 @@ public class PartialJob implements org.a
     jobReport = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(JobReport.class);
     jobReport.setStartTime(jobIndexInfo.getSubmitTime());
     jobReport.setFinishTime(jobIndexInfo.getFinishTime());
+    jobReport.setJobState(getState());
   }
   
   @Override

Modified: hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsWebApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsWebApp.java?rev=1324567&r1=1324566&r2=1324567&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsWebApp.java (original)
+++ hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsWebApp.java Wed Apr 11 05:47:40 2012
@@ -44,6 +44,7 @@ public class HsWebApp extends WebApp imp
     bind(JAXBContextResolver.class);
     bind(GenericExceptionHandler.class);
     bind(AppContext.class).toInstance(history);
+    bind(HistoryContext.class).toInstance(history);
     route("/", HsController.class);
     route("/app", HsController.class);
     route(pajoin("/job", JOB_ID), HsController.class, "job");