Posted to common-commits@hadoop.apache.org by om...@apache.org on 2008/12/15 23:21:35 UTC

svn commit: r726850 [1/4] - in /hadoop/core/trunk: ./ src/core/org/apache/hadoop/conf/ src/core/org/apache/hadoop/io/ src/core/org/apache/hadoop/util/ src/examples/org/apache/hadoop/examples/ src/mapred/org/apache/hadoop/mapred/ src/mapred/org/apache/h...

Author: omalley
Date: Mon Dec 15 14:21:32 2008
New Revision: 726850

URL: http://svn.apache.org/viewvc?rev=726850&view=rev
Log:
HADOOP-1230. Add new map/reduce API and deprecate the old one. Generally,
the old code should work without problems. The new API is in
org.apache.hadoop.mapreduce and the old classes in org.apache.hadoop.mapred
are deprecated. Differences in the new API:
  1. All of the methods take Context objects that allow us to add new
     methods without breaking compatibility.
  2. Mapper and Reducer now have a "run" method that is called once and
     contains the control loop for the task, which lets applications
     replace it (see the sketch after this list).
  3. Mapper and Reducer by default are Identity Mapper and Reducer.
  4. The FileOutputFormats use part-r-00000 for the output of reduce 0 and
     part-m-00000 for the output of map 0.
  5. The reduce grouping comparator now uses the raw compare instead of 
     object compare.
  6. The number of maps in FileInputFormat is controlled by min and max
     split size rather than min size and the desired number of maps.
(omalley)
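
For reference, the control loop from item 2 looks roughly like this in the new
org.apache.hadoop.mapreduce.Mapper (a sketch; the authoritative version is in
the Mapper.java added by this commit, and Reducer has the analogous loop):

    public void run(Context context) throws IOException, InterruptedException {
      setup(context);
      while (context.nextKeyValue()) {
        map(context.getCurrentKey(), context.getCurrentValue(), context);
      }
      cleanup(context);
    }

Overriding run lets an application batch records or run map calls on multiple
threads (as lib/map/MultithreadedMapper does) without leaving the framework.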

Added:
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/Counters.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/OutputCommitter.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/StatusReporter.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/WordCount.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapreduce/
    hadoop/core/trunk/src/test/org/apache/hadoop/mapreduce/TestMapReduceLocal.java
Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/core/org/apache/hadoop/conf/Configuration.java
    hadoop/core/trunk/src/core/org/apache/hadoop/io/BytesWritable.java
    hadoop/core/trunk/src/core/org/apache/hadoop/io/WritableUtils.java
    hadoop/core/trunk/src/core/org/apache/hadoop/util/GenericOptionsParser.java
    hadoop/core/trunk/src/core/org/apache/hadoop/util/ReflectionUtils.java
    hadoop/core/trunk/src/examples/org/apache/hadoop/examples/ExampleDriver.java
    hadoop/core/trunk/src/examples/org/apache/hadoop/examples/MultiFileWordCount.java
    hadoop/core/trunk/src/examples/org/apache/hadoop/examples/WordCount.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Counters.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/FileInputFormat.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/FileSplit.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ID.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/InputFormat.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/InputSplit.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IsolationRunner.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JVMId.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobClient.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobConf.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobConfigurable.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobContext.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobID.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobProfile.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobStatus.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LineRecordReader.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapReduceBase.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapRunnable.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Mapper.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/OutputCommitter.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/OutputFormat.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Partitioner.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/RawKeyValueIterator.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Reducer.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Reporter.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/RunningJob.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/SequenceFileInputFormat.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/SequenceFileOutputFormat.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskAttemptContext.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskAttemptID.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskID.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskReport.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TextInputFormat.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TextOutputFormat.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/HashPartitioner.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/IdentityMapper.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/IdentityReducer.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/InverseMapper.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/LongSumReducer.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/NullOutputFormat.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/TokenCountMapper.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/Counter.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/CounterGroup.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/ID.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/Job.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/JobContext.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/JobID.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/MapContext.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/Mapper.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/OutputFormat.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/Partitioner.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/RecordReader.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/RecordWriter.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/ReduceContext.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/Reducer.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/TaskAttemptContext.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/TaskAttemptID.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/TaskID.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/TaskInputOutputContext.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/input/InvalidInputException.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/input/SequenceFileRecordReader.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/map/InverseMapper.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/map/MultithreadedMapper.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/map/TokenCounterMapper.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/output/FileOutputFormat.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/output/NullOutputFormat.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/output/SequenceFileOutputFormat.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/output/TextOutputFormat.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/reduce/IntSumReducer.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/reduce/LongSumReducer.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/NotificationTestCase.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestFileOutputCommitter.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobSysDirWithDFS.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestKillCompletedJob.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestSpilledRecordsCounter.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTaskTrackerMemoryManager.java
    hadoop/core/trunk/src/test/testjar/ClassWordCount.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=726850&r1=726849&r2=726850&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Mon Dec 15 14:21:32 2008
@@ -87,8 +87,26 @@
     HADOOP-4826. Introduce admin command saveNamespace. (shv)
 
     HADOOP-3063  BloomMapFile - fail-fast version of MapFile for sparsely
-                 populated key space (Andrzej Bialecki via stack)
+    populated key space (Andrzej Bialecki via stack)
 
+    HADOOP-1230. Add new map/reduce API and deprecate the old one. Generally,
+    the old code should work without problems. The new API is in
+    org.apache.hadoop.mapreduce and the old classes in org.apache.hadoop.mapred
+    are deprecated. Differences in the new API:
+      1. All of the methods take Context objects that allow us to add new
+         methods without breaking compatibility.
+      2. Mapper and Reducer now have a "run" method that is called once and
+         contains the control loop for the task, which lets applications
+         replace it.
+      3. Mapper and Reducer by default are Identity Mapper and Reducer.
+      4. The FileOutputFormats use part-r-00000 for the output of reduce 0 and
+         part-m-00000 for the output of map 0.
+      5. The reduce grouping comparator now uses the raw compare instead of 
+         object compare.
+      6. The number of maps in FileInputFormat is controlled by min and max
+         split size rather than min size and the desired number of maps.
+      (omalley)
+    
   IMPROVEMENTS
 
     HADOOP-4749. Added a new counter REDUCE_INPUT_BYTES. (Yongqiang He via 

Modified: hadoop/core/trunk/src/core/org/apache/hadoop/conf/Configuration.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/conf/Configuration.java?rev=726850&r1=726849&r2=726850&view=diff
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/conf/Configuration.java (original)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/conf/Configuration.java Mon Dec 15 14:21:32 2008
@@ -352,6 +352,17 @@
     getProps().setProperty(name, value);
   }
   
+  /**
+   * Sets a property if it is currently unset.
+   * @param name the property name
+   * @param value the new value
+   */
+  public void setIfUnset(String name, String value) {
+    if (get(name) == null) {
+      set(name, value);
+    }
+  }
+  
   private synchronized Properties getOverlay() {
     if (overlay==null){
       overlay=new Properties();
@@ -522,6 +533,15 @@
   }
 
   /**
+   * Set the given boolean property, if it is currently unset.
+   * @param name property name
+   * @param value new value
+   */
+  public void setBooleanIfUnset(String name, boolean value) {
+    setIfUnset(name, Boolean.toString(value));
+  }
+
+  /**
    * A class that represents a set of positive integer ranges. It parses 
    * strings of the form: "2-3,5,7-" where ranges are separated by comma and 
    * the lower/upper bounds are separated by dash. Either the lower or upper 
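
A quick sketch of how the new conditional setters behave (the key names below
are hypothetical, purely for illustration):

    Configuration conf = new Configuration();
    conf.set("example.key", "200");               // explicit value
    conf.setIfUnset("example.key", "100");        // no-op: already set
    conf.setBooleanIfUnset("example.flag", true); // takes effect: was unset
    // conf.get("example.key") is "200";
    // conf.getBoolean("example.flag", false) is true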

Modified: hadoop/core/trunk/src/core/org/apache/hadoop/io/BytesWritable.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/io/BytesWritable.java?rev=726850&r1=726849&r2=726850&view=diff
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/io/BytesWritable.java (original)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/io/BytesWritable.java Mon Dec 15 14:21:32 2008
@@ -22,6 +22,9 @@
 import java.io.DataInput;
 import java.io.DataOutput;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
 /** 
  * A byte sequence that is usable as a key or value.
 * It is resizable and distinguishes between the size of the sequence and
@@ -30,6 +33,7 @@
  */
 public class BytesWritable extends BinaryComparable
     implements WritableComparable<BinaryComparable> {
+  private static final Log LOG = LogFactory.getLog(BytesWritable.class);
   private static final int LENGTH_BYTES = 4;
   private static final byte[] EMPTY_BYTES = {};
 

Modified: hadoop/core/trunk/src/core/org/apache/hadoop/io/WritableUtils.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/io/WritableUtils.java?rev=726850&r1=726849&r2=726850&view=diff
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/io/WritableUtils.java (original)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/io/WritableUtils.java Mon Dec 15 14:21:32 2008
@@ -197,30 +197,6 @@
   }
 
   /**
-   * A pair of input/output buffers that we use to clone writables.
-   */
-  private static class CopyInCopyOutBuffer {
-    DataOutputBuffer outBuffer = new DataOutputBuffer();
-    DataInputBuffer inBuffer = new DataInputBuffer();
-    /**
-     * Move the data from the output buffer to the input buffer.
-     */
-    void moveData() {
-      inBuffer.reset(outBuffer.getData(), outBuffer.getLength());
-    }
-  }
-  
-  /**
-   * Allocate a buffer for each thread that tries to clone objects.
-   */
-  private static ThreadLocal<CopyInCopyOutBuffer> cloneBuffers
-      = new ThreadLocal<CopyInCopyOutBuffer>() {
-      protected synchronized CopyInCopyOutBuffer initialValue() {
-        return new CopyInCopyOutBuffer();
-      }
-    };
-
-  /**
    * Make a copy of a writable object using serialization to a buffer.
    * @param orig The object to copy
    * @return The copied object
@@ -229,7 +205,7 @@
     try {
       @SuppressWarnings("unchecked") // Unchecked cast from Class to Class<T>
       T newInst = ReflectionUtils.newInstance((Class<T>) orig.getClass(), conf);
-      cloneInto(newInst, orig);
+      ReflectionUtils.copy(conf, orig, newInst);
       return newInst;
     } catch (IOException e) {
       throw new RuntimeException("Error writing/reading clone buffer", e);
@@ -241,14 +217,11 @@
   * @param dst the object to copy into, which is destroyed
   * @param src the object to copy from
    * @throws IOException
+   * @deprecated use ReflectionUtils.copy instead.
    */
+  @Deprecated
   public static void cloneInto(Writable dst, Writable src) throws IOException {
-    CopyInCopyOutBuffer buffer = cloneBuffers.get();
-    buffer.outBuffer.reset();
-    src.write(buffer.outBuffer);
-    buffer.moveData();
-    dst.readFields(buffer.inBuffer);
-    return;
+    ReflectionUtils.cloneWritableInto(dst, src);
   }
 
   /**

Modified: hadoop/core/trunk/src/core/org/apache/hadoop/util/GenericOptionsParser.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/util/GenericOptionsParser.java?rev=726850&r1=726849&r2=726850&view=diff
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/util/GenericOptionsParser.java (original)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/util/GenericOptionsParser.java Mon Dec 15 14:21:32 2008
@@ -104,9 +104,26 @@
 public class GenericOptionsParser {
 
   private static final Log LOG = LogFactory.getLog(GenericOptionsParser.class);
-
+  private Configuration conf;
   private CommandLine commandLine;
 
+  /**
+   * Create an options parser with the given options to parse the args.
+   * @param opts the options
+   * @param args the command line arguments
+   */
+  public GenericOptionsParser(Options opts, String[] args) {
+    this(new Configuration(), opts, args);
+  }
+
+  /**
+   * Create an options parser to parse the args.
+   * @param args the command line arguments
+   */
+  public GenericOptionsParser(String[] args) {
+    this(new Configuration(), new Options(), args);
+  }
+  
   /** 
   * Create a <code>GenericOptionsParser</code> to parse only the generic Hadoop
    * arguments. 
@@ -134,6 +151,7 @@
    */
   public GenericOptionsParser(Configuration conf, Options options, String[] args) {
     parseGeneralOptions(options, conf, args);
+    this.conf = conf;
   }
 
   /**
@@ -147,6 +165,14 @@
   }
 
   /**
+   * Get the modified configuration.
+   * @return the configuration that has the modified parameters.
+   */
+  public Configuration getConfiguration() {
+    return conf;
+  }
+
+  /**
    * Returns the commons-cli <code>CommandLine</code> object 
    * to process the parsed arguments. 
    * 
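
Putting the new constructors and getConfiguration together (a sketch; the
surrounding driver and argument handling are hypothetical):

    public static void main(String[] args) throws Exception {
      GenericOptionsParser parser = new GenericOptionsParser(args);
      Configuration conf = parser.getConfiguration(); // includes -D et al.
      String[] rest = parser.getRemainingArgs();
      // ... run the job with conf and rest ...
    }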

Modified: hadoop/core/trunk/src/core/org/apache/hadoop/util/ReflectionUtils.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/util/ReflectionUtils.java?rev=726850&r1=726849&r2=726850&view=diff
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/util/ReflectionUtils.java (original)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/util/ReflectionUtils.java Mon Dec 15 14:21:32 2008
@@ -27,6 +27,12 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.hadoop.conf.*;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.serializer.Deserializer;
+import org.apache.hadoop.io.serializer.SerializationFactory;
+import org.apache.hadoop.io.serializer.Serializer;
 
 /**
  * General reflection utils
@@ -34,7 +40,9 @@
 
 public class ReflectionUtils {
     
-  private static final Class[] emptyArray = new Class[]{};
+  private static final Class<?>[] EMPTY_ARRAY = new Class[]{};
+  private static SerializationFactory serialFactory = null;
+
   /** 
    * Cache of constructors for each class. Pins the classes so they
    * can't be garbage collected until ReflectionUtils can be collected.
@@ -98,7 +106,7 @@
     try {
       Constructor<T> meth = (Constructor<T>) CONSTRUCTOR_CACHE.get(theClass);
       if (meth == null) {
-        meth = theClass.getDeclaredConstructor(emptyArray);
+        meth = theClass.getDeclaredConstructor(EMPTY_ARRAY);
         meth.setAccessible(true);
         CONSTRUCTOR_CACHE.put(theClass, meth);
       }
@@ -217,5 +225,67 @@
   static int getCacheSize() {
     return CONSTRUCTOR_CACHE.size();
   }
+  /**
+   * A pair of input/output buffers that we use to clone writables.
+   */
+  private static class CopyInCopyOutBuffer {
+    DataOutputBuffer outBuffer = new DataOutputBuffer();
+    DataInputBuffer inBuffer = new DataInputBuffer();
+    /**
+     * Move the data from the output buffer to the input buffer.
+     */
+    void moveData() {
+      inBuffer.reset(outBuffer.getData(), outBuffer.getLength());
+    }
+  }
+  
+  /**
+   * Allocate a buffer for each thread that tries to clone objects.
+   */
+  private static ThreadLocal<CopyInCopyOutBuffer> cloneBuffers
+      = new ThreadLocal<CopyInCopyOutBuffer>() {
+      protected synchronized CopyInCopyOutBuffer initialValue() {
+        return new CopyInCopyOutBuffer();
+      }
+    };
+
+  private static SerializationFactory getFactory(Configuration conf) {
+    if (serialFactory == null) {
+      serialFactory = new SerializationFactory(conf);
+    }
+    return serialFactory;
+  }
+  
+  /**
+   * Make a copy of an object using serialization to a buffer.
+   * @param conf the configuration, used to find the right serialization
+   * @param src the object to copy from
+   * @param dst the object to copy into, which is destroyed
+   * @return dst, filled in with a copy of src
+   * @throws IOException
+   */
+  @SuppressWarnings("unchecked")
+  public static <T> T copy(Configuration conf, 
+                                T src, T dst) throws IOException {
+    CopyInCopyOutBuffer buffer = cloneBuffers.get();
+    buffer.outBuffer.reset();
+    SerializationFactory factory = getFactory(conf);
+    Class<T> cls = (Class<T>) src.getClass();
+    Serializer<T> serializer = factory.getSerializer(cls);
+    serializer.open(buffer.outBuffer);
+    serializer.serialize(src);
+    buffer.moveData();
+    Deserializer<T> deserializer = factory.getDeserializer(cls);
+    deserializer.open(buffer.inBuffer);
+    dst = deserializer.deserialize(dst);
+    return dst;
+  }
 
+  @Deprecated
+  public static void cloneWritableInto(Writable dst, 
+                                       Writable src) throws IOException {
+    CopyInCopyOutBuffer buffer = cloneBuffers.get();
+    buffer.outBuffer.reset();
+    src.write(buffer.outBuffer);
+    buffer.moveData();
+    dst.readFields(buffer.inBuffer);
+  }
 }
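
With the deprecations above, WritableUtils.clone and cloneInto now funnel into
ReflectionUtils. A sketch of the new entry point, inside a method that throws
IOException (Text is just a convenient Writable for illustration):

    Configuration conf = new Configuration();
    Text src = new Text("hello");
    Text dst = ReflectionUtils.copy(conf, src, new Text());
    // dst is now a deep copy of src

Unlike the old Writable-only path, copy goes through SerializationFactory, so
it works for any type with a registered serialization.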

Modified: hadoop/core/trunk/src/examples/org/apache/hadoop/examples/ExampleDriver.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/examples/org/apache/hadoop/examples/ExampleDriver.java?rev=726850&r1=726849&r2=726850&view=diff
==============================================================================
--- hadoop/core/trunk/src/examples/org/apache/hadoop/examples/ExampleDriver.java (original)
+++ hadoop/core/trunk/src/examples/org/apache/hadoop/examples/ExampleDriver.java Mon Dec 15 14:21:32 2008
@@ -17,6 +17,7 @@
  */
 
 package org.apache.hadoop.examples;
+
 import org.apache.hadoop.examples.dancing.DistributedPentomino;
 import org.apache.hadoop.examples.dancing.Sudoku;
 import org.apache.hadoop.examples.terasort.TeraGen;

Modified: hadoop/core/trunk/src/examples/org/apache/hadoop/examples/MultiFileWordCount.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/examples/org/apache/hadoop/examples/MultiFileWordCount.java?rev=726850&r1=726849&r2=726850&view=diff
==============================================================================
--- hadoop/core/trunk/src/examples/org/apache/hadoop/examples/MultiFileWordCount.java (original)
+++ hadoop/core/trunk/src/examples/org/apache/hadoop/examples/MultiFileWordCount.java Mon Dec 15 14:21:32 2008
@@ -45,6 +45,7 @@
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.lib.LongSumReducer;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 
@@ -234,7 +235,7 @@
       return 1;
     }
 
-    JobConf job = new JobConf(getConf(), WordCount.class);
+    JobConf job = new JobConf(getConf(), MultiFileWordCount.class);
     job.setJobName("MultiFileWordCount");
 
     //set the InputFormat of the job to our InputFormat
@@ -248,8 +249,8 @@
     //use the defined mapper
     job.setMapperClass(MapClass.class);
     //use the WordCount Reducer
-    job.setCombinerClass(WordCount.Reduce.class);
-    job.setReducerClass(WordCount.Reduce.class);
+    job.setCombinerClass(LongSumReducer.class);
+    job.setReducerClass(LongSumReducer.class);
 
     FileInputFormat.addInputPaths(job, args[0]);
     FileOutputFormat.setOutputPath(job, new Path(args[1]));

Modified: hadoop/core/trunk/src/examples/org/apache/hadoop/examples/WordCount.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/examples/org/apache/hadoop/examples/WordCount.java?rev=726850&r1=726849&r2=726850&view=diff
==============================================================================
--- hadoop/core/trunk/src/examples/org/apache/hadoop/examples/WordCount.java (original)
+++ hadoop/core/trunk/src/examples/org/apache/hadoop/examples/WordCount.java Mon Dec 15 14:21:32 2008
@@ -1,159 +1,69 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
 package org.apache.hadoop.examples;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
 import java.util.StringTokenizer;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.FileOutputFormat;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-
-/**
- * This is an example Hadoop Map/Reduce application.
- * It reads the text input files, breaks each line into words
- * and counts them. The output is a locally sorted list of words and the 
- * count of how often they occurred.
- *
- * To run: bin/hadoop jar build/hadoop-examples.jar wordcount
- *            [-m <i>maps</i>] [-r <i>reduces</i>] <i>in-dir</i> <i>out-dir</i> 
- */
-public class WordCount extends Configured implements Tool {
-  
-  /**
-   * Counts the words in each line.
-   * For each line of input, break the line into words and emit them as
-   * (<b>word</b>, <b>1</b>).
-   */
-  public static class MapClass extends MapReduceBase
-    implements Mapper<LongWritable, Text, Text, IntWritable> {
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.util.GenericOptionsParser;
+
+public class WordCount {
+
+  public static class TokenizerMapper 
+       extends Mapper<Object, Text, Text, IntWritable>{
     
     private final static IntWritable one = new IntWritable(1);
     private Text word = new Text();
-    
-    public void map(LongWritable key, Text value, 
-                    OutputCollector<Text, IntWritable> output, 
-                    Reporter reporter) throws IOException {
-      String line = value.toString();
-      StringTokenizer itr = new StringTokenizer(line);
+      
+    public void map(Object key, Text value, Context context
+                    ) throws IOException, InterruptedException {
+      StringTokenizer itr = new StringTokenizer(value.toString());
       while (itr.hasMoreTokens()) {
         word.set(itr.nextToken());
-        output.collect(word, one);
+        context.write(word, one);
       }
     }
   }
   
-  /**
-   * A reducer class that just emits the sum of the input values.
-   */
-  public static class Reduce extends MapReduceBase
-    implements Reducer<Text, IntWritable, Text, IntWritable> {
-    
-    public void reduce(Text key, Iterator<IntWritable> values,
-                       OutputCollector<Text, IntWritable> output, 
-                       Reporter reporter) throws IOException {
+  public static class IntSumReducer 
+       extends Reducer<Text,IntWritable,Text,IntWritable> {
+    private IntWritable result = new IntWritable();
+
+    public void reduce(Text key, Iterable<IntWritable> values, 
+                       Context context
+                       ) throws IOException, InterruptedException {
       int sum = 0;
-      while (values.hasNext()) {
-        sum += values.next().get();
+      for (IntWritable val : values) {
+        sum += val.get();
       }
-      output.collect(key, new IntWritable(sum));
+      result.set(sum);
+      context.write(key, result);
     }
   }
-  
-  static int printUsage() {
-    System.out.println("wordcount [-m <maps>] [-r <reduces>] <input> <output>");
-    ToolRunner.printGenericCommandUsage(System.out);
-    return -1;
-  }
-  
-  /**
-   * The main driver for word count map/reduce program.
-   * Invoke this method to submit the map/reduce job.
-   * @throws IOException When there is communication problems with the 
-   *                     job tracker.
-   */
-  public int run(String[] args) throws Exception {
-    JobConf conf = new JobConf(getConf(), WordCount.class);
-    conf.setJobName("wordcount");
- 
-    // the keys are words (strings)
-    conf.setOutputKeyClass(Text.class);
-    // the values are counts (ints)
-    conf.setOutputValueClass(IntWritable.class);
-    
-    conf.setMapperClass(MapClass.class);        
-    conf.setCombinerClass(Reduce.class);
-    conf.setReducerClass(Reduce.class);
-    
-    List<String> other_args = new ArrayList<String>();
-    for(int i=0; i < args.length; ++i) {
-      try {
-        if ("-m".equals(args[i])) {
-          conf.setNumMapTasks(Integer.parseInt(args[++i]));
-        } else if ("-r".equals(args[i])) {
-          conf.setNumReduceTasks(Integer.parseInt(args[++i]));
-        } else {
-          other_args.add(args[i]);
-        }
-      } catch (NumberFormatException except) {
-        System.out.println("ERROR: Integer expected instead of " + args[i]);
-        return printUsage();
-      } catch (ArrayIndexOutOfBoundsException except) {
-        System.out.println("ERROR: Required parameter missing from " +
-                           args[i-1]);
-        return printUsage();
-      }
-    }
-    // Make sure there are exactly 2 parameters left.
-    if (other_args.size() != 2) {
-      System.out.println("ERROR: Wrong number of parameters: " +
-                         other_args.size() + " instead of 2.");
-      return printUsage();
-    }
-    FileInputFormat.setInputPaths(conf, other_args.get(0));
-    FileOutputFormat.setOutputPath(conf, new Path(other_args.get(1)));
-        
-    JobClient.runJob(conf);
-    return 0;
-  }
-  
-  
+
   public static void main(String[] args) throws Exception {
-    int res = ToolRunner.run(new Configuration(), new WordCount(), args);
-    System.exit(res);
+    Configuration conf = new Configuration();
+    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
+    if (otherArgs.length != 2) {
+      System.err.println("Usage: wordcount <in> <out>");
+      System.exit(2);
+    }
+    Job job = new Job(conf, "word count");
+    job.setJarByClass(WordCount.class);
+    job.setMapperClass(TokenizerMapper.class);
+    job.setCombinerClass(IntSumReducer.class);
+    job.setReducerClass(IntSumReducer.class);
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(IntWritable.class);
+    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
+    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
+    System.exit(job.waitForCompletion() ? 0 : 1);
   }
-
 }

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Counters.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Counters.java?rev=726850&r1=726849&r2=726850&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Counters.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Counters.java Mon Dec 15 14:21:32 2008
@@ -47,7 +47,9 @@
  * 
 * <p><code>Counters</code> are bunched into {@link Group}s, each comprising
  * counters from a particular <code>Enum</code> class. 
+ * @deprecated Use {@link org.apache.hadoop.mapreduce.Counters} instead.
  */
+@Deprecated
 public class Counters implements Writable, Iterable<Counters.Group> {
   private static final Log LOG = LogFactory.getLog(Counters.class);
   private static final char GROUP_OPEN = '{';
@@ -65,69 +67,18 @@
   /**
    * A counter record, comprising its name and value. 
    */
-  public static class Counter implements Writable {
-
-    private String name;
-    private String displayName;
-    private long value;
+  public static class Counter extends org.apache.hadoop.mapreduce.Counter {
     
     Counter() { 
-      value = 0L;
     }
 
     Counter(String name, String displayName, long value) {
-      this.name = name;
-      this.displayName = displayName;
-      this.value = value;
-    }
-    
-    /**
-     * Read the binary representation of the counter
-     */
-    public synchronized void readFields(DataInput in) throws IOException {
-      name = Text.readString(in);
-      if (in.readBoolean()) {
-        displayName = Text.readString(in);
-      } else {
-        displayName = name;
-      }
-      value = WritableUtils.readVLong(in);
-    }
-    
-    /**
-     * Write the binary representation of the counter
-     */
-    public synchronized void write(DataOutput out) throws IOException {
-      Text.writeString(out, name);
-      boolean distinctDisplayName = (! name.equals(displayName));
-      out.writeBoolean(distinctDisplayName);
-      if (distinctDisplayName) {
-        Text.writeString(out, displayName);
-      }
-      WritableUtils.writeVLong(out, value);
+      super(name, displayName);
+      increment(value);
     }
     
-    /**
-     * Get the internal name of the counter.
-     * @return the internal name of the counter
-     */
-    public synchronized String getName() {
-      return name;
-    }
-    
-    /**
-     * Get the name of the counter.
-     * @return the user facing name of the counter
-     */
-    public synchronized String getDisplayName() {
-      return displayName;
-    }
-    
-    /**
-     * Set the display name of the counter.
-     */
-    public synchronized void setDisplayName(String displayName) {
-      this.displayName = displayName;
+    public void setDisplayName(String newName) {
+      super.setDisplayName(newName);
     }
     
     /**
@@ -150,7 +101,7 @@
       
       // Add the value
       buf.append(UNIT_OPEN);
-      buf.append(this.value);
+      buf.append(this.getValue());
       buf.append(UNIT_CLOSE);
       
       buf.append(COUNTER_CLOSE);
@@ -159,10 +110,9 @@
     }
     
     // Checks for (content) equality of two (basic) counters
+    @Deprecated
     synchronized boolean contentEquals(Counter c) {
-      return name.equals(c.getName())
-             && displayName.equals(c.getDisplayName())
-             && value == c.getCounter();
+      return this.equals(c);
     }
     
     /**
@@ -170,16 +120,9 @@
      * @return the current value
      */
     public synchronized long getCounter() {
-      return value;
+      return getValue();
     }
     
-    /**
-     * Increment this counter by the given value
-     * @param incr the value to increase this counter by
-     */
-    public synchronized void increment(long incr) {
-      value += incr;
-    }
   }
   
   /**
@@ -297,8 +240,8 @@
      */
     public synchronized long getCounter(String counterName) {
       for(Counter counter: subcounters.values()) {
-        if (counter != null && counter.displayName.equals(counterName)) {
-          return counter.value;
+        if (counter != null && counter.getDisplayName().equals(counterName)) {
+          return counter.getValue();
         }
       }
       return 0L;
@@ -459,7 +402,7 @@
    * @param amount amount by which counter is to be incremented
    */
   public synchronized void incrCounter(Enum key, long amount) {
-    findCounter(key).value += amount;
+    findCounter(key).increment(amount);
   }
   
   /**
@@ -470,7 +413,7 @@
    * @param amount amount by which counter is to be incremented
    */
   public synchronized void incrCounter(String group, String counter, long amount) {
-    getGroup(group).getCounterForName(counter).value += amount;
+    getGroup(group).getCounterForName(counter).increment(amount);
   }
   
   /**
@@ -478,7 +421,7 @@
    * does not exist.
    */
   public synchronized long getCounter(Enum key) {
-    return findCounter(key).value;
+    return findCounter(key).getValue();
   }
   
   /**
@@ -492,8 +435,8 @@
       group.displayName = otherGroup.displayName;
       for (Counter otherCounter : otherGroup) {
         Counter counter = group.getCounterForName(otherCounter.getName());
-        counter.displayName = otherCounter.displayName;
-        counter.value += otherCounter.value;
+        counter.setDisplayName(otherCounter.getDisplayName());
+        counter.increment(otherCounter.getValue());
       }
     }
   }
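
The old Counter's internal storage is gone, but the public surface behaves the
same; a sketch with hypothetical group and counter names:

    Counters counters = new Counters();
    counters.incrCounter("wordcount", "lines", 1);
    long lines = counters.getGroup("wordcount").getCounter("lines");
    // the increment is delegated to org.apache.hadoop.mapreduce.Counter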

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/FileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/FileInputFormat.java?rev=726850&r1=726849&r2=726850&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/FileInputFormat.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/FileInputFormat.java Mon Dec 15 14:21:32 2008
@@ -22,7 +22,6 @@
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.IdentityHashMap;
 import java.util.LinkedList;
@@ -35,7 +34,6 @@
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.net.NetworkTopology;
@@ -53,7 +51,10 @@
  * Subclasses of <code>FileInputFormat</code> can also override the 
  * {@link #isSplitable(FileSystem, Path)} method to ensure input-files are
  * not split-up and are processed as a whole by {@link Mapper}s.
+ * @deprecated Use {@link org.apache.hadoop.mapreduce.lib.input.FileInputFormat}
+ *  instead.
  */
+@Deprecated
 public abstract class FileInputFormat<K, V> implements InputFormat<K, V> {
 
   public static final Log LOG =

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/FileSplit.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/FileSplit.java?rev=726850&r1=726849&r2=726850&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/FileSplit.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/FileSplit.java Mon Dec 15 14:21:32 2008
@@ -21,16 +21,19 @@
 import java.io.IOException;
 import java.io.DataInput;
 import java.io.DataOutput;
-import java.io.File;                              // deprecated
 
 import org.apache.hadoop.io.UTF8;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
 /** A section of an input file.  Returned by {@link
  * InputFormat#getSplits(JobConf, int)} and passed to
- * {@link InputFormat#getRecordReader(InputSplit,JobConf,Reporter)}. */
-public class FileSplit implements InputSplit {
+ * {@link InputFormat#getRecordReader(InputSplit,JobConf,Reporter)}. 
+ * @deprecated Use {@link org.apache.hadoop.mapreduce.lib.input.FileSplit}
+ *  instead.
+ */
+@Deprecated
+public class FileSplit extends org.apache.hadoop.mapreduce.InputSplit 
+                       implements InputSplit {
   private Path file;
   private long start;
   private long length;

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ID.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ID.java?rev=726850&r1=726849&r2=726850&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ID.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ID.java Mon Dec 15 14:21:32 2008
@@ -18,12 +18,6 @@
 
 package org.apache.hadoop.mapred;
 
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.hadoop.io.WritableComparable;
-
 /**
  * A general identifier, which internally stores the id
  * as an integer. This is the super class of {@link JobID}, 
@@ -33,57 +27,15 @@
  * @see TaskID
  * @see TaskAttemptID
  */
-public abstract class ID implements WritableComparable<ID> {
-  protected static final char SEPARATOR = '_';
-  protected int id;
+@Deprecated
+public abstract class ID extends org.apache.hadoop.mapreduce.ID {
 
   /** constructs an ID object from the given int */
   public ID(int id) {
-    this.id = id;
+    super(id);
   }
 
   protected ID() {
   }
 
-  /** returns the int which represents the identifier */
-  public int getId() {
-    return id;
-  }
-
-  @Override
-  public String toString() {
-    return String.valueOf(id);
-  }
-
-  @Override
-  public int hashCode() {
-    return Integer.valueOf(id).hashCode();
-  }
-
-  @Override
-  public boolean equals(Object o) {
-    if (this == o)
-      return true;
-    if(o == null)
-      return false;
-    if (o.getClass() == this.getClass()) {
-      ID that = (ID) o;
-      return this.id == that.id;
-    }
-    else
-      return false;
-  }
-
-  /** Compare IDs by associated numbers */
-  public int compareTo(ID that) {
-    return this.id - that.id;
-  }
-
-  public void readFields(DataInput in) throws IOException {
-    this.id = in.readInt();
-  }
-
-  public void write(DataOutput out) throws IOException {
-    out.writeInt(id);
-  }
 }

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/InputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/InputFormat.java?rev=726850&r1=726849&r2=726850&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/InputFormat.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/InputFormat.java Mon Dec 15 14:21:32 2008
@@ -61,7 +61,9 @@
  * @see RecordReader
  * @see JobClient
  * @see FileInputFormat
+ * @deprecated Use {@link org.apache.hadoop.mapreduce.InputFormat} instead.
  */
+@Deprecated
 public interface InputFormat<K, V> {
 
   /** 

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/InputSplit.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/InputSplit.java?rev=726850&r1=726849&r2=726850&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/InputSplit.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/InputSplit.java Mon Dec 15 14:21:32 2008
@@ -31,7 +31,9 @@
  * 
  * @see InputFormat
  * @see RecordReader
+ * @deprecated Use {@link org.apache.hadoop.mapreduce.InputSplit} instead.
  */
+@Deprecated
 public interface InputSplit extends Writable {
 
   /**

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IsolationRunner.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IsolationRunner.java?rev=726850&r1=726849&r2=726850&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IsolationRunner.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IsolationRunner.java Mon Dec 15 14:21:32 2008
@@ -159,7 +159,9 @@
    * Run a single task
    * @param args the first argument is the task directory
    */
-  public static void main(String[] args) throws IOException {
+  public static void main(String[] args
+                          ) throws ClassNotFoundException, IOException, 
+                                   InterruptedException {
     if (args.length != 1) {
       System.out.println("Usage: IsolationRunner <path>/job.xml");
       System.exit(1);

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JVMId.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JVMId.java?rev=726850&r1=726849&r2=726850&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JVMId.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JVMId.java Mon Dec 15 14:21:32 2008
@@ -68,7 +68,7 @@
  /**Compare TaskInProgressIds first by jobIds, then by tip numbers. Reduces are
   * defined as greater than maps.*/
   @Override
-  public int compareTo(ID o) {
+  public int compareTo(org.apache.hadoop.mapreduce.ID o) {
     JVMId that = (JVMId)o;
     int jobComp = this.jobId.compareTo(that.jobId);
     if(jobComp == 0) {

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobClient.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobClient.java?rev=726850&r1=726849&r2=726850&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobClient.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobClient.java Mon Dec 15 14:21:32 2008
@@ -21,6 +21,7 @@
 import java.io.BufferedWriter;
 import java.io.DataInput;
 import java.io.DataOutput;
+import java.io.DataOutputStream;
 import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.IOException;
@@ -38,7 +39,7 @@
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Comparator;
-import java.util.Random;
+import java.util.List;
 
 import javax.security.auth.login.LoginException;
 
@@ -59,11 +60,14 @@
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.io.serializer.SerializationFactory;
+import org.apache.hadoop.io.serializer.Serializer;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.mapred.Counters.Counter;
 import org.apache.hadoop.mapred.Counters.Group;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.UnixUserGroupInformation;
+import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
@@ -155,7 +159,7 @@
   private static final Log LOG = LogFactory.getLog(JobClient.class);
   public static enum TaskStatusFilter { NONE, KILLED, FAILED, SUCCEEDED, ALL }
   private TaskStatusFilter taskOutputFilter = TaskStatusFilter.FAILED; 
-  static long MAX_JOBPROFILE_AGE = 1000 * 2;
+  private static final long MAX_JOBPROFILE_AGE = 1000 * 2;
 
   /**
    * A NetworkedJob is an implementation of RunningJob.  It holds
@@ -371,14 +375,17 @@
     public Counters getCounters() throws IOException {
       return jobSubmitClient.getJobCounters(getID());
     }
+    
+    @Override
+    public String[] getTaskDiagnostics(TaskAttemptID id) throws IOException {
+      return jobSubmitClient.getTaskDiagnostics(id);
+    }
   }
 
-  JobSubmissionProtocol jobSubmitClient;
-  Path sysDir = null;
+  private JobSubmissionProtocol jobSubmitClient;
+  private Path sysDir = null;
   
-  FileSystem fs = null;
-
-  static Random r = new Random();
+  private FileSystem fs = null;
 
   /**
    * Create a job client.
@@ -710,11 +717,34 @@
    * @return a handle to the {@link RunningJob} which can be used to track the
    *         running-job.
    * @throws FileNotFoundException
-   * @throws InvalidJobConfException
    * @throws IOException
    */
-  public RunningJob submitJob(JobConf job) throws FileNotFoundException, 
-                                  InvalidJobConfException, IOException {
+  public RunningJob submitJob(JobConf job) throws FileNotFoundException,
+                                                  IOException {
+    try {
+      return submitJobInternal(job);
+    } catch (InterruptedException ie) {
+      throw new IOException("interrupted", ie);
+    } catch (ClassNotFoundException cnfe) {
+      throw new IOException("class not found", cnfe);
+    }
+  }
+
+  /**
+   * Internal method for submitting jobs to the system.
+   * @param job the configuration to submit
+   * @return a proxy object for the running job
+   * @throws FileNotFoundException
+   * @throws ClassNotFoundException
+   * @throws InterruptedException
+   * @throws IOException
+   */
+  public 
+  RunningJob submitJobInternal(JobConf job
+                               ) throws FileNotFoundException, 
+                                        ClassNotFoundException,
+                                        InterruptedException,
+                                        IOException {
     /*
      * configure the command line options correctly on the submitting dfs
      */
@@ -725,12 +755,53 @@
     Path submitSplitFile = new Path(submitJobDir, "job.split");
     configureCommandLineOptions(job, submitJobDir, submitJarFile);
     Path submitJobFile = new Path(submitJobDir, "job.xml");
+    int reduces = job.getNumReduceTasks();
+    JobContext context = new JobContext(job, jobId);
     
     // Check the output specification
-    job.getOutputFormat().checkOutputSpecs(fs, job);
+    if (reduces == 0 ? job.getUseNewMapper() : job.getUseNewReducer()) {
+      org.apache.hadoop.mapreduce.OutputFormat<?,?> output =
+        ReflectionUtils.newInstance(context.getOutputFormatClass(), job);
+      output.checkOutputSpecs(context);
+    } else {
+      job.getOutputFormat().checkOutputSpecs(fs, job);
+    }
 
     // Create the splits for the job
     LOG.debug("Creating splits at " + fs.makeQualified(submitSplitFile));
+    int maps;
+    if (job.getUseNewMapper()) {
+      maps = writeNewSplits(context, submitSplitFile);
+    } else {
+      maps = writeOldSplits(job, submitSplitFile);
+    }
+    job.set("mapred.job.split.file", submitSplitFile.toString());
+    job.setNumMapTasks(maps);
+        
+    // Write job file to JobTracker's fs        
+    FSDataOutputStream out = 
+      FileSystem.create(fs, submitJobFile,
+                        new FsPermission(JOB_FILE_PERMISSION));
+
+    try {
+      job.writeXml(out);
+    } finally {
+      out.close();
+    }
+
+    //
+    // Now, actually submit the job (using the submit name)
+    //
+    JobStatus status = jobSubmitClient.submitJob(jobId);
+    if (status != null) {
+      return new NetworkedJob(status);
+    } else {
+      throw new IOException("Could not launch job");
+    }
+  }
+
+  private int writeOldSplits(JobConf job, 
+                             Path submitSplitFile) throws IOException {
     InputSplit[] splits = 
       job.getInputFormat().getSplits(job, job.getNumMapTasks());
     // sort the splits into order based on size, so that the biggest
@@ -753,36 +824,91 @@
         }
       }
     });
-    // write the splits to a file for the job tracker
-    FSDataOutputStream out = FileSystem.create(fs,
-        submitSplitFile, new FsPermission(JOB_FILE_PERMISSION));
+    DataOutputStream out = writeSplitsFileHeader(job, submitSplitFile, splits.length);
+    
     try {
-      writeSplitsFile(splits, out);
+      DataOutputBuffer buffer = new DataOutputBuffer();
+      RawSplit rawSplit = new RawSplit();
+      for(InputSplit split: splits) {
+        rawSplit.setClassName(split.getClass().getName());
+        buffer.reset();
+        split.write(buffer);
+        rawSplit.setDataLength(split.getLength());
+        rawSplit.setBytes(buffer.getData(), 0, buffer.getLength());
+        rawSplit.setLocations(split.getLocations());
+        rawSplit.write(out);
+      }
     } finally {
       out.close();
     }
-    job.set("mapred.job.split.file", submitSplitFile.toString());
-    job.setNumMapTasks(splits.length);
-        
-    // Write job file to JobTracker's fs        
-    out = FileSystem.create(fs, submitJobFile,
-        new FsPermission(JOB_FILE_PERMISSION));
+    return splits.length;
+  }
+
+  private static class NewSplitComparator 
+    implements Comparator<org.apache.hadoop.mapreduce.InputSplit>{
+
+    @Override
+    public int compare(org.apache.hadoop.mapreduce.InputSplit o1,
+                       org.apache.hadoop.mapreduce.InputSplit o2) {
+      try {
+        long len1 = o1.getLength();
+        long len2 = o2.getLength();
+        if (len1 < len2) {
+          return 1;
+        } else if (len1 == len2) {
+          return 0;
+        } else {
+          return -1;
+        }
+      } catch (IOException ie) {
+        throw new RuntimeException("exception in compare", ie);
+      } catch (InterruptedException ie) {
+        throw new RuntimeException("exception in compare", ie);        
+      }
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  private <T extends org.apache.hadoop.mapreduce.InputSplit> 
+  int writeNewSplits(JobContext job, Path submitSplitFile
+                     ) throws IOException, InterruptedException, 
+                              ClassNotFoundException {
+    JobConf conf = job.getJobConf();
+    org.apache.hadoop.mapreduce.InputFormat<?,?> input =
+      ReflectionUtils.newInstance(job.getInputFormatClass(), job.getJobConf());
+    
+    List<org.apache.hadoop.mapreduce.InputSplit> splits = input.getSplits(job);
+    T[] array = (T[])
+      splits.toArray(new org.apache.hadoop.mapreduce.InputSplit[splits.size()]);
 
+    // sort the splits into order based on size, so that the biggest
+    // go first
+    Arrays.sort(array, new NewSplitComparator());
+    DataOutputStream out = writeSplitsFileHeader(conf, submitSplitFile, 
+                                                 array.length);
     try {
-      job.writeXml(out);
+      if (array.length != 0) {
+        DataOutputBuffer buffer = new DataOutputBuffer();
+        RawSplit rawSplit = new RawSplit();
+        SerializationFactory factory = new SerializationFactory(conf);
+        Serializer<T> serializer = 
+          factory.getSerializer((Class<T>) array[0].getClass());
+        serializer.open(buffer);
+        for(T split: array) {
+          rawSplit.setClassName(split.getClass().getName());
+          buffer.reset();
+          serializer.serialize(split);
+          rawSplit.setDataLength(split.getLength());
+          rawSplit.setBytes(buffer.getData(), 0, buffer.getLength());
+          rawSplit.setLocations(split.getLocations());
+          rawSplit.write(out);
+        }
+        serializer.close();
+      }
     } finally {
       out.close();
     }
-
-    //
-    // Now, actually submit the job (using the submit name)
-    //
-    JobStatus status = jobSubmitClient.submitJob(jobId);
-    if (status != null) {
-      return new NetworkedJob(status);
-    } else {
-      throw new IOException("Could not launch job");
-    }
+    return array.length;
   }
 
   /** 
@@ -878,7 +1004,21 @@
     
   private static final int CURRENT_SPLIT_FILE_VERSION = 0;
   private static final byte[] SPLIT_FILE_HEADER = "SPL".getBytes();
-    
+
+  private DataOutputStream writeSplitsFileHeader(Configuration conf,
+                                                 Path filename,
+                                                 int length
+                                                 ) throws IOException {
+    // write the splits to a file for the job tracker
+    FileSystem fs = filename.getFileSystem(conf);
+    FSDataOutputStream out = 
+      FileSystem.create(fs, filename, new FsPermission(JOB_FILE_PERMISSION));
+    out.write(SPLIT_FILE_HEADER);
+    WritableUtils.writeVInt(out, CURRENT_SPLIT_FILE_VERSION);
+    WritableUtils.writeVInt(out, length);
+    return out;
+  }
+
   /** Create the list of input splits and write them out in a file for
    * the JobTracker. The format is:
    * <format version>
@@ -888,21 +1028,8 @@
    * @param splits the input splits to write out
    * @param out the stream to write to
    */
-  private void writeSplitsFile(InputSplit[] splits, FSDataOutputStream out) throws IOException {
-    out.write(SPLIT_FILE_HEADER);
-    WritableUtils.writeVInt(out, CURRENT_SPLIT_FILE_VERSION);
-    WritableUtils.writeVInt(out, splits.length);
-    DataOutputBuffer buffer = new DataOutputBuffer();
-    RawSplit rawSplit = new RawSplit();
-    for(InputSplit split: splits) {
-      rawSplit.setClassName(split.getClass().getName());
-      buffer.reset();
-      split.write(buffer);
-      rawSplit.setDataLength(split.getLength());
-      rawSplit.setBytes(buffer.getData(), 0, buffer.getLength());
-      rawSplit.setLocations(split.getLocations());
-      rawSplit.write(out);
-    }
+  private void writeOldSplitsFile(InputSplit[] splits, 
+                                  FSDataOutputStream out) throws IOException {
   }
 
   /**

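For illustration, a minimal standalone sketch of the header layout that writeSplitsFileHeader() above produces: the literal bytes "SPL", then the format version and the split count as variable-length ints, after which each split follows as a serialized RawSplit record. The class name here is hypothetical, and plain java.io streams stand in for the HDFS output stream:

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    import org.apache.hadoop.io.WritableUtils;

    public class SplitHeaderSketch {
      public static void main(String[] args) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        out.write("SPL".getBytes());       // SPLIT_FILE_HEADER
        WritableUtils.writeVInt(out, 0);   // CURRENT_SPLIT_FILE_VERSION
        WritableUtils.writeVInt(out, 42);  // number of splits that follow
        out.close();
        // Small vints fit in a single byte, so this header is 3 + 1 + 1 bytes.
        System.out.println("header length: " + bytes.size());
      }
    }

Sorting the splits largest-first before they are written (NewSplitComparator) lets the scheduler start the longest-running tasks as early as possible.
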
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobConf.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobConf.java?rev=726850&r1=726849&r2=726850&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobConf.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobConf.java Mon Dec 15 14:21:32 2008
@@ -99,7 +99,9 @@
  * @see ClusterStatus
  * @see Tool
  * @see DistributedCache
+ * @deprecated Use {@link Configuration} instead
  */
+@Deprecated
 public class JobConf extends Configuration {
   
   private static final Log LOG = LogFactory.getLog(JobConf.class);
@@ -770,6 +772,40 @@
   }
 
   /**
+   * Should the framework use the new context-object code for running
+   * the mapper?
+   * @return true, if the new api should be used
+   */
+  public boolean getUseNewMapper() {
+    return getBoolean("mapred.mapper.new-api", false);
+  }
+  /**
+   * Set whether the framework should use the new api for the mapper.
+   * This is the default for jobs submitted with the new Job api.
+   * @param flag true, if the new api should be used
+   */
+  public void setUseNewMapper(boolean flag) {
+    setBoolean("mapred.mapper.new-api", flag);
+  }
+
+  /**
+   * Should the framework use the new context-object code for running
+   * the reducer?
+   * @return true, if the new api should be used
+   */
+  public boolean getUseNewReducer() {
+    return getBoolean("mapred.reducer.new-api", false);
+  }
+  /**
+   * Set whether the framework should use the new api for the reducer. 
+   * This is the default for jobs submitted with the new Job api.
+   * @param flag true, if the new api should be used
+   */
+  public void setUseNewReducer(boolean flag) {
+    setBoolean("mapred.reducer.new-api", flag);
+  }
+
+  /**
    * Get the value class for job outputs.
    * 
    * @return the value class for job outputs.

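A short sketch of how a client could opt an existing job into the new context-object API with the setters added above; the class and job name are made up:

    import org.apache.hadoop.mapred.JobConf;

    public class NewApiFlagsSketch {
      public static void main(String[] args) {
        JobConf conf = new JobConf();
        conf.setJobName("example");          // hypothetical job name
        conf.setUseNewMapper(true);          // sets "mapred.mapper.new-api"
        conf.setUseNewReducer(true);         // sets "mapred.reducer.new-api"
        System.out.println(conf.getUseNewMapper());   // true
        System.out.println(conf.getUseNewReducer());  // true
      }
    }

Per the javadoc above, jobs submitted through the new Job API set these flags by default, so the setters mainly matter for code still driving JobConf directly.
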
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobConfigurable.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobConfigurable.java?rev=726850&r1=726849&r2=726850&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobConfigurable.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobConfigurable.java Mon Dec 15 14:21:32 2008
@@ -19,6 +19,7 @@
 package org.apache.hadoop.mapred;
 
 /** Something that may be configured. */
+@Deprecated
 public interface JobConfigurable {
   /** Initializes a new instance from a {@link JobConf}.
    *

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobContext.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobContext.java?rev=726850&r1=726849&r2=726850&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobContext.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobContext.java Mon Dec 15 14:21:32 2008
@@ -19,18 +19,23 @@
 
 import org.apache.hadoop.util.Progressable;
 
-public class JobContext {
-
-  JobConf job;
+/**
+ * @deprecated Use {@link org.apache.hadoop.mapreduce.JobContext} instead.
+ */
+@Deprecated
+public class JobContext extends org.apache.hadoop.mapreduce.JobContext {
+  private JobConf job;
   private Progressable progress;
 
-  JobContext(JobConf conf, Progressable progress) {
-    job = conf;
+  JobContext(JobConf conf, org.apache.hadoop.mapreduce.JobID jobId, 
+             Progressable progress) {
+    super(conf, jobId);
+    this.job = conf;
     this.progress = progress;
   }
 
-  JobContext(JobConf conf) {
-    this(conf, Reporter.NULL);
+  JobContext(JobConf conf, org.apache.hadoop.mapreduce.JobID jobId) {
+    this(conf, jobId, Reporter.NULL);
   }
   
   /**

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobID.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobID.java?rev=726850&r1=726849&r2=726850&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobID.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobID.java Mon Dec 15 14:21:32 2008
@@ -19,11 +19,7 @@
 package org.apache.hadoop.mapred;
 
 import java.io.DataInput;
-import java.io.DataOutput;
 import java.io.IOException;
-import java.text.NumberFormat;
-
-import org.apache.hadoop.io.Text;
 
 /**
  * JobID represents the immutable and unique identifier for 
@@ -41,91 +37,33 @@
  * 
  * @see TaskID
  * @see TaskAttemptID
- * @see JobTracker#getNewJobId()
- * @see JobTracker#getStartTime()
  */
-public class JobID extends ID {
-  protected static final String JOB = "job";
-  private Text jtIdentifier = new Text();
-  
-  private static NumberFormat idFormat = NumberFormat.getInstance();
-  static {
-    idFormat.setGroupingUsed(false);
-    idFormat.setMinimumIntegerDigits(4);
-  }
-  
+@Deprecated
+public class JobID extends org.apache.hadoop.mapreduce.JobID {
   /**
    * Constructs a JobID object 
    * @param jtIdentifier jobTracker identifier
    * @param id job number
    */
   public JobID(String jtIdentifier, int id) {
-    super(id);
-    this.jtIdentifier.set(jtIdentifier);
+    super(jtIdentifier, id);
   }
   
   public JobID() { }
-  
-  public String getJtIdentifier() {
-    return jtIdentifier.toString();
-  }
-  
-  @Override
-  public boolean equals(Object o) {
-    if (!super.equals(o))
-      return false;
-
-    JobID that = (JobID)o;
-    return this.jtIdentifier.equals(that.jtIdentifier);
-  }
-  
-  /**Compare JobIds by first jtIdentifiers, then by job numbers*/
-  @Override
-  public int compareTo(ID o) {
-    JobID that = (JobID)o;
-    int jtComp = this.jtIdentifier.compareTo(that.jtIdentifier);
-    if(jtComp == 0) {
-      return this.id - that.id;
-    }
-    else return jtComp;
-  }
-  
-  @Override
-  public String toString() {
-    return appendTo(new StringBuilder(JOB)).toString();
-  }
 
   /**
-   * Add the stuff after the "job" prefix to the given builder. This is useful,
-   * because the sub-ids use this substring at the start of their string.
-   * @param builder the builder to append to
-   * @return the builder that was passed in
+   * Downgrade a new JobID to an old one.
+   * @param old a new or old JobID
+   * @return either old or a new JobID built to match old
    */
-  protected StringBuilder appendTo(StringBuilder builder) {
-    builder.append(SEPARATOR);
-    builder.append(jtIdentifier);
-    builder.append(SEPARATOR);
-    builder.append(idFormat.format(id));
-    return builder;
-  }
-
-  @Override
-  public int hashCode() {
-    return jtIdentifier.hashCode() + id;
-  }
-  
-  @Override
-  public void readFields(DataInput in) throws IOException {
-    super.readFields(in);
-    jtIdentifier.readFields(in);
+  public static JobID downgrade(org.apache.hadoop.mapreduce.JobID old) {
+    if (old instanceof JobID) {
+      return (JobID) old;
+    } else {
+      return new JobID(old.getJtIdentifier(), old.getId());
+    }
   }
 
-  @Override
-  public void write(DataOutput out) throws IOException {
-    super.write(out);
-    jtIdentifier.write(out);
-  }
-  
   @Deprecated
   public static JobID read(DataInput in) throws IOException {
     JobID jobId = new JobID();
@@ -138,19 +76,7 @@
    * @throws IllegalArgumentException if the given string is malformed
    */
   public static JobID forName(String str) throws IllegalArgumentException {
-    if(str == null)
-      return null;
-    try {
-      String[] parts = str.split(Character.toString(SEPARATOR));
-      if(parts.length == 3) {
-        if(parts[0].equals(JOB)) {
-          return new JobID(parts[1], Integer.parseInt(parts[2]));
-        }
-      }
-    }catch (Exception ex) {//fall below
-    }
-    throw new IllegalArgumentException("JobId string : " + str 
-        + " is not properly formed");
+    return (JobID) org.apache.hadoop.mapreduce.JobID.forName(str);
   }
   
   /** 
@@ -187,5 +113,5 @@
       .append(jobId != null ? idFormat.format(jobId) : "[0-9]*");
     return builder;
   }
-  
+
 }

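A sketch of the new downgrade() helper in use; the jtIdentifier and job number are examples:

    import org.apache.hadoop.mapred.JobID;

    public class DowngradeSketch {
      public static void main(String[] args) {
        org.apache.hadoop.mapreduce.JobID newId =
            new org.apache.hadoop.mapreduce.JobID("200812151421", 1);
        // Builds an old-API JobID matching the new one.
        JobID oldId = JobID.downgrade(newId);
        System.out.println(oldId);                            // job_200812151421_0001
        // An instance that is already an old-API JobID passes through unchanged.
        System.out.println(JobID.downgrade(oldId) == oldId);  // true
      }
    }

Because the old mapred.JobID now extends the new mapreduce.JobID, an old instance is accepted anywhere a new one is expected, and downgrade() covers the opposite direction without copying when the argument is already old-API.
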
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobProfile.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobProfile.java?rev=726850&r1=726849&r2=726850&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobProfile.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobProfile.java Mon Dec 15 14:21:32 2008
@@ -66,7 +66,8 @@
    * @param url link to the web-ui for details of the job.
    * @param name user-specified job name.
    */
-  public JobProfile(String user, JobID jobid, String jobFile, String url,
+  public JobProfile(String user, org.apache.hadoop.mapreduce.JobID jobid, 
+                    String jobFile, String url,
                     String name) {
     this(user, jobid, jobFile, url, name, JobConf.DEFAULT_QUEUE_NAME);
   }
@@ -82,10 +83,11 @@
    * @param name user-specified job name.
    * @param queueName name of the queue to which the job is submitted
    */
-  public JobProfile(String user, JobID jobid, String jobFile, String url,
-                      String name, String queueName) {
+  public JobProfile(String user, org.apache.hadoop.mapreduce.JobID jobid, 
+                    String jobFile, String url,
+                    String name, String queueName) {
     this.user = user;
-    this.jobid = jobid;
+    this.jobid = JobID.downgrade(jobid);
     this.jobFile = jobFile;
     this.url = url;
     this.name = name;

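With the widened constructors above, callers holding a new-API JobID no longer convert it themselves; JobProfile downgrades internally. A sketch, with made-up user, file path, URL, and job name:

    import org.apache.hadoop.mapred.JobProfile;

    public class JobProfileSketch {
      public static void main(String[] args) {
        org.apache.hadoop.mapreduce.JobID id =
            new org.apache.hadoop.mapreduce.JobID("200812151421", 7);
        // All of the string arguments here are illustrative placeholders.
        JobProfile profile = new JobProfile("alice", id,
            "hdfs://nn/system/job_200812151421_0007/job.xml",
            "http://jt:50030/jobdetails.jsp?jobid=job_200812151421_0007",
            "wordcount");
        System.out.println(profile.getJobID());  // job_200812151421_0007
      }
    }
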
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobStatus.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobStatus.java?rev=726850&r1=726849&r2=726850&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobStatus.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobStatus.java Mon Dec 15 14:21:32 2008
@@ -48,7 +48,7 @@
   public static final int PREP = 4;
   public static final int KILLED = 5;
 
-  private final JobID jobid;
+  private JobID jobid;
   private float mapProgress;
   private float reduceProgress;
   private float cleanupProgress;
@@ -62,7 +62,6 @@
   /**
    */
   public JobStatus() {
-    jobid = new JobID();
   }
 
   /**
@@ -288,7 +287,7 @@
   }
 
   public synchronized void readFields(DataInput in) throws IOException {
-    jobid.readFields(in);
+    this.jobid = JobID.read(in);
     this.setupProgress = in.readFloat();
     this.mapProgress = in.readFloat();
     this.reduceProgress = in.readFloat();

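The readFields() change above switches to the allocate-and-read pattern of JobID.read(), so the no-arg constructor no longer pre-creates an ID that would be overwritten anyway. The same round trip in isolation, with an example ID:

    import java.io.IOException;

    import org.apache.hadoop.io.DataInputBuffer;
    import org.apache.hadoop.io.DataOutputBuffer;
    import org.apache.hadoop.mapred.JobID;

    public class JobIdRoundTripSketch {
      public static void main(String[] args) throws IOException {
        JobID original = new JobID("200812151421", 3);
        DataOutputBuffer out = new DataOutputBuffer();
        original.write(out);                        // serialize
        DataInputBuffer in = new DataInputBuffer();
        in.reset(out.getData(), out.getLength());
        JobID copy = JobID.read(in);                // allocate-and-read, as in readFields()
        System.out.println(copy.equals(original));  // true
      }
    }
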
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LineRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LineRecordReader.java?rev=726850&r1=726849&r2=726850&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LineRecordReader.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LineRecordReader.java Mon Dec 15 14:21:32 2008
@@ -20,7 +20,6 @@
 
 import java.io.IOException;
 import java.io.InputStream;
-import java.io.OutputStream;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
@@ -35,7 +34,10 @@
 
 /**
  * Treats keys as offset in file and value as line. 
+ * @deprecated Use 
+ *   {@link org.apache.hadoop.mapreduce.lib.input.LineRecordReader} instead.
  */
+@Deprecated
 public class LineRecordReader implements RecordReader<LongWritable, Text> {
   private static final Log LOG
     = LogFactory.getLog(LineRecordReader.class.getName());

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java?rev=726850&r1=726849&r2=726850&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java Mon Dec 15 14:21:32 2008
@@ -101,13 +101,13 @@
     
     @Override
     public void run() {
-      JobContext jContext = new JobContext(conf);
+      JobID jobId = profile.getJobID();
+      JobContext jContext = new JobContext(conf, jobId);
       OutputCommitter outputCommitter = job.getOutputCommitter();
       try {
         // split input into minimum number of splits
         InputSplit[] splits;
         splits = job.getInputFormat().getSplits(job, 1);
-        JobID jobId = profile.getJobID();
         
         int numReduceTasks = job.getNumReduceTasks();
         if (numReduceTasks > 1 || numReduceTasks < 0) {

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapReduceBase.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapReduceBase.java?rev=726850&r1=726849&r2=726850&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapReduceBase.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapReduceBase.java Mon Dec 15 14:21:32 2008
@@ -29,6 +29,7 @@
  * <p>Provides default no-op implementations for a few methods, most non-trivial
  * applications need to override some of them.</p>
  */
+@Deprecated
 public class MapReduceBase implements Closeable, JobConfigurable {
 
   /** Default implementation that does nothing. */

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapRunnable.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapRunnable.java?rev=726850&r1=726849&r2=726850&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapRunnable.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapRunnable.java Mon Dec 15 14:21:32 2008
@@ -27,7 +27,9 @@
  * control on map processing e.g. multi-threaded, asynchronous mappers etc.</p>
  * 
  * @see Mapper
+ * @deprecated Use {@link org.apache.hadoop.mapreduce.Mapper} instead.
  */
+@Deprecated
 public interface MapRunnable<K1, V1, K2, V2>
     extends JobConfigurable {