Posted to mapreduce-commits@hadoop.apache.org by st...@apache.org on 2009/11/28 21:26:22 UTC

svn commit: r885145 [16/34] - in /hadoop/mapreduce/branches/MAPREDUCE-233: ./ .eclipse.templates/ .eclipse.templates/.launches/ conf/ ivy/ lib/ src/benchmarks/gridmix/ src/benchmarks/gridmix/pipesort/ src/benchmarks/gridmix2/ src/benchmarks/gridmix2/sr...

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JobConf.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JobConf.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JobConf.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JobConf.java Sat Nov 28 20:26:01 2009
@@ -27,7 +27,7 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.mapreduce.filecache.DistributedCache;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -41,6 +41,8 @@
 import org.apache.hadoop.mapred.lib.HashPartitioner;
 import org.apache.hadoop.mapred.lib.KeyFieldBasedComparator;
 import org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner;
+import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.mapreduce.util.ConfigUtil;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.Tool;
 import org.apache.log4j.Level;
@@ -108,12 +110,12 @@
   private static final Log LOG = LogFactory.getLog(JobConf.class);
 
   static{
-    Configuration.addDefaultResource("mapred-default.xml");
-    Configuration.addDefaultResource("mapred-site.xml");
+    ConfigUtil.loadResources();
   }
 
   /**
-   * @deprecated
+   * @deprecated Use {@link #MAPRED_JOB_MAP_MEMORY_MB_PROPERTY} and
+   * {@link #MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY}
    */
   @Deprecated
   public static final String MAPRED_TASK_MAXVMEM_PROPERTY =
@@ -145,18 +147,23 @@
    * indicates that the options are turned off.
    */
   public static final long DISABLED_MEMORY_LIMIT = -1L;
-  
+
+  /**
+   * Name of the configuration property mapreduce.cluster.local.dir
+   */
+  public static final String MAPRED_LOCAL_DIR_PROPERTY = MRConfig.LOCAL_DIR;
+
   /**
    * Name of the queue to which jobs will be submitted, if no queue
    * name is mentioned.
    */
   public static final String DEFAULT_QUEUE_NAME = "default";
 
-  static final String MAPRED_JOB_MAP_MEMORY_MB_PROPERTY =
-      "mapred.job.map.memory.mb";
+  static final String MAPRED_JOB_MAP_MEMORY_MB_PROPERTY = 
+      JobContext.MAP_MEMORY_MB;
 
   static final String MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY =
-      "mapred.job.reduce.memory.mb";
+    JobContext.REDUCE_MEMORY_MB;
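
For a minimal sketch of the new MB-based keys (the 512/1024 values are hypothetical): instead of the deprecated byte-valued mapred.task.maxvmem, a job now requests per-task memory through the setters defined later in this file.

    JobConf conf = new JobConf();
    // These setters write JobContext.MAP_MEMORY_MB / REDUCE_MEMORY_MB.
    conf.setMemoryForMapTask(512L);     // map tasks: 512 MB
    conf.setMemoryForReduceTask(1024L); // reduce tasks: 1024 MB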
 
   /**
    * Configuration key to set the java command line options for the child
@@ -200,7 +207,7 @@
    * other environment variables to the map processes.
    */
   public static final String MAPRED_MAP_TASK_JAVA_OPTS = 
-    "mapred.map.child.java.opts";
+    JobContext.MAP_JAVA_OPTS;
   
   /**
    * Configuration key to set the java command line options for the reduce tasks.
@@ -220,7 +227,7 @@
    * pass process environment variables to the reduce processes.
    */
   public static final String MAPRED_REDUCE_TASK_JAVA_OPTS = 
-    "mapred.reduce.child.java.opts";
+    JobContext.REDUCE_JAVA_OPTS;
   
   public static final String DEFAULT_MAPRED_TASK_JAVA_OPTS = "-Xmx200m";
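
For illustration, a hedged sketch of setting per-task JVM options (heap sizes are made up; per the notes above, any ulimit must be at least the -Xmx value).

    JobConf conf = new JobConf();
    // Separate map and reduce keys replace the old shared child.java.opts key.
    conf.set(JobConf.MAPRED_MAP_TASK_JAVA_OPTS, "-Xmx512m");
    conf.set(JobConf.MAPRED_REDUCE_TASK_JAVA_OPTS, "-Xmx1024m");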
   
@@ -244,7 +251,7 @@
    * Note: This must be greater than or equal to the -Xmx passed to the JavaVM
    *       via {@link #MAPRED_MAP_TASK_JAVA_OPTS}, else the VM might not start.
    */
-  public static final String MAPRED_MAP_TASK_ULIMIT = "mapred.map.child.ulimit";
+  public static final String MAPRED_MAP_TASK_ULIMIT = JobContext.MAP_ULIMIT;
   
   /**
   * Configuration key to set the maximum virtual memory available to the
@@ -253,8 +260,9 @@
    * Note: This must be greater than or equal to the -Xmx passed to the JavaVM
    *       via {@link #MAPRED_REDUCE_TASK_JAVA_OPTS}, else the VM might not start.
    */
-  public static final String MAPRED_REDUCE_TASK_ULIMIT =
-    "mapred.reduce.child.ulimit";
+  public static final String MAPRED_REDUCE_TASK_ULIMIT = 
+    JobContext.REDUCE_ULIMIT;
+
 
   /**
    * Configuration key to set the environment of the child map/reduce tasks.
@@ -287,7 +295,7 @@
   *   <li> B=$X:c This inherits the tasktracker's X env variable. </li>
    * </ul>
    */
-  public static final String MAPRED_MAP_TASK_ENV = "mapred.map.child.env";
+  public static final String MAPRED_MAP_TASK_ENV = JobContext.MAP_ENV;
   
   /**
   * Configuration key to set the maximum virtual memory available to the
@@ -302,8 +310,7 @@
   *   <li> B=$X:c This inherits the tasktracker's X env variable. </li>
    * </ul>
    */
-  public static final String MAPRED_REDUCE_TASK_ENV =
-    "mapred.reduce.child.env";
+  public static final String MAPRED_REDUCE_TASK_ENV = JobContext.REDUCE_ENV;
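
A small sketch of the A=b,B=$X:c environment syntax documented above (variable names are hypothetical): FOO is set verbatim, while PATH inherits the tasktracker's value and appends to it.

    JobConf conf = new JobConf();
    conf.set(JobConf.MAPRED_MAP_TASK_ENV, "FOO=bar,PATH=$PATH:/opt/tools/bin");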
 
   /**
    * Configuration key to set the logging {@link Level} for the map task.
@@ -312,7 +319,7 @@
    * OFF, FATAL, ERROR, WARN, INFO, DEBUG, TRACE and ALL.
    */
   public static final String MAPRED_MAP_TASK_LOG_LEVEL = 
-    "mapred.map.child.log.level";
+    JobContext.MAP_LOG_LEVEL;
   
   /**
    * Configuration key to set the logging {@link Level} for the reduce task.
@@ -321,7 +328,7 @@
    * OFF, FATAL, ERROR, WARN, INFO, DEBUG, TRACE and ALL.
    */
   public static final String MAPRED_REDUCE_TASK_LOG_LEVEL = 
-    "mapred.reduce.child.log.level";
+    JobContext.REDUCE_LOG_LEVEL;
   
   /**
    * Default logging level for map/reduce tasks.
@@ -403,14 +410,14 @@
    * 
    * @return the user jar for the map-reduce job.
    */
-  public String getJar() { return get("mapred.jar"); }
+  public String getJar() { return get(JobContext.JAR); }
   
   /**
    * Set the user jar for the map-reduce job.
    * 
    * @param jar the user jar for the map-reduce job.
    */
-  public void setJar(String jar) { set("mapred.jar", jar); }
+  public void setJar(String jar) { set(JobContext.JAR, jar); }
   
   /**
    * Set the job's jar file by finding an example class location.
@@ -425,7 +432,7 @@
   }
 
   public String[] getLocalDirs() throws IOException {
-    return getStrings("mapred.local.dir");
+    return getStrings(MRConfig.LOCAL_DIR);
   }
 
   public void deleteLocalFiles() throws IOException {
@@ -447,7 +454,7 @@
    * local directories.
    */
   public Path getLocalPath(String pathString) throws IOException {
-    return getLocalPath("mapred.local.dir", pathString);
+    return getLocalPath(MRConfig.LOCAL_DIR, pathString);
   }
 
   /**
@@ -456,7 +463,7 @@
    * @return the username
    */
   public String getUser() {
-    return get("user.name");
+    return get(JobContext.USER_NAME);
   }
   
   /**
@@ -465,7 +472,7 @@
    * @param user the username for this job.
    */
   public void setUser(String user) {
-    set("user.name", user);
+    set(JobContext.USER_NAME, user);
   }
 
 
@@ -479,7 +486,7 @@
    * 
    */
   public void setKeepFailedTaskFiles(boolean keep) {
-    setBoolean("keep.failed.task.files", keep);
+    setBoolean(JobContext.PRESERVE_FAILED_TASK_FILES, keep);
   }
   
   /**
@@ -488,7 +495,7 @@
    * @return should the files be kept?
    */
   public boolean getKeepFailedTaskFiles() {
-    return getBoolean("keep.failed.task.files", false);
+    return getBoolean(JobContext.PRESERVE_FAILED_TASK_FILES, false);
   }
   
   /**
@@ -500,7 +507,7 @@
    *        task names.
    */
   public void setKeepTaskFilesPattern(String pattern) {
-    set("keep.task.files.pattern", pattern);
+    set(JobContext.PRESERVE_FILES_PATTERN, pattern);
   }
   
   /**
@@ -510,7 +517,7 @@
   * @return the pattern as a string, if it was set, otherwise null.
    */
   public String getKeepTaskFilesPattern() {
-    return get("keep.task.files.pattern");
+    return get(JobContext.PRESERVE_FILES_PATTERN);
   }
   
   /**
@@ -520,7 +527,7 @@
    */
   public void setWorkingDirectory(Path dir) {
     dir = new Path(getWorkingDirectory(), dir);
-    set("mapred.working.dir", dir.toString());
+    set(JobContext.WORKING_DIR, dir.toString());
   }
   
   /**
@@ -529,13 +536,13 @@
    * @return the directory name.
    */
   public Path getWorkingDirectory() {
-    String name = get("mapred.working.dir");
+    String name = get(JobContext.WORKING_DIR);
     if (name != null) {
       return new Path(name);
     } else {
       try {
         Path dir = FileSystem.get(this).getWorkingDirectory();
-        set("mapred.working.dir", dir.toString());
+        set(JobContext.WORKING_DIR, dir.toString());
         return dir;
       } catch (IOException e) {
         throw new RuntimeException(e);
@@ -550,14 +557,14 @@
    * -1 signifies no limit
    */
   public void setNumTasksToExecutePerJvm(int numTasks) {
-    setInt("mapred.job.reuse.jvm.num.tasks", numTasks);
+    setInt(JobContext.JVM_NUMTASKS_TORUN, numTasks);
   }
   
   /**
    * Get the number of tasks that a spawned JVM should execute
    */
   public int getNumTasksToExecutePerJvm() {
-    return getInt("mapred.job.reuse.jvm.num.tasks", 1);
+    return getInt(JobContext.JVM_NUMTASKS_TORUN, 1);
   }
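
For example (a sketch; -1 follows the "no limit" convention noted above):

    JobConf conf = new JobConf();
    conf.setNumTasksToExecutePerJvm(-1); // reuse each spawned JVM indefinitely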
   
   /**
@@ -635,7 +642,7 @@
    * @param compress should the map outputs be compressed?
    */
   public void setCompressMapOutput(boolean compress) {
-    setBoolean("mapred.compress.map.output", compress);
+    setBoolean(JobContext.MAP_OUTPUT_COMPRESS, compress);
   }
   
   /**
@@ -645,7 +652,7 @@
    *         <code>false</code> otherwise.
    */
   public boolean getCompressMapOutput() {
-    return getBoolean("mapred.compress.map.output", false);
+    return getBoolean(JobContext.MAP_OUTPUT_COMPRESS, false);
   }
 
   /**
@@ -657,7 +664,7 @@
   public void 
   setMapOutputCompressorClass(Class<? extends CompressionCodec> codecClass) {
     setCompressMapOutput(true);
-    setClass("mapred.map.output.compression.codec", codecClass, 
+    setClass(JobContext.MAP_OUTPUT_COMPRESS_CODEC, codecClass, 
              CompressionCodec.class);
   }
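
A minimal sketch of enabling map-output compression, assuming the stock org.apache.hadoop.io.compress.GzipCodec:

    JobConf conf = new JobConf();
    // Per the method body above, this also flips
    // JobContext.MAP_OUTPUT_COMPRESS to true.
    conf.setMapOutputCompressorClass(org.apache.hadoop.io.compress.GzipCodec.class);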
   
@@ -672,7 +679,7 @@
   public Class<? extends CompressionCodec> 
   getMapOutputCompressorClass(Class<? extends CompressionCodec> defaultValue) {
     Class<? extends CompressionCodec> codecClass = defaultValue;
-    String name = get("mapred.map.output.compression.codec");
+    String name = get(JobContext.MAP_OUTPUT_COMPRESS_CODEC);
     if (name != null) {
       try {
         codecClass = getClassByName(name).asSubclass(CompressionCodec.class);
@@ -692,7 +699,7 @@
    * @return the map output key class.
    */
   public Class<?> getMapOutputKeyClass() {
-    Class<?> retv = getClass("mapred.mapoutput.key.class", null, Object.class);
+    Class<?> retv = getClass(JobContext.MAP_OUTPUT_KEY_CLASS, null, Object.class);
     if (retv == null) {
       retv = getOutputKeyClass();
     }
@@ -707,7 +714,7 @@
    * @param theClass the map output key class.
    */
   public void setMapOutputKeyClass(Class<?> theClass) {
-    setClass("mapred.mapoutput.key.class", theClass, Object.class);
+    setClass(JobContext.MAP_OUTPUT_KEY_CLASS, theClass, Object.class);
   }
   
   /**
@@ -718,7 +725,7 @@
    * @return the map output value class.
    */
   public Class<?> getMapOutputValueClass() {
-    Class<?> retv = getClass("mapred.mapoutput.value.class", null,
+    Class<?> retv = getClass(JobContext.MAP_OUTPUT_VALUE_CLASS, null,
         Object.class);
     if (retv == null) {
       retv = getOutputValueClass();
@@ -734,7 +741,7 @@
    * @param theClass the map output value class.
    */
   public void setMapOutputValueClass(Class<?> theClass) {
-    setClass("mapred.mapoutput.value.class", theClass, Object.class);
+    setClass(JobContext.MAP_OUTPUT_VALUE_CLASS, theClass, Object.class);
   }
   
   /**
@@ -743,7 +750,7 @@
    * @return the key class for the job output data.
    */
   public Class<?> getOutputKeyClass() {
-    return getClass("mapred.output.key.class",
+    return getClass(JobContext.OUTPUT_KEY_CLASS,
                     LongWritable.class, Object.class);
   }
   
@@ -753,7 +760,7 @@
    * @param theClass the key class for the job output data.
    */
   public void setOutputKeyClass(Class<?> theClass) {
-    setClass("mapred.output.key.class", theClass, Object.class);
+    setClass(JobContext.OUTPUT_KEY_CLASS, theClass, Object.class);
   }
 
   /**
@@ -762,8 +769,8 @@
    * @return the {@link RawComparator} comparator used to compare keys.
    */
   public RawComparator getOutputKeyComparator() {
-    Class<? extends RawComparator> theClass = getClass("mapred.output.key.comparator.class",
-	        null, RawComparator.class);
+    Class<? extends RawComparator> theClass = getClass(
+      JobContext.KEY_COMPARATOR, null, RawComparator.class);
     if (theClass != null)
       return ReflectionUtils.newInstance(theClass, this);
     return WritableComparator.get(getMapOutputKeyClass().asSubclass(WritableComparable.class));
@@ -777,7 +784,7 @@
    * @see #setOutputValueGroupingComparator(Class)                 
    */
   public void setOutputKeyComparatorClass(Class<? extends RawComparator> theClass) {
-    setClass("mapred.output.key.comparator.class",
+    setClass(JobContext.KEY_COMPARATOR,
              theClass, RawComparator.class);
   }
 
@@ -798,14 +805,14 @@
    */
   public void setKeyFieldComparatorOptions(String keySpec) {
     setOutputKeyComparatorClass(KeyFieldBasedComparator.class);
-    set("mapred.text.key.comparator.options", keySpec);
+    set(KeyFieldBasedComparator.COMPARATOR_OPTIONS, keySpec);
   }
   
   /**
    * Get the {@link KeyFieldBasedComparator} options
    */
   public String getKeyFieldComparatorOption() {
-    return get("mapred.text.key.comparator.options");
+    return get(KeyFieldBasedComparator.COMPARATOR_OPTIONS);
   }
 
   /**
@@ -823,14 +830,14 @@
    */
   public void setKeyFieldPartitionerOptions(String keySpec) {
     setPartitionerClass(KeyFieldBasedPartitioner.class);
-    set("mapred.text.key.partitioner.options", keySpec);
+    set(KeyFieldBasedPartitioner.PARTITIONER_OPTIONS, keySpec);
   }
   
   /**
    * Get the {@link KeyFieldBasedPartitioner} options
    */
   public String getKeyFieldPartitionerOption() {
-    return get("mapred.text.key.partitioner.options");
+    return get(KeyFieldBasedPartitioner.PARTITIONER_OPTIONS);
   }
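
A hedged example of the sort-style option strings these setters accept (field numbers are made up): partition on field 1, then order within each partition by field 2, numerically and in reverse.

    JobConf conf = new JobConf();
    conf.setKeyFieldPartitionerOptions("-k1,1");
    conf.setKeyFieldComparatorOptions("-k2,2nr");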
 
   /** 
@@ -841,8 +848,8 @@
    * @see #setOutputValueGroupingComparator(Class) for details.  
    */
   public RawComparator getOutputValueGroupingComparator() {
-    Class<? extends RawComparator> theClass = getClass("mapred.output.value.groupfn.class", null,
-        RawComparator.class);
+    Class<? extends RawComparator> theClass = getClass(
+      JobContext.GROUP_COMPARATOR_CLASS, null, RawComparator.class);
     if (theClass == null) {
       return getOutputKeyComparator();
     }
@@ -876,8 +883,8 @@
    * @see #setOutputKeyComparatorClass(Class)                 
    */
   public void setOutputValueGroupingComparator(
-		  Class<? extends RawComparator> theClass) {
-    setClass("mapred.output.value.groupfn.class",
+      Class<? extends RawComparator> theClass) {
+    setClass(JobContext.GROUP_COMPARATOR_CLASS,
              theClass, RawComparator.class);
   }
 
@@ -921,7 +928,7 @@
    * @return the value class for job outputs.
    */
   public Class<?> getOutputValueClass() {
-    return getClass("mapred.output.value.class", Text.class, Object.class);
+    return getClass(JobContext.OUTPUT_VALUE_CLASS, Text.class, Object.class);
   }
   
   /**
@@ -930,7 +937,7 @@
    * @param theClass the value class for job outputs.
    */
   public void setOutputValueClass(Class<?> theClass) {
-    setClass("mapred.output.value.class", theClass, Object.class);
+    setClass(JobContext.OUTPUT_VALUE_CLASS, theClass, Object.class);
   }
 
   /**
@@ -1081,7 +1088,7 @@
    *         <code>false</code> otherwise.
    */
   public boolean getMapSpeculativeExecution() { 
-    return getBoolean("mapred.map.tasks.speculative.execution", true);
+    return getBoolean(JobContext.MAP_SPECULATIVE, true);
   }
   
   /**
@@ -1092,7 +1099,7 @@
    *                             else <code>false</code>.
    */
   public void setMapSpeculativeExecution(boolean speculativeExecution) {
-    setBoolean("mapred.map.tasks.speculative.execution", speculativeExecution);
+    setBoolean(JobContext.MAP_SPECULATIVE, speculativeExecution);
   }
 
   /**
@@ -1104,7 +1111,7 @@
    *         <code>false</code> otherwise.
    */
   public boolean getReduceSpeculativeExecution() { 
-    return getBoolean("mapred.reduce.tasks.speculative.execution", true);
+    return getBoolean(JobContext.REDUCE_SPECULATIVE, true);
   }
   
   /**
@@ -1115,7 +1122,7 @@
    *                             else <code>false</code>.
    */
   public void setReduceSpeculativeExecution(boolean speculativeExecution) {
-    setBoolean("mapred.reduce.tasks.speculative.execution", 
+    setBoolean(JobContext.REDUCE_SPECULATIVE, 
                speculativeExecution);
   }
 
@@ -1125,7 +1132,7 @@
    * 
   * @return the number of map tasks for this job.
    */
-  public int getNumMapTasks() { return getInt("mapred.map.tasks", 1); }
+  public int getNumMapTasks() { return getInt(JobContext.NUM_MAPS, 1); }
   
   /**
    * Set the number of map tasks for this job.
@@ -1152,8 +1159,8 @@
    * bytes, of input files. However, the {@link FileSystem} blocksize of the 
    * input files is treated as an upper bound for input splits. A lower bound 
    * on the split size can be set via 
-   * <a href="{@docRoot}/../mapred-default.html#mapred.min.split.size">
-   * mapred.min.split.size</a>.</p>
+   * <a href="{@docRoot}/../mapred-default.html#mapreduce.input.fileinputformat.split.minsize">
+   * mapreduce.input.fileinputformat.split.minsize</a>.</p>
    *  
    * <p>Thus, if you expect 10TB of input data and have a blocksize of 128MB, 
    * you'll end up with 82,000 maps, unless {@link #setNumMapTasks(int)} is 
@@ -1165,7 +1172,7 @@
    * @see FileSystem#getDefaultBlockSize()
    * @see FileStatus#getBlockSize()
    */
-  public void setNumMapTasks(int n) { setInt("mapred.map.tasks", n); }
+  public void setNumMapTasks(int n) { setInt(JobContext.NUM_MAPS, n); }
 
   /**
   * Get the configured number of reduce tasks for this job. Defaults to 
@@ -1173,7 +1180,7 @@
    * 
    * @return the number of reduce tasks for this job.
    */
-  public int getNumReduceTasks() { return getInt("mapred.reduce.tasks", 1); }
+  public int getNumReduceTasks() { return getInt(JobContext.NUM_REDUCES, 1); }
   
   /**
    * Set the requisite number of reduce tasks for this job.
@@ -1182,8 +1189,8 @@
    * 
    * <p>The right number of reduces seems to be <code>0.95</code> or 
    * <code>1.75</code> multiplied by (&lt;<i>no. of nodes</i>&gt; * 
-   * <a href="{@docRoot}/../mapred-default.html#mapred.tasktracker.reduce.tasks.maximum">
-   * mapred.tasktracker.reduce.tasks.maximum</a>).
+   * <a href="{@docRoot}/../mapred-default.html#mapreduce.tasktracker.reduce.tasks.maximum">
+   * mapreduce.tasktracker.reduce.tasks.maximum</a>).
    * </p>
    * 
    * <p>With <code>0.95</code> all of the reduces can launch immediately and 
@@ -1209,17 +1216,17 @@
    * 
    * @param n the number of reduce tasks for this job.
    */
-  public void setNumReduceTasks(int n) { setInt("mapred.reduce.tasks", n); }
+  public void setNumReduceTasks(int n) { setInt(JobContext.NUM_REDUCES, n); }
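
To make the heuristic concrete (cluster numbers are hypothetical): on 20 nodes with mapreduce.tasktracker.reduce.tasks.maximum = 2, the single-wave suggestion is 0.95 * 20 * 2 = 38 reduces and the load-balancing suggestion is 1.75 * 20 * 2 = 70.

    JobConf conf = new JobConf();
    int nodes = 20, reduceSlotsPerNode = 2; // made-up cluster size
    conf.setNumReduceTasks((int) (0.95 * nodes * reduceSlotsPerNode)); // 38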
   
   /** 
   * Get the configured maximum number of attempts that will be made to run a
-   * map task, as specified by the <code>mapred.map.max.attempts</code>
+   * map task, as specified by the <code>mapreduce.map.maxattempts</code>
    * property. If this property is not already set, the default is 4 attempts.
    *  
    * @return the max number of attempts per map task.
    */
   public int getMaxMapAttempts() {
-    return getInt("mapred.map.max.attempts", 4);
+    return getInt(JobContext.MAP_MAX_ATTEMPTS, 4);
   }
   
   /** 
@@ -1229,18 +1236,18 @@
    * @param n the number of attempts per map task.
    */
   public void setMaxMapAttempts(int n) {
-    setInt("mapred.map.max.attempts", n);
+    setInt(JobContext.MAP_MAX_ATTEMPTS, n);
   }
 
   /** 
   * Get the configured maximum number of attempts that will be made to run a
-   * reduce task, as specified by the <code>mapred.reduce.max.attempts</code>
+   * reduce task, as specified by the <code>mapreduce.reduce.maxattempts</code>
    * property. If this property is not already set, the default is 4 attempts.
    * 
    * @return the max number of attempts per reduce task.
    */
   public int getMaxReduceAttempts() {
-    return getInt("mapred.reduce.max.attempts", 4);
+    return getInt(JobContext.REDUCE_MAX_ATTEMPTS, 4);
  }

  /** 
    * Expert: Set the number of maximum attempts that will be made to run a
@@ -1249,7 +1256,7 @@
    * @param n the number of attempts per reduce task.
    */
   public void setMaxReduceAttempts(int n) {
-    setInt("mapred.reduce.max.attempts", n);
+    setInt(JobContext.REDUCE_MAX_ATTEMPTS, n);
   }
   
   /**
@@ -1259,7 +1266,7 @@
    * @return the job's name, defaulting to "".
    */
   public String getJobName() {
-    return get("mapred.job.name", "");
+    return get(JobContext.JOB_NAME, "");
   }
   
   /**
@@ -1268,7 +1275,7 @@
    * @param name the job's new name.
    */
   public void setJobName(String name) {
-    set("mapred.job.name", name);
+    set(JobContext.JOB_NAME, name);
   }
   
   /**
@@ -1286,6 +1293,7 @@
    *
    * @return the session identifier, defaulting to "".
    */
+  @Deprecated
   public String getSessionId() {
       return get("session.id", "");
   }
@@ -1295,6 +1303,7 @@
    *
    * @param sessionId the new session id.
    */
+  @Deprecated
   public void setSessionId(String sessionId) {
       set("session.id", sessionId);
   }
@@ -1307,7 +1316,7 @@
    * @param noFailures maximum no. of failures of a given job per tasktracker.
    */
   public void setMaxTaskFailuresPerTracker(int noFailures) {
-    setInt("mapred.max.tracker.failures", noFailures);
+    setInt(JobContext.MAX_TASK_FAILURES_PER_TRACKER, noFailures);
   }
   
   /**
@@ -1318,7 +1327,7 @@
    * @return the maximum no. of failures of a given job per tasktracker.
    */
   public int getMaxTaskFailuresPerTracker() {
-    return getInt("mapred.max.tracker.failures", 4); 
+    return getInt(JobContext.MAX_TASK_FAILURES_PER_TRACKER, 4); 
   }
 
   /**
@@ -1335,7 +1344,7 @@
    *         the job being aborted.
    */
   public int getMaxMapTaskFailuresPercent() {
-    return getInt("mapred.max.map.failures.percent", 0);
+    return getInt(JobContext.MAP_FAILURES_MAX_PERCENT, 0);
   }
 
   /**
@@ -1349,7 +1358,7 @@
    *                the job being aborted.
    */
   public void setMaxMapTaskFailuresPercent(int percent) {
-    setInt("mapred.max.map.failures.percent", percent);
+    setInt(JobContext.MAP_FAILURES_MAX_PERCENT, percent);
   }
   
   /**
@@ -1366,7 +1375,7 @@
    *         the job being aborted.
    */
   public int getMaxReduceTaskFailuresPercent() {
-    return getInt("mapred.max.reduce.failures.percent", 0);
+    return getInt(JobContext.REDUCE_FAILURES_MAXPERCENT, 0);
   }
   
   /**
@@ -1380,7 +1389,7 @@
    *                the job being aborted.
    */
   public void setMaxReduceTaskFailuresPercent(int percent) {
-    setInt("mapred.max.reduce.failures.percent", percent);
+    setInt(JobContext.REDUCE_FAILURES_MAXPERCENT, percent);
   }
   
   /**
@@ -1389,7 +1398,7 @@
    * @param prio the {@link JobPriority} for this job.
    */
   public void setJobPriority(JobPriority prio) {
-    set("mapred.job.priority", prio.toString());
+    set(JobContext.PRIORITY, prio.toString());
   }
   
   /**
@@ -1398,7 +1407,7 @@
    * @return the {@link JobPriority} for this job.
    */
   public JobPriority getJobPriority() {
-    String prio = get("mapred.job.priority");
+    String prio = get(JobContext.PRIORITY);
     if(prio == null) {
       return JobPriority.NORMAL;
     }
@@ -1411,7 +1420,7 @@
    * @return true if some tasks will be profiled
    */
   public boolean getProfileEnabled() {
-    return getBoolean("mapred.task.profile", false);
+    return getBoolean(JobContext.TASK_PROFILE, false);
   }
 
   /**
@@ -1421,7 +1430,7 @@
    * @param newValue true means it should be gathered
    */
   public void setProfileEnabled(boolean newValue) {
-    setBoolean("mapred.task.profile", newValue);
+    setBoolean(JobContext.TASK_PROFILE, newValue);
   }
 
   /**
@@ -1433,7 +1442,7 @@
    * @return the parameters to pass to the task child to configure profiling
    */
   public String getProfileParams() {
-    return get("mapred.task.profile.params",
+    return get(JobContext.TASK_PROFILE_PARAMS,
                "-agentlib:hprof=cpu=samples,heap=sites,force=n,thread=y," +
                  "verbose=n,file=%s");
   }
@@ -1448,7 +1457,7 @@
    * @param value the configuration string
    */
   public void setProfileParams(String value) {
-    set("mapred.task.profile.params", value);
+    set(JobContext.TASK_PROFILE_PARAMS, value);
   }
 
   /**
@@ -1457,8 +1466,8 @@
    * @return the task ranges
    */
   public IntegerRanges getProfileTaskRange(boolean isMap) {
-    return getRange((isMap ? "mapred.task.profile.maps" : 
-                       "mapred.task.profile.reduces"), "0-2");
+    return getRange((isMap ? JobContext.NUM_MAP_PROFILES : 
+                       JobContext.NUM_REDUCE_PROFILES), "0-2");
   }
 
   /**
@@ -1468,9 +1477,9 @@
    */
   public void setProfileTaskRange(boolean isMap, String newValue) {
     // parse the value to make sure it is legal
-    new Configuration.IntegerRanges(newValue);
-    set((isMap ? "mapred.task.profile.maps" : "mapred.task.profile.reduces"), 
-        newValue);
+    new Configuration.IntegerRanges(newValue);
+    set((isMap ? JobContext.NUM_MAP_PROFILES : JobContext.NUM_REDUCE_PROFILES), 
+        newValue);
   }
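
As a sketch, turning profiling on for a handful of tasks (the ranges are illustrative; "0-2" matches the default above):

    JobConf conf = new JobConf();
    conf.setProfileEnabled(true);
    conf.setProfileTaskRange(true, "0-2");  // profile map tasks 0 through 2
    conf.setProfileTaskRange(false, "0,5"); // profile reduce tasks 0 and 5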
 
   /**
@@ -1497,7 +1506,7 @@
    * @param mDbgScript the script name
    */
   public void  setMapDebugScript(String mDbgScript) {
-    set("mapred.map.task.debug.script", mDbgScript);
+    set(JobContext.MAP_DEBUG_SCRIPT, mDbgScript);
   }
   
   /**
@@ -1507,7 +1516,7 @@
    * @see #setMapDebugScript(String)
    */
   public String getMapDebugScript() {
-    return get("mapred.map.task.debug.script");
+    return get(JobContext.MAP_DEBUG_SCRIPT);
   }
   
   /**
@@ -1534,7 +1543,7 @@
    * @param rDbgScript the script name
    */
   public void  setReduceDebugScript(String rDbgScript) {
-    set("mapred.reduce.task.debug.script", rDbgScript);
+    set(JobContext.REDUCE_DEBUG_SCRIPT, rDbgScript);
   }
   
   /**
@@ -1544,7 +1553,7 @@
    * @see #setReduceDebugScript(String)
    */
   public String getReduceDebugScript() {
-    return get("mapred.reduce.task.debug.script");
+    return get(JobContext.REDUCE_DEBUG_SCRIPT);
   }
 
   /**
@@ -1556,7 +1565,7 @@
    * @see #setJobEndNotificationURI(String)
    */
   public String getJobEndNotificationURI() {
-    return get("job.end.notification.url");
+    return get(JobContext.END_NOTIFICATION_URL);
   }
 
   /**
@@ -1576,7 +1585,7 @@
    *       JobCompletionAndChaining">Job Completion and Chaining</a>
    */
   public void setJobEndNotificationURI(String uri) {
-    set("job.end.notification.url", uri);
+    set(JobContext.END_NOTIFICATION_URL, uri);
   }
 
   /**
@@ -1585,9 +1594,9 @@
    * <p>
    * When a job starts, a shared directory is created at location
    * <code>
-   * ${mapred.local.dir}/taskTracker/jobcache/$jobid/work/ </code>.
+   * ${mapreduce.cluster.local.dir}/taskTracker/$user/jobcache/$jobid/work/ </code>.
    * This directory is exposed to the users through 
-   * <code>job.local.dir </code>.
+   * <code>mapreduce.job.local.dir </code>.
    * So, the tasks can use this space 
    * as scratch space and share files among them. </p>
    * This value is available as System property also.
@@ -1595,35 +1604,71 @@
    * @return The localized job specific shared directory
    */
   public String getJobLocalDir() {
-    return get(TaskTracker.JOB_LOCAL_DIR);
+    return get(JobContext.JOB_LOCAL_DIR);
   }
 
+  /**
+   * Get memory required to run a map task of the job, in MB.
+   * 
+   * If a value is specified in the configuration, it is returned.
+   * Else, it returns {@link #DISABLED_MEMORY_LIMIT}.
+   * <p/>
+   * For backward compatibility, if the job configuration sets the
+   * key {@link #MAPRED_TASK_MAXVMEM_PROPERTY} to a value different
+   * from {@link #DISABLED_MEMORY_LIMIT}, that value will be used
+   * after converting it from bytes to MB.
+   * @return memory required to run a map task of the job, in MB,
+   *          or {@link #DISABLED_MEMORY_LIMIT} if unset.
+   */
   public long getMemoryForMapTask() {
-    if (get(MAPRED_TASK_MAXVMEM_PROPERTY) != null) {
-      long val = getLong(
-        MAPRED_TASK_MAXVMEM_PROPERTY, DISABLED_MEMORY_LIMIT);
-      return (val == DISABLED_MEMORY_LIMIT) ? val :
-        ((val < 0) ? DISABLED_MEMORY_LIMIT : val / (1024 * 1024));
+    long value = getDeprecatedMemoryValue();
+    if (value == DISABLED_MEMORY_LIMIT) {
+      value = normalizeMemoryConfigValue(
+                getLong(JobConf.MAPRED_JOB_MAP_MEMORY_MB_PROPERTY,
+                          DISABLED_MEMORY_LIMIT));
     }
-    return getLong(
-      JobConf.MAPRED_JOB_MAP_MEMORY_MB_PROPERTY,
-      DISABLED_MEMORY_LIMIT);
+    return value;
   }
 
   public void setMemoryForMapTask(long mem) {
     setLong(JobConf.MAPRED_JOB_MAP_MEMORY_MB_PROPERTY, mem);
   }
 
+  /**
+   * Get memory required to run a reduce task of the job, in MB.
+   * 
+   * If a value is specified in the configuration, it is returned.
+   * Else, it returns {@link #DISABLED_MEMORY_LIMIT}.
+   * <p/>
+   * For backward compatibility, if the job configuration sets the
+   * key {@link #MAPRED_TASK_MAXVMEM_PROPERTY} to a value different
+   * from {@link #DISABLED_MEMORY_LIMIT}, that value will be used
+   * after converting it from bytes to MB.
+   * @return memory required to run a reduce task of the job, in MB,
+   *          or {@link #DISABLED_MEMORY_LIMIT} if unset.
+   */
   public long getMemoryForReduceTask() {
-    if (get(MAPRED_TASK_MAXVMEM_PROPERTY) != null) {
-      long val = getLong(
-        MAPRED_TASK_MAXVMEM_PROPERTY, DISABLED_MEMORY_LIMIT);
-      return (val == DISABLED_MEMORY_LIMIT) ? val :
-        ((val < 0) ? DISABLED_MEMORY_LIMIT : val / (1024 * 1024));
+    long value = getDeprecatedMemoryValue();
+    if (value == DISABLED_MEMORY_LIMIT) {
+      value = normalizeMemoryConfigValue(
+                getLong(JobConf.MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY,
+                        DISABLED_MEMORY_LIMIT));
     }
-    return getLong(
-      JobConf.MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY,
-      DISABLED_MEMORY_LIMIT);
+    return value;
+  }
+  
+  // Return the value set to the key MAPRED_TASK_MAXVMEM_PROPERTY,
+  // converted into MBs.
+  // Returns DISABLED_MEMORY_LIMIT if it is unset, or if it is set to a
+  // negative value.
+  private long getDeprecatedMemoryValue() {
+    long oldValue = getLong(MAPRED_TASK_MAXVMEM_PROPERTY, 
+        DISABLED_MEMORY_LIMIT);
+    oldValue = normalizeMemoryConfigValue(oldValue);
+    if (oldValue != DISABLED_MEMORY_LIMIT) {
+      oldValue /= (1024*1024);
+    }
+    return oldValue;
   }
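
A worked example of the backward-compatibility path (values are hypothetical): with mapred.task.maxvmem set to 2147483648 (2 GB, in bytes) and the MB-based keys unset, getDeprecatedMemoryValue() yields 2147483648 / (1024*1024) = 2048, so both getters report 2048 MB; the per-task keys only take effect once the old key is absent.

    JobConf conf = new JobConf();
    conf.setLong(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY, 2147483648L); // bytes
    long mapMb = conf.getMemoryForMapTask(); // 2048 (MB) -- the old key wins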
 
   public void setMemoryForReduceTask(long mem) {
@@ -1637,7 +1682,7 @@
    * @return name of the queue
    */
   public String getQueueName() {
-    return get("mapred.job.queue.name", DEFAULT_QUEUE_NAME);
+    return get(JobContext.QUEUE_NAME, DEFAULT_QUEUE_NAME);
   }
   
   /**
@@ -1646,7 +1691,7 @@
    * @param queueName Name of the queue
    */
   public void setQueueName(String queueName) {
-    set("mapred.job.queue.name", queueName);
+    set(JobContext.QUEUE_NAME, queueName);
   }
   
   /**
@@ -1728,18 +1773,21 @@
 
 
   /**
-   * The maximum amount of memory any task of this job will use. See
+   * Get the memory required to run a task of this job, in bytes. See
    * {@link #MAPRED_TASK_MAXVMEM_PROPERTY}
    * <p/>
-   * mapred.task.maxvmem is split into
-   * mapred.job.map.memory.mb
-   * and mapred.job.map.memory.mb,mapred
-   * each of the new key are set
-   * as mapred.task.maxvmem / 1024
-   * as new values are in MB
+   * This method is deprecated. Now, different memory limits can be
+   * set for map and reduce tasks of a job, in MB. 
+   * <p/>
+   * For backward compatibility, if the job configuration sets the
+   * key {@link #MAPRED_TASK_MAXVMEM_PROPERTY} to a value different
+   * from {@link #DISABLED_MEMORY_LIMIT}, that value is returned. 
+   * Otherwise, this method will return the larger of the values returned by 
+   * {@link #getMemoryForMapTask()} and {@link #getMemoryForReduceTask()}
+   * after converting them into bytes.
    *
-   * @return The maximum amount of memory any task of this job will use, in
-   *         bytes.
+   * @return Memory required to run a task of this job, in bytes,
+   *          or {@link #DISABLED_MEMORY_LIMIT}, if unset.
    * @see #setMaxVirtualMemoryForTask(long)
    * @deprecated Use {@link #getMemoryForMapTask()} and
    *             {@link #getMemoryForReduceTask()}
@@ -1750,24 +1798,16 @@
       "getMaxVirtualMemoryForTask() is deprecated. " +
       "Instead use getMemoryForMapTask() and getMemoryForReduceTask()");
 
-    if (get(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY) == null) {
-      if (get(JobConf.MAPRED_JOB_MAP_MEMORY_MB_PROPERTY) != null || get(
-        JobConf.MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY) != null) {
-        long val = Math.max(getMemoryForMapTask(), getMemoryForReduceTask());
-        if (val == JobConf.DISABLED_MEMORY_LIMIT) {
-          return val;
-        } else {
-          if (val < 0) {
-            return JobConf.DISABLED_MEMORY_LIMIT;
-          }
-          return val * 1024 * 1024;
-          //Convert MB to byte as new value is in
-          // MB and old deprecated method returns bytes
-        }
+    long value = getLong(MAPRED_TASK_MAXVMEM_PROPERTY, DISABLED_MEMORY_LIMIT);
+    value = normalizeMemoryConfigValue(value);
+    if (value == DISABLED_MEMORY_LIMIT) {
+      value = Math.max(getMemoryForMapTask(), getMemoryForReduceTask());
+      value = normalizeMemoryConfigValue(value);
+      if (value != DISABLED_MEMORY_LIMIT) {
+        value *= 1024*1024;
       }
     }
-
-    return getLong(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY, DISABLED_MEMORY_LIMIT);
+    return value;
   }
 
   /**
@@ -1775,8 +1815,8 @@
    * {@link #MAPRED_TASK_MAXVMEM_PROPERTY}
    * <p/>
    * mapred.task.maxvmem is split into
-   * mapred.job.map.memory.mb
-   * and mapred.job.map.memory.mb,mapred
+   * mapreduce.map.memory.mb
+   * and mapreduce.reduce.memory.mb
   * each of the new keys is set
   * to mapred.task.maxvmem / (1024 * 1024),
   * as the new values are in MB

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JobContext.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JobContext.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JobContext.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JobContext.java Sat Nov 28 20:26:01 2009
@@ -1,4 +1,5 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
  * regarding copyright ownership.  The ASF licenses this file
@@ -23,36 +24,18 @@
  * @deprecated Use {@link org.apache.hadoop.mapreduce.JobContext} instead.
  */
 @Deprecated
-public class JobContext extends org.apache.hadoop.mapreduce.JobContext {
-  private JobConf job;
-  private Progressable progress;
-
-  JobContext(JobConf conf, org.apache.hadoop.mapreduce.JobID jobId, 
-             Progressable progress) {
-    super(conf, jobId);
-    this.job = conf;
-    this.progress = progress;
-  }
-
-  JobContext(JobConf conf, org.apache.hadoop.mapreduce.JobID jobId) {
-    this(conf, jobId, Reporter.NULL);
-  }
-  
+public interface JobContext extends org.apache.hadoop.mapreduce.JobContext {
   /**
    * Get the job Configuration
    * 
    * @return JobConf
    */
-  public JobConf getJobConf() {
-    return job;
-  }
+  public JobConf getJobConf();
   
   /**
    * Get the progress mechanism for reporting progress.
    * 
    * @return progress mechanism 
    */
-  public Progressable getProgressible() {
-    return progress;
-  }
+  public Progressable getProgressible();
 }
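
A sketch of what the class-to-interface change means for callers (the caller code is hypothetical): anything that previously instantiated this class must now construct the implementation type, as JobInProgress does below with JobContextImpl.

    // Before this commit: JobContext ctx = new JobContext(conf, jobId);
    JobContext ctx = new JobContextImpl(conf, jobId); // after: build the impl
    JobConf jobConf = ctx.getJobConf();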

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JobEndNotifier.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JobEndNotifier.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JobEndNotifier.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JobEndNotifier.java Sat Nov 28 20:26:01 2009
@@ -100,8 +100,8 @@
     String uri = conf.getJobEndNotificationURI();
     if (uri != null) {
       // +1 to make logic for first notification identical to a retry
-      int retryAttempts = conf.getInt("job.end.retry.attempts", 0) + 1;
-      long retryInterval = conf.getInt("job.end.retry.interval", 30000);
+      int retryAttempts = conf.getInt(JobContext.END_NOTIFICATION_RETRIES, 0) + 1;
+      long retryInterval = conf.getInt(JobContext.END_NOTIFICATION_RETRIE_INTERVAL, 30000);
       if (uri.contains("$jobId")) {
         uri = uri.replace("$jobId", status.getJobID().toString());
       }
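
To illustrate the retry handling above, a hedged configuration sketch (the URL and counts are made up); the $jobId token is substituted as shown in the code.

    JobConf conf = new JobConf();
    conf.setJobEndNotificationURI("http://example.com:8080/notify?job=$jobId");
    // 3 retries -> 4 total attempts (the +1 above), 5000 ms apart; assumes the
    // JobContext constant names introduced by this commit.
    conf.setInt(JobContext.END_NOTIFICATION_RETRIES, 3);
    conf.setInt(JobContext.END_NOTIFICATION_RETRIE_INTERVAL, 5000);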

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JobInProgress.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JobInProgress.java Sat Nov 28 20:26:01 2009
@@ -18,13 +18,15 @@
 package org.apache.hadoop.mapred;
 
 import java.io.DataInputStream;
+import java.io.File;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
-import java.util.HashSet;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.IdentityHashMap;
 import java.util.Iterator;
 import java.util.LinkedHashSet;
@@ -41,10 +43,29 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.JobHistory.Values;
+import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.JobCounter;
 import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.JobStatus.State;
+import org.apache.hadoop.mapreduce.jobhistory.JobFinishedEvent;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistory;
+import org.apache.hadoop.mapreduce.jobhistory.JobInfoChangeEvent;
+import org.apache.hadoop.mapreduce.jobhistory.JobInitedEvent;
+import org.apache.hadoop.mapreduce.jobhistory.JobPriorityChangeEvent;
+import org.apache.hadoop.mapreduce.jobhistory.JobStatusChangedEvent;
+import org.apache.hadoop.mapreduce.jobhistory.JobSubmittedEvent;
+import org.apache.hadoop.mapreduce.jobhistory.JobUnsuccessfulCompletionEvent;
+import org.apache.hadoop.mapreduce.jobhistory.MapAttemptFinishedEvent;
+import org.apache.hadoop.mapreduce.jobhistory.ReduceAttemptFinishedEvent;
+import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptStartedEvent;
+import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptUnsuccessfulCompletionEvent;
+import org.apache.hadoop.mapreduce.jobhistory.TaskFailedEvent;
+import org.apache.hadoop.mapreduce.jobhistory.TaskFinishedEvent;
+import org.apache.hadoop.mapreduce.jobhistory.TaskStartedEvent;
+import org.apache.hadoop.mapreduce.security.JobTokens;
+import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
+import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
 import org.apache.hadoop.metrics.MetricsContext;
 import org.apache.hadoop.metrics.MetricsRecord;
 import org.apache.hadoop.metrics.MetricsUtil;
@@ -52,7 +73,7 @@
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.net.Node;
 import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
+import org.apache.hadoop.fs.FSDataOutputStream;
 
 /*************************************************************
  * JobInProgress maintains all the info for keeping
@@ -99,7 +120,7 @@
   int failedMapTasks = 0; 
   int failedReduceTasks = 0;
   
-  private static float DEFAULT_COMPLETED_MAPS_PERCENT_FOR_REDUCE_SLOWSTART = 0.05f;
+  static final float DEFAULT_COMPLETED_MAPS_PERCENT_FOR_REDUCE_SLOWSTART = 0.05f;
   int completedMapsForReduceSlowstart = 0;
   
   // runningMapTasks include speculative tasks, so we need to capture 
@@ -119,6 +140,8 @@
 
   JobPriority priority = JobPriority.NORMAL;
   protected JobTracker jobtracker;
+  
+  JobHistory jobHistory;
 
   // NetworkTopology Node to the set of TIPs
   Map<Node, List<TaskInProgress>> nonRunningMapCache;
@@ -151,7 +174,7 @@
    * {@link #findNewMapTask(TaskTrackerStatus, int, int, int, double)} should
    * schedule any available map tasks for this job, including speculative tasks.
    */
-  private int anyCacheLevel;
+  int anyCacheLevel;
   
   /**
    * A special value indicating that 
@@ -180,7 +203,7 @@
     new TreeMap<String, Integer>();
     
   //Confine estimation algorithms to an "oracle" class that JIP queries.
-  private ResourceEstimator resourceEstimator; 
+  ResourceEstimator resourceEstimator; 
   
   long startTime;
   long launchTime;
@@ -189,20 +212,20 @@
   // Indicates how many times the job got restarted
   private final int restartCount;
 
-  private JobConf conf;
+  JobConf conf;
   protected AtomicBoolean tasksInited = new AtomicBoolean(false);
   private JobInitKillStatus jobInitKillStatus = new JobInitKillStatus();
 
-  private LocalFileSystem localFs;
-  private FileSystem fs;
-  private JobID jobId;
+  LocalFileSystem localFs;
+  FileSystem fs;
+  JobID jobId;
   private boolean hasSpeculativeMaps;
   private boolean hasSpeculativeReduces;
-  private long inputLength = 0;
+  long inputLength = 0;
   
-  private Counters jobCounters = new Counters();
+  Counters jobCounters = new Counters();
   
-  private MetricsRecord jobMetrics;
+  MetricsRecord jobMetrics;
   
   // Maximum no. of fetch-failure notifications after which map task is killed
   private static final int MAX_FETCH_FAILURES_NOTIFICATIONS = 3;
@@ -219,9 +242,9 @@
   private Object schedulingInfo;
 
   //thresholds for speculative execution
-  private float slowTaskThreshold;
-  private float speculativeCap;
-  private float slowNodeThreshold; //standard deviations
+  float slowTaskThreshold;
+  float speculativeCap;
+  float slowNodeThreshold; //standard deviations
 
   //Statistics are maintained for a couple of things
   //mapTaskStats is used for maintaining statistics about
@@ -303,17 +326,26 @@
     this.status = new JobStatus(jobid, 0.0f, 0.0f, JobStatus.PREP, 
         this.profile.getUser(), this.profile.getJobName(), 
         this.profile.getJobFile(), "");
+    this.jobtracker.getInstrumentation().addPrepJob(conf, jobid);
     this.taskCompletionEvents = new ArrayList<TaskCompletionEvent>
     (numMapTasks + numReduceTasks + 10);
     
     this.slowTaskThreshold = Math.max(0.0f,
-        conf.getFloat("mapred.speculative.execution.slowTaskThreshold",1.0f));
+        conf.getFloat(JobContext.SPECULATIVE_SLOWTASK_THRESHOLD,1.0f));
     this.speculativeCap = conf.getFloat(
-        "mapred.speculative.execution.speculativeCap",0.1f);
+        JobContext.SPECULATIVECAP,0.1f);
     this.slowNodeThreshold = conf.getFloat(
-        "mapred.speculative.execution.slowNodeThreshold",1.0f);
+        JobContext.SPECULATIVE_SLOWNODE_THRESHOLD,1.0f);
     this.jobSetupCleanupNeeded = conf.getBoolean(
-        "mapred.committer.job.setup.cleanup.needed", true);
+        JobContext.SETUP_CLEANUP_NEEDED, true);
+    if (tracker != null) { // Some mock tests have null tracker
+      this.jobHistory = tracker.getJobHistory();
+    }
+  }
+  
+  JobInProgress() {
+    restartCount = 0;
+    jobSetupCleanupNeeded = false;
   }
   
   /**
@@ -321,18 +353,13 @@
    * to the tracker.
    */
   public JobInProgress(JobID jobid, JobTracker jobtracker, 
-                       JobConf default_conf) throws IOException {
-    this(jobid, jobtracker, default_conf, 0);
-  }
-  
-  public JobInProgress(JobID jobid, JobTracker jobtracker, 
                        JobConf default_conf, int rCount) throws IOException {
     this.restartCount = rCount;
     this.jobId = jobid;
     String url = "http://" + jobtracker.getJobTrackerMachine() + ":" 
         + jobtracker.getInfoPort() + "/jobdetails.jsp?jobid=" + jobid;
     this.jobtracker = jobtracker;
-    
+    this.jobHistory = jobtracker.getJobHistory();
     this.startTime = System.currentTimeMillis();
     
     this.localFs = jobtracker.getLocalFileSystem();
@@ -354,6 +381,7 @@
     this.status = new JobStatus(jobid, 0.0f, 0.0f, JobStatus.PREP, 
         profile.getUser(), profile.getJobName(), profile.getJobFile(), 
         profile.getURL().toString());
+    this.jobtracker.getInstrumentation().addPrepJob(conf, jobid);
     status.setStartTime(startTime);
     this.status.setJobPriority(this.priority);
 
@@ -367,7 +395,7 @@
     this.numReduceTasks = conf.getNumReduceTasks();
     this.taskCompletionEvents = new ArrayList<TaskCompletionEvent>
        (numMapTasks + numReduceTasks + 10);
-    JobContext jobContext = new JobContext(conf, jobId);
+    JobContext jobContext = new JobContextImpl(conf, jobId);
     this.jobSetupCleanupNeeded = jobContext.getJobSetupCleanupNeeded();
 
     this.mapFailuresPercent = conf.getMaxMapTaskFailuresPercent();
@@ -379,10 +407,8 @@
     this.jobMetrics.setTag("sessionId", conf.getSessionId());
     this.jobMetrics.setTag("jobName", conf.getJobName());
     this.jobMetrics.setTag("jobId", jobid.toString());
-    if (!hasRestarted()) { //This is temporary until we fix the restart model
-      hasSpeculativeMaps = conf.getMapSpeculativeExecution();
-      hasSpeculativeReduces = conf.getReduceSpeculativeExecution();
-    }
+    hasSpeculativeMaps = conf.getMapSpeculativeExecution();
+    hasSpeculativeReduces = conf.getReduceSpeculativeExecution();
     this.maxLevel = jobtracker.getNumTaskCacheLevels();
     this.anyCacheLevel = this.maxLevel+1;
     this.nonLocalMaps = new LinkedList<TaskInProgress>();
@@ -398,11 +424,11 @@
     this.nonRunningReduces = new LinkedList<TaskInProgress>();    
     this.runningReduces = new LinkedHashSet<TaskInProgress>();
     this.slowTaskThreshold = Math.max(0.0f,
-        conf.getFloat("mapred.speculative.execution.slowTaskThreshold",1.0f));
+        conf.getFloat(JobContext.SPECULATIVE_SLOWTASK_THRESHOLD,1.0f));
     this.speculativeCap = conf.getFloat(
-        "mapred.speculative.execution.speculativeCap",0.1f);
+        JobContext.SPECULATIVECAP,0.1f);
     this.slowNodeThreshold = conf.getFloat(
-        "mapred.speculative.execution.slowNodeThreshold",1.0f);
+        JobContext.SPECULATIVE_SLOWNODE_THRESHOLD,1.0f);
 
   }
 
@@ -447,7 +473,7 @@
   }
   
   Map<Node, List<TaskInProgress>> createCache(
-                         JobClient.RawSplit[] splits, int maxLevel) {
+                         Job.RawSplit[] splits, int maxLevel) {
     Map<Node, List<TaskInProgress>> cache = 
       new IdentityHashMap<Node, List<TaskInProgress>>(maxLevel);
     
@@ -491,10 +517,6 @@
   public boolean inited() {
     return tasksInited.get();
   }
-  
-  boolean hasRestarted() {
-    return restartCount > 0;
-  }
 
   /**
    * Get the number of slots required to run a single map task-attempt.
@@ -551,17 +573,22 @@
 
     LOG.info("Initializing " + jobId);
 
-    logToJobHistory();
+    logSubmissionToJobHistory();
     
     // log the job priority
     setPriority(this.priority);
     
     //
+    // generate security keys needed by Tasks
+    //
+    generateJobTokens(jobtracker.getSystemDirectoryForJob(jobId));
+    
+    //
     // read input splits and create a map per a split
     //
     String jobFile = profile.getJobFile();
 
-    JobClient.RawSplit[] splits = createSplits();
+    Job.RawSplit[] splits = createSplits();
     numMapTasks = splits.length;
 
     checkTaskLimits();
@@ -584,7 +611,7 @@
     // we should start scheduling reduces
     completedMapsForReduceSlowstart = 
       (int)Math.ceil(
-          (conf.getFloat("mapred.reduce.slowstart.completed.maps", 
+          (conf.getFloat(JobContext.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART, 
                          DEFAULT_COMPLETED_MAPS_PERCENT_FOR_REDUCE_SLOWSTART) * 
            numMapTasks));
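
A quick worked example of the slow-start math (job size hypothetical): at the default factor of 0.05, a 1,000-map job gives ceil(0.05 * 1000) = 50, so reduces are held back until 50 maps have finished; raising the factor delays them further.

    JobConf conf = new JobConf();
    // Wait for every map before scheduling reduces (constant name as used above).
    conf.setFloat(JobContext.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART, 1.0f);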
     
@@ -599,8 +626,13 @@
     }
     
     tasksInited.set(true);
-    JobHistory.JobInfo.logInited(profile.getJobID(), this.launchTime, 
-                                 numMapTasks, numReduceTasks);
+    JobInitedEvent jie = new JobInitedEvent(
+        profile.getJobID(),  this.launchTime,
+        numMapTasks, numReduceTasks,
+        JobStatus.getJobRunState(JobStatus.PREP));
+    
+    jobHistory.logEvent(jie, jobId);
+   
   }
 
   // Returns true if the job is empty (0 maps, 0 reduces and no setup-cleanup)
@@ -623,18 +655,26 @@
     setupComplete();
   }
 
-  void logToJobHistory() throws IOException {
+  void logSubmissionToJobHistory() throws IOException {
     // log job info
-    JobHistory.JobInfo.logSubmitted(getJobID(), conf, jobFile.toString(), 
-        this.startTime, hasRestarted());
+    String username = conf.getUser();
+    if (username == null) { username = ""; }
+    String jobname = conf.getJobName();
+    if (jobname == null) { jobname = ""; }
+    setUpLocalizedJobConf(conf, jobId);
+    jobHistory.setupEventWriter(jobId, conf);
+    JobSubmittedEvent jse = new JobSubmittedEvent(jobId, jobname, username,
+        this.startTime, jobFile.toString());
+    jobHistory.logEvent(jse, jobId);
+    
   }
 
-  JobClient.RawSplit[] createSplits() throws IOException {
+  Job.RawSplit[] createSplits() throws IOException {
     DataInputStream splitFile =
-      fs.open(new Path(conf.get("mapred.job.split.file")));
-    JobClient.RawSplit[] splits;
+      fs.open(new Path(conf.get(JobContext.SPLIT_FILE)));
+    Job.RawSplit[] splits;
     try {
-      splits = JobClient.readSplitFile(splitFile);
+      splits = Job.readSplitFile(splitFile);
     } finally {
       splitFile.close();
     }
@@ -655,7 +695,7 @@
     }
   }
 
-  synchronized void createMapTasks(String jobFile, JobClient.RawSplit[] splits) {
+  synchronized void createMapTasks(String jobFile, Job.RawSplit[] splits) {
     maps = new TaskInProgress[numMapTasks];
     for(int i=0; i < numMapTasks; ++i) {
       inputLength += splits[i].getDataLength();
@@ -690,7 +730,7 @@
 
     // cleanup map tip. This map doesn't use any splits. Just assign an empty
     // split.
-    JobClient.RawSplit emptySplit = new JobClient.RawSplit();
+    Job.RawSplit emptySplit = new Job.RawSplit();
     cleanup[0] = new TaskInProgress(jobId, jobFile, emptySplit, 
             jobtracker, conf, this, numMapTasks, 1);
     cleanup[0].setJobCleanupTask();
@@ -715,11 +755,14 @@
     setup[1].setJobSetupTask();
   }
   
-  private void setupComplete() {
+  void setupComplete() {
     status.setSetupProgress(1.0f);
     if (this.status.getRunState() == JobStatus.PREP) {
-      this.status.setRunState(JobStatus.RUNNING);
-      JobHistory.JobInfo.logStarted(profile.getJobID());
+      changeStateTo(JobStatus.RUNNING);
+      JobStatusChangedEvent jse = 
+        new JobStatusChangedEvent(profile.getJobID(),
+         JobStatus.getJobRunState(JobStatus.RUNNING));
+      jobHistory.logEvent(jse, profile.getJobID());
     }
   }
 
@@ -767,6 +810,7 @@
     return numReduceTasks - runningReduceTasks - failedReduceTIPs - 
     finishedReduceTasks + speculativeReduceTasks;
   }
+ 
   public synchronized int getNumSlotsPerTask(TaskType taskType) {
     if (taskType == TaskType.MAP) {
       return numSlotsPerMap;
@@ -787,7 +831,11 @@
       this.priority = priority;
       status.setJobPriority(priority);
       // log and change to the job's priority
-      JobHistory.JobInfo.logJobPriority(jobId, priority);
+      JobPriorityChangeEvent prEvent = 
+        new JobPriorityChangeEvent(jobId, priority);
+       
+      jobHistory.logEvent(prEvent, jobId);
+      
     }
   }
 
@@ -796,7 +844,11 @@
     // log and change to the job's start/launch time
     this.startTime = startTime;
     this.launchTime = launchTime;
-    JobHistory.JobInfo.logJobInfo(jobId, startTime, launchTime);
+    JobInfoChangeEvent event = 
+      new JobInfoChangeEvent(jobId, startTime, launchTime);
+     
+    jobHistory.logEvent(event, jobId);
+    
   }
 
   /**
@@ -1303,6 +1355,16 @@
       Task result = tip.getTaskToRun(tts.getTrackerName());
       if (result != null) {
         addRunningTaskToTIP(tip, result.getTaskID(), tts, true);
+        if (jobFailed) {
+          result.setJobCleanupTaskState(org.apache.hadoop.mapreduce.JobStatus
+                .State.FAILED);
+        } else if (jobKilled) {
+          result.setJobCleanupTaskState(org.apache.hadoop.mapreduce.JobStatus
+                .State.KILLED);
+        } else {
+          result.setJobCleanupTaskState(org.apache.hadoop.mapreduce
+                .JobStatus.State.SUCCEEDED);
+        }
       }
       return result;
     }
@@ -1476,18 +1538,18 @@
     final JobTrackerInstrumentation metrics = jobtracker.getInstrumentation();
 
     // keeping the earlier ordering intact
-    String name;
+    TaskType name;
     String splits = "";
     Enum counter = null;
     if (tip.isJobSetupTask()) {
       launchedSetup = true;
-      name = Values.SETUP.name();
+      name = TaskType.JOB_SETUP;
     } else if (tip.isJobCleanupTask()) {
       launchedCleanup = true;
-      name = Values.CLEANUP.name();
+      name = TaskType.JOB_CLEANUP;
     } else if (tip.isMapTask()) {
       ++runningMapTasks;
-      name = Values.MAP.name();
+      name = TaskType.MAP;
       counter = JobCounter.TOTAL_LAUNCHED_MAPS;
       splits = tip.getSplitNodes();
       if (tip.isSpeculating()) {
@@ -1498,7 +1560,7 @@
       metrics.launchMap(id);
     } else {
       ++runningReduceTasks;
-      name = Values.REDUCE.name();
+      name = TaskType.REDUCE;
       counter = JobCounter.TOTAL_LAUNCHED_REDUCES;
       if (tip.isSpeculating()) {
         speculativeReduceTasks++;
@@ -1510,8 +1572,12 @@
     // Note that the logs are for the scheduled tasks only. Tasks that join on 
     // restart has already their logs in place.
     if (tip.isFirstAttempt(id)) {
-      JobHistory.Task.logStarted(tip.getTIPId(), name,
-                                 tip.getExecStartTime(), splits);
+      TaskStartedEvent tse = new TaskStartedEvent(tip.getTIPId(), 
+          tip.getExecStartTime(),
+          name, splits);
+      
+      jobHistory.logEvent(tse, tip.getJob().jobId);
+      
     }
     if (!tip.isJobSetupTask() && !tip.isJobCleanupTask()) {
       jobCounters.incrCounter(counter, 1);
@@ -1551,7 +1617,7 @@
     }
   }
     
-  static String convertTrackerNameToHostName(String trackerName) {
+  public static String convertTrackerNameToHostName(String trackerName) {
     // Ugly!
    // Convert the trackerName to its host name
     int indexOfColon = trackerName.indexOf(":");
@@ -1605,8 +1671,10 @@
     long now = System.currentTimeMillis();
     
     FallowSlotInfo info = map.get(taskTracker);
+    int reservedSlots = 0;
     if (info == null) {
       info = new FallowSlotInfo(now, numSlots);
+      reservedSlots = numSlots;
     } else {
       // Increment metering info if the reservation is changing
       if (info.getNumSlots() != numSlots) {
@@ -1618,11 +1686,19 @@
         jobCounters.incrCounter(counter, fallowSlotMillis);
         
         // Update the reservation timestamp and slot count
+        reservedSlots = numSlots - info.getNumSlots();
         info.setTimestamp(now);
         info.setNumSlots(numSlots);
       }
     }
     map.put(taskTracker, info);
+    if (type == TaskType.MAP) {
+      jobtracker.getInstrumentation().addReservedMapSlots(reservedSlots);
+    }
+    else {
+      jobtracker.getInstrumentation().addReservedReduceSlots(reservedSlots);
+    }
+    jobtracker.incrementReservations(type, reservedSlots);
   }
   
   public synchronized void unreserveTaskTracker(TaskTracker taskTracker,
@@ -1648,6 +1724,14 @@
     jobCounters.incrCounter(counter, fallowSlotMillis);
 
     map.remove(taskTracker);
+    if (type == TaskType.MAP) {
+      jobtracker.getInstrumentation().decReservedMapSlots(info.getNumSlots());
+    }
+    else {
+      jobtracker.getInstrumentation().decReservedReduceSlots(
+        info.getNumSlots());
+    }
+    jobtracker.decrementReservations(type, info.getNumSlots());
   }
   
   public int getNumReservedTaskTrackersForMaps() {
@@ -1981,7 +2065,9 @@
     String taskTrackerName = tts.getTrackerName();
     String taskTrackerHost = tts.getHost();
     if (numMapTasks == 0) {
-      LOG.info("No maps to schedule for " + profile.getJobID());
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("No maps to schedule for " + profile.getJobID());
+      }
       return -1;
     }
 
@@ -2171,7 +2257,9 @@
     String taskTrackerName = tts.getTrackerName();
     String taskTrackerHost = tts.getHost();
     if (numReduceTasks == 0) {
-      LOG.info("No reduces to schedule for " + profile.getJobID());
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("No reduces to schedule for " + profile.getJobID());
+      }
       return -1;
     }
     TaskInProgress tip = null;
@@ -2457,35 +2545,45 @@
     TaskTrackerStatus ttStatus = 
       this.jobtracker.getTaskTrackerStatus(status.getTaskTracker());
     String trackerHostname = jobtracker.getNode(ttStatus.getHost()).toString();
-    String taskType = getTaskType(tip);
+    TaskType taskType = getTaskType(tip);
+
+    TaskAttemptStartedEvent tse = new TaskAttemptStartedEvent(
+        status.getTaskID(), taskType, status.getStartTime(), 
+        status.getTaskTracker(),  ttStatus.getHttpPort());
+    
+    jobHistory.logEvent(tse, status.getTaskID().getJobID());
+    
+
     if (status.getIsMap()){
-      JobHistory.MapAttempt.logStarted(status.getTaskID(), status.getStartTime(), 
-                                       status.getTaskTracker(), 
-                                       ttStatus.getHttpPort(), 
-                                       taskType); 
-      JobHistory.MapAttempt.logFinished(status.getTaskID(),
-                                        status.getMapFinishTime(),
-                                        status.getFinishTime(), 
-                                        trackerHostname, taskType,
-                                        status.getStateString(), 
-                                        status.getCounters()); 
+      MapAttemptFinishedEvent mfe = new MapAttemptFinishedEvent(
+          status.getTaskID(), taskType, TaskStatus.State.SUCCEEDED.toString(),
+          status.getMapFinishTime(),
+          status.getFinishTime(),  trackerHostname,
+          status.getStateString(), 
+          new org.apache.hadoop.mapreduce.Counters(status.getCounters()));
+      
+      jobHistory.logEvent(mfe,  status.getTaskID().getJobID());
+      
     }else{
-      JobHistory.ReduceAttempt.logStarted( status.getTaskID(), status.getStartTime(), 
-                                          status.getTaskTracker(),
-                                          ttStatus.getHttpPort(), 
-                                          taskType); 
-      JobHistory.ReduceAttempt.logFinished(status.getTaskID(), status.getShuffleFinishTime(),
-                                           status.getSortFinishTime(), status.getFinishTime(), 
-                                           trackerHostname, 
-                                           taskType,
-                                           status.getStateString(), 
-                                           status.getCounters()); 
-    }
-    JobHistory.Task.logFinished(tip.getTIPId(), 
-                                taskType,
-                                tip.getExecFinishTime(),
-                                status.getCounters()); 
-        
+      ReduceAttemptFinishedEvent rfe = new ReduceAttemptFinishedEvent(
+          status.getTaskID(), taskType, TaskStatus.State.SUCCEEDED.toString(), 
+          status.getShuffleFinishTime(),
+          status.getSortFinishTime(), status.getFinishTime(),
+          trackerHostname, status.getStateString(),
+          new org.apache.hadoop.mapreduce.Counters(status.getCounters()));
+      
+      jobHistory.logEvent(rfe,  status.getTaskID().getJobID());
+      
+    }
+
+    TaskFinishedEvent tfe = new TaskFinishedEvent(tip.getTIPId(),
+        tip.getExecFinishTime(), taskType, 
+        TaskStatus.State.SUCCEEDED.toString(),
+        new org.apache.hadoop.mapreduce.Counters(status.getCounters()));
+    
+    jobHistory.logEvent(tfe, tip.getJob().getJobID());
+    
+   
     if (tip.isJobSetupTask()) {
       // setup task has finished. kill the extra setup tip
       killSetupTip(!tip.isMapTask());
@@ -2591,7 +2689,32 @@
   public float getSlowTaskThreshold() {
     return slowTaskThreshold;
   }
-  
+
+  /**
+   * Job state changes must happen through this call.
+   */
+  private void changeStateTo(int newState) {
+    int oldState = this.status.getRunState();
+    if (oldState == newState) {
+      return; // old and new states are the same
+    }
+    this.status.setRunState(newState);
+    
+    //update the metrics
+    if (oldState == JobStatus.PREP) {
+      this.jobtracker.getInstrumentation().decPrepJob(conf, jobId);
+    } else if (oldState == JobStatus.RUNNING) {
+      this.jobtracker.getInstrumentation().decRunningJob(conf, jobId);
+    }
+    
+    if (newState == JobStatus.PREP) {
+      this.jobtracker.getInstrumentation().addPrepJob(conf, jobId);
+    } else if (newState == JobStatus.RUNNING) {
+      this.jobtracker.getInstrumentation().addRunningJob(conf, jobId);
+    }
+    
+  }
+
   /**
    * The job is done since all its component tasks are either
    * successful or have failed.
@@ -2603,7 +2726,7 @@
     //
     if (this.status.getRunState() == JobStatus.RUNNING ||
         this.status.getRunState() == JobStatus.PREP) {
-      this.status.setRunState(JobStatus.SUCCEEDED);
+      changeStateTo(JobStatus.SUCCEEDED);
       this.status.setCleanupProgress(1.0f);
       if (maps.length == 0) {
         this.status.setMapProgress(1.0f);
@@ -2621,10 +2744,18 @@
       JobSummary.logJobSummary(this, jobtracker.getClusterStatus(false));
 
       // Log job-history
-      JobHistory.JobInfo.logFinished(this.status.getJobID(), finishTime, 
-                                     this.finishedMapTasks, 
-                                     this.finishedReduceTasks, failedMapTasks, 
-                                     failedReduceTasks, getCounters());
+      JobFinishedEvent jfe = 
+        new JobFinishedEvent(this.status.getJobID(),
+          this.finishTime,
+          this.finishedMapTasks, this.finishedReduceTasks, failedMapTasks,
+          failedReduceTasks, 
+          new org.apache.hadoop.mapreduce.Counters(getMapCounters()),
+          new org.apache.hadoop.mapreduce.Counters(getReduceCounters()),
+          new org.apache.hadoop.mapreduce.Counters(getCounters()));
+      
+      jobHistory.logEvent(jfe, this.status.getJobID());
+      jobHistory.closeWriter(this.status.getJobID());
+
       // Note that finalize will close the job history handles which garbage collect
       // might try to finalize
       garbageCollect();
@@ -2644,30 +2775,34 @@
       this.status.setFinishTime(this.finishTime);
 
       if (jobTerminationState == JobStatus.FAILED) {
-        this.status.setRunState(JobStatus.FAILED);
-        
-        // Log the job summary
-        JobSummary.logJobSummary(this, jobtracker.getClusterStatus(false));
-
-        // Log to job-history
-        JobHistory.JobInfo.logFailed(this.status.getJobID(), finishTime, 
-                                     this.finishedMapTasks, 
-                                     this.finishedReduceTasks);
+        changeStateTo(JobStatus.FAILED);
       } else {
-        this.status.setRunState(JobStatus.KILLED);
+        changeStateTo(JobStatus.KILLED);
+      }
+      // Log the job summary
+      JobSummary.logJobSummary(this, jobtracker.getClusterStatus(false));
 
-        // Log the job summary
-        JobSummary.logJobSummary(this, jobtracker.getClusterStatus(false));
+      JobUnsuccessfulCompletionEvent failedEvent = 
+        new JobUnsuccessfulCompletionEvent(this.status.getJobID(),
+            finishTime,
+            this.finishedMapTasks, 
+            this.finishedReduceTasks,
+            JobStatus.getJobRunState(jobTerminationState));
+      
+      jobHistory.logEvent(failedEvent, this.status.getJobID());
+      jobHistory.closeWriter(this.status.getJobID());
 
-        // Log to job-history
-        JobHistory.JobInfo.logKilled(this.status.getJobID(), finishTime, 
-                                     this.finishedMapTasks, 
-                                     this.finishedReduceTasks);
-      }
       garbageCollect();
-      
+
       jobtracker.getInstrumentation().terminateJob(
           this.conf, this.status.getJobID());
+      if (jobTerminationState == JobStatus.FAILED) {
+        jobtracker.getInstrumentation().failedJob(
+            this.conf, this.status.getJobID());
+      } else {
+        jobtracker.getInstrumentation().killedJob(
+            this.conf, this.status.getJobID());
+      }
     }
   }
 
@@ -2893,28 +3028,18 @@
     List<String> taskDiagnosticInfo = tip.getDiagnosticInfo(taskid);
     String diagInfo = taskDiagnosticInfo == null ? "" :
       StringUtils.arrayToString(taskDiagnosticInfo.toArray(new String[0]));
-    String taskType = getTaskType(tip);
-    if (taskStatus.getIsMap()) {
-      JobHistory.MapAttempt.logStarted(taskid, startTime, 
-        taskTrackerName, taskTrackerPort, taskType);
-      if (taskStatus.getRunState() == TaskStatus.State.FAILED) {
-        JobHistory.MapAttempt.logFailed(taskid, finishTime,
-          taskTrackerHostName, diagInfo, taskType);
-      } else {
-        JobHistory.MapAttempt.logKilled(taskid, finishTime,
-          taskTrackerHostName, diagInfo, taskType);
-      }
-    } else {
-      JobHistory.ReduceAttempt.logStarted(taskid, startTime, 
-        taskTrackerName, taskTrackerPort, taskType);
-      if (taskStatus.getRunState() == TaskStatus.State.FAILED) {
-        JobHistory.ReduceAttempt.logFailed(taskid, finishTime,
-          taskTrackerHostName, diagInfo, taskType);
-      } else {
-        JobHistory.ReduceAttempt.logKilled(taskid, finishTime,
-          taskTrackerHostName, diagInfo, taskType);
-      }
-    }
+    TaskType taskType = getTaskType(tip);
+    TaskAttemptStartedEvent tse = new TaskAttemptStartedEvent(
+        taskid, taskType, startTime, taskTrackerName, taskTrackerPort);
+    
+    jobHistory.logEvent(tse, taskid.getJobID());
+   
+    TaskAttemptUnsuccessfulCompletionEvent tue = 
+      new TaskAttemptUnsuccessfulCompletionEvent(taskid, 
+          taskType, taskStatus.getRunState().toString(),
+          finishTime, 
+          taskTrackerHostName, diagInfo);
+    jobHistory.logEvent(tue, taskid.getJobID());
         
     // After this, try to assign tasks with the one after this, so that
     // the failed task goes to the end of the list.
@@ -2955,10 +3080,13 @@
       
       if (killJob) {
         LOG.info("Aborting job " + profile.getJobID());
-        JobHistory.Task.logFailed(tip.getTIPId(), 
-                                  taskType,  
-                                  finishTime, 
-                                  diagInfo);
+        TaskFailedEvent tfe = 
+          new TaskFailedEvent(tip.getTIPId(), finishTime, taskType, diagInfo,
+              TaskStatus.State.FAILED.toString(),
+              null);
+        
+        jobHistory.logEvent(tfe, tip.getJob().getJobID());
+        
         if (tip.isJobCleanupTask()) {
           // kill the other tip
           if (tip.isMapTask()) {
@@ -3042,9 +3170,14 @@
     updateTaskStatus(tip, status);
     boolean isComplete = tip.isComplete();
     if (wasComplete && !isComplete) { // mark a successful tip as failed
-      String taskType = getTaskType(tip);
-      JobHistory.Task.logFailed(tip.getTIPId(), taskType, 
-                                tip.getExecFinishTime(), reason, taskid);
+      TaskType taskType = getTaskType(tip);
+      TaskFailedEvent tfe = 
+        new TaskFailedEvent(tip.getTIPId(), tip.getExecFinishTime(), taskType,
+            reason, TaskStatus.State.FAILED.toString(),
+            taskid);
+      
+      jobHistory.logEvent(tfe, tip.getJob().getJobID());
+      
     }
   }
        
@@ -3232,15 +3365,15 @@
   /**
    * Get the task type for logging it to {@link JobHistory}.
    */
-  private String getTaskType(TaskInProgress tip) {
+  private TaskType getTaskType(TaskInProgress tip) {
     if (tip.isJobCleanupTask()) {
-      return Values.CLEANUP.name();
+      return TaskType.JOB_CLEANUP;
     } else if (tip.isJobSetupTask()) {
-      return Values.SETUP.name();
+      return TaskType.JOB_SETUP;
     } else if (tip.isMapTask()) {
-      return Values.MAP.name();
+      return TaskType.MAP;
     } else {
-      return Values.REDUCE.name();
+      return TaskType.REDUCE;
     }
   }
   
@@ -3337,4 +3470,73 @@
       );
     }
   }
+  
+  /**
+   * Creates the localized copy of the job conf.
+   * @param jobConf the job configuration to localize
+   * @param id the id of the job the configuration belongs to
+   */
+  void setUpLocalizedJobConf(JobConf jobConf, 
+      org.apache.hadoop.mapreduce.JobID id) {
+    String localJobFilePath = jobtracker.getLocalJobFilePath(id); 
+    File localJobFile = new File(localJobFilePath);
+    FileOutputStream jobOut = null;
+    try {
+      jobOut = new FileOutputStream(localJobFile);
+      jobConf.writeXml(jobOut);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Job conf for " + id + " stored at " 
+            + localJobFile.getAbsolutePath());
+      }
+    } catch (IOException ioe) {
+      LOG.error("Failed to store job conf on the local filesystem ", ioe);
+    } finally {
+      if (jobOut != null) {
+        try {
+          jobOut.close();
+        } catch (IOException ie) {
+          LOG.info("Failed to close the job configuration file " 
+              + StringUtils.stringifyException(ie));
+        }
+      }
+    }
+  }
+
+  /**
+   * Deletes the localized copy of the job conf.
+   */
+  void cleanupLocalizedJobConf(org.apache.hadoop.mapreduce.JobID id) {
+    String localJobFilePath = jobtracker.getLocalJobFilePath(id);
+    File f = new File (localJobFilePath);
+    LOG.info("Deleting localized job conf at " + f);
+    if (!f.delete()) {
+      LOG.debug("Failed to delete file " + f);
+    }
+  }
+  
+  /**
+   * Generates the job keys and saves them into the job-token file.
+   * @param jobDir the directory in which the token file is created
+   * @throws IOException
+   */
+  private void generateJobTokens(Path jobDir) throws IOException{
+    Path keysFile = new Path(jobDir, JobTokens.JOB_TOKEN_FILENAME);
+    FSDataOutputStream os = fs.create(keysFile);
+    //create JobTokens file and add key to it
+    JobTokens jt = new JobTokens();
+    byte [] key;
+    try {
+      // new key
+      key = SecureShuffleUtils.getNewEncodedKey();
+    } catch (java.security.GeneralSecurityException e) {
+      throw new IOException(e);
+    }
+    // remember the key 
+    jt.setShuffleJobToken(key);
+    // other keys..
+    jt.write(os);
+    os.close();
+    LOG.debug("jobTokens generated and stored in "+ keysFile.toUri().getPath());
+  }
+
 }

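The JobInProgress changes above replace the old static JobHistory.* logging calls with typed history events routed through jobHistory.logEvent(), and funnel every job-state transition through the new changeStateTo() so the prep/running job gauges can never drift from the actual run state. A minimal, self-contained sketch of that second pattern follows; the Metrics interface and JobStateTracker class are hypothetical stand-ins for JobTrackerInstrumentation and JobInProgress, not the committed code:

    // Model of the changeStateTo() pattern: every transition goes through
    // one choke point so that gauge-style metrics stay consistent.
    public class JobStateTracker {
      public enum State { PREP, RUNNING, SUCCEEDED, FAILED, KILLED }

      public interface Metrics {       // stand-in for JobTrackerInstrumentation
        void incr(State s);            // gauge++ for the state being entered
        void decr(State s);            // gauge-- for the state being left
      }

      private final Metrics metrics;
      private State state = State.PREP;

      public JobStateTracker(Metrics metrics) {
        this.metrics = metrics;
        metrics.incr(State.PREP);      // a new job starts out in PREP
      }

      /** Every transition funnels through here, mirroring changeStateTo(). */
      public synchronized void changeStateTo(State newState) {
        if (state == newState) {
          return;                      // no-op transitions must not touch gauges
        }
        if (state == State.PREP || state == State.RUNNING) {
          metrics.decr(state);         // leave the old metered state
        }
        if (newState == State.PREP || newState == State.RUNNING) {
          metrics.incr(newState);      // enter the new metered state
        }
        state = newState;
      }
    }

Centralizing the transition means callers such as the job-completion and termination paths cannot forget to adjust the gauges, which is why the patch rewrites the direct this.status.setRunState(...) calls as changeStateTo(...).
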
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JobPriority.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JobPriority.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JobPriority.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JobPriority.java Sat Nov 28 20:26:01 2009
@@ -19,8 +19,9 @@
 
 /**
  * Used to describe the priority of the running job. 
- *
+ * @deprecated Use {@link org.apache.hadoop.mapreduce.JobPriority} instead
  */
+@Deprecated
 public enum JobPriority {
 
   VERY_HIGH,

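With mapred.JobPriority now deprecated in favor of org.apache.hadoop.mapreduce.JobPriority, callers can bridge between the two by constant name, assuming (as the stand-in enums below do) that both declare the same constants. A hedged sketch, not part of the commit:

    // OldJobPriority/NewJobPriority are simplified stand-ins for the
    // deprecated and replacement enums; the name-based bridge is safe
    // only while the constant names match.
    enum OldJobPriority { VERY_HIGH, HIGH, NORMAL, LOW, VERY_LOW }
    enum NewJobPriority { VERY_HIGH, HIGH, NORMAL, LOW, VERY_LOW }

    public class PriorityBridge {
      /** Convert by constant name; throws if the target enum lacks it. */
      static NewJobPriority upgrade(OldJobPriority p) {
        return NewJobPriority.valueOf(p.name());
      }

      public static void main(String[] args) {
        System.out.println(upgrade(OldJobPriority.VERY_HIGH)); // VERY_HIGH
      }
    }
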
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JobQueueClient.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JobQueueClient.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JobQueueClient.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JobQueueClient.java Sat Nov 28 20:26:01 2009
@@ -18,41 +18,46 @@
 package org.apache.hadoop.mapred;
 
 import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.Writer;
+import java.util.List;
+import java.util.ArrayList;
 
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
+
 /**
- * <code>JobQueueClient</code> is interface provided to the user in order
- * to get JobQueue related information from the {@link JobTracker}
+ * <code>JobQueueClient</code> is an interface provided to the user to get
+ * JobQueue-related information from the {@link JobTracker}.
  * 
- * It provides the facility to list the JobQueues present and ability to 
- * view the list of jobs within a specific JobQueue 
+ * It provides the facility to list the JobQueues present and the ability to
+ * view the list of jobs within a specific JobQueue.
  * 
-**/
+ **/
+
+class JobQueueClient extends Configured implements Tool {
 
-class JobQueueClient extends Configured implements  Tool {
-  
   JobClient jc;
-  
+
   public JobQueueClient() {
   }
-  
+
   public JobQueueClient(JobConf conf) throws IOException {
     setConf(conf);
   }
-  
+
   private void init(JobConf conf) throws IOException {
     setConf(conf);
     jc = new JobClient(conf);
   }
-  
+
   @Override
   public int run(String[] argv) throws Exception {
     int exitcode = -1;
-    
-    if(argv.length < 1){
+
+    if (argv.length < 1) {
       displayUsage("");
       return exitcode;
     }
@@ -61,102 +66,130 @@
     boolean displayQueueInfoWithJobs = false;
     boolean displayQueueInfoWithoutJobs = false;
     boolean displayQueueAclsInfoForCurrentUser = false;
-    
-    if("-list".equals(cmd)){
+
+    if ("-list".equals(cmd)) {
       displayQueueList = true;
-    }else if("-showacls".equals(cmd)) {
+    } else if ("-showacls".equals(cmd)) {
       displayQueueAclsInfoForCurrentUser = true;
-    }else if("-info".equals(cmd)){
-      if(argv.length == 2 && !(argv[1].equals("-showJobs"))) {
+    } else if ("-info".equals(cmd)) {
+      if (argv.length == 2 && !(argv[1].equals("-showJobs"))) {
         displayQueueInfoWithoutJobs = true;
-      } else if(argv.length == 3){
-        if(argv[2].equals("-showJobs")){
+      } else if (argv.length == 3) {
+        if (argv[2].equals("-showJobs")) {
           displayQueueInfoWithJobs = true;
-        }else {
+        } else {
           displayUsage(cmd);
           return exitcode;
         }
-      }else {
+      } else {
         displayUsage(cmd);
         return exitcode;
-      }      
+      }
     } else {
       displayUsage(cmd);
       return exitcode;
     }
+    
     JobConf conf = new JobConf(getConf());
     init(conf);
     if (displayQueueList) {
       displayQueueList();
       exitcode = 0;
-    } else if (displayQueueInfoWithoutJobs){
-      displayQueueInfo(argv[1],false);
+    } else if (displayQueueInfoWithoutJobs) {
+      displayQueueInfo(argv[1], false);
       exitcode = 0;
     } else if (displayQueueInfoWithJobs) {
-      displayQueueInfo(argv[1],true);
+      displayQueueInfo(argv[1], true);
       exitcode = 0;
-    }else if (displayQueueAclsInfoForCurrentUser) {
+    } else if (displayQueueAclsInfoForCurrentUser) {
       this.displayQueueAclsInfoForCurrentUser();
       exitcode = 0;
     }
-    
     return exitcode;
   }
-  
-  /**
-   * Method used to display information pertaining to a Single JobQueue 
-   * registered with the {@link QueueManager}. Display of the Jobs is 
-   * determine by the boolean 
-   * 
-   * @throws IOException
-   */
 
-  private void displayQueueInfo(String queue, boolean showJobs) throws IOException {
-    JobQueueInfo jobQueueInfo = jc.getQueueInfo(queue);
+  // format and print information about the passed in job queue.
+  void printJobQueueInfo(JobQueueInfo jobQueueInfo, Writer writer)
+      throws IOException {
     if (jobQueueInfo == null) {
-      System.out.printf("Queue Name : %s has no scheduling information \n", queue);
-    } else {
-      printJobQueueInfo(jobQueueInfo);
+      writer.write("No queue found.\n");
+      writer.flush();
+      return;
+    }
+    writer.write(String.format("Queue Name : %s \n",
+        jobQueueInfo.getQueueName()));
+    writer.write(String.format("Queue State : %s \n",
+        jobQueueInfo.getQueueState()));
+    writer.write(String.format("Scheduling Info : %s \n",
+        jobQueueInfo.getSchedulingInfo()));
+    List<JobQueueInfo> childQueues = jobQueueInfo.getChildren();
+    if (childQueues != null && childQueues.size() > 0) {
+      writer.write(String.format("Child Queues : "));
+      for (int i = 0; i < childQueues.size(); i++) {
+        JobQueueInfo childQueue = childQueues.get(i);
+        writer.write(String.format("%s", childQueue.getQueueName()));
+        if (i != childQueues.size() - 1) {
+          writer.write(String.format(", "));
+        }
+      }
+      writer.write("\n");
     }
-    if (showJobs) {
-      System.out.printf("Job List\n");
-      JobStatus[] jobs = jc.getJobsFromQueue(queue);
-      if (jobs == null)
-        jobs = new JobStatus[0];
-      jc.displayJobList(jobs);
+    writer.write(String.format("======================\n"));
+    writer.flush();
+  }
+  
+  private void displayQueueList() throws IOException {
+    JobQueueInfo[] rootQueues = jc.getRootQueues();
+    List<JobQueueInfo> allQueues = expandQueueList(rootQueues);
+    for (JobQueueInfo queue : allQueues) {
+      printJobQueueInfo(queue, new PrintWriter(System.out));
     }
   }
-
-  // format and print information about the passed in job queue.
-  private void printJobQueueInfo(JobQueueInfo jobQueueInfo) {
-    System.out.printf("Queue Name : %s \n", jobQueueInfo.getQueueName()); 
-    System.out.printf("Queue State : %s \n", jobQueueInfo.getQueueState());
-    System.out.printf("Scheduling Info : %s \n",jobQueueInfo.getSchedulingInfo());
+  
+  /**
+   * Expands the hierarchy of queues and gives the list of all queues in 
+   * depth-first order
+   * @param rootQueues the top-level queues
+   * @return the list of all the queues in depth-first order.
+   */
+  List<JobQueueInfo> expandQueueList(JobQueueInfo[] rootQueues) {
+    List<JobQueueInfo> allQueues = new ArrayList<JobQueueInfo>();
+    for (JobQueueInfo queue : rootQueues) {
+      allQueues.add(queue);
+      if (queue.getChildren() != null) {
+        JobQueueInfo[] childQueues 
+          = queue.getChildren().toArray(new JobQueueInfo[0]);
+        allQueues.addAll(expandQueueList(childQueues));
+      }
+    }
+    return allQueues;
   }
-
+ 
   /**
-   * Method used to display the list of the JobQueues registered
-   * with the {@link QueueManager}
+   * Method used to display information pertaining to a single JobQueue
+   * registered with the {@link QueueManager}. Display of the jobs is
+   * determined by the boolean flag passed in.
    * 
    * @throws IOException
    */
-  private void displayQueueList() throws IOException {
-    JobQueueInfo[] queues = jc.getQueues();
-    for (JobQueueInfo queue : queues) {
-      String schedInfo = queue.getSchedulingInfo();
-      if(schedInfo.trim().equals("")){
-        schedInfo = "N/A";
-      }
-      printJobQueueInfo(queue);
+  private void displayQueueInfo(String queue, boolean showJobs)
+      throws IOException {
+    JobQueueInfo jobQueueInfo = jc.getQueueInfo(queue);
+    printJobQueueInfo(jobQueueInfo, new PrintWriter(System.out));
+    if (showJobs && jobQueueInfo != null &&
+        (jobQueueInfo.getChildren() == null ||
+         jobQueueInfo.getChildren().size() == 0)) {
+      JobStatus[] jobs = jc.getJobsFromQueue(queue);
+      if (jobs == null)
+        jobs = new JobStatus[0];
+      jc.displayJobList(jobs);
     }
   }
-
+   
   private void displayQueueAclsInfoForCurrentUser() throws IOException {
     QueueAclsInfo[] queueAclsInfoList = jc.getQueueAclsForCurrentUser();
     UserGroupInformation ugi = UserGroupInformation.readFrom(getConf());
     if (queueAclsInfoList.length > 0) {
-      System.out.println("Queue acls for user :  "
-              + ugi.getUserName());
+      System.out.println("Queue acls for user :  " + ugi.getUserName());
       System.out.println("\nQueue  Operations");
       System.out.println("=====================");
       for (QueueAclsInfo queueInfo : queueAclsInfoList) {
@@ -172,17 +205,16 @@
         System.out.println();
       }
     } else {
-      System.out.println("User " +
-              ugi.getUserName() +
-              " does not have access to any queue. \n");
+      System.out.println("User " + ugi.getUserName()
+          + " does not have access to any queue. \n");
     }
   }
-  
+
   private void displayUsage(String cmd) {
     String prefix = "Usage: JobQueueClient ";
-    if ("-queueinfo".equals(cmd)){
+    if ("-queueinfo".equals(cmd)) {
       System.err.println(prefix + "[" + cmd + " <job-queue-name> [-showJobs]]");
-    }else {
+    } else {
       System.err.printf(prefix + "<command> <args>\n");
       System.err.printf("\t[-list]\n");
       System.err.printf("\t[-info <job-queue-name> [-showJobs]]\n");
@@ -195,5 +227,5 @@
     int res = ToolRunner.run(new JobQueueClient(), argv);
     System.exit(res);
   }
-  
+
 }

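The reworked JobQueueClient walks the new hierarchical queue model: getRootQueues() returns the top level and expandQueueList() flattens the tree depth-first, each parent before its children, so -list prints a stable pre-order listing; printJobQueueInfo() now takes a Writer instead of writing straight to System.out, which makes the formatting unit-testable. A self-contained sketch of the flattening step, using a hypothetical Node type in place of JobQueueInfo:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    // Model of expandQueueList(): flatten a queue hierarchy depth-first,
    // emitting each parent before its children.
    public class QueueFlattener {
      static class Node {                       // stand-in for JobQueueInfo
        final String name;
        final List<Node> children = new ArrayList<Node>();
        Node(String name) { this.name = name; }
      }

      static List<Node> expand(List<Node> roots) {
        List<Node> all = new ArrayList<Node>();
        for (Node q : roots) {
          all.add(q);                           // parent first ...
          all.addAll(expand(q.children));       // ... then its subtree
        }
        return all;
      }

      public static void main(String[] args) {
        Node root = new Node("default");
        Node a = new Node("default.a");
        root.children.add(a);
        a.children.add(new Node("default.a.x"));
        for (Node n : expand(Arrays.asList(root))) {
          System.out.println(n.name);           // default, default.a, default.a.x
        }
      }
    }

Passing Arrays.asList(root) mirrors how displayQueueList() feeds the root queues into the recursion.
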
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JobQueueInfo.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JobQueueInfo.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JobQueueInfo.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JobQueueInfo.java Sat Nov 28 20:26:01 2009
@@ -17,35 +17,29 @@
  */
 package org.apache.hadoop.mapred;
 
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
 
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.QueueInfo;
+import org.apache.hadoop.mapreduce.QueueState;
 
 /**
  * Class that contains the information regarding the Job Queues which are 
  * maintained by the Hadoop Map/Reduce framework.
- * 
+ * @deprecated Use {@link QueueInfo} instead
  */
+@Deprecated
+public class JobQueueInfo extends QueueInfo {
 
-public class JobQueueInfo implements Writable {
-
-  private String queueName = "";
-  //The scheduling Information object is read back as String.
-  //Once the scheduling information is set there is no way to recover it.
-  private String schedulingInfo; 
-  
-  private String queueState;
-  
   /**
    * Default constructor for Job Queue Info.
    * 
    */
   public JobQueueInfo() {
-    
+    super();  
   }
+
   /**
    * Construct a new JobQueueInfo object using the queue name and the
    * scheduling information passed.
@@ -55,29 +49,24 @@
    * queue
    */
   public JobQueueInfo(String queueName, String schedulingInfo) {
-    this.queueName = queueName;
-    this.schedulingInfo = schedulingInfo;
-    // make it running by default.
-    this.queueState = Queue.QueueState.RUNNING.getStateName();
+    super(queueName, schedulingInfo);
   }
   
+  JobQueueInfo(QueueInfo queue) {
+    this(queue.getQueueName(), queue.getSchedulingInfo());
+    setQueueState(queue.getState().getStateName());
+    setQueueChildren(queue.getQueueChildren());
+    setProperties(queue.getProperties());
+    setJobStatuses(queue.getJobStatuses());
+  }
   
   /**
    * Set the queue name of the JobQueueInfo
    * 
    * @param queueName Name of the job queue.
    */
-  public void setQueueName(String queueName) {
-    this.queueName = queueName;
-  }
-
-  /**
-   * Get the queue name from JobQueueInfo
-   * 
-   * @return queue name
-   */
-  public String getQueueName() {
-    return queueName;
+  protected void setQueueName(String queueName) {
+    super.setQueueName(queueName);
   }
 
   /**
@@ -85,55 +74,73 @@
    * 
    * @param schedulingInfo
    */
-  public void setSchedulingInfo(String schedulingInfo) {
-    this.schedulingInfo = schedulingInfo;
+  protected void setSchedulingInfo(String schedulingInfo) {
+    super.setSchedulingInfo(schedulingInfo);
   }
 
   /**
-   * Gets the scheduling information associated to particular job queue.
-   * If nothing is set would return <b>"N/A"</b>
-   * 
-   * @return Scheduling information associated to particular Job Queue
-   */
-  public String getSchedulingInfo() {
-    if(schedulingInfo != null) {
-      return schedulingInfo;
-    }else {
-      return "N/A";
-    }
-  }
-  
-  /**
    * Set the state of the queue
    * @param state state of the queue.
    */
-  public void setQueueState(String state) {
-    queueState = state;
+  protected void setQueueState(String state) {
+    super.setState(QueueState.getState(state));
   }
   
-  /**
-   * Return the queue state
-   * @return the queue state.
-   */
-  public String getQueueState() {
-    return queueState;
+  String getQueueState() {
+    return super.getState().toString();
   }
   
-  @Override
-  public void readFields(DataInput in) throws IOException {
-    queueName = Text.readString(in);
-    queueState = Text.readString(in);
-    schedulingInfo = Text.readString(in);
+  protected void setChildren(List<JobQueueInfo> children) {
+    List<QueueInfo> list = new ArrayList<QueueInfo>();
+    for (JobQueueInfo q : children) {
+      list.add(q);
+    }
+    super.setQueueChildren(list);
   }
 
-  @Override
-  public void write(DataOutput out) throws IOException {
-    Text.writeString(out, queueName);
-    Text.writeString(out, queueState);
-    if(schedulingInfo!= null) {
-      Text.writeString(out, schedulingInfo);
-    }else {
-      Text.writeString(out, "N/A");
+  public List<JobQueueInfo> getChildren() {
+    List<JobQueueInfo> list = new ArrayList<JobQueueInfo>();
+    for (QueueInfo q : super.getQueueChildren()) {
+      list.add((JobQueueInfo)q);
     }
+    return list;
+  }
+
+  protected void setProperties(Properties props) {
+    super.setProperties(props);
+  }
+
+  /**
+   * Add a child {@link JobQueueInfo} to this {@link JobQueueInfo}. Modify the
+   * fully-qualified name of the child {@link JobQueueInfo} to reflect the
+   * hierarchy.
+   * 
+   * Only for testing.
+   * 
+   * @param child the queue to add as a child
+   */
+  void addChild(JobQueueInfo child) {
+    List<JobQueueInfo> children = getChildren();
+    children.add(child);
+    setChildren(children);
   }
+
+  /**
+   * Remove the child from this {@link JobQueueInfo}. This also resets the
+   * queue-name of the child from a fully-qualified name to a simple queue name.
+   * 
+   * Only for testing.
+   * 
+   * @param child the child queue to remove
+   */
+  void removeChild(JobQueueInfo child) {
+    List<JobQueueInfo> children = getChildren();
+    children.remove(child);
+    setChildren(children);
+  }
+
+  protected void setJobStatuses(org.apache.hadoop.mapreduce.JobStatus[] stats) {
+    super.setJobStatuses(stats);
+  }
+
 }