Posted to mapreduce-commits@hadoop.apache.org by st...@apache.org on 2009/11/28 21:26:22 UTC

svn commit: r885145 [22/34] - in /hadoop/mapreduce/branches/MAPREDUCE-233: ./ .eclipse.templates/ .eclipse.templates/.launches/ conf/ ivy/ lib/ src/benchmarks/gridmix/ src/benchmarks/gridmix/pipesort/ src/benchmarks/gridmix2/ src/benchmarks/gridmix2/sr...

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/ReduceTaskStatus.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/ReduceTaskStatus.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/ReduceTaskStatus.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/ReduceTaskStatus.java Sat Nov 28 20:26:01 2009
@@ -105,7 +105,7 @@
   }
   
   @Override
-  void addFetchFailedMap(TaskAttemptID mapTaskId) {
+  public void addFetchFailedMap(TaskAttemptID mapTaskId) {
     failedFetchTasks.add(mapTaskId);
   }
   

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/Reducer.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/Reducer.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/Reducer.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/Reducer.java Sat Nov 28 20:26:01 2009
@@ -109,7 +109,7 @@
  *       private int noKeys = 0;
  *       
  *       public void configure(JobConf job) {
- *         reduceTaskId = job.get("mapred.task.id");
+ *         reduceTaskId = job.get(JobContext.TASK_ATTEMPT_ID);
  *       }
  *       
  *       public void reduce(K key, Iterator<V> values,
@@ -185,8 +185,8 @@
    * takes an insignificant amount of time to process individual key/value 
    * pairs, this is crucial since the framework might assume that the task has 
    * timed-out and kill that task. The other way of avoiding this is to set 
-   * <a href="{@docRoot}/../mapred-default.html#mapred.task.timeout">
-   * mapred.task.timeout</a> to a high-enough value (or even zero for no 
+   * <a href="{@docRoot}/../mapred-default.html#mapreduce.task.timeout">
+   * mapreduce.task.timeout</a> to a high-enough value (or even zero for no 
    * time-outs).</p>
    * 
    * @param key the key.

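The two hunks above replace hard-coded configuration strings with the renamed keys. A minimal sketch of reading them through the new names (the literal key string and the ten-minute default below are assumptions, not shown in the diff):

    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapreduce.JobContext;

    class RenamedKeys {
      static void read(JobConf job) {
        // Formerly "mapred.task.id"; the constant names the renamed key.
        String reduceTaskId = job.get(JobContext.TASK_ATTEMPT_ID);
        // Formerly "mapred.task.timeout"; zero disables task time-outs.
        long timeout = job.getLong("mapreduce.task.timeout", 600000L);
        System.out.println(reduceTaskId + " timeout=" + timeout);
      }
    }
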
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/ResourceEstimator.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/ResourceEstimator.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/ResourceEstimator.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/ResourceEstimator.java Sat Nov 28 20:26:01 2009
@@ -56,9 +56,11 @@
       completedMapsInputSize+=(tip.getMapInputSize()+1);
       completedMapsOutputSize+=ts.getOutputSize();
 
-      LOG.info("completedMapsUpdates:"+completedMapsUpdates+"  "+
-          "completedMapsInputSize:"+completedMapsInputSize+"  " +
-        "completedMapsOutputSize:"+completedMapsOutputSize);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("completedMapsUpdates:"+completedMapsUpdates+"  "+
+                  "completedMapsInputSize:"+completedMapsInputSize+"  " +
+                  "completedMapsOutputSize:"+completedMapsOutputSize);
+      }
     }
   }
 
@@ -73,7 +75,9 @@
       //add desiredMaps() so that randomwriter case doesn't blow up
       long estimate = Math.round((inputSize * 
           completedMapsOutputSize * 2.0)/completedMapsInputSize);
-      LOG.debug("estimate total map output will be " + estimate);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("estimate total map output will be " + estimate);
+      }
       return estimate;
     }
   }

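Both hunks above apply the guarded-logging idiom, so the message string is only built when debug output is enabled. A self-contained sketch of the pattern:

    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;

    class EstimatorLogging {
      private static final Log LOG = LogFactory.getLog(EstimatorLogging.class);

      void report(long updates, long inputSize, long outputSize) {
        // The guard skips the concatenation entirely on non-debug runs,
        // which matters on the hot path of every completed-map update.
        if (LOG.isDebugEnabled()) {
          LOG.debug("completedMapsUpdates:" + updates
              + "  completedMapsInputSize:" + inputSize
              + "  completedMapsOutputSize:" + outputSize);
        }
      }
    }
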
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/RunningJob.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/RunningJob.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/RunningJob.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/RunningJob.java Sat Nov 28 20:26:01 2009
@@ -30,7 +30,9 @@
  * progress etc.</p> 
  * 
  * @see JobClient
+ * @deprecated Use {@link org.apache.hadoop.mapreduce.Job} instead
  */
+@Deprecated
 public interface RunningJob {
   /**
    * Get the job identifier.

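RunningJob is deprecated in favor of org.apache.hadoop.mapreduce.Job, which merges submission and monitoring into one object. A hedged sketch of the replacement (job setup elided; exact signatures may vary by release):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    public class SubmitSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "example");   // replaces JobClient/RunningJob
        // ... set mapper, reducer, input and output paths here ...
        // Monitoring calls formerly on RunningJob now live on Job itself.
        job.waitForCompletion(true);          // submits, then blocks with progress
        System.out.println("map progress: " + job.mapProgress());
      }
    }
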
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/SequenceFileAsBinaryOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/SequenceFileAsBinaryOutputFormat.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/SequenceFileAsBinaryOutputFormat.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/SequenceFileAsBinaryOutputFormat.java Sat Nov 28 20:26:01 2009
@@ -61,7 +61,8 @@
    */
   static public void setSequenceFileOutputKeyClass(JobConf conf, 
                                                    Class<?> theClass) {
-    conf.setClass("mapred.seqbinary.output.key.class", theClass, Object.class);
+    conf.setClass(org.apache.hadoop.mapreduce.lib.output.
+      SequenceFileAsBinaryOutputFormat.KEY_CLASS, theClass, Object.class);
   }
 
   /**
@@ -74,8 +75,8 @@
    */
   static public void setSequenceFileOutputValueClass(JobConf conf, 
                                                      Class<?> theClass) {
-    conf.setClass("mapred.seqbinary.output.value.class", 
-                  theClass, Object.class);
+    conf.setClass(org.apache.hadoop.mapreduce.lib.output.
+      SequenceFileAsBinaryOutputFormat.VALUE_CLASS, theClass, Object.class);
   }
 
   /**
@@ -84,9 +85,10 @@
    * @return the key class of the {@link SequenceFile}
    */
   static public Class<? extends WritableComparable> getSequenceFileOutputKeyClass(JobConf conf) { 
-    return conf.getClass("mapred.seqbinary.output.key.class", 
-                         conf.getOutputKeyClass().asSubclass(WritableComparable.class),
-                         WritableComparable.class);
+    return conf.getClass(org.apache.hadoop.mapreduce.lib.output.
+      SequenceFileAsBinaryOutputFormat.KEY_CLASS, 
+      conf.getOutputKeyClass().asSubclass(WritableComparable.class),
+      WritableComparable.class);
   }
 
   /**
@@ -95,9 +97,9 @@
    * @return the value class of the {@link SequenceFile}
    */
   static public Class<? extends Writable> getSequenceFileOutputValueClass(JobConf conf) { 
-    return conf.getClass("mapred.seqbinary.output.value.class", 
-                         conf.getOutputValueClass().asSubclass(Writable.class),
-                         Writable.class);
+    return conf.getClass(org.apache.hadoop.mapreduce.lib.output.
+      SequenceFileAsBinaryOutputFormat.VALUE_CLASS, 
+      conf.getOutputValueClass().asSubclass(Writable.class), Writable.class);
   }
   
   @Override 

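The four hunks above swap the "mapred.seqbinary.output.*" literals for the KEY_CLASS/VALUE_CLASS constants shared with the new-API output format, so both APIs read and write the same configuration entries. A short sketch using the unchanged public helpers:

    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.SequenceFileAsBinaryOutputFormat;

    class BinaryOutputSetup {
      static JobConf configure() {
        JobConf conf = new JobConf();
        // Stored under the shared constants, so a job configured through the
        // old API is readable by the new-API SequenceFileAsBinaryOutputFormat.
        SequenceFileAsBinaryOutputFormat.setSequenceFileOutputKeyClass(conf, BytesWritable.class);
        SequenceFileAsBinaryOutputFormat.setSequenceFileOutputValueClass(conf, BytesWritable.class);
        return conf;
      }
    }
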
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/SequenceFileOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/SequenceFileOutputFormat.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/SequenceFileOutputFormat.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/SequenceFileOutputFormat.java Sat Nov 28 20:26:01 2009
@@ -102,8 +102,8 @@
    *         defaulting to {@link CompressionType#RECORD}
    */
   public static CompressionType getOutputCompressionType(JobConf conf) {
-    String val = conf.get("mapred.output.compression.type", 
-                          CompressionType.RECORD.toString());
+    String val = conf.get(org.apache.hadoop.mapreduce.lib.output.
+      FileOutputFormat.COMPRESS_TYPE, CompressionType.RECORD.toString());
     return CompressionType.valueOf(val);
   }
   
@@ -116,7 +116,8 @@
   public static void setOutputCompressionType(JobConf conf, 
 		                                          CompressionType style) {
     setCompressOutput(conf, true);
-    conf.set("mapred.output.compression.type", style.toString());
+    conf.set(org.apache.hadoop.mapreduce.lib.output.
+      FileOutputFormat.COMPRESS_TYPE, style.toString());
   }
 
 }

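As above, the compression-type key now comes from the new-API FileOutputFormat constant. A minimal round-trip sketch (the literal key behind COMPRESS_TYPE is not shown in the diff, so it is not spelled out here):

    import org.apache.hadoop.io.SequenceFile.CompressionType;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.SequenceFileOutputFormat;

    class CompressionSetup {
      static CompressionType roundTrip() {
        JobConf conf = new JobConf();
        // Also enables output compression via setCompressOutput(conf, true).
        SequenceFileOutputFormat.setOutputCompressionType(conf, CompressionType.BLOCK);
        return SequenceFileOutputFormat.getOutputCompressionType(conf);  // BLOCK
      }
    }
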
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/SkipBadRecords.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/SkipBadRecords.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/SkipBadRecords.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/SkipBadRecords.java Sat Nov 28 20:26:01 2009
@@ -72,16 +72,16 @@
     "ReduceProcessedGroups";
   
   private static final String ATTEMPTS_TO_START_SKIPPING = 
-    "mapred.skip.attempts.to.start.skipping";
+    JobContext.SKIP_START_ATTEMPTS;
   private static final String AUTO_INCR_MAP_PROC_COUNT = 
-    "mapred.skip.map.auto.incr.proc.count";
+    JobContext.MAP_SKIP_INCR_PROC_COUNT;
   private static final String AUTO_INCR_REDUCE_PROC_COUNT = 
-    "mapred.skip.reduce.auto.incr.proc.count";
-  private static final String OUT_PATH = "mapred.skip.out.dir";
+    JobContext.REDUCE_SKIP_INCR_PROC_COUNT;
+  private static final String OUT_PATH = JobContext.SKIP_OUTDIR;
   private static final String MAPPER_MAX_SKIP_RECORDS = 
-    "mapred.skip.map.max.skip.records";
+    JobContext.MAP_SKIP_MAX_RECORDS;
   private static final String REDUCER_MAX_SKIP_GROUPS = 
-    "mapred.skip.reduce.max.skip.groups";
+    JobContext.REDUCE_SKIP_MAXGROUPS;
   
   /**
    * Get the number of Task attempts AFTER which skip mode 

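Only the private key-name constants change here; the public SkipBadRecords accessors keep their signatures. A usage sketch with the existing old-API setters:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapred.SkipBadRecords;

    class SkipSetup {
      static void enableSkipping(Configuration conf) {
        // Enter skip mode after two failed attempts, and narrow each skipped
        // range down to a single bad record.
        SkipBadRecords.setAttemptsToStartSkipping(conf, 2);
        SkipBadRecords.setMapperMaxSkipRecords(conf, 1);
      }
    }
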
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/Task.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/Task.java Sat Nov 28 20:26:01 2009
@@ -21,8 +21,6 @@
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
 import java.text.NumberFormat;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -42,15 +40,24 @@
 import org.apache.hadoop.io.RawComparator;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.io.serializer.Deserializer;
 import org.apache.hadoop.io.serializer.SerializationFactory;
 import org.apache.hadoop.mapred.IFile.Writer;
+import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.TaskCounter;
+import org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer;
+import org.apache.hadoop.mapreduce.task.ReduceContextImpl;
+import org.apache.hadoop.mapreduce.security.JobTokens;
+import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
+import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.util.Progress;
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.fs.FSDataInputStream;
 
 /** 
  * Base class for tasks.
@@ -61,6 +68,30 @@
   private static final Log LOG =
     LogFactory.getLog(Task.class);
 
+  // Counters used by Task subclasses
+  protected static enum Counter { 
+    MAP_INPUT_RECORDS, 
+    MAP_OUTPUT_RECORDS,
+    MAP_SKIPPED_RECORDS,
+    MAP_INPUT_BYTES, 
+    MAP_OUTPUT_BYTES,
+    COMBINE_INPUT_RECORDS,
+    COMBINE_OUTPUT_RECORDS,
+    REDUCE_INPUT_GROUPS,
+    REDUCE_SHUFFLE_BYTES,
+    REDUCE_INPUT_RECORDS,
+    REDUCE_OUTPUT_RECORDS,
+    REDUCE_SKIPPED_GROUPS,
+    REDUCE_SKIPPED_RECORDS,
+    SPILLED_RECORDS,
+    FAILED_SHUFFLE,
+    SHUFFLED_MAPS,
+    MERGED_MAP_OUTPUTS,
+  }
+  
+  public static String MERGED_OUTPUT_PREFIX = ".merged";
+  
+
   /**
    * Counters to measure the usage of the different file systems.
    * Always return the String array with two elements. First one is the name of  
@@ -97,9 +128,11 @@
   ////////////////////////////////////////////
 
   private String jobFile;                         // job configuration file
+  private String user;                            // user running the job
   private TaskAttemptID taskId;                   // unique, includes job id
   private int partition;                          // id within job
   TaskStatus taskStatus;                          // current status of the task
+  protected JobStatus.State jobRunStateForCleanup;
   protected boolean jobCleanup = false;
   protected boolean jobSetup = false;
   protected boolean taskCleanup = false;
@@ -123,7 +156,11 @@
   protected org.apache.hadoop.mapreduce.OutputFormat<?,?> outputFormat;
   protected org.apache.hadoop.mapreduce.OutputCommitter committer;
   protected final Counters.Counter spilledRecordsCounter;
+  protected final Counters.Counter failedShuffleCounter;
+  protected final Counters.Counter mergedMapOutputsCounter;
   private int numSlotsRequired;
+  protected TaskUmbilicalProtocol umbilical;
+  protected JobTokens jobTokens=null; // storage of the secret keys
 
   ////////////////////////////////////////////
   // Constructors
@@ -133,6 +170,8 @@
     taskStatus = TaskStatus.createTaskStatus(isMapTask());
     taskId = new TaskAttemptID();
     spilledRecordsCounter = counters.findCounter(TaskCounter.SPILLED_RECORDS);
+    failedShuffleCounter = counters.findCounter(Counter.FAILED_SHUFFLE);
+    mergedMapOutputsCounter = counters.findCounter(Counter.MERGED_MAP_OUTPUTS);
   }
 
   public Task(String jobFile, TaskAttemptID taskId, int partition, 
@@ -151,6 +190,8 @@
                                                     TaskStatus.Phase.SHUFFLE, 
                                                   counters);
     spilledRecordsCounter = counters.findCounter(TaskCounter.SPILLED_RECORDS);
+    failedShuffleCounter = counters.findCounter(Counter.FAILED_SHUFFLE);
+    mergedMapOutputsCounter = counters.findCounter(Counter.MERGED_MAP_OUTPUTS);
   }
 
   ////////////////////////////////////////////
@@ -172,6 +213,23 @@
   public JobID getJobID() {
     return taskId.getJobID();
   }
+
+  /**
+   * Set the JobToken storage.
+   * @param jt the job token storage to use
+   */
+  public void setJobTokens(JobTokens jt) {
+    this.jobTokens = jt;
+  }
+
+  /**
+   * Get the JobToken storage.
+   * @return storage object
+   */
+  public JobTokens getJobTokens() {
+    return this.jobTokens;
+  }
+
   
   /**
    * Get the index of this task within the job.
@@ -211,6 +269,24 @@
   }
   
   /**
+   * Report a fatal error to the parent (task) tracker.
+   */
+  protected void reportFatalError(TaskAttemptID id, Throwable throwable, 
+                                  String logMsg) {
+    LOG.fatal(logMsg);
+    Throwable tCause = throwable.getCause();
+    String cause = tCause == null 
+                   ? StringUtils.stringifyException(throwable)
+                   : StringUtils.stringifyException(tCause);
+    try {
+      umbilical.fatalError(id, cause);
+    } catch (IOException ioe) {
+      LOG.fatal("Failed to contact the tasktracker", ioe);
+      System.exit(-1);
+    }
+  }
+
+  /**
    * Get skipRanges.
    */
   public SortedRanges getSkipRanges() {
@@ -268,6 +344,14 @@
     return jobCleanup;
   }
 
+  boolean isJobAbortTask() {
+    // the task is an abort task if it's marked for cleanup and the final
+    // expected state is either failed or killed.
+    return isJobCleanupTask() 
+           && (jobRunStateForCleanup == JobStatus.State.KILLED 
+               || jobRunStateForCleanup == JobStatus.State.FAILED);
+  }
+  
   boolean isJobSetupTask() {
     return jobSetup;
   }
@@ -280,10 +364,29 @@
     jobCleanup = true; 
   }
 
+  /**
+   * Sets the task to do job abort in the cleanup.
+   * @param status the final runstate of the job. 
+   */
+  void setJobCleanupTaskState(JobStatus.State status) {
+    jobRunStateForCleanup = status;
+  }
+  
   boolean isMapOrReduce() {
     return !jobSetup && !jobCleanup && !taskCleanup;
   }
-  
+
+  /**
+   * Get the name of the user running the job/task. The TaskTracker needs the
+   * task's user name even before its JobConf is localized, so we explicitly
+   * serialize the user name.
+   * 
+   * @return user
+   */
+  String getUser() {
+    return user;
+  }
+
   ////////////////////////////////////////////
   // Writable methods
   ////////////////////////////////////////////
@@ -297,9 +400,13 @@
     skipRanges.write(out);
     out.writeBoolean(skipping);
     out.writeBoolean(jobCleanup);
+    if (jobCleanup) {
+      WritableUtils.writeEnum(out, jobRunStateForCleanup);
+    }
     out.writeBoolean(jobSetup);
     out.writeBoolean(writeSkipRecs);
-    out.writeBoolean(taskCleanup);  
+    out.writeBoolean(taskCleanup);
+    Text.writeString(out, user);
   }
   
   public void readFields(DataInput in) throws IOException {
@@ -313,12 +420,17 @@
     currentRecStartIndex = currentRecIndexIterator.next();
     skipping = in.readBoolean();
     jobCleanup = in.readBoolean();
+    if (jobCleanup) {
+      jobRunStateForCleanup = 
+        WritableUtils.readEnum(in, JobStatus.State.class);
+    }
     jobSetup = in.readBoolean();
     writeSkipRecs = in.readBoolean();
     taskCleanup = in.readBoolean();
     if (taskCleanup) {
       setPhase(TaskStatus.Phase.CLEANUP);
     }
+    user = Text.readString(in);
   }
 
   @Override
@@ -328,11 +440,11 @@
    * Localize the given JobConf to be specific for this task.
    */
   public void localizeConfiguration(JobConf conf) throws IOException {
-    conf.set("mapred.tip.id", taskId.getTaskID().toString()); 
-    conf.set("mapred.task.id", taskId.toString());
-    conf.setBoolean("mapred.task.is.map", isMapTask());
-    conf.setInt("mapred.task.partition", partition);
-    conf.set("mapred.job.id", taskId.getJobID().toString());
+    conf.set(JobContext.TASK_ID, taskId.getTaskID().toString()); 
+    conf.set(JobContext.TASK_ATTEMPT_ID, taskId.toString());
+    conf.setBoolean(JobContext.TASK_ISMAP, isMapTask());
+    conf.setInt(JobContext.TASK_PARTITION, partition);
+    conf.set(JobContext.ID, taskId.getJobID().toString());
   }
   
   /** Run this task as a part of the named job.  This method is executed in the
@@ -368,8 +480,8 @@
                          boolean useNewApi) throws IOException, 
                                                    ClassNotFoundException,
                                                    InterruptedException {
-    jobContext = new JobContext(job, id, reporter);
-    taskContext = new TaskAttemptContext(job, taskId, reporter);
+    jobContext = new JobContextImpl(job, id, reporter);
+    taskContext = new TaskAttemptContextImpl(job, taskId, reporter);
     if (getState() == TaskStatus.State.UNASSIGNED) {
       setState(TaskStatus.State.RUNNING);
     }
@@ -674,7 +786,12 @@
     sendDone(umbilical);
   }
 
-  protected void statusUpdate(TaskUmbilicalProtocol umbilical) 
+  /**
+   * Send a status update to the task tracker.
+   * @param umbilical the RPC channel to the task tracker
+   * @throws IOException
+   */
+  public void statusUpdate(TaskUmbilicalProtocol umbilical) 
   throws IOException {
     int retries = MAX_RETRIES;
     while (true) {
@@ -800,7 +917,27 @@
     getProgress().setStatus("cleanup");
     statusUpdate(umbilical);
     // do the cleanup
-    committer.cleanupJob(jobContext);
+    LOG.info("Cleaning up job");
+    if (jobRunStateForCleanup == JobStatus.State.FAILED 
+        || jobRunStateForCleanup == JobStatus.State.KILLED) {
+      LOG.info("Aborting job with runstate : " + jobRunStateForCleanup.name());
+      if (conf.getUseNewMapper()) {
+        committer.abortJob(jobContext, jobRunStateForCleanup);
+      } else {
+        org.apache.hadoop.mapred.OutputCommitter oldCommitter = 
+          (org.apache.hadoop.mapred.OutputCommitter)committer;
+        oldCommitter.abortJob(jobContext, jobRunStateForCleanup);
+      }
+    } else if (jobRunStateForCleanup == JobStatus.State.SUCCEEDED){
+      LOG.info("Committing job");
+      committer.commitJob(jobContext);
+    } else {
+      throw new IOException("Invalid state of the job for cleanup. State found "
+                            + jobRunStateForCleanup + " expecting "
+                            + JobStatus.State.SUCCEEDED + ", " 
+                            + JobStatus.State.FAILED + " or "
+                            + JobStatus.State.KILLED);
+    }
     done(umbilical, reporter);
   }
 
@@ -820,11 +957,11 @@
       this.conf = new JobConf(conf);
     }
     this.mapOutputFile.setConf(this.conf);
-    this.lDirAlloc = new LocalDirAllocator("mapred.local.dir");
+    this.lDirAlloc = new LocalDirAllocator(MRConfig.LOCAL_DIR);
     // add the static resolutions (this is required for the junit to
     // work on testcases that simulate multiple nodes on a single physical
    // node.)
-    String hostToResolved[] = conf.getStrings("hadoop.net.static.resolutions");
+    String hostToResolved[] = conf.getStrings(TTConfig.TT_STATIC_RESOLUTIONS);
     if (hostToResolved != null) {
       for (String str : hostToResolved) {
         String name = str.substring(0, str.indexOf('='));
@@ -832,6 +969,7 @@
         NetUtils.addStaticResolution(name, resolvedName);
       }
     }
+    this.user = this.conf.getUser();
   }
 
   public Configuration getConf() {
@@ -841,7 +979,7 @@
   /**
    * OutputCollector for the combiner.
    */
-  protected static class CombineOutputCollector<K extends Object, V extends Object> 
+  public static class CombineOutputCollector<K extends Object, V extends Object> 
   implements OutputCollector<K, V> {
     private Writer<K, V> writer;
     private Counters.Counter outCounter;
@@ -919,7 +1057,7 @@
     /// Auxiliary methods
 
     /** Start processing next unique key. */
-    void nextKey() throws IOException {
+    public void nextKey() throws IOException {
       // read until we find a new key
       while (hasNext) { 
         readNextKey();
@@ -934,12 +1072,12 @@
     }
 
     /** True iff more keys remain. */
-    boolean more() { 
+    public boolean more() { 
       return more; 
     }
 
     /** The current key. */
-    KEY getKey() { 
+    public KEY getKey() { 
       return key; 
     }
 
@@ -969,7 +1107,8 @@
     }
   }
 
-  protected static class CombineValuesIterator<KEY,VALUE>
+  /** Iterator to return combined values. */
+  public static class CombineValuesIterator<KEY,VALUE>
       extends ValuesIterator<KEY,VALUE> {
 
     private final Counters.Counter combineInputCounter;
@@ -988,28 +1127,6 @@
     }
   }
 
-  private static final Constructor<org.apache.hadoop.mapreduce.Reducer.Context> 
-  contextConstructor;
-  static {
-    try {
-      contextConstructor = 
-        org.apache.hadoop.mapreduce.Reducer.Context.class.getConstructor
-        (new Class[]{org.apache.hadoop.mapreduce.Reducer.class,
-            Configuration.class,
-            org.apache.hadoop.mapreduce.TaskAttemptID.class,
-            RawKeyValueIterator.class,
-            org.apache.hadoop.mapreduce.Counter.class,
-            org.apache.hadoop.mapreduce.RecordWriter.class,
-            org.apache.hadoop.mapreduce.OutputCommitter.class,
-            org.apache.hadoop.mapreduce.StatusReporter.class,
-            RawComparator.class,
-            Class.class,
-            Class.class});
-    } catch (NoSuchMethodException nme) {
-      throw new IllegalArgumentException("Can't find constructor");
-    }
-  }
-
   @SuppressWarnings("unchecked")
   protected static <INKEY,INVALUE,OUTKEY,OUTVALUE> 
   org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context
@@ -1018,26 +1135,33 @@
                       Configuration job,
                       org.apache.hadoop.mapreduce.TaskAttemptID taskId, 
                       RawKeyValueIterator rIter,
-                      org.apache.hadoop.mapreduce.Counter inputCounter,
+                      org.apache.hadoop.mapreduce.Counter inputKeyCounter,
+                      org.apache.hadoop.mapreduce.Counter inputValueCounter,
                       org.apache.hadoop.mapreduce.RecordWriter<OUTKEY,OUTVALUE> output, 
                       org.apache.hadoop.mapreduce.OutputCommitter committer,
                       org.apache.hadoop.mapreduce.StatusReporter reporter,
                       RawComparator<INKEY> comparator,
                       Class<INKEY> keyClass, Class<INVALUE> valueClass
-  ) throws IOException, ClassNotFoundException {
-    try {
+  ) throws IOException, InterruptedException {
+    org.apache.hadoop.mapreduce.ReduceContext<INKEY, INVALUE, OUTKEY, OUTVALUE> 
+    reduceContext = 
+      new ReduceContextImpl<INKEY, INVALUE, OUTKEY, OUTVALUE>(job, taskId, 
+                                                              rIter, 
+                                                              inputKeyCounter, 
+                                                              inputValueCounter, 
+                                                              output, 
+                                                              committer, 
+                                                              reporter, 
+                                                              comparator, 
+                                                              keyClass, 
+                                                              valueClass);
+
+    org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context 
+        reducerContext = 
+          new WrappedReducer<INKEY, INVALUE, OUTKEY, OUTVALUE>().getReducerContext(
+              reduceContext);
 
-      return contextConstructor.newInstance(reducer, job, taskId,
-                                            rIter, inputCounter, output, 
-                                            committer, reporter, comparator, 
-                                            keyClass, valueClass);
-    } catch (InstantiationException e) {
-      throw new IOException("Can't create Context", e);
-    } catch (InvocationTargetException e) {
-      throw new IOException("Can't invoke Context constructor", e);
-    } catch (IllegalAccessException e) {
-      throw new IOException("Can't invoke Context constructor", e);
-    }
+    return reducerContext;
   }
 
   protected static abstract class CombinerRunner<K,V> {
@@ -1079,7 +1203,7 @@
       }
       // make a task context so we can get the classes
       org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
-        new org.apache.hadoop.mapreduce.TaskAttemptContext(job, taskId);
+        new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job, taskId);
       Class<? extends org.apache.hadoop.mapreduce.Reducer<K,V,K,V>> newcls = 
         (Class<? extends org.apache.hadoop.mapreduce.Reducer<K,V,K,V>>)
            taskContext.getCombinerClass();
@@ -1188,13 +1312,12 @@
           ReflectionUtils.newInstance(reducerClass, job);
       org.apache.hadoop.mapreduce.Reducer.Context 
            reducerContext = createReduceContext(reducer, job, taskId,
-                                                iterator, inputCounter, 
+                                                iterator, null, inputCounter, 
                                                 new OutputConverter(collector),
                                                 committer,
                                                 reporter, comparator, keyClass,
                                                 valueClass);
       reducer.run(reducerContext);
-    }
-    
+    } 
   }
 }

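One detail worth isolating from the large Task.java diff: the Writable round trip now writes the cleanup run-state only when jobCleanup is set, and appends the user name. A self-contained sketch of that conditional layout (the wrapper class is hypothetical):

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.WritableUtils;
    import org.apache.hadoop.mapreduce.JobStatus;

    class CleanupStateSketch {
      boolean jobCleanup;
      JobStatus.State jobRunStateForCleanup;
      String user = "";

      void write(DataOutput out) throws IOException {
        out.writeBoolean(jobCleanup);
        if (jobCleanup) {                 // enum written only when relevant
          WritableUtils.writeEnum(out, jobRunStateForCleanup);
        }
        Text.writeString(out, user);      // user serialized explicitly
      }

      void readFields(DataInput in) throws IOException {
        jobCleanup = in.readBoolean();
        if (jobCleanup) {                 // must mirror write() exactly
          jobRunStateForCleanup = WritableUtils.readEnum(in, JobStatus.State.class);
        }
        user = Text.readString(in);
      }
    }
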
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskAttemptContext.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskAttemptContext.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskAttemptContext.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskAttemptContext.java Sat Nov 28 20:26:01 2009
@@ -1,4 +1,5 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
  * regarding copyright ownership.  The ASF licenses this file
@@ -24,39 +25,12 @@
  *   instead.
  */
 @Deprecated
-public class TaskAttemptContext 
+public interface TaskAttemptContext 
        extends org.apache.hadoop.mapreduce.TaskAttemptContext {
-  private Progressable progress;
 
-  TaskAttemptContext(JobConf conf, TaskAttemptID taskid) {
-    this(conf, taskid, Reporter.NULL);
-  }
-  
-  TaskAttemptContext(JobConf conf, TaskAttemptID taskid,
-                     Progressable progress) {
-    super(conf, taskid);
-    this.progress = progress;
-  }
-  
-  /**
-   * Get the taskAttemptID.
-   *  
-   * @return TaskAttemptID
-   */
-  public TaskAttemptID getTaskAttemptID() {
-    return (TaskAttemptID) super.getTaskAttemptID();
-  }
-  
-  public Progressable getProgressible() {
-    return progress;
-  }
-  
-  public JobConf getJobConf() {
-    return (JobConf) getConfiguration();
-  }
+  public TaskAttemptID getTaskAttemptID();
 
-  @Override
-  public void progress() {
-    progress.progress();
-  }
+  public Progressable getProgressible();
+  
+  public JobConf getJobConf();
 }

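TaskAttemptContext changes from a concrete class to an interface; its methods are unchanged, so callers that only use those methods keep compiling. A hypothetical caller for illustration:

    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.TaskAttemptContext;

    class ContextCaller {
      void report(TaskAttemptContext ctx) {
        JobConf conf = ctx.getJobConf();
        System.out.println("attempt " + ctx.getTaskAttemptID()
            + " of job " + conf.getJobName());
        ctx.getProgressible().progress();  // progress reporting still reachable
      }
    }
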
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskCompletionEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskCompletionEvent.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskCompletionEvent.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskCompletionEvent.java Sat Nov 28 20:26:01 2009
@@ -18,35 +18,25 @@
 
 package org.apache.hadoop.mapred;
 
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableUtils;
-
 /**
  * This is used to track task completion events on 
  * job tracker. 
+ * @deprecated Use 
+ * {@link org.apache.hadoop.mapreduce.TaskCompletionEvent} instead
  */
-public class TaskCompletionEvent implements Writable{
+@Deprecated
+public class TaskCompletionEvent 
+    extends org.apache.hadoop.mapreduce.TaskCompletionEvent {
   static public enum Status {FAILED, KILLED, SUCCEEDED, OBSOLETE, TIPFAILED};
-    
-  private int eventId; 
-  private String taskTrackerHttp;
-  private int taskRunTime; // using int since runtime is the time difference
-  private TaskAttemptID taskId;
-  Status status; 
-  boolean isMap = false;
-  private int idWithinJob;
+  
   public static final TaskCompletionEvent[] EMPTY_ARRAY = 
-    new TaskCompletionEvent[0];
+    new TaskCompletionEvent[0];
   /**
    * Default constructor for Writable.
    *
    */
-  public TaskCompletionEvent(){
-    taskId = new TaskAttemptID();
+  public TaskCompletionEvent() {
+    super();
   }
 
   /**
@@ -64,20 +54,16 @@
                              boolean isMap,
                              Status status, 
                              String taskTrackerHttp){
-      
-    this.taskId = taskId;
-    this.idWithinJob = idWithinJob;
-    this.isMap = isMap;
-    this.eventId = eventId; 
-    this.status =status; 
-    this.taskTrackerHttp = taskTrackerHttp;
+    super(eventId, taskId, idWithinJob, isMap, org.apache.hadoop.mapreduce.
+          TaskCompletionEvent.Status.valueOf(status.name()), taskTrackerHttp);
   }
-  /**
-   * Returns event Id. 
-   * @return event id
-   */
-  public int getEventId() {
-    return eventId;
+  
+  static TaskCompletionEvent downgrade(
+    org.apache.hadoop.mapreduce.TaskCompletionEvent event) {
+    return new TaskCompletionEvent(event.getEventId(),
+      TaskAttemptID.downgrade(event.getTaskAttemptId()),event.idWithinJob(),
+      event.isMapTask(), Status.valueOf(event.getStatus().name()),
+      event.getTaskTrackerHttp());
   }
   /**
    * Returns task id. 
@@ -86,7 +72,7 @@
    */
   @Deprecated
   public String getTaskId() {
-    return taskId.toString();
+    return getTaskAttemptId().toString();
   }
   
   /**
@@ -94,7 +80,7 @@
    * @return task id
    */
   public TaskAttemptID getTaskAttemptId() {
-    return taskId;
+    return TaskAttemptID.downgrade(super.getTaskAttemptId());
   }
   
   /**
@@ -102,133 +88,57 @@
    * @return task tracker status
    */
   public Status getTaskStatus() {
-    return status;
-  }
-  /**
-   * http location of the tasktracker where this task ran. 
-   * @return http location of tasktracker user logs
-   */
-  public String getTaskTrackerHttp() {
-    return taskTrackerHttp;
-  }
-
-  /**
-   * Returns time (in millisec) the task took to complete. 
-   */
-  public int getTaskRunTime() {
-    return taskRunTime;
-  }
-
-  /**
-   * Set the task completion time
-   * @param taskCompletionTime time (in millisec) the task took to complete
-   */
-  public void setTaskRunTime(int taskCompletionTime) {
-    this.taskRunTime = taskCompletionTime;
-  }
-
-  /**
-   * set event Id. should be assigned incrementally starting from 0. 
-   * @param eventId
-   */
-  public void setEventId(
-                         int eventId) {
-    this.eventId = eventId;
+    return Status.valueOf(super.getStatus().name());
   }
+  
   /**
    * Sets task id. 
    * @param taskId
-   * @deprecated use {@link #setTaskID(TaskAttemptID)} instead.
+   * @deprecated use {@link #setTaskAttemptId(TaskAttemptID)} instead.
    */
   @Deprecated
   public void setTaskId(String taskId) {
-    this.taskId = TaskAttemptID.forName(taskId);
+    this.setTaskAttemptId(TaskAttemptID.forName(taskId));
   }
   
   /**
    * Sets task id. 
    * @param taskId
    */
-  public void setTaskID(TaskAttemptID taskId) {
-    this.taskId = taskId;
+  protected void setTaskAttemptId(TaskAttemptID taskId) {
+    super.setTaskAttemptId(taskId);
   }
   
   /**
    * Set task status. 
    * @param status
    */
-  public void setTaskStatus(
-                            Status status) {
-    this.status = status;
+  protected void setTaskStatus(Status status) {
+    super.setTaskStatus(org.apache.hadoop.mapreduce.
+      TaskCompletionEvent.Status.valueOf(status.name()));
   }
+  
   /**
-   * Set task tracker http location. 
-   * @param taskTrackerHttp
+   * Set the task completion time
+   * @param taskCompletionTime time (in millisec) the task took to complete
    */
-  public void setTaskTrackerHttp(
-                                 String taskTrackerHttp) {
-    this.taskTrackerHttp = taskTrackerHttp;
-  }
-    
-  @Override
-  public String toString(){
-    StringBuffer buf = new StringBuffer(); 
-    buf.append("Task Id : "); 
-    buf.append(taskId); 
-    buf.append(", Status : ");  
-    buf.append(status.name());
-    return buf.toString();
-  }
-    
-  @Override
-  public boolean equals(Object o) {
-    if(o == null)
-      return false;
-    if(o.getClass().equals(this.getClass())) {
-      TaskCompletionEvent event = (TaskCompletionEvent) o;
-      return this.isMap == event.isMapTask() 
-             && this.eventId == event.getEventId()
-             && this.idWithinJob == event.idWithinJob() 
-             && this.status.equals(event.getTaskStatus())
-             && this.taskId.equals(event.getTaskAttemptId()) 
-             && this.taskRunTime == event.getTaskRunTime()
-             && this.taskTrackerHttp.equals(event.getTaskTrackerHttp());
-    }
-    return false;
+  protected void setTaskRunTime(int taskCompletionTime) {
+    super.setTaskRunTime(taskCompletionTime);
   }
 
-  @Override
-  public int hashCode() {
-    return toString().hashCode(); 
+  /**
+   * Set event id. Should be assigned incrementally starting from 0.
+   * @param eventId
+   */
+  protected void setEventId(int eventId) {
+    super.setEventId(eventId);
   }
 
-  public boolean isMapTask() {
-    return isMap;
-  }
-    
-  public int idWithinJob() {
-    return idWithinJob;
-  }
-  //////////////////////////////////////////////
-  // Writable
-  //////////////////////////////////////////////
-  public void write(DataOutput out) throws IOException {
-    taskId.write(out); 
-    WritableUtils.writeVInt(out, idWithinJob);
-    out.writeBoolean(isMap);
-    WritableUtils.writeEnum(out, status); 
-    WritableUtils.writeString(out, taskTrackerHttp);
-    WritableUtils.writeVInt(out, taskRunTime);
-    WritableUtils.writeVInt(out, eventId);
-  }
-  
-  public void readFields(DataInput in) throws IOException {
-    taskId.readFields(in); 
-    idWithinJob = WritableUtils.readVInt(in);
-    isMap = in.readBoolean();
-    status = WritableUtils.readEnum(in, Status.class);
-    taskTrackerHttp = WritableUtils.readString(in);
-    taskRunTime = WritableUtils.readVInt(in);
-    eventId = WritableUtils.readVInt(in);
+  /**
+   * Set task tracker http location. 
+   * @param taskTrackerHttp
+   */
+  protected void setTaskTrackerHttp(String taskTrackerHttp) {
+    super.setTaskTrackerHttp(taskTrackerHttp);
   }
 }

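The rewritten class delegates storage to the new-API TaskCompletionEvent and converts between the two parallel Status enums by constant name. A self-contained sketch of that bridging idiom:

    class StatusBridge {
      // Two parallel enums with identical constant names, as in the old- and
      // new-API TaskCompletionEvent classes.
      enum OldStatus { FAILED, KILLED, SUCCEEDED, OBSOLETE, TIPFAILED }
      enum NewStatus { FAILED, KILLED, SUCCEEDED, OBSOLETE, TIPFAILED }

      // valueOf(name()) stays safe exactly as long as the constant sets match.
      static NewStatus upgrade(OldStatus s) { return NewStatus.valueOf(s.name()); }
      static OldStatus downgrade(NewStatus s) { return OldStatus.valueOf(s.name()); }
    }
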
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskController.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskController.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskController.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskController.java Sat Nov 28 20:26:01 2009
@@ -14,22 +14,23 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
-*/
+ */
 package org.apache.hadoop.mapred;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.Map;
+import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.mapred.JvmManager.JvmEnv;
-import org.apache.hadoop.mapred.TaskTracker.PermissionsHandler;
+import org.apache.hadoop.mapreduce.server.tasktracker.Localizer;
+import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Shell.ShellCommandExecutor;
+import org.apache.hadoop.classification.InterfaceAudience;
 
 /**
  * Controls initialization, finalization and clean up of tasks, and
@@ -38,9 +39,12 @@
  * This class defines the API for initializing, finalizing and cleaning
 * up of tasks, as well as launching and killing task JVMs.
  * Subclasses of this class will implement the logic required for
- * performing the actual actions. 
+ * performing the actual actions.
  */
-abstract class TaskController implements Configurable {
+@InterfaceAudience.Private
+public abstract class TaskController implements Configurable {
   
   private Configuration conf;
   
@@ -50,21 +54,21 @@
     return conf;
   }
 
-  // The list of directory paths specified in the variable mapred.local.dir.
+  // The list of directory paths specified in the variable MRConfig.LOCAL_DIR.
   // This is used to determine which among the list of directories is picked up
   // for storing data for a particular task.
   protected String[] mapredLocalDirs;
 
   public void setConf(Configuration conf) {
     this.conf = conf;
-    mapredLocalDirs = conf.getStrings("mapred.local.dir");
+    mapredLocalDirs = conf.getStrings(MRConfig.LOCAL_DIR);
   }
 
   /**
    * Sets up the permissions of the following directories on all the configured
    * disks:
    * <ul>
-   * <li>mapred-local directories</li>
+   * <li>mapreduce.cluster.local.directories</li>
    * <li>Job cache directories</li>
    * <li>Archive directories</li>
    * <li>Hadoop log directories</li>
@@ -72,35 +76,14 @@
    */
   void setup() {
     for (String localDir : this.mapredLocalDirs) {
-      // Set up the mapred-local directories.
+      // Set up the mapreduce.cluster.local.directories.
       File mapredlocalDir = new File(localDir);
       if (!mapredlocalDir.exists() && !mapredlocalDir.mkdirs()) {
-        LOG.warn("Unable to create mapred-local directory : "
+        LOG.warn("Unable to create mapreduce.cluster.local.directory : "
             + mapredlocalDir.getPath());
       } else {
-        PermissionsHandler.setPermissions(mapredlocalDir,
-            PermissionsHandler.sevenFiveFive);
-      }
-
-      // Set up the cache directory used for distributed cache files
-      File distributedCacheDir =
-          new File(localDir, TaskTracker.getDistributedCacheDir());
-      if (!distributedCacheDir.exists() && !distributedCacheDir.mkdirs()) {
-        LOG.warn("Unable to create cache directory : "
-            + distributedCacheDir.getPath());
-      } else {
-        PermissionsHandler.setPermissions(distributedCacheDir,
-            PermissionsHandler.sevenFiveFive);
-      }
-
-      // Set up the jobcache directory
-      File jobCacheDir = new File(localDir, TaskTracker.getJobCacheSubdir());
-      if (!jobCacheDir.exists() && !jobCacheDir.mkdirs()) {
-        LOG.warn("Unable to create job cache directory : "
-            + jobCacheDir.getPath());
-      } else {
-        PermissionsHandler.setPermissions(jobCacheDir,
-            PermissionsHandler.sevenFiveFive);
+        Localizer.PermissionsHandler.setPermissions(mapredlocalDir,
+            Localizer.PermissionsHandler.sevenFiveFive);
       }
     }
 
@@ -109,8 +92,8 @@
     if (!taskLog.exists() && !taskLog.mkdirs()) {
       LOG.warn("Unable to create taskLog directory : " + taskLog.getPath());
     } else {
-      PermissionsHandler.setPermissions(taskLog,
-          PermissionsHandler.sevenFiveFive);
+      Localizer.PermissionsHandler.setPermissions(taskLog,
+          Localizer.PermissionsHandler.sevenFiveFive);
     }
   }
 
@@ -124,6 +107,17 @@
   abstract void initializeJob(JobInitializationContext context) throws IOException;
 
   /**
+   * Take task-controller specific actions to initialize the distributed cache
+   * files. This involves setting appropriate permissions for these files so
+   * that they are accessible only to their owners.
+   * 
+   * @param context
+   * @throws IOException
+   */
+  public abstract void initializeDistributedCache(InitializationContext context)
+      throws IOException;
+
+  /**
    * Launch a task JVM
    * 
    * This method defines how a JVM will be launched to run a task. Each
@@ -178,12 +172,14 @@
   abstract void initializeTask(TaskControllerContext context)
       throws IOException;
 
+  static class TaskExecContext {
+    // task being executed
+    Task task;
+  }
   /**
    * Contains task information required for the task controller.  
    */
-  static class TaskControllerContext {
-    // task being executed
-    Task task;
+  static class TaskControllerContext extends TaskExecContext {
     ShellCommandExecutor shExec;     // the Shell executor executing the JVM for this task.
 
     // Information used only when this context is used for launching new tasks.
@@ -194,10 +190,23 @@
     long sleeptimeBeforeSigkill; // waiting time before sending SIGKILL to task JVM after sending SIGTERM
   }
 
-  static class JobInitializationContext {
+  /**
+   * NOTE: This is an internal-only class and is not intended for users!
+   * 
+   */
+  public static class InitializationContext {
+    public File workDir;
+    public String user;
+  }
+
+  static class JobInitializationContext extends InitializationContext {
     JobID jobid;
+  }
+  
+  static class DebugScriptContext extends TaskExecContext {
+    List<String> args;
     File workDir;
-    String user;
+    File stdout;
   }
 
   /**
@@ -214,4 +223,23 @@
    * @param context task context
    */
   abstract void killTask(TaskControllerContext context);
+
+  /**
+   * Initialize user on this TaskTracker in a TaskController-specific manner.
+   * 
+   * @param context
+   * @throws IOException
+   */
+  public abstract void initializeUser(InitializationContext context)
+      throws IOException;
+  
+  /**
+   * Launch the task debug script
+   * 
+   * @param context
+   * @throws IOException
+   */
+  abstract void runDebugScript(DebugScriptContext context) 
+      throws IOException;
+  
 }

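The setup() path above now creates only the local and task-log directories, with permissioning delegated to Localizer.PermissionsHandler. A standalone sketch of the create-then-chmod loop, using plain java.io.File calls as a stand-in for that handler (an assumption, since its code is not shown here):

    import java.io.File;

    class LocalDirSetup {
      static void setup(String[] mapredLocalDirs) {
        for (String localDir : mapredLocalDirs) {
          File dir = new File(localDir);
          if (!dir.exists() && !dir.mkdirs()) {
            // Creation failures are logged and tolerated, as in the diff.
            System.err.println("Unable to create local directory: " + dir.getPath());
          } else {
            dir.setReadable(true, false);   // rough equivalent of 755:
            dir.setExecutable(true, false); // owner rwx, group/other rx
            dir.setWritable(true, true);
          }
        }
      }
    }
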
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskInProgress.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskInProgress.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskInProgress.java Sat Nov 28 20:26:01 2009
@@ -32,10 +32,12 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.mapred.JobClient.RawSplit;
 import org.apache.hadoop.mapred.JobInProgress.DataStatistics;
 import org.apache.hadoop.mapred.SortedRanges.Range;
+import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistory;
+import org.apache.hadoop.mapreduce.jobhistory.TaskUpdatedEvent;
 import org.apache.hadoop.net.Node;
 
 
@@ -63,10 +65,11 @@
 
   // Defines the TIP
   private String jobFile = null;
-  private RawSplit rawSplit;
+  private Job.RawSplit rawSplit;
   private int numMaps;
   private int partition;
   private JobTracker jobtracker;
+  private JobHistory jobHistory;
   private TaskID id;
   private JobInProgress job;
   private final int numSlotsRequired;
@@ -110,7 +113,7 @@
   /**
    * Map from taskId -> TaskStatus
    */
-  private TreeMap<TaskAttemptID,TaskStatus> taskStatuses = 
+  TreeMap<TaskAttemptID,TaskStatus> taskStatuses = 
     new TreeMap<TaskAttemptID,TaskStatus>();
 
   // Map from taskId -> TaskTracker Id, 
@@ -137,7 +140,7 @@
    * Constructor for MapTask
    */
   public TaskInProgress(JobID jobid, String jobFile, 
-                        RawSplit rawSplit, 
+                        Job.RawSplit rawSplit, 
                         JobTracker jobtracker, JobConf conf, 
                         JobInProgress job, int partition,
                         int numSlotsRequired) {
@@ -151,6 +154,9 @@
     this.numSlotsRequired = numSlotsRequired;
     setMaxTaskAttempts();
     init(jobid);
+    if (jobtracker != null) {
+      this.jobHistory = jobtracker.getJobHistory();
+    }
   }
         
   /**
@@ -170,6 +176,9 @@
     this.numSlotsRequired = numSlotsRequired;
     setMaxTaskAttempts();
     init(jobid);
+    if (jobtracker != null) {
+      this.jobHistory = jobtracker.getJobHistory();
+    }
   }
   
   /**
@@ -287,7 +296,8 @@
    */
   public void setExecFinishTime(long finishTime) {
     execFinishTime = finishTime;
-    JobHistory.Task.logUpdates(id, execFinishTime); // log the update
+    TaskUpdatedEvent tue = new TaskUpdatedEvent(id, execFinishTime);
+    jobHistory.logEvent(tue, id.getJobID());
   }
   
   /**
@@ -975,6 +985,8 @@
   public Task addRunningTask(TaskAttemptID taskid, 
                              String taskTracker,
                              boolean taskCleanup) {
+    // 1 slot is enough for taskCleanup task
+    int numSlotsNeeded = taskCleanup ? 1 : numSlotsRequired;
     // create the task
     Task t = null;
     if (isMapTask()) {
@@ -989,9 +1001,9 @@
         split = new BytesWritable();
       }
       t = new MapTask(jobFile, taskid, partition, splitClass, split,
-                      numSlotsRequired);
+                      numSlotsNeeded);
     } else {
-      t = new ReduceTask(jobFile, taskid, partition, numMaps, numSlotsRequired);
+      t = new ReduceTask(jobFile, taskid, partition, numMaps, numSlotsNeeded);
     }
     if (jobCleanup) {
       t.setJobCleanupTask();

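setExecFinishTime() now records history through typed event objects instead of the static JobHistory.Task helpers. A small sketch of the new call shape, using the constructor and logEvent signature exactly as they appear in the hunk above:

    import org.apache.hadoop.mapreduce.TaskID;
    import org.apache.hadoop.mapreduce.jobhistory.JobHistory;
    import org.apache.hadoop.mapreduce.jobhistory.TaskUpdatedEvent;

    class HistorySketch {
      // The event object carries its own payload; JobHistory routes it to the
      // per-job history log.
      static void logFinish(JobHistory jobHistory, TaskID id, long execFinishTime) {
        TaskUpdatedEvent tue = new TaskUpdatedEvent(id, execFinishTime);
        jobHistory.logEvent(tue, id.getJobID());
      }
    }
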
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskLog.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskLog.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskLog.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskLog.java Sat Nov 28 20:26:01 2009
@@ -39,7 +39,7 @@
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.util.ProcessTree;
+import org.apache.hadoop.mapreduce.util.ProcessTree;
 import org.apache.hadoop.util.Shell;
 import org.apache.log4j.Appender;
 import org.apache.log4j.LogManager;
@@ -59,13 +59,9 @@
   private static final File LOG_DIR = 
     new File(getBaseLogDir(), USERLOGS_DIR_NAME).getAbsoluteFile();
   
+  // localFS is set in (and used by) writeToIndexFile()
   static LocalFileSystem localFS = null;
   static {
-    try {
-      localFS = FileSystem.getLocal(new Configuration());
-    } catch (IOException ioe) {
-      LOG.warn("Getting local file system failed.");
-    }
     if (!LOG_DIR.exists()) {
       boolean b = LOG_DIR.mkdirs();
       if (!b) {
@@ -200,6 +196,10 @@
     File indexFile = getIndexFile(currentTaskid.toString(), isCleanup);
     Path indexFilePath = new Path(indexFile.getAbsolutePath());
     Path tmpIndexFilePath = new Path(tmpIndexFile.getAbsolutePath());
+
+    if (localFS == null) { // set localFS once
+      localFS = FileSystem.getLocal(new Configuration());
+    }
     localFS.rename (tmpIndexFilePath, indexFilePath);
   }
   private static void resetPrevLengths(TaskAttemptID firstTaskid) {
@@ -395,7 +395,7 @@
    * @return the number of bytes to cap the log files at
    */
   public static long getTaskLogLength(JobConf conf) {
-    return conf.getLong("mapred.userlog.limit.kb", 0) * 1024;
+    return conf.getLong(JobContext.TASK_USERLOG_LIMIT, 0) * 1024;
   }
 
   /**
@@ -561,6 +561,37 @@
   }
   
   /**
+   * Construct the command line for running the debug script
+   * @param cmd The command and the arguments that should be run
+   * @param debugout The file to which the script's stdout and stderr are redirected
+   * @return the command line as a String
+   * @throws IOException
+   */
+  static String buildDebugScriptCommandLine(List<String> cmd, String debugout)
+  throws IOException {
+    StringBuilder mergedCmd = new StringBuilder();
+    mergedCmd.append("exec ");
+    boolean isExecutable = true;
+    for(String s: cmd) {
+      if (isExecutable) {
+        // the executable name needs to be expressed as a shell path for the  
+        // shell to find it.
+        mergedCmd.append(FileUtil.makeShellPath(new File(s)));
+        isExecutable = false; 
+      } else {
+        mergedCmd.append(s);
+      }
+      mergedCmd.append(" ");
+    }
+    mergedCmd.append(" < /dev/null ");
+    mergedCmd.append(" >");
+    mergedCmd.append(debugout);
+    mergedCmd.append(" 2>&1 ");
+    return mergedCmd.toString();
+  }
+  /**
    * Add quotes to each of the command strings and
    * return as a single string 
    * @param cmd The command to be quoted
@@ -604,25 +635,7 @@
     List<String> result = new ArrayList<String>(3);
     result.add(bashCommand);
     result.add("-c");
-    StringBuffer mergedCmd = new StringBuffer();
-    mergedCmd.append("exec ");
-    boolean isExecutable = true;
-    for(String s: cmd) {
-      if (isExecutable) {
-        // the executable name needs to be expressed as a shell path for the  
-        // shell to find it.
-        mergedCmd.append(FileUtil.makeShellPath(new File(s)));
-        isExecutable = false; 
-      } else {
-        mergedCmd.append(s);
-      }
-      mergedCmd.append(" ");
-    }
-    mergedCmd.append(" < /dev/null ");
-    mergedCmd.append(" >");
-    mergedCmd.append(debugout);
-    mergedCmd.append(" 2>&1 ");
-    result.add(mergedCmd.toString());
+    result.add(buildDebugScriptCommandLine(cmd, debugout));
     return result;
   }
   

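The static block above no longer creates the local FileSystem eagerly; writeToIndexFile() now initializes it on first use. A minimal sketch of that lazy-initialization pattern (unsynchronized, as in the diff, so single-threaded use is assumed):

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.LocalFileSystem;

    class LazyLocalFs {
      static LocalFileSystem localFS = null;

      static LocalFileSystem get() throws IOException {
        if (localFS == null) {           // created on first use, not class load,
          localFS = FileSystem.getLocal(new Configuration());
        }                                // so failures surface to the caller
        return localFS;
      }
    }
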
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskMemoryManagerThread.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskMemoryManagerThread.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskMemoryManagerThread.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskMemoryManagerThread.java Sat Nov 28 20:26:01 2009
@@ -29,8 +29,9 @@
 
 import org.apache.hadoop.mapred.TaskTracker;
 import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
-import org.apache.hadoop.util.ProcfsBasedProcessTree;
-import org.apache.hadoop.util.ProcessTree;
+import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
+import org.apache.hadoop.mapreduce.util.ProcfsBasedProcessTree;
+import org.apache.hadoop.mapreduce.util.ProcessTree;
 import org.apache.hadoop.util.StringUtils;
 
 /**
@@ -50,11 +51,14 @@
   private Map<TaskAttemptID, ProcessTreeInfo> tasksToBeAdded;
   private List<TaskAttemptID> tasksToBeRemoved;
 
+  private static final String MEMORY_USAGE_STRING =
+    "Memory usage of ProcessTree %s for task-id %s : %d bytes, " +
+      "limit : %d bytes";
+  
   public TaskMemoryManagerThread(TaskTracker taskTracker) {
     this(taskTracker.getTotalMemoryAllottedForTasksOnTT() * 1024 * 1024L,
-            taskTracker.getJobConf().getLong(
-                "mapred.tasktracker.taskmemorymanager.monitoring-interval", 
-                5000L));         
+      taskTracker.getJobConf().getLong(
+        TTConfig.TT_MEMORY_MANAGER_MONITORING_INTERVAL, 5000L));         
     this.taskTracker = taskTracker;
   }
 
@@ -179,7 +183,7 @@
                   taskTracker
                       .getJobConf()
                       .getLong(
-                          "mapred.tasktracker.tasks.sleeptime-before-sigkill",
+                          TTConfig.TT_SLEEP_TIME_BEFORE_SIG_KILL,
                           ProcessTree.DEFAULT_SLEEPTIME_BEFORE_SIGKILL);
 
               // create process tree object
@@ -209,18 +213,19 @@
           // are processes more than 1 iteration old.
           long curMemUsageOfAgedProcesses = pTree.getCumulativeVmem(1);
           long limit = ptInfo.getMemLimit();
-          LOG.info("Memory usage of ProcessTree " + pId + " :"
-              + currentMemUsage + "bytes. Limit : " + limit + "bytes");
+          LOG.info(String.format(MEMORY_USAGE_STRING, 
+                                pId, tid.toString(), currentMemUsage, limit));
 
           if (isProcessTreeOverLimit(tid.toString(), currentMemUsage, 
                                       curMemUsageOfAgedProcesses, limit)) {
             // Task (the root process) is still alive and overflowing memory.
-            // Clean up.
+            // Dump the process-tree and then clean it up.
             String msg =
                 "TaskTree [pid=" + pId + ",tipID=" + tid
                     + "] is running beyond memory-limits. Current usage : "
                     + currentMemUsage + "bytes. Limit : " + limit
-                    + "bytes. Killing task.";
+                    + "bytes. Killing task. \nDump of the process-tree for "
+                    + tid + " : \n" + pTree.getProcessTreeDump();
             LOG.warn(msg);
             taskTracker.cleanUpOverMemoryTask(tid, true, msg);
 

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskReport.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskReport.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskReport.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskReport.java Sat Nov 28 20:26:01 2009
@@ -17,33 +17,18 @@
  */
 package org.apache.hadoop.mapred;
 
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
+import java.util.List;
 
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableUtils;
-
-/** A report on the state of a task. */
-public class TaskReport implements Writable {
-  private TaskID taskid;
-  private float progress;
-  private String state;
-  private String[] diagnostics;
-  private long startTime; 
-  private long finishTime; 
-  private Counters counters;
-  private TIPStatus currentStatus;
-  
-  private Collection<TaskAttemptID> runningAttempts = 
-    new ArrayList<TaskAttemptID>();
-  private TaskAttemptID successfulAttempt = new TaskAttemptID();
+/** A report on the state of a task. 
+ * @deprecated Use {@link org.apache.hadoop.mapreduce.TaskReport} instead
+ **/
+@Deprecated
+public class TaskReport extends org.apache.hadoop.mapreduce.TaskReport {
+  
   public TaskReport() {
-    taskid = new TaskID();
+    super();
   }
   
   /**
@@ -80,160 +65,83 @@
              String[] diagnostics, TIPStatus currentStatus, 
              long startTime, long finishTime,
              Counters counters) {
-    this.taskid = taskid;
-    this.progress = progress;
-    this.state = state;
-    this.diagnostics = diagnostics;
-    this.currentStatus = currentStatus;
-    this.startTime = startTime; 
-    this.finishTime = finishTime;
-    this.counters = counters;
-  }
-    
-  /** @deprecated use {@link #getTaskID()} instead */
-  @Deprecated
-  public String getTaskId() { return taskid.toString(); }
-  /** The id of the task. */
-  public TaskID getTaskID() { return taskid; }
-  /** The amount completed, between zero and one. */
-  public float getProgress() { return progress; }
-  /** The most recent state, reported by a {@link Reporter}. */
-  public String getState() { return state; }
-  /** A list of error messages. */
-  public String[] getDiagnostics() { return diagnostics; }
-  /** A table of counters. */
-  public Counters getCounters() { return counters; }
-  /** The current status */
-  public TIPStatus getCurrentStatus() {
-    return currentStatus;
+    super(taskid, progress, state, diagnostics, currentStatus, startTime,
+      finishTime, new org.apache.hadoop.mapreduce.Counters(counters));
   }
   
-  /**
-   * Get finish time of task. 
-   * @return 0, if finish time was not set else returns finish time.
-   */
-  public long getFinishTime() {
-    return finishTime;
-  }
-
-  /** 
-   * set finish time of task. 
-   * @param finishTime finish time of task. 
-   */
-  void setFinishTime(long finishTime) {
-    this.finishTime = finishTime;
+  static TaskReport downgrade(
+      org.apache.hadoop.mapreduce.TaskReport report) {
+    return new TaskReport(TaskID.downgrade(report.getTaskId()),
+      report.getProgress(), report.getState(), report.getDiagnostics(),
+      report.getCurrentStatus(), report.getStartTime(), report.getFinishTime(),
+      Counters.downgrade(report.getTaskCounters()));
   }
-
-  /**
-   * Get start time of task. 
-   * @return 0 if start time was not set, else start time. 
-   */
-  public long getStartTime() {
-    return startTime;
+  
+  static TaskReport[] downgradeArray(
+      org.apache.hadoop.mapreduce.TaskReport[] reports) {
+    List<TaskReport> ret = new ArrayList<TaskReport>();
+    for (org.apache.hadoop.mapreduce.TaskReport report : reports) {
+      ret.add(downgrade(report));
+    }
+    return ret.toArray(new TaskReport[0]);
   }
-
-  /** 
-   * set start time of the task. 
-   */ 
-  void setStartTime(long startTime) {
-    this.startTime = startTime;
+  
+  /** The id of the task. */
+  public TaskID getTaskID() { return TaskID.downgrade(super.getTaskId()); }
+  
+  public Counters getCounters() { 
+    return Counters.downgrade(super.getTaskCounters()); 
   }
-
+  
   /** 
    * set successful attempt ID of the task. 
    */ 
   public void setSuccessfulAttempt(TaskAttemptID t) {
-    successfulAttempt = t;
+    super.setSuccessfulAttemptId(t);
   }
   /**
    * Get the attempt ID that took this task to completion
    */
   public TaskAttemptID getSuccessfulTaskAttempt() {
-    return successfulAttempt;
+    return TaskAttemptID.downgrade(super.getSuccessfulTaskAttemptId());
   }
   /** 
    * set running attempt(s) of the task. 
    */ 
   public void setRunningTaskAttempts(
       Collection<TaskAttemptID> runningAttempts) {
-    this.runningAttempts = runningAttempts;
+    Collection<org.apache.hadoop.mapreduce.TaskAttemptID> attempts = 
+      new ArrayList<org.apache.hadoop.mapreduce.TaskAttemptID>();
+    for (TaskAttemptID id : runningAttempts) {
+      attempts.add(id);
+    }
+    super.setRunningTaskAttemptIds(attempts);
   }
   /**
    * Get the running task attempt IDs for this task
    */
   public Collection<TaskAttemptID> getRunningTaskAttempts() {
-    return runningAttempts;
-  }
-
-
-  @Override
-  public boolean equals(Object o) {
-    if(o == null)
-      return false;
-    if(o.getClass().equals(this.getClass())) {
-      TaskReport report = (TaskReport) o;
-      return counters.equals(report.getCounters())
-             && Arrays.toString(this.diagnostics)
-                      .equals(Arrays.toString(report.getDiagnostics()))
-             && this.finishTime == report.getFinishTime()
-             && this.progress == report.getProgress()
-             && this.startTime == report.getStartTime()
-             && this.state.equals(report.getState())
-             && this.taskid.equals(report.getTaskID());
+    Collection<TaskAttemptID> attempts = new ArrayList<TaskAttemptID>();
+    for (org.apache.hadoop.mapreduce.TaskAttemptID id : 
+         super.getRunningTaskAttemptIds()) {
+      attempts.add(TaskAttemptID.downgrade(id));
     }
-    return false; 
+    return attempts;
   }
-
-  @Override
-  public int hashCode() {
-    return (counters.toString() + Arrays.toString(this.diagnostics) 
-            + this.finishTime + this.progress + this.startTime + this.state 
-            + this.taskid.toString()).hashCode();
-  }
-  //////////////////////////////////////////////
-  // Writable
-  //////////////////////////////////////////////
-  public void write(DataOutput out) throws IOException {
-    taskid.write(out);
-    out.writeFloat(progress);
-    Text.writeString(out, state);
-    out.writeLong(startTime);
-    out.writeLong(finishTime);
-    WritableUtils.writeStringArray(out, diagnostics);
-    counters.write(out);
-    WritableUtils.writeEnum(out, currentStatus);
-    if (currentStatus == TIPStatus.RUNNING) {
-      WritableUtils.writeVInt(out, runningAttempts.size());
-      TaskAttemptID t[] = new TaskAttemptID[0];
-      t = runningAttempts.toArray(t);
-      for (int i = 0; i < t.length; i++) {
-        t[i].write(out);
-      }
-    } else if (currentStatus == TIPStatus.COMPLETE) {
-      successfulAttempt.write(out);
-    }
+  
+  /** 
+   * set finish time of task. 
+   * @param finishTime finish time of task. 
+   */
+  protected void setFinishTime(long finishTime) {
+    super.setFinishTime(finishTime);
   }
 
-  public void readFields(DataInput in) throws IOException {
-    this.taskid.readFields(in);
-    this.progress = in.readFloat();
-    this.state = Text.readString(in);
-    this.startTime = in.readLong(); 
-    this.finishTime = in.readLong();
-    
-    diagnostics = WritableUtils.readStringArray(in);
-    counters = new Counters();
-    counters.readFields(in);
-    currentStatus = WritableUtils.readEnum(in, TIPStatus.class);
-    if (currentStatus == TIPStatus.RUNNING) {
-      int num = WritableUtils.readVInt(in);    
-      for (int i = 0; i < num; i++) {
-        TaskAttemptID t = new TaskAttemptID();
-        t.readFields(in);
-        runningAttempts.add(t);
-      }
-    } else if (currentStatus == TIPStatus.COMPLETE) {
-      successfulAttempt.readFields(in);
-    }
+  /** 
+   * set start time of the task. 
+   */ 
+  protected void setStartTime(long startTime) {
+    super.setStartTime(startTime);
   }
+
 }
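
The rewrite above reduces the old-API TaskReport to a stateless facade over
org.apache.hadoop.mapreduce.TaskReport: accessors convert between old and new
ID types, and static downgrade helpers wrap new-API instances. A generic
sketch of the pattern follows, with hypothetical NewReport/OldReport types
(nothing below is Hadoop API):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    // Stand-in for the new-API type; it owns all the state.
    class NewReport {
      private final List<String> attemptIds = new ArrayList<String>();
      public List<String> getAttemptIds() { return attemptIds; }
      public void setAttemptIds(List<String> ids) {
        attemptIds.clear();
        attemptIds.addAll(ids);
      }
    }

    // Deprecated old-API facade: no fields of its own, every accessor
    // delegates to the superclass, mirroring TaskReport.downgrade(...).
    @Deprecated
    class OldReport extends NewReport {
      static OldReport downgrade(NewReport report) {
        OldReport old = new OldReport();
        old.setAttemptIds(report.getAttemptIds());
        return old;
      }
    }

    public class DowngradeDemo {
      public static void main(String[] args) {
        NewReport fresh = new NewReport();
        fresh.setAttemptIds(Arrays.asList("attempt_0"));
        OldReport legacy = OldReport.downgrade(fresh);
        System.out.println(legacy.getAttemptIds()); // [attempt_0]
      }
    }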

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskRunner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskRunner.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskRunner.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskRunner.java Sat Nov 28 20:26:01 2009
@@ -32,15 +32,18 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.filecache.DistributedCache;
 import org.apache.hadoop.mapreduce.filecache.TaskDistributedCacheManager;
 import org.apache.hadoop.mapreduce.filecache.TrackerDistributedCacheManager;
+import org.apache.hadoop.mapreduce.server.tasktracker.Localizer;
 import org.apache.hadoop.fs.FSError;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.TaskTracker.PermissionsHandler;
+import org.apache.hadoop.mapred.TaskController.InitializationContext;
 import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.log4j.Level;
@@ -141,6 +144,7 @@
    *             set via {@link JobConf#MAPRED_MAP_TASK_ENV} or
    *             {@link JobConf#MAPRED_REDUCE_TASK_ENV}
    */
+  @Deprecated
   public String getChildEnv(JobConf jobConf) {
     return jobConf.get(JobConf.MAPRED_TASK_ENV);
   }
@@ -160,21 +164,26 @@
       //before preparing the job localize 
       //all the archives
       TaskAttemptID taskid = t.getTaskID();
-      LocalDirAllocator lDirAlloc = new LocalDirAllocator("mapred.local.dir");
+      LocalDirAllocator lDirAlloc = new LocalDirAllocator(MRConfig.LOCAL_DIR);
       File workDir = formWorkDir(lDirAlloc, taskid, t.isTaskCleanupTask(), conf);
 
       // We don't create any symlinks yet, so presence/absence of workDir
       // actually on the file system doesn't matter.
       taskDistributedCacheManager = tracker.getTrackerDistributedCacheManager()
           .newTaskDistributedCacheManager(conf);
-      taskDistributedCacheManager.setup(
-          lDirAlloc, workDir, TaskTracker.getDistributedCacheDir());
+      taskDistributedCacheManager.setup(lDirAlloc, workDir, TaskTracker
+          .getDistributedCacheDir(conf.getUser()));
 
       // Set up the child task's configuration. After this call, no localization
       // of files should happen in the TaskTracker's process space. Any changes to
       // the conf object after this will NOT be reflected to the child.
       setupChildTaskConfiguration(lDirAlloc);
 
+      InitializationContext context = new InitializationContext();
+      context.user = conf.getUser();
+      context.workDir = new File(conf.get(TaskTracker.JOB_LOCAL_DIR));
+      tracker.getTaskController().initializeDistributedCache(context);
+
       if (!prepare()) {
         return;
       }
@@ -202,7 +211,8 @@
           stderr);
 
       Map<String, String> env = new HashMap<String, String>();
-      errorInfo = getVMEnvironment(errorInfo, workDir, conf, env);
+      errorInfo = getVMEnvironment(errorInfo, workDir, conf, env,
+                                   taskid, logSize);
 
       jvmManager.launchJvm(this, 
           jvmManager.constructJvmEnv(setup,vargs,stdout,stderr,logSize, 
@@ -246,7 +256,12 @@
       }catch(IOException ie){
         LOG.warn("Error releasing caches : Cache files might not have been cleaned up");
       }
-      tip.reportTaskFinished();
+
+      // It is safe to call TaskTracker.TaskInProgress.reportTaskFinished with
+      // *false* since the task has either
+      // a) SUCCEEDED - which means commit has been done
+      // b) FAILED - which means we do not need to commit
+      tip.reportTaskFinished(false);
     }
   }
 
@@ -265,8 +280,8 @@
     if (!b) {
       LOG.warn("mkdirs failed. Ignoring");
     } else {
-      PermissionsHandler.setPermissions(logDir,
-          PermissionsHandler.sevenZeroZero);
+      Localizer.PermissionsHandler.setPermissions(logDir,
+          Localizer.PermissionsHandler.sevenZeroZero);
     }
     return logFiles;
   }
@@ -282,9 +297,9 @@
       throws IOException {
 
     Path localTaskFile =
-        lDirAlloc.getLocalPathForWrite(TaskTracker.getTaskConfFile(t
-            .getJobID().toString(), t.getTaskID().toString(), t
-            .isTaskCleanupTask()), conf);
+        lDirAlloc.getLocalPathForWrite(TaskTracker.getTaskConfFile(
+            t.getUser(), t.getJobID().toString(), t.getTaskID().toString(), t
+                .isTaskCleanupTask()), conf);
 
     // write the child's task configuration file to the local disk
     writeLocalTaskFile(localTaskFile.toString(), conf);
@@ -433,8 +448,8 @@
       JobConf conf)
       throws IOException {
 
-    // add java.io.tmpdir given by mapred.child.tmp
-    String tmp = conf.get("mapred.child.tmp", "./tmp");
+    // add java.io.tmpdir given by mapreduce.task.tmp.dir
+    String tmp = conf.get(JobContext.TASK_TEMP_DIR, "./tmp");
     Path tmpDir = new Path(tmp);
 
     // if temp directory path is not absolute, prepend it with workDir.
@@ -471,6 +486,7 @@
   }
 
   /**
+   * Sets the environment variables needed for the task JVM and its children.
    * @param errorInfo
    * @param workDir
    * @param env
@@ -478,7 +494,7 @@
    * @throws Throwable
    */
   private String getVMEnvironment(String errorInfo, File workDir, JobConf conf,
-      Map<String, String> env)
+      Map<String, String> env, TaskAttemptID taskid, long logSize)
       throws Throwable {
     StringBuffer ldLibraryPath = new StringBuffer();
     ldLibraryPath.append(workDir.toString());
@@ -490,6 +506,23 @@
     }
     env.put("LD_LIBRARY_PATH", ldLibraryPath.toString());
     
+    // put jobTokenFile name into env
+    String jobTokenFile = conf.get(JobContext.JOB_TOKEN_FILE);
+    LOG.debug("putting jobToken file name into environment fn=" + jobTokenFile);
+    env.put("JOB_TOKEN_FILE", jobTokenFile);
+    
+    // for the child of task jvm, set hadoop.root.logger
+    env.put("HADOOP_ROOT_LOGGER", "INFO,TLA");
+    String hadoopClientOpts = System.getenv("HADOOP_CLIENT_OPTS");
+    if (hadoopClientOpts == null) {
+      hadoopClientOpts = "";
+    } else {
+      hadoopClientOpts = hadoopClientOpts + " ";
+    }
+    hadoopClientOpts = hadoopClientOpts + "-Dhadoop.tasklog.taskid=" + taskid
+                       + " -Dhadoop.tasklog.totalLogFileSize=" + logSize;
+    env.put("HADOOP_CLIENT_OPTS", "\"" + hadoopClientOpts + "\"");
+
     // add the env variables passed by the user
     String mapredChildEnv = getChildEnv(conf);
     if (mapredChildEnv != null && mapredChildEnv.length() > 0) {
@@ -550,25 +583,26 @@
   }
 
   /**
-   * Prepare the mapred.local.dir for the child. The child is sand-boxed now.
+   * Prepare the MRConfig.LOCAL_DIR for the child. The child is sand-boxed now.
    * Whenever it uses LocalDirAllocator from now on inside the child, it will
    * only see files inside the attempt-directory. This is done in the Child's
    * process space.
    */
   static void setupChildMapredLocalDirs(Task t, JobConf conf) {
-    String[] localDirs = conf.getStrings("mapred.local.dir");
+    String[] localDirs = conf.getStrings(MRConfig.LOCAL_DIR);
     String jobId = t.getJobID().toString();
     String taskId = t.getTaskID().toString();
     boolean isCleanup = t.isTaskCleanupTask();
+    String user = t.getUser();
     StringBuffer childMapredLocalDir =
         new StringBuffer(localDirs[0] + Path.SEPARATOR
-            + TaskTracker.getLocalTaskDir(jobId, taskId, isCleanup));
+            + TaskTracker.getLocalTaskDir(user, jobId, taskId, isCleanup));
     for (int i = 1; i < localDirs.length; i++) {
       childMapredLocalDir.append("," + localDirs[i] + Path.SEPARATOR
-          + TaskTracker.getLocalTaskDir(jobId, taskId, isCleanup));
+          + TaskTracker.getLocalTaskDir(user, jobId, taskId, isCleanup));
     }
-    LOG.debug("mapred.local.dir for child : " + childMapredLocalDir);
-    conf.set("mapred.local.dir", childMapredLocalDir.toString());
+    LOG.debug(MRConfig.LOCAL_DIR + " for child : " + childMapredLocalDir);
+    conf.set(MRConfig.LOCAL_DIR, childMapredLocalDir.toString());
   }
 
   /** Creates the working directory pathname for a task attempt. */ 
@@ -576,8 +610,9 @@
       TaskAttemptID task, boolean isCleanup, JobConf conf) 
       throws IOException {
     Path workDir =
-        lDirAlloc.getLocalPathToRead(TaskTracker.getTaskWorkDir(task
-            .getJobID().toString(), task.toString(), isCleanup), conf);
+        lDirAlloc.getLocalPathToRead(TaskTracker.getTaskWorkDir(
+            conf.getUser(), task.getJobID().toString(), task.toString(),
+            isCleanup), conf);
 
     return new File(workDir.toString());
   }
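
Note how getVMEnvironment now appends the per-task log options to whatever
HADOOP_CLIENT_OPTS the parent environment already carries instead of
overwriting it. A self-contained sketch of that append-don't-clobber pattern
(class and method names are illustrative):

    import java.util.HashMap;
    import java.util.Map;

    public class EnvAppendDemo {
      // Append task-specific -D options to an existing value rather than
      // clobbering it, as getVMEnvironment does for HADOOP_CLIENT_OPTS.
      static void addClientOpts(Map<String, String> env, String existing,
                                String taskId, long logSize) {
        String opts = (existing == null) ? "" : existing + " ";
        opts += "-Dhadoop.tasklog.taskid=" + taskId
              + " -Dhadoop.tasklog.totalLogFileSize=" + logSize;
        env.put("HADOOP_CLIENT_OPTS", "\"" + opts + "\"");
      }

      public static void main(String[] args) {
        Map<String, String> env = new HashMap<String, String>();
        addClientOpts(env, System.getenv("HADOOP_CLIENT_OPTS"),
                      "attempt_200911282026_0001_m_000000_0", 10L << 20);
        System.out.println(env.get("HADOOP_CLIENT_OPTS"));
      }
    }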

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskScheduler.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskScheduler.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskScheduler.java Sat Nov 28 20:26:01 2009
@@ -91,5 +91,60 @@
    * @return
    */
   public abstract Collection<JobInProgress> getJobs(String queueName);
-    
+
+  /**
+   * Abstract QueueRefresher class. Schedulers can extend this and return an
+   * instance of this in the {@link #getQueueRefresher()} method. The
+   * {@link #refreshQueues(List)} method of this instance will be invoked by the
+   * {@link QueueManager} whenever it gets a request from an administrator to
+   * refresh its own queue-configuration. This method has a documented contract
+   * between the {@link QueueManager} and the {@link TaskScheduler}.
+   */
+  abstract class QueueRefresher {
+
+    /**
+     * Refresh the queue-configuration in the scheduler. This method has the
+     * following contract.
+     * <ol>
+     * <li>Before this method is invoked, {@link QueueManager} validates the
+     * new queue-configuration. For example, addition of new queues and
+     * removal of queues at any level in the hierarchy are currently not
+     * supported by {@link QueueManager}, and so are not supported for
+     * schedulers either.</li>
+     * <li>Schedulers will be passed a list of {@link JobQueueInfo}s of the root
+     * queues i.e. the queues at the top level. All the descendants are properly
+     * linked from these top-level queues.</li>
+     * <li>Schedulers should use the scheduler specific queue properties from
+     * the newRootQueues, validate the properties themselves and apply them
+     * internally.</li>
+     * <li>
+     * Once this method returns successfully, the refresh of queue properties
+     * is assumed to have succeeded throughout and is 'committed' internally
+     * to {@link QueueManager} too. It is guaranteed that, after a successful
+     * return from the scheduler, the queue refresh in QueueManager cannot
+     * fail. If such an abnormality ever happens, the queue framework becomes
+     * inconsistent and needs a JT restart.</li>
+     * <li>If the scheduler throws an exception during
+     * {@link #refreshQueues(List)},
+     * {@link QueueManager} throws away the newly read configuration, retains
+     * the old (consistent) configuration and informs the request issuer about
+     * the error appropriately.</li>
+     * </ol>
+     * 
+     * @param newRootQueues the list of root {@link JobQueueInfo}s in the new
+     *          queue-configuration
+     */
+    abstract void refreshQueues(List<JobQueueInfo> newRootQueues)
+        throws Throwable;
+  }
+
+  /**
+   * Get the {@link QueueRefresher} for this scheduler. By default, a
+   * scheduler has no {@link QueueRefresher} and this method returns null.
+   * Schedulers need to return an instance of {@link QueueRefresher} if they
+   * wish to refresh their queue-configuration when {@link QueueManager}
+   * refreshes its own queue-configuration via an administrator request.
+   * 
+   * @return the {@link QueueRefresher} for this scheduler, or null if none
+   */
+  QueueRefresher getQueueRefresher() {
+    return null;
+  }
 }
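
Under the contract above, a scheduler opts into configuration refreshes by
overriding getQueueRefresher() and validating the new properties inside
refreshQueues(); throwing makes the QueueManager keep its old configuration.
A toy sketch with stand-in types (QueueInfo, Scheduler and Refresher below
are hypothetical, not the real JobQueueInfo/TaskScheduler API):

    import java.util.Arrays;
    import java.util.List;

    class QueueInfo {
      private final String name;
      QueueInfo(String name) { this.name = name; }
      String getName() { return name; }
    }

    abstract class Scheduler {
      abstract class Refresher {
        abstract void refreshQueues(List<QueueInfo> newRootQueues)
            throws Throwable;
      }
      // Default: no refresher, like TaskScheduler.getQueueRefresher().
      Refresher getQueueRefresher() { return null; }
    }

    public class QueueRefreshDemo extends Scheduler {
      @Override
      Refresher getQueueRefresher() {
        return new Refresher() {
          @Override
          void refreshQueues(List<QueueInfo> newRootQueues) throws Throwable {
            // Validate scheduler-specific properties first; an exception
            // here must leave the scheduler's own state untouched.
            for (QueueInfo q : newRootQueues) {
              System.out.println("refreshing queue " + q.getName());
            }
          }
        };
      }

      public static void main(String[] args) throws Throwable {
        new QueueRefreshDemo().getQueueRefresher()
            .refreshQueues(Arrays.asList(new QueueInfo("default")));
      }
    }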

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskStatus.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskStatus.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskStatus.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskStatus.java Sat Nov 28 20:26:01 2009
@@ -27,12 +27,13 @@
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.util.StringUtils;
 /**************************************************
  * Describes the current status of a task.  This is
  * not intended to be a comprehensive piece of data.
  *
  **************************************************/
-abstract class TaskStatus implements Writable, Cloneable {
+public abstract class TaskStatus implements Writable, Cloneable {
   static final Log LOG =
     LogFactory.getLog(TaskStatus.class.getName());
   
@@ -132,11 +133,21 @@
   }
 
   /**
-   * Sets finishTime. 
+   * Sets finishTime for the task status if and only if the
+   * start time is set and the passed finish time is greater than
+   * zero.
+   * 
    * @param finishTime finish time of task.
    */
   void setFinishTime(long finishTime) {
-    this.finishTime = finishTime;
+    if (this.getStartTime() > 0 && finishTime > 0) {
+      this.finishTime = finishTime;
+    } else {
+      // Use StringUtils to stringify the stack trace of the offending caller.
+      LOG.error("Trying to set finish time for task " + taskid +
+          " when no start time is set, stack trace is : " +
+          StringUtils.stringifyException(new Exception()));
+    }
   }
   /**
    * Get shuffle finish time for the task. If shuffle finish time was 
@@ -201,11 +212,20 @@
   }
 
   /**
-   * Set startTime of the task.
+   * Set startTime of the task if the passed start time is greater than zero.
    * @param startTime start time
    */
   void setStartTime(long startTime) {
-    this.startTime = startTime;
+    // Make explicit the assumption that the passed startTime is a
+    // positive long value.
+    if (startTime > 0) {
+      this.startTime = startTime;
+    } else {
+      // Use StringUtils to stringify the stack trace of the offending caller.
+      LOG.error("Trying to set illegal startTime for task : " + taskid +
+          ". Stack trace is : " +
+          StringUtils.stringifyException(new Exception()));
+    }
   }
   /**
    * Get current phase of this task. Phase.Map in case of map tasks, 
@@ -219,7 +239,7 @@
    * Set current phase of this task.  
    * @param phase phase of this task
    */
-  void setPhase(Phase phase){
+  public void setPhase(Phase phase){
     TaskStatus.Phase oldPhase = getPhase();
     if (oldPhase != phase){
       // sort phase started
@@ -294,7 +314,7 @@
    *  
    * @param mapTaskId map from which fetch failed
    */
-  synchronized void addFetchFailedMap(TaskAttemptID mapTaskId) {}
+  public abstract void addFetchFailedMap(TaskAttemptID mapTaskId);
 
   /**
    * Update the status of the task.
@@ -326,11 +346,11 @@
 
     setDiagnosticInfo(status.getDiagnosticInfo());
     
-    if (status.getStartTime() != 0) {
-      this.startTime = status.getStartTime(); 
+    if (status.getStartTime() > 0) {
+      this.setStartTime(status.getStartTime()); 
     }
-    if (status.getFinishTime() != 0) {
-      this.finishTime = status.getFinishTime(); 
+    if (status.getFinishTime() > 0) {
+      this.setFinishTime(status.getFinishTime()); 
     }
     
     this.phase = status.getPhase();
@@ -359,8 +379,8 @@
     setProgress(progress);
     setStateString(state);
     setPhase(phase);
-    if (finishTime != 0) {
-      this.finishTime = finishTime; 
+    if (finishTime > 0) {
+      setFinishTime(finishTime); 
     }
   }
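
Both time setters above now reject non-positive values (and, for the finish
time, a missing start time) and log a stack trace instead of silently storing
garbage; updateStatusInternal routes through the setters so the guards apply
on every update path. A compact sketch of the guard, with printStackTrace
standing in for the LOG.error/StringUtils.stringifyException pair:

    public class GuardedTimesDemo {
      private long startTime;
      private long finishTime;

      void setStartTime(long t) {
        if (t > 0) {
          startTime = t;
        } else {
          // Capture the offending call site, as the real code does via
          // StringUtils.stringifyException(new Exception()).
          new Exception("illegal startTime " + t).printStackTrace();
        }
      }

      void setFinishTime(long t) {
        // Only record a finish time once a valid start time exists.
        if (startTime > 0 && t > 0) {
          finishTime = t;
        } else {
          new Exception("finish time " + t + " rejected").printStackTrace();
        }
      }

      public static void main(String[] args) {
        GuardedTimesDemo s = new GuardedTimesDemo();
        s.setFinishTime(42L);                    // rejected: no start time yet
        s.setStartTime(System.currentTimeMillis());
        s.setFinishTime(System.currentTimeMillis());
        System.out.println(s.finishTime >= s.startTime); // true
      }
    }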