Posted to common-commits@hadoop.apache.org by om...@apache.org on 2008/12/15 23:21:35 UTC

svn commit: r726850 [3/4] - in /hadoop/core/trunk: ./ src/core/org/apache/hadoop/conf/ src/core/org/apache/hadoop/io/ src/core/org/apache/hadoop/util/ src/examples/org/apache/hadoop/examples/ src/mapred/org/apache/hadoop/mapred/ src/mapred/org/apache/h...

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/CounterGroup.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/CounterGroup.java?rev=726850&r1=726849&r2=726850&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/CounterGroup.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/CounterGroup.java Mon Dec 15 14:21:32 2008
@@ -18,10 +18,167 @@
 
 package org.apache.hadoop.mapreduce;
 
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.MissingResourceException;
+import java.util.ResourceBundle;
+import java.util.TreeMap;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+
 /**
  * A group of {@link Counter}s that logically belong together. Typically,
  * it is an {@link Enum} subclass and the counters are the values.
  */
-public abstract class CounterGroup implements Iterable<Counter> {
-  abstract public String getName();
+public class CounterGroup implements Writable, Iterable<Counter> {
+  private String name;
+  private String displayName;
+  private TreeMap<String, Counter> counters = new TreeMap<String, Counter>();
+  // Optional ResourceBundle for localization of group and counter names.
+  private ResourceBundle bundle = null;    
+  
+  /**
+   * Returns the specified resource bundle, or throws an exception.
+   * @throws MissingResourceException if the bundle isn't found
+   */
+  private static ResourceBundle getResourceBundle(String enumClassName) {
+    String bundleName = enumClassName.replace('$','_');
+    return ResourceBundle.getBundle(bundleName);
+  }
+
+  protected CounterGroup(String name) {
+    this.name = name;
+    try {
+      bundle = getResourceBundle(name);
+    }
+    catch (MissingResourceException neverMind) {
+    }
+    displayName = localize("CounterGroupName", name);
+  }
+  
+  protected CounterGroup(String name, String displayName) {
+    this.name = name;
+    this.displayName = displayName;
+  }
+ 
+  /**
+   * Get the internal name of the group
+   * @return the internal name
+   */
+  public synchronized String getName() {
+    return name;
+  }
+  
+  /**
+   * Get the display name of the group.
+   * @return the human readable name
+   */
+  public synchronized String getDisplayName() {
+    return displayName;
+  }
+
+  synchronized void addCounter(Counter counter) {
+    counters.put(counter.getName(), counter);
+  }
+
+  /**
+   * Internal method to find a counter in a group, adding it if absent.
+   * @param counterName the name of the counter
+   * @param displayName the display name of the counter
+   * @return the counter that was found or added
+   */
+  protected Counter findCounter(String counterName, String displayName) {
+    Counter result = counters.get(counterName);
+    if (result == null) {
+      result = new Counter(counterName, displayName);
+      counters.put(counterName, result);
+    }
+    return result;
+  }
+
+  public synchronized Counter findCounter(String counterName) {
+    Counter result = counters.get(counterName);
+    if (result == null) {
+      String displayName = localize(counterName, counterName);
+      result = new Counter(counterName, displayName);
+      counters.put(counterName, result);
+    }
+    return result;
+  }
+  
+  public synchronized Iterator<Counter> iterator() {
+    return counters.values().iterator();
+  }
+
+  public synchronized void write(DataOutput out) throws IOException {
+    Text.writeString(out, displayName);
+    WritableUtils.writeVInt(out, counters.size());
+    for(Counter counter: counters.values()) {
+      counter.write(out);
+    }
+  }
+  
+  public synchronized void readFields(DataInput in) throws IOException {
+    displayName = Text.readString(in);
+    counters.clear();
+    int size = WritableUtils.readVInt(in);
+    for(int i=0; i < size; i++) {
+      Counter counter = new Counter();
+      counter.readFields(in);
+      counters.put(counter.getName(), counter);
+    }
+  }
+
+  /**
+   * Looks up key in the ResourceBundle and returns the corresponding value.
+   * If the bundle or the key doesn't exist, returns the default value.
+   */
+  private String localize(String key, String defaultValue) {
+    String result = defaultValue;
+    if (bundle != null) {
+      try {
+        result = bundle.getString(key);
+      }
+      catch (MissingResourceException mre) {
+      }
+    }
+    return result;
+  }
+
+  /**
+   * Returns the number of counters in this group.
+   */
+  public synchronized int size() {
+    return counters.size();
+  }
+
+  public synchronized boolean equals(Object genericRight) {
+    if (genericRight instanceof CounterGroup) {
+      Iterator<Counter> right = ((CounterGroup) genericRight).counters.
+                                       values().iterator();
+      Iterator<Counter> left = counters.values().iterator();
+      while (left.hasNext()) {
+        if (!right.hasNext() || !left.next().equals(right.next())) {
+          return false;
+        }
+      }
+      return !right.hasNext();
+    }
+    return false;
+  }
+
+  public synchronized int hashCode() {
+    return counters.hashCode();
+  }
+  
+  public synchronized void incrAllCounters(CounterGroup rightGroup) {
+    for(Counter right: rightGroup.counters.values()) {
+      Counter left = findCounter(right.getName(), right.getDisplayName());
+      left.increment(right.getValue());
+    }
+  }
 }
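
The change above turns CounterGroup from an abstract class into a concrete,
Writable container. As a rough sketch of how it is used (the group and
counter names are hypothetical, and this assumes same-package code, since the
constructors are protected):

    // Hypothetical, same-package usage of the new CounterGroup API.
    CounterGroup group = new CounterGroup("records", "Record counts");
    group.findCounter("GOOD").increment(5);
    group.findCounter("BAD").increment(1);
    // Iteration runs in sorted name order, courtesy of the TreeMap.
    for (Counter c : group) {
      System.out.println(c.getName() + " = " + c.getValue());
    }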

Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/Counters.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/Counters.java?rev=726850&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/Counters.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/Counters.java Mon Dec 15 14:21:32 2008
@@ -0,0 +1,184 @@
+package org.apache.hadoop.mapreduce;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.IdentityHashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+
+public class Counters implements Writable, Iterable<CounterGroup> {
+  /**
+   * A cache from enum values to the associated counter. Dramatically speeds up
+   * typical usage.
+   */
+  private Map<Enum<?>, Counter> cache = new IdentityHashMap<Enum<?>, Counter>();
+
+  private TreeMap<String, CounterGroup> groups = 
+      new TreeMap<String, CounterGroup>();
+  
+  public Counters() {
+  }
+  
+  Counters(org.apache.hadoop.mapred.Counters counters) {
+    for(org.apache.hadoop.mapred.Counters.Group group: counters) {
+      String name = group.getName();
+      CounterGroup newGroup = new CounterGroup(name, group.getDisplayName());
+      groups.put(name, newGroup);
+      for(Counter counter: group) {
+        newGroup.addCounter(counter);
+      }
+    }
+  }
+
+  public Counter findCounter(String groupName, String counterName) {
+    CounterGroup grp = groups.get(groupName);
+    if (grp == null) {
+      grp = new CounterGroup(groupName);
+      groups.put(groupName, grp);
+    }
+    return grp.findCounter(counterName);
+  }
+
+  /**
+   * Find the counter for the given enum. The same enum will always return the
+   * same counter.
+   * @param key the counter key
+   * @return the matching counter object
+   */
+  public synchronized Counter findCounter(Enum<?> key) {
+    Counter counter = cache.get(key);
+    if (counter == null) {
+      counter = findCounter(key.getDeclaringClass().getName(), key.toString());
+      cache.put(key, counter);
+    }
+    return counter;    
+  }
+
+  /**
+   * Returns the names of all counter groups.
+   * @return the set of group names.
+   */
+  public synchronized Collection<String> getGroupNames() {
+    return groups.keySet();
+  }
+
+  @Override
+  public Iterator<CounterGroup> iterator() {
+    return groups.values().iterator();
+  }
+
+  /**
+   * Returns the named counter group, or null if there is no group with the
+   * specified name.
+   */
+  public synchronized CounterGroup getGroup(String groupName) {
+    return groups.get(groupName);
+  }
+
+  /**
+   * Returns the total number of counters, by summing the number of counters
+   * in each group.
+   */
+  public synchronized int countCounters() {
+    int result = 0;
+    for (CounterGroup group : this) {
+      result += group.size();
+    }
+    return result;
+  }
+
+  /**
+   * Write the set of groups.
+   * The external format is:
+   *     #groups (groupName group)*
+   *
+   * i.e. the number of groups followed by 0 or more groups, where each 
+   * group is of the form:
+   *
+   *     groupDisplayName #counters (false | true counter)*
+   *
+   * where each counter is of the form:
+   *
+   *     name (false | true displayName) value
+   */
+  @Override
+  public synchronized void write(DataOutput out) throws IOException {
+    out.writeInt(groups.size());
+    for (org.apache.hadoop.mapreduce.CounterGroup group: groups.values()) {
+      Text.writeString(out, group.getName());
+      group.write(out);
+    }
+  }
+  
+  /**
+   * Read a set of groups.
+   */
+  @Override
+  public synchronized void readFields(DataInput in) throws IOException {
+    int numClasses = in.readInt();
+    groups.clear();
+    while (numClasses-- > 0) {
+      String groupName = Text.readString(in);
+      CounterGroup group = new CounterGroup(groupName);
+      group.readFields(in);
+      groups.put(groupName, group);
+    }
+  }
+
+  /**
+   * Return textual representation of the counter values.
+   */
+  public synchronized String toString() {
+    StringBuilder sb = new StringBuilder("Counters: " + countCounters());
+    for (CounterGroup group: this) {
+      sb.append("\n\t" + group.getDisplayName());
+      for (Counter counter: group) {
+        sb.append("\n\t\t" + counter.getDisplayName() + "=" + 
+                  counter.getValue());
+      }
+    }
+    return sb.toString();
+  }
+
+  /**
+   * Increments multiple counters by their amounts in another Counters 
+   * instance.
+   * @param other the other Counters instance
+   */
+  public synchronized void incrAllCounters(Counters other) {
+    for(Map.Entry<String, CounterGroup> rightEntry: other.groups.entrySet()) {
+      CounterGroup left = groups.get(rightEntry.getKey());
+      CounterGroup right = rightEntry.getValue();
+      if (left == null) {
+        left = new CounterGroup(right.getName(), right.getDisplayName());
+        groups.put(rightEntry.getKey(), left);
+      }
+      left.incrAllCounters(right);
+    }
+  }
+
+  public boolean equals(Object genericRight) {
+    if (genericRight instanceof Counters) {
+      Iterator<CounterGroup> right = ((Counters) genericRight).groups.
+                                       values().iterator();
+      Iterator<CounterGroup> left = groups.values().iterator();
+      while (left.hasNext()) {
+        if (!right.hasNext() || !left.next().equals(right.next())) {
+          return false;
+        }
+      }
+      return !right.hasNext();
+    }
+    return false;
+  }
+  
+  public int hashCode() {
+    return groups.hashCode();
+  }
+}
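
Since findCounter(Enum) caches on the enum's identity, the typical pattern is
to key counters off a user-defined enum. A minimal sketch (the enum and its
values are hypothetical):

    import org.apache.hadoop.mapreduce.Counters;

    public class CounterDemo {
      // The declaring class name of the enum becomes the group name.
      enum Records { GOOD, BAD }

      public static void main(String[] args) {
        Counters counters = new Counters();
        counters.findCounter(Records.GOOD).increment(10);
        counters.findCounter(Records.BAD).increment(2);
        // Repeated lookups of the same enum value hit the IdentityHashMap
        // cache and return the same Counter object.
        System.out.println(counters);
      }
    }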

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/ID.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/ID.java?rev=726850&r1=726849&r2=726850&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/ID.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/ID.java Mon Dec 15 14:21:32 2008
@@ -34,6 +34,7 @@
  * @see TaskAttemptID
  */
 public abstract class ID implements WritableComparable<ID> {
+  protected static final char SEPARATOR = '_';
   protected int id;
 
   /** constructs an ID object from the given int */
@@ -85,4 +86,5 @@
   public void write(DataOutput out) throws IOException {
     out.writeInt(id);
   }
+  
 }

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/Job.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/Job.java?rev=726850&r1=726849&r2=726850&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/Job.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/Job.java Mon Dec 15 14:21:32 2008
@@ -21,10 +21,12 @@
 import java.io.IOException;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.RawComparator;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RunningJob;
 import org.apache.hadoop.mapred.TaskCompletionEvent;
 
 /**
@@ -34,27 +36,40 @@
  * IllegalStateException.
  */
 public class Job extends JobContext {  
-  
-  public Job() {
+  public static enum JobState {DEFINE, RUNNING};
+  private JobState state = JobState.DEFINE;
+  private JobClient jobTracker;
+  private RunningJob info;
+
+  public Job() throws IOException {
     this(new Configuration());
   }
 
-  public Job(Configuration conf) {
+  public Job(Configuration conf) throws IOException {
     super(conf, null);
+    jobTracker = new JobClient((JobConf) getConfiguration());
   }
 
-  public Job(Configuration conf, String jobName) {
+  public Job(Configuration conf, String jobName) throws IOException {
     this(conf);
     setJobName(jobName);
   }
 
+  private void ensureState(JobState state) throws IllegalStateException {
+    if (state != this.state) {
+      throw new IllegalStateException("Job in state "+ this.state + 
+                                      " instead of " + state);
+    }
+  }
+
   /**
    * Set the number of reduce tasks for the job.
    * @param tasks the number of reduce tasks
    * @throws IllegalStateException if the job is submitted
    */
   public void setNumReduceTasks(int tasks) throws IllegalStateException {
-    conf.setInt(NUM_REDUCES_ATTR, tasks);
+    ensureState(JobState.DEFINE);
+    conf.setNumReduceTasks(tasks);
   }
 
   /**
@@ -64,8 +79,8 @@
    * @throws IllegalStateException if the job is submitted
    */
   public void setWorkingDirectory(Path dir) throws IOException {
-    dir = dir.makeQualified(FileSystem.get(conf));
-    conf.set(WORKING_DIR_ATTR, dir.toString());
+    ensureState(JobState.DEFINE);
+    conf.setWorkingDirectory(dir);
   }
 
   /**
@@ -75,6 +90,7 @@
    */
   public void setInputFormatClass(Class<? extends InputFormat<?,?>> cls
                                   ) throws IllegalStateException {
+    ensureState(JobState.DEFINE);
     conf.setClass(INPUT_FORMAT_CLASS_ATTR, cls, InputFormat.class);
   }
 
@@ -85,6 +101,7 @@
    */
   public void setOutputFormatClass(Class<? extends OutputFormat<?,?>> cls
                                    ) throws IllegalStateException {
+    ensureState(JobState.DEFINE);
     conf.setClass(OUTPUT_FORMAT_CLASS_ATTR, cls, OutputFormat.class);
   }
 
@@ -95,16 +112,34 @@
    */
   public void setMapperClass(Class<? extends Mapper<?,?,?,?>> cls
                              ) throws IllegalStateException {
+    ensureState(JobState.DEFINE);
     conf.setClass(MAP_CLASS_ATTR, cls, Mapper.class);
   }
 
   /**
+   * Set the Jar by finding where a given class came from.
+   * @param cls the example class
+   */
+  public void setJarByClass(Class<?> cls) {
+    conf.setJarByClass(cls);
+  }
+  
+  /**
+   * Get the pathname of the job's jar.
+   * @return the pathname
+   */
+  public String getJar() {
+    return conf.getJar();
+  }
+
+  /**
    * Set the combiner class for the job.
    * @param cls the combiner to use
    * @throws IllegalStateException if the job is submitted
    */
   public void setCombinerClass(Class<? extends Reducer<?,?,?,?>> cls
                                ) throws IllegalStateException {
+    ensureState(JobState.DEFINE);
     conf.setClass(COMBINE_CLASS_ATTR, cls, Reducer.class);
   }
 
@@ -115,6 +150,7 @@
    */
   public void setReducerClass(Class<? extends Reducer<?,?,?,?>> cls
                               ) throws IllegalStateException {
+    ensureState(JobState.DEFINE);
     conf.setClass(REDUCE_CLASS_ATTR, cls, Reducer.class);
   }
 
@@ -125,6 +161,7 @@
    */
   public void setPartitionerClass(Class<? extends Partitioner<?,?>> cls
                                   ) throws IllegalStateException {
+    ensureState(JobState.DEFINE);
     conf.setClass(PARTITIONER_CLASS_ATTR, cls, Partitioner.class);
   }
 
@@ -138,7 +175,8 @@
    */
   public void setMapOutputKeyClass(Class<?> theClass
                                    ) throws IllegalStateException {
-    conf.setClass(MAP_OUTPUT_KEY_CLASS_ATTR, theClass, Object.class);
+    ensureState(JobState.DEFINE);
+    conf.setMapOutputKeyClass(theClass);
   }
 
   /**
@@ -151,7 +189,8 @@
    */
   public void setMapOutputValueClass(Class<?> theClass
                                      ) throws IllegalStateException {
-    conf.setClass(MAP_OUTPUT_VALUE_CLASS_ATTR, theClass, Object.class);
+    ensureState(JobState.DEFINE);
+    conf.setMapOutputValueClass(theClass);
   }
 
   /**
@@ -162,7 +201,8 @@
    */
   public void setOutputKeyClass(Class<?> theClass
                                 ) throws IllegalStateException {
-    conf.setClass(OUTPUT_KEY_CLASS_ATTR, theClass, Object.class);
+    ensureState(JobState.DEFINE);
+    conf.setOutputKeyClass(theClass);
   }
 
   /**
@@ -173,7 +213,8 @@
    */
   public void setOutputValueClass(Class<?> theClass
                                   ) throws IllegalStateException {
-    conf.setClass(OUTPUT_VALUE_CLASS_ATTR, theClass, Object.class);
+    ensureState(JobState.DEFINE);
+    conf.setOutputValueClass(theClass);
   }
 
   /**
@@ -184,19 +225,22 @@
    */
   public void setSortComparatorClass(Class<? extends RawComparator<?>> cls
                                      ) throws IllegalStateException {
-    conf.setClass(SORT_COMPARATOR_ATTR, cls, RawComparator.class);
+    ensureState(JobState.DEFINE);
+    conf.setOutputKeyComparatorClass(cls);
   }
 
   /**
    * Define the comparator that controls which keys are grouped together
    * for a single call to 
-   * {@link Reducer#reduce(Object, Iterable, org.apache.hadoop.mapreduce.Reducer.Context)}
+   * {@link Reducer#reduce(Object, Iterable, 
+   *                       org.apache.hadoop.mapreduce.Reducer.Context)}
    * @param cls the raw comparator to use
    * @throws IllegalStateException if the job is submitted
    */
   public void setGroupingComparatorClass(Class<? extends RawComparator<?>> cls
                                          ) throws IllegalStateException {
-    conf.setClass(GROUPING_COMPARATOR_ATTR, cls, RawComparator.class);
+    ensureState(JobState.DEFINE);
+    conf.setOutputValueGroupingComparator(cls);
   }
 
   /**
@@ -206,7 +250,8 @@
    * @throws IllegalStateException if the job is submitted
    */
   public void setJobName(String name) throws IllegalStateException {
-    conf.set(JOB_NAME_ATTR, name);
+    ensureState(JobState.DEFINE);
+    conf.setJobName(name);
   }
 
   /**
@@ -215,8 +260,8 @@
    * @return the URL where some job progress information will be displayed.
    */
   public String getTrackingURL() {
-    // TODO
-    return null;
+    ensureState(JobState.RUNNING);
+    return info.getTrackingURL();
   }
 
   /**
@@ -227,8 +272,8 @@
    * @throws IOException
    */
   public float mapProgress() throws IOException {
-    // TODO
-    return 0.0f;
+    ensureState(JobState.RUNNING);
+    return info.mapProgress();
   }
 
   /**
@@ -239,8 +284,8 @@
    * @throws IOException
    */
   public float reduceProgress() throws IOException {
-    // TODO
-    return 0.0f;
+    ensureState(JobState.RUNNING);
+    return info.reduceProgress();
   }
 
   /**
@@ -251,8 +296,8 @@
    * @throws IOException
    */
   public boolean isComplete() throws IOException {
-    // TODO
-    return false;
+    ensureState(JobState.RUNNING);
+    return info.isComplete();
   }
 
   /**
@@ -262,8 +307,8 @@
    * @throws IOException
    */
   public boolean isSuccessful() throws IOException {
-    // TODO
-    return false;
+    ensureState(JobState.RUNNING);
+    return info.isSuccessful();
   }
 
   /**
@@ -273,7 +318,8 @@
    * @throws IOException
    */
   public void killJob() throws IOException {
-    // TODO
+    ensureState(JobState.RUNNING);
+    info.killJob();
   }
     
   /**
@@ -285,8 +331,8 @@
    */
   public TaskCompletionEvent[] getTaskCompletionEvents(int startFrom
                                                        ) throws IOException {
-    // TODO
-    return null;
+    ensureState(JobState.RUNNING);
+    return info.getTaskCompletionEvents(startFrom);
   }
   
   /**
@@ -296,7 +342,9 @@
    * @throws IOException
    */
   public void killTask(TaskAttemptID taskId) throws IOException {
-    // TODO
+    ensureState(JobState.RUNNING);
+    info.killTask(org.apache.hadoop.mapred.TaskAttemptID.downgrade(taskId), 
+                  false);
   }
 
   /**
@@ -306,7 +354,9 @@
    * @throws IOException
    */
   public void failTask(TaskAttemptID taskId) throws IOException {
-    // TODO
+    ensureState(JobState.RUNNING);
+    info.killTask(org.apache.hadoop.mapred.TaskAttemptID.downgrade(taskId), 
+                  true);
   }
 
   /**
@@ -316,17 +366,77 @@
    * @throws IOException
    */
   public Iterable<CounterGroup> getCounters() throws IOException {
-    // TODO
-    return null;
+    ensureState(JobState.RUNNING);
+    return new Counters(info.getCounters());
+  }
+
+  private void ensureNotSet(String attr, String msg) throws IOException {
+    if (conf.get(attr) != null) {
+      throw new IOException(attr + " is incompatible with " + msg + " mode.");
+    }    
+  }
+
+  /**
+   * Default to the new APIs unless they are explicitly set or the old mapper
+   * or reducer attributes are used.
+   * @throws IOException if the configuration is inconsistent
+   */
+  private void setUseNewAPI() throws IOException {
+    int numReduces = conf.getNumReduceTasks();
+    String oldMapperClass = "mapred.mapper.class";
+    String oldReduceClass = "mapred.reducer.class";
+    String oldCombineClass = "mapred.combiner.class";
+    conf.setBooleanIfUnset("mapred.mapper.new-api",
+                           conf.get(oldMapperClass) == null);
+    if (conf.getUseNewMapper()) {
+      String mode = "new map API";
+      ensureNotSet("mapred.input.format.class", mode);
+      ensureNotSet(oldMapperClass, mode);
+      if (numReduces != 0) {
+        ensureNotSet(oldCombineClass, mode);
+        ensureNotSet("mapred.partitioner.class", mode);
+      } else {
+        ensureNotSet("mapred.output.format.class", mode);
+      }      
+    } else {
+      String mode = "map compatability";
+      ensureNotSet(JobContext.INPUT_FORMAT_CLASS_ATTR, mode);
+      ensureNotSet(JobContext.MAP_CLASS_ATTR, mode);
+      if (numReduces != 0) {
+        ensureNotSet(JobContext.COMBINE_CLASS_ATTR, mode);
+        ensureNotSet(JobContext.PARTITIONER_CLASS_ATTR, mode);
+      } else {
+        ensureNotSet(JobContext.OUTPUT_FORMAT_CLASS_ATTR, mode);
+      }
+    }
+    if (numReduces != 0) {
+      conf.setBooleanIfUnset("mapred.reducer.new-api",
+                             conf.get(oldReduceClass) == null);
+      if (conf.getUseNewReducer()) {
+        String mode = "new reduce API";
+        ensureNotSet("mapred.output.format.class", mode);
+        ensureNotSet(oldReduceClass, mode);   
+        ensureNotSet(oldCombineClass, mode);
+      } else {
+        String mode = "reduce compatability";
+        ensureNotSet(JobContext.OUTPUT_FORMAT_CLASS_ATTR, mode);
+        ensureNotSet(JobContext.REDUCE_CLASS_ATTR, mode);   
+        ensureNotSet(JobContext.COMBINE_CLASS_ATTR, mode);        
+      }
+    }   
   }
 
   /**
    * Submit the job to the cluster and return immediately.
    * @throws IOException
    */
-  public void submit() throws IOException {
-    // TODO
-  }
+  public void submit() throws IOException, InterruptedException, 
+                              ClassNotFoundException {
+    ensureState(JobState.DEFINE);
+    setUseNewAPI();
+    info = jobTracker.submitJobInternal(conf);
+    state = JobState.RUNNING;
+  }
   
   /**
    * Submit the job to the cluster and wait for it to finish.
@@ -334,8 +444,12 @@
    * @throws IOException thrown if the communication with the 
    *         <code>JobTracker</code> is lost
    */
-  public boolean waitForCompletion() throws IOException {
-    // TODO
-    return false;
+  public boolean waitForCompletion() throws IOException, InterruptedException,
+                                            ClassNotFoundException {
+    if (state == JobState.DEFINE) {
+      submit();
+    }
+    info.waitForCompletion();
+    return isSuccessful();
   }
 }
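
Taken together, the changes above give Job a two-state lifecycle: setters work
only in DEFINE, queries like mapProgress() only in RUNNING. A hypothetical
driver showing the intended flow (MyMapper and MyReducer stand in for real
implementations):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;

    public class Driver {
      public static void main(String[] args) throws Exception {
        Job job = new Job(new Configuration(), "demo");
        job.setJarByClass(Driver.class);
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // submit() flips the state to RUNNING; any setter called after
        // that throws IllegalStateException via ensureState().
        System.exit(job.waitForCompletion() ? 0 : 1);
      }
    }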

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/JobContext.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/JobContext.java?rev=726850&r1=726849&r2=726850&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/JobContext.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/JobContext.java Mon Dec 15 14:21:32 2008
@@ -21,15 +21,12 @@
 import java.io.IOException;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.RawComparator;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.io.WritableComparator;
 import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
 
 /**
  * A read-only view of the job that is provided to the tasks while they
@@ -38,35 +35,21 @@
 public class JobContext {
   // Put all of the attribute names in here so that Job and JobContext are
   // consistent.
-  protected static final String INPUT_FORMAT_CLASS_ATTR = "mapreduce.map.class";
+  protected static final String INPUT_FORMAT_CLASS_ATTR = 
+    "mapreduce.inputformat.class";
   protected static final String MAP_CLASS_ATTR = "mapreduce.map.class";
   protected static final String COMBINE_CLASS_ATTR = "mapreduce.combine.class";
   protected static final String REDUCE_CLASS_ATTR = "mapreduce.reduce.class";
   protected static final String OUTPUT_FORMAT_CLASS_ATTR = 
     "mapreduce.outputformat.class";
-  protected static final String OUTPUT_KEY_CLASS_ATTR = 
-    "mapreduce.out.key.class";
-  protected static final String OUTPUT_VALUE_CLASS_ATTR = 
-    "mapreduce.out.value.class";
-  protected static final String MAP_OUTPUT_KEY_CLASS_ATTR = 
-    "mapreduce.map.out.key.class";
-  protected static final String MAP_OUTPUT_VALUE_CLASS_ATTR = 
-    "mapreduce.map.out.value.class";
-  protected static final String NUM_REDUCES_ATTR = "mapreduce.reduce.tasks";
-  protected static final String WORKING_DIR_ATTR = "mapreduce.work.dir";
-  protected static final String JOB_NAME_ATTR = "mapreduce.job.name";
-  protected static final String SORT_COMPARATOR_ATTR = 
-    "mapreduce.sort.comparator";
-  protected static final String GROUPING_COMPARATOR_ATTR = 
-    "mapreduce.grouping.comparator";
   protected static final String PARTITIONER_CLASS_ATTR = 
     "mapreduce.partitioner.class";
 
-  protected final Configuration conf;
+  protected final org.apache.hadoop.mapred.JobConf conf;
   private final JobID jobId;
   
   public JobContext(Configuration conf, JobID jobId) {
-    this.conf = conf;
+    this.conf = new org.apache.hadoop.mapred.JobConf(conf);
     this.jobId = jobId;
   }
 
@@ -92,7 +75,7 @@
    * @return the number of reduce tasks for this job.
    */
   public int getNumReduceTasks() {
-    return conf.getInt(NUM_REDUCES_ATTR, 1);
+    return conf.getNumReduceTasks();
   }
   
   /**
@@ -101,14 +84,7 @@
    * @return the directory name.
    */
   public Path getWorkingDirectory() throws IOException {
-    String name = conf.get(WORKING_DIR_ATTR);
-    if (name != null) {
-      return new Path(name);
-    } else {
-      Path dir = FileSystem.get(conf).getWorkingDirectory();
-      conf.set(WORKING_DIR_ATTR, dir.toString());
-      return dir;
-    }
+    return conf.getWorkingDirectory();
   }
 
   /**
@@ -116,8 +92,7 @@
    * @return the key class for the job output data.
    */
   public Class<?> getOutputKeyClass() {
-    return conf.getClass(OUTPUT_KEY_CLASS_ATTR,
-                         LongWritable.class, Object.class);
+    return conf.getOutputKeyClass();
   }
   
   /**
@@ -125,7 +100,7 @@
    * @return the value class for job outputs.
    */
   public Class<?> getOutputValueClass() {
-    return conf.getClass(OUTPUT_VALUE_CLASS_ATTR, Text.class, Object.class);
+    return conf.getOutputValueClass();
   }
 
   /**
@@ -135,12 +110,7 @@
    * @return the map output key class.
    */
   public Class<?> getMapOutputKeyClass() {
-    Class<?> retv = conf.getClass(MAP_OUTPUT_KEY_CLASS_ATTR, null, 
-                                  Object.class);
-    if (retv == null) {
-      retv = getOutputKeyClass();
-    }
-    return retv;
+    return conf.getMapOutputKeyClass();
   }
 
   /**
@@ -151,12 +121,7 @@
    * @return the map output value class.
    */
   public Class<?> getMapOutputValueClass() {
-    Class<?> retv = conf.getClass(MAP_OUTPUT_VALUE_CLASS_ATTR, null,
-        Object.class);
-    if (retv == null) {
-      retv = getOutputValueClass();
-    }
-    return retv;
+    return conf.getMapOutputValueClass();
   }
 
   /**
@@ -166,7 +131,7 @@
    * @return the job's name, defaulting to "".
    */
   public String getJobName() {
-    return conf.get(JOB_NAME_ATTR, "");
+    return conf.getJobName();
   }
 
   /**
@@ -178,7 +143,7 @@
   public Class<? extends InputFormat<?,?>> getInputFormatClass() 
      throws ClassNotFoundException {
     return (Class<? extends InputFormat<?,?>>) 
-      conf.getClass(INPUT_FORMAT_CLASS_ATTR, InputFormat.class);
+      conf.getClass(INPUT_FORMAT_CLASS_ATTR, TextInputFormat.class);
   }
 
   /**
@@ -202,7 +167,7 @@
   public Class<? extends Reducer<?,?,?,?>> getCombinerClass() 
      throws ClassNotFoundException {
     return (Class<? extends Reducer<?,?,?,?>>) 
-      conf.getClass(COMBINE_CLASS_ATTR, Reducer.class);
+      conf.getClass(COMBINE_CLASS_ATTR, null);
   }
 
   /**
@@ -226,7 +191,7 @@
   public Class<? extends OutputFormat<?,?>> getOutputFormatClass() 
      throws ClassNotFoundException {
     return (Class<? extends OutputFormat<?,?>>) 
-      conf.getClass(OUTPUT_FORMAT_CLASS_ATTR, OutputFormat.class);
+      conf.getClass(OUTPUT_FORMAT_CLASS_ATTR, TextOutputFormat.class);
   }
 
   /**
@@ -238,7 +203,7 @@
   public Class<? extends Partitioner<?,?>> getPartitionerClass() 
      throws ClassNotFoundException {
     return (Class<? extends Partitioner<?,?>>) 
-      conf.getClass(PARTITIONER_CLASS_ATTR, Partitioner.class);
+      conf.getClass(PARTITIONER_CLASS_ATTR, HashPartitioner.class);
   }
 
   /**
@@ -246,14 +211,16 @@
    * 
    * @return the {@link RawComparator} comparator used to compare keys.
    */
-  @SuppressWarnings("unchecked")
   public RawComparator<?> getSortComparator() {
-    Class<?> theClass = conf.getClass(SORT_COMPARATOR_ATTR, null,
-                                   RawComparator.class);
-    if (theClass != null)
-      return (RawComparator<?>) ReflectionUtils.newInstance(theClass, conf);
-    return WritableComparator.get(
-        (Class<? extends WritableComparable>)getMapOutputKeyClass());
+    return conf.getOutputKeyComparator();
+  }
+
+  /**
+   * Get the pathname of the job's jar.
+   * @return the pathname
+   */
+  public String getJar() {
+    return conf.getJar();
   }
 
   /** 
@@ -264,12 +231,6 @@
    * @see Job#setGroupingComparatorClass(Class) for details.  
    */
   public RawComparator<?> getGroupingComparator() {
-    Class<?> theClass = conf.getClass(GROUPING_COMPARATOR_ATTR, null,
-                                   RawComparator.class);
-    if (theClass == null) {
-      return getSortComparator();
-    }
-    return (RawComparator<?>) ReflectionUtils.newInstance(theClass, conf);
+    return conf.getOutputValueGroupingComparator();
   }
-
 }
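
With the raw attribute names gone, code that holds a JobContext reads job
settings through typed getters that delegate to JobConf. A small hypothetical
sketch:

    import org.apache.hadoop.mapreduce.JobContext;

    public class ContextDump {
      // Print a few of the job-level settings exposed by the getters above.
      public static void dump(JobContext context) throws ClassNotFoundException {
        System.out.println("name:         " + context.getJobName());
        System.out.println("reduces:      " + context.getNumReduceTasks());
        System.out.println("output key:   " + context.getOutputKeyClass());
        System.out.println("input format: " + context.getInputFormatClass());
        System.out.println("jar:          " + context.getJar());
      }
    }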

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/JobID.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/JobID.java?rev=726850&r1=726849&r2=726850&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/JobID.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/JobID.java Mon Dec 15 14:21:32 2008
@@ -24,7 +24,6 @@
 import java.text.NumberFormat;
 
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.JobTracker;
 
 /**
  * JobID represents the immutable and unique identifier for 
@@ -42,15 +41,15 @@
  * 
  * @see TaskID
  * @see TaskAttemptID
- * @see JobTracker#getNewJobId()
- * @see JobTracker#getStartTime()
+ * @see org.apache.hadoop.mapred.JobTracker#getNewJobId()
+ * @see org.apache.hadoop.mapred.JobTracker#getStartTime()
  */
-public class JobID extends ID {
-  private static final String JOB = "job";
-  private String jtIdentifier;
-  private static char UNDERSCORE = '_';
+public class JobID extends org.apache.hadoop.mapred.ID 
+                   implements Comparable<ID> {
+  protected static final String JOB = "job";
+  private final Text jtIdentifier;
   
-  private static NumberFormat idFormat = NumberFormat.getInstance();
+  protected static final NumberFormat idFormat = NumberFormat.getInstance();
   static {
     idFormat.setGroupingUsed(false);
     idFormat.setMinimumIntegerDigits(4);
@@ -63,13 +62,15 @@
    */
   public JobID(String jtIdentifier, int id) {
     super(id);
-    this.jtIdentifier = jtIdentifier;
+    this.jtIdentifier = new Text(jtIdentifier);
   }
   
-  private JobID() { }
+  public JobID() { 
+    jtIdentifier = new Text();
+  }
   
   public String getJtIdentifier() {
-    return jtIdentifier;
+    return jtIdentifier.toString();
   }
   
   @Override
@@ -92,42 +93,40 @@
     else return jtComp;
   }
   
-  @Override
-  public String toString() {
-    StringBuilder builder = new StringBuilder();
-    return builder.append(JOB).append(UNDERSCORE)
-      .append(toStringWOPrefix()).toString();
-  }
-  
-  /** Returns the string representation w/o prefix */
-  StringBuilder toStringWOPrefix() {
-    StringBuilder builder = new StringBuilder();
-    builder.append(jtIdentifier).append(UNDERSCORE)
-    .append(idFormat.format(id)).toString();
+  /**
+   * Add the portion after the "job" prefix to the given builder. This is
+   * useful because the sub-ids use this substring at the start of their string.
+   * @param builder the builder to append to
+   * @return the builder that was passed in
+   */
+  public StringBuilder appendTo(StringBuilder builder) {
+    builder.append(SEPARATOR);
+    builder.append(jtIdentifier);
+    builder.append(SEPARATOR);
+    builder.append(idFormat.format(id));
     return builder;
   }
-  
+
   @Override
   public int hashCode() {
-    return toStringWOPrefix().toString().hashCode();
+    return jtIdentifier.hashCode() + id;
   }
-  
+
+  @Override
+  public String toString() {
+    return appendTo(new StringBuilder(JOB)).toString();
+  }
+
   @Override
   public void readFields(DataInput in) throws IOException {
     super.readFields(in);
-    this.jtIdentifier = Text.readString(in);
+    this.jtIdentifier.readFields(in);
   }
 
   @Override
   public void write(DataOutput out) throws IOException {
     super.write(out);
-    Text.writeString(out, jtIdentifier);
-  }
-  
-  public static JobID read(DataInput in) throws IOException {
-    JobID jobId = new JobID();
-    jobId.readFields(in);
-    return jobId;
+    jtIdentifier.write(out);
   }
   
   /** Construct a JobId object from given string 
@@ -141,7 +140,8 @@
       String[] parts = str.split("_");
       if(parts.length == 3) {
         if(parts[0].equals(JOB)) {
-          return new JobID(parts[1], Integer.parseInt(parts[2]));
+          return new org.apache.hadoop.mapred.JobID(parts[1], 
+                                                    Integer.parseInt(parts[2]));
         }
       }
     }catch (Exception ex) {//fall below
@@ -150,32 +150,4 @@
         + " is not properly formed");
   }
   
-  /** 
-   * Returns a regex pattern which matches task IDs. Arguments can 
-   * be given null, in which case that part of the regex will be generic.  
-   * For example to obtain a regex matching <i>any job</i> 
-   * run on the jobtracker started at <i>200707121733</i>, we would use :
-   * <pre> 
-   * JobID.getTaskIDsPattern("200707121733", null);
-   * </pre>
-   * which will return :
-   * <pre> "job_200707121733_[0-9]*" </pre> 
-   * @param jtIdentifier jobTracker identifier, or null
-   * @param jobId job number, or null
-   * @return a regex pattern matching JobIDs
-   */
-  public static String getJobIDsPattern(String jtIdentifier, Integer jobId) {
-    StringBuilder builder = new StringBuilder(JOB).append(UNDERSCORE);
-    builder.append(getJobIDsPatternWOPrefix(jtIdentifier, jobId));
-    return builder.toString();
-  }
-  
-  static StringBuilder getJobIDsPatternWOPrefix(String jtIdentifier
-      , Integer jobId) {
-    StringBuilder builder = new StringBuilder()
-      .append(jtIdentifier != null ? jtIdentifier : "[^_]*").append(UNDERSCORE)
-      .append(jobId != null ? idFormat.format(jobId) : "[0-9]*");
-    return builder;
-  }
-  
 }
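
With appendTo() and the 4-digit idFormat, the string form is
"job" + '_' + jtIdentifier + '_' + zero-padded id. For example (the
jobtracker identifier is hypothetical, and the parse method whose body
appears above is assumed to be the usual JobID.forName):

    JobID id = new JobID("200707121733", 5);
    System.out.println(id);                        // job_200707121733_0005
    JobID parsed = JobID.forName("job_200707121733_0005");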

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/MapContext.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/MapContext.java?rev=726850&r1=726849&r2=726850&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/MapContext.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/MapContext.java Mon Dec 15 14:21:32 2008
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.mapreduce;
 
+import java.io.IOException;
+
 import org.apache.hadoop.conf.Configuration;
 
 /**
@@ -27,17 +29,43 @@
  * @param <KEYOUT> the key output type from the Mapper
  * @param <VALUEOUT> the value output type from the Mapper
  */
-public abstract class MapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> 
+public class MapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> 
   extends TaskInputOutputContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
+  private RecordReader<KEYIN,VALUEIN> reader;
+  private InputSplit split;
 
-  public MapContext(Configuration conf, TaskAttemptID taskid) {
-    super(conf, taskid);
+  public MapContext(Configuration conf, TaskAttemptID taskid,
+                    RecordReader<KEYIN,VALUEIN> reader,
+                    RecordWriter<KEYOUT,VALUEOUT> writer,
+                    OutputCommitter committer,
+                    StatusReporter reporter,
+                    InputSplit split) {
+    super(conf, taskid, writer, committer, reporter);
+    this.reader = reader;
+    this.split = split;
   }
 
   /**
    * Get the input split for this map.
    */
-  public abstract InputSplit getInputSplit();
+  public InputSplit getInputSplit() {
+    return split;
+  }
+
+  @Override
+  public KEYIN getCurrentKey() throws IOException, InterruptedException {
+    return reader.getCurrentKey();
+  }
+
+  @Override
+  public VALUEIN getCurrentValue() throws IOException, InterruptedException {
+    return reader.getCurrentValue();
+  }
+
+  @Override
+  public boolean nextKeyValue() throws IOException, InterruptedException {
+    return reader.nextKeyValue();
+  }
 
 }
      
\ No newline at end of file

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/Mapper.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/Mapper.java?rev=726850&r1=726849&r2=726850&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/Mapper.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/Mapper.java Mon Dec 15 14:21:32 2008
@@ -94,10 +94,15 @@
  */
 public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
 
-  public abstract class Context 
+  public class Context 
     extends MapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
-    public Context(Configuration conf, TaskAttemptID taskid) {
-      super(conf, taskid);
+    public Context(Configuration conf, TaskAttemptID taskid,
+                   RecordReader<KEYIN,VALUEIN> reader,
+                   RecordWriter<KEYOUT,VALUEOUT> writer,
+                   OutputCommitter committer,
+                   StatusReporter reporter,
+                   InputSplit split) throws IOException, InterruptedException {
+      super(conf, taskid, reader, writer, committer, reporter, split);
     }
   }
   
@@ -116,7 +121,7 @@
   @SuppressWarnings("unchecked")
   protected void map(KEYIN key, VALUEIN value, 
                      Context context) throws IOException, InterruptedException {
-    context.collect((KEYOUT) key, (VALUEOUT) value);
+    context.write((KEYOUT) key, (VALUEOUT) value);
   }
 
   /**
@@ -135,12 +140,8 @@
    */
   public void run(Context context) throws IOException, InterruptedException {
     setup(context);
-    KEYIN key = context.nextKey(null);
-    VALUEIN value = null;
-    while (key != null) {
-      value = context.nextValue(value);
-      map(key, value, context);
-      key = context.nextKey(key);
+    while (context.nextKeyValue()) {
+      map(context.getCurrentKey(), context.getCurrentValue(), context);
     }
     cleanup(context);
   }
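
Under the new run() loop above, a Mapper sees one (key, value) pair per map()
call and emits through context.write(). A hypothetical example:

    import java.io.IOException;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    public class UpperCaseMapper extends Mapper<LongWritable, Text, Text, Text> {
      @Override
      protected void map(LongWritable key, Text value, Context context)
          throws IOException, InterruptedException {
        // context.write() replaces the old context.collect() call.
        context.write(new Text(value.toString().toUpperCase()), value);
      }
    }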

Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/OutputCommitter.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/OutputCommitter.java?rev=726850&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/OutputCommitter.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/OutputCommitter.java Mon Dec 15 14:21:32 2008
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce;
+
+import java.io.IOException;
+
+/**
+ * <code>OutputCommitter</code> describes the commit of task output for a 
+ * Map-Reduce job.
+ *
+ * <p>The Map-Reduce framework relies on the <code>OutputCommitter</code> of 
+ * the job to:</p>
+ * <ol>
+ *   <li>
+ *   Set up the job during initialization. For example, create the temporary 
+ *   output directory for the job during the initialization of the job.
+ *   </li>
+ *   <li>
+ *   Clean up the job after job completion. For example, remove the
+ *   temporary output directory after job completion. 
+ *   </li>
+ *   <li>
+ *   Set up the task's temporary output.
+ *   </li> 
+ *   <li>
+ *   Check whether a task needs a commit. This is to avoid the commit
+ *   procedure if a task does not need a commit.
+ *   </li>
+ *   <li>
+ *   Commit the task output.
+ *   </li>  
+ *   <li>
+ *   Discard the task output.
+ *   </li>
+ * </ol>
+ * 
+ * @see org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter 
+ * @see JobContext
+ * @see TaskAttemptContext 
+ *
+ */
+public abstract class OutputCommitter {
+  /**
+   * For the framework to set up the job output during initialization.
+   * 
+   * @param jobContext Context of the job whose output is being written.
+   * @throws IOException if temporary output could not be created
+   */
+  public abstract void setupJob(JobContext jobContext) throws IOException;
+
+  /**
+   * For cleaning up the job's output after job completion
+   * 
+   * @param jobContext Context of the job whose output is being written.
+   * @throws IOException
+   */
+  public abstract void cleanupJob(JobContext jobContext) throws IOException;
+
+  /**
+   * Sets up output for the task.
+   * 
+   * @param taskContext Context of the task whose output is being written.
+   * @throws IOException
+   */
+  public abstract void setupTask(TaskAttemptContext taskContext)
+  throws IOException;
+  
+  /**
+   * Check whether the task needs a commit.
+   * 
+   * @param taskContext Context of the task whose output is being written.
+   * @return true if the task needs a commit
+   * @throws IOException
+   */
+  public abstract boolean needsTaskCommit(TaskAttemptContext taskContext)
+  throws IOException;
+
+  /**
+   * To promote the task's temporary output to the final output location.
+   * 
+   * The task's output is moved to the job's output directory.
+   * 
+   * @param taskContext Context of the task whose output is being written.
+   * @throws IOException if the commit is not successful
+   */
+  public abstract void commitTask(TaskAttemptContext taskContext)
+  throws IOException;
+  
+  /**
+   * Discard the task output.
+   * 
+   * @param taskContext Context of the task whose output is being written.
+   * @throws IOException
+   */
+  public abstract void abortTask(TaskAttemptContext taskContext)
+  throws IOException;
+}
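
A minimal, hypothetical committer for outputs that need no commit step (for
example, a writer that streams to an external store) makes every hook a no-op
and reports that no commit is needed:

    import java.io.IOException;
    import org.apache.hadoop.mapreduce.JobContext;
    import org.apache.hadoop.mapreduce.OutputCommitter;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;

    public class NoOpOutputCommitter extends OutputCommitter {
      public void setupJob(JobContext jobContext) throws IOException { }
      public void cleanupJob(JobContext jobContext) throws IOException { }
      public void setupTask(TaskAttemptContext taskContext) throws IOException { }
      public boolean needsTaskCommit(TaskAttemptContext taskContext)
          throws IOException {
        return false;   // nothing to promote, so the commit step is skipped
      }
      public void commitTask(TaskAttemptContext taskContext) throws IOException { }
      public void abortTask(TaskAttemptContext taskContext) throws IOException { }
    }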

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/OutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/OutputFormat.java?rev=726850&r1=726849&r2=726850&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/OutputFormat.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/OutputFormat.java Mon Dec 15 14:21:32 2008
@@ -68,5 +68,17 @@
   public abstract void checkOutputSpecs(JobContext context
                                         ) throws IOException, 
                                                  InterruptedException;
+
+  /**
+   * Get the output committer for this output format. This is responsible
+   * for ensuring the output is committed correctly.
+   * @param context the task context
+   * @return an output committer
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  public abstract 
+  OutputCommitter getOutputCommitter(TaskAttemptContext context
+                                     ) throws IOException, InterruptedException;
 }
 

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/Partitioner.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/Partitioner.java?rev=726850&r1=726849&r2=726850&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/Partitioner.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/Partitioner.java Mon Dec 15 14:21:32 2008
@@ -44,4 +44,5 @@
    * @return the partition number for the <code>key</code>.
    */
   public abstract int getPartition(KEY key, VALUE value, int numPartitions);
+  
 }
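
For reference, a custom partitioner only has to implement getPartition(). A
hypothetical example that routes keys by their first character (the default
HashPartitioner referenced elsewhere in this commit hashes the whole key):

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Partitioner;

    public class FirstCharPartitioner extends Partitioner<Text, Text> {
      @Override
      public int getPartition(Text key, Text value, int numPartitions) {
        // Mask the sign bit so the modulus is always non-negative.
        int c = key.getLength() > 0 ? key.charAt(0) : 0;
        return (c & Integer.MAX_VALUE) % numPartitions;
      }
    }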

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/RecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/RecordReader.java?rev=726850&r1=726849&r2=726850&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/RecordReader.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/RecordReader.java Mon Dec 15 14:21:32 2008
@@ -41,24 +41,31 @@
                                   ) throws IOException, InterruptedException;
 
   /**
-   * Read the next key.
-   * @param key the object to be read into, which may be null
-   * @return the object that was read
+   * Read the next key, value pair.
+   * @return true if a key/value pair was read
    * @throws IOException
    * @throws InterruptedException
    */
-  public abstract KEYIN nextKey(KEYIN key
-                                ) throws IOException, InterruptedException;
+  public abstract 
+  boolean nextKeyValue() throws IOException, InterruptedException;
 
   /**
-   * Read the next value. It must be called after {@link #nextKey(Object)}.
-   * @param value the object to read into, which may be null
+   * Get the current key
+   * @return the current key or null if there is no current key
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  public abstract
+  KEYIN getCurrentKey() throws IOException, InterruptedException;
+  
+  /**
+   * Get the current value.
    * @return the object that was read
    * @throws IOException
    * @throws InterruptedException
    */
-  public abstract VALUEIN nextValue(VALUEIN value
-                                    ) throws IOException, InterruptedException;
+  public abstract 
+  VALUEIN getCurrentValue() throws IOException, InterruptedException;
   
   /**
    * The current progress of the record reader through its data.
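
To illustrate the new protocol, a hypothetical reader over an in-memory array
(this assumes the usual initialize/getProgress/close members of the abstract
class, which fall outside the hunk above):

    import java.io.IOException;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.RecordReader;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;

    public class StringArrayReader extends RecordReader<Integer, String> {
      private final String[] data;
      private int index = -1;

      public StringArrayReader(String[] data) { this.data = data; }

      public void initialize(InputSplit split, TaskAttemptContext context) { }

      // Advance first, then expose the current pair via the getters.
      public boolean nextKeyValue() { return ++index < data.length; }

      public Integer getCurrentKey() { return index; }

      public String getCurrentValue() { return data[index]; }

      public float getProgress() {
        return data.length == 0 ? 1.0f : (float) index / data.length;
      }

      public void close() throws IOException { }
    }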

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/RecordWriter.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/RecordWriter.java?rev=726850&r1=726849&r2=726850&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/RecordWriter.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/RecordWriter.java Mon Dec 15 14:21:32 2008
@@ -31,7 +31,7 @@
  * 
  * @see OutputFormat
  */
-public interface RecordWriter<K, V> {
+public abstract class RecordWriter<K, V> {
   /** 
    * Writes a key/value pair.
    *
@@ -39,7 +39,8 @@
    * @param value the value to write.
    * @throws IOException
    */      
-  void write(K key, V value) throws IOException, InterruptedException;
+  public abstract void write(K key, V value
+                             ) throws IOException, InterruptedException;
 
   /** 
    * Close this <code>RecordWriter</code> to future operations.
@@ -47,6 +48,6 @@
    * @param context the context of the task
    * @throws IOException
    */ 
-  void close(TaskAttemptContext context
-             ) throws IOException, InterruptedException;
+  public abstract void close(TaskAttemptContext context
+                             ) throws IOException, InterruptedException;
 }
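
Since RecordWriter is now an abstract class rather than an interface, a
subclass only fills in write() and close(). A hypothetical writer that
formats pairs onto a PrintStream:

    import java.io.IOException;
    import java.io.PrintStream;
    import org.apache.hadoop.mapreduce.RecordWriter;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;

    public class PrintStreamWriter<K, V> extends RecordWriter<K, V> {
      private final PrintStream out;

      public PrintStreamWriter(PrintStream out) { this.out = out; }

      @Override
      public void write(K key, V value) throws IOException, InterruptedException {
        out.println(key + "\t" + value);
      }

      @Override
      public void close(TaskAttemptContext context)
          throws IOException, InterruptedException {
        out.close();
      }
    }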

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/ReduceContext.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/ReduceContext.java?rev=726850&r1=726849&r2=726850&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/ReduceContext.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/ReduceContext.java Mon Dec 15 14:21:32 2008
@@ -19,8 +19,17 @@
 package org.apache.hadoop.mapreduce;
 
 import java.io.IOException;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.serializer.Deserializer;
+import org.apache.hadoop.io.serializer.SerializationFactory;
+import org.apache.hadoop.mapred.RawKeyValueIterator;
+import org.apache.hadoop.util.Progressable;
 
 /**
  * The context passed to the {@link Reducer}.
@@ -29,19 +38,151 @@
  * @param <KEYOUT> the class of the output keys
  * @param <VALUEOUT> the class of the output values
  */
-public abstract class ReduceContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT>
+public class ReduceContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT>
     extends TaskInputOutputContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
+  private RawKeyValueIterator input;
+  private RawComparator<KEYIN> comparator;
+  private KEYIN key;                                  // current key
+  private VALUEIN value;                              // current value
+  private boolean firstValue = false;                 // first value in key
+  private boolean nextKeyIsSame = false;              // more w/ this key
+  private boolean hasMore;                            // more in file
+  protected Progressable reporter;
+  private Deserializer<KEYIN> keyDeserializer;
+  private Deserializer<VALUEIN> valueDeserializer;
+  private DataInputBuffer buffer = new DataInputBuffer();
+  private BytesWritable currentRawKey = new BytesWritable();
+  private ValueIterable iterable = new ValueIterable();
+
+  public ReduceContext(Configuration conf, TaskAttemptID taskid,
+                       RawKeyValueIterator input, 
+                       RecordWriter<KEYOUT,VALUEOUT> output,
+                       OutputCommitter committer,
+                       StatusReporter reporter,
+                       RawComparator<KEYIN> comparator,
+                       Class<KEYIN> keyClass,
+                       Class<VALUEIN> valueClass
+                       ) throws InterruptedException, IOException{
+    super(conf, taskid, output, committer, reporter);
+    this.input = input;
+    this.comparator = comparator;
+    SerializationFactory serializationFactory = new SerializationFactory(conf);
+    this.keyDeserializer = serializationFactory.getDeserializer(keyClass);
+    this.keyDeserializer.open(buffer);
+    this.valueDeserializer = serializationFactory.getDeserializer(valueClass);
+    this.valueDeserializer.open(buffer);
+    hasMore = input.next();
+  }
 
-  public ReduceContext(Configuration conf, TaskAttemptID taskid) {
-    super(conf, taskid);
+  /** Start processing the next unique key. */
+  public boolean nextKey() throws IOException,InterruptedException {
+    while (hasMore && nextKeyIsSame) {
+      nextKeyValue();
+    }
+    if (hasMore) {
+      return nextKeyValue();
+    } else {
+      return false;
+    }
   }
 
   /**
+   * Advance to the next key/value pair.
+   */
+  @Override
+  public boolean nextKeyValue() throws IOException, InterruptedException {
+    if (!hasMore) {
+      key = null;
+      value = null;
+      return false;
+    }
+    firstValue = !nextKeyIsSame;
+    DataInputBuffer next = input.getKey();
+    currentRawKey.set(next.getData(), next.getPosition(), 
+                      next.getLength() - next.getPosition());
+    buffer.reset(currentRawKey.getBytes(), 0, currentRawKey.getLength());
+    key = keyDeserializer.deserialize(key);
+    next = input.getValue();
+    buffer.reset(next.getData(), next.getPosition(), next.getLength());
+    value = valueDeserializer.deserialize(value);
+    hasMore = input.next();
+    if (hasMore) {
+      next = input.getKey();
+      nextKeyIsSame = comparator.compare(currentRawKey.getBytes(), 0, 
+                                         currentRawKey.getLength(),
+                                         next.getData(),
+                                         next.getPosition(),
+                                         next.getLength() - next.getPosition()
+                                         ) == 0;
+    } else {
+      nextKeyIsSame = false;
+    }
+    return true;
+  }
+
+  public KEYIN getCurrentKey() {
+    return key;
+  }
+
+  @Override
+  public VALUEIN getCurrentValue() {
+    return value;
+  }
+
+  protected class ValueIterator implements Iterator<VALUEIN> {
+
+    @Override
+    public boolean hasNext() {
+      return firstValue || nextKeyIsSame;
+    }
+
+    @Override
+    public VALUEIN next() {
+      // if this is the first record, we don't need to advance
+      if (firstValue) {
+        firstValue = false;
+        return value;
+      }
+      // if this isn't the first record and the next key is different, the
+      // iterator has run past the last value for the current key
+      if (!nextKeyIsSame) {
+        throw new NoSuchElementException("iterate past last value");
+      }
+      // otherwise, go to the next key/value pair
+      try {
+        nextKeyValue();
+        return value;
+      } catch (IOException ie) {
+        throw new RuntimeException("next value iterator failed", ie);
+      } catch (InterruptedException ie) {
+        // java.util.Iterator can't declare checked exceptions, so wrap it
+        throw new RuntimeException("next value iterator interrupted", ie);        
+      }
+    }
+
+    @Override
+    public void remove() {
+      throw new UnsupportedOperationException("remove not implemented");
+    }
+    
+  }
+
+  protected class ValueIterable implements Iterable<VALUEIN> {
+    private ValueIterator iterator = new ValueIterator();
+    @Override
+    public Iterator<VALUEIN> iterator() {
+      return iterator;
+    } 
+  }
+  
+  /**
    * Iterate through the values for the current key, reusing the same value 
    * object, which is stored in the context.
-   * @return the series of values associated with the current key
+   * @return the series of values associated with the current key. All of the 
+   * objects returned directly and indirectly from this method are reused.
    */
-  public abstract 
-  Iterable<VALUEIN> getValues() throws IOException, InterruptedException;
-
+  public 
+  Iterable<VALUEIN> getValues() throws IOException, InterruptedException {
+    return iterable;
+  }
 }
\ No newline at end of file

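The reworked ReduceContext above deserializes every key and value into the
same objects, as the revised getValues() javadoc warns. A minimal sketch of
the consequence, assuming a reducer that needs to retain values beyond the
current iteration (the reducer class and types here are hypothetical;
WritableUtils.clone is the stock copy helper):

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.WritableUtils;
    import org.apache.hadoop.mapreduce.Reducer;

    public class CollectingReducer
        extends Reducer<Text, IntWritable, Text, IntWritable> {
      @Override
      protected void reduce(Text key, Iterable<IntWritable> values,
                            Context context)
          throws IOException, InterruptedException {
        List<IntWritable> kept = new ArrayList<IntWritable>();
        for (IntWritable value : values) {
          // the framework reuses 'value' on every iteration, so clone
          // it before keeping a reference past this loop
          kept.add(WritableUtils.clone(value, context.getConfiguration()));
        }
        context.write(key, new IntWritable(kept.size()));
      }
    }
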
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/Reducer.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/Reducer.java?rev=726850&r1=726849&r2=726850&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/Reducer.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/Reducer.java Mon Dec 15 14:21:32 2008
@@ -21,8 +21,8 @@
 import java.io.IOException;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.mapred.RawKeyValueIterator;
 
 /** 
  * Reduces a set of intermediate values which share a key to a smaller set of
@@ -88,7 +88,7 @@
  *   the sorted inputs.</p>
  *   <p>The output of the reduce task is typically written to a 
  *   {@link RecordWriter} via 
- *   {@link Context#collect(Object, Object)}.</p>
+ *   {@link Context#write(Object, Object)}.</p>
  *   </li>
  * </ol>
  * 
@@ -117,10 +117,19 @@
  */
 public abstract class Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
 
-  protected abstract class Context 
+  public class Context 
     extends ReduceContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
-    public Context(Configuration conf, TaskAttemptID taskid) {
-      super(conf, taskid);
+    public Context(Configuration conf, TaskAttemptID taskid,
+                   RawKeyValueIterator input, 
+                   RecordWriter<KEYOUT,VALUEOUT> output,
+                   OutputCommitter committer,
+                   StatusReporter reporter,
+                   RawComparator<KEYIN> comparator,
+                   Class<KEYIN> keyClass,
+                   Class<VALUEIN> valueClass
+                   ) throws IOException, InterruptedException {
+      super(conf, taskid, input, output, committer, reporter, comparator, 
+            keyClass, valueClass);
     }
   }
 
@@ -141,7 +150,7 @@
   protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context context
                         ) throws IOException, InterruptedException {
     for(VALUEIN value: values) {
-      context.collect((KEYOUT) key, (VALUEOUT) value);
+      context.write((KEYOUT) key, (VALUEOUT) value);
     }
   }
 
@@ -160,10 +169,8 @@
    */
   public void run(Context context) throws IOException, InterruptedException {
     setup(context);
-    KEYIN key = context.nextKey(null);
-    while(key != null) {
-      reduce(key, context.getValues(), context);
-      key = context.nextKey(key);
+    while (context.nextKey()) {
+      reduce(context.getCurrentKey(), context.getValues(), context);
     }
     cleanup(context);
   }

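For reference, a minimal sketch of a reducer written against the reworked
API above, the classic sum-of-counts reducer, using context.write() where
the old interface used context.collect():

    import java.io.IOException;

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    public class IntSumReducer
        extends Reducer<Text, IntWritable, Text, IntWritable> {
      private IntWritable result = new IntWritable();

      @Override
      protected void reduce(Text key, Iterable<IntWritable> values,
                            Context context)
          throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable value : values) {
          sum += value.get();
        }
        result.set(sum);
        // the reworked run() loop above calls reduce() once per unique key
        context.write(key, result);
      }
    }
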
Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/StatusReporter.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/StatusReporter.java?rev=726850&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/StatusReporter.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/StatusReporter.java Mon Dec 15 14:21:32 2008
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce;
+
+public abstract class StatusReporter {
+  public abstract Counter getCounter(Enum<?> name);
+  public abstract Counter getCounter(String group, String name);
+  public abstract void progress();
+  public abstract void setStatus(String status);
+}

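The contexts now delegate counters, progress, and status to this new
abstraction. A hedged sketch of a trivial implementation, e.g. for driving
a context outside a real task; the class is purely hypothetical and
returns no counters:

    import org.apache.hadoop.mapreduce.Counter;
    import org.apache.hadoop.mapreduce.StatusReporter;

    public class NoOpStatusReporter extends StatusReporter {
      private String status = "";
      private long progressCalls = 0;

      @Override
      public Counter getCounter(Enum<?> name) {
        return null;  // this sketch does not track counters
      }

      @Override
      public Counter getCounter(String group, String name) {
        return null;  // this sketch does not track counters
      }

      @Override
      public void progress() {
        progressCalls++;  // just record that progress was reported
      }

      @Override
      public void setStatus(String status) {
        this.status = status;
      }

      public String getStatus() {
        return status;
      }

      public long getProgressCalls() {
        return progressCalls;
      }
    }
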
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/TaskAttemptContext.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/TaskAttemptContext.java?rev=726850&r1=726849&r2=726850&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/TaskAttemptContext.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/TaskAttemptContext.java Mon Dec 15 14:21:32 2008
@@ -26,12 +26,12 @@
 /**
  * The context for task attempts.
  */
-public abstract class TaskAttemptContext extends JobContext 
-    implements Progressable {
+public class TaskAttemptContext extends JobContext implements Progressable {
   private final TaskAttemptID taskId;
   private String status = "";
   
-  public TaskAttemptContext(Configuration conf, TaskAttemptID taskId) {
+  public TaskAttemptContext(Configuration conf, 
+                            TaskAttemptID taskId) {
     super(conf, taskId.getJobID());
     this.taskId = taskId;
   }
@@ -39,7 +39,7 @@
   /**
    * Get the unique name for this task attempt.
    */
-  public TaskAttemptID getTaskAttemptId() {
+  public TaskAttemptID getTaskAttemptID() {
     return taskId;
   }
 
@@ -59,13 +59,8 @@
   }
 
   /**
-   * Lookup a counter by an enum.
+   * Report progress. This base implementation is a no-op; subclasses
+   * override it to do the real work.
    */
-  public abstract Counter getCounter(Enum<?> counterName);
-
-  /**
-   * Lookup a counter by group and counter name. The enum-based interface is
-   * preferred.
-   */
-  public abstract Counter getCounter(String groupName, String counterName);
+  public void progress() { 
+  }
 }
\ No newline at end of file

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/TaskAttemptID.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/TaskAttemptID.java?rev=726850&r1=726849&r2=726850&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/TaskAttemptID.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/TaskAttemptID.java Mon Dec 15 14:21:32 2008
@@ -33,7 +33,7 @@
  * An example TaskAttemptID is : 
  * <code>attempt_200707121733_0003_m_000005_0</code> , which represents the
  * zeroth task attempt for the fifth map task in the third job 
- * running at the jobtracker started at <code>200707121733</code>. 
+ * running at the jobtracker started at <code>200707121733</code>.
  * <p>
  * Applications should never construct or parse TaskAttemptID strings
  * , but rather use appropriate constructors or {@link #forName(String)} 
@@ -42,10 +42,9 @@
  * @see JobID
  * @see TaskID
  */
-public class TaskAttemptID extends ID {
-  private static final String ATTEMPT = "attempt";
+public class TaskAttemptID extends org.apache.hadoop.mapred.ID {
+  protected static final String ATTEMPT = "attempt";
   private TaskID taskId;
-  private static final char UNDERSCORE = '_';
   
   /**
    * Constructs a TaskAttemptID object from given {@link TaskID}.  
@@ -68,12 +67,14 @@
    * @param taskId taskId number
    * @param id the task attempt number
    */
-  public TaskAttemptID(String jtIdentifier, int jobId, boolean isMap
-      , int taskId, int id) {
+  public TaskAttemptID(String jtIdentifier, int jobId, boolean isMap, 
+                       int taskId, int id) {
     this(new TaskID(jtIdentifier, jobId, isMap, taskId), id);
   }
   
-  private TaskAttemptID() { }
+  public TaskAttemptID() { 
+    taskId = new TaskID();
+  }
   
   /** Returns the {@link JobID} object that this task attempt belongs to */
   public JobID getJobID() {
@@ -99,38 +100,19 @@
     return this.taskId.equals(that.taskId);
   }
   
-  /**Compare TaskIds by first tipIds, then by task numbers. */
-  @Override
-  public int compareTo(ID o) {
-    TaskAttemptID that = (TaskAttemptID)o;
-    int tipComp = this.taskId.compareTo(that.taskId);
-    if(tipComp == 0) {
-      return this.id - that.id;
-    }
-    else return tipComp;
-  }
-  @Override
-  public String toString() { 
-    StringBuilder builder = new StringBuilder();
-    return builder.append(ATTEMPT).append(UNDERSCORE)
-      .append(toStringWOPrefix()).toString();
-  }
-
-  StringBuilder toStringWOPrefix() {
-    StringBuilder builder = new StringBuilder();
-    return builder.append(taskId.toStringWOPrefix())
-      .append(UNDERSCORE).append(id);
-  }
-  
-  @Override
-  public int hashCode() {
-    return toStringWOPrefix().toString().hashCode();
+  /**
+   * Add the unique string to the given builder.
+   * @param builder the builder to append to
+   * @return the builder that was passed in.
+   */
+  protected StringBuilder appendTo(StringBuilder builder) {
+    return taskId.appendTo(builder).append(SEPARATOR).append(id);
   }
   
   @Override
   public void readFields(DataInput in) throws IOException {
     super.readFields(in);
-    this.taskId = TaskID.read(in);
+    taskId.readFields(in);
   }
 
   @Override
@@ -138,72 +120,55 @@
     super.write(out);
     taskId.write(out);
   }
-  
-  public static TaskAttemptID read(DataInput in) throws IOException {
-    TaskAttemptID taskId = new TaskAttemptID();
-    taskId.readFields(in);
-    return taskId;
+
+  @Override
+  public int hashCode() {
+    return taskId.hashCode() * 5 + id;
   }
   
+  /**Compare TaskIds by first tipIds, then by task numbers. */
+  @Override
+  public int compareTo(ID o) {
+    TaskAttemptID that = (TaskAttemptID)o;
+    int tipComp = this.taskId.compareTo(that.taskId);
+    if(tipComp == 0) {
+      return this.id - that.id;
+    }
+    else return tipComp;
+  }
+  @Override
+  public String toString() { 
+    return appendTo(new StringBuilder(ATTEMPT)).toString();
+  }
+
   /** Construct a TaskAttemptID object from given string 
    * @return constructed TaskAttemptID object or null if the given String is null
    * @throws IllegalArgumentException if the given string is malformed
    */
-  public static TaskAttemptID forName(String str) throws IllegalArgumentException {
+  public static TaskAttemptID forName(String str
+                                      ) throws IllegalArgumentException {
     if(str == null)
       return null;
     try {
-      String[] parts = str.split("_");
+      String[] parts = str.split(Character.toString(SEPARATOR));
       if(parts.length == 6) {
         if(parts[0].equals(ATTEMPT)) {
           boolean isMap = false;
           if(parts[3].equals("m")) isMap = true;
           else if(parts[3].equals("r")) isMap = false;
           else throw new Exception();
-          return new TaskAttemptID(parts[1], Integer.parseInt(parts[2]),
-              isMap, Integer.parseInt(parts[4]), Integer.parseInt(parts[5]));
+          return new org.apache.hadoop.mapred.TaskAttemptID
+                       (parts[1],
+                        Integer.parseInt(parts[2]),
+                        isMap, Integer.parseInt(parts[4]), 
+                        Integer.parseInt(parts[5]));
         }
       }
-    }catch (Exception ex) {//fall below
+    } catch (Exception ex) {
+      //fall below
     }
     throw new IllegalArgumentException("TaskAttemptId string : " + str 
         + " is not properly formed");
   }
-  
-  /** 
-   * Returns a regex pattern which matches task attempt IDs. Arguments can 
-   * be given null, in which case that part of the regex will be generic.  
-   * For example to obtain a regex matching <i>all task attempt IDs</i> 
-   * of <i>any jobtracker</i>, in <i>any job</i>, of the <i>first 
-   * map task</i>, we would use :
-   * <pre> 
-   * TaskAttemptID.getTaskAttemptIDsPattern(null, null, true, 1, null);
-   * </pre>
-   * which will return :
-   * <pre> "attempt_[^_]*_[0-9]*_m_000001_[0-9]*" </pre> 
-   * @param jtIdentifier jobTracker identifier, or null
-   * @param jobId job number, or null
-   * @param isMap whether the tip is a map, or null 
-   * @param taskId taskId number, or null
-   * @param attemptId the task attempt number, or null
-   * @return a regex pattern matching TaskAttemptIDs
-   */
-  public static String getTaskAttemptIDsPattern(String jtIdentifier,
-      Integer jobId, Boolean isMap, Integer taskId, Integer attemptId) {
-    StringBuilder builder = new StringBuilder(ATTEMPT).append(UNDERSCORE);
-    builder.append(getTaskAttemptIDsPatternWOPrefix(jtIdentifier, jobId,
-        isMap, taskId, attemptId));
-    return builder.toString();
-  }
-  
-  static StringBuilder getTaskAttemptIDsPatternWOPrefix(String jtIdentifier
-      , Integer jobId, Boolean isMap, Integer taskId, Integer attemptId) {
-    StringBuilder builder = new StringBuilder();
-    builder.append(TaskID.getTaskIDsPatternWOPrefix(jtIdentifier
-        , jobId, isMap, taskId))
-        .append(UNDERSCORE)
-        .append(attemptId != null ? attemptId : "[0-9]*");
-    return builder;
-  }
-  
+
 }

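The appendTo()/toString() rewrite above preserves the documented string
form. A small sketch of constructing and parsing an attempt id (the
identifier values are made up for illustration):

    import org.apache.hadoop.mapreduce.TaskAttemptID;

    public class AttemptIdDemo {
      public static void main(String[] args) {
        // the zeroth attempt of the fifth map task of the third job
        TaskAttemptID id = new TaskAttemptID("200707121733", 3, true, 5, 0);
        System.out.println(id);  // attempt_200707121733_0003_m_000005_0

        // round-trip through the string form via forName()
        TaskAttemptID parsed = TaskAttemptID.forName(id.toString());
        System.out.println(parsed.toString().equals(id.toString()));  // true
      }
    }
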
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/TaskID.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/TaskID.java?rev=726850&r1=726849&r2=726850&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/TaskID.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/TaskID.java Mon Dec 15 14:21:32 2008
@@ -45,10 +45,9 @@
  * @see JobID
  * @see TaskAttemptID
  */
-public class TaskID extends ID {
-  private static final String TASK = "task";
-  private static char UNDERSCORE = '_';  
-  private static NumberFormat idFormat = NumberFormat.getInstance();
+public class TaskID extends org.apache.hadoop.mapred.ID {
+  protected static final String TASK = "task";
+  protected static final NumberFormat idFormat = NumberFormat.getInstance();
   static {
     idFormat.setGroupingUsed(false);
     idFormat.setMinimumIntegerDigits(6);
@@ -83,7 +82,9 @@
     this(new JobID(jtIdentifier, jobId), isMap, id);
   }
   
-  private TaskID() { }
+  public TaskID() { 
+    jobId = new JobID();
+  }
   
   /** Returns the {@link JobID} object that this tip belongs to */
   public JobID getJobID() {
@@ -118,31 +119,34 @@
     }
     else return jobComp;
   }
-  
   @Override
   public String toString() { 
-    StringBuilder builder = new StringBuilder();
-    return builder.append(TASK).append(UNDERSCORE)
-      .append(toStringWOPrefix()).toString();
+    return appendTo(new StringBuilder(TASK)).toString();
   }
 
-  StringBuilder toStringWOPrefix() {
-    StringBuilder builder = new StringBuilder();
-    builder.append(jobId.toStringWOPrefix())
-      .append(isMap ? "_m_" : "_r_");
-    return builder.append(idFormat.format(id));
+  /**
+   * Add the unique string to the given builder.
+   * @param builder the builder to append to
+   * @return the builder that was passed in
+   */
+  protected StringBuilder appendTo(StringBuilder builder) {
+    return jobId.appendTo(builder).
+                 append(SEPARATOR).
+                 append(isMap ? 'm' : 'r').
+                 append(SEPARATOR).
+                 append(idFormat.format(id));
   }
   
   @Override
   public int hashCode() {
-    return toStringWOPrefix().toString().hashCode();
+    return jobId.hashCode() * 524287 + id;
   }
   
   @Override
   public void readFields(DataInput in) throws IOException {
     super.readFields(in);
-    this.jobId = JobID.read(in);
-    this.isMap = in.readBoolean();
+    jobId.readFields(in);
+    isMap = in.readBoolean();
   }
 
   @Override
@@ -152,12 +156,6 @@
     out.writeBoolean(isMap);
   }
   
-  public static TaskID read(DataInput in) throws IOException {
-    TaskID tipId = new TaskID();
-    tipId.readFields(in);
-    return tipId;
-  }
-  
   /** Construct a TaskID object from given string 
    * @return constructed TaskID object or null if the given String is null
    * @throws IllegalArgumentException if the given string is malformed
@@ -174,8 +172,10 @@
           if(parts[3].equals("m")) isMap = true;
           else if(parts[3].equals("r")) isMap = false;
           else throw new Exception();
-          return new TaskID(parts[1], Integer.parseInt(parts[2]),
-              isMap, Integer.parseInt(parts[4]));
+          return new org.apache.hadoop.mapred.TaskID(parts[1], 
+                                                     Integer.parseInt(parts[2]),
+                                                     isMap, 
+                                                     Integer.parseInt(parts[4]));
         }
       }
     }catch (Exception ex) {//fall below
@@ -184,37 +184,4 @@
         + " is not properly formed");
   }
   
-  /** 
-   * Returns a regex pattern which matches task IDs. Arguments can 
-   * be given null, in which case that part of the regex will be generic.  
-   * For example to obtain a regex matching <i>the first map task</i> 
-   * of <i>any jobtracker</i>, of <i>any job</i>, we would use :
-   * <pre> 
-   * TaskID.getTaskIDsPattern(null, null, true, 1);
-   * </pre>
-   * which will return :
-   * <pre> "task_[^_]*_[0-9]*_m_000001*" </pre> 
-   * @param jtIdentifier jobTracker identifier, or null
-   * @param jobId job number, or null
-   * @param isMap whether the tip is a map, or null 
-   * @param taskId taskId number, or null
-   * @return a regex pattern matching TaskIDs
-   */
-  public static String getTaskIDsPattern(String jtIdentifier, Integer jobId
-      , Boolean isMap, Integer taskId) {
-    StringBuilder builder = new StringBuilder(TASK).append(UNDERSCORE)
-      .append(getTaskIDsPatternWOPrefix(jtIdentifier, jobId, isMap, taskId));
-    return builder.toString();
-  }
-  
-  static StringBuilder getTaskIDsPatternWOPrefix(String jtIdentifier
-      , Integer jobId, Boolean isMap, Integer taskId) {
-    StringBuilder builder = new StringBuilder();
-    builder.append(JobID.getJobIDsPatternWOPrefix(jtIdentifier, jobId))
-      .append(UNDERSCORE)
-      .append(isMap != null ? (isMap ? "m" : "r") : "(m|r)").append(UNDERSCORE)
-      .append(taskId != null ? idFormat.format(taskId) : "[0-9]*");
-    return builder;
-  }
-  
 }

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/TaskInputOutputContext.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/TaskInputOutputContext.java?rev=726850&r1=726849&r2=726850&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/TaskInputOutputContext.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/TaskInputOutputContext.java Mon Dec 15 14:21:32 2008
@@ -21,6 +21,7 @@
 import java.io.IOException;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.Progressable;
 
 /**
  * A context object that allows input and output from the task. It is only
@@ -31,34 +32,73 @@
  * @param <VALUEOUT> the output value type for the task
  */
 public abstract class TaskInputOutputContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> 
-    extends TaskAttemptContext {
-
-  public TaskInputOutputContext(Configuration conf, TaskAttemptID taskid) {
+       extends TaskAttemptContext implements Progressable {
+  private RecordWriter<KEYOUT,VALUEOUT> output;
+  private StatusReporter reporter;
+  private OutputCommitter committer;
+
+  public TaskInputOutputContext(Configuration conf, TaskAttemptID taskid,
+                                RecordWriter<KEYOUT,VALUEOUT> output,
+                                OutputCommitter committer,
+                                StatusReporter reporter) {
     super(conf, taskid);
+    this.output = output;
+    this.reporter = reporter;
+    this.committer = committer;
   }
 
   /**
-   * Advance to the next key, returning null if at end.
-   * @param key the key object to read in to, which may be null
-   * @return the key object that was read into
+   * Advance to the next key/value pair.
+   * @return true if a key/value pair was read, false if at end of input
    */
-  public abstract KEYIN nextKey(KEYIN key
-                                ) throws IOException, InterruptedException;
-  
+  public abstract 
+  boolean nextKeyValue() throws IOException, InterruptedException;
+ 
   /**
-   * Read the next value. Must be called after nextKey.
-   * @param value the value object to read in to, which may be null
+   * Get the current key.
+   * @return the current key object or null if there isn't one
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  public abstract 
+  KEYIN getCurrentKey() throws IOException, InterruptedException;
+
+  /**
+   * Get the current value.
    * @return the value object that was read into
    * @throws IOException
    * @throws InterruptedException
    */
-  public abstract VALUEIN nextValue(VALUEIN value
-                                    ) throws IOException, InterruptedException;
+  public abstract VALUEIN getCurrentValue() throws IOException, 
+                                                   InterruptedException;
 
   /**
    * Generate an output key/value pair.
    */
-  public abstract void collect(KEYOUT key, VALUEOUT value
-                               ) throws IOException, InterruptedException;
+  public void write(KEYOUT key, VALUEOUT value
+                    ) throws IOException, InterruptedException {
+    output.write(key, value);
+  }
 
+  public Counter getCounter(Enum<?> counterName) {
+    return reporter.getCounter(counterName);
+  }
+
+  public Counter getCounter(String groupName, String counterName) {
+    return reporter.getCounter(groupName, counterName);
+  }
+
+  @Override
+  public void progress() {
+    reporter.progress();
+  }
+
+  @Override
+  public void setStatus(String status) {
+    reporter.setStatus(status);
+  }
+  
+  public OutputCommitter getOutputCommitter() {
+    return committer;
+  }
 }

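With write(), getCounter(), progress() and setStatus() now concrete on the
context, task code talks to one object for both output and reporting. A
hedged sketch from the map side, assuming Counter.increment(long); the
counter enum and status text are invented for illustration:

    import java.io.IOException;

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    public class CountingMapper
        extends Mapper<LongWritable, Text, Text, LongWritable> {
      // hypothetical counter enum, for illustration only
      enum Records { SEEN }

      private final LongWritable one = new LongWritable(1);

      @Override
      protected void map(LongWritable key, Text value, Context context)
          throws IOException, InterruptedException {
        context.getCounter(Records.SEEN).increment(1);  // via StatusReporter
        context.setStatus("at offset " + key.get());    // via StatusReporter
        context.write(value, one);                      // via RecordWriter
      }
    }
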
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java?rev=726850&r1=726849&r2=726850&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java Mon Dec 15 14:21:32 2008
@@ -32,6 +32,7 @@
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.util.ReflectionUtils;
@@ -108,26 +109,37 @@
 
   /**
    * Set a PathFilter to be applied to the input paths for the map-reduce job.
-   *
+   * @param job the job to modify
    * @param filter the PathFilter class use for filtering the input paths.
    */
-  public static void setInputPathFilter(Configuration conf,
+  public static void setInputPathFilter(Job job,
                                         Class<? extends PathFilter> filter) {
-    conf.setClass("mapred.input.pathFilter.class", filter, PathFilter.class);
+    job.getConfiguration().setClass("mapred.input.pathFilter.class", filter, 
+                                    PathFilter.class);
   }
 
-  public static void setMinInputSplitSize(Configuration conf,
+  /**
+   * Set the minimum input split size
+   * @param job the job to modify
+   * @param size the minimum size
+   */
+  public static void setMinInputSplitSize(Job job,
                                           long size) {
-    conf.setLong("mapred.min.split.size", size);
+    job.getConfiguration().setLong("mapred.min.split.size", size);
   }
 
   public static long getMinSplitSize(Configuration conf) {
     return conf.getLong("mapred.min.split.size", 1L);
   }
 
-  public static void setMaxInputSplitSize(Configuration conf,
+  /**
+   * Set the maximum split size
+   * @param job the job to modify
+   * @param size the maximum split size
+   */
+  public static void setMaxInputSplitSize(Job job,
                                           long size) {
-    conf.setLong("mapred.max.split.size", size);
+    job.getConfiguration().setLong("mapred.max.split.size", size);
   }
 
   public static long getMaxSplitSize(Configuration conf) {
@@ -271,14 +283,14 @@
    * Sets the given comma separated paths as the list of inputs 
    * for the map-reduce job.
    * 
-   * @param conf Configuration of the job
+   * @param job the job
    * @param commaSeparatedPaths Comma separated paths to be set as 
    *        the list of inputs for the map-reduce job.
    */
-  public static void setInputPaths(Configuration conf, 
+  public static void setInputPaths(Job job, 
                                    String commaSeparatedPaths
                                    ) throws IOException {
-    setInputPaths(conf, StringUtils.stringToPath(
+    setInputPaths(job, StringUtils.stringToPath(
                         getPathStrings(commaSeparatedPaths)));
   }
 
@@ -286,15 +298,15 @@
    * Add the given comma separated paths to the list of inputs for
    *  the map-reduce job.
    * 
-   * @param conf The configuration of the job 
+   * @param job The job to modify
    * @param commaSeparatedPaths Comma separated paths to be added to
    *        the list of inputs for the map-reduce job.
    */
-  public static void addInputPaths(Configuration conf, 
+  public static void addInputPaths(Job job, 
                                    String commaSeparatedPaths
                                    ) throws IOException {
     for (String str : getPathStrings(commaSeparatedPaths)) {
-      addInputPath(conf, new Path(str));
+      addInputPath(job, new Path(str));
     }
   }
 
@@ -302,12 +314,13 @@
    * Set the array of {@link Path}s as the list of inputs
    * for the map-reduce job.
    * 
-   * @param conf Configuration of the job. 
+   * @param job The job to modify 
    * @param inputPaths the {@link Path}s of the input directories/files 
    * for the map-reduce job.
    */ 
-  public static void setInputPaths(Configuration conf, 
+  public static void setInputPaths(Job job, 
                                    Path... inputPaths) throws IOException {
+    Configuration conf = job.getConfiguration();
     FileSystem fs = FileSystem.get(conf);
     Path path = inputPaths[0].makeQualified(fs);
     StringBuffer str = new StringBuffer(StringUtils.escapeString(path.toString()));
@@ -322,12 +335,13 @@
   /**
    * Add a {@link Path} to the list of inputs for the map-reduce job.
    * 
-   * @param conf The configuration of the job 
+   * @param job The {@link Job} to modify
    * @param path {@link Path} to be added to the list of inputs for 
    *            the map-reduce job.
    */
-  public static void addInputPath(Configuration conf, 
+  public static void addInputPath(Job job, 
                                   Path path) throws IOException {
+    Configuration conf = job.getConfiguration();
     FileSystem fs = FileSystem.get(conf);
     path = path.makeQualified(fs);
     String dirStr = StringUtils.escapeString(path.toString());

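These setters now take the Job rather than a bare Configuration. A sketch
of driver-side usage under that change, assuming the Job(Configuration,
String) constructor; the input path and split sizes are placeholders:

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

    public class InputSetupDemo {
      public static void main(String[] args) throws IOException {
        Job job = new Job(new Configuration(), "input-setup-demo");
        // all of these now reach the configuration through the job
        FileInputFormat.addInputPath(job, new Path("/user/example/in"));
        FileInputFormat.setMinInputSplitSize(job, 64L * 1024 * 1024);
        FileInputFormat.setMaxInputSplitSize(job, 256L * 1024 * 1024);
      }
    }
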
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/input/InvalidInputException.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/input/InvalidInputException.java?rev=726850&r1=726849&r2=726850&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/input/InvalidInputException.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/input/InvalidInputException.java Mon Dec 15 14:21:32 2008
@@ -27,6 +27,7 @@
  * by one.
  */
 public class InvalidInputException extends IOException {
+  private static final long serialVersionUID = -380668190578456802L;
   private List<IOException> problems;
   
   /**

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java?rev=726850&r1=726849&r2=726850&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java Mon Dec 15 14:21:32 2008
@@ -46,7 +46,9 @@
   private long pos;
   private long end;
   private LineReader in;
-  int maxLineLength;
+  private int maxLineLength;
+  private LongWritable key = null;
+  private Text value = null;
 
   public void initialize(InputSplit genericSplit,
                          TaskAttemptContext context) throws IOException {
@@ -82,25 +84,21 @@
     this.pos = start;
   }
   
-  public LongWritable nextKey(LongWritable key) throws IOException {
+  public boolean nextKeyValue() throws IOException {
     if (key == null) {
       key = new LongWritable();
     }
     key.set(pos);
-    return key;
-  }
-
-  public Text nextValue(Text value) throws IOException {
     if (value == null) {
       value = new Text();
     }
+    int newSize = 0;
     while (pos < end) {
-      int newSize = in.readLine(value, maxLineLength,
-                                Math.max((int)Math.min(Integer.MAX_VALUE, 
-                                                       end-pos),
-                                         maxLineLength));
+      newSize = in.readLine(value, maxLineLength,
+                            Math.max((int)Math.min(Integer.MAX_VALUE, end-pos),
+                                     maxLineLength));
       if (newSize == 0) {
-        return null;
+        break;
       }
       pos += newSize;
       if (newSize < maxLineLength) {
@@ -111,6 +109,22 @@
       LOG.info("Skipped line of size " + newSize + " at pos " + 
                (pos - newSize));
     }
+    if (newSize == 0) {
+      key = null;
+      value = null;
+      return false;
+    } else {
+      return true;
+    }
+  }
+
+  @Override
+  public LongWritable getCurrentKey() {
+    return key;
+  }
+
+  @Override
+  public Text getCurrentValue() {
     return value;
   }