Posted to mapreduce-commits@hadoop.apache.org by st...@apache.org on 2009/11/28 21:26:22 UTC

svn commit: r885145 [28/34] - in /hadoop/mapreduce/branches/MAPREDUCE-233: ./ .eclipse.templates/ .eclipse.templates/.launches/ conf/ ivy/ lib/ src/benchmarks/gridmix/ src/benchmarks/gridmix/pipesort/ src/benchmarks/gridmix2/ src/benchmarks/gridmix2/sr...

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestClusterMapReduceTestCase.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestClusterMapReduceTestCase.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestClusterMapReduceTestCase.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestClusterMapReduceTestCase.java Sat Nov 28 20:26:01 2009
@@ -64,7 +64,7 @@
 
     Path[] outputFiles = FileUtil.stat2Paths(
                            getFileSystem().listStatus(getOutputDir(),
-                           new OutputLogFilter()));
+                           new Utils.OutputFileUtils.OutputFilesFilter()));
     if (outputFiles.length > 0) {
       InputStream is = getFileSystem().open(outputFiles[0]);
       BufferedReader reader = new BufferedReader(new InputStreamReader(is));
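
For context: Utils.OutputFileUtils.OutputFilesFilter, which replaces the deprecated OutputLogFilter here, keeps only the job's real output files (part-*) and skips side entries such as _logs. A minimal, self-contained sketch of its use; the output path is hypothetical:

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.FileUtil;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.Utils;

    public class ListJobOutput {
      public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        Path outputDir = new Path("/user/test/output"); // hypothetical path
        // List only the job's part-* files, skipping _logs and friends.
        Path[] outputFiles = FileUtil.stat2Paths(
            fs.listStatus(outputDir,
                          new Utils.OutputFileUtils.OutputFilesFilter()));
        for (Path p : outputFiles) {
          System.out.println(p);
        }
      }
    }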

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestCommandLineJobSubmission.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestCommandLineJobSubmission.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestCommandLineJobSubmission.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestCommandLineJobSubmission.java Sat Nov 28 20:26:01 2009
@@ -1,4 +1,5 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
  * regarding copyright ownership.  The ASF licenses this file
@@ -18,11 +19,13 @@
 
 import java.io.File;
 import java.io.FileOutputStream;
+import java.io.IOException;
 
 import junit.framework.TestCase;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.*;
+import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 
@@ -59,17 +62,59 @@
       FileOutputStream fstream = new FileOutputStream(f);
       fstream.write("somestrings".getBytes());
       fstream.close();
-      String[] args = new String[8];
+      File f1 = new File(thisbuildDir, "files_tmp1");
+      fstream = new FileOutputStream(f1);
+      fstream.write("somestrings".getBytes());
+      fstream.close();
+      
+      // copy files to dfs
+      Path cachePath = new Path("/cacheDir");
+      if (!fs.mkdirs(cachePath)) {
+        throw new IOException(
+            "Mkdirs failed to create " + cachePath.toString());
+      }
+      Path localCachePath = new Path(System.getProperty("test.cache.data"));
+      Path txtPath = new Path(localCachePath, new Path("test.txt"));
+      Path jarPath = new Path(localCachePath, new Path("test.jar"));
+      Path zipPath = new Path(localCachePath, new Path("test.zip"));
+      Path tarPath = new Path(localCachePath, new Path("test.tar"));
+      Path tgzPath = new Path(localCachePath, new Path("test.tgz"));
+      fs.copyFromLocalFile(txtPath, cachePath);
+      fs.copyFromLocalFile(jarPath, cachePath);
+      fs.copyFromLocalFile(zipPath, cachePath);
+
+      // construct options for -files
+      String[] files = new String[3];
+      files[0] = f.toString();
+      files[1] = f1.toString() + "#localfilelink";
+      files[2] = 
+        fs.getUri().resolve(cachePath + "/test.txt#dfsfilelink").toString();
+
+      // construct options for -libjars
+      String[] libjars = new String[2];
+      libjars[0] = "build/test/mapred/testjar/testjob.jar";
+      libjars[1] = fs.getUri().resolve(cachePath + "/test.jar").toString();
+      
+      // construct options for archives
+      String[] archives = new String[3];
+      archives[0] = tgzPath.toString();
+      archives[1] = tarPath + "#tarlink";
+      archives[2] = 
+        fs.getUri().resolve(cachePath + "/test.zip#ziplink").toString();
+      
+      String[] args = new String[10];
       args[0] = "-files";
-      args[1] = f.toString();
+      args[1] = StringUtils.arrayToString(files);
       args[2] = "-libjars";
      // use the already-built testjob.jar as a temporary jar file
      // rather than creating a new one
-      args[3] = "build/test/mapred/testjar/testjob.jar";
-      args[4] = "-D";
-      args[5] = "mapred.output.committer.class=testjar.CustomOutputCommitter";
-      args[6] = input.toString();
-      args[7] = output.toString();
+      args[3] = StringUtils.arrayToString(libjars);
+      args[4] = "-archives";
+      args[5] = StringUtils.arrayToString(archives);
+      args[6] = "-D";
+      args[7] = "mapred.output.committer.class=testjar.CustomOutputCommitter";
+      args[8] = input.toString();
+      args[9] = output.toString();
       
       JobConf jobConf = mr.createJobConf();
       //before running the job, verify that libjar is not in client classpath
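
For context: the expanded test drives the generic -files, -libjars and -archives options, where a trailing "#name" fragment sets the symlink name the task sees in its working directory, and multiple entries are joined with commas. A sketch of building such option values (all paths and URIs below are hypothetical):

    import org.apache.hadoop.util.StringUtils;

    public class CacheOptionsSketch {
      public static void main(String[] args) {
        // "#link" after a path names the symlink created in the task's
        // working directory.
        String[] files = {
            "/tmp/files_tmp",                              // plain local file
            "/tmp/files_tmp1#localfilelink",               // local file + link
            "hdfs://nn:8020/cacheDir/test.txt#dfsfilelink" // DFS file + link
        };
        String[] archives = {
            "/tmp/test.tgz",
            "/tmp/test.tar#tarlink",
            "hdfs://nn:8020/cacheDir/test.zip#ziplink"
        };
        // Joined with commas, as GenericOptionsParser expects.
        System.out.println("-files " + StringUtils.arrayToString(files));
        System.out.println("-archives " + StringUtils.arrayToString(archives));
      }
    }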

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestComparators.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestComparators.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestComparators.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestComparators.java Sat Nov 28 20:26:01 2009
@@ -127,7 +127,7 @@
     public void reduce(IntWritable key, Iterator<Writable> values, 
                        OutputCollector<IntWritable, Text> out,
                        Reporter reporter) throws IOException {
-      int currentKey = ((IntWritable)(key)).get();
+      int currentKey = key.get();
       // keys should be in descending order
       if (currentKey > lastKey) {
         fail("Keys not in sorted descending order");

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestCompressedEmptyMapOutputs.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestCompressedEmptyMapOutputs.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestCompressedEmptyMapOutputs.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestCompressedEmptyMapOutputs.java Sat Nov 28 20:26:01 2009
@@ -49,8 +49,8 @@
   throws Exception {
     // Scale down the default settings for RandomWriter for the test-case
     // Generates NUM_HADOOP_SLAVES * RW_MAPS_PER_HOST * RW_BYTES_PER_MAP -> 1MB
-    job.setInt("test.randomwrite.bytes_per_map", RW_BYTES_PER_MAP);
-    job.setInt("test.randomwriter.maps_per_host", RW_MAPS_PER_HOST);
+    job.setInt(RandomWriter.BYTES_PER_MAP, RW_BYTES_PER_MAP);
+    job.setInt(RandomWriter.MAPS_PER_HOST, RW_MAPS_PER_HOST);
     String[] rwArgs = {sortInput.toString()};
     
     // Run RandomWriter

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestControlledMapReduceJob.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestControlledMapReduceJob.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestControlledMapReduceJob.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestControlledMapReduceJob.java Sat Nov 28 20:26:01 2009
@@ -23,6 +23,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.mapred.ControlledMapReduceJob.ControlledMapReduceJobRunner;
+import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
 
 /**
  * Test to verify the controlled behavior of a ControlledMapReduceJob.
@@ -42,8 +43,8 @@
       throws Exception {
 
     Properties props = new Properties();
-    props.setProperty("mapred.tasktracker.map.tasks.maximum", "2");
-    props.setProperty("mapred.tasktracker.reduce.tasks.maximum", "2");
+    props.setProperty(TTConfig.TT_MAP_SLOTS, "2");
+    props.setProperty(TTConfig.TT_REDUCE_SLOTS, "2");
     startCluster(true, props);
     LOG.info("Started the cluster");
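
For context: several hunks in this commit make the same substitution, replacing raw configuration key strings with named constants (TTConfig, JTConfig, JobContext, MRConfig), so key renames are caught at compile time. A minimal sketch of the TaskTracker slot settings in the new style:

    import java.util.Properties;

    import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;

    public class SlotConfigSketch {
      public static void main(String[] args) {
        Properties props = new Properties();
        // Formerly "mapred.tasktracker.map.tasks.maximum" and
        // "mapred.tasktracker.reduce.tasks.maximum".
        props.setProperty(TTConfig.TT_MAP_SLOTS, "2");
        props.setProperty(TTConfig.TT_REDUCE_SLOTS, "2");
        System.out.println(props);
      }
    }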
 

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestEmptyJob.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestEmptyJob.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestEmptyJob.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestEmptyJob.java Sat Nov 28 20:26:01 2009
@@ -34,6 +34,8 @@
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.lib.IdentityMapper;
 import org.apache.hadoop.mapred.lib.IdentityReducer;
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
+import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
 
 /**
  * A JUnit test to test Map-Reduce empty jobs.
@@ -48,11 +50,11 @@
 
   MiniMRCluster mr = null;
 
-  /** Committer with cleanup waiting on a signal
+  /** Committer with commit waiting on a signal
    */
-  static class CommitterWithDelayCleanup extends FileOutputCommitter {
+  static class CommitterWithDelayCommit extends FileOutputCommitter {
     @Override
-    public void cleanupJob(JobContext context) throws IOException {
+    public void commitJob(JobContext context) throws IOException {
       Configuration conf = context.getConfiguration();
       Path share = new Path(conf.get("share"));
       FileSystem fs = FileSystem.get(conf);
@@ -64,7 +66,7 @@
         }
         UtilsForTests.waitFor(100);
       }
-      super.cleanupJob(context);
+      super.commitJob(context);
     }
   }
 
@@ -101,7 +103,7 @@
     conf.setJobName("empty");
     // use an InputFormat which returns no split
     conf.setInputFormat(EmptyInputFormat.class);
-    conf.setOutputCommitter(CommitterWithDelayCleanup.class);
+    conf.setOutputCommitter(CommitterWithDelayCommit.class);
     conf.setOutputKeyClass(Text.class);
     conf.setOutputValueClass(IntWritable.class);
     conf.setMapperClass(IdentityMapper.class);
@@ -195,7 +197,8 @@
         + " and not 1.0", runningJob.cleanupProgress() == 1.0);
 
     assertTrue("Job output directory doesn't exit!", fs.exists(outDir));
-    FileStatus[] list = fs.listStatus(outDir, new OutputLogFilter());
+    FileStatus[] list = fs.listStatus(outDir, 
+                          new Utils.OutputFileUtils.OutputFilesFilter());
     assertTrue("Number of part-files is " + list.length + " and not "
         + numReduces, list.length == numReduces);
 
@@ -221,10 +224,10 @@
       JobConf conf = new JobConf();
       fileSys = FileSystem.get(conf);
 
-      conf.set("mapred.job.tracker.handler.count", "1");
-      conf.set("mapred.job.tracker", "127.0.0.1:0");
-      conf.set("mapred.job.tracker.http.address", "127.0.0.1:0");
-      conf.set("mapred.task.tracker.http.address", "127.0.0.1:0");
+      conf.set(JTConfig.JT_IPC_HANDLER_COUNT, "1");
+      conf.set(JTConfig.JT_IPC_ADDRESS, "127.0.0.1:0");
+      conf.set(JTConfig.JT_HTTP_ADDRESS, "127.0.0.1:0");
+      conf.set(TTConfig.TT_HTTP_ADDRESS, "127.0.0.1:0");
 
       mr =
           new MiniMRCluster(taskTrackers, fileSys.getUri().toString(), 1,
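
For context: job-level finalization now happens in commitJob() rather than the deprecated cleanupJob(), and overrides should chain to super.commitJob(). A sketch of a committer that does extra work at job commit (the logged key is illustrative):

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapred.FileOutputCommitter;
    import org.apache.hadoop.mapred.JobContext;

    class LoggingCommitter extends FileOutputCommitter {
      @Override
      public void commitJob(JobContext context) throws IOException {
        Configuration conf = context.getConfiguration();
        // Hypothetical extra step before the output is finalized.
        System.out.println("committing job: " + conf.get("mapred.job.name"));
        super.commitJob(context);
      }
    }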

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestFieldSelection.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestFieldSelection.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestFieldSelection.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestFieldSelection.java Sat Nov 28 20:26:01 2009
@@ -72,9 +72,9 @@
     job.setOutputFormat(TextOutputFormat.class);
     job.setNumReduceTasks(1);
 
-    job.set("mapred.data.field.separator", "-");
-    job.set("map.output.key.value.fields.spec", "6,5,1-3:0-");
-    job.set("reduce.output.key.value.fields.spec", ":4,3,2,1,0,0-");
+    job.set("mapreduce.fieldsel.data.field.separator", "-");
+    job.set("mapreduce.fieldsel.mapreduce.fieldsel.map.output.key.value.fields.spec", "6,5,1-3:0-");
+    job.set("mapreduce.fieldsel.mapreduce.fieldsel.reduce.output.key.value.fields.spec", ":4,3,2,1,0,0-");
 
     JobClient.runJob(job);
 

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestFileOutputCommitter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestFileOutputCommitter.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestFileOutputCommitter.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestFileOutputCommitter.java Sat Nov 28 20:26:01 2009
@@ -23,6 +23,8 @@
 
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.io.*;
+import org.apache.hadoop.mapred.JobContextImpl;
+import org.apache.hadoop.mapred.TaskAttemptContextImpl;
 
 public class TestFileOutputCommitter extends TestCase {
   private static Path outDir = new Path(
@@ -35,11 +37,11 @@
   @SuppressWarnings("unchecked")
   public void testCommitter() throws Exception {
     JobConf job = new JobConf();
-    job.set("mapred.task.id", attempt);
+    job.set(JobContext.TASK_ATTEMPT_ID, attempt);
     job.setOutputCommitter(FileOutputCommitter.class);
     FileOutputFormat.setOutputPath(job, outDir);
-    JobContext jContext = new JobContext(job, taskID.getJobID());
-    TaskAttemptContext tContext = new TaskAttemptContext(job, taskID);
+    JobContext jContext = new JobContextImpl(job, taskID.getJobID());
+    TaskAttemptContext tContext = new TaskAttemptContextImpl(job, taskID);
     FileOutputCommitter committer = new FileOutputCommitter();
     FileOutputFormat.setWorkOutputPath(job, 
       committer.getTempTaskOutputPath(tContext));
@@ -73,7 +75,7 @@
       theRecordWriter.close(reporter);
     }
     committer.commitTask(tContext);
-    committer.cleanupJob(jContext);
+    committer.commitJob(jContext);
     
     File expectedFile = new File(new Path(outDir, file).toString());
     StringBuffer expectedOutput = new StringBuffer();
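
For context: JobContext and TaskAttemptContext became interfaces in the old API, so the test now instantiates JobContextImpl and TaskAttemptContextImpl and injects the attempt id through the JobContext.TASK_ATTEMPT_ID key. A sketch of that setup, assuming the Impl constructors are visible from the caller's package (the attempt id is illustrative):

    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.JobContext;
    import org.apache.hadoop.mapred.JobContextImpl;
    import org.apache.hadoop.mapred.TaskAttemptContext;
    import org.apache.hadoop.mapred.TaskAttemptContextImpl;
    import org.apache.hadoop.mapred.TaskAttemptID;

    public class ContextSetupSketch {
      public static void main(String[] args) {
        String attempt = "attempt_200707121733_0001_m_000000_0";
        TaskAttemptID taskID = TaskAttemptID.forName(attempt);
        JobConf job = new JobConf();
        job.set(JobContext.TASK_ATTEMPT_ID, attempt);
        JobContext jContext = new JobContextImpl(job, taskID.getJobID());
        TaskAttemptContext tContext = new TaskAttemptContextImpl(job, taskID);
        System.out.println(jContext.getJobID() + " / "
            + tContext.getTaskAttemptID());
      }
    }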

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestIndexCache.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestIndexCache.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestIndexCache.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestIndexCache.java Sat Nov 28 20:26:01 2009
@@ -29,6 +29,7 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
 
 import junit.framework.TestCase;
 
@@ -44,7 +45,7 @@
     Path p = new Path(System.getProperty("test.build.data", "/tmp"),
         "cache").makeQualified(fs);
     fs.delete(p, true);
-    conf.setInt("mapred.tasktracker.indexcache.mb", 1);
+    conf.setInt(TTConfig.TT_INDEX_CACHE, 1);
     final int partsPerMap = 1000;
     final int bytesPerFile = partsPerMap * 24;
     IndexCache cache = new IndexCache(conf);
@@ -111,7 +112,7 @@
     Path p = new Path(System.getProperty("test.build.data", "/tmp"),
         "cache").makeQualified(fs);
     fs.delete(p, true);
-    conf.setInt("mapred.tasktracker.indexcache.mb", 1);
+    conf.setInt(TTConfig.TT_INDEX_CACHE, 1);
     IndexCache cache = new IndexCache(conf);
 
     Path f = new Path(p, "badindex");

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestIsolationRunner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestIsolationRunner.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestIsolationRunner.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestIsolationRunner.java Sat Nov 28 20:26:01 2009
@@ -23,6 +23,8 @@
 import java.io.IOException;
 import java.util.UUID;
 
+import javax.security.auth.login.LoginException;
+
 import junit.framework.TestCase;
 
 import org.apache.hadoop.fs.FileStatus;
@@ -32,7 +34,9 @@
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.mapred.lib.IdentityMapper;
 import org.apache.hadoop.mapred.lib.IdentityReducer;
+import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.security.UserGroupInformation;
 
 /** 
  * Re-runs a map task using the IsolationRunner. 
@@ -104,15 +108,20 @@
   }
 
   private Path getAttemptJobXml(JobConf conf, JobID jobId, TaskType taskType)
-      throws IOException {
+      throws IOException,
+      LoginException {
     String taskid =
         new TaskAttemptID(new TaskID(jobId, taskType, 0), 0).toString();
-    return new LocalDirAllocator("mapred.local.dir").getLocalPathToRead(
-        TaskTracker.getTaskConfFile(jobId.toString(), taskid, false), conf);
+    return new LocalDirAllocator(MRConfig.LOCAL_DIR).getLocalPathToRead(
+        TaskTracker.getTaskConfFile(UserGroupInformation.login(conf)
+            .getUserName(), jobId.toString(), taskid, false), conf);
   }
 
-  public void testIsolationRunOfMapTask() throws 
-      IOException, InterruptedException, ClassNotFoundException {
+  public void testIsolationRunOfMapTask()
+      throws IOException,
+      InterruptedException,
+      ClassNotFoundException,
+      LoginException {
     MiniMRCluster mr = null;
     try {
       mr = new MiniMRCluster(1, "file:///", 4);

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestJavaSerialization.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestJavaSerialization.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestJavaSerialization.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestJavaSerialization.java Sat Nov 28 20:26:01 2009
@@ -116,7 +116,8 @@
     JobClient.runJob(conf);
 
     Path[] outputFiles = FileUtil.stat2Paths(
-        fs.listStatus(OUTPUT_DIR, new OutputLogFilter()));
+        fs.listStatus(OUTPUT_DIR, 
+                      new Utils.OutputFileUtils.OutputFilesFilter()));
     assertEquals(1, outputFiles.length);
     InputStream is = fs.open(outputFiles[0]);
     BufferedReader reader = new BufferedReader(new InputStreamReader(is));
@@ -161,7 +162,8 @@
     JobClient.runJob(conf);
 
     Path[] outputFiles = FileUtil.stat2Paths(
-        fs.listStatus(OUTPUT_DIR, new OutputLogFilter()));
+        fs.listStatus(OUTPUT_DIR, 
+                      new Utils.OutputFileUtils.OutputFilesFilter()));
     assertEquals(1, outputFiles.length);
   }
 

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestJobClient.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestJobClient.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestJobClient.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestJobClient.java Sat Nov 28 20:26:01 2009
@@ -17,32 +17,24 @@
  */
 package org.apache.hadoop.mapred;
 
-import java.io.BufferedReader;
-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
-import java.io.InputStreamReader;
 import java.io.OutputStream;
 import java.io.OutputStreamWriter;
-import java.io.PipedInputStream;
-import java.io.PipedOutputStream;
-import java.io.PrintStream;
 import java.io.Writer;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.TestMRJobClient;
+import org.apache.hadoop.mapreduce.tools.CLI;
 import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
 
-public class TestJobClient extends ClusterMapReduceTestCase {
-  
-  private static final Log LOG = LogFactory.getLog(TestJobClient.class);
+public class TestJobClient extends TestMRJobClient {
   
   private String runJob() throws Exception {
-    OutputStream os = getFileSystem().create(new Path(getInputDir(), "text.txt"));
+    OutputStream os = getFileSystem().create(new Path(getInputDir(),
+                        "text.txt"));
     Writer wr = new OutputStreamWriter(os);
     wr.write("hello1\n");
     wr.write("hello2\n");
@@ -71,60 +63,27 @@
     return JobClient.runJob(conf).getID().toString();
   }
   
-  static int runTool(Configuration conf, Tool tool, String[] args, OutputStream out) throws Exception {
-    PrintStream oldOut = System.out;
-    PrintStream newOut = new PrintStream(out, true);
-    try {
-      System.setOut(newOut);
-      return ToolRunner.run(conf, tool, args);
-    } finally {
-      System.setOut(oldOut);
-    }
+  public static int runTool(Configuration conf, Tool tool, String[] args,
+      OutputStream out) throws Exception {
+    return TestMRJobClient.runTool(conf, tool, args, out);
   }
-
-  public void testGetCounter() throws Exception {
-    String jobId = runJob();
-    ByteArrayOutputStream out = new ByteArrayOutputStream();
-    int exitCode = runTool(createJobConf(), new JobClient(),
-        new String[] { "-counter", jobId,
-        "org.apache.hadoop.mapred.Task$Counter", "MAP_INPUT_RECORDS" },
-        out);
-    assertEquals("Exit code", 0, exitCode);
-    assertEquals("Counter", "3", out.toString().trim());
+  
+  static void verifyJobPriority(String jobId, String priority,
+      JobConf conf)  throws Exception {
+    TestJobClient test = new TestJobClient();
+    test.verifyJobPriority(jobId, priority, conf, test.createJobClient());
   }
-
-  public void testJobList() throws Exception {
+  
+  public void testJobClient() throws Exception {
+    Configuration conf = createJobConf();
     String jobId = runJob();
-    verifyJobPriority(jobId, "HIGH", createJobConf());
-  }
-
-  static void verifyJobPriority(String jobId, String priority, JobConf conf)
-                            throws Exception {
-    PipedInputStream pis = new PipedInputStream();
-    PipedOutputStream pos = new PipedOutputStream(pis);
-    int exitCode = runTool(conf, new JobClient(),
-        new String[] { "-list", "all" },
-        pos);
-    assertEquals("Exit code", 0, exitCode);
-    BufferedReader br = new BufferedReader(new InputStreamReader(pis));
-    String line = null;
-    while ((line=br.readLine()) != null) {
-      LOG.info("line = " + line);
-      if (!line.startsWith(jobId)) {
-        continue;
-      }
-      assertTrue(line.contains(priority));
-      break;
-    }
-    pis.close();
+    testGetCounter(jobId, conf);
+    testJobList(jobId, conf);
+    testChangingJobPriority(jobId, conf);
   }
   
-  public void testChangingJobPriority() throws Exception {
-    String jobId = runJob();
-    int exitCode = runTool(createJobConf(), new JobClient(),
-        new String[] { "-set-priority", jobId, "VERY_LOW" },
-        new ByteArrayOutputStream());
-    assertEquals("Exit code", 0, exitCode);
-    verifyJobPriority(jobId, "VERY_LOW", createJobConf());
+  protected CLI createJobClient() 
+      throws IOException {
+    return new JobClient();
   }
 }
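
For context: the stdout-capture helper deleted above now lives in TestMRJobClient, but the pattern itself is unchanged and useful when testing CLI tools. A self-contained sketch of it:

    import java.io.OutputStream;
    import java.io.PrintStream;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;

    class ToolOutputCapture {
      // Temporarily redirect System.out so a Tool's console output can be
      // captured and asserted on; always restore the original stream.
      static int runTool(Configuration conf, Tool tool, String[] args,
          OutputStream out) throws Exception {
        PrintStream oldOut = System.out;
        PrintStream newOut = new PrintStream(out, true);
        try {
          System.setOut(newOut);
          return ToolRunner.run(conf, tool, args);
        } finally {
          System.setOut(oldOut);
        }
      }
    }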

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestJobDirCleanup.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestJobDirCleanup.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestJobDirCleanup.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestJobDirCleanup.java Sat Nov 28 20:26:01 2009
@@ -25,9 +25,10 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.examples.SleepJob;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
+import org.apache.hadoop.mapreduce.SleepJob;
 import org.apache.hadoop.util.ToolRunner;
 
 public class TestJobDirCleanup extends TestCase {
@@ -51,7 +52,7 @@
       final int taskTrackers = 10;
       Configuration conf = new Configuration();
       JobConf mrConf = new JobConf();
-      mrConf.set("mapred.tasktracker.reduce.tasks.maximum", "1");
+      mrConf.set(TTConfig.TT_REDUCE_SLOTS, "1");
       dfs = new MiniDFSCluster(conf, 1, true, null);
       fileSys = dfs.getFileSystem();
       namenode = fileSys.getUri().toString();

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestJobExecutionAsDifferentUser.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestJobExecutionAsDifferentUser.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestJobExecutionAsDifferentUser.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestJobExecutionAsDifferentUser.java Sat Nov 28 20:26:01 2009
@@ -20,9 +20,9 @@
 
 import java.io.IOException;
 
-import org.apache.hadoop.examples.SleepJob;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.SleepJob;
 import org.apache.hadoop.util.ToolRunner;
 
 /**
@@ -64,7 +64,7 @@
 
     // Run a job with jvm reuse
     JobConf myConf = getClusterConf();
-    myConf.set("mapred.job.reuse.jvm.num.tasks", "-1");
+    myConf.set(JobContext.JVM_NUMTASKS_TORUN, "-1");
     String[] args = { "-m", "6", "-r", "3", "-mt", "1000", "-rt", "1000" };
     assertEquals(0, ToolRunner.run(myConf, new SleepJob(), args));
   }
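
For context: SleepJob moved to org.apache.hadoop.mapreduce, and JVM reuse is now requested through the JobContext.JVM_NUMTASKS_TORUN key, where "-1" means a JVM may be reused by an unlimited number of tasks. A sketch of running the relocated job that way (cluster settings are whatever the surrounding JobConf provides):

    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapreduce.JobContext;
    import org.apache.hadoop.mapreduce.SleepJob;
    import org.apache.hadoop.util.ToolRunner;

    public class JvmReuseSleepJob {
      public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf();
        conf.set(JobContext.JVM_NUMTASKS_TORUN, "-1"); // unlimited reuse
        String[] sleepArgs =
            { "-m", "6", "-r", "3", "-mt", "1000", "-rt", "1000" };
        System.exit(ToolRunner.run(conf, new SleepJob(), sleepArgs));
      }
    }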

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestJobHistory.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestJobHistory.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestJobHistory.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestJobHistory.java Sat Nov 28 20:26:01 2009
@@ -20,46 +20,51 @@
 
 import java.io.File;
 import java.io.IOException;
-import java.text.ParseException;
 import java.util.ArrayList;
-import java.util.List;
 import java.util.HashMap;
-import java.util.Map;
 import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 import junit.framework.TestCase;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.RawLocalFileSystem;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.mapred.JobHistory.*;
+import org.apache.hadoop.mapreduce.Cluster;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskID;
 import org.apache.hadoop.mapreduce.TaskType;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistory;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;
+import org.apache.hadoop.mapreduce.jobhistory.JobSubmittedEvent;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
 
 /**
- * Tests the JobHistory files - to catch any changes to JobHistory that can
- * cause issues for the execution of JobTracker.RecoveryManager, HistoryViewer.
  *
  * testJobHistoryFile
  * Run a job that succeeds and validate its history file format and
  * content.
  *
- * testJobHistoryUserLogLocation
- * Run jobs with the given values of hadoop.job.history.user.location as
- *   (1)null(default case), (2)"none", and (3)some user specified dir.
- *   Validate user history file location in each case.
- *
  * testJobHistoryJobStatus
  * Run jobs that (1) succeed, (2) fail, and (3) are killed.
  *   Validate job status read from history file in each case.
  *
- * Future changes to job history are to be reflected here in this file.
  */
 public class TestJobHistory extends TestCase {
    private static final Log LOG = LogFactory.getLog(TestJobHistory.class);
@@ -67,9 +72,8 @@
   private static String TEST_ROOT_DIR = new File(System.getProperty(
       "test.build.data", "/tmp")).toURI().toString().replace(' ', '+');
 
-  private static final Pattern digitsPattern =
-                                     Pattern.compile(JobHistory.DIGITS);
-
+  private static final String DIGITS = "[0-9]+";
+  
   // hostname like   /default-rack/host1.foo.com OR host1.foo.com
   private static final Pattern hostNamePattern = Pattern.compile(
                                        "(/(([\\w\\-\\.]+)/)+)?([\\w\\-\\.]+)");
@@ -77,10 +81,10 @@
   private static final String IP_ADDR =
                        "\\d\\d?\\d?\\.\\d\\d?\\d?\\.\\d\\d?\\d?\\.\\d\\d?\\d?";
 
-  // hostname like   /default-rack/host1.foo.com OR host1.foo.com
   private static final Pattern trackerNamePattern = Pattern.compile(
-                         "tracker_" + hostNamePattern + ":([\\w\\-\\.]+)/" +
-                         IP_ADDR + ":" + JobHistory.DIGITS);
+      "tracker_" + hostNamePattern + ":([\\w\\-\\.]+)/" +
+      IP_ADDR + ":" + DIGITS);
+  
 
   private static final Pattern splitsPattern = Pattern.compile(
                               hostNamePattern + "(," + hostNamePattern + ")*");
@@ -91,202 +95,25 @@
   //Each Task End seen from history file is added here
   private static List<String> taskEnds = new ArrayList<String>();
 
-  // List of tasks that appear in history file after JT reatart. This is to
-  // allow START_TIME=0 for these tasks.
-  private static List<String> ignoreStartTimeOfTasks = new ArrayList<String>();
-
-  // List of potential tasks whose start time can be 0 because of JT restart
-  private static List<String> tempIgnoreStartTimeOfTasks = new ArrayList<String>();
-
-  /**
-   * Listener for history log file, it populates JobHistory.JobInfo
-   * object with data from log file and validates the data.
-   */
-  static class TestListener
-                    extends DefaultJobHistoryParser.JobTasksParseListener {
-    int lineNum;//line number of history log file
-    boolean isJobLaunched;
-    boolean isJTRestarted;
-
-    TestListener(JobInfo job) {
-      super(job);
-      lineNum = 0;
-      isJobLaunched = false;
-      isJTRestarted = false;
-    }
-
-    // TestListener implementation
-    public void handle(RecordTypes recType, Map<Keys, String> values)
-    throws IOException {
-
-      lineNum++;
-
-      // Check if the record is of type Meta
-      if (recType == JobHistory.RecordTypes.Meta) {
-        long version = Long.parseLong(values.get(Keys.VERSION));
-        assertTrue("Unexpected job history version ",
-                   (version >= 0 && version <= JobHistory.VERSION));
-      }
-      else if (recType.equals(RecordTypes.Job)) {
-        String jobid = values.get(Keys.JOBID);
-        assertTrue("record type 'Job' is seen without JOBID key" +
-        		" in history file at line " + lineNum, jobid != null);
-        JobID id = JobID.forName(jobid);
-        assertTrue("JobID in history file is in unexpected format " +
-                  "at line " + lineNum, id != null);
-        String time = values.get(Keys.LAUNCH_TIME);
-        if (time != null) {
-          if (isJobLaunched) {
-            // We assume that if we see LAUNCH_TIME again, it is because of JT restart
-            isJTRestarted = true;
-          }
-          else {// job launched first time
-            isJobLaunched = true;
-          }
-        }
-        time = values.get(Keys.FINISH_TIME);
-        if (time != null) {
-          assertTrue ("Job FINISH_TIME is seen in history file at line " +
-                      lineNum + " before LAUNCH_TIME is seen", isJobLaunched);
-        }
-      }
-      else if (recType.equals(RecordTypes.Task)) {
-        String taskid = values.get(Keys.TASKID);
-        assertTrue("record type 'Task' is seen without TASKID key" +
-        		" in history file at line " + lineNum, taskid != null);
-        TaskID id = TaskID.forName(taskid);
-        assertTrue("TaskID in history file is in unexpected format " +
-                  "at line " + lineNum, id != null);
-        
-        String time = values.get(Keys.START_TIME);
-        if (time != null) {
-          List<String> attemptIDs = taskIDsToAttemptIDs.get(taskid);
-          assertTrue("Duplicate START_TIME seen for task " + taskid +
-                     " in history file at line " + lineNum, attemptIDs == null);
-          attemptIDs = new ArrayList<String>();
-          taskIDsToAttemptIDs.put(taskid, attemptIDs);
-
-          if (isJTRestarted) {
-            // This maintains a potential ignoreStartTimeTasks list
-            tempIgnoreStartTimeOfTasks.add(taskid);
-          }
-        }
-
-        time = values.get(Keys.FINISH_TIME);
-        if (time != null) {
-          String s = values.get(Keys.TASK_STATUS);
-          if (s != null) {
-            List<String> attemptIDs = taskIDsToAttemptIDs.get(taskid);
-            assertTrue ("Task FINISH_TIME is seen in history file at line " +
-                    lineNum + " before START_TIME is seen", attemptIDs != null);
-
-            // Check if all the attemptIDs of this task are finished
-            assertTrue("TaskId " + taskid + " is finished at line " +
-                       lineNum + " but its attemptID is not finished.",
-                       (attemptIDs.size() <= 1));
-
-            // Check if at least 1 attempt of this task is seen
-            assertTrue("TaskId " + taskid + " is finished at line " +
-                       lineNum + " but no attemptID is seen before this.",
-                       attemptIDs.size() == 1);
-
-            if (s.equals("KILLED") || s.equals("FAILED")) {
-              // Task End with KILLED/FAILED status in history file is
-              // considered as TaskEnd, TaskStart. This is useful in checking
-              // the order of history lines.
-              attemptIDs = new ArrayList<String>();
-              taskIDsToAttemptIDs.put(taskid, attemptIDs);
-            }
-            else {
-              taskEnds.add(taskid);
-            }
-          }
-          else {
-            // This line of history file could be just an update to finish time
-          }
-        }
-      }
-      else if (recType.equals(RecordTypes.MapAttempt) ||
-                 recType.equals(RecordTypes.ReduceAttempt)) {
-        String taskid =  values.get(Keys.TASKID);
-        assertTrue("record type " + recType + " is seen without TASKID key" +
-        		" in history file at line " + lineNum, taskid != null);
-        
-        String attemptId = values.get(Keys.TASK_ATTEMPT_ID);
-        TaskAttemptID id = TaskAttemptID.forName(attemptId);
-        assertTrue("AttemptID in history file is in unexpected format " +
-                   "at line " + lineNum, id != null);
-        
-        String time = values.get(Keys.START_TIME);
-        if (time != null) {
-          List<String> attemptIDs = taskIDsToAttemptIDs.get(taskid);
-          assertTrue ("TaskAttempt is seen in history file at line " + lineNum +
-                      " before Task is seen", attemptIDs != null);
-          assertFalse ("Duplicate TaskAttempt START_TIME is seen in history " +
-                      "file at line " + lineNum, attemptIDs.remove(attemptId));
-
-          if (attemptIDs.isEmpty()) {
-            //just a boolean whether any attempt is seen or not
-            attemptIDs.add("firstAttemptIsSeen");
-          }
-          attemptIDs.add(attemptId);
-
-          if (tempIgnoreStartTimeOfTasks.contains(taskid) &&
-              (id.getId() < 1000)) {
-            // If Task line of this attempt is seen in history file after
-            // JT restart and if this attempt is < 1000(i.e. attempt is noti
-            // started after JT restart) - assuming single JT restart happened
-            ignoreStartTimeOfTasks.add(taskid);
-          }
-        }
-
-        time = values.get(Keys.FINISH_TIME);
-        if (time != null) {
-          List<String> attemptIDs = taskIDsToAttemptIDs.get(taskid);
-          assertTrue ("TaskAttempt FINISH_TIME is seen in history file at line "
-                      + lineNum + " before Task is seen", attemptIDs != null);
-
-          assertTrue ("TaskAttempt FINISH_TIME is seen in history file at line "
-                      + lineNum + " before TaskAttempt START_TIME is seen",
-                      attemptIDs.remove(attemptId));
-        }
-      }
-      super.handle(recType, values);
-    }
-  }
-
-  // Check if the time is in the expected format
-  private static boolean isTimeValid(String time) {
-    Matcher m = digitsPattern.matcher(time);
-    return m.matches() && (Long.parseLong(time) > 0);
-  }
-
-  private static boolean areTimesInOrder(String time1, String time2) {
-    return (Long.parseLong(time1) <= Long.parseLong(time2));
-  }
 
   // Validate Format of Job Level Keys, Values read from history file
-  private static void validateJobLevelKeyValuesFormat(Map<Keys, String> values,
+  private static void validateJobLevelKeyValuesFormat(JobInfo jobInfo,
                                                       String status) {
-    String time = values.get(Keys.SUBMIT_TIME);
-    assertTrue("Job SUBMIT_TIME is in unexpected format:" + time +
-               " in history file", isTimeValid(time));
-
-    time = values.get(Keys.LAUNCH_TIME);
-    assertTrue("Job LAUNCH_TIME is in unexpected format:" + time +
-               " in history file", isTimeValid(time));
-
-    String time1 = values.get(Keys.FINISH_TIME);
-    assertTrue("Job FINISH_TIME is in unexpected format:" + time1 +
-               " in history file", isTimeValid(time1));
-    assertTrue("Job FINISH_TIME is < LAUNCH_TIME in history file",
-               areTimesInOrder(time, time1));
+    long submitTime = jobInfo.getSubmitTime();
+    long launchTime = jobInfo.getLaunchTime();
+    long finishTime = jobInfo.getFinishTime();
+    
+    assertTrue("Invalid submit time", submitTime > 0);
+    assertTrue("SubmitTime > LaunchTime", submitTime <= launchTime);
+    assertTrue("LaunchTime > FinishTime", launchTime <= finishTime);
+    
+    String stat = jobInfo.getJobStatus();
 
-    String stat = values.get(Keys.JOB_STATUS);
     assertTrue("Unexpected JOB_STATUS \"" + stat + "\" is seen in" +
                " history file", (status.equals(stat)));
+    String priority = jobInfo.getPriority();
 
-    String priority = values.get(Keys.JOB_PRIORITY);
+    assertNotNull(priority);
     assertTrue("Unknown priority for the job in history file",
                (priority.equals("HIGH") ||
                 priority.equals("LOW")  || priority.equals("NORMAL") ||
@@ -296,34 +123,30 @@
   // Validate Format of Task Level Keys, Values read from history file
   private static void validateTaskLevelKeyValuesFormat(JobInfo job,
                                   boolean splitsCanBeEmpty) {
-    Map<String, JobHistory.Task> tasks = job.getAllTasks();
+    Map<TaskID, TaskInfo> tasks = job.getAllTasks();
 
     // validate info of each task
-    for (JobHistory.Task task : tasks.values()) {
+    for (TaskInfo task : tasks.values()) {
 
-      String tid = task.get(Keys.TASKID);
-      String time = task.get(Keys.START_TIME);
-      // We allow START_TIME=0 for tasks seen in history after JT restart
-      if (!ignoreStartTimeOfTasks.contains(tid) || (Long.parseLong(time) != 0)) {
-        assertTrue("Task START_TIME of " + tid + " is in unexpected format:" +
-                 time + " in history file", isTimeValid(time));
-      }
-
-      String time1 = task.get(Keys.FINISH_TIME);
-      assertTrue("Task FINISH_TIME of " + tid + " is in unexpected format:" +
-                 time1 + " in history file", isTimeValid(time1));
+      TaskID tid = task.getTaskId();
+      long startTime = task.getStartTime();
+      assertTrue("Invalid Start time", startTime > 0);
+      
+      long finishTime = task.getFinishTime();
       assertTrue("Task FINISH_TIME is < START_TIME in history file",
-                 areTimesInOrder(time, time1));
+                 startTime < finishTime);
 
       // Make sure that the Task type exists and it is valid
-      String type = task.get(Keys.TASK_TYPE);
+      TaskType type = task.getTaskType();
       assertTrue("Unknown Task type \"" + type + "\" is seen in " +
                  "history file for task " + tid,
-                 (type.equals("MAP") || type.equals("REDUCE") ||
-                  type.equals("SETUP") || type.equals("CLEANUP")));
+                 (type.equals(TaskType.MAP) || 
+                  type.equals(TaskType.REDUCE) ||
+                  type.equals(TaskType.JOB_CLEANUP) || 
+                  type.equals(TaskType.JOB_SETUP)));
 
-      if (type.equals("MAP")) {
-        String splits = task.get(Keys.SPLITS);
+      if (type.equals(TaskType.MAP)) {
+        String splits = task.getSplitLocations();
         //order in the condition OR check is important here
         if (!splitsCanBeEmpty || splits.length() != 0) {
           Matcher m = splitsPattern.matcher(splits);
@@ -333,103 +156,85 @@
       }
 
       // Validate task status
-      String status = task.get(Keys.TASK_STATUS);
+      String status = task.getTaskStatus();
       assertTrue("Unexpected TASK_STATUS \"" + status + "\" is seen in" +
-                 " history file for task " + tid, (status.equals("SUCCESS") ||
+                 " history file for task " + tid, (status.equals("SUCCEEDED") ||
                  status.equals("FAILED") || status.equals("KILLED")));
     }
   }
 
  // Validate format of Task Attempt Level Keys, Values read from history file
   private static void validateTaskAttemptLevelKeyValuesFormat(JobInfo job) {
-    Map<String, JobHistory.Task> tasks = job.getAllTasks();
+    Map<TaskID, TaskInfo> tasks = job.getAllTasks();
 
     // For each task
-    for (JobHistory.Task task : tasks.values()) {
+    for (TaskInfo task : tasks.values()) {
       // validate info of each attempt
-      for (JobHistory.TaskAttempt attempt : task.getTaskAttempts().values()) {
+      for (TaskAttemptInfo attempt : task.getAllTaskAttempts().values()) {
+
+        TaskAttemptID id = attempt.getAttemptId();
+        assertNotNull(id);
+        
+        long startTime = attempt.getStartTime();
+        assertTrue("Invalid Start time", startTime > 0);
 
-        String id = attempt.get(Keys.TASK_ATTEMPT_ID);
-        String time = attempt.get(Keys.START_TIME);
-        assertTrue("START_TIME of task attempt " + id +
-                   " is in unexpected format:" + time +
-                   " in history file", isTimeValid(time));
-
-        String time1 = attempt.get(Keys.FINISH_TIME);
-        assertTrue("FINISH_TIME of task attempt " + id +
-                   " is in unexpected format:" + time1 +
-                   " in history file", isTimeValid(time1));
+        long finishTime = attempt.getFinishTime();
         assertTrue("Task FINISH_TIME is < START_TIME in history file",
-                   areTimesInOrder(time, time1));
+            startTime < finishTime);
 
         // Make sure that the Task type exists and it is valid
-        String type = attempt.get(Keys.TASK_TYPE);
+        TaskType type = attempt.getTaskType();
         assertTrue("Unknown Task type \"" + type + "\" is seen in " +
                    "history file for task attempt " + id,
-                   (type.equals("MAP") || type.equals("REDUCE") ||
-                    type.equals("SETUP") || type.equals("CLEANUP")));
+                   (type.equals(TaskType.MAP) || type.equals(TaskType.REDUCE) ||
+                    type.equals(TaskType.JOB_CLEANUP) || 
+                    type.equals(TaskType.JOB_SETUP)));
 
         // Validate task status
-        String status = attempt.get(Keys.TASK_STATUS);
+        String status = attempt.getTaskStatus();
         assertTrue("Unexpected TASK_STATUS \"" + status + "\" is seen in" +
                    " history file for task attempt " + id,
-                   (status.equals("SUCCESS") || status.equals("FAILED") ||
-                    status.equals("KILLED")));
+                   (status.equals(TaskStatus.State.SUCCEEDED.toString()) ||
+                    status.equals(TaskStatus.State.FAILED.toString()) ||
+                    status.equals(TaskStatus.State.KILLED.toString())));
 
         // Successful Reduce Task Attempts should have valid SHUFFLE_FINISHED
         // time and SORT_FINISHED time
-        if (type.equals("REDUCE") && status.equals("SUCCESS")) {
-          time1 = attempt.get(Keys.SHUFFLE_FINISHED);
-          assertTrue("SHUFFLE_FINISHED time of task attempt " + id +
-                     " is in unexpected format:" + time1 +
-                     " in history file", isTimeValid(time1));
-          assertTrue("Reduce Task SHUFFLE_FINISHED time is < START_TIME " +
-                     "in history file", areTimesInOrder(time, time1));
-          time = attempt.get(Keys.SORT_FINISHED);
-          assertTrue("SORT_FINISHED of task attempt " + id +
-                     " is in unexpected format:" + time +
-                     " in history file", isTimeValid(time));
-          assertTrue("Reduce Task SORT_FINISHED time is < SORT_FINISHED time" +
-                     " in history file", areTimesInOrder(time1, time));
+        if (type.equals(TaskType.REDUCE) && 
+            status.equals(TaskStatus.State.SUCCEEDED.toString())) {
+          long shuffleFinishTime = attempt.getShuffleFinishTime();
+          assertTrue(startTime < shuffleFinishTime);
+          
+          long sortFinishTime = attempt.getSortFinishTime();
+          assertTrue(shuffleFinishTime < sortFinishTime);
         }
-        else if (type.equals("MAP") && status.equals("SUCCESS")) {
+        else if (type.equals(TaskType.MAP) && 
+            status.equals(TaskStatus.State.SUCCEEDED.toString())) {
           // Successful MAP Task Attempts should have valid MAP_FINISHED time
-          time1 = attempt.get(Keys.MAP_FINISHED);
-          assertTrue("MAP_FINISHED time of task attempt " + id +
-                     " is in unexpected format:" + time1 +
-                     " in history file", isTimeValid(time1));
-          assertTrue("MAP_FINISHED time of map task is < START_TIME " +
-                     "in history file", areTimesInOrder(time, time1));
+         long mapFinishTime = attempt.getMapFinishTime();
+         assertTrue(startTime < mapFinishTime);
         }
 
         // check if hostname is valid
-        String hostname = attempt.get(Keys.HOSTNAME);
+        String hostname = attempt.getHostname();
         Matcher m = hostNamePattern.matcher(hostname);
         assertTrue("Unexpected Host name of task attempt " + id, m.matches());
 
         // check if trackername is valid
-        String trackerName = attempt.get(Keys.TRACKER_NAME);
+        String trackerName = attempt.getTrackerName();
         m = trackerNamePattern.matcher(trackerName);
         assertTrue("Unexpected tracker name of task attempt " + id,
                    m.matches());
 
         if (!status.equals("KILLED")) {
           // check if http port is valid
-          String httpPort = attempt.get(Keys.HTTP_PORT);
-          m = digitsPattern.matcher(httpPort);
-          assertTrue("Unexpected http port of task attempt " + id, m.matches());
+          int httpPort = attempt.getHttpPort();
+          assertTrue(httpPort > 0);
         }
         
         // check if counters are parsable
-        String counters = attempt.get(Keys.COUNTERS);
-        try {
-          Counters readCounters = Counters.fromEscapedCompactString(counters);
-          assertTrue("Counters of task attempt " + id + " are not parsable",
-                     readCounters != null);
-        } catch (ParseException pe) {
-          LOG.warn("While trying to parse counters of task attempt " + id +
-                   ", " + pe);
-        }
+        Counters counters = attempt.getCounters();
+        assertNotNull(counters);
       }
     }
   }
@@ -443,9 +248,8 @@
     String parts[] = path.getName().split("_");
     //TODO this is a hack :(
     // jobtracker-hostname_jobtracker-identifier_
-    String id = parts[2] + "_" + parts[3] + "_" + parts[4];
-    String jobUniqueString = parts[0] + "_" + parts[1] + "_" +  id;
-    return new Path(dir, jobUniqueString + "_conf.xml");
+    String id = parts[0] + "_" + parts[1] + "_" + parts[2];
+    return new Path(dir, id + "_conf.xml");
   }
 
   /**
@@ -470,12 +274,13 @@
    * @param id job id
    * @param conf job conf
    */
-  static void validateJobHistoryFileFormat(JobID id, JobConf conf,
+  public static void validateJobHistoryFileFormat(JobHistory jobHistory,
+      JobID id, JobConf conf,
                  String status, boolean splitsCanBeEmpty) throws IOException  {
 
     // Get the history file name
-    Path dir = JobHistory.getCompletedJobHistoryLocation();
-    String logFileName = getDoneFile(conf, id, dir);
+    Path dir = jobHistory.getCompletedJobHistoryLocation();
+    String logFileName = getDoneFile(jobHistory, conf, id, dir);
 
     // Framework history log file location
     Path logFile = new Path(dir, logFileName);
@@ -484,20 +289,12 @@
     // Check if the history file exists
     assertTrue("History file does not exist", fileSys.exists(logFile));
 
-
-    // check if the history file is parsable
-    String[] jobDetails = JobHistory.JobInfo.decodeJobHistoryFileName(
-    		                                   logFileName).split("_");
-
-    String jobId = jobDetails[2] + "_" + jobDetails[3] + "_" + jobDetails[4];
-    JobHistory.JobInfo jobInfo = new JobHistory.JobInfo(jobId);
-
-    TestListener l = new TestListener(jobInfo);
-    JobHistory.parseHistoryFromFS(logFile.toUri().getPath(), l, fileSys);
-
+    JobHistoryParser parser = new JobHistoryParser(fileSys, 
+        logFile.toUri().getPath());
+    JobHistoryParser.JobInfo jobInfo = parser.parse();
 
     // validate format of job level key, values
-    validateJobLevelKeyValuesFormat(jobInfo.getValues(), status);
+    validateJobLevelKeyValuesFormat(jobInfo, status);
 
     // validate format of task level key, values
     validateTaskLevelKeyValuesFormat(jobInfo, splitsCanBeEmpty);
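
For context: the typed JobHistoryParser replaces the old listener-based JobHistory parsing shown in the removed TestListener above. A minimal sketch of parsing a history file and reading job-level values (the file path is hypothetical):

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;
    import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;

    public class ParseHistorySketch {
      public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.getLocal(conf);
        JobHistoryParser parser = new JobHistoryParser(fs,
            "/tmp/history/done/job_200911282026_0001"); // hypothetical file
        JobInfo jobInfo = parser.parse();
        System.out.println("status=" + jobInfo.getJobStatus()
            + " maps=" + jobInfo.getTotalMaps()
            + " reduces=" + jobInfo.getTotalReduces());
      }
    }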
@@ -507,9 +304,10 @@
 
     // check if all the TaskAttempts, Tasks started are finished for
     // successful jobs
-    if (status.equals("SUCCESS")) {
+    if (status.equals("SUCCEEDED")) {
       // Make sure that the lists in taskIDsToAttemptIDs are empty.
-      for(Iterator<String> it = taskIDsToAttemptIDs.keySet().iterator();it.hasNext();) {
+      for(Iterator<String> it = 
+        taskIDsToAttemptIDs.keySet().iterator();it.hasNext();) {
         String taskid = it.next();
         assertTrue("There are some Tasks which are not finished in history " +
                    "file.", taskEnds.contains(taskid));
@@ -530,73 +328,68 @@
     JobTracker jt = mr.getJobTrackerRunner().getJobTracker();
     JobInProgress jip = jt.getJob(job.getID());
 
-    Map<Keys, String> values = jobInfo.getValues();
-
     assertTrue("SUBMIT_TIME of job obtained from history file did not " +
                "match the expected value", jip.getStartTime() ==
-               Long.parseLong(values.get(Keys.SUBMIT_TIME)));
+               jobInfo.getSubmitTime());
 
     assertTrue("LAUNCH_TIME of job obtained from history file did not " +
                "match the expected value", jip.getLaunchTime() ==
-               Long.parseLong(values.get(Keys.LAUNCH_TIME)));
+               jobInfo.getLaunchTime());
 
     assertTrue("FINISH_TIME of job obtained from history file did not " +
                "match the expected value", jip.getFinishTime() ==
-               Long.parseLong(values.get(Keys.FINISH_TIME)));
+               jobInfo.getFinishTime());
 
     assertTrue("Job Status of job obtained from history file did not " +
                "match the expected value",
-               values.get(Keys.JOB_STATUS).equals("SUCCESS"));
+               jobInfo.getJobStatus().equals("SUCCEEDED"));
 
     assertTrue("Job Priority of job obtained from history file did not " +
                "match the expected value", jip.getPriority().toString().equals(
-               values.get(Keys.JOB_PRIORITY)));
+               jobInfo.getPriority()));
 
     assertTrue("Job Name of job obtained from history file did not " +
-               "match the expected value", JobInfo.getJobName(conf).equals(
-               values.get(Keys.JOBNAME)));
+               "match the expected value", 
+               conf.getJobName().equals(
+               jobInfo.getJobname()));
 
     assertTrue("User Name of job obtained from history file did not " +
-               "match the expected value", JobInfo.getUserName(conf).equals(
-               values.get(Keys.USER)));
+               "match the expected value", 
+               conf.getUser().equals(
+               jobInfo.getUsername()));
 
     // Validate job counters
-    Counters c = jip.getCounters();
+    Counters c = new Counters(jip.getCounters());
+    Counters jiCounters = jobInfo.getTotalCounters();
     assertTrue("Counters of job obtained from history file did not " +
                "match the expected value",
-               c.makeEscapedCompactString().equals(values.get(Keys.COUNTERS)));
+               c.equals(jiCounters));
 
     // Validate number of total maps, total reduces, finished maps,
     // finished reduces, failed maps, failed reduces
-    String totalMaps = values.get(Keys.TOTAL_MAPS);
     assertTrue("Unexpected number of total maps in history file",
-               Integer.parseInt(totalMaps) == jip.desiredMaps());
+               jobInfo.getTotalMaps() == jip.desiredMaps());
 
-    String totalReduces = values.get(Keys.TOTAL_REDUCES);
     assertTrue("Unexpected number of total reduces in history file",
-               Integer.parseInt(totalReduces) == jip.desiredReduces());
+               jobInfo.getTotalReduces() == jip.desiredReduces());
 
-    String finMaps = values.get(Keys.FINISHED_MAPS);
     assertTrue("Unexpected number of finished maps in history file",
-               Integer.parseInt(finMaps) == jip.finishedMaps());
+               jobInfo.getFinishedMaps() == jip.finishedMaps());
 
-    String finReduces = values.get(Keys.FINISHED_REDUCES);
     assertTrue("Unexpected number of finished reduces in history file",
-               Integer.parseInt(finReduces) == jip.finishedReduces());
+               jobInfo.getFinishedReduces() == jip.finishedReduces());
 
-    String failedMaps = values.get(Keys.FAILED_MAPS);
     assertTrue("Unexpected number of failed maps in history file",
-               Integer.parseInt(failedMaps) == jip.failedMapTasks);
+               jobInfo.getFailedMaps() == jip.failedMapTasks);
 
-    String failedReduces = values.get(Keys.FAILED_REDUCES);
     assertTrue("Unexpected number of failed reduces in history file",
-               Integer.parseInt(failedReduces) == jip.failedReduceTasks);
+               jobInfo.getFailedReduces() == jip.failedReduceTasks);
   }
 
   // Validate Task Level Keys, Values read from history file by
   // comparing them with the actual values from JT.
   private static void validateTaskLevelKeyValues(MiniMRCluster mr,
-                      RunningJob job, JobInfo jobInfo) throws IOException  {
+      RunningJob job, JobInfo jobInfo) throws IOException  {
 
     JobTracker jt = mr.getJobTrackerRunner().getJobTracker();
     JobInProgress jip = jt.getJob(job.getID());
@@ -606,7 +399,7 @@
     TaskID mapTaskId = new TaskID(job.getID(), TaskType.MAP, 0);
     TaskID reduceTaskId = new TaskID(job.getID(), TaskType.REDUCE, 0);
 
-    TaskInProgress cleanups[] = jip.getCleanupTasks();
+    TaskInProgress cleanups[] = jip.cleanup;
     TaskID cleanupTaskId;
     if (cleanups[0].isComplete()) {
       cleanupTaskId = cleanups[0].getTIPId();
@@ -615,7 +408,7 @@
       cleanupTaskId = cleanups[1].getTIPId();
     }
 
-    TaskInProgress setups[] = jip.getSetupTasks();
+    TaskInProgress setups[] = jip.setup;
     TaskID setupTaskId;
     if (setups[0].isComplete()) {
       setupTaskId = setups[0].getTIPId();
@@ -624,53 +417,45 @@
       setupTaskId = setups[1].getTIPId();
     }
 
-    Map<String, JobHistory.Task> tasks = jobInfo.getAllTasks();
+    Map<TaskID, TaskInfo> tasks = jobInfo.getAllTasks();
 
-    // validate info of the 4 tasks(cleanup, setup, 1st map, 1st reduce)
-    for (JobHistory.Task task : tasks.values()) {
+    // validate info of the 4 tasks (cleanup, setup, 1st map, 1st reduce)
 
-      String tid = task.get(Keys.TASKID);
-      if (tid.equals(mapTaskId.toString()) ||
-          tid.equals(reduceTaskId.toString()) ||
-          tid.equals(cleanupTaskId.toString()) ||
-          tid.equals(setupTaskId.toString())) {
-
-        TaskID taskId = null;
-        if (tid.equals(mapTaskId.toString())) {
-          taskId = mapTaskId;
-        }
-        else if (tid.equals(reduceTaskId.toString())) {
-          taskId = reduceTaskId;
-        }
-        else if (tid.equals(cleanupTaskId.toString())) {
-          taskId = cleanupTaskId;
-        }
-        else if (tid.equals(setupTaskId.toString())) {
-          taskId = setupTaskId;
-        }
-        TaskInProgress tip = jip.getTaskInProgress(taskId);
+    for (TaskInfo task : tasks.values()) {
+      TaskID tid = task.getTaskId();
+
+      if (tid.equals(mapTaskId) ||
+          tid.equals(reduceTaskId) ||
+          tid.equals(cleanupTaskId) ||
+          tid.equals(setupTaskId)) {
+
+        TaskInProgress tip = jip.getTaskInProgress(
+            org.apache.hadoop.mapred.TaskID.downgrade(tid));
         assertTrue("START_TIME of Task " + tid + " obtained from history " +
-             "file did not match the expected value", tip.getExecStartTime() ==
-             Long.parseLong(task.get(Keys.START_TIME)));
+            "file did not match the expected value", 
+            tip.getExecStartTime() ==
+              task.getStartTime());
 
         assertTrue("FINISH_TIME of Task " + tid + " obtained from history " +
-             "file did not match the expected value", tip.getExecFinishTime() ==
-             Long.parseLong(task.get(Keys.FINISH_TIME)));
+            "file did not match the expected value",
+            tip.getExecFinishTime() ==
+              task.getFinishTime());
 
-        if (taskId == mapTaskId) {//check splits only for map task
+        if (tid.equals(mapTaskId)) { // check splits only for map task
           assertTrue("Splits of Task " + tid + " obtained from history file " +
-                     " did not match the expected value",
-                     tip.getSplitNodes().equals(task.get(Keys.SPLITS)));
+              " did not match the expected value",
+              tip.getSplitNodes().equals(task.getSplitLocations()));
         }
 
         TaskAttemptID attemptId = tip.getSuccessfulTaskid();
-        TaskStatus ts = tip.getTaskStatus(attemptId);
+        TaskStatus ts = tip.getTaskStatus(
+            org.apache.hadoop.mapred.TaskAttemptID.downgrade(attemptId));
 
         // Validate task counters
-        Counters c = ts.getCounters();
+        Counters c = new Counters(ts.getCounters());
         assertTrue("Counters of Task " + tid + " obtained from history file " +
-                   " did not match the expected value",
-                  c.makeEscapedCompactString().equals(task.get(Keys.COUNTERS)));
+            " did not match the expected value",
+            c.equals(task.getCounters()));
       }
     }
   }
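
The downgrade() calls above bridge the two id types: the parser returns
new-API org.apache.hadoop.mapreduce ids, while JobInProgress is still
keyed by the old org.apache.hadoop.mapred ids. A minimal sketch of the
conversion (the job id value is illustrative):

    import org.apache.hadoop.mapreduce.JobID;
    import org.apache.hadoop.mapreduce.TaskID;
    import org.apache.hadoop.mapreduce.TaskType;

    // Build a new-API task id, then downgrade it for old-API callers
    // such as JobInProgress.getTaskInProgress().
    JobID jobId = JobID.forName("job_200911262339_0001");
    TaskID newStyle = new TaskID(jobId, TaskType.MAP, 0);
    org.apache.hadoop.mapred.TaskID oldStyle =
        org.apache.hadoop.mapred.TaskID.downgrade(newStyle);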
@@ -683,78 +468,83 @@
     JobTracker jt = mr.getJobTrackerRunner().getJobTracker();
     JobInProgress jip = jt.getJob(job.getID());
 
-    Map<String, JobHistory.Task> tasks = jobInfo.getAllTasks();
+    Map<TaskID, TaskInfo> tasks = jobInfo.getAllTasks();
 
     // For each task
-    for (JobHistory.Task task : tasks.values()) {
+    for (TaskInfo task : tasks.values()) {
       // validate info of each attempt
-      for (JobHistory.TaskAttempt attempt : task.getTaskAttempts().values()) {
+      for (TaskAttemptInfo attempt : task.getAllTaskAttempts().values()) {
 
-        String idStr = attempt.get(Keys.TASK_ATTEMPT_ID);
-        TaskAttemptID attemptId = TaskAttemptID.forName(idStr);
+        TaskAttemptID attemptId = attempt.getAttemptId();
         TaskID tid = attemptId.getTaskID();
 
-        // Validate task id
-        assertTrue("Task id of Task Attempt " + idStr + " obtained from " +
-                   "history file did not match the expected value",
-                   tid.toString().equals(attempt.get(Keys.TASKID)));
-
-        TaskInProgress tip = jip.getTaskInProgress(tid);
-        TaskStatus ts = tip.getTaskStatus(attemptId);
+        TaskInProgress tip = jip.getTaskInProgress(
+            org.apache.hadoop.mapred.TaskID.downgrade(tid));
+        
+        TaskStatus ts = tip.getTaskStatus(
+            org.apache.hadoop.mapred.TaskAttemptID.downgrade(attemptId));
 
         // Validate task attempt start time
-        assertTrue("START_TIME of Task attempt " + idStr + " obtained from " +
-                   "history file did not match the expected value",
-            ts.getStartTime() == Long.parseLong(attempt.get(Keys.START_TIME)));
+        assertTrue("START_TIME of Task attempt " + attemptId +
+            " obtained from " +
+            "history file did not match the expected value",
+            ts.getStartTime() == attempt.getStartTime());
 
         // Validate task attempt finish time
-        assertTrue("FINISH_TIME of Task attempt " + idStr + " obtained from " +
-                   "history file did not match the expected value",
-            ts.getFinishTime() == Long.parseLong(attempt.get(Keys.FINISH_TIME)));
+        assertTrue("FINISH_TIME of Task attempt " + attemptId +
+                   " obtained from " +
+                   "history file " + ts.getFinishTime() + 
+                   " did not match the expected value, " +
+                   attempt.getFinishTime(),
+            ts.getFinishTime() == attempt.getFinishTime());
 
 
-        TaskTrackerStatus ttStatus = jt.getTaskTrackerStatus(ts.getTaskTracker());
+        TaskTrackerStatus ttStatus =
+          jt.getTaskTrackerStatus(ts.getTaskTracker());
 
         if (ttStatus != null) {
-          assertTrue("http port of task attempt " + idStr + " obtained from " +
+          assertTrue("http port of task attempt " + attemptId +
+                     " obtained from " +
                      "history file did not match the expected value",
                      ttStatus.getHttpPort() ==
-                     Integer.parseInt(attempt.get(Keys.HTTP_PORT)));
+                     attempt.getHttpPort());
 
-          if (attempt.get(Keys.TASK_STATUS).equals("SUCCESS")) {
+          if (attempt.getTaskStatus().equals("SUCCEEDED")) {
             String ttHostname = jt.getNode(ttStatus.getHost()).toString();
 
             // check if hostname is valid
-            assertTrue("Host name of task attempt " + idStr + " obtained from" +
+            assertTrue("Host name of task attempt " + attemptId +
+                       " obtained from" +
                        " history file did not match the expected value",
-                       ttHostname.equals(attempt.get(Keys.HOSTNAME)));
+                       ttHostname.equals(attempt.getHostname()));
           }
         }
-        if (attempt.get(Keys.TASK_STATUS).equals("SUCCESS")) {
+        if (attempt.getTaskStatus().equals("SUCCEEDED")) {
           // Validate SHUFFLE_FINISHED time and SORT_FINISHED time of
           // Reduce Task Attempts
-          if (attempt.get(Keys.TASK_TYPE).equals("REDUCE")) {
-            assertTrue("SHUFFLE_FINISHED time of task attempt " + idStr +
+          if (attempt.getTaskType().equals("REDUCE")) {
+            assertTrue("SHUFFLE_FINISHED time of task attempt " + attemptId +
                      " obtained from history file did not match the expected" +
                      " value", ts.getShuffleFinishTime() ==
-                     Long.parseLong(attempt.get(Keys.SHUFFLE_FINISHED)));
-            assertTrue("SORT_FINISHED time of task attempt " + idStr +
+                     attempt.getShuffleFinishTime());
+            assertTrue("SORT_FINISHED time of task attempt " + attemptId +
                      " obtained from history file did not match the expected" +
                      " value", ts.getSortFinishTime() ==
-                     Long.parseLong(attempt.get(Keys.SORT_FINISHED)));
+                     attempt.getSortFinishTime());
           }
 
           //Validate task counters
-          Counters c = ts.getCounters();
-          assertTrue("Counters of Task Attempt " + idStr + " obtained from " +
+          Counters c = new Counters(ts.getCounters());
+          assertTrue("Counters of Task Attempt " + attemptId + " obtained from " +
                      "history file did not match the expected value",
-               c.makeEscapedCompactString().equals(attempt.get(Keys.COUNTERS)));
+               c.equals(attempt.getCounters()));
         }
         
         // check if tracker name is valid
-        assertTrue("Tracker name of task attempt " + idStr + " obtained from " +
+        assertTrue("Tracker name of task attempt " + attemptId +
+                   " obtained from " +
                    "history file did not match the expected value",
-                   ts.getTaskTracker().equals(attempt.get(Keys.TRACKER_NAME)));
+                   ts.getTaskTracker().equals(attempt.getTrackerName()));
       }
     }
   }
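
With the parser, attempt-level data comes from typed getters instead of
Keys lookups. A helper sketch that walks every attempt of a parsed job,
using only getters that appear in this test (TaskInfo and
TaskAttemptInfo are JobHistoryParser nested classes):

    static void dumpAttempts(JobHistoryParser.JobInfo jobInfo) {
      for (JobHistoryParser.TaskInfo task : jobInfo.getAllTasks().values()) {
        for (JobHistoryParser.TaskAttemptInfo attempt
            : task.getAllTaskAttempts().values()) {
          System.out.println(attempt.getAttemptId() + " on "
              + attempt.getTrackerName() + ": " + attempt.getTaskStatus());
        }
      }
    }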
@@ -766,13 +556,15 @@
    * @param job RunningJob object of the job whose history is to be validated
    * @param conf job conf
    */
-  static void validateJobHistoryFileContent(MiniMRCluster mr,
+  public static void validateJobHistoryFileContent(MiniMRCluster mr,
                               RunningJob job, JobConf conf) throws IOException  {
 
     JobID id = job.getID();
-    Path doneDir = JobHistory.getCompletedJobHistoryLocation();
+    JobHistory jobHistory = 
+      mr.getJobTrackerRunner().getJobTracker().getJobHistory();
+    Path doneDir = jobHistory.getCompletedJobHistoryLocation();
     // Get the history file name
-    String logFileName = getDoneFile(conf, id, doneDir);
+    String logFileName = getDoneFile(jobHistory, conf, id, doneDir);
 
     // Framework history log file location
     Path logFile = new Path(doneDir, logFileName);
@@ -781,18 +573,10 @@
     // Check if the history file exists
     assertTrue("History file does not exist", fileSys.exists(logFile));
 
-
-    // check if the history file is parsable
-    String[] jobDetails = JobHistory.JobInfo.decodeJobHistoryFileName(
-    		                                   logFileName).split("_");
-
-    String jobId = jobDetails[2] + "_" + jobDetails[3] + "_" + jobDetails[4];
-    JobHistory.JobInfo jobInfo = new JobHistory.JobInfo(jobId);
-
-    DefaultJobHistoryParser.JobTasksParseListener l =
-                   new DefaultJobHistoryParser.JobTasksParseListener(jobInfo);
-    JobHistory.parseHistoryFromFS(logFile.toUri().getPath(), l, fileSys);
-
+    JobHistoryParser parser = new JobHistoryParser(fileSys,
+        logFile.toUri().getPath());
+    
+    JobHistoryParser.JobInfo jobInfo = parser.parse();
     // Now the history file contents are available in jobInfo. Let us compare
     // them with the actual values from JT.
     validateJobLevelKeyValues(mr, job, jobInfo, conf);
@@ -800,7 +584,7 @@
     validateTaskAttemptLevelKeyValues(mr, job, jobInfo);
   }
 
-  public void testDoneFolderOnHDFS() throws IOException {
+  public void testDoneFolderOnHDFS() throws IOException, InterruptedException {
     MiniMRCluster mr = null;
     try {
       JobConf conf = new JobConf();
@@ -810,12 +594,42 @@
 
       //set the done folder location
       String doneFolder = "history_done";
-      conf.set("mapred.job.tracker.history.completed.location", doneFolder);
+      conf.set(JTConfig.JT_JOBHISTORY_COMPLETED_LOCATION, doneFolder);
 
+      String logDir = "file:///" +
+          new File(System.getProperty("hadoop.log.dir")).getAbsolutePath() +
+          File.separator + "history";
+
+      Path logDirPath = new Path(logDir);
+      FileSystem logDirFs = logDirPath.getFileSystem(conf);
+      // there may be some stale files; clean them up
+      if (logDirFs.exists(logDirPath)) {
+        boolean deleted = logDirFs.delete(logDirPath, true);
+        LOG.info(logDirPath + " deleted " + deleted);
+      }
+
+      logDirFs.mkdirs(logDirPath);
+      assertEquals("No of file in logDir not correct", 0,
+          logDirFs.listStatus(logDirPath).length);
+      logDirFs.create(new Path(logDirPath, "f1"));
+      logDirFs.create(new Path(logDirPath, "f2"));
+      assertEquals("No of file in logDir not correct", 2,
+          logDirFs.listStatus(logDirPath).length);
+      
       MiniDFSCluster dfsCluster = new MiniDFSCluster(conf, 2, true, null);
       mr = new MiniMRCluster(2, dfsCluster.getFileSystem().getUri().toString(),
           3, null, null, conf);
 
+      assertEquals("Files in logDir did not move to DONE folder",
+          0, logDirFs.listStatus(logDirPath).length);
+
+      JobHistory jobHistory = 
+        mr.getJobTrackerRunner().getJobTracker().getJobHistory();
+      Path doneDir = jobHistory.getCompletedJobHistoryLocation();
+
+      assertEquals("Files in DONE dir not correct",
+          2, doneDir.getFileSystem(conf).listStatus(doneDir).length);
+
       // run the TCs
       conf = mr.createJobConf();
 
@@ -836,15 +650,18 @@
       // Run a job that will be succeeded and validate its history file
       RunningJob job = UtilsForTests.runJobSucceed(conf, inDir, outDir);
       
-      Path doneDir = JobHistory.getCompletedJobHistoryLocation();
       assertEquals("History DONE folder not correct", 
           doneFolder, doneDir.getName());
       JobID id = job.getID();
-      String logFileName = getDoneFile(conf, id, doneDir);
+      String logFileName = getDoneFile(jobHistory, conf, id, doneDir);
 
       // Framework history log file location
       Path logFile = new Path(doneDir, logFileName);
       FileSystem fileSys = logFile.getFileSystem(conf);
+
+      Cluster cluster = new Cluster(conf);
+      assertEquals("Client returned wrong history url", logFile.toString(), 
+          cluster.getJobHistoryUrl(id));
    
       // Check if the history file exists
       assertTrue("History file does not exist", fileSys.exists(logFile));
@@ -874,7 +691,8 @@
       assertFalse("Config for completed jobs not deleted from running folder", 
                   fileSys.exists(runningJobConfFilename));
 
-      validateJobHistoryFileFormat(job.getID(), conf, "SUCCESS", false);
+      validateJobHistoryFileFormat(jobHistory,
+          job.getID(), conf, "SUCCEEDED", false);
       validateJobHistoryFileContent(mr, job, conf);
 
       // get the job conf filename
@@ -899,7 +717,7 @@
 
       //set the done folder location
       String doneFolder = TEST_ROOT_DIR + "history_done";
-      conf.set("mapred.job.tracker.history.completed.location", doneFolder);
+      conf.set(JTConfig.JT_JOBHISTORY_COMPLETED_LOCATION, doneFolder);
       
       mr = new MiniMRCluster(2, "file:///", 3, null, null, conf);
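
As above, the raw key "mapred.job.tracker.history.completed.location"
is replaced by the JTConfig constant. A hedged configuration sketch
(the JTConfig package name is an assumption, not taken from this
patch):

    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;

    // Point completed job history at an explicit done folder.
    JobConf conf = new JobConf();
    conf.set(JTConfig.JT_JOBHISTORY_COMPLETED_LOCATION, "history_done");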
 
@@ -922,12 +740,13 @@
 
       // Run a job that will be succeeded and validate its history file
       RunningJob job = UtilsForTests.runJobSucceed(conf, inDir, outDir);
-      
-      Path doneDir = JobHistory.getCompletedJobHistoryLocation();
+      JobHistory jobHistory = 
+        mr.getJobTrackerRunner().getJobTracker().getJobHistory();
+      Path doneDir = jobHistory.getCompletedJobHistoryLocation();
       assertEquals("History DONE folder not correct", 
           doneFolder, doneDir.toString());
       JobID id = job.getID();
-      String logFileName = getDoneFile(conf, id, doneDir);
+      String logFileName = getDoneFile(jobHistory, conf, id, doneDir);
 
       // Framework history log file location
       Path logFile = new Path(doneDir, logFileName);
@@ -941,14 +760,13 @@
       assertTrue("Config for completed jobs doesnt exist", 
                  fileSys.exists(confFile));
 
-      // check if the file exists in a done folder
+      // check if the conf file exists in a done folder
       assertTrue("Completed job config doesnt exist in the done folder", 
                  doneDir.getName().equals(confFile.getParent().getName()));
 
       // check if the file exists in a done folder
       assertTrue("Completed jobs doesnt exist in the done folder", 
                  doneDir.getName().equals(logFile.getParent().getName()));
-      
 
       // check if the job file is removed from the history location 
       Path runningJobsHistoryFolder = logFile.getParent().getParent();
@@ -961,11 +779,13 @@
       assertFalse("Config for completed jobs not deleted from running folder", 
                   fileSys.exists(runningJobConfFilename));
 
-      validateJobHistoryFileFormat(job.getID(), conf, "SUCCESS", false);
+      validateJobHistoryFileFormat(jobHistory, job.getID(), conf, 
+          "SUCCEEDED", false);
       validateJobHistoryFileContent(mr, job, conf);
 
       // get the job conf filename
-      String name = JobHistory.JobInfo.getLocalJobFilePath(job.getID());
+      JobTracker jt = mr.getJobTrackerRunner().getJobTracker();
+      String name = jt.getLocalJobFilePath(job.getID());
       File file = new File(name);
 
       // check if the file get deleted
@@ -983,123 +803,21 @@
 
  // Returns the file in the done folder.
  // Waits for some time for the file to be moved to done.
-  private static String getDoneFile(JobConf conf, JobID id, 
+  private static String getDoneFile(JobHistory jobHistory, 
+      JobConf conf, JobID id, 
       Path doneDir) throws IOException {
     String name = null;
     for (int i = 0; name == null && i < 20; i++) {
-      name = JobHistory.JobInfo.getDoneJobHistoryFileName(conf, id);
+      Path path = JobHistory.getJobHistoryFile(
+          jobHistory.getCompletedJobHistoryLocation(), id, conf.getUser());
+      if (path.getFileSystem(conf).exists(path)) {
+        name = path.toString();
+      }
       UtilsForTests.waitFor(1000);
     }
+    assertNotNull("Job history file not created", name);
     return name;
   }
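
getDoneFile() reflects that history files are moved to the done
location asynchronously: the expected path is derived up front and the
helper polls for it. A condensed sketch of that pattern (doneDir, id
and conf as in the helper above):

    // Poll up to ~20s for the history file to land in the done folder.
    Path expected = JobHistory.getJobHistoryFile(doneDir, id, conf.getUser());
    FileSystem fs = expected.getFileSystem(conf);
    for (int i = 0; i < 20 && !fs.exists(expected); i++) {
      UtilsForTests.waitFor(1000);   // sleep helper from this test suite
    }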
-  // Returns the output path where user history log file is written to with
-  // default configuration setting for hadoop.job.history.user.location
-  private static Path getLogLocationInOutputPath(String logFileName,
-                                                      JobConf conf) {
-    JobConf jobConf = new JobConf(true);//default JobConf
-    FileOutputFormat.setOutputPath(jobConf,
-                     FileOutputFormat.getOutputPath(conf));
-    return JobHistory.JobInfo.getJobHistoryLogLocationForUser(
-                                             logFileName, jobConf);
-  }
-
-  /**
-   * Checks if the user history file exists in the correct dir
-   * @param id job id
-   * @param conf job conf
-   */
-  static void validateJobHistoryUserLogLocation(JobID id, JobConf conf) 
-          throws IOException  {
-    // Get the history file name
-    Path doneDir = JobHistory.getCompletedJobHistoryLocation();
-    String logFileName = getDoneFile(conf, id, doneDir);
-
-    // User history log file location
-    Path logFile = JobHistory.JobInfo.getJobHistoryLogLocationForUser(
-                                                     logFileName, conf);
-    if(logFile == null) {
-      // get the output path where history file is written to when
-      // hadoop.job.history.user.location is not set
-      logFile = getLogLocationInOutputPath(logFileName, conf);
-    }
-    FileSystem fileSys = null;
-    fileSys = logFile.getFileSystem(conf);
-
-    // Check if the user history file exists in the correct dir
-    if (conf.get("hadoop.job.history.user.location") == null) {
-      assertTrue("User log file " + logFile + " does not exist",
-                 fileSys.exists(logFile));
-    }
-    else if ("none".equals(conf.get("hadoop.job.history.user.location"))) {
-      // history file should not exist in the output path
-      assertFalse("Unexpected. User log file exists in output dir when " +
-                 "hadoop.job.history.user.location is set to \"none\"",
-                 fileSys.exists(logFile));
-    }
-    else {
-      //hadoop.job.history.user.location is set to a specific location.
-      // User log file should exist in that location
-      assertTrue("User log file " + logFile + " does not exist",
-                 fileSys.exists(logFile));
-
-      // User log file should not exist in output path.
-
-      // get the output path where history file is written to when
-      // hadoop.job.history.user.location is not set
-      Path logFile1 = getLogLocationInOutputPath(logFileName, conf);
-      
-      if (logFile != logFile1) {
-        fileSys = logFile1.getFileSystem(conf);
-        assertFalse("Unexpected. User log file exists in output dir when " +
-              "hadoop.job.history.user.location is set to a different location",
-              fileSys.exists(logFile1));
-      }
-    }
-  }
-
-  // Validate user history file location for the given values of
-  // hadoop.job.history.user.location as
-  // (1)null(default case), (2)"none", and (3)some user specified dir.
-  public void testJobHistoryUserLogLocation() throws IOException {
-    MiniMRCluster mr = null;
-    try {
-      mr = new MiniMRCluster(2, "file:///", 3);
-
-      // run the TCs
-      JobConf conf = mr.createJobConf();
-
-      FileSystem fs = FileSystem.get(conf);
-      // clean up
-      fs.delete(new Path(TEST_ROOT_DIR + "/succeed"), true);
-
-      Path inDir = new Path(TEST_ROOT_DIR + "/succeed/input1");
-      Path outDir = new Path(TEST_ROOT_DIR + "/succeed/output1");
-
-      // validate for the case of null(default)
-      RunningJob job = UtilsForTests.runJobSucceed(conf, inDir, outDir);
-      validateJobHistoryUserLogLocation(job.getID(), conf);
-
-      inDir = new Path(TEST_ROOT_DIR + "/succeed/input2");
-      outDir = new Path(TEST_ROOT_DIR + "/succeed/output2");
-      // validate for the case of "none"
-      conf.set("hadoop.job.history.user.location", "none");
-      job = UtilsForTests.runJobSucceed(conf, inDir, outDir);
-      validateJobHistoryUserLogLocation(job.getID(), conf);
- 
-      inDir = new Path(TEST_ROOT_DIR + "/succeed/input3");
-      outDir = new Path(TEST_ROOT_DIR + "/succeed/output3");
-      // validate for the case of any dir
-      conf.set("hadoop.job.history.user.location", TEST_ROOT_DIR + "/succeed");
-      job = UtilsForTests.runJobSucceed(conf, inDir, outDir);
-      validateJobHistoryUserLogLocation(job.getID(), conf);
-
-    } finally {
-      if (mr != null) {
-        cleanupLocalFiles(mr);
-        mr.shutdown();
-      }
-    }
-  }
 
   private void cleanupLocalFiles(MiniMRCluster mr) 
   throws IOException {
@@ -1108,7 +826,9 @@
     Path sysDir = new Path(jt.getSystemDir());
     FileSystem fs = sysDir.getFileSystem(conf);
     fs.delete(sysDir, true);
-    Path jobHistoryDir = JobHistory.getJobHistoryLocation();
+    Path jobHistoryDir = 
+      mr.getJobTrackerRunner().getJobTracker().getJobHistory().
+      getJobHistoryLocation();
     fs = jobHistoryDir.getFileSystem(conf);
     fs.delete(jobHistoryDir, true);
   }
@@ -1118,13 +838,13 @@
    * @param id job id
    * @param conf job conf
    */
-  private static void validateJobHistoryJobStatus(JobID id, JobConf conf,
-          String status) throws IOException  {
+  private static void validateJobHistoryJobStatus(JobHistory jobHistory,
+      JobID id, JobConf conf, String status) throws IOException  {
 
     // Get the history file name
-    Path doneDir = JobHistory.getCompletedJobHistoryLocation();
-    String logFileName = getDoneFile(conf, id, doneDir);
-
+    Path doneDir = jobHistory.getCompletedJobHistoryLocation();
+    String logFileName = getDoneFile(jobHistory, conf, id, doneDir);
+    
     // Framework history log file location
     Path logFile = new Path(doneDir, logFileName);
     FileSystem fileSys = logFile.getFileSystem(conf);
@@ -1137,19 +857,13 @@
     fileSys.getFileStatus(logFile).getPermission().equals(
        new FsPermission(JobHistory.HISTORY_FILE_PERMISSION)));
     
-    // check if the history file is parsable
-    String[] jobDetails = JobHistory.JobInfo.decodeJobHistoryFileName(
-    		                                   logFileName).split("_");
-
-    String jobId = jobDetails[2] + "_" + jobDetails[3] + "_" + jobDetails[4];
-    JobHistory.JobInfo jobInfo = new JobHistory.JobInfo(jobId);
-
-    DefaultJobHistoryParser.JobTasksParseListener l =
-                  new DefaultJobHistoryParser.JobTasksParseListener(jobInfo);
-    JobHistory.parseHistoryFromFS(logFile.toUri().getPath(), l, fileSys);
+    JobHistoryParser parser = new JobHistoryParser(fileSys, 
+        logFile.toUri().getPath());
+    JobHistoryParser.JobInfo jobInfo = parser.parse();
+    
 
     assertTrue("Job Status read from job history file is not the expected" +
-         " status", status.equals(jobInfo.getValues().get(Keys.JOB_STATUS)));
+         " status", status.equals(jobInfo.getJobStatus()));
   }
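
In the test below, the expected status strings are no longer hard-coded
literals; they are derived from JobStatus run-state constants, e.g.:

    // Run-state constants map to the strings recorded in history files.
    String succeeded = JobStatus.getJobRunState(JobStatus.SUCCEEDED); // "SUCCEEDED"
    String failed = JobStatus.getJobRunState(JobStatus.FAILED);       // "FAILED"
    String killed = JobStatus.getJobRunState(JobStatus.KILLED);       // "KILLED"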
 
   // run jobs that will be (1) succeeded (2) failed (3) killed
@@ -1172,22 +886,86 @@
       // Run a job that will be succeeded and validate its job status
       // existing in history file
       RunningJob job = UtilsForTests.runJobSucceed(conf, inDir, outDir);
-      validateJobHistoryJobStatus(job.getID(), conf, "SUCCESS");
-      long historyCleanerRanAt = JobHistory.HistoryCleaner.getLastRan();
-      assertTrue(historyCleanerRanAt != 0);
+      
+      JobHistory jobHistory = 
+        mr.getJobTrackerRunner().getJobTracker().getJobHistory();
+      validateJobHistoryJobStatus(jobHistory, job.getID(), conf, 
+          JobStatus.getJobRunState(JobStatus.SUCCEEDED));
       
       // Run a job that will be failed and validate its job status
       // existing in history file
       job = UtilsForTests.runJobFail(conf, inDir, outDir);
-      validateJobHistoryJobStatus(job.getID(), conf, "FAILED");
-      assertTrue(historyCleanerRanAt == JobHistory.HistoryCleaner.getLastRan());
+      validateJobHistoryJobStatus(jobHistory, job.getID(), conf, 
+          JobStatus.getJobRunState(JobStatus.FAILED));
       
       // Run a job that will be killed and validate its job status
       // existing in history file
       job = UtilsForTests.runJobKill(conf, inDir, outDir);
-      validateJobHistoryJobStatus(job.getID(), conf, "KILLED");
-      assertTrue(historyCleanerRanAt == JobHistory.HistoryCleaner.getLastRan());
+      validateJobHistoryJobStatus(jobHistory, job.getID(), conf,
+          JobStatus.getJobRunState(JobStatus.KILLED));
+      
+    } finally {
+      if (mr != null) {
+        cleanupLocalFiles(mr);
+        mr.shutdown();
+      }
+    }
+  }
+
+  public void testHistoryInitWithCorruptFiles() throws IOException {
+    MiniMRCluster mr = null;
+    try {
+      JobConf conf = new JobConf();
+      Path historyDir = new Path(System.getProperty("test.build.data", "."),
+      "history");
+      conf.set(JTConfig.JT_JOBHISTORY_LOCATION,
+          historyDir.toString());
+      conf.setUser("user");
+
+      FileSystem localFs = FileSystem.getLocal(conf);
       
+      // there may be some stale files; clean them up
+      if (localFs.exists(historyDir)) {
+        boolean deleted = localFs.delete(historyDir, true);
+        LOG.info(historyDir + " deleted " + deleted);
+      }
+
+      // Start the cluster, create a history file
+      mr = new MiniMRCluster(0, "file:///", 3, null, null, conf);
+      JobTracker jt = mr.getJobTrackerRunner().getJobTracker();
+      JobHistory jh = jt.getJobHistory();
+      final JobID jobId = JobID.forName("job_200809171136_0001");
+      jh.setupEventWriter(jobId, conf);
+      JobSubmittedEvent jse =
+        new JobSubmittedEvent(jobId, "job", "user", 12345, "path");
+      jh.logEvent(jse, jobId);
+      jh.closeWriter(jobId);
+
+      // Corrupt the history file. Use RawLocalFileSystem so that the
+      // original CRC file is left intact.
+      String historyFileName = jobId.toString() + "_" + "user";
+      Path historyFilePath = new Path(historyDir.toString(), historyFileName);
+
+      RawLocalFileSystem fs = (RawLocalFileSystem)
+        FileSystem.getLocal(conf).getRaw();
+
+      FSDataOutputStream out = fs.create(historyFilePath, true);
+      byte[] corruptData = new byte[32];
+      new Random().nextBytes(corruptData);
+      out.write(corruptData, 0, 32);
+      out.close();
+
+      // Stop and start the tracker. The tracker should come up nicely
+      mr.stopJobTracker();
+      mr.startJobTracker();
+      jt = mr.getJobTrackerRunner().getJobTracker();
+      assertNotNull("JobTracker did not come up", jt );
+      jh = jt.getJobHistory();
+      assertNotNull("JobHistory did not get initialized correctly", jh);
+
+      // Only the done folder should remain in the history directory
+      assertEquals("Files in logDir did not move to DONE folder",
+          1, historyDir.getFileSystem(conf).listStatus(historyDir).length);
     } finally {
       if (mr != null) {
         cleanupLocalFiles(mr);